Files
BanGUI/backend/app/services/history_service.py
Lukas ac2028e1c2 Fix: Consolidate divergent _since_unix implementations (T-09)
Consolidate the two divergent implementations of _since_unix from ban_service.py
and history_service.py into a single shared utility function in time_utils.py.

Changes:
- Move _since_unix to app/utils/time_utils.py with consistent time.time() approach
- Move TIME_RANGE_SLACK_SECONDS constant to app/utils/constants.py
- Update ban_service.py to import since_unix from time_utils
- Update history_service.py to import since_unix from time_utils
- Both services now use the same window boundary calculation with 60-second slack
- Add comprehensive tests for the shared since_unix function
- Document timestamp handling rationale in Backend-Development.md

This ensures dashboard and history queries return consistent row counts for the
same time range by using the same timestamp calculation and slack window across
all services.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-04-25 18:44:59 +02:00

401 lines
13 KiB
Python

"""History service.

Queries the fail2ban SQLite database for historical ban records and supports
filtering by jail, IP, ban origin (fail2ban source only), and time range.
New fail2ban records can also be copied into BanGUI's own archive table
(see ``sync_from_fail2ban_db``) and listed from there. For per-IP forensics
the service provides a full ban timeline with matched log lines and failure
counts.

All fail2ban database I/O uses aiosqlite in **read-only** mode so BanGUI
never modifies or locks the fail2ban database.
"""
from __future__ import annotations
from datetime import UTC, datetime
from typing import TYPE_CHECKING
import structlog
from app.models.ban import BanOrigin, TimeRange
from app.services import geo_service
if TYPE_CHECKING:
import aiohttp
import aiosqlite
from app.models.geo import GeoEnricher, GeoInfo
from app.repositories.protocols import HistoryArchiveRepository
from app.models.history import (
HistoryBanItem,
HistoryListResponse,
IpDetailResponse,
IpTimelineEvent,
)
from app.repositories import fail2ban_db_repo
from app.repositories import history_archive_repo as default_history_archive_repo
from app.services.fail2ban_metadata_service import default_fail2ban_metadata_service
from app.utils.constants import DEFAULT_PAGE_SIZE, MAX_PAGE_SIZE
from app.utils.fail2ban_db_utils import parse_data_json, ts_to_iso
from app.utils.time_utils import since_unix
# Module-level structlog logger used for this service's structured log events
# (e.g. "history_service_list", "history_sync_completed").
log: structlog.stdlib.BoundLogger = structlog.get_logger()
async def get_fail2ban_db_path(socket_path: str) -> str:
    """Resolve the fail2ban SQLite database path for *socket_path*.

    The lookup is delegated to the shared fail2ban metadata service so that
    all callers go through its cache rather than asking fail2ban each time.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.

    Returns:
        Filesystem path of the fail2ban database.
    """
    metadata_service = default_fail2ban_metadata_service
    resolved_path = await metadata_service.get_db_path(socket_path)
    return resolved_path
# ---------------------------------------------------------------------------
# Internal Helpers
# ---------------------------------------------------------------------------
async def _resolve_geo_info(
    ip: str,
    *,
    http_session: aiohttp.ClientSession | None = None,
    geo_enricher: GeoEnricher | None = None,
) -> GeoInfo | None:
    """Look up geolocation data for *ip* using the first available source.

    Resolution order:

    1. *geo_enricher*, when supplied, is awaited with *ip* (it wins even if
       an *http_session* is also given).
    2. Otherwise, when *http_session* is supplied, :func:`geo_service.lookup`
       is used as the default enrichment strategy.
    3. With neither source available, no lookup happens and ``None`` is
       returned.
    """
    if geo_enricher is not None:
        return await geo_enricher(ip)
    if http_session is None:
        # No enrichment source configured for this call.
        return None
    return await geo_service.lookup(ip, http_session)
# Number of fail2ban rows requested per query while syncing into the archive.
_HISTORY_SYNC_PAGE_SIZE: int = 500
# Look-back window (seconds) used on the first sync when the archive is empty:
# 180 hours == 7.5 days == 648000 seconds.
_HISTORY_SYNC_BACKFILL_WINDOW: int = 180 * 60 * 60
async def _get_last_archive_ts(
    db: aiosqlite.Connection,
    history_archive_repo: HistoryArchiveRepository = default_history_archive_repo,
) -> int | None:
    """Fetch the newest ``timeofban`` stored in the archive table.

    Args:
        db: Application database connection holding the archive table.
        history_archive_repo: Repository used to query the archive.

    Returns:
        The most recent archived ban timestamp, or ``None`` when the archive
        holds no records yet.
    """
    newest_ts = await history_archive_repo.get_max_timeofban(db)
    return newest_ts
async def sync_from_fail2ban_db(
    db: aiosqlite.Connection,
    socket_path: str,
    history_archive_repo: HistoryArchiveRepository = default_history_archive_repo,
) -> int:
    """Copy new records from the fail2ban DB into the BanGUI archive table.

    Only fail2ban rows whose ``timeofban`` is strictly newer than the latest
    archived timestamp are copied. When the archive is empty, the sync
    back-fills the last ``_HISTORY_SYNC_BACKFILL_WINDOW`` seconds.

    Pages are walked by page number with a *fixed* lower bound. Advancing
    ``since`` to the newest timestamp seen would skip rows: the same
    repository query backs the newest-first history listing, so after the
    first page every older pending row would be excluded. Because the next
    sync resumes from the newest archived timestamp, such rows would never
    be archived.

    Args:
        db: Application database connection for the archive table.
        socket_path: Path to the fail2ban Unix domain socket.
        history_archive_repo: Repository used to read/write the archive.

    Returns:
        Number of fail2ban records archived by this run.
    """
    last_ts = await _get_last_archive_ts(db, history_archive_repo=history_archive_repo)
    if last_ts is None:
        now_ts = int(datetime.now(tz=UTC).timestamp())
        last_ts = now_ts - _HISTORY_SYNC_BACKFILL_WINDOW
        log.info("history_sync_backfill", window_seconds=_HISTORY_SYNC_BACKFILL_WINDOW)

    since = last_ts + 1
    # The fail2ban DB path is invariant for the duration of one sync run.
    fail2ban_db_path = await get_fail2ban_db_path(socket_path)

    # Bans inserted by fail2ban while we page can shift already-seen rows onto
    # a later page; remember what this run archived so nothing is written twice.
    seen: set[tuple[object, ...]] = set()
    total_synced = 0
    page = 1
    while True:
        rows, _ = await fail2ban_db_repo.get_history_page(
            db_path=fail2ban_db_path,
            since=since,
            page=page,
            page_size=_HISTORY_SYNC_PAGE_SIZE,
        )
        if not rows:
            break
        for row in rows:
            key = (row.jail, row.ip, row.timeofban, row.bancount)
            if key in seen:
                continue
            seen.add(key)
            await history_archive_repo.archive_ban_event(
                db=db,
                jail=row.jail,
                ip=row.ip,
                timeofban=row.timeofban,
                bancount=row.bancount,
                data=row.data,
                action="ban",
            )
            total_synced += 1
        if len(rows) < _HISTORY_SYNC_PAGE_SIZE:
            # A short page means there is nothing further to fetch.
            break
        page += 1

    log.info("history_sync_completed", synced=total_synced)
    return total_synced
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
async def _build_history_item(
    *,
    ip: str,
    jail_name: str,
    timeofban: int,
    bancount: int,
    data: str | None,  # NOTE(review): raw ``data`` column as accepted by parse_data_json — confirm type
    http_session: aiohttp.ClientSession | None,
    geo_enricher: GeoEnricher | None,
) -> HistoryBanItem:
    """Convert one ban record's fields into a geo-enriched :class:`HistoryBanItem`.

    Shared by the fail2ban and archive branches of :func:`list_history`.
    Geolocation is best-effort: a failing lookup is logged and the geo fields
    are left as ``None``.
    """
    banned_at = ts_to_iso(timeofban)
    matches, failures = parse_data_json(data)
    country_code: str | None = None
    country_name: str | None = None
    asn: str | None = None
    org: str | None = None
    try:
        geo = await _resolve_geo_info(
            ip,
            http_session=http_session,
            geo_enricher=geo_enricher,
        )
        if geo is not None:
            country_code = geo.country_code
            country_name = geo.country_name
            asn = geo.asn
            org = geo.org
    except Exception:  # noqa: BLE001 - enrichment must never fail the listing
        log.warning("history_service_geo_lookup_failed", ip=ip, exc_info=True)
    return HistoryBanItem(
        ip=ip,
        jail=jail_name,
        banned_at=banned_at,
        ban_count=bancount,
        failures=failures,
        matches=matches,
        country_code=country_code,
        country_name=country_name,
        asn=asn,
        org=org,
    )


async def list_history(
    socket_path: str,
    *,
    range_: TimeRange | None = None,
    jail: str | None = None,
    ip_filter: str | None = None,
    origin: BanOrigin | None = None,
    source: str = "fail2ban",
    page: int = 1,
    page_size: int = DEFAULT_PAGE_SIZE,
    http_session: aiohttp.ClientSession | None = None,
    geo_enricher: GeoEnricher | None = None,
    db: aiosqlite.Connection | None = None,
    history_archive_repo: HistoryArchiveRepository = default_history_archive_repo,
) -> HistoryListResponse:
    """Return a paginated list of historical ban records with optional filters.

    Records come either from the fail2ban ``bans`` table (the default) or
    from BanGUI's archive table (``source="archive"``). The fail2ban query
    returns a paginated list ordered newest-first. Each record is enriched
    with country and ASN data when *geo_enricher* or *http_session* is
    supplied.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.
        range_: Time-range preset. ``None`` means all-time (no time filter).
        jail: If given, restrict results to bans from this jail.
        ip_filter: If given, restrict results to bans for this exact IP
            (or a prefix — the query uses ``LIKE ip_filter%``).
        origin: If given, restrict fail2ban results to this ban origin.
            Not applied when *source* is ``"archive"``.
        source: ``"archive"`` reads from the archive table (requires *db*);
            any other value queries the fail2ban database.
        page: 1-based page number (default: ``1``).
        page_size: Maximum items per page, capped at ``MAX_PAGE_SIZE``.
        http_session: Optional shared :class:`aiohttp.ClientSession` used for
            geo lookups when no explicit *geo_enricher* is provided.
        geo_enricher: Optional async callable ``(ip: str) -> GeoInfo | None``.
        db: Application database connection; required for the archive source.
        history_archive_repo: Repository used for the archive source.

    Returns:
        :class:`~app.models.history.HistoryListResponse` with paginated items
        and the total matching count.

    Raises:
        ValueError: If *source* is ``"archive"`` and *db* is ``None``.
    """
    effective_page_size: int = min(page_size, MAX_PAGE_SIZE)
    since: int | None = since_unix(range_) if range_ is not None else None

    db_path: str = await get_fail2ban_db_path(socket_path)
    log.info(
        "history_service_list",
        db_path=db_path,
        range=range_,
        jail=jail,
        ip_filter=ip_filter,
        page=page,
        source=source,
        origin=origin,
    )

    items: list[HistoryBanItem] = []
    total: int
    if source == "archive":
        if db is None:
            raise ValueError("db must be provided when source is 'archive'")
        # NOTE(review): *origin* is not passed to get_archived_history, so the
        # origin filter is ignored for the archive source — confirm intent.
        archived_rows, total = await history_archive_repo.get_archived_history(
            db=db,
            since=since,
            jail=jail,
            ip_filter=ip_filter,
            page=page,
            page_size=effective_page_size,
        )
        # Archive records may include actions; all are treated as history.
        for archived in archived_rows:
            items.append(
                await _build_history_item(
                    ip=archived["ip"],
                    jail_name=archived["jail"],
                    timeofban=int(archived["timeofban"]),
                    bancount=int(archived["bancount"]),
                    data=archived["data"],
                    http_session=http_session,
                    geo_enricher=geo_enricher,
                )
            )
    else:
        rows, total = await fail2ban_db_repo.get_history_page(
            db_path=db_path,
            since=since,
            jail=jail,
            ip_filter=ip_filter,
            origin=origin,
            page=page,
            page_size=effective_page_size,
        )
        for row in rows:
            items.append(
                await _build_history_item(
                    ip=row.ip,
                    jail_name=row.jail,
                    timeofban=row.timeofban,
                    bancount=row.bancount,
                    data=row.data,
                    http_session=http_session,
                    geo_enricher=geo_enricher,
                )
            )

    return HistoryListResponse(
        items=items,
        total=total,
        page=page,
        page_size=effective_page_size,
    )
async def get_ip_detail(
    socket_path: str,
    ip: str,
    *,
    http_session: aiohttp.ClientSession | None = None,
    geo_enricher: GeoEnricher | None = None,
) -> IpDetailResponse | None:
    """Return the full historical record for a single IP address.

    Fetches all ban events for *ip* from the fail2ban database (the timeline
    keeps the repository's order, expected newest-first). It aggregates total
    bans, total failures, and the timestamp of the most recent ban, and
    optionally enriches the result with geolocation data.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.
        ip: The IP address to look up.
        http_session: Optional shared :class:`aiohttp.ClientSession` used for
            geo lookups when no explicit *geo_enricher* is provided.
        geo_enricher: Optional async callable ``(ip: str) -> GeoInfo | None``.

    Returns:
        :class:`~app.models.history.IpDetailResponse` if any records exist
        for *ip*, or ``None`` if the IP has no history in the database.
    """
    db_path: str = await get_fail2ban_db_path(socket_path)
    log.info("history_service_ip_detail", db_path=db_path, ip=ip)
    rows = await fail2ban_db_repo.get_history_for_ip(db_path=db_path, ip=ip)
    if not rows:
        return None

    timeline: list[IpTimelineEvent] = []
    total_failures: int = 0
    for row in rows:
        banned_at: str = ts_to_iso(row.timeofban)
        matches, failures = parse_data_json(row.data)
        total_failures += failures
        timeline.append(
            IpTimelineEvent(
                jail=row.jail,
                banned_at=banned_at,
                ban_count=row.bancount,
                failures=failures,
                matches=matches,
            )
        )

    # Derive the most recent ban from the timestamps themselves rather than
    # relying on the repository's row order; *rows* is non-empty here.
    last_ban_at: str = ts_to_iso(max(row.timeofban for row in rows))

    country_code: str | None = None
    country_name: str | None = None
    asn: str | None = None
    org: str | None = None
    try:
        geo = await _resolve_geo_info(
            ip,
            http_session=http_session,
            geo_enricher=geo_enricher,
        )
        if geo is not None:
            country_code = geo.country_code
            country_name = geo.country_name
            asn = geo.asn
            org = geo.org
    except Exception:  # noqa: BLE001 - enrichment must never fail the detail view
        log.warning("history_service_geo_lookup_failed_detail", ip=ip, exc_info=True)

    return IpDetailResponse(
        ip=ip,
        total_bans=len(timeline),
        total_failures=total_failures,
        last_ban_at=last_ban_at,
        country_code=country_code,
        country_name=country_name,
        asn=asn,
        org=org,
        timeline=timeline,
    )