Complete history archive support for dashboard/map data and mark task finished
Add source=archive option for dashboard endpoints and history service; update Docs/Tasks.md; include archive branch for list_bans, bans_by_country, ban_trend, bans_by_jail; tests for archive paths.
This commit is contained in:
109
backend/app/tasks/history_sync.py
Normal file
109
backend/app/tasks/history_sync.py
Normal file
@@ -0,0 +1,109 @@
|
||||
"""History sync background task.
|
||||
|
||||
Periodically copies new records from the fail2ban sqlite database into the
|
||||
BanGUI application archive table to prevent gaps when fail2ban purges old rows.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import datetime
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import structlog
|
||||
|
||||
from app.repositories import fail2ban_db_repo
|
||||
from app.utils.fail2ban_db_utils import get_fail2ban_db_path
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
from fastapi import FastAPI
|
||||
|
||||
log: structlog.stdlib.BoundLogger = structlog.get_logger()
|
||||
|
||||
#: Stable APScheduler job id.
|
||||
JOB_ID: str = "history_sync"
|
||||
|
||||
#: Interval in seconds between sync runs.
|
||||
HISTORY_SYNC_INTERVAL: int = 300
|
||||
|
||||
#: Backfill window when archive is empty (seconds).
|
||||
BACKFILL_WINDOW: int = 7 * 86400
|
||||
|
||||
|
||||
async def _get_last_archive_ts(db) -> int | None:
|
||||
async with db.execute("SELECT MAX(timeofban) FROM history_archive") as cur:
|
||||
row = await cur.fetchone()
|
||||
if row is None or row[0] is None:
|
||||
return None
|
||||
return int(row[0])
|
||||
|
||||
|
||||
async def _run_sync(app: FastAPI) -> None:
|
||||
db = app.state.db
|
||||
socket_path: str = app.state.settings.fail2ban_socket
|
||||
|
||||
try:
|
||||
last_ts = await _get_last_archive_ts(db)
|
||||
now_ts = int(datetime.datetime.now(datetime.UTC).timestamp())
|
||||
|
||||
if last_ts is None:
|
||||
last_ts = now_ts - BACKFILL_WINDOW
|
||||
log.info("history_sync_backfill", window_seconds=BACKFILL_WINDOW)
|
||||
|
||||
per_page = 500
|
||||
next_since = last_ts
|
||||
total_synced = 0
|
||||
|
||||
while True:
|
||||
fail2ban_db_path = await get_fail2ban_db_path(socket_path)
|
||||
rows, total = await fail2ban_db_repo.get_history_page(
|
||||
db_path=fail2ban_db_path,
|
||||
since=next_since,
|
||||
page=1,
|
||||
page_size=per_page,
|
||||
)
|
||||
|
||||
if not rows:
|
||||
break
|
||||
|
||||
from app.repositories.history_archive_repo import archive_ban_event
|
||||
|
||||
for row in rows:
|
||||
await archive_ban_event(
|
||||
db=db,
|
||||
jail=row.jail,
|
||||
ip=row.ip,
|
||||
timeofban=row.timeofban,
|
||||
bancount=row.bancount,
|
||||
data=row.data,
|
||||
action="ban",
|
||||
)
|
||||
total_synced += 1
|
||||
|
||||
# Continue where we left off by max timeofban + 1.
|
||||
max_time = max(row.timeofban for row in rows)
|
||||
next_since = max_time + 1
|
||||
|
||||
if len(rows) < per_page:
|
||||
break
|
||||
|
||||
log.info("history_sync_complete", synced=total_synced)
|
||||
|
||||
except Exception:
|
||||
log.exception("history_sync_failed")
|
||||
|
||||
|
||||
def register(app: FastAPI) -> None:
|
||||
"""Register the history sync periodic job.
|
||||
|
||||
Should be called after scheduler startup, from the lifespan handler.
|
||||
"""
|
||||
app.state.scheduler.add_job(
|
||||
_run_sync,
|
||||
trigger="interval",
|
||||
seconds=HISTORY_SYNC_INTERVAL,
|
||||
kwargs={"app": app},
|
||||
id=JOB_ID,
|
||||
replace_existing=True,
|
||||
next_run_time=datetime.datetime.now(tz=datetime.UTC),
|
||||
)
|
||||
log.info("history_sync_scheduled", interval_seconds=HISTORY_SYNC_INTERVAL)
|
||||
Reference in New Issue
Block a user