#!/usr/bin/env python3 """ Criticality: High Tests that all webserver services (vhosts) are available and reachable. """ import imp import os import re import requests import socket import sys try: from requests.packages.urllib3.exceptions import InsecureRequestWarning requests.packages.urllib3.disable_warnings(InsecureRequestWarning) except ImportError: requests.packages.urllib3.disable_warnings() """ This script checks for all enabled vhosts in Nginx conf that: * The response status code is 200, 401 or 403. * The host is resolved as 127.0.0.1. * The Wowza response is correct on /streaming/ (only for mediaserver vhosts). """ def test_vhost( ports_info=None, domains=None, resolution_ignored=None, celerity_conf="" ): tested = 0 errors = 0 warnings = 0 for port, https in ports_info or [(80, False)]: for domain in domains or ["localhost"]: tested += 1 url = "%s://%s:%s" % ("https" if https else "http", domain, port) sys.stdout.write('Testing url "%s":\n' % url) if name.startswith("mediaserver"): if not celerity_conf or not re.search( r"http[s]{0,1}://%s" % domain, celerity_conf ): sys.stdout.write( '\033[93mWarning:\033[0m Url "%s" not found in celerity conf; it should also be set in the MediaWorker.\n' % url ) warnings += 1 # test domain IP ip_error = None ip_warning = None try: ip = socket.gethostbyname(domain) except Exception as e: ip_error = "domain is not resolved: %s" % e else: if ip != "127.0.0.1": ip_warning = "domain is resolved with %s instead of 127.0.0.1" % ip sys.stdout.write(" IP: ") if ip_error: if resolution_ignored and domain in resolution_ignored: sys.stdout.write("\033[94mIgnored (%s)\033[0m" % ip_error) ip_error = None else: sys.stdout.write("\033[91mKO (%s)\033[0m" % ip_error) elif ip_warning: if resolution_ignored and domain in resolution_ignored: sys.stdout.write("\033[94mIgnored (%s)\033[0m" % ip_warning) ip_warning = None else: sys.stdout.write("\033[93mWarning (%s)\033[0m" % ip_warning) else: sys.stdout.write("\033[92mOK (127.0.0.1)\033[0m") # test url sys.stdout.write(", status: ") req_error = False try: req = requests.get( url, verify=False, proxies={"http": "", "https": ""}, timeout=30 ) req_time = int(1000 * req.elapsed.total_seconds()) except Exception as e: code = str(e) req_time = 0 else: code = req.status_code if ( domain != "localhost" and code not in (200, 401, 403) or domain == "localhost" and code not in (200, 401, 403, 404) ): sys.stdout.write("\033[91mKO (%s, %sms)\033[0m" % (code, req_time)) req_error = True else: if req_time > 10000: sys.stdout.write("\033[93mOK (%s, %sms)\033[0m" % (code, req_time)) warnings += 1 else: sys.stdout.write("\033[92mOK (%s, %sms)\033[0m" % (code, req_time)) if "mediaserver" in name and wowza_dir: # test /streaming url sys.stdout.write(", streaming: ") try: req = requests.get( url + "/streaming/", verify=False, proxies={"http": "", "https": ""}, timeout=30, ) req_time = int(1000 * req.elapsed.total_seconds()) except Exception as e: code = str(e) req_time = 0 else: code = req.status_code if code != 200: sys.stdout.write( "\033[91mKO (%s, %sms)\033[0m" % (code, req_time) ) req_error = True elif req_time > 10000: sys.stdout.write( "\033[93mOK (%s, %sms)\033[0m" % (code, req_time) ) else: sys.stdout.write( "\033[92mOK (%s, %sms)\033[0m" % (code, req_time) ) sys.stdout.write(".\n") if ip_warning: warnings += 1 if ip_error or req_error: errors += 1 return tested, warnings, errors if __name__ == "__main__": # check that Nginx dir exists nginx_dir = "/etc/nginx/sites-enabled" if not os.path.exists(nginx_dir): print('Nginx dir does not exists ("%s").' % nginx_dir) sys.exit(2) # check that Wowza is installed wowza_dir = "/usr/local/WowzaStreamingEngine" if not os.path.exists(wowza_dir): print('Info: Wowza is not installed ("%s" does not exist).' % wowza_dir) wowza_dir = None else: print( "Info: Wowza is installed, /streaming/ will be tested on mediaserver vhosts." ) # get envsetup conf conf = dict() os.chdir(os.path.dirname(__file__)) if os.path.isfile("../utils.py"): es_utils = imp.load_source("es_utils", "../utils.py") conf = es_utils.load_conf() # get celerity conf celerity_conf = "" if os.path.exists("/etc/celerity/config.py"): with open("/etc/celerity/config.py", "r") as fo: celerity_conf = fo.read() # get enabled vhosts resolution_ignored = conf.get("TESTER_VHOST_RESOLUTION_IGNORED", "").split(",") tested = 0 errors = 0 warnings = 0 names = os.listdir(nginx_dir) names.sort() for name in names: path = os.path.join(nginx_dir, name) level = 0 domains = list() ports_info = list() print('Parsing vhost "%s"...' % path) with open(path, "r") as fo: for line in fo: line = line.strip() if not line or line.startswith("#"): continue words = re.sub(r"\s+", " ", line).strip("; ").split(" ") if "{" in words or "}" in words: level += words.count("{") level -= words.count("}") if level == 0: # test if ports_info or domains: t, w, e = test_vhost( ports_info, domains, resolution_ignored, celerity_conf ) tested += t warnings += w errors += e domains = list() ports_info = list() elif level == 1: # server section are level 1 if words[0] == "listen": https = "ssl" in words for port in words: try: port = int(port.split(":")[-1]) except ValueError: pass else: ports_info.append((port, https)) elif words[0] == "server_name": domains = words[1:] if errors: print("%s url(s) did not correctly respond." % errors) sys.exit(1) elif warnings: sys.exit(3) if not tested: print("No url found in Nginx sites-enabled dir.") sys.exit(1)