Skip to content
Snippets Groups Projects
tester.py 22 KiB
Newer Older
#!/usr/bin/env python3

Script to start tests and to manage their results

from io import StringIO
import argparse
import base64
import datetime
import glob
import os
import socket
import subprocess
import sys
import time
import uuid

from utilities.config import load_conf, get_conf
from utilities.logging import strip_colors, escape_html
from utilities.os import get_dir
from utilities.systemd import check_systemd_setup
OUT_OF_SUPPORT_TEXT = '''\033[93mWarning:
The system is out of support, UbiCast will not be notified if errors are detected.
Please contact UbiCast sales team (sales@ubicast.eu) to renew the support contract.\033[0m'''


class Logger(object):
    def __init__(self, stream, log_buffer):
        self.stream = stream
        self.log_buffer = log_buffer

    def write(self, text):
        self.stream.write(text)
        self.stream.flush()
        self.log_buffer.write(text)
        self.log_buffer.flush()

    def flush(self):
        pass


log_buffer = StringIO()
sys.stdout = Logger(sys.stdout, log_buffer)
sys.stderr = sys.stdout


def raid_idle():
    idle = True
    devs = glob.glob('/sys/block/md*/md/sync_action')
    for d in devs:
        with open(d, 'r') as f:
            sync_state = f.read().strip()
            if sync_state != 'idle':
                idle = False
                print('State in %s is %s' % (d, sync_state))
    return idle


class Tester():
    MAX_LOG_FILES = 50

    def __init__(self):
        print('\033[96m-------------------------------\033[0m')
        print('\033[96m- UbiCast applications tester -\033[0m')
        print('\033[96m-------------------------------\033[0m')
        # parse args
        parser = argparse.ArgumentParser(description=__doc__.strip())
        parser.add_argument('-d', '--debug', dest='debug', action='store_true', help='Debug mode (can be started with non root users).')
        parser.add_argument('-e', '--email', dest='send_email', action='store_true', help='Send tests report by email.')
        parser.add_argument('-b', '--basic', dest='basic_tests', action='store_true', help='Run only basic tests (exclude mediaserver tests).')
        parser.add_argument('-n', '--no-update', dest='no_update', action='store_true', help='Do not update envsetup repository.')
        parser.add_argument('-p', '--no-packages', dest='no_packages', action='store_true', help='Do not install packages.')
        parser.add_argument('msuser', nargs='?', help='The unix user of the MediaServer instance to test. Default is user specified in configuration or all users if not set.')
        args = parser.parse_args()
        # Check current dir
        root_dir = get_dir(__file__)
        if root_dir != '':
            os.chdir(root_dir)
        self.root_dir = root_dir
        # Add to python path
        if root_dir not in sys.path:
            sys.path.append(root_dir)
        # Check that this script is run by root
        if os.getuid() != 0 and not args.debug:
            print('This script should be run as root user.')
            sys.exit(1)
        # Update envsetup files
        if not args.no_update:
            tester_path = os.path.join(root_dir, os.path.basename(__file__))
            mtime = os.path.getmtime(tester_path)
            subprocess.run(['python3', 'update_envsetup.py'], timeout=1800)
            if mtime != os.path.getmtime(tester_path):
                print('The script has changed, restarting it...')
                os.execl('/usr/bin/python3', 'python3', tester_path, '-n', *sys.argv[1:])
                sys.exit(1)  # not reachable
        # Install utilities packages
        if not args.no_packages:
            subprocess.run(['python3', 'pkgs_envsetup.py'], timeout=1800)
        # Load conf
        conf = load_conf()
        if not conf:
            print('No configuration loaded.')
            sys.exit(1)
        # Check RAID status
        if not raid_idle():
            print('A RAID check or operation is in progress, aborting tests')
        tests = self.discover_tests(args.basic_tests, msuser=args.msuser, no_update=args.no_update)
        if not tests:
            print('No test to run.')
            sys.exit(1)
        # Print system info
        self.print_system_info()
        # Create logs dir
        self.logs_dir = os.path.join(self.root_dir, 'logs')
        os.makedirs(self.logs_dir, exist_ok=True)
        print('Logs dir is "%s".' % self.logs_dir)
        # Check systemd service and timer
        check_systemd_setup()
        now, failures, out_of_support, log_content, html_report = self.run_tests(tests)
        if args.send_email:
            failures += self.send_report_email(now, failures, out_of_support, log_content, html_report)
        sys.exit(1 if failures > 0 else 0)
    def print_system_info(self):
        print('System information:')
        print('- Date: %s UTC.' % datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S'))
        print('- FQDN: %s.' % socket.getfqdn())
        p = subprocess.run(['ip', '-br', 'addr'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, timeout=1800)
        print('- IP config:\n%s' % p.stdout.decode('utf-8'))
    def get_log_name(self, now):
        hostname = socket.gethostname()
        log_name = 'results_%s_%s.log' % (
            hostname or 'noname',
            now.strftime('%Y-%m-%d_%H-%M-%S'),
        )
        return log_name

    def parse_file_header(self, path):
        with open(path, 'r') as fo:
            content = fo.read()
        description = ''
        if path.endswith('.py'):
            start = (
                content.find("'''")
                if content.find("'''") != -1
                else content.find('"""')
            )
            if start > 0:
                start += 3
                end = (
                    content.find("'''", start)
                    if content.find("'''", start) != -1
                    else content.find('"""', start)
                )
                if end > 0:
                    description = content[start:end]
        else:
            for line in content.split('\n'):
                if line.startswith('#!'):
                elif line.startswith('#'):
                    description += line[1:].strip() + '\n'
                else:
                    break
        description = description.strip()
        if description.startswith('Criticality:'):
            criticality, *description = description.split('\n')
            criticality = criticality[len('Criticality:') :].strip()  # noqa: E203
            description = '\n'.join(description)
            criticality = 'not specified'
        return criticality, description

    def discover_tests(self, basic_only=False, msuser=None, no_update=False):
        ignored_tests = get_conf('TESTER_IGNORED_TESTS', '').split(',')
        ignored_tests.append('__init__.py')
        if basic_only:
            tests = self.discover_basic_tests(ignored_tests)
        elif msuser:
            tests = self.discover_mediaserver_tests(msuser, no_update, ignored_tests)
        else:
            tests = self.discover_basic_tests(ignored_tests)
            tests.extend(self.discover_mediaserver_tests(msuser, no_update, ignored_tests))
        criticalities_map = {'Low': 1, 'Normal': 2, 'High': 3}
        tests.sort(key=lambda i: (-criticalities_map.get(i[1], 0), i[0]))
        return tests

    def discover_basic_tests(self, ignored_tests=None):
        # Get standard tests
        test_dir = os.path.join(self.root_dir, 'scripts')
        if not os.path.isdir(test_dir):
            print('The tests dir is missing ("%s").' % test_dir)
            return list()
        names = os.listdir(test_dir)
        names.sort()
        if not names:
            print('The tests dir is empty ("%s").' % test_dir)
            return list()
        tests = list()
        for name in names:
            if ignored_tests and name in ignored_tests:
            test_path = os.path.join(test_dir, name)
            if os.path.isfile(test_path):
                criticality, description = self.parse_file_header(test_path)
                tests.append((name, criticality, description, [test_path], None))
        return tests

    def discover_mediaserver_tests(self, msuser=None, no_update=False, ignored_tests=None):
        # Get MS instances
        ms_users = list()
        for user in os.listdir('/home'):
            if os.path.exists('/home/%s/msinstance' % user) and (
                not msuser or user == msuser
            ):
                ms_users.append(user)
        if not ms_users:
            return list()
        ms_users.sort()
        cleaned_list = list()
        instances_to_test = get_conf('TESTER_MS_INSTANCES', '').split(',')
        if instances_to_test:
            for val in instances_to_test:
                val = val.strip()
                if not val:
                    continue
                if val in ms_users:
                    cleaned_list.append(val)
                else:
                    print(
                        'An inexisting instance has been requested for tests: "%s".'
                        % val
        if cleaned_list:
            ms_users = cleaned_list
        else:
            try:
                max_instances = int(get_conf('TESTER_MAX_INSTANCES') or 2)
                print('TESTER_MAX_INSTANCES has an invalid value: %s' % e)
                max_instances = 2
            if len(ms_users) > max_instances:
                ms_users = ms_users[:max_instances]
        print('Instances that will be tested: %s.' % ', '.join(ms_users))
        ms_path = os.path.join(self.root_dir, 'scripts', 'ms-testing-suite')
        if not os.path.exists(ms_path):
            print('Cloning ms-testing-suite in "%s".' % ms_path)
            subprocess.run([
                'git',
                'clone',
                '--recursive',
                'https://mirismanager.ubicast.eu/git/mediaserver/ms-testing-suite.git',
                ms_path,
            ], timeout=1800)
        if not os.path.exists(ms_path):
            print('The ms-testing-suite dir "%s" does not exist, no MediaServer test will be run.' % ms_path)
            return list()
        # Update testing suite if allowed
        if not no_update:
            print('Updating ms-testing-suite in "%s".' % ms_path)
            os.chdir(ms_path)
            branch = get_conf('ENVSETUP_BRANCH') or 'stable'
                subprocess.run(['git', 'checkout', branch], timeout=1800)
            subprocess.run(['git', 'fetch', '--recurse-submodules', '--all'], timeout=1800)
            subprocess.run(['git', 'reset', '--hard', 'origin/{}'.format(branch)], timeout=1800)
            subprocess.run(['git', 'pull', '--recurse-submodules'], timeout=1800)
            subprocess.run(['git', 'submodule', 'update', '--init', '--recursive'], timeout=1800)
            os.chdir(self.root_dir)
        # Build tests list
        print('Add MediaServer tests if available.')
        wowza_dir = '/usr/local/WowzaStreamingEngine'
        etc_lives_conf = '/etc/mediaserver/lives.json'
        local_lives_conf = '/home/%s/msinstance/conf/lives.json'
        old_etc_lives_conf = '/etc/mediaserver/lives_conf.py'
        old_local_lives_conf = '/home/%s/msinstance/conf/lives_conf.py'
        tests = list()
        for user in ms_users:
            ms_tests = ['ms_vod_tester.py', 'test_caches.py']
            # Check if live tests should be started
            if (
                os.path.exists(wowza_dir)
                or os.path.exists(etc_lives_conf)
                or os.path.exists(local_lives_conf % user)
                or os.path.exists(old_etc_lives_conf)
                or os.path.exists(old_local_lives_conf % user)
            ):
                ms_tests.append('test_wowza_secure.py')
                ms_tests.append('ms_live_tester.py')
            ignore_rules = get_conf('TESTER_IGNORE_ROUTING_RULES', '0')
            for name in ms_tests:
                if ignored_tests and name in ignored_tests:
                    continue
                test_path = os.path.join(ms_path, name)
                if os.path.exists(test_path):
                    criticality, description = self.parse_file_header(test_path)
                    tests.append((
                        '%s (%s)' % (name, user),
                        criticality,
                        description,
                        [test_path, user],
                        {'IGNORE_ROUTING_RULES': ignore_rules},
        # Run all tests
        successes = 0
        failures = 0
        total_duration = None
        report_rows = [('Test', 'Criticality', 'Result', 'Duration', 'Description')]
        report_rows_length = [len(t) for t in report_rows[0]]
        out_of_support = False
        for name, criticality, description, command, env in tests:
            print('\033[1;95m-- Test "%s" --\033[0;0m' % name)
            start_date = datetime.datetime.utcnow()
            print('Test start: %s UTC.' % start_date.strftime('%Y-%m-%d %H:%M:%S'))
            # Run test
            count = 0
            while count < 3:
                    wait_time = 5 * count * count
                    print('Waiting %s s...' % wait_time)
                    time.sleep(wait_time)
                count += 1
                print('Attempt: %s' % str(count))
                test_env = dict(os.environ)
                if env:
                    test_env.update(env)
                try:
                    p = subprocess.run(
                        command,
                        env=test_env,
                        stdin=subprocess.DEVNULL,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.STDOUT,
                        timeout=1800,
                    )
                    out = p.stdout.decode('utf-8', 'replace').strip()
                    print(out)
                    out_of_support = out_of_support or 'out of support' in out
                    returncode = p.returncode
                    if returncode in (0, 2, 3):
                        break
                except Exception as e:
                    print('Command failed: %s' % e)
                    returncode = None
            if returncode == 0:
                status = '\033[92msuccess\033[0m'
                successes += 1
                status = '\033[94mnot testable\033[0m'
                status = '\033[93mwarning\033[0m'
                status = '\033[91mfailure\033[0m'
                failures += 1
                print('Command exited with code %s.' % returncode)
            # Get duration
            end_date = datetime.datetime.utcnow()
            duration = end_date - start_date
            if total_duration:
                total_duration += duration
            else:
                total_duration = duration
            print(
                'Test end: %s UTC (duration: %s).'
                % (end_date.strftime('%Y-%m-%d %H:%M:%S'), duration)
            )
            # Prepare report
            report_rows.append((name, criticality, status, str(duration), description))
            report_rows_length = [
                max(len(strip_colors(t)), report_rows_length[i])
                for i, t in enumerate(report_rows[-1])
            ]
        # Display results
        #     results as text
        print('\nTests results:')
        log_report = ''
        for row in report_rows:
            if not log_report:
                log_report += '-' * 50
            for i, val in enumerate(row):
                if i == len(row) - 1:
                    break
                if i == 0:
                    # merge name and description
                    log_report += '\n\033[96m%s\033[0m  \033[37m%s\033[0m\n' % (
                        val,
                        row[-1],
                    )
                else:
                    nb_sp = report_rows_length[i] - len(strip_colors(val))
                    log_report += '  %s%s' % (val, ' ' * nb_sp)
            log_report += '\n' + '-' * 50
        if out_of_support:
            log_report = OUT_OF_SUPPORT_TEXT + '\n' + log_report
        print(log_report.strip())
        print('Total tests duration: %s.\n' % total_duration)
        #     results as html
        html_report = ''
        for row in report_rows:
            html_cell = 'th' if not html_report else 'td'
            html_report += '\n <tr>'
            for i, val in enumerate(row):
                html_report += ' <%s>%s</%s>' % (html_cell, escape_html(val), html_cell)
            html_report += ' </tr>'
        html_report = '<table border="1">%s\n</table>' % html_report
        if out_of_support:
            html_report = '<p>' + escape_html(OUT_OF_SUPPORT_TEXT) + '</p>\n' + html_report
        # Store locally results
        now = datetime.datetime.utcnow()
        history_file = os.path.join(self.logs_dir, 'tests_history.txt')
        add_header = not os.path.exists(history_file)
        with open(history_file, 'a') as fo:
            if add_header:
                fo.write('Date | Result | Succeeded | Failed | Not testable\n')
            fo.write('%s | %s | %s | %s | %s\n' % (
                now.strftime('%Y-%m-%d %H:%M:%S'),
                'KO' if failures > 0 else 'OK',
                successes,
                failures,
                len(tests) - successes - failures,
            ))
        # Search for old logs to remove
        names.sort()
        for name in list(names):
            if not name.startswith('results_'):
                names.remove(name)
        while len(names) > self.MAX_LOG_FILES - 1:
            name = names.pop(0)
            try:
                print('Removing old log "%s".' % os.path.join(self.logs_dir, name))
                os.remove(os.path.join(self.logs_dir, name))
            except Exception as e:
                print('Failed to remove old log: %s' % e)
        # Write log to file
        log_content = strip_colors(log_buffer.getvalue())
        with open(os.path.join(self.logs_dir, self.get_log_name(now)), 'w') as fo:
            fo.write(log_content)
        return now, failures, out_of_support, log_content, html_report

    def send_report_email(self, now, failures, out_of_support, log_content, html_report):
        hostname = socket.gethostname()
        if not hostname:
            print('Failed to get hostname (required to send email).')
        fqdn = socket.getfqdn()
        log_content_encoding = 'utf-8'
        # Get sender and recipients
        recipients = get_conf('EMAIL_ADMINS') or ''
        system_domain = get_conf('MS_SERVER_NAME')
        system_type = 'MediaServer'
        if not system_domain or system_domain == 'mediaserver':
            system_domain = get_conf('CM_SERVER_NAME')
            system_type = 'MirisManager'
            if not system_domain or system_domain == 'mirismanager':
                system_domain = get_conf('MONITOR_SERVER_NAME')
                system_type = 'Server'
                if not system_domain or system_domain == 'monitor':
                    system_domain = fqdn
        if '.' in system_domain:
            top_domain = '.'.join(system_domain.split('.')[-2:])
        elif '.' in fqdn:
            top_domain = '.'.join(fqdn.split('.')[-2:])
        else:
            top_domain = system_domain + '.local'
        sender = hostname + '@' + top_domain
        print('Sender address: %s' % sender)
        # Prepare email contant
        if out_of_support:
            system_domain = '[OUT OF SUPPORT] %s' % system_domain
            recipients = recipients.replace('sysadmin@ubicast.eu', '').replace(
                ',,', ','
            )
        elif get_conf('PREMIUM_SUPPORT') != '0':
            system_domain = '[PREMIUM] %s' % system_domain
            recipients = recipients.replace('sysadmin@ubicast.eu', '').replace(
                ',,', ','
            )
            recipients += ',sysadmin+premium@ubicast.eu'
        recipients = recipients.strip(',')
        if not recipients:
            print('No recipients defined for email sending. Set a value for EMAIL_ADMINS.')
            return 0
        boundary = str(uuid.uuid4())
        if get_conf('TESTER_BASE64_ATTACH') != '0':
            log_content_encoding = 'base64'
            log_content = base64.b64encode(log_content.encode('utf-8')).decode()
        mail = '''From: %(hostname)s <%(sender)s>
To: %(recipients)s
Subject: %(system_domain)s (%(hostname)s) %(system_type)s health report: %(status)s
Mime-Version: 1.0
Content-type: multipart/related; boundary="%(boundary)s"

--%(boundary)s
Content-Type: text/html; charset=UTF-8
Content-transfer-encoding: utf-8

<p><b>Date: %(date)s UTC</b></p>
%(report)s

--%(boundary)s
Content-type: text/plain; name="%(log_name)s"; charset=UTF-8
Content-disposition: attachment; filename="%(log_name)s"
Content-transfer-encoding: %(log_content_encoding)s

%(log_content)s''' % dict(
            boundary=boundary,
            sender=sender,
            hostname=hostname,
            recipients=recipients,
            status=('KO (%s tests failed)' % failures) if failures > 0 else 'OK',
            date=now.strftime('%Y-%m-%d %H:%M:%S'),
            report=html_report,
            log_name=self.get_log_name(now).replace('.log', '.txt'),
            log_content_encoding=log_content_encoding,
            log_content=log_content,
            system_domain=system_domain,
            system_type=system_type,
        )
        p = subprocess.Popen(
            ['/usr/sbin/sendmail', '-t'],
            stdin=subprocess.PIPE,
            stdout=sys.stdout.stream,
            stderr=sys.stderr.stream,
        )
        p.communicate(input=mail.encode('utf-8'), timeout=1800)
        if p.returncode != 0:
            print('Failed to send email.')
            return 1
        else:
            print('Email sent to: %s' % recipients)
            return 0
if __name__ == '__main__':