Newer
Older
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
Script to start tests and to manage their results
'''
from io import StringIO
import datetime
import os
import re
import subprocess
import sys
import uuid

import utils
from utils import log
class Logger(object):
    '''
    File-like wrapper which copies everything written to it into two targets:
    the wrapped stream and a log buffer.

    Attributes:
        stream: the wrapped stream (its write and flush methods are used).
        log_buffer: second target receiving the same text.

    Any attribute which is not defined on the wrapper (isatty, fileno,
    encoding, ...) is looked up on the wrapped stream, so that code probing
    the replaced standard streams does not fail with AttributeError.
    '''

    def __init__(self, stream, log_buffer):
        self.stream = stream
        self.log_buffer = log_buffer

    def write(self, text):
        '''Write the text to both targets and flush them immediately.'''
        self.stream.write(text)
        self.stream.flush()
        self.log_buffer.write(text)
        self.log_buffer.flush()

    def flush(self):
        '''Do nothing: write() already flushes both targets.'''
        pass

    def __getattr__(self, name):
        # Only called when the normal lookup fails. The wrapped stream is
        # fetched through object.__getattribute__ to avoid infinite recursion
        # when "stream" itself is not set yet.
        stream = object.__getattribute__(self, 'stream')
        return getattr(stream, name)
# Everything printed from now on is also copied into log_buffer, which is
# read back when the email report is built. The same wrapper object is used
# for both stdout and stderr.
log_buffer = StringIO()
sys.stderr = sys.stdout = Logger(sys.stdout, log_buffer)
class Tester():
    '''
    Command line runner which discovers the test scripts, runs them and
    reports their results in the terminal and, optionally, by email.

    Instantiating the class runs the whole procedure and always ends the
    process with sys.exit.
    '''
    USAGE = '''%s [-e] [-b] [-d] [-h]
-e: send email with report.
-b: run only basic tests (exclude mediaserver tests).
-d: debug mode (can be started with non root users).
-h: show this message.''' % __file__

    def __init__(self, *args):
        '''
        Parse the command line flags, run the tests and exit.

        Args:
            *args: command line arguments (see USAGE).

        Exit codes: 0 when help was requested or no test failed, 1 otherwise
        (not root, no configuration, no tests, failed test or email error).
        '''
        log('\033[96m-------------------------------\033[0m')
        log('\033[96m- UbiCast applications tester -\033[0m')
        log('\033[96m-------------------------------\033[0m')
        args = list(args)
        # Check if help is required
        if '-h' in args:
            log('USAGE: ' + self.USAGE)
            sys.exit(0)
        # Check current dir
        root_dir = utils.get_dir(__file__)
        if root_dir != '':
            os.chdir(root_dir)
        self.root_dir = root_dir
        # Add to python path
        if root_dir not in sys.path:
            sys.path.append(root_dir)
        # Check that this script is run by root (unless debug mode is enabled)
        debug = '-d' in args
        whoami = subprocess.check_output(['whoami']).decode('utf-8').strip()
        if whoami != 'root' and not debug:
            log('This script should be run as root user.')
            sys.exit(1)
        # Load conf
        conf = utils.load_conf()
        if not conf:
            log('No configuration loaded.')
            sys.exit(1)
        # Check for email value
        email = '-e' in args
        basic_only = '-b' in args
        tests = self.discover_tests(basic_only)
        if not tests:
            sys.exit(1)
        exit_code = self.run_tests(tests, email)
        sys.exit(exit_code)

    def get_file_description(self, path):
        '''
        Extract a human readable description from a test file.

        For ".py" files the description is the content of the first block
        delimited by triple single quotes. For any other file it is the
        leading run of "#" comment lines (shebang lines are skipped).

        Args:
            path: path of the file to read.

        Returns:
            The description with surrounding whitespace removed, or an empty
            string when none is found.
        '''
        # Undecodable bytes are replaced so that a stray binary file does not
        # abort the whole discovery.
        with open(path, 'r', encoding='utf-8', errors='replace') as fo:
            content = fo.read()
        if path.endswith('.py'):
            delimiter = '\'\'\''
            # str.find returns -1 when nothing is found; 0 is a valid
            # position (file starting directly with its docstring).
            start = content.find(delimiter)
            if start == -1:
                return ''
            start += len(delimiter)
            end = content.find(delimiter, start)
            if end == -1:
                return ''
            return content[start:end].strip()
        lines = list()
        for line in content.split('\n'):
            if line.startswith('#!'):
                continue
            if not line.startswith('#'):
                break
            lines.append(line[1:].strip())
        return '\n'.join(lines).strip()

    def discover_tests(self, basic_only=False):
        '''
        Build the list of tests to run.

        Every regular file located in "<root_dir>/tests" is a basic test.
        Unless basic_only is set, the ms-testing-suite repository is cloned
        (or updated) in the tests dir and its testers are added once per
        user owning a "/home/<user>/msinstance" path.

        Args:
            basic_only: when true, only the basic tests are returned.

        Returns:
            A list of (name, description, command) tuples where command is
            the argument list given to subprocess. The list is empty when
            the tests dir is missing or empty.

        Raises:
            subprocess.CalledProcessError: if the git clone / pull fails.
        '''
        tests = list()
        path = os.path.join(self.root_dir, 'tests')
        if not os.path.isdir(path):
            log('The tests dir is missing ("%s").' % path)
            return tests
        names = sorted(os.listdir(path))
        if not names:
            log('The tests dir is empty ("%s").' % path)
            return tests
        for name in names:
            test_path = os.path.join(path, name)
            if os.path.isfile(test_path):
                description = self.get_file_description(test_path)
                tests.append((name, description, [test_path]))
        if basic_only:
            return tests
        # Get MS instances
        ms_users = list()
        if os.path.isdir('/home'):
            for user in os.listdir('/home'):
                if os.path.exists('/home/%s/msinstance' % user):
                    ms_users.append(user)
        # Clone testing suite
        ms_path = os.path.join(path, 'ms-testing-suite')
        if os.path.exists(ms_path):
            log('Updating ms-testing-suite in "%s".' % ms_path)
            # Run git in the repository without changing the current dir of
            # this process, so a git failure cannot leave it in ms_path.
            subprocess.check_call(['git', 'pull'], cwd=ms_path)
        else:
            log('Cloning ms-testing-suite in "%s".' % ms_path)
            subprocess.check_call(['git', 'clone', 'https://git.ubicast.net/mediaserver/ms-testing-suite.git', ms_path])
        # Add tests to list
        wowza_dir = '/usr/local/WowzaStreamingEngine'
        etc_lives_conf = '/etc/mediaserver/lives_conf.py'
        local_lives_conf = '/home/%s/msinstance/conf/lives_conf.py'
        for user in ms_users:
            ms_tests = ['ms_vod_tester.py']
            # Check if live tests should be started
            if os.path.exists(wowza_dir) or os.path.exists(etc_lives_conf) or os.path.exists(local_lives_conf % user):
                ms_tests.append('ms_live_tester.py')
            for name in ms_tests:
                test_path = os.path.join(ms_path, name)
                description = self.get_file_description(test_path)
                tests.append(('%s (%s)' % (name, user), description, [test_path, user]))
        return tests

    def run_tests(self, tests, email=False):
        '''
        Run the given tests, log a summary and optionally send it by email.

        A test exiting with code 0 is a success, code 2 means "not testable"
        and any other code is a failure.

        Args:
            tests: list of (name, description, command) tuples, as returned
                by discover_tests.
            email: when true, the report is sent with "sendmail -t" to the
                recipients configured in EMAIL_ADMINS.

        Returns:
            0 when no test failed (and the email, if requested, was sent),
            1 otherwise.
        '''
        results = list()
        exit_code = 0
        # Run all tests
        for name, description, command in tests:
            log('\033[1;95m-- Test "%s" --\033[0;0m' % name)
            start_date = datetime.datetime.utcnow()
            log('Test start: %s UTC.' % start_date.strftime('%Y-%m-%d %H:%M:%S'))
            p = subprocess.Popen(command, stdin=sys.stdin, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            out, err = p.communicate()
            # Invalid bytes in the output of a test must not stop the runner
            if out:
                log(out.decode('utf-8', 'replace').strip())
            if err:
                log(err.decode('utf-8', 'replace').strip())
            if p.returncode == 0:
                success = True
            elif p.returncode == 2:
                success = None
            else:
                success = False
                exit_code = 1
            log('Command exited with code %s.' % p.returncode)
            end_date = datetime.datetime.utcnow()
            log('Test end: %s UTC (duration: %s).' % (end_date.strftime('%Y-%m-%d %H:%M:%S'), end_date - start_date))
            results.append((name, description, command, success))
        # Display results and build the HTML report
        log('\nTests results:')
        html_report = '<table border="1">'
        html_report += '\n<tr><th>Test</th><th>Result</th><th>Description</th></tr>'
        for name, description, command, success in results:
            if success is None:
                html_result = '<span style="color: blue;">not testable</span>'
                term_result = '\033[94mnot testable\033[0m'
            elif success:
                html_result = '<span style="color: green;">success</span>'
                term_result = '\033[92msuccess\033[0m'
            else:
                html_result = '<span style="color: red;">failure</span>'
                term_result = '\033[91mfailure\033[0m'
            log(' %s: %s' % (name, term_result))
            html_report += '\n<tr><td>%s</td><td>%s</td><td>%s</td></tr>' % (name, html_result, description.replace('\n', '<br/>\n'))
        html_report += '\n</table>'
        # Send email
        if email:
            log('')
            # check_output returns bytes ending with a newline
            hostname = subprocess.check_output(['hostname']).decode('utf-8', 'replace').strip()
            if not hostname:
                log('Failed to get hostname (required to send email).')
                return 1
            recipients = utils.get_conf('EMAIL_ADMINS')
            if not recipients:
                log('No recipients defined for email sending. Set a value for EMAIL_ADMINS.')
                return 1
            boundary = str(uuid.uuid4())
            now = datetime.datetime.utcnow()
            # Remove terminal color codes from the log attached to the mail
            log_content = re.sub(r'\033\[[\d;]+m', '', log_buffer.getvalue())
            # The blank lines are significant: they separate the headers from
            # the body of the message and of each MIME part.
            # NOTE(review): "utf-8" is not a standard Content-transfer-encoding
            # value ("8bit" is the usual one) - kept as is, to be confirmed.
            mail = '''From: %(hostname)s <noreply@ubicast.eu>
To: %(recipients)s
Subject: UbiCast application test: %(status)s
Mime-Version: 1.0
Content-type: multipart/related; boundary="%(boundary)s"

--%(boundary)s
Content-Type: text/html; charset=UTF-8
Content-transfer-encoding: utf-8

<b>Date: %(date)s UTC</b><br/><br/>
%(report)s

--%(boundary)s
Content-type: text/plain; name="%(log_name)s"; charset=UTF-8
Content-disposition: attachment; filename="%(log_name)s"
Content-transfer-encoding: utf-8

%(log)s

--%(boundary)s--
''' % dict(
                boundary=boundary,
                hostname=hostname,
                recipients=recipients,
                status='OK' if exit_code == 0 else 'KO',
                date=now.strftime('%Y-%m-%d %H:%M:%S'),
                log_name='results_%s_%s.txt' % (hostname, now.strftime('%Y-%m-%d_%H-%M-%S')),
                report=html_report,
                log=log_content,
            )
            # sys.stdout / sys.stderr are Logger wrappers at this point: the
            # child process needs the real underlying streams.
            p = subprocess.Popen(['sendmail', '-t'], stdin=subprocess.PIPE, stdout=sys.stdout.stream, stderr=sys.stderr.stream)
            p.communicate(input=mail.encode('utf-8'))
            if p.returncode != 0:
                log('Failed to send email.')
                return 1
            else:
                log('Email sent to: %s' % recipients)
        return exit_code
if __name__ == '__main__':
    # Forward the command line flags (without the script name) to the tester;
    # its constructor ends the process with sys.exit.
    cli_args = sys.argv[1:]
    Tester(*cli_args)