From b4a7415747808989695c21832e72cbf7fb09ebd8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Diemer?= <stephane.diemer@ubicast.eu>
Date: Mon, 25 May 2020 16:00:11 +0200
Subject: [PATCH] Fix tests for Python 3.5 (Ubuntu 16.04)

---
 tests/test_dns_records.py  | 10 +++---
 tests/test_nginx_vhosts.py | 72 +++++++++++++++++++-------------------
 tests/test_partitions.py   | 16 ++++-----
 3 files changed, 49 insertions(+), 49 deletions(-)

diff --git a/tests/test_dns_records.py b/tests/test_dns_records.py
index b77097b4..aef6be3a 100755
--- a/tests/test_dns_records.py
+++ b/tests/test_dns_records.py
@@ -44,21 +44,21 @@ def get_dns_servers() -> set:
         _, output = subprocess.getstatusoutput(
             "nmcli -f all device show | grep IP4.DNS | awk '{ print $2 }'"
         )
-        servers = [l for l in output.split("\n") if ip_pattern.match(l)]
+        servers = [line for line in output.split("\n") if ip_pattern.match(line)]
 
     # resolvconf method
     if not len(servers) and Path("/etc/resolv.conf").exists():
-        with open("/etc/resolv.conf", "r") as f:
-            d = f.read().strip()
+        with open("/etc/resolv.conf", "r") as fo:
+            content = fo.read().strip()
             servers = [
-                l.split()[1] for l in d.split("\n") if l.startswith("nameserver")
+                line.split()[1] for line in content.split("\n") if line.startswith("nameserver")
             ]
 
     # systemd-resolved method
     if "127.0.0.53" in servers:
         servers.remove("127.0.0.53")
         _, output = subprocess.getstatusoutput("systemd-resolve --status")
-        lines = [l.strip() for l in output.split("\n")]
+        lines = [line.strip() for line in output.split("\n")]
         dns_line = False
         for line in lines:
             if line.startswith("DNS Servers:"):
diff --git a/tests/test_nginx_vhosts.py b/tests/test_nginx_vhosts.py
index 6f46a652..8d589215 100755
--- a/tests/test_nginx_vhosts.py
+++ b/tests/test_nginx_vhosts.py
@@ -1,9 +1,9 @@
 #!/usr/bin/env python3
 
-"""
+'''
 Criticality: High
 Tests that all webserver services (vhosts) are available and reachable.
-"""
+'''
 
 from pathlib import Path
 import re
@@ -26,29 +26,29 @@ sys.path.append(str(Path(__file__).parents[1].resolve()))
 # pylint: disable=wrong-import-position
 import utils as u  # noqa: E402
 
-"""
+'''
 This script checks for all enabled vhosts in Nginx conf that:
 * The response status code is 200, 401 or 403.
 * The host is resolved as 127.0.0.1.
 * The Wowza response is correct on /streaming/ (only for mediaserver vhosts).
-"""
+'''
 
 
 def get_configs(path: str) -> list:
     configs_dir = Path(path)
-    configs = [c.resolve() for c in configs_dir.glob("*.conf")]
+    configs = [c.resolve() for c in configs_dir.glob('*.conf')]
 
     return configs
 
 
 def get_vhosts(config: Path) -> list:
     # remove comments and blank lines
-    sanitize = re.compile(r"(?:\s*#\s*.*)|(?:^\s*)", re.M)
+    sanitize = re.compile(r'(?:\s*#\s*.*)|(?:^\s*)', re.M)
     # capture server blocks
-    servers = re.compile(r"^server\s+{(?:\s*(?!server\s{).)+", re.M)
+    servers = re.compile(r'^server\s+{(?:\s*(?!server\s{).)+', re.M)
 
     with open(str(config)) as config_fo:
-        config_content = sanitize.sub(r"", config_fo.read())
+        config_content = sanitize.sub(r'', config_fo.read())
         vhosts = servers.findall(config_content)
 
     return vhosts
@@ -56,7 +56,7 @@ def get_vhosts(config: Path) -> list:
 
 def get_hostnames(vhost: str) -> list:
     # extract hostname(s) from server_name values
-    server_names = re.compile(r"^\s*server_name\s+(.*);$")
+    server_names = re.compile(r'^\s*server_name\s+(.*);$')
 
     hostnames = []
     for line in vhost.splitlines():
@@ -68,13 +68,13 @@ def get_hostnames(vhost: str) -> list:
 
 def get_ports(vhost: str) -> list:
     # extract port(s) from listen values
-    listens = re.compile(r"^\s*listen\s+(?:.*:)?(\d+)\s*(ssl)?.*;$")
+    listens = re.compile(r'^\s*listen\s+(?:.*:)?(\d+)\s*(ssl)?.*;$')
 
     ports = []
     for line in vhost.splitlines():
         if listens.match(line):
             ports.append(
-                (listens.match(line).group(1), "https" if listens.match(line).group(2) else "http")
+                (listens.match(line).group(1), 'https' if listens.match(line).group(2) else 'http')
             )
 
     return ports
@@ -92,28 +92,28 @@ def test_vhost(
     warnings = 0
     name = nginx_file.name
     for port, proto in ports_info or [(80, False)]:
-        for domain in domains or ["localhost"]:
-            url = f"{proto}://{domain}:{port}"
-            u.info(f"- testing url '{url}' from {name}")
+        for domain in domains or ['localhost']:
+            url = '%s://%s:%s' % (proto, domain, port)
+            u.info('- testing url "%s" from %s' % (url, name))
             # test domain IP
             ip_error = None
             ip_warning = None
             try:
                 ip = socket.gethostbyname(domain)
             except Exception as e:
-                ip_error = f"{domain} not resolved: {e}"
+                ip_error = '%s not resolved: %s' % (domain, e)
             else:
-                if ip != "127.0.0.1" and ip != "127.0.1.1":
-                    ip_warning = f"{domain} resolve to {ip} instead of 127.0.0.1"
+                if ip != '127.0.0.1' and ip != '127.0.1.1':
+                    ip_warning = '%s resolve to %s instead of 127.0.0.1' % (domain, ip)
             if ip_error:
                 if resolution_ignored and domain in resolution_ignored:
-                    u.info(f"{ip_error} (ignored)")
+                    u.info('%s (ignored)' % ip_error)
                     ip_error = None
                 else:
                     u.error(ip_error)
             elif ip_warning:
                 if resolution_ignored and domain in resolution_ignored:
-                    u.info(f"{ip_warning} (ignored)")
+                    u.info('%s (ignored)' % ip_warning)
                     ip_warning = None
                 else:
                     u.warning(ip_warning)
@@ -121,7 +121,7 @@ def test_vhost(
             req_error = False
             try:
                 req = requests.get(
-                    url, verify=False, proxies={"http": "", "https": ""}, timeout=30
+                    url, verify=False, proxies={'http': '', 'https': ''}, timeout=30
                 )
                 req_time = int(1000 * req.elapsed.total_seconds())
             except Exception as e:
@@ -130,24 +130,24 @@ def test_vhost(
             else:
                 code = req.status_code
             if (
-                domain != "localhost"
+                domain != 'localhost'
                 and code not in (200, 401, 403)
-                or domain == "localhost"
+                or domain == 'localhost'
                 and code not in (200, 401, 403, 404)
             ):
-                u.error(f"{domain} status: {code}, {req_time}ms")
+                u.error('%s status: %s, %s ms' % (domain, code, req_time))
                 req_error = True
             else:
                 if req_time > 10000:
-                    u.warning(f"{domain} status: {code}, {req_time}ms")
+                    u.warning('%s status: %s, %s ms' % (domain, code, req_time))
                     warnings += 1
-                if "mediaserver" in name and wowza_dir:
+                if 'mediaserver' in name and wowza_dir:
                     # test /streaming url
                     try:
                         req = requests.get(
-                            url + "/streaming/",
+                            url + '/streaming/',
                             verify=False,
-                            proxies={"http": "", "https": ""},
+                            proxies={'http': '', 'https': ''},
                             timeout=30,
                         )
                         req_time = int(1000 * req.elapsed.total_seconds())
@@ -157,10 +157,10 @@ def test_vhost(
                     else:
                         code = req.status_code
                     if code != 200:
-                        u.error(f"{domain} streaming: {code}, {req_time}ms")
+                        u.error('%s streaming: %s, %s ms' % (domain, code, req_time))
                         req_error = True
                     elif req_time > 10000:
-                        u.warning(f"{domain} streaming: {code}, {req_time}ms")
+                        u.warning('%s streaming: %s, %s ms' % (domain, code, req_time))
                         warnings += 1
             tested += 1
 
@@ -174,15 +174,15 @@ def test_vhost(
 
 
 def main():
-    print("Check that nginx vhosts are well configured:")
+    print('Check that nginx vhosts are well configured:')
     # check that Nginx dir exists
-    nginx_dir = "/etc/nginx/sites-enabled"
+    nginx_dir = '/etc/nginx/sites-enabled'
     if not Path(nginx_dir).exists():
-        u.info(f"nginx dir does not exists ('{nginx_dir}'), test skipped.")
+        u.info('nginx dir does not exists ("%s"), test skipped.' % nginx_dir)
         exit(2)
 
     # check that Wowza is installed
-    wowza_dir = "/usr/local/WowzaStreamingEngine"
+    wowza_dir = '/usr/local/WowzaStreamingEngine'
     if not Path(wowza_dir).exists():
         wowza_dir = None
 
@@ -190,7 +190,7 @@ def main():
     conf = u.load_conf()
 
     # get enabled vhosts
-    resolution_ignored = conf.get("TESTER_VHOST_RESOLUTION_IGNORED", "").split(",")
+    resolution_ignored = conf.get('TESTER_VHOST_RESOLUTION_IGNORED', '').split(',')
     errors = 0
     warnings = 0
     nginx_confs = get_configs(nginx_dir)
@@ -217,9 +217,9 @@ def main():
     elif warnings:
         exit(3)
     if not tested:
-        u.error("no url found in nginx sites-enabled dir")
+        u.error('no url found in nginx sites-enabled dir')
         exit(1)
 
 
-if __name__ == "__main__":
+if __name__ == '__main__':
     main()
diff --git a/tests/test_partitions.py b/tests/test_partitions.py
index 739ce159..9476315b 100755
--- a/tests/test_partitions.py
+++ b/tests/test_partitions.py
@@ -51,10 +51,10 @@ def to_gbytes(size_bytes):
 
 def get_memory_gbytes():
     memory_gbytes = 0
-    with open('/proc/meminfo', 'r') as f:
-        for l in f:
-            if 'MemTotal:' in l:
-                memory = l.split('MemTotal:')[1].strip()
+    with open('/proc/meminfo', 'r') as fo:
+        for line in fo:
+            if 'MemTotal:' in line:
+                memory = line.split('MemTotal:')[1].strip()
                 memory_kbytes, unit = memory.split(' ')
                 if unit != 'kB':
                     print('Warning, unexpected unit %s.' % unit)
@@ -80,10 +80,10 @@ def check_allocation(dev):
     if not root_dev:
         return True
     dev_partitions = list()
-    with open('/proc/partitions', 'r') as f:
-        for l in f:
-            if root_dev in l:
-                dev_partitions.append(l)
+    with open('/proc/partitions', 'r') as fo:
+        for line in fo:
+            if root_dev in line:
+                dev_partitions.append(line)
 
     max_size = 0
     total_size = 0
-- 
GitLab