"""History service. Queries the fail2ban SQLite database for all historical ban records. Supports filtering by jail, IP, and time range. For per-IP forensics the service provides a full ban timeline with matched log lines and failure counts. All database I/O uses aiosqlite in **read-only** mode so BanGUI never modifies or locks the fail2ban database. """ from __future__ import annotations from datetime import UTC, datetime from typing import Any import aiosqlite import structlog from app.models.ban import BLOCKLIST_JAIL, BanOrigin, TIME_RANGE_SECONDS, TimeRange from app.models.history import ( HistoryBanItem, HistoryListResponse, IpDetailResponse, IpTimelineEvent, ) from app.services.ban_service import _get_fail2ban_db_path, _parse_data_json, _ts_to_iso log: structlog.stdlib.BoundLogger = structlog.get_logger() # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- _DEFAULT_PAGE_SIZE: int = 100 _MAX_PAGE_SIZE: int = 500 def _since_unix(range_: TimeRange) -> int: """Return the Unix timestamp for the start of the given time window. Args: range_: One of the supported time-range presets. Returns: Unix timestamp (seconds since epoch) equal to *now − range_*. """ seconds: int = TIME_RANGE_SECONDS[range_] return int(datetime.now(tz=UTC).timestamp()) - seconds # --------------------------------------------------------------------------- # Public API # --------------------------------------------------------------------------- async def list_history( socket_path: str, *, range_: TimeRange | None = None, jail: str | None = None, origin: BanOrigin | None = None, ip_filter: str | None = None, page: int = 1, page_size: int = _DEFAULT_PAGE_SIZE, geo_enricher: Any | None = None, ) -> HistoryListResponse: """Return a paginated list of historical ban records with optional filters. Queries the fail2ban ``bans`` table applying the requested filters and returns a paginated list ordered newest-first. When *geo_enricher* is supplied, each record is enriched with country and ASN data. Args: socket_path: Path to the fail2ban Unix domain socket. range_: Time-range preset. ``None`` means all-time (no time filter). jail: If given, restrict results to bans from this jail. origin: Optional origin filter — ``"blocklist"`` restricts results to the ``blocklist-import`` jail, ``"selfblock"`` excludes it. ip_filter: If given, restrict results to bans for this exact IP (or a prefix — the query uses ``LIKE ip_filter%``). page: 1-based page number (default: ``1``). page_size: Maximum items per page, capped at ``_MAX_PAGE_SIZE``. geo_enricher: Optional async callable ``(ip: str) -> GeoInfo | None``. Returns: :class:`~app.models.history.HistoryListResponse` with paginated items and the total matching count. """ effective_page_size: int = min(page_size, _MAX_PAGE_SIZE) offset: int = (page - 1) * effective_page_size # Build WHERE clauses dynamically. wheres: list[str] = [] params: list[Any] = [] if range_ is not None: since: int = _since_unix(range_) wheres.append("timeofban >= ?") params.append(since) if jail is not None: wheres.append("jail = ?") params.append(jail) if origin is not None: if origin == "blocklist": wheres.append("jail = ?") params.append(BLOCKLIST_JAIL) elif origin == "selfblock": wheres.append("jail != ?") params.append(BLOCKLIST_JAIL) if ip_filter is not None: wheres.append("ip LIKE ?") params.append(f"{ip_filter}%") where_sql: str = ("WHERE " + " AND ".join(wheres)) if wheres else "" db_path: str = await _get_fail2ban_db_path(socket_path) log.info( "history_service_list", db_path=db_path, range=range_, jail=jail, ip_filter=ip_filter, page=page, ) async with aiosqlite.connect(f"file:{db_path}?mode=ro", uri=True) as f2b_db: f2b_db.row_factory = aiosqlite.Row async with f2b_db.execute( f"SELECT COUNT(*) FROM bans {where_sql}", # noqa: S608 params, ) as cur: count_row = await cur.fetchone() total: int = int(count_row[0]) if count_row else 0 async with f2b_db.execute( f"SELECT jail, ip, timeofban, bancount, data " # noqa: S608 f"FROM bans {where_sql} " "ORDER BY timeofban DESC " "LIMIT ? OFFSET ?", [*params, effective_page_size, offset], ) as cur: rows = await cur.fetchall() items: list[HistoryBanItem] = [] for row in rows: jail_name: str = str(row["jail"]) ip: str = str(row["ip"]) banned_at: str = _ts_to_iso(int(row["timeofban"])) ban_count: int = int(row["bancount"]) matches, failures = _parse_data_json(row["data"]) country_code: str | None = None country_name: str | None = None asn: str | None = None org: str | None = None if geo_enricher is not None: try: geo = await geo_enricher(ip) if geo is not None: country_code = geo.country_code country_name = geo.country_name asn = geo.asn org = geo.org except Exception: # noqa: BLE001 log.warning("history_service_geo_lookup_failed", ip=ip) items.append( HistoryBanItem( ip=ip, jail=jail_name, banned_at=banned_at, ban_count=ban_count, failures=failures, matches=matches, country_code=country_code, country_name=country_name, asn=asn, org=org, ) ) return HistoryListResponse( items=items, total=total, page=page, page_size=effective_page_size, ) async def get_ip_detail( socket_path: str, ip: str, *, geo_enricher: Any | None = None, ) -> IpDetailResponse | None: """Return the full historical record for a single IP address. Fetches all ban events for *ip* from the fail2ban database, ordered newest-first. Aggregates total bans, total failures, and the timestamp of the most recent ban. Optionally enriches with geolocation data. Args: socket_path: Path to the fail2ban Unix domain socket. ip: The IP address to look up. geo_enricher: Optional async callable ``(ip: str) -> GeoInfo | None``. Returns: :class:`~app.models.history.IpDetailResponse` if any records exist for *ip*, or ``None`` if the IP has no history in the database. """ db_path: str = await _get_fail2ban_db_path(socket_path) log.info("history_service_ip_detail", db_path=db_path, ip=ip) async with aiosqlite.connect(f"file:{db_path}?mode=ro", uri=True) as f2b_db: f2b_db.row_factory = aiosqlite.Row async with f2b_db.execute( "SELECT jail, ip, timeofban, bancount, data " "FROM bans " "WHERE ip = ? " "ORDER BY timeofban DESC", (ip,), ) as cur: rows = await cur.fetchall() if not rows: return None timeline: list[IpTimelineEvent] = [] total_failures: int = 0 for row in rows: jail_name: str = str(row["jail"]) banned_at: str = _ts_to_iso(int(row["timeofban"])) ban_count: int = int(row["bancount"]) matches, failures = _parse_data_json(row["data"]) total_failures += failures timeline.append( IpTimelineEvent( jail=jail_name, banned_at=banned_at, ban_count=ban_count, failures=failures, matches=matches, ) ) last_ban_at: str | None = timeline[0].banned_at if timeline else None country_code: str | None = None country_name: str | None = None asn: str | None = None org: str | None = None if geo_enricher is not None: try: geo = await geo_enricher(ip) if geo is not None: country_code = geo.country_code country_name = geo.country_name asn = geo.asn org = geo.org except Exception: # noqa: BLE001 log.warning("history_service_geo_lookup_failed_detail", ip=ip) return IpDetailResponse( ip=ip, total_bans=len(timeline), total_failures=total_failures, last_ban_at=last_ban_at, country_code=country_code, country_name=country_name, asn=asn, org=org, timeline=timeline, )