#!/usr/bin/env python3 ''' Script to start tests and to manage their results ''' from io import StringIO import argparse import base64 import datetime import glob import os import socket import subprocess import sys import time import uuid from utilities.config import load_conf, get_conf from utilities.logging import strip_colors, escape_html from utilities.os import get_dir from utilities.systemd import check_systemd_setup OUT_OF_SUPPORT_TEXT = '''\033[93mWarning: The system is out of support, UbiCast will not be notified if errors are detected. Please contact UbiCast sales team (sales@ubicast.eu) to renew the support contract.\033[0m''' class Logger(object): ''' Class to duplicate output in a buffer ''' def __init__(self, stream, log_buffer): self.stream = stream self.log_buffer = log_buffer def write(self, text): self.stream.write(text) self.stream.flush() self.log_buffer.write(text) self.log_buffer.flush() def flush(self): pass log_buffer = StringIO() sys.stdout = Logger(sys.stdout, log_buffer) sys.stderr = sys.stdout def raid_idle(): ''' Function to test that the RAID is not running any task ''' idle = True devs = glob.glob('/sys/block/md*/md/sync_action') for d in devs: with open(d, 'r') as f: sync_state = f.read().strip() if sync_state != 'idle': idle = False print('State in %s is %s' % (d, sync_state)) return idle class Tester(): ''' Main tester class ''' MAX_LOG_FILES = 50 def __init__(self): print('\033[96m-------------------------------\033[0m') print('\033[96m- UbiCast applications tester -\033[0m') print('\033[96m-------------------------------\033[0m') # parse args parser = argparse.ArgumentParser(description=__doc__.strip()) parser.add_argument('-d', '--debug', dest='debug', action='store_true', help='Debug mode (can be started with non root users).') parser.add_argument('-e', '--email', dest='send_email', action='store_true', help='Send tests report by email.') parser.add_argument('-b', '--basic', dest='basic_tests', action='store_true', help='Run only basic tests (exclude mediaserver tests).') parser.add_argument('-n', '--no-update', dest='no_update', action='store_true', help='Do not update envsetup repository.') parser.add_argument('-p', '--no-packages', dest='no_packages', action='store_true', help='Do not install packages.') parser.add_argument('msuser', nargs='?', help='The unix user of the MediaServer instance to test. Default is user specified in configuration or all users if not set.') args = parser.parse_args() # Check current dir root_dir = get_dir(__file__) if root_dir != '': os.chdir(root_dir) self.root_dir = root_dir # Add to python path if root_dir not in sys.path: sys.path.append(root_dir) # Check that this script is run by root if os.getuid() != 0 and not args.debug: print('This script should be run as root user.') sys.exit(1) # Update envsetup files if not args.no_update: tester_path = os.path.join(root_dir, os.path.basename(__file__)) mtime = os.path.getmtime(tester_path) subprocess.run(['python3', 'update_envsetup.py'], timeout=1800) if mtime != os.path.getmtime(tester_path): print('The script has changed, restarting it...') os.execl('/usr/bin/python3', 'python3', tester_path, '-n', *sys.argv[1:]) sys.exit(1) # not reachable # Install utilities packages if not args.no_packages: subprocess.run(['python3', 'pkgs_envsetup.py'], timeout=1800) # Load conf conf = load_conf() if not conf: print('No configuration loaded.') sys.exit(1) # Check RAID status if not raid_idle(): print('A RAID check or operation is in progress, aborting tests') sys.exit(1) # Get tests to run tests = self.discover_tests(args.basic_tests, msuser=args.msuser, no_update=args.no_update) if not tests: print('No test to run.') sys.exit(1) # Print system info self.print_system_info() # Create logs dir self.logs_dir = os.path.join(self.root_dir, 'logs') os.makedirs(self.logs_dir, exist_ok=True) print('Logs dir is "%s".' % self.logs_dir) # Check systemd service and timer check_systemd_setup() # Run tests now, failures, out_of_support, log_content, html_report = self.run_tests(tests) if args.send_email: failures += self.send_report_email(now, failures, out_of_support, log_content, html_report) sys.exit(1 if failures > 0 else 0) def print_system_info(self): print('System information:') print('- Date: %s UTC.' % datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')) print('- FQDN: %s.' % socket.getfqdn()) p = subprocess.run(['ip', '-br', 'addr'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, timeout=1800) print('- IP config:\n%s' % p.stdout.decode('utf-8')) def get_log_name(self, now): hostname = socket.gethostname() log_name = 'results_%s_%s.log' % ( hostname or 'noname', now.strftime('%Y-%m-%d_%H-%M-%S'), ) return log_name def parse_file_header(self, path): with open(path, 'r') as fo: content = fo.read() description = '' if path.endswith('.py'): start = ( content.find("'''") if content.find("'''") != -1 else content.find('"""') ) if start > 0: start += 3 end = ( content.find("'''", start) if content.find("'''", start) != -1 else content.find('"""', start) ) if end > 0: description = content[start:end] else: for line in content.split('\n'): if line.startswith('#!'): continue elif line.startswith('#'): description += line[1:].strip() + '\n' else: break description = description.strip() if description.startswith('Criticality:'): criticality, *description = description.split('\n') criticality = criticality[len('Criticality:') :].strip() # noqa: E203 description = '\n'.join(description) else: criticality = 'not specified' return criticality, description def discover_tests(self, basic_only=False, msuser=None, no_update=False): ignored_tests = get_conf('TESTER_IGNORED_TESTS', '').split(',') ignored_tests.append('__init__.py') if basic_only: tests = self.discover_basic_tests(ignored_tests) elif msuser: tests = self.discover_mediaserver_tests(msuser, no_update, ignored_tests) else: tests = self.discover_basic_tests(ignored_tests) tests.extend(self.discover_mediaserver_tests(msuser, no_update, ignored_tests)) criticalities_map = {'Low': 1, 'Normal': 2, 'High': 3} tests.sort(key=lambda i: (-criticalities_map.get(i[1], 0), i[0])) return tests def discover_basic_tests(self, ignored_tests=None): # Get standard tests test_dir = os.path.join(self.root_dir, 'scripts') if not os.path.isdir(test_dir): print('The tests dir is missing ("%s").' % test_dir) return list() names = os.listdir(test_dir) names.sort() if not names: print('The tests dir is empty ("%s").' % test_dir) return list() tests = list() for name in names: if ignored_tests and name in ignored_tests: continue test_path = os.path.join(test_dir, name) if os.path.isfile(test_path): criticality, description = self.parse_file_header(test_path) tests.append((name, criticality, description, [test_path], None)) return tests def discover_mediaserver_tests(self, msuser=None, no_update=False, ignored_tests=None): # Get MS instances ms_users = list() for user in os.listdir('/home'): if os.path.exists('/home/%s/msinstance' % user) and ( not msuser or user == msuser ): ms_users.append(user) if not ms_users: return list() ms_users.sort() cleaned_list = list() instances_to_test = get_conf('TESTER_MS_INSTANCES', '').split(',') if instances_to_test: for val in instances_to_test: val = val.strip() if not val: continue if val in ms_users: cleaned_list.append(val) else: print( 'An inexisting instance has been requested for tests: "%s".' % val ) if cleaned_list: ms_users = cleaned_list else: try: max_instances = int(get_conf('TESTER_MAX_INSTANCES') or 2) except Exception as e: print('TESTER_MAX_INSTANCES has an invalid value: %s' % e) max_instances = 2 if len(ms_users) > max_instances: ms_users = ms_users[:max_instances] print('Instances that will be tested: %s.' % ', '.join(ms_users)) # Clone testing suite ms_path = os.path.join(self.root_dir, 'scripts', 'ms-testing-suite') if not os.path.exists(ms_path): print('Cloning ms-testing-suite in "%s".' % ms_path) subprocess.run([ 'git', 'clone', '--recursive', 'https://mirismanager.ubicast.eu/git/mediaserver/ms-testing-suite.git', ms_path, ], timeout=1800) if not os.path.exists(ms_path): print('The ms-testing-suite dir "%s" does not exist, no MediaServer test will be run.' % ms_path) return list() # Update testing suite if allowed if not no_update: print('Updating ms-testing-suite in "%s".' % ms_path) os.chdir(ms_path) branch = get_conf('ENVSETUP_BRANCH') or 'stable' if branch: subprocess.run(['git', 'checkout', branch], timeout=1800) subprocess.run(['git', 'fetch', '--recurse-submodules', '--all'], timeout=1800) subprocess.run(['git', 'reset', '--hard', 'origin/{}'.format(branch)], timeout=1800) subprocess.run(['git', 'pull', '--recurse-submodules'], timeout=1800) subprocess.run(['git', 'submodule', 'update', '--init', '--recursive'], timeout=1800) os.chdir(self.root_dir) # Build tests list print('Add MediaServer tests if available.') wowza_dir = '/usr/local/WowzaStreamingEngine' etc_lives_conf = '/etc/mediaserver/lives.json' local_lives_conf = '/home/%s/msinstance/conf/lives.json' old_etc_lives_conf = '/etc/mediaserver/lives_conf.py' old_local_lives_conf = '/home/%s/msinstance/conf/lives_conf.py' tests = list() for user in ms_users: ms_tests = ['ms_vod_tester.py', 'test_caches.py'] # Check if live tests should be started if ( os.path.exists(wowza_dir) or os.path.exists(etc_lives_conf) or os.path.exists(local_lives_conf % user) or os.path.exists(old_etc_lives_conf) or os.path.exists(old_local_lives_conf % user) ): ms_tests.append('test_wowza_secure.py') ms_tests.append('ms_live_tester.py') ignore_rules = get_conf('TESTER_IGNORE_ROUTING_RULES', '0') for name in ms_tests: if ignored_tests and name in ignored_tests: continue test_path = os.path.join(ms_path, name) if os.path.exists(test_path): criticality, description = self.parse_file_header(test_path) tests.append(( '%s (%s)' % (name, user), criticality, description, [test_path, user], {'IGNORE_ROUTING_RULES': ignore_rules}, )) return tests def run_tests(self, tests): # Run all tests successes = 0 failures = 0 total_duration = None report_rows = [('Test', 'Criticality', 'Result', 'Duration', 'Description')] report_rows_length = [len(t) for t in report_rows[0]] out_of_support = False for name, criticality, description, command, env in tests: print('\033[1;95m-- Test "%s" --\033[0;0m' % name) start_date = datetime.datetime.utcnow() print('Test start: %s UTC.' % start_date.strftime('%Y-%m-%d %H:%M:%S')) # Run test count = 0 returncode = None while count < 3: if count > 0: wait_time = 5 * count * count print('Waiting %s s...' % wait_time) time.sleep(wait_time) count += 1 print('Attempt: %s' % str(count)) test_env = dict(os.environ) if env: test_env.update(env) try: p = subprocess.run( command, env=test_env, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, timeout=1800, ) out = p.stdout.decode('utf-8', 'replace').strip() print(out) out_of_support = out_of_support or 'out of support' in out returncode = p.returncode if returncode in (0, 2, 3): break except Exception as e: print('Command failed: %s' % e) returncode = None if returncode == 0: status = '\033[92msuccess\033[0m' successes += 1 elif returncode == 2: status = '\033[94mnot testable\033[0m' elif returncode == 3: status = '\033[93mwarning\033[0m' else: status = '\033[91mfailure\033[0m' failures += 1 print('Command exited with code %s.' % returncode) # Get duration end_date = datetime.datetime.utcnow() duration = end_date - start_date if total_duration: total_duration += duration else: total_duration = duration print( 'Test end: %s UTC (duration: %s).' % (end_date.strftime('%Y-%m-%d %H:%M:%S'), duration) ) # Prepare report report_rows.append((name, criticality, status, str(duration), description)) report_rows_length = [ max(len(strip_colors(t)), report_rows_length[i]) for i, t in enumerate(report_rows[-1]) ] # Display results # results as text print('\nTests results:') log_report = '' for row in report_rows: if not log_report: log_report += '-' * 50 for i, val in enumerate(row): if i == len(row) - 1: break if i == 0: # merge name and description log_report += '\n\033[96m%s\033[0m \033[37m%s\033[0m\n' % ( val, row[-1], ) else: nb_sp = report_rows_length[i] - len(strip_colors(val)) log_report += ' %s%s' % (val, ' ' * nb_sp) log_report += '\n' + '-' * 50 if out_of_support: log_report = OUT_OF_SUPPORT_TEXT + '\n' + log_report print(log_report.strip()) print('Total tests duration: %s.\n' % total_duration) # results as html html_report = '' for row in report_rows: html_cell = 'th' if not html_report else 'td' html_report += '\n <tr>' for i, val in enumerate(row): html_report += ' <%s>%s</%s>' % (html_cell, escape_html(val), html_cell) html_report += ' </tr>' html_report = '<table border="1">%s\n</table>' % html_report if out_of_support: html_report = '<p>' + escape_html(OUT_OF_SUPPORT_TEXT) + '</p>\n' + html_report # Store locally results now = datetime.datetime.utcnow() history_file = os.path.join(self.logs_dir, 'tests_history.txt') add_header = not os.path.exists(history_file) with open(history_file, 'a') as fo: if add_header: fo.write('Date | Result | Succeeded | Failed | Not testable\n') fo.write('%s | %s | %s | %s | %s\n' % ( now.strftime('%Y-%m-%d %H:%M:%S'), 'KO' if failures > 0 else 'OK', successes, failures, len(tests) - successes - failures, )) # Search for old logs to remove names = os.listdir(self.logs_dir) names.sort() for name in list(names): if not name.startswith('results_'): names.remove(name) while len(names) > self.MAX_LOG_FILES - 1: name = names.pop(0) try: print('Removing old log "%s".' % os.path.join(self.logs_dir, name)) os.remove(os.path.join(self.logs_dir, name)) except Exception as e: print('Failed to remove old log: %s' % e) # Write log to file log_content = strip_colors(log_buffer.getvalue()) with open(os.path.join(self.logs_dir, self.get_log_name(now)), 'w') as fo: fo.write(log_content) return now, failures, out_of_support, log_content, html_report def send_report_email(self, now, failures, out_of_support, log_content, html_report): hostname = socket.gethostname() if not hostname: print('Failed to get hostname (required to send email).') return 1 fqdn = socket.getfqdn() log_content_encoding = 'utf-8' # Get sender and recipients recipients = get_conf('EMAIL_ADMINS') or '' system_domain = get_conf('MS_SERVER_NAME') system_type = 'MediaServer' if not system_domain or system_domain == 'mediaserver': system_domain = get_conf('CM_SERVER_NAME') system_type = 'MirisManager' if not system_domain or system_domain == 'mirismanager': system_domain = get_conf('MONITOR_SERVER_NAME') system_type = 'Server' if not system_domain or system_domain == 'monitor': system_domain = fqdn if '.' in system_domain: top_domain = '.'.join(system_domain.split('.')[-2:]) elif '.' in fqdn: top_domain = '.'.join(fqdn.split('.')[-2:]) else: top_domain = system_domain + '.local' sender = hostname + '@' + top_domain print('Sender address: %s' % sender) # Prepare email contant if out_of_support: system_domain = '[OUT OF SUPPORT] %s' % system_domain recipients = recipients.replace('sysadmin@ubicast.eu', '').replace( ',,', ',' ) elif get_conf('PREMIUM_SUPPORT') != '0': system_domain = '[PREMIUM] %s' % system_domain recipients = recipients.replace('sysadmin@ubicast.eu', '').replace( ',,', ',' ) recipients += ',sysadmin+premium@ubicast.eu' recipients = recipients.strip(',') if not recipients: print('No recipients defined for email sending. Set a value for EMAIL_ADMINS.') return 0 boundary = str(uuid.uuid4()) if get_conf('TESTER_BASE64_ATTACH') != '0': log_content_encoding = 'base64' log_content = base64.b64encode(log_content.encode('utf-8')).decode() mail = '''From: %(hostname)s <%(sender)s> To: %(recipients)s Subject: %(system_domain)s (%(hostname)s) %(system_type)s health report: %(status)s Mime-Version: 1.0 Content-type: multipart/related; boundary="%(boundary)s" --%(boundary)s Content-Type: text/html; charset=UTF-8 Content-transfer-encoding: utf-8 <p><b>Date: %(date)s UTC</b></p> <p>FQDN: %(fqdn)s</p> %(report)s --%(boundary)s Content-type: text/plain; name="%(log_name)s"; charset=UTF-8 Content-disposition: attachment; filename="%(log_name)s" Content-transfer-encoding: %(log_content_encoding)s %(log_content)s''' % dict( boundary=boundary, sender=sender, hostname=hostname, recipients=recipients, status=('KO (%s tests failed)' % failures) if failures > 0 else 'OK', date=now.strftime('%Y-%m-%d %H:%M:%S'), fqdn=fqdn, report=html_report, log_name=self.get_log_name(now).replace('.log', '.txt'), log_content_encoding=log_content_encoding, log_content=log_content, system_domain=system_domain, system_type=system_type, ) # Send email p = subprocess.Popen( ['/usr/sbin/sendmail', '-t'], stdin=subprocess.PIPE, stdout=sys.stdout.stream, stderr=sys.stderr.stream, ) p.communicate(input=mail.encode('utf-8'), timeout=1800) if p.returncode != 0: print('Failed to send email.') return 1 else: print('Email sent to: %s' % recipients) return 0 if __name__ == '__main__': Tester()