#!/usr/bin/env python3 # Copyright 2017, Florent Thiery """ Criticality: Normal Checks that the server backups are not older than a day. """ from datetime import datetime from pathlib import Path import os import socket import subprocess import sys sys.path.append(str(Path(__file__).parents[1].resolve())) # pylint: disable=wrong-import-position from envsetup import utils as u # noqa: E402 MAX_AGE = 2 def test_ssh(host: str) -> bool: """Check that MediaVault server can reached. :param ip: MediaVault hostname or IP address :type ip: str :return: Wether it can connect to server or not :rtype: bool """ print("Checking connection to MediaVault ({}):".format(host)) cmd = "ssh -o StrictHostKeyChecking=no -o PasswordAuthentication=no {} :".format( host ) try: subprocess.check_output(cmd, shell=True, timeout=5) u.success("logged in successfully") except subprocess.CalledProcessError: u.error("failed to login using SSH public key authentication") return False except subprocess.TimeoutExpired: u.error("timeout") try: cmd_port = "nc -z -w2 {} 22".format(host) subprocess.check_output(cmd_port, shell=True, timeout=5) except subprocess.CalledProcessError: u.error("failed to bind SSH port") try: cmd_ping = "ping -c2 -w4 {}".format(host) subprocess.check_output(cmd_ping, shell=True, timeout=15) except subprocess.CalledProcessError: u.error("failed to ping host") return False return True def test_last_backup_is_recent(server: str) -> bool: """Check that the latest backup is recent enough. :param server: MediaVault hostname or IP address :type server: str :return: Wether the latest backup is too old or not :rtype: bool """ print("Checking latest backup age:") client = socket.gethostname() # set backup potential directories path # TODO: add "/backup/{}/data/" and "/backup/{}/etc/" paths = [ "/backup/{}/home/".format(client), "/backup/data/", "/backup/nas*/", "/backup/ms*/", ] # test each possible path for path in paths: # build and run commands find = "find -L {} {} {} {} {}".format( path, "-maxdepth 1", "-xtype l", "-name latest", "-exec realpath {} +" # must always be last arg ) cmd = "ssh -o StrictHostKeyChecking=no {} '{}'".format(server, find) status, out = subprocess.getstatusoutput(cmd) # TODO: check all backups found instead of only the last of the list # maybe do a `split()` and `if len(lst) > 1`… if status == 0 and out != "": # get directory name and convert to datetime last = out.strip().split("/")[-1] date = datetime.strptime(last, "%Y-%m-%d-%H%M%S") # check age if (datetime.now() - date).days > MAX_AGE: u.error("older than {} days".format(MAX_AGE)) return False u.success("less than {} days old".format(MAX_AGE)) return True # if we reach here, nothing have been found u.error("latest backup directory not found") return False def check_backup_is_incremental(path: str) -> bool: """Check that backup is incremental. :param path: Backup folder path :type param: str :return: Wether the backup is incremental or not :rtype: bool """ all_ok = True for directory in os.listdir(path): files_count = 0 folder_path = os.path.join(path, directory) if os.path.isdir(folder_path): files_count = len(os.listdir(folder_path)) if files_count == 0: u.error("folder {} is empty".format(folder_path)) os.rmdir(folder_path) all_ok = False if all_ok: u.success("no incrementation issue found") return all_ok def check_local_backup(path: str) -> bool: """Check that local backup is in a correct state. :param path: Local backup folder path :type path: str :return: Wether local backup is correct or not :rtype: bool """ backup_folder = os.path.dirname(path) print("Checking {}:".format(backup_folder)) all_ok = True latest = os.path.join(backup_folder, "latest") if os.path.exists(latest): # resolve symbolic link latest = os.path.realpath(latest) latest_date = os.path.basename(latest) date = datetime.strptime(latest_date, "%Y-%m-%d-%H%M%S") now = datetime.now() diff_seconds = (now - date).total_seconds() if diff_seconds > MAX_AGE * 24 * 3600: u.error("older than {} days".format(MAX_AGE)) all_ok = False else: u.success("less than {} days old".format(MAX_AGE)) if not check_backup_is_incremental(backup_folder): all_ok = False elif os.path.exists(os.path.join(backup_folder, "backup.inprogress")): u.warning("still running") all_ok = False else: u.error("not working") all_ok = False return all_ok def check_local_backups(paths: str) -> bool: """Run check for all local backup paths. :param paths: Comma separated list of backup paths :type paths: str :return: Wether all backups are good or not :rtype: bool """ all_ok = True folders = paths.split(",") for folder in folders: cmd = "find -L {} -maxdepth 4 -name backup.marker".format(folder) _, out = subprocess.getstatusoutput(cmd) for backup_folder in out.split("\n"): all_ok = min(check_local_backup(backup_folder), all_ok) return all_ok def main(): """Run all checks and exits with corresponding exit code.""" conf = u.load_conf() backup_server = conf.get("BACKUP_SERVER") local_backup_folders = conf.get("LOCAL_BACKUP_FOLDERS") if backup_server: if not test_ssh(backup_server): sys.exit(1) else: if not test_last_backup_is_recent(backup_server): sys.exit(1) else: sys.exit(0) elif local_backup_folders: sys.exit(not check_local_backups(local_backup_folders)) else: print("No backup_server defined in config, untestable") sys.exit(2) if __name__ == "__main__": main()