"""Jail management service. Provides methods to list, inspect, and control fail2ban jails via the Unix domain socket. All socket I/O is performed through the async :class:`~app.utils.fail2ban_client.Fail2BanClient` wrapper. Architecture note: this module is a pure service — it contains **no** HTTP/FastAPI concerns. All results are returned as Pydantic models so routers can serialise them directly. """ from __future__ import annotations import asyncio import contextlib import ipaddress from typing import Any import structlog from app.models.ban import ActiveBan, ActiveBanListResponse from app.models.jail import ( Jail, JailDetailResponse, JailListResponse, JailStatus, JailSummary, ) from app.utils.fail2ban_client import Fail2BanClient, Fail2BanConnectionError log: structlog.stdlib.BoundLogger = structlog.get_logger() # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- _SOCKET_TIMEOUT: float = 10.0 # --------------------------------------------------------------------------- # Custom exceptions # --------------------------------------------------------------------------- class JailNotFoundError(Exception): """Raised when a requested jail name does not exist in fail2ban.""" def __init__(self, name: str) -> None: """Initialise with the jail name that was not found. Args: name: The jail name that could not be located. """ self.name: str = name super().__init__(f"Jail not found: {name!r}") class JailOperationError(Exception): """Raised when a jail control command fails for a non-auth reason.""" # --------------------------------------------------------------------------- # Internal helpers # --------------------------------------------------------------------------- def _ok(response: Any) -> Any: """Extract the payload from a fail2ban ``(return_code, data)`` response. Args: response: Raw value returned by :meth:`~Fail2BanClient.send`. Returns: The payload ``data`` portion of the response. Raises: ValueError: If the response indicates an error (return code ≠ 0). """ try: code, data = response except (TypeError, ValueError) as exc: raise ValueError(f"Unexpected fail2ban response shape: {response!r}") from exc if code != 0: raise ValueError(f"fail2ban returned error code {code}: {data!r}") return data def _to_dict(pairs: Any) -> dict[str, Any]: """Convert a list of ``(key, value)`` pairs to a plain dict. Args: pairs: A list of ``(key, value)`` pairs (or any iterable thereof). Returns: A :class:`dict` with the keys and values from *pairs*. """ if not isinstance(pairs, (list, tuple)): return {} result: dict[str, Any] = {} for item in pairs: try: k, v = item result[str(k)] = v except (TypeError, ValueError): pass return result def _ensure_list(value: Any) -> list[str]: """Coerce a fail2ban response value to a list of strings. Some fail2ban ``get`` responses return ``None`` or a single string when there is only one entry. This helper normalises the result. Args: value: The raw value from a ``get`` command response. Returns: A list of strings, possibly empty. """ if value is None: return [] if isinstance(value, str): return [value] if value.strip() else [] if isinstance(value, (list, tuple)): return [str(v) for v in value if v is not None] return [str(value)] def _is_not_found_error(exc: Exception) -> bool: """Return ``True`` if *exc* indicates a jail does not exist. Checks both space-separated (``"unknown jail"``) and concatenated (``"unknownjail"``) forms because fail2ban serialises ``UnknownJailException`` without a space when pickled. Args: exc: The exception to inspect. Returns: ``True`` when the exception message signals an unknown jail. """ msg = str(exc).lower() return any( phrase in msg for phrase in ( "unknown jail", "unknownjail", # covers UnknownJailException serialised by fail2ban "no jail", "does not exist", "not found", ) ) async def _safe_get( client: Fail2BanClient, command: list[Any], default: Any = None, ) -> Any: """Send a ``get`` command and return ``default`` on error. Errors during optional detail queries (logpath, regex, etc.) should not abort the whole request — this helper swallows them gracefully. Args: client: The :class:`~app.utils.fail2ban_client.Fail2BanClient` to use. command: The command list to send. default: Value to return when the command fails. Returns: The response payload, or *default* on any error. """ try: return _ok(await client.send(command)) except (ValueError, TypeError, Exception): return default # --------------------------------------------------------------------------- # Public API — Jail listing & detail # --------------------------------------------------------------------------- async def list_jails(socket_path: str) -> JailListResponse: """Return a summary list of all active fail2ban jails. Queries the daemon for the global jail list and then fetches status and key configuration for each jail in parallel. Args: socket_path: Path to the fail2ban Unix domain socket. Returns: :class:`~app.models.jail.JailListResponse` with all active jails. Raises: ~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket cannot be reached. """ client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT) # 1. Fetch global status to get jail names. global_status = _to_dict(_ok(await client.send(["status"]))) jail_list_raw: str = str(global_status.get("Jail list", "") or "").strip() jail_names: list[str] = ( [j.strip() for j in jail_list_raw.split(",") if j.strip()] if jail_list_raw else [] ) log.info("jail_list_fetched", count=len(jail_names)) if not jail_names: return JailListResponse(jails=[], total=0) # 2. Fetch summary data for every jail in parallel. summaries: list[JailSummary] = await asyncio.gather( *[_fetch_jail_summary(client, name) for name in jail_names], return_exceptions=False, ) return JailListResponse(jails=list(summaries), total=len(summaries)) async def _fetch_jail_summary( client: Fail2BanClient, name: str, ) -> JailSummary: """Fetch and build a :class:`~app.models.jail.JailSummary` for one jail. Sends the ``status``, ``get ... bantime``, ``findtime``, ``maxretry``, ``backend``, and ``idle`` commands in parallel. Args: client: Shared :class:`~app.utils.fail2ban_client.Fail2BanClient`. name: Jail name. Returns: A :class:`~app.models.jail.JailSummary` populated from the responses. """ _r = await asyncio.gather( client.send(["status", name, "short"]), client.send(["get", name, "bantime"]), client.send(["get", name, "findtime"]), client.send(["get", name, "maxretry"]), client.send(["get", name, "backend"]), client.send(["get", name, "idle"]), return_exceptions=True, ) status_raw: Any = _r[0] bantime_raw: Any = _r[1] findtime_raw: Any = _r[2] maxretry_raw: Any = _r[3] backend_raw: Any = _r[4] idle_raw: Any = _r[5] # Parse jail status (filter + actions). jail_status: JailStatus | None = None if not isinstance(status_raw, Exception): try: raw = _to_dict(_ok(status_raw)) filter_stats = _to_dict(raw.get("Filter") or []) action_stats = _to_dict(raw.get("Actions") or []) jail_status = JailStatus( currently_banned=int(action_stats.get("Currently banned", 0) or 0), total_banned=int(action_stats.get("Total banned", 0) or 0), currently_failed=int(filter_stats.get("Currently failed", 0) or 0), total_failed=int(filter_stats.get("Total failed", 0) or 0), ) except (ValueError, TypeError) as exc: log.warning("jail_status_parse_error", jail=name, error=str(exc)) def _safe_int(raw: Any, fallback: int) -> int: if isinstance(raw, Exception): return fallback try: return int(_ok(raw)) except (ValueError, TypeError): return fallback def _safe_str(raw: Any, fallback: str) -> str: if isinstance(raw, Exception): return fallback try: return str(_ok(raw)) except (ValueError, TypeError): return fallback def _safe_bool(raw: Any, fallback: bool = False) -> bool: if isinstance(raw, Exception): return fallback try: return bool(_ok(raw)) except (ValueError, TypeError): return fallback return JailSummary( name=name, enabled=True, running=True, idle=_safe_bool(idle_raw), backend=_safe_str(backend_raw, "polling"), find_time=_safe_int(findtime_raw, 600), ban_time=_safe_int(bantime_raw, 600), max_retry=_safe_int(maxretry_raw, 5), status=jail_status, ) async def get_jail(socket_path: str, name: str) -> JailDetailResponse: """Return full detail for a single fail2ban jail. Sends multiple ``get`` and ``status`` commands in parallel to build the complete jail snapshot. Args: socket_path: Path to the fail2ban Unix domain socket. name: Jail name. Returns: :class:`~app.models.jail.JailDetailResponse` with the full jail. Raises: JailNotFoundError: If *name* is not a known jail. ~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket cannot be reached. """ client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT) # Verify the jail exists by sending a status command first. try: status_raw = _ok(await client.send(["status", name, "short"])) except ValueError as exc: if _is_not_found_error(exc): raise JailNotFoundError(name) from exc raise raw = _to_dict(status_raw) filter_stats = _to_dict(raw.get("Filter") or []) action_stats = _to_dict(raw.get("Actions") or []) jail_status = JailStatus( currently_banned=int(action_stats.get("Currently banned", 0) or 0), total_banned=int(action_stats.get("Total banned", 0) or 0), currently_failed=int(filter_stats.get("Currently failed", 0) or 0), total_failed=int(filter_stats.get("Total failed", 0) or 0), ) # Fetch all detail fields in parallel. ( logpath_raw, failregex_raw, ignoreregex_raw, ignoreip_raw, datepattern_raw, logencoding_raw, bantime_raw, findtime_raw, maxretry_raw, backend_raw, idle_raw, actions_raw, ) = await asyncio.gather( _safe_get(client, ["get", name, "logpath"], []), _safe_get(client, ["get", name, "failregex"], []), _safe_get(client, ["get", name, "ignoreregex"], []), _safe_get(client, ["get", name, "ignoreip"], []), _safe_get(client, ["get", name, "datepattern"], None), _safe_get(client, ["get", name, "logencoding"], "UTF-8"), _safe_get(client, ["get", name, "bantime"], 600), _safe_get(client, ["get", name, "findtime"], 600), _safe_get(client, ["get", name, "maxretry"], 5), _safe_get(client, ["get", name, "backend"], "polling"), _safe_get(client, ["get", name, "idle"], False), _safe_get(client, ["get", name, "actions"], []), ) jail = Jail( name=name, enabled=True, running=True, idle=bool(idle_raw), backend=str(backend_raw or "polling"), log_paths=_ensure_list(logpath_raw), fail_regex=_ensure_list(failregex_raw), ignore_regex=_ensure_list(ignoreregex_raw), ignore_ips=_ensure_list(ignoreip_raw), date_pattern=str(datepattern_raw) if datepattern_raw else None, log_encoding=str(logencoding_raw or "UTF-8"), find_time=int(findtime_raw or 600), ban_time=int(bantime_raw or 600), max_retry=int(maxretry_raw or 5), status=jail_status, actions=_ensure_list(actions_raw), ) log.info("jail_detail_fetched", jail=name) return JailDetailResponse(jail=jail) # --------------------------------------------------------------------------- # Public API — Jail control # --------------------------------------------------------------------------- async def start_jail(socket_path: str, name: str) -> None: """Start a stopped fail2ban jail. Args: socket_path: Path to the fail2ban Unix domain socket. name: Jail name to start. Raises: JailNotFoundError: If *name* is not a known jail. JailOperationError: If fail2ban reports the operation failed. ~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket cannot be reached. """ client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT) try: _ok(await client.send(["start", name])) log.info("jail_started", jail=name) except ValueError as exc: if _is_not_found_error(exc): raise JailNotFoundError(name) from exc raise JailOperationError(str(exc)) from exc async def stop_jail(socket_path: str, name: str) -> None: """Stop a running fail2ban jail. Args: socket_path: Path to the fail2ban Unix domain socket. name: Jail name to stop. Raises: JailNotFoundError: If *name* is not a known jail. JailOperationError: If fail2ban reports the operation failed. ~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket cannot be reached. """ client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT) try: _ok(await client.send(["stop", name])) log.info("jail_stopped", jail=name) except ValueError as exc: if _is_not_found_error(exc): raise JailNotFoundError(name) from exc raise JailOperationError(str(exc)) from exc async def set_idle(socket_path: str, name: str, *, on: bool) -> None: """Toggle the idle mode of a fail2ban jail. When idle mode is on the jail pauses monitoring without stopping completely; existing bans remain active. Args: socket_path: Path to the fail2ban Unix domain socket. name: Jail name. on: Pass ``True`` to enable idle, ``False`` to disable it. Raises: JailNotFoundError: If *name* is not a known jail. JailOperationError: If fail2ban reports the operation failed. ~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket cannot be reached. """ state = "on" if on else "off" client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT) try: _ok(await client.send(["set", name, "idle", state])) log.info("jail_idle_toggled", jail=name, idle=on) except ValueError as exc: if _is_not_found_error(exc): raise JailNotFoundError(name) from exc raise JailOperationError(str(exc)) from exc async def reload_jail(socket_path: str, name: str) -> None: """Reload a single fail2ban jail to pick up configuration changes. Args: socket_path: Path to the fail2ban Unix domain socket. name: Jail name to reload. Raises: JailNotFoundError: If *name* is not a known jail. JailOperationError: If fail2ban reports the operation failed. ~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket cannot be reached. """ client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT) try: _ok(await client.send(["reload", name, [], []])) log.info("jail_reloaded", jail=name) except ValueError as exc: if _is_not_found_error(exc): raise JailNotFoundError(name) from exc raise JailOperationError(str(exc)) from exc async def reload_all(socket_path: str) -> None: """Reload all fail2ban jails at once. Args: socket_path: Path to the fail2ban Unix domain socket. Raises: JailOperationError: If fail2ban reports the operation failed. ~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket cannot be reached. """ client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT) try: _ok(await client.send(["reload", "--all", [], []])) log.info("all_jails_reloaded") except ValueError as exc: raise JailOperationError(str(exc)) from exc # --------------------------------------------------------------------------- # Public API — Ban / Unban # --------------------------------------------------------------------------- async def ban_ip(socket_path: str, jail: str, ip: str) -> None: """Ban an IP address in a specific fail2ban jail. The IP address is validated with :mod:`ipaddress` before the command is sent to fail2ban. Args: socket_path: Path to the fail2ban Unix domain socket. jail: Jail in which to apply the ban. ip: IP address to ban (IPv4 or IPv6). Raises: ValueError: If *ip* is not a valid IP address. JailNotFoundError: If *jail* is not a known jail. JailOperationError: If fail2ban reports the operation failed. ~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket cannot be reached. """ # Validate the IP address before sending to avoid injection. try: ipaddress.ip_address(ip) except ValueError as exc: raise ValueError(f"Invalid IP address: {ip!r}") from exc client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT) try: _ok(await client.send(["set", jail, "banip", ip])) log.info("ip_banned", ip=ip, jail=jail) except ValueError as exc: if _is_not_found_error(exc): raise JailNotFoundError(jail) from exc raise JailOperationError(str(exc)) from exc async def unban_ip( socket_path: str, ip: str, jail: str | None = None, ) -> None: """Unban an IP address from one or all fail2ban jails. If *jail* is ``None``, the IP is unbanned from every jail using the global ``unban`` command. Otherwise only the specified jail is targeted. Args: socket_path: Path to the fail2ban Unix domain socket. ip: IP address to unban. jail: Jail to unban from. ``None`` means all jails. Raises: ValueError: If *ip* is not a valid IP address. JailNotFoundError: If *jail* is specified but does not exist. JailOperationError: If fail2ban reports the operation failed. ~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket cannot be reached. """ try: ipaddress.ip_address(ip) except ValueError as exc: raise ValueError(f"Invalid IP address: {ip!r}") from exc client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT) try: if jail is None: _ok(await client.send(["unban", ip])) log.info("ip_unbanned_all_jails", ip=ip) else: _ok(await client.send(["set", jail, "unbanip", ip])) log.info("ip_unbanned", ip=ip, jail=jail) except ValueError as exc: if _is_not_found_error(exc): raise JailNotFoundError(jail or "") from exc raise JailOperationError(str(exc)) from exc async def get_active_bans( socket_path: str, geo_enricher: Any | None = None, ) -> ActiveBanListResponse: """Return all currently banned IPs across every jail. For each jail the ``get banip --with-time`` command is used to retrieve ban start and expiry times alongside the IP address. Args: socket_path: Path to the fail2ban Unix domain socket. geo_enricher: Optional async callable ``(ip) → GeoInfo | None`` used to enrich each ban entry with country and ASN data. Returns: :class:`~app.models.ban.ActiveBanListResponse` with all active bans. Raises: ~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket cannot be reached. """ client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT) # Fetch jail names. global_status = _to_dict(_ok(await client.send(["status"]))) jail_list_raw: str = str(global_status.get("Jail list", "") or "").strip() jail_names: list[str] = ( [j.strip() for j in jail_list_raw.split(",") if j.strip()] if jail_list_raw else [] ) if not jail_names: return ActiveBanListResponse(bans=[], total=0) # For each jail, fetch the ban list with time info in parallel. results: list[Any] = await asyncio.gather( *[client.send(["get", jn, "banip", "--with-time"]) for jn in jail_names], return_exceptions=True, ) bans: list[ActiveBan] = [] for jail_name, raw_result in zip(jail_names, results, strict=False): if isinstance(raw_result, Exception): log.warning( "active_bans_fetch_error", jail=jail_name, error=str(raw_result), ) continue try: ban_list: list[str] = _ok(raw_result) or [] except (TypeError, ValueError) as exc: log.warning( "active_bans_parse_error", jail=jail_name, error=str(exc), ) continue for entry in ban_list: ban = _parse_ban_entry(str(entry), jail_name) if ban is not None: bans.append(ban) # Enrich with geo data if an enricher was provided. if geo_enricher is not None: bans = await _enrich_bans(bans, geo_enricher) log.info("active_bans_fetched", total=len(bans)) return ActiveBanListResponse(bans=bans, total=len(bans)) def _parse_ban_entry(entry: str, jail: str) -> ActiveBan | None: """Parse a ban entry from ``get banip --with-time`` output. Expected format:: "1.2.3.4 \t2025-01-01 12:00:00 + 3600 = 2025-01-01 13:00:00" Args: entry: Raw ban entry string. jail: Jail name for the resulting record. Returns: An :class:`~app.models.ban.ActiveBan` or ``None`` if parsing fails. """ from datetime import UTC, datetime try: parts = entry.split("\t", 1) ip = parts[0].strip() # Validate IP ipaddress.ip_address(ip) if len(parts) < 2: # Entry has no time info — return with unknown times. return ActiveBan( ip=ip, jail=jail, banned_at=None, expires_at=None, ban_count=1, country=None, ) time_part = parts[1].strip() # Format: "2025-01-01 12:00:00 + 3600 = 2025-01-01 13:00:00" # Split at " + " to get banned_at and remainder. plus_idx = time_part.find(" + ") if plus_idx == -1: banned_at_str = time_part.strip() expires_at_str: str | None = None else: banned_at_str = time_part[:plus_idx].strip() remainder = time_part[plus_idx + 3 :] # skip " + " eq_idx = remainder.find(" = ") expires_at_str = remainder[eq_idx + 3 :].strip() if eq_idx != -1 else None _date_fmt = "%Y-%m-%d %H:%M:%S" def _to_iso(ts: str) -> str: dt = datetime.strptime(ts, _date_fmt).replace(tzinfo=UTC) return dt.isoformat() banned_at_iso: str | None = None expires_at_iso: str | None = None with contextlib.suppress(ValueError): banned_at_iso = _to_iso(banned_at_str) with contextlib.suppress(ValueError): if expires_at_str: expires_at_iso = _to_iso(expires_at_str) return ActiveBan( ip=ip, jail=jail, banned_at=banned_at_iso, expires_at=expires_at_iso, ban_count=1, country=None, ) except (ValueError, IndexError, AttributeError) as exc: log.debug("ban_entry_parse_error", entry=entry, jail=jail, error=str(exc)) return None async def _enrich_bans( bans: list[ActiveBan], geo_enricher: Any, ) -> list[ActiveBan]: """Enrich ban records with geo data asynchronously. Args: bans: The list of :class:`~app.models.ban.ActiveBan` records to enrich. geo_enricher: Async callable ``(ip) → GeoInfo | None``. Returns: The same list with ``country`` fields populated where lookup succeeded. """ geo_results: list[Any] = await asyncio.gather( *[geo_enricher(ban.ip) for ban in bans], return_exceptions=True, ) enriched: list[ActiveBan] = [] for ban, geo in zip(bans, geo_results, strict=False): if geo is not None and not isinstance(geo, Exception): enriched.append(ban.model_copy(update={"country": geo.country_code})) else: enriched.append(ban) return enriched # --------------------------------------------------------------------------- # Public API — Ignore list (IP whitelist) # --------------------------------------------------------------------------- async def get_ignore_list(socket_path: str, name: str) -> list[str]: """Return the ignore list for a fail2ban jail. Args: socket_path: Path to the fail2ban Unix domain socket. name: Jail name. Returns: List of IP addresses and CIDR networks on the jail's ignore list. Raises: JailNotFoundError: If *name* is not a known jail. ~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket cannot be reached. """ client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT) try: raw = _ok(await client.send(["get", name, "ignoreip"])) return _ensure_list(raw) except ValueError as exc: if _is_not_found_error(exc): raise JailNotFoundError(name) from exc raise async def add_ignore_ip(socket_path: str, name: str, ip: str) -> None: """Add an IP address or CIDR network to a jail's ignore list. Args: socket_path: Path to the fail2ban Unix domain socket. name: Jail name. ip: IP address or CIDR network to add. Raises: JailNotFoundError: If *name* is not a known jail. JailOperationError: If fail2ban reports the operation failed. ~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket cannot be reached. """ # Basic format validation. try: ipaddress.ip_network(ip, strict=False) except ValueError as exc: raise ValueError(f"Invalid IP address or network: {ip!r}") from exc client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT) try: _ok(await client.send(["set", name, "addignoreip", ip])) log.info("ignore_ip_added", jail=name, ip=ip) except ValueError as exc: if _is_not_found_error(exc): raise JailNotFoundError(name) from exc raise JailOperationError(str(exc)) from exc async def del_ignore_ip(socket_path: str, name: str, ip: str) -> None: """Remove an IP address or CIDR network from a jail's ignore list. Args: socket_path: Path to the fail2ban Unix domain socket. name: Jail name. ip: IP address or CIDR network to remove. Raises: JailNotFoundError: If *name* is not a known jail. JailOperationError: If fail2ban reports the operation failed. ~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket cannot be reached. """ client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT) try: _ok(await client.send(["set", name, "delignoreip", ip])) log.info("ignore_ip_removed", jail=name, ip=ip) except ValueError as exc: if _is_not_found_error(exc): raise JailNotFoundError(name) from exc raise JailOperationError(str(exc)) from exc async def get_ignore_self(socket_path: str, name: str) -> bool: """Return whether a jail ignores the server's own IP addresses. Args: socket_path: Path to the fail2ban Unix domain socket. name: Jail name. Returns: ``True`` when ``ignoreself`` is enabled for the jail. Raises: JailNotFoundError: If *name* is not a known jail. ~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket cannot be reached. """ client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT) try: raw = _ok(await client.send(["get", name, "ignoreself"])) return bool(raw) except ValueError as exc: if _is_not_found_error(exc): raise JailNotFoundError(name) from exc raise async def set_ignore_self(socket_path: str, name: str, *, on: bool) -> None: """Toggle the ``ignoreself`` option for a fail2ban jail. Args: socket_path: Path to the fail2ban Unix domain socket. name: Jail name. on: ``True`` to enable ignoreself, ``False`` to disable. Raises: JailNotFoundError: If *name* is not a known jail. JailOperationError: If fail2ban reports the operation failed. ~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket cannot be reached. """ value = "true" if on else "false" client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT) try: _ok(await client.send(["set", name, "ignoreself", value])) log.info("ignore_self_toggled", jail=name, on=on) except ValueError as exc: if _is_not_found_error(exc): raise JailNotFoundError(name) from exc raise JailOperationError(str(exc)) from exc # --------------------------------------------------------------------------- # Public API — IP lookup # --------------------------------------------------------------------------- async def lookup_ip( socket_path: str, ip: str, geo_enricher: Any | None = None, ) -> dict[str, Any]: """Return ban status and history for a single IP address. Checks every running jail for whether the IP is currently banned. Also queries the fail2ban database for historical ban records. Args: socket_path: Path to the fail2ban Unix domain socket. ip: IP address to look up. geo_enricher: Optional async callable ``(ip) → GeoInfo | None``. Returns: A dict with keys: * ``ip`` — the queried IP address * ``currently_banned_in`` — list of jails where the IP is active * ``geo`` — ``GeoInfo`` dataclass or ``None`` Raises: ValueError: If *ip* is not a valid IP address. ~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket cannot be reached. """ try: ipaddress.ip_address(ip) except ValueError as exc: raise ValueError(f"Invalid IP address: {ip!r}") from exc client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT) with contextlib.suppress(ValueError, Fail2BanConnectionError): # Use fail2ban's "banned " command which checks all jails. _ok(await client.send(["get", "--all", "banned", ip])) # Fetch jail names from status. global_status = _to_dict(_ok(await client.send(["status"]))) jail_list_raw: str = str(global_status.get("Jail list", "") or "").strip() jail_names: list[str] = ( [j.strip() for j in jail_list_raw.split(",") if j.strip()] if jail_list_raw else [] ) # Check ban status per jail in parallel. ban_results: list[Any] = await asyncio.gather( *[client.send(["get", jn, "banip"]) for jn in jail_names], return_exceptions=True, ) currently_banned_in: list[str] = [] for jail_name, result in zip(jail_names, ban_results, strict=False): if isinstance(result, Exception): continue try: ban_list: list[str] = _ok(result) or [] if ip in ban_list: currently_banned_in.append(jail_name) except (ValueError, TypeError): pass geo = None if geo_enricher is not None: with contextlib.suppress(Exception): # noqa: BLE001 geo = await geo_enricher(ip) log.info("ip_lookup_completed", ip=ip, banned_in_jails=currently_banned_in) return { "ip": ip, "currently_banned_in": currently_banned_in, "geo": geo, }