Newer
Older
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
Script to start tests and to manage their results
'''
from io import StringIO
import datetime
import subprocess
import uuid
import utils
from utils import log
class Logger(object):
def __init__(self, stream, log_buffer):
self.stream = stream
self.log_buffer = log_buffer
def write(self, text):
self.stream.write(text)
self.stream.flush()
self.log_buffer.write(text)
self.log_buffer.flush()
def flush(self):
pass
log_buffer = StringIO()
sys.stdout = Logger(sys.stdout, log_buffer)
sys.stderr = sys.stdout
class Tester():
USAGE = '''%s [-e] [-f] [-b] [-d] [-h]
-e: send email with report.
-f: send email with report only if at least one test failed.
-b: run only basic tests (exclude mediaserver tests).
-d: debug mode (can be started with non root users).
-h: show this message.''' % __file__
VALID_ARGS = ['-e', '-f', '-b', '-d', '-h']
MAX_LOG_FILES = 50
NO_MAIL_FAILURES_COUNT = 30
def __init__(self, *args):
log('\033[96m-------------------------------\033[0m')
log('\033[96m- UbiCast applications tester -\033[0m')
log('\033[96m-------------------------------\033[0m')
args = list(args)
# Check if help is required
if '-h' in args:
log('USAGE: ' + self.USAGE)
sys.exit(0)
for arg in args:
if arg not in self.VALID_ARGS:
log('Invalid argument given: "%s".\n' % arg)
log('USAGE: ' + self.USAGE)
sys.exit(1)
# Check current dir
root_dir = utils.get_dir(__file__)
if root_dir != '':
os.chdir(root_dir)
self.root_dir = root_dir
# Add to python path
if root_dir not in sys.path:
sys.path.append(root_dir)
# Check that this script is run by root
debug = '-d' in args
whoami = subprocess.check_output(['whoami']).decode('utf-8').strip()
log('This script should be run as root user.')
sys.exit(1)
# Load conf
conf = utils.load_conf()
if not conf:
log('No configuration loaded.')
sys.exit(1)
# Check for email value
email = '-e' in args
email_if_fail = '-f' in args
basic_only = '-b' in args
tests = self.discover_tests(basic_only)
exit_code = self.run_tests(tests, email, email_if_fail)
sys.exit(exit_code)
def get_file_description(self, path):
with open(path, 'r') as fo:
content = fo.read()
description = ''
if path.endswith('.py'):
start = content.find('\'\'\'')
if start > 0:
start += 3
end = content.find('\'\'\'', start)
if end > 0:
description = content[start:end]
else:
for line in content.split('\n'):
if line.startswith('#!'):
continue
elif line.startswith('#'):
description += line[1:].strip() + '\n'
else:
break
return description.strip()
def discover_tests(self, basic_only=False):
path = os.path.join(self.root_dir, 'tests')
if not os.path.isdir(path):
log('The tests dir is missing ("%s").' % path)
names = os.listdir(path)
names.sort()
if not names:
log('The tests dir is empty ("%s").' % path)
return tests
for name in names:
test_path = os.path.join(path, name)
if os.path.isfile(test_path):
description = self.get_file_description(test_path)
tests.append((name, description, [test_path]))
if basic_only:
return tests
# Get MS instances
ms_users = list()
for user in os.listdir('/home'):
if os.path.exists('/home/%s/msinstance' % user):
ms_users.append(user)
# Clone testing suite
ms_path = os.path.join(path, 'ms-testing-suite')
if os.path.exists(ms_path):
log('Updating ms-testing-suite in "%s".' % ms_path)
os.chdir(ms_path)
subprocess.check_call(['git', 'pull'])
os.chdir(self.root_dir)
else:
log('Cloning ms-testing-suite in "%s".' % ms_path)
subprocess.check_call(['git', 'clone', 'https://git.ubicast.net/mediaserver/ms-testing-suite.git', ms_path])
# Add tests to list
wowza_dir = '/usr/local/WowzaStreamingEngine'
etc_lives_conf = '/etc/mediaserver/lives_conf.py'
local_lives_conf = '/home/%s/msinstance/conf/lives_conf.py'
for user in ms_users:
ms_tests = ['ms_vod_tester.py']
# Check if live tests should be started
if os.path.exists(wowza_dir) or os.path.exists(etc_lives_conf) or os.path.exists(local_lives_conf % user):
ms_tests.append('ms_live_tester.py')
test_path = os.path.join(ms_path, name)
description = self.get_file_description(test_path)
tests.append(('%s (%s)' % (name, user), description, [test_path, user]))
return tests
def run_tests(self, tests, email=False, email_if_fail=False):
# Run all tests
results = list()
successes = 0
failures = 0
for name, description, command in tests:
log('\033[1;95m-- Test "%s" --\033[0;0m' % name)
start_date = datetime.datetime.utcnow()
log('Test start: %s UTC.' % start_date.strftime('%Y-%m-%d %H:%M:%S'))
p = subprocess.Popen(command, stdin=sys.stdin, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate()
if out:
log(out.decode('utf-8').strip())
if err:
log(err.decode('utf-8').strip())
if p.returncode == 0:
success = True
elif p.returncode == 2:
success = None
log('Command exited with code %s.' % p.returncode)
end_date = datetime.datetime.utcnow()
duration = end_date - start_date
if total_duration:
total_duration += duration
else:
total_duration = duration
log('Test end: %s UTC (duration: %s).' % (end_date.strftime('%Y-%m-%d %H:%M:%S'), duration))
results.append((name, description, command, success, duration))
exit_code = 1 if failures > 0 else 0
log('\nTests results:')
html_report = '<table border="1">'
html_report += '\n<tr><th>Test</th><th>Result</th><th>Duration</th><th>Description</th></tr>'
for name, description, command, success, duration in results:
if success is None:
html_result = '<span style="color: blue;">not testable</span>'
term_result = '\033[94mnot testable\033[0m'
elif success:
html_result = '<span style="color: green;">success</span>'
term_result = '\033[92msuccess\033[0m'
else:
html_result = '<span style="color: red;">failure</span>'
term_result = '\033[91mfailure\033[0m'
log(' %s: %s (%s)' % (name, term_result, duration))
html_report += '\n<tr><td>%s</td><td>%s</td><td>%s</td><td>%s</td></tr>' % (name, html_result, duration, description.replace('\n', '<br/>\n'))
log('Total tests duration: %s.\n' % total_duration)
html_report += '\n</table>'
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
# Store locally results
now = datetime.datetime.utcnow()
log_dir = os.path.join(self.root_dir, 'log')
if not os.path.exists(log_dir):
os.makedirs(log_dir)
history_file = os.path.join(log_dir, 'tests_history.txt')
add_header = not os.path.exists(history_file)
with open(history_file, 'a') as fo:
if add_header:
fo.write('Date | Result | Succeeded | Failed | Not testable\n')
fo.write('%s | %s | %s | %s | %s\n' % (now.strftime('%Y-%m-%d %H:%M:%S'), 'OK' if exit_code == 0 else 'KO', successes, failures, len(tests) - successes - failures))
# Search for old logs to remove
names = os.listdir(log_dir)
names.sort()
for name in list(names):
if not name.startswith('results_'):
names.remove(name)
while len(names) > self.MAX_LOG_FILES - 1:
name = names.pop(0)
try:
log('Removing old log "%s".' % os.path.join(log_dir, name))
os.remove(os.path.join(log_dir, name))
except Exception as e:
log('Failed to remove old log: %s' % e)
# Write log to file
hostname = subprocess.check_output(['hostname'])
if hostname:
hostname = hostname.decode('utf-8').strip()
else:
log('Failed to get hostname (required to send email).')
log_name = 'results_%s_%s.txt' % (hostname or 'noname', now.strftime('%Y-%m-%d_%H-%M-%S'))
log_content = re.sub(r'\033\[[\d;]+m', '', log_buffer.getvalue())
with open(os.path.join(log_dir, log_name), 'w') as fo:
fo.write(log_content)
# Send email
send_email = False
if hostname:
if email:
send_email = True
elif email_if_fail and exit_code != 0:
# if they were too many consecutive failures, do not send the email
with open(history_file, 'r') as fo:
history_content = fo.read()
lines = history_content.split('\n')
lines.reverse()
consecutive_failures = 0
for line in lines:
if line:
if 'KO' in line:
consecutive_failures += 1
else:
break
if consecutive_failures < self.NO_MAIL_FAILURES_COUNT:
log('Consecutive tester failures: %s.' % consecutive_failures)
send_email = True
else:
log('Too many consecutive tester failures: %s, no email will be sent.' % consecutive_failures)
if send_email:
recipients = utils.get_conf('EMAIL_ADMINS')
if not recipients:
log('No recipients defined for email sending. Set a value for EMAIL_ADMINS.')
return 1
boundary = str(uuid.uuid4())
mail = '''From: %(hostname)s <noreply@ubicast.eu>
To: %(recipients)s
Subject: UbiCast application test: %(status)s
Mime-Version: 1.0
Content-type: multipart/related; boundary="%(boundary)s"
--%(boundary)s
Content-Type: text/html; charset=UTF-8
Content-transfer-encoding: utf-8
<b>Date: %(date)s UTC</b><br/><br/>
%(report)s
--%(boundary)s
Content-type: text/plain; name="%(log_name)s"; charset=UTF-8
Content-disposition: attachment; filename="%(log_name)s"
Content-transfer-encoding: utf-8
boundary=boundary,
recipients=recipients,
status='OK' if exit_code == 0 else 'KO',
date=now.strftime('%Y-%m-%d %H:%M:%S'),
report=html_report,
)
p = subprocess.Popen(['sendmail', '-t'], stdin=subprocess.PIPE, stdout=sys.stdout.stream, stderr=sys.stderr.stream)
p.communicate(input=mail.encode('utf-8'))
if p.returncode != 0:
log('Failed to send email.')
return 1
else:
log('Email sent to: %s' % recipients)
return exit_code
if __name__ == '__main__':
Tester(*sys.argv[1:])