Something went wrong on our end
-
Stéphane Diemer authoredStéphane Diemer authored
tester.py 8.66 KiB
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
Script to start tests and to manage their results
'''
from io import StringIO
import datetime
import os
import re
import subprocess
import sys
import uuid
import utils
from utils import log
class Logger(object):
def __init__(self, stream, log_buffer):
self.stream = stream
self.log_buffer = log_buffer
def write(self, text):
self.stream.write(text)
self.stream.flush()
self.log_buffer.write(text)
self.log_buffer.flush()
def flush(self):
pass
log_buffer = StringIO()
sys.stdout = Logger(sys.stdout, log_buffer)
sys.stderr = sys.stdout
class Tester():
USAGE = '''%s [-ms] [-e] [-d] [-h]
-ms: run MediaServer tests in addition of standard tests.
-e: send email with report.
-d: debug mode (can be started with non root users).
-h: show this message.''' % __file__
def __init__(self, *args):
log('\033[96m-------------------------------\033[0m')
log('\033[96m- UbiCast applications tester -\033[0m')
log('\033[96m-------------------------------\033[0m')
args = list(args)
# Check if help is required
if '-h' in args:
log('USAGE: ' + self.USAGE)
sys.exit(0)
# Check current dir
root_dir = utils.get_dir(__file__)
if root_dir != '':
os.chdir(root_dir)
self.root_dir = root_dir
# Add to python path
if root_dir not in sys.path:
sys.path.append(root_dir)
# Check that this script is run by root
debug = '-d' in args
whoami = subprocess.check_output(['whoami']).decode('utf-8').strip()
if whoami != 'root' and not debug:
log('This script should be run as root user.')
sys.exit(1)
# Load conf
conf = utils.load_conf()
if not conf:
log('No configuration loaded.')
sys.exit(1)
# Check for email value
email = '-e' in args
ms = '-ms' in args
tests = self.discover_tests(ms)
if not tests:
sys.exit(1)
exit_code = self.run_tests(tests, email)
sys.exit(exit_code)
def get_file_description(self, path):
with open(path, 'r') as fo:
content = fo.read()
description = ''
if path.endswith('.py'):
start = content.find('\'\'\'')
if start > 0:
start += 3
end = content.find('\'\'\'', start)
if end > 0:
description = content[start:end]
else:
for line in content.split('\n'):
if line.startswith('#!'):
continue
elif line.startswith('#'):
description += line[1:].strip() + '\n'
else:
break
return description.strip()
def discover_tests(self, ms=False):
tests = list()
# Get standard tests
path = os.path.join(self.root_dir, 'tests')
if not os.path.isdir(path):
log('The tests dir is missing ("%s").' % path)
return tests
names = os.listdir(path)
names.sort()
if not names:
log('The tests dir is empty ("%s").' % path)
return tests
for name in names:
test_path = os.path.join(path, name)
if os.path.isfile(test_path):
description = self.get_file_description(test_path)
tests.append((name, description, [test_path]))
# Get MediaServer tests
if ms:
# Clone testing suite
ms_path = os.path.join(path, 'ms-testing-suite')
if os.path.exists(ms_path):
log('Updating ms-testing-suite in "%s".' % ms_path)
os.chdir(ms_path)
subprocess.check_call(['git', 'pull'])
os.chdir(self.root_dir)
else:
log('Cloning ms-testing-suite in "%s".' % ms_path)
subprocess.check_call(['git', 'clone', 'https://git.ubicast.net/mediaserver/ms-testing-suite.git', ms_path])
ms_tests = ['ms_vod_tester.py']
# Get MS instances
users = list()
for user in os.listdir('/home'):
if os.path.exists('/home/%s/msinstance' % user):
users.append(user)
# check that Wowza is installed
wowza_dir = '/usr/local/WowzaStreamingEngine'
if not os.path.exists(wowza_dir):
log('Wowza is not installed ("%s" does not exist), the live test will not be run.' % wowza_dir)
else:
ms_tests.append('ms_live_tester.py')
# Add tests to list
for user in users:
for name in ms_tests:
test_path = os.path.join(ms_path, name)
description = self.get_file_description(test_path)
tests.append(('%s (%s)' % (name, user), description, [test_path, user]))
return tests
def run_tests(self, tests, email=False):
results = list()
exit_code = 0
# Run all tests
for name, description, command in tests:
log('\033[1;95m-- Test "%s" --\033[0;0m' % name)
# Run test
try:
p = subprocess.Popen(command, stdin=sys.stdin, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate()
if out:
log(out.decode('utf-8').strip())
if err:
log(err.decode('utf-8').strip())
if p.returncode != 0:
raise Exception('Command exited with code %s.' % p.returncode)
except Exception as e:
exit_code = 1
log(e)
results.append((name, description, command, False))
else:
results.append((name, description, command, True))
# Display results
log('\nTests results:')
html_report = '<table border="1">'
html_report += '\n<tr><th>Test</th><th>Result</th><th>Description</th></tr>'
for name, description, command, success in results:
if success:
html_result = '<span style="color: green;">success</span>'
term_result = '\033[92msuccess\033[0m'
else:
html_result = '<span style="color: red;">failure</span>'
term_result = '\033[91mfailure\033[0m'
log(' %s: %s' % (name, term_result))
html_report += '\n<tr><td>%s</td><td>%s</td><td>%s</td></tr>' % (name, html_result, description.replace('\n', '<br/>\n'))
html_report += '\n</table>'
# Send email
if email:
log('')
hostname = subprocess.check_output(['hostname'])
if not hostname:
log('Failed to get hostname (required to send email).')
return 1
hostname = hostname.decode('utf-8').strip()
recipients = utils.get_conf('EMAIL_ADMINS')
if not recipients:
log('No recipients defined for email sending. Set a value for EMAIL_ADMINS.')
return 1
boundary = str(uuid.uuid4())
now = datetime.datetime.utcnow()
log_content = re.sub(r'\033\[[\d;]+m', '', log_buffer.getvalue())
mail = '''From: %(hostname)s <noreply@ubicast.eu>
To: %(recipients)s
Subject: UbiCast application test: %(status)s
Mime-Version: 1.0
Content-type: multipart/related; boundary="%(boundary)s"
--%(boundary)s
Content-Type: text/html; charset=UTF-8
Content-transfer-encoding: utf-8
<b>Date: %(date)s UTC</b><br/><br/>
%(report)s
--%(boundary)s
Content-type: text/plain; name="%(log_name)s"; charset=UTF-8
Content-disposition: attachment; filename="%(log_name)s"
Content-transfer-encoding: utf-8
%(log_content)s''' % dict(
boundary=boundary,
hostname=hostname,
recipients=recipients,
status='OK' if exit_code == 0 else 'KO',
date=now.strftime('%Y-%m-%d %H:%M:%S'),
log_name='results_%s_%s.txt' % (hostname, now.strftime('%Y-%m-%d_%H-%M-%S')),
report=html_report,
log_content=log_content,
)
p = subprocess.Popen(['sendmail', '-t'], stdin=subprocess.PIPE, stdout=sys.stdout.stream, stderr=sys.stderr.stream)
p.communicate(input=mail.encode('utf-8'))
if p.returncode != 0:
log('Failed to send email.')
return 1
else:
log('Email sent to: %s' % recipients)
return exit_code
if __name__ == '__main__':
Tester(*sys.argv[1:])