Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#!/usr/bin/env python3
"""EnvSetup commands utilities."""
from pathlib import Path
import subprocess
import sys
from typing import Any
from .logging import log
def exec_cmd(cmd: Any, log_output: bool = True, get_output: bool = True) -> tuple:
"""Execute the given command.
:param cmd: Command to run
:type cmd: Any
:param log_output: Wether to log output or not, defaults to True
:type log_output: bool, optional
:param get_output: Wether to return output or not, defaults to True
:type get_output: bool, optional
:return: Return code and output
:rtype: tuple
"""
shell = not isinstance(cmd, (tuple, list))
stdout = subprocess.PIPE if get_output or not log_output else sys.stdout
stderr = subprocess.PIPE if get_output or not log_output else sys.stderr
# execute
p = subprocess.Popen(
cmd, stdin=sys.stdin, stdout=stdout, stderr=stderr, shell=shell
)
out, err = p.communicate()
# send to the correct output
if get_output:
out = out.decode("utf-8").strip() if out else ""
if err:
if out:
out += "\n"
out += err.decode("utf-8").strip()
out = out.strip()
if log_output:
log(out)
elif log_output:
sys.stdout.flush()
sys.stderr.flush()
return p.returncode, out
def check_cmd(cmd: Any, log_output: bool = False) -> int:
"""Get the return code of the given command.
:param cmd: Command to execute
:type cmd: Any
:param log_output: Wether to log output or not, defaults to False
:type log_output: bool, optional
:return: Return code
:rtype: int
"""
code, _ = exec_cmd(cmd, log_output, False)
return code
def run_commands(cmds: list):
"""Run a serie of successive commands.
:param cmds: List of commands
:type cmds: list
:raises Exception: Houston we have a problem
"""
try:
# Execute commands
for cmd in cmds:
if not isinstance(cmd, dict):
cmd = dict(line=cmd)
if cmd.get("cond"):
cond = cmd["cond"]
negate = cmd.get("cond_neg")
skip = cmd.get("cond_skip")
code = check_cmd(cond)
valid = code != 0 if negate else code == 0
if not valid:
msg = 'Condition for command "%s" not fullfilled.' % cmd["line"]
if skip:
log("%s Command skipped." % msg)
continue
raise Exception(msg)
if cmd["line"] == "write":
if not cmd.get("target"):
raise Exception("No target file to write in.")
if (
cmd.get("backup")
and Path(cmd["target"]).exists()
and not Path(cmd["target"] + ".back").exists()
):
Path(cmd["target"]).rename(Path(cmd["target"] + ".back"))
log("A backup file has been created for:\n%s" % cmd["target"])
# Load content from template if any
content = cmd.get("content", "")
if cmd.get("template"):
if not Path(cmd["template"]).exists():
raise Exception(
"Template file does not exist: %s." % cmd["template"]
)
with open(cmd["template"], "r") as fd:
content = fd.read()
if cmd.get("params"):
for k, v in cmd["params"]:
content = content.replace(k, v)
# Write target file
with open(cmd["target"], "w+") as fd:
fd.write(content)
log("File %s written" % cmd["target"])
elif cmd["line"] == "backup":
if not cmd.get("target"):
raise Exception("No target file to backup.")
if not Path(cmd["target"] + ".back").exists():
Path(cmd["target"]).rename(Path(cmd["target"] + ".back"))
log("A backup file has been created for:\n%s" % cmd["target"])
else:
log("A backup file already exist for:\n%s" % cmd["target"])
else:
log(">>> " + cmd["line"])
code = check_cmd(cmd["line"], log_output=True)
if code != 0:
raise Exception("Command exited with code %s." % code)
except Exception as e:
log("Command failed:\n%s" % e)
raise