diff --git a/tests/test_dns_records.py b/tests/test_dns_records.py index b77097b4d41e68db738a6493eb2bc18da23c2ed6..aef6be3a948e942bafdfa01895aad2e63fc24135 100755 --- a/tests/test_dns_records.py +++ b/tests/test_dns_records.py @@ -44,21 +44,21 @@ def get_dns_servers() -> set: _, output = subprocess.getstatusoutput( "nmcli -f all device show | grep IP4.DNS | awk '{ print $2 }'" ) - servers = [l for l in output.split("\n") if ip_pattern.match(l)] + servers = [line for line in output.split("\n") if ip_pattern.match(line)] # resolvconf method if not len(servers) and Path("/etc/resolv.conf").exists(): - with open("/etc/resolv.conf", "r") as f: - d = f.read().strip() + with open("/etc/resolv.conf", "r") as fo: + content = fo.read().strip() servers = [ - l.split()[1] for l in d.split("\n") if l.startswith("nameserver") + line.split()[1] for line in content.split("\n") if line.startswith("nameserver") ] # systemd-resolved method if "127.0.0.53" in servers: servers.remove("127.0.0.53") _, output = subprocess.getstatusoutput("systemd-resolve --status") - lines = [l.strip() for l in output.split("\n")] + lines = [line.strip() for line in output.split("\n")] dns_line = False for line in lines: if line.startswith("DNS Servers:"): diff --git a/tests/test_nginx_vhosts.py b/tests/test_nginx_vhosts.py index 6f46a65261c6e05d1dcd50873031284f1d1b7d43..8d5892150811c359c03c7fdd982e20a8534ed96e 100755 --- a/tests/test_nginx_vhosts.py +++ b/tests/test_nginx_vhosts.py @@ -1,9 +1,9 @@ #!/usr/bin/env python3 -""" +''' Criticality: High Tests that all webserver services (vhosts) are available and reachable. -""" +''' from pathlib import Path import re @@ -26,29 +26,29 @@ sys.path.append(str(Path(__file__).parents[1].resolve())) # pylint: disable=wrong-import-position import utils as u # noqa: E402 -""" +''' This script checks for all enabled vhosts in Nginx conf that: * The response status code is 200, 401 or 403. * The host is resolved as 127.0.0.1. * The Wowza response is correct on /streaming/ (only for mediaserver vhosts). -""" +''' def get_configs(path: str) -> list: configs_dir = Path(path) - configs = [c.resolve() for c in configs_dir.glob("*.conf")] + configs = [c.resolve() for c in configs_dir.glob('*.conf')] return configs def get_vhosts(config: Path) -> list: # remove comments and blank lines - sanitize = re.compile(r"(?:\s*#\s*.*)|(?:^\s*)", re.M) + sanitize = re.compile(r'(?:\s*#\s*.*)|(?:^\s*)', re.M) # capture server blocks - servers = re.compile(r"^server\s+{(?:\s*(?!server\s{).)+", re.M) + servers = re.compile(r'^server\s+{(?:\s*(?!server\s{).)+', re.M) with open(str(config)) as config_fo: - config_content = sanitize.sub(r"", config_fo.read()) + config_content = sanitize.sub(r'', config_fo.read()) vhosts = servers.findall(config_content) return vhosts @@ -56,7 +56,7 @@ def get_vhosts(config: Path) -> list: def get_hostnames(vhost: str) -> list: # extract hostname(s) from server_name values - server_names = re.compile(r"^\s*server_name\s+(.*);$") + server_names = re.compile(r'^\s*server_name\s+(.*);$') hostnames = [] for line in vhost.splitlines(): @@ -68,13 +68,13 @@ def get_hostnames(vhost: str) -> list: def get_ports(vhost: str) -> list: # extract port(s) from listen values - listens = re.compile(r"^\s*listen\s+(?:.*:)?(\d+)\s*(ssl)?.*;$") + listens = re.compile(r'^\s*listen\s+(?:.*:)?(\d+)\s*(ssl)?.*;$') ports = [] for line in vhost.splitlines(): if listens.match(line): ports.append( - (listens.match(line).group(1), "https" if listens.match(line).group(2) else "http") + (listens.match(line).group(1), 'https' if listens.match(line).group(2) else 'http') ) return ports @@ -92,28 +92,28 @@ def test_vhost( warnings = 0 name = nginx_file.name for port, proto in ports_info or [(80, False)]: - for domain in domains or ["localhost"]: - url = f"{proto}://{domain}:{port}" - u.info(f"- testing url '{url}' from {name}") + for domain in domains or ['localhost']: + url = '%s://%s:%s' % (proto, domain, port) + u.info('- testing url "%s" from %s' % (url, name)) # test domain IP ip_error = None ip_warning = None try: ip = socket.gethostbyname(domain) except Exception as e: - ip_error = f"{domain} not resolved: {e}" + ip_error = '%s not resolved: %s' % (domain, e) else: - if ip != "127.0.0.1" and ip != "127.0.1.1": - ip_warning = f"{domain} resolve to {ip} instead of 127.0.0.1" + if ip != '127.0.0.1' and ip != '127.0.1.1': + ip_warning = '%s resolve to %s instead of 127.0.0.1' % (domain, ip) if ip_error: if resolution_ignored and domain in resolution_ignored: - u.info(f"{ip_error} (ignored)") + u.info('%s (ignored)' % ip_error) ip_error = None else: u.error(ip_error) elif ip_warning: if resolution_ignored and domain in resolution_ignored: - u.info(f"{ip_warning} (ignored)") + u.info('%s (ignored)' % ip_warning) ip_warning = None else: u.warning(ip_warning) @@ -121,7 +121,7 @@ def test_vhost( req_error = False try: req = requests.get( - url, verify=False, proxies={"http": "", "https": ""}, timeout=30 + url, verify=False, proxies={'http': '', 'https': ''}, timeout=30 ) req_time = int(1000 * req.elapsed.total_seconds()) except Exception as e: @@ -130,24 +130,24 @@ def test_vhost( else: code = req.status_code if ( - domain != "localhost" + domain != 'localhost' and code not in (200, 401, 403) - or domain == "localhost" + or domain == 'localhost' and code not in (200, 401, 403, 404) ): - u.error(f"{domain} status: {code}, {req_time}ms") + u.error('%s status: %s, %s ms' % (domain, code, req_time)) req_error = True else: if req_time > 10000: - u.warning(f"{domain} status: {code}, {req_time}ms") + u.warning('%s status: %s, %s ms' % (domain, code, req_time)) warnings += 1 - if "mediaserver" in name and wowza_dir: + if 'mediaserver' in name and wowza_dir: # test /streaming url try: req = requests.get( - url + "/streaming/", + url + '/streaming/', verify=False, - proxies={"http": "", "https": ""}, + proxies={'http': '', 'https': ''}, timeout=30, ) req_time = int(1000 * req.elapsed.total_seconds()) @@ -157,10 +157,10 @@ def test_vhost( else: code = req.status_code if code != 200: - u.error(f"{domain} streaming: {code}, {req_time}ms") + u.error('%s streaming: %s, %s ms' % (domain, code, req_time)) req_error = True elif req_time > 10000: - u.warning(f"{domain} streaming: {code}, {req_time}ms") + u.warning('%s streaming: %s, %s ms' % (domain, code, req_time)) warnings += 1 tested += 1 @@ -174,15 +174,15 @@ def test_vhost( def main(): - print("Check that nginx vhosts are well configured:") + print('Check that nginx vhosts are well configured:') # check that Nginx dir exists - nginx_dir = "/etc/nginx/sites-enabled" + nginx_dir = '/etc/nginx/sites-enabled' if not Path(nginx_dir).exists(): - u.info(f"nginx dir does not exists ('{nginx_dir}'), test skipped.") + u.info('nginx dir does not exists ("%s"), test skipped.' % nginx_dir) exit(2) # check that Wowza is installed - wowza_dir = "/usr/local/WowzaStreamingEngine" + wowza_dir = '/usr/local/WowzaStreamingEngine' if not Path(wowza_dir).exists(): wowza_dir = None @@ -190,7 +190,7 @@ def main(): conf = u.load_conf() # get enabled vhosts - resolution_ignored = conf.get("TESTER_VHOST_RESOLUTION_IGNORED", "").split(",") + resolution_ignored = conf.get('TESTER_VHOST_RESOLUTION_IGNORED', '').split(',') errors = 0 warnings = 0 nginx_confs = get_configs(nginx_dir) @@ -217,9 +217,9 @@ def main(): elif warnings: exit(3) if not tested: - u.error("no url found in nginx sites-enabled dir") + u.error('no url found in nginx sites-enabled dir') exit(1) -if __name__ == "__main__": +if __name__ == '__main__': main() diff --git a/tests/test_partitions.py b/tests/test_partitions.py index 739ce1594cd0ad4b96520c0dcff16a1639a248cf..9476315bbd4ba56005905a536b00dda79fb36073 100755 --- a/tests/test_partitions.py +++ b/tests/test_partitions.py @@ -51,10 +51,10 @@ def to_gbytes(size_bytes): def get_memory_gbytes(): memory_gbytes = 0 - with open('/proc/meminfo', 'r') as f: - for l in f: - if 'MemTotal:' in l: - memory = l.split('MemTotal:')[1].strip() + with open('/proc/meminfo', 'r') as fo: + for line in fo: + if 'MemTotal:' in line: + memory = line.split('MemTotal:')[1].strip() memory_kbytes, unit = memory.split(' ') if unit != 'kB': print('Warning, unexpected unit %s.' % unit) @@ -80,10 +80,10 @@ def check_allocation(dev): if not root_dev: return True dev_partitions = list() - with open('/proc/partitions', 'r') as f: - for l in f: - if root_dev in l: - dev_partitions.append(l) + with open('/proc/partitions', 'r') as fo: + for line in fo: + if root_dev in line: + dev_partitions.append(line) max_size = 0 total_size = 0