"""Geo / IP lookup router.

Provides the IP enrichment endpoint:

* ``GET /api/geo/lookup/{ip}`` — current ban status and geo info for an IP
"""

from __future__ import annotations

from typing import TYPE_CHECKING, Annotated

if TYPE_CHECKING:
    # Only referenced in a local variable annotation, which is never
    # evaluated at runtime, so the import is confined to type checking.
    import aiohttp

from fastapi import APIRouter, HTTPException, Path, Request, status

from app.dependencies import AuthDep
from app.models.geo import GeoDetail, IpLookupResponse
from app.services import geo_service, jail_service
from app.utils.fail2ban_client import Fail2BanConnectionError

# Every route declared in this module is mounted under ``/api/geo``.
router: APIRouter = APIRouter(prefix="/api/geo", tags=["Geo"])

# Path-parameter type for the address segment of the lookup route.
_IpPath = Annotated[str, Path(description="IPv4 or IPv6 address to look up.")]


@router.get(
    "/lookup/{ip}",
    response_model=IpLookupResponse,
    summary="Look up ban status and geo information for an IP",
)
async def lookup_ip(
    request: Request,
    _auth: AuthDep,
    ip: _IpPath,
) -> IpLookupResponse:
    """Return current ban status, geo data, and network information for an IP.

    Checks every running fail2ban jail to determine whether the IP is
    currently banned, and enriches the result with country, ASN, and
    organisation data from ip-api.com.

    Args:
        request: Incoming request (used to access ``app.state``).
        _auth: Validated session — enforces authentication.
        ip: The IP address to look up.

    Returns:
        :class:`~app.models.geo.IpLookupResponse` with ban status and geo data.

    Raises:
        HTTPException: 400 when *ip* is not a valid IP address.
        HTTPException: 502 when fail2ban is unreachable.
    """
    # Shared, application-scoped resources live on ``app.state``.
    app_state = request.app.state
    f2b_socket: str = app_state.settings.fail2ban_socket
    session: aiohttp.ClientSession = app_state.http_session

    async def _geo_lookup(addr: str) -> geo_service.GeoInfo | None:
        # Bind the shared HTTP session so the jail service can enrich an
        # address without knowing anything about the HTTP client.
        return await geo_service.lookup(addr, session)

    try:
        lookup = await jail_service.lookup_ip(
            f2b_socket,
            ip,
            geo_enricher=_geo_lookup,
        )
    except ValueError as exc:
        # Surfaced to the client as a bad request, with the service's own
        # message as the detail (the documented "invalid IP" case).
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(exc),
        ) from exc
    except Fail2BanConnectionError as exc:
        # fail2ban is an upstream dependency, hence 502 rather than 500.
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail=f"Cannot reach fail2ban: {exc}",
        ) from exc

    # "geo" may be absent or None; the response then carries ``geo=None``.
    geo = lookup.get("geo")
    geo_detail: GeoDetail | None = (
        None
        if geo is None
        else GeoDetail(
            country_code=geo.country_code,
            country_name=geo.country_name,
            asn=geo.asn,
            org=geo.org,
        )
    )

    return IpLookupResponse(
        ip=lookup["ip"],
        currently_banned_in=lookup["currently_banned_in"],
        geo=geo_detail,
    )