# Newer
# Older

# Stéphane Diemer
# committed
# Criticality: High
import os
import random
import subprocess
import sys
import time
from pathlib import Path
# Install the SPF library if not present (the module is imported just below).
subprocess.check_call(["apt-get", "-qq", "-y", "install", "python3-spf"])

# Install netstat if not present: `which` exits non-zero when the binary
# cannot be found, and netstat is needed by check_listen().
if subprocess.call(["which", "netstat"], stdout=subprocess.DEVNULL) != 0:
    subprocess.check_call(["apt-get", "-qq", "-y", "install", "net-tools"])

# The SPF module can only be imported once its package has been installed.
# NOTE(review): no `import spf` was visible in the file although check_relay()
# calls spf.check2(); a duplicate import is harmless if one already exists.
# pylint: disable=wrong-import-position
import spf  # noqa: E402

# Make the directory two levels above this file importable so that the
# `envsetup` package can be resolved.
sys.path.append(str(Path(__file__).parents[1].resolve()))

# pylint: disable=wrong-import-position
from envsetup import utils as u  # noqa: E402
def check_listen() -> int:
    """Check that Postfix is listening on 127.0.0.1:25.

    The listening sockets are read from ``netstat`` and filtered on the
    ``master`` process name (presumably the Postfix master daemon) and on
    port 25.

    :return: Exit return code (0 when listening, 1 otherwise)
    :rtype: int
    """
    print("Checking that postfix is listening locally:")

    # get listening state from netstat; the pipeline exits non-zero when
    # grep finds no matching line
    status, out = subprocess.getstatusoutput("netstat -pant | grep master | grep ':25'")
    if status != 0 or "127.0.0.1:25" not in out:
        u.error("Postfix is not listening on 127.0.0.1:25")
        return 1

    u.success("Postfix is listening on 127.0.0.1:25")
    # Explicit success code: the caller compares the result with 0.
    return 0
def check_relay(conf: dict) -> int:
    """Check that the Postfix relay setup conforms to the configuration.

    When no ``relayhost`` is found in ``/etc/postfix/main.cf``, the public IP
    address, the mail origin domain and the SPF record are checked instead.
    When a relay is set, it is compared with the ``EMAIL_SMTP_SERVER``
    setting.

    :param conf: EnvSetup configuration settings
    :type conf: dict
    :return: Exit return code (0 on success, 1 on error, 3 on warning)
    :rtype: int
    """
    print("Checking that SMTP relay conforms to conf:")

    # get relayhost value from Postfix config
    status, out = subprocess.getstatusoutput("grep relayhost /etc/postfix/main.cf")
    if status == 0:
        # drop the setting name, the "=" separator and any square brackets
        configured_relay = (
            out[len("relayhost"):].strip(" \t=").replace("[", "").replace("]", "")
        )
    else:
        # grep matched nothing: treat it as "no relay configured"
        configured_relay = ""

    if not configured_relay:
        # check public ip address (the NAT address takes precedence)
        ip_addr = conf.get("NETWORK_IP_NAT") or conf.get("NETWORK_IP")
        if not ip_addr:
            u.warning("Cannot determine public IP address")
            # NOTE(review): stop here, the SPF check below needs an address;
            # 3 mirrors the other warning path of this function -- confirm.
            return 3

        # check domain origin
        if os.path.exists("/etc/mailname"):
            with open("/etc/mailname", "r") as mailname:
                myorigin = mailname.read().strip()
        else:
            out = subprocess.getoutput("grep myorigin /etc/postfix/main.cf")
            myorigin = out.replace("myorigin", "").strip()
        if myorigin not in ("ubicast.tv", "ubicast.eu"):
            u.warning("The \"myorigin\" setting does not contain ubicast.eu or ubicast.tv")
            return 3

        # check spf
        result, _ = spf.check2(i=ip_addr, s="support@ubicast.eu", h="")
        if result != "pass":
            u.error("SPF record for {} in ubicast.eu is missing".format(ip_addr))
            # NOTE(review): 1 matches the error paths of the sibling checks -- confirm.
            return 1
    else:
        # get relayhost value from envsetup config; `or ""` also covers a
        # key that is present but set to None
        conf_relay = (conf.get("EMAIL_SMTP_SERVER") or "").replace("[", "").replace("]", "")
        # NOTE(review): comparing only when a relay is configured in Postfix
        # is an assumption about the intended branch structure -- confirm.
        if conf_relay != configured_relay:
            u.error("STMP relay must be {}".format(conf_relay))
            return 1

    u.success("STMP relay is properly set")
    return 0
def check_send_test_email(conf: dict) -> int:
    """Check that Postfix can send email.

    A test message is sent to a unique ``noreply+...@ubicast.eu`` address with
    the ``mail`` command, then the mail logs are polled until an entry for
    that address shows up or the timeout is reached.

    :param conf: EnvSetup configuration settings
    :type conf: dict
    :return: Exit return code (0 when the email was sent, 1 otherwise)
    :rtype: int
    """
    print("Checking Postfix can send email:")

    # check if s-nail is installed, if not set from address
    sender = ""
    status, out = subprocess.getstatusoutput("dpkg -l s-nail | grep -E '^ii'")
    if status == 0:
        u.warning("The package 's-nail' is installed, the email sender address will be ignored.")
    else:
        sender = conf.get("EMAIL_SENDER") or ""

    # send email; the timestamp and random number make the recipient unique
    # so that its log lines can be told apart from previous runs
    email = "noreply+{}-{}@ubicast.eu".format(time.time(), random.randint(0, 1000))
    u.info("Sending test email to '{}'".format(email))
    if sender:
        u.info("Sender address is '{}'".format(sender))
        sender = "-a 'From: {}' ".format(sender)
    else:
        u.info("Sender address is not set")
    cmd = "echo 'test email' | mail -s 'Email used to test configuration.' {}{}".format(sender, email)
    subprocess.getoutput(cmd)

    timeout = 120
    # NOTE(review): the initial values of `waited` and `delay` were not
    # visible in the source; start immediately and back off from 1 second.
    waited = 0
    delay = 1
    out = ""

    # find logs
    if os.path.isfile("/var/log/mail.log"):
        cmd = "grep '{}' /var/log/mail.log".format(email)
    else:
        u.info("/var/log/mail.log not found, trying journalctl")
        cmd = "journalctl -u postfix | grep {}".format(email)

    # logs polling
    while True:
        status, out = subprocess.getstatusoutput(cmd)
        if status == 0:
            # only the most recent log line for this recipient matters
            out = out.strip().split("\n")[-1]
            # a deferred delivery may still complete: keep polling
            if "status=deferred" not in out:
                u.info("Log entry found after {} seconds".format(waited))
                break
        if waited >= timeout:
            u.error("Failed to send email.")
            u.info(
                "No log entry found after {} seconds using command: {}".format(
                    waited, cmd
                )
            )
            return 1
        # not found yet
        u.info(
            "No log entry found after {} seconds, waiting {} more seconds...".format(
                waited, delay
            )
        )
        # wait before next iteration, doubling the delay each time
        time.sleep(delay)
        waited += delay
        delay *= 2

    # check output for errors
    if "bounced" in out or "you tried to reach does not exist" in out:
        u.error("Failed to send email")
        u.info("Sending log line:\n{}".format(out))
        return 1

    u.success("Email sent.")
    return 0
def main():
    """Run all checks and exit with the corresponding exit code.

    The relay and send checks only run when Postfix is listening. The
    process exits with 1 when the Postfix directory is missing.
    """
    if not os.path.exists("/etc/postfix"):
        u.error("Postfix dir does not exists, please install postfix")
        sys.exit(1)

    conf = u.load_conf()

    rcode = check_listen()
    if rcode == 0:
        rcode = check_relay(conf)
        # NOTE(review): the body of this condition was missing from the
        # source; a failed send is assumed to override the relay result.
        if check_send_test_email(conf) == 1:
            rcode = 1

    sys.exit(rcode)
# Run the checks only when executed as a script, not when imported.
if __name__ == "__main__":
    main()