Fix missing country: neg cache, geoip2 fallback, re-resolve endpoint
- Add 5-min negative cache (_neg_cache) so failing IPs are throttled rather than hammering the API on every request - Add MaxMind GeoLite2 fallback (init_geoip / _geoip_lookup) that fires when ip-api fails; controlled by BANGUI_GEOIP_DB_PATH env var - Fix lookup_batch bug: failed API results were stored in positive cache - Add _persist_neg_entry: INSERT OR IGNORE into geo_cache with NULL country_code so re-resolve can find historically failed IPs - Add POST /api/geo/re-resolve: clears neg cache, batch-retries all geo_cache rows with country_code IS NULL, returns resolved/total count - BanTable + MapPage: wrap the country — placeholder in a Fluent UI Tooltip explaining the retry behaviour - Add geoip2>=4.8.0 dependency; geoip_db_path config setting - Tests: add TestNegativeCache (4), TestGeoipFallback (4), TestReResolve (4)
This commit is contained in:
@@ -45,6 +45,13 @@ class Settings(BaseSettings):
|
||||
default="info",
|
||||
description="Application log level: debug | info | warning | error | critical.",
|
||||
)
|
||||
geoip_db_path: str | None = Field(
|
||||
default=None,
|
||||
description=(
|
||||
"Optional path to a MaxMind GeoLite2-Country .mmdb file. "
|
||||
"When set, failed ip-api.com lookups fall back to local resolution."
|
||||
),
|
||||
)
|
||||
|
||||
model_config = SettingsConfigDict(
|
||||
env_prefix="BANGUI_",
|
||||
|
||||
@@ -137,6 +137,7 @@ async def _lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
|
||||
# --- Pre-warm geo cache from the persistent store ---
|
||||
from app.services import geo_service # noqa: PLC0415
|
||||
|
||||
geo_service.init_geoip(settings.geoip_db_path)
|
||||
await geo_service.load_cache_from_db(db)
|
||||
|
||||
# --- Background task scheduler ---
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
"""Geo / IP lookup router.
|
||||
|
||||
Provides the IP enrichment endpoint:
|
||||
Provides the IP enrichment endpoints:
|
||||
|
||||
* ``GET /api/geo/lookup/{ip}`` — ban status, ban history, and geo info for an IP
|
||||
* ``POST /api/geo/re-resolve`` — retry all previously failed geo lookups
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -12,9 +13,10 @@ from typing import TYPE_CHECKING, Annotated
|
||||
if TYPE_CHECKING:
|
||||
import aiohttp
|
||||
|
||||
from fastapi import APIRouter, HTTPException, Path, Request, status
|
||||
import aiosqlite
|
||||
from fastapi import APIRouter, Depends, HTTPException, Path, Request, status
|
||||
|
||||
from app.dependencies import AuthDep
|
||||
from app.dependencies import AuthDep, get_db
|
||||
from app.models.geo import GeoDetail, IpLookupResponse
|
||||
from app.services import geo_service, jail_service
|
||||
from app.utils.fail2ban_client import Fail2BanConnectionError
|
||||
@@ -90,3 +92,55 @@ async def lookup_ip(
|
||||
currently_banned_in=result["currently_banned_in"],
|
||||
geo=geo_detail,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# POST /api/geo/re-resolve
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@router.post(
|
||||
"/re-resolve",
|
||||
summary="Re-resolve all IPs whose country could not be determined",
|
||||
)
|
||||
async def re_resolve_geo(
|
||||
request: Request,
|
||||
_auth: AuthDep,
|
||||
db: Annotated[aiosqlite.Connection, Depends(get_db)],
|
||||
) -> dict[str, int]:
|
||||
"""Retry geo resolution for every IP in ``geo_cache`` with a null country.
|
||||
|
||||
Clears the in-memory negative cache first so that previously failing IPs
|
||||
are immediately eligible for a new API attempt.
|
||||
|
||||
Args:
|
||||
request: Incoming request (used to access ``app.state.http_session``).
|
||||
_auth: Validated session — enforces authentication.
|
||||
db: BanGUI application database (for reading/writing ``geo_cache``).
|
||||
|
||||
Returns:
|
||||
JSON object ``{"resolved": N, "total": M}`` where *N* is the number
|
||||
of IPs that gained a country code and *M* is the total number of IPs
|
||||
that were retried.
|
||||
"""
|
||||
# Collect all IPs in geo_cache that still lack a country code.
|
||||
unresolved: list[str] = []
|
||||
async with db.execute(
|
||||
"SELECT ip FROM geo_cache WHERE country_code IS NULL"
|
||||
) as cur:
|
||||
async for row in cur:
|
||||
unresolved.append(str(row[0]))
|
||||
|
||||
if not unresolved:
|
||||
return {"resolved": 0, "total": 0}
|
||||
|
||||
# Clear negative cache so these IPs bypass the TTL check.
|
||||
geo_service.clear_neg_cache()
|
||||
|
||||
http_session: aiohttp.ClientSession = request.app.state.http_session
|
||||
geo_map = await geo_service.lookup_batch(unresolved, http_session, db=db)
|
||||
|
||||
resolved_count = sum(
|
||||
1 for info in geo_map.values() if info.country_code is not None
|
||||
)
|
||||
return {"resolved": resolved_count, "total": len(unresolved)}
|
||||
|
||||
@@ -38,9 +38,12 @@ Usage::
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import geoip2.database
|
||||
import geoip2.errors
|
||||
import structlog
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -74,6 +77,10 @@ _MAX_CACHE_SIZE: int = 50_000
|
||||
#: Timeout for outgoing geo API requests in seconds.
|
||||
_REQUEST_TIMEOUT: float = 5.0
|
||||
|
||||
#: How many seconds a failed lookup result is suppressed before the IP is
|
||||
#: eligible for a new API attempt. Default: 5 minutes.
|
||||
_NEG_CACHE_TTL: float = 300.0
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Domain model
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -104,16 +111,83 @@ class GeoInfo:
|
||||
# Internal cache
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
#: Module-level in-memory cache: ``ip → GeoInfo``.
|
||||
#: Module-level in-memory cache: ``ip → GeoInfo`` (positive results only).
|
||||
_cache: dict[str, GeoInfo] = {}
|
||||
|
||||
#: Negative cache: ``ip → epoch timestamp`` of last failed lookup attempt.
|
||||
#: Entries within :data:`_NEG_CACHE_TTL` seconds are not re-queried.
|
||||
_neg_cache: dict[str, float] = {}
|
||||
|
||||
#: Optional MaxMind GeoLite2 reader initialised by :func:`init_geoip`.
|
||||
_geoip_reader: geoip2.database.Reader | None = None
|
||||
|
||||
|
||||
def clear_cache() -> None:
|
||||
"""Flush the entire lookup cache.
|
||||
"""Flush both the positive and negative lookup caches.
|
||||
|
||||
Useful in tests and when the operator suspects stale data.
|
||||
"""
|
||||
_cache.clear()
|
||||
_neg_cache.clear()
|
||||
|
||||
|
||||
def clear_neg_cache() -> None:
|
||||
"""Flush only the negative (failed-lookups) cache.
|
||||
|
||||
Useful when triggering a manual re-resolve so that previously failed
|
||||
IPs are immediately eligible for a new API attempt.
|
||||
"""
|
||||
_neg_cache.clear()
|
||||
|
||||
|
||||
def init_geoip(mmdb_path: str | None) -> None:
|
||||
"""Initialise the MaxMind GeoLite2-Country database reader.
|
||||
|
||||
If *mmdb_path* is ``None``, empty, or the file does not exist the
|
||||
fallback is silently disabled — ip-api.com remains the sole resolver.
|
||||
|
||||
Args:
|
||||
mmdb_path: Absolute path to a ``GeoLite2-Country.mmdb`` file.
|
||||
"""
|
||||
global _geoip_reader # noqa: PLW0603
|
||||
if not mmdb_path:
|
||||
return
|
||||
from pathlib import Path # noqa: PLC0415
|
||||
|
||||
if not Path(mmdb_path).is_file():
|
||||
log.warning("geoip_mmdb_not_found", path=mmdb_path)
|
||||
return
|
||||
_geoip_reader = geoip2.database.Reader(mmdb_path)
|
||||
log.info("geoip_mmdb_loaded", path=mmdb_path)
|
||||
|
||||
|
||||
def _geoip_lookup(ip: str) -> GeoInfo | None:
|
||||
"""Attempt a local MaxMind GeoLite2 lookup for *ip*.
|
||||
|
||||
Returns ``None`` when the reader is not initialised, the IP is not in
|
||||
the database, or any other error occurs.
|
||||
|
||||
Args:
|
||||
ip: IPv4 or IPv6 address string.
|
||||
|
||||
Returns:
|
||||
A :class:`GeoInfo` with at least ``country_code`` populated, or
|
||||
``None`` when resolution is impossible.
|
||||
"""
|
||||
if _geoip_reader is None:
|
||||
return None
|
||||
try:
|
||||
response = _geoip_reader.country(ip)
|
||||
code: str | None = response.country.iso_code or None
|
||||
name: str | None = response.country.name or None
|
||||
if code is None:
|
||||
return None
|
||||
return GeoInfo(country_code=code, country_name=name, asn=None, org=None)
|
||||
except geoip2.errors.AddressNotFoundError:
|
||||
return None
|
||||
except Exception as exc: # noqa: BLE001
|
||||
log.warning("geoip_lookup_failed", ip=ip, error=str(exc))
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -181,6 +255,23 @@ async def _persist_entry(
|
||||
await db.commit()
|
||||
|
||||
|
||||
async def _persist_neg_entry(db: aiosqlite.Connection, ip: str) -> None:
|
||||
"""Record a failed lookup attempt in ``geo_cache`` with all-NULL fields.
|
||||
|
||||
Uses ``INSERT OR IGNORE`` so that an existing *positive* entry (one that
|
||||
has a ``country_code``) is never overwritten by a later failure.
|
||||
|
||||
Args:
|
||||
db: BanGUI application database connection.
|
||||
ip: IP address string whose resolution failed.
|
||||
"""
|
||||
await db.execute(
|
||||
"INSERT OR IGNORE INTO geo_cache (ip) VALUES (?)",
|
||||
(ip,),
|
||||
)
|
||||
await db.commit()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public API — single lookup
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -215,36 +306,60 @@ async def lookup(
|
||||
if ip in _cache:
|
||||
return _cache[ip]
|
||||
|
||||
# Negative cache: skip IPs that recently failed to avoid hammering the API.
|
||||
neg_ts = _neg_cache.get(ip)
|
||||
if neg_ts is not None and (time.monotonic() - neg_ts) < _NEG_CACHE_TTL:
|
||||
return GeoInfo(country_code=None, country_name=None, asn=None, org=None)
|
||||
|
||||
url: str = _API_URL.format(ip=ip)
|
||||
api_ok = False
|
||||
try:
|
||||
async with http_session.get(url, timeout=_REQUEST_TIMEOUT) as resp: # type: ignore[arg-type]
|
||||
if resp.status != 200:
|
||||
log.warning("geo_lookup_non_200", ip=ip, status=resp.status)
|
||||
return None
|
||||
|
||||
data: dict[str, object] = await resp.json(content_type=None)
|
||||
else:
|
||||
data: dict[str, object] = await resp.json(content_type=None)
|
||||
if data.get("status") == "success":
|
||||
api_ok = True
|
||||
result = _parse_single_response(data)
|
||||
_store(ip, result)
|
||||
if result.country_code is not None and db is not None:
|
||||
try:
|
||||
await _persist_entry(db, ip, result)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
log.warning("geo_persist_failed", ip=ip, error=str(exc))
|
||||
log.debug("geo_lookup_success", ip=ip, country=result.country_code, asn=result.asn)
|
||||
return result
|
||||
log.debug(
|
||||
"geo_lookup_failed",
|
||||
ip=ip,
|
||||
message=data.get("message", "unknown"),
|
||||
)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
log.warning("geo_lookup_request_failed", ip=ip, error=str(exc))
|
||||
return None
|
||||
|
||||
if data.get("status") != "success":
|
||||
log.debug(
|
||||
"geo_lookup_failed",
|
||||
ip=ip,
|
||||
message=data.get("message", "unknown"),
|
||||
)
|
||||
# Do NOT cache failed lookups — they will be retried on the next call.
|
||||
return GeoInfo(country_code=None, country_name=None, asn=None, org=None)
|
||||
if not api_ok:
|
||||
# Try local MaxMind database as fallback.
|
||||
fallback = _geoip_lookup(ip)
|
||||
if fallback is not None:
|
||||
_store(ip, fallback)
|
||||
if fallback.country_code is not None and db is not None:
|
||||
try:
|
||||
await _persist_entry(db, ip, fallback)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
log.warning("geo_persist_failed", ip=ip, error=str(exc))
|
||||
log.debug("geo_geoip_fallback_success", ip=ip, country=fallback.country_code)
|
||||
return fallback
|
||||
|
||||
result = _parse_single_response(data)
|
||||
_store(ip, result)
|
||||
if result.country_code is not None and db is not None:
|
||||
try:
|
||||
await _persist_entry(db, ip, result)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
log.warning("geo_persist_failed", ip=ip, error=str(exc))
|
||||
log.debug("geo_lookup_success", ip=ip, country=result.country_code, asn=result.asn)
|
||||
return result
|
||||
# Both resolvers failed — record in negative cache to avoid hammering.
|
||||
_neg_cache[ip] = time.monotonic()
|
||||
if db is not None:
|
||||
try:
|
||||
await _persist_neg_entry(db, ip)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
log.warning("geo_persist_neg_failed", ip=ip, error=str(exc))
|
||||
|
||||
return GeoInfo(country_code=None, country_name=None, asn=None, org=None)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -277,11 +392,16 @@ async def lookup_batch(
|
||||
"""
|
||||
geo_result: dict[str, GeoInfo] = {}
|
||||
uncached: list[str] = []
|
||||
_empty = GeoInfo(country_code=None, country_name=None, asn=None, org=None)
|
||||
|
||||
unique_ips = list(dict.fromkeys(ips)) # deduplicate, preserve order
|
||||
now = time.monotonic()
|
||||
for ip in unique_ips:
|
||||
if ip in _cache:
|
||||
geo_result[ip] = _cache[ip]
|
||||
elif ip in _neg_cache and (now - _neg_cache[ip]) < _NEG_CACHE_TTL:
|
||||
# Recently failed — skip API call, return empty result.
|
||||
geo_result[ip] = _empty
|
||||
else:
|
||||
uncached.append(ip)
|
||||
|
||||
@@ -295,13 +415,35 @@ async def lookup_batch(
|
||||
chunk_result = await _batch_api_call(chunk, http_session)
|
||||
|
||||
for ip, info in chunk_result.items():
|
||||
_store(ip, info)
|
||||
geo_result[ip] = info
|
||||
if info.country_code is not None and db is not None:
|
||||
try:
|
||||
await _persist_entry(db, ip, info)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
log.warning("geo_persist_failed", ip=ip, error=str(exc))
|
||||
if info.country_code is not None:
|
||||
# Successful API resolution.
|
||||
_store(ip, info)
|
||||
geo_result[ip] = info
|
||||
if db is not None:
|
||||
try:
|
||||
await _persist_entry(db, ip, info)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
log.warning("geo_persist_failed", ip=ip, error=str(exc))
|
||||
else:
|
||||
# API failed — try local GeoIP fallback.
|
||||
fallback = _geoip_lookup(ip)
|
||||
if fallback is not None:
|
||||
_store(ip, fallback)
|
||||
geo_result[ip] = fallback
|
||||
if db is not None:
|
||||
try:
|
||||
await _persist_entry(db, ip, fallback)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
log.warning("geo_persist_failed", ip=ip, error=str(exc))
|
||||
else:
|
||||
# Both resolvers failed — record in negative cache.
|
||||
_neg_cache[ip] = time.monotonic()
|
||||
geo_result[ip] = _empty
|
||||
if db is not None:
|
||||
try:
|
||||
await _persist_neg_entry(db, ip)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
log.warning("geo_persist_neg_failed", ip=ip, error=str(exc))
|
||||
|
||||
log.info(
|
||||
"geo_batch_lookup_complete",
|
||||
|
||||
Reference in New Issue
Block a user