#!/usr/bin/env python3 ''' Criticality: High Tests that all webserver services (vhosts) are available and reachable. ''' from pathlib import Path import re import requests import socket import sys if sys.version_info < (3, 6, 0): sys.exit(2) try: from requests.packages.urllib3.exceptions import InsecureRequestWarning requests.packages.urllib3.disable_warnings(InsecureRequestWarning) except ImportError: requests.packages.urllib3.disable_warnings() sys.path.append(str(Path(__file__).parents[1].resolve())) # pylint: disable=wrong-import-position import utils as u # noqa: E402 ''' This script checks for all enabled vhosts in Nginx conf that: * The response status code is 200, 401 or 403. * The host is resolved as 127.0.0.1. * The Wowza response is correct on /streaming/ (only for mediaserver vhosts). ''' def get_configs(path: str) -> list: configs_dir = Path(path) configs = [c.resolve() for c in configs_dir.glob('*.conf')] return configs def get_vhosts(config: Path) -> list: # remove comments and blank lines sanitize = re.compile(r'(?:\s*#\s*.*)|(?:^\s*)', re.M) # capture server blocks servers = re.compile(r'^server\s+{(?:\s*(?!server\s{).)+', re.M) with open(str(config)) as config_fo: config_content = sanitize.sub(r'', config_fo.read()) vhosts = servers.findall(config_content) return vhosts def get_hostnames(vhost: str) -> list: # extract hostname(s) from server_name values server_names = re.compile(r'^\s*server_name\s+(.*);$') hostnames = [] for line in vhost.splitlines(): if server_names.match(line): hostnames.extend(server_names.match(line).group(1).split()) return hostnames def get_ports(vhost: str) -> list: # extract port(s) from listen values listens = re.compile(r'^\s*listen\s+(?:.*:)?(\d+)\s*(ssl)?.*;$') ports = [] for line in vhost.splitlines(): if listens.match(line): ports.append( (listens.match(line).group(1), 'https' if listens.match(line).group(2) else 'http') ) return ports def test_vhost( ports_info=None, domains=None, resolution_ignored=None, nginx_file=None, wowza_dir=None, tested=0, ): errors = 0 warnings = 0 name = nginx_file.name for port, proto in ports_info or [(80, False)]: for domain in domains or ['localhost']: url = '%s://%s:%s' % (proto, domain, port) u.info('- testing url "%s" from %s' % (url, name)) # test domain IP ip_error = None ip_warning = None try: ip = socket.gethostbyname(domain) except Exception as e: ip_error = '%s not resolved: %s' % (domain, e) else: if ip != '127.0.0.1' and ip != '127.0.1.1': ip_warning = '%s resolve to %s instead of 127.0.0.1' % (domain, ip) if ip_error: if resolution_ignored and domain in resolution_ignored: u.info('%s (ignored)' % ip_error) ip_error = None else: u.error(ip_error) elif ip_warning: if resolution_ignored and domain in resolution_ignored: u.info('%s (ignored)' % ip_warning) ip_warning = None else: u.warning(ip_warning) # test url req_error = False try: req = requests.get( url, verify=False, proxies={'http': '', 'https': ''}, timeout=30 ) req_time = int(1000 * req.elapsed.total_seconds()) except Exception as e: code = str(e) req_time = 0 else: code = req.status_code if ( domain != 'localhost' and code not in (200, 401, 403) or domain == 'localhost' and code not in (200, 401, 403, 404) ): u.error('%s status: %s, %s ms' % (domain, code, req_time)) req_error = True else: if req_time > 10000: u.warning('%s status: %s, %s ms' % (domain, code, req_time)) warnings += 1 if 'mediaserver' in name and wowza_dir: # test /streaming url try: req = requests.get( url + '/streaming/', verify=False, proxies={'http': '', 'https': ''}, timeout=30, ) req_time = int(1000 * req.elapsed.total_seconds()) except Exception as e: code = str(e) req_time = 0 else: code = req.status_code if code != 200: u.error('%s streaming: %s, %s ms' % (domain, code, req_time)) req_error = True elif req_time > 10000: u.warning('%s streaming: %s, %s ms' % (domain, code, req_time)) warnings += 1 tested += 1 if ip_warning: warnings += 1 if ip_error or req_error: errors += 1 return tested, warnings, errors def main(): print('Check that nginx vhosts are well configured:') # check that Nginx dir exists nginx_dir = '/etc/nginx/sites-enabled' if not Path(nginx_dir).exists(): u.info('nginx dir does not exists ("%s"), test skipped.' % nginx_dir) exit(2) # check that Wowza is installed wowza_dir = '/usr/local/WowzaStreamingEngine' if not Path(wowza_dir).exists(): wowza_dir = None # get envsetup conf conf = u.load_conf() # get enabled vhosts resolution_ignored = conf.get('TESTER_VHOST_RESOLUTION_IGNORED', '').split(',') errors = 0 warnings = 0 nginx_confs = get_configs(nginx_dir) for nginx_conf in nginx_confs: tested = 0 vhosts = get_vhosts(nginx_conf) for vhost in vhosts: hostnames = get_hostnames(vhost) ports = get_ports(vhost) t, w, e = test_vhost( ports, hostnames, resolution_ignored, nginx_conf, wowza_dir, tested, ) tested += t warnings += w errors += e if errors: exit(1) elif warnings: exit(3) if not tested: u.error('no url found in nginx sites-enabled dir') exit(1) if __name__ == '__main__': main()