#!/usr/bin/env python3 # Copyright 2017, Florent Thiery """ Criticality: Normal Checks that the server backups are not older than a day. """ from datetime import datetime from pathlib import Path import os import socket import subprocess import sys sys.path.append(str(Path(__file__).parents[1].resolve())) # pylint: disable=wrong-import-position from envsetup import utils as u # noqa: E402 MAX_AGE = 2 def test_ssh(host: str) -> bool: """Check that MediaVault server can reached. :param ip: MediaVault hostname or IP address :type ip: str :return: Wether it can connect to server or not :rtype: bool """ print("Checking connection to MediaVault ({}):".format(host)) cmd = "ssh -o StrictHostKeyChecking=no -o PasswordAuthentication=no {} :".format( host ) try: subprocess.check_output(cmd, shell=True, timeout=5) u.success("logged in successfully") except subprocess.CalledProcessError: u.error("failed to login using SSH public key authentication") return False except subprocess.TimeoutExpired: try: cmd_port = "nc -z -w2 {} 22".format(host) subprocess.check_output(cmd_port, shell=True, timeout=5) u.error("failed to bind SSH port") except subprocess.CalledProcessError: cmd_ping = "ping -c2 -w4 {}".format(host) subprocess.check_output(cmd_ping, shell=True, timeout=15) u.error("failed to ping host") return False return True def test_last_backup_is_recent(server: str) -> bool: """Check that the latest backup is recent enough. :param server: MediaVault hostname or IP address :type server: str :return: Wether the latest backup is too old or not :rtype: bool """ print("Checking latest backup age:") client = socket.gethostname() path = "/backup/{}/home/latest".format(client) cmd = "ssh -o StrictHostKeyChecking=no {} ls -l {} | grep latest".format( server, path ) status, out = subprocess.getstatusoutput(cmd) if status == 0: date = out.strip().split(" ")[-1] pdate = datetime.strptime(date, "%Y-%m-%d-%H%M%S") if (datetime.now() - pdate).days > MAX_AGE: u.error("older than {} days".format(MAX_AGE)) return False u.success("less than {} days old".format(MAX_AGE)) return True out = out or "no output" u.error("SSH access not working (code: {}): {}".format(status, out)) return False def check_backup_is_incremental(path: str) -> bool: """Check that backup is incremental. :param path: Backup folder path :type param: str :return: Wether the backup is incremental or not :rtype: bool """ all_ok = True for directory in os.listdir(path): files_count = 0 folder_path = os.path.join(path, directory) if os.path.isdir(folder_path): files_count = len(os.listdir(folder_path)) if files_count == 0: u.error("folder {} is empty".format(folder_path)) os.rmdir(folder_path) all_ok = False if all_ok: u.success("no incrementation issue found") return all_ok def check_local_backup(path: str) -> bool: """Check that local backup is in a correct state. :param path: Local backup folder path :type path: str :return: Wether local backup is correct or not :rtype: bool """ backup_folder = os.path.dirname(path) print("Checking {}:".format(backup_folder)) all_ok = True latest = os.path.join(backup_folder, "latest") if os.path.exists(latest): # resolve symbolic link latest = os.path.realpath(latest) latest_date = os.path.basename(latest) date = datetime.strptime(latest_date, "%Y-%m-%d-%H%M%S") now = datetime.now() diff_seconds = (now - date).total_seconds() if diff_seconds > MAX_AGE * 24 * 3600: u.error("older than {} days".format(MAX_AGE)) all_ok = False else: u.success("less than {} days old".format(MAX_AGE)) if not check_backup_is_incremental(backup_folder): all_ok = False elif os.path.exists(os.path.join(backup_folder, "backup.inprogress")): u.warning("still running") all_ok = False else: u.error("not working") all_ok = False return all_ok def check_local_backups(paths: str) -> bool: """Run check for all local backup paths. :param paths: Comma separated list of backup paths :type paths: str :return: Wether all backups are good or not :rtype: bool """ all_ok = True folders = paths.split(",") for folder in folders: cmd = "find {} -maxdepth 4 -name backup.marker".format(folder) _, out = subprocess.getstatusoutput(cmd) for backup_folder in out.split("\n"): all_ok = min(check_local_backup(backup_folder), all_ok) return all_ok def main(): """Run all checks and exits with corresponding exit code.""" conf = u.load_conf() backup_server = conf.get("BACKUP_SERVER") local_backup_folders = conf.get("LOCAL_BACKUP_FOLDERS") if backup_server: if not test_ssh(backup_server): sys.exit(1) else: if not test_last_backup_is_recent(backup_server): sys.exit(1) else: sys.exit(0) elif local_backup_folders: sys.exit(not check_local_backups(local_backup_folders)) else: print("No backup_server defined in config, untestable") sys.exit(2) if __name__ == "__main__": main()