"""History service. Queries the fail2ban SQLite database for all historical ban records. Supports filtering by jail, IP, and time range. For per-IP forensics the service provides a full ban timeline with matched log lines and failure counts. All database I/O uses aiosqlite in **read-only** mode so BanGUI never modifies or locks the fail2ban database. """ from __future__ import annotations from datetime import UTC, datetime import structlog from app.services.geo_service import GeoEnricher from app.models.ban import TIME_RANGE_SECONDS, TimeRange from app.models.history import ( HistoryBanItem, HistoryListResponse, IpDetailResponse, IpTimelineEvent, ) from app.repositories import fail2ban_db_repo from app.services.ban_service import _get_fail2ban_db_path, _parse_data_json, _ts_to_iso log: structlog.stdlib.BoundLogger = structlog.get_logger() # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- _DEFAULT_PAGE_SIZE: int = 100 _MAX_PAGE_SIZE: int = 500 def _since_unix(range_: TimeRange) -> int: """Return the Unix timestamp for the start of the given time window. Args: range_: One of the supported time-range presets. Returns: Unix timestamp (seconds since epoch) equal to *now − range_*. """ seconds: int = TIME_RANGE_SECONDS[range_] return int(datetime.now(tz=UTC).timestamp()) - seconds # --------------------------------------------------------------------------- # Public API # --------------------------------------------------------------------------- async def list_history( socket_path: str, *, range_: TimeRange | None = None, jail: str | None = None, ip_filter: str | None = None, page: int = 1, page_size: int = _DEFAULT_PAGE_SIZE, geo_enricher: GeoEnricher | None = None, ) -> HistoryListResponse: """Return a paginated list of historical ban records with optional filters. Queries the fail2ban ``bans`` table applying the requested filters and returns a paginated list ordered newest-first. When *geo_enricher* is supplied, each record is enriched with country and ASN data. Args: socket_path: Path to the fail2ban Unix domain socket. range_: Time-range preset. ``None`` means all-time (no time filter). jail: If given, restrict results to bans from this jail. ip_filter: If given, restrict results to bans for this exact IP (or a prefix — the query uses ``LIKE ip_filter%``). page: 1-based page number (default: ``1``). page_size: Maximum items per page, capped at ``_MAX_PAGE_SIZE``. geo_enricher: Optional async callable ``(ip: str) -> GeoInfo | None``. Returns: :class:`~app.models.history.HistoryListResponse` with paginated items and the total matching count. """ effective_page_size: int = min(page_size, _MAX_PAGE_SIZE) # Build WHERE clauses dynamically. since: int | None = None if range_ is not None: since = _since_unix(range_) db_path: str = await _get_fail2ban_db_path(socket_path) log.info( "history_service_list", db_path=db_path, range=range_, jail=jail, ip_filter=ip_filter, page=page, ) rows, total = await fail2ban_db_repo.get_history_page( db_path=db_path, since=since, jail=jail, ip_filter=ip_filter, page=page, page_size=effective_page_size, ) items: list[HistoryBanItem] = [] for row in rows: jail_name: str = row.jail ip: str = row.ip banned_at: str = _ts_to_iso(row.timeofban) ban_count: int = row.bancount matches, failures = _parse_data_json(row.data) country_code: str | None = None country_name: str | None = None asn: str | None = None org: str | None = None if geo_enricher is not None: try: geo = await geo_enricher(ip) if geo is not None: country_code = geo.country_code country_name = geo.country_name asn = geo.asn org = geo.org except Exception: # noqa: BLE001 log.warning("history_service_geo_lookup_failed", ip=ip) items.append( HistoryBanItem( ip=ip, jail=jail_name, banned_at=banned_at, ban_count=ban_count, failures=failures, matches=matches, country_code=country_code, country_name=country_name, asn=asn, org=org, ) ) return HistoryListResponse( items=items, total=total, page=page, page_size=effective_page_size, ) async def get_ip_detail( socket_path: str, ip: str, *, geo_enricher: GeoEnricher | None = None, ) -> IpDetailResponse | None: """Return the full historical record for a single IP address. Fetches all ban events for *ip* from the fail2ban database, ordered newest-first. Aggregates total bans, total failures, and the timestamp of the most recent ban. Optionally enriches with geolocation data. Args: socket_path: Path to the fail2ban Unix domain socket. ip: The IP address to look up. geo_enricher: Optional async callable ``(ip: str) -> GeoInfo | None``. Returns: :class:`~app.models.history.IpDetailResponse` if any records exist for *ip*, or ``None`` if the IP has no history in the database. """ db_path: str = await _get_fail2ban_db_path(socket_path) log.info("history_service_ip_detail", db_path=db_path, ip=ip) rows = await fail2ban_db_repo.get_history_for_ip(db_path=db_path, ip=ip) if not rows: return None timeline: list[IpTimelineEvent] = [] total_failures: int = 0 for row in rows: jail_name: str = row.jail banned_at: str = _ts_to_iso(row.timeofban) ban_count: int = row.bancount matches, failures = _parse_data_json(row.data) total_failures += failures timeline.append( IpTimelineEvent( jail=jail_name, banned_at=banned_at, ban_count=ban_count, failures=failures, matches=matches, ) ) last_ban_at: str | None = timeline[0].banned_at if timeline else None country_code: str | None = None country_name: str | None = None asn: str | None = None org: str | None = None if geo_enricher is not None: try: geo = await geo_enricher(ip) if geo is not None: country_code = geo.country_code country_name = geo.country_name asn = geo.asn org = geo.org except Exception: # noqa: BLE001 log.warning("history_service_geo_lookup_failed_detail", ip=ip) return IpDetailResponse( ip=ip, total_bans=len(timeline), total_failures=total_failures, last_ban_at=last_ban_at, country_code=country_code, country_name=country_name, asn=asn, org=org, timeline=timeline, )