Skip to content
Snippets Groups Projects
tester.py 8.65 KiB
Newer Older
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
Script to start tests and to manage their results
'''
from io import StringIO
import datetime
class Logger:
    '''
    File-like object that duplicates everything written to it.

    Each chunk of text is written to "stream" and to "log_buffer", and both
    are flushed right after every write.
    '''

    def __init__(self, stream, log_buffer):
        self.stream = stream
        self.log_buffer = log_buffer

    def write(self, text):
        '''Write "text" to the stream first, then to the log buffer.'''
        for sink in (self.stream, self.log_buffer):
            sink.write(text)
            sink.flush()

    def flush(self):
        '''Do nothing: write() already flushes both targets.'''

# In-memory copy of everything printed; run_tests reads it back with
# getvalue() to build the log part of the email report.
log_buffer = StringIO()
# Route both standard streams through one Logger wrapping the original stdout.
sys.stderr = sys.stdout = Logger(sys.stdout, log_buffer)


    # Help text displayed for "-h"; the path of this script is used as the
    # program name.
    USAGE = (
        '%s [-e] [-d] [-h]\n'
        '    -e: send email with report.\n'
        '    -d: debug mode (can be started with non root users).\n'
        '    -h: show this message.'
    ) % __file__

    def __init__(self, *args):
        '''
        Run the whole testing session and exit the process.

        Accepted arguments (see USAGE):
            -h: print the usage message and exit with code 0.
            -d: debug mode, skips the "must be root" check.
            -e: send the report by email once the tests have run.

        This constructor never returns normally: every code path ends with
        sys.exit(). The exit code is 0 for help, 1 when a precondition fails
        (not root, no configuration, no tests found) and otherwise the value
        returned by run_tests().
        '''
        log('\033[96m-------------------------------\033[0m')
        log('\033[96m- UbiCast applications tester -\033[0m')
        log('\033[96m-------------------------------\033[0m')
        args = list(args)
        # Check if help is required
        if '-h' in args:
            log('USAGE: ' + self.USAGE)
            sys.exit(0)
        # Check current dir
        # NOTE(review): utils.get_dir presumably returns the directory that
        # contains this script; an empty string means "no chdir needed".
        root_dir = utils.get_dir(__file__)
        if root_dir != '':
            os.chdir(root_dir)
        self.root_dir = root_dir
        # Add to python path
        if root_dir not in sys.path:
            sys.path.append(root_dir)
        # Check that this script is run by root
        debug = '-d' in args
        whoami = subprocess.check_output(['whoami']).decode('utf-8').strip()
        if whoami != 'root' and not debug:
            log('This script should be run as root user.')
            sys.exit(1)
        # Load conf
        conf = utils.load_conf()
        if not conf:
            log('No configuration loaded.')
            sys.exit(1)
        # Check for email value
        email = '-e' in args
        tests = self.discover_tests()
        if not tests:
            sys.exit(1)
        exit_code = self.run_tests(tests, email)
        sys.exit(exit_code)

    def get_file_description(self, path):
        '''
        Return the description found at the top of a test file.

        For files whose name ends with ".py", the description is the text
        located between the first two triple single quote markers of the
        file. For any other file, it is made of the leading lines starting
        with "#" (lines starting with "#!" are skipped); reading stops at the
        first line that is not a comment.

        Args:
            path: path of the file to read.

        Returns:
            The description with surrounding whitespace removed, or an empty
            string when no description is found.
        '''
        # Read as UTF-8 and replace undecodable bytes so that a description
        # never depends on the locale of the user running the script.
        with open(path, 'r', encoding='utf-8', errors='replace') as fo:
            content = fo.read()
        description = ''
        if path.endswith('.py'):
            marker = '\'\'\''
            # str.find returns -1 when the marker is missing; 0 is a valid
            # position (file starting directly with its docstring).
            start = content.find(marker)
            if start != -1:
                start += len(marker)
                end = content.find(marker, start)
                if end != -1:
                    description = content[start:end]
        else:
            comment_lines = []
            for line in content.split('\n'):
                if line.startswith('#!'):
                    continue
                if not line.startswith('#'):
                    break
                comment_lines.append(line[1:].strip())
            description = '\n'.join(comment_lines)
        return description.strip()

    def discover_tests(self):
        '''
        Build the list of tests to run.

        Standard tests are the regular files located in the "tests" directory
        of self.root_dir, in alphabetical order. When at least one user in
        /home has a "msinstance" entry, the ms-testing-suite repository is
        cloned (or updated with "git pull") inside the tests directory and its
        testers are added once per such user.

        Returns:
            A list of (name, description, command) tuples where command is
            the argument list to execute. The list is empty when the tests
            directory is missing or empty.

        Raises:
            subprocess.CalledProcessError: if the git clone or pull fails.
        '''
        tests = list()
        # Get standard tests
        path = os.path.join(self.root_dir, 'tests')
        if not os.path.isdir(path):
            log('The tests dir is missing ("%s").' % path)
            return tests
        names = sorted(os.listdir(path))
        if not names:
            log('The tests dir is empty ("%s").' % path)
            return tests
        for name in names:
            test_path = os.path.join(path, name)
            if os.path.isfile(test_path):
                description = self.get_file_description(test_path)
                tests.append((name, description, [test_path]))
        # Get MS instances (sorted so that the report order is stable)
        ms_users = sorted(
            user for user in os.listdir('/home')
            if os.path.exists('/home/%s/msinstance' % user)
        )
        # Get MediaServer tests
        # The suite is fetched and registered once, and only if an instance
        # exists, instead of once per entry of /home.
        if ms_users:
            # Clone testing suite
            ms_path = os.path.join(path, 'ms-testing-suite')
            if os.path.exists(ms_path):
                log('Updating ms-testing-suite in "%s".' % ms_path)
                # Use cwd instead of os.chdir so that the working directory
                # of this process is never changed, even if git fails.
                subprocess.check_call(['git', 'pull'], cwd=ms_path)
            else:
                log('Cloning ms-testing-suite in "%s".' % ms_path)
                subprocess.check_call(['git', 'clone', 'https://git.ubicast.net/mediaserver/ms-testing-suite.git', ms_path])
            # Add tests to list
            wowza_dir = '/usr/local/WowzaStreamingEngine'
            etc_lives_conf = '/etc/mediaserver/lives_conf.py'
            local_lives_conf = '/home/%s/msinstance/conf/lives_conf.py'
            for user in ms_users:
                ms_tests = ['ms_vod_tester.py']
                # Check if live tests should be started
                if os.path.exists(wowza_dir) or os.path.exists(etc_lives_conf) or os.path.exists(local_lives_conf % user):
                    ms_tests.append('ms_live_tester.py')
                for name in ms_tests:
                    test_path = os.path.join(ms_path, name)
                    description = self.get_file_description(test_path)
                    tests.append(('%s (%s)' % (name, user), description, [test_path, user]))
        return tests

    def run_tests(self, tests, email=False):
        results = list()
        exit_code = 0
        # Run all tests
        for name, description, command in tests:
            log('\033[1;95m-- Test "%s" --\033[0;0m' % name)
            # Run test
            try:
                p = subprocess.Popen(command, stdin=sys.stdin, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                out, err = p.communicate()
                if out:
                    log(out.decode('utf-8').strip())
                if err:
                    log(err.decode('utf-8').strip())
                if p.returncode != 0:
                    raise Exception('Command exited with code %s.' % p.returncode)
            except Exception as e:
                exit_code = 1
                log(e)
                results.append((name, description, command, False))
                results.append((name, description, command, True))
        # Display results
        html_report = '<table border="1">'
        html_report += '\n<tr><th>Test</th><th>Result</th><th>Description</th></tr>'
        for name, description, command, success in results:
            if success:
                html_result = '<span style="color: green;">success</span>'
                term_result = '\033[92msuccess\033[0m'
            else:
                html_result = '<span style="color: red;">failure</span>'
                term_result = '\033[91mfailure\033[0m'
            log('  %s: %s' % (name, term_result))
            html_report += '\n<tr><td>%s</td><td>%s</td><td>%s</td></tr>' % (name, html_result, description.replace('\n', '<br/>\n'))
        html_report += '\n</table>'
        # Send email
        if email:
            log('')
            hostname = subprocess.check_output(['hostname'])
            if not hostname:
                log('Failed to get hostname (required to send email).')
                return 1
Stéphane Diemer's avatar
Stéphane Diemer committed
            hostname = hostname.decode('utf-8').strip()
            recipients = utils.get_conf('EMAIL_ADMINS')
            if not recipients:
                log('No recipients defined for email sending. Set a value for EMAIL_ADMINS.')
                return 1
            boundary = str(uuid.uuid4())
            now = datetime.datetime.utcnow()
            log_content = re.sub(r'\033\[[\d;]+m', '', log_buffer.getvalue())
            mail = '''From: %(hostname)s <noreply@ubicast.eu>
To: %(recipients)s
Subject: UbiCast application test: %(status)s
Mime-Version: 1.0
Content-type: multipart/related; boundary="%(boundary)s"

--%(boundary)s
Content-Type: text/html; charset=UTF-8
Content-transfer-encoding: utf-8

<b>Date: %(date)s UTC</b><br/><br/>
Content-type: text/plain; name="%(log_name)s"; charset=UTF-8
Content-disposition: attachment; filename="%(log_name)s"
Content-transfer-encoding: utf-8

%(log_content)s''' % dict(
                hostname=hostname,
                recipients=recipients,
                status='OK' if exit_code == 0 else 'KO',
                date=now.strftime('%Y-%m-%d %H:%M:%S'),
                log_name='results_%s_%s.txt' % (hostname, now.strftime('%Y-%m-%d_%H-%M-%S')),
                log_content=log_content,
            )
            p = subprocess.Popen(['sendmail', '-t'], stdin=subprocess.PIPE, stdout=sys.stdout.stream, stderr=sys.stderr.stream)
            p.communicate(input=mail.encode('utf-8'))
            if p.returncode != 0:
                log('Failed to send email.')
                return 1
            else:
                log('Email sent to: %s' % recipients)
        return exit_code


if __name__ == '__main__':
    # Forward every command line argument after the script name to Tester.
    cli_args = sys.argv[1:]
    Tester(*cli_args)