Newer
Older
"""EnvSetup utilities."""
from collections import OrderedDict
from typing import Any
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
BLUE = "\033[94m"
DEF = "\033[0m"
DEFAULT_CONF_PATH = "global-conf.sh"
AUTO_CONF_PATH = "auto-generated-conf.sh"
CONF_PATH = "conf.sh"
def log(text: str, error: bool = False):
"""Output log message to stout or stderr.
:param text: Message to log
:type text: str
:param error: Wether it should output to stderr or not, defaults to False
:param error: bool, optional
"""
fo = sys.stderr if error else sys.stdout
print(text, file=fo)
fo.flush()
def info(message: str):
"""Print formatted info message.
:param message: Message to print
:type message: str
"""
log(" {}ⓘ{} {}".format(BLUE, DEF, message))
def success(message: str):
"""Print formatted success message.
:param message: Message to print
:type message: str
"""
log(" {}✔{} {}".format(GREEN, DEF, message))
def warning(message: str):
"""Print formatted warning message.
:param message: Message to print
:type message: str
"""
log(" {}⚠{} {}".format(YELLOW, DEF, message), True)
def error(message: str):
"""Print formatted error message.
:param message: Message to print
:type message: str
"""
log(" {}✖{} {}".format(RED, DEF, message), True)
def get_dir(file_path: str) -> str:
"""Get the absolute directory path for the given file.
:param file_path: File path
:type file_path: str
:return: Absolute directory path
:rtype: str
"""
return str(Path(file_path).expanduser().resolve().parent)
def exec_cmd(cmd: Any, log_output: bool = True, get_output: bool = True) -> tuple:
"""Execute the given command.
:param cmd: Command to run
:type cmd: Any
:param log_output: Wether to log output or not, defaults to True
:param log_output: bool, optional
:param get_output: Wether to return output or not, defaults to True
:param get_output: bool, optional
:return: Return code and output
:rtype: tuple
"""
stdout = subprocess.PIPE if get_output or not log_output else sys.stdout
stderr = subprocess.PIPE if get_output or not log_output else sys.stderr
# execute
p = subprocess.Popen(
cmd, stdin=sys.stdin, stdout=stdout, stderr=stderr, shell=shell
)
# send to the correct output
out = out.decode("utf-8").strip() if out else ""
out += "\n"
out += err.decode("utf-8").strip()
out = out.strip()
if log_output:
log(out)
elif log_output:
sys.stdout.flush()
sys.stderr.flush()
return p.returncode, out
def check_cmd(cmd: Any, log_output: bool = False) -> int:
"""Get the return code of the given command.
:param cmd: Command to execute
:type cmd: Any
:param log_output: Wether to log output or not, defaults to False
:param log_output: bool, optional
:return: Return code
:rtype: int
"""
code, _ = exec_cmd(cmd, log_output, False)
return code
def load_conf() -> dict:
"""Load EnvSetup configuration settings.
:return: Configuration settings
:rtype: dict
"""
conf = {}
base_dir = get_dir(__file__)
(str(Path(base_dir, DEFAULT_CONF_PATH)), True),
(str(Path(base_dir, AUTO_CONF_PATH)), False),
(str(Path(base_dir, CONF_PATH)), False),
only_default = True
override = OrderedDict()
for path, is_default in files:
log(
"The configuration file '{}' does not exist.".format(path),
error=True,
)
with open(path, "r") as fo:
content = fo.read()
# Parse conf
for line in content.split("\n"):
if line and not line.startswith("#") and "=" in line:
name, *val = line.split("=")
name = name.strip(" \t'\"")
val = ("=".join(val)).strip(" \t'\"")
conf[name] = val
override[name] = False
else:
only_default = False
conf["_override"] = override
# Check a value to know if the config file has been changed
if only_default:
log("\033[93mWarning:\033[0m")
log("The configuration is using only default values.")
log("Perhaps you forget to change the configuration.")
log("Path of configuration file: %s" % str(Path(base_dir, CONF_PATH)))
log("Perhaps you want to quit this script to change the configuration?\n")
return conf
def get_conf(name: str, default: str = None) -> str:
"""Get the given configuration parameter.
:param name: Parameter name
:type name: str
:param default: Default parameter value, defaults to None
:param default: str, optional
:return: Parameter value
:rtype: str
"""
conf = load_conf()
return conf.get(name, default)
def set_conf(key: str, value: str, override: bool = False) -> bool:
"""Write the given configuration option in `conf.sh`.
:param key: Option name
:type key: str
:param value: Option value
:type value: str
:param override: Wether to override the option if it already exists, defaults to False
:param override: bool, optional
:return: True if the option have changed, False otherwise
:rtype: bool
"""
base_dir = Path(__file__).resolve().parent
conf_path = Path(base_dir, CONF_PATH).resolve()
# read conf.sh
with open(conf_path) as read_conf_fh:
conf = read_conf_fh.read()
# check if option already exists
regex = re.compile(r"^" + key.upper() + "=(.*)$", flags=re.M)
match = regex.search(conf)
if match and override:
conf = regex.sub("{}='{}'".format(key.upper(), str(value)), conf)
with open(conf_path, "w") as conf_fh:
conf_fh.write(conf)
return True
with open(conf_path, "a") as conf_fh:
conf_fh.write("\n{}='{}'\n".format(key.upper(), str(value)))
return True
else:
def run_commands(cmds: list):
"""Run a serie of successive commands.
:param cmds: List of commands
:type cmds: list
:raises Exception: Houston we have a problem
"""
try:
# Execute commands
for cmd in cmds:
if not isinstance(cmd, dict):
cmd = dict(line=cmd)
if cmd.get("cond"):
cond = cmd["cond"]
negate = cmd.get("cond_neg")
skip = cmd.get("cond_skip")
code = check_cmd(cond)
valid = code != 0 if negate else code == 0
if not valid:
msg = 'Condition for command "%s" not fullfilled.' % cmd["line"]
log("%s Command skipped." % msg)
if cmd["line"] == "write":
if not cmd.get("target"):
raise Exception("No target file to write in.")
if (
cmd.get("backup")
and Path(cmd["target"]).exists()
and not Path(cmd["target"] + ".back").exists()
Path(cmd["target"]).rename(Path(cmd["target"] + ".back"))
log("A backup file has been created for:\n%s" % cmd["target"])
content = cmd.get("content", "")
if cmd.get("template"):
raise Exception(
"Template file does not exist: %s." % cmd["template"]
)
with open(cmd["template"], "r") as fd:
if cmd.get("params"):
for k, v in cmd["params"]:
content = content.replace(k, v)
# Write target file
with open(cmd["target"], "w+") as fd:
log("File %s written" % cmd["target"])
elif cmd["line"] == "backup":
if not cmd.get("target"):
raise Exception("No target file to backup.")
if not Path(cmd["target"] + ".back").exists():
Path(cmd["target"]).rename(Path(cmd["target"] + ".back"))
log("A backup file has been created for:\n%s" % cmd["target"])
log("A backup file already exist for:\n%s" % cmd["target"])
log(">>> " + cmd["line"])
code = check_cmd(cmd["line"], log_output=True)
raise Exception("Command exited with code %s." % code)
log("Command failed:\n%s" % e)
def add_hosts_to_localhost(hosts: list):
"""Add a list of hosts to 127.0.0.1 in /etc/hosts.
:param hosts: List of commands
:type hosts: list
:raises Exception: Houston we have a problem
"""
if rc == 0 and hostname not in hosts:
hosts.insert(0, hostname)
content = fo.read()
new_content = list()
found_127 = False
for line in content.split("\n"):
if not found_127 and line.startswith("127.0.0.1"):
found_127 = True
for host in hosts:
if " " + host not in line:
line += " " + host
log("Adding host %s to /etc/hosts 127.0.0.1 aliases." % host)
new_content.append(line)
if not found_127:
new_content.append("127.0.0.1 %s" % " ".join(hosts))
new_content = "\n".join(new_content)
if new_content != content:
fo.write(new_content)
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
OPENSSL_CONFIG_TEMPLATE = """
[ req ]
prompt = no
default_bits = 4096
default_keyfile = envsetup.csr.pem
distinguished_name = subject
req_extensions = req_ext
x509_extensions = x509_ext
string_mask = utf8only
[ subject ]
C = FR
ST = IDF
L = Paris
O = UbiCast
CN = MediaServer
emailAddress = root@localhost
[ x509_ext ]
subjectKeyIdentifier = hash
authorityKeyIdentifier = keyid,issuer
basicConstraints = CA:FALSE
keyUsage = digitalSignature, keyEncipherment
subjectAltName = @alternate_names
nsComment = "OpenSSL Generated Certificate"
[ req_ext ]
subjectKeyIdentifier = hash
basicConstraints = CA:FALSE
keyUsage = digitalSignature, keyEncipherment
subjectAltName = @alternate_names
[ alternate_names ]
"""
def mkcert(
domains: list,
ecc: bool = True,
days: int = 3650,
config_tpl: str = OPENSSL_CONFIG_TEMPLATE,
):
"""Generate a self-signed certificate for the domains list.
:param domains: Domains for which the certificates will be self-signed
:type domains: list
:param ecc: Wether to use Elliptic Curve cryptography or not, defaults to True, if Fasle RSA is used
:param ecc: bool, optional
:param days: Validity lifetime of the certificate, defaults to 3650
:param days: int, optional
:param config_tpl: OpenSSL config file template, defaults to OPENSSL_CONFIG_TEMPLATE
:param config_tpl: str, optional
"""
# create certs dir
cert_dir = "/etc/ssl/envsetup"
Path(cert_dir).mkdir(mode=0o755, parents=True, exist_ok=True)
# populate template with domains
for i, domain in enumerate(domains, start=1):
config_tpl = config_tpl + "DNS.{} = {}\n".format(i, domain)
# write openssl config file
with open(cert_dir + "/conf", "w") as config_fh:
# key type: elliptic curve (default) or rsa
if ecc:
subprocess.check_call(
["openssl", "ecparam", "-name", "secp384r1", "-out", cert_dir + "/ecparam"]
)
subprocess.check_call(
[
"openssl",
"req",
"-config",
cert_dir + "/conf",
"-new",
"-x509",
"-sha256",
"-nodes",
"-newkey",
keytype,
"-keyout",
cert_dir + "/key.pem",
"-days",
str(days),
"-out",
cert_dir + "/cert.pem",
]
)
def get_ip() -> str:
"""Get the "primary" ip address, the one used by the default route.
:return: IP address
:rtype: str
"""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# doesn't have to be reachable
s.connect(("10.255.255.255", 1))
IP = s.getsockname()[0]
except Exception:
IP = "127.0.0.1"
finally:
s.close()
return IP