'''Check the current state of the PostgreSQL database cluster.'''
import re
import shlex
import socket
import subprocess
import sys
import time
import urllib.parse
import uuid
from pathlib import Path

try:
    import psycopg2
except ImportError:
    sys.exit(2)

sys.path.append(str(Path(__file__).parents[1].resolve()))
from utilities import logging as lg  # noqa: E402
from utilities.apt import Apt  # noqa: E402
from utilities.config import load_conf  # noqa: E402
def check_listen(host: str, port: int, timeout: float = 5.0) -> bool:
    '''Check if server is listening (TCP only).

    :param host: The hostname or IP address to connect to
    :type host: str
    :param port: The port number to connect to
    :type port: int
    :param timeout: Connection timeout in seconds, defaults to 5.0
    :type timeout: float, optional
    :return: Whether the `host` is listening on TCP/`port`
    :rtype: bool
    '''
    try:
        # create_connection resolves `host` and tries every address returned
        # (IPv4 and IPv6); the context manager always closes the socket
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # connection refused, unreachable, timed out or unresolvable host
        return False
def get_haproxy_conf(path: str = '/etc/haproxy/haproxy.cfg') -> dict:
    '''Get HAProxy configuration in a dictionary.

    Each unindented line opening a section (e.g. ``backend pgsql-primary``)
    becomes a key. The indented lines that follow are appended to that key's
    list, without their indentation and truncated at the first character
    outside the accepted set. Indented lines found before any section header
    are ignored.

    :param path: HAProxy configuration file, defaults to '/etc/haproxy/haproxy.cfg'
    :type path: str
    :return: HAProxy configuration file content, or an empty dictionary if
        the file cannot be read
    :rtype: dict
    '''
    conf = {}

    # load configuration file (undecodable bytes must not crash the check)
    try:
        with open(path, encoding='utf-8', errors='replace') as data:
            lines = data.readlines()
    except OSError:
        return conf

    # define patterns
    pattern_block = re.compile(r'^([a-zA-Z0-9_.-]+ *[a-zA-Z0-9_.-]+)')
    pattern_param = re.compile(r'^\s+([ /:()|a-zA-Z0-9_.-]+)')

    # parse configuration file
    block = None
    for line in lines:
        match_block = pattern_block.match(line)
        if match_block:
            block = match_block.group(1)
            conf[block] = []
            continue
        match_param = pattern_param.match(line)
        # an indented line before the first section has no owner: skip it
        if match_param and block is not None:
            conf[block].append(match_param.group(1))

    return conf
def get_nodes(conf: dict) -> dict:
    '''Get the list of nodes from HAProxy configuration.

    The ``server`` lines of every section are parsed as
    ``server <name> <host>:<port> ...``, with the 8th field taken as the
    RepHACheck port. Lines with fewer than 8 fields or without a numeric
    port are skipped.

    :param conf: The HAProxy configuration file content, as returned by
        `get_haproxy_conf`
    :type conf: dict
    :return: The nodes found in HAProxy configuration, indexed by server
        name, each holding its ``host`` (str), ``port`` (int) and
        ``rephacheck`` (str) values
    :rtype: dict
    '''
    servers = {}

    # NOTE(review): every section is scanned; a server declared in several
    # backends (e.g. primary and standby) is kept once, under its name
    for params in conf.values():
        server_lines = [x for x in params if x.startswith('server ')]
        for line in server_lines:
            # split line
            elements = line.split()
            if len(elements) < 8:
                # no RepHACheck port on this line: not a PostgreSQL node
                continue

            # get needed elements
            name = elements[1]
            host, _, port = elements[2].rpartition(':')
            if not host or not port.isdigit():
                continue
            rephacheck = elements[7]

            # update dictionary
            servers[name] = {'host': host, 'port': int(port), 'rephacheck': rephacheck}

    return servers
def get_node_state(host: str, port: int, timeout: float = 5.0) -> str:
    '''Get the current state of a node from its RepHACheck daemon.

    Nothing is sent to the daemon: the first chunk it sends after the
    connection (up to 1024 bytes) is decoded as UTF-8 and returned without
    trailing whitespace.

    :param host: The node's hostname or IP address
    :type host: str
    :param port: The node's port on which RepHACheck is listening
    :type port: int
    :param timeout: Connection and read timeout in seconds, defaults to 5.0
    :type timeout: float, optional
    :return: The current state of the node according to its RepHACheck daemon
    :rtype: str
    :raises OSError: If the daemon cannot be reached or does not answer in time
    '''
    # connect and get tcp stream data; the socket is closed even on error
    with socket.create_connection((host, port), timeout=timeout) as client:
        data = client.recv(1024)

    # extract string from data output
    return data.decode('utf-8', 'replace').rstrip()
def check_fenced(nodes: dict) -> tuple:
    '''Check if the cluster has a fenced node.

    Each node's RepHACheck daemon is queried in turn. The search stops at the
    first node reporting the ``fenced`` state.

    :param nodes: The dictionary containing nodes and their information, as
        returned by `get_nodes`
    :type nodes: dict
    :return: ``(True, <node name>)`` for the first fenced node found,
        ``(False, None)`` otherwise
    :rtype: tuple
    :raises OSError: If a node's RepHACheck daemon cannot be reached
    '''
    for name, info in nodes.items():
        if get_node_state(info['host'], int(info['rephacheck'])) == 'fenced':
            return True, name
    return False, None
def check_psql(db_conn: dict, query: str) -> tuple:
    '''Check if a query can be executed on the database.

    The query is run with ``psql`` as the ``postgres`` system user
    (``su -l postgres``).

    :param db_conn: Database connection parameters. When it holds a
        non-empty ``password``, ``user``, ``host``, ``port`` and ``dbname``
        are used to build a connection URI. Otherwise only ``dbname`` is
        used and no host is given to ``psql``.
    :type db_conn: dict
    :param query: Query to execute
    :type query: str
    :return: ``(True, '')`` if the query could be executed, otherwise
        ``(False, <error message>)``
    :rtype: tuple
    '''
    # build database connection uri; parts are percent-encoded with `quote`
    # (not `quote_plus`, whose `+` for spaces is not decoded by libpq)
    if db_conn.get('password'):
        uri = 'postgresql://{}:{}@{}:{}/{}'.format(
            urllib.parse.quote(str(db_conn['user']), safe=''),
            urllib.parse.quote(str(db_conn['password']), safe=''),
            db_conn['host'],
            db_conn['port'],
            urllib.parse.quote(str(db_conn['dbname']), safe=''),
        )
    else:
        uri = 'postgresql:///{}'.format(
            urllib.parse.quote(str(db_conn['dbname']), safe='')
        )

    # `su -c` hands its argument to a shell: quote every psql argument so the
    # URI (password included) and the query reach psql verbatim
    psql = ' '.join(shlex.quote(arg) for arg in ('psql', uri, '-c', query))
    command = ['su', '-l', 'postgres', '-c', psql]

    try:
        subprocess.check_output(command, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as psql_error:
        # report psql's own output: the CalledProcessError text contains the
        # whole command line, i.e. the password
        output = (psql_error.output or b'').decode('utf-8', 'replace').strip()
        if not output:
            output = 'psql exited with status {}'.format(psql_error.returncode)
        return False, output
    except OSError as exec_error:
        # `su` itself could not be executed
        return False, str(exec_error)

    return True, ''
def check_replication(primary: dict, standby: dict,
                      max_time: float = 6.0, interval: float = 0.2) -> tuple:
    '''Check if replication is working between the primary and standby servers.

    A uniquely named table is created (and committed) on the primary. The
    standby is then polled until the table can be read from it, or until
    `max_time` seconds have elapsed. The test table is dropped from the
    primary afterwards, and both connections are always closed.

    :param primary: Database connection parameters for primary server
    :type primary: dict
    :param standby: Database connection parameters for standby server
    :type standby: dict
    :param max_time: Maximum time in seconds to wait for the table to be
        readable on the standby, defaults to 6.0
    :type max_time: float, optional
    :param interval: Delay in seconds between two reads on the standby,
        defaults to 0.2
    :type interval: float, optional
    :return: ``(True, 'took ~<seconds>s')`` if replication works, otherwise
        ``(False, <error message>)``
    :rtype: tuple
    '''
    # random id
    table = 'es_test_{}'.format(uuid.uuid4().hex)
    write_query = 'CREATE TABLE {} (id serial PRIMARY KEY);'.format(table)
    read_query = 'SELECT * FROM {};'.format(table)

    primary_client = standby_client = None
    try:
        # connections
        try:
            primary_client = psycopg2.connect(**primary)
            standby_client = psycopg2.connect(**standby)
        except psycopg2.Error as repl_conn_error:
            return False, str(repl_conn_error).rstrip()

        # autocommit has two effects:
        # - the CREATE TABLE must be committed to be replicated at all;
        # - a failed SELECT on the standby must not leave its session in an
        #   aborted transaction that would make every retry fail.
        primary_client.autocommit = True
        standby_client.autocommit = True

        # write
        try:
            with primary_client.cursor() as primary_psql:
                primary_psql.execute(write_query)
        except psycopg2.Error as repl_write_error:
            return False, str(repl_write_error).rstrip()

        try:
            # read: poll the standby (not the primary) until the table shows up
            start = time.monotonic()
            while True:
                try:
                    with standby_client.cursor() as standby_psql:
                        standby_psql.execute(read_query)
                    elapsed = time.monotonic() - start
                    return True, 'took ~{:.1f}s'.format(elapsed)
                except psycopg2.Error as repl_read_error:
                    msg = str(repl_read_error).rstrip()
                if time.monotonic() - start >= max_time:
                    return False, msg
                time.sleep(interval)
        finally:
            # delete (best effort: a leftover table is not a replication failure)
            try:
                with primary_client.cursor() as primary_psql:
                    primary_psql.execute('DROP TABLE {};'.format(table))
            except psycopg2.Error:
                pass
    finally:
        # close
        for client in (standby_client, primary_client):
            if client is not None:
                client.close()
def check_ha(db_conn: dict, errors: int = 0) -> int:
    '''Run all tests for a highly-available setup.

    The checks are:

    - the local HAProxy frontends;
    - the PostgreSQL port of every node declared in the HAProxy
      configuration;
    - the absence of a fenced node;
    - the replication between primary and standby.

    Every failed check is logged and counted.

    :param db_conn: Database connection parameters
    :type db_conn: dict
    :param errors: Error counter, defaults to 0
    :type errors: int, optional
    :return: Number of errors
    :rtype: int
    '''
    primary_port = 54321  # HAProxy pgsql-primary frontend
    standby_port = 54322  # HAProxy pgsql-standby frontend

    # get haproxy conf
    ha_conf = get_haproxy_conf()

    # get nodes data
    nodes = get_nodes(ha_conf)

    # check haproxy
    lg.log('Checking local HAProxy frontends:')
    if not check_listen(db_conn['host'], primary_port):
        lg.error('HAProxy pgsql-primary frontend is not listening')
        errors += 1
    else:
        lg.success('HAProxy pgsql-primary frontend is listening')
    if not check_listen(db_conn['host'], standby_port):
        lg.error('HAProxy pgsql-standby frontend is not listening')
        errors += 1
    else:
        lg.success('HAProxy pgsql-standby frontend is listening')

    # check every node's PostgreSQL port
    lg.log('Checking remote PostgreSQL nodes:')
    for node in nodes.values():
        node_host = node['host']
        node_port = node['port']
        if not check_listen(node_host, node_port):
            lg.error('cannot bind {}:{}'.format(node_host, node_port))
            errors += 1
        else:
            lg.success('can bind {}:{}'.format(node_host, node_port))

    # check fencing
    lg.log('Checking cluster state:')
    fenced, fenced_node = check_fenced(nodes)
    if fenced:
        lg.error('Node `{}` is fenced'.format(fenced_node))
        errors += 1
    else:
        lg.success('No fenced node found')

    # check replication through the primary and standby frontends
    # NOTE(review): confirm these are the intended endpoints for this test
    lg.log('Checking replication state:')
    primary = db_conn.copy()
    primary['port'] = primary_port
    standby = db_conn.copy()
    standby['port'] = standby_port
    status, info = check_replication(primary, standby)
    if not status:
        lg.error('cannot replicate between primary/standby ({})'.format(info))
        errors += 1
    else:
        lg.success('can replicate between primary/standby ({})'.format(info))

    return errors
def check_local(db_conn: dict, errors: int = 0) -> int:
    '''Run all tests for a single (non highly-available) database node.

    The node must accept TCP connections and read queries. If it is in the
    ``primary`` state, it must also accept writes: a test table is created,
    then dropped. The state is read from a RepHACheck daemon on
    127.0.0.1:8543 when one is listening; otherwise the node is considered
    primary. Every failed check is logged and counted.

    :param db_conn: Database connection parameters
    :type db_conn: dict
    :param errors: Error counter, defaults to 0
    :type errors: int, optional
    :return: Number of errors
    :rtype: int
    '''
    host = db_conn['host']
    port = db_conn['port']
    user = db_conn['user']

    lg.log('Checking local PostgreSQL node:')
    if not check_listen(host, port):
        lg.error('cannot connect to {}:{}'.format(host, port))
        errors += 1
    else:
        lg.success('can connect to {}:{}'.format(host, port))

    lg.log('Checking read operation:')
    read_query = 'SELECT 1;'
    status, info = check_psql(db_conn, read_query)
    if not status:
        lg.error('cannot read from {}@{}:{} ({})'.format(user, host, port, info))
        errors += 1
    else:
        lg.success('can read from {}@{}:{}'.format(user, host, port))

    # get replication state if available
    if check_listen('127.0.0.1', 8543):
        state = get_node_state('127.0.0.1', 8543)
    else:
        state = 'primary'

    lg.log('Checking write operation:')
    if state != 'primary':
        # the write test is only run on a primary node
        lg.info('this database is in {} state'.format(state))
        return errors

    rand = uuid.uuid4().hex
    write_query = 'CREATE TABLE es_test_{} (id serial PRIMARY KEY);'.format(rand)
    status, info = check_psql(db_conn, write_query)
    if not status:
        lg.error('cannot write on {}@{}:{} ({})'.format(user, host, port, info))
        errors += 1
    else:
        lg.success('can write on {}@{}:{}'.format(user, host, port))
        # clean up the test table
        check_psql(db_conn, 'DROP TABLE es_test_{};'.format(rand))

    return errors
def main():
    '''Run all checks and exits with corresponding exit code.'''
    # NOTE(review): confirm the Apt()/load_conf() arguments used by the project
    apt = Apt()
    conf = load_conf()

    pattern_postgresql_client = re.compile(r'postgresql-client.*')
    if not any(pattern_postgresql_client.match(pkg) for pkg in apt.installed_packages):
        # NOTE(review): exit code 2 mirrors the missing-psycopg2 case at the
        # top of the file; confirm it is the "not applicable" code
        lg.info('postgresql-client is not installed, skipping checks')
        sys.exit(2)

    db_host = conf.get('DB_HOST') or '127.0.0.1'
    db_port = 54321 if check_listen('127.0.0.1', 54321) else 5432
    db_user = conf.get('DB_USER') or 'postgres'
    db_pass = conf.get('DB_PG_ROOT_PWD')
    db_conn = {'dbname': db_user, 'host': db_host, 'port': db_port, 'user': db_user}
    # only pass a password when one is configured (None would break the URI)
    if db_pass:
        db_conn['password'] = db_pass

    # determine if HA setup and run according tests
    lg.log('Checking availibility mode:')
    if check_listen('127.0.0.1', 54321) and check_listen('127.0.0.1', 54322):
        lg.info('this setup is using a primary and standby database')
        errors = check_ha(db_conn)
    else:
        lg.info('this setup is using a single database')
        errors = check_local(db_conn)

    sys.exit(1 if errors else 0)


if __name__ == '__main__':
    main()