Newer
Older

Stéphane Diemer
committed
Criticality: Normal

Florent Thiery
committed
Checks that DNS records are provided by the customer servers are correctly set
sys.path.append(str(Path(__file__).parents[1].resolve()))
# pylint: disable=wrong-import-position
from envsetup import utils as u # noqa: E402
def get_dns_servers() -> set:
if subprocess.getstatusoutput("command -v nmcli")[0] == 0:
# network-manager
"nmcli -f all device show | grep IP4.DNS | awk '{ print $2 }'"
)
if not len(servers):
# resolvconf
with open("/etc/resolv.conf", "r") as f:
d = f.read().strip()
servers = [l.split()[1] for l in d.split("\n") if l.startswith("nameserver")]
if "127.0.0.53" in servers:
# systemd-resolved
_, output = subprocess.getstatusoutput("systemd-resolve --status")
lines = [l.strip() for l in output.split("\n")]
dns_line = False
ip_pattern = re.compile("\d+\.\d+\.\d+\.\d+")
for line in lines:
if line.startswith("DNS Servers:"):
dns_line = True
servers.append(line.split()[-1])
elif dns_line and ip_pattern.match(line):
servers.append(line)
else:
dns_line = False
for line in output.split("\n"):
if "has address " in line:
return line.split("has address ")[1]
def check_dns(hostname: str, expected_ip: str, resolvers: set) -> tuple:
if address == expected_ip:
u.success("dns({}): {} -> {}".format(resolver, hostname, address))
else:
u.error(
"dns({}): {} -> {} (should be {})".format(
resolver, hostname, address, expected_ip
)
)
errors += 1
u.error("dns({}): cannot resolve {}".format(resolver, hostname))
errors += 1
return warnings, errors
def check_resolver(conf: dict, resolvers: set, ip: str) -> tuple:
conf_resolvers_keys = ("NETWORK_DNS1", "NETWORK_DNS2", "NETWORK_DNS3")
for conf_resolver_key in conf_resolvers_keys:
conf_resolver = conf.get(conf_resolver_key)
if conf_resolver and conf_resolver not in resolvers:
u.warning("resolver {} not configured".format(conf_resolver))
warnings += 1
u.info("no resolver defined in envsetup configuration, unable to test DNS")
exit(2)
return warnings, errors
def main():
print("Check DNS settings:")
warnings = 0
errors = 0
conf = u.load_conf()
resolvers = get_dns_servers()
ip = conf.get("NETWORK_IP_NAT") or conf.get("NETWORK_IP")
check_resolver_warn, check_resolver_err = check_resolver(conf, resolvers, ip)
if check_resolver_err:
errors += check_resolver_err
if check_resolver_warn:
warnings += check_resolver_warn
services_info = (
("MS_SERVER_NAME", "mediaserver", "ubicast-mediaserver"),
("MONITOR_SERVER_NAME", "monitor", "ubicast-monitor"),
("CM_SERVER_NAME", "mirismanager", "ubicast-skyreach"),
)
for conf_name, default_domain, package in services_info:
domain = conf.get(conf_name)
resolution_ignored = conf.get("TESTER_DNS_RESOLUTION_IGNORED", "").split(",")
if (
domain
and domain not in ("localhost", default_domain)
and domain not in resolution_ignored
):
# check that the service is installed on this system
status, _ = subprocess.getstatusoutput("dpkg -s {}".format(package))
if status == 0:
check_dns_warn, check_dns_err = check_dns(domain, ip, resolvers)
if check_dns_err:
errors += check_dns_err
if check_dns_warn:
warnings += check_dns_warn
else:
u.info("{} not installed, skip {}".format(package, domain))
if errors:
exit(1)
elif warnings:
exit(3)
if __name__ == "__main__":
main()