#!/usr/bin/env python3 """ Criticality: High Checks that emails can be sent. """ from pathlib import Path import os import subprocess import sys import random import time # install spf lib if not present try: import spf except ImportError: subprocess.check_call(["apt-get", "-qq", "-y", "install", "python3-spf"]) import spf # install netstat if not present if subprocess.call(["which", "netstat"], stdout=subprocess.DEVNULL) != 0: subprocess.check_call(["apt-get", "-qq", "-y", "install", "net-tools"]) sys.path.append(str(Path(__file__).parents[1].resolve())) # pylint: disable=wrong-import-position from envsetup import utils as u # noqa: E402 def check_listen() -> int: """Check that Postfix is listening on 127.0.0.1:25. :return: Exit return code :rtype: int """ print("Checking that postfix is listening locally:") # get listening state from netstat status, out = subprocess.getstatusoutput("netstat -pant | grep master | grep ':25'") if status != 0 or "127.0.0.1:25" not in out: u.error("Postfix is not listening on 127.0.0.1:25") return 1 u.success("Postfix is listening on 127.0.0.1:25") return 0 def check_relay(conf: dict) -> int: """Check that Postfix is not an open relay. :param conf: EnvSetup configuration settings :type conf: dict :return: Exit return code :rtype: int """ print("Checking that SMTP relay conforms to conf:") # get relayhost value from Postfix config status, out = subprocess.getstatusoutput("grep relayhost /etc/postfix/main.cf") if status == 0: configured_relay = ( out[len("relayhost"):].strip(" \t=").replace("[", "").replace("]", "") ) else: configured_relay = "" if not configured_relay: # check public ip address ip_addr = conf.get("NETWORK_IP_NAT") if not ip_addr: ip_addr = conf.get("NETWORK_IP") if not ip_addr: u.warning("Cannot determine public IP address") return 3 # check domain origin with open("/etc/mailname", "r") as mailname: data = mailname.read().strip() if data not in ("ubicast.tv", "ubicast.eu"): u.warning("/etc/mailname does not contain ubicast.eu or ubicast.tv") return 3 # check spf result, _ = spf.check2(i=ip_addr, s="support@ubicast.eu", h="") if result != "pass": u.error("SPF record for {} in ubicast.eu is missing".format(ip_addr)) return 3 # get relayhost value from envsetup config conf_relay = conf.get("EMAIL_SMTP_SERVER", "").replace("[", "").replace("]", "") if conf_relay != configured_relay: u.error("STMP relay must be {}".format(conf_relay)) return 3 u.success("STMP relay is properly set") return 0 def check_send_test_email(conf: dict) -> int: """Check that Postfix can send email. :return: Exit return code :rtype: int """ print("Checking Postfix can send email:") # check if s-nail is installed, if not set from address sender = "" status, out = subprocess.getstatusoutput("dpkg -l s-nail | grep -E '^ii'") if status == 0: u.warning("The package 's-nail' is installed, the email sender address will be ignored.") else: sender = conf.get("EMAIL_SENDER") or "" # send email email = "noreply+{}-{}@ubicast.eu".format(time.time(), random.randint(0, 1000)) u.info("Sending test email to '{}'".format(email)) if sender: u.info("Sender address is '{}'".format(sender)) sender = "-a 'From: {}' ".format(sender) else: u.info("Sender address is not set") cmd = "echo 'test email' | mail -s 'Email used to test configuration.' {}{}".format(sender, email) subprocess.getstatusoutput(cmd) # init vars timeout = 120 waited = 1 delay = 2 out = "" # find logs if os.path.isfile("/var/log/mail.log"): cmd = "grep '{}' /var/log/mail.log".format(email) else: u.info("/var/log/mail.log not found, trying journalctl") cmd = "journalctl -u postfix | grep {}".format(email) # logs polling time.sleep(1) while True: status, out = subprocess.getstatusoutput(cmd) # found if status == 0: out = out.strip().split("\n")[-1] if "status=deferred" not in out: u.info("Log entry found after {} seconds".format(waited)) break # timeout if waited >= timeout: u.error("Failed to send email.") u.info( "No log entry found after {} seconds using command: {}".format( waited, cmd ) ) return 1 # not found yet u.info( "No log entry found after {} seconds, waiting {} more seconds...".format( waited, delay ) ) # wait before next iteration time.sleep(delay) waited += delay delay *= 2 # check output for errors if "bounced" in out or "you tried to reach does not exist" in out: u.error("Failed to send email") u.info("Sending log line:\n{}".format(out)) return 1 u.success("Email sent.") return 0 def main(): """Run all checks and exits with corresponding exit code.""" if not os.path.exists("/etc/postfix"): u.error("Postfix dir does not exists, please install postfix") sys.exit(1) conf = u.load_conf() rcode = check_listen() if rcode == 0: rcode = check_relay(conf) if check_send_test_email(conf) == 1: rcode = 1 sys.exit(rcode) if __name__ == "__main__": main()