Task 13: move ban_ip, unban_ip, and get_active_bans from jail_service to ban_service and update routers/tests
This commit is contained in:
@@ -11,11 +11,14 @@ so BanGUI never modifies or locks the fail2ban database.
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import contextlib
|
||||
import ipaddress
|
||||
import time
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import TYPE_CHECKING, cast
|
||||
|
||||
import structlog
|
||||
|
||||
from app.exceptions import JailNotFoundError, JailOperationError
|
||||
from app.models.ban import (
|
||||
BLOCKLIST_JAIL,
|
||||
BUCKET_SECONDS,
|
||||
@@ -26,6 +29,8 @@ from app.models.ban import (
|
||||
BansByJailResponse,
|
||||
BanTrendBucket,
|
||||
BanTrendResponse,
|
||||
ActiveBan,
|
||||
ActiveBanListResponse,
|
||||
DashboardBanItem,
|
||||
DashboardBanListResponse,
|
||||
TimeRange,
|
||||
@@ -42,6 +47,10 @@ from app.repositories.history_archive_repo import (
|
||||
)
|
||||
from app.services.fail2ban_metadata_service import default_fail2ban_metadata_service
|
||||
from app.utils.fail2ban_db_utils import parse_data_json, ts_to_iso
|
||||
from app.utils.fail2ban_client import (
|
||||
Fail2BanClient,
|
||||
Fail2BanResponse,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
import aiohttp
|
||||
@@ -70,6 +79,114 @@ _SOCKET_TIMEOUT: float = 5.0
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _ok(response: object) -> object:
|
||||
"""Extract the payload from a fail2ban ``(return_code, data)`` response.
|
||||
|
||||
Args:
|
||||
response: Raw value returned by :meth:`~Fail2BanClient.send`.
|
||||
|
||||
Returns:
|
||||
The payload ``data`` portion of the response.
|
||||
|
||||
Raises:
|
||||
ValueError: If the response indicates an error (return code ≠ 0).
|
||||
"""
|
||||
try:
|
||||
code, data = response # type: ignore[assignment]
|
||||
except (TypeError, ValueError) as exc:
|
||||
raise ValueError(f"Unexpected fail2ban response shape: {response!r}") from exc
|
||||
|
||||
if code != 0:
|
||||
raise ValueError(f"fail2ban returned error code {code}: {data!r}")
|
||||
|
||||
return data
|
||||
|
||||
|
||||
def _to_dict(pairs: object) -> dict[str, object]:
|
||||
"""Convert a list of ``(key, value)`` pairs to a plain dict.
|
||||
|
||||
Args:
|
||||
pairs: A list of ``(key, value)`` pairs (or any iterable thereof).
|
||||
|
||||
Returns:
|
||||
A :class:`dict` with the keys and values from *pairs*.
|
||||
"""
|
||||
if not isinstance(pairs, (list, tuple)):
|
||||
return {}
|
||||
result: dict[str, object] = {}
|
||||
for item in pairs:
|
||||
try:
|
||||
k, v = item
|
||||
result[str(k)] = v
|
||||
except (TypeError, ValueError):
|
||||
pass
|
||||
return result
|
||||
|
||||
|
||||
def _ensure_list(value: object | None) -> list[str]:
|
||||
"""Coerce a fail2ban response value to a list of strings."""
|
||||
if value is None:
|
||||
return []
|
||||
if isinstance(value, str):
|
||||
return [value] if value.strip() else []
|
||||
if isinstance(value, (list, tuple)):
|
||||
return [str(v) for v in value if v is not None]
|
||||
return [str(value)]
|
||||
|
||||
|
||||
def _is_not_found_error(exc: Exception) -> bool:
|
||||
"""Return ``True`` if *exc* indicates a jail does not exist."""
|
||||
msg = str(exc).lower()
|
||||
return any(
|
||||
phrase in msg
|
||||
for phrase in (
|
||||
"unknown jail",
|
||||
"unknownjail",
|
||||
"no jail",
|
||||
"does not exist",
|
||||
"not found",
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
async def ban_ip(socket_path: str, jail: str, ip: str) -> None:
|
||||
"""Ban an IP address in the specified jail."""
|
||||
try:
|
||||
ipaddress.ip_address(ip)
|
||||
except ValueError as exc:
|
||||
raise ValueError(f"Invalid IP address: {ip!r}") from exc
|
||||
|
||||
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
|
||||
|
||||
try:
|
||||
_ok(await client.send(["set", jail, "banip", ip]))
|
||||
except ValueError as exc:
|
||||
if _is_not_found_error(exc):
|
||||
raise JailNotFoundError(jail) from exc
|
||||
raise JailOperationError(str(exc)) from exc
|
||||
|
||||
|
||||
async def unban_ip(socket_path: str, ip: str, jail: str | None = None) -> None:
|
||||
"""Unban an IP address from a specific jail or all jails."""
|
||||
try:
|
||||
ipaddress.ip_address(ip)
|
||||
except ValueError as exc:
|
||||
raise ValueError(f"Invalid IP address: {ip!r}") from exc
|
||||
|
||||
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
|
||||
|
||||
if jail is None:
|
||||
_ok(await client.send(["unban", ip]))
|
||||
return
|
||||
|
||||
try:
|
||||
_ok(await client.send(["set", jail, "unbanip", ip]))
|
||||
except ValueError as exc:
|
||||
if _is_not_found_error(exc):
|
||||
raise JailNotFoundError(jail) from exc
|
||||
raise JailOperationError(str(exc)) from exc
|
||||
|
||||
|
||||
def _origin_sql_filter(origin: BanOrigin | None) -> tuple[str, tuple[str, ...]]:
|
||||
"""Return a SQL fragment and its parameters for the origin filter.
|
||||
|
||||
@@ -88,6 +205,191 @@ def _origin_sql_filter(origin: BanOrigin | None) -> tuple[str, tuple[str, ...]]:
|
||||
return "", ()
|
||||
|
||||
|
||||
def _parse_ban_entry(entry: str, jail: str) -> ActiveBan | None:
|
||||
"""Parse a ban entry from ``get <jail> banip --with-time`` output."""
|
||||
from datetime import UTC, datetime
|
||||
|
||||
try:
|
||||
parts = entry.split("\t", 1)
|
||||
ip = parts[0].strip()
|
||||
|
||||
ipaddress.ip_address(ip)
|
||||
|
||||
if len(parts) < 2:
|
||||
return ActiveBan(
|
||||
ip=ip,
|
||||
jail=jail,
|
||||
banned_at=None,
|
||||
expires_at=None,
|
||||
ban_count=1,
|
||||
country=None,
|
||||
)
|
||||
|
||||
time_part = parts[1].strip()
|
||||
plus_idx = time_part.find(" + ")
|
||||
if plus_idx == -1:
|
||||
banned_at_str = time_part.strip()
|
||||
expires_at_str: str | None = None
|
||||
else:
|
||||
banned_at_str = time_part[:plus_idx].strip()
|
||||
remainder = time_part[plus_idx + 3 :]
|
||||
eq_idx = remainder.find(" = ")
|
||||
expires_at_str = remainder[eq_idx + 3 :].strip() if eq_idx != -1 else None
|
||||
|
||||
_date_fmt = "%Y-%m-%d %H:%M:%S"
|
||||
|
||||
def _to_iso(ts: str) -> str:
|
||||
dt = datetime.strptime(ts, _date_fmt).replace(tzinfo=UTC)
|
||||
return dt.isoformat()
|
||||
|
||||
banned_at_iso: str | None = None
|
||||
expires_at_iso: str | None = None
|
||||
|
||||
with contextlib.suppress(ValueError):
|
||||
banned_at_iso = _to_iso(banned_at_str)
|
||||
|
||||
with contextlib.suppress(ValueError):
|
||||
if expires_at_str:
|
||||
expires_at_iso = _to_iso(expires_at_str)
|
||||
|
||||
return ActiveBan(
|
||||
ip=ip,
|
||||
jail=jail,
|
||||
banned_at=banned_at_iso,
|
||||
expires_at=expires_at_iso,
|
||||
ban_count=1,
|
||||
country=None,
|
||||
)
|
||||
except (ValueError, IndexError, AttributeError) as exc:
|
||||
log.debug("ban_entry_parse_error", entry=entry, jail=jail, error=str(exc))
|
||||
return None
|
||||
|
||||
|
||||
async def _enrich_bans(
|
||||
bans: list[ActiveBan],
|
||||
geo_enricher: GeoEnricher,
|
||||
) -> list[ActiveBan]:
|
||||
"""Enrich ban records with geo data asynchronously."""
|
||||
geo_results: list[object | Exception] = await asyncio.gather(
|
||||
*[cast("Awaitable[object]", geo_enricher(ban.ip)) for ban in bans],
|
||||
return_exceptions=True,
|
||||
)
|
||||
enriched: list[ActiveBan] = []
|
||||
for ban, geo in zip(bans, geo_results, strict=False):
|
||||
if geo is not None and not isinstance(geo, Exception):
|
||||
geo_info = cast("GeoInfo", geo)
|
||||
enriched.append(ban.model_copy(update={"country": geo_info.country_code}))
|
||||
else:
|
||||
enriched.append(ban)
|
||||
return enriched
|
||||
|
||||
|
||||
async def get_active_bans(
|
||||
socket_path: str,
|
||||
geo_batch_lookup: GeoBatchLookup | None = None,
|
||||
geo_enricher: GeoEnricher | None = None,
|
||||
http_session: aiohttp.ClientSession | None = None,
|
||||
app_db: aiosqlite.Connection | None = None,
|
||||
) -> ActiveBanListResponse:
|
||||
"""Return all currently banned IPs across every jail.
|
||||
|
||||
For each jail the ``get <jail> banip --with-time`` command is used
|
||||
to retrieve ban start and expiry times alongside the IP address.
|
||||
|
||||
Geo enrichment strategy (highest priority first):
|
||||
|
||||
1. When *http_session* is provided the entire set of banned IPs is resolved
|
||||
in a single :func:`~app.services.geo_service.lookup_batch` call (up to
|
||||
100 IPs per HTTP request). This is far more efficient than concurrent
|
||||
per-IP lookups and stays within ip-api.com rate limits.
|
||||
2. When only *geo_enricher* is provided (legacy / test path) each IP is
|
||||
resolved individually via the supplied async callable.
|
||||
|
||||
Args:
|
||||
socket_path: Path to the fail2ban Unix domain socket.
|
||||
geo_enricher: Optional async callable ``(ip) -> GeoInfo | None``.
|
||||
Used to enrich each ban entry with country and ASN data.
|
||||
Ignored when *http_session* is provided.
|
||||
http_session: Optional shared :class:`aiohttp.ClientSession`. When
|
||||
provided, :func:`~app.services.geo_service.lookup_batch` is used
|
||||
for efficient bulk geo resolution.
|
||||
app_db: Optional BanGUI application database connection used to
|
||||
persist newly resolved geo entries across restarts. Only
|
||||
meaningful when *http_session* is provided.
|
||||
|
||||
Returns:
|
||||
:class:`~app.models.ban.ActiveBanListResponse` with all active bans.
|
||||
|
||||
Raises:
|
||||
~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
|
||||
cannot be reached.
|
||||
"""
|
||||
|
||||
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
|
||||
|
||||
global_status = _to_dict(_ok(await client.send(["status"])))
|
||||
jail_list_raw: str = str(global_status.get("Jail list", "") or "").strip()
|
||||
jail_names: list[str] = (
|
||||
[j.strip() for j in jail_list_raw.split(",") if j.strip()]
|
||||
if jail_list_raw
|
||||
else []
|
||||
)
|
||||
|
||||
if not jail_names:
|
||||
return ActiveBanListResponse(bans=[], total=0)
|
||||
|
||||
results: list[object | Exception] = await asyncio.gather(
|
||||
*[client.send(["get", jn, "banip", "--with-time"]) for jn in jail_names],
|
||||
return_exceptions=True,
|
||||
)
|
||||
|
||||
bans: list[ActiveBan] = []
|
||||
for jail_name, raw_result in zip(jail_names, results, strict=False):
|
||||
if isinstance(raw_result, Exception):
|
||||
log.warning(
|
||||
"active_bans_fetch_error",
|
||||
jail=jail_name,
|
||||
error=str(raw_result),
|
||||
)
|
||||
continue
|
||||
|
||||
try:
|
||||
ban_list: list[str] = cast("list[str]", _ok(raw_result)) or []
|
||||
except (TypeError, ValueError) as exc:
|
||||
log.warning(
|
||||
"active_bans_parse_error",
|
||||
jail=jail_name,
|
||||
error=str(exc),
|
||||
)
|
||||
continue
|
||||
|
||||
for entry in ban_list:
|
||||
ban = _parse_ban_entry(str(entry), jail_name)
|
||||
if ban is not None:
|
||||
bans.append(ban)
|
||||
|
||||
if http_session is not None and bans and geo_batch_lookup is not None:
|
||||
all_ips: list[str] = [ban.ip for ban in bans]
|
||||
try:
|
||||
geo_map = await geo_batch_lookup(all_ips, http_session, db=app_db)
|
||||
except Exception: # noqa: BLE001
|
||||
log.warning("active_bans_batch_geo_failed")
|
||||
geo_map = {}
|
||||
enriched: list[ActiveBan] = []
|
||||
for ban in bans:
|
||||
geo = geo_map.get(ban.ip)
|
||||
if geo is not None:
|
||||
enriched.append(ban.model_copy(update={"country": geo.country_code}))
|
||||
else:
|
||||
enriched.append(ban)
|
||||
bans = enriched
|
||||
elif geo_enricher is not None:
|
||||
bans = await _enrich_bans(bans, geo_enricher)
|
||||
|
||||
log.info("active_bans_fetched", total=len(bans))
|
||||
return ActiveBanListResponse(bans=bans, total=len(bans))
|
||||
|
||||
|
||||
_TIME_RANGE_SLACK_SECONDS: int = 60
|
||||
|
||||
|
||||
@@ -502,7 +804,7 @@ async def bans_by_country(
|
||||
country_names[cc] = cn
|
||||
|
||||
# Build companion table from recent rows (geo already cached from batch step).
|
||||
bans: list[DashboardBanItem] = []
|
||||
bans: list[ActiveBan] = []
|
||||
for companion_row in companion_rows:
|
||||
if source == "archive":
|
||||
ip = companion_row["ip"]
|
||||
|
||||
Reference in New Issue
Block a user