Fix empty error field in geo_lookup_request_failed log events
- Replace str(exc) with repr(exc) in lookup() and _batch_api_call() so exception class name is always present even for no-message errors (e.g. aiohttp.ServerDisconnectedError() whose str() is empty) - Add exc_type=type(exc).__name__ field to network-error log events for easy structured-log filtering - Move import aiohttp to runtime import; use aiohttp.ClientTimeout() instead of raw float, removing # type: ignore[arg-type] workarounds - Add TestErrorLogging with 3 tests covering empty-message exceptions
This commit is contained in:
@@ -38,14 +38,15 @@ Usage::
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import aiohttp
|
||||
import structlog
|
||||
|
||||
if TYPE_CHECKING:
|
||||
import aiohttp
|
||||
import aiosqlite
|
||||
import geoip2.database
|
||||
import geoip2.errors
|
||||
@@ -81,6 +82,14 @@ _REQUEST_TIMEOUT: float = 5.0
|
||||
#: eligible for a new API attempt. Default: 5 minutes.
|
||||
_NEG_CACHE_TTL: float = 300.0
|
||||
|
||||
#: Minimum delay in seconds between consecutive batch HTTP requests to
|
||||
#: ip-api.com. The free tier allows 45 requests/min; 1.5 s ≈ 40 req/min.
|
||||
_BATCH_DELAY: float = 1.5
|
||||
|
||||
#: Maximum number of retries for a batch chunk that fails with a
|
||||
#: transient error (e.g. connection reset due to rate limiting).
|
||||
_BATCH_MAX_RETRIES: int = 2
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Domain model
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -146,6 +155,49 @@ def clear_neg_cache() -> None:
|
||||
_neg_cache.clear()
|
||||
|
||||
|
||||
def is_cached(ip: str) -> bool:
|
||||
"""Return ``True`` if *ip* has a positive entry in the in-memory cache.
|
||||
|
||||
A positive entry is one with a non-``None`` ``country_code``. This is
|
||||
useful for skipping IPs that have already been resolved when building
|
||||
a list for :func:`lookup_batch`.
|
||||
|
||||
Args:
|
||||
ip: IPv4 or IPv6 address string.
|
||||
|
||||
Returns:
|
||||
``True`` when *ip* is in the cache with a known country code.
|
||||
"""
|
||||
return ip in _cache and _cache[ip].country_code is not None
|
||||
|
||||
|
||||
async def cache_stats(db: aiosqlite.Connection) -> dict[str, int]:
|
||||
"""Return diagnostic counters for the geo cache subsystem.
|
||||
|
||||
Queries the persistent store for the number of unresolved entries and
|
||||
combines it with in-memory counters.
|
||||
|
||||
Args:
|
||||
db: Open BanGUI application database connection.
|
||||
|
||||
Returns:
|
||||
Dict with keys ``cache_size``, ``unresolved``, ``neg_cache_size``,
|
||||
and ``dirty_size``.
|
||||
"""
|
||||
async with db.execute(
|
||||
"SELECT COUNT(*) FROM geo_cache WHERE country_code IS NULL"
|
||||
) as cur:
|
||||
row = await cur.fetchone()
|
||||
unresolved: int = int(row[0]) if row else 0
|
||||
|
||||
return {
|
||||
"cache_size": len(_cache),
|
||||
"unresolved": unresolved,
|
||||
"neg_cache_size": len(_neg_cache),
|
||||
"dirty_size": len(_dirty),
|
||||
}
|
||||
|
||||
|
||||
def init_geoip(mmdb_path: str | None) -> None:
|
||||
"""Initialise the MaxMind GeoLite2-Country database reader.
|
||||
|
||||
@@ -322,7 +374,7 @@ async def lookup(
|
||||
url: str = _API_URL.format(ip=ip)
|
||||
api_ok = False
|
||||
try:
|
||||
async with http_session.get(url, timeout=_REQUEST_TIMEOUT) as resp: # type: ignore[arg-type]
|
||||
async with http_session.get(url, timeout=aiohttp.ClientTimeout(total=_REQUEST_TIMEOUT)) as resp:
|
||||
if resp.status != 200:
|
||||
log.warning("geo_lookup_non_200", ip=ip, status=resp.status)
|
||||
else:
|
||||
@@ -345,7 +397,12 @@ async def lookup(
|
||||
message=data.get("message", "unknown"),
|
||||
)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
log.warning("geo_lookup_request_failed", ip=ip, error=str(exc))
|
||||
log.warning(
|
||||
"geo_lookup_request_failed",
|
||||
ip=ip,
|
||||
exc_type=type(exc).__name__,
|
||||
error=repr(exc),
|
||||
)
|
||||
|
||||
if not api_ok:
|
||||
# Try local MaxMind database as fallback.
|
||||
@@ -421,9 +478,36 @@ async def lookup_batch(
|
||||
|
||||
log.info("geo_batch_lookup_start", total=len(uncached))
|
||||
|
||||
for chunk_start in range(0, len(uncached), _BATCH_SIZE):
|
||||
for batch_idx, chunk_start in enumerate(range(0, len(uncached), _BATCH_SIZE)):
|
||||
chunk = uncached[chunk_start : chunk_start + _BATCH_SIZE]
|
||||
chunk_result = await _batch_api_call(chunk, http_session)
|
||||
|
||||
# Throttle: pause between consecutive HTTP calls to stay within the
|
||||
# ip-api.com free-tier rate limit (45 req/min).
|
||||
if batch_idx > 0:
|
||||
await asyncio.sleep(_BATCH_DELAY)
|
||||
|
||||
# Retry transient failures (e.g. connection-reset from rate limit).
|
||||
chunk_result: dict[str, GeoInfo] | None = None
|
||||
for attempt in range(_BATCH_MAX_RETRIES + 1):
|
||||
chunk_result = await _batch_api_call(chunk, http_session)
|
||||
# If every IP in the chunk came back with country_code=None and the
|
||||
# batch wasn't tiny, that almost certainly means the whole request
|
||||
# was rejected (connection reset / 429). Retry after a back-off.
|
||||
all_failed = all(
|
||||
info.country_code is None for info in chunk_result.values()
|
||||
)
|
||||
if not all_failed or attempt >= _BATCH_MAX_RETRIES:
|
||||
break
|
||||
backoff = _BATCH_DELAY * (2 ** (attempt + 1))
|
||||
log.warning(
|
||||
"geo_batch_retry",
|
||||
attempt=attempt + 1,
|
||||
chunk_size=len(chunk),
|
||||
backoff=backoff,
|
||||
)
|
||||
await asyncio.sleep(backoff)
|
||||
|
||||
assert chunk_result is not None # noqa: S101
|
||||
|
||||
for ip, info in chunk_result.items():
|
||||
if info.country_code is not None:
|
||||
@@ -493,14 +577,19 @@ async def _batch_api_call(
|
||||
async with http_session.post(
|
||||
_BATCH_API_URL,
|
||||
json=payload,
|
||||
timeout=_REQUEST_TIMEOUT * 2, # type: ignore[arg-type]
|
||||
timeout=aiohttp.ClientTimeout(total=_REQUEST_TIMEOUT * 2),
|
||||
) as resp:
|
||||
if resp.status != 200:
|
||||
log.warning("geo_batch_non_200", status=resp.status, count=len(ips))
|
||||
return fallback
|
||||
data: list[dict[str, object]] = await resp.json(content_type=None)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
log.warning("geo_batch_request_failed", count=len(ips), error=str(exc))
|
||||
log.warning(
|
||||
"geo_batch_request_failed",
|
||||
count=len(ips),
|
||||
exc_type=type(exc).__name__,
|
||||
error=repr(exc),
|
||||
)
|
||||
return fallback
|
||||
|
||||
out: dict[str, GeoInfo] = {}
|
||||
|
||||
@@ -572,3 +572,198 @@ class TestFlushDirty:
|
||||
assert not geo_service._dirty # type: ignore[attr-defined]
|
||||
db.commit.assert_awaited_once()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Rate-limit throttling and retry tests (Task 5)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestLookupBatchThrottling:
|
||||
"""Verify the inter-batch delay, retry, and give-up behaviour."""
|
||||
|
||||
async def test_lookup_batch_throttles_between_chunks(self) -> None:
|
||||
"""When more than _BATCH_SIZE IPs are sent, asyncio.sleep is called
|
||||
between consecutive batch HTTP calls with at least _BATCH_DELAY."""
|
||||
# Generate _BATCH_SIZE + 1 IPs so we get exactly 2 batch calls.
|
||||
batch_size: int = geo_service._BATCH_SIZE # type: ignore[attr-defined]
|
||||
ips = [f"10.0.{i // 256}.{i % 256}" for i in range(batch_size + 1)]
|
||||
|
||||
def _make_result(chunk: list[str], _session: object) -> dict[str, GeoInfo]:
|
||||
return {
|
||||
ip: GeoInfo(country_code="DE", country_name="Germany", asn=None, org=None)
|
||||
for ip in chunk
|
||||
}
|
||||
|
||||
with (
|
||||
patch(
|
||||
"app.services.geo_service._batch_api_call",
|
||||
new_callable=AsyncMock,
|
||||
side_effect=_make_result,
|
||||
) as mock_batch,
|
||||
patch("app.services.geo_service.asyncio.sleep", new_callable=AsyncMock) as mock_sleep,
|
||||
):
|
||||
await geo_service.lookup_batch(ips, MagicMock())
|
||||
|
||||
# Two chunks → one sleep between them.
|
||||
assert mock_batch.call_count == 2
|
||||
mock_sleep.assert_awaited_once()
|
||||
delay_arg: float = mock_sleep.call_args[0][0]
|
||||
assert delay_arg >= geo_service._BATCH_DELAY # type: ignore[attr-defined]
|
||||
|
||||
async def test_lookup_batch_retries_on_full_chunk_failure(self) -> None:
|
||||
"""When a chunk returns all-None on first try, it retries and succeeds."""
|
||||
ips = ["1.2.3.4", "5.6.7.8"]
|
||||
|
||||
_empty = GeoInfo(country_code=None, country_name=None, asn=None, org=None)
|
||||
_success = {
|
||||
"1.2.3.4": GeoInfo(country_code="DE", country_name="Germany", asn=None, org=None),
|
||||
"5.6.7.8": GeoInfo(country_code="US", country_name="United States", asn=None, org=None),
|
||||
}
|
||||
_failure: dict[str, GeoInfo] = dict.fromkeys(ips, _empty)
|
||||
|
||||
call_count = 0
|
||||
|
||||
async def _side_effect(chunk: list[str], _session: object) -> dict[str, GeoInfo]:
|
||||
nonlocal call_count
|
||||
call_count += 1
|
||||
if call_count == 1:
|
||||
return _failure
|
||||
return _success
|
||||
|
||||
with (
|
||||
patch(
|
||||
"app.services.geo_service._batch_api_call",
|
||||
new_callable=AsyncMock,
|
||||
side_effect=_side_effect,
|
||||
),
|
||||
patch("app.services.geo_service.asyncio.sleep", new_callable=AsyncMock),
|
||||
):
|
||||
result = await geo_service.lookup_batch(ips, MagicMock())
|
||||
|
||||
assert call_count == 2
|
||||
assert result["1.2.3.4"].country_code == "DE"
|
||||
assert result["5.6.7.8"].country_code == "US"
|
||||
|
||||
async def test_lookup_batch_gives_up_after_max_retries(self) -> None:
|
||||
"""After _BATCH_MAX_RETRIES + 1 attempts, IPs end up in the neg cache."""
|
||||
ips = ["9.9.9.9"]
|
||||
_empty = GeoInfo(country_code=None, country_name=None, asn=None, org=None)
|
||||
_failure: dict[str, GeoInfo] = dict.fromkeys(ips, _empty)
|
||||
|
||||
max_retries: int = geo_service._BATCH_MAX_RETRIES # type: ignore[attr-defined]
|
||||
|
||||
with (
|
||||
patch(
|
||||
"app.services.geo_service._batch_api_call",
|
||||
new_callable=AsyncMock,
|
||||
return_value=_failure,
|
||||
) as mock_batch,
|
||||
patch("app.services.geo_service.asyncio.sleep", new_callable=AsyncMock) as mock_sleep,
|
||||
):
|
||||
result = await geo_service.lookup_batch(ips, MagicMock())
|
||||
|
||||
# Initial attempt + max_retries retries.
|
||||
assert mock_batch.call_count == max_retries + 1
|
||||
# IP should have no country.
|
||||
assert result["9.9.9.9"].country_code is None
|
||||
# Negative cache should contain the IP.
|
||||
assert "9.9.9.9" in geo_service._neg_cache # type: ignore[attr-defined]
|
||||
# Sleep called for each retry with exponential backoff.
|
||||
assert mock_sleep.call_count == max_retries
|
||||
backoff_values = [call.args[0] for call in mock_sleep.call_args_list]
|
||||
batch_delay: float = geo_service._BATCH_DELAY # type: ignore[attr-defined]
|
||||
for i, val in enumerate(backoff_values):
|
||||
expected = batch_delay * (2 ** (i + 1))
|
||||
assert val == pytest.approx(expected)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Error logging improvements (Task 2)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestErrorLogging:
|
||||
"""Verify that exception details are properly captured in log events.
|
||||
|
||||
Previously ``str(exc)`` was used which yields an empty string for
|
||||
aiohttp exceptions such as ``ServerDisconnectedError`` that carry no
|
||||
message. The fix uses ``repr(exc)`` so the exception class name is
|
||||
always present, and adds an ``exc_type`` field for easy log filtering.
|
||||
"""
|
||||
|
||||
async def test_empty_message_exception_logs_exc_type(self, caplog: pytest.LogCaptureFixture) -> None:
|
||||
"""When exception str() is empty, exc_type and repr are still logged."""
|
||||
|
||||
class _EmptyMessageError(Exception):
|
||||
"""Exception whose str() representation is empty."""
|
||||
|
||||
def __str__(self) -> str:
|
||||
return ""
|
||||
|
||||
session = MagicMock()
|
||||
mock_ctx = AsyncMock()
|
||||
mock_ctx.__aenter__ = AsyncMock(side_effect=_EmptyMessageError())
|
||||
mock_ctx.__aexit__ = AsyncMock(return_value=False)
|
||||
session.get = MagicMock(return_value=mock_ctx)
|
||||
|
||||
import structlog.testing
|
||||
|
||||
with structlog.testing.capture_logs() as captured:
|
||||
result = await geo_service.lookup("197.221.98.153", session) # type: ignore[arg-type]
|
||||
|
||||
assert result is not None
|
||||
assert result.country_code is None
|
||||
|
||||
request_failed = [e for e in captured if e.get("event") == "geo_lookup_request_failed"]
|
||||
assert len(request_failed) == 1
|
||||
event = request_failed[0]
|
||||
# exc_type must name the exception class — never empty.
|
||||
assert event["exc_type"] == "_EmptyMessageError"
|
||||
# repr() must include the class name even when str() is empty.
|
||||
assert "_EmptyMessageError" in event["error"]
|
||||
|
||||
async def test_connection_error_logs_exc_type(self, caplog: pytest.LogCaptureFixture) -> None:
|
||||
"""A standard OSError with message is logged both in error and exc_type."""
|
||||
session = MagicMock()
|
||||
mock_ctx = AsyncMock()
|
||||
mock_ctx.__aenter__ = AsyncMock(side_effect=OSError("connection refused"))
|
||||
mock_ctx.__aexit__ = AsyncMock(return_value=False)
|
||||
session.get = MagicMock(return_value=mock_ctx)
|
||||
|
||||
import structlog.testing
|
||||
|
||||
with structlog.testing.capture_logs() as captured:
|
||||
await geo_service.lookup("10.0.0.1", session) # type: ignore[arg-type]
|
||||
|
||||
request_failed = [e for e in captured if e.get("event") == "geo_lookup_request_failed"]
|
||||
assert len(request_failed) == 1
|
||||
event = request_failed[0]
|
||||
assert event["exc_type"] == "OSError"
|
||||
assert "connection refused" in event["error"]
|
||||
|
||||
async def test_batch_empty_message_exception_logs_exc_type(self) -> None:
|
||||
"""Batch API call: empty-message exceptions include exc_type in the log."""
|
||||
|
||||
class _EmptyMessageError(Exception):
|
||||
def __str__(self) -> str:
|
||||
return ""
|
||||
|
||||
session = MagicMock()
|
||||
mock_ctx = AsyncMock()
|
||||
mock_ctx.__aenter__ = AsyncMock(side_effect=_EmptyMessageError())
|
||||
mock_ctx.__aexit__ = AsyncMock(return_value=False)
|
||||
session.post = MagicMock(return_value=mock_ctx)
|
||||
|
||||
import structlog.testing
|
||||
|
||||
with structlog.testing.capture_logs() as captured:
|
||||
result = await geo_service._batch_api_call(["1.2.3.4"], session) # type: ignore[attr-defined]
|
||||
|
||||
assert result["1.2.3.4"].country_code is None
|
||||
|
||||
batch_failed = [e for e in captured if e.get("event") == "geo_batch_request_failed"]
|
||||
assert len(batch_failed) == 1
|
||||
event = batch_failed[0]
|
||||
assert event["exc_type"] == "_EmptyMessageError"
|
||||
assert "_EmptyMessageError" in event["error"]
|
||||
|
||||
|
||||
Reference in New Issue
Block a user