diff --git a/backend/app/repositories/history_archive_repo.py b/backend/app/repositories/history_archive_repo.py new file mode 100644 index 0000000..2434020 --- /dev/null +++ b/backend/app/repositories/history_archive_repo.py @@ -0,0 +1,103 @@ +"""Ban history archive repository. + +Provides persistence APIs for the BanGUI archival history table in the +application database. +""" + +from __future__ import annotations + +import datetime +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + import aiosqlite + + +async def archive_ban_event( + db: aiosqlite.Connection, + jail: str, + ip: str, + timeofban: int, + bancount: int, + data: str, + action: str = "ban", +) -> bool: + """Insert a new archived ban/unban event, ignoring duplicates.""" + async with db.execute( + """INSERT OR IGNORE INTO history_archive + (jail, ip, timeofban, bancount, data, action) + VALUES (?, ?, ?, ?, ?, ?)""", + (jail, ip, timeofban, bancount, data, action), + ) as cursor: + inserted = cursor.rowcount == 1 + await db.commit() + return inserted + + +async def get_archived_history( + db: aiosqlite.Connection, + since: int | None = None, + jail: str | None = None, + ip_filter: str | None = None, + action: str | None = None, + page: int = 1, + page_size: int = 100, +) -> tuple[list[dict], int]: + """Return a paginated archived history result set.""" + wheres: list[str] = [] + params: list[object] = [] + + if since is not None: + wheres.append("timeofban >= ?") + params.append(since) + + if jail is not None: + wheres.append("jail = ?") + params.append(jail) + + if ip_filter is not None: + wheres.append("ip LIKE ?") + params.append(f"{ip_filter}%") + + if action is not None: + wheres.append("action = ?") + params.append(action) + + where_sql = "WHERE " + " AND ".join(wheres) if wheres else "" + offset = (page - 1) * page_size + + async with db.execute(f"SELECT COUNT(*) FROM history_archive {where_sql}", params) as cur: + row = await cur.fetchone() + total = int(row[0]) if row is not None and row[0] is not None else 0 + + async with db.execute( + f"SELECT jail, ip, timeofban, bancount, data, action FROM history_archive {where_sql} ORDER BY timeofban DESC LIMIT ? OFFSET ?", + [*params, page_size, offset], + ) as cur: + rows = await cur.fetchall() + + records = [ + { + "jail": str(r[0]), + "ip": str(r[1]), + "timeofban": int(r[2]), + "bancount": int(r[3]), + "data": str(r[4]), + "action": str(r[5]), + } + for r in rows + ] + + return records, total + + +async def purge_archived_history(db: aiosqlite.Connection, age_seconds: int) -> int: + """Purge archived entries older than *age_seconds*; return rows deleted.""" + threshold = int(datetime.datetime.now(datetime.UTC).timestamp()) - age_seconds + async with db.execute( + "DELETE FROM history_archive WHERE timeofban < ?", + (threshold,), + ) as cursor: + deleted = cursor.rowcount + await db.commit() + return deleted diff --git a/backend/tests/test_repositories/test_history_archive_repo.py b/backend/tests/test_repositories/test_history_archive_repo.py new file mode 100644 index 0000000..c10997f --- /dev/null +++ b/backend/tests/test_repositories/test_history_archive_repo.py @@ -0,0 +1,60 @@ +"""Tests for history_archive_repo.""" + +from __future__ import annotations + +import time +from pathlib import Path + +import aiosqlite +import pytest + +from app.db import init_db +from app.repositories.history_archive_repo import archive_ban_event, get_archived_history, purge_archived_history + + +@pytest.fixture +async def app_db(tmp_path: Path) -> str: + path = str(tmp_path / "app.db") + async with aiosqlite.connect(path) as db: + db.row_factory = aiosqlite.Row + await init_db(db) + return path + + +@pytest.mark.asyncio +async def test_archive_ban_event_deduplication(app_db: str) -> None: + async with aiosqlite.connect(app_db) as db: + # first insert should add + inserted = await archive_ban_event(db, "sshd", "1.1.1.1", 1000, 1, "{}", "ban") + assert inserted + + # duplicate event is ignored + inserted = await archive_ban_event(db, "sshd", "1.1.1.1", 1000, 1, "{}", "ban") + assert not inserted + + +@pytest.mark.asyncio +async def test_get_archived_history_filtering_and_pagination(app_db: str) -> None: + async with aiosqlite.connect(app_db) as db: + await archive_ban_event(db, "sshd", "1.1.1.1", 1000, 1, "{}", "ban") + await archive_ban_event(db, "nginx", "2.2.2.2", 2000, 1, "{}", "ban") + + rows, total = await get_archived_history(db, jail="sshd") + assert total == 1 + assert rows[0]["ip"] == "1.1.1.1" + + rows, total = await get_archived_history(db, page=1, page_size=1) + assert total == 2 + assert len(rows) == 1 + + +@pytest.mark.asyncio +async def test_purge_archived_history(app_db: str) -> None: + now = int(time.time()) + async with aiosqlite.connect(app_db) as db: + await archive_ban_event(db, "sshd", "1.1.1.1", now - 3000, 1, "{}", "ban") + await archive_ban_event(db, "sshd", "1.1.1.2", now - 1000, 1, "{}", "ban") + deleted = await purge_archived_history(db, age_seconds=2000) + assert deleted == 1 + rows, total = await get_archived_history(db) + assert total == 1