Skip to content
Snippets Groups Projects
test_dns_records.py 5.21 KiB
Newer Older
#!/usr/bin/env python3
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed

Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
"""
Checks that the DNS records provided by the customer servers are correctly set.
"""
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed

from pathlib import Path
import re
import subprocess
import sys
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
import dns.resolver
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
try:
    import pydbus
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
except ImportError:
    exit(2)
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
sys.path.append(str(Path(__file__).parents[1].resolve()))

# pylint: disable=wrong-import-position
from envsetup import utils as u  # noqa: E402
Stéphane Diemer's avatar
Stéphane Diemer committed

def get_dns_servers() -> set:
    """Collect the addresses of the DNS servers configured on this host.

    Several detection methods are tried in turn, each one only when the
    previous ones found nothing:

    1. the systemd-resolved D-Bus object ``org.freedesktop.resolve1``
       (through ``pydbus``);
    2. NetworkManager, by parsing ``nmcli`` output (IPv4 entries only);
    3. the ``nameserver`` entries of ``/etc/resolv.conf``.

    When the address ``127.0.0.53`` is among the results, it is removed and
    the servers listed by ``systemd-resolve --status`` are added instead.

    Returns:
        The set of detected DNS server addresses as strings (possibly empty).
    """
    # Local import: only needed here to format raw D-Bus address bytes.
    import ipaddress

    servers = []
    ip_pattern = re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$")

    # dbus method
    try:
        bus = pydbus.SystemBus()
        bus_client = bus.get("org.freedesktop.resolve1", "/org/freedesktop/resolve1")
        dns_entries = bus_client.DNS
        # entry[1] is the address family (2 = IPv4, 10 = IPv6) and entry[2]
        # the raw address bytes; ip_address() renders both families properly
        # (dotted quad for IPv4, colon-hex for IPv6).
        for family in (2, 10):
            servers.extend(
                [
                    str(ipaddress.ip_address(bytes(entry[2])))
                    for entry in dns_entries
                    if entry[1] == family
                ]
            )
    except Exception:  # pylint: disable=broad-except
        # Deliberate best effort: D-Bus or systemd-resolved may be missing or
        # failing in many ways; the fallback methods below take over.
        pass

    # network-manager method
    if not servers and subprocess.getstatusoutput("command -v nmcli")[0] == 0:
        _, output = subprocess.getstatusoutput(
            "nmcli -f all device show | grep IP4.DNS | awk '{ print $2 }'"
        )
        servers = [line for line in output.split("\n") if ip_pattern.match(line)]

    # resolvconf method
    if not servers and Path("/etc/resolv.conf").exists():
        with open("/etc/resolv.conf", "r") as resolv_conf:
            content = resolv_conf.read().strip()
        for line in content.split("\n"):
            fields = line.split()
            # Skip malformed "nameserver" lines that carry no address.
            if line.startswith("nameserver") and len(fields) >= 2:
                servers.append(fields[1])

    # systemd-resolved method
    if "127.0.0.53" in servers:
        servers.remove("127.0.0.53")
        _, output = subprocess.getstatusoutput("systemd-resolve --status")
        in_dns_section = False
        for line in (raw.strip() for raw in output.split("\n")):
            if line.startswith("DNS Servers:"):
                # First server is on the same line as the label.
                in_dns_section = True
                servers.append(line.split()[-1])
            elif in_dns_section and ip_pattern.match(line):
                # Following servers are listed one per line.
                servers.append(line)
            else:
                in_dns_section = False

    return set(servers)

Stéphane Diemer's avatar
Stéphane Diemer committed

Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
def get_result(output: str) -> str:
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
    for line in output.split("\n"):
        if "has address " in line:
            return line.split("has address ")[1]
Stéphane Diemer's avatar
Stéphane Diemer committed

def check_dns(hostname: str, expected_ip: str, resolvers: set) -> tuple:
    """Resolve *hostname* with the given resolvers and compare with *expected_ip*.

    A dedicated resolver (not using the system configuration) is built from
    *resolvers*. Every returned address is reported as a success when it
    equals *expected_ip* and as an error otherwise; a failed resolution counts
    as a single error.

    Returns:
        A ``(warnings, errors)`` tuple of counters; ``warnings`` is always 0
        in this function.
    """
    warnings = 0
    errors = 0

    custom_resolver = dns.resolver.Resolver(configure=False)
    custom_resolver.nameservers = list(resolvers)

    try:
        resolved = [record.address for record in custom_resolver.query(hostname)]
    except Exception as dns_err:
        # Resolution failed: report it and stop here.
        u.error("cannot resolve {}: {}".format(hostname, dns_err))
        return warnings, errors + 1

    for found_ip in resolved:
        if found_ip != expected_ip:
            u.error("{} instead of {}".format(found_ip, expected_ip))
            errors += 1
            continue
        u.success("{}".format(found_ip))

    return warnings, errors


def check_resolver(conf: dict, resolvers: set, ip: str) -> tuple:
    """Check that the resolvers declared in the configuration are in use.

    Args:
        conf: configuration mapping; the keys ``NETWORK_DNS1``,
            ``NETWORK_DNS2`` and ``NETWORK_DNS3`` are read from it.
        resolvers: DNS server addresses detected on the system.
        ip: address the DNS records are expected to point to.

    Returns:
        A ``(warnings, errors)`` tuple of counters; one warning is counted for
        each configured resolver missing from *resolvers*. ``errors`` is
        always 0 in this function.

    Raises:
        SystemExit: with code 2 when *ip* is empty, since there is then
            nothing to compare DNS answers against.
    """
    warnings = 0
    errors = 0

    for conf_resolver_key in ("NETWORK_DNS1", "NETWORK_DNS2", "NETWORK_DNS3"):
        conf_resolver = conf.get(conf_resolver_key)
        if conf_resolver and conf_resolver not in resolvers:
            u.warning("resolver {} not configured".format(conf_resolver))
            warnings += 1

    # No error can have been recorded at this point, so only the missing
    # address decides whether the remaining tests are skipped.
    if not ip:
        u.info("no resolver defined in envsetup configuration, unable to test DNS")
        # sys.exit() rather than the site-provided exit(), which is meant for
        # the interactive interpreter and may be unavailable.
        sys.exit(2)

    return warnings, errors


def main():
    """Run the DNS checks and exit with a status reflecting the outcome.

    The configured resolvers are checked first. Then, for each known service
    whose package is installed (according to ``dpkg -s``) and whose configured
    domain is neither ``localhost``, the service default name, nor listed in
    ``TESTER_DNS_RESOLUTION_IGNORED`` (comma-separated), the domain is
    resolved and compared with ``NETWORK_IP_NAT`` (or ``NETWORK_IP`` when the
    former is unset).

    Exit status: 1 when at least one error was found, 3 when only warnings
    were found, 2 when the platform is not supported; the function returns
    normally when everything is fine.
    """
    print("Check DNS settings:")
    if not u.supported_platform():
        u.info("Platform not supported")
        sys.exit(2)

    conf = u.load_conf()
    resolvers = get_dns_servers()
    ip = conf.get("NETWORK_IP_NAT") or conf.get("NETWORK_IP")

    warnings, errors = check_resolver(conf, resolvers, ip)

    # (configuration key, default domain, Debian package name)
    services_info = (
        ("MS_SERVER_NAME", "mediaserver", "ubicast-mediaserver"),
        ("MONITOR_SERVER_NAME", "monitor", "ubicast-monitor"),
        ("CM_SERVER_NAME", "mirismanager", "ubicast-skyreach"),
    )

    # Computed once: it does not depend on the service. Entries are stripped
    # so that "a, b" ignores both "a" and "b".
    ignored_raw = conf.get("TESTER_DNS_RESOLUTION_IGNORED") or ""
    resolution_ignored = {name.strip() for name in ignored_raw.split(",")}

    for conf_name, default_domain, package in services_info:
        domain = conf.get(conf_name)
        if (
            not domain
            or domain in ("localhost", default_domain)
            or domain in resolution_ignored
        ):
            continue

        # check that the service is installed on this system
        status, _ = subprocess.getstatusoutput("dpkg -s {}".format(package))
        if status != 0:
            u.info("{} not installed, skip {}".format(package, domain))
            continue

        u.info("resolving {}".format(domain))
        check_dns_warn, check_dns_err = check_dns(domain, ip, resolvers)
        warnings += check_dns_warn
        errors += check_dns_err

    if errors:
        sys.exit(1)
    elif warnings:
        sys.exit(3)


# Run the checks only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()