#!/usr/bin/env python3
"""
Criticality: Low
Checks the current state of the fail2ban service.
"""

from pathlib import Path
import sys

# pylint: disable=E0401
import dbus

sys.path.append(str(Path(__file__).parents[1].resolve()))

# pylint: disable=wrong-import-position
from envsetup import utils as u  # noqa: E402


def get_service_state(name: str) -> tuple:
    """Query systemd over the system D-Bus for the state of a service unit.

    :param name: Service name, without the ``.service`` suffix
    :type name: str
    :return: The unit's ``ActiveState`` and ``SubState`` properties, as strings
    :rtype: tuple
    """

    systemd_dest = "org.freedesktop.systemd1"
    unit_iface = "org.freedesktop.systemd1.Unit"
    props_iface = "org.freedesktop.DBus.Properties"

    system_bus = dbus.SystemBus()
    manager = dbus.Interface(
        system_bus.get_object(systemd_dest, "/org/freedesktop/systemd1"),
        "org.freedesktop.systemd1.Manager",
    )

    # LoadUnit hands back the object path of the unit, which is then used
    # to reach the unit's own properties.
    unit_name = "{}.service".format(name)
    unit_path = manager.LoadUnit(unit_name)
    unit_proxy = system_bus.get_object(systemd_dest, str(unit_path))

    # Read both properties in order: ActiveState first, then SubState.
    active, state = (
        str(unit_proxy.Get(unit_iface, prop, dbus_interface=props_iface))
        for prop in ("ActiveState", "SubState")
    )

    return active, state


def check_service_running(name: str) -> bool:
    """Tell whether the given service is both active and running.

    :param name: Service name
    :type name: str
    :return: True only when the active state is ``active`` and the
        sub state is ``running``; False otherwise
    :rtype: bool
    """

    active_state, sub_state = get_service_state(name)

    return active_state == "active" and sub_state == "running"


def get_jails() -> list:
    """Get the list of active jails.

    Runs ``fail2ban-client status`` and parses the ``Jail list`` line of
    its output.

    :return: List of jail names; empty when no jail is reported
    :rtype: list
    """

    # The first value returned by exec_cmd is discarded; the second one is
    # treated as the text output of the command.
    _, output = u.exec_cmd(
        "fail2ban-client status | grep 'Jail list'", log_output=False
    )

    # The jail names are the comma separated values following the first
    # colon. partition() yields an empty string instead of raising when the
    # colon is missing (e.g. grep matched nothing).
    _, _, names = output.partition(":")

    # Drop empty entries so that an empty jail list gives [] and not [""],
    # which would otherwise be queried as a jail with an empty name.
    return [jail.strip() for jail in names.split(",") if jail.strip()]


def check_jail_banned(name: str) -> int:
    """Check if there are currently banned hosts in a jail.

    Runs ``fail2ban-client status <name>`` and parses the
    ``Currently banned`` line of its output.

    :param name: Jail name
    :type name: str
    :return: Number of banned hosts, 0 when no count could be read
    :rtype: int
    :raises ValueError: If the value following the colon is not an integer
    """

    _, output = u.exec_cmd(
        "fail2ban-client status {} | grep 'Currently banned'".format(name),
        log_output=False,
    )

    # The count is whatever follows the first colon. partition() yields an
    # empty string instead of raising when the colon is missing.
    _, _, value = output.partition(":")
    banned = value.strip()

    if banned:
        return int(banned)
    return 0


def main():
    """Run all checks and exit with the corresponding exit code.

    Exit codes:

    * 0: fail2ban is running and no jail has banned hosts
    * 1: at least one error was counted
    * 3: warning (fail2ban is not running, or a jail has banned hosts)
    """

    # init
    # No check increments `errors` at the moment; the counter is kept so the
    # exit code logic below stays complete.
    errors = 0
    warnings = 0

    print("Checking fail2ban state:")
    if not check_service_running("fail2ban"):
        u.warning("fail2ban is not running")
        # warning exit if not running: jails cannot be queried anyway
        sys.exit(3)
    u.success("fail2ban is running")

    print("Checking fail2ban jails:")
    for jail in get_jails():
        u.info("{} jail is running".format(jail))
        banned = check_jail_banned(jail)
        if banned > 0:
            u.warning("there is {} banned host in {} jail".format(banned, jail))
            warnings += 1

    # Errors take precedence over warnings.
    if errors:
        sys.exit(1)
    elif warnings:
        sys.exit(3)
    else:
        sys.exit(0)


# Run the checks only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()