"""Geo service.

Resolves IP addresses to their country, ASN, and organisation using the
`ip-api.com <http://ip-api.com>`_ JSON API.  Results are cached in memory to
avoid redundant HTTP requests for addresses that appear repeatedly.

The free ip-api.com endpoint requires no API key and supports up to 45
requests per minute.  Because results stay cached for the life of the process
(until the cache reaches ``_MAX_CACHE_SIZE`` entries and is flushed), under
normal load the rate limit is rarely approached.

Usage::

    import aiohttp
    from app.services import geo_service

    async with aiohttp.ClientSession() as session:
        info = await geo_service.lookup("1.2.3.4", session)
        if info:
            print(info.country_code)  # "DE"
"""

from __future__ import annotations

import ipaddress
from dataclasses import dataclass
from typing import TYPE_CHECKING

import structlog

if TYPE_CHECKING:
    import aiohttp

log: structlog.stdlib.BoundLogger = structlog.get_logger()

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

#: ip-api.com single-IP lookup endpoint (HTTP only on the free tier).
_API_URL: str = "http://ip-api.com/json/{ip}?fields=status,message,country,countryCode,org,as"

#: Maximum number of entries kept in the in-process cache before it is
#: flushed completely.  A simple eviction strategy — the cache is cheap to
#: rebuild and memory is bounded.
_MAX_CACHE_SIZE: int = 10_000

#: Timeout for outgoing geo API requests in seconds.
_REQUEST_TIMEOUT: float = 5.0

# ---------------------------------------------------------------------------
# Domain model
# ---------------------------------------------------------------------------


@dataclass
class GeoInfo:
    """Geographical and network metadata for a single IP address.

    All fields default to ``None`` when the information is unavailable or
    the lookup fails gracefully, so ``GeoInfo()`` is the canonical "nothing
    known" placeholder.
    """

    country_code: str | None = None
    """ISO 3166-1 alpha-2 country code, e.g. ``"DE"``."""

    country_name: str | None = None
    """Human-readable country name, e.g. ``"Germany"``."""

    asn: str | None = None
    """Autonomous System Number string, e.g. ``"AS3320"``."""

    org: str | None = None
    """Organisation name associated with the IP, e.g. ``"Deutsche Telekom"``."""


# ---------------------------------------------------------------------------
# Internal cache
# ---------------------------------------------------------------------------

#: Module-level in-memory cache: ``ip → GeoInfo``.
_cache: dict[str, GeoInfo] = {}


def clear_cache() -> None:
    """Flush the entire lookup cache.

    Useful in tests and when the operator suspects stale data.
    """
    _cache.clear()


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------


async def lookup(ip: str, http_session: aiohttp.ClientSession) -> GeoInfo | None:
    """Resolve an IP address to country, ASN, and organisation metadata.

    Results are cached in-process.  If the cache has reached
    ``_MAX_CACHE_SIZE`` entries it is flushed before the new result is
    stored, keeping memory usage bounded.

    Private, loopback, and link-local addresses are resolved locally to a
    placeholder ``GeoInfo`` with ``None`` values, without contacting the API
    and without occupying a cache slot.  Strings that cannot be parsed as an
    IP literal are still forwarded to the API unchanged.

    Args:
        ip: IPv4 or IPv6 address string.
        http_session: Shared :class:`aiohttp.ClientSession` (from
            ``app.state.http_session``).

    Returns:
        A :class:`GeoInfo` instance, or ``None`` when the lookup fails in a
        way that should prevent the caller from caching a bad result (e.g.
        network timeout, non-200 response, or a malformed payload).
    """
    cached = _cache.get(ip)
    if cached is not None:
        return cached

    # Short-circuit addresses the API can never resolve; this saves a
    # round-trip and a unit of the 45 requests/minute budget.
    if _is_non_public_address(ip):
        log.debug("geo_lookup_skipped_non_public", ip=ip)
        return GeoInfo()

    url: str = _API_URL.format(ip=ip)
    try:
        async with http_session.get(url, timeout=_REQUEST_TIMEOUT) as resp:  # type: ignore[arg-type]
            if resp.status != 200:
                log.warning("geo_lookup_non_200", ip=ip, status=resp.status)
                return None
            # content_type=None disables aiohttp's Content-Type check so the
            # body is parsed as JSON regardless of the response header.
            payload: object = await resp.json(content_type=None)
    except Exception as exc:  # noqa: BLE001
        # Deliberately broad: a geo lookup is best-effort enrichment and must
        # never propagate transport or decoding errors to the caller.
        log.warning("geo_lookup_request_failed", ip=ip, error=str(exc))
        return None

    # A syntactically valid but non-object JSON body (list, null, string)
    # would otherwise blow up on ``.get`` below.
    if not isinstance(payload, dict):
        log.warning(
            "geo_lookup_unexpected_payload",
            ip=ip,
            payload_type=type(payload).__name__,
        )
        return None
    data: dict[str, object] = payload

    if data.get("status") != "success":
        log.debug(
            "geo_lookup_failed",
            ip=ip,
            message=data.get("message", "unknown"),
        )
        # Still cache a negative result so we do not retry reserved IPs.
        result = GeoInfo(country_code=None, country_name=None, asn=None, org=None)
        _store(ip, result)
        return result

    country_code: str | None = _str_or_none(data.get("countryCode"))
    country_name: str | None = _str_or_none(data.get("country"))
    asn_raw: str | None = _str_or_none(data.get("as"))
    org: str | None = _str_or_none(data.get("org"))

    # ip-api's "as" field has the form "AS12345 Some Org"; keep only the
    # leading AS-number token.  "org" already holds the organisation name.
    asn: str | None = asn_raw.split()[0] if asn_raw else None

    result = GeoInfo(
        country_code=country_code,
        country_name=country_name,
        asn=asn,
        org=org,
    )
    _store(ip, result)
    log.debug("geo_lookup_success", ip=ip, country=country_code, asn=asn)
    return result


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _is_non_public_address(ip: str) -> bool:
    """Return ``True`` if *ip* is a private, loopback, or link-local address.

    Args:
        ip: Candidate IPv4 or IPv6 address string.

    Returns:
        ``True`` when *ip* parses as an IP literal in one of those ranges;
        ``False`` otherwise, including when *ip* is not a valid IP literal
        (such input is left for the remote API to judge).
    """
    try:
        address = ipaddress.ip_address(ip)
    except ValueError:
        return False
    return address.is_private or address.is_loopback or address.is_link_local


def _str_or_none(value: object) -> str | None:
    """Return *value* as a non-empty string, or ``None``.

    Args:
        value: Raw JSON value which may be ``None``, empty, or a string.

    Returns:
        Stripped string if non-empty, else ``None``.
    """
    if value is None:
        return None
    s = str(value).strip()
    return s if s else None


def _store(ip: str, info: GeoInfo) -> None:
    """Insert *info* into the module-level cache, flushing if over capacity.

    Args:
        ip: The IP address key.
        info: The :class:`GeoInfo` to store.
    """
    if len(_cache) >= _MAX_CACHE_SIZE:
        _cache.clear()
        log.info("geo_cache_flushed", reason="capacity")
    _cache[ip] = info