Skip to content
Snippets Groups Projects
commands.py 4.57 KiB
Newer Older
#!/usr/bin/env python3

"""EnvSetup commands utilities."""

from pathlib import Path
import subprocess
import sys
from typing import Any

from .logging import log


def exec_cmd(cmd: Any, log_output: bool = True, get_output: bool = True) -> tuple:
    """Execute the given command.

    :param cmd: Command to run
    :type cmd: Any
    :param log_output: Wether to log output or not, defaults to True
    :type log_output: bool, optional
    :param get_output: Wether to return output or not, defaults to True
    :type get_output: bool, optional
    :return: Return code and output
    :rtype: tuple
    """

    shell = not isinstance(cmd, (tuple, list))
    stdout = subprocess.PIPE if get_output or not log_output else sys.stdout
    stderr = subprocess.PIPE if get_output or not log_output else sys.stderr

    # execute
    p = subprocess.Popen(
        cmd, stdin=sys.stdin, stdout=stdout, stderr=stderr, shell=shell
    )
    out, err = p.communicate()

    # send to the correct output
    if get_output:
        out = out.decode("utf-8").strip() if out else ""
        if err:
            if out:
                out += "\n"
            out += err.decode("utf-8").strip()
        out = out.strip()
        if log_output:
            log(out)
    elif log_output:
        sys.stdout.flush()
        sys.stderr.flush()

    return p.returncode, out


def check_cmd(cmd: Any, log_output: bool = False) -> int:
    """Get the return code of the given command.

    :param cmd: Command to execute
    :type cmd: Any
    :param log_output: Wether to log output or not, defaults to False
    :type log_output: bool, optional
    :return: Return code
    :rtype: int
    """

    code, _ = exec_cmd(cmd, log_output, False)

    return code


def run_commands(cmds: list):
    """Run a serie of successive commands.

    :param cmds: List of commands
    :type cmds: list
    :raises Exception: Houston we have a problem
    """

    try:
        # Execute commands
        for cmd in cmds:
            if not isinstance(cmd, dict):
                cmd = dict(line=cmd)
            if cmd.get("cond"):
                cond = cmd["cond"]
                negate = cmd.get("cond_neg")
                skip = cmd.get("cond_skip")
                code = check_cmd(cond)
                valid = code != 0 if negate else code == 0
                if not valid:
                    msg = 'Condition for command "%s" not fullfilled.' % cmd["line"]
                    if skip:
                        log("%s Command skipped." % msg)
                        continue
                    raise Exception(msg)
            if cmd["line"] == "write":
                if not cmd.get("target"):
                    raise Exception("No target file to write in.")
                if (
                    cmd.get("backup")
                    and Path(cmd["target"]).exists()
                    and not Path(cmd["target"] + ".back").exists()
                ):
                    Path(cmd["target"]).rename(Path(cmd["target"] + ".back"))
                    log("A backup file has been created for:\n%s" % cmd["target"])
                # Load content from template if any
                content = cmd.get("content", "")
                if cmd.get("template"):
                    if not Path(cmd["template"]).exists():
                        raise Exception(
                            "Template file does not exist: %s." % cmd["template"]
                        )
                    with open(cmd["template"], "r") as fd:
                        content = fd.read()
                    if cmd.get("params"):
                        for k, v in cmd["params"]:
                            content = content.replace(k, v)
                # Write target file
                with open(cmd["target"], "w+") as fd:
                    fd.write(content)
                log("File %s written" % cmd["target"])
            elif cmd["line"] == "backup":
                if not cmd.get("target"):
                    raise Exception("No target file to backup.")
                if not Path(cmd["target"] + ".back").exists():
                    Path(cmd["target"]).rename(Path(cmd["target"] + ".back"))
                    log("A backup file has been created for:\n%s" % cmd["target"])
                else:
                    log("A backup file already exist for:\n%s" % cmd["target"])
            else:
                log(">>> " + cmd["line"])
                code = check_cmd(cmd["line"], log_output=True)
                if code != 0:
                    raise Exception("Command exited with code %s." % code)
    except Exception as e:
        log("Command failed:\n%s" % e)
        raise