Files
BanGUI/backend/app/repositories/history_archive_repo.py
Lukas 9f05da2d4d Complete history archive support for dashboard/map data and mark task finished
Add source=archive option for dashboard endpoints and history service; update Docs/Tasks.md; include archive branch for list_bans, bans_by_country, ban_trend, bans_by_jail; tests for archive paths.
2026-03-28 12:39:47 +01:00

149 lines
3.9 KiB
Python

"""Ban history archive repository.
Provides persistence APIs for the BanGUI archival history table in the
application database.
"""
from __future__ import annotations
import datetime
from typing import TYPE_CHECKING
from app.models.ban import BLOCKLIST_JAIL, BanOrigin
if TYPE_CHECKING:
import aiosqlite
async def archive_ban_event(
db: aiosqlite.Connection,
jail: str,
ip: str,
timeofban: int,
bancount: int,
data: str,
action: str = "ban",
) -> bool:
"""Insert a new archived ban/unban event, ignoring duplicates."""
async with db.execute(
"""INSERT OR IGNORE INTO history_archive
(jail, ip, timeofban, bancount, data, action)
VALUES (?, ?, ?, ?, ?, ?)""",
(jail, ip, timeofban, bancount, data, action),
) as cursor:
inserted = cursor.rowcount == 1
await db.commit()
return inserted
async def get_archived_history(
db: aiosqlite.Connection,
since: int | None = None,
jail: str | None = None,
ip_filter: str | None = None,
origin: BanOrigin | None = None,
action: str | None = None,
page: int = 1,
page_size: int = 100,
) -> tuple[list[dict], int]:
"""Return a paginated archived history result set."""
wheres: list[str] = []
params: list[object] = []
if since is not None:
wheres.append("timeofban >= ?")
params.append(since)
if jail is not None:
wheres.append("jail = ?")
params.append(jail)
if ip_filter is not None:
wheres.append("ip LIKE ?")
params.append(f"{ip_filter}%")
if origin == "blocklist":
wheres.append("jail = ?")
params.append(BLOCKLIST_JAIL)
elif origin == "selfblock":
wheres.append("jail != ?")
params.append(BLOCKLIST_JAIL)
if action is not None:
wheres.append("action = ?")
params.append(action)
where_sql = "WHERE " + " AND ".join(wheres) if wheres else ""
offset = (page - 1) * page_size
async with db.execute(f"SELECT COUNT(*) FROM history_archive {where_sql}", params) as cur:
row = await cur.fetchone()
total = int(row[0]) if row is not None and row[0] is not None else 0
async with db.execute(
"SELECT jail, ip, timeofban, bancount, data, action "
"FROM history_archive "
f"{where_sql} "
"ORDER BY timeofban DESC LIMIT ? OFFSET ?",
[*params, page_size, offset],
) as cur:
rows = await cur.fetchall()
records = [
{
"jail": str(r[0]),
"ip": str(r[1]),
"timeofban": int(r[2]),
"bancount": int(r[3]),
"data": str(r[4]),
"action": str(r[5]),
}
for r in rows
]
return records, total
async def get_all_archived_history(
db: aiosqlite.Connection,
since: int | None = None,
jail: str | None = None,
ip_filter: str | None = None,
origin: BanOrigin | None = None,
action: str | None = None,
) -> list[dict]:
"""Return all archived history rows for the given filters."""
page: int = 1
page_size: int = 500
all_rows: list[dict] = []
while True:
rows, total = await get_archived_history(
db=db,
since=since,
jail=jail,
ip_filter=ip_filter,
origin=origin,
action=action,
page=page,
page_size=page_size,
)
all_rows.extend(rows)
if len(rows) < page_size:
break
page += 1
return all_rows
async def purge_archived_history(db: aiosqlite.Connection, age_seconds: int) -> int:
"""Purge archived entries older than *age_seconds*; return rows deleted."""
threshold = int(datetime.datetime.now(datetime.UTC).timestamp()) - age_seconds
async with db.execute(
"DELETE FROM history_archive WHERE timeofban < ?",
(threshold,),
) as cursor:
deleted = cursor.rowcount
await db.commit()
return deleted