"""Ban history archive repository. Provides persistence APIs for the BanGUI archival history table in the application database. """ from __future__ import annotations import datetime from typing import TYPE_CHECKING from app.models.ban import BLOCKLIST_JAIL, BanOrigin if TYPE_CHECKING: import aiosqlite async def archive_ban_event( db: aiosqlite.Connection, jail: str, ip: str, timeofban: int, bancount: int, data: str, action: str = "ban", ) -> bool: """Insert a new archived ban/unban event, ignoring duplicates.""" async with db.execute( """INSERT OR IGNORE INTO history_archive (jail, ip, timeofban, bancount, data, action) VALUES (?, ?, ?, ?, ?, ?)""", (jail, ip, timeofban, bancount, data, action), ) as cursor: inserted = cursor.rowcount == 1 await db.commit() return inserted async def get_archived_history( db: aiosqlite.Connection, since: int | None = None, jail: str | None = None, ip_filter: str | None = None, origin: BanOrigin | None = None, action: str | None = None, page: int = 1, page_size: int = 100, ) -> tuple[list[dict], int]: """Return a paginated archived history result set.""" wheres: list[str] = [] params: list[object] = [] if since is not None: wheres.append("timeofban >= ?") params.append(since) if jail is not None: wheres.append("jail = ?") params.append(jail) if ip_filter is not None: wheres.append("ip LIKE ?") params.append(f"{ip_filter}%") if origin == "blocklist": wheres.append("jail = ?") params.append(BLOCKLIST_JAIL) elif origin == "selfblock": wheres.append("jail != ?") params.append(BLOCKLIST_JAIL) if action is not None: wheres.append("action = ?") params.append(action) where_sql = "WHERE " + " AND ".join(wheres) if wheres else "" offset = (page - 1) * page_size async with db.execute(f"SELECT COUNT(*) FROM history_archive {where_sql}", params) as cur: row = await cur.fetchone() total = int(row[0]) if row is not None and row[0] is not None else 0 async with db.execute( "SELECT jail, ip, timeofban, bancount, data, action " "FROM history_archive " f"{where_sql} " "ORDER BY timeofban DESC LIMIT ? OFFSET ?", [*params, page_size, offset], ) as cur: rows = await cur.fetchall() records = [ { "jail": str(r[0]), "ip": str(r[1]), "timeofban": int(r[2]), "bancount": int(r[3]), "data": str(r[4]), "action": str(r[5]), } for r in rows ] return records, total async def get_all_archived_history( db: aiosqlite.Connection, since: int | None = None, jail: str | None = None, ip_filter: str | None = None, origin: BanOrigin | None = None, action: str | None = None, ) -> list[dict]: """Return all archived history rows for the given filters.""" page: int = 1 page_size: int = 500 all_rows: list[dict] = [] while True: rows, total = await get_archived_history( db=db, since=since, jail=jail, ip_filter=ip_filter, origin=origin, action=action, page=page, page_size=page_size, ) all_rows.extend(rows) if len(rows) < page_size: break page += 1 return all_rows async def purge_archived_history(db: aiosqlite.Connection, age_seconds: int) -> int: """Purge archived entries older than *age_seconds*; return rows deleted.""" threshold = int(datetime.datetime.now(datetime.UTC).timestamp()) - age_seconds async with db.execute( "DELETE FROM history_archive WHERE timeofban < ?", (threshold,), ) as cursor: deleted = cursor.rowcount await db.commit() return deleted