Newer
Older
"""
Criticality: Normal
Checks that the streaming server (Wowza) is running correctly.
"""
sys.path.append(str(Path(__file__).parents[1].resolve()))
# pylint: disable=wrong-import-position
from envsetup import utils as u # noqa: E402
def main():
    """Run all checks and exit with the corresponding exit code.

    Exit codes, as produced by the code below:

    * 0 -- every check passed
    * 1 -- at least one check reported an error
    * 2 -- Wowza is not installed, so the remaining checks are skipped
    * 3 -- no error, but at least one check reported a warning
    """
    print("Checking Wowza settings:")

    if not check_installed():
        sys.exit(2)

    warnings = 0
    errors = 0

    # Every check returns a (warnings, errors) pair. A check is counted once:
    # as an error if it reported any error, otherwise as a warning if it
    # reported any warning.
    for check in (check_version, check_heap_size, check_running):
        check_warn, check_err = check()
        if check_err:
            errors += 1
        elif check_warn:
            warnings += 1

    # sys.exit() is used rather than the exit() helper, which is injected by
    # the site module for interactive use and is not guaranteed to exist.
    if errors:
        sys.exit(1)
    if warnings:
        sys.exit(3)
    sys.exit(0)
def check_installed() -> bool:
    """Check that Wowza is installed.

    Runs ``dpkg --get-selections 'wowza*'`` and looks for a line whose last
    whitespace-separated field is ``install``.

    :return: True if such a line is found, False otherwise
    :rtype: bool
    """
    cmd = "dpkg --get-selections 'wowza*'"
    out = subprocess.getoutput(cmd)

    # Split every non-blank line into fields; blank or empty output simply
    # yields no candidates instead of raising on an empty split.
    installed = any(
        fields[-1] == "install"
        for fields in (line.split() for line in out.splitlines())
        if fields
    )

    if not installed:
        u.info("not installed, skip test")
        return False

    return True
def check_version() -> tuple:
    """Check the Wowza version installed.

    Reads the ``dpkg --get-selections 'wowza*'`` output, builds a version
    string from the digits of every line in the ``install`` state and
    compares it with ``LATEST_VERSION``.

    :return: Number of warnings and number of errors found, in that order
    :rtype: tuple
    """
    warnings = 0
    errors = 0

    cmd = "dpkg --get-selections 'wowza*'"
    out = subprocess.getoutput(cmd)

    version = None
    for line in out.splitlines():
        fields = line.split()
        # Skip blank lines and packages that are not in the "install" state.
        if not fields or fields[-1] != "install":
            continue
        if version:
            # A version was already found on an earlier line.
            u.error("many Wowza versions installed, keep only latest")
            errors += 1
        # NOTE(review): every single digit of the line becomes one dotted
        # component, so a multi-digit component such as "10" would come out
        # as "1.0" -- confirm this matches the package naming in use.
        version = ".".join(re.findall(r"\d", line))

    if not version:
        u.error("cannot find Wowza version")
        errors += 1
    elif version != LATEST_VERSION:
        u.warning("using outdated version: {}".format(version))
        warnings += 1
    else:
        u.success("using recommended version: {}".format(LATEST_VERSION))

    return warnings, errors
def check_heap_size() -> tuple:
    """Check the heap size configured.

    Greps ``Tune.xml`` for a ``<HeapSize>2000M</HeapSize>`` entry and emits a
    warning when grep does not exit with status 0.

    :return: Number of warnings and number of errors found, in that order
    :rtype: tuple
    """
    status, _ = subprocess.getstatusoutput(
        "grep '<HeapSize>2000M</HeapSize>' /usr/local/WowzaStreamingEngine/conf/Tune.xml"
    )

    # Exit status 0 means grep found the expected entry: nothing to report.
    if status == 0:
        return 0, 0

    u.warning("not using recommended heap size")
    return 1, 0
def check_running() -> tuple:
    """Check that Wowza is running.

    Runs ``systemctl status WowzaStreamingEngine`` and looks for the
    ``Active: active (running)`` marker in its output.

    :return: Number of warnings and number of errors found, in that order
    :rtype: tuple
    """
    warnings = 0
    errors = 0

    cmd = "systemctl status WowzaStreamingEngine"
    out = subprocess.getoutput(cmd)

    # Report success only when the marker is present; otherwise the service
    # is counted as an error and no success message is emitted.
    if "Active: active (running)" in out:
        u.success("service running")
    else:
        u.error("service not running")
        errors += 1

    return warnings, errors