Newer
Older
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
Script to start tests and to manage their results
'''
from io import StringIO
import datetime
import subprocess
import uuid
import utils
from utils import log
class Logger(object):
def __init__(self, stream, log_buffer):
self.stream = stream
self.log_buffer = log_buffer
def write(self, text):
self.stream.write(text)
self.stream.flush()
self.log_buffer.write(text)
self.log_buffer.flush()
def flush(self):
pass
log_buffer = StringIO()
sys.stdout = Logger(sys.stdout, log_buffer)
sys.stderr = sys.stdout

Stéphane Diemer
committed
def strip_colors(text):
return re.sub(r'\033\[[\d;]+m', '', text)
class Tester():
USAGE = '''%s [-e] [-f] [-b] [-d] [-h]
-e: send email with report.
-f: send email with report only if at least one test failed.
-b: run only basic tests (exclude mediaserver tests).
-d: debug mode (can be started with non root users).
-h: show this message.''' % __file__
VALID_ARGS = ['-e', '-f', '-b', '-d', '-h']
MAX_LOG_FILES = 50
NO_MAIL_FAILURES_COUNT = 30
MAX_INSTANCES = 2
def __init__(self, *args):
log('\033[96m-------------------------------\033[0m')
log('\033[96m- UbiCast applications tester -\033[0m')
log('\033[96m-------------------------------\033[0m')
args = list(args)
# Check if help is required
if '-h' in args:
log('USAGE: ' + self.USAGE)
sys.exit(0)
for arg in args:
if arg not in self.VALID_ARGS:
log('Invalid argument given: "%s".\n' % arg)
log('USAGE: ' + self.USAGE)
sys.exit(1)
# Check current dir
root_dir = utils.get_dir(__file__)
if root_dir != '':
os.chdir(root_dir)
self.root_dir = root_dir
# Add to python path
if root_dir not in sys.path:
sys.path.append(root_dir)
# Check that this script is run by root
debug = '-d' in args
whoami = subprocess.check_output(['whoami']).decode('utf-8').strip()
log('This script should be run as root user.')
sys.exit(1)
# Load conf
conf = utils.load_conf()
if not conf:
log('No configuration loaded.')
sys.exit(1)
# Check for email value
email = '-e' in args
email_if_fail = '-f' in args
basic_only = '-b' in args
tests = self.discover_tests(basic_only)
exit_code = self.run_tests(tests, email, email_if_fail)
sys.exit(exit_code)

Stéphane Diemer
committed
def parse_file_header(self, path):
with open(path, 'r') as fo:
content = fo.read()
description = ''
if path.endswith('.py'):
start = content.find('\'\'\'')
if start > 0:
start += 3
end = content.find('\'\'\'', start)
if end > 0:
description = content[start:end]
else:
for line in content.split('\n'):
if line.startswith('#!'):
continue
elif line.startswith('#'):
description += line[1:].strip() + '\n'
else:
break

Stéphane Diemer
committed
description = description.strip()
if description.startswith('Criticality:'):
criticality, *description = description.split('\n')
criticality = criticality[len('Criticality:'):].strip()
description = '\n'.join(description)
else:
criticality = 'not specified'
return criticality, description
def discover_tests(self, basic_only=False):
path = os.path.join(self.root_dir, 'tests')
if not os.path.isdir(path):
log('The tests dir is missing ("%s").' % path)
names = os.listdir(path)
names.sort()
if not names:
log('The tests dir is empty ("%s").' % path)
return tests
for name in names:
test_path = os.path.join(path, name)

Stéphane Diemer
committed
criticality, description = self.parse_file_header(test_path)
tests.append((name, criticality, description, [test_path]))
if basic_only:
return tests
# Get MS instances
ms_users = list()
for user in os.listdir('/home'):
if os.path.exists('/home/%s/msinstance' % user):
ms_users.append(user)
if len(ms_users) > self.MAX_INSTANCES:
ms_users = ms_users[:self.MAX_INSTANCES]
# Clone testing suite
ms_path = os.path.join(path, 'ms-testing-suite')
if os.path.exists(ms_path):
log('Updating ms-testing-suite in "%s".' % ms_path)
os.chdir(ms_path)
subprocess.check_call(['git', 'pull'])
os.chdir(self.root_dir)
else:
log('Cloning ms-testing-suite in "%s".' % ms_path)
subprocess.check_call(['git', 'clone', 'https://git.ubicast.net/mediaserver/ms-testing-suite.git', ms_path])
# Add tests to list
wowza_dir = '/usr/local/WowzaStreamingEngine'
etc_lives_conf = '/etc/mediaserver/lives_conf.py'
local_lives_conf = '/home/%s/msinstance/conf/lives_conf.py'
for user in ms_users:
ms_tests = ['ms_vod_tester.py', 'test_caches.py']
# Check if live tests should be started
if os.path.exists(wowza_dir) or os.path.exists(etc_lives_conf) or os.path.exists(local_lives_conf % user):
ms_tests.append('test_wowza_secure.py')
ms_tests.append('ms_live_tester.py')
test_path = os.path.join(ms_path, name)

Stéphane Diemer
committed
criticality, description = self.parse_file_header(test_path)
tests.append(('%s (%s)' % (name, user), criticality, description, [test_path, user]))
def run_tests(self, tests, email=False, email_if_fail=False):
# Run all tests

Stéphane Diemer
committed
report_rows = [('Test', 'Criticality', 'Result', 'Duration', 'Description')]
report_rows_length = [len(t) for t in report_rows[0]]
for name, criticality, description, command in tests:
log('\033[1;95m-- Test "%s" --\033[0;0m' % name)
start_date = datetime.datetime.utcnow()
log('Test start: %s UTC.' % start_date.strftime('%Y-%m-%d %H:%M:%S'))
p = subprocess.Popen(command, stdin=sys.stdin, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate()
if out:
log(out.decode('utf-8').strip())
if err:
log(err.decode('utf-8').strip())
if p.returncode == 0:

Stéphane Diemer
committed
status = '\033[92msuccess\033[0m'

Stéphane Diemer
committed
status = '\033[94mnot testable\033[0m'
elif p.returncode == 3:
status = '\033[93mwarning\033[0m'

Stéphane Diemer
committed
status = '\033[91mfailure\033[0m'
log('Command exited with code %s.' % p.returncode)

Stéphane Diemer
committed
# Get duration
end_date = datetime.datetime.utcnow()
duration = end_date - start_date
if total_duration:
total_duration += duration
else:
total_duration = duration
log('Test end: %s UTC (duration: %s).' % (end_date.strftime('%Y-%m-%d %H:%M:%S'), duration))

Stéphane Diemer
committed
# Prepare report
report_rows.append((name, criticality, status, str(duration), description))
report_rows_length = [max(len(strip_colors(t)), report_rows_length[i]) for i, t in enumerate(report_rows[-1])]
exit_code = 1 if failures > 0 else 0
log('\nTests results:')

Stéphane Diemer
committed
log_report = ''
for row in report_rows:
if not log_report:
log_report += '-' * 50
for i, val in enumerate(row):
if i == len(row) - 1:
break
if i == 0:
# merge name and description
log_report += '\n\033[96m%s\033[0m \033[37m%s\033[0m\n' % (val, row[-1])
else:
nb_sp = report_rows_length[i] - len(strip_colors(val))
log_report += ' %s%s' % (val, ' ' * nb_sp)
log_report += '\n' + '-' * 50
log(log_report.strip())
log('Total tests duration: %s.\n' % total_duration)
# results as html
html_report = ''
for row in report_rows:
html_cell = 'th' if not html_report else 'td'

Stéphane Diemer
committed
html_report += '\n <tr>'
for i, val in enumerate(row):
html_report += ' <%s>%s</%s>' % (html_cell, val, html_cell)
html_report += ' </tr>'
html_report = '<table border="1">%s\n</table>' % html_report
html_report = html_report.replace('\033[90m', '<span style="color: gray;">')

Stéphane Diemer
committed
html_report = html_report.replace('\033[91m', '<span style="color: red;">')
html_report = html_report.replace('\033[92m', '<span style="color: green;">')
html_report = html_report.replace('\033[93m', '<span style="color: orange;">')
html_report = html_report.replace('\033[94m', '<span style="color: blue;">')
html_report = html_report.replace('\033[95m', '<span style="color: purple;">')

Stéphane Diemer
committed
html_report = html_report.replace('\033[0m', '</span>')
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
# Store locally results
now = datetime.datetime.utcnow()
log_dir = os.path.join(self.root_dir, 'log')
if not os.path.exists(log_dir):
os.makedirs(log_dir)
history_file = os.path.join(log_dir, 'tests_history.txt')
add_header = not os.path.exists(history_file)
with open(history_file, 'a') as fo:
if add_header:
fo.write('Date | Result | Succeeded | Failed | Not testable\n')
fo.write('%s | %s | %s | %s | %s\n' % (now.strftime('%Y-%m-%d %H:%M:%S'), 'OK' if exit_code == 0 else 'KO', successes, failures, len(tests) - successes - failures))
# Search for old logs to remove
names = os.listdir(log_dir)
names.sort()
for name in list(names):
if not name.startswith('results_'):
names.remove(name)
while len(names) > self.MAX_LOG_FILES - 1:
name = names.pop(0)
try:
log('Removing old log "%s".' % os.path.join(log_dir, name))
os.remove(os.path.join(log_dir, name))
except Exception as e:
log('Failed to remove old log: %s' % e)
# Write log to file
hostname = subprocess.check_output(['hostname'])
if hostname:
hostname = hostname.decode('utf-8').strip()
else:
log('Failed to get hostname (required to send email).')
log_name = 'results_%s_%s.txt' % (hostname or 'noname', now.strftime('%Y-%m-%d_%H-%M-%S'))

Stéphane Diemer
committed
log_content = strip_colors(log_buffer.getvalue())
with open(os.path.join(log_dir, log_name), 'w') as fo:
fo.write(log_content)
# Send email
send_email = False
if hostname:
if email:
send_email = True
elif email_if_fail and exit_code != 0:
# if they were too many consecutive failures, do not send the email
with open(history_file, 'r') as fo:
history_content = fo.read()
lines = history_content.split('\n')
lines.reverse()
consecutive_failures = 0
for line in lines:
if line:
if 'KO' in line:
consecutive_failures += 1
else:
break
if consecutive_failures < self.NO_MAIL_FAILURES_COUNT:
log('Consecutive tester failures: %s.' % consecutive_failures)
send_email = True
else:
log('Too many consecutive tester failures: %s, no email will be sent.' % consecutive_failures)
if send_email:
recipients = utils.get_conf('EMAIL_ADMINS')
if not recipients:
log('No recipients defined for email sending. Set a value for EMAIL_ADMINS.')
return 1
boundary = str(uuid.uuid4())
mail = '''From: %(hostname)s <noreply@ubicast.eu>
To: %(recipients)s
Subject: UbiCast application test: %(status)s
Mime-Version: 1.0
Content-type: multipart/related; boundary="%(boundary)s"
--%(boundary)s
Content-Type: text/html; charset=UTF-8
Content-transfer-encoding: utf-8
<b>Date: %(date)s UTC</b><br/><br/>
%(report)s
--%(boundary)s
Content-type: text/plain; name="%(log_name)s"; charset=UTF-8
Content-disposition: attachment; filename="%(log_name)s"
Content-transfer-encoding: utf-8
boundary=boundary,
recipients=recipients,
status='OK' if exit_code == 0 else 'KO',
date=now.strftime('%Y-%m-%d %H:%M:%S'),
report=html_report,
)
p = subprocess.Popen(['sendmail', '-t'], stdin=subprocess.PIPE, stdout=sys.stdout.stream, stderr=sys.stderr.stream)
p.communicate(input=mail.encode('utf-8'))
if p.returncode != 0:
log('Failed to send email.')
return 1
else:
log('Email sent to: %s' % recipients)
return exit_code
if __name__ == '__main__':
Tester(*sys.argv[1:])