Skip to content
Snippets Groups Projects
test_dns_records.py 4.61 KiB
Newer Older
#!/usr/bin/env python3
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed

Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
"""
Checks that DNS records are provided by the customer servers are correctly set
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
"""
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed

from pathlib import Path
import re
import subprocess
import sys

Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
sys.path.append(str(Path(__file__).parents[1].resolve()))

# pylint: disable=wrong-import-position
from envsetup import utils as u  # noqa: E402
Stéphane Diemer's avatar
Stéphane Diemer committed

def get_dns_servers() -> set:
    if subprocess.getstatusoutput("command -v nmcli")[0] == 0:
        # network-manager
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
        _, output = subprocess.getstatusoutput(
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
            "nmcli -f all device show | grep IP4.DNS | awk '{ print $2 }'"
        )
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
        servers = [l for l in output.split("\n") if l]
        # resolvconf
        with open("/etc/resolv.conf", "r") as f:
            d = f.read().strip()
            servers = [l.split()[1] for l in d.split("\n") if l.startswith("nameserver")]

    if "127.0.0.53" in servers:
        # systemd-resolved
        servers = list()
        _, output = subprocess.getstatusoutput("systemd-resolve --status")
        lines = [l.strip() for l in output.split("\n")]
        dns_line = False
        ip_pattern = re.compile("\d+\.\d+\.\d+\.\d+")
        for line in lines:
            if line.startswith("DNS Servers:"):
                dns_line = True
                servers.append(line.split()[-1])
            elif dns_line and ip_pattern.match(line):
                servers.append(line)
            else:
                dns_line = False

Stéphane Diemer's avatar
Stéphane Diemer committed

Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
def get_result(output: str) -> str:
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
    for line in output.split("\n"):
        if "has address " in line:
            return line.split("has address ")[1]
Stéphane Diemer's avatar
Stéphane Diemer committed

def check_dns(hostname: str, expected_ip: str, resolvers: set) -> tuple:
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
    warnings = 0
    errors = 0

    for resolver in resolvers:
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
        status, output = subprocess.getstatusoutput(
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
            "host {} {}".format(hostname, resolver)
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
        )
Stéphane Diemer's avatar
Stéphane Diemer committed
        if status == 0:
            address = get_result(output)
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
            if address == expected_ip:
                u.success("dns({}): {} -> {}".format(resolver, hostname, address))
            else:
                u.error(
                    "dns({}): {} -> {} (should be {})".format(
                        resolver, hostname, address, expected_ip
                    )
                )
                errors += 1
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
            u.error("dns({}): cannot resolve {}".format(resolver, hostname))
            errors += 1

    return warnings, errors


def check_resolver(conf: dict, resolvers: set, ip: str) -> tuple:
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
    warnings = 0
    errors = 0

Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
    conf_resolvers_keys = ("NETWORK_DNS1", "NETWORK_DNS2", "NETWORK_DNS3")
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
    for conf_resolver_key in conf_resolvers_keys:
        conf_resolver = conf.get(conf_resolver_key)
        if conf_resolver and conf_resolver not in resolvers:
            u.warning("resolver {} not configured".format(conf_resolver))
            warnings += 1
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed

    if not ip and (not errors):
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
        u.info("no resolver defined in envsetup configuration, unable to test DNS")
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
        exit(2)

    return warnings, errors


def main():
    print("Check DNS settings:")

    warnings = 0
    errors = 0
    conf = u.load_conf()
    resolvers = get_dns_servers()
    ip = conf.get("NETWORK_IP_NAT") or conf.get("NETWORK_IP")

    check_resolver_warn, check_resolver_err = check_resolver(conf, resolvers, ip)
    if check_resolver_err:
        errors += check_resolver_err
    if check_resolver_warn:
        warnings += check_resolver_warn

    services_info = (
        ("MS_SERVER_NAME", "mediaserver", "ubicast-mediaserver"),
        ("MONITOR_SERVER_NAME", "monitor", "ubicast-monitor"),
        ("CM_SERVER_NAME", "mirismanager", "ubicast-skyreach"),
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
    )

    for conf_name, default_domain, package in services_info:
        domain = conf.get(conf_name)
        resolution_ignored = conf.get("TESTER_DNS_RESOLUTION_IGNORED", "").split(",")
        if (
            domain
            and domain not in ("localhost", default_domain)
            and domain not in resolution_ignored
        ):
            # check that the service is installed on this system
            status, _ = subprocess.getstatusoutput("dpkg -s {}".format(package))
            if status == 0:
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
                u.info("checking IP of {}".format(domain))
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
                check_dns_warn, check_dns_err = check_dns(domain, ip, resolvers)
                if check_dns_err:
                    errors += check_dns_err
                if check_dns_warn:
                    warnings += check_dns_warn
            else:
                u.info("{} not installed, skip {}".format(package, domain))

    if errors:
        exit(1)
    elif warnings:
        exit(3)


if __name__ == "__main__":
    main()