#!/usr/bin/env python3 """This test check the current state of the PostgreSQL database cluster.""" import imp import os import re import socket import sys import time import psycopg2 GREEN = "\033[92m" YELLOW = "\033[93m" RED = "\033[91m" DEF = "\033[0m" def is_ha(port: int) -> bool: """Check wether this setup is using higlhy-available databases. :param port: Port number :type port: int :return: Wether it is a highly-available setup or not :rtype: bool """ return port == 54321 def get_haproxy_conf(path: str = "/etc/haproxy/haproxy.cfg") -> dict: """Get HAProxy configuration in a dictionary. :param path: HAProxy configuration file, defaults to "/etc/haproxy/haproxy.cfg" :type path: str :return: HAProxy configuration file content :rtype: dict """ # init configuration dictionary conf: dict = {} # load configuration file try: with open(path) as data: lines = data.readlines() except EnvironmentError: return conf # define patterns pattern_block = re.compile(r"^([a-zA-Z0-9_.-]+ *[a-zA-Z0-9_.-]+)") pattern_param = re.compile(r"^\s+([ /:()|a-zA-Z0-9_.-]+)") # parse configuration file for line in lines: match_block = pattern_block.match(line) if match_block: block = match_block.group(1) conf[block] = [] else: match_param = pattern_param.match(line) if match_param: param = match_param.group(1) conf[block].append(param) return conf def get_nodes(conf: dict) -> dict: """Get the list of nodes from HAProxy configuration. :param conf: The HAProxy configuration file content :type conf: dict :return: The list of nodes found in HAProxy configuration :rtype: dict """ servers: dict = {} for item in conf.keys(): if "pgsql-primary" in item: # filter `server` lines server_lines = [x for x in conf[item] if x.startswith("server ")] for line in server_lines: # split line elements = line.split() # get needed elements name = elements[1] address = elements[2].split(":") host = address[0] port = address[1] rephacheck = elements[7] # update dictionary servers.update( {name: {"host": host, "port": port, "rephacheck": rephacheck}} ) return servers def check_odd_number(number: int) -> bool: """Check if we have an odd number of nodes, ensuring we can have a quorum. :param number: The number of nodes in the cluster :type number: int :return: Wether it as an odd number or not :rtype: bool """ modulo = number % 2 return modulo != 0 def get_node_state(host: str, port: int) -> str: """Get the curent state of node from its RepHACheck daemon. :param node: The node's hostname or IP address :param port: The node's port on which RepHACheck is listening :type node: str :type port: int :return: The current state of the node accordind to its RepHACheck daemon :rtype: str """ # connect and get tcp stream data client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect((host, port)) data = client.recv(1024) client.close() # extract string from data output state = data.decode("utf-8").rstrip() return state def check_primary(nodes: dict) -> tuple: """Check if we have a primary in the nodes. :param nodes: The dictionary containing nodes and their informations :type nodes: dict :return: Wether the nodes list contains a primary server :rtype: tuple """ for node in nodes.keys(): host = nodes[node]["address"] port = nodes[node]["rephacheck"] if get_node_state(host, port) == "primary": return True, node return False, None def check_standby(nodes: dict) -> tuple: """Check if we have a standby in the nodes. :param nodes: The dictionary containing nodes and their informations :type nodes: dict :return: Wether the nodes list contains a standby server :rtype: tuple """ for node in nodes.keys(): host = nodes[node]["address"] port = nodes[node]["rephacheck"] if get_node_state(host, port) == "standby": return True, node return False, None def check_witness(nodes: dict) -> tuple: """Check if we have a witness in the nodes. :param nodes: The dictionary containing nodes and their informations :type nodes: dict :return: Wether the nodes list contains a witness server :rtype: tuple """ for node in nodes.keys(): host = nodes[node]["address"] port = nodes[node]["rephacheck"] if get_node_state(host, port) == "witness": return True, node return False, None def check_fenced(nodes: dict) -> tuple: """Check if the cluster have a fenced node. :param nodes: The dictionary containing nodes and their informations :type nodes: dict :return: Wether the nodes list contains a fenced server :rtype: tuple """ for node in nodes.keys(): host = nodes[node]["address"] port = nodes[node]["rephacheck"] if get_node_state(host, port) == "fenced": return True, node return False, None # pylint: disable=bad-continuation def check_write( host: str, port: int, user: str, pswd: str, name: str = "postgres" ) -> bool: """Check if we can write data on this node. :param host: Database server's hostname or IP address :type host: str :param port: Database server's port :type port: int :param user: Database username :type user: str :param pswd: Database username's password :type pswd: str :param pswd: Database name :type pswd: str :return: Wether it is writeable or not :rtype: bool """ # connection try: client = psycopg2.connect( dbname=name, user=user, password=pswd, host=host, port=port ) except psycopg2.Error: print("{}Cannot connect to the database{}".format(RED, DEF)) return False # query try: psql = client.cursor() psql.execute("CREATE TABLE es_test (id serial PRIMARY KEY);") psql.execute("DROP TABLE es_test;") except psycopg2.Error: return False # close psql.close() client.close() return True # pylint: disable=bad-continuation def check_read( host: str, port: int, user: str, pswd: str, name: str = "postgres" ) -> bool: """Check if we can read data on this node. :param host: Database server's hostname or IP address :type host: str :param port: Database server's port :type port: int :param user: Database username :type user: str :param pswd: Database username's password :type pswd: str :param pswd: Database name :type pswd: str :return: Wether it is writeable or not :rtype: bool """ # connection try: client = psycopg2.connect( dbname=name, user=user, password=pswd, host=host, port=port ) except psycopg2.Error: print("{}Cannot connect to the database{}".format(RED, DEF)) return False # query try: psql = client.cursor() psql.execute("SELECT;") except psycopg2.Error: return False # close psql.close() client.close() return True def check_replication(primary: dict, standby: dict) -> bool: """Check if replication is working between the primary and standby servers. :param primary: Connection details for primary server :type primary: dict :param standby: Connection details for standby server :type standby: dict :return: Wether replication between primary/stanbdy is working or not :rtype: bool """ # connections try: primary_client = psycopg2.connect(**primary) standby_client = psycopg2.connect(**standby) except psycopg2.Error: print("{}Cannot connect to the databases{}".format(RED, DEF)) return False # queries try: primary_psql = primary_client.cursor() primary_psql.execute("CREATE TABLE es_test (id serial PRIMARY KEY);") time.sleep(1) standby_psql = primary_client.cursor() standby_psql.execute("SELECT * FROM es_test;") primary_psql.execute("DROP TABLE es_test;") except psycopg2.Error: return False # close primary_psql.close() standby_psql.close() primary_client.close() standby_client.close() return True def check_listen(host: str, port: int) -> bool: """Check if server is listening (TCP only). :param host: The hostname or IP address to bind :param port: The port number to bind :type host: str :type port: int :return: Wether the `host` is listening on TCP/`port` :rtype: bool """ # try to connect to the port used by psql-primary frontend client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) result = client.connect_ex((host, port)) client.close() return result == 0 def check_ha(db_conn: dict, errors: int = 0, warnings: int = 0) -> tuple: """Run all tests for a highly-available setup. :param db_conn: Database connection parameters :type db_conn: dict :param errors: Error counter, defaults to 0 :param errors: int, optional :param warnings: Warning counter, defaults to 0 :param warnings: int, optional :return: Numbers of errors and warnings :rtype: tuple """ db_host = db_conn["host"] db_user = db_conn["user"] db_pass = db_conn["pass"] # get haproxy conf ha_conf = get_haproxy_conf() # get nodes data nodes = get_nodes(ha_conf) # check haproxy if not check_listen(db_host, 54321): print("{}HAProxy pgsql-primary frontend is not listening{}".format(RED, DEF)) errors += 1 else: print("HAProxy pgsql-primary frontend is listening") if not check_listen(db_host, 54322): print("{}HAProxy pgsql-standby frontend is not listening{}".format(RED, DEF)) errors += 1 else: print("HAProxy pgsql-standby frontend is listening") # check remotes for node in nodes: node_host = nodes[node]["address"] node_port = nodes[node]["port"] if not check_listen(node_host, node_port): print("{}Cannot bind {}:{}{}".format(RED, node_host, node_port, DEF)) errors += 1 else: print("Can bind {}:{}".format(node_host, node_port)) # check fenced fenced, node = check_fenced(nodes) if fenced: print("{}Node `{}` is fenced{}".format(RED, node, DEF)) errors += 1 else: print("No fenced node found") # check replication primary = { "dbname": "postgres", "user": db_user, "password": db_pass, "host": db_host, "port": 54321, } standby = { "dbname": "postgres", "user": db_user, "password": db_pass, "host": db_host, "port": 54322, } if not check_replication(primary, standby): print("{}Cannot replicate data between primary/standby{}".format(RED, DEF)) errors += 1 else: print("Can replicate data between primary/standby") return errors, warnings def check_local(db_conn: dict, errors: int = 0, warnings: int = 0) -> tuple: """Run all tests for a highly-available setup. :param db_conn: Database connection parameters :type db_conn: dict :param errors: Error counter, defaults to 0 :param errors: int, optional :param warnings: Warning counter, defaults to 0 :param warnings: int, optional :return: Numbers of errors and warnings :rtype: tuple """ db_host = db_conn["host"] db_port = db_conn["port"] db_user = db_conn["user"] db_pass = db_conn["pass"] # check listen if not check_listen(db_host, db_port): print("{}Cannot connect to {}:{}{}".format(RED, db_host, db_port, DEF)) errors += 1 # check read if not check_read(db_host, db_port, db_user, db_pass): print( "{}Cannot read data on {}@{}:{}{}".format( RED, db_user, db_host, db_port, DEF ) ) errors += 1 else: print("Can read data on {}@{}:{}".format(db_user, db_host, db_port)) # check write if not check_write(db_host, db_port, db_user, db_pass): print( "{}Cannot write data on {}@{}:{}{}".format( RED, db_user, db_host, db_port, DEF ) ) errors += 1 else: print("Can write data on {}@{}:{}".format(db_user, db_host, db_port)) return errors, warnings def main(): """Run all checks and exits with corresponding exit code.""" # envsetup utils path cwd = os.path.dirname(__file__) utils = os.path.join(cwd, "..", "utils.py") # check envsetup utils presence if not os.path.isfile(utils): print("{} not found.".format(utils)) sys.exit(1) # load envsetup utils es_utils = imp.load_source("es_utils", utils) # load configuration conf = es_utils.load_conf() # get database configuration db_host = conf.get("DB_HOST", "127.0.0.1") db_port = conf.get(int("DB_PORT"), 5432) db_user = conf.get("DB_USER", "postgres") db_pass = conf.get("DB_PG_ROOT_PWD") db_conf = {"host": db_host, "port": db_port, "user": db_user, "pass": db_pass} # determine if HA setup if is_ha(db_port): print("This setup is using a HA database") errors, warnings = check_ha(db_conf) else: print("This setup is using a local database") errors, warnings = check_local(db_conf) if errors: sys.exit(1) elif warnings: sys.exit(2) else: sys.exit(0) if __name__ == "__main__": main()