Skip to content
Snippets Groups Projects
Commit c32f9657 authored by Nicolas KAROLAK's avatar Nicolas KAROLAK
Browse files

change(tests): formatting and functions documentation

parent 595578e7
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*-
''' """
Criticality: High Criticality: High
Check that emails can be sent. Check that emails can be sent.
''' """
import os import os
import subprocess import subprocess
import sys import sys
import random import random
import time import time
import imp
# install spf lib if not present
try: try:
import spf import spf
except ImportError: except ImportError:
subprocess.check_call(['apt-get', '-qq', '-y', 'install', 'python3-spf']) subprocess.check_call(["apt-get", "-qq", "-y", "install", "python3-spf"])
import spf import spf
if subprocess.call(['which', 'netstat']) != 0:
subprocess.check_call(['apt-get', '-qq', '-y', 'install', 'net-tools'])
YELLOW = '\033[93m' # install netstat if not present
GREEN = '\033[92m' if subprocess.call(["which", "netstat"]) != 0:
RED = '\033[91m' subprocess.check_call(["apt-get", "-qq", "-y", "install", "net-tools"])
DEF = '\033[0m'
os.chdir(os.path.dirname(__file__)) sys.path.append("..")
if not os.path.isfile('../utils.py'):
print('The envsetup configuration was not found.')
sys.exit(1)
else:
es_utils = imp.load_source('es_utils', '../utils.py')
conf = es_utils.load_conf()
# pylint: disable=wrong-import-position
from envsetup import utils as u # noqa: E402
def print_color(txt, col):
print('%s%s%s' % (col, txt, DEF))
def check_listen() -> int:
"""Check that Postfix is listening on 127.0.0.1:25.
def print_yellow(txt): :return: Exit return code
print_color(txt, YELLOW) :rtype: int
"""
print("Checking that postfix is listening locally:")
def print_red(txt): # get listening state from netstat
print_color(txt, RED) status, out = subprocess.getstatusoutput("netstat -pant | grep master | grep ':25'")
if status != 0 or "127.0.0.1:25" not in out:
u.error("Postfix is not listening on 127.0.0.1:25")
return 1
def print_green(txt): u.success("Postfix is listening on 127.0.0.1:25")
print_color(txt, GREEN)
def check_listening_port():
# check that postfix listens the port 25 correctly
status, out = subprocess.getstatusoutput('netstat -pant | grep master | grep ":25"')
if status != 0:
print_red('The port 25 is not listened by postfix "master" process.')
return 1
print_green('Postfix "master" process is listening port 25 correctly.')
if '127.0.0.1:25' not in out:
print_red('Postfix "master" process is not listening address 127.0.0.1, please check postfix configuration.')
print('Postfix should listen address 127.0.0.1 to be sure that this server cannot be used as an SMTP relay by external services.')
return 1
print_green('Postfix "master" process is listening address 127.0.0.1 correctly.')
return 0 return 0
def check_relay(): def check_relay(conf: dict) -> int:
print('Checking if SMTP relay conforms to conf.') """Check that Postfix is not an open relay.
status, out = subprocess.getstatusoutput('grep relayhost /etc/postfix/main.cf')
:param conf: EnvSetup configuration settings
:type conf: dict
:return: Exit return code
:rtype: int
"""
print("Checking that SMTP relay conforms to conf:")
# get relayhost value from Postfix config
status, out = subprocess.getstatusoutput("grep relayhost /etc/postfix/main.cf")
if status == 0: if status == 0:
configured_relay = out[len('relayhost'):].strip(' \t=').replace('[', '').replace(']', '') configured_relay = (
out[len("relayhost") :].strip(" \t=").replace("[", "").replace("]", "")
)
else: else:
configured_relay = '' configured_relay = ""
if not configured_relay: if not configured_relay:
# no relay configured, check relayless situations # check public ip address
ip = conf.get('NETWORK_IP_NAT') ip_addr = conf.get("NETWORK_IP_NAT")
if not ip: if not ip_addr:
ip = conf.get('NETWORK_IP') ip_addr = conf.get("NETWORK_IP")
if not ip: if not ip_addr:
print_yellow('Cannot determine public IP address') u.warning("Cannot determine public IP address")
return 3 return 3
with open('/etc/mailname', 'r') as f: # check domain origin
d = f.read().strip() with open("/etc/mailname", "r") as mailname:
if d not in ('ubicast.tv', 'ubicast.eu'): data = mailname.read().strip()
print_yellow('/etc/mailname does not contain ubicast.eu or ubicast.tv, mails will probably not be received on ubicast mailing lists') if data not in ("ubicast.tv", "ubicast.eu"):
u.warning("/etc/mailname does not contain ubicast.eu or ubicast.tv")
return 3 return 3
# check spf # check spf
result, explain = spf.check2(i=ip, s='support@ubicast.eu', h='') result, _ = spf.check2(i=ip_addr, s="support@ubicast.eu", h="")
if result != 'pass': if result != "pass":
print_red('ip %s is not in ubicast.eu SPF and emails sent from support@ubicast.eu may be treated as spam' % ip) u.error("SPF record for {} in ubicast.eu is missing".format(ip_addr))
return 3 return 3
conf_relay = conf.get('EMAIL_SMTP_SERVER', '').replace('[', '').replace(']', '')
# get relayhost value from envsetup config
conf_relay = conf.get("EMAIL_SMTP_SERVER", "").replace("[", "").replace("]", "")
if conf_relay != configured_relay: if conf_relay != configured_relay:
print_red('Configured STMP relay (%s) does not match the expected value (%s).' % (configured_relay, conf_relay)) u.error("STMP relay must be {}".format(conf_relay))
return 3 return 3
else:
print_green('STMP relay is properly set.')
return 0
u.success("STMP relay is properly set")
return 0
def check_send_test_email() -> int:
"""Check that Postfix can send email.
:return: Exit return code
:rtype: int
"""
print("Checking Postfix can send email:")
def send_test_email(): # send email
email = 'noreply+%s-%s@ubicast.eu' % (time.time(), random.randint(0, 1000)) email = "noreply+{}-{}@ubicast.eu".format(time.time(), random.randint(0, 1000))
print('Sending test email to "%s".' % email) u.info("Sending test email to '{}'".format(email))
cmd = 'echo "This is a test email" | mail -s "Test email from `cat /etc/hostname`" %s' % email cmd = "echo 'test email' | mail -s 'test email' {}".format(email)
subprocess.getstatusoutput(cmd) subprocess.getstatusoutput(cmd)
# init vars
timeout = 120 timeout = 120
waited = 1 waited = 1
delay = 2 delay = 2
print('Timeout to find email sending log is set to %s seconds.' % timeout) out = ""
out = ''
if os.path.isfile('/var/log/mail.log'): # find logs
cmd = 'grep "%s" /var/log/mail.log' % email if os.path.isfile("/var/log/mail.log"):
cmd = "grep '{}' /var/log/mail.log".format(email)
else: else:
print('/var/log/mail.log not found, trying journalctl') u.info("/var/log/mail.log not found, trying journalctl")
cmd = 'journalctl -u postfix | grep %s' % email cmd = "journalctl -u postfix | grep {}".format(email)
# logs polling
time.sleep(1) time.sleep(1)
while True: while True:
status, out = subprocess.getstatusoutput(cmd) status, out = subprocess.getstatusoutput(cmd)
# found
if status == 0: if status == 0:
out = out.strip().split('\n')[-1] out = out.strip().split("\n")[-1]
if 'status=deferred' not in out: if "status=deferred" not in out:
print('Log entry found after %s seconds.' % waited) u.info("Log entry found after {} seconds".format(waited))
break break
# timeout
if waited >= timeout: if waited >= timeout:
print_red('Failed to send email.') u.error("Failed to send email.")
print('No log entry found after %s seconds using command: %s' % (waited, cmd)) u.info(
"No log entry found after {} seconds using command: {}".format(
waited, cmd
)
)
return 1 return 1
print('No log entry found after %s seconds, waiting %s more seconds...' % (waited, delay)) # not found yet
u.info(
"No log entry found after {} seconds, waiting {} more seconds...".format(
waited, delay
)
)
# wait before next iteration
time.sleep(delay) time.sleep(delay)
waited += delay waited += delay
delay *= 2 delay *= 2
if 'bounced' not in out or 'The email account that you tried to reach does not exist.' in out:
print_green('Email sent.') # check output for errors
return 0 if "bounced" in out or "you tried to reach does not exist" in out:
else: u.error("Failed to send email")
print_red('Failed to send email.') u.info("Sending log line:\n{}".format(out))
print('Sending log line:\n%s' % out)
return 1 return 1
u.success("Email sent.")
return 0
def main():
"""Run all checks and exits with corresponding exit code."""
if not os.path.exists("/etc/postfix"):
u.error("Postfix dir does not exists, please install postfix")
sys.exit(1)
conf = u.load_conf()
rcode = check_listen()
if rcode == 0:
rcode = check_relay(conf)
if check_send_test_email() == 1:
rcode = 1
if not os.path.exists('/etc/postfix'): sys.exit(rcode)
print_red('Postfix dir does not exists, please install postfix.')
sys.exit(1)
rc = check_listening_port()
if rc == 0:
rc = check_relay()
if send_test_email() == 1:
rc = 1
sys.exit(rc) if __name__ == "__main__":
main()
...@@ -4,8 +4,6 @@ Criticality: High ...@@ -4,8 +4,6 @@ Criticality: High
This test check the current state of the PostgreSQL database cluster. This test check the current state of the PostgreSQL database cluster.
""" """
import imp
import os
import re import re
import socket import socket
import subprocess import subprocess
...@@ -13,45 +11,15 @@ import sys ...@@ -13,45 +11,15 @@ import sys
import time import time
import uuid import uuid
sys.path.append("..")
try: try:
import psycopg2 import psycopg2
except ImportError: except ImportError:
sys.exit(2) sys.exit(2)
GREEN = "\033[92m" # pylint: disable=wrong-import-position
YELLOW = "\033[93m" from envsetup import utils as u # noqa: E402
RED = "\033[91m"
DEF = "\033[0m"
def success(message: str):
"""Print formatted success message.
:param message: Message to print
:type message: str
"""
print(" {}✔{} {}".format(GREEN, DEF, message))
def warning(message: str):
"""Print formatted warning message.
:param message: Message to print
:type message: str
"""
print(" {}⚠{} {}".format(YELLOW, DEF, message))
def error(message: str):
"""Print formatted error message.
:param message: Message to print
:type message: str
"""
print(" {}✖{} {}".format(RED, DEF, message))
def is_ha(port: int) -> bool: def is_ha(port: int) -> bool:
...@@ -254,7 +222,6 @@ def check_replication(primary: dict, standby: dict) -> tuple: ...@@ -254,7 +222,6 @@ def check_replication(primary: dict, standby: dict) -> tuple:
rand = uuid.uuid4().hex rand = uuid.uuid4().hex
write_query = "CREATE TABLE es_test_{} (id serial PRIMARY KEY);".format(rand) write_query = "CREATE TABLE es_test_{} (id serial PRIMARY KEY);".format(rand)
read_query = "SELECT * FROM es_test_{};".format(rand) read_query = "SELECT * FROM es_test_{};".format(rand)
del_query = "DROP TABLE es_test_{};".format(rand)
# write # write
try: try:
...@@ -281,7 +248,7 @@ def check_replication(primary: dict, standby: dict) -> tuple: ...@@ -281,7 +248,7 @@ def check_replication(primary: dict, standby: dict) -> tuple:
# delete # delete
try: try:
primary_psql.execute(del_query) primary_psql.execute("DROP TABLE es_test_{};".format(rand))
except psycopg2.Error: except psycopg2.Error:
pass pass
...@@ -314,15 +281,15 @@ def check_ha(db_conn: dict, errors: int = 0) -> int: ...@@ -314,15 +281,15 @@ def check_ha(db_conn: dict, errors: int = 0) -> int:
# check haproxy # check haproxy
print("Checking local HAProxy frontends:") print("Checking local HAProxy frontends:")
if not check_listen(db_conn["host"], 54321): if not check_listen(db_conn["host"], 54321):
error("HAProxy pgsql-primary frontend is not listening") u.error("HAProxy pgsql-primary frontend is not listening")
errors += 1 errors += 1
else: else:
success("HAProxy pgsql-primary frontend is listening") u.success("HAProxy pgsql-primary frontend is listening")
if not check_listen(db_conn["host"], 54322): if not check_listen(db_conn["host"], 54322):
error("HAProxy pgsql-standby frontend is not listening") u.error("HAProxy pgsql-standby frontend is not listening")
errors += 1 errors += 1
else: else:
success("HAProxy pgsql-standby frontend is listening") u.success("HAProxy pgsql-standby frontend is listening")
# check remotes # check remotes
print("Checking remote PostgreSQL nodes:") print("Checking remote PostgreSQL nodes:")
...@@ -330,19 +297,19 @@ def check_ha(db_conn: dict, errors: int = 0) -> int: ...@@ -330,19 +297,19 @@ def check_ha(db_conn: dict, errors: int = 0) -> int:
node_host = nodes[node]["host"] node_host = nodes[node]["host"]
node_port = nodes[node]["port"] node_port = nodes[node]["port"]
if not check_listen(node_host, node_port): if not check_listen(node_host, node_port):
error("Cannot bind {}:{}".format(node_host, node_port)) u.error("Cannot bind {}:{}".format(node_host, node_port))
errors += 1 errors += 1
else: else:
success("Can bind {}:{}".format(node_host, node_port)) u.success("Can bind {}:{}".format(node_host, node_port))
# check fenced # check fenced
print("Checking cluster state:") print("Checking cluster state:")
fenced, node = check_fenced(nodes) fenced, node = check_fenced(nodes)
if fenced: if fenced:
error("Node `{}` is fenced".format(node)) u.error("Node `{}` is fenced".format(node))
errors += 1 errors += 1
else: else:
success("No fenced node found") u.success("No fenced node found")
# check replication # check replication
print("Checking replication state:") print("Checking replication state:")
...@@ -352,10 +319,10 @@ def check_ha(db_conn: dict, errors: int = 0) -> int: ...@@ -352,10 +319,10 @@ def check_ha(db_conn: dict, errors: int = 0) -> int:
standby["port"] = 54322 standby["port"] = 54322
status, info = check_replication(primary, standby) status, info = check_replication(primary, standby)
if not status: if not status:
error("Cannot replicate between primary/standby ({})".format(info)) u.error("Cannot replicate between primary/standby ({})".format(info))
errors += 1 errors += 1
else: else:
success("Can replicate between primary/standby ({})".format(info)) u.success("Can replicate between primary/standby ({})".format(info))
return errors return errors
...@@ -371,27 +338,27 @@ def check_local(db_conn: dict, errors: int = 0) -> int: ...@@ -371,27 +338,27 @@ def check_local(db_conn: dict, errors: int = 0) -> int:
:rtype: int :rtype: int
""" """
db_host = db_conn["host"] host = db_conn["host"]
db_port = db_conn["port"] port = db_conn["port"]
db_user = db_conn["user"] user = db_conn["user"]
# check listen # check listen
print("Checking local PostgreSQL node:") print("Checking local PostgreSQL node:")
if not check_listen(db_host, db_port): if not check_listen(host, port):
error("Cannot connect to {}:{}".format(db_host, db_port)) u.error("Cannot connect to {}:{}".format(host, port))
errors += 1 errors += 1
else: else:
success("Can connect to {}:{}".format(db_host, db_port)) u.success("Can connect to {}:{}".format(host, port))
# check read # check read
print("Checking read operation:") print("Checking read operation:")
read_query = "SELECT 1;" read_query = "SELECT 1;"
status, info = check_psql(db_conn, read_query) status, info = check_psql(db_conn, read_query)
if not status: if not status:
error("Cannot read from {}@{}:{} ({})".format(db_user, db_host, db_port, info)) u.error("Cannot read from {}@{}:{} ({})".format(user, host, port, info))
errors += 1 errors += 1
else: else:
success("Can read from {}@{}:{}".format(db_user, db_host, db_port)) u.success("Can read from {}@{}:{}".format(user, host, port))
# check write # check write
print("Checking write operation:") print("Checking write operation:")
...@@ -399,10 +366,10 @@ def check_local(db_conn: dict, errors: int = 0) -> int: ...@@ -399,10 +366,10 @@ def check_local(db_conn: dict, errors: int = 0) -> int:
write_query = "CREATE TABLE es_test_{} (id serial PRIMARY KEY);".format(rand) write_query = "CREATE TABLE es_test_{} (id serial PRIMARY KEY);".format(rand)
status, info = check_psql(db_conn, write_query) status, info = check_psql(db_conn, write_query)
if not status: if not status:
error("Cannot write on {}@{}:{} ({})".format(db_user, db_host, db_port, info)) u.error("Cannot write on {}@{}:{} ({})".format(user, host, port, info))
errors += 1 errors += 1
else: else:
success("Can write on {}@{}:{}".format(db_user, db_host, db_port)) u.success("Can write on {}@{}:{}".format(user, host, port))
# remove test table # remove test table
check_psql(db_conn, "DROP TABLE es_test_{};".format(rand)) check_psql(db_conn, "DROP TABLE es_test_{};".format(rand))
...@@ -412,20 +379,8 @@ def check_local(db_conn: dict, errors: int = 0) -> int: ...@@ -412,20 +379,8 @@ def check_local(db_conn: dict, errors: int = 0) -> int:
def main(): def main():
"""Run all checks and exits with corresponding exit code.""" """Run all checks and exits with corresponding exit code."""
# envsetup utils path
cwd = os.path.dirname(__file__)
utils = os.path.join(cwd, "..", "utils.py")
# check envsetup utils presence
if not os.path.isfile(utils):
error("{} not found.".format(utils))
sys.exit(1)
# load envsetup utils
es_utils = imp.load_source("es_utils", utils)
# load configuration # load configuration
conf = es_utils.load_conf() conf = u.load_conf()
# get database configuration # get database configuration
db_host = conf.get("DB_HOST") if conf.get("DB_HOST") else "127.0.0.1" db_host = conf.get("DB_HOST") if conf.get("DB_HOST") else "127.0.0.1"
......
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""EnvSetup utilities."""
from collections import OrderedDict from collections import OrderedDict
import os import os
import subprocess import subprocess
import sys import sys
from typing import Any
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
BLUE = "\033[94m"
DEF = "\033[0m"
DEFAULT_CONF_PATH = "global-conf.sh"
AUTO_CONF_PATH = "auto-generated-conf.sh"
CONF_PATH = "conf.sh"
DEFAULT_CONF_PATH = 'global-conf.sh'
AUTO_CONF_PATH = 'auto-generated-conf.sh'
CONF_PATH = 'conf.sh'
CONF = dict()
def log(text: str, error: bool = False):
"""Output log message to stout or stderr.
:param text: Message to log
:type text: str
:param error: Wether it should output to stderr or not, defaults to False
:param error: bool, optional
"""
def log(text, error=False):
fo = sys.stderr if error else sys.stdout fo = sys.stderr if error else sys.stdout
print(text, file=fo) print(text, file=fo)
fo.flush() fo.flush()
def get_dir(file_path): def info(message: str):
"""Print formatted info message.
:param message: Message to print
:type message: str
"""
print(" {}🛈{} {}".format(GREEN, DEF, message))
def success(message: str):
"""Print formatted success message.
:param message: Message to print
:type message: str
"""
print(" {}✔{} {}".format(GREEN, DEF, message))
def warning(message: str):
"""Print formatted warning message.
:param message: Message to print
:type message: str
"""
print(" {}⚠{} {}".format(YELLOW, DEF, message))
def error(message: str):
"""Print formatted error message.
:param message: Message to print
:type message: str
"""
print(" {}✖{} {}".format(RED, DEF, message))
def get_dir(file_path: str) -> str:
"""Get the absolute directory path for the given file.
:param file_path: File path
:type file_path: str
:return: Absolute directory path
:rtype: str
"""
return os.path.dirname(os.path.abspath(os.path.expanduser(file_path))) return os.path.dirname(os.path.abspath(os.path.expanduser(file_path)))
def exec_cmd(cmd, log_output=True, get_output=True): def exec_cmd(cmd: Any, log_output: bool = True, get_output: bool = True) -> tuple:
"""Execute the given command.
:param cmd: Command to run
:type cmd: Any
:param log_output: Wether to log output or not, defaults to True
:param log_output: bool, optional
:param get_output: Wether to return output or not, defaults to True
:param get_output: bool, optional
:return: Return code and output
:rtype: tuple
"""
shell = not isinstance(cmd, (tuple, list)) shell = not isinstance(cmd, (tuple, list))
stdout = subprocess.PIPE if get_output or not log_output else sys.stdout stdout = subprocess.PIPE if get_output or not log_output else sys.stdout
stderr = subprocess.PIPE if get_output or not log_output else sys.stderr stderr = subprocess.PIPE if get_output or not log_output else sys.stderr
p = subprocess.Popen(cmd, stdin=sys.stdin, stdout=stdout, stderr=stderr, shell=shell)
# execute
p = subprocess.Popen(
cmd, stdin=sys.stdin, stdout=stdout, stderr=stderr, shell=shell
)
out, err = p.communicate() out, err = p.communicate()
# send to the correct output
if get_output: if get_output:
out = out.decode('utf-8').strip() if out else '' out = out.decode("utf-8").strip() if out else ""
if err: if err:
if out: if out:
out += '\n' out += "\n"
out += err.decode('utf-8').strip() out += err.decode("utf-8").strip()
out = out.strip() out = out.strip()
if log_output: if log_output:
log(out) log(out)
elif log_output: elif log_output:
sys.stdout.flush() sys.stdout.flush()
sys.stderr.flush() sys.stderr.flush()
return p.returncode, out return p.returncode, out
def check_cmd(cmd, log_output=False): def check_cmd(cmd: Any, log_output: bool = False) -> int:
code, out = exec_cmd(cmd, log_output, False) """Get the return code of the given command.
:param cmd: Command to execute
:type cmd: Any
:param log_output: Wether to log output or not, defaults to False
:param log_output: bool, optional
:return: Return code
:rtype: int
"""
code, _ = exec_cmd(cmd, log_output, False)
return code return code
def load_conf(): def load_conf() -> dict:
"""Load EnvSetup configuration settings.
:return: Configuration settings
:rtype: dict
"""
conf = {}
base_dir = get_dir(__file__) base_dir = get_dir(__file__)
files = ( files = (
(os.path.join(base_dir, DEFAULT_CONF_PATH), True), (os.path.join(base_dir, DEFAULT_CONF_PATH), True),
...@@ -60,94 +160,119 @@ def load_conf(): ...@@ -60,94 +160,119 @@ def load_conf():
for path, is_default in files: for path, is_default in files:
if not os.path.exists(path): if not os.path.exists(path):
if is_default: if is_default:
log('The configuration file for EnvSetup script does not exist.\nPath of configuration file: %s' % path, error=True) log(
"The configuration file '{}' does not exist.".format(path),
error=True,
)
return dict() return dict()
continue continue
# Load conf # Load conf
with open(path, 'r') as fo: with open(path, "r") as fo:
content = fo.read() content = fo.read()
# Parse conf # Parse conf
for line in content.split('\n'): for line in content.split("\n"):
line = line.strip() line = line.strip()
if line and not line.startswith('#') and '=' in line: if line and not line.startswith("#") and "=" in line:
name, *val = line.split('=') name, *val = line.split("=")
name = name.strip(' \t\'\"') name = name.strip(" \t'\"")
val = ('='.join(val)).strip(' \t\'\"') val = ("=".join(val)).strip(" \t'\"")
CONF[name] = val conf[name] = val
if is_default: if is_default:
override[name] = False override[name] = False
else: else:
only_default = False only_default = False
override[name] = True override[name] = True
CONF['_override'] = override conf["_override"] = override
# Check a value to know if the config file has been changed # Check a value to know if the config file has been changed
if only_default: if only_default:
log('\033[93mWarning:\033[0m') log("\033[93mWarning:\033[0m")
log('The configuration is using only default values.') log("The configuration is using only default values.")
log('Perhaps you forget to change the configuration.') log("Perhaps you forget to change the configuration.")
log('Path of configuration file: %s' % os.path.join(base_dir, CONF_PATH)) log("Path of configuration file: %s" % os.path.join(base_dir, CONF_PATH))
log('Perhaps you want to quit this script to change the configuration?\n') log("Perhaps you want to quit this script to change the configuration?\n")
return CONF return conf
def get_conf(name: str, default: str = None) -> str:
"""Get the given configuration parameter.
:param name: Parameter name
:type name: str
:param default: Default parameter value, defaults to None
:param default: str, optional
:return: Parameter value
:rtype: str
"""
conf = load_conf()
return conf.get(name, default)
def get_conf(name, default=None): def run_commands(cmds: list):
if not CONF: """Run a serie of successive commands.
load_conf()
return CONF.get(name, default)
:param cmds: List of commands
:type cmds: list
:raises Exception: Houston we have a problem
"""
def run_commands(cmds):
# run a serie of successive commands
try: try:
# Execute commands # Execute commands
for cmd in cmds: for cmd in cmds:
if not isinstance(cmd, dict): if not isinstance(cmd, dict):
cmd = dict(line=cmd) cmd = dict(line=cmd)
if cmd.get('cond'): if cmd.get("cond"):
cond = cmd['cond'] cond = cmd["cond"]
negate = cmd.get('cond_neg') negate = cmd.get("cond_neg")
skip = cmd.get('cond_skip') skip = cmd.get("cond_skip")
code = check_cmd(cond) code = check_cmd(cond)
success = code != 0 if negate else code == 0 valid = code != 0 if negate else code == 0
if not success: if not valid:
msg = 'Condition for command "%s" not fullfilled.' % cmd['line'] msg = 'Condition for command "%s" not fullfilled.' % cmd["line"]
if skip: if skip:
log('%s Command skipped.' % msg) log("%s Command skipped." % msg)
continue continue
raise Exception(msg) raise Exception(msg)
if cmd['line'] == 'write': if cmd["line"] == "write":
if not cmd.get('target'): if not cmd.get("target"):
raise Exception('No target file to write in.') raise Exception("No target file to write in.")
if cmd.get('backup') and os.path.exists(cmd['target']) and not os.path.exists(cmd['target'] + '.back'): if (
os.rename(cmd['target'], cmd['target'] + '.back') cmd.get("backup")
log('A backup file has been created for:\n%s' % cmd['target']) and os.path.exists(cmd["target"])
and not os.path.exists(cmd["target"] + ".back")
):
os.rename(cmd["target"], cmd["target"] + ".back")
log("A backup file has been created for:\n%s" % cmd["target"])
# Load content from template if any # Load content from template if any
content = cmd.get('content', '') content = cmd.get("content", "")
if cmd.get('template'): if cmd.get("template"):
if not os.path.exists(cmd['template']): if not os.path.exists(cmd["template"]):
raise Exception('Template file does not exist: %s.' % cmd['template']) raise Exception(
with open(cmd['template'], 'r') as fd: "Template file does not exist: %s." % cmd["template"]
)
with open(cmd["template"], "r") as fd:
content = fd.read() content = fd.read()
if cmd.get('params'): if cmd.get("params"):
for k, v in cmd['params']: for k, v in cmd["params"]:
content = content.replace(k, v) content = content.replace(k, v)
# Write target file # Write target file
with open(cmd['target'], 'w+') as fd: with open(cmd["target"], "w+") as fd:
fd.write(content) fd.write(content)
log('File %s written' % cmd['target']) log("File %s written" % cmd["target"])
elif cmd['line'] == 'backup': elif cmd["line"] == "backup":
if not cmd.get('target'): if not cmd.get("target"):
raise Exception('No target file to backup.') raise Exception("No target file to backup.")
if not os.path.exists(cmd['target'] + '.back'): if not os.path.exists(cmd["target"] + ".back"):
os.rename(cmd['target'], cmd['target'] + '.back') os.rename(cmd["target"], cmd["target"] + ".back")
log('A backup file has been created for:\n%s' % cmd['target']) log("A backup file has been created for:\n%s" % cmd["target"])
else: else:
log('A backup file already exist for:\n%s' % cmd['target']) log("A backup file already exist for:\n%s" % cmd["target"])
else: else:
log('>>> ' + cmd['line']) log(">>> " + cmd["line"])
code = check_cmd(cmd['line'], log_output=True) code = check_cmd(cmd["line"], log_output=True)
if code != 0: if code != 0:
raise Exception('Command exited with code %s.' % code) raise Exception("Command exited with code %s." % code)
except Exception as e: except Exception as e:
log('Command failed:\n%s' % e) log("Command failed:\n%s" % e)
raise raise
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment