"""Utilities shared by fail2ban-related services.""" from __future__ import annotations import json from datetime import UTC, datetime def ts_to_iso(unix_ts: int) -> str: """Convert a Unix timestamp to an ISO 8601 UTC string.""" return datetime.fromtimestamp(unix_ts, tz=UTC).isoformat() async def get_fail2ban_db_path(socket_path: str) -> str: """Query fail2ban for the path to its SQLite database file.""" from app.utils.fail2ban_client import Fail2BanClient # pragma: no cover socket_timeout: float = 5.0 async with Fail2BanClient(socket_path, timeout=socket_timeout) as client: response = await client.send(["get", "dbfile"]) if not isinstance(response, tuple) or len(response) != 2: raise RuntimeError(f"Unexpected response from fail2ban: {response!r}") code, data = response if code != 0: raise RuntimeError(f"fail2ban error code {code}: {data!r}") if data is None: raise RuntimeError("fail2ban has no database configured (dbfile is None)") return str(data) def parse_data_json(raw: object) -> tuple[list[str], int]: """Extract matches and failure count from the fail2ban bans.data value.""" if raw is None: return [], 0 obj: dict[str, object] = {} if isinstance(raw, str): try: parsed = json.loads(raw) if isinstance(parsed, dict): obj = parsed except json.JSONDecodeError: return [], 0 elif isinstance(raw, dict): obj = raw raw_matches = obj.get("matches") matches = [str(m) for m in raw_matches] if isinstance(raw_matches, list) else [] raw_failures = obj.get("failures") failures = 0 if isinstance(raw_failures, (int, float, str)): try: failures = int(raw_failures) except (ValueError, TypeError): failures = 0 return matches, failures