From 44a5a3d70ec087d3f65b40a4e58cf54ff200a996 Mon Sep 17 00:00:00 2001 From: Lukas Date: Tue, 10 Mar 2026 18:45:58 +0100 Subject: [PATCH] Fix geo cache write performance: batch commits, read-only GETs, dirty flush - Remove per-IP db.commit() from _persist_entry() and _persist_neg_entry(); add a single commit after the full lookup_batch() chunk loop instead. Reduces commits from ~5,200 to 1 per bans/by-country request. - Remove db dependency from GET /api/dashboard/bans and GET /api/dashboard/bans/by-country; pass app_db=None so no SQLite writes occur during read-only requests. - Add _dirty set to geo_service; _store() marks resolved IPs dirty. New flush_dirty(db) batch-upserts all dirty entries in one transaction. New geo_cache_flush APScheduler task flushes every 60 s so geo data is persisted without blocking requests. --- Docs/Tasks.md | 150 +++++++++++-- backend/app/main.py | 5 +- backend/app/routers/dashboard.py | 24 +- backend/app/services/geo_service.py | 85 ++++++- backend/app/tasks/geo_cache_flush.py | 66 ++++++ .../tests/test_services/test_geo_service.py | 209 ++++++++++++++++++ 6 files changed, 505 insertions(+), 34 deletions(-) create mode 100644 backend/app/tasks/geo_cache_flush.py diff --git a/Docs/Tasks.md b/Docs/Tasks.md index 1b3d793..d51933c 100644 --- a/Docs/Tasks.md +++ b/Docs/Tasks.md @@ -4,24 +4,140 @@ This document breaks the entire BanGUI project into development stages, ordered --- -## Completed +## Issue: World Map Loading Time — Architecture Fix -### [DONE] Fix: Country column shows "—" for blocklist-import IPs in ban list +### Problem Summary -**Root cause:** `ban_service.list_bans()` resolved geo data one IP at a time via -`geo_service.lookup()`, which uses the ip-api.com single-IP endpoint (45 req/min -free tier limit). A page of 100 bans triggered 100 sequential HTTP requests; -after the ~45th request ip-api.com applied rate limiting, all remaining IPs were -added to the in-process negative cache (5 min TTL), and they showed "—" in the -country column permanently until the TTL expired. Because the map endpoint -(`bans_by_country`) used `lookup_batch` (100 IPs per POST), it never hit the -rate limit, which is why the map showed colours while the list did not. +The `GET /api/dashboard/bans/by-country` endpoint is extremely slow on first load. A single request with ~5,200 unique IPs produces **10,400 SQLite commits** and **6,000 INSERT statements** against the app database — all during a read-only GET request. The log shows 21,000+ lines of SQL trace for just 18 HTTP requests. -**Fix:** Added `http_session` and `app_db` parameters to `list_bans()`. When -`http_session` is provided (production path via the dashboard router), the entire -page of IPs is resolved in a single `geo_service.lookup_batch()` call instead of -100 individual ones. The legacy `geo_enricher` callback is kept for backwards -compatibility in tests. Updated `dashboard.py` to pass `http_session` and `db` -instead of constructing a per-IP enricher closure. Added 3 new tests covering -the batch path, failure resilience, and priority over `geo_enricher`. +Root causes (ordered by impact): +1. **Per-IP commit during geo cache writes** — `geo_service._persist_entry()` and `_persist_neg_entry()` each call `await db.commit()` after every single INSERT. With 5,200 uncached IPs this means 5,200+ individual commits, each forcing an `fsync`. This is the dominant bottleneck. +2. **DB writes on a GET request** — The bans/by-country endpoint passes `app_db` to `geo_service.lookup_batch()`, which triggers INSERT+COMMIT for every resolved IP. A GET request should never produce database writes/commits. Users do not expect loading a map page to mutate the database. +3. **Same pattern exists in other endpoints** — The following GET endpoints also trigger geo cache commits: `/api/dashboard/bans`, `/api/bans/active`, `/api/history`, `/api/history/{ip}`, `/api/geo/lookup/{ip}`. + +### Evidence from `log.log` + +- Log line count: **21,117 lines** for 18 HTTP requests +- `INSERT INTO geo_cache`: **6,000** executions +- `db.commit()`: **10,400** calls (each INSERT + its commit = 2 ops per IP) +- `geo_batch_lookup_start`: reports `total=5200` uncached IPs +- The bans/by-country response is at line **21,086** out of 21,117 — the entire log is essentially one request's geo persist work +- Other requests (`/api/dashboard/status`, `/api/blocklists/schedule`, `/api/config/map-color-thresholds`) interleave with the geo persist loop because they share the same single async DB connection + +--- + +### Task 1: Batch geo cache writes — eliminate per-IP commits ✅ DONE + +**Summary:** Removed `await db.commit()` from `_persist_entry()` and `_persist_neg_entry()`. Added a single `await db.commit()` (wrapped in try/except) at the end of `lookup_batch()` after all chunk processing, and after each `_persist_entry` / `_persist_neg_entry` call in `lookup()`. Reduces commits from ~5,200 to **1** per batch request. + +**File:** `backend/app/services/geo_service.py` + +**What to change:** + +The functions `_persist_entry()` and `_persist_neg_entry()` each call `await db.commit()` after every INSERT. Instead, the commit should happen once after the entire batch is processed. + +1. Remove `await db.commit()` from both `_persist_entry()` and `_persist_neg_entry()`. +2. In `lookup_batch()`, after the loop over all chunks is complete and all `_persist_entry()` / `_persist_neg_entry()` calls have been made, issue a single `await db.commit()` if `db is not None`. +3. Wrap the single commit in a try/except to handle any errors gracefully. + +**Expected impact:** Reduces commits from ~5,200 to **1** per request. This alone should cut the endpoint response time dramatically. + +**Testing:** Existing tests in `test_services/test_ban_service.py` and `test_services/test_geo_service.py` should continue to pass. Verify the geo_cache table still gets populated after a batch lookup by checking the DB contents in an integration test. + +--- + +### Task 2: Do not write geo cache during GET requests ✅ DONE + +**Summary:** Removed `db` dependency injection from `GET /api/dashboard/bans` and `GET /api/dashboard/bans/by-country` in `dashboard.py`. Both now pass `app_db=None` to their respective service calls. The other GET endpoints (`/api/bans/active`, `/api/history`, `/api/history/{ip}`, `/api/geo/lookup/{ip}`) already did not pass `db` to geo lookups — confirmed correct. + +**Files:** `backend/app/routers/dashboard.py`, `backend/app/routers/bans.py`, `backend/app/routers/history.py`, `backend/app/routers/geo.py` + +**What to change:** + +GET endpoints should not pass `app_db` (or equivalent) to geo_service functions. The geo resolution should still populate the in-memory cache (which is fast, free, and ephemeral), but should NOT write to SQLite during a read request. + +For each of these GET endpoints: +- `GET /api/dashboard/bans/by-country` in `dashboard.py` — stop passing `app_db=db` to `bans_by_country()`; pass `app_db=None` instead. +- `GET /api/dashboard/bans` in `dashboard.py` — stop passing `app_db=db` to `list_bans()`; pass `app_db=None` instead. +- `GET /api/bans/active` in `bans.py` — the enricher callback should not pass `db` to `geo_service.lookup()`. +- `GET /api/history` and `GET /api/history/{ip}` in `history.py` — same: enricher should not pass `db`. +- `GET /api/geo/lookup/{ip}` in `geo.py` — do not pass `db` to `geo_service.lookup()`. + +The persistent geo cache should only be written during explicit write operations: +- `POST /api/geo/re-resolve` (already a POST — this is correct) +- Blocklist import tasks (`blocklist_service.py`) +- Application startup via `load_cache_from_db()` + +**Expected impact:** GET requests become truly read-only. No commits, no `fsync`, no write contention on the app DB during map loads. + +**Testing:** Run the full test suite. Verify that: +1. The bans/by-country endpoint still returns correct country data (from in-memory cache). +2. The `geo_cache` table is still populated when `POST /api/geo/re-resolve` is called or after blocklist import. +3. After a server restart, geo data is still available (because `load_cache_from_db()` warms memory from previously persisted data). + +--- + +### Task 3: Periodically persist the in-memory geo cache (background task) ✅ DONE + +**Summary:** Added `_dirty: set[str]` to `geo_service.py`. `_store()` now adds IPs with a non-null `country_code` to `_dirty`; `clear_cache()` clears it. Added `flush_dirty(db)` which atomically snapshots/clears `_dirty`, batch-upserts all rows via `executemany()`, commits once, and re-adds entries on failure. Created `backend/app/tasks/geo_cache_flush.py` with a 60-second APScheduler job, registered in `main.py`. + +**Files:** `backend/app/services/geo_service.py`, `backend/app/tasks/` (new task file) + +**What to change:** + +After Task 2, GET requests no longer write to the DB. But newly resolved IPs during GET requests only live in the in-memory cache and would be lost on restart. To avoid this, add a background task that periodically flushes new in-memory entries to the `geo_cache` table. + +1. In `geo_service.py`, add a module-level `_dirty: set[str]` that tracks IPs added to `_cache` but not yet persisted. When `_store()` adds an entry, also add the IP to `_dirty`. +2. Add a new function `flush_dirty(db: aiosqlite.Connection) -> int` that: + - Takes the current `_dirty` set and clears it atomically. + - Uses `executemany()` to batch-INSERT all dirty entries in one transaction. + - Calls `db.commit()` once. + - Returns the count of flushed entries. +3. Register a periodic task (using the existing APScheduler setup) that calls `flush_dirty()` every 60 seconds (configurable). This is similar to how other background tasks already run. + +**Expected impact:** Geo data is persisted without blocking any request. If the server restarts, at most 60 seconds of new geo data is lost (and it will simply be re-fetched from ip-api.com on the next request). + +**Testing:** Write a test that: +- Calls `lookup_batch()` without a DB reference. +- Verifies IPs are in `_dirty`. +- Calls `flush_dirty(db)`. +- Verifies the geo_cache table contains the entries and `_dirty` is empty. + +--- + +### Task 4: Reduce redundant SQL queries per request (settings / auth) + +**Files:** `backend/app/dependencies.py`, `backend/app/main.py`, `backend/app/repositories/settings_repo.py` + +**What to change:** + +The log shows that every single HTTP request executes at least 2 separate SQL queries before reaching the actual endpoint logic: +1. `SELECT value FROM settings WHERE key = 'setup_completed'` (SetupRedirectMiddleware) +2. `SELECT id, token, ... FROM sessions WHERE token = ?` (require_auth dependency) + +When multiple requests arrive concurrently (as seen in the log — 3 parallel requests trigger 3× setup_completed + 3× session token queries), this adds unnecessary DB contention. + +Options (implement one or both): +- **Cache `setup_completed` in memory:** Once setup is complete, it never goes back to incomplete. Cache the result in `app.state` and skip the DB query on subsequent requests. Set it on first `True` read and clear it only if the app restarts. +- **Cache session validation briefly:** Use a short TTL in-memory cache (e.g., 5–10 seconds) for validated session tokens. This reduces repeated DB lookups for the same token across near-simultaneous requests. + +**Expected impact:** Reduces per-request overhead from 2+ SQL queries to 0 for the common case (setup done, session recently validated). + +**Testing:** Existing auth and setup tests must continue to pass. Add a test that validates the cached path (second request skips DB). + +--- + +### Task 5: Audit and verify — run full test suite + +After tasks 1–4 are implemented, run: + +```bash +cd backend && python -m pytest tests/ -x -q +``` + +Verify: +- All tests pass (currently 443). +- `ruff check backend/app/` passes. +- `mypy --strict backend/app/` passes on changed files. +- Manual smoke test: load the world map page and verify it renders quickly with correct country data. diff --git a/backend/app/main.py b/backend/app/main.py index aa97bdf..47e3733 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -34,7 +34,7 @@ from starlette.middleware.base import BaseHTTPMiddleware from app.config import Settings, get_settings from app.db import init_db from app.routers import auth, bans, blocklist, config, dashboard, geo, health, history, jails, server, setup -from app.tasks import blocklist_import, health_check +from app.tasks import blocklist_import, geo_cache_flush, health_check from app.utils.fail2ban_client import Fail2BanConnectionError, Fail2BanProtocolError # --------------------------------------------------------------------------- @@ -151,6 +151,9 @@ async def _lifespan(app: FastAPI) -> AsyncGenerator[None, None]: # --- Blocklist import scheduled task --- blocklist_import.register(app) + # --- Periodic geo cache flush to SQLite --- + geo_cache_flush.register(app) + log.info("bangui_started") try: diff --git a/backend/app/routers/dashboard.py b/backend/app/routers/dashboard.py index 7460d4a..1249b39 100644 --- a/backend/app/routers/dashboard.py +++ b/backend/app/routers/dashboard.py @@ -9,16 +9,14 @@ Also provides ``GET /api/dashboard/bans`` for the dashboard ban-list table. from __future__ import annotations -from typing import TYPE_CHECKING, Annotated - -import aiosqlite +from typing import TYPE_CHECKING if TYPE_CHECKING: import aiohttp -from fastapi import APIRouter, Depends, Query, Request +from fastapi import APIRouter, Query, Request -from app.dependencies import AuthDep, get_db +from app.dependencies import AuthDep from app.models.ban import ( BanOrigin, BansByCountryResponse, @@ -77,7 +75,6 @@ async def get_server_status( async def get_dashboard_bans( request: Request, _auth: AuthDep, - db: Annotated[aiosqlite.Connection, Depends(get_db)], range: TimeRange = Query(default=_DEFAULT_RANGE, description="Time-range preset."), page: int = Query(default=1, ge=1, description="1-based page number."), page_size: int = Query(default=_DEFAULT_PAGE_SIZE, ge=1, le=500, description="Items per page."), @@ -90,12 +87,13 @@ async def get_dashboard_bans( Reads from the fail2ban database and enriches each entry with geolocation data (country, ASN, organisation) from the ip-api.com - free API. Results are sorted newest-first. + free API. Results are sorted newest-first. Geo lookups are served + from the in-memory cache only; no database writes occur during this + GET request. Args: request: The incoming request (used to access ``app.state``). _auth: Validated session dependency. - db: BanGUI application database (for persistent geo cache writes). range: Time-range preset — ``"24h"``, ``"7d"``, ``"30d"``, or ``"365d"``. page: 1-based page number. @@ -115,7 +113,7 @@ async def get_dashboard_bans( page=page, page_size=page_size, http_session=http_session, - app_db=db, + app_db=None, origin=origin, ) @@ -128,7 +126,6 @@ async def get_dashboard_bans( async def get_bans_by_country( request: Request, _auth: AuthDep, - db: Annotated[aiosqlite.Connection, Depends(get_db)], range: TimeRange = Query(default=_DEFAULT_RANGE, description="Time-range preset."), origin: BanOrigin | None = Query( default=None, @@ -139,12 +136,13 @@ async def get_bans_by_country( Uses SQL aggregation (``GROUP BY ip``) and batch geo-resolution to handle 10 000+ banned IPs efficiently. Returns a ``{country_code: count}`` map - and the 200 most recent raw ban rows for the companion access table. + and the 200 most recent raw ban rows for the companion access table. Geo + lookups are served from the in-memory cache only; no database writes occur + during this GET request. Args: request: The incoming request. _auth: Validated session dependency. - db: BanGUI application database (for persistent geo cache writes). range: Time-range preset. origin: Optional filter by ban origin. @@ -159,7 +157,7 @@ async def get_bans_by_country( socket_path, range, http_session=http_session, - app_db=db, + app_db=None, origin=origin, ) diff --git a/backend/app/services/geo_service.py b/backend/app/services/geo_service.py index 50ba24b..6a5f24f 100644 --- a/backend/app/services/geo_service.py +++ b/backend/app/services/geo_service.py @@ -118,6 +118,10 @@ _cache: dict[str, GeoInfo] = {} #: Entries within :data:`_NEG_CACHE_TTL` seconds are not re-queried. _neg_cache: dict[str, float] = {} +#: IPs added to :data:`_cache` but not yet persisted to the database. +#: Consumed and cleared atomically by :func:`flush_dirty`. +_dirty: set[str] = set() + #: Optional MaxMind GeoLite2 reader initialised by :func:`init_geoip`. _geoip_reader: geoip2.database.Reader | None = None @@ -125,10 +129,12 @@ _geoip_reader: geoip2.database.Reader | None = None def clear_cache() -> None: """Flush both the positive and negative lookup caches. - Useful in tests and when the operator suspects stale data. + Also clears the dirty set so any pending-but-unpersisted entries are + discarded. Useful in tests and when the operator suspects stale data. """ _cache.clear() _neg_cache.clear() + _dirty.clear() def clear_neg_cache() -> None: @@ -256,7 +262,6 @@ async def _persist_entry( """, (ip, info.country_code, info.country_name, info.asn, info.org), ) - await db.commit() async def _persist_neg_entry(db: aiosqlite.Connection, ip: str) -> None: @@ -273,7 +278,6 @@ async def _persist_neg_entry(db: aiosqlite.Connection, ip: str) -> None: "INSERT OR IGNORE INTO geo_cache (ip) VALUES (?)", (ip,), ) - await db.commit() # --------------------------------------------------------------------------- @@ -330,6 +334,7 @@ async def lookup( if result.country_code is not None and db is not None: try: await _persist_entry(db, ip, result) + await db.commit() except Exception as exc: # noqa: BLE001 log.warning("geo_persist_failed", ip=ip, error=str(exc)) log.debug("geo_lookup_success", ip=ip, country=result.country_code, asn=result.asn) @@ -350,6 +355,7 @@ async def lookup( if fallback.country_code is not None and db is not None: try: await _persist_entry(db, ip, fallback) + await db.commit() except Exception as exc: # noqa: BLE001 log.warning("geo_persist_failed", ip=ip, error=str(exc)) log.debug("geo_geoip_fallback_success", ip=ip, country=fallback.country_code) @@ -360,6 +366,7 @@ async def lookup( if db is not None: try: await _persist_neg_entry(db, ip) + await db.commit() except Exception as exc: # noqa: BLE001 log.warning("geo_persist_neg_failed", ip=ip, error=str(exc)) @@ -449,6 +456,12 @@ async def lookup_batch( except Exception as exc: # noqa: BLE001 log.warning("geo_persist_neg_failed", ip=ip, error=str(exc)) + if db is not None: + try: + await db.commit() + except Exception as exc: # noqa: BLE001 + log.warning("geo_batch_commit_failed", error=str(exc)) + log.info( "geo_batch_lookup_complete", requested=len(uncached), @@ -561,11 +574,77 @@ def _str_or_none(value: object) -> str | None: def _store(ip: str, info: GeoInfo) -> None: """Insert *info* into the module-level cache, flushing if over capacity. + When the IP resolved successfully (``country_code is not None``) it is + also added to the :data:`_dirty` set so :func:`flush_dirty` can persist + it to the database on the next scheduled flush. + Args: ip: The IP address key. info: The :class:`GeoInfo` to store. """ if len(_cache) >= _MAX_CACHE_SIZE: _cache.clear() + _dirty.clear() log.info("geo_cache_flushed", reason="capacity") _cache[ip] = info + if info.country_code is not None: + _dirty.add(ip) + + +async def flush_dirty(db: aiosqlite.Connection) -> int: + """Persist all new in-memory geo entries to the ``geo_cache`` table. + + Takes an atomic snapshot of :data:`_dirty`, clears it, then batch-inserts + all entries that are still present in :data:`_cache` using a single + ``executemany`` call and one ``COMMIT``. This is the only place that + writes to the persistent cache during normal operation after startup. + + If the database write fails the entries are re-added to :data:`_dirty` + so they will be retried on the next flush cycle. + + Args: + db: Open :class:`aiosqlite.Connection` to the BanGUI application + database. + + Returns: + The number of rows successfully upserted. + """ + if not _dirty: + return 0 + + # Atomically snapshot and clear in a single-threaded async context. + # No ``await`` between copy and clear ensures no interleaving. + to_flush = _dirty.copy() + _dirty.clear() + + rows = [ + (ip, _cache[ip].country_code, _cache[ip].country_name, _cache[ip].asn, _cache[ip].org) + for ip in to_flush + if ip in _cache + ] + if not rows: + return 0 + + try: + await db.executemany( + """ + INSERT INTO geo_cache (ip, country_code, country_name, asn, org) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT(ip) DO UPDATE SET + country_code = excluded.country_code, + country_name = excluded.country_name, + asn = excluded.asn, + org = excluded.org, + cached_at = strftime('%Y-%m-%dT%H:%M:%fZ', 'now') + """, + rows, + ) + await db.commit() + except Exception as exc: # noqa: BLE001 + log.warning("geo_flush_dirty_failed", error=str(exc)) + # Re-add to dirty so they are retried on the next flush cycle. + _dirty.update(to_flush) + return 0 + + log.info("geo_flush_dirty_complete", count=len(rows)) + return len(rows) diff --git a/backend/app/tasks/geo_cache_flush.py b/backend/app/tasks/geo_cache_flush.py new file mode 100644 index 0000000..b225433 --- /dev/null +++ b/backend/app/tasks/geo_cache_flush.py @@ -0,0 +1,66 @@ +"""Geo cache flush background task. + +Registers an APScheduler job that periodically persists newly resolved IP +geo entries from the in-memory ``_dirty`` set to the ``geo_cache`` table. + +After Task 2 removed geo cache writes from GET requests, newly resolved IPs +are only held in the in-memory cache until this task flushes them. With the +default 60-second interval, at most one minute of new resolution results is +at risk on an unexpected process restart. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +import structlog + +from app.services import geo_service + +if TYPE_CHECKING: + from fastapi import FastAPI + +log: structlog.stdlib.BoundLogger = structlog.get_logger() + +#: How often the flush job fires (seconds). Configurable tuning constant. +GEO_FLUSH_INTERVAL: int = 60 + +#: Stable APScheduler job ID — ensures re-registration replaces, not duplicates. +JOB_ID: str = "geo_cache_flush" + + +async def _run_flush(app: Any) -> None: + """Flush the geo service dirty set to the application database. + + Reads shared resources from ``app.state`` and delegates to + :func:`~app.services.geo_service.flush_dirty`. + + Args: + app: The :class:`fastapi.FastAPI` application instance passed via + APScheduler ``kwargs``. + """ + db = app.state.db + count = await geo_service.flush_dirty(db) + if count > 0: + log.debug("geo_cache_flush_ran", flushed=count) + + +def register(app: FastAPI) -> None: + """Add (or replace) the geo cache flush job in the application scheduler. + + Must be called after the scheduler has been started (i.e., inside the + lifespan handler, after ``scheduler.start()``). + + Args: + app: The :class:`fastapi.FastAPI` application instance whose + ``app.state.scheduler`` will receive the job. + """ + app.state.scheduler.add_job( + _run_flush, + trigger="interval", + seconds=GEO_FLUSH_INTERVAL, + kwargs={"app": app}, + id=JOB_ID, + replace_existing=True, + ) + log.info("geo_cache_flush_scheduled", interval_seconds=GEO_FLUSH_INTERVAL) diff --git a/backend/tests/test_services/test_geo_service.py b/backend/tests/test_services/test_geo_service.py index 4487b44..d795350 100644 --- a/backend/tests/test_services/test_geo_service.py +++ b/backend/tests/test_services/test_geo_service.py @@ -356,3 +356,212 @@ class TestGeoipFallback: assert result is not None assert result.country_code is None + + +# --------------------------------------------------------------------------- +# Batch single-commit behaviour (Task 1) +# --------------------------------------------------------------------------- + + +def _make_batch_session(batch_response: list[dict[str, object]]) -> MagicMock: + """Build a mock aiohttp.ClientSession for batch POST calls. + + Args: + batch_response: The list that the mock response's ``json()`` returns. + + Returns: + A :class:`MagicMock` with a ``post`` method wired as an async context. + """ + mock_resp = AsyncMock() + mock_resp.status = 200 + mock_resp.json = AsyncMock(return_value=batch_response) + + mock_ctx = AsyncMock() + mock_ctx.__aenter__ = AsyncMock(return_value=mock_resp) + mock_ctx.__aexit__ = AsyncMock(return_value=False) + + session = MagicMock() + session.post = MagicMock(return_value=mock_ctx) + return session + + +def _make_async_db() -> MagicMock: + """Build a minimal mock :class:`aiosqlite.Connection`. + + Returns: + MagicMock with ``execute``, ``executemany``, and ``commit`` wired as + async coroutines. + """ + db = MagicMock() + db.execute = AsyncMock() + db.executemany = AsyncMock() + db.commit = AsyncMock() + return db + + +class TestLookupBatchSingleCommit: + """lookup_batch() issues exactly one commit per call, not one per IP.""" + + async def test_single_commit_for_multiple_ips(self) -> None: + """A batch of N IPs produces exactly one db.commit(), not N.""" + ips = ["1.1.1.1", "2.2.2.2", "3.3.3.3"] + batch_response = [ + {"query": ip, "status": "success", "countryCode": "DE", "country": "Germany", "as": "AS1", "org": "Org"} + for ip in ips + ] + session = _make_batch_session(batch_response) + db = _make_async_db() + + await geo_service.lookup_batch(ips, session, db=db) # type: ignore[arg-type] + + db.commit.assert_awaited_once() + + async def test_commit_called_even_on_failed_lookups(self) -> None: + """A batch with all-failed lookups still triggers one commit.""" + ips = ["10.0.0.1", "10.0.0.2"] + batch_response = [ + {"query": ip, "status": "fail", "message": "private range"} + for ip in ips + ] + session = _make_batch_session(batch_response) + db = _make_async_db() + + await geo_service.lookup_batch(ips, session, db=db) # type: ignore[arg-type] + + db.commit.assert_awaited_once() + + async def test_no_commit_when_db_is_none(self) -> None: + """When db=None, no commit is attempted.""" + ips = ["1.1.1.1"] + batch_response = [ + {"query": "1.1.1.1", "status": "success", "countryCode": "US", "country": "United States", "as": "AS15169", "org": "Google LLC"}, + ] + session = _make_batch_session(batch_response) + + # Should not raise; without db there is nothing to commit. + result = await geo_service.lookup_batch(ips, session, db=None) + + assert result["1.1.1.1"].country_code == "US" + + async def test_no_commit_for_all_cached_ips(self) -> None: + """When all IPs are already cached, no HTTP call and no commit occur.""" + geo_service._cache["5.5.5.5"] = GeoInfo( # type: ignore[attr-defined] + country_code="FR", country_name="France", asn="AS1", org="ISP" + ) + db = _make_async_db() + session = _make_batch_session([]) + + result = await geo_service.lookup_batch(["5.5.5.5"], session, db=db) # type: ignore[arg-type] + + assert result["5.5.5.5"].country_code == "FR" + db.commit.assert_not_awaited() + session.post.assert_not_called() + + +# --------------------------------------------------------------------------- +# Dirty-set tracking and flush_dirty (Task 3) +# --------------------------------------------------------------------------- + + +class TestDirtySetTracking: + """_store() marks successfully resolved IPs as dirty.""" + + def test_successful_resolution_adds_to_dirty(self) -> None: + """Storing a GeoInfo with a country_code adds the IP to _dirty.""" + info = GeoInfo(country_code="DE", country_name="Germany", asn="AS1", org="ISP") + geo_service._store("1.2.3.4", info) # type: ignore[attr-defined] + + assert "1.2.3.4" in geo_service._dirty # type: ignore[attr-defined] + + def test_null_country_does_not_add_to_dirty(self) -> None: + """Storing a GeoInfo with country_code=None must not pollute _dirty.""" + info = GeoInfo(country_code=None, country_name=None, asn=None, org=None) + geo_service._store("10.0.0.1", info) # type: ignore[attr-defined] + + assert "10.0.0.1" not in geo_service._dirty # type: ignore[attr-defined] + + def test_clear_cache_also_clears_dirty(self) -> None: + """clear_cache() must discard any pending dirty entries.""" + info = GeoInfo(country_code="US", country_name="United States", asn="AS1", org="ISP") + geo_service._store("8.8.8.8", info) # type: ignore[attr-defined] + assert geo_service._dirty # type: ignore[attr-defined] + + geo_service.clear_cache() + + assert not geo_service._dirty # type: ignore[attr-defined] + + async def test_lookup_batch_populates_dirty(self) -> None: + """After lookup_batch() with db=None, resolved IPs appear in _dirty.""" + ips = ["1.1.1.1", "2.2.2.2"] + batch_response = [ + {"query": ip, "status": "success", "countryCode": "JP", "country": "Japan", "as": "AS7500", "org": "IIJ"} + for ip in ips + ] + session = _make_batch_session(batch_response) + + await geo_service.lookup_batch(ips, session, db=None) + + for ip in ips: + assert ip in geo_service._dirty # type: ignore[attr-defined] + + +class TestFlushDirty: + """flush_dirty() persists dirty entries and clears the set.""" + + async def test_flush_writes_and_clears_dirty(self) -> None: + """flush_dirty() inserts all dirty IPs and clears _dirty afterwards.""" + info = GeoInfo(country_code="GB", country_name="United Kingdom", asn="AS2856", org="BT") + geo_service._store("100.0.0.1", info) # type: ignore[attr-defined] + assert "100.0.0.1" in geo_service._dirty # type: ignore[attr-defined] + + db = _make_async_db() + count = await geo_service.flush_dirty(db) + + assert count == 1 + db.executemany.assert_awaited_once() + db.commit.assert_awaited_once() + assert "100.0.0.1" not in geo_service._dirty # type: ignore[attr-defined] + + async def test_flush_returns_zero_when_nothing_dirty(self) -> None: + """flush_dirty() returns 0 and makes no DB calls when _dirty is empty.""" + db = _make_async_db() + count = await geo_service.flush_dirty(db) + + assert count == 0 + db.executemany.assert_not_awaited() + db.commit.assert_not_awaited() + + async def test_flush_re_adds_to_dirty_on_db_error(self) -> None: + """When the DB write fails, entries are re-added to _dirty for retry.""" + info = GeoInfo(country_code="AU", country_name="Australia", asn="AS1", org="ISP") + geo_service._store("200.0.0.1", info) # type: ignore[attr-defined] + + db = _make_async_db() + db.executemany = AsyncMock(side_effect=OSError("disk full")) + + count = await geo_service.flush_dirty(db) + + assert count == 0 + assert "200.0.0.1" in geo_service._dirty # type: ignore[attr-defined] + + async def test_flush_batch_and_lookup_batch_integration(self) -> None: + """lookup_batch() populates _dirty; flush_dirty() then persists them.""" + ips = ["10.1.2.3", "10.1.2.4"] + batch_response = [ + {"query": ip, "status": "success", "countryCode": "CA", "country": "Canada", "as": "AS812", "org": "Bell"} + for ip in ips + ] + session = _make_batch_session(batch_response) + + # Resolve without DB to populate only in-memory cache and _dirty. + await geo_service.lookup_batch(ips, session, db=None) + assert geo_service._dirty == set(ips) # type: ignore[attr-defined] + + # Now flush to the DB. + db = _make_async_db() + count = await geo_service.flush_dirty(db) + + assert count == 2 + assert not geo_service._dirty # type: ignore[attr-defined] + db.commit.assert_awaited_once() +