Newer
Older
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
Script to start tests and to manage their results
'''
import datetime
import glob
import os
import re
import subprocess
import sys
import uuid
from io import StringIO

import utils
from utils import log
# Notice displayed (in ANSI yellow) at the top of the reports when the system is out of support.
OUT_OF_SUPPORT_TEXT = (
    '\033[93mWarning:\n'
    'The system is out of support, UbiCast will not be notified if errors are detected.\n'
    'Please contact UbiCast sales team (sales@ubicast.eu) to renew the support contract.\033[0m'
)
class Logger(object):
    '''
    File-like object duplicating everything written to it:
    the text goes to a stream and to a buffer, both flushed after each write
    '''

    def __init__(self, stream, log_buffer):
        self.stream, self.log_buffer = stream, log_buffer

    def write(self, text):
        # Each target is written then flushed before moving on to the next one.
        for target in (self.stream, self.log_buffer):
            target.write(text)
            target.flush()

    def flush(self):
        # Nothing to do here: write() already flushes both targets.
        return None
# Everything printed on stdout or stderr is also stored in log_buffer,
# whose content is written to the results file at the end of run_tests.
log_buffer = StringIO()
sys.stdout = sys.stderr = Logger(sys.stdout, log_buffer)

Stéphane Diemer
committed
def strip_colors(text):
    '''
    Return the given text without its ANSI color sequences
    (ESC "[" followed by digits and/or semicolons and a final "m")
    '''
    ansi_color = re.compile(r'\033\[[\d;]+m')
    return ansi_color.sub('', text)
def escape(text):
    '''
    Convert a console text into an HTML fragment.

    The text is stripped, "&", "<" and ">" are replaced by HTML entities and
    the ANSI colors 90 to 95 are turned into colored <span> elements.
    A span is closed by the next known color, by a reset sequence (only
    zeros, like "0" or "0;0") or at the end of the text, so that the markup
    is always balanced. Any other color sequence is removed.
    '''
    colors = {
        '90': 'gray',
        '91': 'red',
        '92': 'green',
        '93': 'orange',
        '94': 'blue',
        '95': 'purple',
    }
    # "&" must be escaped first, otherwise the entities added for "<" and ">" would be escaped again.
    html = text.strip().replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')
    pieces = []
    span_open = False
    position = 0
    for match in re.finditer(r'\033\[([\d;]+)m', html):
        pieces.append(html[position:match.start()])
        position = match.end()
        code = match.group(1)
        color = colors.get(code)
        is_reset = not code.strip('0;')
        if span_open and (color or is_reset):
            pieces.append('</span>')
            span_open = False
        if color:
            pieces.append('<span style="color: %s;">' % color)
            span_open = True
    pieces.append(html[position:])
    if span_open:
        # The text ended without a reset sequence.
        pieces.append('</span>')
    return ''.join(pieces)
def raid_idle(pattern='/sys/block/md*/md/sync_action'):
    '''
    Tell whether all software RAID devices are idle.

    pattern: glob pattern of the "sync_action" files to read (defaults to
    the md devices found in /sys/block).
    Returns True when every matched file contains "idle" (or when no file
    matches); the state of each non idle device is printed.
    '''
    idle = True
    # Sorted so that the devices are always reported in the same order.
    for path in sorted(glob.glob(pattern)):
        with open(path, 'r') as fo:
            sync_state = fo.read().strip()
        if sync_state != 'idle':
            idle = False
            print('State in %s is %s' % (path, sync_state))
    return idle
# Command line tester: instantiating this class parses the arguments, runs the tests and exits the process.
class Tester():
# Help message; "%s" is replaced with the path of this script (__file__).
USAGE = '''%s [-e] [-f] [-b] [-n] [-d] [-h] [msuser]
-e: send email with report.
-f: send email with report only if at least one test failed.
-b: run only basic tests (exclude mediaserver tests).
-n: do not update envsetup repository.
-d: debug mode (can be started with non root users).
-h: show this message.''' % __file__
# Options accepted on the command line; any other argument starting with "-" is rejected by __init__.
# NOTE(review): run_tests reads self.MAX_LOG_FILES and self.NO_MAIL_FAILURES_COUNT, which are not defined in the visible class body - confirm they are declared.
VALID_ARGS = ['-e', '-f', '-b', '-n', '-d', '-h']
def __init__(self, *args):
    '''
    Parse the command line arguments, prepare the environment, run the tests and exit.

    args: command line arguments (see USAGE); the only accepted positional
    argument is the name of the mediaserver user to test.
    This constructor does not return: it ends with sys.exit() (or replaces
    the process with os.execl() when the script has been updated).
    '''
    log('\033[96m-------------------------------\033[0m')
    log('\033[96m- UbiCast applications tester -\033[0m')
    log('\033[96m-------------------------------\033[0m')
    args = list(args)
    # Check if help is required
    if '-h' in args:
        log('USAGE: ' + self.USAGE)
        sys.exit(0)
    # Options start with "-", any other argument is the optional target user.
    # msuser must exist even when no user is given because it is passed to discover_tests.
    msuser = None
    for arg in args:
        if arg.startswith('-'):
            if arg not in self.VALID_ARGS:
                log('Invalid argument given: "%s".\n' % arg)
                log('USAGE: ' + self.USAGE)
                sys.exit(1)
        else:
            log('Optional target user : %s' % arg)
            if not os.path.isdir(os.path.join('/home', arg)):
                log('Mediaserver user %s does not exist' % arg)
                sys.exit(1)
            msuser = arg
    # Check current dir
    root_dir = utils.get_dir(__file__)
    if root_dir != '':
        os.chdir(root_dir)
    self.root_dir = root_dir
    # Add to python path
    if root_dir not in sys.path:
        sys.path.append(root_dir)
    # Check that this script is run by root (not required in debug mode)
    debug = '-d' in args
    whoami = subprocess.check_output(['whoami']).decode('utf-8').strip()
    if whoami != 'root' and not debug:
        log('This script should be run as root user.')
        sys.exit(1)
    # Update envsetup files and restart the script if the update changed it
    if '-n' not in args:
        tester_path = os.path.join(root_dir, os.path.basename(__file__))
        mtime = os.path.getmtime(tester_path)
        subprocess.call(['python3', 'update_envsetup.py'])
        if mtime != os.path.getmtime(tester_path):
            log('The script has changed, restarting it...')
            # "-n" is added to avoid updating again after the restart.
            os.execl('/usr/bin/python3', 'python3', tester_path, '-n', *args)
    # Load conf
    conf = utils.load_conf()
    if not conf:
        log('No configuration loaded.')
        sys.exit(1)
    # Check for email value
    email = '-e' in args
    email_if_fail = '-f' in args
    basic_only = '-b' in args
    tests = self.discover_tests(basic_only, msuser)
    if raid_idle():
        exit_code = self.run_tests(tests, email, email_if_fail)
    else:
        print('A RAID check or operation is in progress, aborting tests')
        exit_code = 1
    sys.exit(exit_code)

Stéphane Diemer
committed
def parse_file_header(self, path):
    '''
    Extract the criticality and the description of a test from its header.

    For ".py" files, the header is the content of the first triple quoted
    string of the file (an unterminated string runs to the end of the file).
    For other files, it is made of the leading lines starting with "#"
    ("#!" lines are skipped).
    If the header starts with a "Criticality:" line, its value is used as
    criticality and the line is removed from the description.

    Returns a tuple (criticality, description); criticality is
    'not specified' when the header does not declare it.
    '''
    # The encoding is forced to not depend on the locale and invalid bytes must not abort the tests.
    with open(path, 'r', encoding='utf-8', errors='replace') as fo:
        content = fo.read()
    description = ''
    if path.endswith('.py'):
        # Use the delimiter found first in the file and close with the same one.
        openings = [(content.find(quote), quote) for quote in ("'''", '"""')]
        openings = [opening for opening in openings if opening[0] != -1]
        if openings:
            start, quote = min(openings)
            start += len(quote)
            end = content.find(quote, start)
            description = content[start:] if end == -1 else content[start:end]
    else:
        for line in content.split('\n'):
            if line.startswith('#!'):
                continue
            if not line.startswith('#'):
                break
            description += line[1:].strip() + '\n'
    description = description.strip()
    if description.startswith('Criticality:'):
        first_line, _, description = description.partition('\n')
        criticality = first_line[len('Criticality:'):].strip()
    else:
        criticality = 'not specified'
    return criticality, description
# Build the list of tests to run.
# Every entry appended to the list is a tuple (name, criticality, description, command); command is [test_path]
# for the tests of the "tests" directory and [test_path, user] for the tests of the ms-testing-suite.
# Both lists are sorted by decreasing criticality (High, Normal, Low, then any other value) and then by name.
# NOTE(review): statements appear to be missing from this method as shown here (no loop defines "name" and
# "test_path", the "elif" has no matching "if", nothing is appended to ms_users and no "return tests" is
# visible) - compare with the repository version before changing the code.
def discover_tests(self, basic_only=False, msuser=None):
# Comma separated list of test names to skip, read from the configuration.
ignored_tests = utils.get_conf('TESTER_IGNORED_TESTS', '').split(',')
path = os.path.join(self.root_dir, 'tests')
# NOTE(review): only a message is logged when the directory is missing, os.listdir() is still called in the visible code.
if not os.path.isdir(path):
log('The tests dir is missing ("%s").' % path)
names = os.listdir(path)
names.sort()
if not names:
log('The tests dir is empty ("%s").' % path)
# NOTE(review): None is returned here whereas __init__ passes the result to run_tests, which iterates over it.
return
# Weights used to sort the tests; a criticality which is not in this map gets 0.
criticalities_map = {
'Low': 1,
'Normal': 2,
'High': 3,
}
tests = list()
if name in ignored_tests:
continue

Stéphane Diemer
committed
criticality, description = self.parse_file_header(test_path)
tests.append((name, criticality, description, [test_path]))
tests.sort(key=lambda i: (-criticalities_map.get(i[1], 0), i[0]))
elif msuser:
tests = list()
# Get MS instances
ms_users = list()
for user in os.listdir('/home'):
# A user is considered as an instance when /home/<user>/msinstance exists; msuser restricts the search to one user.
if os.path.exists('/home/%s/msinstance' % user) and (not msuser or user == msuser):
ms_users.sort()
cleaned_list = list()
# Optional comma separated list of instances to test; names which are not in ms_users are reported and ignored.
instances_to_test = utils.get_conf('TESTER_MS_INSTANCES', '').split(',')
if instances_to_test:
for val in instances_to_test:
val = val.strip()
if not val:
continue
if val in ms_users:
cleaned_list.append(val)
else:
log('An inexisting instance has been requested for tests: "%s".' % val)
if cleaned_list:
ms_users = cleaned_list
else:
# Presumably reached when no requested instance was kept: only the first TESTER_MAX_INSTANCES instances (2 by default) are tested.
try:
max_instances = int(utils.get_conf('TESTER_MAX_INSTANCES') or 2)
except Exception as e:
log('TESTER_MAX_INSTANCES has an invalid value: %s' % e)
max_instances = 2
if len(ms_users) > max_instances:
ms_users = ms_users[:max_instances]
log('Instances that will be tested: %s.' % ', '.join(ms_users))
# Clone testing suite
ms_path = os.path.join(path, 'ms-testing-suite')
if not os.path.exists(ms_path):
log('Cloning ms-testing-suite in "%s".' % ms_path)
subprocess.call(['git', 'clone', '--recursive', 'https://panel.ubicast.eu/git/mediaserver/ms-testing-suite.git', ms_path])
if os.path.exists(ms_path):
log('Updating ms-testing-suite in "%s".' % ms_path)
# The git commands are run from the clone directory, then the working directory is set back to root_dir.
os.chdir(ms_path)
subprocess.call(['git', 'pull', '--recurse-submodules'])
subprocess.call(['git', 'submodule', 'update', '--init', '--recursive'])
os.chdir(self.root_dir)
wowza_dir = '/usr/local/WowzaStreamingEngine'
etc_lives_conf = '/etc/mediaserver/lives_conf.py'
# "%s" is replaced with the user name of the instance.
local_lives_conf = '/home/%s/msinstance/conf/lives_conf.py'
for user in ms_users:
ms_tests = ['ms_vod_tester.py', 'test_caches.py']
# Check if live tests should be started
if os.path.exists(wowza_dir) or os.path.exists(etc_lives_conf) or os.path.exists(local_lives_conf % user):
ms_tests.append('test_wowza_secure.py')
ms_tests.append('ms_live_tester.py')
if name in ignored_tests:
continue
test_path = os.path.join(ms_path, name)

Stéphane Diemer
committed
criticality, description = self.parse_file_header(test_path)
tests.append(('%s (%s)' % (name, user), criticality, description, [test_path, user]))
tests.sort(key=lambda i: (-criticalities_map.get(i[1], 0), i[0]))
# Run the given tests, log a report, store the results in the "log" directory and optionally send them by email.
# tests: list of tuples (name, criticality, description, command); command is the argument list given to subprocess.Popen.
# email: send the report by email.
# email_if_fail: send the report only if failures > 0 (depending on the number of consecutive failures, see below).
# As far as the visible code shows, 1 is returned if failures > 0, if no recipient is defined or if sendmail
# failed, and 0 otherwise.
# NOTE(review): statements appear to be missing from this method as shown here (out_of_support, total_duration,
# successes and failures are never initialised, the status branches have lost their conditions and the email
# template is not closed) - compare with the repository version before changing the code.
def run_tests(self, tests, email=False, email_if_fail=False):
# Run all tests

Stéphane Diemer
committed
# The first row contains the column titles; report_rows_length keeps the length of the longest value of each column (colors excluded).
report_rows = [('Test', 'Criticality', 'Result', 'Duration', 'Description')]
report_rows_length = [len(t) for t in report_rows[0]]

Stéphane Diemer
committed
for name, criticality, description, command in tests:
log('\033[1;95m-- Test "%s" --\033[0;0m' % name)
start_date = datetime.datetime.utcnow()
log('Test start: %s UTC.' % start_date.strftime('%Y-%m-%d %H:%M:%S'))
# The output of the command is captured and logged once the command has ended.
p = subprocess.Popen(command, stdin=sys.stdin, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate()
if out:
out = out.decode('utf-8').strip()
# The flag is raised as soon as the output of one test contains "out of support".
out_of_support = out_of_support or 'out of support' in out
log(out)
err = err.decode('utf-8').strip()
log(err)

Stéphane Diemer
committed
# NOTE(review): the conditions selecting one of the statuses below are missing here, only the "returncode == 3" (warning) one is visible.
status = '\033[92msuccess\033[0m'

Stéphane Diemer
committed
status = '\033[94mnot testable\033[0m'
elif p.returncode == 3:
status = '\033[93mwarning\033[0m'

Stéphane Diemer
committed
status = '\033[91mfailure\033[0m'
log('Command exited with code %s.' % p.returncode)

Stéphane Diemer
committed
# Get duration
end_date = datetime.datetime.utcnow()
duration = end_date - start_date
if total_duration:
total_duration += duration
else:
total_duration = duration
log('Test end: %s UTC (duration: %s).' % (end_date.strftime('%Y-%m-%d %H:%M:%S'), duration))

Stéphane Diemer
committed
# Prepare report
report_rows.append((name, criticality, status, str(duration), description))
report_rows_length = [max(len(strip_colors(t)), report_rows_length[i]) for i, t in enumerate(report_rows[-1])]
log('\nTests results:')

Stéphane Diemer
committed
# Console report: the name and the description (last value of the row) are merged on one line,
# the other values are padded with spaces up to the length stored in report_rows_length.
log_report = ''
for row in report_rows:
if not log_report:
log_report += '-' * 50
for i, val in enumerate(row):
if i == len(row) - 1:
break
if i == 0:
# merge name and description
log_report += '\n\033[96m%s\033[0m \033[37m%s\033[0m\n' % (val, row[-1])
else:
nb_sp = report_rows_length[i] - len(strip_colors(val))
log_report += ' %s%s' % (val, ' ' * nb_sp)
log_report += '\n' + '-' * 50
if out_of_support:
log_report = OUT_OF_SUPPORT_TEXT + '\n' + log_report
log(log_report.strip())
log('Total tests duration: %s.\n' % total_duration)
# results as html
html_report = ''
for row in report_rows:
# html_report is still empty for the first row (the titles), which is the only one using "th" cells.
html_cell = 'th' if not html_report else 'td'

Stéphane Diemer
committed
html_report += '\n <tr>'
for i, val in enumerate(row):
html_report += ' <%s>%s</%s>' % (html_cell, escape(val), html_cell)

Stéphane Diemer
committed
html_report += ' </tr>'
html_report = '<table border="1">%s\n</table>' % html_report
if out_of_support:
html_report = '<p>' + escape(OUT_OF_SUPPORT_TEXT) + '</p>\n' + html_report
# Store locally results
now = datetime.datetime.utcnow()
log_dir = os.path.join(self.root_dir, 'log')
if not os.path.exists(log_dir):
os.makedirs(log_dir)
history_file = os.path.join(log_dir, 'tests_history.txt')
add_header = not os.path.exists(history_file)
with open(history_file, 'a') as fo:
if add_header:
fo.write('Date | Result | Succeeded | Failed | Not testable\n')
# The result is "KO" as soon as one test failed; the last value is the number of tests counted neither as succeeded nor as failed.
fo.write('%s | %s | %s | %s | %s\n' % (now.strftime('%Y-%m-%d %H:%M:%S'), 'KO' if failures > 0 else 'OK', successes, failures, len(tests) - successes - failures))
# Search for old logs to remove
names = os.listdir(log_dir)
names.sort()
for name in list(names):
if not name.startswith('results_'):
names.remove(name)
# Result files are removed, in sorted name order, until less than MAX_LOG_FILES of them remain.
while len(names) > self.MAX_LOG_FILES - 1:
name = names.pop(0)
try:
log('Removing old log "%s".' % os.path.join(log_dir, name))
os.remove(os.path.join(log_dir, name))
except Exception as e:
log('Failed to remove old log: %s' % e)
# Write log to file
hostname = subprocess.check_output(['hostname'])
if hostname:
hostname = hostname.decode('utf-8').strip()
else:
log('Failed to get hostname (required to send email).')
log_name = 'results_%s_%s.txt' % (hostname or 'noname', now.strftime('%Y-%m-%d_%H-%M-%S'))

Stéphane Diemer
committed
# The colors are removed from the text written to the file.
log_content = strip_colors(log_buffer.getvalue())
with open(os.path.join(log_dir, log_name), 'w') as fo:
fo.write(log_content)
# Send email
send_email = False
if hostname:
if email:
send_email = True
elif email_if_fail and failures > 0:
# if they were too many consecutive failures, do not send the email
with open(history_file, 'r') as fo:
history_content = fo.read()
lines = history_content.split('\n')
lines.reverse()
# The history is read from its end: "KO" lines are counted, presumably until the first non empty line without "KO".
consecutive_failures = 0
for line in lines:
if line:
if 'KO' in line:
consecutive_failures += 1
else:
break
if consecutive_failures == self.NO_MAIL_FAILURES_COUNT:
consecutive_msg = 'Maximum consecutive tester failures reached (%s).\nNo more emails will be sent.' % consecutive_failures
send_email = True
elif consecutive_failures < self.NO_MAIL_FAILURES_COUNT:
consecutive_msg = 'Consecutive tester failures: %s (will stop sending reports when reaching %s failures).' % (consecutive_failures, self.NO_MAIL_FAILURES_COUNT)
# NOTE(review): as shown here, the next assignment is not conditional and replaces both messages above; an "else:" looks missing.
consecutive_msg = 'Too many consecutive tester failures: %s, no email will be sent.' % consecutive_failures
log(consecutive_msg)
html_report += '\n<br/>' + consecutive_msg.replace('\n', '\n<br/>')
recipients = utils.get_conf('EMAIL_ADMINS') or ''
# The next server name is used when the previous one is 'mediaserver', 'campusmanager' or 'monitor' (presumably the default values - TODO confirm).
system_domain = utils.get_conf('MS_SERVER_NAME')
system_type = 'MediaServer'
if system_domain == 'mediaserver':
system_domain = utils.get_conf('CM_SERVER_NAME')
system_type = 'CampusManager'
if system_domain == 'campusmanager':
system_domain = utils.get_conf('MONITOR_SERVER_NAME')
system_type = 'CampusManager'
if system_domain == 'monitor':
system_domain = 'Server'
system_type = '-'
if utils.get_conf('PREMIUM_SUPPORT') != '0':
system_domain = '[PREMIUM] %s' % system_domain
if not out_of_support:
recipients += ',premium-support@ubicast.eu'
# sysadmin@ubicast.eu is removed from the recipients when the system is out of support.
if out_of_support:
recipients = recipients.replace('sysadmin@ubicast.eu', '').replace(',,', ',')
recipients = recipients.strip(',')
if not recipients:
log('No recipients defined for email sending. Set a value for EMAIL_ADMINS.')
return 1
boundary = str(uuid.uuid4())

Florent Thiery
committed
mail = '''From: %(hostname)s <support@ubicast.eu>
To: %(recipients)s
Subject: %(system_domain)s (%(hostname)s) %(system_type)s health report: %(status)s
Mime-Version: 1.0
Content-type: multipart/related; boundary="%(boundary)s"
--%(boundary)s
Content-Type: text/html; charset=UTF-8
Content-transfer-encoding: utf-8
<p><b>Date: %(date)s UTC</b></p>
%(report)s
--%(boundary)s
Content-type: text/plain; name="%(log_name)s"; charset=UTF-8
Content-disposition: attachment; filename="%(log_name)s"
Content-transfer-encoding: utf-8
boundary=boundary,
recipients=recipients,
status=('KO (%s tests failed)' % failures) if failures > 0 else 'OK',
date=now.strftime('%Y-%m-%d %H:%M:%S'),
report=html_report,
system_domain=system_domain,
system_type=system_type,
p = subprocess.Popen(['/usr/sbin/sendmail', '-t'], stdin=subprocess.PIPE, stdout=sys.stdout.stream, stderr=sys.stderr.stream)
p.communicate(input=mail.encode('utf-8'))
if p.returncode != 0:
log('Failed to send email.')
return 1
else:
log('Email sent to: %s' % recipients)
exit_code = 1 if failures > 0 else 0
return exit_code
if __name__ == '__main__':
    # The arguments are forwarded without the script name; Tester.__init__ exits the process itself.
    cli_args = sys.argv[1:]
    Tester(*cli_args)