Skip to content
Snippets Groups Projects
tester.py 19.6 KiB
Newer Older
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
Script to start tests and to manage their results
'''
import datetime
import glob
import os
import re
import subprocess
import sys
import uuid
from io import StringIO
# Warning prepended to the text and HTML reports when the output of a test
# contains "out of support". It is wrapped in ANSI color codes (93 ... 0).
OUT_OF_SUPPORT_TEXT = '''\033[93mWarning:
The system is out of support, UbiCast will not be notified if errors are detected.
Please contact UbiCast sales team (sales@ubicast.eu) to renew the support contract.\033[0m'''

class Logger(object):
    """File-like writer duplicating everything written into two targets.

    ``stream`` is the real output and ``log_buffer`` keeps a copy of the text
    so that it can be read back later.
    """

    def __init__(self, stream, log_buffer):
        self.stream, self.log_buffer = stream, log_buffer

    def write(self, text):
        """Write *text* to the stream, then to the buffer, flushing each."""
        for target in (self.stream, self.log_buffer):
            target.write(text)
            target.flush()

    def flush(self):
        """Do nothing: ``write`` already flushes both targets."""
        return None

# Capture everything printed during the run: stdout and stderr are both routed
# through a single Logger which writes to the original stdout and keeps a copy
# of the text in log_buffer.
log_buffer = StringIO()
sys.stdout = Logger(sys.stdout, log_buffer)
sys.stderr = sys.stdout


def strip_colors(text):
    """Return *text* without its ANSI SGR (color / style) escape sequences.

    A sequence is ``ESC [`` followed by a possibly empty run of digits and
    semicolons and a final ``m``. The parameter list may be empty because a
    bare ``ESC[m`` is a valid reset (equivalent to ``ESC[0m``).
    """
    return re.sub(r'\033\[[\d;]*m', '', text)


def escape(text):
    """Convert console text using ANSI colors into an HTML fragment.

    The text is stripped, ``&``, ``<`` and ``>`` are replaced with HTML
    entities, and the color codes 90 to 95 are turned into
    ``<span style="color: ...;">`` elements. Any following escape sequence
    (reset or other color) closes the currently opened span, and a span still
    opened at the end of the text is closed too, so the markup is always
    balanced. Sequences that have no HTML equivalent are removed.

    Args:
        text: console text, possibly containing ANSI SGR escape sequences.

    Returns:
        str: the HTML fragment.
    """
    colors = {
        '90': 'gray',
        '91': 'red',
        '92': 'green',
        '93': 'orange',
        '94': 'blue',
        '95': 'purple',
    }
    html = text.strip()
    # "&" must be replaced first, otherwise the entities added below would be
    # escaped a second time.
    html = html.replace('&', '&amp;')
    html = html.replace('<', '&lt;')
    html = html.replace('>', '&gt;')
    parts = []
    span_open = False
    position = 0
    for match in re.finditer(r'\033\[([\d;]*)m', html):
        parts.append(html[position:match.start()])
        position = match.end()
        if span_open:
            # Spans are never nested: any new sequence ends the current color.
            parts.append('</span>')
            span_open = False
        color = colors.get(match.group(1))
        if color:
            parts.append('<span style="color: %s;">' % color)
            span_open = True
    parts.append(html[position:])
    if span_open:
        parts.append('</span>')
    return ''.join(parts)


def raid_idle():
    """Tell whether every md device reports an "idle" sync action.

    Reads each ``/sys/block/md*/md/sync_action`` file and prints the state of
    the ones which are not idle. Returns True when no such file exists.
    """
    all_idle = True
    for sync_path in glob.glob('/sys/block/md*/md/sync_action'):
        with open(sync_path, 'r') as handle:
            state = handle.read().strip()
        if state == "idle":
            continue
        all_idle = False
        print("State in %s is %s" % (sync_path, state))
    return all_idle


    # NOTE(review): the "class" statement owning these attributes is not
    # visible in this view; the entry point at the bottom of the file
    # instantiates "Tester".
    # Command line help; the "%s" placeholder is filled with this script path.
    USAGE = '''%s [-e] [-f] [-b] [-n] [-d] [-h] [msuser]
    -e: send email with report.
    -f: send email with report only if at least one test failed.
    -b: run only basic tests (exclude mediaserver tests).
    -n: do not update envsetup repository.
    -d: debug mode (can be started with non root users).
    -h: show this message.''' % __file__
    # Option flags accepted by __init__; any other argument starting with "-"
    # is rejected.
    VALID_ARGS = ['-e', '-f', '-b', '-n', '-d', '-h']
    # Limit used by run_tests when it removes old "results_*" files from the
    # log directory.
    MAX_LOG_FILES = 50
    # Threshold of consecutive "KO" entries in the history file used by
    # run_tests with "-f": an email is still sent when the count equals this
    # value, and none is sent beyond it.
    NO_MAIL_FAILURES_COUNT = 5

    def __init__(self, *args):
        """Parse the command line arguments, then discover and run the tests.

        The process is always terminated with ``sys.exit``: 0 when help was
        requested or when all tests passed, a non-zero code otherwise.

        Args:
            *args: command line arguments (see ``USAGE``). Arguments starting
                with ``-`` must be listed in ``VALID_ARGS``; any other
                argument is the name of the only Mediaserver user to test and
                must have a directory in ``/home``.
        """
        log('\033[96m-------------------------------\033[0m')
        log('\033[96m- UbiCast applications tester -\033[0m')
        log('\033[96m-------------------------------\033[0m')
        args = list(args)
        # Check if help is required
        if '-h' in args:
            log('USAGE: ' + self.USAGE)
            sys.exit(0)
        # Without a positional argument, no specific user is targeted
        msuser = None
        for arg in args:
            if arg.startswith('-'):
                if arg not in self.VALID_ARGS:
                    log('Invalid argument given: "%s".\n' % arg)
                    log('USAGE: ' + self.USAGE)
                    sys.exit(1)
            else:
                log('Optional target user : %s' % arg)
                if not os.path.isdir(os.path.join('/home', arg)):
                    log('Mediaserver user %s does not exist' % arg)
                    sys.exit(1)
                msuser = arg
        # Check current dir
        root_dir = utils.get_dir(__file__)
        if root_dir != '':
            os.chdir(root_dir)
        self.root_dir = root_dir
        # Add to python path
        if root_dir not in sys.path:
            sys.path.append(root_dir)
        # Check that this script is run by root
        debug = '-d' in args
        whoami = subprocess.check_output(['whoami']).decode('utf-8').strip()
        if whoami != 'root' and not debug:
            log('This script should be run as root user.')
            sys.exit(1)
        # Update envsetup files
        if '-n' not in args:
            tester_path = os.path.join(root_dir, os.path.basename(__file__))
            mtime = os.path.getmtime(tester_path)
            subprocess.call(['python3', 'update_envsetup.py'])
            if mtime != os.path.getmtime(tester_path):
                log('The script has changed, restarting it...')
                # "-n" is added to avoid updating again after the restart
                os.execl('/usr/bin/python3', 'python3', tester_path, '-n', *args)
                sys.exit(1)  # not reachable
        # Load conf
        conf = utils.load_conf()
        if not conf:
            log('No configuration loaded.')
            sys.exit(1)
        # Check for email value
        email = '-e' in args
        email_if_fail = '-f' in args
        basic_only = '-b' in args
        tests = self.discover_tests(basic_only, msuser)
        if not tests:
            sys.exit(1)

        if raid_idle():
            exit_code = self.run_tests(tests, email, email_if_fail)
        else:
            print('A RAID check or operation is in progress, aborting tests')
            exit_code = 1
        # Report the result of the run to the caller of the script
        sys.exit(exit_code)

    def parse_file_header(self, path):
        """Extract the criticality and the description of a test file.

        For ``.py`` files the description is the content of the first triple
        quoted string of the file. For other files it is made of all the lines
        starting with ``#`` (the ``#!`` lines are ignored). When the
        description starts with ``Criticality:``, the rest of that first line
        is the criticality and is removed from the description.

        Args:
            path: path of the test file.

        Returns:
            tuple: ``(criticality, description)``; the criticality is
            ``'not specified'`` when the file does not declare one.
        """
        with open(path, 'r') as fo:
            content = fo.read()
        description = ''
        if path.endswith('.py'):
            # Use the delimiter which appears first in the file and look for
            # the same delimiter to find the end of the docstring.
            found = [
                (content.find(delimiter), delimiter)
                for delimiter in ("'''", '"""')
                if delimiter in content
            ]
            if found:
                start, delimiter = min(found)
                # Skip the opening delimiter itself
                start += len(delimiter)
                end = content.find(delimiter, start)
                if end != -1:
                    description = content[start:end]
        else:
            for line in content.split('\n'):
                if line.startswith('#!'):
                    continue
                elif line.startswith('#'):
                    description += line[1:].strip() + '\n'
        description = description.strip()
        if description.startswith('Criticality:'):
            first_line, _, description = description.partition('\n')
            criticality = first_line[len('Criticality:'):].strip()
        else:
            criticality = 'not specified'
        return criticality, description
    def discover_tests(self, basic_only=False, msuser=None):
        """Build the list of tests to run.

        Standard tests are the files of the ``tests`` directory. MediaServer
        tests come from the ``ms-testing-suite`` repository (cloned or updated
        here) and are added once per tested instance.

        Args:
            basic_only: when true, MediaServer tests are not added.
            msuser: optional Mediaserver user name; when given, only this
                instance is considered.
                NOTE(review): the standard tests are skipped in that case,
                as suggested by the original ``elif msuser`` branch; confirm.

        Returns:
            list: ``(name, criticality, description, command)`` tuples sorted
            by decreasing criticality then by name, or None when the tests
            directory is missing or empty.
        """
        ignored_tests = [
            name.strip()
            for name in utils.get_conf('TESTER_IGNORED_TESTS', '').split(',')
            if name.strip()
        ]
        ignored_tests.append("__init__.py")
        # Get standard tests
        path = os.path.join(self.root_dir, 'tests')
        if not os.path.isdir(path):
            log('The tests dir is missing ("%s").' % path)
            return
        names = os.listdir(path)
        names.sort()
        if not names:
            log('The tests dir is empty ("%s").' % path)
            return
        criticalities_map = {
            'Low': 1,
            'Normal': 2,
            'High': 3,
        }
        tests = list()
        if not msuser:
            for name in names:
                if name in ignored_tests:
                    continue
                test_path = os.path.join(path, name)
                if os.path.isfile(test_path):
                    criticality, description = self.parse_file_header(test_path)
                    tests.append((name, criticality, description, [test_path]))
        # Get MS instances
        ms_users = list()
        for user in os.listdir('/home'):
            if os.path.exists('/home/%s/msinstance' % user) and (not msuser or user == msuser):
                ms_users.append(user)
        # Get MediaServer tests
        if not basic_only:
            ms_users.sort()
            cleaned_list = list()
            instances_to_test = utils.get_conf('TESTER_MS_INSTANCES', '').split(',')
            for val in instances_to_test:
                val = val.strip()
                if not val:
                    continue
                if val in ms_users:
                    cleaned_list.append(val)
                else:
                    log('An inexisting instance has been requested for tests: "%s".' % val)
            if cleaned_list:
                ms_users = cleaned_list
            else:
                # No explicit list: limit the number of tested instances
                try:
                    max_instances = int(utils.get_conf('TESTER_MAX_INSTANCES') or 2)
                except Exception as e:
                    log('TESTER_MAX_INSTANCES has an invalid value: %s' % e)
                    max_instances = 2
                if len(ms_users) > max_instances:
                    ms_users = ms_users[:max_instances]
            log('Instances that will be tested: %s.' % ', '.join(ms_users))
            # Clone testing suite
            ms_path = os.path.join(path, 'ms-testing-suite')
            if not os.path.exists(ms_path):
                log('Cloning ms-testing-suite in "%s".' % ms_path)
                subprocess.call(['git', 'clone', '--recursive', 'https://panel.ubicast.eu/git/mediaserver/ms-testing-suite.git', ms_path])
            if os.path.exists(ms_path):
                log('Updating ms-testing-suite in "%s".' % ms_path)
                # Run git in the repository without changing the working
                # directory of the whole process.
                subprocess.call(['git', 'pull', '--recurse-submodules'], cwd=ms_path)
                subprocess.call(['git', 'submodule', 'update', '--init', '--recursive'], cwd=ms_path)
            # Add tests to list
            log('Add MediaServer tests if available.')
            wowza_dir = '/usr/local/WowzaStreamingEngine'
            etc_lives_conf = '/etc/mediaserver/lives_conf.py'
            local_lives_conf = '/home/%s/msinstance/conf/lives_conf.py'
            for user in ms_users:
                ms_tests = ['ms_vod_tester.py', 'test_caches.py']
                # Check if live tests should be started
                if os.path.exists(wowza_dir) or os.path.exists(etc_lives_conf) or os.path.exists(local_lives_conf % user):
                    ms_tests.append('test_wowza_secure.py')
                    ms_tests.append('ms_live_tester.py')
                for name in ms_tests:
                    if name in ignored_tests:
                        continue
                    test_path = os.path.join(ms_path, name)
                    if not os.path.isfile(test_path):
                        # The clone may have failed: do not crash while
                        # reading the header of a missing file.
                        log('The test file "%s" is missing, test skipped.' % test_path)
                        continue
                    criticality, description = self.parse_file_header(test_path)
                    tests.append(('%s (%s)' % (name, user), criticality, description, [test_path, user]))
        # Highest criticality first, then alphabetical order
        tests.sort(key=lambda i: (-criticalities_map.get(i[1], 0), i[0]))
        return tests
    def run_tests(self, tests, email=False, email_if_fail=False):
        successes = 0
        failures = 0
        total_duration = None
        report_rows = [('Test', 'Criticality', 'Result', 'Duration', 'Description')]
        report_rows_length = [len(t) for t in report_rows[0]]
        out_of_support = False
        for name, criticality, description, command in tests:
            log('\033[1;95m-- Test "%s" --\033[0;0m' % name)
            start_date = datetime.datetime.utcnow()
            log('Test start: %s UTC.' % start_date.strftime('%Y-%m-%d %H:%M:%S'))
            p = subprocess.Popen(command, stdin=sys.stdin, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            out, err = p.communicate()
            if out:
                out = out.decode('utf-8').strip()
                out_of_support = out_of_support or 'out of support' in out
                log(out)
                err = err.decode('utf-8').strip()
                log(err)
            if p.returncode == 0:
            elif p.returncode == 2:
            elif p.returncode == 3:
                status = '\033[93mwarning\033[0m'
                log('Command exited with code %s.' % p.returncode)
            end_date = datetime.datetime.utcnow()
            duration = end_date - start_date
            if total_duration:
                total_duration += duration
            else:
                total_duration = duration
            log('Test end: %s UTC (duration: %s).' % (end_date.strftime('%Y-%m-%d %H:%M:%S'), duration))
            # Prepare report
            report_rows.append((name, criticality, status, str(duration), description))
            report_rows_length = [max(len(strip_colors(t)), report_rows_length[i]) for i, t in enumerate(report_rows[-1])]
        # Display results
        #     results as text
            if not log_report:
                log_report += '-' * 50
            for i, val in enumerate(row):
                if i == len(row) - 1:
                    break
                if i == 0:
                    # merge name and description
                    log_report += '\n\033[96m%s\033[0m  \033[37m%s\033[0m\n' % (val, row[-1])
                else:
                    nb_sp = report_rows_length[i] - len(strip_colors(val))
                    log_report += '  %s%s' % (val, ' ' * nb_sp)
            log_report += '\n' + '-' * 50
        if out_of_support:
            log_report = OUT_OF_SUPPORT_TEXT + '\n' + log_report
        log(log_report.strip())
        log('Total tests duration: %s.\n' % total_duration)
        #     results as html
        html_report = ''
        for row in report_rows:
            html_cell = 'th' if not html_report else 'td'
            html_report += '\n <tr>'
            for i, val in enumerate(row):
                html_report += ' <%s>%s</%s>' % (html_cell, escape(val), html_cell)
        html_report = '<table border="1">%s\n</table>' % html_report
        if out_of_support:
            html_report = '<p>' + escape(OUT_OF_SUPPORT_TEXT) + '</p>\n' + html_report
        # Store locally results
        now = datetime.datetime.utcnow()
        log_dir = os.path.join(self.root_dir, 'log')
        if not os.path.exists(log_dir):
            os.makedirs(log_dir)
        history_file = os.path.join(log_dir, 'tests_history.txt')
        add_header = not os.path.exists(history_file)
        with open(history_file, 'a') as fo:
            if add_header:
                fo.write('Date | Result | Succeeded | Failed | Not testable\n')
            fo.write('%s | %s | %s | %s | %s\n' % (now.strftime('%Y-%m-%d %H:%M:%S'), 'KO' if failures > 0 else 'OK', successes, failures, len(tests) - successes - failures))
        # Search for old logs to remove
        names = os.listdir(log_dir)
        names.sort()
        for name in list(names):
            if not name.startswith('results_'):
                names.remove(name)
        while len(names) > self.MAX_LOG_FILES - 1:
            name = names.pop(0)
            try:
                log('Removing old log "%s".' % os.path.join(log_dir, name))
                os.remove(os.path.join(log_dir, name))
            except Exception as e:
                log('Failed to remove old log: %s' % e)
        # Write log to file
        hostname = subprocess.check_output(['hostname'])
        if hostname:
            hostname = hostname.decode('utf-8').strip()
        else:
            log('Failed to get hostname (required to send email).')
        log_name = 'results_%s_%s.txt' % (hostname or 'noname', now.strftime('%Y-%m-%d_%H-%M-%S'))
        log_content = strip_colors(log_buffer.getvalue())
        with open(os.path.join(log_dir, log_name), 'w') as fo:
            fo.write(log_content)
        send_email = False
        if hostname:
            if email:
                send_email = True
            elif email_if_fail and failures > 0:
                # if they were too many consecutive failures, do not send the email
                with open(history_file, 'r') as fo:
                    history_content = fo.read()
                lines = history_content.split('\n')
                lines.reverse()
                consecutive_failures = 0
                for line in lines:
                    if line:
                        if 'KO' in line:
                            consecutive_failures += 1
                        else:
                            break
                if consecutive_failures == self.NO_MAIL_FAILURES_COUNT:
                    consecutive_msg = 'Maximum consecutive tester failures reached (%s).\nNo more emails will be sent.' % consecutive_failures
                    send_email = True
                elif consecutive_failures < self.NO_MAIL_FAILURES_COUNT:
                    consecutive_msg = 'Consecutive tester failures: %s (will stop sending reports when reaching %s failures).' % (consecutive_failures, self.NO_MAIL_FAILURES_COUNT)
                    send_email = True
                else:
                    consecutive_msg = 'Too many consecutive tester failures: %s, no email will be sent.' % consecutive_failures
                log(consecutive_msg)
                html_report += '\n<br/>' + consecutive_msg.replace('\n', '\n<br/>')
            recipients = utils.get_conf('EMAIL_ADMINS') or ''
            system_domain = utils.get_conf('MS_SERVER_NAME')
            system_type = 'MediaServer'
            if system_domain == 'mediaserver':
                system_domain = utils.get_conf('CM_SERVER_NAME')
                system_type = 'CampusManager'
                if system_domain == 'campusmanager':
                    system_domain = utils.get_conf('MONITOR_SERVER_NAME')
                    system_type = 'CampusManager'
                    if system_domain == 'monitor':
                        system_domain = 'Server'
                        system_type = '-'
            if utils.get_conf('PREMIUM_SUPPORT') != '0':
                system_domain = '[PREMIUM] %s' % system_domain
                if not out_of_support:
                    recipients += ',premium-support@ubicast.eu'
            if out_of_support:
                recipients = recipients.replace('sysadmin@ubicast.eu', '').replace(',,', ',')
            recipients = recipients.strip(',')
            if not recipients:
                log('No recipients defined for email sending. Set a value for EMAIL_ADMINS.')
                return 1
            boundary = str(uuid.uuid4())
            mail = '''From: %(hostname)s <support@ubicast.eu>
Subject: %(system_domain)s (%(hostname)s) %(system_type)s health report: %(status)s
Mime-Version: 1.0
Content-type: multipart/related; boundary="%(boundary)s"

--%(boundary)s
Content-Type: text/html; charset=UTF-8
Content-transfer-encoding: utf-8

<p><b>Date: %(date)s UTC</b></p>
Content-type: text/plain; name="%(log_name)s"; charset=UTF-8
Content-disposition: attachment; filename="%(log_name)s"
Content-transfer-encoding: utf-8

%(log_content)s''' % dict(
                hostname=hostname,
                status=('KO (%s tests failed)' % failures) if failures > 0 else 'OK',
                date=now.strftime('%Y-%m-%d %H:%M:%S'),
                report=html_report,
                log_name=log_name,
                log_content=log_content,
                system_domain=system_domain,
                system_type=system_type,
            p = subprocess.Popen(['/usr/sbin/sendmail', '-t'], stdin=subprocess.PIPE, stdout=sys.stdout.stream, stderr=sys.stderr.stream)
            p.communicate(input=mail.encode('utf-8'))
            if p.returncode != 0:
                log('Failed to send email.')
                return 1
            else:
                log('Email sent to: %s' % recipients)
        exit_code = 1 if failures > 0 else 0
        return exit_code


# Script entry point: the command line arguments (without the script name)
# are given to Tester, whose constructor does all the work.
if __name__ == '__main__':
    Tester(*sys.argv[1:])