Add automatic cleanup of stale geolocation cache entries to prevent unbounded database growth.

Resolves the issue where unique IP addresses accumulated indefinitely in the geo_cache table, degrading query performance.

## Changes

### Database Schema (Migration 3)
- Add 'last_seen' column to geo_cache table tracking last reference time
- Existing entries default to current timestamp

### Repository Layer (geo_cache_repo.py)
- Update upsert_entry() to set/refresh last_seen on insert/update
- Update upsert_neg_entry() to set/refresh last_seen on negative cache hits
- Update bulk_upsert_entries() to set/refresh last_seen in batch operations
- Add delete_stale_entries(db, cutoff_iso) -> int for purging old entries

### Background Task (geo_cache_cleanup.py)
- New APScheduler task that runs nightly (24-hour interval)
- Calculates cutoff as 90 days ago from current time (UTC)
- Deletes all entries with last_seen older than cutoff
- Logs operation results (info when deleted > 0, debug when 0 deleted)
- Configurable retention period via GEO_CACHE_RETENTION_DAYS constant

### Application Startup (startup.py)
- Register geo_cache_cleanup task in scheduler during app startup
- Placed after geo_cache_flush in task registration order

### Tests
- Add delete_stale_entries test cases covering:
  * Removal of old entries beyond cutoff
  * No deletion when all entries are recent
  * Empty table edge case
- Update existing test fixtures to include last_seen column
- Add full test suite for cleanup task registration and execution

### Documentation
- Architekture.md: Document cleanup task, update schema/diagram
- Backend-Development.md: Add retention policy documentation

## Behavior

When an IP is accessed, its last_seen is refreshed. After 90 days of no access, an IP is purged by the nightly cleanup. On next encounter, the IP is re-resolved from MaxMind MMDB or ip-api.com (if configured).

This is acceptable because:
1. Stale geolocation data may become inaccurate over time
2. Re-resolution cost is minimal compared to unbounded storage growth
3. Active IPs maintain fresh data through their last_seen updates

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
229 lines
6.6 KiB
Python
229 lines
6.6 KiB
Python
"""Repository for the geo cache persistent store.
|
|
|
|
This module provides typed, async helpers for querying and mutating the
|
|
``geo_cache`` table in the BanGUI application database.
|
|
|
|
All functions accept an open :class:`aiosqlite.Connection` and do not manage
|
|
connection lifetimes.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import TYPE_CHECKING, TypedDict
|
|
|
|
if TYPE_CHECKING:
|
|
from collections.abc import Sequence
|
|
|
|
import aiosqlite
|
|
|
|
|
|
class GeoCacheRow(TypedDict):
    """Typed mapping for one record read from the ``geo_cache`` table.

    Only the five columns selected by ``load_all`` are represented here;
    timestamp columns such as ``cached_at`` and ``last_seen`` are not part of
    this mapping.
    """

    # Primary lookup key: the IP address as text.
    ip: str
    # NULL/None marks a row that has not been resolved (the "unresolved"
    # queries in this module filter on ``country_code IS NULL``).
    country_code: str | None
    country_name: str | None
    # presumably the autonomous-system identifier and its organisation name;
    # verify against the resolver that fills these columns.
    asn: str | None
    org: str | None
|
|
|
|
|
|
async def load_all(db: aiosqlite.Connection) -> list[GeoCacheRow]:
    """Fetch every row currently stored in the ``geo_cache`` table.

    Args:
        db: Open BanGUI application database connection.

    Returns:
        One :class:`GeoCacheRow` per table row, in the order the cursor
        yields them. The list is empty when the table has no rows.
    """
    query = "SELECT ip, country_code, country_name, asn, org FROM geo_cache"
    async with db.execute(query) as cursor:
        # Column positions follow the SELECT list above.
        return [
            GeoCacheRow(
                ip=str(record[0]),
                country_code=record[1],
                country_name=record[2],
                asn=record[3],
                org=record[4],
            )
            async for record in cursor
        ]
|
|
|
|
|
|
async def get_unresolved_ips(db: aiosqlite.Connection) -> list[str]:
    """List the IPs whose ``geo_cache`` row has a NULL ``country_code``.

    Args:
        db: Open BanGUI application database connection.

    Returns:
        The ``ip`` value of every unresolved row, converted to ``str``.
    """
    query = "SELECT ip FROM geo_cache WHERE country_code IS NULL"
    async with db.execute(query) as cursor:
        return [str(record[0]) async for record in cursor]
|
|
|
|
|
|
async def count_unresolved(db: aiosqlite.Connection) -> int:
    """Count the ``geo_cache`` rows whose ``country_code`` is NULL.

    Args:
        db: Open BanGUI application database connection.

    Returns:
        The number of unresolved rows, or ``0`` if the query yields no row.
    """
    query = "SELECT COUNT(*) FROM geo_cache WHERE country_code IS NULL"
    async with db.execute(query) as cursor:
        result = await cursor.fetchone()

    if not result:
        return 0
    return int(result[0])
|
|
|
|
|
|
async def upsert_entry(
    db: aiosqlite.Connection,
    ip: str,
    country_code: str | None,
    country_name: str | None,
    asn: str | None,
    org: str | None,
) -> None:
    """Write a resolved geo cache entry, replacing any existing row for *ip*.

    A new row is inserted with the five supplied columns only. When a row for
    *ip* already exists, its geo columns are overwritten and both
    ``cached_at`` and ``last_seen`` are reset to SQLite's current time.

    This function does not commit; the caller owns the transaction.

    Args:
        db: Open BanGUI application database connection.
        ip: IP address identifying the row.
        country_code: Country code to store, or ``None``.
        country_name: Country name to store, or ``None``.
        asn: ASN value to store, or ``None``.
        org: Organisation value to store, or ``None``.
    """
    statement = """
        INSERT INTO geo_cache (ip, country_code, country_name, asn, org)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT(ip) DO UPDATE SET
            country_code = excluded.country_code,
            country_name = excluded.country_name,
            asn = excluded.asn,
            org = excluded.org,
            cached_at = strftime('%Y-%m-%dT%H:%M:%fZ', 'now'),
            last_seen = strftime('%Y-%m-%dT%H:%M:%fZ', 'now')
        """
    values = (ip, country_code, country_name, asn, org)
    await db.execute(statement, values)
|
|
|
|
|
|
async def upsert_entry_and_commit(
    db: aiosqlite.Connection,
    ip: str,
    country_code: str | None,
    country_name: str | None,
    asn: str | None,
    org: str | None,
) -> None:
    """Upsert one resolved geo cache entry, then commit the transaction.

    Delegates the write to :func:`upsert_entry` and commits immediately
    afterwards on the same connection.

    Args:
        db: Open BanGUI application database connection.
        ip: IP address identifying the row.
        country_code: Country code to store, or ``None``.
        country_name: Country name to store, or ``None``.
        asn: ASN value to store, or ``None``.
        org: Organisation value to store, or ``None``.
    """
    await upsert_entry(
        db,
        ip=ip,
        country_code=country_code,
        country_name=country_name,
        asn=asn,
        org=org,
    )
    await db.commit()
|
|
|
|
|
|
async def upsert_neg_entry(db: aiosqlite.Connection, ip: str) -> None:
    """Store a negative entry for *ip* after a failed lookup.

    Inserts a row carrying only the ``ip`` column. If the IP is already
    present, the existing row keeps its data and only ``last_seen`` is
    refreshed to SQLite's current time.

    This function does not commit; the caller owns the transaction.

    Args:
        db: Open BanGUI application database connection.
        ip: IP address whose lookup failed.
    """
    statement = """
        INSERT INTO geo_cache (ip) VALUES (?)
        ON CONFLICT(ip) DO UPDATE SET
            last_seen = strftime('%Y-%m-%dT%H:%M:%fZ', 'now')
        """
    await db.execute(statement, (ip,))
|
|
|
|
|
|
async def upsert_neg_entry_and_commit(db: aiosqlite.Connection, ip: str) -> None:
    """Store a negative entry for *ip*, then commit the transaction.

    Args:
        db: Open BanGUI application database connection.
        ip: IP address whose lookup failed.
    """
    await upsert_neg_entry(db, ip=ip)
    await db.commit()
|
|
|
|
|
|
async def bulk_upsert_entries(
    db: aiosqlite.Connection,
    rows: Sequence[tuple[str, str | None, str | None, str | None, str | None]],
) -> int:
    """Upsert many resolved geo cache entries with a single ``executemany``.

    Each tuple is ``(ip, country_code, country_name, asn, org)``. Existing rows
    have their geo columns overwritten and ``cached_at`` / ``last_seen`` reset
    to SQLite's current time.

    This function does not commit; the caller owns the transaction.

    Args:
        db: Open BanGUI application database connection.
        rows: Entries to write. An empty sequence is a no-op.

    Returns:
        The number of entries submitted (``len(rows)``), or ``0`` when *rows*
        is empty. The database is not touched in the empty case.
    """
    submitted = len(rows) if rows else 0
    if submitted == 0:
        return 0

    statement = """
        INSERT INTO geo_cache (ip, country_code, country_name, asn, org)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT(ip) DO UPDATE SET
            country_code = excluded.country_code,
            country_name = excluded.country_name,
            asn = excluded.asn,
            org = excluded.org,
            cached_at = strftime('%Y-%m-%dT%H:%M:%fZ', 'now'),
            last_seen = strftime('%Y-%m-%dT%H:%M:%fZ', 'now')
        """
    await db.executemany(statement, rows)
    return submitted
|
|
|
|
|
|
async def bulk_upsert_neg_entries(db: aiosqlite.Connection, ips: list[str]) -> int:
    """Bulk-record failed lookups as negative entries.

    Every IP is inserted as a row carrying only the ``ip`` column. When a row
    for an IP already exists, its data is left untouched and only
    ``last_seen`` is refreshed, exactly as :func:`upsert_neg_entry` does for a
    single IP.

    This function does not commit; the caller owns the transaction.

    Args:
        db: Open BanGUI application database connection.
        ips: IP addresses whose lookups failed. An empty list is a no-op.

    Returns:
        The number of IPs submitted (``len(ips)``), or ``0`` when *ips* is
        empty. The database is not touched in the empty case.
    """
    if not ips:
        return 0

    # Refresh last_seen on conflict instead of ignoring the row: with
    # INSERT OR IGNORE an IP that keeps being seen through this bulk path
    # would never have its last_seen bumped and would be removed by
    # delete_stale_entries() despite still being referenced.
    await db.executemany(
        """
        INSERT INTO geo_cache (ip) VALUES (?)
        ON CONFLICT(ip) DO UPDATE SET
            last_seen = strftime('%Y-%m-%dT%H:%M:%fZ', 'now')
        """,
        [(ip,) for ip in ips],
    )
    return len(ips)
|
|
|
|
|
|
async def bulk_upsert_entries_and_commit(
    db: aiosqlite.Connection,
    rows: Sequence[tuple[str, str | None, str | None, str | None, str | None]],
) -> int:
    """Bulk-upsert resolved geo cache entries, then commit the transaction.

    The commit is issued unconditionally, even when *rows* is empty.

    Args:
        db: Open BanGUI application database connection.
        rows: ``(ip, country_code, country_name, asn, org)`` tuples to write.

    Returns:
        The count reported by :func:`bulk_upsert_entries`.
    """
    written = await bulk_upsert_entries(db, rows)
    await db.commit()
    return written
|
|
|
|
|
|
async def bulk_upsert_neg_entries_and_commit(db: aiosqlite.Connection, ips: list[str]) -> int:
    """Bulk-record negative lookup entries, then commit the transaction.

    The commit is issued unconditionally, even when *ips* is empty.

    Args:
        db: Open BanGUI application database connection.
        ips: IP addresses whose lookups failed.

    Returns:
        The count reported by :func:`bulk_upsert_neg_entries`.
    """
    recorded = await bulk_upsert_neg_entries(db, ips)
    await db.commit()
    return recorded
|
|
|
|
|
|
async def bulk_upsert_entries_and_neg_entries_and_commit(
    db: aiosqlite.Connection,
    rows: Sequence[tuple[str, str | None, str | None, str | None, str | None]],
    ips: list[str],
) -> tuple[int, int]:
    """Write resolved and negative entries in one transaction, then commit.

    Resolved rows are written first, negative entries second. A single commit
    follows, and it is skipped entirely when both inputs are empty.

    Args:
        db: Open BanGUI application database connection.
        rows: ``(ip, country_code, country_name, asn, org)`` tuples to upsert.
        ips: IP addresses to record as negative entries.

    Returns:
        ``(positive_count, negative_count)`` as reported by the two bulk
        helpers; a count is ``0`` when its input is empty.
    """
    # Each helper is only awaited when its input is non-empty.
    positive_count = await bulk_upsert_entries(db, rows) if rows else 0
    negative_count = await bulk_upsert_neg_entries(db, ips) if ips else 0

    if rows or ips:
        await db.commit()

    return positive_count, negative_count
|
|
|
|
|
|
async def delete_stale_entries(db: aiosqlite.Connection, cutoff_iso: str) -> int:
    """Remove geo cache rows whose ``last_seen`` is older than the cutoff.

    The comparison is done by SQLite on the stored ``last_seen`` value against
    the bound string, so *cutoff_iso* should use the same ISO 8601 layout the
    writers in this module store (``strftime('%Y-%m-%dT%H:%M:%fZ', 'now')``).

    This function does not commit; the caller owns the transaction.

    Args:
        db: Open BanGUI application database connection.
        cutoff_iso: ISO 8601 timestamp (e.g., '2024-01-01T00:00:00Z'). Rows
            with ``last_seen`` before this value are deleted.

    Returns:
        The number of rows deleted, or ``0`` if the cursor reports no count.
    """
    statement = "DELETE FROM geo_cache WHERE last_seen < ?"
    async with db.execute(statement, (cutoff_iso,)) as cursor:
        deleted = cursor.rowcount

    return 0 if deleted is None else deleted
|