Make geo lookups non-blocking with bulk DB writes and background tasks
This commit is contained in:
@@ -20,7 +20,7 @@ from fastapi import APIRouter, HTTPException, Request, status
|
||||
from app.dependencies import AuthDep
|
||||
from app.models.ban import ActiveBanListResponse, BanRequest, UnbanAllResponse, UnbanRequest
|
||||
from app.models.jail import JailCommandResponse
|
||||
from app.services import geo_service, jail_service
|
||||
from app.services import jail_service
|
||||
from app.services.jail_service import JailNotFoundError, JailOperationError
|
||||
from app.utils.fail2ban_client import Fail2BanConnectionError
|
||||
|
||||
@@ -68,12 +68,14 @@ async def get_active_bans(
|
||||
"""
|
||||
socket_path: str = request.app.state.settings.fail2ban_socket
|
||||
http_session: aiohttp.ClientSession = request.app.state.http_session
|
||||
|
||||
async def _enricher(ip: str) -> geo_service.GeoInfo | None:
|
||||
return await geo_service.lookup(ip, http_session)
|
||||
app_db = request.app.state.db
|
||||
|
||||
try:
|
||||
return await jail_service.get_active_bans(socket_path, geo_enricher=_enricher)
|
||||
return await jail_service.get_active_bans(
|
||||
socket_path,
|
||||
http_session=http_session,
|
||||
app_db=app_db,
|
||||
)
|
||||
except Fail2BanConnectionError as exc:
|
||||
raise _bad_gateway(exc) from exc
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ so BanGUI never modifies or locks the fail2ban database.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
from datetime import UTC, datetime
|
||||
from typing import TYPE_CHECKING, Any
|
||||
@@ -344,20 +345,26 @@ async def bans_by_country(
|
||||
|
||||
1. Queries the fail2ban DB with ``GROUP BY ip`` to get the per-IP ban
|
||||
counts for all unique IPs in the window — no row-count cap.
|
||||
2. Batch-resolves every unique IP via :func:`~app.services.geo_service.lookup_batch`
|
||||
(100 IPs per HTTP call) instead of one-at-a-time lookups.
|
||||
2. Serves geo data from the in-memory cache only (non-blocking).
|
||||
Any IPs not yet in the cache are scheduled for background resolution
|
||||
via :func:`asyncio.create_task` so the response is returned immediately
|
||||
and subsequent requests benefit from the warmed cache.
|
||||
3. Returns a ``{country_code: count}`` aggregation and the 200 most
|
||||
recent raw rows (already geo-cached from step 2) for the companion
|
||||
table.
|
||||
recent raw rows for the companion table.
|
||||
|
||||
Note:
|
||||
On the very first request a large number of IPs may be uncached and
|
||||
the country map will be sparse. The background task will resolve them
|
||||
and the next request will return a complete map. This trade-off keeps
|
||||
the endpoint fast regardless of dataset size.
|
||||
|
||||
Args:
|
||||
socket_path: Path to the fail2ban Unix domain socket.
|
||||
range_: Time-range preset.
|
||||
http_session: Optional :class:`aiohttp.ClientSession` for batch
|
||||
geo lookups. When provided, :func:`geo_service.lookup_batch`
|
||||
is used instead of the *geo_enricher* callable.
|
||||
http_session: Optional :class:`aiohttp.ClientSession` for background
|
||||
geo lookups. When ``None``, only cached data is used.
|
||||
geo_enricher: Legacy async ``(ip) -> GeoInfo | None`` callable;
|
||||
used when *http_session* is ``None``.
|
||||
used when *http_session* is ``None`` (e.g. tests).
|
||||
app_db: Optional BanGUI application database used to persist newly
|
||||
resolved geo entries across restarts.
|
||||
origin: Optional origin filter — ``"blocklist"`` restricts results to
|
||||
@@ -367,8 +374,6 @@ async def bans_by_country(
|
||||
:class:`~app.models.ban.BansByCountryResponse` with per-country
|
||||
aggregation and the companion ban list.
|
||||
"""
|
||||
import asyncio
|
||||
|
||||
from app.services import geo_service # noqa: PLC0415
|
||||
|
||||
since: int = _since_unix(range_)
|
||||
@@ -417,15 +422,26 @@ async def bans_by_country(
|
||||
) as cur:
|
||||
companion_rows = await cur.fetchall()
|
||||
|
||||
# Batch-resolve all unique IPs (much faster than individual lookups).
|
||||
unique_ips: list[str] = [str(r["ip"]) for r in agg_rows]
|
||||
geo_map: dict[str, Any] = {}
|
||||
|
||||
if http_session is not None and unique_ips:
|
||||
try:
|
||||
geo_map = await geo_service.lookup_batch(unique_ips, http_session, db=app_db)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
log.warning("ban_service_batch_geo_failed", error=str(exc))
|
||||
# Serve only what is already in the in-memory cache — no API calls on
|
||||
# the hot path. Uncached IPs are resolved asynchronously in the
|
||||
# background so subsequent requests benefit from a warmer cache.
|
||||
geo_map, uncached = geo_service.lookup_cached_only(unique_ips)
|
||||
if uncached:
|
||||
log.info(
|
||||
"ban_service_geo_background_scheduled",
|
||||
uncached=len(uncached),
|
||||
cached=len(geo_map),
|
||||
)
|
||||
# Fire-and-forget: lookup_batch handles rate-limiting / retries.
|
||||
# The dirty-set flush task persists results to the DB.
|
||||
asyncio.create_task( # noqa: RUF006
|
||||
geo_service.lookup_batch(uncached, http_session, db=app_db),
|
||||
name="geo_bans_by_country",
|
||||
)
|
||||
elif geo_enricher is not None and unique_ips:
|
||||
# Fallback: legacy per-IP enricher (used in tests / older callers).
|
||||
async def _safe_lookup(ip: str) -> tuple[str, Any]:
|
||||
|
||||
@@ -435,6 +435,41 @@ async def lookup(
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def lookup_cached_only(
    ips: list[str],
) -> tuple[dict[str, GeoInfo], list[str]]:
    """Serve geo data for *ips* purely from the in-memory cache.

    No external API calls are made. Callers use this to build a fast
    response from whatever is already resolved, handing the remainder to a
    background task for later resolution.

    Args:
        ips: IP address strings to look up. Duplicates are collapsed while
            preserving first-seen order.

    Returns:
        A ``(geo_map, uncached)`` tuple. *geo_map* holds a :class:`GeoInfo`
        for every IP found in the positive cache. *uncached* lists the IPs
        missing from both caches; IPs still inside the negative-cache
        cool-down window are deliberately excluded so recently failed
        lookups are not re-queued immediately.
    """
    resolved: dict[str, GeoInfo] = {}
    pending: list[str] = []
    now = time.monotonic()

    # dict.fromkeys deduplicates while keeping insertion order.
    for addr in dict.fromkeys(ips):
        if addr in _cache:
            resolved[addr] = _cache[addr]
            continue
        # An entry in the negative cache that has not yet expired means the
        # lookup failed recently — skip it rather than re-queue it now.
        failed_recently = addr in _neg_cache and (now - _neg_cache[addr]) < _NEG_CACHE_TTL
        if not failed_recently:
            pending.append(addr)

    return resolved, pending
|
||||
|
||||
|
||||
async def lookup_batch(
|
||||
ips: list[str],
|
||||
http_session: aiohttp.ClientSession,
|
||||
@@ -447,7 +482,9 @@ async def lookup_batch(
|
||||
``http://ip-api.com/batch`` in chunks of up to :data:`_BATCH_SIZE`.
|
||||
|
||||
Only successful resolutions (``country_code is not None``) are written to
|
||||
the persistent cache when *db* is provided.
|
||||
the persistent cache when *db* is provided. Both positive and negative
|
||||
entries are written in bulk using ``executemany`` (one round-trip per
|
||||
chunk) rather than one ``execute`` per IP.
|
||||
|
||||
Args:
|
||||
ips: List of IP address strings to resolve. Duplicates are ignored.
|
||||
@@ -509,16 +546,19 @@ async def lookup_batch(
|
||||
|
||||
assert chunk_result is not None # noqa: S101
|
||||
|
||||
# Collect bulk-write rows instead of one execute per IP.
|
||||
pos_rows: list[tuple[str, str | None, str | None, str | None, str | None]] = []
|
||||
neg_ips: list[str] = []
|
||||
|
||||
for ip, info in chunk_result.items():
|
||||
if info.country_code is not None:
|
||||
# Successful API resolution.
|
||||
_store(ip, info)
|
||||
geo_result[ip] = info
|
||||
if db is not None:
|
||||
try:
|
||||
await _persist_entry(db, ip, info)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
log.warning("geo_persist_failed", ip=ip, error=str(exc))
|
||||
pos_rows.append(
|
||||
(ip, info.country_code, info.country_name, info.asn, info.org)
|
||||
)
|
||||
else:
|
||||
# API failed — try local GeoIP fallback.
|
||||
fallback = _geoip_lookup(ip)
|
||||
@@ -526,19 +566,56 @@ async def lookup_batch(
|
||||
_store(ip, fallback)
|
||||
geo_result[ip] = fallback
|
||||
if db is not None:
|
||||
try:
|
||||
await _persist_entry(db, ip, fallback)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
log.warning("geo_persist_failed", ip=ip, error=str(exc))
|
||||
pos_rows.append(
|
||||
(
|
||||
ip,
|
||||
fallback.country_code,
|
||||
fallback.country_name,
|
||||
fallback.asn,
|
||||
fallback.org,
|
||||
)
|
||||
)
|
||||
else:
|
||||
# Both resolvers failed — record in negative cache.
|
||||
_neg_cache[ip] = time.monotonic()
|
||||
geo_result[ip] = _empty
|
||||
if db is not None:
|
||||
try:
|
||||
await _persist_neg_entry(db, ip)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
log.warning("geo_persist_neg_failed", ip=ip, error=str(exc))
|
||||
neg_ips.append(ip)
|
||||
|
||||
if db is not None:
|
||||
if pos_rows:
|
||||
try:
|
||||
await db.executemany(
|
||||
"""
|
||||
INSERT INTO geo_cache (ip, country_code, country_name, asn, org)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
ON CONFLICT(ip) DO UPDATE SET
|
||||
country_code = excluded.country_code,
|
||||
country_name = excluded.country_name,
|
||||
asn = excluded.asn,
|
||||
org = excluded.org,
|
||||
cached_at = strftime('%Y-%m-%dT%H:%M:%fZ', 'now')
|
||||
""",
|
||||
pos_rows,
|
||||
)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
log.warning(
|
||||
"geo_batch_persist_failed",
|
||||
count=len(pos_rows),
|
||||
error=str(exc),
|
||||
)
|
||||
if neg_ips:
|
||||
try:
|
||||
await db.executemany(
|
||||
"INSERT OR IGNORE INTO geo_cache (ip) VALUES (?)",
|
||||
[(ip,) for ip in neg_ips],
|
||||
)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
log.warning(
|
||||
"geo_batch_persist_neg_failed",
|
||||
count=len(neg_ips),
|
||||
error=str(exc),
|
||||
)
|
||||
|
||||
if db is not None:
|
||||
try:
|
||||
|
||||
@@ -627,16 +627,34 @@ async def unban_ip(
|
||||
async def get_active_bans(
|
||||
socket_path: str,
|
||||
geo_enricher: Any | None = None,
|
||||
http_session: Any | None = None,
|
||||
app_db: Any | None = None,
|
||||
) -> ActiveBanListResponse:
|
||||
"""Return all currently banned IPs across every jail.
|
||||
|
||||
For each jail the ``get <jail> banip --with-time`` command is used
|
||||
to retrieve ban start and expiry times alongside the IP address.
|
||||
|
||||
Geo enrichment strategy (highest priority first):
|
||||
|
||||
1. When *http_session* is provided the entire set of banned IPs is resolved
|
||||
in a single :func:`~app.services.geo_service.lookup_batch` call (up to
|
||||
100 IPs per HTTP request). This is far more efficient than concurrent
|
||||
per-IP lookups and stays within ip-api.com rate limits.
|
||||
2. When only *geo_enricher* is provided (legacy / test path) each IP is
|
||||
resolved individually via the supplied async callable.
|
||||
|
||||
Args:
|
||||
socket_path: Path to the fail2ban Unix domain socket.
|
||||
geo_enricher: Optional async callable ``(ip) → GeoInfo | None``
|
||||
used to enrich each ban entry with country and ASN data.
|
||||
Ignored when *http_session* is provided.
|
||||
http_session: Optional shared :class:`aiohttp.ClientSession`. When
|
||||
provided, :func:`~app.services.geo_service.lookup_batch` is used
|
||||
for efficient bulk geo resolution.
|
||||
app_db: Optional BanGUI application database connection used to
|
||||
persist newly resolved geo entries across restarts. Only
|
||||
meaningful when *http_session* is provided.
|
||||
|
||||
Returns:
|
||||
:class:`~app.models.ban.ActiveBanListResponse` with all active bans.
|
||||
@@ -645,6 +663,8 @@ async def get_active_bans(
|
||||
~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
|
||||
cannot be reached.
|
||||
"""
|
||||
from app.services import geo_service # noqa: PLC0415
|
||||
|
||||
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
|
||||
|
||||
# Fetch jail names.
|
||||
@@ -690,8 +710,23 @@ async def get_active_bans(
|
||||
if ban is not None:
|
||||
bans.append(ban)
|
||||
|
||||
# Enrich with geo data if an enricher was provided.
|
||||
if geo_enricher is not None:
|
||||
# Enrich with geo data — prefer batch lookup over per-IP enricher.
|
||||
if http_session is not None and bans:
|
||||
all_ips: list[str] = [ban.ip for ban in bans]
|
||||
try:
|
||||
geo_map = await geo_service.lookup_batch(all_ips, http_session, db=app_db)
|
||||
except Exception: # noqa: BLE001
|
||||
log.warning("active_bans_batch_geo_failed")
|
||||
geo_map = {}
|
||||
enriched: list[ActiveBan] = []
|
||||
for ban in bans:
|
||||
geo = geo_map.get(ban.ip)
|
||||
if geo is not None:
|
||||
enriched.append(ban.model_copy(update={"country": geo.country_code}))
|
||||
else:
|
||||
enriched.append(ban)
|
||||
bans = enriched
|
||||
elif geo_enricher is not None:
|
||||
bans = await _enrich_bans(bans, geo_enricher)
|
||||
|
||||
log.info("active_bans_fetched", total=len(bans))
|
||||
|
||||
Reference in New Issue
Block a user