Skip to content
Snippets Groups Projects
Commit 1d1d3711 authored by Stéphane Diemer's avatar Stéphane Diemer
Browse files

Added email sending support in tester script (refs #20133).

parent 65ca2aec
No related branches found
No related tags found
No related merge requests found
......@@ -18,7 +18,7 @@ def setup(interactive=True):
]
# Create / update ubicast account
cmds.append('echo "Checking ubicast account"')
code, out = utils.exec_cmd(['id', 'ubicast'], get_out=True)
code, out = utils.exec_cmd(['id', 'ubicast'])
if code != 0:
cmds.append('useradd -m -s /bin/bash ubicast')
out = ''
......@@ -29,7 +29,7 @@ def setup(interactive=True):
# root
cmds.append('mkdir -p /root/.ssh')
cmds.append('chmod 700 /root/.ssh')
code, out = utils.exec_cmd(['rgrep', 'support@ubicast', '/root/.ssh'], get_out=True)
code, out = utils.exec_cmd(['rgrep', 'support@ubicast', '/root/.ssh'])
if code != 0:
cmds.append('cat "%s/ubicast_support.pub" >> /root/.ssh/authorized_keys' % dir_path)
else:
......@@ -37,7 +37,7 @@ def setup(interactive=True):
# ubicast
cmds.append('mkdir -p /home/ubicast/.ssh')
cmds.append('chmod 700 /home/ubicast/.ssh')
code, out = utils.exec_cmd(['rgrep', 'support@ubicast', '/home/ubicast/.ssh'], get_out=True)
code, out = utils.exec_cmd(['rgrep', 'support@ubicast', '/home/ubicast/.ssh'])
if code != 0:
cmds.append('cat "%s/ubicast_support.pub" >> /home/ubicast/.ssh/authorized_keys' % dir_path)
else:
......
......@@ -6,7 +6,7 @@ import utils
def setup(interactive=True):
# Get hostname
utils.log('Getting system hostname.')
code, out = utils.exec_cmd(['hostname'], get_out=True)
code, out = utils.exec_cmd(['hostname'])
if code == 0:
hostname = out
utils.log('Hostname is %s.' % hostname)
......
......@@ -9,7 +9,7 @@ def setup(interactive=True):
dir_path = utils.get_dir(__file__)
wowza_setup_name = 'WowzaStreamingEngine-4.5.0-linux-x64-installer.deb'
utils.log('It may take a while to download the Wowza installer from the UbiCast server.')
if utils.exec_cmd('lsb_release -a | grep 14') == 0:
if utils.check_cmd('lsb_release -a | grep 14') == 0:
jre_package = 'openjdk-7-jre-headless' # 14.04
else:
jre_package = 'openjdk-8-jre-headless' # 16.04
......
......@@ -59,7 +59,7 @@ def setup(interactive=True):
hosts.append(server_name)
utils.run_commands(cmds)
# Update hosts file
rc, hostname = utils.exec_cmd('hostname', get_out=True)
rc, hostname = utils.exec_cmd('hostname')
if rc == 0 and hostname not in hosts:
hosts.insert(0, hostname)
with open('/etc/hosts', 'r') as fo:
......
......@@ -34,6 +34,8 @@ SHELL_ADMIN_PWD='test'
# -- Emails --
EMAIL_SMTP_SERVER=
EMAIL_SENDER='support@ubicast.eu'
# separate emails with comas in EMAIL_ADMINS
EMAIL_ADMINS=
# -- Wowza --
WOWZA_LIVE_PWD='test'
......
......@@ -61,11 +61,11 @@ class EnvSetup():
log('No action available.')
sys.exit(1)
# Check that this script is run by root
debug = '-d' in args
if debug:
self.debug = '-d' in args
if self.debug:
args.remove('-d')
code, out = utils.exec_cmd(['whoami'], get_out=True)
if out != 'root' and not debug:
code, out = utils.exec_cmd(['whoami'])
if out != 'root' and not self.debug:
log('This script should be run as root user.')
sys.exit(1)
# Load conf
......@@ -95,6 +95,7 @@ class EnvSetup():
else:
log(' \033[1;94m%s\033[0m' % (action['label']))
log('')
log(' t: Run tests')
log(' c: Configuration status')
log(' e: Exit\n')
log('Info:')
......@@ -113,7 +114,13 @@ class EnvSetup():
log('Exit')
sys.exit(0)
exit_code = 0
if target == 'c':
if target == 't':
# Run tests
args = [os.path.join(self.root_dir, 'tester.py'), 'tester.py']
if self.debug:
args.append('-d')
os.execl(*args)
elif target == 'c':
# Display current configuration
log('Configuration status:')
override = utils.get_conf('_override')
......
......@@ -3,13 +3,36 @@
'''
Script to start tests and to manage their results
'''
from io import StringIO
import datetime
import os
import subprocess
import sys
import uuid
import utils
from utils import log
class Logger(object):
def __init__(self, stream, log_buffer):
self.stream = stream
self.log_buffer = log_buffer
def write(self, text):
self.stream.write(text)
self.stream.flush()
self.log_buffer.write(text)
self.log_buffer.flush()
def flush(self):
pass
log_buffer = StringIO()
sys.stdout = Logger(sys.stdout, log_buffer)
sys.stderr = sys.stdout
class Tester():
USAGE = '''%s [-e] [-d] [-h]
-e: send email with report.
......@@ -37,7 +60,7 @@ class Tester():
debug = '-d' in args
if debug:
args.remove('-d')
code, out = utils.exec_cmd(['whoami'], get_out=True)
code, out = utils.exec_cmd(['whoami'])
if out != 'root' and not debug:
log('This script should be run as root user.')
sys.exit(1)
......@@ -49,7 +72,7 @@ class Tester():
# Check for email value
email = '-e' in args
if email:
args.remove('-i')
args.remove('-e')
exit_code = self.run_tests(email)
sys.exit(exit_code)
......@@ -63,13 +86,13 @@ class Tester():
start += 3
end = content.find('\'\'\'', start)
if end > 0:
description = content[start:end].strip()
description = content[start:end]
else:
for line in content.split('\n'):
if line.startswith('#!'):
continue
elif line.startswith('#'):
description += line + '\n'
description += line[1:].strip() + '\n'
else:
break
return description.strip()
......@@ -95,7 +118,7 @@ class Tester():
description = self.get_file_description(test_path)
# Run test
try:
code = utils.exec_cmd(test_path)
code, out = utils.exec_cmd(test_path)
if code != 0:
raise Exception('Command exited with code %s.' % code)
except Exception as e:
......@@ -105,7 +128,7 @@ class Tester():
else:
results.append((True, test_path, description))
log('\nTests results:')
html_report = '<table>'
html_report = '<table border="1">'
html_report += '\n<tr><th>Test</th><th>Result</th><th>Description</th></tr>'
for success, test_path, description in results:
file_name = os.path.basename(test_path)
......@@ -116,8 +139,57 @@ class Tester():
html_result = '<span style="color: red;">failure</span>'
term_result = '\033[91mfailure\033[0m'
log(' %s: %s' % (file_name, term_result))
html_report += '\n<tr><td>%s</td><td>%s</td><td>%s</td></tr>' % (file_name, html_result, description)
html_report += '\n<tr><td>%s</td><td>%s</td><td>%s</td></tr>' % (file_name, html_result, description.replace('\n', '<br/>\n'))
html_report += '\n</table>'
# Send email
if email:
log('')
hostname = subprocess.check_output(['hostname'])
if not hostname:
log('Failed to get hostname (required to send email).')
return 1
recipients = utils.get_conf('EMAIL_ADMINS')
if not recipients:
log('No recipients defined for email sending. Set a value for EMAIL_ADMINS.')
return 1
boundary = str(uuid.uuid4())
now = datetime.datetime.utcnow()
mail = '''From: %(hostname)s <noreply@ubicast.eu>
To: %(recipients)s
Subject: UbiCast application test: %(status)s
Mime-Version: 1.0
Content-type: multipart/related; boundary="%(boundary)s"
--%(boundary)s
Content-Type: text/html; charset=UTF-8
Content-transfer-encoding: utf-8
<b>Date: %(date)s</b><br/><br/>
%(report)s
--%(boundary)s
Content-type: application/octet-stream; name="%(log_name)s"
Content-disposition: attachment; filename="%(log_name)s"
Content-transfer-encoding: utf-8
%(log_content)s
''' % dict(
boundary=boundary,
hostname=hostname.decode('utf-8'),
recipients=recipients,
status='OK' if exit_code == 0 else 'KO',
date=now.strftime('%Y-%m-%d %H:%M:%S'),
log_name='results_' + now.strftime('%Y-%m-%d_%H-%M-%S') + '.txt',
report=html_report,
log_content=log_buffer.getvalue(),
)
p = subprocess.Popen(['sendmail', '-t'], stdin=subprocess.PIPE, stdout=sys.stdout.stream, stderr=sys.stderr.stream)
p.communicate(input=mail.encode('utf-8'))
if p.returncode != 0:
log('Failed to send email.')
return 1
else:
log('Email sent to: %s' % recipients)
return exit_code
......
#!/bin/bash
# Check that the installation of an Ubuntu package using APT works.
set -e
echo "Testing apt-get install"
......
#!/bin/bash
# Check that the Nginx configuration files are valid.
set -e
if ( which nginx >/dev/null ); then
......
#!/bin/bash
# Check that the NTP server is reachable (date synchronization).
set -e
source /root/envsetup/conf.sh
......
......@@ -2,6 +2,7 @@
# -*- coding: utf-8 -*-
'''
Check that postfix listen correctly the port 25.
Postfix is the service which handles email sendings.
'''
import os
import re
......
......@@ -34,7 +34,7 @@ if content:
if not url or not api_key:
print('The file "%s" is not correct: skyreach url not found.' % apt_source)
sys.exit(1)
print('SkyReach url is %s and API key is %s.' % (url, api_key))
print('SkyReach url is %s and API key is %s[...].' % (url, api_key[:8]))
# Test SkyReach responses
req = requests.get(url)
......@@ -46,6 +46,7 @@ else:
apt_url = '%s/packaging/apt/%s/Packages' % (url, api_key)
req = requests.get(apt_url)
apt_url = apt_url.replace(api_key, api_key[:8] + '[...]')
if not req.ok:
print('Request to %s failed (%s):' % (apt_url, req.status_code))
print(req.text)
......
......@@ -21,23 +21,24 @@ def get_dir(file_path):
return os.path.dirname(os.path.abspath(os.path.expanduser(file_path)))
def exec_cmd(cmd, get_out=False):
stdout = subprocess.PIPE if get_out else sys.stdout
stderr = subprocess.PIPE if get_out else sys.stderr
def exec_cmd(cmd, log_output=True):
shell = not isinstance(cmd, (tuple, list))
p = subprocess.Popen(cmd, stdin=sys.stdin, stdout=stdout, stderr=stderr, shell=shell)
p = subprocess.Popen(cmd, stdin=sys.stdin, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=shell)
out, err = p.communicate()
if get_out:
out = out.decode('utf-8').strip() if out else ''
if err:
if out:
out += '\n'
out += 'Stderr:\n' + err.decode('utf-8').strip()
return p.returncode, out
else:
sys.stdout.flush()
sys.stderr.flush()
return p.returncode
out = out.decode('utf-8').strip() if out else ''
if err:
if out:
out += '\n'
out += err.decode('utf-8').strip()
out = out.strip()
if log_output:
log(out)
return p.returncode, out
def check_cmd(cmd):
code, out = exec_cmd(cmd, log_output=False)
return code
def load_conf():
......@@ -97,7 +98,7 @@ def run_commands(cmds):
cond = cmd['cond']
negate = cmd.get('cond_neg')
skip = cmd.get('cond_skip')
code = exec_cmd(cond)
code = check_cmd(cond)
success = code != 0 if negate else code == 0
if not success:
msg = 'Condition for command "%s" not fullfilled.' % cmd['line']
......@@ -135,7 +136,7 @@ def run_commands(cmds):
log('A backup file already exist for:\n%s' % cmd['target'])
else:
log('>>> ' + cmd['line'])
code = exec_cmd(cmd['line'])
code = check_cmd(cmd['line'])
if code != 0:
raise Exception('Command exited with code %s.' % code)
except Exception as e:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment