#!/usr/bin/env python3 # -*- coding: utf-8 -*- ''' Script to start tests and to manage their results ''' from io import StringIO import datetime import os import re import subprocess import sys import uuid import utils from utils import log class Logger(object): def __init__(self, stream, log_buffer): self.stream = stream self.log_buffer = log_buffer def write(self, text): self.stream.write(text) self.stream.flush() self.log_buffer.write(text) self.log_buffer.flush() def flush(self): pass log_buffer = StringIO() sys.stdout = Logger(sys.stdout, log_buffer) sys.stderr = sys.stdout def strip_colors(text): return re.sub(r'\033\[[\d;]+m', '', text) class Tester(): USAGE = '''%s [-e] [-f] [-b] [-d] [-h] [msuser] -e: send email with report. -f: send email with report only if at least one test failed. -b: run only basic tests (exclude mediaserver tests). -d: debug mode (can be started with non root users). -h: show this message.''' % __file__ VALID_ARGS = ['-e', '-f', '-b', '-d', '-h'] MAX_LOG_FILES = 50 NO_MAIL_FAILURES_COUNT = 5 def __init__(self, *args): log('\033[96m-------------------------------\033[0m') log('\033[96m- UbiCast applications tester -\033[0m') log('\033[96m-------------------------------\033[0m') args = list(args) msuser = None # Check if help is required if '-h' in args: log('USAGE: ' + self.USAGE) sys.exit(0) for index, arg in enumerate(args): if arg not in self.VALID_ARGS and index + 1 != len(args): log('Invalid argument given: "%s".\n' % arg) log('USAGE: ' + self.USAGE) sys.exit(1) else: log("Optional target user : %s" % arg) if not os.path.isdir(os.path.join('/home', arg)): log("Mediaserver user %s does not exist" % arg) sys.exit(1) else: msuser = arg # Check current dir root_dir = utils.get_dir(__file__) if root_dir != '': os.chdir(root_dir) self.root_dir = root_dir # Add to python path if root_dir not in sys.path: sys.path.append(root_dir) # Check that this script is run by root debug = '-d' in args whoami = subprocess.check_output(['whoami']).decode('utf-8').strip() if whoami != 'root' and not debug: log('This script should be run as root user.') sys.exit(1) # Update envsetup files subprocess.call(['python3', 'update_envsetup.py']) # Load conf conf = utils.load_conf() if not conf: log('No configuration loaded.') sys.exit(1) # Check for email value email = '-e' in args email_if_fail = '-f' in args basic_only = '-b' in args tests = self.discover_tests(basic_only, msuser) if not tests: sys.exit(1) exit_code = self.run_tests(tests, email, email_if_fail) sys.exit(exit_code) def parse_file_header(self, path): with open(path, 'r') as fo: content = fo.read() description = '' if path.endswith('.py'): start = content.find('\'\'\'') if start > 0: start += 3 end = content.find('\'\'\'', start) if end > 0: description = content[start:end] else: for line in content.split('\n'): if line.startswith('#!'): continue elif line.startswith('#'): description += line[1:].strip() + '\n' else: break description = description.strip() if description.startswith('Criticality:'): criticality, *description = description.split('\n') criticality = criticality[len('Criticality:'):].strip() description = '\n'.join(description) else: criticality = 'not specified' return criticality, description def discover_tests(self, basic_only=False, msuser=None): ignored_tests = utils.get_conf('TESTER_IGNORED_TESTS', '').split(',') # Get standard tests path = os.path.join(self.root_dir, 'tests') if not os.path.isdir(path): log('The tests dir is missing ("%s").' % path) return names = os.listdir(path) names.sort() if not names: log('The tests dir is empty ("%s").' % path) return criticalities_map = { 'Low': 1, 'Normal': 2, 'High': 3, } tests = list() for name in names: if name in ignored_tests: continue test_path = os.path.join(path, name) if os.path.isfile(test_path): criticality, description = self.parse_file_header(test_path) tests.append((name, criticality, description, [test_path])) if basic_only: tests.sort(key=lambda i: (-criticalities_map.get(i[1], 0), i[0])) return tests elif msuser: tests = list() # Get MS instances ms_users = list() for user in os.listdir('/home'): if os.path.exists('/home/%s/msinstance' % user) and user == msuser: ms_users.append(user) # Get MediaServer tests if ms_users: ms_users.sort() cleaned_list = list() instances_to_test = utils.get_conf('TESTER_MS_INSTANCES', '').split(',') if instances_to_test: for val in instances_to_test: val = val.strip() if not val: continue if val in ms_users: cleaned_list.append(val) else: log('An inexisting instance has been requested for tests: "%s".' % val) if cleaned_list: ms_users = cleaned_list else: try: max_instances = int(utils.get_conf('TESTER_MAX_INSTANCES') or 2) except Exception as e: log('TESTER_MAX_INSTANCES has an invalid value: %s' % e) max_instances = 2 if len(ms_users) > max_instances: ms_users = ms_users[:max_instances] log('Instances that will be tested: %s.' % ', '.join(ms_users)) # Clone testing suite ms_path = os.path.join(path, 'ms-testing-suite') if os.path.exists(ms_path): log('Updating ms-testing-suite in "%s".' % ms_path) os.chdir(ms_path) subprocess.check_call(['git', 'pull']) os.chdir(self.root_dir) else: log('Cloning ms-testing-suite in "%s".' % ms_path) subprocess.check_call(['git', 'clone', 'https://panel.ubicast.eu/git/mediaserver/ms-testing-suite.git', ms_path]) # Add tests to list wowza_dir = '/usr/local/WowzaStreamingEngine' etc_lives_conf = '/etc/mediaserver/lives_conf.py' local_lives_conf = '/home/%s/msinstance/conf/lives_conf.py' for user in ms_users: ms_tests = ['ms_vod_tester.py', 'test_caches.py'] # Check if live tests should be started if os.path.exists(wowza_dir) or os.path.exists(etc_lives_conf) or os.path.exists(local_lives_conf % user): ms_tests.append('test_wowza_secure.py') ms_tests.append('ms_live_tester.py') for name in ms_tests: if name in ignored_tests: continue test_path = os.path.join(ms_path, name) criticality, description = self.parse_file_header(test_path) tests.append(('%s (%s)' % (name, user), criticality, description, [test_path, user])) tests.sort(key=lambda i: (-criticalities_map.get(i[1], 0), i[0])) return tests def run_tests(self, tests, email=False, email_if_fail=False): # Run all tests successes = 0 failures = 0 total_duration = None report_rows = [('Test', 'Criticality', 'Result', 'Duration', 'Description')] report_rows_length = [len(t) for t in report_rows[0]] for name, criticality, description, command in tests: log('\033[1;95m-- Test "%s" --\033[0;0m' % name) start_date = datetime.datetime.utcnow() log('Test start: %s UTC.' % start_date.strftime('%Y-%m-%d %H:%M:%S')) # Run test p = subprocess.Popen(command, stdin=sys.stdin, stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, err = p.communicate() if out: log(out.decode('utf-8').strip()) if err: log(err.decode('utf-8').strip()) if p.returncode == 0: status = '\033[92msuccess\033[0m' successes += 1 elif p.returncode == 2: status = '\033[94mnot testable\033[0m' elif p.returncode == 3: status = '\033[93mwarning\033[0m' else: status = '\033[91mfailure\033[0m' failures += 1 log('Command exited with code %s.' % p.returncode) # Get duration end_date = datetime.datetime.utcnow() duration = end_date - start_date if total_duration: total_duration += duration else: total_duration = duration log('Test end: %s UTC (duration: %s).' % (end_date.strftime('%Y-%m-%d %H:%M:%S'), duration)) # Prepare report report_rows.append((name, criticality, status, str(duration), description)) report_rows_length = [max(len(strip_colors(t)), report_rows_length[i]) for i, t in enumerate(report_rows[-1])] # Display results # results as text log('\nTests results:') log_report = '' for row in report_rows: if not log_report: log_report += '-' * 50 for i, val in enumerate(row): if i == len(row) - 1: break if i == 0: # merge name and description log_report += '\n\033[96m%s\033[0m \033[37m%s\033[0m\n' % (val, row[-1]) else: nb_sp = report_rows_length[i] - len(strip_colors(val)) log_report += ' %s%s' % (val, ' ' * nb_sp) log_report += '\n' + '-' * 50 log(log_report.strip()) log('Total tests duration: %s.\n' % total_duration) # results as html html_report = '' for row in report_rows: html_cell = 'th' if not html_report else 'td' html_report += '\n <tr>' for i, val in enumerate(row): html_report += ' <%s>%s</%s>' % (html_cell, val, html_cell) html_report += ' </tr>' html_report = '<table border="1">%s\n</table>' % html_report html_report = html_report.replace('\033[90m', '<span style="color: gray;">') html_report = html_report.replace('\033[91m', '<span style="color: red;">') html_report = html_report.replace('\033[92m', '<span style="color: green;">') html_report = html_report.replace('\033[93m', '<span style="color: orange;">') html_report = html_report.replace('\033[94m', '<span style="color: blue;">') html_report = html_report.replace('\033[95m', '<span style="color: purple;">') html_report = html_report.replace('\033[0m', '</span>') # Store locally results now = datetime.datetime.utcnow() log_dir = os.path.join(self.root_dir, 'log') if not os.path.exists(log_dir): os.makedirs(log_dir) history_file = os.path.join(log_dir, 'tests_history.txt') add_header = not os.path.exists(history_file) with open(history_file, 'a') as fo: if add_header: fo.write('Date | Result | Succeeded | Failed | Not testable\n') fo.write('%s | %s | %s | %s | %s\n' % (now.strftime('%Y-%m-%d %H:%M:%S'), 'KO' if failures > 0 else 'OK', successes, failures, len(tests) - successes - failures)) # Search for old logs to remove names = os.listdir(log_dir) names.sort() for name in list(names): if not name.startswith('results_'): names.remove(name) while len(names) > self.MAX_LOG_FILES - 1: name = names.pop(0) try: log('Removing old log "%s".' % os.path.join(log_dir, name)) os.remove(os.path.join(log_dir, name)) except Exception as e: log('Failed to remove old log: %s' % e) # Write log to file hostname = subprocess.check_output(['hostname']) if hostname: hostname = hostname.decode('utf-8').strip() else: log('Failed to get hostname (required to send email).') log_name = 'results_%s_%s.txt' % (hostname or 'noname', now.strftime('%Y-%m-%d_%H-%M-%S')) log_content = strip_colors(log_buffer.getvalue()) with open(os.path.join(log_dir, log_name), 'w') as fo: fo.write(log_content) # Send email send_email = False if hostname: if email: send_email = True elif email_if_fail and failures > 0: # if they were too many consecutive failures, do not send the email with open(history_file, 'r') as fo: history_content = fo.read() lines = history_content.split('\n') lines.reverse() consecutive_failures = 0 for line in lines: if line: if 'KO' in line: consecutive_failures += 1 else: break if consecutive_failures == self.NO_MAIL_FAILURES_COUNT: consecutive_msg = 'Maximum consecutive tester failures reached (%s).\nNo more emails will be sent.' % consecutive_failures send_email = True elif consecutive_failures < self.NO_MAIL_FAILURES_COUNT: consecutive_msg = 'Consecutive tester failures: %s (will stop sending reports when reaching %s failures).' % (consecutive_failures, self.NO_MAIL_FAILURES_COUNT) send_email = True else: consecutive_msg = 'Too many consecutive tester failures: %s, no email will be sent.' % consecutive_failures log(consecutive_msg) html_report += '\n<br/>' + consecutive_msg.replace('\n', '\n<br/>') if send_email: recipients = utils.get_conf('EMAIL_ADMINS') system_domain = utils.get_conf('MS_SERVER_NAME') system_type = 'MediaServer' if system_domain == 'mediaserver': system_domain = utils.get_conf('CM_SERVER_NAME') system_type = 'CampusManager' if system_domain == 'campusmanager': system_domain = utils.get_conf('MONITOR_SERVER_NAME') system_type = 'CampusManager' if system_domain == 'monitor': system_domain = 'Server' system_type = '-' if not recipients: log('No recipients defined for email sending. Set a value for EMAIL_ADMINS.') return 1 boundary = str(uuid.uuid4()) mail = '''From: %(hostname)s <support@ubicast.eu> To: %(recipients)s Subject: %(system_domain)s (%(hostname)s) %(system_type)s health report: %(status)s Mime-Version: 1.0 Content-type: multipart/related; boundary="%(boundary)s" --%(boundary)s Content-Type: text/html; charset=UTF-8 Content-transfer-encoding: utf-8 <b>Date: %(date)s UTC</b><br/><br/> %(report)s --%(boundary)s Content-type: text/plain; name="%(log_name)s"; charset=UTF-8 Content-disposition: attachment; filename="%(log_name)s" Content-transfer-encoding: utf-8 %(log_content)s''' % dict( boundary=boundary, hostname=hostname, recipients=recipients, status=('KO (%s tests failed)' % failures) if failures > 0 else 'OK', date=now.strftime('%Y-%m-%d %H:%M:%S'), report=html_report, log_name=log_name, log_content=log_content, system_domain=system_domain, system_type=system_type, ) p = subprocess.Popen(['/usr/sbin/sendmail', '-t'], stdin=subprocess.PIPE, stdout=sys.stdout.stream, stderr=sys.stderr.stream) p.communicate(input=mail.encode('utf-8')) if p.returncode != 0: log('Failed to send email.') return 1 else: log('Email sent to: %s' % recipients) exit_code = 1 if failures > 0 else 0 return exit_code if __name__ == '__main__': Tester(*sys.argv[1:])