Files
BanGUI/backend/app/services/ban_service.py
Lukas ddfc8a0b02 Optimise geo lookup and aggregation for 10k+ IPs
- Add persistent geo_cache SQLite table (db.py)
- Rewrite geo_service: batch API (100 IPs/call), two-tier cache,
  no caching of failed lookups so they are retried
- Pre-warm geo cache from DB on startup (main.py lifespan)
- Rewrite bans_by_country: SQL GROUP BY ip aggregation + lookup_batch
  instead of 2000-row fetch + asyncio.gather individual calls
- Pre-warm geo cache after blocklist import (blocklist_service)
- Add 300ms debounce to useMapData hook to cancel stale requests
- Add perf benchmark asserting <2s for 10k bans
- Add seed_10k_bans.py script for manual perf testing
2026-03-07 20:28:51 +01:00

446 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Ban service.
Queries the fail2ban SQLite database for ban history. The fail2ban database
path is obtained at runtime by sending ``get dbfile`` to the fail2ban daemon
via the Unix domain socket.
All database I/O is performed through aiosqlite opened in **read-only** mode
so BanGUI never modifies or locks the fail2ban database.
"""
from __future__ import annotations
import json
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Any
import aiosqlite
import structlog
from app.models.ban import (
BLOCKLIST_JAIL,
TIME_RANGE_SECONDS,
BanOrigin,
BansByCountryResponse,
DashboardBanItem,
DashboardBanListResponse,
TimeRange,
_derive_origin,
)
from app.utils.fail2ban_client import Fail2BanClient
if TYPE_CHECKING:
import aiohttp
log: structlog.stdlib.BoundLogger = structlog.get_logger()
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Page size used by ``list_bans`` when the caller does not pass ``page_size``.
_DEFAULT_PAGE_SIZE: int = 100
# Upper bound applied to any ``page_size`` requested from ``list_bans``.
_MAX_PAGE_SIZE: int = 500
# Timeout handed to ``Fail2BanClient`` for socket requests
# (presumably seconds — confirm against the client implementation).
_SOCKET_TIMEOUT: float = 5.0
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _origin_sql_filter(origin: BanOrigin | None) -> tuple[str, tuple[str, ...]]:
"""Return a SQL fragment and its parameters for the origin filter.
Args:
origin: ``"blocklist"`` to restrict to the blocklist-import jail,
``"selfblock"`` to exclude it, or ``None`` for no restriction.
Returns:
A ``(sql_fragment, params)`` pair — the fragment starts with ``" AND"``
so it can be appended directly to an existing WHERE clause.
"""
if origin == "blocklist":
return " AND jail = ?", (BLOCKLIST_JAIL,)
if origin == "selfblock":
return " AND jail != ?", (BLOCKLIST_JAIL,)
return "", ()
def _since_unix(range_: TimeRange) -> int:
    """Compute the lower bound of a time window as a Unix timestamp.

    Args:
        range_: Time-range preset; it must be a key of
            ``TIME_RANGE_SECONDS``.

    Returns:
        Unix timestamp (whole seconds since the epoch) equal to
        *now - range_*, where *now* is the current UTC time.
    """
    window_seconds: int = TIME_RANGE_SECONDS[range_]
    now_ts = int(datetime.now(tz=UTC).timestamp())
    return now_ts - window_seconds
def _ts_to_iso(unix_ts: int) -> str:
"""Convert a Unix timestamp to an ISO 8601 UTC string.
Args:
unix_ts: Seconds since the Unix epoch.
Returns:
ISO 8601 UTC timestamp, e.g. ``"2026-03-01T12:00:00+00:00"``.
"""
return datetime.fromtimestamp(unix_ts, tz=UTC).isoformat()
async def _get_fail2ban_db_path(socket_path: str) -> str:
    """Ask the fail2ban daemon where its SQLite database lives.

    Issues ``get dbfile`` over the fail2ban socket and returns the reported
    value as a string.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.

    Returns:
        The ``dbfile`` value reported by fail2ban, converted with ``str``.

    Raises:
        RuntimeError: If the response cannot be unpacked into a
            ``(code, data)`` pair, if the code is non-zero, or if fail2ban
            reports ``None`` (no database configured).
        ~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
            cannot be reached (raised by the client, not by this function).
    """
    command = ["get", "dbfile"]
    async with Fail2BanClient(socket_path, timeout=_SOCKET_TIMEOUT) as client:
        reply = await client.send(command)

    # The reply is expected to be a two-item (status, payload) sequence.
    try:
        status, payload = reply
    except (TypeError, ValueError) as exc:
        raise RuntimeError(f"Unexpected response from fail2ban: {reply!r}") from exc

    if status != 0:
        raise RuntimeError(f"fail2ban error code {status}: {payload!r}")
    if payload is None:
        raise RuntimeError("fail2ban has no database configured (dbfile is None)")
    return str(payload)
def _parse_data_json(raw: Any) -> tuple[list[str], int]:
"""Extract matches and failure count from the ``bans.data`` column.
The ``data`` column stores a JSON blob with optional keys:
* ``matches`` — list of raw matched log lines.
* ``failures`` — total failure count that triggered the ban.
Args:
raw: The raw ``data`` column value (string, dict, or ``None``).
Returns:
A ``(matches, failures)`` tuple. Both default to empty/zero when
parsing fails or the column is absent.
"""
if raw is None:
return [], 0
obj: dict[str, Any] = {}
if isinstance(raw, str):
try:
parsed: Any = json.loads(raw)
if isinstance(parsed, dict):
obj = parsed
# json.loads("null") → None, or other non-dict — treat as empty
except json.JSONDecodeError:
return [], 0
elif isinstance(raw, dict):
obj = raw
matches: list[str] = [str(m) for m in (obj.get("matches") or [])]
failures: int = int(obj.get("failures", 0))
return matches, failures
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
async def list_bans(
    socket_path: str,
    range_: TimeRange,
    *,
    page: int = 1,
    page_size: int = _DEFAULT_PAGE_SIZE,
    geo_enricher: Any | None = None,
    origin: BanOrigin | None = None,
) -> DashboardBanListResponse:
    """Return a paginated list of bans within the selected time window.

    Queries the fail2ban database ``bans`` table for records whose
    ``timeofban`` falls within the specified *range_*.  Results are ordered
    newest-first.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.
        range_: Time-range preset (a key of ``TIME_RANGE_SECONDS``).
        page: 1-based page number (default: ``1``).  Values below ``1`` are
            treated as ``1``.
        page_size: Maximum items per page (default: ``100``).  The value is
            clamped to the range ``1 .. _MAX_PAGE_SIZE``.
        geo_enricher: Optional async callable ``(ip: str) -> GeoInfo | None``.
            When supplied, each row is enriched with the ``country_code``,
            ``country_name``, ``asn`` and ``org`` attributes of its result.
            A failing lookup is logged and leaves those fields ``None``.
        origin: Optional origin filter — ``"blocklist"`` restricts results to
            the blocklist-import jail, ``"selfblock"`` excludes it.

    Returns:
        :class:`~app.models.ban.DashboardBanListResponse` containing the
        page of items, the total number of matching rows, and the effective
        (clamped) ``page`` and ``page_size``.

    Raises:
        RuntimeError: Propagated from the ``get dbfile`` socket query when
            fail2ban returns an error or has no database configured.
    """
    since: int = _since_unix(range_)

    # SQLite interprets a negative LIMIT as "no upper bound" and a negative
    # OFFSET as zero, so an unclamped page_size could bypass _MAX_PAGE_SIZE.
    effective_page_size: int = max(1, min(page_size, _MAX_PAGE_SIZE))
    effective_page: int = max(1, page)
    offset: int = (effective_page - 1) * effective_page_size

    origin_clause, origin_params = _origin_sql_filter(origin)
    db_path: str = await _get_fail2ban_db_path(socket_path)
    log.info(
        "ban_service_list_bans",
        db_path=db_path,
        since=since,
        range=range_,
        origin=origin,
    )

    # mode=ro: the fail2ban database is opened read-only.
    async with aiosqlite.connect(f"file:{db_path}?mode=ro", uri=True) as f2b_db:
        f2b_db.row_factory = aiosqlite.Row

        # Total number of matching rows, independent of pagination.
        async with f2b_db.execute(
            "SELECT COUNT(*) FROM bans WHERE timeofban >= ?" + origin_clause,
            (since, *origin_params),
        ) as cur:
            count_row = await cur.fetchone()
            total: int = int(count_row[0]) if count_row else 0

        # The requested page, newest bans first.
        async with f2b_db.execute(
            "SELECT jail, ip, timeofban, bancount, data "
            "FROM bans "
            "WHERE timeofban >= ?"
            + origin_clause
            + " ORDER BY timeofban DESC "
            "LIMIT ? OFFSET ?",
            (since, *origin_params, effective_page_size, offset),
        ) as cur:
            rows = await cur.fetchall()

    items: list[DashboardBanItem] = []
    for row in rows:
        jail: str = str(row["jail"])
        ip: str = str(row["ip"])
        banned_at: str = _ts_to_iso(int(row["timeofban"]))
        ban_count: int = int(row["bancount"])
        matches, _ = _parse_data_json(row["data"])
        # The first matched log line is what gets reported as ``service``.
        service: str | None = matches[0] if matches else None

        country_code: str | None = None
        country_name: str | None = None
        asn: str | None = None
        org: str | None = None
        if geo_enricher is not None:
            # Best effort: a failed lookup must not fail the whole listing.
            try:
                geo = await geo_enricher(ip)
                if geo is not None:
                    country_code = geo.country_code
                    country_name = geo.country_name
                    asn = geo.asn
                    org = geo.org
            except Exception:  # noqa: BLE001
                log.warning("ban_service_geo_lookup_failed", ip=ip)

        items.append(
            DashboardBanItem(
                ip=ip,
                jail=jail,
                banned_at=banned_at,
                service=service,
                country_code=country_code,
                country_name=country_name,
                asn=asn,
                org=org,
                ban_count=ban_count,
                origin=_derive_origin(jail),
            )
        )

    return DashboardBanListResponse(
        items=items,
        total=total,
        page=effective_page,
        page_size=effective_page_size,
    )
# ---------------------------------------------------------------------------
# bans_by_country
# ---------------------------------------------------------------------------
#: Maximum number of most-recent ban rows returned in the companion table
#: alongside the map; used as the SQL ``LIMIT`` of the companion query.
_MAX_COMPANION_BANS: int = 200
async def bans_by_country(
    socket_path: str,
    range_: TimeRange,
    http_session: aiohttp.ClientSession | None = None,
    geo_enricher: Any | None = None,
    app_db: aiosqlite.Connection | None = None,
    origin: BanOrigin | None = None,
) -> BansByCountryResponse:
    """Aggregate ban counts per country for the selected time window.

    Works in three steps:

    1. Queries the fail2ban DB with ``GROUP BY ip`` to get the per-IP ban
       counts for all unique IPs in the window — no row-count cap.  The
       overall total is the sum of those counts, so it always agrees with
       the aggregation and needs no separate ``COUNT(*)`` scan.
    2. Resolves every unique IP, either in one call to
       :func:`~app.services.geo_service.lookup_batch` or, as a fallback,
       concurrently through the per-IP *geo_enricher*.
    3. Returns a ``{country_code: count}`` aggregation plus the
       ``_MAX_COMPANION_BANS`` most recent raw rows, enriched from the same
       lookup results, for the companion table.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.
        range_: Time-range preset.
        http_session: Optional :class:`aiohttp.ClientSession` for batch
            geo lookups.  When provided, :func:`geo_service.lookup_batch`
            is used instead of the *geo_enricher* callable.
        geo_enricher: Legacy async ``(ip) -> GeoInfo | None`` callable;
            used when *http_session* is ``None``.
        app_db: Optional BanGUI application database, forwarded to
            ``lookup_batch`` as ``db`` (presumably to persist resolved geo
            entries — see ``geo_service``).
        origin: Optional origin filter — ``"blocklist"`` restricts results to
            the blocklist-import jail, ``"selfblock"`` excludes it.

    Returns:
        :class:`~app.models.ban.BansByCountryResponse` with per-country
        aggregation, country names, the companion ban list and the total
        number of ban rows in the window.  IPs without a resolved country
        code count towards ``total`` but not towards any country.

    Raises:
        RuntimeError: Propagated from the ``get dbfile`` socket query when
            fail2ban returns an error or has no database configured.
    """
    import asyncio

    from app.services import geo_service  # noqa: PLC0415

    since: int = _since_unix(range_)
    origin_clause, origin_params = _origin_sql_filter(origin)
    db_path: str = await _get_fail2ban_db_path(socket_path)
    log.info(
        "ban_service_bans_by_country",
        db_path=db_path,
        since=since,
        range=range_,
        origin=origin,
    )

    # mode=ro: the fail2ban database is opened read-only.
    async with aiosqlite.connect(f"file:{db_path}?mode=ro", uri=True) as f2b_db:
        f2b_db.row_factory = aiosqlite.Row

        # Aggregation: unique IPs + their total event count.
        # No LIMIT here — we need all unique source IPs for accurate country counts.
        async with f2b_db.execute(
            "SELECT ip, COUNT(*) AS event_count "
            "FROM bans "
            "WHERE timeofban >= ?"
            + origin_clause
            + " GROUP BY ip",
            (since, *origin_params),
        ) as cur:
            agg_rows = await cur.fetchall()

        # Companion table: most recent raw rows for display alongside the map.
        async with f2b_db.execute(
            "SELECT jail, ip, timeofban, bancount, data "
            "FROM bans "
            "WHERE timeofban >= ?"
            + origin_clause
            + " ORDER BY timeofban DESC "
            "LIMIT ?",
            (since, *origin_params, _MAX_COMPANION_BANS),
        ) as cur:
            companion_rows = await cur.fetchall()

    # Normalise the grouped rows once; they feed the total, the lookup list
    # and the country aggregation below.
    ip_counts: list[tuple[str, int]] = [
        (str(r["ip"]), int(r["event_count"])) for r in agg_rows
    ]
    # Every row in the window belongs to exactly one ip group, so the sum of
    # the group sizes is the same figure COUNT(*) would report.
    total: int = sum(count for _, count in ip_counts)

    # Batch-resolve all unique IPs (much faster than individual lookups).
    unique_ips: list[str] = [ip for ip, _ in ip_counts]
    geo_map: dict[str, Any] = {}
    if http_session is not None and unique_ips:
        # Best effort: on failure the map stays empty and no country is counted.
        try:
            geo_map = await geo_service.lookup_batch(unique_ips, http_session, db=app_db)
        except Exception as exc:  # noqa: BLE001
            log.warning("ban_service_batch_geo_failed", error=str(exc))
    elif geo_enricher is not None and unique_ips:
        # Fallback: legacy per-IP enricher (used in tests / older callers).
        async def _safe_lookup(ip: str) -> tuple[str, Any]:
            try:
                return ip, await geo_enricher(ip)
            except Exception:  # noqa: BLE001
                log.warning("ban_service_geo_lookup_failed", ip=ip)
                return ip, None

        results = await asyncio.gather(*(_safe_lookup(ip) for ip in unique_ips))
        geo_map = dict(results)

    # Build country aggregation from the SQL-grouped rows.
    countries: dict[str, int] = {}
    country_names: dict[str, str] = {}
    for ip, event_count in ip_counts:
        geo = geo_map.get(ip)
        cc: str | None = geo.country_code if geo else None
        cn: str | None = geo.country_name if geo else None
        if cc:
            countries[cc] = countries.get(cc, 0) + event_count
            # Keep the first name seen for each country code.
            if cn and cc not in country_names:
                country_names[cc] = cn

    # Build companion table from recent rows, reusing the lookup results.
    bans: list[DashboardBanItem] = []
    for row in companion_rows:
        ip = str(row["ip"])
        jail: str = str(row["jail"])
        geo = geo_map.get(ip)
        matches, _ = _parse_data_json(row["data"])
        bans.append(
            DashboardBanItem(
                ip=ip,
                jail=jail,
                banned_at=_ts_to_iso(int(row["timeofban"])),
                service=matches[0] if matches else None,
                country_code=geo.country_code if geo else None,
                country_name=geo.country_name if geo else None,
                asn=geo.asn if geo else None,
                org=geo.org if geo else None,
                ban_count=int(row["bancount"]),
                origin=_derive_origin(jail),
            )
        )

    return BansByCountryResponse(
        countries=countries,
        country_names=country_names,
        bans=bans,
        total=total,
    )