#!/usr/bin/env python3
from collections import OrderedDict
import os
import subprocess
import sys
from typing import Any

# ANSI escape sequences used to colour terminal output; DEF resets the style.
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
BLUE = "\033[94m"
DEF = "\033[0m"

# Configuration file names, joined by load_conf() to this module's directory.
DEFAULT_CONF_PATH = "global-conf.sh"
AUTO_CONF_PATH = "auto-generated-conf.sh"
CONF_PATH = "conf.sh"


def log(text: str, error: bool = False):
    """Output a log message to stdout or stderr and flush the stream.

    :param text: Message to log
    :type text: str
    :param error: Whether it should output to stderr or not, defaults to False
    :type error: bool, optional
    """

    # The stream is looked up at call time, so a replaced sys.stdout or
    # sys.stderr is honoured.
    fo = sys.stderr if error else sys.stdout
    print(text, file=fo)
    fo.flush()


def info(message: str):
    """Log an informational message prefixed with a blue "ⓘ" marker.

    :param message: Text to display
    :type message: str
    """

    line = " {}ⓘ{} {}".format(BLUE, DEF, message)
    log(line)


def success(message: str):
    """Log a success message prefixed with a green "✔" marker.

    :param message: Text to display
    :type message: str
    """

    line = " {}✔{} {}".format(GREEN, DEF, message)
    log(line)


def warning(message: str):
    """Log a warning message prefixed with a yellow "⚠" marker.

    :param message: Text to display
    :type message: str
    """

    line = " {}⚠{} {}".format(YELLOW, DEF, message)
    log(line)


def error(message: str):
    """Log an error message prefixed with a red "✖" marker.

    The message goes through ``log`` without ``error=True``, so it is
    written to stdout, not stderr.

    :param message: Text to display
    :type message: str
    """

    line = " {}✖{} {}".format(RED, DEF, message)
    log(line)


def get_dir(file_path: str) -> str:
    """Get the absolute path of the directory containing the given file.

    A leading ``~`` is expanded to the user's home directory and a relative
    path is resolved against the current working directory. The file system
    is not accessed, so the path does not have to exist.

    :param file_path: File path
    :type file_path: str
    :return: Absolute directory path
    :rtype: str
    """

    return os.path.dirname(os.path.abspath(os.path.expanduser(file_path)))


def exec_cmd(cmd: Any, log_output: bool = True, get_output: bool = True) -> tuple:
    """Execute the given command and wait for it to finish.

    A list or tuple is executed directly; any other value (typically a
    string) is passed to the shell.

    :param cmd: Command to run
    :type cmd: Any
    :param log_output: Whether to log output or not, defaults to True
    :type log_output: bool, optional
    :param get_output: Whether to return output or not, defaults to True
    :type get_output: bool, optional
    :return: Return code and output. With ``get_output`` the output is the
        stripped stdout text followed by the stderr text; otherwise it is
        the raw value returned by ``communicate()`` for stdout
    :rtype: tuple
    """

    shell = not isinstance(cmd, (tuple, list))
    # Streams are captured when the output is wanted or must be hidden;
    # otherwise the child writes straight to this process' stdout/stderr.
    stdout = subprocess.PIPE if get_output or not log_output else sys.stdout
    stderr = subprocess.PIPE if get_output or not log_output else sys.stderr

    # execute
    p = subprocess.Popen(
        cmd, stdin=sys.stdin, stdout=stdout, stderr=stderr, shell=shell
    )
    out, err = p.communicate()
    if get_output:
        out = out.decode("utf-8").strip() if out else ""
        if err:
            # Append stderr after stdout, separated by a newline if needed
            if out:
                out += "\n"
            out += err.decode("utf-8").strip()
        out = out.strip()
        if log_output:
            log(out)
    elif log_output:
        # The child wrote directly to our streams: make sure it is displayed
        sys.stdout.flush()
        sys.stderr.flush()

    return p.returncode, out


def check_cmd(cmd: Any, log_output: bool = False) -> int:
    """Get the return code of the given command.

    :param cmd: Command to execute
    :type cmd: Any
    :param log_output: Whether to log output or not, defaults to False
    :type log_output: bool, optional
    :return: Return code
    :rtype: int
    """

    # The output is not requested (get_output=False); only the code matters
    code, _ = exec_cmd(cmd, log_output, False)
    return code

def load_conf() -> dict:
    """Load EnvSetup configuration settings.

    The files are read, in order, from the directory containing this
    module: the default configuration, the auto-generated configuration and
    the local configuration. A value found in a later file replaces the one
    from an earlier file. The returned dict also contains an ``_override``
    entry mapping each setting name to ``True`` when its last value came
    from a non-default file and ``False`` otherwise.

    :return: Configuration settings
    :rtype: dict
    """

    conf = {}
    base_dir = get_dir(__file__)
    # (path, is_default) pairs, from lowest to highest priority
    files = (
        (os.path.join(base_dir, DEFAULT_CONF_PATH), True),
        (os.path.join(base_dir, AUTO_CONF_PATH), False),
        (os.path.join(base_dir, CONF_PATH), False),
    )
    # NOTE(review): assumed to stay True until a non-default file provides
    # a value -- confirm against the upstream file.
    only_default = True
    override = OrderedDict()
    for path, is_default in files:
        if not os.path.exists(path):
            # Only a missing default file is reported; others are optional
            if is_default:
                log(
                    "The configuration file '{}' does not exist.".format(path),
                    error=True,
                )
            continue
        with open(path, "r") as fo:
            content = fo.read()
        # Parse conf
        for line in content.split("\n"):
            if line and not line.startswith("#") and "=" in line:
                # Split on the first "=" only so values may contain "="
                name, _, val = line.partition("=")
                name = name.strip(" \t'\"")
                val = val.strip(" \t'\"")
                conf[name] = val
                if is_default:
                    override[name] = False
                else:
                    only_default = False
                    override[name] = True
    conf["_override"] = override
    # Check a value to know if the config file has been changed
    if only_default:
        log("\033[93mWarning:\033[0m")
        log("The configuration is using only default values.")
        log("Perhaps you forget to change the configuration.")
        log("Path of configuration file: %s" % os.path.join(base_dir, CONF_PATH))
        log("Perhaps you want to quit this script to change the configuration?\n")
    return conf


def get_conf(name: str, default: str = None) -> str:
    """Look up a single configuration parameter.

    The whole configuration is reloaded through ``load_conf`` on every call.

    :param name: Name of the parameter to read
    :type name: str
    :param default: Value returned when the parameter is absent, defaults
        to None
    :type default: str, optional
    :return: Value of the parameter, or ``default`` if it is not set
    :rtype: str
    """

    return load_conf().get(name, default)
def run_commands(cmds: list):
    """Run a series of successive commands.

    Each item of ``cmds`` is either a plain command line or a dict with a
    ``line`` key and optional settings:

    * ``cond``: a command whose return code decides whether the item runs
      (code 0 means yes); ``cond_neg`` inverts the check and ``cond_skip``
      turns an unmet condition into a logged skip instead of an error.
    * ``line == "write"``: write ``content`` into ``target``. When
      ``template`` is set the content is read from that file and every
      ``params`` pair (key, value) is substituted in it. With ``backup``,
      an existing target is first renamed to ``target + ".back"`` unless
      that backup already exists.
    * ``line == "backup"``: rename ``target`` to ``target + ".back"``
      unless that backup already exists.
    * any other ``line`` is executed with its output logged.

    :param cmds: List of commands
    :type cmds: list
    :raises Exception: If a condition is not met, a required setting is
        missing, a template file does not exist or a command exits with a
        non-zero code. Any failure is logged, then re-raised
    """

    try:
        # Execute commands
        for cmd in cmds:
            if not isinstance(cmd, dict):
                cmd = dict(line=cmd)
            if cmd.get("cond"):
                cond = cmd["cond"]
                negate = cmd.get("cond_neg")
                skip = cmd.get("cond_skip")
                # Evaluate the condition command; only its return code matters
                code = check_cmd(cond)
                valid = code != 0 if negate else code == 0
                if not valid:
                    msg = 'Condition for command "%s" not fullfilled.' % cmd["line"]
                    if skip:
                        log("%s Command skipped." % msg)
                        continue
                    raise Exception(msg)
            if cmd["line"] == "write":
                if not cmd.get("target"):
                    raise Exception("No target file to write in.")
                # Never overwrite an existing backup
                if (
                    cmd.get("backup")
                    and os.path.exists(cmd["target"])
                    and not os.path.exists(cmd["target"] + ".back")
                ):
                    os.rename(cmd["target"], cmd["target"] + ".back")
                    log("A backup file has been created for:\n%s" % cmd["target"])
                # Load content from template if any
                content = cmd.get("content", "")
                if cmd.get("template"):
                    if not os.path.exists(cmd["template"]):
                        raise Exception(
                            "Template file does not exist: %s." % cmd["template"]
                        )
                    with open(cmd["template"], "r") as fd:
                        content = fd.read()
                    if cmd.get("params"):
                        params = cmd["params"]
                        # Accept a mapping as well as an iterable of pairs
                        pairs = params.items() if isinstance(params, dict) else params
                        for k, v in pairs:
                            content = content.replace(k, v)
                # Write target file
                with open(cmd["target"], "w+") as fd:
                    fd.write(content)
                log("File %s written" % cmd["target"])
            elif cmd["line"] == "backup":
                if not cmd.get("target"):
                    raise Exception("No target file to backup.")
                if not os.path.exists(cmd["target"] + ".back"):
                    os.rename(cmd["target"], cmd["target"] + ".back")
                    log("A backup file has been created for:\n%s" % cmd["target"])
                else:
                    log("A backup file already exist for:\n%s" % cmd["target"])
            else:
                log(">>> " + cmd["line"])
                code = check_cmd(cmd["line"], log_output=True)
                if code != 0:
                    raise Exception("Command exited with code %s." % code)
    except Exception as e:
        # Report the failure, then let the caller handle it
        log("Command failed:\n%s" % e)
        raise