Skip to content
Snippets Groups Projects
test_dns_records.py 4.1 KiB
Newer Older
#!/usr/bin/env python3
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed

Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
"""
Checks that DNS records are provided by the customer servers are correctly set
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
"""
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed

from pathlib import Path
import subprocess
import sys

Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
sys.path.append(str(Path(__file__).parents[1].resolve()))

# pylint: disable=wrong-import-position
from envsetup import utils as u  # noqa: E402
Stéphane Diemer's avatar
Stéphane Diemer committed

Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
def get_dns_servers() -> list:
    servers = list()
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
    with open("/etc/resolv.conf", "r") as f:
        d = f.read().strip()
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
        for l in d.split("\n"):
            if l.startswith("nameserver "):
                servers.append(l.split("nameserver ")[1])
    if servers == ["127.0.1.1"]:
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
        _, output = subprocess.getstatusoutput(
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
            "nmcli -f all device show | grep IP4.DNS | awk '{ print $2 }'"
        )
        servers.extend(output.split("\n"))
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
    return servers
Stéphane Diemer's avatar
Stéphane Diemer committed

Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
def get_result(output: str) -> str:
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
    for line in output.split("\n"):
        if "has address " in line:
            return line.split("has address ")[1]
Stéphane Diemer's avatar
Stéphane Diemer committed

Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
def check_dns(hostname: str, expected_ip: str, resolvers: list) -> tuple:
    warnings = 0
    errors = 0

    for resolver in resolvers:
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
        status, output = subprocess.getstatusoutput(
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
            "host {} {}".format(hostname, resolver)
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
        )
Stéphane Diemer's avatar
Stéphane Diemer committed
        if status == 0:
            address = get_result(output)
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
            if address == expected_ip:
                u.success("dns({}): {} -> {}".format(resolver, hostname, address))
            elif address == "127.0.0.1" and resolver == "127.0.0.53":
                u.success("dns({}): {} -> {}".format(resolver, hostname, address))
            else:
                u.error(
                    "dns({}): {} -> {} (should be {})".format(
                        resolver, hostname, address, expected_ip
                    )
                )
                errors += 1
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
            u.error("dns({}): cannot resolve {}".format(resolver, hostname))
            errors += 1

    return warnings, errors


def check_resolver(conf: dict, resolvers: list, ip: str) -> tuple:
    warnings = 0
    errors = 0

    conf_resolvers_keys = ("NETWORK_DNS1", "NETWORK_DNS2")
    for conf_resolver_key in conf_resolvers_keys:
        conf_resolver = conf.get(conf_resolver_key)
        if conf_resolver and conf_resolver not in resolvers:
            u.error("resolver {} not configured".format(conf_resolver))
            errors += 1

    if not ip and (not errors):
        u.info("no IP set in configuration , unable to test DNS")
        exit(2)

    return warnings, errors


def main():
    print("Check DNS settings:")

    warnings = 0
    errors = 0
    conf = u.load_conf()
    resolvers = get_dns_servers()
    ip = conf.get("NETWORK_IP_NAT") or conf.get("NETWORK_IP")

    check_resolver_warn, check_resolver_err = check_resolver(conf, resolvers, ip)
    if check_resolver_err:
        errors += check_resolver_err
    if check_resolver_warn:
        warnings += check_resolver_warn

    services_info = (
        ("MS_SERVER_NAME", "mediaserver", "ubicast-mediaserver"),
        ("MONITOR_SERVER_NAME", "monitor", "ubicast-monitor"),
        ("CM_SERVER_NAME", "mirismanager", "ubicast-skyreach"),
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
    )

    for conf_name, default_domain, package in services_info:
        domain = conf.get(conf_name)
        resolution_ignored = conf.get("TESTER_DNS_RESOLUTION_IGNORED", "").split(",")
        if (
            domain
            and domain not in ("localhost", default_domain)
            and domain not in resolution_ignored
        ):
            # check that the service is installed on this system
            status, _ = subprocess.getstatusoutput("dpkg -s {}".format(package))
            if status == 0:
                u.info("- checking IP of {}".format(domain))
                check_dns_warn, check_dns_err = check_dns(domain, ip, resolvers)
                if check_dns_err:
                    errors += check_dns_err
                if check_dns_warn:
                    warnings += check_dns_warn
            else:
                u.info("{} not installed, skip {}".format(package, domain))

    if errors:
        exit(1)
    elif warnings:
        exit(3)


if __name__ == "__main__":
    main()