"""Tests for geo_service.lookup().""" from __future__ import annotations from unittest.mock import AsyncMock, MagicMock, patch import pytest from app.services import geo_service from app.services.geo_service import GeoInfo # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_session(response_json: dict[str, object], status: int = 200) -> MagicMock: """Build a mock aiohttp.ClientSession that returns *response_json*. Args: response_json: The dict that the mock response's ``json()`` returns. status: HTTP status code for the mock response. Returns: A :class:`MagicMock` that behaves like an ``aiohttp.ClientSession`` in an ``async with`` context. """ mock_resp = AsyncMock() mock_resp.status = status mock_resp.json = AsyncMock(return_value=response_json) mock_ctx = AsyncMock() mock_ctx.__aenter__ = AsyncMock(return_value=mock_resp) mock_ctx.__aexit__ = AsyncMock(return_value=False) session = MagicMock() session.get = MagicMock(return_value=mock_ctx) return session # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture(autouse=True) def clear_geo_cache() -> None: # type: ignore[misc] """Flush the module-level geo cache before every test.""" geo_service.clear_cache() # --------------------------------------------------------------------------- # Happy path # --------------------------------------------------------------------------- class TestLookupSuccess: """geo_service.lookup() under normal conditions.""" async def test_returns_country_code(self) -> None: """country_code is populated from the ``countryCode`` field.""" session = _make_session( { "status": "success", "countryCode": "DE", "country": "Germany", "as": "AS3320 Deutsche Telekom AG", "org": "AS3320 Deutsche Telekom AG", } ) result = await geo_service.lookup("1.2.3.4", session) # type: ignore[arg-type] assert result is not None assert result.country_code == "DE" async def test_returns_country_name(self) -> None: """country_name is populated from the ``country`` field.""" session = _make_session( { "status": "success", "countryCode": "US", "country": "United States", "as": "AS15169 Google LLC", "org": "Google LLC", } ) result = await geo_service.lookup("8.8.8.8", session) # type: ignore[arg-type] assert result is not None assert result.country_name == "United States" async def test_asn_extracted_without_org_suffix(self) -> None: """The ASN field contains only the ``AS`` prefix, not the full string.""" session = _make_session( { "status": "success", "countryCode": "DE", "country": "Germany", "as": "AS3320 Deutsche Telekom AG", "org": "Deutsche Telekom", } ) result = await geo_service.lookup("1.2.3.4", session) # type: ignore[arg-type] assert result is not None assert result.asn == "AS3320" async def test_org_populated(self) -> None: """org field is populated from the ``org`` key.""" session = _make_session( { "status": "success", "countryCode": "US", "country": "United States", "as": "AS15169 Google LLC", "org": "Google LLC", } ) result = await geo_service.lookup("8.8.8.8", session) # type: ignore[arg-type] assert result is not None assert result.org == "Google LLC" # --------------------------------------------------------------------------- # Cache behaviour # --------------------------------------------------------------------------- class TestLookupCaching: """Verify that results are cached and the cache can be cleared.""" async def test_second_call_uses_cache(self) -> None: """Subsequent lookups for the same IP do not make additional HTTP requests.""" session = _make_session( { "status": "success", "countryCode": "DE", "country": "Germany", "as": "AS3320 Deutsche Telekom AG", "org": "Deutsche Telekom", } ) await geo_service.lookup("1.2.3.4", session) # type: ignore[arg-type] await geo_service.lookup("1.2.3.4", session) # type: ignore[arg-type] # The session.get() should only have been called once. assert session.get.call_count == 1 async def test_clear_cache_forces_refetch(self) -> None: """After clearing the cache a new HTTP request is made.""" session = _make_session( { "status": "success", "countryCode": "DE", "country": "Germany", "as": "AS3320", "org": "Telekom", } ) await geo_service.lookup("2.3.4.5", session) # type: ignore[arg-type] geo_service.clear_cache() await geo_service.lookup("2.3.4.5", session) # type: ignore[arg-type] assert session.get.call_count == 2 async def test_negative_result_stored_in_neg_cache(self) -> None: """A failed lookup is stored in the negative cache, so the second call is blocked.""" session = _make_session( {"status": "fail", "message": "reserved range"} ) await geo_service.lookup("192.168.1.1", session) # type: ignore[arg-type] await geo_service.lookup("192.168.1.1", session) # type: ignore[arg-type] # Second call is blocked by the negative cache — only one API hit. assert session.get.call_count == 1 # --------------------------------------------------------------------------- # Failure modes # --------------------------------------------------------------------------- class TestLookupFailures: """geo_service.lookup() when things go wrong.""" async def test_non_200_response_returns_null_geo_info(self) -> None: """A 429 or 500 status returns GeoInfo with null fields (not None).""" session = _make_session({}, status=429) result = await geo_service.lookup("1.2.3.4", session) # type: ignore[arg-type] assert result is not None assert isinstance(result, GeoInfo) assert result.country_code is None async def test_network_error_returns_null_geo_info(self) -> None: """A network exception returns GeoInfo with null fields (not None).""" session = MagicMock() mock_ctx = AsyncMock() mock_ctx.__aenter__ = AsyncMock(side_effect=OSError("connection refused")) mock_ctx.__aexit__ = AsyncMock(return_value=False) session.get = MagicMock(return_value=mock_ctx) result = await geo_service.lookup("10.0.0.1", session) # type: ignore[arg-type] assert result is not None assert isinstance(result, GeoInfo) assert result.country_code is None async def test_failed_status_returns_geo_info_with_nulls(self) -> None: """When ip-api returns ``status=fail`` a GeoInfo with null fields is returned (but not cached).""" session = _make_session({"status": "fail", "message": "private range"}) result = await geo_service.lookup("10.0.0.1", session) # type: ignore[arg-type] assert result is not None assert isinstance(result, GeoInfo) assert result.country_code is None assert result.country_name is None # --------------------------------------------------------------------------- # Negative cache # --------------------------------------------------------------------------- class TestNegativeCache: """Verify the negative cache throttles retries for failing IPs.""" async def test_neg_cache_blocks_second_lookup(self) -> None: """After a failed lookup the second call is served from the neg cache.""" session = _make_session({"status": "fail", "message": "private range"}) r1 = await geo_service.lookup("192.0.2.1", session) # type: ignore[arg-type] r2 = await geo_service.lookup("192.0.2.1", session) # type: ignore[arg-type] # Only one HTTP call should have been made; second served from neg cache. assert session.get.call_count == 1 assert r1 is not None and r1.country_code is None assert r2 is not None and r2.country_code is None async def test_neg_cache_retries_after_ttl(self) -> None: """When the neg-cache entry is older than the TTL a new API call is made.""" session = _make_session({"status": "fail", "message": "private range"}) await geo_service.lookup("192.0.2.2", session) # type: ignore[arg-type] # Manually expire the neg-cache entry. geo_service._neg_cache["192.0.2.2"] -= geo_service._NEG_CACHE_TTL + 1 # type: ignore[attr-defined] await geo_service.lookup("192.0.2.2", session) # type: ignore[arg-type] # Both calls should have hit the API. assert session.get.call_count == 2 async def test_clear_neg_cache_allows_immediate_retry(self) -> None: """After clearing the neg cache the IP is eligible for a new API call.""" session = _make_session({"status": "fail", "message": "private range"}) await geo_service.lookup("192.0.2.3", session) # type: ignore[arg-type] geo_service.clear_neg_cache() await geo_service.lookup("192.0.2.3", session) # type: ignore[arg-type] assert session.get.call_count == 2 async def test_successful_lookup_does_not_pollute_neg_cache(self) -> None: """A successful lookup must not create a neg-cache entry.""" session = _make_session( { "status": "success", "countryCode": "DE", "country": "Germany", "as": "AS3320", "org": "Telekom", } ) await geo_service.lookup("1.2.3.4", session) # type: ignore[arg-type] assert "1.2.3.4" not in geo_service._neg_cache # type: ignore[attr-defined] # --------------------------------------------------------------------------- # GeoIP2 (MaxMind) fallback # --------------------------------------------------------------------------- class TestGeoipFallback: """Verify the MaxMind GeoLite2 fallback is used when ip-api fails.""" def _make_geoip_reader(self, iso_code: str, name: str) -> MagicMock: """Build a mock geoip2.database.Reader that returns *iso_code*.""" country_mock = MagicMock() country_mock.iso_code = iso_code country_mock.name = name response_mock = MagicMock() response_mock.country = country_mock reader = MagicMock() reader.country = MagicMock(return_value=response_mock) return reader async def test_geoip_fallback_called_when_api_fails(self) -> None: """When ip-api returns status=fail, the geoip2 reader is consulted.""" session = _make_session({"status": "fail", "message": "reserved range"}) mock_reader = self._make_geoip_reader("DE", "Germany") with patch.object(geo_service, "_geoip_reader", mock_reader): result = await geo_service.lookup("1.2.3.4", session) # type: ignore[arg-type] mock_reader.country.assert_called_once_with("1.2.3.4") assert result is not None assert result.country_code == "DE" assert result.country_name == "Germany" async def test_geoip_fallback_result_stored_in_cache(self) -> None: """A successful geoip2 fallback result is stored in the positive cache.""" session = _make_session({"status": "fail", "message": "reserved range"}) mock_reader = self._make_geoip_reader("US", "United States") with patch.object(geo_service, "_geoip_reader", mock_reader): await geo_service.lookup("8.8.8.8", session) # type: ignore[arg-type] # Second call must be served from positive cache without hitting API. await geo_service.lookup("8.8.8.8", session) # type: ignore[arg-type] assert session.get.call_count == 1 assert "8.8.8.8" in geo_service._cache # type: ignore[attr-defined] async def test_geoip_fallback_not_called_on_api_success(self) -> None: """When ip-api succeeds, the geoip2 reader must not be consulted.""" session = _make_session( { "status": "success", "countryCode": "JP", "country": "Japan", "as": "AS12345", "org": "NTT", } ) mock_reader = self._make_geoip_reader("XX", "Nowhere") with patch.object(geo_service, "_geoip_reader", mock_reader): result = await geo_service.lookup("1.2.3.4", session) # type: ignore[arg-type] mock_reader.country.assert_not_called() assert result is not None assert result.country_code == "JP" async def test_geoip_fallback_not_called_when_no_reader(self) -> None: """When no geoip2 reader is configured, the fallback silently does nothing.""" session = _make_session({"status": "fail", "message": "private range"}) with patch.object(geo_service, "_geoip_reader", None): result = await geo_service.lookup("10.0.0.1", session) # type: ignore[arg-type] assert result is not None assert result.country_code is None