"""History sync background task.

Periodically copies new records from the fail2ban sqlite database into the
BanGUI application archive table to prevent gaps when fail2ban purges old
rows.
"""

from __future__ import annotations

import datetime
from typing import TYPE_CHECKING

import structlog

from app.repositories import fail2ban_db_repo
from app.utils.fail2ban_db_utils import get_fail2ban_db_path

if TYPE_CHECKING:  # pragma: no cover
    from fastapi import FastAPI

log: structlog.stdlib.BoundLogger = structlog.get_logger()

#: Stable APScheduler job id.
JOB_ID: str = "history_sync"

#: Interval in seconds between sync runs.
HISTORY_SYNC_INTERVAL: int = 300

#: Backfill window when archive is empty (seconds).
BACKFILL_WINDOW: int = 7 * 86400

#: Number of fail2ban rows requested per page during a sync run.
_PAGE_SIZE: int = 500


async def _get_last_archive_ts(db) -> int | None:
    """Return the newest ``timeofban`` stored in ``history_archive``.

    Args:
        db: Application database handle. It must support
            ``async with db.execute(sql) as cursor`` (presumably an
            aiosqlite connection -- confirm against the app setup).

    Returns:
        ``MAX(timeofban)`` converted to ``int``, or ``None`` when the query
        yields no row or a NULL maximum (i.e. the archive is empty).
    """
    async with db.execute("SELECT MAX(timeofban) FROM history_archive") as cur:
        row = await cur.fetchone()

    if row is None or row[0] is None:
        return None
    return int(row[0])


async def _run_sync(app: FastAPI) -> None:
    """Copy fail2ban ban records newer than the archive into the archive.

    The starting point is the newest ``timeofban`` already archived; when the
    archive is empty, the last ``BACKFILL_WINDOW`` seconds are synced instead.
    Rows are fetched in pages of ``_PAGE_SIZE`` and each one is written with
    ``archive_ban_event`` using ``action="ban"``.

    Any exception is logged as ``history_sync_failed`` and swallowed, so a
    failed run never propagates into the scheduler.

    Args:
        app: The FastAPI application; ``app.state.db`` and
            ``app.state.settings.fail2ban_socket`` are read from it.
    """
    db = app.state.db
    socket_path: str = app.state.settings.fail2ban_socket

    try:
        last_ts = await _get_last_archive_ts(db)
        if last_ts is None:
            # Empty archive: start from a fixed window before "now".
            now_ts = int(datetime.datetime.now(datetime.UTC).timestamp())
            last_ts = now_ts - BACKFILL_WINDOW
            log.info("history_sync_backfill", window_seconds=BACKFILL_WINDOW)

        # Both of these are loop-invariant, so they are resolved once per run
        # instead of once per page. The import stays function-local (as it
        # was originally; presumably to avoid an import cycle) and inside the
        # ``try`` so a failure is still logged rather than raised.
        fail2ban_db_path = await get_fail2ban_db_path(socket_path)
        from app.repositories.history_archive_repo import archive_ban_event

        next_since = last_ts
        total_synced = 0

        while True:
            # The second element (total row count) is not needed here.
            rows, _total = await fail2ban_db_repo.get_history_page(
                db_path=fail2ban_db_path,
                since=next_since,
                page=1,
                page_size=_PAGE_SIZE,
            )
            if not rows:
                break

            for row in rows:
                await archive_ban_event(
                    db=db,
                    jail=row.jail,
                    ip=row.ip,
                    timeofban=row.timeofban,
                    bancount=row.bancount,
                    data=row.data,
                    action="ban",
                )
                total_synced += 1

            # A short page means there is nothing further to fetch.
            if len(rows) < _PAGE_SIZE:
                break

            # Continue where we left off by max timeofban + 1.
            # NOTE(review): if a full page ends in the middle of a group of
            # rows sharing the same ``timeofban``, or if the repository does
            # not return rows oldest-first, advancing the cursor like this
            # could skip rows. Confirm the ordering and ``since`` semantics
            # of ``get_history_page``.
            next_since = max(row.timeofban for row in rows) + 1

        log.info("history_sync_complete", synced=total_synced)
    except Exception:
        log.exception("history_sync_failed")


def register(app: FastAPI) -> None:
    """Register the history sync periodic job.

    Should be called after scheduler startup, from the lifespan handler.

    Args:
        app: The FastAPI application; the job is added to
            ``app.state.scheduler`` and receives ``app`` as a keyword
            argument on each run.
    """
    app.state.scheduler.add_job(
        _run_sync,
        trigger="interval",
        seconds=HISTORY_SYNC_INTERVAL,
        kwargs={"app": app},
        id=JOB_ID,
        replace_existing=True,
        # Schedule the first run for "now" instead of one interval from now.
        next_run_time=datetime.datetime.now(tz=datetime.UTC),
    )
    log.info("history_sync_scheduled", interval_seconds=HISTORY_SYNC_INTERVAL)