#!/usr/bin/env python3
"""Checks that emails can be sent."""
import os
import random
import subprocess
import sys
import time
from pathlib import Path

# install spf lib if not present
# (the import is retried after installing the Debian package with apt-get)
try:
    import spf
except ImportError:
    subprocess.check_call(["apt-get", "-qq", "-y", "install", "python3-spf"])
    import spf

# install netstat if not present
# (`which` exits with a non-zero status when the command is not found)
if subprocess.call(["which", "netstat"]) != 0:
    subprocess.check_call(["apt-get", "-qq", "-y", "install", "net-tools"])

# add the parent of this script's directory to the import path;
# presumably that is where the `envsetup` package lives
sys.path.append(str(Path(__file__).parents[1].resolve()))

# pylint: disable=wrong-import-position
from envsetup import utils as u  # noqa: E402

def check_listen() -> int:
    """Check that Postfix is listening on 127.0.0.1:25.

    The listening sockets are read from ``netstat -pant`` and filtered on the
    ``master`` process name and on port 25.

    :return: Exit return code (0 if Postfix listens locally, 1 otherwise)
    :rtype: int
    """
    print("Checking that postfix is listening locally:")

    # get listening state from netstat
    status, out = subprocess.getstatusoutput("netstat -pant | grep master | grep ':25'")

    # a non-zero status means that grep found no matching line
    if status != 0 or "127.0.0.1:25" not in out:
        u.error("Postfix is not listening on 127.0.0.1:25")
        return 1

    u.success("Postfix is listening on 127.0.0.1:25")
    return 0

def _parse_relayhost(grep_output: str) -> str:
    """Extract the relay host from the main.cf lines matching "relayhost".

    Only lines that actually define the ``relayhost`` parameter are used;
    commented-out lines and other parameters whose line merely contains the
    word are ignored. Square brackets are removed from the value.

    :param grep_output: Output of ``grep relayhost /etc/postfix/main.cf``
    :type grep_output: str
    :return: Configured relay host, or an empty string when none is set
    :rtype: str
    """
    relay = ""
    for line in grep_output.splitlines():
        name, sep, value = line.partition("=")
        if sep and name.rstrip() == "relayhost":
            # keep the last definition when the parameter is set several times
            relay = value.strip().replace("[", "").replace("]", "")
    return relay


def check_relay(conf: dict) -> int:
    """Check that the Postfix relay matches the EnvSetup configuration.

    When Postfix has no relay host configured, the public IP address of the
    server is additionally checked against the SPF record of ubicast.eu.

    :param conf: EnvSetup configuration settings
    :type conf: dict
    :return: Exit return code (0 if the relay is properly set, 3 otherwise)
    :rtype: int
    """

    print("Checking that SMTP relay conforms to conf:")

    # get relayhost value from Postfix config
    status, out = subprocess.getstatusoutput("grep relayhost /etc/postfix/main.cf")
    # a non-zero status means that no line mentions relayhost
    configured_relay = _parse_relayhost(out) if status == 0 else ""

    if not configured_relay:
        # no relay is configured: emails are sent directly from this server
        # check public ip address
        ip_addr = conf.get("NETWORK_IP_NAT")
        if not ip_addr:
            ip_addr = conf.get("NETWORK_IP")
        if not ip_addr:
            u.warning("Cannot determine public IP address")
            return 3
        # check domain origin
        try:
            with open("/etc/mailname", "r") as mailname:
                data = mailname.read().strip()
        except OSError:
            # an unreadable or missing file is reported by the warning below
            data = ""
        if data not in ("ubicast.tv", "ubicast.eu"):
            u.warning("/etc/mailname does not contain ubicast.eu or ubicast.tv")
        # check that the public ip address is allowed to send for ubicast.eu
        result, _ = spf.check2(i=ip_addr, s="support@ubicast.eu", h="")
        if result != "pass":
            u.error("SPF record for {} in ubicast.eu is missing".format(ip_addr))
            return 3

    # get relayhost value from envsetup config
    conf_relay = conf.get("EMAIL_SMTP_SERVER", "").replace("[", "").replace("]", "")

    if conf_relay != configured_relay:
        u.error("STMP relay must be {}".format(conf_relay))
        return 3

    u.success("STMP relay is properly set")
    return 0


def check_send_test_email(timeout: int = 120) -> int:
    """Check that Postfix can send email.

    A test email is sent to a unique address with the ``mail`` command, then
    the mail logs are polled until an entry for this address is found.

    :param timeout: Maximum time in seconds to wait for a log entry
    :type timeout: int
    :return: Exit return code (0 if the email was sent, 1 otherwise)
    :rtype: int
    """

    print("Checking Postfix can send email:")

    # send email
    # the address is made unique so that its log entries can be identified
    email = "noreply+{}-{}@ubicast.eu".format(time.time(), random.randint(0, 1000))
    u.info("Sending test email to '{}'".format(email))
    cmd = "echo 'test email' | mail -s 'test email' {}".format(email)
    status, out = subprocess.getstatusoutput(cmd)
    if status != 0:
        # no log entry can be expected if the mail command itself failed
        u.error("Failed to send email.")
        u.info("Command '{}' exited with code {}:\n{}".format(cmd, status, out))
        return 1
    out = ""

    # find logs
    if os.path.isfile("/var/log/mail.log"):
        cmd = "grep '{}' /var/log/mail.log".format(email)
    else:
        u.info("/var/log/mail.log not found, trying journalctl")
        cmd = "journalctl -u postfix | grep {}".format(email)

    # logs polling
    # wait a bit before the first check, then double the delay at each try
    time.sleep(1)
    waited = 1
    delay = 2
    while True:
        status, out = subprocess.getstatusoutput(cmd)
        # found
        if status == 0:
            # only the most recent log line is relevant
            out = out.strip().split("\n")[-1]
            # a deferred email may still be delivered later, keep polling
            if "status=deferred" not in out:
                u.info("Log entry found after {} seconds".format(waited))
                break
        # timeout
        if waited >= timeout:
            u.error("Failed to send email.")
            u.info(
                "No log entry found after {} seconds using command: {}".format(
                    waited, cmd
                )
            )
            return 1
        # not found yet
        u.info(
            "No log entry found after {} seconds, waiting {} more seconds...".format(
                waited, delay
            )
        )
        # wait before next iteration
        time.sleep(delay)
        waited += delay
        delay *= 2

    # check output for errors
    if "bounced" in out or "you tried to reach does not exist" in out:
        u.error("Failed to send email")
        u.info("Sending log line:\n{}".format(out))
        return 1

    u.success("Email sent.")
    return 0


def main() -> None:
    """Run all checks and exit with the corresponding exit code.

    The relay and sending checks are only run when Postfix is listening.
    The exit code is the one of the listening or relay check, or 1 when
    sending the test email fails.
    """

    if not os.path.exists("/etc/postfix"):
        u.error("Postfix dir does not exists, please install postfix")
        sys.exit(1)

    conf = u.load_conf()

    rcode = check_listen()
    if rcode == 0:
        rcode = check_relay(conf)
        # a sending failure takes precedence over the relay check result
        if check_send_test_email() == 1:
            rcode = 1

    sys.exit(rcode)


if __name__ == "__main__":
    main()