# Skip to content
# Snippets Groups Projects
# tester.py 21.48 KiB
#!/usr/bin/env python3

"""
Script to start tests and to manage their results
"""

import base64
from io import StringIO
import datetime
import os
import re
import subprocess
import sys
import uuid
import glob

import utils
from utils import log

# Warning added at the top of the text and HTML reports by Tester.run_tests
# when the output of a test contains "out of support". It is wrapped in the
# ANSI color code 93 and terminated by the reset code.
OUT_OF_SUPPORT_TEXT = """\033[93mWarning:
The system is out of support, UbiCast will not be notified if errors are detected.
Please contact UbiCast sales team (sales@ubicast.eu) to renew the support contract.\033[0m"""


class Logger(object):
    """File-like object duplicating every write to a stream and to a buffer.

    Both targets are flushed right after each write, which is why ``flush``
    itself has nothing left to do.
    """

    def __init__(self, stream, log_buffer):
        """Store the live ``stream`` and the ``log_buffer`` receiving a copy."""
        self.stream, self.log_buffer = stream, log_buffer

    def write(self, text):
        """Write ``text`` to the stream first, then to the buffer."""
        for target in (self.stream, self.log_buffer):
            target.write(text)
            target.flush()

    def flush(self):
        """Do nothing: ``write`` already flushes both targets."""
        return None


# Buffer keeping a copy of everything written to stdout/stderr;
# Tester.run_tests reads it back to write the results file and the email
# attachment.
log_buffer = StringIO()
# From here on, stdout and stderr share a single Logger which writes to the
# original stdout and to log_buffer.
sys.stdout = Logger(sys.stdout, log_buffer)
sys.stderr = sys.stdout


def strip_colors(text):
    """Return ``text`` with the ANSI color sequences removed.

    A sequence is ESC, ``[``, one or more digits or semicolons, then ``m``.
    """
    ansi_color = re.compile(r"\033\[[\d;]+m")
    return ansi_color.sub("", text)


def escape(text):
    """Convert a text containing ANSI color codes to an HTML fragment.

    The text is stripped, ``&``, ``<`` and ``>`` are replaced with HTML
    entities, and the color codes 90 to 95 are turned into ``<span>``
    elements. A reset code (only zeros and semicolons) closes the spans that
    are currently open; spans still open at the end of the text are closed
    too, so the returned fragment is always balanced. Any other color code
    is removed.

    Args:
        text: the raw text, possibly containing ANSI color sequences.

    Returns:
        The HTML fragment as a string.
    """
    colors = {
        "90": "gray",
        "91": "red",
        "92": "green",
        "93": "orange",
        "94": "blue",
        "95": "purple",
    }
    html = text.strip()
    # "&" must be escaped first, otherwise the entities added below would be
    # escaped a second time.
    html = html.replace("&", "&amp;")
    html = html.replace("<", "&lt;")
    html = html.replace(">", "&gt;")
    open_spans = 0

    def convert(match):
        nonlocal open_spans
        code = match.group(1)
        color = colors.get(code)
        if color:
            open_spans += 1
            return '<span style="color: %s;">' % color
        if open_spans and not code.strip("0;"):
            # Reset code: close everything opened so far
            closing = "</span>" * open_spans
            open_spans = 0
            return closing
        # Unsupported code (or reset without any open span): drop it
        return ""

    html = re.sub(r"\033\[([\d;]+)m", convert, html)
    return html + "</span>" * open_spans


def raid_idle():
    """Tell whether every ``md`` device reports an ``idle`` sync action.

    Reads each ``/sys/block/md*/md/sync_action`` file and prints the state of
    every device which is not idle. Returns ``True`` when all the files
    contain ``idle`` (or when there is no such file), ``False`` otherwise.
    """
    all_idle = True
    for action_path in glob.glob("/sys/block/md*/md/sync_action"):
        with open(action_path, "r") as handle:
            state = handle.read().strip()
        if state == "idle":
            continue
        all_idle = False
        print("State in %s is %s" % (action_path, state))
    return all_idle


class Tester:
    """Command line tool discovering the tests, running them and reporting.

    Instantiating the class runs the whole process: the constructor parses
    the arguments, runs the tests and ends with ``sys.exit``.
    """

    USAGE = (
        """%s [-e] [-f] [-b] [-n] [-p] [-d] [-h] [msuser]
    -e: send email with report.
    -f: send email with report only if at least one test failed.
    -b: run only basic tests (exclude mediaserver tests).
    -n: do not update envsetup repository.
    -p: do not install packages.
    -d: debug mode (can be started with non root users).
    -h: show this message."""
        % __file__
    )
    # "-p" is documented in USAGE and checked in __init__, so it has to be
    # accepted here; without it the option was rejected as invalid.
    VALID_ARGS = ["-e", "-f", "-b", "-n", "-p", "-d", "-h"]
    # Maximum number of "results_*" files kept in the log directory.
    MAX_LOG_FILES = 50
    # Number of consecutive failed runs after which "-f" stops sending emails.
    NO_MAIL_FAILURES_COUNT = 5

    def __init__(self, *args):
        """Parse the command line arguments and run the whole testing process.

        Every code path of this constructor ends with ``sys.exit`` (or with
        ``os.execl`` when the script restarts itself after an update).

        Args:
            *args: the command line arguments: flags listed in
                ``VALID_ARGS`` and, optionally, the name of a user owning a
                MediaServer instance.
        """
        log("\033[96m-------------------------------\033[0m")
        log("\033[96m- UbiCast applications tester -\033[0m")
        log("\033[96m-------------------------------\033[0m")
        options = list(args)
        # Help has priority over every other argument
        if "-h" in options:
            log("USAGE: " + self.USAGE)
            sys.exit(0)
        # Validate the flags; any other argument is the target user
        target_user = None
        for option in options:
            if option.startswith("-"):
                if option in self.VALID_ARGS:
                    continue
                log('Invalid argument given: "%s".\n' % option)
                log("USAGE: " + self.USAGE)
                sys.exit(1)
            log("Optional target user : %s" % option)
            if not os.path.isdir(os.path.join("/home", option)):
                log("Mediaserver user %s does not exist" % option)
                sys.exit(1)
            target_user = option
        # Work from the directory of this script
        root_dir = utils.get_dir(__file__)
        if root_dir != "":
            os.chdir(root_dir)
        self.root_dir = root_dir
        # Make the modules next to this script importable
        if root_dir not in sys.path:
            sys.path.append(root_dir)
        # Only root can run the script, except in debug mode
        debug = "-d" in options
        current_user = subprocess.check_output(["whoami"]).decode("utf-8").strip()
        if not debug and current_user != "root":
            log("This script should be run as root user.")
            sys.exit(1)
        # Update envsetup files and restart if this script has been modified
        if "-n" not in options:
            script_path = os.path.join(root_dir, os.path.basename(__file__))
            mtime_before = os.path.getmtime(script_path)
            subprocess.call(["python3", "update_envsetup.py"])
            if os.path.getmtime(script_path) != mtime_before:
                log("The script has changed, restarting it...")
                os.execl("/usr/bin/python3", "python3", script_path, "-n", *options)
                sys.exit(1)  # not reachable
        # Install utilities packages
        if "-p" not in options:
            subprocess.call(["python3", "pkgs_envsetup.py"])
        # A configuration is mandatory
        if not utils.load_conf():
            log("No configuration loaded.")
            sys.exit(1)
        tests = self.discover_tests("-b" in options, target_user)
        if not tests:
            sys.exit(1)

        # Tests are not started while a RAID operation is running
        if not raid_idle():
            print("A RAID check or operation is in progress, aborting tests")
            sys.exit(1)
        sys.exit(self.run_tests(tests, "-e" in options, "-f" in options))

    def parse_file_header(self, path):
        with open(path, "r") as fo:
            content = fo.read()
        description = ""
        if path.endswith(".py"):
            start = (
                content.find("'''")
                if content.find("'''") != -1
                else content.find('"""')
            )
            if start > 0:
                start += 3
                end = (
                    content.find("'''", start)
                    if content.find("'''", start) != -1
                    else content.find('"""', start)
                )
                if end > 0:
                    description = content[start:end]
        else:
            for line in content.split("\n"):
                if line.startswith("#!"):
                    continue
                elif line.startswith("#"):
                    description += line[1:].strip() + "\n"
                else:
                    break
        description = description.strip()
        if description.startswith("Criticality:"):
            criticality, *description = description.split("\n")
            criticality = criticality[len("Criticality:") :].strip()  # noqa: E203
            description = "\n".join(description)
        else:
            criticality = "not specified"
        return criticality, description

    def discover_tests(self, basic_only=False, msuser=None):
        """Build the list of tests to run.

        The standard tests are the files of the ``tests`` directory located
        in ``self.root_dir``. Unless ``basic_only`` is set, the tests of the
        ms-testing-suite repository (cloned and updated here) are added for
        the selected MediaServer instances, an instance being a user of
        ``/home`` with a ``msinstance`` entry in his home directory.

        Args:
            basic_only: when true, only the standard tests are returned.
            msuser: when set (and ``basic_only`` is false), the standard
                tests are dropped and only the MediaServer tests of this
                user are returned.

        Returns:
            A list of ``(name, criticality, description, command)`` tuples
            sorted by decreasing criticality then by name, or ``None`` when
            the tests directory is missing or empty.
        """
        # Names are stripped so that "a.py, b.py" ignores both files
        ignored_tests = [
            name.strip()
            for name in utils.get_conf("TESTER_IGNORED_TESTS", "").split(",")
        ]
        ignored_tests.append("__init__.py")
        # Get standard tests
        path = os.path.join(self.root_dir, "tests")
        if not os.path.isdir(path):
            log('The tests dir is missing ("%s").' % path)
            return None
        names = sorted(os.listdir(path))
        if not names:
            log('The tests dir is empty ("%s").' % path)
            return None
        criticalities_map = {"Low": 1, "Normal": 2, "High": 3}

        def sort_key(test):
            # Highest criticality first (unknown values last), then by name
            return (-criticalities_map.get(test[1], 0), test[0])

        tests = []
        for name in names:
            if name in ignored_tests:
                continue
            test_path = os.path.join(path, name)
            if os.path.isfile(test_path):
                criticality, description = self.parse_file_header(test_path)
                tests.append((name, criticality, description, [test_path]))
        if basic_only:
            tests.sort(key=sort_key)
            return tests
        if msuser:
            # A specific instance was requested: keep only its own tests
            tests = []
        # Get MS instances
        ms_users = sorted(
            user
            for user in os.listdir("/home")
            if os.path.exists("/home/%s/msinstance" % user)
            and (not msuser or user == msuser)
        )
        # Get MediaServer tests
        if ms_users:
            cleaned_list = []
            instances_to_test = utils.get_conf("TESTER_MS_INSTANCES", "").split(",")
            for val in instances_to_test:
                val = val.strip()
                if not val:
                    continue
                if val in ms_users:
                    cleaned_list.append(val)
                else:
                    log(
                        'An inexisting instance has been requested for tests: "%s".'
                        % val
                    )
            if cleaned_list:
                ms_users = cleaned_list
            else:
                # No explicit selection: limit the number of tested instances
                try:
                    max_instances = int(utils.get_conf("TESTER_MAX_INSTANCES") or 2)
                except Exception as e:
                    log("TESTER_MAX_INSTANCES has an invalid value: %s" % e)
                    max_instances = 2
                if len(ms_users) > max_instances:
                    ms_users = ms_users[:max_instances]
            log("Instances that will be tested: %s." % ", ".join(ms_users))
            # Clone testing suite
            ms_path = os.path.join(path, "ms-testing-suite")
            if not os.path.exists(ms_path):
                log('Cloning ms-testing-suite in "%s".' % ms_path)
                subprocess.call(
                    [
                        "git",
                        "clone",
                        "--recursive",
                        "https://panel.ubicast.eu/git/mediaserver/ms-testing-suite.git",
                        ms_path,
                    ]
                )
            if os.path.exists(ms_path):
                log('Updating ms-testing-suite in "%s".' % ms_path)
                branch = utils.get_conf("ENVSETUP_BRANCH") or "stable"
                # The git commands are run with "cwd" instead of changing the
                # working directory of the whole process, so there is no
                # directory to restore afterwards, even if a command raises.
                for git_args in (
                    ["checkout", branch],
                    ["fetch", "--recurse-submodules", "--all"],
                    ["reset", "--hard", "origin/{}".format(branch)],
                    ["pull", "--recurse-submodules"],
                    ["submodule", "update", "--init", "--recursive"],
                ):
                    subprocess.call(["git"] + git_args, cwd=ms_path)
            # Add tests to list
            log("Add MediaServer tests if available.")
            wowza_dir = "/usr/local/WowzaStreamingEngine"
            etc_lives_conf = "/etc/mediaserver/lives_conf.py"
            local_lives_conf = "/home/%s/msinstance/conf/lives_conf.py"
            for user in ms_users:
                ms_tests = ["ms_vod_tester.py", "test_caches.py"]
                # Check if live tests should be started
                if (
                    os.path.exists(wowza_dir)
                    or os.path.exists(etc_lives_conf)
                    or os.path.exists(local_lives_conf % user)
                ):
                    ms_tests.append("test_wowza_secure.py")
                    ms_tests.append("ms_live_tester.py")
                for name in ms_tests:
                    if name in ignored_tests:
                        continue
                    test_path = os.path.join(ms_path, name)
                    if not os.path.isfile(test_path):
                        # For instance when the clone above has failed: do not
                        # crash, the other tests can still be run.
                        log(
                            'The MediaServer test "%s" is missing, it will be skipped.'
                            % test_path
                        )
                        continue
                    criticality, description = self.parse_file_header(test_path)
                    tests.append(
                        (
                            "%s (%s)" % (name, user),
                            criticality,
                            description,
                            [test_path, user],
                        )
                    )
        tests.sort(key=sort_key)
        return tests

    def run_tests(self, tests, email=False, email_if_fail=False):  # noqa: C901

        # Run all tests
        successes = 0
        failures = 0
        total_duration = None
        report_rows = [("Test", "Criticality", "Result", "Duration", "Description")]
        report_rows_length = [len(t) for t in report_rows[0]]
        out_of_support = False
        for name, criticality, description, command in tests:
            log('\033[1;95m-- Test "%s" --\033[0;0m' % name)
            start_date = datetime.datetime.utcnow()
            log("Test start: %s UTC." % start_date.strftime("%Y-%m-%d %H:%M:%S"))
            # Run test
            p = subprocess.Popen(
                command,
                stdin=sys.stdin,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
            )
            out, err = p.communicate()
            if out:
                out = out.decode("utf-8").strip()
                out_of_support = out_of_support or "out of support" in out
                log(out)
            if p.returncode == 0:
                status = "\033[92msuccess\033[0m"
                successes += 1
            elif p.returncode == 2:
                status = "\033[94mnot testable\033[0m"
            elif p.returncode == 3:
                status = "\033[93mwarning\033[0m"
            else:
                status = "\033[91mfailure\033[0m"
                failures += 1
                log("Command exited with code %s." % p.returncode)
            # Get duration
            end_date = datetime.datetime.utcnow()
            duration = end_date - start_date
            if total_duration:
                total_duration += duration
            else:
                total_duration = duration
            log(
                "Test end: %s UTC (duration: %s)."
                % (end_date.strftime("%Y-%m-%d %H:%M:%S"), duration)
            )
            # Prepare report
            report_rows.append((name, criticality, status, str(duration), description))
            report_rows_length = [
                max(len(strip_colors(t)), report_rows_length[i])
                for i, t in enumerate(report_rows[-1])
            ]
        # Display results
        #     results as text
        log("\nTests results:")
        log_report = ""
        for row in report_rows:
            if not log_report:
                log_report += "-" * 50
            for i, val in enumerate(row):
                if i == len(row) - 1:
                    break
                if i == 0:
                    # merge name and description
                    log_report += "\n\033[96m%s\033[0m  \033[37m%s\033[0m\n" % (
                        val,
                        row[-1],
                    )
                else:
                    nb_sp = report_rows_length[i] - len(strip_colors(val))
                    log_report += "  %s%s" % (val, " " * nb_sp)
            log_report += "\n" + "-" * 50
        if out_of_support:
            log_report = OUT_OF_SUPPORT_TEXT + "\n" + log_report
        log(log_report.strip())
        log("Total tests duration: %s.\n" % total_duration)
        #     results as html
        html_report = ""
        for row in report_rows:
            html_cell = "th" if not html_report else "td"
            html_report += "\n <tr>"
            for i, val in enumerate(row):
                html_report += " <%s>%s</%s>" % (html_cell, escape(val), html_cell)
            html_report += " </tr>"
        html_report = '<table border="1">%s\n</table>' % html_report
        if out_of_support:
            html_report = "<p>" + escape(OUT_OF_SUPPORT_TEXT) + "</p>\n" + html_report
        # Store locally results
        now = datetime.datetime.utcnow()
        log_dir = os.path.join(self.root_dir, "log")
        if not os.path.exists(log_dir):
            os.makedirs(log_dir)
        history_file = os.path.join(log_dir, "tests_history.txt")
        add_header = not os.path.exists(history_file)
        with open(history_file, "a") as fo:
            if add_header:
                fo.write("Date | Result | Succeeded | Failed | Not testable\n")
            fo.write(
                "%s | %s | %s | %s | %s\n"
                % (
                    now.strftime("%Y-%m-%d %H:%M:%S"),
                    "KO" if failures > 0 else "OK",
                    successes,
                    failures,
                    len(tests) - successes - failures,
                )
            )
        # Search for old logs to remove
        names = os.listdir(log_dir)
        names.sort()
        for name in list(names):
            if not name.startswith("results_"):
                names.remove(name)
        while len(names) > self.MAX_LOG_FILES - 1:
            name = names.pop(0)
            try:
                log('Removing old log "%s".' % os.path.join(log_dir, name))
                os.remove(os.path.join(log_dir, name))
            except Exception as e:
                log("Failed to remove old log: %s" % e)
        # Write log to file
        hostname = subprocess.check_output(["hostname"])
        if hostname:
            hostname = hostname.decode("utf-8").strip()
        else:
            log("Failed to get hostname (required to send email).")
        log_name = "results_%s_%s.txt" % (
            hostname or "noname",
            now.strftime("%Y-%m-%d_%H-%M-%S"),
        )
        log_content = strip_colors(log_buffer.getvalue())
        with open(os.path.join(log_dir, log_name), "w") as fo:
            fo.write(log_content)
        # Send email
        send_email = False
        if hostname:
            if email:
                send_email = True
            elif email_if_fail and failures > 0:
                # if they were too many consecutive failures, do not send the email
                with open(history_file, "r") as fo:
                    history_content = fo.read()
                lines = history_content.split("\n")
                lines.reverse()
                consecutive_failures = 0
                for line in lines:
                    if line:
                        if "KO" in line:
                            consecutive_failures += 1
                        else:
                            break
                if consecutive_failures == self.NO_MAIL_FAILURES_COUNT:
                    consecutive_msg = (
                        "Maximum consecutive tester failures reached (%s).\nNo more emails will be sent."
                        % consecutive_failures
                    )
                    send_email = True
                elif consecutive_failures < self.NO_MAIL_FAILURES_COUNT:
                    consecutive_msg = (
                        "Consecutive tester failures: %s (will stop sending reports when reaching %s failures)."
                        % (consecutive_failures, self.NO_MAIL_FAILURES_COUNT)
                    )
                    send_email = True
                else:
                    consecutive_msg = (
                        "Too many consecutive tester failures: %s, no email will be sent."
                        % consecutive_failures
                    )
                log(consecutive_msg)
                html_report += "\n<br/>" + consecutive_msg.replace("\n", "\n<br/>")
        if send_email:
            sender = utils.get_conf("EMAIL_SENDER", "root@%s" % hostname)
            recipients = utils.get_conf("EMAIL_ADMINS") or ""
            system_domain = utils.get_conf("MS_SERVER_NAME")
            system_type = "MediaServer"
            if system_domain == "mediaserver":
                system_domain = utils.get_conf("CM_SERVER_NAME")
                system_type = "MirisManager"
                if system_domain == "mirismanager":
                    system_domain = utils.get_conf("MONITOR_SERVER_NAME")
                    system_type = "Server"
                    if system_domain == "monitor":
                        system_type = "-"
            if out_of_support:
                recipients = recipients.replace("sysadmin@ubicast.eu", "").replace(
                    ",,", ","
                )
            elif utils.get_conf("PREMIUM_SUPPORT") != "0":
                system_domain = "[PREMIUM] %s" % system_domain
                recipients = recipients.replace("sysadmin@ubicast.eu", "").replace(
                    ",,", ","
                )
                recipients += ",sysadmin+premium@ubicast.eu"
            recipients = recipients.strip(",")
            if not recipients:
                log(
                    "No recipients defined for email sending. Set a value for EMAIL_ADMINS."
                )
                return 1
            boundary = str(uuid.uuid4())
            mail = """From: %(hostname)s <%(sender)s>
To: %(recipients)s
Subject: %(system_domain)s (%(hostname)s) %(system_type)s health report: %(status)s
Mime-Version: 1.0
Content-type: multipart/related; boundary="%(boundary)s"

--%(boundary)s
Content-Type: text/html; charset=UTF-8
Content-transfer-encoding: utf-8

<p><b>Date: %(date)s UTC</b></p>
%(report)s

--%(boundary)s
Content-type: text/plain; name="%(log_name)s"; charset=UTF-8
Content-disposition: attachment; filename="%(log_name)s"
Content-transfer-encoding: base64

%(log_content)s""" % dict(
                boundary=boundary,
                sender=sender,
                hostname=hostname,
                recipients=recipients,
                status=("KO (%s tests failed)" % failures) if failures > 0 else "OK",
                date=now.strftime("%Y-%m-%d %H:%M:%S"),
                report=html_report,
                log_name=log_name,
                log_content=base64.b64encode(log_content.encode("utf-8")).decode(),
                system_domain=system_domain,
                system_type=system_type,
            )
            p = subprocess.Popen(
                ["/usr/sbin/sendmail", "-t"],
                stdin=subprocess.PIPE,
                stdout=sys.stdout.stream,
                stderr=sys.stderr.stream,
            )
            p.communicate(input=mail.encode("utf-8"))
            if p.returncode != 0:
                log("Failed to send email.")
                return 1
            else:
                log("Email sent to: %s" % recipients)
        exit_code = 1 if failures > 0 else 0
        return exit_code


# Script entry point: the Tester constructor runs the whole process and exits
# by itself, the command line arguments are passed to it without the script
# name.
if __name__ == "__main__":
    Tester(*sys.argv[1:])