104 lines
2.8 KiB
Python
104 lines
2.8 KiB
Python
"""Ban history archive repository.
|
|
|
|
Provides persistence APIs for the BanGUI archival history table in the
|
|
application database.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import datetime
|
|
from typing import TYPE_CHECKING
|
|
|
|
if TYPE_CHECKING:
|
|
import aiosqlite
|
|
|
|
|
|
async def archive_ban_event(
|
|
db: aiosqlite.Connection,
|
|
jail: str,
|
|
ip: str,
|
|
timeofban: int,
|
|
bancount: int,
|
|
data: str,
|
|
action: str = "ban",
|
|
) -> bool:
|
|
"""Insert a new archived ban/unban event, ignoring duplicates."""
|
|
async with db.execute(
|
|
"""INSERT OR IGNORE INTO history_archive
|
|
(jail, ip, timeofban, bancount, data, action)
|
|
VALUES (?, ?, ?, ?, ?, ?)""",
|
|
(jail, ip, timeofban, bancount, data, action),
|
|
) as cursor:
|
|
inserted = cursor.rowcount == 1
|
|
await db.commit()
|
|
return inserted
|
|
|
|
|
|
async def get_archived_history(
|
|
db: aiosqlite.Connection,
|
|
since: int | None = None,
|
|
jail: str | None = None,
|
|
ip_filter: str | None = None,
|
|
action: str | None = None,
|
|
page: int = 1,
|
|
page_size: int = 100,
|
|
) -> tuple[list[dict], int]:
|
|
"""Return a paginated archived history result set."""
|
|
wheres: list[str] = []
|
|
params: list[object] = []
|
|
|
|
if since is not None:
|
|
wheres.append("timeofban >= ?")
|
|
params.append(since)
|
|
|
|
if jail is not None:
|
|
wheres.append("jail = ?")
|
|
params.append(jail)
|
|
|
|
if ip_filter is not None:
|
|
wheres.append("ip LIKE ?")
|
|
params.append(f"{ip_filter}%")
|
|
|
|
if action is not None:
|
|
wheres.append("action = ?")
|
|
params.append(action)
|
|
|
|
where_sql = "WHERE " + " AND ".join(wheres) if wheres else ""
|
|
offset = (page - 1) * page_size
|
|
|
|
async with db.execute(f"SELECT COUNT(*) FROM history_archive {where_sql}", params) as cur:
|
|
row = await cur.fetchone()
|
|
total = int(row[0]) if row is not None and row[0] is not None else 0
|
|
|
|
async with db.execute(
|
|
f"SELECT jail, ip, timeofban, bancount, data, action FROM history_archive {where_sql} ORDER BY timeofban DESC LIMIT ? OFFSET ?",
|
|
[*params, page_size, offset],
|
|
) as cur:
|
|
rows = await cur.fetchall()
|
|
|
|
records = [
|
|
{
|
|
"jail": str(r[0]),
|
|
"ip": str(r[1]),
|
|
"timeofban": int(r[2]),
|
|
"bancount": int(r[3]),
|
|
"data": str(r[4]),
|
|
"action": str(r[5]),
|
|
}
|
|
for r in rows
|
|
]
|
|
|
|
return records, total
|
|
|
|
|
|
async def purge_archived_history(db: aiosqlite.Connection, age_seconds: int) -> int:
|
|
"""Purge archived entries older than *age_seconds*; return rows deleted."""
|
|
threshold = int(datetime.datetime.now(datetime.UTC).timestamp()) - age_seconds
|
|
async with db.execute(
|
|
"DELETE FROM history_archive WHERE timeofban < ?",
|
|
(threshold,),
|
|
) as cursor:
|
|
deleted = cursor.rowcount
|
|
await db.commit()
|
|
return deleted
|