Newer
Older
#!/usr/bin/env python3
Script to start tests and to manage their results
from io import StringIO
import datetime
import subprocess
import uuid
import glob
# temp fix: broken git state
# import utils
The system is out of support, UbiCast will not be notified if errors are detected.
Please contact UbiCast sales team (sales@ubicast.eu) to renew the support contract.\033[0m"""
class Logger(object):
def __init__(self, stream, log_buffer):
self.stream = stream
self.log_buffer = log_buffer
def write(self, text):
self.stream.write(text)
self.stream.flush()
self.log_buffer.write(text)
self.log_buffer.flush()
def flush(self):
pass
log_buffer = StringIO()
sys.stdout = Logger(sys.stdout, log_buffer)
sys.stderr = sys.stdout

Stéphane Diemer
committed
def strip_colors(text):

Stéphane Diemer
committed
def escape(text):
html = text.strip()
html = html.replace("<", "<")
html = html.replace(">", ">")
html = html.replace("\033[90m", '<span style="color: gray;">')
html = html.replace("\033[91m", '<span style="color: red;">')
html = html.replace("\033[92m", '<span style="color: green;">')
html = html.replace("\033[93m", '<span style="color: orange;">')
html = html.replace("\033[94m", '<span style="color: blue;">')
html = html.replace("\033[95m", '<span style="color: purple;">')
html = strip_colors(html)
return html
def raid_idle():
idle = True
for d in devs:
sync_state = f.read().strip()
if sync_state != "idle":
idle = False
print("State in %s is %s" % (d, sync_state))
return idle
class Tester:
USAGE = (
"""%s [-e] [-f] [-b] [-n] [-d] [-h] [msuser]
-e: send email with report.
-f: send email with report only if at least one test failed.
-b: run only basic tests (exclude mediaserver tests).
-n: do not update envsetup repository.
-d: debug mode (can be started with non root users).
-h: show this message."""
% __file__
)
VALID_ARGS = ["-e", "-f", "-b", "-n", "-d", "-h"]
def __init__(self, *args):
# temp fix: broken git state
import utils
from utils import log
log("\033[96m-------------------------------\033[0m")
log("\033[96m- UbiCast applications tester -\033[0m")
log("\033[96m-------------------------------\033[0m")
args = list(args)
# Check if help is required
if arg not in self.VALID_ARGS:
log('Invalid argument given: "%s".\n' % arg)
log("Optional target user : %s" % arg)
if not os.path.isdir(os.path.join("/home", arg)):
log("Mediaserver user %s does not exist" % arg)
sys.exit(1)
else:
msuser = arg
# Check current dir
root_dir = utils.get_dir(__file__)
os.chdir(root_dir)
self.root_dir = root_dir
# Add to python path
if root_dir not in sys.path:
sys.path.append(root_dir)
# Check that this script is run by root
debug = "-d" in args
whoami = subprocess.check_output(["whoami"]).decode("utf-8").strip()
if whoami != "root" and not debug:
log("This script should be run as root user.")
tester_path = os.path.join(root_dir, os.path.basename(__file__))
mtime = os.path.getmtime(tester_path)
if mtime != os.path.getmtime(tester_path):
log("The script has changed, restarting it...")
os.execl("/usr/bin/python3", "python3", tester_path, "-n", *args)
# Load conf
conf = utils.load_conf()
if not conf:
sys.exit(1)
# Check for email value
email = "-e" in args
email_if_fail = "-f" in args
basic_only = "-b" in args
tests = self.discover_tests(basic_only, msuser)
if raid_idle():
exit_code = self.run_tests(tests, email, email_if_fail)
else:
print("A RAID check or operation is in progress, aborting tests")
exit_code = 1
sys.exit(exit_code)

Stéphane Diemer
committed
def parse_file_header(self, path):
content = fo.read()
if start > 0:
start += 3
content.find("'''", start)
if content.find("'''", start) != -1
description = content[start:end]
for line in content.split("\n"):
if line.startswith("#!"):
elif line.startswith("#"):
description += line[1:].strip() + "\n"
else:
break

Stéphane Diemer
committed
description = description.strip()
if description.startswith("Criticality:"):
criticality, *description = description.split("\n")
criticality = criticality[len("Criticality:") :].strip()
description = "\n".join(description)

Stéphane Diemer
committed
else:

Stéphane Diemer
committed
return criticality, description
def discover_tests(self, basic_only=False, msuser=None):
# temp fix: broken git state
import utils
from utils import log
ignored_tests = utils.get_conf("TESTER_IGNORED_TESTS", "").split(",")
if not os.path.isdir(path):
log('The tests dir is missing ("%s").' % path)
names = os.listdir(path)
names.sort()
if not names:
log('The tests dir is empty ("%s").' % path)
if name in ignored_tests:
continue

Stéphane Diemer
committed
criticality, description = self.parse_file_header(test_path)
tests.append((name, criticality, description, [test_path]))
tests.sort(key=lambda i: (-criticalities_map.get(i[1], 0), i[0]))
elif msuser:
tests = list()
# Get MS instances
ms_users = list()
for user in os.listdir("/home"):
if os.path.exists("/home/%s/msinstance" % user) and (
not msuser or user == msuser
):
ms_users.sort()
cleaned_list = list()
instances_to_test = utils.get_conf("TESTER_MS_INSTANCES", "").split(",")
if instances_to_test:
for val in instances_to_test:
val = val.strip()
if not val:
continue
if val in ms_users:
cleaned_list.append(val)
else:
log(
'An inexisting instance has been requested for tests: "%s".'
% val
)
if cleaned_list:
ms_users = cleaned_list
else:
try:
max_instances = int(utils.get_conf("TESTER_MAX_INSTANCES") or 2)
except Exception as e:
log("TESTER_MAX_INSTANCES has an invalid value: %s" % e)
max_instances = 2
if len(ms_users) > max_instances:
ms_users = ms_users[:max_instances]
log("Instances that will be tested: %s." % ", ".join(ms_users))
if not os.path.exists(ms_path):
log('Cloning ms-testing-suite in "%s".' % ms_path)
subprocess.call(
[
"git",
"clone",
"--recursive",
"https://panel.ubicast.eu/git/mediaserver/ms-testing-suite.git",
ms_path,
]
)
if os.path.exists(ms_path):
log('Updating ms-testing-suite in "%s".' % ms_path)
os.chdir(ms_path)
subprocess.call(["git", "checkout", branch])
subprocess.call(["git", "fetch", "--recurse-submodules", "--all"])
subprocess.call(["git", "reset", "--hard", "origin/{}".format(branch)])
subprocess.call(["git", "pull", "--recurse-submodules"])
subprocess.call(["git", "submodule", "update", "--init", "--recursive"])
os.chdir(self.root_dir)
log("Add MediaServer tests if available.")
wowza_dir = "/usr/local/WowzaStreamingEngine"
etc_lives_conf = "/etc/mediaserver/lives_conf.py"
local_lives_conf = "/home/%s/msinstance/conf/lives_conf.py"
# Check if live tests should be started
if (
os.path.exists(wowza_dir)
or os.path.exists(etc_lives_conf)
or os.path.exists(local_lives_conf % user)
):
ms_tests.append("test_wowza_secure.py")
ms_tests.append("ms_live_tester.py")
if name in ignored_tests:
continue
test_path = os.path.join(ms_path, name)

Stéphane Diemer
committed
criticality, description = self.parse_file_header(test_path)
tests.append(
(
"%s (%s)" % (name, user),
criticality,
description,
[test_path, user],
)
)
tests.sort(key=lambda i: (-criticalities_map.get(i[1], 0), i[0]))
def run_tests(self, tests, email=False, email_if_fail=False):
# temp fix: broken git state
import utils
from utils import log
# Run all tests
report_rows = [("Test", "Criticality", "Result", "Duration", "Description")]

Stéphane Diemer
committed
report_rows_length = [len(t) for t in report_rows[0]]

Stéphane Diemer
committed
for name, criticality, description, command in tests:
log('\033[1;95m-- Test "%s" --\033[0;0m' % name)
start_date = datetime.datetime.utcnow()
log("Test start: %s UTC." % start_date.strftime("%Y-%m-%d %H:%M:%S"))
p = subprocess.Popen(
command, stdin=sys.stdin, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
out, err = p.communicate()
if out:
out = out.decode("utf-8").strip()
out_of_support = out_of_support or "out of support" in out
elif p.returncode == 3:

Stéphane Diemer
committed
# Get duration
end_date = datetime.datetime.utcnow()
duration = end_date - start_date
if total_duration:
total_duration += duration
else:
total_duration = duration
log(
"Test end: %s UTC (duration: %s)."
% (end_date.strftime("%Y-%m-%d %H:%M:%S"), duration)
)

Stéphane Diemer
committed
# Prepare report
report_rows.append((name, criticality, status, str(duration), description))
report_rows_length = [
max(len(strip_colors(t)), report_rows_length[i])
for i, t in enumerate(report_rows[-1])
]

Stéphane Diemer
committed
for row in report_rows:
for i, val in enumerate(row):
if i == len(row) - 1:
break
if i == 0:
# merge name and description
log_report += "\n\033[96m%s\033[0m \033[37m%s\033[0m\n" % (
val,
row[-1],
)
else:
nb_sp = report_rows_length[i] - len(strip_colors(val))
log_report += " %s%s" % (val, " " * nb_sp)
log_report += "\n" + "-" * 50
html_cell = "th" if not html_report else "td"
html_report += "\n <tr>"

Stéphane Diemer
committed
for i, val in enumerate(row):
html_report += " <%s>%s</%s>" % (html_cell, escape(val), html_cell)
html_report += " </tr>"
html_report = '<table border="1">%s\n</table>' % html_report
html_report = "<p>" + escape(OUT_OF_SUPPORT_TEXT) + "</p>\n" + html_report
# Store locally results
now = datetime.datetime.utcnow()
if not os.path.exists(log_dir):
os.makedirs(log_dir)
history_file = os.path.join(log_dir, "tests_history.txt")
add_header = not os.path.exists(history_file)
fo.write("Date | Result | Succeeded | Failed | Not testable\n")
fo.write(
"%s | %s | %s | %s | %s\n"
% (
now.strftime("%Y-%m-%d %H:%M:%S"),
"KO" if failures > 0 else "OK",
successes,
failures,
len(tests) - successes - failures,
)
)
# Search for old logs to remove
names = os.listdir(log_dir)
names.sort()
for name in list(names):
names.remove(name)
while len(names) > self.MAX_LOG_FILES - 1:
name = names.pop(0)
try:
log('Removing old log "%s".' % os.path.join(log_dir, name))
os.remove(os.path.join(log_dir, name))
except Exception as e:
log("Failed to get hostname (required to send email).")
log_name = "results_%s_%s.txt" % (
hostname or "noname",
now.strftime("%Y-%m-%d_%H-%M-%S"),
)

Stéphane Diemer
committed
log_content = strip_colors(log_buffer.getvalue())
# Send email
send_email = False
if hostname:
if email:
send_email = True
elif email_if_fail and failures > 0:
# if they were too many consecutive failures, do not send the email
lines.reverse()
consecutive_failures = 0
for line in lines:
if line:
consecutive_failures += 1
else:
break
if consecutive_failures == self.NO_MAIL_FAILURES_COUNT:
consecutive_msg = (
"Maximum consecutive tester failures reached (%s).\nNo more emails will be sent."
% consecutive_failures
)
send_email = True
elif consecutive_failures < self.NO_MAIL_FAILURES_COUNT:
consecutive_msg = (
"Consecutive tester failures: %s (will stop sending reports when reaching %s failures)."
% (consecutive_failures, self.NO_MAIL_FAILURES_COUNT)
)
consecutive_msg = (
"Too many consecutive tester failures: %s, no email will be sent."
% consecutive_failures
)
html_report += "\n<br/>" + consecutive_msg.replace("\n", "\n<br/>")
recipients = utils.get_conf("EMAIL_ADMINS") or ""
system_domain = utils.get_conf("MS_SERVER_NAME")
system_type = "MediaServer"
if system_domain == "mediaserver":
system_domain = utils.get_conf("CM_SERVER_NAME")
system_type = "MirisManager"
if system_domain == "mirismanager":
system_domain = utils.get_conf("MONITOR_SERVER_NAME")
system_type = "Server"
if system_domain == "monitor":
system_type = "-"
recipients = recipients.replace("sysadmin@ubicast.eu", "").replace(
",,", ","
)
elif utils.get_conf("PREMIUM_SUPPORT") != "0":
system_domain = "[PREMIUM] %s" % system_domain
recipients = recipients.replace("sysadmin@ubicast.eu", "").replace(
",,", ","
)
recipients += ",sysadmin+premium@ubicast.eu"
recipients = recipients.strip(",")
if not recipients:
log(
"No recipients defined for email sending. Set a value for EMAIL_ADMINS."
)
return 1
boundary = str(uuid.uuid4())
To: %(recipients)s
Subject: %(system_domain)s (%(hostname)s) %(system_type)s health report: %(status)s
Mime-Version: 1.0
Content-type: multipart/related; boundary="%(boundary)s"
--%(boundary)s
Content-Type: text/html; charset=UTF-8
Content-transfer-encoding: utf-8
<p><b>Date: %(date)s UTC</b></p>
%(report)s
--%(boundary)s
Content-type: text/plain; name="%(log_name)s"; charset=UTF-8
Content-disposition: attachment; filename="%(log_name)s"
Content-transfer-encoding: utf-8
boundary=boundary,
recipients=recipients,
status=("KO (%s tests failed)" % failures) if failures > 0 else "OK",
date=now.strftime("%Y-%m-%d %H:%M:%S"),
report=html_report,
system_domain=system_domain,
system_type=system_type,
p = subprocess.Popen(
["/usr/sbin/sendmail", "-t"],
stdin=subprocess.PIPE,
stdout=sys.stdout.stream,
stderr=sys.stderr.stream,
)
p.communicate(input=mail.encode("utf-8"))
if p.returncode != 0:
return 1
else:
exit_code = 1 if failures > 0 else 0
return exit_code
subprocess.call(["python3", "/root/envsetup/update_envsetup.py"])
Tester(*sys.argv[1:])