"""Geo service. Resolves IP addresses to their country, ASN, and organisation using the `ip-api.com `_ JSON API. Results are cached in two tiers: 1. **In-memory dict** — fastest; survives for the life of the process. 2. **Persistent SQLite table** (``geo_cache``) — survives restarts; loaded into the in-memory dict during application startup via :func:`load_cache_from_db`. Only *successful* lookups (those returning a non-``None`` ``country_code``) are written to the persistent cache. Failed lookups are **not** cached so they will be retried on the next request. For bulk operations the batch endpoint ``http://ip-api.com/batch`` is used (up to 100 IPs per HTTP call) which is far more efficient than one-at-a-time requests. Use :func:`lookup_batch` from the ban or blocklist services. Usage:: import aiohttp import aiosqlite from app.services import geo_service # warm the cache from the persistent store at startup async with aiosqlite.connect("bangui.db") as db: await geo_service.load_cache_from_db(db) async with aiohttp.ClientSession() as session: # single lookup info = await geo_service.lookup("1.2.3.4", session) if info: print(info.country_code) # "DE" # bulk lookup (more efficient for large sets) geo_map = await geo_service.lookup_batch(["1.2.3.4", "5.6.7.8"], session) """ from __future__ import annotations import asyncio import time from dataclasses import dataclass from typing import TYPE_CHECKING import aiohttp import structlog if TYPE_CHECKING: import aiosqlite import geoip2.database import geoip2.errors log: structlog.stdlib.BoundLogger = structlog.get_logger() # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- #: ip-api.com single-IP lookup endpoint (HTTP only on the free tier). _API_URL: str = ( "http://ip-api.com/json/{ip}?fields=status,message,country,countryCode,org,as" ) #: ip-api.com batch endpoint — accepts up to 100 IPs per POST. 
_BATCH_API_URL: str = ( "http://ip-api.com/batch?fields=status,message,country,countryCode,org,as,query" ) #: Maximum IPs per batch request (ip-api.com hard limit is 100). _BATCH_SIZE: int = 100 #: Maximum number of entries kept in the in-process cache before it is #: flushed completely. A simple eviction strategy — the cache is cheap to #: rebuild from the persistent store. _MAX_CACHE_SIZE: int = 50_000 #: Timeout for outgoing geo API requests in seconds. _REQUEST_TIMEOUT: float = 5.0 #: How many seconds a failed lookup result is suppressed before the IP is #: eligible for a new API attempt. Default: 5 minutes. _NEG_CACHE_TTL: float = 300.0 #: Minimum delay in seconds between consecutive batch HTTP requests to #: ip-api.com. The free tier allows 45 requests/min; 1.5 s ≈ 40 req/min. _BATCH_DELAY: float = 1.5 #: Maximum number of retries for a batch chunk that fails with a #: transient error (e.g. connection reset due to rate limiting). _BATCH_MAX_RETRIES: int = 2 # --------------------------------------------------------------------------- # Domain model # --------------------------------------------------------------------------- @dataclass class GeoInfo: """Geographical and network metadata for a single IP address. All fields default to ``None`` when the information is unavailable or the lookup fails gracefully. """ country_code: str | None """ISO 3166-1 alpha-2 country code, e.g. ``"DE"``.""" country_name: str | None """Human-readable country name, e.g. ``"Germany"``.""" asn: str | None """Autonomous System Number string, e.g. ``"AS3320"``.""" org: str | None """Organisation name associated with the IP, e.g. ``"Deutsche Telekom"``.""" # --------------------------------------------------------------------------- # Internal cache # --------------------------------------------------------------------------- #: Module-level in-memory cache: ``ip → GeoInfo`` (positive results only). 
_cache: dict[str, GeoInfo] = {}

#: Negative cache: ``ip → time.monotonic() timestamp`` of the last failed
#: lookup attempt (monotonic clock, NOT epoch time — compare only against
#: other ``time.monotonic()`` readings). Entries within
#: :data:`_NEG_CACHE_TTL` seconds are not re-queried.
_neg_cache: dict[str, float] = {}

#: IPs added to :data:`_cache` but not yet persisted to the database.
#: Consumed and cleared atomically by :func:`flush_dirty`.
_dirty: set[str] = set()

#: Optional MaxMind GeoLite2 reader initialised by :func:`init_geoip`.
_geoip_reader: geoip2.database.Reader | None = None


def clear_cache() -> None:
    """Flush both the positive and negative lookup caches.

    Also clears the dirty set so any pending-but-unpersisted entries are
    discarded. Useful in tests and when the operator suspects stale data.
    """
    _cache.clear()
    _neg_cache.clear()
    _dirty.clear()


def clear_neg_cache() -> None:
    """Flush only the negative (failed-lookups) cache.

    Useful when triggering a manual re-resolve so that previously failed
    IPs are immediately eligible for a new API attempt.
    """
    _neg_cache.clear()


def is_cached(ip: str) -> bool:
    """Return ``True`` if *ip* has a positive entry in the in-memory cache.

    A positive entry is one with a non-``None`` ``country_code``. This is
    useful for skipping IPs that have already been resolved when building a
    list for :func:`lookup_batch`.

    Args:
        ip: IPv4 or IPv6 address string.

    Returns:
        ``True`` when *ip* is in the cache with a known country code.
    """
    return ip in _cache and _cache[ip].country_code is not None


async def cache_stats(db: aiosqlite.Connection) -> dict[str, int]:
    """Return diagnostic counters for the geo cache subsystem.

    Queries the persistent store for the number of unresolved entries and
    combines it with in-memory counters.

    Args:
        db: Open BanGUI application database connection.

    Returns:
        Dict with keys ``cache_size``, ``unresolved``, ``neg_cache_size``,
        and ``dirty_size``.
    """
    # Rows with a NULL country_code are negative placeholders written by
    # _persist_neg_entry — i.e. IPs we tried and failed to resolve.
    async with db.execute(
        "SELECT COUNT(*) FROM geo_cache WHERE country_code IS NULL"
    ) as cur:
        row = await cur.fetchone()
        unresolved: int = int(row[0]) if row else 0
    return {
        "cache_size": len(_cache),
        "unresolved": unresolved,
        "neg_cache_size": len(_neg_cache),
        "dirty_size": len(_dirty),
    }


def init_geoip(mmdb_path: str | None) -> None:
    """Initialise the MaxMind GeoLite2-Country database reader.

    If *mmdb_path* is ``None``, empty, or the file does not exist the
    fallback is silently disabled — ip-api.com remains the sole resolver.

    Args:
        mmdb_path: Absolute path to a ``GeoLite2-Country.mmdb`` file.
    """
    global _geoip_reader  # noqa: PLW0603
    if not mmdb_path:
        return

    # Imported lazily so the geoip2 dependency is only required when the
    # MaxMind fallback is actually configured.
    from pathlib import Path  # noqa: PLC0415

    import geoip2.database  # noqa: PLC0415

    if not Path(mmdb_path).is_file():
        log.warning("geoip_mmdb_not_found", path=mmdb_path)
        return
    _geoip_reader = geoip2.database.Reader(mmdb_path)
    log.info("geoip_mmdb_loaded", path=mmdb_path)


def _geoip_lookup(ip: str) -> GeoInfo | None:
    """Attempt a local MaxMind GeoLite2 lookup for *ip*.

    Returns ``None`` when the reader is not initialised, the IP is not in
    the database, or any other error occurs.

    Args:
        ip: IPv4 or IPv6 address string.

    Returns:
        A :class:`GeoInfo` with at least ``country_code`` populated, or
        ``None`` when resolution is impossible.
    """
    if _geoip_reader is None:
        return None

    import geoip2.errors  # noqa: PLC0415

    try:
        response = _geoip_reader.country(ip)
        code: str | None = response.country.iso_code or None
        name: str | None = response.country.name or None
        if code is None:
            # GeoLite2-Country has no ASN/org data, so a missing country
            # code means the record is useless — treat as a miss.
            return None
        return GeoInfo(country_code=code, country_name=name, asn=None, org=None)
    except geoip2.errors.AddressNotFoundError:
        return None
    except Exception as exc:  # noqa: BLE001
        log.warning("geoip_lookup_failed", ip=ip, error=str(exc))
        return None


# ---------------------------------------------------------------------------
# Persistent cache I/O
# ---------------------------------------------------------------------------


async def load_cache_from_db(db: aiosqlite.Connection) -> None:
    """Pre-populate the in-memory cache from the ``geo_cache`` table.

    Should be called once during application startup so the service starts
    with a warm cache instead of making cold API calls on the first
    request.

    Args:
        db: Open :class:`aiosqlite.Connection` to the BanGUI application
            database (not the fail2ban database).
    """
    count = 0
    async with db.execute(
        "SELECT ip, country_code, country_name, asn, org FROM geo_cache"
    ) as cur:
        async for row in cur:
            ip: str = str(row[0])
            country_code: str | None = row[1]
            if country_code is None:
                # Negative placeholder row — only positive results are
                # loaded into memory so the IP stays eligible for retry.
                continue
            _cache[ip] = GeoInfo(
                country_code=country_code,
                country_name=row[2],
                asn=row[3],
                org=row[4],
            )
            count += 1
    log.info("geo_cache_loaded_from_db", entries=count)


async def _persist_entry(
    db: aiosqlite.Connection,
    ip: str,
    info: GeoInfo,
) -> None:
    """Upsert a resolved :class:`GeoInfo` into the ``geo_cache`` table.

    Only called when ``info.country_code`` is not ``None`` so the
    persistent store never contains empty placeholder rows.

    Note: does NOT commit — the caller is responsible for ``db.commit()``.

    Args:
        db: BanGUI application database connection.
        ip: IP address string.
        info: Resolved geo data to persist.
    """
    await db.execute(
        """
        INSERT INTO geo_cache (ip, country_code, country_name, asn, org)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT(ip) DO UPDATE SET
            country_code = excluded.country_code,
            country_name = excluded.country_name,
            asn = excluded.asn,
            org = excluded.org,
            cached_at = strftime('%Y-%m-%dT%H:%M:%fZ', 'now')
        """,
        (ip, info.country_code, info.country_name, info.asn, info.org),
    )


async def _persist_neg_entry(db: aiosqlite.Connection, ip: str) -> None:
    """Record a failed lookup attempt in ``geo_cache`` with all-NULL fields.

    Uses ``INSERT OR IGNORE`` so that an existing *positive* entry (one
    that has a ``country_code``) is never overwritten by a later failure.

    Note: does NOT commit — the caller is responsible for ``db.commit()``.

    Args:
        db: BanGUI application database connection.
        ip: IP address string whose resolution failed.
    """
    await db.execute(
        "INSERT OR IGNORE INTO geo_cache (ip) VALUES (?)",
        (ip,),
    )


# ---------------------------------------------------------------------------
# Public API — single lookup
# ---------------------------------------------------------------------------


async def lookup(
    ip: str,
    http_session: aiohttp.ClientSession,
    db: aiosqlite.Connection | None = None,
) -> GeoInfo | None:
    """Resolve an IP address to country, ASN, and organisation metadata.

    Results are cached in-process. If the cache exceeds ``_MAX_CACHE_SIZE``
    entries it is flushed before the new result is stored.

    Only successful resolutions (``country_code is not None``) are written
    to the persistent cache when *db* is provided. Failed lookups are
    **not** cached so they are retried on the next call.

    Args:
        ip: IPv4 or IPv6 address string.
        http_session: Shared :class:`aiohttp.ClientSession` (from
            ``app.state.http_session``).
        db: Optional BanGUI application database. When provided, successful
            lookups are persisted for cross-restart cache warming.

    Returns:
        A :class:`GeoInfo` instance, or ``None`` when the lookup fails in a
        way that should prevent the caller from caching a bad result (e.g.
        network timeout).
    """
    if ip in _cache:
        return _cache[ip]

    # Negative cache: skip IPs that recently failed to avoid hammering the API.
    neg_ts = _neg_cache.get(ip)
    if neg_ts is not None and (time.monotonic() - neg_ts) < _NEG_CACHE_TTL:
        return GeoInfo(country_code=None, country_name=None, asn=None, org=None)

    url: str = _API_URL.format(ip=ip)
    api_ok = False
    try:
        async with http_session.get(url, timeout=aiohttp.ClientTimeout(total=_REQUEST_TIMEOUT)) as resp:
            if resp.status != 200:
                log.warning("geo_lookup_non_200", ip=ip, status=resp.status)
            else:
                data: dict[str, object] = await resp.json(content_type=None)
                if data.get("status") == "success":
                    api_ok = True
                    result = _parse_single_response(data)
                    # _store also marks the entry dirty for flush_dirty; the
                    # direct _persist_entry below makes the write immediate.
                    _store(ip, result)
                    if result.country_code is not None and db is not None:
                        try:
                            await _persist_entry(db, ip, result)
                            await db.commit()
                        except Exception as exc:  # noqa: BLE001
                            log.warning("geo_persist_failed", ip=ip, error=str(exc))
                    log.debug("geo_lookup_success", ip=ip, country=result.country_code, asn=result.asn)
                    return result
                log.debug(
                    "geo_lookup_failed",
                    ip=ip,
                    message=data.get("message", "unknown"),
                )
    except Exception as exc:  # noqa: BLE001
        log.warning(
            "geo_lookup_request_failed",
            ip=ip,
            exc_type=type(exc).__name__,
            error=repr(exc),
        )

    # NOTE(review): api_ok can only be True on the early-return success path
    # above, so this guard is always True when reached — kept for clarity.
    if not api_ok:
        # Try local MaxMind database as fallback.
        fallback = _geoip_lookup(ip)
        if fallback is not None:
            _store(ip, fallback)
            if fallback.country_code is not None and db is not None:
                try:
                    await _persist_entry(db, ip, fallback)
                    await db.commit()
                except Exception as exc:  # noqa: BLE001
                    log.warning("geo_persist_failed", ip=ip, error=str(exc))
            log.debug("geo_geoip_fallback_success", ip=ip, country=fallback.country_code)
            return fallback

    # Both resolvers failed — record in negative cache to avoid hammering.
    _neg_cache[ip] = time.monotonic()
    if db is not None:
        try:
            await _persist_neg_entry(db, ip)
            await db.commit()
        except Exception as exc:  # noqa: BLE001
            log.warning("geo_persist_neg_failed", ip=ip, error=str(exc))
    return GeoInfo(country_code=None, country_name=None, asn=None, org=None)


# ---------------------------------------------------------------------------
# Public API — batch lookup
# ---------------------------------------------------------------------------


def lookup_cached_only(
    ips: list[str],
) -> tuple[dict[str, GeoInfo], list[str]]:
    """Return cached geo data for *ips* without making any external API calls.

    Used by callers that want to return a fast response using only what is
    already in memory, while deferring resolution of uncached IPs to a
    background task.

    Args:
        ips: IP address strings to look up.

    Returns:
        A ``(geo_map, uncached)`` tuple where *geo_map* maps every IP that
        was already in the in-memory cache to its :class:`GeoInfo`, and
        *uncached* is the list of IPs that were not found in the cache.
        Entries in the negative cache (recently failed) are **not**
        included in *uncached* so they are not re-queued immediately.
    """
    geo_map: dict[str, GeoInfo] = {}
    uncached: list[str] = []
    now = time.monotonic()
    for ip in dict.fromkeys(ips):  # deduplicate, preserve order
        if ip in _cache:
            geo_map[ip] = _cache[ip]
        elif ip in _neg_cache and (now - _neg_cache[ip]) < _NEG_CACHE_TTL:
            # Still within the cool-down window — do not re-queue.
            pass
        else:
            uncached.append(ip)
    return geo_map, uncached


async def lookup_batch(
    ips: list[str],
    http_session: aiohttp.ClientSession,
    db: aiosqlite.Connection | None = None,
) -> dict[str, GeoInfo]:
    """Resolve multiple IP addresses in bulk using ip-api.com batch endpoint.

    IPs already present in the in-memory cache are returned immediately
    without making an HTTP request. Uncached IPs are sent to
    ``http://ip-api.com/batch`` in chunks of up to :data:`_BATCH_SIZE`.

    Only successful resolutions (``country_code is not None``) are written
    to the persistent cache when *db* is provided. Both positive and
    negative entries are written in bulk using ``executemany`` (one
    round-trip per chunk) rather than one ``execute`` per IP.

    Args:
        ips: List of IP address strings to resolve. Duplicates are ignored.
        http_session: Shared :class:`aiohttp.ClientSession`.
        db: Optional BanGUI application database for persistent cache writes.

    Returns:
        Dict mapping ``ip → GeoInfo`` for every input IP. IPs whose
        resolution failed will have a ``GeoInfo`` with all-``None`` fields.
    """
    geo_result: dict[str, GeoInfo] = {}
    uncached: list[str] = []
    # NOTE(review): one shared all-None instance is aliased into every
    # failed slot of the result — callers must not mutate returned GeoInfo.
    _empty = GeoInfo(country_code=None, country_name=None, asn=None, org=None)

    unique_ips = list(dict.fromkeys(ips))  # deduplicate, preserve order
    now = time.monotonic()
    for ip in unique_ips:
        if ip in _cache:
            geo_result[ip] = _cache[ip]
        elif ip in _neg_cache and (now - _neg_cache[ip]) < _NEG_CACHE_TTL:
            # Recently failed — skip API call, return empty result.
            geo_result[ip] = _empty
        else:
            uncached.append(ip)

    if not uncached:
        return geo_result

    log.info("geo_batch_lookup_start", total=len(uncached))

    for batch_idx, chunk_start in enumerate(range(0, len(uncached), _BATCH_SIZE)):
        chunk = uncached[chunk_start : chunk_start + _BATCH_SIZE]

        # Throttle: pause between consecutive HTTP calls to stay within the
        # ip-api.com free-tier rate limit (45 req/min).
        if batch_idx > 0:
            await asyncio.sleep(_BATCH_DELAY)

        # Retry transient failures (e.g. connection-reset from rate limit).
        chunk_result: dict[str, GeoInfo] | None = None
        for attempt in range(_BATCH_MAX_RETRIES + 1):
            chunk_result = await _batch_api_call(chunk, http_session)
            # If every IP in the chunk came back with country_code=None and the
            # batch wasn't tiny, that almost certainly means the whole request
            # was rejected (connection reset / 429). Retry after a back-off.
            all_failed = all(
                info.country_code is None for info in chunk_result.values()
            )
            if not all_failed or attempt >= _BATCH_MAX_RETRIES:
                break
            backoff = _BATCH_DELAY * (2 ** (attempt + 1))
            log.warning(
                "geo_batch_retry",
                attempt=attempt + 1,
                chunk_size=len(chunk),
                backoff=backoff,
            )
            await asyncio.sleep(backoff)

        # The retry loop always runs at least once, so chunk_result is set.
        assert chunk_result is not None  # noqa: S101

        # Collect bulk-write rows instead of one execute per IP.
        pos_rows: list[tuple[str, str | None, str | None, str | None, str | None]] = []
        neg_ips: list[str] = []

        for ip, info in chunk_result.items():
            if info.country_code is not None:
                # Successful API resolution.
                _store(ip, info)
                geo_result[ip] = info
                if db is not None:
                    pos_rows.append(
                        (ip, info.country_code, info.country_name, info.asn, info.org)
                    )
            else:
                # API failed — try local GeoIP fallback.
                fallback = _geoip_lookup(ip)
                if fallback is not None:
                    _store(ip, fallback)
                    geo_result[ip] = fallback
                    if db is not None:
                        pos_rows.append(
                            (
                                ip,
                                fallback.country_code,
                                fallback.country_name,
                                fallback.asn,
                                fallback.org,
                            )
                        )
                else:
                    # Both resolvers failed — record in negative cache.
                    _neg_cache[ip] = time.monotonic()
                    geo_result[ip] = _empty
                    if db is not None:
                        neg_ips.append(ip)

        if db is not None:
            if pos_rows:
                try:
                    await db.executemany(
                        """
                        INSERT INTO geo_cache (ip, country_code, country_name, asn, org)
                        VALUES (?, ?, ?, ?, ?)
                        ON CONFLICT(ip) DO UPDATE SET
                            country_code = excluded.country_code,
                            country_name = excluded.country_name,
                            asn = excluded.asn,
                            org = excluded.org,
                            cached_at = strftime('%Y-%m-%dT%H:%M:%fZ', 'now')
                        """,
                        pos_rows,
                    )
                except Exception as exc:  # noqa: BLE001
                    log.warning(
                        "geo_batch_persist_failed",
                        count=len(pos_rows),
                        error=str(exc),
                    )
            if neg_ips:
                try:
                    await db.executemany(
                        "INSERT OR IGNORE INTO geo_cache (ip) VALUES (?)",
                        [(ip,) for ip in neg_ips],
                    )
                except Exception as exc:  # noqa: BLE001
                    log.warning(
                        "geo_batch_persist_neg_failed",
                        count=len(neg_ips),
                        error=str(exc),
                    )

    # Single commit after all chunks — one fsync instead of one per chunk.
    if db is not None:
        try:
            await db.commit()
        except Exception as exc:  # noqa: BLE001
            log.warning("geo_batch_commit_failed", error=str(exc))

    log.info(
        "geo_batch_lookup_complete",
        requested=len(uncached),
        resolved=sum(1 for g in geo_result.values() if g.country_code is not None),
    )
    return geo_result


async def _batch_api_call(
    ips: list[str],
    http_session: aiohttp.ClientSession,
) -> dict[str, GeoInfo]:
    """Send one batch request to the ip-api.com batch endpoint.

    Args:
        ips: Up to :data:`_BATCH_SIZE` IP address strings.
        http_session: Shared HTTP session.

    Returns:
        Dict mapping ``ip → GeoInfo`` for every IP in *ips*. IPs where the
        API returned a failure record or the request raised an exception
        get an all-``None`` :class:`GeoInfo`.
    """
    empty = GeoInfo(country_code=None, country_name=None, asn=None, org=None)
    # Pre-built "everything failed" result, returned on any request error.
    fallback: dict[str, GeoInfo] = dict.fromkeys(ips, empty)

    payload = [{"query": ip} for ip in ips]
    try:
        async with http_session.post(
            _BATCH_API_URL,
            json=payload,
            # Batch responses are larger, so allow double the single-IP budget.
            timeout=aiohttp.ClientTimeout(total=_REQUEST_TIMEOUT * 2),
        ) as resp:
            if resp.status != 200:
                log.warning("geo_batch_non_200", status=resp.status, count=len(ips))
                return fallback
            data: list[dict[str, object]] = await resp.json(content_type=None)
    except Exception as exc:  # noqa: BLE001
        log.warning(
            "geo_batch_request_failed",
            count=len(ips),
            exc_type=type(exc).__name__,
            error=repr(exc),
        )
        return fallback

    out: dict[str, GeoInfo] = {}
    for entry in data:
        # "query" echoes back the requested IP (requested via the URL's
        # fields parameter) — it is the join key for the response.
        ip_str: str = str(entry.get("query", ""))
        if not ip_str:
            continue
        if entry.get("status") != "success":
            out[ip_str] = empty
            log.debug(
                "geo_batch_entry_failed",
                ip=ip_str,
                message=entry.get("message", "unknown"),
            )
            continue
        out[ip_str] = _parse_single_response(entry)

    # Fill any IPs missing from the response.
    for ip in ips:
        if ip not in out:
            out[ip] = empty
    return out


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _parse_single_response(data: dict[str, object]) -> GeoInfo:
    """Build a :class:`GeoInfo` from a single ip-api.com response dict.

    Args:
        data: A ``status == "success"`` JSON response from ip-api.com.

    Returns:
        Populated :class:`GeoInfo`.
    """
    country_code: str | None = _str_or_none(data.get("countryCode"))
    country_name: str | None = _str_or_none(data.get("country"))
    asn_raw: str | None = _str_or_none(data.get("as"))
    org_raw: str | None = _str_or_none(data.get("org"))
    # ip-api returns "AS12345 Some Org" in both "as" and "org".
    # Keep only the leading "AS12345" token as the ASN.
    asn: str | None = asn_raw.split()[0] if asn_raw else None
    return GeoInfo(
        country_code=country_code,
        country_name=country_name,
        asn=asn,
        org=org_raw,
    )


def _str_or_none(value: object) -> str | None:
    """Return *value* as a non-empty string, or ``None``.

    Args:
        value: Raw JSON value which may be ``None``, empty, or a string.

    Returns:
        Stripped string if non-empty, else ``None``.
    """
    if value is None:
        return None
    s = str(value).strip()
    return s if s else None


def _store(ip: str, info: GeoInfo) -> None:
    """Insert *info* into the module-level cache, flushing if over capacity.

    When the IP resolved successfully (``country_code is not None``) it is
    also added to the :data:`_dirty` set so :func:`flush_dirty` can persist
    it to the database on the next scheduled flush.

    Args:
        ip: The IP address key.
        info: The :class:`GeoInfo` to store.
    """
    if len(_cache) >= _MAX_CACHE_SIZE:
        # Crude full-flush eviction; cheap to rebuild from the persistent
        # store. Unflushed dirty entries are intentionally dropped too.
        _cache.clear()
        _dirty.clear()
        log.info("geo_cache_flushed", reason="capacity")
    _cache[ip] = info
    if info.country_code is not None:
        _dirty.add(ip)


async def flush_dirty(db: aiosqlite.Connection) -> int:
    """Persist all new in-memory geo entries to the ``geo_cache`` table.

    Takes an atomic snapshot of :data:`_dirty`, clears it, then
    batch-inserts all entries that are still present in :data:`_cache`
    using a single ``executemany`` call and one ``COMMIT``. This is the
    only place that writes to the persistent cache during normal operation
    after startup.

    If the database write fails the entries are re-added to :data:`_dirty`
    so they will be retried on the next flush cycle.

    Args:
        db: Open :class:`aiosqlite.Connection` to the BanGUI application
            database.

    Returns:
        The number of rows successfully upserted.
    """
    if not _dirty:
        return 0

    # Atomically snapshot and clear in a single-threaded async context.
    # No ``await`` between copy and clear ensures no interleaving.
    to_flush = _dirty.copy()
    _dirty.clear()

    # An IP may have been evicted from _cache since being marked dirty
    # (capacity flush in _store) — skip those.
    rows = [
        (ip, _cache[ip].country_code, _cache[ip].country_name, _cache[ip].asn, _cache[ip].org)
        for ip in to_flush
        if ip in _cache
    ]
    if not rows:
        return 0

    try:
        await db.executemany(
            """
            INSERT INTO geo_cache (ip, country_code, country_name, asn, org)
            VALUES (?, ?, ?, ?, ?)
            ON CONFLICT(ip) DO UPDATE SET
                country_code = excluded.country_code,
                country_name = excluded.country_name,
                asn = excluded.asn,
                org = excluded.org,
                cached_at = strftime('%Y-%m-%dT%H:%M:%fZ', 'now')
            """,
            rows,
        )
        await db.commit()
    except Exception as exc:  # noqa: BLE001
        log.warning("geo_flush_dirty_failed", error=str(exc))
        # Re-add to dirty so they are retried on the next flush cycle.
        _dirty.update(to_flush)
        return 0

    log.info("geo_flush_dirty_complete", count=len(rows))
    return len(rows)