"""History service. Queries the fail2ban SQLite database for all historical ban records. Supports filtering by jail, IP, and time range. For per-IP forensics the service provides a full ban timeline with matched log lines and failure counts. All fail2ban database I/O uses aiosqlite in **read-only** mode so BanGUI never modifies or locks the fail2ban database. """ from __future__ import annotations from datetime import UTC, datetime from typing import TYPE_CHECKING import structlog if TYPE_CHECKING: import aiohttp import aiosqlite from app.models._common import TimeRange from app.models.ban import BanOrigin from app.models.geo import GeoEnricher, GeoInfo from app.repositories.protocols import HistoryArchiveRepository from app.services.protocols import Fail2BanMetadataService from app.models.history_domain import ( DomainHistoryBanItem, DomainHistoryList, DomainIpDetail, DomainIpTimelineEvent, ) from app.repositories import fail2ban_db_repo from app.repositories import history_archive_repo as default_history_archive_repo from app.utils.constants import DEFAULT_PAGE_SIZE from app.utils.fail2ban_db_utils import parse_data_json, ts_to_iso from app.utils.time_utils import since_unix log: structlog.stdlib.BoundLogger = structlog.get_logger() # --------------------------------------------------------------------------- # Internal Helpers # --------------------------------------------------------------------------- async def _get_fail2ban_db_path(socket_path: str) -> str: """Get the fail2ban database path (testable via mocking). This internal helper allows tests to patch the dependency without direct service coupling. In production, routers inject the Fail2BanMetadataService via dependency injection. Args: socket_path: Path to the fail2ban Unix domain socket. Returns: The resolved fail2ban SQLite database path. """ from app.services.fail2ban_metadata_service import ( # noqa: PLC0415 default_fail2ban_metadata_service, ) return await default_fail2ban_metadata_service.get_db_path(socket_path) async def _resolve_geo_info( ip: str, *, http_session: aiohttp.ClientSession | None = None, geo_enricher: GeoEnricher | None = None, ) -> GeoInfo | None: """Resolve geolocation information for a single IP address. The explicit *geo_enricher* has priority over *http_session*. When no geo_enricher is provided, no HTTP lookups are performed. Args: ip: The IP address to look up. http_session: Unused; kept for backward compatibility. geo_enricher: Optional async callable ``(ip: str) -> GeoInfo | None``. Returns: Geolocation info if available, or ``None``. """ if geo_enricher is not None: return await geo_enricher(ip) return None _HISTORY_SYNC_PAGE_SIZE: int = 500 _HISTORY_SYNC_BACKFILL_WINDOW: int = 648000 async def _get_last_archive_ts( db: aiosqlite.Connection, history_archive_repo: HistoryArchiveRepository = default_history_archive_repo, ) -> int | None: """Return the most recent archived ban timestamp, or ``None`` if empty.""" return await history_archive_repo.get_max_timeofban(db) async def sync_from_fail2ban_db( db: aiosqlite.Connection, socket_path: str, history_archive_repo: HistoryArchiveRepository = default_history_archive_repo, fail2ban_metadata_service: Fail2BanMetadataService | None = None, ) -> int: """Copy new records from the fail2ban DB into the BanGUI archive table. Args: db: Application database connection for the archive table. socket_path: Path to the fail2ban Unix domain socket. history_archive_repo: Repository for persisting archived ban events. fail2ban_metadata_service: Service for resolving the fail2ban DB path. If not provided, uses the default singleton (lazy import). Returns: Number of fail2ban records scanned and archived. """ if fail2ban_metadata_service is None: from app.services.fail2ban_metadata_service import ( # noqa: PLC0415 default_fail2ban_metadata_service, ) fail2ban_metadata_service = default_fail2ban_metadata_service last_ts = await _get_last_archive_ts(db, history_archive_repo=history_archive_repo) now_ts = int(datetime.now(tz=UTC).timestamp()) if last_ts is None: last_ts = now_ts - _HISTORY_SYNC_BACKFILL_WINDOW log.info("history_sync_backfill", window_seconds=_HISTORY_SYNC_BACKFILL_WINDOW) next_since = last_ts + 1 total_synced = 0 while True: fail2ban_db_path = await fail2ban_metadata_service.get_db_path(socket_path) rows, _ = await fail2ban_db_repo.get_history_page( db_path=fail2ban_db_path, since=next_since, page=1, page_size=_HISTORY_SYNC_PAGE_SIZE, ) if not rows: break for row in rows: await history_archive_repo.archive_ban_event( db=db, jail=row.jail, ip=row.ip, timeofban=row.timeofban, bancount=row.bancount, data=row.data, action="ban", ) total_synced += len(rows) next_since = max(row.timeofban for row in rows) + 1 if len(rows) < _HISTORY_SYNC_PAGE_SIZE: break log.info("history_sync_completed", synced=total_synced) return total_synced # --------------------------------------------------------------------------- # Public API # --------------------------------------------------------------------------- async def list_history( socket_path: str, *, range_: TimeRange | None = None, jail: str | None = None, ip_filter: str | None = None, origin: BanOrigin | None = None, source: str = "fail2ban", page: int = 1, page_size: int = DEFAULT_PAGE_SIZE, max_page_size: int = 500, http_session: aiohttp.ClientSession | None = None, geo_enricher: GeoEnricher | None = None, db: aiosqlite.Connection | None = None, history_archive_repo: HistoryArchiveRepository = default_history_archive_repo, fail2ban_metadata_service: Fail2BanMetadataService | None = None, ) -> DomainHistoryList: """Return a paginated list of historical ban records with optional filters. Queries the fail2ban ``bans`` table applying the requested filters and returns a paginated list ordered newest-first. When *geo_enricher* is supplied, each record is enriched with country and ASN data. Args: socket_path: Path to the fail2ban Unix domain socket. range_: Time-range preset. ``None`` means all-time (no time filter). jail: If given, restrict results to bans from this jail. ip_filter: If given, restrict results to bans for this exact IP (or a prefix — the query uses ``LIKE ip_filter%``). page: 1-based page number (default: ``1``). page_size: Maximum items per page, capped at ``max_page_size``. max_page_size: Deployment-configured maximum page size (default: ``500``). http_session: Optional shared :class:`aiohttp.ClientSession` (unused; kept for backward compatibility). geo_enricher: Optional async callable ``(ip: str) -> GeoInfo | None``. db: Application database connection (required when source is 'archive'). history_archive_repo: Repository for accessing archived ban events. fail2ban_metadata_service: Service for resolving the fail2ban DB path. If not provided, uses the default singleton (lazy import). Returns: :class:`~app.models.history_domain.DomainHistoryList` with paginated items and the total matching count. """ effective_page_size: int = min(page_size, max_page_size) # Build WHERE clauses dynamically. since: int | None = None if range_ is not None: since = since_unix(range_) if fail2ban_metadata_service is None: db_path: str = await _get_fail2ban_db_path(socket_path) else: db_path = await fail2ban_metadata_service.get_db_path(socket_path) log.info( "history_service_list", db_path=db_path, range=range_, jail=jail, ip_filter=ip_filter, page=page, ) items: list[DomainHistoryBanItem] = [] total: int if source == "archive": if db is None: raise ValueError("db must be provided when source is 'archive'") archived_rows, total = await history_archive_repo.get_archived_history( db=db, since=since, jail=jail, ip_filter=ip_filter, page=page, page_size=effective_page_size, ) for row in archived_rows: jail_name = row["jail"] ip = row["ip"] banned_at = ts_to_iso(int(row["timeofban"])) ban_count = int(row["bancount"]) matches, failures = parse_data_json(row["data"]) # archive records may include actions; we treat all as history country_code = None country_name = None asn = None org = None try: geo = await _resolve_geo_info( ip, http_session=http_session, geo_enricher=geo_enricher, ) if geo is not None: country_code = geo.country_code country_name = geo.country_name asn = geo.asn org = geo.org except Exception: # noqa: BLE001 log.warning("history_service_geo_lookup_failed", ip=ip) items.append( DomainHistoryBanItem( ip=ip, jail=jail_name, banned_at=banned_at, ban_count=ban_count, failures=failures, matches=matches, country_code=country_code, country_name=country_name, asn=asn, org=org, ) ) else: rows, total = await fail2ban_db_repo.get_history_page( db_path=db_path, since=since, jail=jail, ip_filter=ip_filter, origin=origin, page=page, page_size=effective_page_size, ) for row in rows: jail_name: str = row.jail ip: str = row.ip banned_at: str = ts_to_iso(row.timeofban) ban_count: int = row.bancount matches, failures = parse_data_json(row.data) country_code: str | None = None country_name: str | None = None asn: str | None = None org: str | None = None try: geo = await _resolve_geo_info( ip, http_session=http_session, geo_enricher=geo_enricher, ) if geo is not None: country_code = geo.country_code country_name = geo.country_name asn = geo.asn org = geo.org except Exception: # noqa: BLE001 log.warning("history_service_geo_lookup_failed", ip=ip) items.append( DomainHistoryBanItem( ip=ip, jail=jail_name, banned_at=banned_at, ban_count=ban_count, failures=failures, matches=matches, country_code=country_code, country_name=country_name, asn=asn, org=org, ) ) return DomainHistoryList( items=items, total=total, page=page, page_size=effective_page_size, ) async def get_ip_detail( socket_path: str, ip: str, *, http_session: aiohttp.ClientSession | None = None, geo_enricher: GeoEnricher | None = None, fail2ban_metadata_service: Fail2BanMetadataService | None = None, ) -> DomainIpDetail | None: """Return the full historical record for a single IP address. Fetches all ban events for *ip* from the fail2ban database, ordered newest-first. Aggregates total bans, total failures, and the timestamp of the most recent ban. Optionally enriches with geolocation data. Args: socket_path: Path to the fail2ban Unix domain socket. ip: The IP address to look up. http_session: Optional shared :class:`aiohttp.ClientSession` (unused; kept for backward compatibility). geo_enricher: Optional async callable ``(ip: str) -> GeoInfo | None``. fail2ban_metadata_service: Service for resolving the fail2ban DB path. If not provided, uses the default singleton (lazy import). Returns: :class:`~app.models.history_domain.DomainIpDetail` if any records exist for *ip*, or ``None`` if the IP has no history in the database. """ if fail2ban_metadata_service is None: db_path: str = await _get_fail2ban_db_path(socket_path) else: db_path = await fail2ban_metadata_service.get_db_path(socket_path) log.info("history_service_ip_detail", db_path=db_path, ip=ip) rows = await fail2ban_db_repo.get_history_for_ip(db_path=db_path, ip=ip) if not rows: return None timeline: list[DomainIpTimelineEvent] = [] total_failures: int = 0 for row in rows: jail_name: str = row.jail banned_at: str = ts_to_iso(row.timeofban) ban_count: int = row.bancount matches, failures = parse_data_json(row.data) total_failures += failures timeline.append( DomainIpTimelineEvent( jail=jail_name, banned_at=banned_at, ban_count=ban_count, failures=failures, matches=matches, ) ) last_ban_at: str | None = timeline[0].banned_at if timeline else None country_code: str | None = None country_name: str | None = None asn: str | None = None org: str | None = None try: geo = await _resolve_geo_info( ip, http_session=http_session, geo_enricher=geo_enricher, ) if geo is not None: country_code = geo.country_code country_name = geo.country_name asn = geo.asn org = geo.org except Exception: # noqa: BLE001 log.warning("history_service_geo_lookup_failed_detail", ip=ip) return DomainIpDetail( ip=ip, total_bans=len(timeline), total_failures=total_failures, last_ban_at=last_ban_at, country_code=country_code, country_name=country_name, asn=asn, org=org, timeline=timeline, )