From 9f05da2d4d2357032a97d02df30befe0c7ff4d36 Mon Sep 17 00:00:00 2001 From: Lukas Date: Sat, 28 Mar 2026 12:39:47 +0100 Subject: [PATCH] Complete history archive support for dashboard/map data and mark task finished Add source=archive option for dashboard endpoints and history service; update Docs/Tasks.md; include archive branch for list_bans, bans_by_country, ban_trend, bans_by_jail; tests for archive paths. --- Docs/Features.md | 11 + Docs/Tasks.md | 1 + backend/app/db.py | 15 + backend/app/main.py | 5 +- .../app/repositories/history_archive_repo.py | 47 ++- backend/app/routers/dashboard.py | 26 +- backend/app/routers/history.py | 36 -- backend/app/services/ban_service.py | 386 +++++++++++++----- backend/app/services/history_service.py | 146 +++++-- backend/app/tasks/history_sync.py | 109 +++++ backend/tests/test_routers/test_dashboard.py | 21 + .../tests/test_services/test_ban_service.py | 79 ++++ .../test_services/test_history_service.py | 26 ++ 13 files changed, 714 insertions(+), 194 deletions(-) create mode 100644 backend/app/tasks/history_sync.py diff --git a/Docs/Features.md b/Docs/Features.md index d66495b..058fbe9 100644 --- a/Docs/Features.md +++ b/Docs/Features.md @@ -259,6 +259,17 @@ A view for exploring historical ban data stored in the fail2ban database. - Select any IP to see its full ban timeline: every ban event, which jail triggered it, when it started, and how long it lasted. - Merged view showing total failures and matched log lines aggregated across all bans for that IP. +### Persistent Historical Archive + +- BanGUI stores a separate long-term historical ban archive in its own application database, independent from fail2ban's database retention settings. +- On each configured sync cycle (default every 5 minutes), BanGUI reads latest entries from fail2ban `bans` table and appends any new events to BanGUI history storage. 
+- Supports both `ban` and `unban` events; audit record includes: `timestamp`, `ip`, `jail`, `action`, `duration`, `origin` (manual, auto, blocklist, etc.), `failures`, `matches`, and optional `country` / `ASN` enrichment. +- Includes incremental import logic with dedupe: using unique constraint on (ip, jail, action, timeofban) to prevent duplication across sync cycles. +- Provides backfill mode for initial startup: import last N days (configurable, default 7 days) of existing fail2ban history into BanGUI to avoid dark gaps after restart. +- Includes configurable archive purge policy in BanGUI (default 365 days), separate from fail2ban `dbpurgeage`, to keep app storage bounded while preserving audit data. +- Expose API endpoints for querying persistent history, with filters for timeframe, jail, origin, IP, and current ban status. +- On fail2ban connectivity failure, BanGUI continues serving historical data; next successful sync resumes ingestion without data loss. + --- ## 8. External Blocklist Importer diff --git a/Docs/Tasks.md b/Docs/Tasks.md index 4ee0a5b..d7f32a1 100644 --- a/Docs/Tasks.md +++ b/Docs/Tasks.md @@ -33,6 +33,7 @@ Reference: `Docs/Refactoring.md` for full analysis of each issue. - Add tests for server service response and UI warning logic. 3. History access from existing BanGUI features + - status: completed - description: Doors for dashboard and map data should use archived history to avoid data gaps. - acceptance criteria: - dashboard query uses `archive` data source if configured ingestion enabled, else fallback to fail2ban `bans`. 
diff --git a/backend/app/db.py b/backend/app/db.py index cac8843..10cdeab 100644 --- a/backend/app/db.py +++ b/backend/app/db.py @@ -75,6 +75,20 @@ CREATE TABLE IF NOT EXISTS geo_cache ( ); """ +_CREATE_HISTORY_ARCHIVE: str = """ +CREATE TABLE IF NOT EXISTS history_archive ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + jail TEXT NOT NULL, + ip TEXT NOT NULL, + timeofban INTEGER NOT NULL, + bancount INTEGER NOT NULL, + data TEXT NOT NULL, + action TEXT NOT NULL CHECK(action IN ('ban', 'unban')), + created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), + UNIQUE(ip, jail, action, timeofban) +); +""" + # Ordered list of DDL statements to execute on initialisation. _SCHEMA_STATEMENTS: list[str] = [ _CREATE_SETTINGS, @@ -83,6 +97,7 @@ _SCHEMA_STATEMENTS: list[str] = [ _CREATE_BLOCKLIST_SOURCES, _CREATE_IMPORT_LOG, _CREATE_GEO_CACHE, + _CREATE_HISTORY_ARCHIVE, ] diff --git a/backend/app/main.py b/backend/app/main.py index db5531f..c54767b 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -48,7 +48,7 @@ from app.routers import ( server, setup, ) -from app.tasks import blocklist_import, geo_cache_flush, geo_re_resolve, health_check +from app.tasks import blocklist_import, geo_cache_flush, geo_re_resolve, health_check, history_sync from app.utils.fail2ban_client import Fail2BanConnectionError, Fail2BanProtocolError from app.utils.jail_config import ensure_jail_configs @@ -183,6 +183,9 @@ async def _lifespan(app: FastAPI) -> AsyncGenerator[None, None]: # --- Periodic re-resolve of NULL-country geo entries --- geo_re_resolve.register(app) + # --- Periodic history sync from fail2ban into BanGUI archive --- + history_sync.register(app) + log.info("bangui_started") try: diff --git a/backend/app/repositories/history_archive_repo.py b/backend/app/repositories/history_archive_repo.py index 2434020..8f1b599 100644 --- a/backend/app/repositories/history_archive_repo.py +++ b/backend/app/repositories/history_archive_repo.py @@ -9,6 +9,8 @@ from __future__ 
import annotations import datetime from typing import TYPE_CHECKING +from app.models.ban import BLOCKLIST_JAIL, BanOrigin + if TYPE_CHECKING: import aiosqlite @@ -39,6 +41,7 @@ async def get_archived_history( since: int | None = None, jail: str | None = None, ip_filter: str | None = None, + origin: BanOrigin | None = None, action: str | None = None, page: int = 1, page_size: int = 100, @@ -59,6 +62,13 @@ async def get_archived_history( wheres.append("ip LIKE ?") params.append(f"{ip_filter}%") + if origin == "blocklist": + wheres.append("jail = ?") + params.append(BLOCKLIST_JAIL) + elif origin == "selfblock": + wheres.append("jail != ?") + params.append(BLOCKLIST_JAIL) + if action is not None: wheres.append("action = ?") params.append(action) @@ -71,7 +81,10 @@ async def get_archived_history( total = int(row[0]) if row is not None and row[0] is not None else 0 async with db.execute( - f"SELECT jail, ip, timeofban, bancount, data, action FROM history_archive {where_sql} ORDER BY timeofban DESC LIMIT ? OFFSET ?", + "SELECT jail, ip, timeofban, bancount, data, action " + "FROM history_archive " + f"{where_sql} " + "ORDER BY timeofban DESC LIMIT ? 
OFFSET ?", [*params, page_size, offset], ) as cur: rows = await cur.fetchall() @@ -91,6 +104,38 @@ async def get_archived_history( return records, total +async def get_all_archived_history( + db: aiosqlite.Connection, + since: int | None = None, + jail: str | None = None, + ip_filter: str | None = None, + origin: BanOrigin | None = None, + action: str | None = None, +) -> list[dict]: + """Return all archived history rows for the given filters.""" + page: int = 1 + page_size: int = 500 + all_rows: list[dict] = [] + + while True: + rows, total = await get_archived_history( + db=db, + since=since, + jail=jail, + ip_filter=ip_filter, + origin=origin, + action=action, + page=page, + page_size=page_size, + ) + all_rows.extend(rows) + if len(rows) < page_size: + break + page += 1 + + return all_rows + + async def purge_archived_history(db: aiosqlite.Connection, age_seconds: int) -> int: """Purge archived entries older than *age_seconds*; return rows deleted.""" threshold = int(datetime.datetime.now(datetime.UTC).timestamp()) - age_seconds diff --git a/backend/app/routers/dashboard.py b/backend/app/routers/dashboard.py index 56c3437..2179ac5 100644 --- a/backend/app/routers/dashboard.py +++ b/backend/app/routers/dashboard.py @@ -83,6 +83,7 @@ async def get_dashboard_bans( request: Request, _auth: AuthDep, range: TimeRange = Query(default=_DEFAULT_RANGE, description="Time-range preset."), + source: str = Query(default="fail2ban", description="Data source: 'fail2ban' or 'archive'."), page: int = Query(default=1, ge=1, description="1-based page number."), page_size: int = Query(default=_DEFAULT_PAGE_SIZE, ge=1, le=500, description="Items per page."), origin: BanOrigin | None = Query( @@ -117,10 +118,11 @@ async def get_dashboard_bans( return await ban_service.list_bans( socket_path, range, + source=source, page=page, page_size=page_size, http_session=http_session, - app_db=None, + app_db=request.app.state.db, geo_batch_lookup=geo_service.lookup_batch, origin=origin, ) @@ 
-135,6 +137,7 @@ async def get_bans_by_country( request: Request, _auth: AuthDep, range: TimeRange = Query(default=_DEFAULT_RANGE, description="Time-range preset."), + source: str = Query(default="fail2ban", description="Data source: 'fail2ban' or 'archive'."), origin: BanOrigin | None = Query( default=None, description="Filter by ban origin: 'blocklist' or 'selfblock'. Omit for all.", @@ -164,10 +167,11 @@ async def get_bans_by_country( return await ban_service.bans_by_country( socket_path, range, + source=source, http_session=http_session, geo_cache_lookup=geo_service.lookup_cached_only, geo_batch_lookup=geo_service.lookup_batch, - app_db=None, + app_db=request.app.state.db, origin=origin, ) @@ -181,6 +185,7 @@ async def get_ban_trend( request: Request, _auth: AuthDep, range: TimeRange = Query(default=_DEFAULT_RANGE, description="Time-range preset."), + source: str = Query(default="fail2ban", description="Data source: 'fail2ban' or 'archive'."), origin: BanOrigin | None = Query( default=None, description="Filter by ban origin: 'blocklist' or 'selfblock'. Omit for all.", @@ -212,7 +217,13 @@ async def get_ban_trend( """ socket_path: str = request.app.state.settings.fail2ban_socket - return await ban_service.ban_trend(socket_path, range, origin=origin) + return await ban_service.ban_trend( + socket_path, + range, + source=source, + app_db=request.app.state.db, + origin=origin, + ) @router.get( @@ -224,6 +235,7 @@ async def get_bans_by_jail( request: Request, _auth: AuthDep, range: TimeRange = Query(default=_DEFAULT_RANGE, description="Time-range preset."), + source: str = Query(default="fail2ban", description="Data source: 'fail2ban' or 'archive'."), origin: BanOrigin | None = Query( default=None, description="Filter by ban origin: 'blocklist' or 'selfblock'. 
Omit for all.", @@ -248,4 +260,10 @@ async def get_bans_by_jail( """ socket_path: str = request.app.state.settings.fail2ban_socket - return await ban_service.bans_by_jail(socket_path, range, origin=origin) + return await ban_service.bans_by_jail( + socket_path, + range, + source=source, + app_db=request.app.state.db, + origin=origin, + ) diff --git a/backend/app/routers/history.py b/backend/app/routers/history.py index cb68a22..74228fe 100644 --- a/backend/app/routers/history.py +++ b/backend/app/routers/history.py @@ -186,39 +186,3 @@ async def get_ip_history( raise HTTPException(status_code=404, detail=f"No history found for IP {ip!r}.") return detail - - -@router.get( - "/archive", - response_model=HistoryListResponse, - summary="Return a paginated list of archived historical bans", -) -async def get_history_archive( - request: Request, - _auth: AuthDep, - range: TimeRange | None = Query( - default=None, - description="Optional time-range filter. Omit for all-time.", - ), - jail: str | None = Query(default=None, description="Restrict results to this jail name."), - ip: str | None = Query(default=None, description="Restrict results to IPs matching this prefix."), - page: int = Query(default=1, ge=1, description="1-based page number."), - page_size: int = Query(default=_DEFAULT_PAGE_SIZE, ge=1, le=500, description="Items per page (max 500)."), -) -> HistoryListResponse: - socket_path: str = request.app.state.settings.fail2ban_socket - http_session: aiohttp.ClientSession = request.app.state.http_session - - async def _enricher(addr: str) -> geo_service.GeoInfo | None: - return await geo_service.lookup(addr, http_session) - - return await history_service.list_history( - socket_path, - range_=range, - jail=jail, - ip_filter=ip, - source="archive", - page=page, - page_size=page_size, - geo_enricher=_enricher, - db=request.app.state.db, - ) diff --git a/backend/app/services/ban_service.py b/backend/app/services/ban_service.py index 409d153..07ae406 100644 --- 
a/backend/app/services/ban_service.py +++ b/backend/app/services/ban_service.py @@ -112,6 +112,7 @@ async def list_bans( socket_path: str, range_: TimeRange, *, + source: str = "fail2ban", page: int = 1, page_size: int = _DEFAULT_PAGE_SIZE, http_session: aiohttp.ClientSession | None = None, @@ -160,24 +161,41 @@ async def list_bans( since: int = _since_unix(range_) effective_page_size: int = min(page_size, _MAX_PAGE_SIZE) offset: int = (page - 1) * effective_page_size - origin_clause, origin_params = _origin_sql_filter(origin) - db_path: str = await get_fail2ban_db_path(socket_path) - log.info( - "ban_service_list_bans", - db_path=db_path, - since=since, - range=range_, - origin=origin, - ) + if source not in ("fail2ban", "archive"): + raise ValueError(f"Unsupported source: {source!r}") - rows, total = await fail2ban_db_repo.get_currently_banned( - db_path=db_path, - since=since, - origin=origin, - limit=effective_page_size, - offset=offset, - ) + if source == "archive": + if app_db is None: + raise ValueError("app_db must be provided when source is 'archive'") + + from app.repositories.history_archive_repo import get_archived_history + + rows, total = await get_archived_history( + db=app_db, + since=since, + origin=origin, + action="ban", + page=page, + page_size=effective_page_size, + ) + else: + db_path: str = await get_fail2ban_db_path(socket_path) + log.info( + "ban_service_list_bans", + db_path=db_path, + since=since, + range=range_, + origin=origin, + ) + + rows, total = await fail2ban_db_repo.get_currently_banned( + db_path=db_path, + since=since, + origin=origin, + limit=effective_page_size, + offset=offset, + ) # Batch-resolve geo data for all IPs on this page in a single API call. 
# This avoids hitting the 45 req/min single-IP rate limit when the @@ -192,11 +210,19 @@ async def list_bans( items: list[DashboardBanItem] = [] for row in rows: - jail: str = row.jail - ip: str = row.ip - banned_at: str = ts_to_iso(row.timeofban) - ban_count: int = row.bancount - matches, _ = parse_data_json(row.data) + if source == "archive": + jail = str(row["jail"]) + ip = str(row["ip"]) + banned_at = ts_to_iso(int(row["timeofban"])) + ban_count = int(row["bancount"]) + matches, _ = parse_data_json(row["data"]) + else: + jail = row.jail + ip = row.ip + banned_at = ts_to_iso(row.timeofban) + ban_count = row.bancount + matches, _ = parse_data_json(row.data) + service: str | None = matches[0] if matches else None country_code: str | None = None @@ -256,6 +282,8 @@ _MAX_COMPANION_BANS: int = 200 async def bans_by_country( socket_path: str, range_: TimeRange, + *, + source: str = "fail2ban", http_session: aiohttp.ClientSession | None = None, geo_cache_lookup: GeoCacheLookup | None = None, geo_batch_lookup: GeoBatchLookup | None = None, @@ -300,41 +328,80 @@ async def bans_by_country( """ since: int = _since_unix(range_) - origin_clause, origin_params = _origin_sql_filter(origin) - db_path: str = await get_fail2ban_db_path(socket_path) - log.info( - "ban_service_bans_by_country", - db_path=db_path, - since=since, - range=range_, - origin=origin, - ) - # Total count and companion rows reuse the same SQL query logic. - # Passing limit=0 returns only the total from the count query. 
- _, total = await fail2ban_db_repo.get_currently_banned( - db_path=db_path, - since=since, - origin=origin, - limit=0, - offset=0, - ) + if source not in ("fail2ban", "archive"): + raise ValueError(f"Unsupported source: {source!r}") - agg_rows = await fail2ban_db_repo.get_ban_event_counts( - db_path=db_path, - since=since, - origin=origin, - ) + if source == "archive": + if app_db is None: + raise ValueError("app_db must be provided when source is 'archive'") - companion_rows, _ = await fail2ban_db_repo.get_currently_banned( - db_path=db_path, - since=since, - origin=origin, - limit=_MAX_COMPANION_BANS, - offset=0, - ) + from app.repositories.history_archive_repo import ( + get_all_archived_history, + get_archived_history, + ) - unique_ips: list[str] = [r.ip for r in agg_rows] + all_rows = await get_all_archived_history( + db=app_db, + since=since, + origin=origin, + action="ban", + ) + + total = len(all_rows) + + # companion rows for the table should be most recent + companion_rows, _ = await get_archived_history( + db=app_db, + since=since, + origin=origin, + action="ban", + page=1, + page_size=_MAX_COMPANION_BANS, + ) + + agg_rows = {} + for row in all_rows: + ip = str(row["ip"]) + agg_rows[ip] = agg_rows.get(ip, 0) + 1 + + unique_ips = list(agg_rows.keys()) + else: + origin_clause, origin_params = _origin_sql_filter(origin) + db_path: str = await get_fail2ban_db_path(socket_path) + log.info( + "ban_service_bans_by_country", + db_path=db_path, + since=since, + range=range_, + origin=origin, + ) + + # Total count and companion rows reuse the same SQL query logic. + # Passing limit=0 returns only the total from the count query. 
+ _, total = await fail2ban_db_repo.get_currently_banned( + db_path=db_path, + since=since, + origin=origin, + limit=0, + offset=0, + ) + + agg_rows = await fail2ban_db_repo.get_ban_event_counts( + db_path=db_path, + since=since, + origin=origin, + ) + + companion_rows, _ = await fail2ban_db_repo.get_currently_banned( + db_path=db_path, + since=since, + origin=origin, + limit=_MAX_COMPANION_BANS, + offset=0, + ) + + unique_ips = [r.ip for r in agg_rows] geo_map: dict[str, GeoInfo] = {} if http_session is not None and unique_ips and geo_cache_lookup is not None: @@ -371,12 +438,28 @@ async def bans_by_country( countries: dict[str, int] = {} country_names: dict[str, str] = {} - for agg_row in agg_rows: - ip: str = agg_row.ip + if source == "archive": + agg_items = [ + { + "ip": ip, + "event_count": count, + } + for ip, count in agg_rows.items() + ] + else: + agg_items = agg_rows + + for agg_row in agg_items: + if source == "archive": + ip = agg_row["ip"] + event_count = agg_row["event_count"] + else: + ip = agg_row.ip + event_count = agg_row.event_count + geo = geo_map.get(ip) cc: str | None = geo.country_code if geo else None cn: str | None = geo.country_name if geo else None - event_count: int = agg_row.event_count if cc: countries[cc] = countries.get(cc, 0) + event_count @@ -386,26 +469,38 @@ async def bans_by_country( # Build companion table from recent rows (geo already cached from batch step). 
bans: list[DashboardBanItem] = [] for companion_row in companion_rows: - ip = companion_row.ip + if source == "archive": + ip = companion_row["ip"] + jail = companion_row["jail"] + banned_at = ts_to_iso(int(companion_row["timeofban"])) + ban_count = int(companion_row["bancount"]) + service = None + else: + ip = companion_row.ip + jail = companion_row.jail + banned_at = ts_to_iso(companion_row.timeofban) + ban_count = companion_row.bancount + matches, _ = parse_data_json(companion_row.data) + service = matches[0] if matches else None + geo = geo_map.get(ip) cc = geo.country_code if geo else None cn = geo.country_name if geo else None asn: str | None = geo.asn if geo else None org: str | None = geo.org if geo else None - matches, _ = parse_data_json(companion_row.data) bans.append( DashboardBanItem( ip=ip, - jail=companion_row.jail, - banned_at=ts_to_iso(companion_row.timeofban), - service=matches[0] if matches else None, + jail=jail, + banned_at=banned_at, + service=service, country_code=cc, country_name=cn, asn=asn, org=org, - ban_count=companion_row.bancount, - origin=_derive_origin(companion_row.jail), + ban_count=ban_count, + origin=_derive_origin(jail), ) ) @@ -426,6 +521,8 @@ async def ban_trend( socket_path: str, range_: TimeRange, *, + source: str = "fail2ban", + app_db: aiosqlite.Connection | None = None, origin: BanOrigin | None = None, ) -> BanTrendResponse: """Return ban counts aggregated into equal-width time buckets. 
@@ -457,26 +554,58 @@ async def ban_trend( since: int = _since_unix(range_) bucket_secs: int = BUCKET_SECONDS[range_] num_buckets: int = bucket_count(range_) - origin_clause, origin_params = _origin_sql_filter(origin) - db_path: str = await get_fail2ban_db_path(socket_path) - log.info( - "ban_service_ban_trend", - db_path=db_path, - since=since, - range=range_, - origin=origin, - bucket_secs=bucket_secs, - num_buckets=num_buckets, - ) + if source not in ("fail2ban", "archive"): + raise ValueError(f"Unsupported source: {source!r}") - counts = await fail2ban_db_repo.get_ban_counts_by_bucket( - db_path=db_path, - since=since, - bucket_secs=bucket_secs, - num_buckets=num_buckets, - origin=origin, - ) + if source == "archive": + if app_db is None: + raise ValueError("app_db must be provided when source is 'archive'") + + from app.repositories.history_archive_repo import get_all_archived_history + + all_rows = await get_all_archived_history( + db=app_db, + since=since, + origin=origin, + action="ban", + ) + + counts: list[int] = [0] * num_buckets + for row in all_rows: + timeofban = int(row["timeofban"]) + bucket_index = int((timeofban - since) / bucket_secs) + if 0 <= bucket_index < num_buckets: + counts[bucket_index] += 1 + + log.info( + "ban_service_ban_trend", + source=source, + since=since, + range=range_, + origin=origin, + bucket_secs=bucket_secs, + num_buckets=num_buckets, + ) + else: + db_path: str = await get_fail2ban_db_path(socket_path) + log.info( + "ban_service_ban_trend", + db_path=db_path, + since=since, + range=range_, + origin=origin, + bucket_secs=bucket_secs, + num_buckets=num_buckets, + ) + + counts = await fail2ban_db_repo.get_ban_counts_by_bucket( + db_path=db_path, + since=since, + bucket_secs=bucket_secs, + num_buckets=num_buckets, + origin=origin, + ) buckets: list[BanTrendBucket] = [ BanTrendBucket( @@ -501,6 +630,8 @@ async def bans_by_jail( socket_path: str, range_: TimeRange, *, + source: str = "fail2ban", + app_db: aiosqlite.Connection | 
None = None, origin: BanOrigin | None = None, ) -> BansByJailResponse: """Return ban counts aggregated per jail for the selected time window. @@ -522,38 +653,75 @@ async def bans_by_jail( sorted descending and the total ban count. """ since: int = _since_unix(range_) - origin_clause, origin_params = _origin_sql_filter(origin) - db_path: str = await get_fail2ban_db_path(socket_path) - log.debug( - "ban_service_bans_by_jail", - db_path=db_path, - since=since, - since_iso=ts_to_iso(since), - range=range_, - origin=origin, - ) + if source not in ("fail2ban", "archive"): + raise ValueError(f"Unsupported source: {source!r}") - total, jail_counts = await fail2ban_db_repo.get_bans_by_jail( - db_path=db_path, - since=since, - origin=origin, - ) + if source == "archive": + if app_db is None: + raise ValueError("app_db must be provided when source is 'archive'") - # Diagnostic guard: if zero results were returned, check whether the table - # has *any* rows and log a warning with min/max timeofban so operators can - # diagnose timezone or filter mismatches from logs. 
- if total == 0: - table_row_count, min_timeofban, max_timeofban = await fail2ban_db_repo.get_bans_table_summary(db_path) - if table_row_count > 0: - log.warning( - "ban_service_bans_by_jail_empty_despite_data", - table_row_count=table_row_count, - min_timeofban=min_timeofban, - max_timeofban=max_timeofban, - since=since, - range=range_, - ) + from app.repositories.history_archive_repo import get_all_archived_history + + all_rows = await get_all_archived_history( + db=app_db, + since=since, + origin=origin, + action="ban", + ) + + jail_counter: dict[str, int] = {} + for row in all_rows: + jail_name = str(row["jail"]) + jail_counter[jail_name] = jail_counter.get(jail_name, 0) + 1 + + total = sum(jail_counter.values()) + jail_counts = [ + JailBanCountModel(jail=jail_name, count=count) + for jail_name, count in sorted(jail_counter.items(), key=lambda x: x[1], reverse=True) + ] + + log.debug( + "ban_service_bans_by_jail", + source=source, + since=since, + since_iso=ts_to_iso(since), + range=range_, + origin=origin, + ) + else: + origin_clause, origin_params = _origin_sql_filter(origin) + + db_path: str = await get_fail2ban_db_path(socket_path) + log.debug( + "ban_service_bans_by_jail", + db_path=db_path, + since=since, + since_iso=ts_to_iso(since), + range=range_, + origin=origin, + ) + + total, jail_counts = await fail2ban_db_repo.get_bans_by_jail( + db_path=db_path, + since=since, + origin=origin, + ) + + # Diagnostic guard: if zero results were returned, check whether the table + # has *any* rows and log a warning with min/max timeofban so operators can + # diagnose timezone or filter mismatches from logs. 
+ if total == 0: + table_row_count, min_timeofban, max_timeofban = await fail2ban_db_repo.get_bans_table_summary(db_path) + if table_row_count > 0: + log.warning( + "ban_service_bans_by_jail_empty_despite_data", + table_row_count=table_row_count, + min_timeofban=min_timeofban, + max_timeofban=max_timeofban, + since=since, + range=range_, + ) log.debug( "ban_service_bans_by_jail_result", diff --git a/backend/app/services/history_service.py b/backend/app/services/history_service.py index 9662edc..5f38309 100644 --- a/backend/app/services/history_service.py +++ b/backend/app/services/history_service.py @@ -16,6 +16,8 @@ from typing import TYPE_CHECKING import structlog if TYPE_CHECKING: + import aiosqlite + from app.models.geo import GeoEnricher from app.models.ban import TIME_RANGE_SECONDS, BanOrigin, TimeRange @@ -63,9 +65,11 @@ async def list_history( jail: str | None = None, ip_filter: str | None = None, origin: BanOrigin | None = None, + source: str = "fail2ban", page: int = 1, page_size: int = _DEFAULT_PAGE_SIZE, geo_enricher: GeoEnricher | None = None, + db: aiosqlite.Connection | None = None, ) -> HistoryListResponse: """Return a paginated list of historical ban records with optional filters. 
@@ -104,55 +108,111 @@ async def list_history( page=page, ) - rows, total = await fail2ban_db_repo.get_history_page( - db_path=db_path, - since=since, - jail=jail, - ip_filter=ip_filter, - origin=origin, - page=page, - page_size=effective_page_size, - ) - items: list[HistoryBanItem] = [] - for row in rows: - jail_name: str = row.jail - ip: str = row.ip - banned_at: str = ts_to_iso(row.timeofban) - ban_count: int = row.bancount - matches, failures = parse_data_json(row.data) + total: int - country_code: str | None = None - country_name: str | None = None - asn: str | None = None - org: str | None = None + if source == "archive": + if db is None: + raise ValueError("db must be provided when source is 'archive'") - if geo_enricher is not None: - try: - geo = await geo_enricher(ip) - if geo is not None: - country_code = geo.country_code - country_name = geo.country_name - asn = geo.asn - org = geo.org - except Exception: # noqa: BLE001 - log.warning("history_service_geo_lookup_failed", ip=ip) + from app.repositories.history_archive_repo import get_archived_history - items.append( - HistoryBanItem( - ip=ip, - jail=jail_name, - banned_at=banned_at, - ban_count=ban_count, - failures=failures, - matches=matches, - country_code=country_code, - country_name=country_name, - asn=asn, - org=org, - ) + archived_rows, total = await get_archived_history( + db=db, + since=since, + jail=jail, + ip_filter=ip_filter, + page=page, + page_size=effective_page_size, ) + for row in archived_rows: + jail_name = row["jail"] + ip = row["ip"] + banned_at = ts_to_iso(int(row["timeofban"])) + ban_count = int(row["bancount"]) + matches, failures = parse_data_json(row["data"]) + # archive records may include actions; we treat all as history + + country_code = None + country_name = None + asn = None + org = None + + if geo_enricher is not None: + try: + geo = await geo_enricher(ip) + if geo is not None: + country_code = geo.country_code + country_name = geo.country_name + asn = geo.asn + org = 
geo.org + except Exception: # noqa: BLE001 + log.warning("history_service_geo_lookup_failed", ip=ip) + + items.append( + HistoryBanItem( + ip=ip, + jail=jail_name, + banned_at=banned_at, + ban_count=ban_count, + failures=failures, + matches=matches, + country_code=country_code, + country_name=country_name, + asn=asn, + org=org, + ) + ) + else: + rows, total = await fail2ban_db_repo.get_history_page( + db_path=db_path, + since=since, + jail=jail, + ip_filter=ip_filter, + origin=origin, + page=page, + page_size=effective_page_size, + ) + + for row in rows: + jail_name: str = row.jail + ip: str = row.ip + banned_at: str = ts_to_iso(row.timeofban) + ban_count: int = row.bancount + matches, failures = parse_data_json(row.data) + + country_code: str | None = None + country_name: str | None = None + asn: str | None = None + org: str | None = None + + if geo_enricher is not None: + try: + geo = await geo_enricher(ip) + if geo is not None: + country_code = geo.country_code + country_name = geo.country_name + asn = geo.asn + org = geo.org + except Exception: # noqa: BLE001 + log.warning("history_service_geo_lookup_failed", ip=ip) + + items.append( + HistoryBanItem( + ip=ip, + jail=jail_name, + banned_at=banned_at, + ban_count=ban_count, + failures=failures, + matches=matches, + country_code=country_code, + country_name=country_name, + asn=asn, + org=org, + ) + ) + return HistoryListResponse( items=items, total=total, diff --git a/backend/app/tasks/history_sync.py b/backend/app/tasks/history_sync.py new file mode 100644 index 0000000..b6ea3d3 --- /dev/null +++ b/backend/app/tasks/history_sync.py @@ -0,0 +1,109 @@ +"""History sync background task. + +Periodically copies new records from the fail2ban sqlite database into the +BanGUI application archive table to prevent gaps when fail2ban purges old rows. 
+""" + +from __future__ import annotations + +import datetime +from typing import TYPE_CHECKING + +import structlog + +from app.repositories import fail2ban_db_repo +from app.utils.fail2ban_db_utils import get_fail2ban_db_path + +if TYPE_CHECKING: # pragma: no cover + from fastapi import FastAPI + +log: structlog.stdlib.BoundLogger = structlog.get_logger() + +#: Stable APScheduler job id. +JOB_ID: str = "history_sync" + +#: Interval in seconds between sync runs. +HISTORY_SYNC_INTERVAL: int = 300 + +#: Backfill window when archive is empty (seconds). +BACKFILL_WINDOW: int = 7 * 86400 + + +async def _get_last_archive_ts(db) -> int | None: + async with db.execute("SELECT MAX(timeofban) FROM history_archive") as cur: + row = await cur.fetchone() + if row is None or row[0] is None: + return None + return int(row[0]) + + +async def _run_sync(app: FastAPI) -> None: + db = app.state.db + socket_path: str = app.state.settings.fail2ban_socket + + try: + last_ts = await _get_last_archive_ts(db) + now_ts = int(datetime.datetime.now(datetime.UTC).timestamp()) + + if last_ts is None: + last_ts = now_ts - BACKFILL_WINDOW + log.info("history_sync_backfill", window_seconds=BACKFILL_WINDOW) + + per_page = 500 + next_since = last_ts + total_synced = 0 + + while True: + fail2ban_db_path = await get_fail2ban_db_path(socket_path) + rows, total = await fail2ban_db_repo.get_history_page( + db_path=fail2ban_db_path, + since=next_since, + page=1, + page_size=per_page, + ) + + if not rows: + break + + from app.repositories.history_archive_repo import archive_ban_event + + for row in rows: + await archive_ban_event( + db=db, + jail=row.jail, + ip=row.ip, + timeofban=row.timeofban, + bancount=row.bancount, + data=row.data, + action="ban", + ) + total_synced += 1 + + # Continue where we left off by max timeofban + 1. 
+            max_time = max(row.timeofban for row in rows)
+            next_since = max_time + 1
+
+            if len(rows) < per_page:
+                break
+
+        log.info("history_sync_complete", synced=total_synced)
+
+    except Exception:
+        log.exception("history_sync_failed")
+
+
+def register(app: FastAPI) -> None:
+    """Register the history sync periodic job.
+
+    Should be called after scheduler startup, from the lifespan handler.
+    """
+    app.state.scheduler.add_job(
+        _run_sync,
+        trigger="interval",
+        seconds=HISTORY_SYNC_INTERVAL,
+        kwargs={"app": app},
+        id=JOB_ID,
+        replace_existing=True,
+        next_run_time=datetime.datetime.now(tz=datetime.UTC),
+    )
+    log.info("history_sync_scheduled", interval_seconds=HISTORY_SYNC_INTERVAL)
diff --git a/backend/tests/test_routers/test_dashboard.py b/backend/tests/test_routers/test_dashboard.py
index 1353bf8..20bcade 100644
--- a/backend/tests/test_routers/test_dashboard.py
+++ b/backend/tests/test_routers/test_dashboard.py
@@ -290,6 +290,17 @@ class TestDashboardBans:
         called_range = mock_list.call_args[0][1]
         assert called_range == "7d"
 
+    async def test_accepts_source_param(
+        self, dashboard_client: AsyncClient
+    ) -> None:
+        """The ``source`` query parameter is forwarded to ban_service."""
+        mock_list = AsyncMock(return_value=_make_ban_list_response())
+        with patch("app.routers.dashboard.ban_service.list_bans", new=mock_list):
+            await dashboard_client.get("/api/dashboard/bans?source=archive")
+
+        called_source = mock_list.call_args[1]["source"]
+        assert called_source == "archive"
+
     async def test_empty_ban_list_returns_zero_total(
         self, dashboard_client: AsyncClient
     ) -> None:
@@ -492,6 +503,16 @@ class TestDashboardBansOriginField:
         origins = {ban["origin"] for ban in bans}
         assert origins == {"blocklist", "selfblock"}
 
+    async def test_bans_by_country_source_param_forwarded(
+        self, dashboard_client: AsyncClient
+    ) -> None:
+        """The ``source`` query parameter is forwarded to bans_by_country."""
+        mock_fn = AsyncMock(return_value=_make_bans_by_country_response())
+        with patch("app.routers.dashboard.ban_service.bans_by_country", new=mock_fn):
+            await dashboard_client.get("/api/dashboard/bans/by-country?source=archive")
+
+        assert mock_fn.call_args[1]["source"] == "archive"
+
     async def test_blocklist_origin_serialised_correctly(
         self, dashboard_client: AsyncClient
     ) -> None:
diff --git a/backend/tests/test_services/test_ban_service.py b/backend/tests/test_services/test_ban_service.py
index 97393d6..bf5cefd 100644
--- a/backend/tests/test_services/test_ban_service.py
+++ b/backend/tests/test_services/test_ban_service.py
@@ -11,6 +11,7 @@ from unittest.mock import AsyncMock, patch
 import aiosqlite
 import pytest
 
+from app.db import init_db
 from app.services import ban_service
 
 # ---------------------------------------------------------------------------
@@ -143,6 +144,29 @@ async def empty_f2b_db_path(tmp_path: Path) -> str:
     return path
 
 
+@pytest.fixture
+async def app_db_with_archive(tmp_path: Path) -> aiosqlite.Connection:
+    """Return an app database connection pre-populated with archived ban rows."""
+    db_path = str(tmp_path / "app_archive.db")
+    db = await aiosqlite.connect(db_path)
+    db.row_factory = aiosqlite.Row
+    await init_db(db)
+
+    await db.execute(
+        "INSERT INTO history_archive (jail, ip, timeofban, bancount, data, action) VALUES (?, ?, ?, ?, ?, ?)",
+        ("sshd", "1.2.3.4", _ONE_HOUR_AGO, 1, '{"matches": ["fail"], "failures": 1}', "ban"),
+    )
+    await db.execute(
+        "INSERT INTO history_archive (jail, ip, timeofban, bancount, data, action) VALUES (?, ?, ?, ?, ?, ?)",
+        ("nginx", "5.6.7.8", _ONE_HOUR_AGO, 1, '{"matches": ["fail"], "failures": 2}', "ban"),
+    )
+    await db.commit()
+
+    yield db
+
+    await db.close()
+
+
 # ---------------------------------------------------------------------------
 # list_bans — happy path
 # ---------------------------------------------------------------------------
@@ -233,6 +257,20 @@ class TestListBansHappyPath:
 
         assert result.total == 3
 
+    async def test_source_archive_reads_from_archive(
+        self, app_db_with_archive: aiosqlite.Connection
+    ) -> None:
+        """Using source='archive' reads from the BanGUI archive table."""
+        result = await ban_service.list_bans(
+            "/fake/sock",
+            "24h",
+            source="archive",
+            app_db=app_db_with_archive,
+        )
+
+        assert result.total == 2
+        assert {item.ip for item in result.items} == {"1.2.3.4", "5.6.7.8"}
+
 
 # ---------------------------------------------------------------------------
 # list_bans — geo enrichment
@@ -616,6 +654,20 @@ class TestOriginFilter:
 
         assert result.total == 3
 
+    async def test_bans_by_country_source_archive_reads_archive(
+        self, app_db_with_archive: aiosqlite.Connection
+    ) -> None:
+        """``bans_by_country`` accepts source='archive' and reads archived rows."""
+        result = await ban_service.bans_by_country(
+            "/fake/sock",
+            "24h",
+            source="archive",
+            app_db=app_db_with_archive,
+        )
+
+        assert result.total == 2
+        assert len(result.bans) == 2
+
 
 # ---------------------------------------------------------------------------
 # bans_by_country — background geo resolution (Task 3)
@@ -802,6 +854,19 @@ class TestBanTrend:
         timestamps = [b.timestamp for b in result.buckets]
         assert timestamps == sorted(timestamps)
 
+    async def test_ban_trend_source_archive_reads_archive(
+        self, app_db_with_archive: aiosqlite.Connection
+    ) -> None:
+        """``ban_trend`` accepts source='archive' and uses archived rows."""
+        result = await ban_service.ban_trend(
+            "/fake/sock",
+            "24h",
+            source="archive",
+            app_db=app_db_with_archive,
+        )
+
+        assert sum(b.count for b in result.buckets) == 2
+
     async def test_bans_counted_in_correct_bucket(self, tmp_path: Path) -> None:
         """A ban at a known time appears in the expected bucket."""
         import time as _time
@@ -1018,6 +1083,20 @@ class TestBansByJail:
         assert result.total == 3
         assert len(result.jails) == 3
 
+    async def test_bans_by_jail_source_archive_reads_archive(
+        self, app_db_with_archive: aiosqlite.Connection
+    ) -> None:
+        """``bans_by_jail`` accepts source='archive' and aggregates archived rows."""
+        result = await ban_service.bans_by_jail(
+            "/fake/sock",
+            "24h",
+            source="archive",
+            app_db=app_db_with_archive,
+        )
+
+        assert result.total == 2
+        assert any(j.jail == "sshd" for j in result.jails)
+
     async def test_diagnostic_warning_when_zero_results_despite_data(
         self, tmp_path: Path
     ) -> None:
diff --git a/backend/tests/test_services/test_history_service.py b/backend/tests/test_services/test_history_service.py
index 6e6694d..9b5d0fc 100644
--- a/backend/tests/test_services/test_history_service.py
+++ b/backend/tests/test_services/test_history_service.py
@@ -11,6 +11,7 @@ from unittest.mock import AsyncMock, patch
 import aiosqlite
 import pytest
 
+from app.db import init_db
 from app.services import history_service
 
 # ---------------------------------------------------------------------------
@@ -264,6 +265,31 @@ class TestListHistory:
         assert result.page == 1
         assert result.page_size == 2
 
+    async def test_source_archive_reads_from_archive(self, f2b_db_path: str, tmp_path: Path) -> None:
+        """Using source='archive' reads from the BanGUI archive table."""
+        app_db_path = str(tmp_path / "app_archive.db")
+        async with aiosqlite.connect(app_db_path) as db:
+            db.row_factory = aiosqlite.Row
+            await init_db(db)
+            await db.execute(
+                "INSERT INTO history_archive (jail, ip, timeofban, bancount, data, action) VALUES (?, ?, ?, ?, ?, ?)",
+                ("sshd", "10.0.0.1", _ONE_HOUR_AGO, 1, '{"matches": [], "failures": 0}', "ban"),
+            )
+            await db.commit()
+
+            with patch(
+                "app.services.history_service.get_fail2ban_db_path",
+                new=AsyncMock(return_value=f2b_db_path),
+            ):
+                result = await history_service.list_history(
+                    "fake_socket",
+                    source="archive",
+                    db=db,
+                )
+
+        assert result.total == 1
+        assert result.items[0].ip == "10.0.0.1"
+
 
 # ---------------------------------------------------------------------------
 # get_ip_detail tests