#!/usr/bin/env python3
"""EnvSetup commands utilities."""

from pathlib import Path
import subprocess
import sys
from typing import Any

from .logging import log


def exec_cmd(cmd: Any, log_output: bool = True, get_output: bool = True) -> tuple:
    """Execute the given command and wait for it to finish.

    A ``tuple``/``list`` command is executed directly; anything else is
    handed to the shell.

    :param cmd: Command to run
    :type cmd: Any
    :param log_output: Whether to log output or not, defaults to True
    :type log_output: bool, optional
    :param get_output: Whether to return output or not, defaults to True
    :type get_output: bool, optional
    :return: Return code and output. When ``get_output`` is True the output
        is the decoded stdout followed by the decoded stderr. Otherwise it is
        whatever ``communicate()`` returned for stdout: raw bytes when the
        streams were piped (``log_output`` False) or ``None`` when the child
        wrote straight to this process's streams (``log_output`` True).
    :rtype: tuple
    """
    shell = not isinstance(cmd, (tuple, list))
    # Pipes are needed either to return the output or to keep it off the
    # console; otherwise the child inherits this process's streams.
    capture = get_output or not log_output
    stdout = subprocess.PIPE if capture else sys.stdout
    stderr = subprocess.PIPE if capture else sys.stderr

    # execute
    p = subprocess.Popen(
        cmd, stdin=sys.stdin, stdout=stdout, stderr=stderr, shell=shell
    )
    out, err = p.communicate()

    # send to the correct output
    if get_output:
        # errors="replace": a command emitting non-UTF-8 bytes must not make
        # the caller crash with UnicodeDecodeError.
        out = out.decode("utf-8", errors="replace").strip() if out else ""
        if err:
            if out:
                out += "\n"
            out += err.decode("utf-8", errors="replace").strip()
        out = out.strip()
        if log_output:
            log(out)
    elif log_output:
        sys.stdout.flush()
        sys.stderr.flush()
    return p.returncode, out


def check_cmd(cmd: Any, log_output: bool = False) -> int:
    """Get the return code of the given command.

    :param cmd: Command to execute
    :type cmd: Any
    :param log_output: Whether to log output or not, defaults to False
    :type log_output: bool, optional
    :return: Return code
    :rtype: int
    """
    code, _ = exec_cmd(cmd, log_output, False)
    return code


def _condition_met(cmd: dict) -> bool:
    """Run the ``cond`` command and tell whether its exit code is the expected one."""
    code = check_cmd(cmd["cond"])
    # With "cond_neg" the condition holds when the check command fails.
    return code != 0 if cmd.get("cond_neg") else code == 0


def _backup_target(target: str) -> None:
    """Rename ``target`` to ``target + ".back"`` and log that a backup was made."""
    Path(target).rename(Path(target + ".back"))
    log("A backup file has been created for:\n%s" % target)


def _write_file(cmd: dict) -> None:
    """Handle a "write" command: optional backup, then write content to the target."""
    target = cmd.get("target")
    if not target:
        raise Exception("No target file to write in.")
    if (
        cmd.get("backup")
        and Path(target).exists()
        and not Path(target + ".back").exists()
    ):
        _backup_target(target)

    # Load content from template if any
    content = cmd.get("content", "")
    template = cmd.get("template")
    if template:
        if not Path(template).exists():
            raise Exception("Template file does not exist: %s." % template)
        with open(template, "r") as fd:
            content = fd.read()

    # NOTE(review): substitutions are applied to inline content as well as to
    # template content -- confirm this matches the intended behaviour.
    params = cmd.get("params")
    if params:
        # Accept a mapping as well as a sequence of (placeholder, value)
        # pairs; iterating a dict directly would only yield its keys.
        pairs = params.items() if isinstance(params, dict) else params
        for placeholder, value in pairs:
            content = content.replace(placeholder, value)

    # Write target file
    with open(target, "w+") as fd:
        fd.write(content)
    log("File %s written" % target)


def _backup_file(cmd: dict) -> None:
    """Handle a "backup" command: back the target up unless a backup already exists."""
    target = cmd.get("target")
    if not target:
        raise Exception("No target file to backup.")
    if not Path(target + ".back").exists():
        _backup_target(target)
    else:
        log("A backup file already exist for:\n%s" % target)


def run_commands(cmds: list):
    """Run a series of successive commands.

    Each entry is either a command (wrapped as ``dict(line=entry)``) or a dict
    with a ``line`` key and these optional keys: ``cond``, ``cond_neg``,
    ``cond_skip``, ``target``, ``backup``, ``content``, ``template``,
    ``params``. The lines ``"write"`` and ``"backup"`` are handled internally;
    any other line is executed as a command.

    :param cmds: List of commands
    :type cmds: list
    :raises Exception: If a condition is not fulfilled (and ``cond_skip`` is
        not set), a required key or file is missing, or a command exits with
        a non-zero code. The failure is logged before being re-raised.
    """
    try:
        # Execute commands
        for cmd in cmds:
            if not isinstance(cmd, dict):
                cmd = dict(line=cmd)

            if cmd.get("cond") and not _condition_met(cmd):
                msg = 'Condition for command "%s" not fullfilled.' % cmd["line"]
                if cmd.get("cond_skip"):
                    log("%s Command skipped." % msg)
                    continue
                raise Exception(msg)

            line = cmd["line"]
            if line == "write":
                _write_file(cmd)
            elif line == "backup":
                _backup_file(cmd)
            else:
                # exec_cmd accepts list/tuple commands, so the log line must
                # not assume a string.
                if isinstance(line, (tuple, list)):
                    shown = " ".join(str(part) for part in line)
                else:
                    shown = line
                log(">>> " + shown)
                code = check_cmd(line, log_output=True)
                if code != 0:
                    raise Exception("Command exited with code %s." % code)
    except Exception as e:
        log("Command failed:\n%s" % e)
        raise