Newer
Older
"""
Criticality: High
This test checks the current state of the PostgreSQL database cluster.
"""
import imp
import os
import re
import shlex
import socket
import subprocess
import sys
import time

try:
    import psycopg2
except ImportError:
    sys.exit(2)
# ANSI escape sequences used to colour the status glyph printed by the
# success(), warning() and error() helpers.
GREEN = "\033[92m"  # bright green foreground
YELLOW = "\033[93m"  # bright yellow foreground
RED = "\033[91m"  # bright red foreground
DEF = "\033[0m"  # reset to the terminal's default attributes
def success(message: str):
    """Write *message* to stdout, prefixed by a green check mark.

    :param message: Text to display after the glyph
    :type message: str
    """
    template = " {}✔{} {}"
    line = template.format(GREEN, DEF, message)
    print(line)
def warning(message: str):
    """Print formatted warning message.

    The glyph is a warning sign rather than the check mark used by
    ``success`` so that the severity is still readable when the terminal
    does not render colours.

    :param message: Message to print
    :type message: str
    """
    print(" {}⚠{} {}".format(YELLOW, DEF, message))
def error(message: str):
    """Print formatted error message.

    The glyph is a cross rather than the check mark used by ``success``:
    a failure must not be displayed with a "passed" symbol, and it has to
    stay distinguishable when the terminal does not render colours.

    :param message: Message to print
    :type message: str
    """
    print(" {}✖{} {}".format(RED, DEF, message))
def is_ha(port: int) -> bool:
    """Tell whether the setup relies on highly-available databases.

    The decision is based solely on the configured database port.

    :param port: Port number
    :type port: int
    :return: Whether it is a highly-available setup or not
    :rtype: bool
    """
    ha_port = 54321
    return ha_port == port
def get_haproxy_conf(path: str = "/etc/haproxy/haproxy.cfg") -> dict:
    """Get HAProxy configuration in a dictionary.

    A line starting at column 0 with a section header (for instance
    ``listen pgsql-primary``) opens a new entry; each following indented
    line is appended to the list of the most recently opened section.

    :param path: HAProxy configuration file, defaults to "/etc/haproxy/haproxy.cfg"
    :type path: str
    :return: Mapping of section header to the list of its parameter lines;
        empty when the file cannot be read
    :rtype: dict
    """
    # init configuration dictionary
    conf = {}

    # load configuration file
    try:
        with open(path) as data:
            lines = data.readlines()
    except OSError:
        return conf

    # define patterns
    pattern_block = re.compile(r"^([a-zA-Z0-9_.-]+ *[a-zA-Z0-9_.-]+)")
    pattern_param = re.compile(r"^\s+([ /:()|a-zA-Z0-9_.-]+)")

    # parse configuration file
    block = None
    for line in lines:
        match_block = pattern_block.match(line)
        if match_block:
            block = match_block.group(1)
            conf[block] = []
            continue
        match_param = pattern_param.match(line)
        # an indented line found before any section header has no section
        # to be attached to: ignore it instead of failing on an unbound name
        if match_param and block is not None:
            conf[block].append(match_param.group(1))

    return conf
def get_nodes(conf: dict) -> dict:
    """Get the list of nodes from HAProxy configuration.

    Only the ``server`` lines of sections whose header contains
    ``pgsql-primary`` are considered.

    :param conf: The HAProxy configuration file content
    :type conf: dict
    :return: Mapping of server name to a dictionary holding its ``host``
        (str), its PostgreSQL ``port`` (int) and its ``rephacheck`` port (str)
    :rtype: dict
    :raises IndexError: If a ``server`` line lacks the expected tokens
    :raises ValueError: If the PostgreSQL port is not a number
    """
    servers = {}

    for section, params in conf.items():
        if "pgsql-primary" not in section:
            continue
        for line in params:
            # filter `server` lines
            if not line.startswith("server "):
                continue
            # split line: server <name> <host>:<port> [options...]
            elements = line.split()
            name = elements[1]
            address = elements[2].split(":")
            host = address[0]
            port = int(address[1])
            # the RepHACheck port is the token following the `port` keyword;
            # fall back on the historical fixed position when it is absent
            options = elements[3:]
            if "port" in options[:-1]:
                rephacheck = options[options.index("port") + 1]
            else:
                rephacheck = elements[7]
            # update dictionary
            servers[name] = {"host": host, "port": port, "rephacheck": rephacheck}

    return servers
def get_node_state(host: str, port: int) -> str:
    """Get the current state of a node from its RepHACheck daemon.

    :param host: The node's hostname or IP address
    :param port: The node's port on which RepHACheck is listening
    :type host: str
    :type port: int
    :return: The current state of the node according to its RepHACheck daemon
    :rtype: str
    :raises OSError: If the connection cannot be established or read
    """
    # connect and get tcp stream data; the context manager closes the socket
    # even when connect() or recv() raises
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
        client.connect((host, port))
        data = client.recv(1024)

    # extract string from data output
    return data.decode("utf-8").rstrip()
def check_fenced(nodes: dict) -> tuple:
    """Look for a fenced node in the cluster.

    Every node is asked for its state through ``get_node_state``; the scan
    stops at the first one answering ``fenced``.

    :param nodes: The dictionary containing nodes and their information
    :type nodes: dict
    :return: ``(True, name)`` for the first fenced node found,
        ``(False, None)`` when there is none
    :rtype: tuple
    """
    for name, details in nodes.items():
        host = details["host"]
        rephacheck_port = int(details["rephacheck"])
        state = get_node_state(host, rephacheck_port)
        if state == "fenced":
            return True, name

    return False, None
def check_listen(host: str, port: int) -> bool:
    """Check if server is listening (TCP only).

    :param host: The hostname or IP address to connect to
    :param port: The port number to connect to
    :type host: str
    :type port: int
    :return: Whether the `host` is listening on TCP/`port`
    :rtype: bool
    """
    # try to open a TCP connection; the context manager closes the socket
    # whatever happens
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
            result = client.connect_ex((host, port))
    except OSError:
        # connect_ex() only reports connection-level failures as a code;
        # other problems (e.g. unresolvable host name) are still raised
        return False

    # connect_ex() returns 0 on success and an errno value otherwise
    return result == 0
def check_psql(db_conn: dict, query: str) -> bool:
    """Check if a query can be executed on this node.

    The query is run with ``psql`` as the ``postgres`` system user
    (through ``su -l postgres -c``).

    :param db_conn: Database connection parameters
    :type db_conn: dict
    :param query: Query to execute
    :type query: str
    :return: Whether the query can be executed or not
    :rtype: bool
    """
    # build database connection uri
    if "password" in db_conn:
        uri = "postgresql://{}:{}@{}:{}/{}".format(
            db_conn["user"],
            db_conn["password"],
            db_conn["host"],
            db_conn["port"],
            db_conn["dbname"],
        )
    else:
        uri = "postgresql:///{}".format(db_conn["dbname"])

    # `su -c` hands the string to the postgres user's shell, so the uri and
    # the query are quoted to survive special characters (password, quotes
    # in the query); passing a list keeps any additional shell out of the way
    psql_command = "psql {} -c {}".format(shlex.quote(uri), shlex.quote(query))
    command = ["su", "-l", "postgres", "-c", psql_command]

    try:
        subprocess.check_output(command)
    except (subprocess.CalledProcessError, OSError):
        # non-zero exit status, or `su` could not be started at all
        return False

    return True
def check_replication(primary: dict, standby: dict) -> bool:
    """Check if replication is working between the primary and standby servers.

    A test table is created (and committed) on the primary; after a short
    delay it is read back through the standby connection, then dropped on
    the primary.

    :param primary: Database connection parameters for primary server
    :param standby: Database connection parameters for standby server
    :type primary: dict
    :type standby: dict
    :return: Whether replication between primary/standby is working or not
    :rtype: bool
    """
    primary_client = None
    standby_client = None

    try:
        # connections
        try:
            primary_client = psycopg2.connect(**primary)
            standby_client = psycopg2.connect(**standby)
        except psycopg2.Error:
            error("Cannot connect to the databases")
            return False

        # create the test table; it must be committed, otherwise it never
        # leaves the primary's open transaction and cannot be replicated
        try:
            with primary_client.cursor() as primary_psql:
                primary_psql.execute("CREATE TABLE es_test (id serial PRIMARY KEY);")
            primary_client.commit()
        except psycopg2.Error:
            return False

        # give the standby some time to replay the change
        time.sleep(3)

        # read the table back from the standby (not from the primary)
        replicated = True
        try:
            with standby_client.cursor() as standby_psql:
                standby_psql.execute("SELECT * FROM es_test;")
        except psycopg2.Error:
            replicated = False

        # always remove the test table so the next run starts clean
        try:
            with primary_client.cursor() as primary_psql:
                primary_psql.execute("DROP TABLE es_test;")
            primary_client.commit()
        except psycopg2.Error:
            return False

        return replicated
    finally:
        # close whatever was opened, on every path
        for client in (standby_client, primary_client):
            if client is not None:
                client.close()
def check_ha(db_conn: dict, errors: int = 0) -> int:
    """Run all tests for a highly-available setup.

    :param db_conn: Database connection parameters (not modified)
    :type db_conn: dict
    :param errors: Error counter, defaults to 0
    :type errors: int, optional
    :return: Number of errors
    :rtype: int
    """
    # get haproxy conf
    ha_conf = get_haproxy_conf()

    # get nodes data
    nodes = get_nodes(ha_conf)

    # check haproxy
    print("Checking local HAProxy frontends:")
    if not check_listen(db_conn["host"], 54321):
        error("HAProxy pgsql-primary frontend is not listening")
        errors += 1
    else:
        success("HAProxy pgsql-primary frontend is listening")
    if not check_listen(db_conn["host"], 54322):
        error("HAProxy pgsql-standby frontend is not listening")
        errors += 1
    else:
        success("HAProxy pgsql-standby frontend is listening")

    # check remote nodes
    print("Checking remote PostgreSQL nodes:")
    for node in nodes:
        node_host = nodes[node]["host"]
        node_port = int(nodes[node]["port"])
        if not check_listen(node_host, node_port):
            error("Cannot bind {}:{}".format(node_host, node_port))
            errors += 1
        else:
            success("Can bind {}:{}".format(node_host, node_port))

    # check fenced
    print("Checking cluster state:")
    fenced, node = check_fenced(nodes)
    if fenced:
        error("Node `{}` is fenced".format(node))
        errors += 1
    else:
        success("No fenced node found")

    # check replication; each side gets its own copy of the parameters,
    # otherwise both names would refer to one dict ending up on port 54322
    print("Checking replication state:")
    primary = dict(db_conn, port=54321)
    standby = dict(db_conn, port=54322)
    if not check_replication(primary, standby):
        error("Cannot replicate data between primary/standby")
        errors += 1
    else:
        success("Can replicate data between primary/standby")

    return errors
def check_local(db_conn: dict, errors: int = 0) -> int:
    """Run all tests for a local (non highly-available) setup.

    :param db_conn: Database connection parameters
    :type db_conn: dict
    :param errors: Error counter, defaults to 0
    :type errors: int, optional
    :return: Number of errors
    :rtype: int
    """
    db_host = db_conn["host"]
    db_port = db_conn["port"]
    db_user = db_conn["user"]

    # check listen
    print("Checking local PostgreSQL node:")
    if not check_listen(db_host, db_port):
        error("Cannot connect to {}:{}".format(db_host, db_port))
        errors += 1
    else:
        success("Can connect to {}:{}".format(db_host, db_port))

    # check read
    print("Checking read operation:")
    read_query = "SELECT 1;"
    if not check_psql(db_conn, read_query):
        error("Cannot read data on {}@{}:{}".format(db_user, db_host, db_port))
        errors += 1
    else:
        success("Can read data on {}@{}:{}".format(db_user, db_host, db_port))

    # check write
    print("Checking write operation:")
    write_query = "CREATE TABLE es_test (id serial PRIMARY KEY);"
    if not check_psql(db_conn, write_query):
        error("Cannot write data on {}@{}:{}".format(db_user, db_host, db_port))
        errors += 1
    else:
        success("Can write data on {}@{}:{}".format(db_user, db_host, db_port))

    # remove test table; done unconditionally so that a table left behind
    # by an interrupted run does not make the next write check fail
    check_psql(db_conn, "DROP TABLE es_test;")

    return errors
def main():
    """Load the envsetup configuration and run the matching set of checks.

    Exits with status 1 when the envsetup ``utils.py`` helper is missing.
    The number of failed checks is stored in ``errors``.
    """
    # locate envsetup utils, expected one directory above this file
    cwd = os.path.dirname(__file__)
    utils = os.path.join(cwd, "..", "utils.py")

    # bail out early when envsetup utils are missing
    if not os.path.isfile(utils):
        error("{} not found.".format(utils))
        sys.exit(1)

    # load envsetup utils
    # NOTE(review): `imp` is deprecated and was removed in Python 3.12
    es_utils = imp.load_source("es_utils", utils)

    # load configuration
    conf = es_utils.load_conf()

    # database settings, falling back on defaults for unset/empty values
    db_host = conf.get("DB_HOST") or "127.0.0.1"
    raw_port = conf.get("DB_PORT")
    db_port = int(raw_port) if raw_port else 5432
    db_user = conf.get("DB_USER") or "postgres"
    db_pass = conf.get("DB_PG_ROOT_PWD")

    db_conn = dict(dbname=db_user, host=db_host, port=db_port, user=db_user)
    if db_pass:
        db_conn["password"] = db_pass

    # pick the test suite matching the kind of setup
    if not is_ha(db_port):
        print("This setup is using a local database")
        errors = check_local(db_conn)
    else:
        print("This setup is using a HA database")
        errors = check_ha(db_conn)