Newer
Older
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
Script to start tests and to manage their results
'''
from io import StringIO
import datetime
import subprocess
import uuid
import utils
from utils import log
class Logger(object):
def __init__(self, stream, log_buffer):
self.stream = stream
self.log_buffer = log_buffer
def write(self, text):
self.stream.write(text)
self.stream.flush()
self.log_buffer.write(text)
self.log_buffer.flush()
def flush(self):
pass
log_buffer = StringIO()
sys.stdout = Logger(sys.stdout, log_buffer)
sys.stderr = sys.stdout
class Tester():
USAGE = '''%s [-e] [-d] [-h]
-e: send email with report.
-d: debug mode (can be started with non root users).
-h: show this message.''' % __file__
def __init__(self, *args):
log('\033[96m-------------------------------\033[0m')
log('\033[96m- UbiCast applications tester -\033[0m')
log('\033[96m-------------------------------\033[0m')
args = list(args)
# Check if help is required
if '-h' in args:
log('USAGE: ' + self.USAGE)
sys.exit(0)
# Check current dir
root_dir = utils.get_dir(__file__)
if root_dir != '':
os.chdir(root_dir)
self.root_dir = root_dir
# Add to python path
if root_dir not in sys.path:
sys.path.append(root_dir)
# Check that this script is run by root
debug = '-d' in args
if debug:
args.remove('-d')
whoami = subprocess.check_output(['whoami']).decode('utf-8').strip()
log('This script should be run as root user.')
sys.exit(1)
# Load conf
conf = utils.load_conf()
if not conf:
log('No configuration loaded.')
sys.exit(1)
# Check for email value
email = '-e' in args
if email:
args.remove('-e')
exit_code = self.run_tests(email)
sys.exit(exit_code)
def get_file_description(self, path):
with open(path, 'r') as fo:
content = fo.read()
description = ''
if path.endswith('.py'):
start = content.find('\'\'\'')
if start > 0:
start += 3
end = content.find('\'\'\'', start)
if end > 0:
description = content[start:end]
else:
for line in content.split('\n'):
if line.startswith('#!'):
continue
elif line.startswith('#'):
description += line[1:].strip() + '\n'
else:
break
return description.strip()
def run_tests(self, email=False):
path = os.path.join(self.root_dir, 'tests')
if not os.path.isdir(path):
log('The tests dir is missing ("%s").' % path)
return 1
os.chdir(path)
names = os.listdir(path)
names.sort()
if not names:
log('The tests dir is empty ("%s").' % path)
return 1
results = list()
exit_code = 0
# Run all tests
for name in names:
log('\033[1;95m-- Test "%s" --\033[0;0m' % name)
test_path = os.path.join(path, name)
# Get test description
description = self.get_file_description(test_path)
# Run test
try:
code, out = utils.exec_cmd(test_path)
if code != 0:
raise Exception('Command exited with code %s.' % code)
except Exception as e:
exit_code = 1
log(e)
results.append((False, test_path, description))
else:
results.append((True, test_path, description))
log('\nTests results:')
html_report = '<table border="1">'
html_report += '\n<tr><th>Test</th><th>Result</th><th>Description</th></tr>'
for success, test_path, description in results:
file_name = os.path.basename(test_path)
if success:
html_result = '<span style="color: green;">success</span>'
term_result = '\033[92msuccess\033[0m'
else:
html_result = '<span style="color: red;">failure</span>'
term_result = '\033[91mfailure\033[0m'
log(' %s: %s' % (file_name, term_result))
html_report += '\n<tr><td>%s</td><td>%s</td><td>%s</td></tr>' % (file_name, html_result, description.replace('\n', '<br/>\n'))
html_report += '\n</table>'
# Send email
if email:
log('')
hostname = subprocess.check_output(['hostname'])
if not hostname:
log('Failed to get hostname (required to send email).')
return 1
recipients = utils.get_conf('EMAIL_ADMINS')
if not recipients:
log('No recipients defined for email sending. Set a value for EMAIL_ADMINS.')
return 1
boundary = str(uuid.uuid4())
now = datetime.datetime.utcnow()
mail = '''From: %(hostname)s <noreply@ubicast.eu>
To: %(recipients)s
Subject: UbiCast application test: %(status)s
Mime-Version: 1.0
Content-type: multipart/related; boundary="%(boundary)s"
--%(boundary)s
Content-Type: text/html; charset=UTF-8
Content-transfer-encoding: utf-8
<b>Date: %(date)s UTC</b><br/><br/>
%(report)s
--%(boundary)s
Content-type: application/octet-stream; name="%(log_name)s"
Content-disposition: attachment; filename="%(log_name)s"
Content-transfer-encoding: utf-8
%(log_content)s
''' % dict(
boundary=boundary,
hostname=hostname.decode('utf-8'),
recipients=recipients,
status='OK' if exit_code == 0 else 'KO',
date=now.strftime('%Y-%m-%d %H:%M:%S'),
log_name='results_' + now.strftime('%Y-%m-%d_%H-%M-%S') + '.txt',
report=html_report,
log_content=log_buffer.getvalue(),
)
p = subprocess.Popen(['sendmail', '-t'], stdin=subprocess.PIPE, stdout=sys.stdout.stream, stderr=sys.stderr.stream)
p.communicate(input=mail.encode('utf-8'))
if p.returncode != 0:
log('Failed to send email.')
return 1
else:
log('Email sent to: %s' % recipients)
return exit_code
if __name__ == '__main__':
Tester(*sys.argv[1:])