Newer
Older

Stéphane Diemer
committed
Criticality: High
import time
sys.path.append(str(Path(__file__).parents[1].resolve()))
# pylint: disable=wrong-import-position
from envsetup import utils as u # noqa: E402
"""Check that Postfix is listening on 127.0.0.1:25.
:return: Exit return codes
:rtype: tuple
# get listening state from ss
status, out = subprocess.getstatusoutput("ss -pant | grep master | grep ':25'")
if status != 0 or ("127.0.0.1:25" not in out and "[::1]:25" not in out):
u.error("Postfix is not listening on localhost:25")
errors += 1
u.success("Postfix is listening on localhost:25")
def check_relay(relay: str, domain: str) -> tuple:
"""Check that Postfix is not an open relay.
:param relay: Hostname or IP address of relay host
:type relay: str
:param domain: Domain name under which mails will be send
:type domain: str
:return: Exit return codes
:rtype: tuple
# get relayhost value from Postfix config
status, out = subprocess.getstatusoutput("grep relayhost /etc/postfix/main.cf")
if status == 0:
configured_relay = (
out.replace("relayhost", "").strip(" \t=").replace("[", "").replace("]", "")
configured_relay = ""
if not configured_relay:
# check domain origin
with open("/etc/mailname", "r") as mailname:
myorigin = mailname.read().strip()
else:
out = subprocess.getoutput("grep myorigin /etc/postfix/main.cf")
myorigin = out.replace("myorigin", "").strip()
# possible origin names
origins = set(
(domain or None, u.exec_cmd("hostname", log_output=False)[1] or None)
)
if myorigin not in origins:
u.warning('"myorigin" setting does not contain a valid domain')
warnings += 1
if relay != configured_relay:
u.error("STMP relay must be {}".format(relay))
errors += 1
if not errors and not warnings:
u.success("STMP relay is properly set")
"""Check that Postfix can send email.
:param sender: Sender mail address
:type sender: str
:return: Exit return codes
:rtype: tuple
# send email
email = "noreply+{}-{}@ubicast.eu".format(time.time(), random.randint(0, 1000))
if sender:
sender = "-a 'From: {}' ".format(sender)
else:
u.warning("Sender address is not set")
warnings += 1
cmd = "echo 'test email' | mail -s 'Email used to test configuration.' {}{}".format(
sender, email
)
subprocess.getoutput(cmd)
timeout = 120
out = ""
# find logs
if Path("/var/log/mail.log").is_file():
cmd = "grep '{}' /var/log/mail.log".format(email)
u.info("/var/log/mail.log not found, trying journalctl")
cmd = "journalctl -t postfix/smtp | grep {}".format(email)
# logs polling
while True:
status, out = subprocess.getstatusoutput(cmd)
if status == 0:
out = out.strip().split("\n")[-1]
if "status=deferred" not in out:
if waited >= timeout:
u.error("Failed to send email.")
u.info("> no log entry found using command: {}".format(cmd))
errors += 1
break
# wait before next iteration
time.sleep(delay)
waited += delay
delay *= 2
# check output for errors
if "bounced" in out or "you tried to reach does not exist" in out:
u.error("Failed to send email")
u.info("> sending log line:\n{}".format(out))
errors += 1
if not errors:
u.success("Can send email")
return warnings, errors
def check_spf(ip_addr: str, sender: str, domain: str) -> tuple:
"""Check that SPF records passes.
:param ip_addr: Host ip address of server or relay
:type ip_addr: str
:param sender: Sender mail address
:type sender: str
:param domain: Domain name under which mails will be send
:type domain: str
:return: Exit return codes
:rtype: tuple
"""
warnings = 0
errors = 0
if ip_address(ip_addr).is_private:
u.info("{} is a private address, cannot check SPF".format(ip_addr))
elif ip_addr and sender:
# check spf
result, _ = spf.check2(i=ip_addr, s=domain, h="")
if result in ("pass", "neutral"):
u.success("SPF for {} in {}: {}".format(ip_addr, domain, result))
elif result == "none":
u.info("SPF for {} in {}: {}".format(ip_addr, domain, result))
else:
u.warning("SPF for {} in {}: {}".format(ip_addr, domain, result))
warnings += 1
else:
u.info("IP or sender not set, cannot check SPF")
return warnings, errors
def main():
"""Run all checks and exits with corresponding exit code."""
if not Path("/etc/postfix").exists():
u.info("postfix is not installed")
exit(2)
# get settings
conf = u.load_conf()
relay = conf.get("EMAIL_SMTP_SERVER", "").replace("[", "").replace("]", "")
ip_addr = (
(socket.gethostbyname(relay) if relay else None)
or conf.get("NETWORK_IP_NAT")
or conf.get("NETWORK_IP")
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
or None
)
sender = conf.get("EMAIL_SENDER") or None
if not sender and Path("/etc/postfix/generic").exists():
with open("/etc/postfix/generic") as sender_fo:
sender = sender_fo.readline().split()[-1]
domain = sender.split("@")[-1] or None
# check that we are not an open relay
check_listen_warn, check_listen_err = check_listen()
warnings += check_listen_warn if check_listen_warn else warnings
errors += check_listen_err if check_listen_err else errors
# check that relayhost is correct
check_relay_warn, check_relay_err = check_relay(relay, domain)
warnings += check_relay_warn if check_relay_warn else warnings
errors += check_relay_err if check_relay_err else errors
# check that we can send emails
check_send_warn, check_send_err = check_send(sender)
warnings += check_send_warn if check_send_warn else warnings
errors += check_send_err if check_send_err else errors
# check that spf record is correct
check_spf_warn, check_spf_err = check_spf(ip_addr, sender, domain)
warnings += check_spf_warn if check_spf_warn else warnings
errors += check_spf_err if check_spf_err else errors
if errors:
exit(1)
elif warnings:
exit(3)
exit(0)
if __name__ == "__main__":
main()