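# NOTE(review): the two stray words "Newer" / "Older" that stood here look
# like page-navigation residue rather than module code; kept as a comment so
# that importing the module no longer fails with a NameError.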
"""EnvSetup utilities."""
import os
import subprocess
import sys
from collections import OrderedDict
from typing import Any
# ANSI escape sequences used to colour the status glyph printed by the
# info() / success() / warning() / error() helpers below.
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
BLUE = "\033[94m"
# Emitted right after a coloured glyph to reset the terminal attributes.
DEF = "\033[0m"
# Configuration file names. load_conf() joins each of them with this
# module's directory and reads them in the order listed here.
DEFAULT_CONF_PATH = "global-conf.sh"
AUTO_CONF_PATH = "auto-generated-conf.sh"
CONF_PATH = "conf.sh"
def log(text: str, error: bool = False):
    """Write a message to stdout or stderr and flush the stream.

    :param text: Message to log
    :type text: str
    :param error: Whether to write to stderr instead of stdout,
        defaults to False
    :type error: bool, optional
    """
    stream = sys.stdout
    if error:
        stream = sys.stderr
    # Flush immediately so messages interleave correctly with the output of
    # child processes sharing the same terminal.
    print(text, file=stream, flush=True)
def info(message: str):
    """Log a message prefixed with a blue information glyph.

    :param message: Message to print
    :type message: str
    """
    line = " {}ⓘ{} {}".format(BLUE, DEF, message)
    log(line)
def success(message: str):
    """Log a message prefixed with a green check mark.

    :param message: Message to print
    :type message: str
    """
    line = " {}✔{} {}".format(GREEN, DEF, message)
    log(line)
def warning(message: str):
    """Log a message prefixed with a yellow warning sign.

    :param message: Message to print
    :type message: str
    """
    line = " {}⚠{} {}".format(YELLOW, DEF, message)
    log(line)
def error(message: str):
    """Log a message prefixed with a red cross.

    The message is sent through ``log`` with its default stream (stdout).

    :param message: Message to print
    :type message: str
    """
    line = " {}✖{} {}".format(RED, DEF, message)
    log(line)
def get_dir(file_path: str) -> str:
    """Return the absolute path of the directory containing a file.

    A leading ``~`` is expanded before the path is made absolute.

    :param file_path: File path
    :type file_path: str
    :return: Absolute directory path
    :rtype: str
    """
    expanded = os.path.expanduser(file_path)
    absolute = os.path.abspath(expanded)
    return os.path.dirname(absolute)
def _inheritable(stream):
    """Return *stream* if it has a real file descriptor, else ``None``.

    ``subprocess.Popen`` needs an object with a working ``fileno()``.
    Passing ``None`` instead makes the child inherit the parent's own
    descriptor, which is the closest equivalent when the ``sys`` stream has
    been replaced by an in-memory object (or is ``None``).
    """
    try:
        stream.fileno()
    except (AttributeError, OSError, ValueError):
        return None
    return stream


def exec_cmd(cmd: Any, log_output: bool = True, get_output: bool = True) -> tuple:
    """Execute the given command and wait for it to finish.

    A string command is run through the shell; any other value (typically a
    list of arguments) is executed directly.

    :param cmd: Command to run
    :type cmd: Any
    :param log_output: Whether to log output or not, defaults to True
    :type log_output: bool, optional
    :param get_output: Whether to return output or not, defaults to True
    :type get_output: bool, optional
    :return: Return code and output. The output is the stripped stdout
        followed by the stripped stderr (newline separated) when
        ``get_output`` is true, and an empty string otherwise.
    :rtype: tuple
    """
    # Strings need the shell for parsing; argument sequences do not.
    shell = isinstance(cmd, (str, bytes))
    # Output is piped when it has to be returned, and also when it must be
    # neither returned nor shown (piping is what silences it).
    capture = get_output or not log_output
    stdout = subprocess.PIPE if capture else _inheritable(sys.stdout)
    stderr = subprocess.PIPE if capture else _inheritable(sys.stderr)
    # execute
    p = subprocess.Popen(
        cmd,
        stdin=_inheritable(sys.stdin),
        stdout=stdout,
        stderr=stderr,
        shell=shell,
    )
    out, err = p.communicate()
    if not get_output:
        if log_output:
            # The child wrote straight to our streams; keep ordering sane.
            sys.stdout.flush()
            sys.stderr.flush()
        return p.returncode, ""
    # Merge both streams, dropping whichever one is empty.
    parts = [
        chunk.decode("utf-8", errors="replace").strip()
        for chunk in (out, err)
        if chunk
    ]
    text = "\n".join(part for part in parts if part)
    if log_output:
        log(text)
    return p.returncode, text
def check_cmd(cmd: Any, log_output: bool = False) -> int:
    """Run a command and return only its exit status.

    :param cmd: Command to execute
    :type cmd: Any
    :param log_output: Whether to log output or not, defaults to False
    :type log_output: bool, optional
    :return: Return code
    :rtype: int
    """
    # The output is never needed here, so it is not requested.
    result = exec_cmd(cmd, log_output, False)
    return result[0]
def load_conf() -> dict:
    """Load EnvSetup configuration settings.

    Three files located next to this module are read in order: the default
    configuration, the auto-generated one and the user one. Each
    ``NAME=value`` line sets a setting, so later files win. Blank lines,
    lines starting with ``#`` and lines without ``=`` are ignored. Missing
    files are reported on stderr and skipped.

    The returned dict also holds an ``"_override"`` entry: an ordered mapping
    of setting name to a flag.
    NOTE(review): the flag is taken to mean "last set by a non-default file";
    confirm against the consumers of ``_override``.

    :return: Configuration settings
    :rtype: dict
    """
    conf = {}
    base_dir = get_dir(__file__)
    # (path, is_default) pairs, in the order they are applied.
    files = (
        (os.path.join(base_dir, DEFAULT_CONF_PATH), True),
        (os.path.join(base_dir, AUTO_CONF_PATH), False),
        (os.path.join(base_dir, CONF_PATH), False),
    )
    only_default = True
    override = OrderedDict()
    for path, is_default in files:
        if not os.path.exists(path):
            log(
                "The configuration file '{}' does not exist.".format(path),
                error=True,
            )
            # Nothing to read: opening the file would only raise.
            continue
        with open(path, "r") as fo:
            content = fo.read()
        # Parse conf
        for raw_line in content.split("\n"):
            line = raw_line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            # Only the first "=" separates name and value.
            name, _, val = line.partition("=")
            name = name.strip(" \t'\"")
            val = val.strip(" \t'\"")
            conf[name] = val
            override[name] = not is_default
            if not is_default:
                only_default = False
    conf["_override"] = override
    # Check a value to know if the config file has been changed
    if only_default:
        log("{}Warning:{}".format(YELLOW, DEF))
        log("The configuration is using only default values.")
        log("Perhaps you forget to change the configuration.")
        log("Path of configuration file: %s" % os.path.join(base_dir, CONF_PATH))
        log("Perhaps you want to quit this script to change the configuration?\n")
    return conf
def get_conf(name: str, default: str = None) -> str:
    """Look up a single configuration parameter.

    The configuration is reloaded through ``load_conf`` on every call.

    :param name: Parameter name
    :type name: str
    :param default: Value returned when the parameter is not set,
        defaults to None
    :type default: str, optional
    :return: Parameter value
    :rtype: str
    """
    settings = load_conf()
    if name in settings:
        return settings[name]
    return default
def _condition_allows(cmd: dict) -> bool:
    """Check the optional ``cond`` guard of a command; False means "skip it"."""
    cond = cmd.get("cond")
    if not cond:
        return True
    code = check_cmd(cond)
    # "cond_neg" inverts the test: the guard then passes on a non-zero exit.
    valid = code != 0 if cmd.get("cond_neg") else code == 0
    if valid:
        return True
    msg = 'Condition for command "%s" not fullfilled.' % cmd["line"]
    if cmd.get("cond_skip"):
        log("%s Command skipped." % msg)
        return False
    # NOTE(review): without "cond_skip" an unmet condition is treated as a
    # failure; confirm this is the intended meaning of that key.
    raise Exception(msg)


def _write_target(cmd: dict):
    """Handle a ``write`` command: write content or a template to the target."""
    target = cmd.get("target")
    if not target:
        raise Exception("No target file to write in.")
    backup_path = target + ".back"
    if (
        cmd.get("backup")
        and os.path.exists(target)
        and not os.path.exists(backup_path)
    ):
        os.rename(target, backup_path)
        log("A backup file has been created for:\n%s" % target)
    content = cmd.get("content", "")
    template = cmd.get("template")
    if template:
        if not os.path.exists(template):
            raise Exception("Template file does not exist: %s." % template)
        with open(template, "r") as fd:
            content = fd.read()
    params = cmd.get("params")
    if params:
        # Accept both a mapping and an iterable of (placeholder, value) pairs.
        pairs = params.items() if isinstance(params, dict) else params
        for k, v in pairs:
            content = content.replace(k, v)
    # Write target file
    with open(target, "w") as fd:
        fd.write(content)
    log("File %s written" % target)


def _backup_target(cmd: dict):
    """Handle a ``backup`` command: rename the target to ``<target>.back``."""
    target = cmd.get("target")
    if not target:
        raise Exception("No target file to backup.")
    if not os.path.exists(target + ".back"):
        os.rename(target, target + ".back")
        log("A backup file has been created for:\n%s" % target)
    else:
        log("A backup file already exist for:\n%s" % target)


def run_commands(cmds: list):
    """Run a series of successive commands, stopping at the first failure.

    Each item is either a command line (run through ``check_cmd``) or a dict
    with a ``line`` key and optional keys:

    * ``cond`` / ``cond_neg`` / ``cond_skip``: guard command, whether to
      negate its result, and whether an unmet guard skips the command
      instead of failing;
    * ``line == "write"``: writes ``content`` (or the file named by
      ``template``) to ``target`` after replacing each ``params``
      placeholder, optionally keeping a ``.back`` copy when ``backup`` is
      set;
    * ``line == "backup"``: renames ``target`` to ``target + ".back"``
      unless that backup already exists.

    :param cmds: List of commands
    :type cmds: list
    :raises Exception: When a command fails; the failure is logged first
    """
    try:
        # Execute commands
        for cmd in cmds:
            if not isinstance(cmd, dict):
                cmd = dict(line=cmd)
            if not _condition_allows(cmd):
                continue
            line = cmd["line"]
            if line == "write":
                _write_target(cmd)
            elif line == "backup":
                _backup_target(cmd)
            else:
                log(">>> " + line)
                code = check_cmd(line, log_output=True)
                if code != 0:
                    raise Exception("Command exited with code %s." % code)
    except Exception as e:
        log("Command failed:\n%s" % e)
        raise
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
def _merge_localhost_aliases(content: str, hosts: list) -> tuple:
    """Return ``(new_content, added)`` with *hosts* aliased to 127.0.0.1.

    Only the first ``127.0.0.1`` entry is extended; when there is none, a new
    entry is appended at the end of the file.
    """
    lines = content.split('\n')
    for index, line in enumerate(lines):
        # Anything after "#" is a comment and must stay after the aliases.
        entry, hash_sign, comment = line.partition('#')
        fields = entry.split()
        # Compare whole fields so that 127.0.0.10 is not mistaken for it.
        if not fields or fields[0] != '127.0.0.1':
            continue
        known = set(fields[1:])
        added = []
        for host in hosts:
            if host not in known:
                known.add(host)
                added.append(host)
        if added:
            entry = entry.rstrip() + ' ' + ' '.join(added)
            if hash_sign:
                entry += ' ' + hash_sign + comment
            lines[index] = entry
        return '\n'.join(lines), added
    # No 127.0.0.1 entry yet: append one, keeping the file newline-terminated.
    wanted = []
    for host in hosts:
        if host not in wanted:
            wanted.append(host)
    if not wanted:
        return content, []
    prefix = content if not content or content.endswith('\n') else content + '\n'
    return prefix + '127.0.0.1 %s\n' % ' '.join(wanted), wanted


def add_hosts_to_localhost(hosts: list):
    """Add a list of hosts to 127.0.0.1 in /etc/hosts.

    The machine's own host name (output of the ``hostname`` command) is added
    as well when that command succeeds. The file is rewritten only if its
    content changes, and the caller's list is left untouched.

    :param hosts: List of host names
    :type hosts: list
    :raises OSError: If /etc/hosts cannot be read or written
    """
    # Work on a copy so the caller's list is not modified.
    wanted = list(hosts)
    rc, hostname = exec_cmd('hostname')
    if rc == 0 and hostname and hostname not in wanted:
        wanted.insert(0, hostname)
    with open('/etc/hosts', 'r') as fo:
        content = fo.read()
    new_content, added = _merge_localhost_aliases(content, wanted)
    for host in added:
        log('Adding host %s to /etc/hosts 127.0.0.1 aliases.' % host)
    if new_content != content:
        with open('/etc/hosts', 'w') as fo:
            fo.write(new_content)
        log('/etc/hosts updated.')
    else:
        log('/etc/hosts is already up to date.')