#!/usr/bin/env python3

"""
Criticality: Low
Checks the current state of the fail2ban service.
"""

from pathlib import Path
import sys

# pylint: disable=E0401
import dbus

# Make the sibling ``envsetup`` package importable when run as a script.
sys.path.append(str(Path(__file__).parents[1].resolve()))

# pylint: disable=wrong-import-position
from envsetup import utils as u  # noqa: E402


def _field_value(output: str) -> str:
    """Return the stripped text following the first colon in ``output``.

    Unlike ``output.split(":")[1]``, this yields an empty string (instead of
    raising ``IndexError``) when ``output`` contains no colon at all, e.g.
    when ``grep`` matched nothing.

    :param output: A ``label: value`` style line
    :type output: str
    :return: The value part, or an empty string if there is none
    :rtype: str
    """
    return output.partition(":")[2].strip()


def get_service_state(name: str) -> tuple:
    """Get the systemd service state.

    The unit is resolved through the systemd manager on the system bus and
    its ``ActiveState`` and ``SubState`` properties are read.

    :param name: Service name (without the ``.service`` suffix)
    :type name: str
    :return: Active state, and running state.
    :rtype: tuple
    """
    bus = dbus.SystemBus()
    systemd = bus.get_object("org.freedesktop.systemd1", "/org/freedesktop/systemd1")
    manager = dbus.Interface(systemd, "org.freedesktop.systemd1.Manager")
    unit = manager.LoadUnit("{}.service".format(name))
    proxy = bus.get_object("org.freedesktop.systemd1", str(unit))

    # Same two property reads as before, in the same order.
    active, state = (
        str(
            proxy.Get(
                "org.freedesktop.systemd1.Unit",
                prop,
                dbus_interface="org.freedesktop.DBus.Properties",
            )
        )
        for prop in ("ActiveState", "SubState")
    )
    return active, state


def check_service_running(name: str) -> bool:
    """Check that the given service is active and running.

    :param name: Service name
    :type name: str
    :return: Whether the service is both active and running
    :rtype: bool
    """
    active, state = get_service_state(name)
    return active == "active" and state == "running"


def get_jails() -> list:
    """Get the list of active jails.

    Parses the ``Jail list`` line of ``fail2ban-client status``. When no jail
    is configured (or the line is missing) an empty list is returned, rather
    than a list holding a single empty name.

    :return: List of jail names
    :rtype: list
    """
    _, output = u.exec_cmd(
        "fail2ban-client status | grep 'Jail list'", log_output=False
    )
    names = _field_value(output).replace(" ", "").split(",")
    # "".split(",") is [""], so drop empty entries explicitly.
    return [jail for jail in names if jail]


def check_jail_banned(name: str) -> int:
    """Count the hosts currently banned in a jail.

    Parses the ``Currently banned`` line of ``fail2ban-client status <jail>``.
    A missing or empty value is reported as 0.

    :param name: Jail name
    :type name: str
    :return: Number of banned hosts
    :rtype: int
    :raises ValueError: If the reported value is not an integer
    """
    _, output = u.exec_cmd(
        "fail2ban-client status {} | grep 'Currently banned'".format(name),
        log_output=False,
    )
    banned = _field_value(output)
    return int(banned) if banned else 0


def main():
    """Run all checks and exits with corresponding exit code.

    Exit codes: 0 when everything is fine, 3 when fail2ban is not running or
    at least one jail has banned hosts, 1 on errors.
    """
    # init
    # NOTE: no check in this script currently increments ``errors``; the
    # counter and its exit branch are kept as-is.
    errors = 0
    warnings = 0

    print("Checking fail2ban state:")
    if not check_service_running("fail2ban"):
        u.warning("fail2ban is not running")
        # warning exit if not running
        sys.exit(3)
    u.success("fail2ban is running")

    print("Checking fail2ban jails:")
    for jail in get_jails():
        u.info("{} jail is running".format(jail))
        banned = check_jail_banned(jail)
        if banned > 0:
            u.warning("there is {} banned host in {} jail".format(banned, jail))
            warnings += 1

    if errors:
        sys.exit(1)
    elif warnings:
        sys.exit(3)
    else:
        sys.exit(0)


if __name__ == "__main__":
    main()