Files
BanGUI/backend/app/services/jail_service.py
Lukas 1e33220f59 Add reload and restart buttons to Server tab
Adds ability to reload or restart fail2ban service from the Server tab UI.

Backend changes:
- Add new restart() method to jail_service.py that sends 'restart' command
- Add new POST /api/config/restart endpoint in config router
- Endpoint returns 204 on success, 502 if fail2ban unreachable
- Includes structured logging via 'fail2ban_restarted' log entry

Frontend changes:
- Add configRestart endpoint to endpoints.ts
- Add restartFail2Ban() API function in config.ts API module
- Import ArrowSync24Regular icon from Fluent UI
- Add reload and restart button handlers to ServerTab
- Display 'Reload fail2ban' and 'Restart fail2ban' buttons in action row
- Show loading spinner during operation
- Display success/error MessageBar with appropriate feedback
- Update ServerTab docstring to document new buttons

All 115 frontend tests pass.
2026-03-14 22:03:58 +01:00

1264 lines
43 KiB
Python

"""Jail management service.
Provides methods to list, inspect, and control fail2ban jails via the
Unix domain socket. All socket I/O is performed through the async
:class:`~app.utils.fail2ban_client.Fail2BanClient` wrapper.
Architecture note: this module is a pure service — it contains **no**
HTTP/FastAPI concerns. All results are returned as Pydantic models so
routers can serialise them directly.
"""
from __future__ import annotations
import asyncio
import contextlib
import ipaddress
from typing import Any
import structlog
from app.models.ban import ActiveBan, ActiveBanListResponse, JailBannedIpsResponse
from app.models.config import BantimeEscalation
from app.models.jail import (
Jail,
JailDetailResponse,
JailListResponse,
JailStatus,
JailSummary,
)
from app.utils.fail2ban_client import Fail2BanClient, Fail2BanConnectionError
# Module-level structured logger used by every function in this service.
log: structlog.stdlib.BoundLogger = structlog.get_logger()
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Timeout handed to every ``Fail2BanClient`` created in this module
# (presumably seconds — confirm against the client implementation).
_SOCKET_TIMEOUT: float = 10.0
# Guard against concurrent reload_all calls. Overlapping ``reload --all``
# commands sent to fail2ban's socket produce undefined behaviour and may cause
# jails to be permanently removed from the daemon. Serialising them here
# ensures only one reload stream is in-flight at a time.
_reload_all_lock: asyncio.Lock = asyncio.Lock()
# ---------------------------------------------------------------------------
# Custom exceptions
# ---------------------------------------------------------------------------
class JailNotFoundError(Exception):
    """Signal that fail2ban does not know the requested jail."""

    def __init__(self, name: str) -> None:
        """Build the error for the jail called *name*.

        Args:
            name: Name of the jail that could not be located.
        """
        message = f"Jail not found: {name!r}"
        super().__init__(message)
        # Kept on the instance so handlers can report which jail was missing.
        self.name: str = name
class JailOperationError(Exception):
    """Raised when a jail control command fails for a non-auth reason.

    The control functions in this module raise it when fail2ban answers a
    command with an error that is not recognised as an "unknown jail" error;
    the message is the text of the underlying ``ValueError``.
    """
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _ok(response: Any) -> Any:
    """Unwrap a fail2ban ``(return_code, data)`` reply and return ``data``.

    Args:
        response: Raw value returned by :meth:`~Fail2BanClient.send`.

    Returns:
        The ``data`` half of the reply when the return code is ``0``.

    Raises:
        ValueError: If *response* cannot be unpacked into exactly two items,
            or if its return code is not ``0``.
    """
    try:
        code, data = response
    except (TypeError, ValueError) as err:
        # Anything that is not a two-item iterable is a protocol violation.
        raise ValueError(f"Unexpected fail2ban response shape: {response!r}") from err

    if code == 0:
        return data
    raise ValueError(f"fail2ban returned error code {code}: {data!r}")
def _to_dict(pairs: Any) -> dict[str, Any]:
    """Turn a sequence of ``(key, value)`` pairs into a plain dict.

    Entries that cannot be unpacked into exactly two items are skipped.

    Args:
        pairs: A list or tuple of ``(key, value)`` pairs.  Any other type
            yields an empty dict.

    Returns:
        A :class:`dict` mapping ``str(key)`` to ``value`` for each valid pair.
    """
    mapping: dict[str, Any] = {}
    if isinstance(pairs, (list, tuple)):
        for entry in pairs:
            # Malformed entries are ignored rather than aborting the parse.
            with contextlib.suppress(TypeError, ValueError):
                key, value = entry
                mapping[str(key)] = value
    return mapping
def _ensure_list(value: Any) -> list[str]:
    """Normalise a fail2ban ``get`` payload into a list of strings.

    fail2ban may answer with ``None``, a bare string, or a sequence; this
    helper gives callers a uniform ``list[str]``.

    Args:
        value: The raw value from a ``get`` command response.

    Returns:
        A list of strings, possibly empty.  Blank strings and ``None``
        members are dropped; any other scalar is stringified.
    """
    if isinstance(value, str):
        return [value] if value.strip() else []
    if isinstance(value, (list, tuple)):
        return [str(member) for member in value if member is not None]
    return [] if value is None else [str(value)]
def _is_not_found_error(exc: Exception) -> bool:
    """Tell whether *exc* reports that a jail does not exist.

    The check is a case-insensitive substring match on the exception text.
    Both ``"unknown jail"`` and ``"unknownjail"`` are tested because fail2ban
    serialises ``UnknownJailException`` without a space when pickled.

    Args:
        exc: The exception to inspect.

    Returns:
        ``True`` when the exception message signals an unknown jail.
    """
    markers = (
        "unknown jail",
        "unknownjail",  # covers UnknownJailException serialised by fail2ban
        "no jail",
        "does not exist",
        "not found",
    )
    text = str(exc).lower()
    for marker in markers:
        if marker in text:
            return True
    return False
async def _safe_get(
    client: Fail2BanClient,
    command: list[Any],
    default: Any = None,
) -> Any:
    """Send a ``get`` command and return ``default`` on error.

    Errors during optional detail queries (logpath, regex, etc.) should
    not abort the whole request, so every :class:`Exception` is absorbed
    here.  The failure is recorded at debug level so it is not lost.

    Args:
        client: The :class:`~app.utils.fail2ban_client.Fail2BanClient` to use.
        command: The command list to send.
        default: Value to return when the command fails.

    Returns:
        The response payload, or *default* on any error.
    """
    try:
        return _ok(await client.send(command))
    except Exception as exc:  # noqa: BLE001 — deliberate best-effort lookup
        # ``Exception`` already covers ValueError/TypeError; listing them
        # separately was redundant.
        log.debug("optional_get_failed", command=command, error=str(exc))
        return default
# ---------------------------------------------------------------------------
# Public API — Jail listing & detail
# ---------------------------------------------------------------------------
async def list_jails(socket_path: str) -> JailListResponse:
    """List every active fail2ban jail with summary information.

    The global ``status`` command provides the jail names; a summary is then
    fetched for each jail concurrently.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.

    Returns:
        :class:`~app.models.jail.JailListResponse` with all active jails.

    Raises:
        ~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
            cannot be reached.
    """
    client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)

    # Step 1: the global status carries a comma-separated "Jail list" value.
    status_reply = await client.send(["status"])
    overview = _to_dict(_ok(status_reply))
    joined_names: str = str(overview.get("Jail list", "") or "").strip()
    jail_names: list[str] = [
        part.strip() for part in joined_names.split(",") if part.strip()
    ]
    log.info("jail_list_fetched", count=len(jail_names))

    if not jail_names:
        return JailListResponse(jails=[], total=0)

    # Step 2: one summary coroutine per jail, awaited together.
    pending = [_fetch_jail_summary(client, jail) for jail in jail_names]
    summaries: list[JailSummary] = await asyncio.gather(
        *pending,
        return_exceptions=False,
    )
    return JailListResponse(jails=list(summaries), total=len(summaries))
async def _fetch_jail_summary(
    client: Fail2BanClient,
    name: str,
) -> JailSummary:
    """Collect the data for one :class:`~app.models.jail.JailSummary`.

    Six commands are issued concurrently: ``status <name> short`` plus
    ``get <name>`` for ``bantime``, ``findtime``, ``maxretry``, ``backend``
    and ``idle``.  A failing command never aborts the summary; the affected
    field falls back to a default (or ``status`` stays ``None``).

    Args:
        client: Shared :class:`~app.utils.fail2ban_client.Fail2BanClient`.
        name: Jail name.

    Returns:
        A :class:`~app.models.jail.JailSummary` populated from the responses.
    """
    commands: list[list[str]] = [
        ["status", name, "short"],
        ["get", name, "bantime"],
        ["get", name, "findtime"],
        ["get", name, "maxretry"],
        ["get", name, "backend"],
        ["get", name, "idle"],
    ]
    (
        status_reply,
        bantime_reply,
        findtime_reply,
        maxretry_reply,
        backend_reply,
        idle_reply,
    ) = await asyncio.gather(
        *(client.send(command) for command in commands),
        return_exceptions=True,
    )

    def _count(stats: dict[str, Any], key: str) -> int:
        # Missing or falsy counters are reported as zero.
        return int(stats.get(key, 0) or 0)

    def _coerce(reply: Any, convert: Any, fallback: Any) -> Any:
        # Unwrap *reply* and convert it, or give *fallback* on any failure.
        if isinstance(reply, Exception):
            return fallback
        try:
            return convert(_ok(reply))
        except (ValueError, TypeError):
            return fallback

    # Parse jail status (filter + actions); stays None when unavailable.
    jail_status: JailStatus | None = None
    if not isinstance(status_reply, Exception):
        try:
            sections = _to_dict(_ok(status_reply))
            filter_stats = _to_dict(sections.get("Filter") or [])
            action_stats = _to_dict(sections.get("Actions") or [])
            jail_status = JailStatus(
                currently_banned=_count(action_stats, "Currently banned"),
                total_banned=_count(action_stats, "Total banned"),
                currently_failed=_count(filter_stats, "Currently failed"),
                total_failed=_count(filter_stats, "Total failed"),
            )
        except (ValueError, TypeError) as err:
            log.warning("jail_status_parse_error", jail=name, error=str(err))

    return JailSummary(
        name=name,
        enabled=True,
        running=True,
        idle=_coerce(idle_reply, bool, False),
        backend=_coerce(backend_reply, str, "polling"),
        find_time=_coerce(findtime_reply, int, 600),
        ban_time=_coerce(bantime_reply, int, 600),
        max_retry=_coerce(maxretry_reply, int, 5),
        status=jail_status,
    )
async def get_jail(socket_path: str, name: str) -> JailDetailResponse:
    """Build the complete detail view of one fail2ban jail.

    A ``status <name> short`` command is sent first and doubles as the
    existence check.  The remaining settings are then requested
    concurrently, each falling back to a default when its query fails.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.
        name: Jail name.

    Returns:
        :class:`~app.models.jail.JailDetailResponse` with the full jail.

    Raises:
        JailNotFoundError: If *name* is not a known jail.
        ValueError: If the status command fails for any other reason.
        ~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
            cannot be reached.
    """
    client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)

    # The status query verifies that the jail exists before anything else.
    try:
        status_payload = _ok(await client.send(["status", name, "short"]))
    except ValueError as err:
        if not _is_not_found_error(err):
            raise
        raise JailNotFoundError(name) from err

    def _count(stats: dict[str, Any], key: str) -> int:
        # Missing or falsy counters are reported as zero.
        return int(stats.get(key, 0) or 0)

    sections = _to_dict(status_payload)
    filter_stats = _to_dict(sections.get("Filter") or [])
    action_stats = _to_dict(sections.get("Actions") or [])
    jail_status = JailStatus(
        currently_banned=_count(action_stats, "Currently banned"),
        total_banned=_count(action_stats, "Total banned"),
        currently_failed=_count(filter_stats, "Currently failed"),
        total_failed=_count(filter_stats, "Total failed"),
    )

    # Setting name -> value used when that individual ``get`` fails.
    # Insertion order is the order in which the commands are issued.
    optional_settings: dict[str, Any] = {
        "logpath": [],
        "failregex": [],
        "ignoreregex": [],
        "ignoreip": [],
        "datepattern": None,
        "logencoding": "UTF-8",
        "bantime": 600,
        "findtime": 600,
        "maxretry": 5,
        "backend": "polling",
        "idle": False,
        "actions": [],
        "bantime.increment": False,
        "bantime.factor": None,
        "bantime.formula": None,
        "bantime.multipliers": None,
        "bantime.maxtime": None,
        "bantime.rndtime": None,
        "bantime.overalljails": False,
    }
    fetched = await asyncio.gather(
        *(
            _safe_get(client, ["get", name, setting], fallback)
            for setting, fallback in optional_settings.items()
        )
    )
    raw: dict[str, Any] = dict(zip(optional_settings, fetched))

    factor_raw = raw["bantime.factor"]
    formula_raw = raw["bantime.formula"]
    multipliers_raw = raw["bantime.multipliers"]
    maxtime_raw = raw["bantime.maxtime"]
    rndtime_raw = raw["bantime.rndtime"]
    bantime_escalation = BantimeEscalation(
        increment=bool(raw["bantime.increment"]),
        factor=None if factor_raw is None else float(factor_raw),
        formula=str(formula_raw) if formula_raw else None,
        multipliers=str(multipliers_raw) if multipliers_raw else None,
        max_time=None if maxtime_raw is None else int(maxtime_raw),
        rnd_time=None if rndtime_raw is None else int(rndtime_raw),
        overall_jails=bool(raw["bantime.overalljails"]),
    )

    datepattern_raw = raw["datepattern"]
    jail = Jail(
        name=name,
        enabled=True,
        running=True,
        idle=bool(raw["idle"]),
        backend=str(raw["backend"] or "polling"),
        log_paths=_ensure_list(raw["logpath"]),
        fail_regex=_ensure_list(raw["failregex"]),
        ignore_regex=_ensure_list(raw["ignoreregex"]),
        ignore_ips=_ensure_list(raw["ignoreip"]),
        date_pattern=str(datepattern_raw) if datepattern_raw else None,
        log_encoding=str(raw["logencoding"] or "UTF-8"),
        find_time=int(raw["findtime"] or 600),
        ban_time=int(raw["bantime"] or 600),
        max_retry=int(raw["maxretry"] or 5),
        bantime_escalation=bantime_escalation,
        status=jail_status,
        actions=_ensure_list(raw["actions"]),
    )
    log.info("jail_detail_fetched", jail=name)
    return JailDetailResponse(jail=jail)
# ---------------------------------------------------------------------------
# Public API — Jail control
# ---------------------------------------------------------------------------
async def start_jail(socket_path: str, name: str) -> None:
    """Ask fail2ban to start the jail called *name*.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.
        name: Jail name to start.

    Raises:
        JailNotFoundError: If *name* is not a known jail.
        JailOperationError: If fail2ban reports the operation failed.
        ~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
            cannot be reached.
    """
    f2b = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
    try:
        reply = await f2b.send(["start", name])
        _ok(reply)
        log.info("jail_started", jail=name)
    except ValueError as err:
        # Anything other than "unknown jail" is a generic operation failure.
        if not _is_not_found_error(err):
            raise JailOperationError(str(err)) from err
        raise JailNotFoundError(name) from err
async def stop_jail(socket_path: str, name: str) -> None:
    """Ask fail2ban to stop the jail called *name*.

    Stopping is idempotent: when fail2ban answers that the jail is unknown
    (already stopped or never started) the call logs a no-op and returns
    normally instead of raising.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.
        name: Jail name to stop.

    Raises:
        JailOperationError: If fail2ban reports the operation failed.
        ~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
            cannot be reached.
    """
    f2b = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
    try:
        reply = await f2b.send(["stop", name])
        _ok(reply)
        log.info("jail_stopped", jail=name)
    except ValueError as err:
        if not _is_not_found_error(err):
            raise JailOperationError(str(err)) from err
        # Jail is already stopped or was never running — treat as a no-op.
        log.info("jail_stop_noop", jail=name)
async def set_idle(socket_path: str, name: str, *, on: bool) -> None:
    """Switch a jail's idle mode on or off.

    When idle mode is on the jail pauses monitoring without stopping
    completely; existing bans remain active.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.
        name: Jail name.
        on: Pass ``True`` to enable idle, ``False`` to disable it.

    Raises:
        JailNotFoundError: If *name* is not a known jail.
        JailOperationError: If fail2ban reports the operation failed.
        ~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
            cannot be reached.
    """
    command = ["set", name, "idle", "on" if on else "off"]
    f2b = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
    try:
        reply = await f2b.send(command)
        _ok(reply)
        log.info("jail_idle_toggled", jail=name, idle=on)
    except ValueError as err:
        if not _is_not_found_error(err):
            raise JailOperationError(str(err)) from err
        raise JailNotFoundError(name) from err
async def reload_jail(socket_path: str, name: str) -> None:
    """Reload one jail so it picks up configuration changes.

    The reload protocol requires a non-empty configuration stream.  Without
    one, fail2ban's end-of-reload phase removes every jail that received no
    configuration commands — permanently deleting the jail from the running
    daemon.  A single ``['start', name]`` entry is the minimal stream:
    ``startJail`` removes the jail from ``reload_state``, so the end phase
    *commits* the reload instead of deleting the jail.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.
        name: Jail name to reload.

    Raises:
        JailNotFoundError: If *name* is not a known jail.
        JailOperationError: If fail2ban reports the operation failed.
        ~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
            cannot be reached.
    """
    # Minimal config stream that keeps the jail alive across the reload.
    stream = [["start", name]]
    f2b = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
    try:
        reply = await f2b.send(["reload", name, [], stream])
        _ok(reply)
        log.info("jail_reloaded", jail=name)
    except ValueError as err:
        if not _is_not_found_error(err):
            raise JailOperationError(str(err)) from err
        raise JailNotFoundError(name) from err
async def reload_all(
    socket_path: str,
    *,
    include_jails: list[str] | None = None,
    exclude_jails: list[str] | None = None,
) -> None:
    """Reload all fail2ban jails at once.

    Fetches the current jail list first so that a ``['start', name]`` entry
    can be included in the config stream for every active jail.  Without a
    non-empty stream the end-of-reload phase deletes every jail that received
    no configuration commands.

    *include_jails* are added to the stream (e.g. a newly activated jail that
    is not yet running).  *exclude_jails* are removed from the stream (e.g. a
    jail that was just deactivated and should not be restarted).

    The whole status-then-reload sequence runs under the module-level reload
    lock so that only one ``reload --all`` is in flight at a time.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.
        include_jails: Extra jail names to add to the start stream.
        exclude_jails: Jail names to remove from the start stream.

    Raises:
        JailOperationError: If fail2ban reports the operation failed.
        ~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
            cannot be reached.
    """
    client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
    async with _reload_all_lock:
        try:
            # Resolve jail names so we can build the minimal config stream.
            status_raw = _ok(await client.send(["status"]))
            status_dict = _to_dict(status_raw)
            # ``or ""`` guards against a ``None`` value: ``str(None)`` would
            # otherwise inject a bogus jail literally named "None" into the
            # reload stream.
            jail_list_raw: str = str(status_dict.get("Jail list", "") or "")
            jail_names = [n.strip() for n in jail_list_raw.split(",") if n.strip()]

            # Merge include/exclude sets so the stream matches the desired state.
            names_set: set[str] = set(jail_names)
            if include_jails:
                names_set.update(include_jails)
            if exclude_jails:
                names_set -= set(exclude_jails)

            # Sorted for a deterministic command order.
            stream: list[list[str]] = [["start", n] for n in sorted(names_set)]
            _ok(await client.send(["reload", "--all", [], stream]))
            log.info("all_jails_reloaded")
        except ValueError as exc:
            raise JailOperationError(str(exc)) from exc
async def restart(socket_path: str) -> None:
    """Send the ``restart`` command to the fail2ban daemon.

    The command is delivered over the Unix socket and is intended to stop all
    jails and bring the daemon back up with freshly read configuration.

    NOTE(review): confirm that the daemon's socket protocol accepts a bare
    ``restart`` command — the ``fail2ban-client`` tool appears to implement
    restart on the client side (stop, then start), so the server may reject
    it.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.

    Raises:
        JailOperationError: If fail2ban reports the operation failed.
        ~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
            cannot be reached.
    """
    daemon = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
    try:
        answer = await daemon.send(["restart"])
        _ok(answer)
        log.info("fail2ban_restarted")
    except ValueError as err:
        raise JailOperationError(str(err)) from err
# ---------------------------------------------------------------------------
# Public API — Ban / Unban
# ---------------------------------------------------------------------------
async def ban_ip(socket_path: str, jail: str, ip: str) -> None:
    """Ban *ip* in the jail called *jail*.

    The address is checked with :mod:`ipaddress` first; nothing is sent to
    fail2ban when it is not a valid IPv4 or IPv6 address.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.
        jail: Jail in which to apply the ban.
        ip: IP address to ban (IPv4 or IPv6).

    Raises:
        ValueError: If *ip* is not a valid IP address.
        JailNotFoundError: If *jail* is not a known jail.
        JailOperationError: If fail2ban reports the operation failed.
        ~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
            cannot be reached.
    """
    # Validate the IP address before sending to avoid injection.
    try:
        ipaddress.ip_address(ip)
    except ValueError as err:
        raise ValueError(f"Invalid IP address: {ip!r}") from err

    f2b = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
    try:
        reply = await f2b.send(["set", jail, "banip", ip])
        _ok(reply)
        log.info("ip_banned", ip=ip, jail=jail)
    except ValueError as err:
        if not _is_not_found_error(err):
            raise JailOperationError(str(err)) from err
        raise JailNotFoundError(jail) from err
async def unban_ip(
    socket_path: str,
    ip: str,
    jail: str | None = None,
) -> None:
    """Unban an IP address from one or all fail2ban jails.

    If *jail* is ``None``, the IP is unbanned from every jail using the
    global ``unban`` command.  Otherwise only the specified jail is
    targeted.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.
        ip: IP address to unban.
        jail: Jail to unban from.  ``None`` means all jails.

    Raises:
        ValueError: If *ip* is not a valid IP address.
        JailNotFoundError: If *jail* is specified but does not exist.
        JailOperationError: If fail2ban reports the operation failed
            (including any failure of the global unban).
        ~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
            cannot be reached.
    """
    # Reject malformed addresses before anything reaches the socket.
    try:
        ipaddress.ip_address(ip)
    except ValueError as exc:
        raise ValueError(f"Invalid IP address: {ip!r}") from exc

    client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
    try:
        if jail is None:
            _ok(await client.send(["unban", ip]))
            log.info("ip_unbanned_all_jails", ip=ip)
        else:
            _ok(await client.send(["set", jail, "unbanip", ip]))
            log.info("ip_unbanned", ip=ip, jail=jail)
    except ValueError as exc:
        # Only report "jail not found" when a jail was actually named; a
        # global unban has no jail to blame, so it surfaces as a generic
        # operation error instead of ``JailNotFoundError('')``.
        if jail is not None and _is_not_found_error(exc):
            raise JailNotFoundError(jail) from exc
        raise JailOperationError(str(exc)) from exc
async def get_active_bans(
    socket_path: str,
    geo_enricher: Any | None = None,
    http_session: Any | None = None,
    app_db: Any | None = None,
) -> ActiveBanListResponse:
    """Collect the currently banned IPs of every jail.

    Each jail is queried with ``get <jail> banip --with-time`` so that ban
    start and expiry times come back alongside the address.  A jail whose
    query or payload fails is logged and skipped.

    Geo enrichment strategy (highest priority first):

    1. When *http_session* is provided, all banned IPs are resolved with one
       :func:`~app.services.geo_service.lookup_batch` call.  A failing batch
       lookup is logged and leaves the bans un-enriched.
    2. When only *geo_enricher* is provided (legacy / test path) each IP is
       resolved individually via the supplied async callable.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.
        geo_enricher: Optional async callable ``(ip) → GeoInfo | None``
            used to enrich each ban entry.  Ignored when *http_session*
            is provided.
        http_session: Optional shared :class:`aiohttp.ClientSession` handed
            to :func:`~app.services.geo_service.lookup_batch`.
        app_db: Optional BanGUI application database connection, passed to
            the batch lookup as ``db``.  Only meaningful together with
            *http_session*.

    Returns:
        :class:`~app.models.ban.ActiveBanListResponse` with all active bans.

    Raises:
        ~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
            cannot be reached.
    """
    from app.services import geo_service  # noqa: PLC0415

    client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)

    # Jail names come from the comma-separated "Jail list" status value.
    overview = _to_dict(_ok(await client.send(["status"])))
    joined_names: str = str(overview.get("Jail list", "") or "").strip()
    jail_names: list[str] = [
        part.strip() for part in joined_names.split(",") if part.strip()
    ]
    if not jail_names:
        return ActiveBanListResponse(bans=[], total=0)

    # One ban-list query per jail, all in flight together.
    replies: list[Any] = await asyncio.gather(
        *(
            client.send(["get", jail, "banip", "--with-time"])
            for jail in jail_names
        ),
        return_exceptions=True,
    )

    bans: list[ActiveBan] = []
    for jail_name, reply in zip(jail_names, replies, strict=False):
        if isinstance(reply, Exception):
            log.warning(
                "active_bans_fetch_error",
                jail=jail_name,
                error=str(reply),
            )
            continue
        try:
            entries: list[str] = _ok(reply) or []
        except (TypeError, ValueError) as err:
            log.warning(
                "active_bans_parse_error",
                jail=jail_name,
                error=str(err),
            )
            continue
        parsed = (_parse_ban_entry(str(entry), jail_name) for entry in entries)
        bans.extend(ban for ban in parsed if ban is not None)

    # Enrich with geo data — prefer batch lookup over per-IP enricher.
    if http_session is not None and bans:
        try:
            geo_map = await geo_service.lookup_batch(
                [ban.ip for ban in bans], http_session, db=app_db
            )
        except Exception:  # noqa: BLE001
            log.warning("active_bans_batch_geo_failed")
            geo_map = {}

        def _with_country(ban: ActiveBan) -> ActiveBan:
            # Bans without a geo hit are passed through untouched.
            geo = geo_map.get(ban.ip)
            if geo is None:
                return ban
            return ban.model_copy(update={"country": geo.country_code})

        bans = [_with_country(ban) for ban in bans]
    elif geo_enricher is not None:
        bans = await _enrich_bans(bans, geo_enricher)

    log.info("active_bans_fetched", total=len(bans))
    return ActiveBanListResponse(bans=bans, total=len(bans))
def _parse_ban_entry(entry: str, jail: str) -> ActiveBan | None:
    """Turn one line of ``get <jail> banip --with-time`` output into a ban.

    Expected format::

        "1.2.3.4 \\t2025-01-01 12:00:00 + 3600 = 2025-01-01 13:00:00"

    The text before the first tab is the IP; the text after it carries the
    ban start, the duration, and the expiry.  Timestamps that cannot be
    parsed become ``None`` rather than failing the whole entry.

    Args:
        entry: Raw ban entry string.
        jail: Jail name for the resulting record.

    Returns:
        An :class:`~app.models.ban.ActiveBan` or ``None`` if parsing fails
        (for example when the IP part is not a valid address).
    """
    from datetime import UTC, datetime

    def _iso_or_none(stamp: str | None) -> str | None:
        # Parsed timestamps are tagged as UTC and rendered in ISO 8601.
        if not stamp:
            return None
        try:
            moment = datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S")
        except ValueError:
            return None
        return moment.replace(tzinfo=UTC).isoformat()

    try:
        ip_part, tab, time_part = entry.partition("\t")
        ip = ip_part.strip()
        # Validate IP
        ipaddress.ip_address(ip)

        banned_at_iso: str | None = None
        expires_at_iso: str | None = None
        if tab:
            # Format: "2025-01-01 12:00:00 + 3600 = 2025-01-01 13:00:00"
            started, plus, remainder = time_part.strip().partition(" + ")
            expires_at_str: str | None = None
            if plus:
                _duration, equals, expiry = remainder.partition(" = ")
                if equals:
                    expires_at_str = expiry.strip()
            banned_at_iso = _iso_or_none(started.strip())
            expires_at_iso = _iso_or_none(expires_at_str)
        # Without a tab the entry has no time info and both times stay None.

        return ActiveBan(
            ip=ip,
            jail=jail,
            banned_at=banned_at_iso,
            expires_at=expires_at_iso,
            ban_count=1,
            country=None,
        )
    except (ValueError, IndexError, AttributeError) as err:
        log.debug("ban_entry_parse_error", entry=entry, jail=jail, error=str(err))
        return None
# ---------------------------------------------------------------------------
# Public API — Jail-specific paginated bans
# ---------------------------------------------------------------------------
#: Maximum allowed page size for :func:`get_jail_banned_ips`.
_MAX_PAGE_SIZE: int = 100


async def get_jail_banned_ips(
    socket_path: str,
    jail_name: str,
    page: int = 1,
    page_size: int = 25,
    search: str | None = None,
    http_session: Any | None = None,
    app_db: Any | None = None,
) -> JailBannedIpsResponse:
    """Return a paginated list of currently banned IPs for a single jail.

    Fetches the full ban list from the fail2ban socket, applies an optional
    substring search filter on the IP, paginates server-side, and geo-enriches
    **only** the current page slice.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.
        jail_name: Name of the jail to query.
        page: 1-based page number (default 1).  Values below 1 are treated
            as 1.
        page_size: Items per page (default 25); clamped to the range
            ``1`` .. :data:`_MAX_PAGE_SIZE`.
        search: Optional case-insensitive substring filter applied to IP addresses.
        http_session: Optional shared :class:`aiohttp.ClientSession` for geo
            enrichment via :func:`~app.services.geo_service.lookup_batch`.
        app_db: Optional BanGUI application database, passed to the batch
            lookup as ``db``.

    Returns:
        :class:`~app.models.ban.JailBannedIpsResponse` with the paginated
        bans.  ``page`` and ``page_size`` echo the clamped values actually
        used; ``total`` counts all bans matching *search*.

    Raises:
        JailNotFoundError: If *jail_name* is not a known active jail.
        ValueError: If the jail status check fails for any other reason.
        ~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket is
            unreachable.
    """
    from app.services import geo_service  # noqa: PLC0415

    # Normalise paging input.  Without a lower bound a zero or negative value
    # produces a negative slice start and returns items from the wrong end
    # of the list.
    page = max(page, 1)
    page_size = max(1, min(page_size, _MAX_PAGE_SIZE))

    client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)

    # Verify the jail exists.
    try:
        _ok(await client.send(["status", jail_name, "short"]))
    except ValueError as exc:
        if _is_not_found_error(exc):
            raise JailNotFoundError(jail_name) from exc
        raise

    # Fetch the full ban list for this jail.  A failure here is best-effort
    # (the jail exists, so an empty page is returned) but it is logged so the
    # problem is not invisible.
    try:
        raw_result = _ok(await client.send(["get", jail_name, "banip", "--with-time"]))
    except (ValueError, TypeError) as exc:
        log.warning("jail_banned_ips_fetch_error", jail=jail_name, error=str(exc))
        raw_result = []
    ban_list: list[str] = raw_result or []

    # Parse all entries, dropping the ones that cannot be parsed.
    all_bans: list[ActiveBan] = []
    for entry in ban_list:
        ban = _parse_ban_entry(str(entry), jail_name)
        if ban is not None:
            all_bans.append(ban)

    # Apply optional substring search filter (case-insensitive).
    if search:
        search_lower = search.lower()
        all_bans = [b for b in all_bans if search_lower in b.ip.lower()]

    total = len(all_bans)

    # Slice the requested page.
    start = (page - 1) * page_size
    page_bans = all_bans[start : start + page_size]

    # Geo-enrich only the page slice.
    if http_session is not None and page_bans:
        page_ips = [b.ip for b in page_bans]
        try:
            geo_map = await geo_service.lookup_batch(page_ips, http_session, db=app_db)
        except Exception:  # noqa: BLE001
            log.warning("jail_banned_ips_geo_failed", jail=jail_name)
            geo_map = {}
        enriched_page: list[ActiveBan] = []
        for ban in page_bans:
            geo = geo_map.get(ban.ip)
            if geo is not None:
                enriched_page.append(ban.model_copy(update={"country": geo.country_code}))
            else:
                enriched_page.append(ban)
        page_bans = enriched_page

    log.info(
        "jail_banned_ips_fetched",
        jail=jail_name,
        total=total,
        page=page,
        page_size=page_size,
    )
    return JailBannedIpsResponse(
        items=page_bans,
        total=total,
        page=page,
        page_size=page_size,
    )
async def _enrich_bans(
bans: list[ActiveBan],
geo_enricher: Any,
) -> list[ActiveBan]:
"""Enrich ban records with geo data asynchronously.
Args:
bans: The list of :class:`~app.models.ban.ActiveBan` records to enrich.
geo_enricher: Async callable ``(ip) → GeoInfo | None``.
Returns:
The same list with ``country`` fields populated where lookup succeeded.
"""
geo_results: list[Any] = await asyncio.gather(
*[geo_enricher(ban.ip) for ban in bans],
return_exceptions=True,
)
enriched: list[ActiveBan] = []
for ban, geo in zip(bans, geo_results, strict=False):
if geo is not None and not isinstance(geo, Exception):
enriched.append(ban.model_copy(update={"country": geo.country_code}))
else:
enriched.append(ban)
return enriched
# ---------------------------------------------------------------------------
# Public API — Ignore list (IP whitelist)
# ---------------------------------------------------------------------------
async def get_ignore_list(socket_path: str, name: str) -> list[str]:
    """Fetch the ``ignoreip`` entries configured for a jail.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.
        name: Jail name.

    Returns:
        List of IP addresses and CIDR networks on the jail's ignore list.

    Raises:
        JailNotFoundError: If *name* is not a known jail.
        ValueError: If fail2ban reports any other error.
        ~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
            cannot be reached.
    """
    f2b = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
    try:
        reply = await f2b.send(["get", name, "ignoreip"])
        return _ensure_list(_ok(reply))
    except ValueError as err:
        if not _is_not_found_error(err):
            raise
        raise JailNotFoundError(name) from err
async def add_ignore_ip(socket_path: str, name: str, ip: str) -> None:
    """Put an IP address or CIDR network on a jail's ignore list.

    The value is validated locally with :func:`ipaddress.ip_network`
    (non-strict, so host bits may be set) before fail2ban is contacted.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.
        name: Jail name.
        ip: IP address or CIDR network to add.

    Raises:
        ValueError: If *ip* is not a valid IP address or network.
        JailNotFoundError: If *name* is not a known jail.
        JailOperationError: If fail2ban reports the operation failed.
        ~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
            cannot be reached.
    """
    # Reject malformed input before any socket traffic happens.
    try:
        ipaddress.ip_network(ip, strict=False)
    except ValueError as err:
        raise ValueError(f"Invalid IP address or network: {ip!r}") from err

    command = ["set", name, "addignoreip", ip]
    f2b = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
    try:
        _ok(await f2b.send(command))
        log.info("ignore_ip_added", jail=name, ip=ip)
    except ValueError as err:
        # Unknown jail gets its own error type; everything else is reported
        # as a failed operation.
        if not _is_not_found_error(err):
            raise JailOperationError(str(err)) from err
        raise JailNotFoundError(name) from err
async def del_ignore_ip(socket_path: str, name: str, ip: str) -> None:
    """Take an IP address or CIDR network off a jail's ignore list.

    Unlike :func:`add_ignore_ip`, the value is passed to fail2ban as given,
    without local format validation.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.
        name: Jail name.
        ip: IP address or CIDR network to remove.

    Raises:
        JailNotFoundError: If *name* is not a known jail.
        JailOperationError: If fail2ban reports the operation failed.
        ~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
            cannot be reached.
    """
    command = ["set", name, "delignoreip", ip]
    f2b = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
    try:
        _ok(await f2b.send(command))
        log.info("ignore_ip_removed", jail=name, ip=ip)
    except ValueError as err:
        # Unknown jail gets its own error type; everything else is reported
        # as a failed operation.
        if not _is_not_found_error(err):
            raise JailOperationError(str(err)) from err
        raise JailNotFoundError(name) from err
async def get_ignore_self(socket_path: str, name: str) -> bool:
    """Report whether ``ignoreself`` is enabled for a jail.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.
        name: Jail name.

    Returns:
        The truthiness of fail2ban's ``ignoreself`` reply: ``True`` when the
        jail ignores the server's own IP addresses.

    Raises:
        JailNotFoundError: If *name* is not a known jail.
        ValueError: Re-raised unchanged when the failure is not a
            "jail not found" error.
        ~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
            cannot be reached.
    """
    f2b = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
    try:
        reply = await f2b.send(["get", name, "ignoreself"])
        return bool(_ok(reply))
    except ValueError as err:
        # Anything other than an unknown-jail failure is passed through as-is.
        if not _is_not_found_error(err):
            raise
        raise JailNotFoundError(name) from err
async def set_ignore_self(socket_path: str, name: str, *, on: bool) -> None:
    """Enable or disable the ``ignoreself`` option of a fail2ban jail.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.
        name: Jail name.
        on: ``True`` to enable ignoreself, ``False`` to disable. Sent to
            fail2ban as the literal string ``"true"`` or ``"false"``.

    Raises:
        JailNotFoundError: If *name* is not a known jail.
        JailOperationError: If fail2ban reports the operation failed.
        ~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
            cannot be reached.
    """
    if on:
        flag = "true"
    else:
        flag = "false"

    f2b = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
    try:
        _ok(await f2b.send(["set", name, "ignoreself", flag]))
        log.info("ignore_self_toggled", jail=name, on=on)
    except ValueError as err:
        # Unknown jail gets its own error type; everything else is reported
        # as a failed operation.
        if not _is_not_found_error(err):
            raise JailOperationError(str(err)) from err
        raise JailNotFoundError(name) from err
# ---------------------------------------------------------------------------
# Public API — IP lookup
# ---------------------------------------------------------------------------
async def lookup_ip(
    socket_path: str,
    ip: str,
    geo_enricher: Any | None = None,
) -> dict[str, Any]:
    """Return the current ban status for a single IP address.

    Reads the jail names from fail2ban's global ``status`` reply, then
    queries every jail's ``banip`` list concurrently and reports the jails
    whose list contains *ip*. A jail whose query fails or returns an
    unusable reply is skipped rather than failing the whole lookup.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.
        ip: IP address to look up.
        geo_enricher: Optional async callable ``(ip) → GeoInfo | None``.
            Any exception it raises is swallowed and ``geo`` stays ``None``.

    Returns:
        A dict with keys:

        * ``ip`` — the queried IP address
        * ``currently_banned_in`` — list of jails where the IP is active
        * ``geo`` — result of *geo_enricher* or ``None``

    Raises:
        ValueError: If *ip* is not a valid IP address.
        ~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
            cannot be reached.
    """
    try:
        ipaddress.ip_address(ip)
    except ValueError as exc:
        raise ValueError(f"Invalid IP address: {ip!r}") from exc

    client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)

    # Fetch jail names from status. (An earlier probe command whose result
    # was discarded has been dropped: it only cost a socket round-trip.)
    global_status = _to_dict(_ok(await client.send(["status"])))
    jail_list_raw: str = str(global_status.get("Jail list", "") or "")
    jail_names: list[str] = [
        j.strip() for j in jail_list_raw.split(",") if j.strip()
    ]

    # Check ban status per jail in parallel.
    ban_results: list[Any] = await asyncio.gather(
        *(client.send(["get", jn, "banip"]) for jn in jail_names),
        return_exceptions=True,
    )

    currently_banned_in: list[str] = []
    for jail_name, result in zip(jail_names, ban_results, strict=False):
        # ``gather`` may return ``BaseException`` instances such as
        # ``asyncio.CancelledError``; none of them is a usable reply.
        if isinstance(result, BaseException):
            continue
        # Best effort: an error reply or malformed payload for one jail
        # must not abort the lookup for the remaining jails.
        with contextlib.suppress(ValueError, TypeError):
            ban_list: list[str] = _ok(result) or []
            if ip in ban_list:
                currently_banned_in.append(jail_name)

    geo = None
    if geo_enricher is not None:
        # Geo data is optional decoration; never let it fail the lookup.
        with contextlib.suppress(Exception):  # noqa: BLE001
            geo = await geo_enricher(ip)

    log.info("ip_lookup_completed", ip=ip, banned_in_jails=currently_banned_in)
    return {
        "ip": ip,
        "currently_banned_in": currently_banned_in,
        "geo": geo,
    }
async def unban_all_ips(socket_path: str) -> int:
    """Lift every active ban in all fail2ban jails.

    Sends fail2ban's global ``unban --all`` command in a single socket
    request and converts the reply to an integer.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.

    Returns:
        The number of IP addresses that were unbanned, as reported by
        fail2ban.

    Raises:
        ~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
            cannot be reached.
    """
    f2b = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
    reply = await f2b.send(["unban", "--all"])
    count: int = int(_ok(reply))
    log.info("all_ips_unbanned", count=count)
    return count