#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
Script to start tests and to manage their results
'''
from io import StringIO
import datetime
import os
import re
import subprocess
import sys
import uuid

import utils
from utils import log


class Logger(object):
    '''
    File-like object that forwards everything written to it to a real
    stream and also keeps a copy in a buffer (used to attach the full log
    to the email report).
    '''

    def __init__(self, stream, log_buffer):
        self.stream = stream
        self.log_buffer = log_buffer

    def write(self, text):
        # Flush after each write so that the terminal output stays in order
        # with the output of sub processes.
        self.stream.write(text)
        self.stream.flush()
        self.log_buffer.write(text)
        self.log_buffer.flush()

    def flush(self):
        # Nothing to do: write() already flushes both targets.
        pass


# Module level side effect: from now on stdout and stderr are both captured
# in log_buffer in addition of being displayed.
log_buffer = StringIO()
sys.stdout = Logger(sys.stdout, log_buffer)
sys.stderr = sys.stdout


class Tester():
    '''
    Command line entry point: discovers the test scripts, runs them,
    displays a summary and optionally sends it by email.
    Instantiating the class runs everything and exits the process.
    '''
    USAGE = '''%s [-ms] [-e] [-d] [-h]
    -ms: run MediaServer tests in addition of standard tests.
    -e: send email with report.
    -d: debug mode (can be started with non root users).
    -h: show this message.''' % __file__

    def __init__(self, *args):
        '''
        Parse command line flags (see USAGE), run the tests and exit.
        Always ends with sys.exit: 0 on success, 1 on any failure.
        '''
        log('\033[96m-------------------------------\033[0m')
        log('\033[96m- UbiCast applications tester -\033[0m')
        log('\033[96m-------------------------------\033[0m')
        args = list(args)
        # Check if help is required
        if '-h' in args:
            log('USAGE: ' + self.USAGE)
            sys.exit(0)
        # Check current dir
        root_dir = utils.get_dir(__file__)
        if root_dir != '':
            os.chdir(root_dir)
        self.root_dir = root_dir
        # Add to python path
        if root_dir not in sys.path:
            sys.path.append(root_dir)
        # Check that this script is run by root
        debug = '-d' in args
        whoami = subprocess.check_output(['whoami']).decode('utf-8').strip()
        if whoami != 'root' and not debug:
            log('This script should be run as root user.')
            sys.exit(1)
        # Load conf
        conf = utils.load_conf()
        if not conf:
            log('No configuration loaded.')
            sys.exit(1)
        # Check for email value
        email = '-e' in args
        ms = '-ms' in args
        tests = self.discover_tests(ms)
        if not tests:
            sys.exit(1)
        exit_code = self.run_tests(tests, email)
        sys.exit(exit_code)

    def get_file_description(self, path):
        '''
        Return the description embedded in a test file.

        For ".py" files this is the content of the first triple single
        quoted string. For other files this is the leading block of "#"
        comment lines (shebang lines are skipped).
        Returns an empty string when nothing is found.
        '''
        # Decode leniently: a stray binary or badly encoded file in the
        # tests dir should not abort the whole discovery.
        with open(path, 'r', encoding='utf-8', errors='replace') as fo:
            content = fo.read()
        description = ''
        if path.endswith('.py'):
            quotes = '\'\'\''
            # str.find returns -1 when missing; 0 is a valid position
            # (file starting directly with its docstring).
            start = content.find(quotes)
            if start != -1:
                start += len(quotes)
                end = content.find(quotes, start)
                if end != -1:
                    description = content[start:end]
        else:
            for line in content.split('\n'):
                if line.startswith('#!'):
                    continue
                elif line.startswith('#'):
                    description += line[1:].strip() + '\n'
                else:
                    break
        return description.strip()

    def discover_tests(self, ms=False):
        '''
        Build the list of tests to run.

        Returns a list of (name, description, command) tuples where command
        is the argument list to execute. The list is empty if the tests dir
        is missing or empty. When ms is true, the MediaServer testing suite
        is cloned or updated with git and its tests are added once per user
        in /home owning a "msinstance" entry.
        '''
        tests = list()
        # Get standard tests
        path = os.path.join(self.root_dir, 'tests')
        if not os.path.isdir(path):
            log('The tests dir is missing ("%s").' % path)
            return tests
        names = sorted(os.listdir(path))
        if not names:
            log('The tests dir is empty ("%s").' % path)
            return tests
        for name in names:
            test_path = os.path.join(path, name)
            if os.path.isfile(test_path):
                description = self.get_file_description(test_path)
                tests.append((name, description, [test_path]))
        # Get MediaServer tests
        if ms:
            # Clone testing suite
            ms_path = os.path.join(path, 'ms-testing-suite')
            if os.path.exists(ms_path):
                log('Updating ms-testing-suite in "%s".' % ms_path)
                # Run git in the repository dir without changing the
                # working dir of this process (nothing to restore on error).
                subprocess.check_call(['git', 'pull'], cwd=ms_path)
            else:
                log('Cloning ms-testing-suite in "%s".' % ms_path)
                subprocess.check_call(['git', 'clone', 'https://git.ubicast.net/mediaserver/ms-testing-suite.git', ms_path])
            ms_tests = ['ms_vod_tester.py']
            # Get MS instances (sorted to get a reproducible test order)
            users = [user for user in sorted(os.listdir('/home')) if os.path.exists('/home/%s/msinstance' % user)]
            # check that Wowza is installed
            wowza_dir = '/usr/local/WowzaStreamingEngine'
            if not os.path.exists(wowza_dir):
                log('Wowza is not installed ("%s" does not exist), the live test will not be run.' % wowza_dir)
            else:
                ms_tests.append('ms_live_tester.py')
            # Add tests to list
            for user in users:
                for name in ms_tests:
                    test_path = os.path.join(ms_path, name)
                    description = self.get_file_description(test_path)
                    tests.append(('%s (%s)' % (name, user), description, [test_path, user]))
        return tests

    def _build_mail(self, hostname, recipients, success, html_report, log_content):
        '''
        Return the raw message (headers and MIME parts) given to sendmail:
        an HTML part with the report and a text attachment with the log.
        '''
        boundary = str(uuid.uuid4())
        now = datetime.datetime.now(datetime.timezone.utc)
        log_name = 'results_%s_%s.txt' % (hostname, now.strftime('%Y-%m-%d_%H-%M-%S'))
        lines = [
            'From: %s <noreply@ubicast.eu>' % hostname,
            'To: %s' % recipients,
            'Subject: UbiCast application test: %s' % ('OK' if success else 'KO'),
            'Mime-Version: 1.0',
            'Content-type: multipart/related; boundary="%s"' % boundary,
            '',
            '--%s' % boundary,
            'Content-Type: text/html; charset=UTF-8',
            # "utf-8" is a charset, not a transfer encoding; the content is
            # sent as raw UTF-8 bytes, which is "8bit".
            'Content-transfer-encoding: 8bit',
            '',
            '<b>Date: %s UTC</b><br/><br/>' % now.strftime('%Y-%m-%d %H:%M:%S'),
            html_report,
            '',
            '--%s' % boundary,
            'Content-type: text/plain; name="%s"; charset=UTF-8' % log_name,
            'Content-disposition: attachment; filename="%s"' % log_name,
            'Content-transfer-encoding: 8bit',
            '',
            log_content,
            '',
            # Closing delimiter of the multipart body.
            '--%s--' % boundary,
        ]
        return '\n'.join(lines)

    def run_tests(self, tests, email=False):
        '''
        Run every test of the list returned by discover_tests, display the
        results and, if email is true, send the report with sendmail to the
        addresses found in the EMAIL_ADMINS setting.

        Returns 0 if all tests succeeded (and the email, if requested, was
        sent), 1 otherwise.
        '''
        results = list()
        exit_code = 0
        # Run all tests
        for name, description, command in tests:
            log('\033[1;95m-- Test "%s" --\033[0;0m' % name)
            # Run test
            try:
                p = subprocess.Popen(command, stdin=sys.stdin, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                out, err = p.communicate()
                # Invalid bytes in the output must not turn a passing test
                # into a failure, so decode leniently.
                if out:
                    log(out.decode('utf-8', errors='replace').strip())
                if err:
                    log(err.decode('utf-8', errors='replace').strip())
                if p.returncode != 0:
                    raise Exception('Command exited with code %s.' % p.returncode)
            except Exception as e:
                # Any error (test not executable, non zero exit code...)
                # marks the test as failed but the next tests are still run.
                exit_code = 1
                log(e)
                results.append((name, description, command, False))
            else:
                results.append((name, description, command, True))
        # Display results
        log('\nTests results:')
        html_report = '<table border="1">'
        html_report += '\n<tr><th>Test</th><th>Result</th><th>Description</th></tr>'
        for name, description, command, success in results:
            if success:
                html_result = '<span style="color: green;">success</span>'
                term_result = '\033[92msuccess\033[0m'
            else:
                html_result = '<span style="color: red;">failure</span>'
                term_result = '\033[91mfailure\033[0m'
            log('  %s: %s' % (name, term_result))
            html_report += '\n<tr><td>%s</td><td>%s</td><td>%s</td></tr>' % (name, html_result, description.replace('\n', '<br/>\n'))
        html_report += '\n</table>'
        # Send email
        if email:
            log('')
            hostname = subprocess.check_output(['hostname'])
            if not hostname:
                log('Failed to get hostname (required to send email).')
                return 1
            hostname = hostname.decode('utf-8').strip()
            recipients = utils.get_conf('EMAIL_ADMINS')
            if not recipients:
                log('No recipients defined for email sending. Set a value for EMAIL_ADMINS.')
                return 1
            # Remove terminal color codes from the attached log
            log_content = re.sub(r'\033\[[\d;]+m', '', log_buffer.getvalue())
            mail = self._build_mail(hostname, recipients, exit_code == 0, html_report, log_content)
            # sendmail needs real file objects, hence the use of the
            # underlying streams of the Logger wrappers.
            p = subprocess.Popen(['sendmail', '-t'], stdin=subprocess.PIPE, stdout=sys.stdout.stream, stderr=sys.stderr.stream)
            p.communicate(input=mail.encode('utf-8'))
            if p.returncode != 0:
                log('Failed to send email.')
                return 1
            else:
                log('Email sent to: %s' % recipients)
        return exit_code


if __name__ == '__main__':
    Tester(*sys.argv[1:])