Convert inconsistent modeling style to standardized Pydantic models for all external-facing data structures while maintaining TypedDict compatibility where appropriate for internal layer-private structures. Changes: - Converted IpLookupResult TypedDict to use IpLookupResponse Pydantic model in jail_service.lookup_ip() for consistency with routers - Added GeoCacheEntry Pydantic model for geo cache repository rows - Converted GeoCacheRow TypedDict to use GeoCacheEntry alias - Converted ImportLogRow TypedDict to use ImportLogEntry alias - Updated routers and services to work with Pydantic models - Updated all tests to use Pydantic model field access (attributes) instead of dict subscripting Documentation: - Added 'Model Type Usage by Layer' section to Backend-Development.md - Defines when TypedDict is allowed (internal structures) vs Pydantic (external-facing, cross-boundary data) - Provides clear guidance on modeling conventions per layer Benefits: - Consistent validation and serialization behavior - Better IDE support and type checking - Clearer separation of concerns by layer - Reduced maintenance cost from mixed validation approaches Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
225 lines
6.5 KiB
Python
225 lines
6.5 KiB
Python
"""Repository for the geo cache persistent store.
|
|
|
|
This module provides typed, async helpers for querying and mutating the
|
|
``geo_cache`` table in the BanGUI application database.
|
|
|
|
All functions accept an open :class:`aiosqlite.Connection` and do not manage
|
|
connection lifetimes.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import TYPE_CHECKING
|
|
|
|
if TYPE_CHECKING:
|
|
from collections.abc import Sequence
|
|
|
|
import aiosqlite
|
|
|
|
from app.models.geo import GeoCacheEntry
|
|
|
|
|
|
# Alias for backward compatibility with protocols: code that still refers to
# ``GeoCacheRow`` keeps working, while the rows themselves are
# ``GeoCacheEntry`` instances.
GeoCacheRow = GeoCacheEntry
|
|
|
|
|
|
async def load_all(db: aiosqlite.Connection) -> list[GeoCacheRow]:
    """Fetch every row stored in the ``geo_cache`` table.

    Args:
        db: Open BanGUI application database connection.

    Returns:
        One ``GeoCacheRow`` per table row, in the order the cursor yields
        them. The list is empty when the table has no rows.
    """
    query = "SELECT ip, country_code, country_name, asn, org FROM geo_cache"
    async with db.execute(query) as cursor:
        # Columns are unpacked positionally in SELECT order. Only ``ip`` is
        # coerced to ``str``; the other values are handed over unchanged.
        return [
            GeoCacheRow(
                ip=str(ip),
                country_code=country_code,
                country_name=country_name,
                asn=asn,
                org=org,
            )
            async for ip, country_code, country_name, asn, org in cursor
        ]
|
|
|
|
|
|
async def get_unresolved_ips(db: aiosqlite.Connection) -> list[str]:
    """Collect the IPs whose ``country_code`` column is NULL.

    Args:
        db: Open BanGUI application database connection.

    Returns:
        The ``ip`` value of every matching ``geo_cache`` row, converted to
        ``str``. These are the addresses that still need geo resolution.
    """
    query = "SELECT ip FROM geo_cache WHERE country_code IS NULL"
    async with db.execute(query) as cursor:
        # Each result row has a single column: the IP address.
        return [str(record[0]) async for record in cursor]
|
|
|
|
|
|
async def count_unresolved(db: aiosqlite.Connection) -> int:
    """Count the ``geo_cache`` rows whose ``country_code`` is NULL.

    Args:
        db: Open BanGUI application database connection.

    Returns:
        The number of unresolved rows, or ``0`` when the query yields no row.
    """
    query = "SELECT COUNT(*) FROM geo_cache WHERE country_code IS NULL"
    async with db.execute(query) as cursor:
        result = await cursor.fetchone()

    if not result:
        # Defensive fallback for a missing result row.
        return 0
    return int(result[0])
|
|
|
|
|
|
async def upsert_entry(
    db: aiosqlite.Connection,
    ip: str,
    country_code: str | None,
    country_name: str | None,
    asn: str | None,
    org: str | None,
) -> None:
    """Write one resolved geo cache row, replacing any existing row for ``ip``.

    When a row with the same ``ip`` already exists, its geo columns are
    overwritten with the supplied values and both ``cached_at`` and
    ``last_seen`` are set to SQLite's current timestamp. The statement is
    executed but not committed.

    Args:
        db: Open BanGUI application database connection.
        ip: IP address identifying the row.
        country_code: Country code to store, or ``None``.
        country_name: Country name to store, or ``None``.
        asn: ASN value to store, or ``None``.
        org: Organisation value to store, or ``None``.
    """
    values = (ip, country_code, country_name, asn, org)
    statement = """
        INSERT INTO geo_cache (ip, country_code, country_name, asn, org)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT(ip) DO UPDATE SET
            country_code = excluded.country_code,
            country_name = excluded.country_name,
            asn = excluded.asn,
            org = excluded.org,
            cached_at = strftime('%Y-%m-%dT%H:%M:%fZ', 'now'),
            last_seen = strftime('%Y-%m-%dT%H:%M:%fZ', 'now')
        """
    await db.execute(statement, values)
|
|
|
|
|
|
async def upsert_entry_and_commit(
    db: aiosqlite.Connection,
    ip: str,
    country_code: str | None,
    country_name: str | None,
    asn: str | None,
    org: str | None,
) -> None:
    """Write one resolved geo cache row via ``upsert_entry`` and commit.

    Args:
        db: Open BanGUI application database connection.
        ip: IP address identifying the row.
        country_code: Country code to store, or ``None``.
        country_name: Country name to store, or ``None``.
        asn: ASN value to store, or ``None``.
        org: Organisation value to store, or ``None``.
    """
    await upsert_entry(
        db,
        ip=ip,
        country_code=country_code,
        country_name=country_name,
        asn=asn,
        org=org,
    )
    # Commit straight away so the single-row write is durable on return.
    await db.commit()
|
|
|
|
|
|
async def upsert_neg_entry(db: aiosqlite.Connection, ip: str) -> None:
    """Record a failed lookup for ``ip`` as a negative cache entry.

    A row holding only the ``ip`` is inserted. If a row for that address
    already exists, only its ``last_seen`` column is refreshed to SQLite's
    current timestamp; its other columns are left alone. The statement is
    executed but not committed.

    Args:
        db: Open BanGUI application database connection.
        ip: IP address whose lookup failed.
    """
    statement = """
        INSERT INTO geo_cache (ip) VALUES (?)
        ON CONFLICT(ip) DO UPDATE SET
            last_seen = strftime('%Y-%m-%dT%H:%M:%fZ', 'now')
        """
    await db.execute(statement, (ip,))
|
|
|
|
|
|
async def upsert_neg_entry_and_commit(db: aiosqlite.Connection, ip: str) -> None:
    """Record a failed lookup via ``upsert_neg_entry`` and commit.

    Args:
        db: Open BanGUI application database connection.
        ip: IP address whose lookup failed.
    """
    await upsert_neg_entry(db, ip=ip)
    # Commit straight away so the negative entry is durable on return.
    await db.commit()
|
|
|
|
|
|
async def bulk_upsert_entries(
    db: aiosqlite.Connection,
    rows: Sequence[tuple[str, str | None, str | None, str | None, str | None]],
) -> int:
    """Insert or update many geo cache rows with one ``executemany`` call.

    Existing rows with the same ``ip`` have their geo columns overwritten and
    their ``cached_at`` / ``last_seen`` columns set to SQLite's current
    timestamp. Nothing is committed here.

    Args:
        db: Open BanGUI application database connection.
        rows: Tuples of ``(ip, country_code, country_name, asn, org)``.

    Returns:
        The number of tuples submitted (``len(rows)``); ``0`` when ``rows``
        is empty, in which case the database is not touched.
    """
    if not rows:
        # Nothing to write, so skip the database call entirely.
        return 0

    statement = """
        INSERT INTO geo_cache (ip, country_code, country_name, asn, org)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT(ip) DO UPDATE SET
            country_code = excluded.country_code,
            country_name = excluded.country_name,
            asn = excluded.asn,
            org = excluded.org,
            cached_at = strftime('%Y-%m-%dT%H:%M:%fZ', 'now'),
            last_seen = strftime('%Y-%m-%dT%H:%M:%fZ', 'now')
        """
    await db.executemany(statement, rows)
    return len(rows)
|
|
|
|
|
|
async def bulk_upsert_neg_entries(db: aiosqlite.Connection, ips: list[str]) -> int:
    """Insert negative lookup entries for many IPs with one ``executemany``.

    The statement is ``INSERT OR IGNORE``, so rows that already exist are
    left completely untouched (their ``last_seen`` is not refreshed).
    Nothing is committed here.

    Args:
        db: Open BanGUI application database connection.
        ips: IP addresses whose lookups failed.

    Returns:
        The number of addresses submitted (``len(ips)``), which may exceed
        the number of rows actually inserted; ``0`` when ``ips`` is empty,
        in which case the database is not touched.
    """
    if not ips:
        return 0

    # executemany expects one parameter tuple per statement execution.
    params = [(address,) for address in ips]
    await db.executemany("INSERT OR IGNORE INTO geo_cache (ip) VALUES (?)", params)
    return len(ips)
|
|
|
|
|
|
async def bulk_upsert_entries_and_commit(
    db: aiosqlite.Connection,
    rows: Sequence[tuple[str, str | None, str | None, str | None, str | None]],
) -> int:
    """Bulk upsert geo cache rows via ``bulk_upsert_entries`` and commit.

    The commit is issued unconditionally, including when ``rows`` is empty.

    Args:
        db: Open BanGUI application database connection.
        rows: Tuples of ``(ip, country_code, country_name, asn, org)``.

    Returns:
        The count reported by ``bulk_upsert_entries``.
    """
    written = await bulk_upsert_entries(db, rows)
    await db.commit()
    return written
|
|
|
|
|
|
async def bulk_upsert_neg_entries_and_commit(db: aiosqlite.Connection, ips: list[str]) -> int:
    """Bulk insert negative entries via ``bulk_upsert_neg_entries`` and commit.

    The commit is issued unconditionally, including when ``ips`` is empty.

    Args:
        db: Open BanGUI application database connection.
        ips: IP addresses whose lookups failed.

    Returns:
        The count reported by ``bulk_upsert_neg_entries``.
    """
    submitted = await bulk_upsert_neg_entries(db, ips)
    await db.commit()
    return submitted
|
|
|
|
|
|
async def bulk_upsert_entries_and_neg_entries_and_commit(
    db: aiosqlite.Connection,
    rows: Sequence[tuple[str, str | None, str | None, str | None, str | None]],
    ips: list[str],
) -> tuple[int, int]:
    """Persist positive and negative geo cache rows, then commit once.

    Args:
        db: Open BanGUI application database connection.
        rows: Resolved entries as ``(ip, country_code, country_name, asn, org)``.
        ips: IP addresses whose lookups failed.

    Returns:
        ``(positive_count, negative_count)`` as reported by the two bulk
        helpers. Both are ``0`` and the database is neither written to nor
        committed when ``rows`` and ``ips`` are both empty.
    """
    if not rows and not ips:
        # Nothing to persist, so avoid an empty commit.
        return 0, 0

    positive_count = await bulk_upsert_entries(db, rows) if rows else 0
    negative_count = await bulk_upsert_neg_entries(db, ips) if ips else 0

    # One commit covers both batches.
    await db.commit()
    return positive_count, negative_count
|
|
|
|
|
|
async def delete_stale_entries(db: aiosqlite.Connection, cutoff_iso: str) -> int:
    """Remove geo cache rows whose ``last_seen`` is older than the cutoff.

    The DELETE is executed but not committed by this function.

    Args:
        db: Open BanGUI application database connection.
        cutoff_iso: ISO 8601 timestamp (e.g., '2024-01-01T00:00:00Z'). Rows
            with ``last_seen`` before this value are deleted. The comparison
            is done by SQLite against the stored value.

    Returns:
        The number of rows deleted, or ``0`` when the cursor reports no
        row count.
    """
    statement = "DELETE FROM geo_cache WHERE last_seen < ?"
    async with db.execute(statement, (cutoff_iso,)) as cursor:
        # Read the count while the cursor is still open.
        deleted = cursor.rowcount
    return 0 if deleted is None else deleted
|