- Add 5-min negative cache (_neg_cache) so failing IPs are throttled instead of hitting the API on every request - Add MaxMind GeoLite2 fallback (init_geoip / _geoip_lookup) that is used when ip-api fails; controlled by the BANGUI_GEOIP_DB_PATH env var - Fix lookup_batch bug: failed API results were stored in the positive cache - Add _persist_neg_entry: INSERT OR IGNORE into geo_cache with a NULL country_code so re-resolve can find historically failed IPs - Add POST /api/geo/re-resolve: clears the neg cache, batch-retries all geo_cache rows with country_code IS NULL, returns the resolved/total count - BanTable + MapPage: wrap the "—" country placeholder in a Fluent UI Tooltip explaining the retry behaviour - Add geoip2>=4.8.0 dependency; geoip_db_path config setting - Tests: add TestNegativeCache (4), TestGeoipFallback (4), TestReResolve (4)
568 lines
20 KiB
Python
568 lines
20 KiB
Python
"""Geo service.

Resolves IP addresses to their country, ASN, and organisation using the
`ip-api.com <http://ip-api.com>`_ JSON API. When ip-api.com fails, an
optional local MaxMind GeoLite2-Country database (enabled via
:func:`init_geoip`) is consulted as a fallback; it supplies only the country
(``asn`` and ``org`` stay ``None``). Results are cached in two tiers:

1. **In-memory dict** — fastest; survives for the life of the process.
2. **Persistent SQLite table** (``geo_cache``) — survives restarts; loaded
   into the in-memory dict during application startup via
   :func:`load_cache_from_db`.

Successful lookups (those with a non-``None`` ``country_code``) are upserted
into the persistent cache. When both ip-api.com and the GeoLite2 fallback
fail, the IP is recorded in an in-memory *negative* cache so it is not
re-queried for ``_NEG_CACHE_TTL`` seconds (5 minutes); lookups during that
window return a ``GeoInfo`` with all-``None`` fields. In addition, a
placeholder row containing only the IP is inserted into ``geo_cache`` with
``INSERT OR IGNORE`` (an existing row for that IP is never overwritten), so
historically failed IPs can be found and retried later. Those placeholder
rows are skipped by :func:`load_cache_from_db`. Call :func:`clear_neg_cache`
to make failed IPs eligible for an immediate retry.

For bulk operations the batch endpoint ``http://ip-api.com/batch`` is used
(up to 100 IPs per HTTP call) which is far more efficient than one-at-a-time
requests. Use :func:`lookup_batch` from the ban or blocklist services.

Usage::

    import aiohttp
    import aiosqlite

    from app.services import geo_service

    # optional: enable the local MaxMind fallback
    geo_service.init_geoip("/path/to/GeoLite2-Country.mmdb")

    # warm the cache from the persistent store at startup
    async with aiosqlite.connect("bangui.db") as db:
        await geo_service.load_cache_from_db(db)

    async with aiohttp.ClientSession() as session:
        # single lookup
        info = await geo_service.lookup("1.2.3.4", session)
        if info:
            print(info.country_code)  # "DE"

        # bulk lookup (more efficient for large sets)
        geo_map = await geo_service.lookup_batch(["1.2.3.4", "5.6.7.8"], session)
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import time
|
|
from dataclasses import dataclass
|
|
from typing import TYPE_CHECKING
|
|
|
|
import geoip2.database
|
|
import geoip2.errors
|
|
import structlog
|
|
|
|
# ``aiohttp`` and ``aiosqlite`` are referenced only in annotations in this
# module, and annotations are lazy strings here (``from __future__ import
# annotations``), so the imports are needed by type checkers alone.
if TYPE_CHECKING:
    import aiohttp
    import aiosqlite

#: Module-level structured logger used for every event this service emits.
log: structlog.stdlib.BoundLogger = structlog.get_logger()
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Constants
|
|
# ---------------------------------------------------------------------------
|
|
|
|
#: ip-api.com single-IP endpoint; ``{ip}`` is substituted per request.
#: Plain HTTP because the free tier does not offer HTTPS.
_API_URL: str = (
    "http://ip-api.com/json/{ip}"
    "?fields=status,message,country,countryCode,org,as"
)

#: ip-api.com batch endpoint (POST). It additionally requests ``query`` so
#: every returned record can be matched back to the IP that was asked for.
_BATCH_API_URL: str = (
    "http://ip-api.com/batch"
    "?fields=status,message,country,countryCode,org,as,query"
)

#: IPs sent per batch POST — ip-api.com's hard per-request limit.
_BATCH_SIZE: int = 100

#: Capacity of the in-process cache. Once reached, the cache is emptied in
#: one go instead of evicting individual entries; refilling it from the
#: persistent store is cheap.
_MAX_CACHE_SIZE: int = 50_000

#: Base timeout, in seconds, for outgoing geo API requests (batch requests
#: are given twice this value).
_REQUEST_TIMEOUT: float = 5.0

#: Seconds during which an IP whose lookup failed is not re-queried
#: (5 minutes).
_NEG_CACHE_TTL: float = 5 * 60.0
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Domain model
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@dataclass
class GeoInfo:
    """Country and network details resolved for a single IP address.

    Every field is required at construction time; a field holds ``None``
    when that piece of information could not be determined (a failed lookup
    yields an instance whose fields are all ``None``).
    """

    #: ISO 3166-1 alpha-2 country code, e.g. ``"DE"``.
    country_code: str | None
    #: Human-readable country name, e.g. ``"Germany"``.
    country_name: str | None
    #: Autonomous System Number string, e.g. ``"AS3320"``.
    asn: str | None
    #: Organisation associated with the IP, e.g. ``"Deutsche Telekom"``.
    org: str | None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Internal cache
|
|
# ---------------------------------------------------------------------------
|
|
|
|
#: Module-level in-memory cache: ``ip → GeoInfo`` (resolved results only).
_cache: dict[str, GeoInfo] = {}

#: Negative cache: ``ip → time.monotonic()`` reading taken when the last
#: lookup attempt for that IP failed. These are monotonic-clock values, not
#: epoch timestamps, so they are only meaningful inside this process.
#: Entries younger than :data:`_NEG_CACHE_TTL` seconds are not re-queried.
_neg_cache: dict[str, float] = {}

#: Optional MaxMind GeoLite2 reader initialised by :func:`init_geoip`;
#: ``None`` while the local fallback is disabled.
_geoip_reader: geoip2.database.Reader | None = None
|
|
|
|
|
|
def clear_cache() -> None:
    """Empty the in-memory caches of both resolved and failed lookups.

    Intended for tests, or for discarding data suspected to be stale. The
    persistent ``geo_cache`` table and the GeoLite2 reader are not touched.
    """
    for store in (_cache, _neg_cache):
        store.clear()
|
|
|
|
|
|
def clear_neg_cache() -> None:
    """Forget every recorded lookup failure, keeping resolved entries.

    After this call, IPs that failed recently are retried on their next
    lookup instead of waiting for the negative-cache TTL to expire — handy
    before a manual re-resolve.
    """
    _neg_cache.clear()
|
|
|
|
|
|
def init_geoip(mmdb_path: str | None) -> None:
    """Initialise the MaxMind GeoLite2-Country database reader.

    If *mmdb_path* is ``None`` or empty the call does nothing. If the file
    does not exist, or cannot be opened as a MaxMind database, a warning is
    logged and the currently active reader (if any) is left unchanged — on
    first initialisation this means the fallback stays disabled and
    ip-api.com remains the sole resolver.

    Calling this again with a valid path replaces the active reader; the
    previous reader is closed so its file handle is released.

    Args:
        mmdb_path: Absolute path to a ``GeoLite2-Country.mmdb`` file.
    """
    global _geoip_reader  # noqa: PLW0603
    if not mmdb_path:
        return
    from pathlib import Path  # noqa: PLC0415

    if not Path(mmdb_path).is_file():
        log.warning("geoip_mmdb_not_found", path=mmdb_path)
        return

    try:
        reader = geoip2.database.Reader(mmdb_path)
    except Exception as exc:  # noqa: BLE001
        # The GeoIP fallback is optional: a corrupt or unreadable database
        # must not abort application startup.
        log.warning("geoip_mmdb_open_failed", path=mmdb_path, error=str(exc))
        return

    previous, _geoip_reader = _geoip_reader, reader
    if previous is not None:
        # Release the file handle / memory map held by the old reader.
        previous.close()
    log.info("geoip_mmdb_loaded", path=mmdb_path)
|
|
|
|
|
|
def _geoip_lookup(ip: str) -> GeoInfo | None:
    """Look *ip* up in the local MaxMind GeoLite2 database, if one is loaded.

    Args:
        ip: IPv4 or IPv6 address string.

    Returns:
        A :class:`GeoInfo` carrying ``country_code`` (plus ``country_name``
        when known; ``asn`` and ``org`` are always ``None``), or ``None`` if
        no reader is initialised, the address is not in the database, the
        record has no ISO country code, or the lookup raises any other error
        (which is logged as a warning).
    """
    reader = _geoip_reader
    if reader is None:
        return None

    try:
        country = reader.country(ip).country
        iso_code: str | None = country.iso_code or None
        country_name: str | None = country.name or None
        if iso_code is None:
            return None
        return GeoInfo(
            country_code=iso_code,
            country_name=country_name,
            asn=None,
            org=None,
        )
    except geoip2.errors.AddressNotFoundError:
        # An address missing from the database is an expected outcome.
        return None
    except Exception as exc:  # noqa: BLE001
        log.warning("geoip_lookup_failed", ip=ip, error=str(exc))
        return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Persistent cache I/O
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def load_cache_from_db(db: aiosqlite.Connection) -> None:
    """Warm the in-memory cache with the resolved rows of ``geo_cache``.

    Meant to run once during application startup so early requests are
    answered from memory instead of triggering cold API calls. Rows whose
    ``country_code`` is ``NULL`` (recorded lookup failures) are skipped.

    NOTE(review): rows are written straight into ``_cache`` without the
    ``_MAX_CACHE_SIZE`` check, so a very large table can push the cache past
    capacity and cause the next store to flush it entirely.

    Args:
        db: Open :class:`aiosqlite.Connection` to the BanGUI application
            database (not the fail2ban database).
    """
    loaded = 0
    query = "SELECT ip, country_code, country_name, asn, org FROM geo_cache"
    async with db.execute(query) as cursor:
        async for ip, country_code, country_name, asn, org in cursor:
            if country_code is None:
                continue
            _cache[str(ip)] = GeoInfo(
                country_code=country_code,
                country_name=country_name,
                asn=asn,
                org=org,
            )
            loaded += 1
    log.info("geo_cache_loaded_from_db", entries=loaded)
|
|
|
|
|
|
async def _persist_entry(
    db: aiosqlite.Connection,
    ip: str,
    info: GeoInfo,
) -> None:
    """Write a resolved lookup to ``geo_cache`` and commit immediately.

    Callers only pass results whose ``country_code`` is set. If a row for
    *ip* already exists — including a failure placeholder — its geo fields
    are overwritten and ``cached_at`` is refreshed. On a fresh insert
    ``cached_at`` is not supplied, so it presumably takes the column default
    (confirm against the schema).

    Args:
        db: BanGUI application database connection.
        ip: IP address string.
        info: Resolved geo data to persist.
    """
    params = (ip, info.country_code, info.country_name, info.asn, info.org)
    await db.execute(
        """
        INSERT INTO geo_cache (ip, country_code, country_name, asn, org)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT(ip) DO UPDATE SET
            country_code = excluded.country_code,
            country_name = excluded.country_name,
            asn = excluded.asn,
            org = excluded.org,
            cached_at = strftime('%Y-%m-%dT%H:%M:%fZ', 'now')
        """,
        params,
    )
    await db.commit()
|
|
|
|
|
|
async def _persist_neg_entry(db: aiosqlite.Connection, ip: str) -> None:
    """Leave a placeholder row in ``geo_cache`` marking *ip* as unresolved.

    Only the ``ip`` column is supplied, so the other columns take their table
    defaults (presumably ``NULL`` for the geo fields — confirm against the
    schema). ``INSERT OR IGNORE`` leaves any existing row for *ip* as it is,
    so a resolved entry is never replaced by a later failure. The insert is
    committed immediately.

    Args:
        db: BanGUI application database connection.
        ip: IP address string whose resolution failed.
    """
    statement = "INSERT OR IGNORE INTO geo_cache (ip) VALUES (?)"
    await db.execute(statement, (ip,))
    await db.commit()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public API — single lookup
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def lookup(
    ip: str,
    http_session: aiohttp.ClientSession,
    db: aiosqlite.Connection | None = None,
) -> GeoInfo | None:
    """Resolve an IP address to country, ASN, and organisation metadata.

    Resolution order:

    1. The in-memory positive cache.
    2. The negative cache — if *ip* failed less than ``_NEG_CACHE_TTL``
       seconds ago, an all-``None`` :class:`GeoInfo` is returned without any
       network call.
    3. ip-api.com. A response only counts as a success when it reports
       ``status == "success"`` **and** carries a country code; anything else
       is treated as a failure (the same rule :func:`lookup_batch` applies).
    4. The local MaxMind GeoLite2 database (see :func:`init_geoip`).

    A successful result is stored in the in-process cache (which is flushed
    once it reaches ``_MAX_CACHE_SIZE`` entries) and, when *db* is provided,
    upserted into ``geo_cache``. When both resolvers fail, *ip* is recorded
    in the negative cache and, when *db* is provided, a placeholder row is
    inserted into ``geo_cache`` so a later re-resolve can find it.

    Args:
        ip: IPv4 or IPv6 address string.
        http_session: Shared :class:`aiohttp.ClientSession` (from
            ``app.state.http_session``).
        db: Optional BanGUI application database. When provided, lookup
            outcomes are persisted for cross-restart cache warming and
            later re-resolution.

    Returns:
        A :class:`GeoInfo` instance. Failed lookups yield a
        :class:`GeoInfo` whose fields are all ``None``; the ``None`` in the
        return annotation is kept for backward compatibility.
    """
    cached = _cache.get(ip)
    if cached is not None:
        return cached

    # Negative cache: skip IPs that recently failed to avoid hammering the API.
    failed_at = _neg_cache.get(ip)
    if failed_at is not None and (time.monotonic() - failed_at) < _NEG_CACHE_TTL:
        return GeoInfo(country_code=None, country_name=None, asn=None, org=None)

    resolved: GeoInfo | None = None
    url: str = _API_URL.format(ip=ip)
    try:
        async with http_session.get(url, timeout=_REQUEST_TIMEOUT) as resp:  # type: ignore[arg-type]
            if resp.status != 200:
                log.warning("geo_lookup_non_200", ip=ip, status=resp.status)
            else:
                data: dict[str, object] = await resp.json(content_type=None)
                if data.get("status") != "success":
                    log.debug(
                        "geo_lookup_failed",
                        ip=ip,
                        message=data.get("message", "unknown"),
                    )
                else:
                    parsed = _parse_single_response(data)
                    if parsed.country_code is None:
                        # A "success" without a country is useless to callers;
                        # treat it as a failure so the fallback gets a chance
                        # and the positive cache only holds real resolutions.
                        log.debug("geo_lookup_failed", ip=ip, message="missing countryCode")
                    else:
                        resolved = parsed
                        log.debug(
                            "geo_lookup_success",
                            ip=ip,
                            country=parsed.country_code,
                            asn=parsed.asn,
                        )
    except Exception as exc:  # noqa: BLE001
        log.warning("geo_lookup_request_failed", ip=ip, error=str(exc))

    if resolved is None:
        # ip-api failed — try the local MaxMind database as fallback.
        resolved = _geoip_lookup(ip)
        if resolved is not None:
            log.debug("geo_geoip_fallback_success", ip=ip, country=resolved.country_code)

    if resolved is not None:
        _store(ip, resolved)
        # Any earlier failure timestamp is obsolete once the IP resolves.
        _neg_cache.pop(ip, None)
        if db is not None:
            try:
                await _persist_entry(db, ip, resolved)
            except Exception as exc:  # noqa: BLE001
                log.warning("geo_persist_failed", ip=ip, error=str(exc))
        return resolved

    # Both resolvers failed — record in negative cache to avoid hammering.
    _neg_cache[ip] = time.monotonic()
    if db is not None:
        try:
            await _persist_neg_entry(db, ip)
        except Exception as exc:  # noqa: BLE001
            log.warning("geo_persist_neg_failed", ip=ip, error=str(exc))

    return GeoInfo(country_code=None, country_name=None, asn=None, org=None)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public API — batch lookup
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def lookup_batch(
    ips: list[str],
    http_session: aiohttp.ClientSession,
    db: aiosqlite.Connection | None = None,
) -> dict[str, GeoInfo]:
    """Resolve multiple IP addresses in bulk using ip-api.com batch endpoint.

    IPs already in the in-memory cache, and IPs that failed less than
    ``_NEG_CACHE_TTL`` seconds ago, are answered immediately without an HTTP
    request. The remaining IPs are sent to ``http://ip-api.com/batch`` in
    chunks of up to :data:`_BATCH_SIZE`. IPs for which the API returns no
    country code fall back to the local GeoLite2 database; IPs that neither
    resolver can handle are recorded in the negative cache.

    When *db* is provided, resolved IPs are upserted into ``geo_cache`` and
    unresolved ones get a placeholder row (see :func:`_persist_neg_entry`).

    Args:
        ips: List of IP address strings to resolve. Duplicates are ignored.
        http_session: Shared :class:`aiohttp.ClientSession`.
        db: Optional BanGUI application database for persistent cache writes.

    Returns:
        Dict mapping ``ip → GeoInfo`` for every input IP. IPs whose
        resolution failed map to a ``GeoInfo`` with all-``None`` fields
        (a distinct instance per IP, so callers may mutate results safely).
    """
    geo_result: dict[str, GeoInfo] = {}
    uncached: list[str] = []

    now = time.monotonic()
    for ip in dict.fromkeys(ips):  # deduplicate, preserve order
        cached = _cache.get(ip)
        if cached is not None:
            geo_result[ip] = cached
            continue
        failed_at = _neg_cache.get(ip)
        if failed_at is not None and (now - failed_at) < _NEG_CACHE_TTL:
            # Recently failed — skip API call, return empty result.
            geo_result[ip] = GeoInfo(country_code=None, country_name=None, asn=None, org=None)
            continue
        uncached.append(ip)

    if not uncached:
        return geo_result

    log.info("geo_batch_lookup_start", total=len(uncached))

    # Counts only IPs resolved in this call, so it is comparable to
    # ``requested`` in the completion log (cached hits are excluded).
    resolved_count = 0
    for chunk_start in range(0, len(uncached), _BATCH_SIZE):
        chunk = uncached[chunk_start : chunk_start + _BATCH_SIZE]
        chunk_result = await _batch_api_call(chunk, http_session)

        for ip, api_info in chunk_result.items():
            # An API answer without a country code counts as a failure, in
            # which case the local GeoIP database gets a chance.
            info = api_info if api_info.country_code is not None else _geoip_lookup(ip)

            if info is not None:
                _store(ip, info)
                # Any earlier failure timestamp is obsolete once resolved.
                _neg_cache.pop(ip, None)
                geo_result[ip] = info
                resolved_count += 1
                if db is not None:
                    try:
                        await _persist_entry(db, ip, info)
                    except Exception as exc:  # noqa: BLE001
                        log.warning("geo_persist_failed", ip=ip, error=str(exc))
                continue

            # Both resolvers failed — record in negative cache.
            _neg_cache[ip] = time.monotonic()
            geo_result[ip] = GeoInfo(country_code=None, country_name=None, asn=None, org=None)
            if db is not None:
                try:
                    await _persist_neg_entry(db, ip)
                except Exception as exc:  # noqa: BLE001
                    log.warning("geo_persist_neg_failed", ip=ip, error=str(exc))

    log.info(
        "geo_batch_lookup_complete",
        requested=len(uncached),
        resolved=resolved_count,
    )

    return geo_result
|
|
|
|
|
|
async def _batch_api_call(
    ips: list[str],
    http_session: aiohttp.ClientSession,
) -> dict[str, GeoInfo]:
    """Send one batch request to the ip-api.com batch endpoint.

    Never raises for transport or payload problems: a failed request, a
    non-200 status, or a malformed response body all degrade to empty
    results.

    Args:
        ips: Up to :data:`_BATCH_SIZE` IP address strings.
        http_session: Shared HTTP session.

    Returns:
        Dict mapping ``ip → GeoInfo`` for every IP in *ips*. IPs where the
        API returned a failure record, that are missing from the response,
        or whose request failed get an all-``None`` :class:`GeoInfo` (a
        fresh instance per IP). Records the API returns under a different
        ``query`` string are passed through as well.
    """

    def _blank() -> GeoInfo:
        # Fresh instance each time so callers never share a mutable object.
        return GeoInfo(country_code=None, country_name=None, asn=None, org=None)

    payload = [{"query": ip} for ip in ips]
    try:
        async with http_session.post(
            _BATCH_API_URL,
            json=payload,
            timeout=_REQUEST_TIMEOUT * 2,  # type: ignore[arg-type]
        ) as resp:
            if resp.status != 200:
                log.warning("geo_batch_non_200", status=resp.status, count=len(ips))
                return {ip: _blank() for ip in ips}
            data: object = await resp.json(content_type=None)
    except Exception as exc:  # noqa: BLE001
        log.warning("geo_batch_request_failed", count=len(ips), error=str(exc))
        return {ip: _blank() for ip in ips}

    # The batch endpoint answers with a JSON array of objects. Anything else
    # (e.g. an error object) would otherwise make the loop below raise
    # AttributeError outside the try block and crash the caller.
    if not isinstance(data, list):
        log.warning(
            "geo_batch_malformed_response",
            count=len(ips),
            payload_type=type(data).__name__,
        )
        return {ip: _blank() for ip in ips}

    out: dict[str, GeoInfo] = {}
    for entry in data:
        if not isinstance(entry, dict):
            log.debug("geo_batch_entry_malformed", entry_type=type(entry).__name__)
            continue
        ip_str: str = str(entry.get("query", ""))
        if not ip_str:
            continue
        if entry.get("status") != "success":
            out[ip_str] = _blank()
            log.debug(
                "geo_batch_entry_failed",
                ip=ip_str,
                message=entry.get("message", "unknown"),
            )
            continue
        out[ip_str] = _parse_single_response(entry)

    # Fill any IPs missing from the response.
    for ip in ips:
        if ip not in out:
            out[ip] = _blank()

    return out
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _parse_single_response(data: dict[str, object]) -> GeoInfo:
    """Convert one successful ip-api.com record into a :class:`GeoInfo`.

    Args:
        data: A JSON object from ip-api.com with ``status == "success"``.

    Returns:
        :class:`GeoInfo` built from the ``countryCode``, ``country``, ``as``
        and ``org`` fields. Missing or blank fields become ``None``; ``asn``
        keeps only the first whitespace-separated token of ``as``.
    """
    as_field = _str_or_none(data.get("as"))
    # The ``as`` field has the form "AS12345 Some Org"; keep just the AS
    # number. ``org`` is passed through unchanged.
    asn = None if as_field is None else as_field.split()[0]

    return GeoInfo(
        country_code=_str_or_none(data.get("countryCode")),
        country_name=_str_or_none(data.get("country")),
        asn=asn,
        org=_str_or_none(data.get("org")),
    )
|
|
|
|
|
|
def _str_or_none(value: object) -> str | None:
    """Normalise a raw JSON value to a stripped, non-empty string.

    Args:
        value: Any JSON-decoded value; ``None`` is passed through.

    Returns:
        ``str(value)`` with surrounding whitespace removed, or ``None`` when
        *value* is ``None`` or the stripped text would be empty.
    """
    text = "" if value is None else str(value).strip()
    return text or None
|
|
|
|
|
|
def _store(ip: str, info: GeoInfo) -> None:
    """Insert *info* into the module-level cache, flushing if over capacity.

    When *ip* is a new key and the cache already holds ``_MAX_CACHE_SIZE``
    entries, the whole cache is cleared first (it is cheap to rebuild from
    the persistent store). Overwriting an existing key does not grow the
    cache and therefore never triggers a flush.

    Args:
        ip: The IP address key.
        info: The :class:`GeoInfo` to store.
    """
    if ip not in _cache and len(_cache) >= _MAX_CACHE_SIZE:
        _cache.clear()
        log.info("geo_cache_flushed", reason="capacity")
    _cache[ip] = info
|