refactor: complete Task 2/3 geo decouple + exceptions centralization; mark as done
This commit is contained in:
63
backend/app/utils/fail2ban_db_utils.py
Normal file
63
backend/app/utils/fail2ban_db_utils.py
Normal file
@@ -0,0 +1,63 @@
|
||||
"""Utilities shared by fail2ban-related services."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from datetime import UTC, datetime
|
||||
|
||||
|
||||
def ts_to_iso(unix_ts: int) -> str:
|
||||
"""Convert a Unix timestamp to an ISO 8601 UTC string."""
|
||||
return datetime.fromtimestamp(unix_ts, tz=UTC).isoformat()
|
||||
|
||||
|
||||
async def get_fail2ban_db_path(socket_path: str) -> str:
|
||||
"""Query fail2ban for the path to its SQLite database file."""
|
||||
from app.utils.fail2ban_client import Fail2BanClient # pragma: no cover
|
||||
|
||||
socket_timeout: float = 5.0
|
||||
|
||||
async with Fail2BanClient(socket_path, timeout=socket_timeout) as client:
|
||||
response = await client.send(["get", "dbfile"])
|
||||
|
||||
if not isinstance(response, tuple) or len(response) != 2:
|
||||
raise RuntimeError(f"Unexpected response from fail2ban: {response!r}")
|
||||
|
||||
code, data = response
|
||||
if code != 0:
|
||||
raise RuntimeError(f"fail2ban error code {code}: {data!r}")
|
||||
|
||||
if data is None:
|
||||
raise RuntimeError("fail2ban has no database configured (dbfile is None)")
|
||||
|
||||
return str(data)
|
||||
|
||||
|
||||
def parse_data_json(raw: object) -> tuple[list[str], int]:
|
||||
"""Extract matches and failure count from the fail2ban bans.data value."""
|
||||
if raw is None:
|
||||
return [], 0
|
||||
|
||||
obj: dict[str, object] = {}
|
||||
if isinstance(raw, str):
|
||||
try:
|
||||
parsed = json.loads(raw)
|
||||
if isinstance(parsed, dict):
|
||||
obj = parsed
|
||||
except json.JSONDecodeError:
|
||||
return [], 0
|
||||
elif isinstance(raw, dict):
|
||||
obj = raw
|
||||
|
||||
raw_matches = obj.get("matches")
|
||||
matches = [str(m) for m in raw_matches] if isinstance(raw_matches, list) else []
|
||||
|
||||
raw_failures = obj.get("failures")
|
||||
failures = 0
|
||||
if isinstance(raw_failures, (int, float, str)):
|
||||
try:
|
||||
failures = int(raw_failures)
|
||||
except (ValueError, TypeError):
|
||||
failures = 0
|
||||
|
||||
return matches, failures
|
||||
Reference in New Issue
Block a user