Refactor geo cache persistence into repository + remove raw SQL from tasks/main, update task list
This commit is contained in:
@@ -186,11 +186,7 @@ async def cache_stats(db: aiosqlite.Connection) -> dict[str, int]:
|
||||
Dict with keys ``cache_size``, ``unresolved``, ``neg_cache_size``,
|
||||
and ``dirty_size``.
|
||||
"""
|
||||
async with db.execute(
|
||||
"SELECT COUNT(*) FROM geo_cache WHERE country_code IS NULL"
|
||||
) as cur:
|
||||
row = await cur.fetchone()
|
||||
unresolved: int = int(row[0]) if row else 0
|
||||
unresolved = await geo_cache_repo.count_unresolved(db)
|
||||
|
||||
return {
|
||||
"cache_size": len(_cache),
|
||||
@@ -200,6 +196,12 @@ async def cache_stats(db: aiosqlite.Connection) -> dict[str, int]:
|
||||
}
|
||||
|
||||
|
||||
async def count_unresolved(db: aiosqlite.Connection) -> int:
|
||||
"""Return the number of unresolved entries in the persistent geo cache."""
|
||||
|
||||
return await geo_cache_repo.count_unresolved(db)
|
||||
|
||||
|
||||
async def get_unresolved_ips(db: aiosqlite.Connection) -> list[str]:
|
||||
"""Return geo cache IPs where the country code has not yet been resolved.
|
||||
|
||||
@@ -282,21 +284,18 @@ async def load_cache_from_db(db: aiosqlite.Connection) -> None:
|
||||
database (not the fail2ban database).
|
||||
"""
|
||||
count = 0
|
||||
async with db.execute(
|
||||
"SELECT ip, country_code, country_name, asn, org FROM geo_cache"
|
||||
) as cur:
|
||||
async for row in cur:
|
||||
ip: str = str(row[0])
|
||||
country_code: str | None = row[1]
|
||||
if country_code is None:
|
||||
continue
|
||||
_cache[ip] = GeoInfo(
|
||||
country_code=country_code,
|
||||
country_name=row[2],
|
||||
asn=row[3],
|
||||
org=row[4],
|
||||
)
|
||||
count += 1
|
||||
for row in await geo_cache_repo.load_all(db):
|
||||
country_code: str | None = row["country_code"]
|
||||
if country_code is None:
|
||||
continue
|
||||
ip: str = row["ip"]
|
||||
_cache[ip] = GeoInfo(
|
||||
country_code=country_code,
|
||||
country_name=row["country_name"],
|
||||
asn=row["asn"],
|
||||
org=row["org"],
|
||||
)
|
||||
count += 1
|
||||
log.info("geo_cache_loaded_from_db", entries=count)
|
||||
|
||||
|
||||
@@ -315,18 +314,13 @@ async def _persist_entry(
|
||||
ip: IP address string.
|
||||
info: Resolved geo data to persist.
|
||||
"""
|
||||
await db.execute(
|
||||
"""
|
||||
INSERT INTO geo_cache (ip, country_code, country_name, asn, org)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
ON CONFLICT(ip) DO UPDATE SET
|
||||
country_code = excluded.country_code,
|
||||
country_name = excluded.country_name,
|
||||
asn = excluded.asn,
|
||||
org = excluded.org,
|
||||
cached_at = strftime('%Y-%m-%dT%H:%M:%fZ', 'now')
|
||||
""",
|
||||
(ip, info.country_code, info.country_name, info.asn, info.org),
|
||||
await geo_cache_repo.upsert_entry(
|
||||
db=db,
|
||||
ip=ip,
|
||||
country_code=info.country_code,
|
||||
country_name=info.country_name,
|
||||
asn=info.asn,
|
||||
org=info.org,
|
||||
)
|
||||
|
||||
|
||||
@@ -340,10 +334,7 @@ async def _persist_neg_entry(db: aiosqlite.Connection, ip: str) -> None:
|
||||
db: BanGUI application database connection.
|
||||
ip: IP address string whose resolution failed.
|
||||
"""
|
||||
await db.execute(
|
||||
"INSERT OR IGNORE INTO geo_cache (ip) VALUES (?)",
|
||||
(ip,),
|
||||
)
|
||||
await geo_cache_repo.upsert_neg_entry(db=db, ip=ip)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -599,19 +590,7 @@ async def lookup_batch(
|
||||
if db is not None:
|
||||
if pos_rows:
|
||||
try:
|
||||
await db.executemany(
|
||||
"""
|
||||
INSERT INTO geo_cache (ip, country_code, country_name, asn, org)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
ON CONFLICT(ip) DO UPDATE SET
|
||||
country_code = excluded.country_code,
|
||||
country_name = excluded.country_name,
|
||||
asn = excluded.asn,
|
||||
org = excluded.org,
|
||||
cached_at = strftime('%Y-%m-%dT%H:%M:%fZ', 'now')
|
||||
""",
|
||||
pos_rows,
|
||||
)
|
||||
await geo_cache_repo.bulk_upsert_entries(db, pos_rows)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
log.warning(
|
||||
"geo_batch_persist_failed",
|
||||
@@ -620,10 +599,7 @@ async def lookup_batch(
|
||||
)
|
||||
if neg_ips:
|
||||
try:
|
||||
await db.executemany(
|
||||
"INSERT OR IGNORE INTO geo_cache (ip) VALUES (?)",
|
||||
[(ip,) for ip in neg_ips],
|
||||
)
|
||||
await geo_cache_repo.bulk_upsert_neg_entries(db, neg_ips)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
log.warning(
|
||||
"geo_batch_persist_neg_failed",
|
||||
@@ -806,19 +782,7 @@ async def flush_dirty(db: aiosqlite.Connection) -> int:
|
||||
return 0
|
||||
|
||||
try:
|
||||
await db.executemany(
|
||||
"""
|
||||
INSERT INTO geo_cache (ip, country_code, country_name, asn, org)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
ON CONFLICT(ip) DO UPDATE SET
|
||||
country_code = excluded.country_code,
|
||||
country_name = excluded.country_name,
|
||||
asn = excluded.asn,
|
||||
org = excluded.org,
|
||||
cached_at = strftime('%Y-%m-%dT%H:%M:%fZ', 'now')
|
||||
""",
|
||||
rows,
|
||||
)
|
||||
await geo_cache_repo.bulk_upsert_entries(db, rows)
|
||||
await db.commit()
|
||||
except Exception as exc: # noqa: BLE001
|
||||
log.warning("geo_flush_dirty_failed", error=str(exc))
|
||||
|
||||
Reference in New Issue
Block a user