Files
BanGUI/backend/app/tasks/history_sync.py

127 lines
3.6 KiB
Python

"""History sync background task.
Periodically copies new records from the fail2ban sqlite database into the
BanGUI application archive table to prevent gaps when fail2ban purges old rows.
"""
from __future__ import annotations
import datetime
from typing import TYPE_CHECKING
import structlog
from app.db import open_db
from app.repositories import fail2ban_db_repo
from app.repositories.history_archive_repo import archive_ban_event
from app.utils.fail2ban_db_utils import get_fail2ban_db_path
from app.utils.runtime_state import get_effective_settings
if TYPE_CHECKING:
import aiosqlite
from fastapi import FastAPI
from app.config import Settings
#: Module-level structlog logger shared by the sync job and its registration.
log: structlog.stdlib.BoundLogger = structlog.get_logger()
#: Identifier under which the sync job is registered with APScheduler.
JOB_ID: str = "history_sync"

#: Seconds between consecutive sync runs (five minutes).
HISTORY_SYNC_INTERVAL: int = 5 * 60

#: How far back, in seconds, the first sync reaches when the archive holds no
#: rows yet (648000 s is 7.5 days).
BACKFILL_WINDOW: int = 648_000
async def _get_db(settings: Settings) -> tuple[aiosqlite.Connection, bool]:
    """Open the application database named by ``settings.database_path``.

    Returns:
        A ``(connection, should_close)`` pair. The flag is always ``True``
        here; the caller uses it to decide whether to close the connection.
    """
    connection = await open_db(settings.database_path)
    should_close = True
    return connection, should_close
async def _get_last_archive_ts(db) -> int | None:
    """Return the largest ``timeofban`` stored in ``history_archive``.

    Args:
        db: Connection whose ``execute`` yields an async-context cursor.

    Returns:
        The maximum ``timeofban`` converted to ``int``, or ``None`` when the
        query yields no row or a ``NULL`` maximum (i.e. the table is empty).
    """
    query = "SELECT MAX(timeofban) FROM history_archive"
    async with db.execute(query) as cursor:
        result = await cursor.fetchone()

    if result is None:
        return None
    newest = result[0]
    return None if newest is None else int(newest)
async def _run_sync_with_settings(settings: Settings) -> None:
    """Copy fail2ban ban records newer than the archive into the archive table.

    The starting point is one second after the newest ``timeofban`` already in
    ``history_archive``; when the archive is empty it is ``BACKFILL_WINDOW``
    seconds before now. Every fail2ban history row from that point onwards is
    passed to ``archive_ban_event`` with ``action="ban"``.

    Any exception raised while syncing is logged as ``history_sync_failed``
    and suppressed, so a failed run never propagates to the scheduler. The
    archive connection is closed in all cases.

    Args:
        settings: Application settings; ``fail2ban_socket`` and
            ``database_path`` are read.
    """
    socket_path: str = settings.fail2ban_socket
    db, close_db = await _get_db(settings)
    try:
        last_ts = await _get_last_archive_ts(db)
        if last_ts is None:
            # Empty archive: backfill a bounded window instead of everything.
            now_ts = int(datetime.datetime.now(datetime.UTC).timestamp())
            last_ts = now_ts - BACKFILL_WINDOW
            log.info("history_sync_backfill", window_seconds=BACKFILL_WINDOW)

        per_page = 500
        since = last_ts + 1
        total_synced = 0

        # The socket path is fixed for the whole run, so resolve the fail2ban
        # database location once rather than on every page.
        fail2ban_db_path = await get_fail2ban_db_path(socket_path)

        # Keep ``since`` fixed and walk the result set page by page.
        # Re-querying page 1 with ``since = max(timeofban) + 1`` would skip
        # rows that share the boundary timestamp, and would drop every page
        # after the first if the repository returns newest rows first.
        # NOTE(review): assumes get_history_page applies ``page`` as an offset
        # over a stable ordering and that archive_ban_event tolerates a row
        # being offered twice - confirm against the repositories.
        page = 1
        while True:
            rows, _total = await fail2ban_db_repo.get_history_page(
                db_path=fail2ban_db_path,
                since=since,
                page=page,
                page_size=per_page,
            )
            if not rows:
                break

            for row in rows:
                await archive_ban_event(
                    db=db,
                    jail=row.jail,
                    ip=row.ip,
                    timeofban=row.timeofban,
                    bancount=row.bancount,
                    data=row.data,
                    action="ban",
                )
                total_synced += 1

            # A short page means the result set is exhausted.
            if len(rows) < per_page:
                break
            page += 1

        log.info("history_sync_complete", synced=total_synced)
    except Exception:
        # Top-level boundary of a scheduled job: record the failure and let
        # the next interval retry.
        log.exception("history_sync_failed")
    finally:
        if close_db:
            await db.close()
async def _run_sync(app: FastAPI) -> None:
    """Run one history sync with the settings ``get_effective_settings`` returns for *app*."""
    current_settings = get_effective_settings(app)
    await _run_sync_with_settings(current_settings)
def register(app: FastAPI) -> None:
    """Schedule the history sync as a recurring job on ``app.state.scheduler``.

    Meant to be invoked from the lifespan handler once the scheduler has been
    started. The job is registered under ``JOB_ID`` (replacing any existing
    job with that id), fires immediately, and then repeats every
    ``HISTORY_SYNC_INTERVAL`` seconds. The settings object is captured here,
    at registration time, and handed to each run.
    """
    job_kwargs = {"settings": get_effective_settings(app)}
    scheduler = app.state.scheduler
    # A next_run_time of "now" triggers the first sync without waiting a
    # full interval.
    scheduler.add_job(
        _run_sync_with_settings,
        trigger="interval",
        seconds=HISTORY_SYNC_INTERVAL,
        kwargs=job_kwargs,
        id=JOB_ID,
        replace_existing=True,
        next_run_time=datetime.datetime.now(tz=datetime.UTC),
    )
    log.info("history_sync_scheduled", interval_seconds=HISTORY_SYNC_INTERVAL)