diff --git a/tests/test_backup.py b/tests/test_backup.py index 37cd11ca0cea7bee3558d0bbb9b174cd2772c477..ade4c97083093a0c0b476d97917f9c714a856d50 100755 --- a/tests/test_backup.py +++ b/tests/test_backup.py @@ -1,147 +1,200 @@ #!/usr/bin/env python3 -# -*- coding: utf-8 -*- # Copyright 2017, Florent Thiery -''' + +""" Criticality: Normal Checks that the server backups are not older than a day. -''' +""" + from datetime import datetime -import imp +from pathlib import Path import os import socket import subprocess import sys -GREEN = '\033[92m' -RED = '\033[91m' -DEF = '\033[0m' +sys.path.append(str(Path(__file__).parents[1].resolve())) -MAX_AGE_H = 48 +# pylint: disable=wrong-import-position +from envsetup import utils as u # noqa: E402 +MAX_AGE = 2 -def print_red(string): - print(RED + string + DEF) +def test_ssh(host: str) -> bool: + """Check that MediaVault server can reached. -def print_green(string): - print(GREEN + string + DEF) + :param ip: MediaVault hostname or IP address + :type ip: str + :return: Wether it can connect to server or not + :rtype: bool + """ + print("Checking connection to MediaVault ({}):".format(host)) -def test_ssh(ip): - cmd = 'ssh -o StrictHostKeyChecking=no -o PasswordAuthentication=no %s ls /tmp' % ip - print('Connecting to MediaVault: %s' % cmd) + cmd = "ssh -o StrictHostKeyChecking=no -o PasswordAuthentication=no {} :".format( + host + ) try: subprocess.check_output(cmd, shell=True, timeout=5) - print('%sLogged in successfully%s' % (GREEN, DEF)) + u.success("Logged in successfully") except subprocess.CalledProcessError: - print('%sFailed to login using SSH, run ssh-copy-id %s %s' % (RED, ip, DEF)) + u.error("Failed to login using SSH public key authentication") return False except subprocess.TimeoutExpired: try: - cmd_port = 'nc -z -w2 {} 22'.format(ip) + cmd_port = "nc -z -w2 {} 22".format(host) subprocess.check_output(cmd_port, shell=True, timeout=5) - except Exception: - cmd_ping = 'ping -c2 -w4 {}'.format(ip) + u.error("Failed to bind SSH port") + except subprocess.CalledProcessError: + cmd_ping = "ping -c2 -w4 {}".format(host) subprocess.check_output(cmd_ping, shell=True, timeout=15) + u.error("Failed to ping host") return False + return True -def test_last_backup_is_recent(server): +def test_last_backup_is_recent(server: str) -> bool: + """Check that the latest backup is recent enough. + + :param server: MediaVault hostname or IP address + :type server: str + :return: Wether the latest backup is too old or not + :rtype: bool + """ + + print("Checking latest backup age:") + client = socket.gethostname() - path = '/backup/%s/home/latest' % client - cmd = 'ssh -o StrictHostKeyChecking=no %s ls -l %s | grep latest' % (server, path) + path = "/backup/{}/home/latest".format(client) + cmd = "ssh -o StrictHostKeyChecking=no {} ls -l {} | grep latest".format( + server, path + ) status, out = subprocess.getstatusoutput(cmd) if status == 0: - date = out.strip().split(' ')[-1] - pdate = datetime.strptime(date, '%Y-%m-%d-%H%M%S') - if (datetime.now() - pdate).days > 2: - print('Backup is older than 2 days') + date = out.strip().split(" ")[-1] + pdate = datetime.strptime(date, "%Y-%m-%d-%H%M%S") + if (datetime.now() - pdate).days > MAX_AGE: + u.error("Backup is older than {} days".format(MAX_AGE)) return False - else: - print('There is a backup that is less than 2 days old, this is fine') - return True - else: - out = out or 'No output.' - print('SSH access is not working (code: %s):\n%s' % (status, out)) - return False + u.success("Backup is less than {} days old".format(MAX_AGE)) + return True + + out = out or "No output." + u.error("SSH access is not working (code: {}): {}".format(status, out)) + + return False + + +def check_backup_is_incremental(path: str) -> bool: + """Check that backup is incremental. + + :param path: Backup folder path + :type param: str + :return: Wether the backup is incremental or not + :rtype: bool + """ + print("Checking that backup is incremental:") -def check_backup_is_incremental(path): - # incremental backups done with tmbackup mean that they will all at least contain links dirs = os.listdir(path) is_incremental = True - for d in dirs: + for directory in dirs: files_count = 0 - folder_path = os.path.join(path, d) + folder_path = os.path.join(path, directory) if os.path.isdir(folder_path): files_count = len(os.listdir(folder_path)) if files_count == 0: - print('Folder %s is empty, this indicates non-incremental backups (we are expecting links), removing folder' % folder_path) + u.error("Folder {} is empty".format(folder_path)) is_incremental = False os.rmdir(folder_path) + return is_incremental -def check_local_backup(path): - all_ok = True +def check_local_backup(path: str) -> bool: + """Check that local backup is in a correct state. + + :param path: Local backup folder path + :type path: str + :return: Wether local backup is correct or not + :rtype: bool + """ + backup_folder = os.path.dirname(path) - print('Checking %s' % backup_folder) - latest = os.path.join(backup_folder, 'latest') + print("Checking %s" % backup_folder) + + all_ok = True + latest = os.path.join(backup_folder, "latest") if os.path.exists(latest): # resolve symbolic link latest = os.path.realpath(latest) - date = os.path.basename(latest) - d = datetime.strptime(date, '%Y-%m-%d-%H%M%S') + latest_date = os.path.basename(latest) + date = datetime.strptime(latest_date, "%Y-%m-%d-%H%M%S") now = datetime.now() - diff_seconds = (now - d).total_seconds() - if diff_seconds > MAX_AGE_H * 3600: - print_red('Backup %s is older than %sh (%ih)' % (backup_folder, MAX_AGE_H, diff_seconds / 3600)) + diff_seconds = (now - date).total_seconds() + if diff_seconds > MAX_AGE * 24 * 3600: + u.error( + "Backup {} is older than {}d (%{}h)".format( + backup_folder, MAX_AGE, diff_seconds / 3600 + ) + ) all_ok = False else: - print_green('Backup %s is fine' % backup_folder) + u.success("Backup {} is fine".format(backup_folder)) if not check_backup_is_incremental(backup_folder): all_ok = False - elif os.path.exists(os.path.join(backup_folder, 'backup.inprogress')): - print_red('Initial backup %s still running' % backup_folder) + elif os.path.exists(os.path.join(backup_folder, "backup.inprogress")): + u.error("Initial backup %s still running" % backup_folder) all_ok = False else: - print_red('Backup %s is not working' % latest) + u.error("Backup {} is not working".format(latest)) all_ok = False + return all_ok -def check_local_backups(paths): +def check_local_backups(paths: str) -> bool: + """Run check for all local backup paths. + + :param paths: Comma separated list of backup paths + :type paths: str + :return: Wether all backups are good or not + :rtype: bool + """ + all_ok = True - folders = paths.split(',') - for f in folders: - cmd = "find %s -maxdepth 4 -name backup.marker" % f - status, out = subprocess.getstatusoutput(cmd) - for bf in out.split('\n'): - all_ok = min(check_local_backup(bf), all_ok) + folders = paths.split(",") + for folder in folders: + cmd = "find {} -maxdepth 4 -name backup.marker".format(folder) + _, out = subprocess.getstatusoutput(cmd) + for backup_folder in out.split("\n"): + all_ok = min(check_local_backup(backup_folder), all_ok) + return all_ok -os.chdir(os.path.dirname(__file__)) -if os.path.isfile('../utils.py'): - es_utils = imp.load_source('es_utils', '../utils.py') - conf = es_utils.load_conf() - BACKUP_SERVER = conf.get('BACKUP_SERVER') - LOCAL_BACKUP_FOLDERS = conf.get('LOCAL_BACKUP_FOLDERS') - if BACKUP_SERVER: - if not test_ssh(BACKUP_SERVER): - print('Failed to ssh into backup server') +def main(): + """Run all checks and exits with corresponding exit code.""" + + conf = u.load_conf() + backup_server = conf.get("BACKUP_SERVER") + local_backup_folders = conf.get("LOCAL_BACKUP_FOLDERS") + if backup_server: + if not test_ssh(backup_server): sys.exit(1) else: - if not test_last_backup_is_recent(BACKUP_SERVER): + if not test_last_backup_is_recent(backup_server): sys.exit(1) else: sys.exit(0) - elif LOCAL_BACKUP_FOLDERS: - sys.exit(not check_local_backups(LOCAL_BACKUP_FOLDERS)) + elif local_backup_folders: + sys.exit(not check_local_backups(local_backup_folders)) else: - print('No BACKUP_SERVER defined in config, untestable') + print("No backup_server defined in config, untestable") sys.exit(2) -else: - print('Unable to load config, untestable') - sys.exit(2) + + +if __name__ == "__main__": + main()