Skip to content
Snippets Groups Projects
test_email.py 7.95 KiB
Newer Older
Stéphane Diemer's avatar
Stéphane Diemer committed
#!/usr/bin/env python3
"""Checks that emails can be sent."""
from ipaddress import ip_address
from pathlib import Path
import random
import socket
Stéphane Diemer's avatar
Stéphane Diemer committed
import subprocess
import sys
import time
Stéphane Diemer's avatar
Stéphane Diemer committed

import spf
sys.path.append(str(Path(__file__).parents[1].resolve()))
Florent Thiery's avatar
Florent Thiery committed

# pylint: disable=wrong-import-position
from utilities import logging as lg  # noqa: E402
from utilities.config import load_conf  # noqa: E402
from utilities.network import get_ip  # noqa: E402
Stéphane Diemer's avatar
Stéphane Diemer committed

def check_listen() -> tuple:
    """Check that Postfix is listening on localhost port 25.

    The listening sockets are read from ``ss -pant``, filtered on the
    ``master`` process name and on ``:25``. The check passes when either
    ``127.0.0.1:25`` or ``[::1]:25`` appears in the filtered output.

    :return: Number of warnings and number of errors, as ``(warnings, errors)``
    :rtype: tuple
    """

    warnings = 0
    errors = 0

    # get listening state from ss
    status, out = subprocess.getstatusoutput("ss -pant | grep master | grep ':25'")
    # a non-zero status means grep matched nothing (or the pipeline failed)
    if status != 0 or ("127.0.0.1:25" not in out and "[::1]:25" not in out):
        lg.warning("Postfix is not listening on localhost:25")
        warnings += 1
    else:
        lg.success("Postfix is listening on localhost:25")

    return warnings, errors
Stéphane Diemer's avatar
Stéphane Diemer committed

Stéphane Diemer's avatar
Stéphane Diemer committed

def check_relay(relay_host: str, relay_port: str, domain: str) -> tuple:
    """Check that the Postfix relay configuration matches the expected one.

    The ``relayhost`` value is read from ``/etc/postfix/main.cf`` and compared
    with ``relay_host[:relay_port]``. When no relay is configured, the
    ``myorigin`` value (from ``/etc/mailname`` if it exists, otherwise from
    ``main.cf``) must be the sender domain, the hostname or the FQDN.

    :param relay_host: Hostname or IP address of relay host
    :type relay_host: str
    :param relay_port: Port of relay host
    :type relay_port: str
    :param domain: Domain name under which mails will be sent
    :type domain: str
    :return: Number of warnings and number of errors, as ``(warnings, errors)``
    :rtype: tuple
    """

    warnings = 0
    errors = 0

    # get relayhost value from Postfix config
    status, out = subprocess.getstatusoutput("grep -E '^relayhost' /etc/postfix/main.cf")
    if status == 0:
        # keep only the value: drop the key, the "=" separator and the brackets
        configured_relay = (
            out.replace("relayhost", "").strip(" \t=").replace("[", "").replace("]", "")
        )
    else:
        configured_relay = ""

    if not configured_relay:
        # no relay configured: mails are sent directly, so check the origin
        if Path("/etc/mailname").exists():
            with open("/etc/mailname", "r") as mailname:
                myorigin = mailname.read().strip()
        else:
            out = subprocess.getoutput("grep -E '^myorigin' /etc/postfix/main.cf")
            # strip the "=" separator too, as done for relayhost above;
            # otherwise the value can never match one of the origins
            myorigin = out.replace("myorigin", "").strip(" \t=")
        # possible origin names
        origins = (
            domain or None,
            socket.gethostname(),
            socket.getfqdn(),
        )
        if myorigin not in origins:
            lg.warning('"myorigin" setting does not contain a valid domain')
            warnings += 1

    relay = "{}:{}".format(relay_host, relay_port) if relay_port else relay_host
    if relay != configured_relay:
        lg.error("STMP relay must be {}".format(relay))
        errors += 1

    if not errors and not warnings:
        lg.success("STMP relay is properly set")

    return warnings, errors
def check_send(sender: str) -> tuple:
    """Check that Postfix can send email.

    A test email is sent with the ``mail`` command to a unique
    ``noreply+...@ubicast.eu`` address, then the mail log (or journalctl when
    no log file is found) is polled until a non-deferred log line for that
    address shows up or the timeout is reached.

    :param sender: Sender mail address
    :type sender: str
    :return: Number of warnings and number of errors, as ``(warnings, errors)``
    :rtype: tuple
    """

    warnings = 0
    errors = 0

    # send email; the recipient is unique so that it can be found in the logs
    email = "noreply+{}-{}@ubicast.eu".format(time.time(), random.randint(0, 1000))
    if sender:
        sender = "-a 'From: {}' ".format(sender)
    else:
        lg.info("Sender address is not set")
    cmd = "echo 'test email' | mail -s 'Email used to test configuration.' {}{}".format(
        sender, email
    )
    subprocess.getoutput(cmd)

    # find where the sending log can be read
    if Path("/var/log/maillog").is_file():
        cmd = "grep '{}' /var/log/maillog".format(email)
    elif Path("/var/log/mail.log").is_file():
        cmd = "grep '{}' /var/log/mail.log".format(email)
    else:
        lg.info("/var/log/mail.log not found, trying journalctl")
        cmd = "journalctl -t postfix/smtp | grep {}".format(email)
    lg.log("Using following command to search for sending log:\n{}".format(cmd))

    # init vars
    timeout = 120
    waited = 0
    delay = 1
    timed_out = False
    out = ""

    # poll the logs, doubling the delay after each attempt
    lg.log("Email sending timeout is {} seconds.".format(timeout))
    sys.stdout.write("Waiting for sending log")
    sys.stdout.flush()
    while not timed_out:
        # wait
        time.sleep(delay)
        waited += delay
        delay *= 2
        # run command
        status, out = subprocess.getstatusoutput(cmd)
        # log loop
        sys.stdout.write(".")
        sys.stdout.flush()
        # found: keep the last matching line and stop unless still deferred
        if status == 0:
            out = out.strip().split("\n")[-1]
            if "status=deferred" not in out:
                break
        # timeout
        timed_out = waited >= timeout
    sys.stdout.write("\n")

    # check if the sending has timed out
    if timed_out:
        lg.error("Failed to send email (timed out).")
        if out:
            lg.info("> sending log line:\n{}".format(out))
        else:
            lg.info("> no log entry found.")
        errors += 1
    elif "bounced" in out or "you tried to reach does not exist" in out:
        lg.error("Failed to send email")
        lg.info("> sending log line:\n{}".format(out))
        errors += 1

    if not errors:
        lg.success("Can send email")

    return warnings, errors

def check_spf(ip_addr: str, sender: str, domain: str) -> tuple:
    """Check that the SPF record of the sender domain accepts the given IP.

    The check is skipped (no warning, no error) when the address is private
    or when the IP or the sender is not set. A ``pass`` or ``neutral`` result
    is a success, ``none`` is only reported, and any other result counts as
    a warning.

    :param ip_addr: Host ip address of server or relay
    :type ip_addr: str
    :param sender: Sender mail address
    :type sender: str
    :param domain: Domain name under which mails will be sent
    :type domain: str
    :return: Number of warnings and number of errors, as ``(warnings, errors)``
    :rtype: tuple
    """

    warnings = 0
    errors = 0

    # only parse the address when there is one: ip_address() raises ValueError
    # on None or an empty string, which would hide the "not set" case below
    if ip_addr and ip_address(ip_addr).is_private:
        lg.info("{} is a private address, cannot check SPF".format(ip_addr))
    elif ip_addr and sender:
        # check spf
        result, _ = spf.check2(i=ip_addr, s=domain, h="")
        if result in ("pass", "neutral"):
            lg.success("SPF for {} in {}: {}".format(ip_addr, domain, result))
        elif result == "none":
            lg.info("SPF for {} in {}: {}".format(ip_addr, domain, result))
        else:
            lg.warning("SPF for {} in {}: {}".format(ip_addr, domain, result))
            warnings += 1
    else:
        lg.info("IP or sender not set, cannot check SPF")

    return warnings, errors


def main():
    """Run all checks and exit with the corresponding exit code.

    Exit codes: ``1`` when at least one check reported an error, ``3`` when
    only warnings were reported, ``0`` when everything passed.
    """

    lg.log("Checking email settings:")

    if not Path("/etc/postfix").exists():
        lg.info("postfix is not installed")
        # NOTE(review): nothing can be checked without Postfix, so stop here.
        # Exit code 2 is assumed to mean "not testable" for the test runner;
        # confirm against the runner's conventions.
        sys.exit(2)

    # get settings
    conf = load_conf()
    relay = conf.get("EMAIL_SMTP_SERVER", "").replace("[", "").replace("]", "")
    relay_host = relay.split(":")[0] if ":" in relay else relay
    relay_port = relay.split(":")[-1] if ":" in relay else ""
    # address used for the SPF check: the relay if any, else this host
    ip_addr = (
        (socket.gethostbyname(relay_host) if relay_host else None)
        or conf.get("NETWORK_IP_NAT")
        or conf.get("NETWORK_IP")
        or get_ip()
    )
    sender = conf.get("EMAIL_SENDER") or ""
    if not sender and Path("/etc/postfix/generic").exists():
        with open("/etc/postfix/generic") as sender_fo:
            # the sender is the last field of the first line; an empty file
            # or blank first line leaves the sender unset instead of crashing
            fields = sender_fo.readline().split()
        sender = fields[-1] if fields else ""
    domain = sender.split("@")[-1] or None

    results = []

    # check that Postfix is listening on localhost
    results.append(check_listen())

    # check that relayhost is correct
    results.append(check_relay(relay_host, relay_port, domain))

    # check that we can send emails
    results.append(check_send(sender))

    # check that spf record is correct
    results.append(check_spf(ip_addr, sender, domain))

    # plain sums: each check contributes exactly what it reported
    warnings = sum(check_warn for check_warn, _ in results)
    errors = sum(check_err for _, check_err in results)

    if errors:
        sys.exit(1)
    elif warnings:
        sys.exit(3)

    sys.exit(0)
# Run the checks only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()