Newer
Older

Stéphane Diemer
committed
Criticality: Normal

Florent Thiery
committed
Checks that the DNS records provided by the customer servers are correctly set
sys.path.append(str(Path(__file__).parents[1].resolve()))
# pylint: disable=wrong-import-position
from envsetup import utils as u # noqa: E402
def get_dns_servers() -> set:
    """Return the DNS servers configured on this system.

    Several discovery methods are tried in turn; a method is only attempted
    when the previous ones did not yield any server:

    1. systemd-resolved over D-Bus (``org.freedesktop.resolve1``);
    2. NetworkManager, through ``nmcli``;
    3. the ``nameserver`` entries of ``/etc/resolv.conf``;
    4. the textual output of ``systemd-resolve --status``.

    :return: textual IP addresses of the resolvers (empty if none was found)
    :rtype: set
    """
    # Function-scope import: nothing else in the module needs it.
    import ipaddress

    servers = []
    ip_pattern = re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$")

    # dbus method
    try:
        bus = pydbus.SystemBus()
        bus_client = bus.get("org.freedesktop.resolve1", "/org/freedesktop/resolve1")
        # Each entry carries the address family at index 1 (2 = IPv4,
        # 10 = IPv6) and the raw address bytes at index 2. ``ipaddress``
        # renders both families correctly; joining decimal byte values with
        # ":" does not produce a valid IPv6 literal.
        servers = [
            str(ipaddress.ip_address(bytes(entry[2])))
            for entry in bus_client.DNS
            if entry[1] in (2, 10)
        ]
    except Exception:  # pylint: disable=broad-except
        # Best effort: D-Bus or systemd-resolved may be missing on this host.
        # Swallow the failure so that the fallback methods below still run.
        pass

    # network-manager method
    if not servers and subprocess.getstatusoutput("command -v nmcli")[0] == 0:
        _, output = subprocess.getstatusoutput(
            "nmcli -f all device show | grep IP4.DNS | awk '{ print $2 }'"
        )
        servers = [line for line in output.split("\n") if ip_pattern.match(line)]

    # resolvconf method
    if not servers and Path("/etc/resolv.conf").exists():
        with open("/etc/resolv.conf", "r") as f:
            for line in f:
                fields = line.split()
                # Require an actual value: a bare "nameserver" line is ignored
                # instead of raising IndexError.
                if len(fields) >= 2 and fields[0] == "nameserver":
                    servers.append(fields[1])

    # systemd-resolve method
    if not servers:
        _, output = subprocess.getstatusoutput("systemd-resolve --status")
        in_dns_section = False
        for line in (raw.strip() for raw in output.split("\n")):
            if line.startswith("DNS Servers:"):
                in_dns_section = True
                tokens = line[len("DNS Servers:"):].split()
                if tokens:
                    servers.append(tokens[-1])
            elif in_dns_section and ip_pattern.match(line):
                # Continuation lines list one additional server each.
                servers.append(line)
            else:
                in_dns_section = False

    return set(servers)
# NOTE(review): the `def` line of the helper these statements belong to is not
# present in this view, and the indentation has been lost. What remains scans
# `output` line by line and returns the text that follows the first
# "has address " marker.
for line in output.split("\n"):
if "has address " in line:
return line.split("has address ")[1]
def check_dns(hostname: str, expected_ip: str, resolvers: set) -> tuple:
    """Check that ``hostname`` resolves to ``expected_ip``.

    The lookup is done only through the given resolvers, not through the
    system resolver configuration.

    :param hostname: name to resolve
    :param expected_ip: address every answer is compared with
    :param resolvers: IP addresses of the DNS servers to query
    :return: ``(warnings, errors)`` counters; one error is counted when the
        resolution fails and one for each answer that differs from
        ``expected_ip``
    :rtype: tuple
    """
    warnings = 0
    errors = 0

    # configure=False: ignore the system configuration and query only the
    # name servers set explicitly below.
    resolver = dns.resolver.Resolver(configure=False)
    resolver.nameservers = list(resolvers)

    try:
        answers = [rdata.address for rdata in resolver.query(hostname)]
    except Exception as dns_err:  # pylint: disable=broad-except
        # Any failure (timeout, NXDOMAIN, no name server, ...) is reported as
        # a test error rather than aborting the whole script.
        u.error("cannot resolve {}: {}".format(hostname, dns_err))
        errors += 1
    else:
        for address in answers:
            if address != expected_ip:
                u.error("{} instead of {}".format(address, expected_ip))
                errors += 1

    return warnings, errors
def check_resolver(conf: dict, resolvers: set, ip: str) -> tuple:
    """Check that the resolvers declared in the configuration are in use.

    The ``NETWORK_DNS1``, ``NETWORK_DNS2`` and ``NETWORK_DNS3`` entries of
    ``conf`` are compared with the resolvers detected on the system. The
    process exits with status 2 when none of these entries is set.

    :param conf: envsetup configuration
    :param resolvers: DNS servers detected on the system
    :param ip: accepted for interface compatibility; not used by this check
    :return: ``(warnings, errors)`` counters; one warning is counted for each
        declared resolver that is missing from ``resolvers``
    :rtype: tuple
    """
    warnings = 0
    errors = 0

    conf_resolvers_keys = ("NETWORK_DNS1", "NETWORK_DNS2", "NETWORK_DNS3")
    conf_resolvers = [
        conf.get(conf_resolver_key)
        for conf_resolver_key in conf_resolvers_keys
        if conf.get(conf_resolver_key)
    ]

    # Only give up when nothing is declared; otherwise the counters below
    # must be returned to the caller.
    if not conf_resolvers:
        u.info("no resolver defined in envsetup configuration, unable to test DNS")
        sys.exit(2)

    for conf_resolver in conf_resolvers:
        if conf_resolver not in resolvers:
            u.warning("resolver {} not configured".format(conf_resolver))
            warnings += 1

    return warnings, errors
def main():
    """Run the DNS checks and exit with a status reflecting the outcome.

    Exit statuses used here: 1 when at least one error was counted, 3 when
    only warnings were counted, 2 when the platform is not supported. The
    function returns normally when nothing was counted.
    """
    print("Check DNS settings:")

    if not u.supported_platform():
        u.info("Platform not supported")
        sys.exit(2)

    conf = u.load_conf()
    resolvers = get_dns_servers()
    ip = conf.get("NETWORK_IP_NAT") or conf.get("NETWORK_IP")

    warnings, errors = check_resolver(conf, resolvers, ip)

    # (configuration key, default domain, Debian package providing the service)
    services_info = (
        ("MS_SERVER_NAME", "mediaserver", "ubicast-mediaserver"),
        ("MONITOR_SERVER_NAME", "monitor", "ubicast-monitor"),
        ("CM_SERVER_NAME", "mirismanager", "ubicast-skyreach"),
    )

    # Parsed once, outside the loop. A missing or empty value yields an empty
    # set, and spaces around the commas are tolerated.
    ignored_raw = conf.get("TESTER_DNS_RESOLUTION_IGNORED") or ""
    resolution_ignored = {
        name.strip() for name in ignored_raw.split(",") if name.strip()
    }

    for conf_name, default_domain, package in services_info:
        domain = conf.get(conf_name)
        if (
            not domain
            or domain in ("localhost", default_domain)
            or domain in resolution_ignored
        ):
            continue

        # check that the service is installed on this system
        status, _ = subprocess.getstatusoutput("dpkg -s {}".format(package))
        if status != 0:
            u.info("{} not installed, skip {}".format(package, domain))
            continue

        check_dns_warn, check_dns_err = check_dns(domain, ip, resolvers)
        warnings += check_dns_warn
        errors += check_dns_err

    if errors:
        sys.exit(1)
    if warnings:
        sys.exit(3)
# Run the checks only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()