diff --git a/tests/test_nginx_vhosts.py b/tests/test_nginx_vhosts.py index 7cecdf9c44a548ee3f333e07ba2ce6c714220743..69b91d6115f1d4e933211c08c4dba947696a8301 100755 --- a/tests/test_nginx_vhosts.py +++ b/tests/test_nginx_vhosts.py @@ -7,6 +7,7 @@ Tests that all webserver services (vhosts) are available and reachable. import imp import os +from pathlib import Path import re import requests import socket @@ -27,18 +28,69 @@ This script checks for all enabled vhosts in Nginx conf that: """ +def get_configs(path: str) -> list: + configs_dir = Path(path) + configs = [c.resolve() for c in configs_dir.glob("*.conf")] + + return configs + + +def get_vhosts(config: Path) -> list: + # remove comments and blank lines + sanitize = re.compile(r"(?:\s*#\s*.*)|(?:^\s*)", re.M) + # capture server blocks + servers = re.compile(r"^server\s+{(?:\s*(?!server\s{).)+", re.M) + + with open(config) as config_fo: + config_content = sanitize.sub(r"", config_fo.read()) + vhosts = servers.findall(config_content) + + return vhosts + + +def get_hostnames(vhost: str) -> list: + # extract hostname(s) from server_name values + server_names = re.compile(r"^\s*server_name\s+(.*);$") + + hostnames = [] + for line in vhost.splitlines(): + if server_names.match(line): + hostnames.extend(server_names.match(line)[1].split()) + + return hostnames + + +def get_ports(vhost: str) -> list: + # extract port(s) from listen values + listens = re.compile(r"^\s*listen\s+(?:.*:)?(\d+)\s*(ssl)?.*;$") + + ports = [] + for line in vhost.splitlines(): + if listens.match(line): + ports.append( + (listens.match(line)[1], "https" if listens.match(line)[2] else "http") + ) + + return ports + + def test_vhost( - ports_info=None, domains=None, resolution_ignored=None, celerity_conf="" + ports_info=None, + domains=None, + resolution_ignored=None, + celerity_conf="", + nginx_file=None, + wowza_dir=None, + tested=0, ): - tested = 0 errors = 0 warnings = 0 - for port, https in ports_info or [(80, False)]: + name = nginx_file.stem + for port, proto in ports_info or [(80, False)]: for domain in domains or ["localhost"]: - tested += 1 - url = "%s://%s:%s" % ("https" if https else "http", domain, port) - sys.stdout.write('Testing url "%s":\n' % url) - if name.startswith("mediaserver"): + url = "%s://%s:%s" % (proto, domain, port) + sys.stdout.write('Testing url "%s" from %s:\n' % (url, name)) + if name.startswith("mediaserver") and not tested: if not celerity_conf or not re.search( r"http[s]{0,1}://%s" % domain, celerity_conf ): @@ -129,6 +181,7 @@ def test_vhost( "\033[92mOK (%s, %sms)\033[0m" % (code, req_time) ) sys.stdout.write(".\n") + tested += 1 if ip_warning: warnings += 1 @@ -138,7 +191,7 @@ def test_vhost( return tested, warnings, errors -if __name__ == "__main__": +def main(): # check that Nginx dir exists nginx_dir = "/etc/nginx/sites-enabled" if not os.path.exists(nginx_dir): @@ -170,50 +223,27 @@ if __name__ == "__main__": # get enabled vhosts resolution_ignored = conf.get("TESTER_VHOST_RESOLUTION_IGNORED", "").split(",") - tested = 0 errors = 0 warnings = 0 - names = os.listdir(nginx_dir) - names.sort() - for name in names: - path = os.path.join(nginx_dir, name) - level = 0 - domains = list() - ports_info = list() - print('Parsing vhost "%s"...' % path) - with open(path, "r") as fo: - for line in fo: - line = line.strip() - if not line or line.startswith("#"): - continue - words = re.sub(r"\s+", " ", line).strip("; ").split(" ") - if "{" in words or "}" in words: - level += words.count("{") - level -= words.count("}") - if level == 0: - # test - if ports_info or domains: - t, w, e = test_vhost( - ports_info, domains, resolution_ignored, celerity_conf - ) - tested += t - warnings += w - errors += e - domains = list() - ports_info = list() - elif level == 1: - # server section are level 1 - if words[0] == "listen": - https = "ssl" in words - for port in words: - try: - port = int(port.split(":")[-1]) - except ValueError: - pass - else: - ports_info.append((port, https)) - elif words[0] == "server_name": - domains = words[1:] + nginx_confs = get_configs(nginx_dir) + for nginx_conf in nginx_confs: + tested = 0 + vhosts = get_vhosts(nginx_conf) + for vhost in vhosts: + hostnames = get_hostnames(vhost) + ports = get_ports(vhost) + t, w, e = test_vhost( + ports, + hostnames, + resolution_ignored, + celerity_conf, + nginx_conf, + wowza_dir, + tested, + ) + tested += t + warnings += w + errors += e if errors: print("%s url(s) did not correctly respond." % errors) @@ -223,3 +253,7 @@ if __name__ == "__main__": if not tested: print("No url found in Nginx sites-enabled dir.") sys.exit(1) + + +if __name__ == "__main__": + main()