Something went wrong on our end
-
Stéphane Diemer authoredStéphane Diemer authored
utils.py 13.05 KiB
#!/usr/bin/env python3
"""EnvSetup utilities."""
from collections import OrderedDict
import os
from pathlib import Path
import re
import subprocess
import sys
from typing import Any
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
BLUE = "\033[94m"
DEF = "\033[0m"
DEFAULT_CONF_PATH = "global-conf.sh"
AUTO_CONF_PATH = "auto-generated-conf.sh"
CONF_PATH = "conf.sh"
def log(text: str, error: bool = False):
"""Output log message to stout or stderr.
:param text: Message to log
:type text: str
:param error: Wether it should output to stderr or not, defaults to False
:param error: bool, optional
"""
fo = sys.stderr if error else sys.stdout
print(text, file=fo)
fo.flush()
def info(message: str):
"""Print formatted info message.
:param message: Message to print
:type message: str
"""
log(" {}ⓘ{} {}".format(BLUE, DEF, message))
def success(message: str):
"""Print formatted success message.
:param message: Message to print
:type message: str
"""
log(" {}✔{} {}".format(GREEN, DEF, message))
def warning(message: str):
"""Print formatted warning message.
:param message: Message to print
:type message: str
"""
log(" {}⚠{} {}".format(YELLOW, DEF, message))
def error(message: str):
"""Print formatted error message.
:param message: Message to print
:type message: str
"""
log(" {}✖{} {}".format(RED, DEF, message))
def get_dir(file_path: str) -> str:
"""Get the absolute directory path for the given file.
:param file_path: File path
:type file_path: str
:return: Absolute directory path
:rtype: str
"""
return os.path.dirname(os.path.abspath(os.path.expanduser(file_path)))
def exec_cmd(cmd: Any, log_output: bool = True, get_output: bool = True) -> tuple:
"""Execute the given command.
:param cmd: Command to run
:type cmd: Any
:param log_output: Wether to log output or not, defaults to True
:param log_output: bool, optional
:param get_output: Wether to return output or not, defaults to True
:param get_output: bool, optional
:return: Return code and output
:rtype: tuple
"""
shell = not isinstance(cmd, (tuple, list))
stdout = subprocess.PIPE if get_output or not log_output else sys.stdout
stderr = subprocess.PIPE if get_output or not log_output else sys.stderr
# execute
p = subprocess.Popen(
cmd, stdin=sys.stdin, stdout=stdout, stderr=stderr, shell=shell
)
out, err = p.communicate()
# send to the correct output
if get_output:
out = out.decode("utf-8").strip() if out else ""
if err:
if out:
out += "\n"
out += err.decode("utf-8").strip()
out = out.strip()
if log_output:
log(out)
elif log_output:
sys.stdout.flush()
sys.stderr.flush()
return p.returncode, out
def check_cmd(cmd: Any, log_output: bool = False) -> int:
"""Get the return code of the given command.
:param cmd: Command to execute
:type cmd: Any
:param log_output: Wether to log output or not, defaults to False
:param log_output: bool, optional
:return: Return code
:rtype: int
"""
code, _ = exec_cmd(cmd, log_output, False)
return code
def load_conf() -> dict:
"""Load EnvSetup configuration settings.
:return: Configuration settings
:rtype: dict
"""
conf = {}
base_dir = get_dir(__file__)
files = (
(os.path.join(base_dir, DEFAULT_CONF_PATH), True),
(os.path.join(base_dir, AUTO_CONF_PATH), False),
(os.path.join(base_dir, CONF_PATH), False),
)
only_default = True
override = OrderedDict()
for path, is_default in files:
if not os.path.exists(path):
if is_default:
log(
"The configuration file '{}' does not exist.".format(path),
error=True,
)
return dict()
continue
# Load conf
with open(path, "r") as fo:
content = fo.read()
# Parse conf
for line in content.split("\n"):
line = line.strip()
if line and not line.startswith("#") and "=" in line:
name, *val = line.split("=")
name = name.strip(" \t'\"")
val = ("=".join(val)).strip(" \t'\"")
conf[name] = val
if is_default:
override[name] = False
else:
only_default = False
override[name] = True
conf["_override"] = override
# Check a value to know if the config file has been changed
if only_default:
log("\033[93mWarning:\033[0m")
log("The configuration is using only default values.")
log("Perhaps you forget to change the configuration.")
log("Path of configuration file: %s" % os.path.join(base_dir, CONF_PATH))
log("Perhaps you want to quit this script to change the configuration?\n")
return conf
def get_conf(name: str, default: str = None) -> str:
"""Get the given configuration parameter.
:param name: Parameter name
:type name: str
:param default: Default parameter value, defaults to None
:param default: str, optional
:return: Parameter value
:rtype: str
"""
conf = load_conf()
return conf.get(name, default)
def set_conf(key: str, value: str, override: bool = False) -> bool:
"""Write the given configuration option in `conf.sh`.
:param key: Option name
:type key: str
:param value: Option value
:type value: str
:param override: Wether to override the option if it already exists, defaults to False
:param override: bool, optional
:return: True if the option have changed, False otherwise
:rtype: bool
"""
base_dir = Path(__file__).resolve().parent
conf_path = Path(base_dir, CONF_PATH).resolve()
# read conf.sh
with open(conf_path) as read_conf_fh:
conf = read_conf_fh.read()
# check if option already exists
regex = re.compile(r"^" + key.upper() + "=(.*)$", flags=re.M)
match = regex.search(conf)
if match and override:
# override option
conf = regex.sub("{}='{}'".format(key.upper(), str(value)), conf)
with open(conf_path, "w") as conf_fh:
conf_fh.write(conf)
return True
elif not match:
# add option
with open(conf_path, "a") as conf_fh:
conf_fh.write("\n{}='{}'\n".format(key.upper(), str(value)))
return True
else:
# no match or no override
return False
def run_commands(cmds: list):
"""Run a serie of successive commands.
:param cmds: List of commands
:type cmds: list
:raises Exception: Houston we have a problem
"""
try:
# Execute commands
for cmd in cmds:
if not isinstance(cmd, dict):
cmd = dict(line=cmd)
if cmd.get("cond"):
cond = cmd["cond"]
negate = cmd.get("cond_neg")
skip = cmd.get("cond_skip")
code = check_cmd(cond)
valid = code != 0 if negate else code == 0
if not valid:
msg = 'Condition for command "%s" not fullfilled.' % cmd["line"]
if skip:
log("%s Command skipped." % msg)
continue
raise Exception(msg)
if cmd["line"] == "write":
if not cmd.get("target"):
raise Exception("No target file to write in.")
if (
cmd.get("backup") and
os.path.exists(cmd["target"]) and
not os.path.exists(cmd["target"] + ".back")
):
os.rename(cmd["target"], cmd["target"] + ".back")
log("A backup file has been created for:\n%s" % cmd["target"])
# Load content from template if any
content = cmd.get("content", "")
if cmd.get("template"):
if not os.path.exists(cmd["template"]):
raise Exception(
"Template file does not exist: %s." % cmd["template"]
)
with open(cmd["template"], "r") as fd:
content = fd.read()
if cmd.get("params"):
for k, v in cmd["params"]:
content = content.replace(k, v)
# Write target file
with open(cmd["target"], "w+") as fd:
fd.write(content)
log("File %s written" % cmd["target"])
elif cmd["line"] == "backup":
if not cmd.get("target"):
raise Exception("No target file to backup.")
if not os.path.exists(cmd["target"] + ".back"):
os.rename(cmd["target"], cmd["target"] + ".back")
log("A backup file has been created for:\n%s" % cmd["target"])
else:
log("A backup file already exist for:\n%s" % cmd["target"])
else:
log(">>> " + cmd["line"])
code = check_cmd(cmd["line"], log_output=True)
if code != 0:
raise Exception("Command exited with code %s." % code)
except Exception as e:
log("Command failed:\n%s" % e)
raise
def add_hosts_to_localhost(hosts: list):
"""Add a list of hosts to 127.0.0.1 in /etc/hosts.
:param hosts: List of commands
:type hosts: list
:raises Exception: Houston we have a problem
"""
rc, hostname = exec_cmd('hostname')
if rc == 0 and hostname not in hosts:
hosts.insert(0, hostname)
with open('/etc/hosts', 'r') as fo:
content = fo.read()
new_content = list()
found_127 = False
for line in content.split('\n'):
if not found_127 and line.startswith('127.0.0.1'):
found_127 = True
for host in hosts:
if ' ' + host not in line:
line += ' ' + host
log('Adding host %s to /etc/hosts 127.0.0.1 aliases.' % host)
new_content.append(line)
if not found_127:
new_content.append('127.0.0.1 %s' % ' '.join(hosts))
new_content = '\n'.join(new_content)
if new_content != content:
with open('/etc/hosts', 'w') as fo:
fo.write(new_content)
log('/etc/hosts updated.')
else:
log('/etc/hosts is already up to date.')
OPENSSL_CONFIG_TEMPLATE = """
[ req ]
prompt = no
default_bits = 4096
default_keyfile = envsetup.csr.pem
distinguished_name = subject
req_extensions = req_ext
x509_extensions = x509_ext
string_mask = utf8only
[ subject ]
C = FR
ST = IDF
L = Paris
O = UbiCast
CN = MediaServer
emailAddress = root@localhost
[ x509_ext ]
subjectKeyIdentifier = hash
authorityKeyIdentifier = keyid,issuer
basicConstraints = CA:FALSE
keyUsage = digitalSignature, keyEncipherment
subjectAltName = @alternate_names
nsComment = "OpenSSL Generated Certificate"
[ req_ext ]
subjectKeyIdentifier = hash
basicConstraints = CA:FALSE
keyUsage = digitalSignature, keyEncipherment
subjectAltName = @alternate_names
[ alternate_names ]
"""
def mkcert(domains: list, ecc: bool = True, days: int = 3650, config_tpl: str = OPENSSL_CONFIG_TEMPLATE):
"""Generate a self-signed certificate for the domains list.
:param domains: Domains for which the certificates will be self-signed
:type domains: list
:param ecc: Wether to use Elliptic Curve cryptography or not, defaults to True, if Fasle RSA is used
:param ecc: bool, optional
:param days: Validity lifetime of the certificate, defaults to 3650
:param days: int, optional
:param config_tpl: OpenSSL config file template, defaults to OPENSSL_CONFIG_TEMPLATE
:param config_tpl: str, optional
"""
# create certs dir
cert_dir = "/etc/ssl/envsetup"
Path(cert_dir).mkdir(mode=0o755, parents=True, exist_ok=True)
# populate template with domains
for i, domain in enumerate(domains, start=1):
config_tpl = config_tpl + "DNS.{} = {}\n".format(i, domain)
# write openssl config file
with open(cert_dir + "/conf", "w") as config_fh:
config_fh.write(config_tpl)
# key type: elliptic curve (default) or rsa
if ecc:
subprocess.check_call([
"openssl", "ecparam",
"-name", "secp384r1",
"-out", cert_dir + "/ecparam"
])
keytype = "ec:" + cert_dir + "/ecparam"
else:
keytype = "rsa"
# execute openssl to generate keypair
subprocess.check_call([
"openssl", "req",
"-config", cert_dir + "/conf",
"-new",
"-x509",
"-sha256",
"-nodes",
"-newkey", keytype,
"-keyout", cert_dir + "/key.pem",
"-days", str(days),
"-out", cert_dir + "/cert.pem"
])