Files
BanGUI/backend/app/utils/fail2ban_db_utils.py

56 lines
1.7 KiB
Python

"""Utilities shared by fail2ban-related services."""
from __future__ import annotations
import json
from datetime import UTC, datetime
from app.services.fail2ban_metadata_service import default_fail2ban_metadata_service
def ts_to_iso(unix_ts: int) -> str:
"""Convert a Unix timestamp to an ISO 8601 UTC string."""
return datetime.fromtimestamp(unix_ts, tz=UTC).isoformat()
async def get_fail2ban_db_path(socket_path: str, *, force_refresh: bool = False) -> str:
"""Return the fail2ban database path, using cached metadata when available."""
return await default_fail2ban_metadata_service.get_db_path(
socket_path, force_refresh=force_refresh
)
def invalidate_fail2ban_db_path(socket_path: str) -> None:
"""Invalidate the cached fail2ban database path for the given socket."""
default_fail2ban_metadata_service.invalidate_db_path(socket_path)
def parse_data_json(raw: object) -> tuple[list[str], int]:
"""Extract matches and failure count from the fail2ban bans.data value."""
if raw is None:
return [], 0
obj: dict[str, object] = {}
if isinstance(raw, str):
try:
parsed = json.loads(raw)
if isinstance(parsed, dict):
obj = parsed
except json.JSONDecodeError:
return [], 0
elif isinstance(raw, dict):
obj = raw
raw_matches = obj.get("matches")
matches = [str(m) for m in raw_matches] if isinstance(raw_matches, list) else []
raw_failures = obj.get("failures")
failures = 0
if isinstance(raw_failures, (int, float, str)):
try:
failures = int(raw_failures)
except (ValueError, TypeError):
failures = 0
return matches, failures