Newer
Older
Script to start tests and to manage their results
from io import StringIO
import argparse
import base64
import datetime
import glob
import os
import socket
import subprocess
import sys
import time
import uuid
from utilities.config import load_conf, get_conf
from utilities.logging import strip_colors, escape_html
from utilities.systemd import check_systemd_setup
The system is out of support, UbiCast will not be notified if errors are detected.
Please contact UbiCast sales team (sales@ubicast.eu) to renew the support contract.\033[0m'''
class Logger(object):
def __init__(self, stream, log_buffer):
self.stream = stream
self.log_buffer = log_buffer
def write(self, text):
self.stream.write(text)
self.stream.flush()
self.log_buffer.write(text)
self.log_buffer.flush()
def flush(self):
pass
log_buffer = StringIO()
sys.stdout = Logger(sys.stdout, log_buffer)
sys.stderr = sys.stdout
def raid_idle():
idle = True
devs = glob.glob('/sys/block/md*/md/sync_action')
print('State in %s is %s' % (d, sync_state))
return idle
class Tester():
MAX_LOG_FILES = 50
def __init__(self):
print('\033[96m-------------------------------\033[0m')
print('\033[96m- UbiCast applications tester -\033[0m')
print('\033[96m-------------------------------\033[0m')
# parse args
parser = argparse.ArgumentParser(description=__doc__.strip())
parser.add_argument('-d', '--debug', dest='debug', action='store_true', help='Debug mode (can be started with non root users).')
parser.add_argument('-e', '--email', dest='send_email', action='store_true', help='Send tests report by email.')
parser.add_argument('-b', '--basic', dest='basic_tests', action='store_true', help='Run only basic tests (exclude mediaserver tests).')
parser.add_argument('-n', '--no-update', dest='no_update', action='store_true', help='Do not update envsetup repository.')
parser.add_argument('-p', '--no-packages', dest='no_packages', action='store_true', help='Do not install packages.')
parser.add_argument('msuser', nargs='?', help='The unix user of the MediaServer instance to test. Default is user specified in configuration or all users if not set.')
args = parser.parse_args()
# Check current dir
root_dir = get_dir(__file__)
os.chdir(root_dir)
self.root_dir = root_dir
# Add to python path
if root_dir not in sys.path:
sys.path.append(root_dir)
# Check that this script is run by root
if os.getuid() != 0 and not args.debug:
print('This script should be run as root user.')
sys.exit(1)
# Update envsetup files
if not args.no_update:
tester_path = os.path.join(root_dir, os.path.basename(__file__))
mtime = os.path.getmtime(tester_path)
subprocess.run(['python3', 'update_envsetup.py'], timeout=1800)
if mtime != os.path.getmtime(tester_path):
print('The script has changed, restarting it...')
os.execl('/usr/bin/python3', 'python3', tester_path, '-n', *sys.argv[1:])
sys.exit(1) # not reachable
# Install utilities packages
if not args.no_packages:
subprocess.run(['python3', 'pkgs_envsetup.py'], timeout=1800)
# Load conf
conf = load_conf()
if not conf:
# Check RAID status
if not raid_idle():
print('A RAID check or operation is in progress, aborting tests')
sys.exit(1)
# Get tests to run
tests = self.discover_tests(args.basic_tests, msuser=args.msuser, no_update=args.no_update)
if not tests:
# Print system info
self.print_system_info()
# Create logs dir
self.logs_dir = os.path.join(self.root_dir, 'logs')
os.makedirs(self.logs_dir, exist_ok=True)
print('Logs dir is "%s".' % self.logs_dir)
# Check systemd service and timer
check_systemd_setup()
# Run tests

Stéphane Diemer
committed
now, failures, out_of_support, log_content, html_report = self.run_tests(tests)
if args.send_email:
failures += self.send_report_email(now, failures, out_of_support, log_content, html_report)
sys.exit(1 if failures > 0 else 0)
def print_system_info(self):
print('System information:')
print('- Date: %s UTC.' % datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S'))
print('- FQDN: %s.' % socket.getfqdn())
p = subprocess.run(['ip', '-br', 'addr'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, timeout=1800)
print('- IP config:\n%s' % p.stdout.decode('utf-8'))

Stéphane Diemer
committed
def get_log_name(self, now):
hostname = socket.gethostname()
log_name = 'results_%s_%s.log' % (
hostname or 'noname',
now.strftime('%Y-%m-%d_%H-%M-%S'),
)
return log_name
description = ''
if path.endswith('.py'):
start = (
content.find("'''")
if content.find("'''") != -1
else content.find('"""')
)
if start > 0:
start += 3
end = (
content.find("'''", start)
if content.find("'''", start) != -1
else content.find('"""', start)
)
if end > 0:
description = content[start:end]
else:
for line in content.split('\n'):
if line.startswith('#!'):
elif line.startswith('#'):
description += line[1:].strip() + '\n'
else:
break
description = description.strip()
if description.startswith('Criticality:'):
criticality, *description = description.split('\n')
criticality = criticality[len('Criticality:') :].strip() # noqa: E203
description = '\n'.join(description)
return criticality, description
def discover_tests(self, basic_only=False, msuser=None, no_update=False):
ignored_tests = get_conf('TESTER_IGNORED_TESTS', '').split(',')
ignored_tests.append('__init__.py')
if basic_only:
tests = self.discover_basic_tests(ignored_tests)
elif msuser:
tests = self.discover_mediaserver_tests(msuser, no_update, ignored_tests)
else:
tests = self.discover_basic_tests(ignored_tests)
tests.extend(self.discover_mediaserver_tests(msuser, no_update, ignored_tests))
criticalities_map = {'Low': 1, 'Normal': 2, 'High': 3}
tests.sort(key=lambda i: (-criticalities_map.get(i[1], 0), i[0]))
return tests
def discover_basic_tests(self, ignored_tests=None):
test_dir = os.path.join(self.root_dir, 'scripts')
if not os.path.isdir(test_dir):
print('The tests dir is missing ("%s").' % test_dir)
return list()
names = os.listdir(test_dir)
print('The tests dir is empty ("%s").' % test_dir)
return list()
if ignored_tests and name in ignored_tests:
test_path = os.path.join(test_dir, name)
if os.path.isfile(test_path):
criticality, description = self.parse_file_header(test_path)
tests.append((name, criticality, description, [test_path], None))
return tests
def discover_mediaserver_tests(self, msuser=None, no_update=False, ignored_tests=None):
# Get MS instances
ms_users = list()
for user in os.listdir('/home'):
if os.path.exists('/home/%s/msinstance' % user) and (
not msuser or user == msuser
):
ms_users.append(user)
if not ms_users:
return list()
ms_users.sort()
cleaned_list = list()
instances_to_test = get_conf('TESTER_MS_INSTANCES', '').split(',')
if instances_to_test:
for val in instances_to_test:
val = val.strip()
if not val:
continue
if val in ms_users:
cleaned_list.append(val)
else:
print(
'An inexisting instance has been requested for tests: "%s".'
% val
if cleaned_list:
ms_users = cleaned_list
else:
try:
max_instances = int(get_conf('TESTER_MAX_INSTANCES') or 2)
except Exception as e:
print('TESTER_MAX_INSTANCES has an invalid value: %s' % e)
max_instances = 2
if len(ms_users) > max_instances:
ms_users = ms_users[:max_instances]
print('Instances that will be tested: %s.' % ', '.join(ms_users))
# Clone testing suite
ms_path = os.path.join(self.root_dir, 'scripts', 'ms-testing-suite')
if not os.path.exists(ms_path):
print('Cloning ms-testing-suite in "%s".' % ms_path)
subprocess.run([
'git',
'clone',
'--recursive',
'https://mirismanager.ubicast.eu/git/mediaserver/ms-testing-suite.git',
ms_path,
], timeout=1800)
if not os.path.exists(ms_path):
print('The ms-testing-suite dir "%s" does not exist, no MediaServer test will be run.' % ms_path)
return list()
# Update testing suite if allowed
if not no_update:
print('Updating ms-testing-suite in "%s".' % ms_path)
os.chdir(ms_path)
branch = get_conf('ENVSETUP_BRANCH') or 'stable'
if branch:
subprocess.run(['git', 'checkout', branch], timeout=1800)
subprocess.run(['git', 'fetch', '--recurse-submodules', '--all'], timeout=1800)
subprocess.run(['git', 'reset', '--hard', 'origin/{}'.format(branch)], timeout=1800)
subprocess.run(['git', 'pull', '--recurse-submodules'], timeout=1800)
subprocess.run(['git', 'submodule', 'update', '--init', '--recursive'], timeout=1800)
os.chdir(self.root_dir)
# Build tests list
print('Add MediaServer tests if available.')
wowza_dir = '/usr/local/WowzaStreamingEngine'
etc_lives_conf = '/etc/mediaserver/lives.json'
local_lives_conf = '/home/%s/msinstance/conf/lives.json'
old_etc_lives_conf = '/etc/mediaserver/lives_conf.py'
old_local_lives_conf = '/home/%s/msinstance/conf/lives_conf.py'
tests = list()
for user in ms_users:
ms_tests = ['ms_vod_tester.py', 'test_caches.py']
# Check if live tests should be started
if (
os.path.exists(wowza_dir)
or os.path.exists(etc_lives_conf)
or os.path.exists(local_lives_conf % user)
or os.path.exists(old_etc_lives_conf)
or os.path.exists(old_local_lives_conf % user)
):
ms_tests.append('test_wowza_secure.py')
ms_tests.append('ms_live_tester.py')
ignore_rules = get_conf('TESTER_IGNORE_ROUTING_RULES', '0')
for name in ms_tests:
if ignored_tests and name in ignored_tests:
continue
test_path = os.path.join(ms_path, name)
if os.path.exists(test_path):
criticality, description = self.parse_file_header(test_path)
tests.append((
criticality,
description,
[test_path, user],

Stéphane Diemer
committed
def run_tests(self, tests):
# Run all tests
successes = 0
failures = 0
total_duration = None
report_rows = [('Test', 'Criticality', 'Result', 'Duration', 'Description')]
report_rows_length = [len(t) for t in report_rows[0]]
out_of_support = False
for name, criticality, description, command, env in tests:
print('\033[1;95m-- Test "%s" --\033[0;0m' % name)
start_date = datetime.datetime.utcnow()
print('Test start: %s UTC.' % start_date.strftime('%Y-%m-%d %H:%M:%S'))
returncode = None
wait_time = 5 * count * count
print('Waiting %s s...' % wait_time)
time.sleep(wait_time)
test_env = dict(os.environ)
if env:
test_env.update(env)
try:
p = subprocess.run(
command,
env=test_env,
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
timeout=1800,
)
out = p.stdout.decode('utf-8', 'replace').strip()
print(out)
out_of_support = out_of_support or 'out of support' in out
returncode = p.returncode
if returncode in (0, 2, 3):
break
except Exception as e:
print('Command failed: %s' % e)
returncode = None
if returncode == 0:
elif returncode == 2:
elif returncode == 3:
print('Command exited with code %s.' % returncode)
# Get duration
end_date = datetime.datetime.utcnow()
duration = end_date - start_date
if total_duration:
total_duration += duration
else:
total_duration = duration
print(
'Test end: %s UTC (duration: %s).'
% (end_date.strftime('%Y-%m-%d %H:%M:%S'), duration)
)
# Prepare report
report_rows.append((name, criticality, status, str(duration), description))
report_rows_length = [
max(len(strip_colors(t)), report_rows_length[i])
for i, t in enumerate(report_rows[-1])
]
# Display results
# results as text
print('\nTests results:')
log_report = ''
for row in report_rows:
if not log_report:
for i, val in enumerate(row):
if i == len(row) - 1:
break
if i == 0:
# merge name and description
log_report += '\n\033[96m%s\033[0m \033[37m%s\033[0m\n' % (
val,
row[-1],
)
else:
nb_sp = report_rows_length[i] - len(strip_colors(val))
log_report += ' %s%s' % (val, ' ' * nb_sp)
log_report += '\n' + '-' * 50
log_report = OUT_OF_SUPPORT_TEXT + '\n' + log_report
print('Total tests duration: %s.\n' % total_duration)
html_cell = 'th' if not html_report else 'td'
html_report += '\n <tr>'
html_report += ' <%s>%s</%s>' % (html_cell, escape_html(val), html_cell)
html_report = '<table border="1">%s\n</table>' % html_report
if out_of_support:
html_report = '<p>' + escape_html(OUT_OF_SUPPORT_TEXT) + '</p>\n' + html_report
# Store locally results
now = datetime.datetime.utcnow()

Stéphane Diemer
committed
history_file = os.path.join(self.logs_dir, 'tests_history.txt')
add_header = not os.path.exists(history_file)
fo.write('Date | Result | Succeeded | Failed | Not testable\n')
fo.write('%s | %s | %s | %s | %s\n' % (
now.strftime('%Y-%m-%d %H:%M:%S'),
'KO' if failures > 0 else 'OK',
successes,
failures,
len(tests) - successes - failures,
))

Stéphane Diemer
committed
names = os.listdir(self.logs_dir)
names.sort()
for name in list(names):
names.remove(name)
while len(names) > self.MAX_LOG_FILES - 1:
name = names.pop(0)
try:

Stéphane Diemer
committed
print('Removing old log "%s".' % os.path.join(self.logs_dir, name))
os.remove(os.path.join(self.logs_dir, name))

Stéphane Diemer
committed
log_content = strip_colors(log_buffer.getvalue())
with open(os.path.join(self.logs_dir, self.get_log_name(now)), 'w') as fo:
fo.write(log_content)
return now, failures, out_of_support, log_content, html_report
def send_report_email(self, now, failures, out_of_support, log_content, html_report):
hostname = socket.gethostname()
if not hostname:
print('Failed to get hostname (required to send email).')

Stéphane Diemer
committed
return 1
fqdn = socket.getfqdn()
# Get sender and recipients

Stéphane Diemer
committed
recipients = get_conf('EMAIL_ADMINS') or ''
system_domain = get_conf('MS_SERVER_NAME')
system_type = 'MediaServer'
if not system_domain or system_domain == 'mediaserver':

Stéphane Diemer
committed
system_domain = get_conf('CM_SERVER_NAME')
system_type = 'MirisManager'
if not system_domain or system_domain == 'mirismanager':

Stéphane Diemer
committed
system_domain = get_conf('MONITOR_SERVER_NAME')
system_type = 'Server'
if not system_domain or system_domain == 'monitor':
system_domain = fqdn
if '.' in system_domain:
top_domain = '.'.join(system_domain.split('.')[-2:])
elif '.' in fqdn:
top_domain = '.'.join(fqdn.split('.')[-2:])
else:
top_domain = system_domain + '.local'
sender = hostname + '@' + top_domain
print('Sender address: %s' % sender)
# Prepare email contant

Stéphane Diemer
committed
if out_of_support:
system_domain = '[OUT OF SUPPORT] %s' % system_domain
recipients = recipients.replace('sysadmin@ubicast.eu', '').replace(
',,', ','
)
elif get_conf('PREMIUM_SUPPORT') != '0':
system_domain = '[PREMIUM] %s' % system_domain
recipients = recipients.replace('sysadmin@ubicast.eu', '').replace(
',,', ','
)
recipients += ',sysadmin+premium@ubicast.eu'
recipients = recipients.strip(',')
if not recipients:
print('No recipients defined for email sending. Set a value for EMAIL_ADMINS.')
return 0
boundary = str(uuid.uuid4())
if get_conf('TESTER_BASE64_ATTACH') != '0':
log_content_encoding = 'base64'
log_content = base64.b64encode(log_content.encode('utf-8')).decode()
mail = '''From: %(hostname)s <%(sender)s>
To: %(recipients)s
Subject: %(system_domain)s (%(hostname)s) %(system_type)s health report: %(status)s
Mime-Version: 1.0
Content-type: multipart/related; boundary="%(boundary)s"
--%(boundary)s
Content-Type: text/html; charset=UTF-8
Content-transfer-encoding: utf-8
<p><b>Date: %(date)s UTC</b></p>
<p>FQDN: %(fqdn)s</p>
%(report)s
--%(boundary)s
Content-type: text/plain; name="%(log_name)s"; charset=UTF-8
Content-disposition: attachment; filename="%(log_name)s"
Content-transfer-encoding: %(log_content_encoding)s

Stéphane Diemer
committed
boundary=boundary,
sender=sender,
hostname=hostname,
recipients=recipients,
status=('KO (%s tests failed)' % failures) if failures > 0 else 'OK',
date=now.strftime('%Y-%m-%d %H:%M:%S'),

Stéphane Diemer
committed
report=html_report,
log_name=self.get_log_name(now).replace('.log', '.txt'),
log_content_encoding=log_content_encoding,
log_content=log_content,
system_domain=system_domain,
system_type=system_type,
)

Stéphane Diemer
committed
p = subprocess.Popen(
['/usr/sbin/sendmail', '-t'],
stdin=subprocess.PIPE,
stdout=sys.stdout.stream,
stderr=sys.stderr.stream,
)
p.communicate(input=mail.encode('utf-8'), timeout=1800)
if p.returncode != 0:
print('Failed to send email.')
return 1
else:
print('Email sent to: %s' % recipients)
return 0