#!/usr/bin/env python3
"""EnvSetup utilities."""

from collections import OrderedDict
from pathlib import Path
import re
import socket
import subprocess
import sys
from typing import Any, Optional

RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
BLUE = "\033[94m"
DEF = "\033[0m"

DEFAULT_CONF_PATH = "global-conf.sh"
AUTO_CONF_PATH = "auto-generated-conf.sh"
CONF_PATH = "conf.sh"

# Last configuration successfully loaded by `load_conf` (None until then).
_conf_cache = None


def log(text: str, error: bool = False):
    """Output log message to stdout or stderr.

    :param text: Message to log
    :type text: str
    :param error: Whether it should output to stderr or not, defaults to False
    :type error: bool, optional
    """
    fo = sys.stderr if error else sys.stdout
    print(text, file=fo)
    fo.flush()


def info(message: str):
    """Print formatted info message.

    :param message: Message to print
    :type message: str
    """
    log(" {}ⓘ{} {}".format(BLUE, DEF, message))


def success(message: str):
    """Print formatted success message.

    :param message: Message to print
    :type message: str
    """
    log(" {}✔{} {}".format(GREEN, DEF, message))


def warning(message: str):
    """Print formatted warning message (on stderr).

    :param message: Message to print
    :type message: str
    """
    log(" {}⚠{} {}".format(YELLOW, DEF, message), True)


def error(message: str):
    """Print formatted error message (on stderr).

    :param message: Message to print
    :type message: str
    """
    log(" {}✖{} {}".format(RED, DEF, message), True)


def get_dir(file_path: str) -> str:
    """Get the absolute directory path for the given file.

    :param file_path: File path
    :type file_path: str
    :return: Absolute directory path
    :rtype: str
    """
    return str(Path(file_path).expanduser().resolve().parent)


def exec_cmd(cmd: Any, log_output: bool = True, get_output: bool = True) -> tuple:
    """Execute the given command.

    A tuple or list is executed directly; anything else is passed to the shell.

    :param cmd: Command to run
    :type cmd: Any
    :param log_output: Whether to log output or not, defaults to True
    :type log_output: bool, optional
    :param get_output: Whether to return output or not, defaults to True
    :type get_output: bool, optional
    :return: Return code and output. When `get_output` is True the output is
        the decoded stdout followed by stderr; otherwise it is whatever
        `communicate()` returned for stdout (raw bytes, or None when the
        streams were not captured)
    :rtype: tuple
    """
    shell = not isinstance(cmd, (tuple, list))
    # Streams are captured unless the output is only meant to be displayed.
    capture = get_output or not log_output
    stdout = subprocess.PIPE if capture else sys.stdout
    stderr = subprocess.PIPE if capture else sys.stderr
    # execute
    p = subprocess.Popen(
        cmd, stdin=sys.stdin, stdout=stdout, stderr=stderr, shell=shell
    )
    out, err = p.communicate()
    # send to the correct output
    if get_output:
        out = out.decode("utf-8").strip() if out else ""
        if err:
            if out:
                out += "\n"
            out += err.decode("utf-8").strip()
        out = out.strip()
        if log_output:
            log(out)
    elif log_output:
        sys.stdout.flush()
        sys.stderr.flush()
    return p.returncode, out


def check_cmd(cmd: Any, log_output: bool = False) -> int:
    """Get the return code of the given command.

    :param cmd: Command to execute
    :type cmd: Any
    :param log_output: Whether to log output or not, defaults to False
    :type log_output: bool, optional
    :return: Return code
    :rtype: int
    """
    code, _ = exec_cmd(cmd, log_output, False)
    return code


def load_conf() -> dict:
    """Load EnvSetup configuration settings.

    The files are read in order (default, auto-generated, local), later files
    overriding earlier ones. The result is stored in the module cache, except
    when the default configuration file is missing: an empty dict is then
    returned and the cache is left untouched.

    :return: Configuration settings
    :rtype: dict
    """
    global _conf_cache

    conf = {}
    base_dir = get_dir(__file__)
    files = (
        (str(Path(base_dir, DEFAULT_CONF_PATH)), True),
        (str(Path(base_dir, AUTO_CONF_PATH)), False),
        (str(Path(base_dir, CONF_PATH)), False),
    )
    only_default = True
    override = OrderedDict()
    for path, is_default in files:
        if not Path(path).exists():
            if is_default:
                log(
                    "The configuration file '{}' does not exist.".format(path),
                    error=True,
                )
                return dict()
            continue
        # Load conf
        with open(path, "r") as fo:
            content = fo.read()
        # Parse conf
        for line in content.split("\n"):
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            # Only the first "=" separates the name from the value.
            name, _, val = line.partition("=")
            name = name.strip(" \t'\"")
            val = val.strip(" \t'\"")
            conf[name] = val
            if is_default:
                override[name] = False
            else:
                only_default = False
                override[name] = True
    conf["_override"] = override
    # Check a value to know if the config file has been changed
    if only_default:
        log("{}Warning:{}".format(YELLOW, DEF))
        log("The configuration is using only default values.")
        log("Perhaps you forget to change the configuration.")
        log("Path of configuration file: %s" % str(Path(base_dir, CONF_PATH)))
        log("Perhaps you want to quit this script to change the configuration?\n")
    _conf_cache = conf
    return conf


def get_conf(name: str, default: Optional[str] = None) -> Optional[str]:
    """Get the given configuration parameter.

    :param name: Parameter name
    :type name: str
    :param default: Default parameter value, defaults to None
    :type default: str, optional
    :return: Parameter value
    :rtype: str
    """
    conf = _conf_cache
    if conf is None:
        # Use the returned dict rather than the cache: `load_conf` does not
        # fill the cache when the default configuration file is missing.
        conf = load_conf()
    return conf.get(name, default)


def set_conf(key: str, value: str, override: bool = False) -> bool:
    """Write the given configuration option in `conf.sh`.

    :param key: Option name
    :type key: str
    :param value: Option value
    :type value: str
    :param override: Whether to override the option if it already exists,
        defaults to False
    :type override: bool, optional
    :return: True if the option has been written, False otherwise
    :rtype: bool
    """
    base_dir = Path(__file__).resolve().parent
    conf_path = Path(base_dir, CONF_PATH).resolve()
    option = key.upper()
    new_line = "{}='{}'".format(option, str(value))
    # read conf.sh
    with open(conf_path) as read_conf_fh:
        conf = read_conf_fh.read()
    # check if option already exists (the name is matched literally)
    regex = re.compile(r"^" + re.escape(option) + r"=(.*)$", flags=re.M)
    match = regex.search(conf)
    if match and override:
        # override option; a callable replacement keeps backslashes in the
        # value from being interpreted as regex escapes or group references
        conf = regex.sub(lambda _match: new_line, conf)
        with open(conf_path, "w") as conf_fh:
            conf_fh.write(conf)
        changed = True
    elif not match:
        # add option
        with open(conf_path, "a") as conf_fh:
            conf_fh.write("\n{}\n".format(new_line))
        changed = True
    else:
        # option already exists and override was not requested
        changed = False
    # reload conf
    load_conf()
    return changed


def _run_write_cmd(cmd: dict):
    """Handle a "write" command: write content or a template to the target."""
    target = cmd.get("target")
    if not target:
        raise Exception("No target file to write in.")
    if (
        cmd.get("backup")
        and Path(target).exists()
        and not Path(target + ".back").exists()
    ):
        Path(target).rename(Path(target + ".back"))
        log("A backup file has been created for:\n%s" % target)
    # Load content from template if any
    content = cmd.get("content", "")
    template = cmd.get("template")
    if template:
        if not Path(template).exists():
            raise Exception("Template file does not exist: %s." % template)
        with open(template, "r") as fd:
            content = fd.read()
    # Substitutions are applied to inline content as well as template content.
    params = cmd.get("params")
    if params:
        # Accept both a mapping and a sequence of (placeholder, value) pairs.
        pairs = params.items() if isinstance(params, dict) else params
        for placeholder, replacement in pairs:
            content = content.replace(placeholder, replacement)
    # Write target file
    with open(target, "w+") as fd:
        fd.write(content)
    log("File %s written" % target)


def _run_backup_cmd(cmd: dict):
    """Handle a "backup" command: rename the target to `<target>.back` once."""
    target = cmd.get("target")
    if not target:
        raise Exception("No target file to backup.")
    if not Path(target + ".back").exists():
        Path(target).rename(Path(target + ".back"))
        log("A backup file has been created for:\n%s" % target)
    else:
        log("A backup file already exist for:\n%s" % target)


def run_commands(cmds: list):
    """Run a series of successive commands.

    Each item is either a command line or a dict with a "line" key. The
    special lines "write" and "backup" operate on the file given by "target";
    any other line is executed as a command. The optional "cond" key is a
    command whose return code decides whether the item runs ("cond_neg"
    inverts the check, "cond_skip" skips the item instead of failing).

    :param cmds: List of commands
    :type cmds: list
    :raises Exception: If a condition is not met, a parameter is missing or a
        command exits with a non-zero code
    """
    try:
        # Execute commands
        for cmd in cmds:
            if not isinstance(cmd, dict):
                cmd = dict(line=cmd)
            if cmd.get("cond"):
                code = check_cmd(cmd["cond"])
                valid = code != 0 if cmd.get("cond_neg") else code == 0
                if not valid:
                    msg = 'Condition for command "%s" not fullfilled.' % cmd["line"]
                    if cmd.get("cond_skip"):
                        log("%s Command skipped." % msg)
                        continue
                    raise Exception(msg)
            line = cmd["line"]
            if line == "write":
                _run_write_cmd(cmd)
            elif line == "backup":
                _run_backup_cmd(cmd)
            else:
                # Commands given as a list or tuple are joined for display.
                if isinstance(line, str):
                    shown = line
                else:
                    shown = " ".join(str(part) for part in line)
                log(">>> " + shown)
                code = check_cmd(line, log_output=True)
                if code != 0:
                    raise Exception("Command exited with code %s." % code)
    except Exception as e:
        log("Command failed:\n%s" % e)
        raise


def add_hosts_to_localhost(hosts: list):
    """Add a list of hosts to 127.0.0.1 in /etc/hosts.

    The machine hostname is added in front of the list when it is not already
    part of it. The given list is left unchanged.

    :param hosts: List of host names
    :type hosts: list
    """
    # Work on a copy so the caller's list is not modified.
    hosts = list(hosts)
    rc, hostname = exec_cmd("hostname")
    if rc == 0 and hostname and hostname not in hosts:
        hosts.insert(0, hostname)
    with open("/etc/hosts", "r") as fo:
        content = fo.read()
    new_lines = []
    found_127 = False
    for line in content.split("\n"):
        # Compare whole fields so that "127.0.0.10" or a longer host name
        # sharing a prefix is not mistaken for a match.
        fields = line.split()
        if not found_127 and fields and fields[0] == "127.0.0.1":
            found_127 = True
            for host in hosts:
                if host not in fields:
                    line += " " + host
                    fields.append(host)
                    log("Adding host %s to /etc/hosts 127.0.0.1 aliases." % host)
        new_lines.append(line)
    if not found_127:
        new_lines.append("127.0.0.1 %s" % " ".join(hosts))
    new_content = "\n".join(new_lines)
    if new_content != content:
        with open("/etc/hosts", "w") as fo:
            fo.write(new_content)
        log("/etc/hosts updated.")
    else:
        log("/etc/hosts is already up to date.")


OPENSSL_CONFIG_TEMPLATE = """
[ req ]

prompt = no
default_bits = 4096
default_keyfile = envsetup.csr.pem
distinguished_name = subject
req_extensions = req_ext
x509_extensions = x509_ext
string_mask = utf8only

[ subject ]

C = FR
ST = IDF
L = Paris
O = UbiCast
CN = MediaServer
emailAddress = root@localhost

[ x509_ext ]

subjectKeyIdentifier = hash
authorityKeyIdentifier = keyid,issuer
basicConstraints = CA:FALSE
keyUsage = digitalSignature, keyEncipherment
subjectAltName = @alternate_names
nsComment = "OpenSSL Generated Certificate"

[ req_ext ]

subjectKeyIdentifier = hash
basicConstraints = CA:FALSE
keyUsage = digitalSignature, keyEncipherment
subjectAltName = @alternate_names

[ alternate_names ]

"""


def mkcert(
    domains: list,
    ecc: bool = True,
    days: int = 3650,
    config_tpl: str = OPENSSL_CONFIG_TEMPLATE,
):
    """Generate a self-signed certificate for the domains list.

    The OpenSSL configuration, the key and the certificate are written in
    `/etc/ssl/envsetup`.

    :param domains: Domains for which the certificates will be self-signed
    :type domains: list
    :param ecc: Whether to use Elliptic Curve cryptography or not, defaults to
        True, if False RSA is used
    :type ecc: bool, optional
    :param days: Validity lifetime of the certificate, defaults to 3650
    :type days: int, optional
    :param config_tpl: OpenSSL config file template, defaults to
        OPENSSL_CONFIG_TEMPLATE
    :type config_tpl: str, optional
    :raises subprocess.CalledProcessError: If an openssl command fails
    """
    # create certs dir
    cert_dir = Path("/etc/ssl/envsetup")
    cert_dir.mkdir(mode=0o755, parents=True, exist_ok=True)
    conf_file = str(cert_dir / "conf")
    # populate template with domains (one DNS.<n> entry per domain)
    config = config_tpl + "".join(
        "DNS.{} = {}\n".format(i, domain) for i, domain in enumerate(domains, start=1)
    )
    # write openssl config file
    with open(conf_file, "w") as config_fh:
        config_fh.write(config)
    # key type: elliptic curve (default) or rsa
    if ecc:
        ecparam_file = str(cert_dir / "ecparam")
        subprocess.check_call(
            ["openssl", "ecparam", "-name", "secp384r1", "-out", ecparam_file]
        )
        keytype = "ec:" + ecparam_file
    else:
        keytype = "rsa"
    # execute openssl to generate keypair
    subprocess.check_call(
        [
            "openssl",
            "req",
            "-config",
            conf_file,
            "-new",
            "-x509",
            "-sha256",
            "-nodes",
            "-newkey",
            keytype,
            "-keyout",
            str(cert_dir / "key.pem"),
            "-days",
            str(days),
            "-out",
            str(cert_dir / "cert.pem"),
        ]
    )


def get_ip() -> str:
    """Get the "primary" ip address, the one used by the default route.

    :return: IP address, "127.0.0.1" if it cannot be determined
    :rtype: str
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        try:
            # doesn't have to be reachable
            sock.connect(("10.255.255.255", 1))
            return sock.getsockname()[0]
        except OSError:
            return "127.0.0.1"