#!/usr/bin/env python3 """ Criticality: High Tests that all webserver services (vhosts) are available and reachable. """ from pathlib import Path import re import requests import socket import sys try: from requests.packages.urllib3.exceptions import InsecureRequestWarning requests.packages.urllib3.disable_warnings(InsecureRequestWarning) except ImportError: requests.packages.urllib3.disable_warnings() sys.path.append(str(Path(__file__).parents[1].resolve())) # pylint: disable=wrong-import-position import utils as u # noqa: E402 """ This script checks for all enabled vhosts in Nginx conf that: * The response status code is 200, 401 or 403. * The host is resolved as 127.0.0.1. * The Wowza response is correct on /streaming/ (only for mediaserver vhosts). """ def get_configs(path: str) -> list: configs_dir = Path(path) configs = [c.resolve() for c in configs_dir.glob("*.conf")] return configs def get_vhosts(config: Path) -> list: # remove comments and blank lines sanitize = re.compile(r"(?:\s*#\s*.*)|(?:^\s*)", re.M) # capture server blocks servers = re.compile(r"^server\s+{(?:\s*(?!server\s{).)+", re.M) with open(str(config)) as config_fo: config_content = sanitize.sub(r"", config_fo.read()) vhosts = servers.findall(config_content) return vhosts def get_hostnames(vhost: str) -> list: # extract hostname(s) from server_name values server_names = re.compile(r"^\s*server_name\s+(.*);$") hostnames = [] for line in vhost.splitlines(): if server_names.match(line): hostnames.extend(server_names.match(line).group(1).split()) return hostnames def get_ports(vhost: str) -> list: # extract port(s) from listen values listens = re.compile(r"^\s*listen\s+(?:.*:)?(\d+)\s*(ssl)?.*;$") ports = [] for line in vhost.splitlines(): if listens.match(line): ports.append( (listens.match(line).group(1), "https" if listens.match(line).group(2) else "http") ) return ports def test_vhost( ports_info=None, domains=None, resolution_ignored=None, celerity_conf="", nginx_file=None, wowza_dir=None, tested=0, ): errors = 0 warnings = 0 name = nginx_file.name for port, proto in ports_info or [(80, False)]: for domain in domains or ["localhost"]: url = f"{proto}://{domain}:{port}" u.info(f"- testing url '{url}' from {name}") if name.startswith("mediaserver") and not tested: if not re.search(r"https?://%s" % domain, celerity_conf): u.warning(f"url '{url}' not found in celerity conf") warnings += 1 # test domain IP ip_error = None ip_warning = None try: ip = socket.gethostbyname(domain) except Exception as e: ip_error = f"{domain} not resolved: {e}" else: if ip != "127.0.0.1": ip_warning = f"{domain} resolve to {ip} instead of 127.0.0.1" if ip_error: if resolution_ignored and domain in resolution_ignored: u.info(f"{ip_error} (ignored)") ip_error = None else: u.error(ip_error) elif ip_warning: if resolution_ignored and domain in resolution_ignored: u.info(f"{ip_warning} (ignored)") ip_warning = None else: u.warning(ip_warning) # test url req_error = False try: req = requests.get( url, verify=False, proxies={"http": "", "https": ""}, timeout=30 ) req_time = int(1000 * req.elapsed.total_seconds()) except Exception as e: code = str(e) req_time = 0 else: code = req.status_code if ( domain != "localhost" and code not in (200, 401, 403) or domain == "localhost" and code not in (200, 401, 403, 404) ): u.error(f"{domain} status: {code}, {req_time}ms") req_error = True else: if req_time > 10000: u.warning(f"{domain} status: {code}, {req_time}ms") warnings += 1 if "mediaserver" in name and wowza_dir: # test /streaming url try: req = requests.get( url + "/streaming/", verify=False, proxies={"http": "", "https": ""}, timeout=30, ) req_time = int(1000 * req.elapsed.total_seconds()) except Exception as e: code = str(e) req_time = 0 else: code = req.status_code if code != 200: u.error(f"{domain} streaming: {code}, {req_time}ms") req_error = True elif req_time > 10000: u.warning(f"{domain} streaming: {code}, {req_time}ms") warnings += 1 tested += 1 if ip_warning: warnings += 1 if ip_error or req_error: errors += 1 return tested, warnings, errors def main(): print("Check that nginx vhosts are well configured:") # check that Nginx dir exists nginx_dir = "/etc/nginx/sites-enabled" if not Path(nginx_dir).exists(): u.info(f"nginx dir does not exists ('{nginx_dir}'), test skipped.") exit(2) # check that Wowza is installed wowza_dir = "/usr/local/WowzaStreamingEngine" if not Path(wowza_dir).exists(): wowza_dir = None # get envsetup conf conf = u.load_conf() # get celerity conf celerity_conf = "" if Path("/etc/celerity/config.py").exists(): with open("/etc/celerity/config.py", "r") as fo: celerity_conf = fo.read() # get enabled vhosts resolution_ignored = conf.get("TESTER_VHOST_RESOLUTION_IGNORED", "").split(",") errors = 0 warnings = 0 nginx_confs = get_configs(nginx_dir) for nginx_conf in nginx_confs: tested = 0 vhosts = get_vhosts(nginx_conf) for vhost in vhosts: hostnames = get_hostnames(vhost) ports = get_ports(vhost) t, w, e = test_vhost( ports, hostnames, resolution_ignored, celerity_conf, nginx_conf, wowza_dir, tested, ) tested += t warnings += w errors += e if errors: exit(1) elif warnings: exit(3) if not tested: u.error("no url found in nginx sites-enabled dir") exit(1) if __name__ == "__main__": main()