Files
BanGUI/backend/app/services/config_service.py
Lukas 7ad885d276 refactor: separate config service from jail config service
- Split config_service.py into config_service.py and jail_config_service.py
- Update Docs/Tasks.md, Security.md, TROUBLESHOOTING.md
2026-05-03 01:05:18 +02:00

611 lines
22 KiB
Python

"""Configuration inspection and editing service.
Provides methods to read and update fail2ban jail configuration and global
server settings via the Unix domain socket. Regex validation is performed
locally with Python's :mod:`re` module before any write is sent to the daemon
so that invalid patterns are rejected early.
Architecture note: this module is a pure service — it contains **no**
HTTP/FastAPI concerns. All results are returned as Pydantic models so
routers can serialise them directly.
"""
from __future__ import annotations
import asyncio
import contextlib
import re
from typing import TYPE_CHECKING, TypeVar, cast
import structlog
from app.utils.fail2ban_client import Fail2BanCommand, Fail2BanToken
if TYPE_CHECKING:
from collections.abc import Awaitable, Callable
import aiosqlite
from app.exceptions import ConfigOperationError, ConfigValidationError, JailNotFoundError
from app.models.config import (
AddLogPathRequest,
GlobalConfigUpdate,
JailConfigUpdate,
LogPreviewRequest,
MapColorThresholdsUpdate,
RegexTestRequest,
)
from app.models.config_domain import (
DomainBantimeEscalation,
DomainGlobalConfig,
DomainJailConfig,
DomainJailConfigList,
)
from app.services.log_service import preview_log as util_preview_log
from app.services.log_service import test_regex as util_test_regex
from app.services.settings_service import (
get_map_color_thresholds as util_get_map_color_thresholds,
)
from app.services.settings_service import (
set_map_color_thresholds as util_set_map_color_thresholds,
)
from app.utils.constants import FAIL2BAN_SOCKET_TIMEOUT
from app.utils.fail2ban_client import Fail2BanClient
from app.utils.fail2ban_response import (
ensure_list,
is_not_found_error,
ok,
to_dict,
)
from app.utils.path_utils import validate_log_target
log: structlog.stdlib.BoundLogger = structlog.get_logger()
# ---------------------------------------------------------------------------
# Custom exceptions
# ---------------------------------------------------------------------------
# (exceptions are now defined in app.exceptions and imported above)
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
# Module-level type variable.
# NOTE(review): ``_safe_get_typed`` declares its own PEP 695 type parameter
# named ``T``, so this binding looks unused in the visible code — confirm
# before removing.
T = TypeVar("T")
async def _safe_get(
    client: Fail2BanClient,
    command: Fail2BanCommand,
    default: object | None = None,
) -> object | None:
    """Send *command* and return its result, or *default* if anything fails.

    This is a deliberate best-effort read: any ``Exception`` raised while
    sending the command or while passing the response through ``ok`` is
    absorbed so the caller can fall back to *default*.  The failure is
    recorded at debug level so that it is not lost without a trace.

    Args:
        client: Client used to send the command.
        command: The fail2ban command to send.
        default: Value returned when the command fails.

    Returns:
        Whatever ``ok`` returns for the response, or *default* on failure.
    """
    try:
        return ok(await client.send(command))
    except Exception as exc:  # noqa: BLE001 - intentional best-effort fallback
        # Keep the fallback semantics, but leave a breadcrumb for debugging.
        log.debug("fail2ban_get_failed", command=command, error=str(exc))
        return default
async def _safe_get_typed[T](
client: Fail2BanClient,
command: Fail2BanCommand,
default: T,
) -> T:
"""Send a command and return the result typed as ``default``'s type."""
return cast("T", await _safe_get(client, command, default))
def _validate_regex(pattern: str) -> str | None:
    """Check *pattern* with the project's regex validator.

    Args:
        pattern: A regex pattern string to validate.

    Returns:
        ``None`` when the validator accepts the pattern; otherwise the text
        of the error it raised (ReDoS detection, validation timeout, or a
        :class:`re.error`).
    """
    # Imported at call time rather than at module import time.
    from app.utils.regex_validator import (
        ReDoSDetectedError,
        RegexTimeoutError,
        validate_regex_pattern,
    )

    # Every one of these failures is reported the same way: as its message.
    rejections = (ReDoSDetectedError, RegexTimeoutError, re.error)
    try:
        validate_regex_pattern(pattern)
    except rejections as exc:
        return str(exc)
    return None
# ---------------------------------------------------------------------------
# Public API — read jail configuration
# ---------------------------------------------------------------------------
async def get_jail_config(socket_path: str, name: str) -> DomainJailConfig:
"""Return the editable configuration for a single jail.
Args:
socket_path: Path to the fail2ban Unix domain socket.
name: Jail name.
Returns:
:class:`~app.models.config_domain.DomainJailConfig`.
Raises:
JailNotFoundError: If *name* is not a known jail.
~app.utils.fail2ban_client.Fail2BanConnectionError: Socket unreachable.
"""
client = Fail2BanClient(socket_path=socket_path, timeout=FAIL2BAN_SOCKET_TIMEOUT)
# Verify existence.
try:
ok(await client.send(["status", name, "short"]))
except ValueError as exc:
if is_not_found_error(exc):
raise JailNotFoundError(name) from exc
raise
bantime_raw: int = await _safe_get_typed(client, ["get", name, "bantime"], 600)
findtime_raw: int = await _safe_get_typed(client, ["get", name, "findtime"], 600)
maxretry_raw: int = await _safe_get_typed(client, ["get", name, "maxretry"], 5)
failregex_raw: list[object] = await _safe_get_typed(client, ["get", name, "failregex"], [])
ignoreregex_raw: list[object] = await _safe_get_typed(client, ["get", name, "ignoreregex"], [])
logpath_raw: list[object] = await _safe_get_typed(client, ["get", name, "logpath"], [])
datepattern_raw: str | None = await _safe_get_typed(client, ["get", name, "datepattern"], None)
logencoding_raw: str = await _safe_get_typed(client, ["get", name, "logencoding"], "UTF-8")
backend_raw: str = await _safe_get_typed(client, ["get", name, "backend"], "polling")
usedns_raw: str = await _safe_get_typed(client, ["get", name, "usedns"], "warn")
prefregex_raw: str = await _safe_get_typed(client, ["get", name, "prefregex"], "")
actions_raw: list[object] = await _safe_get_typed(client, ["get", name, "actions"], [])
bt_increment_raw: bool = await _safe_get_typed(client, ["get", name, "bantime.increment"], False)
bt_factor_raw: str | float | None = await _safe_get_typed(client, ["get", name, "bantime.factor"], None)
bt_formula_raw: str | None = await _safe_get_typed(client, ["get", name, "bantime.formula"], None)
bt_multipliers_raw: str | None = await _safe_get_typed(client, ["get", name, "bantime.multipliers"], None)
bt_maxtime_raw: str | int | None = await _safe_get_typed(client, ["get", name, "bantime.maxtime"], None)
bt_rndtime_raw: str | int | None = await _safe_get_typed(client, ["get", name, "bantime.rndtime"], None)
bt_overalljails_raw: bool = await _safe_get_typed(client, ["get", name, "bantime.overalljails"], False)
bantime_escalation = DomainBantimeEscalation(
increment=bool(bt_increment_raw),
factor=float(bt_factor_raw) if bt_factor_raw is not None else None,
formula=str(bt_formula_raw) if bt_formula_raw else None,
multipliers=str(bt_multipliers_raw) if bt_multipliers_raw else None,
max_time=int(bt_maxtime_raw) if bt_maxtime_raw is not None else None,
rnd_time=int(bt_rndtime_raw) if bt_rndtime_raw is not None else None,
overall_jails=bool(bt_overalljails_raw),
)
jail_cfg = DomainJailConfig(
name=name,
ban_time=int(bantime_raw or 600),
find_time=int(findtime_raw or 600),
max_retry=int(maxretry_raw or 5),
fail_regex=ensure_list(failregex_raw),
ignore_regex=ensure_list(ignoreregex_raw),
log_paths=ensure_list(logpath_raw),
date_pattern=str(datepattern_raw) if datepattern_raw else None,
log_encoding=str(logencoding_raw or "UTF-8"),
backend=str(backend_raw or "polling"),
use_dns=str(usedns_raw or "warn"),
prefregex=str(prefregex_raw) if prefregex_raw else "",
actions=ensure_list(actions_raw),
bantime_escalation=bantime_escalation,
)
log.info("jail_config_fetched", jail=name)
return jail_cfg
async def list_jail_configs(socket_path: str) -> DomainJailConfigList:
    """Collect the configuration of every jail named in the global status.

    The comma-separated ``"Jail list"`` entry of the ``status`` response is
    split into names, and :func:`get_jail_config` is run for all of them
    via :func:`asyncio.gather`.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.

    Returns:
        :class:`~app.models.config_domain.DomainJailConfigList`.

    Raises:
        ~app.utils.fail2ban_client.Fail2BanConnectionError: Socket unreachable.
    """
    client = Fail2BanClient(socket_path=socket_path, timeout=FAIL2BAN_SOCKET_TIMEOUT)
    status_payload = to_dict(ok(await client.send(["status"])))

    raw_listing: str = str(status_payload.get("Jail list", "") or "").strip()
    # Splitting an empty string yields [""], which the filter below drops,
    # so no separate empty-string branch is needed.
    jail_names: list[str] = [
        part.strip() for part in raw_listing.split(",") if part.strip()
    ]
    if not jail_names:
        return DomainJailConfigList(items=[], total=0)

    pending = [get_jail_config(socket_path, jail) for jail in jail_names]
    # return_exceptions=False: the first failure propagates to the caller.
    jail_configs: list[DomainJailConfig] = await asyncio.gather(
        *pending,
        return_exceptions=False,
    )
    log.info("jail_configs_listed", count=len(jail_configs))
    return DomainJailConfigList(items=jail_configs, total=len(jail_configs))
# ---------------------------------------------------------------------------
# Public API — write jail configuration
# ---------------------------------------------------------------------------
async def update_jail_config(
    socket_path: str,
    name: str,
    update: JailConfigUpdate,
) -> None:
    """Push the populated fields of *update* to a running jail.

    Every field that is not ``None`` becomes its own ``set`` command, sent
    in a fixed order.  All regex patterns are checked locally first, so an
    invalid pattern is rejected before anything is sent to the daemon.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.
        name: Jail name.
        update: Partial update payload.

    Raises:
        JailNotFoundError: If *name* is not a known jail.
        ConfigValidationError: If a regex pattern fails validation.
        ConfigOperationError: If a ``set`` command is rejected by fail2ban.
        ~app.utils.fail2ban_client.Fail2BanConnectionError: Socket unreachable.
    """
    # --- local validation, before any socket traffic -----------------------
    for field, pattern_list in (
        ("fail_regex", update.fail_regex),
        ("ignore_regex", update.ignore_regex),
    ):
        for pattern in pattern_list or ():
            err = _validate_regex(pattern)
            if err:
                raise ConfigValidationError(f"Invalid regex in {field!r}: {err!r} (pattern: {pattern!r})")

    # An empty prefregex is not validated; it is still sent further below.
    if update.prefregex:
        err = _validate_regex(update.prefregex)
        if err:
            raise ConfigValidationError(f"Invalid regex in 'prefregex': {err!r} (pattern: {update.prefregex!r})")

    client = Fail2BanClient(socket_path=socket_path, timeout=FAIL2BAN_SOCKET_TIMEOUT)

    # --- existence probe ----------------------------------------------------
    try:
        ok(await client.send(["status", name, "short"]))
    except ValueError as exc:
        if not is_not_found_error(exc):
            raise
        raise JailNotFoundError(name) from exc

    async def _set(key: str, value: Fail2BanToken) -> None:
        """Send one ``set <jail> <key> <value>`` and translate a rejection."""
        try:
            ok(await client.send(["set", name, key, value]))
        except ValueError as exc:
            raise ConfigOperationError(f"Failed to set {key!r} = {value!r}: {exc}") from exc

    def _as_flag(state: bool | None, *, truthy: str, falsy: str) -> str | None:
        """Map an optional boolean onto one of two tokens, keeping ``None``."""
        if state is None:
            return None
        return truthy if state else falsy

    # --- scalar settings, in send order; ``None`` means "leave untouched" ---
    # backend is managed by fail2ban and cannot be changed at runtime by API,
    # so it is intentionally absent from this table.
    # ``enabled`` maps onto the inverse "idle" switch.
    planned: list[tuple[str, Fail2BanToken | None]] = [
        ("bantime", update.ban_time),
        ("findtime", update.find_time),
        ("maxretry", update.max_retry),
        ("datepattern", update.date_pattern),
        ("usedns", update.dns_mode),
        ("logencoding", update.log_encoding),
        ("prefregex", update.prefregex),
        ("idle", _as_flag(update.enabled, truthy="off", falsy="on")),
    ]

    # --- ban-time escalation settings --------------------------------------
    esc = update.bantime_escalation
    if esc is not None:
        planned.extend(
            [
                ("bantime.increment", _as_flag(esc.increment, truthy="true", falsy="false")),
                ("bantime.factor", None if esc.factor is None else str(esc.factor)),
                ("bantime.formula", esc.formula),
                ("bantime.multipliers", esc.multipliers),
                ("bantime.maxtime", esc.max_time),
                ("bantime.rndtime", esc.rnd_time),
                ("bantime.overalljails", _as_flag(esc.overall_jails, truthy="true", falsy="false")),
            ]
        )

    for option, token in planned:
        if token is not None:
            await _set(option, token)

    # --- regex lists: replaced wholesale (delete old entries, add new) ------
    for list_name, replacement in (
        ("failregex", update.fail_regex),
        ("ignoreregex", update.ignore_regex),
    ):
        if replacement is not None:
            await _replace_regex_list(client, name, list_name, replacement)

    log.info("jail_config_updated", jail=name)
async def _replace_regex_list(
    client: Fail2BanClient,
    jail: str,
    field: str,
    new_patterns: list[str],
) -> None:
    """Replace the full regex list for *field* in *jail*.

    All *new_patterns* are validated **before** anything is changed, so an
    invalid pattern can no longer leave the jail with its old entries
    deleted and the new list only partially installed.  After validation the
    existing entries are deleted (highest index first) and the new patterns
    are added in order.

    Args:
        client: Shared :class:`~app.utils.fail2ban_client.Fail2BanClient`.
        jail: Jail name.
        field: Either ``"failregex"`` or ``"ignoreregex"``.
        new_patterns: Replacement list (may be empty to clear).

    Raises:
        ConfigValidationError: If any replacement pattern fails validation;
            raised before any command is sent.
        ConfigOperationError: If fail2ban rejects an ``add`` command.
    """
    # Validate everything up front: no deletion happens on invalid input.
    for pattern in new_patterns:
        err = _validate_regex(pattern)
        if err:
            raise ConfigValidationError(f"Invalid regex: {err!r} (pattern: {pattern!r})")

    # Determine current count.  This is a best-effort read: if it fails the
    # list is treated as empty and no deletions are attempted.
    current_raw: list[object] = await _safe_get_typed(client, ["get", jail, field], [])
    current: list[str] = ensure_list(current_raw)

    del_cmd = f"del{field}"
    add_cmd = f"add{field}"

    # Delete in reverse order so the remaining indices stay stable.  A
    # rejected delete is tolerated and the loop carries on.
    for idx in reversed(range(len(current))):
        with contextlib.suppress(ValueError):
            ok(await client.send(["set", jail, del_cmd, idx]))

    # Add the (already validated) replacement patterns in order.
    for pattern in new_patterns:
        try:
            ok(await client.send(["set", jail, add_cmd, pattern]))
        except ValueError as exc:
            raise ConfigOperationError(f"Failed to add {field} pattern: {exc}") from exc
# ---------------------------------------------------------------------------
# Public API — global configuration
# ---------------------------------------------------------------------------
async def get_global_config(socket_path: str) -> DomainGlobalConfig:
    """Read the fail2ban server-wide settings.

    The four values are requested together through :func:`asyncio.gather`.
    Each read is best-effort and falls back to a built-in default.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.

    Returns:
        :class:`~app.models.config_domain.DomainGlobalConfig`.

    Raises:
        ~app.utils.fail2ban_client.Fail2BanConnectionError: Socket unreachable.
    """
    client = Fail2BanClient(socket_path=socket_path, timeout=FAIL2BAN_SOCKET_TIMEOUT)

    lookups = (
        _safe_get_typed(client, ["get", "loglevel"], "INFO"),
        _safe_get_typed(client, ["get", "logtarget"], "STDOUT"),
        _safe_get_typed(client, ["get", "dbpurgeage"], 86400),
        _safe_get_typed(client, ["get", "dbmaxmatches"], 10),
    )
    # gather() returns results in the order the awaitables were passed.
    level, target, purge_age, max_matches = await asyncio.gather(*lookups)

    # Falsy results are replaced by the same defaults used for the reads.
    return DomainGlobalConfig(
        log_level=str(level or "INFO").upper(),
        log_target=str(target or "STDOUT"),
        db_purge_age=int(purge_age or 86400),
        db_max_matches=int(max_matches or 10),
    )
async def update_global_config(socket_path: str, update: GlobalConfigUpdate) -> None:
    """Push the populated fields of *update* to fail2ban's global settings.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.
        update: Partial update payload.

    Raises:
        ConfigOperationError: If a ``set`` command is rejected.
        ConfigValidationError: If log_target validation fails.
        ~app.utils.fail2ban_client.Fail2BanConnectionError: Socket unreachable.
    """
    requested_target = update.log_target
    # The log target is checked before a client is even created.
    if requested_target is not None:
        try:
            validate_log_target(requested_target)
        except ValueError as e:
            raise ConfigValidationError(str(e)) from e

    client = Fail2BanClient(socket_path=socket_path, timeout=FAIL2BAN_SOCKET_TIMEOUT)

    async def _set_global(key: str, value: Fail2BanToken) -> None:
        """Send one global ``set <key> <value>`` and translate a rejection."""
        try:
            ok(await client.send(["set", key, value]))
        except ValueError as exc:
            raise ConfigOperationError(f"Failed to set global {key!r} = {value!r}: {exc}") from exc

    # Settings in send order; ``None`` means "leave untouched".
    planned: tuple[tuple[str, Fail2BanToken | None], ...] = (
        ("loglevel", None if update.log_level is None else update.log_level.upper()),
        ("logtarget", requested_target),
        ("dbpurgeage", update.db_purge_age),
        ("dbmaxmatches", update.db_max_matches),
    )
    for option, token in planned:
        if token is not None:
            await _set_global(option, token)

    log.info("global_config_updated")
# ---------------------------------------------------------------------------
# Public API — regex tester (stateless, no socket)
# ---------------------------------------------------------------------------
def test_regex(request: RegexTestRequest) -> RegexTestResponse:
    """Run a regex test by delegating to the log service helper.

    Args:
        request: The regex test request, forwarded unchanged.

    Returns:
        Whatever the log service's ``test_regex`` returns for *request*.
    """
    outcome = util_test_regex(request)
    return outcome
# ---------------------------------------------------------------------------
# Public API — log observation
# ---------------------------------------------------------------------------
async def add_log_path(
    socket_path: str,
    jail: str,
    req: AddLogPathRequest,
) -> None:
    """Attach another log file to an existing jail.

    Sends ``set <jail> addlogpath <path> <tail|head>`` after confirming that
    the jail exists.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.
        jail: Jail name to which the log path should be added.
        req: :class:`~app.models.config.AddLogPathRequest` with the path to add.

    Raises:
        JailNotFoundError: If *jail* is not a known jail.
        ConfigOperationError: If the command is rejected by fail2ban.
        ~app.utils.fail2ban_client.Fail2BanConnectionError: Socket unreachable.
    """
    client = Fail2BanClient(socket_path=socket_path, timeout=FAIL2BAN_SOCKET_TIMEOUT)

    # Existence probe: only a "not found" ValueError is translated.
    try:
        ok(await client.send(["status", jail, "short"]))
    except ValueError as exc:
        if not is_not_found_error(exc):
            raise
        raise JailNotFoundError(jail) from exc

    # ``req.tail`` selects which of the two position tokens is sent.
    if req.tail:
        position = "tail"
    else:
        position = "head"

    command = ["set", jail, "addlogpath", req.log_path, position]
    try:
        ok(await client.send(command))
        log.info("log_path_added", jail=jail, path=req.log_path)
    except ValueError as exc:
        raise ConfigOperationError(f"Failed to add log path {req.log_path!r}: {exc}") from exc
async def delete_log_path(
    socket_path: str,
    jail: str,
    log_path: str,
) -> None:
    """Detach a monitored log file from an existing jail.

    Sends ``set <jail> dellogpath <path>`` after confirming that the jail
    exists, so the path is removed at runtime.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.
        jail: Jail name from which the log path should be removed.
        log_path: Absolute path of the log file to stop monitoring.

    Raises:
        JailNotFoundError: If *jail* is not a known jail.
        ConfigOperationError: If the command is rejected by fail2ban.
        ~app.utils.fail2ban_client.Fail2BanConnectionError: Socket unreachable.
    """
    client = Fail2BanClient(socket_path=socket_path, timeout=FAIL2BAN_SOCKET_TIMEOUT)

    # Existence probe: only a "not found" ValueError is translated.
    try:
        ok(await client.send(["status", jail, "short"]))
    except ValueError as exc:
        if not is_not_found_error(exc):
            raise
        raise JailNotFoundError(jail) from exc

    command = ["set", jail, "dellogpath", log_path]
    try:
        ok(await client.send(command))
        log.info("log_path_deleted", jail=jail, path=log_path)
    except ValueError as exc:
        raise ConfigOperationError(f"Failed to delete log path {log_path!r}: {exc}") from exc
async def preview_log(
    req: LogPreviewRequest,
    preview_fn: Callable[[LogPreviewRequest], Awaitable[LogPreviewResponse]] | None = None,
) -> LogPreviewResponse:
    """Run a log preview through an injectable implementation.

    Args:
        req: The preview request, forwarded unchanged.
        preview_fn: Optional async callable to use instead of the log
            service's ``preview_log``.

    Returns:
        The awaited result of the selected preview function.
    """
    # The default implementation is only looked up when none was injected.
    handler = util_preview_log if preview_fn is None else preview_fn
    return await handler(req)
# ---------------------------------------------------------------------------
# Map color thresholds
# ---------------------------------------------------------------------------
async def get_map_color_thresholds(db: aiosqlite.Connection) -> MapColorThresholdsResponse:
    """Retrieve the current map color threshold configuration.

    Args:
        db: Active aiosqlite connection to the application database.

    Returns:
        A :class:`MapColorThresholdsResponse` containing the three threshold values.
    """
    # The response model is instantiated at runtime but is not part of the
    # module-level imports, so it is brought into scope here.
    # NOTE(review): assumed to live next to ``MapColorThresholdsUpdate`` in
    # ``app.models.config`` — confirm the module path.
    from app.models.config import MapColorThresholdsResponse

    # The settings helper yields the thresholds in (high, medium, low) order.
    high, medium, low = await util_get_map_color_thresholds(db)
    return MapColorThresholdsResponse(
        threshold_high=high,
        threshold_medium=medium,
        threshold_low=low,
    )
async def update_map_color_thresholds(
    db: aiosqlite.Connection,
    update: MapColorThresholdsUpdate,
) -> None:
    """Store new map color thresholds.

    Forwards the three values of *update* to the settings service.

    Args:
        db: Active aiosqlite connection to the application database.
        update: The new threshold values.

    Raises:
        ValueError: If validation fails (thresholds must satisfy high > medium > low).
    """
    new_values = {
        "threshold_high": update.threshold_high,
        "threshold_medium": update.threshold_medium,
        "threshold_low": update.threshold_low,
    }
    await util_set_map_color_thresholds(db, **new_values)
# ---------------------------------------------------------------------------
# fail2ban log file reader
# ---------------------------------------------------------------------------
# Log targets that are not file paths — log viewing is unavailable for these.
# NOTE(review): all entries are upper-case; presumably the log target is
# upper-cased before the membership test — confirm at the call site, which
# is not visible here.
_NON_FILE_LOG_TARGETS: frozenset[str] = frozenset(
{"STDOUT", "STDERR", "SYSLOG", "SYSTEMD-JOURNAL"}
)
# Only allow reading log files under these base directories (security).
# NOTE(review): how a path is matched against these prefixes (plain string
# prefix vs. resolved path) is decided by the consumer, not visible here.
_SAFE_LOG_PREFIXES: tuple[str, ...] = ("/var/log", "/config/log")