Newer
Older
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
Script to start tests and to manage their results
'''
import datetime
import os
import re
import subprocess
import sys
import uuid
from io import StringIO

import utils
from utils import log
class Logger(object):
    '''
    Minimal file-like object that duplicates everything written to it.

    Every write is sent to the wrapped stream and to a second buffer, and both
    are flushed immediately so that the two copies never lag behind each other.
    '''

    def __init__(self, stream, log_buffer):
        '''
        stream: file-like object that receives the output first.
        log_buffer: file-like object that receives a copy of the output.
        '''
        self.stream = stream
        self.log_buffer = log_buffer

    def write(self, text):
        '''Write text to both targets, flushing each one right after its write.'''
        for target in (self.stream, self.log_buffer):
            target.write(text)
            target.flush()

    def flush(self):
        '''Flush both targets (callers such as print(..., flush=True) expect a real flush).'''
        self.stream.flush()
        self.log_buffer.flush()
# Route both stdout and stderr through a single Logger: output is still
# written to the original stdout and a copy is accumulated in log_buffer.
log_buffer = StringIO()
sys.stderr = sys.stdout = Logger(sys.stdout, log_buffer)
class Tester():
    '''
    Discover the test scripts stored in the "tests" directory, run them and
    report the results (in the terminal and optionally by email).

    Creating an instance runs the whole workflow and ends by calling sys.exit.
    '''
    USAGE = (
        '%s [-ms] [-e] [-d] [-h]\n'
        '-ms: run MediaServer tests in addition of standard tests.\n'
        '-e: send email with report.\n'
        '-d: debug mode (can be started with non root users).\n'
        '-h: show this message.'
    ) % __file__

    # Multipart message handed to "sendmail -t". A blank line separates the
    # headers of each part from its body and the last line closes the multipart.
    MAIL_TEMPLATE = '\n'.join((
        'From: %(hostname)s <noreply@ubicast.eu>',
        'To: %(recipients)s',
        'Subject: UbiCast application test: %(status)s',
        'Mime-Version: 1.0',
        'Content-type: multipart/related; boundary="%(boundary)s"',
        '',
        '--%(boundary)s',
        'Content-Type: text/html; charset=UTF-8',
        'Content-transfer-encoding: utf-8',
        '',
        '<b>Date: %(date)s UTC</b><br/><br/>',
        '%(report)s',
        '',
        '--%(boundary)s',
        'Content-type: text/plain; name="%(log_name)s"; charset=UTF-8',
        'Content-disposition: attachment; filename="%(log_name)s"',
        'Content-transfer-encoding: utf-8',
        '',
        '%(log_content)s',
        '--%(boundary)s--',
        '',
    ))

    def __init__(self, *args):
        '''
        Run the tester with the given command line flags.

        Recognised flags: "-h" (show usage and exit), "-d" (debug mode, the root
        user check is skipped), "-e" (send the report by email) and "-ms" (also
        run the MediaServer tests). Every path of this method ends with sys.exit.
        '''
        log('\033[96m-------------------------------\033[0m')
        log('\033[96m- UbiCast applications tester -\033[0m')
        log('\033[96m-------------------------------\033[0m')
        args = list(args)
        # Check if help is required
        if '-h' in args:
            log('USAGE: ' + self.USAGE)
            sys.exit(0)
        # Check current dir
        root_dir = utils.get_dir(__file__)
        if root_dir != '':
            os.chdir(root_dir)
        self.root_dir = root_dir
        # Add to python path
        if root_dir not in sys.path:
            sys.path.append(root_dir)
        # Check that this script is run by root (not enforced in debug mode)
        debug = '-d' in args
        whoami = subprocess.check_output(['whoami']).decode('utf-8').strip()
        if whoami != 'root' and not debug:
            log('This script should be run as root user.')
            sys.exit(1)
        # Load conf
        conf = utils.load_conf()
        if not conf:
            log('No configuration loaded.')
            sys.exit(1)
        # Check for email value
        email = '-e' in args
        ms = '-ms' in args
        tests = self.discover_tests(ms)
        if not tests:
            sys.exit(1)
        exit_code = self.run_tests(tests, email)
        sys.exit(exit_code)

    def get_file_description(self, path):
        '''
        Return the description embedded at the top of a test file.

        For ".py" files this is the content of the first triple single quoted
        string. For other files this is the leading block of "#" comment lines
        (lines starting with "#!" are skipped). An empty string is returned
        when no description is found.
        '''
        # Undecodable bytes are replaced: a description must never abort the run.
        with open(path, 'r', encoding='utf-8', errors='replace') as fo:
            content = fo.read()
        description = ''
        if path.endswith('.py'):
            # str.find returns -1 when missing; 0 is a valid position.
            start = content.find('\'\'\'')
            if start != -1:
                start += 3
                end = content.find('\'\'\'', start)
                if end != -1:
                    description = content[start:end]
        else:
            comment_lines = []
            for line in content.split('\n'):
                if line.startswith('#!'):
                    continue
                if not line.startswith('#'):
                    break
                comment_lines.append(line[1:].strip())
            description = '\n'.join(comment_lines)
        return description.strip()

    def discover_tests(self, ms=False):
        '''
        Build the list of tests to run.

        ms: when True, the MediaServer testing suite is cloned (or updated) and
            its tests are added once per user having a "/home/<user>/msinstance".
        Returns a list of (name, description, command) tuples where command is
        the argument list used to start the test. The list is empty when the
        tests directory is missing or empty.
        '''
        tests = list()
        # Get standard tests
        path = os.path.join(self.root_dir, 'tests')
        if not os.path.isdir(path):
            log('The tests dir is missing ("%s").' % path)
            return tests
        names = sorted(os.listdir(path))
        if not names:
            log('The tests dir is empty ("%s").' % path)
            return tests
        for name in names:
            test_path = os.path.join(path, name)
            if not os.path.isfile(test_path):
                # Sub directories (like the cloned testing suite) are not tests.
                continue
            description = self.get_file_description(test_path)
            tests.append((name, description, [test_path]))
        # Get MediaServer tests
        if ms:
            # Clone testing suite (or update the existing clone)
            ms_path = os.path.join(path, 'ms-testing-suite')
            if os.path.exists(ms_path):
                # "cwd" keeps the working directory of this process untouched,
                # even when the command fails.
                subprocess.check_call(['git', 'pull'], cwd=ms_path)
            else:
                subprocess.check_call(['git', 'clone', 'https://git.ubicast.net/mediaserver/ms-testing-suite.git', ms_path])
            ms_tests = ['ms_vod_tester.py']
            # Get MS instances
            users = [user for user in sorted(os.listdir('/home')) if os.path.exists('/home/%s/msinstance' % user)]
            # check that Wowza is installed
            wowza_dir = '/usr/local/WowzaStreamingEngine'
            if not os.path.exists(wowza_dir):
                log('Info: Wowza is not installed ("%s" does not exist).' % wowza_dir)
            else:
                ms_tests.append('ms_live_tester.py')
            # Add tests to list
            for user in users:
                for name in ms_tests:
                    # The MediaServer tests are looked up in the cloned suite.
                    test_path = os.path.join(ms_path, name)
                    description = self.get_file_description(test_path)
                    tests.append(('%s (%s)' % (name, user), description, [test_path, user]))
        return tests

    def run_tests(self, tests, email=False):
        '''
        Run all tests, display a summary and optionally send it by email.

        tests: list of (name, description, command) tuples.
        email: when True, the report is sent by email.
        Returns 0 when every test succeeded (and the email, if requested, was
        sent), 1 otherwise.
        '''
        results = list()
        exit_code = 0
        # Run all tests
        for name, description, command in tests:
            log('\033[1;95m-- Test "%s" --\033[0;0m' % name)
            # Run test
            try:
                p = subprocess.Popen(command, stdin=sys.stdin, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                out, err = p.communicate()
                # Invalid bytes in the output must not turn a test into a failure.
                if out:
                    log(out.decode('utf-8', 'replace').strip())
                if err:
                    log(err.decode('utf-8', 'replace').strip())
                if p.returncode != 0:
                    raise Exception('Command exited with code %s.' % p.returncode)
            except Exception as e:
                exit_code = 1
                log(e)
                results.append((name, description, command, False))
            else:
                # Exactly one result per test: success only when nothing failed.
                results.append((name, description, command, True))
        # Display results
        log('\nTests results:')
        report_lines = [
            '<table border="1">',
            '<tr><th>Test</th><th>Result</th><th>Description</th></tr>',
        ]
        for name, description, command, success in results:
            if success:
                html_result = '<span style="color: green;">success</span>'
                term_result = '\033[92msuccess\033[0m'
            else:
                html_result = '<span style="color: red;">failure</span>'
                term_result = '\033[91mfailure\033[0m'
            log(' %s: %s' % (name, term_result))
            report_lines.append('<tr><td>%s</td><td>%s</td><td>%s</td></tr>' % (name, html_result, description.replace('\n', '<br/>\n')))
        report_lines.append('</table>')
        html_report = '\n'.join(report_lines)
        # Send email
        if email:
            log('')
            if not self._send_report(html_report, exit_code):
                return 1
        return exit_code

    def _send_report(self, html_report, exit_code):
        '''
        Send the HTML report and the captured output with "sendmail -t".

        Returns True when sendmail accepted the message, False otherwise
        (the reason is logged).
        '''
        hostname = subprocess.check_output(['hostname']).decode('utf-8', 'replace').strip()
        if not hostname:
            log('Failed to get hostname (required to send email).')
            return False
        recipients = utils.get_conf('EMAIL_ADMINS')
        if not recipients:
            log('No recipients defined for email sending. Set a value for EMAIL_ADMINS.')
            return False
        boundary = str(uuid.uuid4())
        now = datetime.datetime.now(datetime.timezone.utc)
        # Remove terminal color sequences from the captured output.
        log_content = re.sub(r'\033\[[\d;]+m', '', log_buffer.getvalue())
        mail = self.MAIL_TEMPLATE % dict(
            boundary=boundary,
            hostname=hostname,
            recipients=recipients,
            status='OK' if exit_code == 0 else 'KO',
            date=now.strftime('%Y-%m-%d %H:%M:%S'),
            log_name='results_%s_%s.txt' % (hostname, now.strftime('%Y-%m-%d_%H-%M-%S')),
            report=html_report,
            log_content=log_content,
        )
        # sendmail needs real file objects: use the stream wrapped by the
        # Logger when stdout/stderr have been replaced.
        real_stdout = getattr(sys.stdout, 'stream', sys.stdout)
        real_stderr = getattr(sys.stderr, 'stream', sys.stderr)
        try:
            p = subprocess.Popen(['sendmail', '-t'], stdin=subprocess.PIPE, stdout=real_stdout, stderr=real_stderr)
            p.communicate(input=mail.encode('utf-8'))
        except OSError as e:
            log('Failed to send email: %s' % e)
            return False
        if p.returncode != 0:
            log('Failed to send email.')
            return False
        log('Email sent to: %s' % recipients)
        return True
if __name__ == '__main__':
    # The Tester constructor runs the whole workflow and exits the process.
    Tester(*sys.argv[1:])