Skip to content
Snippets Groups Projects
Verified Commit 1d6ee2a2 authored by Nicolas KAROLAK's avatar Nicolas KAROLAK
Browse files

fixes #28791

parent f4165ac6
No related branches found
No related tags found
No related merge requests found
...@@ -7,6 +7,7 @@ Tests that all webserver services (vhosts) are available and reachable. ...@@ -7,6 +7,7 @@ Tests that all webserver services (vhosts) are available and reachable.
import imp import imp
import os import os
from pathlib import Path
import re import re
import requests import requests
import socket import socket
...@@ -27,18 +28,69 @@ This script checks for all enabled vhosts in Nginx conf that: ...@@ -27,18 +28,69 @@ This script checks for all enabled vhosts in Nginx conf that:
""" """
def get_configs(path: str) -> list:
configs_dir = Path(path)
configs = [c.resolve() for c in configs_dir.glob("*.conf")]
return configs
def get_vhosts(config: Path) -> list:
# remove comments and blank lines
sanitize = re.compile(r"(?:\s*#\s*.*)|(?:^\s*)", re.M)
# capture server blocks
servers = re.compile(r"^server\s+{(?:\s*(?!server\s{).)+", re.M)
with open(config) as config_fo:
config_content = sanitize.sub(r"", config_fo.read())
vhosts = servers.findall(config_content)
return vhosts
def get_hostnames(vhost: str) -> list:
# extract hostname(s) from server_name values
server_names = re.compile(r"^\s*server_name\s+(.*);$")
hostnames = []
for line in vhost.splitlines():
if server_names.match(line):
hostnames.extend(server_names.match(line)[1].split())
return hostnames
def get_ports(vhost: str) -> list:
# extract port(s) from listen values
listens = re.compile(r"^\s*listen\s+(?:.*:)?(\d+)\s*(ssl)?.*;$")
ports = []
for line in vhost.splitlines():
if listens.match(line):
ports.append(
(listens.match(line)[1], "https" if listens.match(line)[2] else "http")
)
return ports
def test_vhost( def test_vhost(
ports_info=None, domains=None, resolution_ignored=None, celerity_conf="" ports_info=None,
domains=None,
resolution_ignored=None,
celerity_conf="",
nginx_file=None,
wowza_dir=None,
tested=0,
): ):
tested = 0
errors = 0 errors = 0
warnings = 0 warnings = 0
for port, https in ports_info or [(80, False)]: name = nginx_file.stem
for port, proto in ports_info or [(80, False)]:
for domain in domains or ["localhost"]: for domain in domains or ["localhost"]:
tested += 1 url = "%s://%s:%s" % (proto, domain, port)
url = "%s://%s:%s" % ("https" if https else "http", domain, port) sys.stdout.write('Testing url "%s" from %s:\n' % (url, name))
sys.stdout.write('Testing url "%s":\n' % url) if name.startswith("mediaserver") and not tested:
if name.startswith("mediaserver"):
if not celerity_conf or not re.search( if not celerity_conf or not re.search(
r"http[s]{0,1}://%s" % domain, celerity_conf r"http[s]{0,1}://%s" % domain, celerity_conf
): ):
...@@ -129,6 +181,7 @@ def test_vhost( ...@@ -129,6 +181,7 @@ def test_vhost(
"\033[92mOK (%s, %sms)\033[0m" % (code, req_time) "\033[92mOK (%s, %sms)\033[0m" % (code, req_time)
) )
sys.stdout.write(".\n") sys.stdout.write(".\n")
tested += 1
if ip_warning: if ip_warning:
warnings += 1 warnings += 1
...@@ -138,7 +191,7 @@ def test_vhost( ...@@ -138,7 +191,7 @@ def test_vhost(
return tested, warnings, errors return tested, warnings, errors
if __name__ == "__main__": def main():
# check that Nginx dir exists # check that Nginx dir exists
nginx_dir = "/etc/nginx/sites-enabled" nginx_dir = "/etc/nginx/sites-enabled"
if not os.path.exists(nginx_dir): if not os.path.exists(nginx_dir):
...@@ -170,50 +223,27 @@ if __name__ == "__main__": ...@@ -170,50 +223,27 @@ if __name__ == "__main__":
# get enabled vhosts # get enabled vhosts
resolution_ignored = conf.get("TESTER_VHOST_RESOLUTION_IGNORED", "").split(",") resolution_ignored = conf.get("TESTER_VHOST_RESOLUTION_IGNORED", "").split(",")
tested = 0
errors = 0 errors = 0
warnings = 0 warnings = 0
names = os.listdir(nginx_dir) nginx_confs = get_configs(nginx_dir)
names.sort() for nginx_conf in nginx_confs:
for name in names: tested = 0
path = os.path.join(nginx_dir, name) vhosts = get_vhosts(nginx_conf)
level = 0 for vhost in vhosts:
domains = list() hostnames = get_hostnames(vhost)
ports_info = list() ports = get_ports(vhost)
print('Parsing vhost "%s"...' % path) t, w, e = test_vhost(
with open(path, "r") as fo: ports,
for line in fo: hostnames,
line = line.strip() resolution_ignored,
if not line or line.startswith("#"): celerity_conf,
continue nginx_conf,
words = re.sub(r"\s+", " ", line).strip("; ").split(" ") wowza_dir,
if "{" in words or "}" in words: tested,
level += words.count("{") )
level -= words.count("}") tested += t
if level == 0: warnings += w
# test errors += e
if ports_info or domains:
t, w, e = test_vhost(
ports_info, domains, resolution_ignored, celerity_conf
)
tested += t
warnings += w
errors += e
domains = list()
ports_info = list()
elif level == 1:
# server section are level 1
if words[0] == "listen":
https = "ssl" in words
for port in words:
try:
port = int(port.split(":")[-1])
except ValueError:
pass
else:
ports_info.append((port, https))
elif words[0] == "server_name":
domains = words[1:]
if errors: if errors:
print("%s url(s) did not correctly respond." % errors) print("%s url(s) did not correctly respond." % errors)
...@@ -223,3 +253,7 @@ if __name__ == "__main__": ...@@ -223,3 +253,7 @@ if __name__ == "__main__":
if not tested: if not tested:
print("No url found in Nginx sites-enabled dir.") print("No url found in Nginx sites-enabled dir.")
sys.exit(1) sys.exit(1)
if __name__ == "__main__":
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment