Files
BanGUI/backend/app/repositories/history_archive_repo.py
Lukas 67b26a3ef7 Refactor pagination with cursor-based support and standardized response format
- Implement cursor-based pagination in pagination.py
- Update response models to standardize pagination structure
- Add cursor pagination utilities for repositories
- Update HistoryArchiveRepository and ImportLogRepository with new pagination
- Add comprehensive tests for cursor pagination
- Update documentation for backend development and task tracking

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-01 17:54:05 +02:00

282 lines
8.5 KiB
Python

"""Ban history archive repository.
Provides persistence APIs for the BanGUI archival history table in the
application database.
Supports both offset-based and cursor-based pagination:
- **Offset pagination** (legacy): ``get_archived_history(page=2, page_size=100)``
- convenient for small datasets but degrades on large offsets.
- **Cursor pagination** (recommended): ``get_archived_history_keyset(page_size=100, last_ban_id=None)``
- seeks directly past the previous page, so the cost of a page does not grow with page depth.
"""
from __future__ import annotations
import datetime
from typing import TYPE_CHECKING, Any
from app.models.ban import BLOCKLIST_JAIL, BanOrigin
from app.utils.fail2ban_db_utils import escape_like
if TYPE_CHECKING:
import aiosqlite
async def archive_ban_event(
    db: aiosqlite.Connection,
    jail: str,
    ip: str,
    timeofban: int,
    bancount: int,
    data: str,
    action: str = "ban",
) -> bool:
    """Archive one ban/unban event, silently skipping rows that conflict.

    Args:
        db: Active aiosqlite connection.
        jail: Jail name stored with the event.
        ip: IP address stored with the event.
        timeofban: Timestamp stored with the event.
        bancount: Ban counter stored with the event.
        data: Payload string stored with the event.
        action: Action label stored with the event; defaults to ``"ban"``.

    Returns:
        ``True`` when exactly one row was written, ``False`` when the
        ``INSERT OR IGNORE`` left the table unchanged.
    """
    insert_sql = """INSERT OR IGNORE INTO history_archive
(jail, ip, timeofban, bancount, data, action)
VALUES (?, ?, ?, ?, ?, ?)"""
    row_values = (jail, ip, timeofban, bancount, data, action)
    async with db.execute(insert_sql, row_values) as cur:
        # rowcount is 0 when the insert was ignored.
        was_inserted = cur.rowcount == 1
    await db.commit()
    return was_inserted
async def get_max_timeofban(db: aiosqlite.Connection) -> int | None:
    """Return the largest ``timeofban`` in the archive.

    Args:
        db: Active aiosqlite connection.

    Returns:
        The maximum ``timeofban`` as an ``int``, or ``None`` when the query
        yields no row or a NULL aggregate (i.e. the table is empty).
    """
    async with db.execute("SELECT MAX(timeofban) FROM history_archive") as cur:
        result = await cur.fetchone()
    newest = None if result is None else result[0]
    return None if newest is None else int(newest)
async def get_archived_history(
    db: aiosqlite.Connection,
    since: int | None = None,
    jail: str | None = None,
    ip_filter: str | list[str] | None = None,
    origin: BanOrigin | None = None,
    action: str | None = None,
    page: int = 1,
    page_size: int = 100,
) -> tuple[list[dict[str, Any]], int]:
    """Return one offset-paginated page of archived history plus the match count.

    Rows are ordered newest first by ``timeofban``; ``id DESC`` breaks ties so
    that events sharing a timestamp keep a fixed position and consecutive
    pages neither repeat nor skip them.

    Args:
        db: Active aiosqlite connection.
        since: Keep only events with ``timeofban`` at or after this value.
        jail: Keep only events for this jail.
        ip_filter: A list selects exact IPs (an empty list matches nothing);
            a string selects IPs starting with that prefix.
        origin: ``"blocklist"`` keeps only the blocklist jail, ``"selfblock"``
            excludes it; any other value applies no origin filter.
        action: Keep only events with this action value.
        page: 1-based page number.
        page_size: Maximum number of rows per page.

    Returns:
        A 2-tuple ``(records, total)``: the dicts for the requested page and
        the number of rows matching the filters across all pages.
    """
    # An empty exact-match list can never match; skip the database entirely.
    if isinstance(ip_filter, list) and not ip_filter:
        return [], 0

    wheres: list[str] = []
    params: list[object] = []

    if since is not None:
        wheres.append("timeofban >= ?")
        params.append(since)

    if jail is not None:
        wheres.append("jail = ?")
        params.append(jail)

    if ip_filter is not None:
        if isinstance(ip_filter, list):
            placeholder = ", ".join("?" for _ in ip_filter)
            wheres.append(f"ip IN ({placeholder})")
            params.extend(ip_filter)
        else:
            # Prefix match; LIKE wildcards in the user input are escaped.
            wheres.append("ip LIKE ? ESCAPE '\\'")
            params.append(f"{escape_like(ip_filter)}%")

    if origin == "blocklist":
        wheres.append("jail = ?")
        params.append(BLOCKLIST_JAIL)
    elif origin == "selfblock":
        wheres.append("jail != ?")
        params.append(BLOCKLIST_JAIL)

    if action is not None:
        wheres.append("action = ?")
        params.append(action)

    where_sql = "WHERE " + " AND ".join(wheres) if wheres else ""
    offset = (page - 1) * page_size

    # Only fixed fragments and "?" placeholders are interpolated into the SQL;
    # every caller-supplied value travels as a bound parameter.
    async with db.execute(f"SELECT COUNT(*) FROM history_archive {where_sql}", params) as cur:
        row = await cur.fetchone()
    total = int(row[0]) if row is not None and row[0] is not None else 0

    async with db.execute(
        "SELECT jail, ip, timeofban, bancount, data, action "
        "FROM history_archive "
        f"{where_sql} "
        # The id tiebreaker makes the order total, which OFFSET paging needs
        # to stay stable when several events share one timestamp.
        "ORDER BY timeofban DESC, id DESC LIMIT ? OFFSET ?",
        [*params, page_size, offset],
    ) as cur:
        rows = await cur.fetchall()

    records = [
        {
            "jail": str(r[0]),
            "ip": str(r[1]),
            "timeofban": int(r[2]),
            "bancount": int(r[3]),
            "data": str(r[4]),
            "action": str(r[5]),
        }
        for r in rows
    ]
    return records, total
async def get_all_archived_history(
    db: aiosqlite.Connection,
    since: int | None = None,
    jail: str | None = None,
    ip_filter: str | list[str] | None = None,
    origin: BanOrigin | None = None,
    action: str | None = None,
) -> list[dict[str, Any]]:
    """Return every archived history row matching the given filters.

    Walks ``get_archived_history`` page by page (500 rows at a time) and
    concatenates the pages in the order that function returns them.

    Args:
        db: Active aiosqlite connection.
        since: Forwarded unchanged to ``get_archived_history``.
        jail: Forwarded unchanged to ``get_archived_history``.
        ip_filter: Forwarded unchanged to ``get_archived_history``.
        origin: Forwarded unchanged to ``get_archived_history``.
        action: Forwarded unchanged to ``get_archived_history``.

    Returns:
        A list with the record dicts of all pages.
    """
    page_size = 500
    page = 1
    all_rows: list[dict[str, Any]] = []
    while True:
        rows, total = await get_archived_history(
            db=db,
            since=since,
            jail=jail,
            ip_filter=ip_filter,
            origin=origin,
            action=action,
            page=page,
            page_size=page_size,
        )
        all_rows.extend(rows)
        # A short page means the data ran out. Reaching the reported total
        # means the next page would be empty, so the extra COUNT + SELECT
        # round trip for an exact-multiple result set is skipped.
        if len(rows) < page_size or len(all_rows) >= total:
            break
        page += 1
    return all_rows
async def purge_archived_history(db: aiosqlite.Connection, age_seconds: int) -> int:
"""Purge archived entries older than *age_seconds*; return rows deleted."""
threshold = int(datetime.datetime.now(datetime.UTC).timestamp()) - age_seconds
async with db.execute(
"DELETE FROM history_archive WHERE timeofban < ?",
(threshold,),
) as cursor:
deleted = cursor.rowcount
await db.commit()
return deleted
async def get_archived_history_keyset(
    db: aiosqlite.Connection,
    since: int | None = None,
    jail: str | None = None,
    ip_filter: str | list[str] | None = None,
    origin: BanOrigin | None = None,
    action: str | None = None,
    page_size: int = 100,
    last_ban_id: int | None = None,
) -> tuple[list[dict[str, Any]], bool]:
    """Return one keyset-paginated page of archived history.

    Instead of skipping rows with ``OFFSET``, each page is selected with
    ``id < last_ban_id``, so the query does not have to walk over rows that
    earlier pages already returned.

    Rows are ordered by ``id DESC`` only. The ``id`` column is the cursor, so
    the sort key and the cursor predicate must agree for pages to be
    contiguous; rows are therefore NOT sorted by ``timeofban``.

    Args:
        db: Active aiosqlite connection.
        since: If given, filter to events on or after this Unix timestamp.
        jail: If given, filter to events for this jail.
        ip_filter: If given, filter by IP (exact match list or LIKE prefix).
            An empty list matches nothing.
        origin: If given, filter by ban origin ('blocklist' or 'selfblock').
        action: If given, filter to this action type ('ban' or 'unban').
        page_size: Number of items per page. One extra row is fetched
            internally to detect whether another page exists.
        last_ban_id: The ``id`` of the last record of the previous page, or
            ``None`` for the first page.

    Returns:
        A 2-tuple ``(records, has_more)`` where:
        - *records* is a list of up to page_size dicts with ban details; each
          dict carries the row ``id`` so the caller can pass the last one back
          as *last_ban_id*
        - *has_more* is True if there are additional pages beyond this one
    """
    # An empty exact-match list can never match; skip the database entirely.
    if isinstance(ip_filter, list) and not ip_filter:
        return [], False

    wheres: list[str] = []
    params: list[object] = []

    if since is not None:
        wheres.append("timeofban >= ?")
        params.append(since)

    if jail is not None:
        wheres.append("jail = ?")
        params.append(jail)

    if ip_filter is not None:
        if isinstance(ip_filter, list):
            placeholder = ", ".join("?" for _ in ip_filter)
            wheres.append(f"ip IN ({placeholder})")
            params.extend(ip_filter)
        else:
            # Prefix match; LIKE wildcards in the user input are escaped.
            wheres.append("ip LIKE ? ESCAPE '\\'")
            params.append(f"{escape_like(ip_filter)}%")

    if origin == "blocklist":
        wheres.append("jail = ?")
        params.append(BLOCKLIST_JAIL)
    elif origin == "selfblock":
        wheres.append("jail != ?")
        params.append(BLOCKLIST_JAIL)

    if action is not None:
        wheres.append("action = ?")
        params.append(action)

    # Cursor predicate: continue strictly below the last id already served.
    if last_ban_id is not None:
        wheres.append("id < ?")
        params.append(last_ban_id)

    where_sql = "WHERE " + " AND ".join(wheres) if wheres else ""

    # Fetch page_size + 1 to detect if there are more pages
    fetch_limit = page_size + 1
    params.append(fetch_limit)

    async with db.execute(
        "SELECT id, jail, ip, timeofban, bancount, data, action "
        "FROM history_archive "
        f"{where_sql} "
        "ORDER BY id DESC "
        "LIMIT ?",  # noqa: S608
        params,
    ) as cur:
        rows = list(await cur.fetchall())

    records = [
        {
            # Expose the row id: it is the only value usable as the next cursor.
            "id": int(r[0]),
            "jail": str(r[1]),
            "ip": str(r[2]),
            "timeofban": int(r[3]),
            "bancount": int(r[4]),
            "data": str(r[5]),
            "action": str(r[6]),
        }
        for r in rows[:page_size]
    ]
    # The sentinel row beyond page_size is dropped above and only signals overflow.
    has_more = len(rows) > page_size
    return records, has_more