#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
Script to start tests and to manage their results
'''
import datetime
import html
import os
import subprocess
import sys
import uuid
from io import StringIO
class Logger(object):
    '''
    File-like writer that duplicates every write to two targets:
    the wrapped stream and a log buffer.
    '''

    def __init__(self, stream, log_buffer):
        # stream: the object that originally received the output.
        # log_buffer: a second file-like object receiving a copy of the output.
        self.stream, self.log_buffer = stream, log_buffer

    def write(self, text):
        # Targets are handled in order (stream first); each one is flushed
        # right after being written.
        for target in (self.stream, self.log_buffer):
            target.write(text)
            target.flush()

    def flush(self):
        # Nothing left to do here: write() already flushes both targets.
        return None

# In-memory copy of everything written to stdout/stderr; its content is read
# back later with getvalue() to attach the log to the email report.
log_buffer = StringIO()
# Wrap the current stdout so that each write goes both to the original stream
# and to log_buffer.
sys.stdout = Logger(sys.stdout, log_buffer)
# stderr shares the very same Logger instance, so error output is also copied
# into log_buffer (and is emitted on the original stdout stream).
sys.stderr = sys.stdout


class Tester():
    '''
    Command line entry point which runs every file of the "tests" directory,
    prints a summary and optionally sends it by email.

    Instantiating the class runs the whole process and ends with sys.exit().
    '''
    USAGE = '''%s [-e] [-d] [-h]
    -e: send email with report.
    -d: debug mode (can be started with non root users).
    -h: show this message.''' % __file__

    def __init__(self, *args):
        '''
        Parse command line arguments, check the environment and run the tests.

        args: command line arguments ("-e", "-d" and "-h" are recognized).
        This method never returns normally: it always calls sys.exit().
        '''
        log('\033[96m-------------------------------\033[0m')
        log('\033[96m- UbiCast applications tester -\033[0m')
        log('\033[96m-------------------------------\033[0m')
        args = list(args)
        # Check if help is required
        if '-h' in args:
            log('USAGE: ' + self.USAGE)
            sys.exit(0)
        # Check current dir
        root_dir = utils.get_dir(__file__)
        if root_dir != '':
            os.chdir(root_dir)
        self.root_dir = root_dir
        # Add to python path
        if root_dir not in sys.path:
            sys.path.append(root_dir)
        # Check that this script is run by root
        debug = '-d' in args
        if debug:
            args.remove('-d')
        whoami = subprocess.check_output(['whoami']).decode('utf-8').strip()
        if whoami != 'root' and not debug:
            log('This script should be run as root user.')
            sys.exit(1)
        # Load conf
        conf = utils.load_conf()
        if not conf:
            log('No configuration loaded.')
            sys.exit(1)
        # Check for email value
        email = '-e' in args
        if email:
            # Consume the flag, mirroring what is done for "-d".
            args.remove('-e')
        exit_code = self.run_tests(email)
        sys.exit(exit_code)

    def get_file_description(self, path):
        '''
        Return the description found at the top of the file located at "path".

        For ".py" files, the description is the content of the first
        triple-single-quoted string. For other files, it is the leading block
        of "#" comment lines (shebang lines are skipped).
        An empty string is returned when no description is found or when the
        file cannot be read as text (directory, binary file, missing file...).
        '''
        try:
            with open(path, 'r') as fo:
                content = fo.read()
        except (OSError, UnicodeDecodeError):
            # A missing description must not abort the whole tests run.
            return ''
        description = ''
        if path.endswith('.py'):
            quotes = "'" * 3
            # str.find returns -1 when nothing is found; 0 is a valid
            # position (file starting directly with its docstring).
            start = content.find(quotes)
            if start != -1:
                start += len(quotes)
                end = content.find(quotes, start)
                if end != -1:
                    description = content[start:end]
        else:
            comment_lines = []
            for line in content.split('\n'):
                if line.startswith('#!'):
                    continue
                if not line.startswith('#'):
                    break
                comment_lines.append(line[1:].strip())
            description = '\n'.join(comment_lines)
        return description.strip()

    @staticmethod
    def _build_html_report(results):
        '''
        Return an HTML table with one row per (success, test_path, description)
        tuple of "results". File names and descriptions are HTML-escaped.
        '''
        rows = ['<table border="1">', '<tr><th>Test</th><th>Result</th><th>Description</th></tr>']
        for success, test_path, description in results:
            if success:
                html_result = '<span style="color: green;">success</span>'
            else:
                html_result = '<span style="color: red;">failure</span>'
            rows.append('<tr><td>%s</td><td>%s</td><td>%s</td></tr>' % (
                html.escape(os.path.basename(test_path)),
                html_result,
                html.escape(description).replace('\n', '<br/>\n'),
            ))
        rows.append('</table>')
        return '\n'.join(rows)

    @staticmethod
    def _build_mail(hostname, recipients, success, html_report, log_content):
        '''
        Return the raw message (headers and multipart body) given to sendmail:
        an HTML part holding the report and an attachment holding the log.
        '''
        now = datetime.datetime.now(datetime.timezone.utc)
        # The last line of the template is the closing delimiter which ends
        # the multipart body.
        return '''From: %(hostname)s <noreply@ubicast.eu>
To: %(recipients)s
Subject: UbiCast application test: %(status)s
Mime-Version: 1.0
Content-type: multipart/related; boundary="%(boundary)s"

--%(boundary)s
Content-Type: text/html; charset=UTF-8
Content-transfer-encoding: 8bit

<b>Date: %(date)s UTC</b><br/><br/>
%(report)s

--%(boundary)s
Content-type: application/octet-stream; name="%(log_name)s"
Content-disposition: attachment; filename="%(log_name)s"
Content-transfer-encoding: 8bit

%(log_content)s
--%(boundary)s--
''' % dict(
            boundary=str(uuid.uuid4()),
            hostname=hostname,
            recipients=recipients,
            status='OK' if success else 'KO',
            date=now.strftime('%Y-%m-%d %H:%M:%S'),
            log_name='results_' + now.strftime('%Y-%m-%d_%H-%M-%S') + '.txt',
            report=html_report,
            log_content=log_content,
        )

    def run_tests(self, email=False):
        '''
        Run every file of the "tests" directory (in sorted name order), log a
        summary and, if "email" is true, send the report with sendmail.

        Returns 0 if all tests succeeded and the report was sent (when
        requested), 1 otherwise.
        '''
        path = os.path.join(self.root_dir, 'tests')
        if not os.path.isdir(path):
            log('The tests dir is missing ("%s").' % path)
            return 1
        os.chdir(path)
        names = sorted(os.listdir(path))
        if not names:
            log('The tests dir is empty ("%s").' % path)
            return 1
        results = []
        exit_code = 0
        # Run all tests
        for name in names:
            log('\033[1;95m-- Test "%s" --\033[0;0m' % name)
            test_path = os.path.join(path, name)
            # Get test description
            description = self.get_file_description(test_path)
            # Run test
            try:
                code, out = utils.exec_cmd(test_path)
                if code != 0:
                    raise Exception('Command exited with code %s.' % code)
            except Exception as e:
                # Any failure (including a failure to start the command) only
                # marks this test as failed; remaining tests are still run.
                exit_code = 1
                log(e)
                results.append((False, test_path, description))
            else:
                results.append((True, test_path, description))
        log('\nTests results:')
        for success, test_path, _description in results:
            if success:
                term_result = '\033[92msuccess\033[0m'
            else:
                term_result = '\033[91mfailure\033[0m'
            log('  %s: %s' % (os.path.basename(test_path), term_result))
        # Send email
        if email:
            log('')
            # The command output ends with a line break which must not be
            # inserted in the "From" header.
            hostname = subprocess.check_output(['hostname']).decode('utf-8').strip()
            if not hostname:
                log('Failed to get hostname (required to send email).')
                return 1
            recipients = utils.get_conf('EMAIL_ADMINS')
            if not recipients:
                log('No recipients defined for email sending. Set a value for EMAIL_ADMINS.')
                return 1
            mail = self._build_mail(
                hostname,
                recipients,
                exit_code == 0,
                self._build_html_report(results),
                log_buffer.getvalue(),
            )
            # The original streams are used because the Logger wrappers
            # installed on sys.stdout / sys.stderr define no fileno() and
            # therefore cannot be handed to a subprocess.
            p = subprocess.Popen(['sendmail', '-t'], stdin=subprocess.PIPE, stdout=sys.stdout.stream, stderr=sys.stderr.stream)
            p.communicate(input=mail.encode('utf-8'))
            if p.returncode != 0:
                log('Failed to send email.')
                return 1
            else:
                log('Email sent to: %s' % recipients)
        return exit_code


if __name__ == '__main__':
    # Forward the command line arguments (program name excluded) to the
    # tester, which does all the work in its constructor.
    cli_args = sys.argv[1:]
    Tester(*cli_args)