"""Utilities shared by fail2ban-related services."""

from __future__ import annotations

import json
from datetime import UTC, datetime

from app.services.fail2ban_metadata_service import default_fail2ban_metadata_service


def ts_to_iso(unix_ts: int) -> str:
    """Convert a Unix timestamp to an ISO 8601 UTC string.

    Args:
        unix_ts: Seconds since the Unix epoch.

    Returns:
        The timezone-aware UTC ``datetime`` for *unix_ts*, rendered with
        ``datetime.isoformat()``.
    """
    return datetime.fromtimestamp(unix_ts, tz=UTC).isoformat()


async def get_fail2ban_db_path(socket_path: str, *, force_refresh: bool = False) -> str:
    """Return the fail2ban database path, using cached metadata when available.

    Thin wrapper that delegates to
    ``default_fail2ban_metadata_service.get_db_path``.

    Args:
        socket_path: Path of the fail2ban socket the lookup is keyed on.
        force_refresh: Forwarded unchanged to the metadata service.
            NOTE(review): presumably bypasses the cached value -- confirm
            against the service implementation.

    Returns:
        The database path reported by the metadata service.
    """
    return await default_fail2ban_metadata_service.get_db_path(
        socket_path, force_refresh=force_refresh
    )


def invalidate_fail2ban_db_path(socket_path: str) -> None:
    """Invalidate the cached fail2ban database path for the given socket.

    Delegates to ``default_fail2ban_metadata_service.invalidate_db_path``.

    Args:
        socket_path: Path of the fail2ban socket whose cached entry is dropped.
    """
    default_fail2ban_metadata_service.invalidate_db_path(socket_path)


def parse_data_json(raw: object) -> tuple[list[str], int]:
    """Extract matches and failure count from the fail2ban bans.data value.

    The function is deliberately forgiving: any input it cannot interpret
    yields the neutral result ``([], 0)`` instead of raising.

    Args:
        raw: The stored value. ``None``, a JSON-encoded string, or an
            already-decoded ``dict`` are understood; any other type is
            treated as empty.

    Returns:
        A ``(matches, failures)`` tuple. ``matches`` holds the entries of the
        ``"matches"`` list, each converted with ``str()`` (empty when the key
        is missing or not a list). ``failures`` is the ``"failures"`` value
        converted with ``int()``, or ``0`` when it is missing, of an
        unsupported type, or not convertible.
    """
    if raw is None:
        return [], 0

    obj: dict[str, object] = {}
    if isinstance(raw, str):
        # Keep the try body to the single call that can raise.
        try:
            parsed = json.loads(raw)
        except json.JSONDecodeError:
            return [], 0
        # Valid JSON that is not an object (list, number, ...) carries no
        # usable fields, so ``obj`` stays empty.
        if isinstance(parsed, dict):
            obj = parsed
    elif isinstance(raw, dict):
        obj = raw

    raw_matches = obj.get("matches")
    matches = [str(m) for m in raw_matches] if isinstance(raw_matches, list) else []

    raw_failures = obj.get("failures")
    failures = 0
    if isinstance(raw_failures, (int, float, str)):
        try:
            failures = int(raw_failures)
        except (ValueError, TypeError, OverflowError):
            # ValueError: non-numeric strings and NaN.
            # OverflowError: +/-infinity, which json.loads produces for the
            # non-standard ``Infinity`` literal; previously this escaped and
            # crashed the caller.
            failures = 0
    return matches, failures