diff --git a/tests/test_fail2ban.py b/tests/test_fail2ban.py new file mode 100755 index 0000000000000000000000000000000000000000..4cc3ab292d928a145bf06f47e8b2f8e918299fac --- /dev/null +++ b/tests/test_fail2ban.py @@ -0,0 +1,109 @@ +#!/usr/bin/env python3 +""" +Criticality: Low +This test check the current state of the fail2ban service. +""" + +from pathlib import Path +import sys + +# pylint: disable=E0401 +import dbus + +sys.path.append(str(Path(__file__).parents[1].resolve())) + +# pylint: disable=wrong-import-position +from envsetup import utils as u # noqa: E402 + + +def get_service_state(name: str) -> tuple: + """Get the systemd service state. + + :param name: Service name + :type name: str + :return: Active state, and running state. + :rtype: tuple + """ + + bus = dbus.SystemBus() + systemd = bus.get_object("org.freedesktop.systemd1", "/org/freedesktop/systemd1") + manager = dbus.Interface(systemd, "org.freedesktop.systemd1.Manager") + unit = manager.LoadUnit("{}.service".format(name)) + proxy = bus.get_object("org.freedesktop.systemd1", str(unit)) + + active = proxy.Get( + "org.freedesktop.systemd1.Unit", + "ActiveState", + dbus_interface="org.freedesktop.DBus.Properties", + ) + state = proxy.Get( + "org.freedesktop.systemd1.Unit", + "SubState", + dbus_interface="org.freedesktop.DBus.Properties", + ) + + return str(active), str(state) + + +def check_service_running(name: str) -> bool: + """Check that the given service is active and running. + + :param name: Service name + :type name: str + :return: Wether the service active and running or not + :rtype: bool + """ + + active, state = get_service_state(name) + + if active != "active" or state != "running": + return False + + return True + + +def get_jails() -> list: + """Get the list of active jails. + + :return: List of jails + :rtype: list + """ + + _, output = u.exec_cmd(["fail2ban-client status | grep 'Jail list'"]) + jails = output.split(":")[1].replace(" ", "").split(",") + + return jails + + +def check_jail(name: str) -> int: + _, output = u.exec_cmd("fail2ban-client status {} | grep 'Jail list'".format(name)) + print(output) + return 0 + + +def main(): + """Run all checks and exits with corresponding exit code.""" + + # init + errors = 0 + warnings = 0 + + print("Checking fail2ban state:") + if not check_service_running("fail2ban"): + u.error("fail2ban is not running") + errors += 1 + else: + u.success("fail2ban is running") + + # print("Checking fail2ban jails:") + + if errors: + sys.exit(1) + if warnings: + sys.exit(3) + else: + sys.exit(0) + + +if __name__ == "__main__": + main()