#!/usr/bin/env python3
# Copyright 2017, Florent Thiery
"""
Criticality: Normal
Checks that the server backups are not older than a day.
"""
import os
import socket
import subprocess
import sys
from datetime import datetime
from pathlib import Path

sys.path.append(str(Path(__file__).parents[1].resolve()))

# pylint: disable=wrong-import-position
from envsetup import utils as u  # noqa: E402
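# NOTE(review): MAX_AGE (maximum accepted backup age, in days) is used by the
# checks below but is not defined anywhere in this copy of the file; confirm
# that the module-level constant exists.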

def _command_succeeds(cmd: str, timeout: int) -> bool:
    """Tell whether a shell command exits successfully within a time limit.

    :param cmd: Shell command line to run
    :type cmd: str
    :param timeout: Maximum run time in seconds
    :type timeout: int
    :return: Whether the command exited with status 0 before the timeout
    :rtype: bool
    """

    try:
        subprocess.check_output(cmd, shell=True, timeout=timeout)
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
        return False
    return True


def test_ssh(host: str) -> bool:
    """Check that MediaVault server can be reached.

    When the SSH login times out, the SSH port and then the host itself are
    probed so that the most specific cause is reported.

    :param host: MediaVault hostname or IP address
    :type host: str
    :return: Whether it can connect to server or not
    :rtype: bool
    """

    print("Checking connection to MediaVault ({}):".format(host))
    # The remote command ":" is the shell no-op: only the login itself matters.
    cmd = "ssh -o StrictHostKeyChecking=no -o PasswordAuthentication=no {} :".format(
        host
    )
    try:
        subprocess.check_output(cmd, shell=True, timeout=5)
    except subprocess.CalledProcessError:
        u.error("Failed to login using SSH public key authentication")
        return False
    except subprocess.TimeoutExpired:
        # Report a probe's error only when that probe actually failed, and
        # never let a failing probe raise out of this function.
        if _command_succeeds("nc -z -w2 {} 22".format(host), timeout=5):
            u.error("SSH port is open but the login timed out")
        else:
            u.error("Failed to bind SSH port")
            if not _command_succeeds("ping -c2 -w4 {}".format(host), timeout=15):
                u.error("Failed to ping host")
        return False

    u.success("Logged in successfully")
    return True

def test_last_backup_is_recent(server: str) -> bool:
    """Check that the latest backup is recent enough.

    The age is read from the name found at the end of the ``ls -l`` line for
    ``/backup/<local hostname>/home/latest`` on the server.

    :param server: MediaVault hostname or IP address
    :type server: str
    :return: Whether the latest backup is recent enough (False when it is too
        old, unreadable, or the server cannot be queried)
    :rtype: bool
    """

    print("Checking latest backup age:")

    client = socket.gethostname()
    path = "/backup/{}/home/latest".format(client)
    cmd = "ssh -o StrictHostKeyChecking=no {} ls -l {} | grep latest".format(
        server, path
    )
    status, out = subprocess.getstatusoutput(cmd)
    if status != 0:
        out = out or "No output."
        u.error("SSH access is not working (code: {}): {}".format(status, out))
        return False

    # The last field of the listing is expected to be named after the backup
    # timestamp; keep only its final path component in case it is a full path.
    date = out.strip().split(" ")[-1].rstrip("/").rsplit("/", 1)[-1]
    try:
        pdate = datetime.strptime(date, "%Y-%m-%d-%H%M%S")
    except ValueError:
        u.error("Unable to read the latest backup date from: {}".format(out))
        return False

    # Compare in seconds: timedelta.days is floored, which would let a backup
    # that is almost MAX_AGE + 1 days old pass.
    if (datetime.now() - pdate).total_seconds() > MAX_AGE * 24 * 3600:
        u.error("Backup is older than {} days".format(MAX_AGE))
        return False

    u.success("Backup is less than {} days old".format(MAX_AGE))
    return True


def check_backup_is_incremental(path: str) -> bool:
    """Check that backup is incremental.

    Every directory found directly inside ``path`` must contain at least one
    entry; the first empty one is reported and makes the check fail.

    :param path: Backup folder path
    :type path: str
    :return: Whether the backup is incremental or not
    :rtype: bool
    """

    print("Checking that backup is incremental:")

    # NOTE(review): the statement that defined the inspected entries is missing
    # from the visible source; listing the backup folder is the reconstruction
    # that matches how the entries are joined with ``path`` below - confirm.
    for directory in sorted(os.listdir(path)):
        folder_path = os.path.join(path, directory)
        if os.path.isdir(folder_path) and not os.listdir(folder_path):
            u.error("Folder {} is empty".format(folder_path))
            return False

    # Report on the backup folder itself: no per-entry variable is guaranteed
    # to exist here (the folder may have no entry at all).
    u.success("No empty folder found in {}".format(path))
    return True
def check_local_backup(path: str) -> bool:
    """Check that local backup is in a correct state.

    The backup folder is the directory containing ``path``. Its ``latest``
    entry must resolve to a name formatted as ``%Y-%m-%d-%H%M%S`` that is not
    older than ``MAX_AGE`` days, and the folder must pass
    ``check_backup_is_incremental``.

    :param path: Path of an entry located directly inside the local backup
        folder (only its parent directory is used)
    :type path: str
    :return: Whether local backup is correct or not
    :rtype: bool
    """

    backup_folder = os.path.dirname(path)
    print("Checking %s" % backup_folder)

    latest = os.path.join(backup_folder, "latest")
    if not os.path.exists(latest):
        if os.path.exists(os.path.join(backup_folder, "backup.inprogress")):
            u.error("Initial backup %s still running" % backup_folder)
        else:
            u.error("Backup {} is not working".format(latest))
        return False

    all_ok = True
    # Resolve the symbolic link: the target name carries the backup date.
    latest_date = os.path.basename(os.path.realpath(latest))
    try:
        date = datetime.strptime(latest_date, "%Y-%m-%d-%H%M%S")
    except ValueError:
        u.error(
            "Backup {} has an unexpected name: {}".format(backup_folder, latest_date)
        )
        all_ok = False
    else:
        diff_seconds = (datetime.now() - date).total_seconds()
        if diff_seconds > MAX_AGE * 24 * 3600:
            u.error("Backup {} is older than {} days".format(backup_folder, MAX_AGE))
            all_ok = False
        else:
            u.success("Backup {} is fine".format(backup_folder))

    # Always run the content check so that every problem gets reported.
    if not check_backup_is_incremental(backup_folder):
        all_ok = False
    return all_ok
def check_local_backups(paths: str) -> bool:
    """Run check for all local backup paths.

    Each listed folder is searched (up to 4 levels deep) for ``backup.marker``
    files and every match is handed to ``check_local_backup``. A folder
    without any marker is reported as an error.

    :param paths: Comma separated list of backup paths
    :type paths: str
    :return: Whether all backups are good or not
    :rtype: bool
    """

    all_ok = True
    for folder in paths.split(","):
        folder = folder.strip()
        if not folder:
            # Ignore empty items such as the one left by a trailing comma.
            continue
        cmd = "find {} -maxdepth 4 -name backup.marker".format(folder)
        _, out = subprocess.getstatusoutput(cmd)
        # The output also carries stderr: keep only real marker paths so that
        # error messages and empty output are never treated as backups.
        markers = [
            line
            for line in out.splitlines()
            if os.path.basename(line) == "backup.marker"
        ]
        if not markers:
            u.error("No backup found in {}".format(folder))
            all_ok = False
            continue
        for marker in markers:
            # Call the check first so that every backup is inspected even
            # after a failure.
            all_ok = check_local_backup(marker) and all_ok
    return all_ok

def main():
    """Run all checks and exits with corresponding exit code.

    With ``BACKUP_SERVER`` configured, the SSH connection then the age of the
    latest remote backup are checked. Otherwise, with ``LOCAL_BACKUP_FOLDERS``
    configured, the local backups are checked. The exit code is 0 when the
    checks pass and 1 when one fails.
    """

    conf = u.load_conf()
    backup_server = conf.get("BACKUP_SERVER")
    local_backup_folders = conf.get("LOCAL_BACKUP_FOLDERS")
    if backup_server:
        # The backup age can only be read once the SSH connection works.
        if not test_ssh(backup_server):
            sys.exit(1)
        if not test_last_backup_is_recent(backup_server):
            sys.exit(1)
        sys.exit(0)
    elif local_backup_folders:
        sys.exit(0 if check_local_backups(local_backup_folders) else 1)
    else:
        # NOTE(review): the test runner may expect a dedicated exit code for
        # the untestable case; none is visible in this file - confirm.
        print("No backup_server defined in config, untestable")


# Run the checks only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()