Newer
Older
#!/usr/bin/env python3
"""Script to start tests and to manage their results."""
from io import StringIO
import datetime
import subprocess
import uuid
import glob
The system is out of support, UbiCast will not be notified if errors are detected.
Please contact UbiCast sales team (sales@ubicast.eu) to renew the support contract.\033[0m"""
class Logger:
    """File-like wrapper that duplicates everything written to it.

    Each ``write`` is forwarded to ``stream`` (the real output) and to
    ``log_buffer`` (an in-memory copy of the output), and both are flushed
    immediately.

    Any attribute not defined here (``isatty``, ``encoding``, ``fileno``...)
    is looked up on the wrapped ``stream``, so code that expects a regular
    text stream in place of ``sys.stdout`` keeps working.
    """

    def __init__(self, stream, log_buffer):
        """Store the real output stream and the buffer receiving the copy."""
        self.stream = stream
        self.log_buffer = log_buffer

    def write(self, text):
        """Write ``text`` to both targets, flushing each one right away."""
        self.stream.write(text)
        self.stream.flush()
        self.log_buffer.write(text)
        self.log_buffer.flush()

    def flush(self):
        """Do nothing: ``write`` already flushes both targets."""
        pass

    def __getattr__(self, name):
        """Delegate unknown attributes to the wrapped stream.

        Only called when normal lookup fails. The guard on the instance
        attributes avoids infinite recursion when they are not set yet
        (for example on an instance created without ``__init__``).
        """
        if name in ("stream", "log_buffer"):
            raise AttributeError(name)
        return getattr(self.stream, name)
log_buffer = StringIO()
# From here on, everything written to stdout is also copied into log_buffer.
sys.stdout = Logger(sys.stdout, log_buffer)
# stderr shares the same Logger object, so both streams end up in one log.
sys.stderr = sys.stdout

Stéphane Diemer
committed
def strip_colors(text):

Stéphane Diemer
committed
def escape(text):
    """Convert console text into an HTML fragment.

    The text is stripped, HTML special characters are replaced by entities,
    and the ANSI color codes listed below are turned into opening
    ``<span style="color: ...;">`` tags. The result is then passed through
    ``strip_colors``.

    NOTE(review): no closing ``</span>`` is produced by the visible code;
    confirm how reset sequences are expected to be handled.
    """
    html = text.strip()
    # "&" must be escaped first, otherwise the entities generated for "<"
    # and ">" would themselves be escaped a second time.
    html = html.replace("&", "&amp;")
    html = html.replace("<", "&lt;")
    html = html.replace(">", "&gt;")
    # ANSI foreground color code -> CSS color used in the report.
    color_spans = (
        ("\033[90m", "gray"),
        ("\033[91m", "red"),
        ("\033[92m", "green"),
        ("\033[93m", "orange"),
        ("\033[94m", "blue"),
        ("\033[95m", "purple"),
    )
    for code, color in color_spans:
        html = html.replace(code, '<span style="color: %s;">' % color)
    html = strip_colors(html)
    return html
def raid_idle():
# Return True when no inspected device reports a sync state other than
# "idle"; a device state is printed together with the device name.
# NOTE(review): `devs` and `f` are not defined in the visible lines; the
# statements that list the devices and open each state file appear to be
# missing here. Confirm against the complete file.
idle = True
for d in devs:
sync_state = f.read().strip()
# Any state different from "idle" makes the function return False.
if sync_state != "idle":
idle = False
print("State in %s is %s" % (d, sync_state))
return idle
-e: send email with report.
-f: send email with report only if at least one test failed.
-b: run only basic tests (exclude mediaserver tests).
-n: do not update envsetup repository.
-p: do not install packages.
-d: debug mode (can be started with non root users).
def __init__(self, *args):
# Run the whole tester from the command-line arguments given in *args:
# validate arguments, prepare the environment, discover the tests, run them
# and terminate the process with the resulting exit code (sys.exit below).
# NOTE(review): several statements (argument loop, help handling, the code
# between the two mtime reads) are not visible here; confirm against the
# complete file.
log("\033[96m-------------------------------\033[0m")
log("\033[96m- UbiCast applications tester -\033[0m")
log("\033[96m-------------------------------\033[0m")
args = list(args)
# Check if help is required
if arg not in self.VALID_ARGS:
log('Invalid argument given: "%s".\n' % arg)
log("Optional target user : %s" % arg)
# The optional target user must have a directory under /home.
if not os.path.isdir(os.path.join("/home", arg)):
log("Mediaserver user %s does not exist" % arg)
sys.exit(1)
else:
msuser = arg
# Check current dir
root_dir = utils.get_dir(__file__)
os.chdir(root_dir)
self.root_dir = root_dir
# Add to python path
if root_dir not in sys.path:
sys.path.append(root_dir)
# Check that this script is run by root
# The "-d" (debug) flag disables the root requirement.
debug = "-d" in args
whoami = subprocess.check_output(["whoami"]).decode("utf-8").strip()
if whoami != "root" and not debug:
log("This script should be run as root user.")
tester_path = os.path.join(root_dir, os.path.basename(__file__))
mtime = os.path.getmtime(tester_path)
# If the script file was modified since the first mtime read, replace the
# current process with a new run of the script, passing "-n" in addition
# to the original arguments.
if mtime != os.path.getmtime(tester_path):
log("The script has changed, restarting it...")
os.execl("/usr/bin/python3", "python3", tester_path, "-n", *args)
# Install utilities packages
if "-p" not in args:
subprocess.call(["python3", "pkgs_envsetup.py"])
# Load conf
conf = utils.load_conf()
if not conf:
sys.exit(1)
# Check for email value
email = "-e" in args
email_if_fail = "-f" in args
basic_only = "-b" in args
tests = self.discover_tests(basic_only, msuser, args)
# Tests are only run when raid_idle() returns True; otherwise exit code is 1.
if raid_idle():
exit_code = self.run_tests(tests, email, email_if_fail)
else:
print("A RAID check or operation is in progress, aborting tests")
exit_code = 1
sys.exit(exit_code)

Stéphane Diemer
committed
def parse_file_header(self, path):
# Extract a (criticality, description) pair from the header of a test file.
# The description is taken either from a '''-delimited block or from the
# leading "#" comment lines; a first line starting with "Criticality:"
# provides the criticality value.
# NOTE(review): the statements opening the file, computing `start`/`end`
# and setting the fallback values are not visible here; confirm against
# the complete file.
content = fo.read()
if start > 0:
# Skip the three quote characters of the opening delimiter.
start += 3
content.find("'''", start)
if content.find("'''", start) != -1
description = content[start:end]
# Fallback: build the description from the leading comment lines.
for line in content.split("\n"):
if line.startswith("#!"):
elif line.startswith("#"):
description += line[1:].strip() + "\n"
else:
# Stop at the first line that is not a comment.
break

Stéphane Diemer
committed
description = description.strip()
if description.startswith("Criticality:"):
# First line holds the criticality, the remaining lines the description.
criticality, *description = description.split("\n")
criticality = criticality[len("Criticality:") :].strip() # noqa: E203

Stéphane Diemer
committed
else:

Stéphane Diemer
committed
return criticality, description
def discover_tests(self, basic_only=False, msuser=None, args=[]):
# Build the list of tests to run. Each entry appended to `tests` is a tuple
# (name, criticality, description, command) where command is a list
# starting with the test file path (plus the instance user for MediaServer
# tests). Names listed in the TESTER_IGNORED_TESTS setting are skipped.
# NOTE(review): `args` has a mutable default ([]); in the visible lines it
# is only read ("-n" membership test), never modified.
# NOTE(review): many statements (definitions of `path`, `tests`, `ms_path`,
# `branch`, `ms_tests`, `criticalities_map`, the loops over names, the
# return statement) are not visible here; confirm against the complete file.
ignored_tests = utils.get_conf("TESTER_IGNORED_TESTS", "").split(",")
if not os.path.isdir(path):
log('The tests dir is missing ("%s").' % path)
names = os.listdir(path)
names.sort()
if not names:
log('The tests dir is empty ("%s").' % path)
if name in ignored_tests:
continue

Stéphane Diemer
committed
criticality, description = self.parse_file_header(test_path)
tests.append((name, criticality, description, [test_path]))
# Sort by descending criticality weight (unknown values weigh 0), then name.
tests.sort(key=lambda i: (-criticalities_map.get(i[1], 0), i[0]))
elif msuser:
tests = list()
# Get MS instances
ms_users = list()
# A user is an instance candidate when /home/<user>/msinstance exists and,
# if msuser is given, when the user name matches it.
for user in os.listdir("/home"):
if os.path.exists("/home/%s/msinstance" % user) and (
not msuser or user == msuser
):
ms_users.sort()
cleaned_list = list()
# TESTER_MS_INSTANCES: comma separated list of requested instances; only
# values found in ms_users are kept.
instances_to_test = utils.get_conf("TESTER_MS_INSTANCES", "").split(",")
if instances_to_test:
for val in instances_to_test:
val = val.strip()
if not val:
continue
if val in ms_users:
cleaned_list.append(val)
else:
log(
'An inexisting instance has been requested for tests: "%s".'
% val
)
if cleaned_list:
ms_users = cleaned_list
else:
# TESTER_MAX_INSTANCES limits the number of tested instances; 2 is used
# when the setting is empty or invalid.
try:
max_instances = int(utils.get_conf("TESTER_MAX_INSTANCES") or 2)
except Exception as e:
log("TESTER_MAX_INSTANCES has an invalid value: %s" % e)
max_instances = 2
if len(ms_users) > max_instances:
ms_users = ms_users[:max_instances]
log("Instances that will be tested: %s." % ", ".join(ms_users))
# Clone the ms-testing-suite repository when ms_path does not exist yet.
if not os.path.exists(ms_path):
log('Cloning ms-testing-suite in "%s".' % ms_path)
subprocess.call(
[
"git",
"clone",
"--recursive",
"https://panel.ubicast.eu/git/mediaserver/ms-testing-suite.git",
ms_path,
]
)
# The repository update is skipped when "-n" is present in args.
if os.path.exists(ms_path) and "-n" not in args:
log('Updating ms-testing-suite in "%s".' % ms_path)
os.chdir(ms_path)
subprocess.call(["git", "checkout", branch])
subprocess.call(["git", "fetch", "--recurse-submodules", "--all"])
subprocess.call(["git", "reset", "--hard", "origin/{}".format(branch)])
subprocess.call(["git", "pull", "--recurse-submodules"])
subprocess.call(["git", "submodule", "update", "--init", "--recursive"])
# Go back to the tester directory after working in the repository.
os.chdir(self.root_dir)
log("Add MediaServer tests if available.")
wowza_dir = "/usr/local/WowzaStreamingEngine"
etc_lives_conf = "/etc/mediaserver/lives_conf.py"
local_lives_conf = "/home/%s/msinstance/conf/lives_conf.py"
# Check if live tests should be started
if (
os.path.exists(wowza_dir)
or os.path.exists(etc_lives_conf)
or os.path.exists(local_lives_conf % user)
):
ms_tests.append("test_wowza_secure.py")
ms_tests.append("ms_live_tester.py")
if name in ignored_tests:
continue
test_path = os.path.join(ms_path, name)

Stéphane Diemer
committed
criticality, description = self.parse_file_header(test_path)
tests.append(
(
"%s (%s)" % (name, user),
criticality,
description,
[test_path, user],
)
)
# Same ordering as above: descending criticality weight, then name.
tests.sort(key=lambda i: (-criticalities_map.get(i[1], 0), i[0]))
def run_tests(self, tests, email=False, email_if_fail=False): # noqa: C901
# Run every test of `tests` (tuples of name, criticality, description,
# command), build a text report and an HTML report, append a line to the
# history file, remove old logs and optionally send the report by email
# through /usr/sbin/sendmail. Returns an integer exit code: 1 when no
# recipient is defined or when sendmail fails; the last visible assignment
# gives 1 when failures > 0 and 0 otherwise.
# NOTE(review): many statements (initial values of counters and reports,
# reading the process output, status computation, opening of the history
# and log files, hostname lookup, end of the mail template) are not visible
# here; confirm against the complete file.
# Run all tests
report_rows = [("Test", "Criticality", "Result", "Duration", "Description")]

Stéphane Diemer
committed
report_rows_length = [len(t) for t in report_rows[0]]

Stéphane Diemer
committed
for name, criticality, description, command in tests:
log('\033[1;95m-- Test "%s" --\033[0;0m' % name)
start_date = datetime.datetime.utcnow()
log("Test start: %s UTC." % start_date.strftime("%Y-%m-%d %H:%M:%S"))
# A test is attempted at most 3 times; return codes 0, 2 and 3 stop the
# retries, other codes wait 5 * count * count seconds before the next try.
count = 0
while count < 3:
count += 1
log("Attempt: %s" % str(count))
p = subprocess.Popen(
command,
stdin=sys.stdin,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
if p.returncode in (0, 2, 3):
break
sleep(5 * count * count)
out = out.decode("utf-8").strip()
# Remember whether any test output contained the text "out of support".
out_of_support = out_of_support or "out of support" in out
elif p.returncode == 3:

Stéphane Diemer
committed
# Get duration
end_date = datetime.datetime.utcnow()
duration = end_date - start_date
if total_duration:
total_duration += duration
else:
total_duration = duration
log(
"Test end: %s UTC (duration: %s)."
% (end_date.strftime("%Y-%m-%d %H:%M:%S"), duration)
)

Stéphane Diemer
committed
# Prepare report
report_rows.append((name, criticality, status, str(duration), description))
# Track the widest value of each column, ignoring color codes.
report_rows_length = [
max(len(strip_colors(t)), report_rows_length[i])
for i, t in enumerate(report_rows[-1])
]

Stéphane Diemer
committed
for row in report_rows:
for i, val in enumerate(row):
# The last column (description) is not printed on its own: it is merged
# with the name in the first column.
if i == len(row) - 1:
break
if i == 0:
# merge name and description
log_report += "\n\033[96m%s\033[0m \033[37m%s\033[0m\n" % (
val,
row[-1],
)
else:
nb_sp = report_rows_length[i] - len(strip_colors(val))
log_report += " %s%s" % (val, " " * nb_sp)
log_report += "\n" + "-" * 50
# The first row (html_report still empty) is rendered with header cells.
html_cell = "th" if not html_report else "td"
html_report += "\n <tr>"

Stéphane Diemer
committed
for i, val in enumerate(row):
html_report += " <%s>%s</%s>" % (html_cell, escape(val), html_cell)
html_report += " </tr>"
html_report = '<table border="1">%s\n</table>' % html_report
html_report = "<p>" + escape(OUT_OF_SUPPORT_TEXT) + "</p>\n" + html_report
# Store locally results
now = datetime.datetime.utcnow()
if not os.path.exists(log_dir):
os.makedirs(log_dir)
history_file = os.path.join(log_dir, "tests_history.txt")
# The column header is only needed when the history file does not exist yet.
add_header = not os.path.exists(history_file)
fo.write("Date | Result | Succeeded | Failed | Not testable\n")
fo.write(
"%s | %s | %s | %s | %s\n"
% (
now.strftime("%Y-%m-%d %H:%M:%S"),
"KO" if failures > 0 else "OK",
successes,
failures,
len(tests) - successes - failures,
)
)
# Search for old logs to remove
names = os.listdir(log_dir)
names.sort()
for name in list(names):
names.remove(name)
# Names are sorted, so the first entries are removed first until at most
# self.MAX_LOG_FILES - 1 remain.
while len(names) > self.MAX_LOG_FILES - 1:
name = names.pop(0)
try:
log('Removing old log "%s".' % os.path.join(log_dir, name))
os.remove(os.path.join(log_dir, name))
except Exception as e:
log("Failed to get hostname (required to send email).")
log_name = "results_%s_%s.txt" % (
hostname or "noname",
now.strftime("%Y-%m-%d_%H-%M-%S"),
)

Stéphane Diemer
committed
log_content = strip_colors(log_buffer.getvalue())
# Send email
send_email = False
if hostname:
if email:
send_email = True
elif email_if_fail and failures > 0:
# if they were too many consecutive failures, do not send the email
lines.reverse()
consecutive_failures = 0
for line in lines:
if line:
consecutive_failures += 1
else:
break
# self.NO_MAIL_FAILURES_COUNT is the threshold compared with the number of
# consecutive failures to decide whether a report is still sent.
if consecutive_failures == self.NO_MAIL_FAILURES_COUNT:
consecutive_msg = (
"Maximum consecutive tester failures reached (%s).\nNo more emails will be sent."
% consecutive_failures
)
send_email = True
elif consecutive_failures < self.NO_MAIL_FAILURES_COUNT:
consecutive_msg = (
"Consecutive tester failures: %s (will stop sending reports when reaching %s failures)."
% (consecutive_failures, self.NO_MAIL_FAILURES_COUNT)
)
consecutive_msg = (
"Too many consecutive tester failures: %s, no email will be sent."
% consecutive_failures
)
html_report += "\n<br/>" + consecutive_msg.replace("\n", "\n<br/>")
recipients = utils.get_conf("EMAIL_ADMINS") or ""
# Pick the server name used in the mail subject: when a setting holds the
# literal name compared below, the next setting is tried.
system_domain = utils.get_conf("MS_SERVER_NAME")
system_type = "MediaServer"
if system_domain == "mediaserver":
system_domain = utils.get_conf("CM_SERVER_NAME")
system_type = "MirisManager"
if system_domain == "mirismanager":
system_domain = utils.get_conf("MONITOR_SERVER_NAME")
system_type = "Server"
if system_domain == "monitor":
system_type = "-"
recipients = recipients.replace("sysadmin@ubicast.eu", "").replace(
",,", ","
)
elif utils.get_conf("PREMIUM_SUPPORT") != "0":
system_domain = "[PREMIUM] %s" % system_domain
recipients = recipients.replace("sysadmin@ubicast.eu", "").replace(
",,", ","
)
recipients += ",sysadmin+premium@ubicast.eu"
recipients = recipients.strip(",")
if not recipients:
log(
"No recipients defined for email sending. Set a value for EMAIL_ADMINS."
)
return 1
boundary = str(uuid.uuid4())
# Unless TESTER_BASE64_ATTACH is "0", the attached log is base64 encoded.
if utils.get_conf("TESTER_BASE64_ATTACH") != "0":
log_content_encoding = "base64"
log_content = base64.b64encode(log_content.encode("utf-8")).decode()
mail = """From: %(hostname)s <%(sender)s>
To: %(recipients)s
Subject: %(system_domain)s (%(hostname)s) %(system_type)s health report: %(status)s
Mime-Version: 1.0
Content-type: multipart/related; boundary="%(boundary)s"
--%(boundary)s
Content-Type: text/html; charset=UTF-8
Content-transfer-encoding: utf-8
<p><b>Date: %(date)s UTC</b></p>
%(report)s
--%(boundary)s
Content-type: text/plain; name="%(log_name)s"; charset=UTF-8
Content-disposition: attachment; filename="%(log_name)s"
Content-transfer-encoding: %(log_content_encoding)s
boundary=boundary,
recipients=recipients,
status=("KO (%s tests failed)" % failures) if failures > 0 else "OK",
date=now.strftime("%Y-%m-%d %H:%M:%S"),
report=html_report,
log_content_encoding=log_content_encoding,
log_content=log_content,
system_domain=system_domain,
system_type=system_type,
p = subprocess.Popen(
["/usr/sbin/sendmail", "-t"],
stdin=subprocess.PIPE,
stdout=sys.stdout.stream,
stderr=sys.stderr.stream,
)
p.communicate(input=mail.encode("utf-8"))
# A non-zero sendmail return code is reported as exit code 1.
if p.returncode != 0:
return 1
else:
exit_code = 1 if failures > 0 else 0
return exit_code
# Start the tester with the command-line arguments (program name excluded).
# NOTE(review): presumably the __init__ shown in this file, which ends with
# sys.exit(), belongs to Tester; confirm against the complete file.
Tester(*sys.argv[1:])