#!/usr/bin/env python3

'''
Criticality: High
Checks the current state of the PostgreSQL database cluster.
'''

from pathlib import Path
import re
import shlex
import socket
import subprocess
import sys
import time
import urllib.parse
import uuid

try:
    import psycopg2
except ImportError:
    sys.exit(2)

sys.path.append(str(Path(__file__).parents[1].resolve()))

from utilities import logging as lg  # noqa: E402
from utilities.apt import Apt  # noqa: E402
from utilities.config import load_conf  # noqa: E402

# ports of the HAProxy pgsql-primary / pgsql-standby frontends
HAPROXY_PRIMARY_PORT = 54321
HAPROXY_STANDBY_PORT = 54322
# port used to reach PostgreSQL when the primary frontend is not listening
POSTGRESQL_PORT = 5432
# port probed on localhost to get the replication state of this node
REPHACHECK_PORT = 8543

# a section header starts at column 0 (e.g. `listen pgsql-primary`)
PATTERN_BLOCK = re.compile(r'^([a-zA-Z0-9_.-]+ *[a-zA-Z0-9_.-]+)')
# a parameter is indented below its section header
PATTERN_PARAM = re.compile(r'^\s+([ /:()|a-zA-Z0-9_.-]+)')


def check_listen(host: str, port: int, timeout: float = 5.0) -> bool:
    '''Check if server is listening (TCP only).

    :param host: The hostname or IP address to connect to
    :param port: The port number to connect to
    :param timeout: Seconds to wait for the connection, defaults to 5.0
    :type host: str
    :type port: int
    :type timeout: float
    :return: Whether the `host` is listening on TCP/`port`
    :rtype: bool
    '''

    try:
        # the context manager closes the socket even if the attempt raises
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
            client.settimeout(timeout)
            return client.connect_ex((host, port)) == 0
    except OSError:
        # connect_ex() can still raise (e.g. unresolvable hostname):
        # such a host is not listening either
        return False


def get_haproxy_conf(path: str = '/etc/haproxy/haproxy.cfg') -> dict:
    '''Get HAProxy configuration in a dictionary.

    Each key is a section header and each value is the list of parameter
    lines found below that header.

    :param path: HAProxy configuration file, defaults to
        '/etc/haproxy/haproxy.cfg'
    :type path: str
    :return: HAProxy configuration file content, empty if the file cannot
        be read
    :rtype: dict
    '''

    # init configuration dictionary
    conf = {}

    # load configuration file
    try:
        with open(path) as data:
            lines = data.readlines()
    except OSError:
        return conf

    # parse configuration file
    block = None
    for line in lines:
        match_block = PATTERN_BLOCK.match(line)
        if match_block:
            block = match_block.group(1)
            conf[block] = []
            continue
        match_param = PATTERN_PARAM.match(line)
        # an indented line seen before any section header has no owner
        if match_param and block is not None:
            conf[block].append(match_param.group(1))

    return conf


def _get_check_port(elements: list) -> str:
    '''Return the check port from the tokens of a `server` line.'''

    try:
        # value following the `port` keyword (searched after name/address)
        return elements[elements.index('port', 3) + 1]
    except (ValueError, IndexError):
        # fall back on the fixed layout where the port is the 8th token
        return elements[7]


def get_nodes(conf: dict) -> dict:
    '''Get the list of nodes from HAProxy configuration.

    Only `server` lines of sections whose name contains `pgsql-primary`
    are considered.

    :param conf: The HAProxy configuration file content
    :type conf: dict
    :return: The nodes found in HAProxy configuration, as
        `{name: {'host': ..., 'port': ..., 'rephacheck': ...}}`
    :rtype: dict
    '''

    servers = {}

    for section, params in conf.items():
        if 'pgsql-primary' not in section:
            continue
        for line in params:
            # filter `server` lines
            if not line.startswith('server '):
                continue
            # split line
            elements = line.split()
            # get needed elements
            name = elements[1]
            address = elements[2].split(':')
            servers[name] = {
                'host': address[0],
                'port': int(address[1]),
                'rephacheck': _get_check_port(elements),
            }

    return servers


def get_node_state(host: str, port: int) -> str:
    '''Get the current state of node from its RepHACheck daemon.

    :param host: The node's hostname or IP address
    :param port: The node's port on which RepHACheck is listening
    :type host: str
    :type port: int
    :return: The current state of the node according to its RepHACheck daemon
    :rtype: str
    :raises OSError: If the connection cannot be established
    '''

    # connect and get tcp stream data; the socket is closed on any outcome
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
        client.connect((host, port))
        data = client.recv(1024)

    # extract string from data output
    return data.decode('utf-8').rstrip()


def check_fenced(nodes: dict) -> tuple:
    '''Check if the cluster has a fenced node.

    :param nodes: The dictionary containing nodes and their information
    :type nodes: dict
    :return: `(True, name)` for the first fenced node found, else
        `(False, None)`
    :rtype: tuple
    '''

    for name, node in nodes.items():
        if get_node_state(node['host'], int(node['rephacheck'])) == 'fenced':
            return True, name

    return False, None


def check_psql(db_conn: dict, query: str) -> tuple:
    '''Check if a query can be executed with `psql` as the postgres user.

    :param db_conn: Database connection parameters
    :type db_conn: dict
    :param query: Query to execute
    :type query: str
    :return: `(True, None)` if the query succeeded, else `(False, reason)`
    :rtype: tuple
    '''

    # build database connection uri
    if 'password' in db_conn:
        uri = 'postgresql://{}:{}@{}:{}/{}'.format(
            db_conn['user'],
            # percent-encode everything: `+` is not decoded as a space in URIs
            urllib.parse.quote(db_conn['password'], safe=''),
            db_conn['host'],
            db_conn['port'],
            db_conn['dbname'],
        )
    else:
        uri = 'postgresql:///{}'.format(db_conn['dbname'])

    # `su -c` hands a single string to the target user's shell, so both
    # values are quoted for that shell; no local shell is involved
    psql_command = 'psql {} -c {}'.format(shlex.quote(uri), shlex.quote(query))
    command = ['su', '-l', 'postgres', '-c', psql_command]

    # execute
    try:
        subprocess.check_output(command, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as psql_error:
        # report the command output rather than the exception text, which
        # embeds the whole command line (password included)
        output = (psql_error.output or b'').decode('utf-8', 'replace').strip()
        if not output:
            output = 'psql exited with status {}'.format(psql_error.returncode)
        return False, output
    except OSError as exec_error:
        # `su` itself could not be started
        return False, str(exec_error).rstrip()

    return True, None


def _wait_for_table(client, read_query: str, max_time: float,
                    interval: float) -> tuple:
    '''Poll `read_query` on `client` until it succeeds or `max_time` elapses.'''

    cursor = client.cursor()
    start = time.monotonic()
    while True:
        try:
            cursor.execute(read_query)
        except psycopg2.Error as repl_read_error:
            msg = str(repl_read_error).rstrip()
        else:
            return True, 'took ~{:.1f}s'.format(time.monotonic() - start)
        if time.monotonic() - start >= max_time:
            return False, msg
        time.sleep(interval)


def check_replication(primary: dict, standby: dict, max_time: float = 6.0,
                      interval: float = 0.2) -> tuple:
    '''Check if replication is working between the primary and standby servers.

    A uniquely named table is created through the primary connection, then
    the standby connection is polled until the table can be read or
    `max_time` is reached. The table is dropped afterwards.

    :param primary: Database connection parameters for primary server
    :type primary: dict
    :param standby: Database connection parameters for standby server
    :type standby: dict
    :param max_time: Maximum time to wait for the table in seconds,
        defaults to 6.0
    :type max_time: float
    :param interval: Delay between two read attempts in seconds,
        defaults to 0.2
    :type interval: float
    :return: `(True, duration)` if the table reached the standby, else
        `(False, reason)`
    :rtype: tuple
    '''

    clients = []
    try:
        # connections
        try:
            primary_client = psycopg2.connect(**primary)
            clients.append(primary_client)
            standby_client = psycopg2.connect(**standby)
            clients.append(standby_client)
        except psycopg2.Error as repl_conn_error:
            return False, str(repl_conn_error).rstrip()

        # the table must be committed to be replicated, and a failed read
        # must not leave the standby session in an aborted transaction
        primary_client.autocommit = True
        standby_client.autocommit = True

        # random id
        table = 'es_test_{}'.format(uuid.uuid4().hex)
        write_query = 'CREATE TABLE {} (id serial PRIMARY KEY);'.format(table)
        read_query = 'SELECT * FROM {};'.format(table)

        # write
        try:
            primary_psql = primary_client.cursor()
            primary_psql.execute(write_query)
        except psycopg2.Error as repl_write_error:
            return False, str(repl_write_error).rstrip()

        # read
        try:
            return _wait_for_table(standby_client, read_query, max_time,
                                   interval)
        finally:
            # delete (best effort: a failed cleanup is not a replication
            # failure)
            try:
                primary_psql.execute('DROP TABLE {};'.format(table))
            except psycopg2.Error:
                pass
    finally:
        # close
        for client in clients:
            client.close()


def check_ha(db_conn: dict, errors: int = 0) -> int:
    '''Run all tests for a highly-available setup.

    :param db_conn: Database connection parameters
    :type db_conn: dict
    :param errors: Error counter, defaults to 0
    :type errors: int, optional
    :return: Number of errors
    :rtype: int
    '''

    # get nodes data from haproxy conf
    nodes = get_nodes(get_haproxy_conf())

    # check haproxy
    lg.log('Checking local HAProxy frontends:')
    if not check_listen(db_conn['host'], HAPROXY_PRIMARY_PORT):
        lg.error('HAProxy pgsql-primary frontend is not listening')
        errors += 1
    else:
        lg.success('HAProxy pgsql-primary frontend is listening')
    if not check_listen(db_conn['host'], HAPROXY_STANDBY_PORT):
        lg.error('HAProxy pgsql-standby frontend is not listening')
        errors += 1
    else:
        lg.success('HAProxy pgsql-standby frontend is listening')

    # check remotes
    lg.log('Checking remote PostgreSQL nodes:')
    for node in nodes.values():
        node_host = node['host']
        node_port = node['port']
        if not check_listen(node_host, node_port):
            lg.error('cannot bind {}:{}'.format(node_host, node_port))
            errors += 1
        else:
            lg.success('can bind {}:{}'.format(node_host, node_port))

    # check fenced
    lg.log('Checking cluster state:')
    fenced, node = check_fenced(nodes)
    if fenced:
        lg.error('Node `{}` is fenced'.format(node))
        errors += 1
    else:
        lg.success('No fenced node found')

    # check replication
    lg.log('Checking replication state:')
    primary = db_conn.copy()
    primary['port'] = HAPROXY_PRIMARY_PORT
    standby = db_conn.copy()
    standby['port'] = HAPROXY_STANDBY_PORT
    status, info = check_replication(primary, standby)
    if not status:
        lg.error('cannot replicate between primary/standby ({})'.format(info))
        errors += 1
    else:
        lg.success('can replicate between primary/standby ({})'.format(info))

    return errors


def check_local(db_conn: dict, errors: int = 0) -> int:
    '''Run all tests for a single database setup.

    The write test is skipped when the local replication state is
    available and is not `primary`.

    :param db_conn: Database connection parameters
    :type db_conn: dict
    :param errors: Error counter, defaults to 0
    :type errors: int, optional
    :return: Number of errors
    :rtype: int
    '''

    host = db_conn['host']
    port = db_conn['port']
    user = db_conn['user']

    # check listen
    lg.log('Checking local PostgreSQL node:')
    if not check_listen(host, port):
        lg.error('cannot connect to {}:{}'.format(host, port))
        errors += 1
    else:
        lg.success('can connect to {}:{}'.format(host, port))

    # check read
    lg.log('Checking read operation:')
    read_query = 'SELECT 1;'
    status, info = check_psql(db_conn, read_query)
    if not status:
        lg.error('cannot read from {}@{}:{} ({})'.format(user, host, port, info))
        errors += 1
    else:
        lg.success('can read from {}@{}:{}'.format(user, host, port))

    # get replication state if available
    if check_listen('127.0.0.1', REPHACHECK_PORT):
        state = get_node_state('127.0.0.1', REPHACHECK_PORT)
    else:
        state = 'primary'

    # check write
    lg.log('Checking write operation:')
    if state != 'primary':
        lg.info('this database is in {} state'.format(state))
    else:
        rand = uuid.uuid4().hex
        write_query = 'CREATE TABLE es_test_{} (id serial PRIMARY KEY);'.format(rand)
        status, info = check_psql(db_conn, write_query)
        if not status:
            lg.error('cannot write on {}@{}:{} ({})'.format(user, host, port, info))
            errors += 1
        else:
            lg.success('can write on {}@{}:{}'.format(user, host, port))
            # remove test table
            check_psql(db_conn, 'DROP TABLE es_test_{};'.format(rand))

    return errors


def main():
    '''Run all checks and exit with the corresponding exit code.

    Exits with 2 when no `postgresql-client*` package is installed, with 1
    when at least one check failed and with 0 otherwise.
    '''

    apt = Apt()
    pattern_postgresql_client = re.compile(r'postgresql-client.*')
    if not any(
        pattern_postgresql_client.match(package)
        for package in apt.installed_packages
    ):
        sys.exit(2)

    # load configuration
    conf = load_conf()

    # probe the primary frontend once: it drives both the port and the mode
    primary_frontend = check_listen('127.0.0.1', HAPROXY_PRIMARY_PORT)

    # get database configuration
    db_host = conf.get('DB_HOST') or '127.0.0.1'
    db_port = HAPROXY_PRIMARY_PORT if primary_frontend else POSTGRESQL_PORT
    db_user = conf.get('DB_USER') or 'postgres'
    db_pass = conf.get('DB_PG_ROOT_PWD')
    db_conn = {'dbname': db_user, 'host': db_host, 'port': db_port, 'user': db_user}
    if db_pass:
        db_conn['password'] = db_pass

    # determine if HA setup and run according tests
    lg.log('Checking availibility mode:')
    if primary_frontend and check_listen('127.0.0.1', HAPROXY_STANDBY_PORT):
        lg.info('this setup is using a primary and standby database')
        errors = check_ha(db_conn)
    else:
        lg.info('this setup is using a single database')
        errors = check_local(db_conn)

    sys.exit(1 if errors else 0)


if __name__ == '__main__':
    main()