- jail_service.restart(): replace invalid ["restart"] socket command with ["stop"], matching fail2ban transmitter protocol. The daemon is now stopped via socket; the caller starts it via subprocess. - config_file_service: expose _start_daemon and _wait_for_fail2ban as public start_daemon / wait_for_fail2ban functions. - restart_fail2ban router: orchestrate stop (socket) → start (subprocess) → probe (socket). Returns 204 on success, 503 when fail2ban does not come back within 10 s. Catches JailOperationError → 409. - reload_fail2ban router: add JailOperationError catch → 409 Conflict, consistent with other jail control endpoints. - Tests: add TestJailControls.test_restart_* (3 cases), TestReloadFail2ban 502/409 cases, TestRestartFail2ban (5 cases), TestRollbackJail (6 integration tests verifying file-write, subprocess invocation, socket- probe truthiness, active_jails count, and offline-at-call-time).
1357 lines
47 KiB
Python
1357 lines
47 KiB
Python
"""Jail management service.
|
|
|
|
Provides methods to list, inspect, and control fail2ban jails via the
|
|
Unix domain socket. All socket I/O is performed through the async
|
|
:class:`~app.utils.fail2ban_client.Fail2BanClient` wrapper.
|
|
|
|
Architecture note: this module is a pure service — it contains **no**
|
|
HTTP/FastAPI concerns. All results are returned as Pydantic models so
|
|
routers can serialise them directly.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import contextlib
|
|
import ipaddress
|
|
from typing import Any
|
|
|
|
import structlog
|
|
|
|
from app.models.ban import ActiveBan, ActiveBanListResponse, JailBannedIpsResponse
|
|
from app.models.config import BantimeEscalation
|
|
from app.models.jail import (
|
|
Jail,
|
|
JailDetailResponse,
|
|
JailListResponse,
|
|
JailStatus,
|
|
JailSummary,
|
|
)
|
|
from app.utils.fail2ban_client import Fail2BanClient, Fail2BanConnectionError
|
|
|
|
log: structlog.stdlib.BoundLogger = structlog.get_logger()
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Constants
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_SOCKET_TIMEOUT: float = 10.0
|
|
|
|
# Guard against concurrent reload_all calls. Overlapping ``reload --all``
|
|
# commands sent to fail2ban's socket produce undefined behaviour and may cause
|
|
# jails to be permanently removed from the daemon. Serialising them here
|
|
# ensures only one reload stream is in-flight at a time.
|
|
_reload_all_lock: asyncio.Lock = asyncio.Lock()
|
|
|
|
# Capability detection for optional fail2ban transmitter commands (backend, idle).
|
|
# These commands are not supported in all fail2ban versions. Caching the result
|
|
# avoids sending unsupported commands every polling cycle and spamming the
|
|
# fail2ban log with "Invalid command" errors.
|
|
_backend_cmd_supported: bool | None = None
|
|
_backend_cmd_lock: asyncio.Lock = asyncio.Lock()
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Custom exceptions
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class JailNotFoundError(Exception):
|
|
"""Raised when a requested jail name does not exist in fail2ban."""
|
|
|
|
def __init__(self, name: str) -> None:
|
|
"""Initialise with the jail name that was not found.
|
|
|
|
Args:
|
|
name: The jail name that could not be located.
|
|
"""
|
|
self.name: str = name
|
|
super().__init__(f"Jail not found: {name!r}")
|
|
|
|
|
|
class JailOperationError(Exception):
|
|
"""Raised when a jail control command fails for a non-auth reason."""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Internal helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _ok(response: Any) -> Any:
|
|
"""Extract the payload from a fail2ban ``(return_code, data)`` response.
|
|
|
|
Args:
|
|
response: Raw value returned by :meth:`~Fail2BanClient.send`.
|
|
|
|
Returns:
|
|
The payload ``data`` portion of the response.
|
|
|
|
Raises:
|
|
ValueError: If the response indicates an error (return code ≠ 0).
|
|
"""
|
|
try:
|
|
code, data = response
|
|
except (TypeError, ValueError) as exc:
|
|
raise ValueError(f"Unexpected fail2ban response shape: {response!r}") from exc
|
|
|
|
if code != 0:
|
|
raise ValueError(f"fail2ban returned error code {code}: {data!r}")
|
|
|
|
return data
|
|
|
|
|
|
def _to_dict(pairs: Any) -> dict[str, Any]:
|
|
"""Convert a list of ``(key, value)`` pairs to a plain dict.
|
|
|
|
Args:
|
|
pairs: A list of ``(key, value)`` pairs (or any iterable thereof).
|
|
|
|
Returns:
|
|
A :class:`dict` with the keys and values from *pairs*.
|
|
"""
|
|
if not isinstance(pairs, (list, tuple)):
|
|
return {}
|
|
result: dict[str, Any] = {}
|
|
for item in pairs:
|
|
try:
|
|
k, v = item
|
|
result[str(k)] = v
|
|
except (TypeError, ValueError):
|
|
pass
|
|
return result
|
|
|
|
|
|
def _ensure_list(value: Any) -> list[str]:
|
|
"""Coerce a fail2ban response value to a list of strings.
|
|
|
|
Some fail2ban ``get`` responses return ``None`` or a single string
|
|
when there is only one entry. This helper normalises the result.
|
|
|
|
Args:
|
|
value: The raw value from a ``get`` command response.
|
|
|
|
Returns:
|
|
A list of strings, possibly empty.
|
|
"""
|
|
if value is None:
|
|
return []
|
|
if isinstance(value, str):
|
|
return [value] if value.strip() else []
|
|
if isinstance(value, (list, tuple)):
|
|
return [str(v) for v in value if v is not None]
|
|
return [str(value)]
|
|
|
|
|
|
def _is_not_found_error(exc: Exception) -> bool:
|
|
"""Return ``True`` if *exc* indicates a jail does not exist.
|
|
|
|
Checks both space-separated (``"unknown jail"``) and concatenated
|
|
(``"unknownjail"``) forms because fail2ban serialises
|
|
``UnknownJailException`` without a space when pickled.
|
|
|
|
Args:
|
|
exc: The exception to inspect.
|
|
|
|
Returns:
|
|
``True`` when the exception message signals an unknown jail.
|
|
"""
|
|
msg = str(exc).lower()
|
|
return any(
|
|
phrase in msg
|
|
for phrase in (
|
|
"unknown jail",
|
|
"unknownjail", # covers UnknownJailException serialised by fail2ban
|
|
"no jail",
|
|
"does not exist",
|
|
"not found",
|
|
)
|
|
)
|
|
|
|
|
|
async def _safe_get(
|
|
client: Fail2BanClient,
|
|
command: list[Any],
|
|
default: Any = None,
|
|
) -> Any:
|
|
"""Send a ``get`` command and return ``default`` on error.
|
|
|
|
Errors during optional detail queries (logpath, regex, etc.) should
|
|
not abort the whole request — this helper swallows them gracefully.
|
|
|
|
Args:
|
|
client: The :class:`~app.utils.fail2ban_client.Fail2BanClient` to use.
|
|
command: The command list to send.
|
|
default: Value to return when the command fails.
|
|
|
|
Returns:
|
|
The response payload, or *default* on any error.
|
|
"""
|
|
try:
|
|
return _ok(await client.send(command))
|
|
except (ValueError, TypeError, Exception):
|
|
return default
|
|
|
|
|
|
async def _check_backend_cmd_supported(
|
|
client: Fail2BanClient,
|
|
jail_name: str,
|
|
) -> bool:
|
|
"""Detect whether the fail2ban daemon supports optional ``get ... backend`` command.
|
|
|
|
Some fail2ban versions (e.g. LinuxServer.io container) do not implement the
|
|
optional ``get <jail> backend`` and ``get <jail> idle`` transmitter sub-commands.
|
|
This helper probes the daemon once and caches the result to avoid repeated
|
|
"Invalid command" errors in the fail2ban log.
|
|
|
|
Uses double-check locking to minimize lock contention in concurrent polls.
|
|
|
|
Args:
|
|
client: The :class:`~app.utils.fail2ban_client.Fail2BanClient` to use.
|
|
jail_name: Name of any jail to use for the probe command.
|
|
|
|
Returns:
|
|
``True`` if the command is supported, ``False`` otherwise.
|
|
Once determined, the result is cached and reused for all jails.
|
|
"""
|
|
global _backend_cmd_supported
|
|
|
|
# Fast path: return cached result if already determined.
|
|
if _backend_cmd_supported is not None:
|
|
return _backend_cmd_supported
|
|
|
|
# Slow path: acquire lock and probe the command once.
|
|
async with _backend_cmd_lock:
|
|
# Double-check idiom: another coroutine may have probed while we waited.
|
|
if _backend_cmd_supported is not None:
|
|
return _backend_cmd_supported
|
|
|
|
# Probe: send the command and catch any exception.
|
|
try:
|
|
_ok(await client.send(["get", jail_name, "backend"]))
|
|
_backend_cmd_supported = True
|
|
log.debug("backend_cmd_supported_detected")
|
|
except Exception:
|
|
_backend_cmd_supported = False
|
|
log.debug("backend_cmd_unsupported_detected")
|
|
|
|
return _backend_cmd_supported
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public API — Jail listing & detail
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def list_jails(socket_path: str) -> JailListResponse:
|
|
"""Return a summary list of all active fail2ban jails.
|
|
|
|
Queries the daemon for the global jail list and then fetches status
|
|
and key configuration for each jail in parallel.
|
|
|
|
Args:
|
|
socket_path: Path to the fail2ban Unix domain socket.
|
|
|
|
Returns:
|
|
:class:`~app.models.jail.JailListResponse` with all active jails.
|
|
|
|
Raises:
|
|
~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
|
|
cannot be reached.
|
|
"""
|
|
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
|
|
|
|
# 1. Fetch global status to get jail names.
|
|
global_status = _to_dict(_ok(await client.send(["status"])))
|
|
jail_list_raw: str = str(global_status.get("Jail list", "") or "").strip()
|
|
jail_names: list[str] = (
|
|
[j.strip() for j in jail_list_raw.split(",") if j.strip()]
|
|
if jail_list_raw
|
|
else []
|
|
)
|
|
|
|
log.info("jail_list_fetched", count=len(jail_names))
|
|
|
|
if not jail_names:
|
|
return JailListResponse(jails=[], total=0)
|
|
|
|
# 2. Fetch summary data for every jail in parallel.
|
|
summaries: list[JailSummary] = await asyncio.gather(
|
|
*[_fetch_jail_summary(client, name) for name in jail_names],
|
|
return_exceptions=False,
|
|
)
|
|
|
|
return JailListResponse(jails=list(summaries), total=len(summaries))
|
|
|
|
|
|
async def _fetch_jail_summary(
|
|
client: Fail2BanClient,
|
|
name: str,
|
|
) -> JailSummary:
|
|
"""Fetch and build a :class:`~app.models.jail.JailSummary` for one jail.
|
|
|
|
Sends the ``status``, ``get ... bantime``, ``findtime``, ``maxretry``,
|
|
``backend``, and ``idle`` commands in parallel (if supported).
|
|
|
|
The ``backend`` and ``idle`` commands are optional and not supported in
|
|
all fail2ban versions. If not supported, this function will not send them
|
|
to avoid spamming the fail2ban log with "Invalid command" errors.
|
|
|
|
Args:
|
|
client: Shared :class:`~app.utils.fail2ban_client.Fail2BanClient`.
|
|
name: Jail name.
|
|
|
|
Returns:
|
|
A :class:`~app.models.jail.JailSummary` populated from the responses.
|
|
"""
|
|
# Check whether optional backend/idle commands are supported.
|
|
# This probe happens once per session and is cached to avoid repeated
|
|
# "Invalid command" errors in the fail2ban log.
|
|
backend_cmd_is_supported = await _check_backend_cmd_supported(client, name)
|
|
|
|
# Build the gather list based on command support.
|
|
gather_list: list[Any] = [
|
|
client.send(["status", name, "short"]),
|
|
client.send(["get", name, "bantime"]),
|
|
client.send(["get", name, "findtime"]),
|
|
client.send(["get", name, "maxretry"]),
|
|
]
|
|
|
|
if backend_cmd_is_supported:
|
|
# Commands are supported; send them for real values.
|
|
gather_list.extend([
|
|
client.send(["get", name, "backend"]),
|
|
client.send(["get", name, "idle"]),
|
|
])
|
|
uses_backend_backend_commands = True
|
|
else:
|
|
# Commands not supported; return default values without sending.
|
|
async def _return_default(value: Any) -> tuple[int, Any]:
|
|
return (0, value)
|
|
|
|
gather_list.extend([
|
|
_return_default("polling"), # backend default
|
|
_return_default(False), # idle default
|
|
])
|
|
uses_backend_backend_commands = False
|
|
|
|
_r = await asyncio.gather(*gather_list, return_exceptions=True)
|
|
status_raw: Any = _r[0]
|
|
bantime_raw: Any = _r[1]
|
|
findtime_raw: Any = _r[2]
|
|
maxretry_raw: Any = _r[3]
|
|
backend_raw: Any = _r[4]
|
|
idle_raw: Any = _r[5]
|
|
|
|
# Parse jail status (filter + actions).
|
|
jail_status: JailStatus | None = None
|
|
if not isinstance(status_raw, Exception):
|
|
try:
|
|
raw = _to_dict(_ok(status_raw))
|
|
filter_stats = _to_dict(raw.get("Filter") or [])
|
|
action_stats = _to_dict(raw.get("Actions") or [])
|
|
jail_status = JailStatus(
|
|
currently_banned=int(action_stats.get("Currently banned", 0) or 0),
|
|
total_banned=int(action_stats.get("Total banned", 0) or 0),
|
|
currently_failed=int(filter_stats.get("Currently failed", 0) or 0),
|
|
total_failed=int(filter_stats.get("Total failed", 0) or 0),
|
|
)
|
|
except (ValueError, TypeError) as exc:
|
|
log.warning("jail_status_parse_error", jail=name, error=str(exc))
|
|
|
|
def _safe_int(raw: Any, fallback: int) -> int:
|
|
if isinstance(raw, Exception):
|
|
return fallback
|
|
try:
|
|
return int(_ok(raw))
|
|
except (ValueError, TypeError):
|
|
return fallback
|
|
|
|
def _safe_str(raw: Any, fallback: str) -> str:
|
|
if isinstance(raw, Exception):
|
|
return fallback
|
|
try:
|
|
return str(_ok(raw))
|
|
except (ValueError, TypeError):
|
|
return fallback
|
|
|
|
def _safe_bool(raw: Any, fallback: bool = False) -> bool:
|
|
if isinstance(raw, Exception):
|
|
return fallback
|
|
try:
|
|
return bool(_ok(raw))
|
|
except (ValueError, TypeError):
|
|
return fallback
|
|
|
|
return JailSummary(
|
|
name=name,
|
|
enabled=True,
|
|
running=True,
|
|
idle=_safe_bool(idle_raw),
|
|
backend=_safe_str(backend_raw, "polling"),
|
|
find_time=_safe_int(findtime_raw, 600),
|
|
ban_time=_safe_int(bantime_raw, 600),
|
|
max_retry=_safe_int(maxretry_raw, 5),
|
|
status=jail_status,
|
|
)
|
|
|
|
|
|
async def get_jail(socket_path: str, name: str) -> JailDetailResponse:
|
|
"""Return full detail for a single fail2ban jail.
|
|
|
|
Sends multiple ``get`` and ``status`` commands in parallel to build
|
|
the complete jail snapshot.
|
|
|
|
Args:
|
|
socket_path: Path to the fail2ban Unix domain socket.
|
|
name: Jail name.
|
|
|
|
Returns:
|
|
:class:`~app.models.jail.JailDetailResponse` with the full jail.
|
|
|
|
Raises:
|
|
JailNotFoundError: If *name* is not a known jail.
|
|
~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
|
|
cannot be reached.
|
|
"""
|
|
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
|
|
|
|
# Verify the jail exists by sending a status command first.
|
|
try:
|
|
status_raw = _ok(await client.send(["status", name, "short"]))
|
|
except ValueError as exc:
|
|
if _is_not_found_error(exc):
|
|
raise JailNotFoundError(name) from exc
|
|
raise
|
|
|
|
raw = _to_dict(status_raw)
|
|
filter_stats = _to_dict(raw.get("Filter") or [])
|
|
action_stats = _to_dict(raw.get("Actions") or [])
|
|
|
|
jail_status = JailStatus(
|
|
currently_banned=int(action_stats.get("Currently banned", 0) or 0),
|
|
total_banned=int(action_stats.get("Total banned", 0) or 0),
|
|
currently_failed=int(filter_stats.get("Currently failed", 0) or 0),
|
|
total_failed=int(filter_stats.get("Total failed", 0) or 0),
|
|
)
|
|
|
|
# Fetch all detail fields in parallel.
|
|
(
|
|
logpath_raw,
|
|
failregex_raw,
|
|
ignoreregex_raw,
|
|
ignoreip_raw,
|
|
datepattern_raw,
|
|
logencoding_raw,
|
|
bantime_raw,
|
|
findtime_raw,
|
|
maxretry_raw,
|
|
backend_raw,
|
|
idle_raw,
|
|
actions_raw,
|
|
bt_increment_raw,
|
|
bt_factor_raw,
|
|
bt_formula_raw,
|
|
bt_multipliers_raw,
|
|
bt_maxtime_raw,
|
|
bt_rndtime_raw,
|
|
bt_overalljails_raw,
|
|
) = await asyncio.gather(
|
|
_safe_get(client, ["get", name, "logpath"], []),
|
|
_safe_get(client, ["get", name, "failregex"], []),
|
|
_safe_get(client, ["get", name, "ignoreregex"], []),
|
|
_safe_get(client, ["get", name, "ignoreip"], []),
|
|
_safe_get(client, ["get", name, "datepattern"], None),
|
|
_safe_get(client, ["get", name, "logencoding"], "UTF-8"),
|
|
_safe_get(client, ["get", name, "bantime"], 600),
|
|
_safe_get(client, ["get", name, "findtime"], 600),
|
|
_safe_get(client, ["get", name, "maxretry"], 5),
|
|
_safe_get(client, ["get", name, "backend"], "polling"),
|
|
_safe_get(client, ["get", name, "idle"], False),
|
|
_safe_get(client, ["get", name, "actions"], []),
|
|
_safe_get(client, ["get", name, "bantime.increment"], False),
|
|
_safe_get(client, ["get", name, "bantime.factor"], None),
|
|
_safe_get(client, ["get", name, "bantime.formula"], None),
|
|
_safe_get(client, ["get", name, "bantime.multipliers"], None),
|
|
_safe_get(client, ["get", name, "bantime.maxtime"], None),
|
|
_safe_get(client, ["get", name, "bantime.rndtime"], None),
|
|
_safe_get(client, ["get", name, "bantime.overalljails"], False),
|
|
)
|
|
|
|
bt_increment: bool = bool(bt_increment_raw)
|
|
bantime_escalation = BantimeEscalation(
|
|
increment=bt_increment,
|
|
factor=float(bt_factor_raw) if bt_factor_raw is not None else None,
|
|
formula=str(bt_formula_raw) if bt_formula_raw else None,
|
|
multipliers=str(bt_multipliers_raw) if bt_multipliers_raw else None,
|
|
max_time=int(bt_maxtime_raw) if bt_maxtime_raw is not None else None,
|
|
rnd_time=int(bt_rndtime_raw) if bt_rndtime_raw is not None else None,
|
|
overall_jails=bool(bt_overalljails_raw),
|
|
)
|
|
|
|
jail = Jail(
|
|
name=name,
|
|
enabled=True,
|
|
running=True,
|
|
idle=bool(idle_raw),
|
|
backend=str(backend_raw or "polling"),
|
|
log_paths=_ensure_list(logpath_raw),
|
|
fail_regex=_ensure_list(failregex_raw),
|
|
ignore_regex=_ensure_list(ignoreregex_raw),
|
|
ignore_ips=_ensure_list(ignoreip_raw),
|
|
date_pattern=str(datepattern_raw) if datepattern_raw else None,
|
|
log_encoding=str(logencoding_raw or "UTF-8"),
|
|
find_time=int(findtime_raw or 600),
|
|
ban_time=int(bantime_raw or 600),
|
|
max_retry=int(maxretry_raw or 5),
|
|
bantime_escalation=bantime_escalation,
|
|
status=jail_status,
|
|
actions=_ensure_list(actions_raw),
|
|
)
|
|
|
|
log.info("jail_detail_fetched", jail=name)
|
|
return JailDetailResponse(jail=jail)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public API — Jail control
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def start_jail(socket_path: str, name: str) -> None:
|
|
"""Start a stopped fail2ban jail.
|
|
|
|
Args:
|
|
socket_path: Path to the fail2ban Unix domain socket.
|
|
name: Jail name to start.
|
|
|
|
Raises:
|
|
JailNotFoundError: If *name* is not a known jail.
|
|
JailOperationError: If fail2ban reports the operation failed.
|
|
~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
|
|
cannot be reached.
|
|
"""
|
|
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
|
|
try:
|
|
_ok(await client.send(["start", name]))
|
|
log.info("jail_started", jail=name)
|
|
except ValueError as exc:
|
|
if _is_not_found_error(exc):
|
|
raise JailNotFoundError(name) from exc
|
|
raise JailOperationError(str(exc)) from exc
|
|
|
|
|
|
async def stop_jail(socket_path: str, name: str) -> None:
|
|
"""Stop a running fail2ban jail.
|
|
|
|
If the jail is not currently active (already stopped), this is a no-op
|
|
and no error is raised — the operation is idempotent.
|
|
|
|
Args:
|
|
socket_path: Path to the fail2ban Unix domain socket.
|
|
name: Jail name to stop.
|
|
|
|
Raises:
|
|
JailOperationError: If fail2ban reports the operation failed.
|
|
~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
|
|
cannot be reached.
|
|
"""
|
|
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
|
|
try:
|
|
_ok(await client.send(["stop", name]))
|
|
log.info("jail_stopped", jail=name)
|
|
except ValueError as exc:
|
|
if _is_not_found_error(exc):
|
|
# Jail is already stopped or was never running — treat as a no-op.
|
|
log.info("jail_stop_noop", jail=name)
|
|
return
|
|
raise JailOperationError(str(exc)) from exc
|
|
|
|
|
|
async def set_idle(socket_path: str, name: str, *, on: bool) -> None:
|
|
"""Toggle the idle mode of a fail2ban jail.
|
|
|
|
When idle mode is on the jail pauses monitoring without stopping
|
|
completely; existing bans remain active.
|
|
|
|
Args:
|
|
socket_path: Path to the fail2ban Unix domain socket.
|
|
name: Jail name.
|
|
on: Pass ``True`` to enable idle, ``False`` to disable it.
|
|
|
|
Raises:
|
|
JailNotFoundError: If *name* is not a known jail.
|
|
JailOperationError: If fail2ban reports the operation failed.
|
|
~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
|
|
cannot be reached.
|
|
"""
|
|
state = "on" if on else "off"
|
|
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
|
|
try:
|
|
_ok(await client.send(["set", name, "idle", state]))
|
|
log.info("jail_idle_toggled", jail=name, idle=on)
|
|
except ValueError as exc:
|
|
if _is_not_found_error(exc):
|
|
raise JailNotFoundError(name) from exc
|
|
raise JailOperationError(str(exc)) from exc
|
|
|
|
|
|
async def reload_jail(socket_path: str, name: str) -> None:
|
|
"""Reload a single fail2ban jail to pick up configuration changes.
|
|
|
|
The reload protocol requires a non-empty configuration stream. Without
|
|
one, fail2ban's end-of-reload phase removes every jail that received no
|
|
configuration commands — permanently deleting the jail from the running
|
|
daemon. Sending ``['start', name]`` as the minimal stream is sufficient:
|
|
``startJail`` removes the jail from ``reload_state``, which causes the
|
|
end phase to *commit* the reload instead of deleting the jail.
|
|
|
|
Args:
|
|
socket_path: Path to the fail2ban Unix domain socket.
|
|
name: Jail name to reload.
|
|
|
|
Raises:
|
|
JailNotFoundError: If *name* is not a known jail.
|
|
JailOperationError: If fail2ban reports the operation failed.
|
|
~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
|
|
cannot be reached.
|
|
"""
|
|
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
|
|
try:
|
|
_ok(await client.send(["reload", name, [], [["start", name]]]))
|
|
log.info("jail_reloaded", jail=name)
|
|
except ValueError as exc:
|
|
if _is_not_found_error(exc):
|
|
raise JailNotFoundError(name) from exc
|
|
raise JailOperationError(str(exc)) from exc
|
|
|
|
|
|
async def reload_all(
|
|
socket_path: str,
|
|
*,
|
|
include_jails: list[str] | None = None,
|
|
exclude_jails: list[str] | None = None,
|
|
) -> None:
|
|
"""Reload all fail2ban jails at once.
|
|
|
|
Fetches the current jail list first so that a ``['start', name]`` entry
|
|
can be included in the config stream for every active jail. Without a
|
|
non-empty stream the end-of-reload phase deletes every jail that received
|
|
no configuration commands.
|
|
|
|
*include_jails* are added to the stream (e.g. a newly activated jail that
|
|
is not yet running). *exclude_jails* are removed from the stream (e.g. a
|
|
jail that was just deactivated and should not be restarted).
|
|
|
|
Args:
|
|
socket_path: Path to the fail2ban Unix domain socket.
|
|
include_jails: Extra jail names to add to the start stream.
|
|
exclude_jails: Jail names to remove from the start stream.
|
|
|
|
Raises:
|
|
JailNotFoundError: If a jail in *include_jails* does not exist or
|
|
its configuration is invalid (e.g. missing logpath).
|
|
JailOperationError: If fail2ban reports the operation failed for
|
|
a different reason.
|
|
~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
|
|
cannot be reached.
|
|
"""
|
|
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
|
|
async with _reload_all_lock:
|
|
try:
|
|
# Resolve jail names so we can build the minimal config stream.
|
|
status_raw = _ok(await client.send(["status"]))
|
|
status_dict = _to_dict(status_raw)
|
|
jail_list_raw: str = str(status_dict.get("Jail list", ""))
|
|
jail_names = [n.strip() for n in jail_list_raw.split(",") if n.strip()]
|
|
|
|
# Merge include/exclude sets so the stream matches the desired state.
|
|
names_set: set[str] = set(jail_names)
|
|
if include_jails:
|
|
names_set.update(include_jails)
|
|
if exclude_jails:
|
|
names_set -= set(exclude_jails)
|
|
|
|
stream: list[list[str]] = [["start", n] for n in sorted(names_set)]
|
|
_ok(await client.send(["reload", "--all", [], stream]))
|
|
log.info("all_jails_reloaded")
|
|
except ValueError as exc:
|
|
# Detect UnknownJailException (missing or invalid jail configuration)
|
|
# and re-raise as JailNotFoundError for better error specificity.
|
|
if _is_not_found_error(exc):
|
|
# Extract the jail name from include_jails if available.
|
|
jail_name = include_jails[0] if include_jails else "unknown"
|
|
raise JailNotFoundError(jail_name) from exc
|
|
raise JailOperationError(str(exc)) from exc
|
|
|
|
|
|
async def restart(socket_path: str) -> None:
|
|
"""Stop the fail2ban daemon via the Unix socket.
|
|
|
|
Sends ``["stop"]`` to the fail2ban daemon, which calls ``server.quit()``
|
|
on the daemon side and tears down all jails. The caller is responsible
|
|
for starting the daemon again (e.g. via ``fail2ban-client start``).
|
|
|
|
Note:
|
|
``["restart"]`` is a *client-side* orchestration command that is not
|
|
handled by the fail2ban server transmitter — sending it to the socket
|
|
raises ``"Invalid command"`` in the daemon.
|
|
|
|
Args:
|
|
socket_path: Path to the fail2ban Unix domain socket.
|
|
|
|
Raises:
|
|
JailOperationError: If fail2ban reports the stop command failed.
|
|
~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
|
|
cannot be reached.
|
|
"""
|
|
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
|
|
try:
|
|
_ok(await client.send(["stop"]))
|
|
log.info("fail2ban_stopped_for_restart")
|
|
except ValueError as exc:
|
|
raise JailOperationError(str(exc)) from exc
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public API — Ban / Unban
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def ban_ip(socket_path: str, jail: str, ip: str) -> None:
|
|
"""Ban an IP address in a specific fail2ban jail.
|
|
|
|
The IP address is validated with :mod:`ipaddress` before the command
|
|
is sent to fail2ban.
|
|
|
|
Args:
|
|
socket_path: Path to the fail2ban Unix domain socket.
|
|
jail: Jail in which to apply the ban.
|
|
ip: IP address to ban (IPv4 or IPv6).
|
|
|
|
Raises:
|
|
ValueError: If *ip* is not a valid IP address.
|
|
JailNotFoundError: If *jail* is not a known jail.
|
|
JailOperationError: If fail2ban reports the operation failed.
|
|
~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
|
|
cannot be reached.
|
|
"""
|
|
# Validate the IP address before sending to avoid injection.
|
|
try:
|
|
ipaddress.ip_address(ip)
|
|
except ValueError as exc:
|
|
raise ValueError(f"Invalid IP address: {ip!r}") from exc
|
|
|
|
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
|
|
try:
|
|
_ok(await client.send(["set", jail, "banip", ip]))
|
|
log.info("ip_banned", ip=ip, jail=jail)
|
|
except ValueError as exc:
|
|
if _is_not_found_error(exc):
|
|
raise JailNotFoundError(jail) from exc
|
|
raise JailOperationError(str(exc)) from exc
|
|
|
|
|
|
async def unban_ip(
|
|
socket_path: str,
|
|
ip: str,
|
|
jail: str | None = None,
|
|
) -> None:
|
|
"""Unban an IP address from one or all fail2ban jails.
|
|
|
|
If *jail* is ``None``, the IP is unbanned from every jail using the
|
|
global ``unban`` command. Otherwise only the specified jail is
|
|
targeted.
|
|
|
|
Args:
|
|
socket_path: Path to the fail2ban Unix domain socket.
|
|
ip: IP address to unban.
|
|
jail: Jail to unban from. ``None`` means all jails.
|
|
|
|
Raises:
|
|
ValueError: If *ip* is not a valid IP address.
|
|
JailNotFoundError: If *jail* is specified but does not exist.
|
|
JailOperationError: If fail2ban reports the operation failed.
|
|
~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
|
|
cannot be reached.
|
|
"""
|
|
try:
|
|
ipaddress.ip_address(ip)
|
|
except ValueError as exc:
|
|
raise ValueError(f"Invalid IP address: {ip!r}") from exc
|
|
|
|
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
|
|
try:
|
|
if jail is None:
|
|
_ok(await client.send(["unban", ip]))
|
|
log.info("ip_unbanned_all_jails", ip=ip)
|
|
else:
|
|
_ok(await client.send(["set", jail, "unbanip", ip]))
|
|
log.info("ip_unbanned", ip=ip, jail=jail)
|
|
except ValueError as exc:
|
|
if _is_not_found_error(exc):
|
|
raise JailNotFoundError(jail or "") from exc
|
|
raise JailOperationError(str(exc)) from exc
|
|
|
|
|
|
async def get_active_bans(
|
|
socket_path: str,
|
|
geo_enricher: Any | None = None,
|
|
http_session: Any | None = None,
|
|
app_db: Any | None = None,
|
|
) -> ActiveBanListResponse:
|
|
"""Return all currently banned IPs across every jail.
|
|
|
|
For each jail the ``get <jail> banip --with-time`` command is used
|
|
to retrieve ban start and expiry times alongside the IP address.
|
|
|
|
Geo enrichment strategy (highest priority first):
|
|
|
|
1. When *http_session* is provided the entire set of banned IPs is resolved
|
|
in a single :func:`~app.services.geo_service.lookup_batch` call (up to
|
|
100 IPs per HTTP request). This is far more efficient than concurrent
|
|
per-IP lookups and stays within ip-api.com rate limits.
|
|
2. When only *geo_enricher* is provided (legacy / test path) each IP is
|
|
resolved individually via the supplied async callable.
|
|
|
|
Args:
|
|
socket_path: Path to the fail2ban Unix domain socket.
|
|
geo_enricher: Optional async callable ``(ip) → GeoInfo | None``
|
|
used to enrich each ban entry with country and ASN data.
|
|
Ignored when *http_session* is provided.
|
|
http_session: Optional shared :class:`aiohttp.ClientSession`. When
|
|
provided, :func:`~app.services.geo_service.lookup_batch` is used
|
|
for efficient bulk geo resolution.
|
|
app_db: Optional BanGUI application database connection used to
|
|
persist newly resolved geo entries across restarts. Only
|
|
meaningful when *http_session* is provided.
|
|
|
|
Returns:
|
|
:class:`~app.models.ban.ActiveBanListResponse` with all active bans.
|
|
|
|
Raises:
|
|
~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
|
|
cannot be reached.
|
|
"""
|
|
from app.services import geo_service # noqa: PLC0415
|
|
|
|
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
|
|
|
|
# Fetch jail names.
|
|
global_status = _to_dict(_ok(await client.send(["status"])))
|
|
jail_list_raw: str = str(global_status.get("Jail list", "") or "").strip()
|
|
jail_names: list[str] = (
|
|
[j.strip() for j in jail_list_raw.split(",") if j.strip()]
|
|
if jail_list_raw
|
|
else []
|
|
)
|
|
|
|
if not jail_names:
|
|
return ActiveBanListResponse(bans=[], total=0)
|
|
|
|
# For each jail, fetch the ban list with time info in parallel.
|
|
results: list[Any] = await asyncio.gather(
|
|
*[client.send(["get", jn, "banip", "--with-time"]) for jn in jail_names],
|
|
return_exceptions=True,
|
|
)
|
|
|
|
bans: list[ActiveBan] = []
|
|
for jail_name, raw_result in zip(jail_names, results, strict=False):
|
|
if isinstance(raw_result, Exception):
|
|
log.warning(
|
|
"active_bans_fetch_error",
|
|
jail=jail_name,
|
|
error=str(raw_result),
|
|
)
|
|
continue
|
|
|
|
try:
|
|
ban_list: list[str] = _ok(raw_result) or []
|
|
except (TypeError, ValueError) as exc:
|
|
log.warning(
|
|
"active_bans_parse_error",
|
|
jail=jail_name,
|
|
error=str(exc),
|
|
)
|
|
continue
|
|
|
|
for entry in ban_list:
|
|
ban = _parse_ban_entry(str(entry), jail_name)
|
|
if ban is not None:
|
|
bans.append(ban)
|
|
|
|
# Enrich with geo data — prefer batch lookup over per-IP enricher.
|
|
if http_session is not None and bans:
|
|
all_ips: list[str] = [ban.ip for ban in bans]
|
|
try:
|
|
geo_map = await geo_service.lookup_batch(all_ips, http_session, db=app_db)
|
|
except Exception: # noqa: BLE001
|
|
log.warning("active_bans_batch_geo_failed")
|
|
geo_map = {}
|
|
enriched: list[ActiveBan] = []
|
|
for ban in bans:
|
|
geo = geo_map.get(ban.ip)
|
|
if geo is not None:
|
|
enriched.append(ban.model_copy(update={"country": geo.country_code}))
|
|
else:
|
|
enriched.append(ban)
|
|
bans = enriched
|
|
elif geo_enricher is not None:
|
|
bans = await _enrich_bans(bans, geo_enricher)
|
|
|
|
log.info("active_bans_fetched", total=len(bans))
|
|
return ActiveBanListResponse(bans=bans, total=len(bans))
|
|
|
|
|
|
def _parse_ban_entry(entry: str, jail: str) -> ActiveBan | None:
|
|
"""Parse a ban entry from ``get <jail> banip --with-time`` output.
|
|
|
|
Expected format::
|
|
|
|
"1.2.3.4 \t2025-01-01 12:00:00 + 3600 = 2025-01-01 13:00:00"
|
|
|
|
Args:
|
|
entry: Raw ban entry string.
|
|
jail: Jail name for the resulting record.
|
|
|
|
Returns:
|
|
An :class:`~app.models.ban.ActiveBan` or ``None`` if parsing fails.
|
|
"""
|
|
from datetime import UTC, datetime
|
|
|
|
try:
|
|
parts = entry.split("\t", 1)
|
|
ip = parts[0].strip()
|
|
|
|
# Validate IP
|
|
ipaddress.ip_address(ip)
|
|
|
|
if len(parts) < 2:
|
|
# Entry has no time info — return with unknown times.
|
|
return ActiveBan(
|
|
ip=ip,
|
|
jail=jail,
|
|
banned_at=None,
|
|
expires_at=None,
|
|
ban_count=1,
|
|
country=None,
|
|
)
|
|
|
|
time_part = parts[1].strip()
|
|
# Format: "2025-01-01 12:00:00 + 3600 = 2025-01-01 13:00:00"
|
|
# Split at " + " to get banned_at and remainder.
|
|
plus_idx = time_part.find(" + ")
|
|
if plus_idx == -1:
|
|
banned_at_str = time_part.strip()
|
|
expires_at_str: str | None = None
|
|
else:
|
|
banned_at_str = time_part[:plus_idx].strip()
|
|
remainder = time_part[plus_idx + 3 :] # skip " + "
|
|
eq_idx = remainder.find(" = ")
|
|
expires_at_str = remainder[eq_idx + 3 :].strip() if eq_idx != -1 else None
|
|
|
|
_date_fmt = "%Y-%m-%d %H:%M:%S"
|
|
|
|
def _to_iso(ts: str) -> str:
|
|
dt = datetime.strptime(ts, _date_fmt).replace(tzinfo=UTC)
|
|
return dt.isoformat()
|
|
|
|
banned_at_iso: str | None = None
|
|
expires_at_iso: str | None = None
|
|
|
|
with contextlib.suppress(ValueError):
|
|
banned_at_iso = _to_iso(banned_at_str)
|
|
|
|
with contextlib.suppress(ValueError):
|
|
if expires_at_str:
|
|
expires_at_iso = _to_iso(expires_at_str)
|
|
|
|
return ActiveBan(
|
|
ip=ip,
|
|
jail=jail,
|
|
banned_at=banned_at_iso,
|
|
expires_at=expires_at_iso,
|
|
ban_count=1,
|
|
country=None,
|
|
)
|
|
except (ValueError, IndexError, AttributeError) as exc:
|
|
log.debug("ban_entry_parse_error", entry=entry, jail=jail, error=str(exc))
|
|
return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public API — Jail-specific paginated bans
|
|
# ---------------------------------------------------------------------------
|
|
|
|
#: Maximum allowed page size for :func:`get_jail_banned_ips`.
|
|
_MAX_PAGE_SIZE: int = 100
|
|
|
|
|
|
async def get_jail_banned_ips(
|
|
socket_path: str,
|
|
jail_name: str,
|
|
page: int = 1,
|
|
page_size: int = 25,
|
|
search: str | None = None,
|
|
http_session: Any | None = None,
|
|
app_db: Any | None = None,
|
|
) -> JailBannedIpsResponse:
|
|
"""Return a paginated list of currently banned IPs for a single jail.
|
|
|
|
Fetches the full ban list from the fail2ban socket, applies an optional
|
|
substring search filter on the IP, paginates server-side, and geo-enriches
|
|
**only** the current page slice to stay within rate limits.
|
|
|
|
Args:
|
|
socket_path: Path to the fail2ban Unix domain socket.
|
|
jail_name: Name of the jail to query.
|
|
page: 1-based page number (default 1).
|
|
page_size: Items per page; clamped to :data:`_MAX_PAGE_SIZE` (default 25).
|
|
search: Optional case-insensitive substring filter applied to IP addresses.
|
|
http_session: Optional shared :class:`aiohttp.ClientSession` for geo
|
|
enrichment via :func:`~app.services.geo_service.lookup_batch`.
|
|
app_db: Optional BanGUI application database for persistent geo cache.
|
|
|
|
Returns:
|
|
:class:`~app.models.ban.JailBannedIpsResponse` with the paginated bans.
|
|
|
|
Raises:
|
|
JailNotFoundError: If *jail_name* is not a known active jail.
|
|
~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket is
|
|
unreachable.
|
|
"""
|
|
from app.services import geo_service # noqa: PLC0415
|
|
|
|
# Clamp page_size to the allowed maximum.
|
|
page_size = min(page_size, _MAX_PAGE_SIZE)
|
|
|
|
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
|
|
|
|
# Verify the jail exists.
|
|
try:
|
|
_ok(await client.send(["status", jail_name, "short"]))
|
|
except ValueError as exc:
|
|
if _is_not_found_error(exc):
|
|
raise JailNotFoundError(jail_name) from exc
|
|
raise
|
|
|
|
# Fetch the full ban list for this jail.
|
|
try:
|
|
raw_result = _ok(await client.send(["get", jail_name, "banip", "--with-time"]))
|
|
except (ValueError, TypeError):
|
|
raw_result = []
|
|
|
|
ban_list: list[str] = raw_result or []
|
|
|
|
# Parse all entries.
|
|
all_bans: list[ActiveBan] = []
|
|
for entry in ban_list:
|
|
ban = _parse_ban_entry(str(entry), jail_name)
|
|
if ban is not None:
|
|
all_bans.append(ban)
|
|
|
|
# Apply optional substring search filter (case-insensitive).
|
|
if search:
|
|
search_lower = search.lower()
|
|
all_bans = [b for b in all_bans if search_lower in b.ip.lower()]
|
|
|
|
total = len(all_bans)
|
|
|
|
# Slice the requested page.
|
|
start = (page - 1) * page_size
|
|
page_bans = all_bans[start : start + page_size]
|
|
|
|
# Geo-enrich only the page slice.
|
|
if http_session is not None and page_bans:
|
|
page_ips = [b.ip for b in page_bans]
|
|
try:
|
|
geo_map = await geo_service.lookup_batch(page_ips, http_session, db=app_db)
|
|
except Exception: # noqa: BLE001
|
|
log.warning("jail_banned_ips_geo_failed", jail=jail_name)
|
|
geo_map = {}
|
|
enriched_page: list[ActiveBan] = []
|
|
for ban in page_bans:
|
|
geo = geo_map.get(ban.ip)
|
|
if geo is not None:
|
|
enriched_page.append(ban.model_copy(update={"country": geo.country_code}))
|
|
else:
|
|
enriched_page.append(ban)
|
|
page_bans = enriched_page
|
|
|
|
log.info(
|
|
"jail_banned_ips_fetched",
|
|
jail=jail_name,
|
|
total=total,
|
|
page=page,
|
|
page_size=page_size,
|
|
)
|
|
return JailBannedIpsResponse(
|
|
items=page_bans,
|
|
total=total,
|
|
page=page,
|
|
page_size=page_size,
|
|
)
|
|
|
|
|
|
async def _enrich_bans(
|
|
bans: list[ActiveBan],
|
|
geo_enricher: Any,
|
|
) -> list[ActiveBan]:
|
|
"""Enrich ban records with geo data asynchronously.
|
|
|
|
Args:
|
|
bans: The list of :class:`~app.models.ban.ActiveBan` records to enrich.
|
|
geo_enricher: Async callable ``(ip) → GeoInfo | None``.
|
|
|
|
Returns:
|
|
The same list with ``country`` fields populated where lookup succeeded.
|
|
"""
|
|
geo_results: list[Any] = await asyncio.gather(
|
|
*[geo_enricher(ban.ip) for ban in bans],
|
|
return_exceptions=True,
|
|
)
|
|
enriched: list[ActiveBan] = []
|
|
for ban, geo in zip(bans, geo_results, strict=False):
|
|
if geo is not None and not isinstance(geo, Exception):
|
|
enriched.append(ban.model_copy(update={"country": geo.country_code}))
|
|
else:
|
|
enriched.append(ban)
|
|
return enriched
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public API — Ignore list (IP whitelist)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def get_ignore_list(socket_path: str, name: str) -> list[str]:
|
|
"""Return the ignore list for a fail2ban jail.
|
|
|
|
Args:
|
|
socket_path: Path to the fail2ban Unix domain socket.
|
|
name: Jail name.
|
|
|
|
Returns:
|
|
List of IP addresses and CIDR networks on the jail's ignore list.
|
|
|
|
Raises:
|
|
JailNotFoundError: If *name* is not a known jail.
|
|
~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
|
|
cannot be reached.
|
|
"""
|
|
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
|
|
try:
|
|
raw = _ok(await client.send(["get", name, "ignoreip"]))
|
|
return _ensure_list(raw)
|
|
except ValueError as exc:
|
|
if _is_not_found_error(exc):
|
|
raise JailNotFoundError(name) from exc
|
|
raise
|
|
|
|
|
|
async def add_ignore_ip(socket_path: str, name: str, ip: str) -> None:
|
|
"""Add an IP address or CIDR network to a jail's ignore list.
|
|
|
|
Args:
|
|
socket_path: Path to the fail2ban Unix domain socket.
|
|
name: Jail name.
|
|
ip: IP address or CIDR network to add.
|
|
|
|
Raises:
|
|
JailNotFoundError: If *name* is not a known jail.
|
|
JailOperationError: If fail2ban reports the operation failed.
|
|
~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
|
|
cannot be reached.
|
|
"""
|
|
# Basic format validation.
|
|
try:
|
|
ipaddress.ip_network(ip, strict=False)
|
|
except ValueError as exc:
|
|
raise ValueError(f"Invalid IP address or network: {ip!r}") from exc
|
|
|
|
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
|
|
try:
|
|
_ok(await client.send(["set", name, "addignoreip", ip]))
|
|
log.info("ignore_ip_added", jail=name, ip=ip)
|
|
except ValueError as exc:
|
|
if _is_not_found_error(exc):
|
|
raise JailNotFoundError(name) from exc
|
|
raise JailOperationError(str(exc)) from exc
|
|
|
|
|
|
async def del_ignore_ip(socket_path: str, name: str, ip: str) -> None:
|
|
"""Remove an IP address or CIDR network from a jail's ignore list.
|
|
|
|
Args:
|
|
socket_path: Path to the fail2ban Unix domain socket.
|
|
name: Jail name.
|
|
ip: IP address or CIDR network to remove.
|
|
|
|
Raises:
|
|
JailNotFoundError: If *name* is not a known jail.
|
|
JailOperationError: If fail2ban reports the operation failed.
|
|
~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
|
|
cannot be reached.
|
|
"""
|
|
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
|
|
try:
|
|
_ok(await client.send(["set", name, "delignoreip", ip]))
|
|
log.info("ignore_ip_removed", jail=name, ip=ip)
|
|
except ValueError as exc:
|
|
if _is_not_found_error(exc):
|
|
raise JailNotFoundError(name) from exc
|
|
raise JailOperationError(str(exc)) from exc
|
|
|
|
|
|
async def get_ignore_self(socket_path: str, name: str) -> bool:
|
|
"""Return whether a jail ignores the server's own IP addresses.
|
|
|
|
Args:
|
|
socket_path: Path to the fail2ban Unix domain socket.
|
|
name: Jail name.
|
|
|
|
Returns:
|
|
``True`` when ``ignoreself`` is enabled for the jail.
|
|
|
|
Raises:
|
|
JailNotFoundError: If *name* is not a known jail.
|
|
~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
|
|
cannot be reached.
|
|
"""
|
|
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
|
|
try:
|
|
raw = _ok(await client.send(["get", name, "ignoreself"]))
|
|
return bool(raw)
|
|
except ValueError as exc:
|
|
if _is_not_found_error(exc):
|
|
raise JailNotFoundError(name) from exc
|
|
raise
|
|
|
|
|
|
async def set_ignore_self(socket_path: str, name: str, *, on: bool) -> None:
|
|
"""Toggle the ``ignoreself`` option for a fail2ban jail.
|
|
|
|
Args:
|
|
socket_path: Path to the fail2ban Unix domain socket.
|
|
name: Jail name.
|
|
on: ``True`` to enable ignoreself, ``False`` to disable.
|
|
|
|
Raises:
|
|
JailNotFoundError: If *name* is not a known jail.
|
|
JailOperationError: If fail2ban reports the operation failed.
|
|
~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
|
|
cannot be reached.
|
|
"""
|
|
value = "true" if on else "false"
|
|
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
|
|
try:
|
|
_ok(await client.send(["set", name, "ignoreself", value]))
|
|
log.info("ignore_self_toggled", jail=name, on=on)
|
|
except ValueError as exc:
|
|
if _is_not_found_error(exc):
|
|
raise JailNotFoundError(name) from exc
|
|
raise JailOperationError(str(exc)) from exc
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public API — IP lookup
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def lookup_ip(
|
|
socket_path: str,
|
|
ip: str,
|
|
geo_enricher: Any | None = None,
|
|
) -> dict[str, Any]:
|
|
"""Return ban status and history for a single IP address.
|
|
|
|
Checks every running jail for whether the IP is currently banned.
|
|
Also queries the fail2ban database for historical ban records.
|
|
|
|
Args:
|
|
socket_path: Path to the fail2ban Unix domain socket.
|
|
ip: IP address to look up.
|
|
geo_enricher: Optional async callable ``(ip) → GeoInfo | None``.
|
|
|
|
Returns:
|
|
A dict with keys:
|
|
* ``ip`` — the queried IP address
|
|
* ``currently_banned_in`` — list of jails where the IP is active
|
|
* ``geo`` — ``GeoInfo`` dataclass or ``None``
|
|
|
|
Raises:
|
|
ValueError: If *ip* is not a valid IP address.
|
|
~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
|
|
cannot be reached.
|
|
"""
|
|
try:
|
|
ipaddress.ip_address(ip)
|
|
except ValueError as exc:
|
|
raise ValueError(f"Invalid IP address: {ip!r}") from exc
|
|
|
|
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
|
|
|
|
with contextlib.suppress(ValueError, Fail2BanConnectionError):
|
|
# Use fail2ban's "banned <ip>" command which checks all jails.
|
|
_ok(await client.send(["get", "--all", "banned", ip]))
|
|
|
|
# Fetch jail names from status.
|
|
global_status = _to_dict(_ok(await client.send(["status"])))
|
|
jail_list_raw: str = str(global_status.get("Jail list", "") or "").strip()
|
|
jail_names: list[str] = (
|
|
[j.strip() for j in jail_list_raw.split(",") if j.strip()]
|
|
if jail_list_raw
|
|
else []
|
|
)
|
|
|
|
# Check ban status per jail in parallel.
|
|
ban_results: list[Any] = await asyncio.gather(
|
|
*[client.send(["get", jn, "banip"]) for jn in jail_names],
|
|
return_exceptions=True,
|
|
)
|
|
|
|
currently_banned_in: list[str] = []
|
|
for jail_name, result in zip(jail_names, ban_results, strict=False):
|
|
if isinstance(result, Exception):
|
|
continue
|
|
try:
|
|
ban_list: list[str] = _ok(result) or []
|
|
if ip in ban_list:
|
|
currently_banned_in.append(jail_name)
|
|
except (ValueError, TypeError):
|
|
pass
|
|
|
|
geo = None
|
|
if geo_enricher is not None:
|
|
with contextlib.suppress(Exception): # noqa: BLE001
|
|
geo = await geo_enricher(ip)
|
|
|
|
log.info("ip_lookup_completed", ip=ip, banned_in_jails=currently_banned_in)
|
|
|
|
return {
|
|
"ip": ip,
|
|
"currently_banned_in": currently_banned_in,
|
|
"geo": geo,
|
|
}
|
|
|
|
|
|
async def unban_all_ips(socket_path: str) -> int:
|
|
"""Unban every currently banned IP across all fail2ban jails.
|
|
|
|
Uses fail2ban's global ``unban --all`` command, which atomically removes
|
|
every active ban from every jail in a single socket round-trip.
|
|
|
|
Args:
|
|
socket_path: Path to the fail2ban Unix domain socket.
|
|
|
|
Returns:
|
|
The number of IP addresses that were unbanned.
|
|
|
|
Raises:
|
|
~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
|
|
cannot be reached.
|
|
"""
|
|
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
|
|
count: int = int(_ok(await client.send(["unban", "--all"])))
|
|
log.info("all_ips_unbanned", count=count)
|
|
return count
|