#!/usr/bin/env python3 """ Criticality: High Checks that emails can be sent. """ from ipaddress import ip_address from pathlib import Path import random import socket import subprocess import sys import time import spf sys.path.append(str(Path(__file__).parents[1].resolve())) # pylint: disable=wrong-import-position from envsetup import utils as u # noqa: E402 def check_listen() -> tuple: """Check that Postfix is listening on 127.0.0.1:25. :return: Exit return codes :rtype: tuple """ warnings = 0 errors = 0 # get listening state from ss status, out = subprocess.getstatusoutput("ss -pant | grep master | grep ':25'") if status != 0 or ("127.0.0.1:25" not in out and "[::1]:25" not in out): u.warning("Postfix is not listening on localhost:25") warnings += 1 else: u.success("Postfix is listening on localhost:25") return warnings, errors def check_relay(relay_host: str, relay_port: str, domain: str) -> tuple: """Check that Postfix is not an open relay. :param relay_host: Hostname or IP address of relay host :type relay_host: str :param relay_port: Port of relay host :type relay_port: str :param domain: Domain name under which mails will be send :type domain: str :return: Exit return codes :rtype: tuple """ warnings = 0 errors = 0 # get relayhost value from Postfix config status, out = subprocess.getstatusoutput("grep relayhost /etc/postfix/main.cf") if status == 0: configured_relay = ( out.replace("relayhost", "").strip(" \t=").replace("[", "").replace("]", "") ) else: configured_relay = "" if not configured_relay: # check domain origin if Path("/etc/mailname").exists(): with open("/etc/mailname", "r") as mailname: myorigin = mailname.read().strip() else: out = subprocess.getoutput("grep myorigin /etc/postfix/main.cf") myorigin = out.replace("myorigin", "").strip() # possible origin names origins = set( (domain or None, u.exec_cmd("hostname", log_output=False)[1] or None) ) if myorigin not in origins: u.warning('"myorigin" setting does not contain a valid domain') warnings += 1 relay = "{}:{}".format(relay_host, relay_port) if relay_port else relay_host if relay != configured_relay: u.error("STMP relay must be {}".format(relay)) errors += 1 if not errors and not warnings: u.success("STMP relay is properly set") return warnings, errors def check_send(sender: str) -> tuple: """Check that Postfix can send email. :param sender: Sender mail address :type sender: str :return: Exit return codes :rtype: tuple """ warnings = 0 errors = 0 # send email email = "noreply+{}-{}@ubicast.eu".format(time.time(), random.randint(0, 1000)) if sender: sender = "-a 'From: {}' ".format(sender) else: u.info("Sender address is not set") cmd = "echo 'test email' | mail -s 'Email used to test configuration.' {}{}".format( sender, email ) subprocess.getoutput(cmd) # find logs if Path("/var/log/maillog").is_file(): cmd = "grep '{}' /var/log/maillog".format(email) elif Path("/var/log/mail.log").is_file(): cmd = "grep '{}' /var/log/mail.log".format(email) else: u.info("/var/log/mail.log not found, trying journalctl") cmd = "journalctl -t postfix/smtp | grep {}".format(email) u.log("Using following command to search for sending log:\n{}".format(cmd)) # init vars timeout = 120 waited = 0 delay = 1 timed_out = False out = "" u.log("Email sending timeout is {} seconds.".format(timeout)) # logs polling sys.stdout.write("Waiting for sending log") sys.stdout.flush() while not timed_out: # wait time.sleep(delay) waited += delay delay *= 2 # run command status, out = subprocess.getstatusoutput(cmd) # log loop sys.stdout.write(".") sys.stdout.flush() # found if status == 0: out = out.strip().split("\n")[-1] if "status=deferred" not in out: break # timeout timed_out = waited >= timeout sys.stdout.write("\n") # check if the sending has timed out if timed_out: u.error("Failed to send email (timed out).") if out: u.info("> sending log line:\n{}".format(out)) else: u.info("> no log entry found.") errors += 1 # check output for errors elif "bounced" in out or "you tried to reach does not exist" in out: u.error("Failed to send email") u.info("> sending log line:\n{}".format(out)) errors += 1 if not errors: u.success("Can send email") return warnings, errors def check_spf(ip_addr: str, sender: str, domain: str) -> tuple: """Check that SPF records passes. :param ip_addr: Host ip address of server or relay :type ip_addr: str :param sender: Sender mail address :type sender: str :param domain: Domain name under which mails will be send :type domain: str :return: Exit return codes :rtype: tuple """ warnings = 0 errors = 0 if ip_address(ip_addr).is_private: u.info("{} is a private address, cannot check SPF".format(ip_addr)) elif ip_addr and sender: # check spf result, _ = spf.check2(i=ip_addr, s=domain, h="") if result in ("pass", "neutral"): u.success("SPF for {} in {}: {}".format(ip_addr, domain, result)) elif result == "none": u.info("SPF for {} in {}: {}".format(ip_addr, domain, result)) else: u.warning("SPF for {} in {}: {}".format(ip_addr, domain, result)) warnings += 1 else: u.info("IP or sender not set, cannot check SPF") return warnings, errors def main(): """Run all checks and exits with corresponding exit code.""" warnings = 0 errors = 0 print("Checking email settings:") if not Path("/etc/postfix").exists(): u.info("postfix is not installed") exit(2) # get settings conf = u.load_conf() relay = conf.get("EMAIL_SMTP_SERVER", "").replace("[", "").replace("]", "") relay_host = relay.split(":")[0] if ":" in relay else relay relay_port = relay.split(":")[-1] if ":" in relay else "" ip_addr = ( (socket.gethostbyname(relay_host) if relay_host else None) or conf.get("NETWORK_IP_NAT") or conf.get("NETWORK_IP") or u.get_ip() or None ) sender = conf.get("EMAIL_SENDER") or "" if not sender and Path("/etc/postfix/generic").exists(): with open("/etc/postfix/generic") as sender_fo: sender = sender_fo.readline().split()[-1] domain = sender.split("@")[-1] or None # check that we are not an open relay check_warn, check_err = check_listen() warnings += check_warn if check_warn else warnings errors += check_err if check_err else errors # check that relayhost is correct check_warn, check_err = check_relay(relay_host, relay_port, domain) warnings += check_warn if check_warn else warnings errors += check_err if check_err else errors # check that we can send emails check_warn, check_err = check_send(sender) warnings += check_warn if check_warn else warnings errors += check_err if check_err else errors # check that spf record is correct check_warn, check_err = check_spf(ip_addr, sender, domain) warnings += check_warn if check_warn else warnings errors += check_err if check_err else errors if errors: exit(1) elif warnings: exit(3) exit(0) if __name__ == "__main__": main()