Newer
Older

Stéphane Diemer
committed
Criticality: High
import time
sys.path.append(str(Path(__file__).parents[1].resolve()))
# pylint: disable=wrong-import-position
from utilities import logging as lg # noqa: E402
from utilities.config import load_conf # noqa: E402
from utilities.network import get_ip # noqa: E402
"""Check that Postfix is listening on 127.0.0.1:25.
:return: Exit return codes
:rtype: tuple
# get listening state from ss
status, out = subprocess.getstatusoutput("ss -pant | grep master | grep ':25'")
if status != 0 or ("127.0.0.1:25" not in out and "[::1]:25" not in out):
lg.warning("Postfix is not listening on localhost:25")
lg.success("Postfix is listening on localhost:25")
def check_relay(relay_host: str, relay_port: str, domain: str) -> tuple:
"""Check that Postfix is not an open relay.
:param relay_host: Hostname or IP address of relay host
:type relay_host: str
:param relay_port: Port of relay host
:type relay_port: str
:param domain: Domain name under which mails will be send
:type domain: str
:return: Exit return codes
:rtype: tuple
# get relayhost value from Postfix config
status, out = subprocess.getstatusoutput("grep -E '^relayhost' /etc/postfix/main.cf")
if status == 0:
configured_relay = (
out.replace("relayhost", "").strip(" \t=").replace("[", "").replace("]", "")
configured_relay = ""
if not configured_relay:
# check domain origin
with open("/etc/mailname", "r") as mailname:
myorigin = mailname.read().strip()
else:
out = subprocess.getoutput("grep -E '^myorigin' /etc/postfix/main.cf")
myorigin = out.replace("myorigin", "").strip()
origins = (
domain or None,
socket.gethostname(),
socket.getfqdn(),
lg.warning('"myorigin" setting does not contain a valid domain')
relay = "{}:{}".format(relay_host, relay_port) if relay_port else relay_host
lg.error("STMP relay must be {}".format(relay))
lg.success("STMP relay is properly set")
"""Check that Postfix can send email.
:param sender: Sender mail address
:type sender: str
:return: Exit return codes
:rtype: tuple
# send email
email = "noreply+{}-{}@ubicast.eu".format(time.time(), random.randint(0, 1000))
if sender:
sender = "-a 'From: {}' ".format(sender)
else:
cmd = "echo 'test email' | mail -s 'Email used to test configuration.' {}{}".format(
sender, email
)
subprocess.getoutput(cmd)
if Path("/var/log/maillog").is_file():
elif Path("/var/log/mail.log").is_file():
cmd = "grep '{}' /var/log/mail.log".format(email)
lg.info("/var/log/mail.log not found, trying journalctl")
cmd = "journalctl -t postfix/smtp | grep {}".format(email)
lg.log("Using following command to search for sending log:\n{}".format(cmd))
# init vars
timeout = 120
waited = 0
delay = 1
timed_out = False
out = ""
lg.log("Email sending timeout is {} seconds.".format(timeout))
# logs polling
sys.stdout.write("Waiting for sending log")
sys.stdout.flush()
while not timed_out:
# wait
time.sleep(delay)
waited += delay
delay *= 2
# run command
status, out = subprocess.getstatusoutput(cmd)
# log loop
sys.stdout.write(".")
sys.stdout.flush()
if status == 0:
out = out.strip().split("\n")[-1]
if "status=deferred" not in out:
timed_out = waited >= timeout
sys.stdout.write("\n")
# check if the sending has timed out
if timed_out:
lg.error("Failed to send email (timed out).")
lg.info("> sending log line:\n{}".format(out))
# check output for errors
elif "bounced" in out or "you tried to reach does not exist" in out:
lg.error("Failed to send email")
lg.info("> sending log line:\n{}".format(out))
def check_spf(ip_addr: str, sender: str, domain: str) -> tuple:
"""Check that SPF records passes.
:param ip_addr: Host ip address of server or relay
:type ip_addr: str
:param sender: Sender mail address
:type sender: str
:param domain: Domain name under which mails will be send
:type domain: str
:return: Exit return codes
:rtype: tuple
"""
warnings = 0
errors = 0
if ip_address(ip_addr).is_private:
lg.info("{} is a private address, cannot check SPF".format(ip_addr))
elif ip_addr and sender:
# check spf
result, _ = spf.check2(i=ip_addr, s=domain, h="")
if result in ("pass", "neutral"):
lg.success("SPF for {} in {}: {}".format(ip_addr, domain, result))
lg.info("SPF for {} in {}: {}".format(ip_addr, domain, result))
lg.warning("SPF for {} in {}: {}".format(ip_addr, domain, result))
lg.info("IP or sender not set, cannot check SPF")
def main():
"""Run all checks and exits with corresponding exit code."""
lg.log("Checking email settings:")
relay = conf.get("EMAIL_SMTP_SERVER", "").replace("[", "").replace("]", "")
relay_host = relay.split(":")[0] if ":" in relay else relay
relay_port = relay.split(":")[-1] if ":" in relay else ""
(socket.gethostbyname(relay_host) if relay_host else None)
or conf.get("NETWORK_IP_NAT")
or conf.get("NETWORK_IP")
if not sender and Path("/etc/postfix/generic").exists():
with open("/etc/postfix/generic") as sender_fo:
sender = sender_fo.readline().split()[-1]
domain = sender.split("@")[-1] or None
# check that we are not an open relay
check_warn, check_err = check_listen()
warnings += check_warn if check_warn else warnings
errors += check_err if check_err else errors
check_warn, check_err = check_relay(relay_host, relay_port, domain)
warnings += check_warn if check_warn else warnings
errors += check_err if check_err else errors
check_warn, check_err = check_send(sender)
warnings += check_warn if check_warn else warnings
errors += check_err if check_err else errors
# check that spf record is correct
check_warn, check_err = check_spf(ip_addr, sender, domain)
warnings += check_warn if check_warn else warnings
errors += check_err if check_err else errors
if errors:
exit(1)
elif warnings:
exit(3)
exit(0)
if __name__ == "__main__":
main()