Files
BanGUI/backend/tests/test_services/test_geo_service.py
Lukas 9ac7f8d22d feat: implement dashboard ban overview (Stage 5)
- Add ban_service reading fail2ban SQLite DB via read-only aiosqlite
- Add geo_service resolving IPs via ip-api.com with 10k in-memory cache
- Add GET /api/dashboard/bans and GET /api/dashboard/accesses endpoints
- Add TimeRange, DashboardBanItem, DashboardBanListResponse, AccessListItem,
  AccessListResponse models in models/ban.py
- Build BanTable component (Fluent UI DataGrid) with bans/accesses modes,
  pagination, loading/error/empty states, and ban-count badges
- Build useBans hook managing time-range and pagination state
- Update DashboardPage: status bar + time-range toolbar + tab switcher
- Add 37 new backend tests (ban service, geo service, dashboard router)
- All 141 tests pass; ruff/mypy --strict/tsc --noEmit clean
2026-03-01 12:57:19 +01:00

213 lines
7.3 KiB
Python

"""Tests for geo_service.lookup()."""
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from app.services import geo_service
from app.services.geo_service import GeoInfo
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_session(response_json: dict[str, object], status: int = 200) -> MagicMock:
    """Create a stand-in for ``aiohttp.ClientSession`` with a canned reply.

    Args:
        response_json: Payload handed back by the fake response's ``json()``.
        status: HTTP status code exposed on the fake response.

    Returns:
        A :class:`MagicMock` whose ``get()`` returns an async context
        manager; entering it yields the fake response.
    """
    # The response object: ``status`` is a plain attribute, ``json`` is awaitable.
    fake_response = AsyncMock(
        status=status,
        json=AsyncMock(return_value=response_json),
    )

    # The object returned by ``session.get(...)``; used via ``async with``.
    request_ctx = AsyncMock()
    request_ctx.__aenter__ = AsyncMock(return_value=fake_response)
    # Returning False from ``__aexit__`` means exceptions are not suppressed.
    request_ctx.__aexit__ = AsyncMock(return_value=False)

    # ``get`` is a synchronous call that hands back the context manager.
    return MagicMock(get=MagicMock(return_value=request_ctx))
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def clear_geo_cache() -> None:  # type: ignore[misc]
    """Reset the ``geo_service`` cache so each test starts without cached entries.

    Declared ``autouse`` so it runs for every test in this module without
    having to be requested explicitly.
    """
    geo_service.clear_cache()
# ---------------------------------------------------------------------------
# Happy path
# ---------------------------------------------------------------------------
class TestLookupSuccess:
    """Successful ``geo_service.lookup()`` calls map API fields onto the result."""

    async def test_returns_country_code(self) -> None:
        """The payload's ``countryCode`` ends up in ``country_code``."""
        payload: dict[str, object] = {
            "status": "success",
            "countryCode": "DE",
            "country": "Germany",
            "as": "AS3320 Deutsche Telekom AG",
            "org": "AS3320 Deutsche Telekom AG",
        }
        session = _make_session(payload)

        info = await geo_service.lookup("1.2.3.4", session)  # type: ignore[arg-type]

        assert info is not None
        assert info.country_code == "DE"

    async def test_returns_country_name(self) -> None:
        """The payload's ``country`` ends up in ``country_name``."""
        payload: dict[str, object] = {
            "status": "success",
            "countryCode": "US",
            "country": "United States",
            "as": "AS15169 Google LLC",
            "org": "Google LLC",
        }
        session = _make_session(payload)

        info = await geo_service.lookup("8.8.8.8", session)  # type: ignore[arg-type]

        assert info is not None
        assert info.country_name == "United States"

    async def test_asn_extracted_without_org_suffix(self) -> None:
        """Only the leading ``AS<N>`` token of the ``as`` field is kept as the ASN."""
        payload: dict[str, object] = {
            "status": "success",
            "countryCode": "DE",
            "country": "Germany",
            "as": "AS3320 Deutsche Telekom AG",
            "org": "Deutsche Telekom",
        }
        session = _make_session(payload)

        info = await geo_service.lookup("1.2.3.4", session)  # type: ignore[arg-type]

        assert info is not None
        assert info.asn == "AS3320"

    async def test_org_populated(self) -> None:
        """The payload's ``org`` value is exposed as ``org`` on the result."""
        payload: dict[str, object] = {
            "status": "success",
            "countryCode": "US",
            "country": "United States",
            "as": "AS15169 Google LLC",
            "org": "Google LLC",
        }
        session = _make_session(payload)

        info = await geo_service.lookup("8.8.8.8", session)  # type: ignore[arg-type]

        assert info is not None
        assert info.org == "Google LLC"
# ---------------------------------------------------------------------------
# Cache behaviour
# ---------------------------------------------------------------------------
class TestLookupCaching:
    """Cache semantics of ``geo_service.lookup()`` and ``clear_cache()``."""

    async def test_second_call_uses_cache(self) -> None:
        """Looking up the same IP twice triggers a single HTTP request."""
        payload: dict[str, object] = {
            "status": "success",
            "countryCode": "DE",
            "country": "Germany",
            "as": "AS3320 Deutsche Telekom AG",
            "org": "Deutsche Telekom",
        }
        session = _make_session(payload)

        for _ in range(2):
            await geo_service.lookup("1.2.3.4", session)  # type: ignore[arg-type]

        # Only the first lookup should have reached ``session.get()``.
        assert session.get.call_count == 1

    async def test_clear_cache_forces_refetch(self) -> None:
        """Clearing the cache between lookups causes a second HTTP request."""
        payload: dict[str, object] = {
            "status": "success",
            "countryCode": "DE",
            "country": "Germany",
            "as": "AS3320",
            "org": "Telekom",
        }
        session = _make_session(payload)

        await geo_service.lookup("2.3.4.5", session)  # type: ignore[arg-type]
        geo_service.clear_cache()
        await geo_service.lookup("2.3.4.5", session)  # type: ignore[arg-type]

        assert session.get.call_count == 2

    async def test_negative_result_cached(self) -> None:
        """A ``status != success`` reply is cached like any other result."""
        payload: dict[str, object] = {"status": "fail", "message": "reserved range"}
        session = _make_session(payload)

        for _ in range(2):
            await geo_service.lookup("192.168.1.1", session)  # type: ignore[arg-type]

        assert session.get.call_count == 1
# ---------------------------------------------------------------------------
# Failure modes
# ---------------------------------------------------------------------------
class TestLookupFailures:
    """geo_service.lookup() when things go wrong."""

    async def test_non_200_response_returns_none(self) -> None:
        """A non-200 status (429 here) returns ``None`` without caching."""
        session = _make_session({}, status=429)

        result = await geo_service.lookup("1.2.3.4", session)  # type: ignore[arg-type]
        assert result is None

        # Verify the "without caching" half of the contract: a repeat lookup
        # for the same IP must reach the HTTP layer again instead of being
        # answered from the cache.
        retry = await geo_service.lookup("1.2.3.4", session)  # type: ignore[arg-type]
        assert retry is None
        assert session.get.call_count == 2

    async def test_network_error_returns_none(self) -> None:
        """A network exception returns ``None``."""
        # ``get()`` itself raises, simulating a failure before any response exists.
        session = MagicMock()
        session.get = MagicMock(side_effect=OSError("connection refused"))

        result = await geo_service.lookup("10.0.0.1", session)  # type: ignore[arg-type]

        assert result is None

    async def test_failed_status_returns_geo_info_with_nulls(self) -> None:
        """When ip-api returns ``status=fail`` a GeoInfo with null fields is returned."""
        session = _make_session({"status": "fail", "message": "private range"})

        result = await geo_service.lookup("10.0.0.1", session)  # type: ignore[arg-type]

        # A failed lookup still yields an object (not ``None``), with the
        # country fields left unset.
        assert result is not None
        assert isinstance(result, GeoInfo)
        assert result.country_code is None
        assert result.country_name is None