diff --git a/tests/test_email.py b/tests/test_email.py index 60ebfaa2b208161e6134833ad0c2b6b3a35eeb3a..2a1ccfea9f441d19519777c122ee2510cc5fa3db 100755 --- a/tests/test_email.py +++ b/tests/test_email.py @@ -5,9 +5,10 @@ Criticality: High Checks that emails can be sent. """ -import os +from ipaddress import ip_address from pathlib import Path import random +import socket import subprocess import sys import time @@ -20,107 +21,100 @@ sys.path.append(str(Path(__file__).parents[1].resolve())) from envsetup import utils as u # noqa: E402 -def check_listen() -> int: +def check_listen() -> tuple: """Check that Postfix is listening on 127.0.0.1:25. - :return: Exit return code - :rtype: int + :return: Exit return codes + :rtype: tuple """ - print("Checking that postfix is listening locally:") + warnings = 0 + errors = 0 - # get listening state from netstat - status, out = subprocess.getstatusoutput("netstat -pant | grep master | grep ':25'") + # get listening state from ss + status, out = subprocess.getstatusoutput("ss -pant | grep master | grep ':25'") - if status != 0 or "127.0.0.1:25" not in out: - u.error("Postfix is not listening on 127.0.0.1:25") - return 1 + if status != 0 or ("127.0.0.1:25" not in out and "[::1]:25" not in out): + u.error("Postfix is not listening on localhost:25") + errors += 1 - u.success("Postfix is listening on 127.0.0.1:25") + u.success("Postfix is listening on localhost:25") - return 0 + return warnings, errors -def check_relay(conf: dict) -> int: +def check_relay(relay: str, domain: str) -> tuple: """Check that Postfix is not an open relay. - :param conf: EnvSetup configuration settings - :type conf: dict - :return: Exit return code - :rtype: int + :param relay: Hostname or IP address of relay host + :type relay: str + :param domain: Domain name under which mails will be send + :type domain: str + :return: Exit return codes + :rtype: tuple """ - print("Checking that SMTP relay conforms to conf:") + warnings = 0 + errors = 0 # get relayhost value from Postfix config status, out = subprocess.getstatusoutput("grep relayhost /etc/postfix/main.cf") if status == 0: configured_relay = ( - out[len("relayhost"):].strip(" \t=").replace("[", "").replace("]", "") + out.replace("relayhost", "").strip(" \t=").replace("[", "").replace("]", "") ) else: configured_relay = "" if not configured_relay: - # check public ip address - ip_addr = conf.get("NETWORK_IP_NAT") or conf.get("NETWORK_IP") - if not ip_addr: - u.warning("Cannot determine public IP address") - return 3 # check domain origin - if os.path.exists("/etc/mailname"): + if Path("/etc/mailname").exists(): with open("/etc/mailname", "r") as mailname: myorigin = mailname.read().strip() else: out = subprocess.getoutput("grep myorigin /etc/postfix/main.cf") myorigin = out.replace("myorigin", "").strip() - if myorigin not in ("ubicast.tv", "ubicast.eu"): - u.warning("The \"myorigin\" setting does not contain ubicast.eu or ubicast.tv") - return 3 - # check spf - result, _ = spf.check2(i=ip_addr, s="support@ubicast.eu", h="") - if result != "pass": - u.error("SPF record for {} in ubicast.eu is missing".format(ip_addr)) - return 3 + # possible origin names + origins = set( + (domain or None, u.exec_cmd("hostname", log_output=False)[1] or None) + ) + if myorigin not in origins: + u.warning('"myorigin" setting does not contain a valid domain') + warnings += 1 - # get relayhost value from envsetup config - conf_relay = conf.get("EMAIL_SMTP_SERVER", "").replace("[", "").replace("]", "") + if relay != configured_relay: + u.error("STMP relay must be {}".format(relay)) + errors += 1 - if conf_relay != configured_relay: - u.error("STMP relay must be {}".format(conf_relay)) - return 3 + if not errors and not warnings: + u.success("STMP relay is properly set") - u.success("STMP relay is properly set") - return 0 + return warnings, errors -def check_send_test_email(conf: dict) -> int: +def check_send(sender: str) -> tuple: """Check that Postfix can send email. - :return: Exit return code - :rtype: int + :param sender: Sender mail address + :type sender: str + :return: Exit return codes + :rtype: tuple """ - print("Checking Postfix can send email:") - - # check if s-nail is installed, if not set from address - sender = "" - status, out = subprocess.getstatusoutput("dpkg -l s-nail | grep -E '^ii'") - if status == 0: - u.warning("The package 's-nail' is installed, the email sender address will be ignored.") - else: - sender = conf.get("EMAIL_SENDER") or "" + warnings = 0 + errors = 0 # send email email = "noreply+{}-{}@ubicast.eu".format(time.time(), random.randint(0, 1000)) - u.info("Sending test email to '{}'".format(email)) if sender: - u.info("Sender address is '{}'".format(sender)) sender = "-a 'From: {}' ".format(sender) else: - u.info("Sender address is not set") - cmd = "echo 'test email' | mail -s 'Email used to test configuration.' {}{}".format(sender, email) + u.warning("Sender address is not set") + warnings += 1 + cmd = "echo 'test email' | mail -s 'Email used to test configuration.' {}{}".format( + sender, email + ) subprocess.getoutput(cmd) # init vars @@ -130,7 +124,7 @@ def check_send_test_email(conf: dict) -> int: out = "" # find logs - if os.path.isfile("/var/log/mail.log"): + if Path("/var/log/mail.log").is_file(): cmd = "grep '{}' /var/log/mail.log".format(email) else: u.info("/var/log/mail.log not found, trying journalctl") @@ -144,23 +138,13 @@ def check_send_test_email(conf: dict) -> int: if status == 0: out = out.strip().split("\n")[-1] if "status=deferred" not in out: - u.info("Log entry found after {} seconds".format(waited)) break # timeout if waited >= timeout: u.error("Failed to send email.") - u.info( - "No log entry found after {} seconds using command: {}".format( - waited, cmd - ) - ) - return 1 - # not found yet - u.info( - "No log entry found after {} seconds, waiting {} more seconds...".format( - waited, delay - ) - ) + u.info("> no log entry found using command: {}".format(cmd)) + errors += 1 + break # wait before next iteration time.sleep(delay) waited += delay @@ -169,29 +153,102 @@ def check_send_test_email(conf: dict) -> int: # check output for errors if "bounced" in out or "you tried to reach does not exist" in out: u.error("Failed to send email") - u.info("Sending log line:\n{}".format(out)) - return 1 + u.info("> sending log line:\n{}".format(out)) + errors += 1 + + if not errors: + u.success("Can send email") + + return warnings, errors - u.success("Email sent.") - return 0 + +def check_spf(ip_addr: str, sender: str, domain: str) -> tuple: + """Check that SPF records passes. + + :param ip_addr: Host ip address of server or relay + :type ip_addr: str + :param sender: Sender mail address + :type sender: str + :param domain: Domain name under which mails will be send + :type domain: str + :return: Exit return codes + :rtype: tuple + """ + + warnings = 0 + errors = 0 + + if ip_address(ip_addr).is_private: + u.info("{} is a private address, cannot check SPF") + elif ip_addr and sender: + # check spf + result, _ = spf.check2(i=ip_addr, s=domain, h="") + if result in ("pass", "neutral"): + u.success("SPF for {} in {}: {}".format(ip_addr, domain, result)) + elif result == "none": + u.info("SPF for {} in {}: {}".format(ip_addr, domain, result)) + else: + u.warning("SPF for {} in {}: {}".format(ip_addr, domain, result)) + warnings += 1 + else: + u.info("IP or sender not set, cannot check SPF") + + return warnings, errors def main(): """Run all checks and exits with corresponding exit code.""" - if not os.path.exists("/etc/postfix"): - u.error("Postfix dir does not exists, please install postfix") - sys.exit(1) + warnings = 0 + errors = 0 - conf = u.load_conf() + print("Checking email settings:") - rcode = check_listen() - if rcode == 0: - rcode = check_relay(conf) - if check_send_test_email(conf) == 1: - rcode = 1 + if not Path("/etc/postfix").exists(): + u.info("postfix is not installed") + exit(2) - sys.exit(rcode) + # get settings + conf = u.load_conf() + relay = conf.get("EMAIL_SMTP_SERVER", "").replace("[", "").replace("]", "") + ip_addr = ( + (socket.gethostbyname(relay) if relay else None) + or conf.get("NETWORK_IP_NAT") + or conf.get("NETWORK_IP") + or None + ) + sender = conf.get("EMAIL_SENDER") or None + if not sender and Path("/etc/postfix/generic").exists(): + with open("/etc/postfix/generic") as sender_fo: + sender = sender_fo.readline().split()[-1] + domain = sender.split("@")[-1] or None + + # check that we are not an open relay + check_listen_warn, check_listen_err = check_listen() + warnings += check_listen_warn if check_listen_warn else warnings + errors += check_listen_err if check_listen_err else errors + + # check that relayhost is correct + check_relay_warn, check_relay_err = check_relay(relay, domain) + warnings += check_relay_warn if check_relay_warn else warnings + errors += check_relay_err if check_relay_err else errors + + # check that we can send emails + check_send_warn, check_send_err = check_send(sender) + warnings += check_send_warn if check_send_warn else warnings + errors += check_send_err if check_send_err else errors + + # check that spf record is correct + check_spf_warn, check_spf_err = check_spf(ip_addr, sender, domain) + warnings += check_spf_warn if check_spf_warn else warnings + errors += check_spf_err if check_spf_err else errors + + if errors: + exit(1) + elif warnings: + exit(3) + + exit(0) if __name__ == "__main__":