From 58bb769a354a9d3920ae0f043f483f5053e96db9 Mon Sep 17 00:00:00 2001 From: Lukas Date: Tue, 14 Apr 2026 15:09:58 +0200 Subject: [PATCH] Refactor history sync into history_service and update docs/tests --- Docs/Architekture.md | 3 +- Docs/Tasks.md | 4 +- backend/app/services/history_service.py | 74 ++++++++++++++++++- backend/app/services/server_service.py | 1 - backend/app/tasks/history_sync.py | 69 ++--------------- .../test_services/test_history_service.py | 55 ++++++++++++++ backend/tests/test_tasks/test_history_sync.py | 31 +++----- 7 files changed, 147 insertions(+), 90 deletions(-) diff --git a/Docs/Architekture.md b/Docs/Architekture.md index c121465..6701e89 100644 --- a/Docs/Architekture.md +++ b/Docs/Architekture.md @@ -187,7 +187,7 @@ The business logic layer. Services orchestrate operations, enforce rules, and co | `config_file_service.py` | Shared utilities for configuration parsing and manipulation: parses config files, validates names/IPs, manages atomic file writes, probes fail2ban socket | | `raw_config_io_service.py` | Low-level file I/O for raw fail2ban config files | | `log_service.py` | Log preview and regex test operations (extracted from config_service) | -| `history_service.py` | Queries the fail2ban database for historical ban records, builds per-IP timelines, computes ban counts and repeat-offender flags | +| `history_service.py` | Queries the fail2ban database for historical ban records, builds per-IP timelines, computes ban counts and repeat-offender flags, and syncs new records into BanGUI's archive table | | `blocklist_service.py` | Downloads blocklists via aiohttp, validates IPs/CIDRs, applies bans through fail2ban or iptables, logs import results | | `geo_service.py` | Resolves IP addresses to country, ASN, and RIR using external APIs or a local database, caches results | | `server_service.py` | Reads and writes fail2ban server-level settings (log level, log target, syslog socket, DB location, purge age) | @@ -233,6 +233,7 @@ APScheduler background jobs that run on a schedule without user interaction. | `geo_cache_flush.py` | Periodically flushes newly resolved IPs from the in-memory dirty set to the `geo_cache` SQLite table (default: every 60 seconds). GET requests populate only the in-memory cache; this task persists them without blocking any request. | | `geo_re_resolve.py` | Periodically re-resolves stale entries in `geo_cache` to keep geolocation data fresh | | `health_check.py` | Periodically pings the fail2ban socket and updates the cached server status so the frontend always has fresh data | +| `history_sync.py` | Periodically copies new records from the fail2ban SQLite database into BanGUI's `history_archive` table; delegates the sync algorithm to `history_service.py` | #### Utils (`app/utils/`) diff --git a/Docs/Tasks.md b/Docs/Tasks.md index 330e530..5fe7388 100644 --- a/Docs/Tasks.md +++ b/Docs/Tasks.md @@ -145,7 +145,7 @@ Framework types in the service layer violate the Dependency Inversion principle, --- -### TASK-05 — Extract business logic out of `tasks/history_sync.py` into a service 🟠 +### TASK-05 — Extract business logic out of `tasks/history_sync.py` into a service ✅ **Where:** `backend/app/tasks/history_sync.py` — lines 15–16 import repositories directly; the entire `_run_sync_with_settings()` function contains paging logic, backfill window calculation, and `archive_ban_event` calls that constitute business logic. @@ -371,7 +371,7 @@ Module-level asyncio primitives are a known footgun in Python async codebases. F --- -### TASK-12 — Remove duplicate import in `server_service.py` 🟡 +### TASK-12 — Remove duplicate import in `server_service.py` ✅ **Where:** `backend/app/services/server_service.py` — lines 17–18: diff --git a/backend/app/services/history_service.py b/backend/app/services/history_service.py index 5f38309..c96c4b1 100644 --- a/backend/app/services/history_service.py +++ b/backend/app/services/history_service.py @@ -4,8 +4,8 @@ Queries the fail2ban SQLite database for all historical ban records. Supports filtering by jail, IP, and time range. For per-IP forensics the service provides a full ban timeline with matched log lines and failure counts. -All database I/O uses aiosqlite in **read-only** mode so BanGUI never -modifies or locks the fail2ban database. +All fail2ban database I/O uses aiosqlite in **read-only** mode so BanGUI +never modifies or locks the fail2ban database. """ from __future__ import annotations @@ -28,6 +28,7 @@ from app.models.history import ( IpTimelineEvent, ) from app.repositories import fail2ban_db_repo +from app.repositories.history_archive_repo import archive_ban_event from app.utils.fail2ban_db_utils import get_fail2ban_db_path, parse_data_json, ts_to_iso log: structlog.stdlib.BoundLogger = structlog.get_logger() @@ -53,6 +54,75 @@ def _since_unix(range_: TimeRange) -> int: return int(datetime.now(tz=UTC).timestamp()) - seconds +_HISTORY_SYNC_PAGE_SIZE: int = 500 +_HISTORY_SYNC_BACKFILL_WINDOW: int = 648000 + + +async def _get_last_archive_ts(db: aiosqlite.Connection) -> int | None: + """Return the most recent archived ban timestamp, or ``None`` if empty.""" + async with db.execute("SELECT MAX(timeofban) FROM history_archive") as cur: + row = await cur.fetchone() + if row is None or row[0] is None: + return None + return int(row[0]) + + +async def sync_from_fail2ban_db( + db: aiosqlite.Connection, + socket_path: str, +) -> int: + """Copy new records from the fail2ban DB into the BanGUI archive table. + + Args: + db: Application database connection for the archive table. + socket_path: Path to the fail2ban Unix domain socket. + + Returns: + Number of fail2ban records scanned and archived. + """ + last_ts = await _get_last_archive_ts(db) + now_ts = int(datetime.now(tz=UTC).timestamp()) + + if last_ts is None: + last_ts = now_ts - _HISTORY_SYNC_BACKFILL_WINDOW + log.info("history_sync_backfill", window_seconds=_HISTORY_SYNC_BACKFILL_WINDOW) + + next_since = last_ts + 1 + total_synced = 0 + + while True: + fail2ban_db_path = await get_fail2ban_db_path(socket_path) + rows, _ = await fail2ban_db_repo.get_history_page( + db_path=fail2ban_db_path, + since=next_since, + page=1, + page_size=_HISTORY_SYNC_PAGE_SIZE, + ) + + if not rows: + break + + for row in rows: + await archive_ban_event( + db=db, + jail=row.jail, + ip=row.ip, + timeofban=row.timeofban, + bancount=row.bancount, + data=row.data, + action="ban", + ) + + total_synced += len(rows) + next_since = max(row.timeofban for row in rows) + 1 + + if len(rows) < _HISTORY_SYNC_PAGE_SIZE: + break + + log.info("history_sync_completed", synced=total_synced) + return total_synced + + # --------------------------------------------------------------------------- # Public API # --------------------------------------------------------------------------- diff --git a/backend/app/services/server_service.py b/backend/app/services/server_service.py index b8b6117..d9f91c6 100644 --- a/backend/app/services/server_service.py +++ b/backend/app/services/server_service.py @@ -14,7 +14,6 @@ from typing import cast import structlog -from app.exceptions import ServerOperationError from app.exceptions import ServerOperationError from app.models.server import ServerSettings, ServerSettingsResponse, ServerSettingsUpdate from app.utils.fail2ban_client import Fail2BanClient, Fail2BanCommand, Fail2BanResponse diff --git a/backend/app/tasks/history_sync.py b/backend/app/tasks/history_sync.py index 29554f3..32b84c1 100644 --- a/backend/app/tasks/history_sync.py +++ b/backend/app/tasks/history_sync.py @@ -12,13 +12,10 @@ from typing import TYPE_CHECKING import structlog from app.db import open_db -from app.repositories import fail2ban_db_repo -from app.repositories.history_archive_repo import archive_ban_event -from app.utils.fail2ban_db_utils import get_fail2ban_db_path +from app.services import history_service from app.utils.runtime_state import get_effective_settings if TYPE_CHECKING: - import aiosqlite from fastapi import FastAPI from app.config import Settings @@ -35,73 +32,17 @@ HISTORY_SYNC_INTERVAL: int = 300 BACKFILL_WINDOW: int = 648000 -async def _get_db(settings: Settings) -> tuple[aiosqlite.Connection, bool]: - db = await open_db(settings.database_path) - return db, True - - -async def _get_last_archive_ts(db) -> int | None: - async with db.execute("SELECT MAX(timeofban) FROM history_archive") as cur: - row = await cur.fetchone() - if row is None or row[0] is None: - return None - return int(row[0]) - - async def _run_sync_with_settings(settings: Settings) -> None: socket_path: str = settings.fail2ban_socket - db, close_db = await _get_db(settings) + db = await open_db(settings.database_path) try: - last_ts = await _get_last_archive_ts(db) - now_ts = int(datetime.datetime.now(datetime.UTC).timestamp()) - - if last_ts is None: - last_ts = now_ts - BACKFILL_WINDOW - log.info("history_sync_backfill", window_seconds=BACKFILL_WINDOW) - - per_page = 500 - next_since = last_ts + 1 - total_synced = 0 - - while True: - fail2ban_db_path = await get_fail2ban_db_path(socket_path) - rows, total = await fail2ban_db_repo.get_history_page( - db_path=fail2ban_db_path, - since=next_since, - page=1, - page_size=per_page, - ) - - if not rows: - break - - for row in rows: - await archive_ban_event( - db=db, - jail=row.jail, - ip=row.ip, - timeofban=row.timeofban, - bancount=row.bancount, - data=row.data, - action="ban", - ) - total_synced += 1 - - # Continue where we left off by max timeofban + 1. - max_time = max(row.timeofban for row in rows) - next_since = max_time + 1 - - if len(rows) < per_page: - break - - log.info("history_sync_complete", synced=total_synced) - + synced = await history_service.sync_from_fail2ban_db(db, socket_path) + log.info("history_sync_complete", synced=synced) except Exception: log.exception("history_sync_failed") finally: - if close_db: - await db.close() + await db.close() async def _run_sync(app: FastAPI) -> None: diff --git a/backend/tests/test_services/test_history_service.py b/backend/tests/test_services/test_history_service.py index 9b5d0fc..f0571ae 100644 --- a/backend/tests/test_services/test_history_service.py +++ b/backend/tests/test_services/test_history_service.py @@ -378,3 +378,58 @@ class TestGetIpDetail: assert result.country_name == "United States" assert result.asn == "AS15169" assert result.org == "Google" + + +# --------------------------------------------------------------------------- +# sync_from_fail2ban_db tests +# --------------------------------------------------------------------------- + + +class TestSyncFromFail2BanDb: + async def test_archives_new_records_and_returns_count(self) -> None: + from types import SimpleNamespace + + fake_db = AsyncMock() + fake_rows = [ + SimpleNamespace( + jail="sshd", + ip="1.2.3.4", + timeofban=1000, + bancount=1, + data="{}", + ) + ] + + with patch( + "app.services.history_service._get_last_archive_ts", + new=AsyncMock(return_value=1000), + ), patch( + "app.services.history_service.get_fail2ban_db_path", + new=AsyncMock(return_value="/tmp/fake.sqlite3"), + ), patch( + "app.services.history_service.fail2ban_db_repo.get_history_page", + new=AsyncMock(return_value=(fake_rows, 1)), + ) as mock_page, patch( + "app.services.history_service.archive_ban_event", + new=AsyncMock(return_value=True), + ) as archive_mock: + count = await history_service.sync_from_fail2ban_db( + fake_db, "/tmp/fake.sock" + ) + + assert count == 1 + mock_page.assert_awaited_once_with( + db_path="/tmp/fake.sqlite3", + since=1001, + page=1, + page_size=500, + ) + archive_mock.assert_awaited_once_with( + db=fake_db, + jail="sshd", + ip="1.2.3.4", + timeofban=1000, + bancount=1, + data="{}", + action="ban", + ) diff --git a/backend/tests/test_tasks/test_history_sync.py b/backend/tests/test_tasks/test_history_sync.py index 2179774..5c77456 100644 --- a/backend/tests/test_tasks/test_history_sync.py +++ b/backend/tests/test_tasks/test_history_sync.py @@ -31,35 +31,26 @@ class TestHistorySyncTask: async def test_backfill_window_is_7_5_days(self) -> None: assert history_sync.BACKFILL_WINDOW == 648000 - async def test_sync_uses_strict_since_after_restart(self) -> None: + async def test_run_sync_delegates_to_history_service(self) -> None: fake_app = type("FakeApp", (), {})() fake_app.state = type("FakeState", (), {})() fake_app.state.settings = type("FakeSettings", (), {})() fake_app.state.settings.fail2ban_socket = "/tmp/fake.sock" fake_app.state.settings.database_path = "/tmp/fake.db" - fake_app.state.db = MagicMock() - fake_app.state.db.close = AsyncMock() - - async def fake_get_history_page(*, db_path: str, since: int, page: int, page_size: int, **kwargs): - assert since == 1001 - return [], 0 - - async def fake_get_fail2ban_db_path(socket_path: str) -> str: - return "/tmp/fake.sqlite3" + fake_db = AsyncMock() + fake_db.close = AsyncMock() with patch( "app.tasks.history_sync.open_db", new_callable=AsyncMock, - return_value=fake_app.state.db, + return_value=fake_db, ), patch( - "app.tasks.history_sync._get_last_archive_ts", - new=AsyncMock(return_value=1000), - ), patch( - "app.tasks.history_sync.get_fail2ban_db_path", - new=fake_get_fail2ban_db_path, - ), patch( - "app.tasks.history_sync.fail2ban_db_repo.get_history_page", - new=fake_get_history_page, - ): + "app.tasks.history_sync.history_service.sync_from_fail2ban_db", + new_callable=AsyncMock, + return_value=42, + ) as mock_sync: await history_sync._run_sync(fake_app) + + mock_sync.assert_awaited_once_with(fake_db, "/tmp/fake.sock") + fake_db.close.assert_awaited_once()