Newer
Older
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
Script to start tests and to manage their results
'''
from io import StringIO
import datetime
import subprocess
import uuid
import utils
from utils import log
class Logger(object):
def __init__(self, stream, log_buffer):
self.stream = stream
self.log_buffer = log_buffer
def write(self, text):
self.stream.write(text)
self.stream.flush()
self.log_buffer.write(text)
self.log_buffer.flush()
def flush(self):
pass
log_buffer = StringIO()
sys.stdout = Logger(sys.stdout, log_buffer)
sys.stderr = sys.stdout
class Tester():
USAGE = '''%s [-e] [-d] [-h]
-e: send email with report.
-d: debug mode (can be started with non root users).
-h: show this message.''' % __file__
def __init__(self, *args):
log('\033[96m-------------------------------\033[0m')
log('\033[96m- UbiCast applications tester -\033[0m')
log('\033[96m-------------------------------\033[0m')
args = list(args)
# Check if help is required
if '-h' in args:
log('USAGE: ' + self.USAGE)
sys.exit(0)
# Check current dir
root_dir = utils.get_dir(__file__)
if root_dir != '':
os.chdir(root_dir)
self.root_dir = root_dir
# Add to python path
if root_dir not in sys.path:
sys.path.append(root_dir)
# Check that this script is run by root
debug = '-d' in args
if debug:
args.remove('-d')
whoami = subprocess.check_output(['whoami']).decode('utf-8')
if whoami != 'root' and not debug:
log('This script should be run as root user.')
sys.exit(1)
# Load conf
conf = utils.load_conf()
if not conf:
log('No configuration loaded.')
sys.exit(1)
# Check for email value
email = '-e' in args
if email:
args.remove('-e')
exit_code = self.run_tests(email)
sys.exit(exit_code)
def get_file_description(self, path):
with open(path, 'r') as fo:
content = fo.read()
description = ''
if path.endswith('.py'):
start = content.find('\'\'\'')
if start > 0:
start += 3
end = content.find('\'\'\'', start)
if end > 0:
description = content[start:end]
else:
for line in content.split('\n'):
if line.startswith('#!'):
continue
elif line.startswith('#'):
description += line[1:].strip() + '\n'
else:
break
return description.strip()
def run_tests(self, email=False):
path = os.path.join(self.root_dir, 'tests')
if not os.path.isdir(path):
log('The tests dir is missing ("%s").' % path)
return 1
os.chdir(path)
names = os.listdir(path)
names.sort()
if not names:
log('The tests dir is empty ("%s").' % path)
return 1
results = list()
exit_code = 0
# Run all tests
for name in names:
log('\033[1;95m-- Test "%s" --\033[0;0m' % name)
test_path = os.path.join(path, name)
# Get test description
description = self.get_file_description(test_path)
# Run test
try:
code, out = utils.exec_cmd(test_path)
if code != 0:
raise Exception('Command exited with code %s.' % code)
except Exception as e:
exit_code = 1
log(e)
results.append((False, test_path, description))
else:
results.append((True, test_path, description))
log('\nTests results:')
html_report = '<table border="1">'
html_report += '\n<tr><th>Test</th><th>Result</th><th>Description</th></tr>'
for success, test_path, description in results:
file_name = os.path.basename(test_path)
if success:
html_result = '<span style="color: green;">success</span>'
term_result = '\033[92msuccess\033[0m'
else:
html_result = '<span style="color: red;">failure</span>'
term_result = '\033[91mfailure\033[0m'
log(' %s: %s' % (file_name, term_result))
html_report += '\n<tr><td>%s</td><td>%s</td><td>%s</td></tr>' % (file_name, html_result, description.replace('\n', '<br/>\n'))
html_report += '\n</table>'
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# Send email
if email:
log('')
hostname = subprocess.check_output(['hostname'])
if not hostname:
log('Failed to get hostname (required to send email).')
return 1
recipients = utils.get_conf('EMAIL_ADMINS')
if not recipients:
log('No recipients defined for email sending. Set a value for EMAIL_ADMINS.')
return 1
boundary = str(uuid.uuid4())
now = datetime.datetime.utcnow()
mail = '''From: %(hostname)s <noreply@ubicast.eu>
To: %(recipients)s
Subject: UbiCast application test: %(status)s
Mime-Version: 1.0
Content-type: multipart/related; boundary="%(boundary)s"
--%(boundary)s
Content-Type: text/html; charset=UTF-8
Content-transfer-encoding: utf-8
<b>Date: %(date)s</b><br/><br/>
%(report)s
--%(boundary)s
Content-type: application/octet-stream; name="%(log_name)s"
Content-disposition: attachment; filename="%(log_name)s"
Content-transfer-encoding: utf-8
%(log_content)s
''' % dict(
boundary=boundary,
hostname=hostname.decode('utf-8'),
recipients=recipients,
status='OK' if exit_code == 0 else 'KO',
date=now.strftime('%Y-%m-%d %H:%M:%S'),
log_name='results_' + now.strftime('%Y-%m-%d_%H-%M-%S') + '.txt',
report=html_report,
log_content=log_buffer.getvalue(),
)
p = subprocess.Popen(['sendmail', '-t'], stdin=subprocess.PIPE, stdout=sys.stdout.stream, stderr=sys.stderr.stream)
p.communicate(input=mail.encode('utf-8'))
if p.returncode != 0:
log('Failed to send email.')
return 1
else:
log('Email sent to: %s' % recipients)
return exit_code
if __name__ == '__main__':
Tester(*sys.argv[1:])