Skip to content
Snippets Groups Projects
Commit dbce90a1 authored by Nicolas KAROLAK's avatar Nicolas KAROLAK
Browse files

refactor(tests): re-format test_backup

parent 28afd136
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2017, Florent Thiery # Copyright 2017, Florent Thiery
'''
"""
Criticality: Normal Criticality: Normal
Checks that the server backups are not older than a day. Checks that the server backups are not older than a day.
''' """
from datetime import datetime from datetime import datetime
import imp from pathlib import Path
import os import os
import socket import socket
import subprocess import subprocess
import sys import sys
GREEN = '\033[92m' sys.path.append(str(Path(__file__).parents[1].resolve()))
RED = '\033[91m'
DEF = '\033[0m'
MAX_AGE_H = 48 # pylint: disable=wrong-import-position
from envsetup import utils as u # noqa: E402
MAX_AGE = 2
def print_red(string):
print(RED + string + DEF)
def test_ssh(host: str) -> bool:
"""Check that MediaVault server can reached.
def print_green(string): :param ip: MediaVault hostname or IP address
print(GREEN + string + DEF) :type ip: str
:return: Wether it can connect to server or not
:rtype: bool
"""
print("Checking connection to MediaVault ({}):".format(host))
def test_ssh(ip): cmd = "ssh -o StrictHostKeyChecking=no -o PasswordAuthentication=no {} :".format(
cmd = 'ssh -o StrictHostKeyChecking=no -o PasswordAuthentication=no %s ls /tmp' % ip host
print('Connecting to MediaVault: %s' % cmd) )
try: try:
subprocess.check_output(cmd, shell=True, timeout=5) subprocess.check_output(cmd, shell=True, timeout=5)
print('%sLogged in successfully%s' % (GREEN, DEF)) u.success("Logged in successfully")
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
print('%sFailed to login using SSH, run ssh-copy-id %s %s' % (RED, ip, DEF)) u.error("Failed to login using SSH public key authentication")
return False return False
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
try: try:
cmd_port = 'nc -z -w2 {} 22'.format(ip) cmd_port = "nc -z -w2 {} 22".format(host)
subprocess.check_output(cmd_port, shell=True, timeout=5) subprocess.check_output(cmd_port, shell=True, timeout=5)
except Exception: u.error("Failed to bind SSH port")
cmd_ping = 'ping -c2 -w4 {}'.format(ip) except subprocess.CalledProcessError:
cmd_ping = "ping -c2 -w4 {}".format(host)
subprocess.check_output(cmd_ping, shell=True, timeout=15) subprocess.check_output(cmd_ping, shell=True, timeout=15)
u.error("Failed to ping host")
return False return False
return True return True
def test_last_backup_is_recent(server): def test_last_backup_is_recent(server: str) -> bool:
"""Check that the latest backup is recent enough.
:param server: MediaVault hostname or IP address
:type server: str
:return: Wether the latest backup is too old or not
:rtype: bool
"""
print("Checking latest backup age:")
client = socket.gethostname() client = socket.gethostname()
path = '/backup/%s/home/latest' % client path = "/backup/{}/home/latest".format(client)
cmd = 'ssh -o StrictHostKeyChecking=no %s ls -l %s | grep latest' % (server, path) cmd = "ssh -o StrictHostKeyChecking=no {} ls -l {} | grep latest".format(
server, path
)
status, out = subprocess.getstatusoutput(cmd) status, out = subprocess.getstatusoutput(cmd)
if status == 0: if status == 0:
date = out.strip().split(' ')[-1] date = out.strip().split(" ")[-1]
pdate = datetime.strptime(date, '%Y-%m-%d-%H%M%S') pdate = datetime.strptime(date, "%Y-%m-%d-%H%M%S")
if (datetime.now() - pdate).days > 2: if (datetime.now() - pdate).days > MAX_AGE:
print('Backup is older than 2 days') u.error("Backup is older than {} days".format(MAX_AGE))
return False return False
else: u.success("Backup is less than {} days old".format(MAX_AGE))
print('There is a backup that is less than 2 days old, this is fine') return True
return True
else: out = out or "No output."
out = out or 'No output.' u.error("SSH access is not working (code: {}): {}".format(status, out))
print('SSH access is not working (code: %s):\n%s' % (status, out))
return False return False
def check_backup_is_incremental(path: str) -> bool:
"""Check that backup is incremental.
:param path: Backup folder path
:type param: str
:return: Wether the backup is incremental or not
:rtype: bool
"""
print("Checking that backup is incremental:")
def check_backup_is_incremental(path):
# incremental backups done with tmbackup mean that they will all at least contain links
dirs = os.listdir(path) dirs = os.listdir(path)
is_incremental = True is_incremental = True
for d in dirs: for directory in dirs:
files_count = 0 files_count = 0
folder_path = os.path.join(path, d) folder_path = os.path.join(path, directory)
if os.path.isdir(folder_path): if os.path.isdir(folder_path):
files_count = len(os.listdir(folder_path)) files_count = len(os.listdir(folder_path))
if files_count == 0: if files_count == 0:
print('Folder %s is empty, this indicates non-incremental backups (we are expecting links), removing folder' % folder_path) u.error("Folder {} is empty".format(folder_path))
is_incremental = False is_incremental = False
os.rmdir(folder_path) os.rmdir(folder_path)
return is_incremental return is_incremental
def check_local_backup(path): def check_local_backup(path: str) -> bool:
all_ok = True """Check that local backup is in a correct state.
:param path: Local backup folder path
:type path: str
:return: Wether local backup is correct or not
:rtype: bool
"""
backup_folder = os.path.dirname(path) backup_folder = os.path.dirname(path)
print('Checking %s' % backup_folder) print("Checking %s" % backup_folder)
latest = os.path.join(backup_folder, 'latest')
all_ok = True
latest = os.path.join(backup_folder, "latest")
if os.path.exists(latest): if os.path.exists(latest):
# resolve symbolic link # resolve symbolic link
latest = os.path.realpath(latest) latest = os.path.realpath(latest)
date = os.path.basename(latest) latest_date = os.path.basename(latest)
d = datetime.strptime(date, '%Y-%m-%d-%H%M%S') date = datetime.strptime(latest_date, "%Y-%m-%d-%H%M%S")
now = datetime.now() now = datetime.now()
diff_seconds = (now - d).total_seconds() diff_seconds = (now - date).total_seconds()
if diff_seconds > MAX_AGE_H * 3600: if diff_seconds > MAX_AGE * 24 * 3600:
print_red('Backup %s is older than %sh (%ih)' % (backup_folder, MAX_AGE_H, diff_seconds / 3600)) u.error(
"Backup {} is older than {}d (%{}h)".format(
backup_folder, MAX_AGE, diff_seconds / 3600
)
)
all_ok = False all_ok = False
else: else:
print_green('Backup %s is fine' % backup_folder) u.success("Backup {} is fine".format(backup_folder))
if not check_backup_is_incremental(backup_folder): if not check_backup_is_incremental(backup_folder):
all_ok = False all_ok = False
elif os.path.exists(os.path.join(backup_folder, 'backup.inprogress')): elif os.path.exists(os.path.join(backup_folder, "backup.inprogress")):
print_red('Initial backup %s still running' % backup_folder) u.error("Initial backup %s still running" % backup_folder)
all_ok = False all_ok = False
else: else:
print_red('Backup %s is not working' % latest) u.error("Backup {} is not working".format(latest))
all_ok = False all_ok = False
return all_ok return all_ok
def check_local_backups(paths): def check_local_backups(paths: str) -> bool:
"""Run check for all local backup paths.
:param paths: Comma separated list of backup paths
:type paths: str
:return: Wether all backups are good or not
:rtype: bool
"""
all_ok = True all_ok = True
folders = paths.split(',') folders = paths.split(",")
for f in folders: for folder in folders:
cmd = "find %s -maxdepth 4 -name backup.marker" % f cmd = "find {} -maxdepth 4 -name backup.marker".format(folder)
status, out = subprocess.getstatusoutput(cmd) _, out = subprocess.getstatusoutput(cmd)
for bf in out.split('\n'): for backup_folder in out.split("\n"):
all_ok = min(check_local_backup(bf), all_ok) all_ok = min(check_local_backup(backup_folder), all_ok)
return all_ok return all_ok
os.chdir(os.path.dirname(__file__)) def main():
if os.path.isfile('../utils.py'): """Run all checks and exits with corresponding exit code."""
es_utils = imp.load_source('es_utils', '../utils.py')
conf = es_utils.load_conf() conf = u.load_conf()
BACKUP_SERVER = conf.get('BACKUP_SERVER') backup_server = conf.get("BACKUP_SERVER")
LOCAL_BACKUP_FOLDERS = conf.get('LOCAL_BACKUP_FOLDERS') local_backup_folders = conf.get("LOCAL_BACKUP_FOLDERS")
if BACKUP_SERVER: if backup_server:
if not test_ssh(BACKUP_SERVER): if not test_ssh(backup_server):
print('Failed to ssh into backup server')
sys.exit(1) sys.exit(1)
else: else:
if not test_last_backup_is_recent(BACKUP_SERVER): if not test_last_backup_is_recent(backup_server):
sys.exit(1) sys.exit(1)
else: else:
sys.exit(0) sys.exit(0)
elif LOCAL_BACKUP_FOLDERS: elif local_backup_folders:
sys.exit(not check_local_backups(LOCAL_BACKUP_FOLDERS)) sys.exit(not check_local_backups(local_backup_folders))
else: else:
print('No BACKUP_SERVER defined in config, untestable') print("No backup_server defined in config, untestable")
sys.exit(2) sys.exit(2)
else:
print('Unable to load config, untestable')
sys.exit(2) if __name__ == "__main__":
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment