Skip to content
Snippets Groups Projects
Verified Commit d0860ed8 authored by Nicolas KAROLAK's avatar Nicolas KAROLAK
Browse files

update wowza test

parent 2735803d
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2017, Florent Thiery
'''
"""
Criticality: Normal
Checks that the streaming server (Wowza) is running correctly.
'''
"""
from pathlib import Path
import re
import subprocess
import subprocess # nosec: B404
import sys
YELLOW = '\033[93m'
GREEN = '\033[92m'
RED = '\033[91m'
DEF = '\033[0m'
sys.path.append(str(Path(__file__).parents[1].resolve()))
# pylint: disable=wrong-import-position
from envsetup import utils as u # noqa: E402
LATEST_VERSION = '4.7.7'
LATEST_VERSION = "4.7.7"
def check_wowza():
def main():
"""Run all checks and exits with corresponding exit code."""
print("Checking Wowza settings:")
warnings = 0
errors = 0
# check if wowza is installed
print('Getting installed Wowza packages...')
if not check_installed():
exit(2)
# check wowza version
check_warn, check_err = check_version()
if check_err:
errors += 1
elif check_warn:
warnings += 1
# check wowza heap size
check_warn, check_err = check_heap_size()
if check_err:
errors += 1
elif check_warn:
warnings += 1
# check that wowza is running
check_warn, check_err = check_running()
if check_err:
errors += 1
elif check_warn:
warnings += 1
if errors:
exit(1)
elif warnings:
exit(3)
exit(0)
def check_installed() -> bool:
"""Check that Wowza is installed.
:return: Exit return codes
:rtype: bool
"""
cmd = "dpkg --get-selections 'wowza*'"
print(cmd)
out = subprocess.getoutput(cmd)
out = re.sub(r'\s+', ' ', out)
print(out)
if ' install' not in out:
print('Wowza is not installed, skipping test.')
return 2
else:
print('Wowza is installed.')
state = out.split()[-1]
# check wowza version
if state != "install":
u.info("not installed, skip test")
return False
return True
def check_version() -> tuple:
"""Check the Wowza version installed.
:return: Exit return codes
:rtype: bool
"""
warnings = 0
errors = 0
cmd = "dpkg --get-selections 'wowza*'"
out = subprocess.getoutput(cmd)
version = None
for line in out.split('\n'):
if ' install' in line:
for line in out.split("\n"):
if line.split()[-1] == "install":
if version:
print('%sMultiple versions of Wowza are installed.%s' % (RED, DEF))
print('%sPlease remove unused versions.%s' % (RED, DEF))
return 1
version = '.'.join(re.findall(r'\d', line))
u.error("many Wowza versions installed, keep only latest")
errors += 1
version = ".".join(re.findall(r"\d", line))
if not version:
print('%sWowza version not found.%s' % (RED, DEF))
return 1
code = 0
u.error("cannot find wWowza version")
errors += 1
if version != LATEST_VERSION:
print('%sWowza is not using recommended version (current version: %s, recommended version: %s).%s' % (YELLOW, version, LATEST_VERSION, DEF))
code = 3
u.warning("using outdated version: {}".format(version))
warnings += 1
else:
print('%sWowza is using the recommended version: %s.%s' % (GREEN, LATEST_VERSION, DEF))
u.success("using recommended version: {}".format(LATEST_VERSION))
# check wowza heap size
print('Checking Wowza heap size...')
p = subprocess.run("grep '<HeapSize>2000M</HeapSize>' /usr/local/WowzaStreamingEngine/conf/Tune.xml", shell=True)
if p.returncode != 0:
print('%sWowza is not using recommended heap size (2000M) in configuration file "/usr/local/WowzaStreamingEngine/conf/Tune.xml".%s' % (YELLOW, DEF))
code = 3
return warnings, errors
def check_heap_size() -> tuple:
"""Check the heap size configured.
:return: Exit return codes
:rtype: bool
"""
warnings = 0
errors = 0
cmd = "grep '<HeapSize>2000M</HeapSize>' /usr/local/WowzaStreamingEngine/conf/Tune.xml"
check_heap, _ = subprocess.getstatusoutput(cmd)
if check_heap != 0:
u.warning("not using recommended heap size")
warnings += 1
else:
print('%sWowza is using recommended heap size (2000M).%s' % (GREEN, DEF))
u.success("using recommended heap size")
# check that wowza is running
print('Checking that Wowza is running...')
out = subprocess.getoutput('systemctl status WowzaStreamingEngine')
print(out)
if 'Active: active (running)' not in out:
print('%sWowza is not running.%s' % (RED, DEF))
code = 1
return warnings, errors
def check_running() -> tuple:
"""Check that Wowza is running.
:return: Exit return codes
:rtype: bool
"""
warnings = 0
errors = 0
cmd = "systemctl status WowzaStreamingEngine"
out = subprocess.getoutput(cmd)
if "Active: active (running)" not in out:
u.error("service not running")
errors += 1
else:
print('%sWowza is running.%s' % (GREEN, DEF))
return code
u.success("service running")
return warnings, errors
if __name__ == '__main__':
sys.exit(check_wowza())
if __name__ == "__main__":
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment