Refactor history sync into history_service and update docs/tests

This commit is contained in:
2026-04-14 15:09:58 +02:00
parent 86fa271c40
commit 58bb769a35
7 changed files with 147 additions and 90 deletions

View File

@@ -187,7 +187,7 @@ The business logic layer. Services orchestrate operations, enforce rules, and co
| `config_file_service.py` | Shared utilities for configuration parsing and manipulation: parses config files, validates names/IPs, manages atomic file writes, probes fail2ban socket | | `config_file_service.py` | Shared utilities for configuration parsing and manipulation: parses config files, validates names/IPs, manages atomic file writes, probes fail2ban socket |
| `raw_config_io_service.py` | Low-level file I/O for raw fail2ban config files | | `raw_config_io_service.py` | Low-level file I/O for raw fail2ban config files |
| `log_service.py` | Log preview and regex test operations (extracted from config_service) | | `log_service.py` | Log preview and regex test operations (extracted from config_service) |
| `history_service.py` | Queries the fail2ban database for historical ban records, builds per-IP timelines, computes ban counts and repeat-offender flags | | `history_service.py` | Queries the fail2ban database for historical ban records, builds per-IP timelines, computes ban counts and repeat-offender flags, and syncs new records into BanGUI's archive table |
| `blocklist_service.py` | Downloads blocklists via aiohttp, validates IPs/CIDRs, applies bans through fail2ban or iptables, logs import results | | `blocklist_service.py` | Downloads blocklists via aiohttp, validates IPs/CIDRs, applies bans through fail2ban or iptables, logs import results |
| `geo_service.py` | Resolves IP addresses to country, ASN, and RIR using external APIs or a local database, caches results | | `geo_service.py` | Resolves IP addresses to country, ASN, and RIR using external APIs or a local database, caches results |
| `server_service.py` | Reads and writes fail2ban server-level settings (log level, log target, syslog socket, DB location, purge age) | | `server_service.py` | Reads and writes fail2ban server-level settings (log level, log target, syslog socket, DB location, purge age) |
@@ -233,6 +233,7 @@ APScheduler background jobs that run on a schedule without user interaction.
| `geo_cache_flush.py` | Periodically flushes newly resolved IPs from the in-memory dirty set to the `geo_cache` SQLite table (default: every 60 seconds). GET requests populate only the in-memory cache; this task persists them without blocking any request. | | `geo_cache_flush.py` | Periodically flushes newly resolved IPs from the in-memory dirty set to the `geo_cache` SQLite table (default: every 60 seconds). GET requests populate only the in-memory cache; this task persists them without blocking any request. |
| `geo_re_resolve.py` | Periodically re-resolves stale entries in `geo_cache` to keep geolocation data fresh | | `geo_re_resolve.py` | Periodically re-resolves stale entries in `geo_cache` to keep geolocation data fresh |
| `health_check.py` | Periodically pings the fail2ban socket and updates the cached server status so the frontend always has fresh data | | `health_check.py` | Periodically pings the fail2ban socket and updates the cached server status so the frontend always has fresh data |
| `history_sync.py` | Periodically copies new records from the fail2ban SQLite database into BanGUI's `history_archive` table; delegates the sync algorithm to `history_service.py` |
#### Utils (`app/utils/`) #### Utils (`app/utils/`)

View File

@@ -145,7 +145,7 @@ Framework types in the service layer violate the Dependency Inversion principle,
--- ---
### TASK-05 — Extract business logic out of `tasks/history_sync.py` into a service 🟠 ### TASK-05 — Extract business logic out of `tasks/history_sync.py` into a service
**Where:** **Where:**
`backend/app/tasks/history_sync.py` — lines 1516 import repositories directly; the entire `_run_sync_with_settings()` function contains paging logic, backfill window calculation, and `archive_ban_event` calls that constitute business logic. `backend/app/tasks/history_sync.py` — lines 1516 import repositories directly; the entire `_run_sync_with_settings()` function contains paging logic, backfill window calculation, and `archive_ban_event` calls that constitute business logic.
@@ -371,7 +371,7 @@ Module-level asyncio primitives are a known footgun in Python async codebases. F
--- ---
### TASK-12 — Remove duplicate import in `server_service.py` 🟡 ### TASK-12 — Remove duplicate import in `server_service.py`
**Where:** **Where:**
`backend/app/services/server_service.py` — lines 1718: `backend/app/services/server_service.py` — lines 1718:

View File

@@ -4,8 +4,8 @@ Queries the fail2ban SQLite database for all historical ban records.
Supports filtering by jail, IP, and time range. For per-IP forensics the Supports filtering by jail, IP, and time range. For per-IP forensics the
service provides a full ban timeline with matched log lines and failure counts. service provides a full ban timeline with matched log lines and failure counts.
All database I/O uses aiosqlite in **read-only** mode so BanGUI never All fail2ban database I/O uses aiosqlite in **read-only** mode so BanGUI
modifies or locks the fail2ban database. never modifies or locks the fail2ban database.
""" """
from __future__ import annotations from __future__ import annotations
@@ -28,6 +28,7 @@ from app.models.history import (
IpTimelineEvent, IpTimelineEvent,
) )
from app.repositories import fail2ban_db_repo from app.repositories import fail2ban_db_repo
from app.repositories.history_archive_repo import archive_ban_event
from app.utils.fail2ban_db_utils import get_fail2ban_db_path, parse_data_json, ts_to_iso from app.utils.fail2ban_db_utils import get_fail2ban_db_path, parse_data_json, ts_to_iso
log: structlog.stdlib.BoundLogger = structlog.get_logger() log: structlog.stdlib.BoundLogger = structlog.get_logger()
@@ -53,6 +54,75 @@ def _since_unix(range_: TimeRange) -> int:
return int(datetime.now(tz=UTC).timestamp()) - seconds return int(datetime.now(tz=UTC).timestamp()) - seconds
_HISTORY_SYNC_PAGE_SIZE: int = 500
_HISTORY_SYNC_BACKFILL_WINDOW: int = 648000
async def _get_last_archive_ts(db: aiosqlite.Connection) -> int | None:
"""Return the most recent archived ban timestamp, or ``None`` if empty."""
async with db.execute("SELECT MAX(timeofban) FROM history_archive") as cur:
row = await cur.fetchone()
if row is None or row[0] is None:
return None
return int(row[0])
async def sync_from_fail2ban_db(
db: aiosqlite.Connection,
socket_path: str,
) -> int:
"""Copy new records from the fail2ban DB into the BanGUI archive table.
Args:
db: Application database connection for the archive table.
socket_path: Path to the fail2ban Unix domain socket.
Returns:
Number of fail2ban records scanned and archived.
"""
last_ts = await _get_last_archive_ts(db)
now_ts = int(datetime.now(tz=UTC).timestamp())
if last_ts is None:
last_ts = now_ts - _HISTORY_SYNC_BACKFILL_WINDOW
log.info("history_sync_backfill", window_seconds=_HISTORY_SYNC_BACKFILL_WINDOW)
next_since = last_ts + 1
total_synced = 0
while True:
fail2ban_db_path = await get_fail2ban_db_path(socket_path)
rows, _ = await fail2ban_db_repo.get_history_page(
db_path=fail2ban_db_path,
since=next_since,
page=1,
page_size=_HISTORY_SYNC_PAGE_SIZE,
)
if not rows:
break
for row in rows:
await archive_ban_event(
db=db,
jail=row.jail,
ip=row.ip,
timeofban=row.timeofban,
bancount=row.bancount,
data=row.data,
action="ban",
)
total_synced += len(rows)
next_since = max(row.timeofban for row in rows) + 1
if len(rows) < _HISTORY_SYNC_PAGE_SIZE:
break
log.info("history_sync_completed", synced=total_synced)
return total_synced
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Public API # Public API
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------

View File

@@ -14,7 +14,6 @@ from typing import cast
import structlog import structlog
from app.exceptions import ServerOperationError
from app.exceptions import ServerOperationError from app.exceptions import ServerOperationError
from app.models.server import ServerSettings, ServerSettingsResponse, ServerSettingsUpdate from app.models.server import ServerSettings, ServerSettingsResponse, ServerSettingsUpdate
from app.utils.fail2ban_client import Fail2BanClient, Fail2BanCommand, Fail2BanResponse from app.utils.fail2ban_client import Fail2BanClient, Fail2BanCommand, Fail2BanResponse

View File

@@ -12,13 +12,10 @@ from typing import TYPE_CHECKING
import structlog import structlog
from app.db import open_db from app.db import open_db
from app.repositories import fail2ban_db_repo from app.services import history_service
from app.repositories.history_archive_repo import archive_ban_event
from app.utils.fail2ban_db_utils import get_fail2ban_db_path
from app.utils.runtime_state import get_effective_settings from app.utils.runtime_state import get_effective_settings
if TYPE_CHECKING: if TYPE_CHECKING:
import aiosqlite
from fastapi import FastAPI from fastapi import FastAPI
from app.config import Settings from app.config import Settings
@@ -35,73 +32,17 @@ HISTORY_SYNC_INTERVAL: int = 300
BACKFILL_WINDOW: int = 648000 BACKFILL_WINDOW: int = 648000
async def _get_db(settings: Settings) -> tuple[aiosqlite.Connection, bool]:
db = await open_db(settings.database_path)
return db, True
async def _get_last_archive_ts(db) -> int | None:
async with db.execute("SELECT MAX(timeofban) FROM history_archive") as cur:
row = await cur.fetchone()
if row is None or row[0] is None:
return None
return int(row[0])
async def _run_sync_with_settings(settings: Settings) -> None: async def _run_sync_with_settings(settings: Settings) -> None:
socket_path: str = settings.fail2ban_socket socket_path: str = settings.fail2ban_socket
db, close_db = await _get_db(settings) db = await open_db(settings.database_path)
try: try:
last_ts = await _get_last_archive_ts(db) synced = await history_service.sync_from_fail2ban_db(db, socket_path)
now_ts = int(datetime.datetime.now(datetime.UTC).timestamp()) log.info("history_sync_complete", synced=synced)
if last_ts is None:
last_ts = now_ts - BACKFILL_WINDOW
log.info("history_sync_backfill", window_seconds=BACKFILL_WINDOW)
per_page = 500
next_since = last_ts + 1
total_synced = 0
while True:
fail2ban_db_path = await get_fail2ban_db_path(socket_path)
rows, total = await fail2ban_db_repo.get_history_page(
db_path=fail2ban_db_path,
since=next_since,
page=1,
page_size=per_page,
)
if not rows:
break
for row in rows:
await archive_ban_event(
db=db,
jail=row.jail,
ip=row.ip,
timeofban=row.timeofban,
bancount=row.bancount,
data=row.data,
action="ban",
)
total_synced += 1
# Continue where we left off by max timeofban + 1.
max_time = max(row.timeofban for row in rows)
next_since = max_time + 1
if len(rows) < per_page:
break
log.info("history_sync_complete", synced=total_synced)
except Exception: except Exception:
log.exception("history_sync_failed") log.exception("history_sync_failed")
finally: finally:
if close_db: await db.close()
await db.close()
async def _run_sync(app: FastAPI) -> None: async def _run_sync(app: FastAPI) -> None:

View File

@@ -378,3 +378,58 @@ class TestGetIpDetail:
assert result.country_name == "United States" assert result.country_name == "United States"
assert result.asn == "AS15169" assert result.asn == "AS15169"
assert result.org == "Google" assert result.org == "Google"
# ---------------------------------------------------------------------------
# sync_from_fail2ban_db tests
# ---------------------------------------------------------------------------
class TestSyncFromFail2BanDb:
async def test_archives_new_records_and_returns_count(self) -> None:
from types import SimpleNamespace
fake_db = AsyncMock()
fake_rows = [
SimpleNamespace(
jail="sshd",
ip="1.2.3.4",
timeofban=1000,
bancount=1,
data="{}",
)
]
with patch(
"app.services.history_service._get_last_archive_ts",
new=AsyncMock(return_value=1000),
), patch(
"app.services.history_service.get_fail2ban_db_path",
new=AsyncMock(return_value="/tmp/fake.sqlite3"),
), patch(
"app.services.history_service.fail2ban_db_repo.get_history_page",
new=AsyncMock(return_value=(fake_rows, 1)),
) as mock_page, patch(
"app.services.history_service.archive_ban_event",
new=AsyncMock(return_value=True),
) as archive_mock:
count = await history_service.sync_from_fail2ban_db(
fake_db, "/tmp/fake.sock"
)
assert count == 1
mock_page.assert_awaited_once_with(
db_path="/tmp/fake.sqlite3",
since=1001,
page=1,
page_size=500,
)
archive_mock.assert_awaited_once_with(
db=fake_db,
jail="sshd",
ip="1.2.3.4",
timeofban=1000,
bancount=1,
data="{}",
action="ban",
)

View File

@@ -31,35 +31,26 @@ class TestHistorySyncTask:
async def test_backfill_window_is_7_5_days(self) -> None: async def test_backfill_window_is_7_5_days(self) -> None:
assert history_sync.BACKFILL_WINDOW == 648000 assert history_sync.BACKFILL_WINDOW == 648000
async def test_sync_uses_strict_since_after_restart(self) -> None: async def test_run_sync_delegates_to_history_service(self) -> None:
fake_app = type("FakeApp", (), {})() fake_app = type("FakeApp", (), {})()
fake_app.state = type("FakeState", (), {})() fake_app.state = type("FakeState", (), {})()
fake_app.state.settings = type("FakeSettings", (), {})() fake_app.state.settings = type("FakeSettings", (), {})()
fake_app.state.settings.fail2ban_socket = "/tmp/fake.sock" fake_app.state.settings.fail2ban_socket = "/tmp/fake.sock"
fake_app.state.settings.database_path = "/tmp/fake.db" fake_app.state.settings.database_path = "/tmp/fake.db"
fake_app.state.db = MagicMock() fake_db = AsyncMock()
fake_app.state.db.close = AsyncMock() fake_db.close = AsyncMock()
async def fake_get_history_page(*, db_path: str, since: int, page: int, page_size: int, **kwargs):
assert since == 1001
return [], 0
async def fake_get_fail2ban_db_path(socket_path: str) -> str:
return "/tmp/fake.sqlite3"
with patch( with patch(
"app.tasks.history_sync.open_db", "app.tasks.history_sync.open_db",
new_callable=AsyncMock, new_callable=AsyncMock,
return_value=fake_app.state.db, return_value=fake_db,
), patch( ), patch(
"app.tasks.history_sync._get_last_archive_ts", "app.tasks.history_sync.history_service.sync_from_fail2ban_db",
new=AsyncMock(return_value=1000), new_callable=AsyncMock,
), patch( return_value=42,
"app.tasks.history_sync.get_fail2ban_db_path", ) as mock_sync:
new=fake_get_fail2ban_db_path,
), patch(
"app.tasks.history_sync.fail2ban_db_repo.get_history_page",
new=fake_get_history_page,
):
await history_sync._run_sync(fake_app) await history_sync._run_sync(fake_app)
mock_sync.assert_awaited_once_with(fake_db, "/tmp/fake.sock")
fake_db.close.assert_awaited_once()