"""Ban history archive repository. Provides persistence APIs for the BanGUI archival history table in the application database. Supports both offset-based and cursor-based pagination: - **Offset pagination** (legacy): ``get_archived_history(page=2, page_size=100)`` - convenient for small datasets but degrades on large offsets. - **Cursor pagination** (recommended): ``get_archived_history_keyset(page_size=100, last_ban_id=None)`` - constant-time performance regardless of dataset size. """ from __future__ import annotations import datetime from typing import TYPE_CHECKING, Any from app.models.ban import BLOCKLIST_JAIL, BanOrigin from app.utils.fail2ban_db_utils import escape_like if TYPE_CHECKING: import aiosqlite async def archive_ban_event( db: aiosqlite.Connection, jail: str, ip: str, timeofban: int, bancount: int, data: str, action: str = "ban", ) -> bool: """Insert a new archived ban/unban event, ignoring duplicates.""" async with db.execute( """INSERT OR IGNORE INTO history_archive (jail, ip, timeofban, bancount, data, action) VALUES (?, ?, ?, ?, ?, ?)""", (jail, ip, timeofban, bancount, data, action), ) as cursor: inserted = cursor.rowcount == 1 await db.commit() return inserted async def get_max_timeofban(db: aiosqlite.Connection) -> int | None: """Return the latest archived ban timestamp or ``None`` when empty.""" async with db.execute("SELECT MAX(timeofban) FROM history_archive") as cursor: row = await cursor.fetchone() if row is None or row[0] is None: return None return int(row[0]) async def get_archived_history( db: aiosqlite.Connection, since: int | None = None, jail: str | None = None, ip_filter: str | list[str] | None = None, origin: BanOrigin | None = None, action: str | None = None, page: int = 1, page_size: int = 100, ) -> tuple[list[dict[str, Any]], int]: """Return a paginated archived history result set.""" if isinstance(ip_filter, list) and len(ip_filter) == 0: return [], 0 wheres: list[str] = [] params: list[object] = [] if since is not None: wheres.append("timeofban >= ?") params.append(since) if jail is not None: wheres.append("jail = ?") params.append(jail) if ip_filter is not None: if isinstance(ip_filter, list): placeholder = ", ".join("?" for _ in ip_filter) wheres.append(f"ip IN ({placeholder})") params.extend(ip_filter) else: wheres.append("ip LIKE ? ESCAPE '\\'") params.append(f"{escape_like(ip_filter)}%") if origin == "blocklist": wheres.append("jail = ?") params.append(BLOCKLIST_JAIL) elif origin == "selfblock": wheres.append("jail != ?") params.append(BLOCKLIST_JAIL) if action is not None: wheres.append("action = ?") params.append(action) where_sql = "WHERE " + " AND ".join(wheres) if wheres else "" offset = (page - 1) * page_size async with db.execute(f"SELECT COUNT(*) FROM history_archive {where_sql}", params) as cur: row = await cur.fetchone() total = int(row[0]) if row is not None and row[0] is not None else 0 async with db.execute( "SELECT id, jail, ip, timeofban, bancount, data, action " "FROM history_archive " f"{where_sql} " "ORDER BY timeofban DESC LIMIT ? OFFSET ?", [*params, page_size, offset], ) as cur: rows = await cur.fetchall() records = [ { "id": int(r[0]), "jail": str(r[1]), "ip": str(r[2]), "timeofban": int(r[3]), "bancount": int(r[4]), "data": str(r[5]), "action": str(r[6]), } for r in rows ] return records, total async def get_all_archived_history( db: aiosqlite.Connection, since: int | None = None, jail: str | None = None, ip_filter: str | list[str] | None = None, origin: BanOrigin | None = None, action: str | None = None, page_size: int = 1000, max_rows: int = 50_000, last_ban_id: int | None = None, ) -> list[dict[str, Any]]: """Return archived history rows for the given filters, bounded to *max_rows*. Uses keyset pagination internally for constant-time performance regardless of how deep into the result set we go. The caller must provide *last_ban_id* from the previous call to continue pagination; ``None`` starts fresh. Args: page_size: Number of rows to fetch per internal batch (default 1000). max_rows: Hard cap on total rows returned (default 50 000). When reached the function returns even if more rows exist. Pass ``0`` to request zero rows (useful for count-only callers). last_ban_id: Cursor from the previous call. ``None`` for the first call — the result set will start from the newest row. """ if max_rows <= 0: return [] all_rows: list[dict[str, Any]] = [] current_last_ban_id: int | None = last_ban_id while True: batch, has_more = await get_archived_history_keyset( db=db, since=since, jail=jail, ip_filter=ip_filter, origin=origin, action=action, page_size=page_size, last_ban_id=current_last_ban_id, ) if not batch: break all_rows.extend(batch) if len(all_rows) >= max_rows: break if not has_more: break # Use the id of the last row in the batch as the next cursor. # Rows are ordered id DESC, so the last row has the smallest id # seen in this batch and is the correct keyset anchor. last_row = batch[-1] current_last_ban_id = last_row.get("id") if current_last_ban_id is None: # Fallback: determine id from the WHERE clause of the previous query. # If we somehow cannot determine the id, stop to avoid an infinite loop. break return all_rows[:max_rows] async def purge_archived_history(db: aiosqlite.Connection, age_seconds: int) -> int: """Purge archived entries older than *age_seconds*; return rows deleted.""" threshold = int(datetime.datetime.now(datetime.UTC).timestamp()) - age_seconds async with db.execute( "DELETE FROM history_archive WHERE timeofban < ?", (threshold,), ) as cursor: deleted = cursor.rowcount await db.commit() return deleted async def get_archived_history_keyset( db: aiosqlite.Connection, since: int | None = None, jail: str | None = None, ip_filter: str | list[str] | None = None, origin: BanOrigin | None = None, action: str | None = None, page_size: int = 100, last_ban_id: int | None = None, ) -> tuple[list[dict[str, Any]], bool]: """Return cursor-paginated archived history using keyset pagination. Uses keyset pagination (WHERE id < last_id) for constant-time performance regardless of result set size. This is the recommended pagination method for large result sets. Ordering is by timeofban DESC (newest first), with id DESC as tiebreaker for events with identical timestamps. This ensures stable, deterministic pagination. Args: db: Active aiosqlite connection. since: If given, filter to events on or after this Unix timestamp. jail: If given, filter to events for this jail. ip_filter: If given, filter by IP (exact match list or LIKE prefix). origin: If given, filter by ban origin ('blocklist' or 'selfblock'). action: If given, filter to this action type ('ban' or 'unban'). page_size: Number of items per page (max returned is page_size + 1 to detect overflow). last_ban_id: The ID of the last item from the previous page (for cursor). None for the first page. Returns: A 2-tuple ``(records, has_more)`` where: - *records* is a list of up to page_size dicts with ban details - *has_more* is True if there are additional pages beyond this one """ if isinstance(ip_filter, list) and len(ip_filter) == 0: return [], False wheres: list[str] = [] params: list[object] = [] if since is not None: wheres.append("timeofban >= ?") params.append(since) if jail is not None: wheres.append("jail = ?") params.append(jail) if ip_filter is not None: if isinstance(ip_filter, list): placeholder = ", ".join("?" for _ in ip_filter) wheres.append(f"ip IN ({placeholder})") params.extend(ip_filter) else: wheres.append("ip LIKE ? ESCAPE '\\'") params.append(f"{escape_like(ip_filter)}%") if origin == "blocklist": wheres.append("jail = ?") params.append(BLOCKLIST_JAIL) elif origin == "selfblock": wheres.append("jail != ?") params.append(BLOCKLIST_JAIL) if action is not None: wheres.append("action = ?") params.append(action) if last_ban_id is not None: wheres.append("id < ?") params.append(last_ban_id) where_sql = "WHERE " + " AND ".join(wheres) if wheres else "" # Fetch page_size + 1 to detect if there are more pages fetch_limit = page_size + 1 params.append(fetch_limit) async with db.execute( "SELECT id, jail, ip, timeofban, bancount, data, action " "FROM history_archive " f"{where_sql} " "ORDER BY id DESC " "LIMIT ?", # noqa: S608 params, ) as cur: rows_iterable = await cur.fetchall() rows = list(rows_iterable) records = [ { "id": int(r[0]), "jail": str(r[1]), "ip": str(r[2]), "timeofban": int(r[3]), "bancount": int(r[4]), "data": str(r[5]), "action": str(r[6]), } for r in rows[:page_size] ] has_more = len(rows) > page_size return records, has_more