Newer
Older
#!/usr/bin/env python3
# Copyright 2017, Florent Thiery
sys.path.append(str(Path(__file__).parents[1].resolve()))
# pylint: disable=wrong-import-position
from envsetup import utils as u # noqa: E402
def test_ssh(host: str) -> bool:
"""Check that MediaVault server can reached.
:param ip: MediaVault hostname or IP address
:type ip: str
:return: Wether it can connect to server or not
:rtype: bool
"""
print("Checking connection to MediaVault ({}):".format(host))
cmd = "ssh -o StrictHostKeyChecking=no -o PasswordAuthentication=no {} :".format(
host
)
subprocess.check_output(cmd, shell=True, timeout=5)
u.success("logged in successfully")
u.error("failed to login using SSH public key authentication")
return False
except subprocess.TimeoutExpired:
cmd_port = "nc -z -w2 {} 22".format(host)
subprocess.check_output(cmd_port, shell=True, timeout=5)
except subprocess.CalledProcessError:
u.error("failed to bind SSH port")
try:
cmd_ping = "ping -c2 -w4 {}".format(host)
subprocess.check_output(cmd_ping, shell=True, timeout=15)
except subprocess.CalledProcessError:
u.error("failed to ping host")
def test_last_backup_is_recent(server: str) -> bool:
"""Check that the latest backup is recent enough.
:param server: MediaVault hostname or IP address
:type server: str
:return: Wether the latest backup is too old or not
:rtype: bool
"""
print("Checking latest backup age:")
client = socket.gethostname()
# set backup potential directories path
# TODO: add "/backup/{}/data/" and "/backup/{}/etc/"
"/backup/{}/home/".format(client),
"/backup/data/",
"/backup/nas*/",
"/backup/ms*/",
]
# test each possible path
for path in paths:
# build and run commands
find = "find -L {} {} {} {} {}".format(
path,
"-xtype l",
"-name latest",
"-exec realpath {} +" # must always be last arg
)
cmd = "ssh -o StrictHostKeyChecking=no {} '{}'".format(server, find)
status, out = subprocess.getstatusoutput(cmd)
# TODO: check all backups found instead of only the last of the list
# maybe do a `split()` and `if len(lst) > 1`…
if status == 0 and out != "":
# get directory name and convert to datetime
last = out.strip().split("/")[-1]
date = datetime.strptime(last, "%Y-%m-%d-%H%M%S")
# check age
if (datetime.now() - date).days > MAX_AGE:
u.error("older than {} days".format(MAX_AGE))
return False
u.success("less than {} days old".format(MAX_AGE))
return True
# if we reach here, nothing have been found
u.error("latest backup directory not found")
return False
def check_backup_is_incremental(path: str) -> bool:
"""Check that backup is incremental.
:param path: Backup folder path
:type param: str
:return: Wether the backup is incremental or not
:rtype: bool
"""
all_ok = True
for directory in os.listdir(path):
Florent Thiéry
committed
files_count = 0
folder_path = os.path.join(path, directory)
Florent Thiéry
committed
files_count = len(os.listdir(folder_path))
if files_count == 0:
u.error("folder {} is empty".format(folder_path))
os.rmdir(folder_path)
all_ok = False
if all_ok:
u.success("no incrementation issue found")
def check_local_backup(path: str) -> bool:
"""Check that local backup is in a correct state.
:param path: Local backup folder path
:type path: str
:return: Wether local backup is correct or not
:rtype: bool
"""
backup_folder = os.path.dirname(path)
print("Checking {}:".format(backup_folder))
all_ok = True
latest = os.path.join(backup_folder, "latest")

Florent Thiery
committed
# resolve symbolic link
latest = os.path.realpath(latest)
latest_date = os.path.basename(latest)
date = datetime.strptime(latest_date, "%Y-%m-%d-%H%M%S")
diff_seconds = (now - date).total_seconds()
if diff_seconds > MAX_AGE * 24 * 3600:
u.error("older than {} days".format(MAX_AGE))
all_ok = False
u.success("less than {} days old".format(MAX_AGE))
if not check_backup_is_incremental(backup_folder):
all_ok = False
elif os.path.exists(os.path.join(backup_folder, "backup.inprogress")):
def check_local_backups(paths: str) -> bool:
"""Run check for all local backup paths.
:param paths: Comma separated list of backup paths
:type paths: str
:return: Wether all backups are good or not
:rtype: bool
"""
folders = paths.split(",")
for folder in folders:
cmd = "find -L {} -maxdepth 4 -name backup.marker".format(folder)
_, out = subprocess.getstatusoutput(cmd)
for backup_folder in out.split("\n"):
all_ok = min(check_local_backup(backup_folder), all_ok)
return all_ok
def main():
"""Run all checks and exits with corresponding exit code."""
conf = u.load_conf()
backup_server = conf.get("BACKUP_SERVER")
local_backup_folders = conf.get("LOCAL_BACKUP_FOLDERS")
if backup_server:
if not test_ssh(backup_server):
sys.exit(1)
else:
if not test_last_backup_is_recent(backup_server):

Florent Thiery
committed
sys.exit(0)
elif local_backup_folders:
sys.exit(not check_local_backups(local_backup_folders))
print("No backup_server defined in config, untestable")
if __name__ == "__main__":
main()