Files
BanGUI/backend/app/services/server_service.py
Lukas 7ec80fdeec refactor(logging): replace structlog with stdlib logging compat layer
- Remove structlog dependency from backend/pyproject.toml
- Add app.utils.logging_compat shim for keyword-arg logging API
- Add app.utils.json_formatter for JSON log output with extra fields
- Update all backend modules to use logging_compat.get_logger()
- Update docstrings in log_sanitizer.py and json_formatter.py
- Update test comment in test_async_utils.py
- Record 406 failing tests in Docs/Tasks.md for tracking
2026-05-10 13:37:54 +02:00

211 lines
7.2 KiB
Python

"""Server-level settings service.
Provides methods to read and update fail2ban server-level settings
(log level, log target, database configuration) via the Unix domain socket.
Also exposes the ``flushlogs`` command for use after log rotation.
Architecture note: this module is a pure service — it contains **no**
HTTP/FastAPI concerns.
"""
from __future__ import annotations
from typing import cast
from app.utils.logging_compat import get_logger
from app.exceptions import Fail2BanConnectionError, Fail2BanProtocolError, ServerOperationError
from app.models.server import ServerSettingsUpdate
from app.models.server_domain import DomainServerSettings, DomainServerSettingsResult
from app.utils.constants import FAIL2BAN_SOCKET_TIMEOUT
from app.utils.fail2ban_client import Fail2BanClient, Fail2BanCommand, Fail2BanResponse
from app.utils.fail2ban_response import ok
# ---------------------------------------------------------------------------
# Types
# ---------------------------------------------------------------------------
type Fail2BanSettingValue = str | int | bool
"""Allowed values for server settings commands."""
log = get_logger(__name__)
def _to_int(value: object | None, default: int) -> int:
"""Convert a raw value to an int, falling back to a default.
The fail2ban control socket can return either int or str values for some
settings, so we normalise them here in a type-safe way.
"""
if isinstance(value, int):
return value
if isinstance(value, float):
return int(value)
if isinstance(value, str):
try:
return int(value)
except ValueError:
return default
return default
def _to_str(value: object | None, default: str) -> str:
"""Convert a raw value to a string, falling back to a default."""
if value is None:
return default
return str(value)
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
async def _safe_get(
client: Fail2BanClient,
command: Fail2BanCommand,
default: object | None = None,
) -> object | None:
"""Send a command and silently return *default* on any error.
Args:
client: The :class:`~app.utils.fail2ban_client.Fail2BanClient` to use.
command: Command list to send.
default: Fallback value.
Returns:
The successful response, or *default*.
"""
try:
response = await client.send(command)
return ok(cast("Fail2BanResponse", response))
except (Fail2BanConnectionError, Fail2BanProtocolError, ValueError):
return default
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
async def get_settings(socket_path: str) -> DomainServerSettingsResult:
"""Return current fail2ban server-level settings.
Fetches log level, log target, syslog socket, database file path, purge
age, and max matches in a single round-trip batch.
Args:
socket_path: Path to the fail2ban Unix domain socket.
Returns:
:class:`~app.models.server_domain.DomainServerSettingsResult`.
Raises:
~app.utils.fail2ban_client.Fail2BanConnectionError: Socket unreachable.
"""
#: Error contract: RETURN_DEFAULT. Fail2ban socket may be unavailable on
#: fresh boot; UI should still render with empty/default values.
#: Error contract: ABORT_ON_ERROR. Raises on invalid response from fail2ban.
#: Router converts Fail2BanConnectionError to HTTP 503.
import asyncio
client = Fail2BanClient(socket_path=socket_path, timeout=FAIL2BAN_SOCKET_TIMEOUT)
(
log_level_raw,
log_target_raw,
syslog_socket_raw,
db_path_raw,
db_purge_age_raw,
db_max_matches_raw,
) = await asyncio.gather(
_safe_get(client, ["get", "loglevel"], "INFO"),
_safe_get(client, ["get", "logtarget"], "STDOUT"),
_safe_get(client, ["get", "syslogsocket"], None),
_safe_get(client, ["get", "dbfile"], "/var/lib/fail2ban/fail2ban.sqlite3"),
_safe_get(client, ["get", "dbpurgeage"], 86400),
_safe_get(client, ["get", "dbmaxmatches"], 10),
)
log_level = _to_str(log_level_raw, "INFO").upper()
log_target = _to_str(log_target_raw, "STDOUT")
syslog_socket = _to_str(syslog_socket_raw, "") or None
db_path = _to_str(db_path_raw, "/var/lib/fail2ban/fail2ban.sqlite3")
db_purge_age = _to_int(db_purge_age_raw, 86400)
db_max_matches = _to_int(db_max_matches_raw, 10)
settings = DomainServerSettings(
log_level=log_level,
log_target=log_target,
syslog_socket=syslog_socket,
db_path=db_path,
db_purge_age=db_purge_age,
db_max_matches=db_max_matches,
)
warnings: dict[str, bool] = {
"db_purge_age_too_low": db_purge_age < 86400,
}
log.info("server_settings_fetched", db_purge_age=db_purge_age, warnings=warnings)
return DomainServerSettingsResult(settings=settings, warnings=warnings)
async def update_settings(socket_path: str, update: ServerSettingsUpdate) -> None:
"""Apply *update* to fail2ban server-level settings.
Only non-None fields in *update* are sent.
Args:
socket_path: Path to the fail2ban Unix domain socket.
update: Partial update payload.
Error contract: ABORT_ON_ERROR. Raises ServerOperationError (400) or
Fail2BanConnectionError (503). Router converts to HTTP.
"""
client = Fail2BanClient(socket_path=socket_path, timeout=FAIL2BAN_SOCKET_TIMEOUT)
async def _set(key: str, value: Fail2BanSettingValue) -> None:
try:
response = await client.send(["set", key, value])
ok(cast("Fail2BanResponse", response))
except ValueError as exc:
raise ServerOperationError(f"Failed to set {key!r} = {value!r}: {exc}") from exc
if update.log_level is not None:
await _set("loglevel", update.log_level.upper())
if update.log_target is not None:
await _set("logtarget", update.log_target)
if update.db_purge_age is not None:
await _set("dbpurgeage", update.db_purge_age)
if update.db_max_matches is not None:
await _set("dbmaxmatches", update.db_max_matches)
log.info("server_settings_updated")
async def flush_logs(socket_path: str) -> str:
"""Flush and re-open fail2ban log files.
Useful after log rotation so the daemon starts writing to the newly
created file rather than the old rotated one.
Args:
socket_path: Path to the fail2ban Unix domain socket.
Returns:
The response message from fail2ban (e.g. ``"OK"``) as a string.
Raises:
ServerOperationError: If the command is rejected.
~app.utils.fail2ban_client.Fail2BanConnectionError: Socket unreachable.
"""
client = Fail2BanClient(socket_path=socket_path, timeout=FAIL2BAN_SOCKET_TIMEOUT)
try:
response = await client.send(["flushlogs"])
result = ok(cast("Fail2BanResponse", response))
log.info("logs_flushed", result=result)
return str(result)
except ValueError as exc:
raise ServerOperationError(f"flushlogs failed: {exc}") from exc