Skip to content
Snippets Groups Projects
utils.py 13.7 KiB
Newer Older
#!/usr/bin/env python3
from collections import OrderedDict
from pathlib import Path
import re
import socket
import subprocess
import sys
from typing import Any

# ANSI escape sequences used to colorize the markers printed by the
# info/success/warning/error helpers.
RED = "\x1b[91m"
GREEN = "\x1b[92m"
YELLOW = "\x1b[93m"
BLUE = "\x1b[94m"
# Sequence emitted right after a colored marker to switch the color off.
DEF = "\x1b[0m"
Stéphane Diemer's avatar
Stéphane Diemer committed

# Configuration file names, resolved relative to the directory of this module.
# load_conf() lists them in this order: defaults first, then the
# auto-generated file, then the user file.
DEFAULT_CONF_PATH = "global-conf.sh"
AUTO_CONF_PATH = "auto-generated-conf.sh"
# User configuration file; this is also the file set_conf() writes to.
CONF_PATH = "conf.sh"
Stéphane Diemer's avatar
Stéphane Diemer committed

Stéphane Diemer's avatar
Stéphane Diemer committed

def log(text: str, error: bool = False):
    """Output log message to stdout or stderr.

    :param text: Message to log
    :type text: str
    :param error: Whether it should output to stderr or not, defaults to False
    :type error: bool, optional
    """

    fo = sys.stderr if error else sys.stdout
    print(text, file=fo)
    # Flush immediately so the message is not left waiting in the stream
    # buffer.
    fo.flush()


def info(message: str):
    """Log an informational message prefixed with a blue "ⓘ" marker.

    :param message: Text to display
    :type message: str
    """

    template = " {}ⓘ{} {}"
    log(template.format(BLUE, DEF, message))


def success(message: str):
    """Log a success message prefixed with a green "✔" marker.

    :param message: Text to display
    :type message: str
    """

    template = " {}✔{} {}"
    log(template.format(GREEN, DEF, message))


def warning(message: str):
    """Log a warning prefixed with a yellow "⚠" marker, on stderr.

    :param message: Text to display
    :type message: str
    """

    template = " {}⚠{} {}"
    log(template.format(YELLOW, DEF, message), error=True)


def error(message: str):
    """Log an error prefixed with a red "✖" marker, on stderr.

    :param message: Text to display
    :type message: str
    """

    template = " {}✖{} {}"
    log(template.format(RED, DEF, message), error=True)


def get_dir(file_path: str) -> str:
    """Return the absolute path of the directory containing a file.

    A leading ``~`` is expanded and the path is resolved before its parent
    directory is taken.

    :param file_path: Path of the file
    :type file_path: str
    :return: Absolute path of the parent directory
    :rtype: str
    """

    resolved = Path(file_path).expanduser().resolve()
    return str(resolved.parent)


def exec_cmd(cmd: Any, log_output: bool = True, get_output: bool = True) -> tuple:
    """Execute the given command.

    A list or tuple is executed directly; any other value (typically a
    string) is handed to the shell, so it must never be built from untrusted
    input.

    :param cmd: Command to run
    :type cmd: Any
    :param log_output: Whether to log output or not, defaults to True
    :type log_output: bool, optional
    :param get_output: Whether to return output or not, defaults to True
    :type get_output: bool, optional
    :return: Return code and output. The output is stdout followed by stderr,
        decoded as UTF-8 and stripped; it is an empty string when
        ``get_output`` is False
    :rtype: tuple
    """

    shell = not isinstance(cmd, (tuple, list))
    # Capture the streams when the output is wanted, and also when nothing
    # should be displayed at all; otherwise let the child process write
    # straight to our own stdout/stderr.
    capture = get_output or not log_output
    stdout = subprocess.PIPE if capture else sys.stdout
    stderr = subprocess.PIPE if capture else sys.stderr

    # execute
    p = subprocess.Popen(
        cmd, stdin=sys.stdin, stdout=stdout, stderr=stderr, shell=shell
    )
    out, err = p.communicate()

    output = ""
    if get_output:
        output = out.decode("utf-8").strip() if out else ""
        if err:
            if output:
                output += "\n"
            output += err.decode("utf-8").strip()
        output = output.strip()
        if log_output:
            log(output)
    elif log_output:
        # The child wrote directly to our streams: flush them so later
        # messages are not displayed before its output.
        sys.stdout.flush()
        sys.stderr.flush()

    return p.returncode, output


def check_cmd(cmd: Any, log_output: bool = False) -> int:
    """Get the return code of the given command.

    :param cmd: Command to execute
    :type cmd: Any
    :param log_output: Whether to log output or not, defaults to False
    :type log_output: bool, optional
    :return: Return code
    :rtype: int
    """

    # Only the exit status matters here, so the output is not requested.
    code, _ = exec_cmd(cmd, log_output, False)
    return code


def load_conf() -> dict:
    """Load EnvSetup configuration settings.

    The default, auto-generated and user configuration files located next to
    this module are read in that order, so a value found in a later file
    replaces the one found in an earlier file. Only lines of the form
    ``NAME=value`` are taken into account; surrounding spaces, tabs and
    quotes are removed from both the name and the value.

    The returned dict also holds a ``_override`` entry: an ``OrderedDict``
    mapping every option name to ``True`` when its value comes from a
    non-default file and to ``False`` otherwise.

    :return: Configuration settings
    :rtype: dict
    """

    conf = {}
    base_dir = get_dir(__file__)
    # (path, is_default) pairs, in reading order.
    files = (
        (str(Path(base_dir, DEFAULT_CONF_PATH)), True),
        (str(Path(base_dir, AUTO_CONF_PATH)), False),
        (str(Path(base_dir, CONF_PATH)), False),
    )
    only_default = True
    override = OrderedDict()
    for path, is_default in files:
        if not Path(path).exists():
            # Only the default file is expected to always be there.
            if is_default:
                log(
                    "The configuration file '{}' does not exist.".format(path),
                    error=True,
                )
            continue
        with open(path, "r") as fo:
            content = fo.read()
        # Parse conf
        for line in content.split("\n"):
            # Strip first so indented comments and "\r" endings are handled.
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            # Split on the first "=" only: the value may itself contain "=".
            name, _, val = line.partition("=")
            name = name.strip(" \t'\"")
            val = val.strip(" \t'\"")
            conf[name] = val
            if is_default:
                override[name] = False
            else:
                only_default = False
                override[name] = True
    conf["_override"] = override
    # Check a value to know if the config file has been changed
    if only_default:
        log("\033[93mWarning:\033[0m")
        log("The configuration is using only default values.")
        log("Perhaps you forget to change the configuration.")
        log("Path of configuration file: %s" % str(Path(base_dir, CONF_PATH)))
        log("Perhaps you want to quit this script to change the configuration?\n")
    return conf


def get_conf(name: str, default: str = None) -> str:
    """Get the given configuration parameter.

    The configuration is loaded with ``load_conf()`` on the first call and
    kept in memory afterwards, so changes made to the configuration files
    later in the same process are not taken into account.

    :param name: Parameter name
    :type name: str
    :param default: Default parameter value, defaults to None
    :type default: str, optional
    :return: Parameter value, or ``default`` when the parameter is not set
    :rtype: str
    """

    # Memoize on the function object: parsing the files on every lookup
    # would also repeat the messages logged by load_conf().
    conf = getattr(get_conf, "_cache", None)
    if conf is None:
        conf = load_conf()
        get_conf._cache = conf
    return conf.get(name, default)



def set_conf(key: str, value: str, override: bool = False) -> bool:
    """Write the given configuration option in `conf.sh`.

    The option name is upper-cased and the value is written between single
    quotes. An option that is not in the file yet is appended; an existing
    one is only rewritten when ``override`` is True. The file is created if
    it does not exist.

    :param key: Option name
    :type key: str
    :param value: Option value
    :type value: str
    :param override: Whether to override the option if it already exists, defaults to False
    :type override: bool, optional
    :return: True if the option has been written, False otherwise
    :rtype: bool
    """

    base_dir = Path(__file__).resolve().parent
    conf_path = Path(base_dir, CONF_PATH).resolve()
    option = key.upper()
    new_line = "{}='{}'".format(option, str(value))

    # read conf.sh (a missing file is the same as an empty configuration)
    try:
        with open(conf_path) as read_conf_fh:
            conf = read_conf_fh.read()
    except FileNotFoundError:
        conf = ""

    # check if option already exists; the name is escaped so that it is
    # always matched literally
    regex = re.compile(r"^" + re.escape(option) + r"=(.*)$", flags=re.M)
    match = regex.search(conf)
    if match and override:
        # override option; a callable replacement keeps backslashes in the
        # value from being interpreted as regex escapes
        conf = regex.sub(lambda _match: new_line, conf)
        with open(conf_path, "w") as conf_fh:
            conf_fh.write(conf)
        return True
    if not match:
        # new option: append it
        with open(conf_path, "a") as conf_fh:
            conf_fh.write("\n{}\n".format(new_line))
        return True
    # option already set and override not requested
    return False


def run_commands(cmds: list):
    """Run a series of successive commands.

    Each item is either a command (run through ``check_cmd``) or a dict with
    a ``line`` key and optional settings:

    * ``line``: the command to run, or one of the special values ``"write"``
      (write a file) and ``"backup"`` (rename a file to ``<target>.back``).
    * ``cond``: command run first; the item is only processed when it exits
      with code 0 (or a non-zero code when ``cond_neg`` is set). When the
      condition is not met the item is skipped if ``cond_skip`` is set,
      otherwise an exception is raised.
    * ``target``: file to write or to back up.
    * ``backup``: with ``"write"``, rename an existing target to
      ``<target>.back`` first, unless that backup already exists.
    * ``content``: with ``"write"``, the text to write.
    * ``template``: with ``"write"``, file whose content is written instead
      of ``content``.
    * ``params``: with ``template``, ``(placeholder, value)`` pairs (or a
      dict) substituted in the template content.

    :param cmds: List of commands
    :type cmds: list
    :raises Exception: If a condition is not met, a required setting is
        missing, a template does not exist or a command exits with a
        non-zero code
    """

    try:
        # Execute commands
        for cmd in cmds:
            if not isinstance(cmd, dict):
                cmd = dict(line=cmd)
            if cmd.get("cond"):
                cond = cmd["cond"]
                negate = cmd.get("cond_neg")
                skip = cmd.get("cond_skip")
                code = check_cmd(cond)
                valid = code != 0 if negate else code == 0
                if not valid:
                    msg = 'Condition for command "%s" not fullfilled.' % cmd["line"]
                    if skip:
                        log("%s Command skipped." % msg)
                        continue
                    raise Exception(msg)
            if cmd["line"] == "write":
                target = cmd.get("target")
                if not target:
                    raise Exception("No target file to write in.")
                backup_path = Path(target + ".back")
                # Never replace an existing backup: it holds the oldest
                # known version of the file.
                if (
                    cmd.get("backup")
                    and Path(target).exists()
                    and not backup_path.exists()
                ):
                    Path(target).rename(backup_path)
                    log("A backup file has been created for:\n%s" % target)
                # Load content from template if any
                content = cmd.get("content", "")
                if cmd.get("template"):
                    if not Path(cmd["template"]).exists():
                        raise Exception(
                            "Template file does not exist: %s." % cmd["template"]
                        )
                    with open(cmd["template"], "r") as fd:
                        content = fd.read()
                    params = cmd.get("params")
                    if params:
                        if isinstance(params, dict):
                            params = params.items()
                        for k, v in params:
                            content = content.replace(k, v)
                # Write target file
                with open(target, "w+") as fd:
                    fd.write(content)
                log("File %s written" % target)
            elif cmd["line"] == "backup":
                target = cmd.get("target")
                if not target:
                    raise Exception("No target file to backup.")
                backup_path = Path(target + ".back")
                if not backup_path.exists():
                    Path(target).rename(backup_path)
                    log("A backup file has been created for:\n%s" % target)
                else:
                    log("A backup file already exist for:\n%s" % target)
            else:
                line = cmd["line"]
                # A command given as a list/tuple is displayed as one line.
                shown = line if isinstance(line, str) else " ".join(map(str, line))
                log(">>> " + shown)
                code = check_cmd(line, log_output=True)
                if code != 0:
                    raise Exception("Command exited with code %s." % code)
    except Exception as e:
        log("Command failed:\n%s" % e)
        raise


def add_hosts_to_localhost(hosts: list):
    """Add a list of hosts to 127.0.0.1 in /etc/hosts.

    The name printed by the ``hostname`` command is added in front of the
    list when it is not already part of it. Missing names are appended to
    the first ``127.0.0.1`` entry of the file (before any trailing comment);
    if there is no such entry a new one is added. The file is only rewritten
    when its content changes.

    :param hosts: List of host names
    :type hosts: list
    :raises OSError: If /etc/hosts cannot be read or written
    """

    # Work on a copy so that the list given by the caller is not modified.
    hosts = list(hosts)
    rc, hostname = exec_cmd("hostname")
    if rc == 0 and hostname and hostname not in hosts:
        hosts.insert(0, hostname)

    with open("/etc/hosts", "r") as fo:
        content = fo.read()

    new_lines = []
    found_127 = False
    for line in content.split("\n"):
        entry, hash_sep, comment = line.partition("#")
        fields = entry.split()
        # Compare whole fields: a prefix test would also match addresses
        # such as 127.0.0.10.
        if not found_127 and fields and fields[0] == "127.0.0.1":
            found_127 = True
            aliases = fields[1:]
            added = []
            for host in hosts:
                # Whole-name comparison: "foo" must not be considered
                # present because "foobar" is.
                if host not in aliases:
                    aliases.append(host)
                    added.append(host)
                    log("Adding host %s to /etc/hosts 127.0.0.1 aliases." % host)
            if added:
                entry = entry.rstrip() + " " + " ".join(added)
                line = (entry + " " + hash_sep + comment) if hash_sep else entry
        new_lines.append(line)

    if not found_127 and hosts:
        new_entry = "127.0.0.1 %s" % " ".join(hosts)
        if new_lines and new_lines[-1] == "":
            # Keep the final newline of the file at the end.
            new_lines.insert(len(new_lines) - 1, new_entry)
        else:
            new_lines.append(new_entry)

    new_content = "\n".join(new_lines)
    if new_content != content:
        with open("/etc/hosts", "w") as fo:
            fo.write(new_content)
        log("/etc/hosts updated.")
    else:
        log("/etc/hosts is already up to date.")
Stéphane Diemer's avatar
Stéphane Diemer committed

Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
# Default OpenSSL configuration used by mkcert().
# The template deliberately ends with an empty "[ alternate_names ]" section:
# mkcert() appends one "DNS.<n> = <domain>" line per domain right after it.
OPENSSL_CONFIG_TEMPLATE = """
[ req ]

prompt             = no
default_bits       = 4096
default_keyfile    = envsetup.csr.pem
distinguished_name = subject
req_extensions     = req_ext
x509_extensions    = x509_ext
string_mask        = utf8only

[ subject ]

C            = FR
ST           = IDF
L            = Paris
O            = UbiCast
CN           = MediaServer
emailAddress = root@localhost

[ x509_ext ]

subjectKeyIdentifier   = hash
authorityKeyIdentifier = keyid,issuer
basicConstraints       = CA:FALSE
keyUsage               = digitalSignature, keyEncipherment
subjectAltName         = @alternate_names
nsComment              = "OpenSSL Generated Certificate"

[ req_ext ]

subjectKeyIdentifier = hash
basicConstraints     = CA:FALSE
keyUsage             = digitalSignature, keyEncipherment
subjectAltName       = @alternate_names

[ alternate_names ]

"""

Stéphane Diemer's avatar
Stéphane Diemer committed

Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
def mkcert(
    domains: list,
    ecc: bool = True,
    days: int = 3650,
    config_tpl: str = OPENSSL_CONFIG_TEMPLATE,
):
    """Generate a self-signed certificate for the domains list.

    The OpenSSL configuration, the key and the certificate are written in
    ``/etc/ssl/envsetup`` as ``conf``, ``key.pem`` and ``cert.pem``; the
    private key is not encrypted.

    :param domains: Domains for which the certificates will be self-signed
    :type domains: list
    :param ecc: Whether to use Elliptic Curve cryptography or not, defaults to True, if False RSA is used
    :type ecc: bool, optional
    :param days: Validity lifetime of the certificate, defaults to 3650
    :type days: int, optional
    :param config_tpl: OpenSSL config file template, defaults to OPENSSL_CONFIG_TEMPLATE
    :type config_tpl: str, optional
    :raises subprocess.CalledProcessError: If an ``openssl`` command fails
    """

    # create certs dir
    cert_dir = Path("/etc/ssl/envsetup")
    cert_dir.mkdir(mode=0o755, parents=True, exist_ok=True)
    config_path = str(cert_dir / "conf")
    key_path = str(cert_dir / "key.pem")
    cert_path = str(cert_dir / "cert.pem")

    # populate template with domains: one numbered "DNS.<n>" entry each,
    # appended after the template
    alt_names = "".join(
        "DNS.{} = {}\n".format(i, domain) for i, domain in enumerate(domains, start=1)
    )
    # write openssl config file
    with open(config_path, "w") as config_fh:
        config_fh.write(config_tpl + alt_names)

    # key type: elliptic curve (default) or rsa
    if ecc:
        ecparam_path = str(cert_dir / "ecparam")
        subprocess.check_call(
            ["openssl", "ecparam", "-name", "secp384r1", "-out", ecparam_path]
        )
        keytype = "ec:" + ecparam_path
    else:
        keytype = "rsa"

    # execute openssl to generate keypair
    subprocess.check_call(
        [
            "openssl",
            "req",
            "-config",
            config_path,
            "-new",
            "-x509",
            "-sha256",
            "-nodes",
            "-newkey",
            keytype,
            "-keyout",
            key_path,
            "-days",
            str(days),
            "-out",
            cert_path,
        ]
    )


def get_ip() -> str:
    """Return the "primary" IP address, i.e. the one used by the default route.

    A UDP socket is connected to an arbitrary address and the local address
    chosen for that socket is returned. If anything goes wrong the loopback
    address is returned instead.

    :return: IP address
    :rtype: str
    """

    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
        try:
            # the target address doesn't have to be reachable
            probe.connect(("10.255.255.255", 1))
            return probe.getsockname()[0]
        except Exception:
            return "127.0.0.1"