95 lines
3.1 KiB
Python
95 lines
3.1 KiB
Python
"""Fail2Ban runtime metadata resolution and caching service."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
|
|
import structlog
|
|
|
|
from app.utils.fail2ban_client import (
|
|
Fail2BanClient,
|
|
Fail2BanConnectionError,
|
|
Fail2BanProtocolError,
|
|
)
|
|
|
|
# Module-level structlog logger; the service below emits its structured
# warning/info events through it.
log: structlog.stdlib.BoundLogger = structlog.get_logger()
|
|
|
|
|
|
class Fail2BanMetadataService:
    """Resolve and cache runtime fail2ban metadata.

    This service is responsible for querying fail2ban for metadata such as the
    path to the fail2ban SQLite database and caching the result so repeated
    requests do not require a socket discovery round-trip.

    Cached values are keyed by socket path, so a single instance can hold the
    resolved database path of more than one fail2ban socket.
    """

    def __init__(self, *, socket_timeout: float = 5.0) -> None:
        """Create an empty cache.

        Args:
            socket_timeout: Timeout, in seconds, passed to ``Fail2BanClient``
                when the database path has to be resolved over the socket.
        """
        # Maps fail2ban socket path -> resolved database path.
        self._cache: dict[str, str] = {}
        # Serialises resolution so concurrent callers do not each open a
        # socket connection for the same lookup.
        self._lock: asyncio.Lock = asyncio.Lock()
        self._socket_timeout: float = socket_timeout

    async def get_db_path(self, socket_path: str, *, force_refresh: bool = False) -> str:
        """Return the cached fail2ban database path or resolve it if missing.

        Args:
            socket_path: The fail2ban Unix domain socket path.
            force_refresh: If ``True``, attempt to resolve the DB path again even
                when a cached value exists.

        Returns:
            The resolved fail2ban SQLite database path.

        Raises:
            Fail2BanConnectionError: Resolution failed and no cached value
                exists to fall back on.
            Fail2BanProtocolError: Resolution failed and no cached value
                exists to fall back on.
            RuntimeError: fail2ban answered with an unexpected or error
                response; this is never masked by the cache fallback.
        """
        # Fast path: serve from the cache without taking the lock.
        if not force_refresh:
            cached = self._cache.get(socket_path)
            if cached is not None:
                return cached

        async with self._lock:
            # Re-check under the lock: another task may have populated the
            # cache while this one was waiting.
            cached = self._cache.get(socket_path)
            if cached is not None and not force_refresh:
                return cached

            try:
                db_path = await self._resolve_db_path(socket_path)
            except (Fail2BanConnectionError, Fail2BanProtocolError) as exc:
                if cached is None:
                    raise
                # A forced refresh failed, but a previously resolved value is
                # still available; prefer it over failing the caller.
                log.warning(
                    "fail2ban_metadata_service_fallback_to_cache",
                    socket_path=socket_path,
                    error=str(exc),
                )
                return cached

            self._cache[socket_path] = db_path
            return db_path

    async def _resolve_db_path(self, socket_path: str) -> str:
        """Query fail2ban for the configured database path.

        Args:
            socket_path: The fail2ban Unix domain socket path.

        Returns:
            The database path reported by fail2ban, converted to ``str``.

        Raises:
            RuntimeError: The response is not a ``(code, data)`` pair, the
                code is non-zero, or fail2ban reports no database (``None``).
        """
        async with Fail2BanClient(socket_path, timeout=self._socket_timeout) as client:
            response = await client.send(["get", "dbfile"])

        if not isinstance(response, tuple) or len(response) != 2:
            raise RuntimeError(f"Unexpected response from fail2ban: {response!r}")

        code, data = response
        if code != 0:
            raise RuntimeError(f"fail2ban error code {code}: {data!r}")

        if data is None:
            raise RuntimeError("fail2ban has no database configured (dbfile is None)")

        db_path = str(data)
        log.info(
            "fail2ban_metadata_service_resolved_db_path",
            socket_path=socket_path,
            db_path=db_path,
        )
        return db_path

    def invalidate_db_path(self, socket_path: str) -> None:
        """Clear a cached DB path so it is re-resolved on next request.

        Unknown socket paths are ignored.
        """
        self._cache.pop(socket_path, None)
|
|
|
|
|
|
# Module-level service instance, created when this module is imported. Every
# caller that uses this object shares its cache.
default_fail2ban_metadata_service: Fail2BanMetadataService = Fail2BanMetadataService()
|