Skip to content
Snippets Groups Projects
test_backup.py 6.39 KiB
Newer Older
#!/usr/bin/env python3
# Copyright 2017, Florent Thiery
Stéphane Diemer's avatar
Stéphane Diemer committed
Criticality: Normal
Stéphane Diemer's avatar
Stéphane Diemer committed
Checks that the server backups are not older than a day.
from datetime import datetime
from pathlib import Path
import sys
sys.path.append(str(Path(__file__).parents[1].resolve()))
# pylint: disable=wrong-import-position
from envsetup import utils as u  # noqa: E402
Florent Thiery's avatar
Florent Thiery committed

def test_ssh(host: str) -> bool:
    """Check that MediaVault server can reached.
    :param ip: MediaVault hostname or IP address
    :type ip: str
    :return: Wether it can connect to server or not
    :rtype: bool
    """
    print("Checking connection to MediaVault ({}):".format(host))
    cmd = "ssh -o StrictHostKeyChecking=no -o PasswordAuthentication=no {} :".format(
        host
    )
Florent Thiery's avatar
Florent Thiery committed
    try:
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
        subprocess.check_output(cmd, shell=True, timeout=5)
        u.success("logged in successfully")
Florent Thiery's avatar
Florent Thiery committed
    except subprocess.CalledProcessError:
        u.error("failed to login using SSH public key authentication")
Nicolas KAROLAK's avatar
Nicolas KAROLAK committed
        return False
    except subprocess.TimeoutExpired:
        u.error("timeout")
            cmd_port = "nc -z -w2 {} 22".format(host)
            subprocess.check_output(cmd_port, shell=True, timeout=5)
        except subprocess.CalledProcessError:
            u.error("failed to bind SSH port")
            try:
                cmd_ping = "ping -c2 -w4 {}".format(host)
                subprocess.check_output(cmd_ping, shell=True, timeout=15)
            except subprocess.CalledProcessError:
                u.error("failed to ping host")
Florent Thiery's avatar
Florent Thiery committed
        return False
Florent Thiery's avatar
Florent Thiery committed
    return True
Stéphane Diemer's avatar
Stéphane Diemer committed

def test_last_backup_is_recent(server: str) -> bool:
    """Check that the latest backup is recent enough.

    :param server: MediaVault hostname or IP address
    :type server: str
    :return: Wether the latest backup is too old or not
    :rtype: bool
    """

    print("Checking latest backup age:")

    client = socket.gethostname()

    # set backup potential directories path
    # TODO: add "/backup/{}/data/" and "/backup/{}/etc/"
        "/backup/{}/home/".format(client),
        "/backup/data/",
        "/backup/nas*/",
        "/backup/ms*/",
    ]

    # test each possible path
    for path in paths:
        # build and run commands
        find = "find -L {} {} {} {} {}".format(
            path,
            "-maxdepth 1",
            "-xtype l",
            "-name latest",
            "-exec realpath {} +"  # must always be last arg
        )
        cmd = "ssh -o StrictHostKeyChecking=no {} '{}'".format(server, find)
        status, out = subprocess.getstatusoutput(cmd)
        # TODO: check all backups found instead of only the last of the list
        #       maybe do a `split()` and `if len(lst) > 1`…
        if status == 0 and out != "":
            # get directory name and convert to datetime
            last = out.strip().split("/")[-1]
            date = datetime.strptime(last, "%Y-%m-%d-%H%M%S")
            # check age
            if (datetime.now() - date).days > MAX_AGE:
                u.error("older than {} days".format(MAX_AGE))
                return False
            u.success("less than {} days old".format(MAX_AGE))
            return True

    # if we reach here, nothing have been found
    u.error("latest backup directory not found")

    return False


def check_backup_is_incremental(path: str) -> bool:
    """Check that backup is incremental.

    :param path: Backup folder path
    :type param: str
    :return: Wether the backup is incremental or not
    :rtype: bool
    """
    all_ok = True
    for directory in os.listdir(path):
        folder_path = os.path.join(path, directory)
Florent Thiery's avatar
Florent Thiery committed
        if os.path.isdir(folder_path):
            files_count = len(os.listdir(folder_path))
            if files_count == 0:
                u.error("folder {} is empty".format(folder_path))
                all_ok = False
    if all_ok:
        u.success("no incrementation issue found")
def check_local_backup(path: str) -> bool:
    """Check that local backup is in a correct state.

    :param path: Local backup folder path
    :type path: str
    :return: Wether local backup is correct or not
    :rtype: bool
    """

    backup_folder = os.path.dirname(path)
    print("Checking {}:".format(backup_folder))

    all_ok = True
    latest = os.path.join(backup_folder, "latest")
    if os.path.exists(latest):
        # resolve symbolic link
        latest = os.path.realpath(latest)
        latest_date = os.path.basename(latest)
        date = datetime.strptime(latest_date, "%Y-%m-%d-%H%M%S")
        now = datetime.now()
        diff_seconds = (now - date).total_seconds()
        if diff_seconds > MAX_AGE * 24 * 3600:
            u.error("older than {} days".format(MAX_AGE))
            u.success("less than {} days old".format(MAX_AGE))
        if not check_backup_is_incremental(backup_folder):
            all_ok = False
    elif os.path.exists(os.path.join(backup_folder, "backup.inprogress")):
        u.warning("still running")
        all_ok = False
    else:
        u.error("not working")
        all_ok = False
    return all_ok
def check_local_backups(paths: str) -> bool:
    """Run check for all local backup paths.

    :param paths: Comma separated list of backup paths
    :type paths: str
    :return: Wether all backups are good or not
    :rtype: bool
    """

    folders = paths.split(",")
    for folder in folders:
        cmd = "find -L {} -maxdepth 4 -name backup.marker".format(folder)
        _, out = subprocess.getstatusoutput(cmd)
        for backup_folder in out.split("\n"):
            all_ok = min(check_local_backup(backup_folder), all_ok)

def main():
    """Run all checks and exits with corresponding exit code."""

    conf = u.load_conf()
    backup_server = conf.get("BACKUP_SERVER")
    local_backup_folders = conf.get("LOCAL_BACKUP_FOLDERS")
    if backup_server:
        if not test_ssh(backup_server):
            if not test_last_backup_is_recent(backup_server):
                sys.exit(1)
            else:
    elif local_backup_folders:
        sys.exit(not check_local_backups(local_backup_folders))
        print("No backup_server defined in config, untestable")


if __name__ == "__main__":
    main()