605 lines
22 KiB
Python
605 lines
22 KiB
Python
"""Configuration inspection and editing service.
|
|
|
|
Provides methods to read and update fail2ban jail configuration and global
|
|
server settings via the Unix domain socket. Regex validation is performed
|
|
locally with Python's :mod:`re` module before any write is sent to the daemon
|
|
so that invalid patterns are rejected early.
|
|
|
|
Architecture note: this module is a pure service — it contains **no**
|
|
HTTP/FastAPI concerns. All results are returned as Pydantic models so
|
|
routers can serialise them directly.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import contextlib
|
|
import re
|
|
from typing import TYPE_CHECKING, TypeVar, cast
|
|
|
|
import structlog
|
|
|
|
from app.utils.fail2ban_client import Fail2BanCommand, Fail2BanToken
|
|
|
|
if TYPE_CHECKING:
|
|
from collections.abc import Awaitable, Callable
|
|
|
|
import aiosqlite
|
|
|
|
from app.exceptions import ConfigOperationError, ConfigValidationError, JailNotFoundError
|
|
from app.models.config import (
|
|
AddLogPathRequest,
|
|
BantimeEscalation,
|
|
GlobalConfigResponse,
|
|
GlobalConfigUpdate,
|
|
JailConfig,
|
|
JailConfigListResponse,
|
|
JailConfigResponse,
|
|
JailConfigUpdate,
|
|
LogPreviewRequest,
|
|
LogPreviewResponse,
|
|
MapColorThresholdsResponse,
|
|
MapColorThresholdsUpdate,
|
|
RegexTestRequest,
|
|
RegexTestResponse,
|
|
)
|
|
from app.services.log_service import preview_log as util_preview_log
|
|
from app.services.log_service import test_regex as util_test_regex
|
|
from app.services.settings_service import (
|
|
get_map_color_thresholds as util_get_map_color_thresholds,
|
|
)
|
|
from app.services.settings_service import (
|
|
set_map_color_thresholds as util_set_map_color_thresholds,
|
|
)
|
|
from app.utils.constants import FAIL2BAN_SOCKET_TIMEOUT
|
|
from app.utils.fail2ban_client import Fail2BanClient
|
|
from app.utils.fail2ban_response import (
|
|
ensure_list,
|
|
is_not_found_error,
|
|
ok,
|
|
to_dict,
|
|
)
|
|
from app.utils.path_utils import validate_log_target
|
|
|
|
# Module-level structlog logger; the service functions in this module use it
# to emit structured event records (e.g. ``jail_config_fetched``).
log: structlog.stdlib.BoundLogger = structlog.get_logger()
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Custom exceptions
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# (exceptions are now defined in app.exceptions and imported above)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Internal helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
# Module-level generic type variable.
# NOTE(review): ``_safe_get_typed[T]`` declares its own PEP 695 type parameter
# named ``T``, which shadows this one inside that helper — confirm whether any
# other code in the file still relies on this TypeVar.
T = TypeVar("T")
|
|
|
|
|
|
async def _safe_get(
    client: Fail2BanClient,
    command: Fail2BanCommand,
    default: object | None = None,
) -> object | None:
    """Run *command* on *client*, falling back to *default* on any failure.

    Args:
        client: Client used to send the command.
        command: Command token list to send.
        default: Value returned when sending or unwrapping the reply raises.

    Returns:
        The payload unwrapped by ``ok``, or *default* if anything went wrong.
    """
    try:
        reply = await client.send(command)
        return ok(reply)
    except Exception:
        # Deliberate best-effort read: every failure maps to the fallback.
        return default
|
|
|
|
|
|
async def _safe_get_typed[T](
|
|
client: Fail2BanClient,
|
|
command: Fail2BanCommand,
|
|
default: T,
|
|
) -> T:
|
|
"""Send a command and return the result typed as ``default``'s type."""
|
|
return cast("T", await _safe_get(client, command, default))
|
|
|
|
|
|
def _validate_regex(pattern: str) -> str | None:
    """Check whether *pattern* compiles with Python's :mod:`re` module.

    Args:
        pattern: A regex pattern string to validate.

    Returns:
        ``None`` when the pattern compiles, otherwise the compiler's error
        message as a string.
    """
    try:
        re.compile(pattern)
    except re.error as compile_error:
        return str(compile_error)
    return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public API — read jail configuration
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def get_jail_config(socket_path: str, name: str) -> JailConfigResponse:
|
|
"""Return the editable configuration for a single jail.
|
|
|
|
Args:
|
|
socket_path: Path to the fail2ban Unix domain socket.
|
|
name: Jail name.
|
|
|
|
Returns:
|
|
:class:`~app.models.config.JailConfigResponse`.
|
|
|
|
Raises:
|
|
JailNotFoundError: If *name* is not a known jail.
|
|
~app.utils.fail2ban_client.Fail2BanConnectionError: Socket unreachable.
|
|
"""
|
|
client = Fail2BanClient(socket_path=socket_path, timeout=FAIL2BAN_SOCKET_TIMEOUT)
|
|
|
|
# Verify existence.
|
|
try:
|
|
ok(await client.send(["status", name, "short"]))
|
|
except ValueError as exc:
|
|
if is_not_found_error(exc):
|
|
raise JailNotFoundError(name) from exc
|
|
raise
|
|
|
|
bantime_raw: int = await _safe_get_typed(client, ["get", name, "bantime"], 600)
|
|
findtime_raw: int = await _safe_get_typed(client, ["get", name, "findtime"], 600)
|
|
maxretry_raw: int = await _safe_get_typed(client, ["get", name, "maxretry"], 5)
|
|
failregex_raw: list[object] = await _safe_get_typed(client, ["get", name, "failregex"], [])
|
|
ignoreregex_raw: list[object] = await _safe_get_typed(client, ["get", name, "ignoreregex"], [])
|
|
logpath_raw: list[object] = await _safe_get_typed(client, ["get", name, "logpath"], [])
|
|
datepattern_raw: str | None = await _safe_get_typed(client, ["get", name, "datepattern"], None)
|
|
logencoding_raw: str = await _safe_get_typed(client, ["get", name, "logencoding"], "UTF-8")
|
|
backend_raw: str = await _safe_get_typed(client, ["get", name, "backend"], "polling")
|
|
usedns_raw: str = await _safe_get_typed(client, ["get", name, "usedns"], "warn")
|
|
prefregex_raw: str = await _safe_get_typed(client, ["get", name, "prefregex"], "")
|
|
actions_raw: list[object] = await _safe_get_typed(client, ["get", name, "actions"], [])
|
|
bt_increment_raw: bool = await _safe_get_typed(client, ["get", name, "bantime.increment"], False)
|
|
bt_factor_raw: str | float | None = await _safe_get_typed(client, ["get", name, "bantime.factor"], None)
|
|
bt_formula_raw: str | None = await _safe_get_typed(client, ["get", name, "bantime.formula"], None)
|
|
bt_multipliers_raw: str | None = await _safe_get_typed(client, ["get", name, "bantime.multipliers"], None)
|
|
bt_maxtime_raw: str | int | None = await _safe_get_typed(client, ["get", name, "bantime.maxtime"], None)
|
|
bt_rndtime_raw: str | int | None = await _safe_get_typed(client, ["get", name, "bantime.rndtime"], None)
|
|
bt_overalljails_raw: bool = await _safe_get_typed(client, ["get", name, "bantime.overalljails"], False)
|
|
|
|
bantime_escalation = BantimeEscalation(
|
|
increment=bool(bt_increment_raw),
|
|
factor=float(bt_factor_raw) if bt_factor_raw is not None else None,
|
|
formula=str(bt_formula_raw) if bt_formula_raw else None,
|
|
multipliers=str(bt_multipliers_raw) if bt_multipliers_raw else None,
|
|
max_time=int(bt_maxtime_raw) if bt_maxtime_raw is not None else None,
|
|
rnd_time=int(bt_rndtime_raw) if bt_rndtime_raw is not None else None,
|
|
overall_jails=bool(bt_overalljails_raw),
|
|
)
|
|
|
|
jail_cfg = JailConfig(
|
|
name=name,
|
|
ban_time=int(bantime_raw or 600),
|
|
find_time=int(findtime_raw or 600),
|
|
max_retry=int(maxretry_raw or 5),
|
|
fail_regex=ensure_list(failregex_raw),
|
|
ignore_regex=ensure_list(ignoreregex_raw),
|
|
log_paths=ensure_list(logpath_raw),
|
|
date_pattern=str(datepattern_raw) if datepattern_raw else None,
|
|
log_encoding=str(logencoding_raw or "UTF-8"),
|
|
backend=str(backend_raw or "polling"),
|
|
use_dns=str(usedns_raw or "warn"),
|
|
prefregex=str(prefregex_raw) if prefregex_raw else "",
|
|
actions=ensure_list(actions_raw),
|
|
bantime_escalation=bantime_escalation,
|
|
)
|
|
|
|
log.info("jail_config_fetched", jail=name)
|
|
return JailConfigResponse(jail=jail_cfg)
|
|
|
|
|
|
async def list_jail_configs(socket_path: str) -> JailConfigListResponse:
    """Collect the configuration of every jail the daemon reports as active.

    The jail names come from the ``Jail list`` entry of the global ``status``
    reply; each jail is then fetched concurrently via :func:`get_jail_config`.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.

    Returns:
        :class:`~app.models.config.JailConfigListResponse`.

    Raises:
        ~app.utils.fail2ban_client.Fail2BanConnectionError: Socket unreachable.
    """
    client = Fail2BanClient(socket_path=socket_path, timeout=FAIL2BAN_SOCKET_TIMEOUT)

    status_map = to_dict(ok(await client.send(["status"])))
    joined_names: str = str(status_map.get("Jail list", "") or "").strip()

    # Comma-separated list; blank fragments (including an empty list) are dropped.
    jail_names: list[str] = []
    for fragment in joined_names.split(","):
        cleaned = fragment.strip()
        if cleaned:
            jail_names.append(cleaned)

    if not jail_names:
        return JailConfigListResponse(items=[], total=0)

    # The first failing per-jail lookup propagates to the caller.
    lookups = [get_jail_config(socket_path, jail_name) for jail_name in jail_names]
    responses: list[JailConfigResponse] = await asyncio.gather(
        *lookups,
        return_exceptions=False,
    )

    jails = [response.jail for response in responses]
    log.info("jail_configs_listed", count=len(jails))
    return JailConfigListResponse(items=jails, total=len(jails))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public API — write jail configuration
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def update_jail_config(
    socket_path: str,
    name: str,
    update: JailConfigUpdate,
) -> None:
    """Push the populated fields of *update* to a running jail.

    Every field that is not ``None`` results in its own ``set`` command.
    All regex patterns are compiled locally first, so a malformed pattern is
    rejected before the daemon is contacted.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.
        name: Jail name.
        update: Partial update payload.

    Raises:
        JailNotFoundError: If *name* is not a known jail.
        ConfigValidationError: If a regex pattern fails to compile.
        ConfigOperationError: If a ``set`` command is rejected by fail2ban.
        ~app.utils.fail2ban_client.Fail2BanConnectionError: Socket unreachable.
    """
    # Local regex validation happens before any socket traffic.
    regex_inputs = (
        ("fail_regex", update.fail_regex),
        ("ignore_regex", update.ignore_regex),
    )
    for field, patterns in regex_inputs:
        if patterns is not None:
            for pattern in patterns:
                problem = _validate_regex(pattern)
                if problem:
                    raise ConfigValidationError(f"Invalid regex in {field!r}: {problem!r} (pattern: {pattern!r})")
    if update.prefregex:
        problem = _validate_regex(update.prefregex)
        if problem:
            raise ConfigValidationError(f"Invalid regex in 'prefregex': {problem!r} (pattern: {update.prefregex!r})")

    client = Fail2BanClient(socket_path=socket_path, timeout=FAIL2BAN_SOCKET_TIMEOUT)

    # Existence probe: only a "not found" ValueError is translated.
    try:
        ok(await client.send(["status", name, "short"]))
    except ValueError as exc:
        if not is_not_found_error(exc):
            raise
        raise JailNotFoundError(name) from exc

    async def _apply(key: str, value: Fail2BanToken) -> None:
        # Send ``set <jail> <key> <value>``; a rejection becomes ConfigOperationError.
        try:
            ok(await client.send(["set", name, key, value]))
        except ValueError as exc:
            raise ConfigOperationError(f"Failed to set {key!r} = {value!r}: {exc}") from exc

    def _as_flag(enabled: bool) -> str:
        return "true" if enabled else "false"

    # (daemon key, attribute on *update*), applied in this order.
    # backend is managed by fail2ban and cannot be changed at runtime by API,
    # so it is intentionally absent from this table.
    direct_settings = (
        ("bantime", "ban_time"),
        ("findtime", "find_time"),
        ("maxretry", "max_retry"),
        ("datepattern", "date_pattern"),
        ("usedns", "dns_mode"),
        ("logencoding", "log_encoding"),
        ("prefregex", "prefregex"),
    )
    for key, attr in direct_settings:
        value = getattr(update, attr)
        if value is not None:
            await _apply(key, value)

    # "enabled" is expressed through the inverse ``idle`` switch.
    if update.enabled is not None:
        await _apply("idle", "off" if update.enabled else "on")

    # Ban-time escalation: (daemon key, attribute, optional converter).
    escalation = update.bantime_escalation
    if escalation is not None:
        escalation_settings = (
            ("bantime.increment", "increment", _as_flag),
            ("bantime.factor", "factor", str),
            ("bantime.formula", "formula", None),
            ("bantime.multipliers", "multipliers", None),
            ("bantime.maxtime", "max_time", None),
            ("bantime.rndtime", "rnd_time", None),
            ("bantime.overalljails", "overall_jails", _as_flag),
        )
        for key, attr, convert in escalation_settings:
            raw = getattr(escalation, attr)
            if raw is None:
                continue
            await _apply(key, raw if convert is None else convert(raw))

    # Regex lists are replaced wholesale: old entries removed, new ones added.
    regex_replacements = (
        ("failregex", update.fail_regex),
        ("ignoreregex", update.ignore_regex),
    )
    for daemon_field, replacement in regex_replacements:
        if replacement is not None:
            await _replace_regex_list(client, name, daemon_field, replacement)

    log.info("jail_config_updated", jail=name)
|
|
|
|
|
|
async def _replace_regex_list(
    client: Fail2BanClient,
    jail: str,
    field: str,
    new_patterns: list[str],
) -> None:
    """Replace the full regex list for *field* in *jail*.

    All *new_patterns* are compiled locally first. Only when every pattern is
    valid are the existing entries deleted (highest index first so the
    remaining indices stay stable) and the new patterns added in order.

    Args:
        client: Shared :class:`~app.utils.fail2ban_client.Fail2BanClient`.
        jail: Jail name.
        field: Either ``"failregex"`` or ``"ignoreregex"``.
        new_patterns: Replacement list (may be empty to clear).

    Raises:
        ConfigValidationError: If any pattern fails to compile. Raised before
            any command is sent, so the jail's current list is left untouched.
        ConfigOperationError: If fail2ban rejects an ``add`` command.
    """
    # Validate everything up front: rejecting a bad pattern after the old
    # entries were deleted would leave the jail with a wiped or partial list.
    for pattern in new_patterns:
        err = _validate_regex(pattern)
        if err:
            raise ConfigValidationError(f"Invalid regex: {err!r} (pattern: {pattern!r})")

    # Determine current count (best-effort; a failed read yields an empty list).
    current_raw: list[object] = await _safe_get_typed(client, ["get", jail, field], [])
    current: list[str] = ensure_list(current_raw)

    del_cmd = f"del{field}"
    add_cmd = f"add{field}"

    # Delete in reverse order so indices stay stable. A rejected delete is
    # ignored on purpose; the remaining deletes are still attempted.
    for idx in reversed(range(len(current))):
        with contextlib.suppress(ValueError):
            ok(await client.send(["set", jail, del_cmd, idx]))

    # Add new patterns in the order given.
    for pattern in new_patterns:
        try:
            ok(await client.send(["set", jail, add_cmd, pattern]))
        except ValueError as exc:
            raise ConfigOperationError(f"Failed to add {field} pattern: {exc}") from exc
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public API — global configuration
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def get_global_config(socket_path: str) -> GlobalConfigResponse:
    """Read the daemon-wide fail2ban settings.

    The four settings are queried concurrently; each read is best-effort and
    falls back to a built-in default when the command fails.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.

    Returns:
        :class:`~app.models.config.GlobalConfigResponse`.

    Raises:
        ~app.utils.fail2ban_client.Fail2BanConnectionError: Socket unreachable.
    """
    client = Fail2BanClient(socket_path=socket_path, timeout=FAIL2BAN_SOCKET_TIMEOUT)

    lookups = (
        _safe_get_typed(client, ["get", "loglevel"], "INFO"),
        _safe_get_typed(client, ["get", "logtarget"], "STDOUT"),
        _safe_get_typed(client, ["get", "dbpurgeage"], 86400),
        _safe_get_typed(client, ["get", "dbmaxmatches"], 10),
    )
    level, target, purge_age, max_matches = await asyncio.gather(*lookups)

    # Falsy replies are replaced by the same defaults used for failed reads.
    return GlobalConfigResponse(
        log_level=str(level or "INFO").upper(),
        log_target=str(target or "STDOUT"),
        db_purge_age=int(purge_age or 86400),
        db_max_matches=int(max_matches or 10),
    )
|
|
|
|
|
|
async def update_global_config(socket_path: str, update: GlobalConfigUpdate) -> None:
    """Push the populated fields of *update* to fail2ban's global settings.

    The log target, when given, is validated locally before any command is
    sent to the daemon.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.
        update: Partial update payload.

    Raises:
        ConfigOperationError: If a ``set`` command is rejected.
        ConfigValidationError: If log_target validation fails.
        ~app.utils.fail2ban_client.Fail2BanConnectionError: Socket unreachable.
    """
    requested_target = update.log_target
    if requested_target is not None:
        try:
            validate_log_target(requested_target)
        except ValueError as e:
            raise ConfigValidationError(str(e)) from e

    client = Fail2BanClient(socket_path=socket_path, timeout=FAIL2BAN_SOCKET_TIMEOUT)

    async def _push(key: str, value: Fail2BanToken) -> None:
        # Send ``set <key> <value>``; a rejection becomes ConfigOperationError.
        try:
            ok(await client.send(["set", key, value]))
        except ValueError as exc:
            raise ConfigOperationError(f"Failed to set global {key!r} = {value!r}: {exc}") from exc

    # Applied in a fixed order; the log level is always sent upper-cased.
    if (level := update.log_level) is not None:
        await _push("loglevel", level.upper())
    if (target := update.log_target) is not None:
        await _push("logtarget", target)
    if (purge_age := update.db_purge_age) is not None:
        await _push("dbpurgeage", purge_age)
    if (max_matches := update.db_max_matches) is not None:
        await _push("dbmaxmatches", max_matches)

    log.info("global_config_updated")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public API — regex tester (stateless, no socket)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_regex(request: RegexTestRequest) -> RegexTestResponse:
    """Run a regex test by delegating to the log service helper.

    Args:
        request: The regex test request, forwarded unchanged.

    Returns:
        The response produced by the log service's ``test_regex``.
    """
    outcome = util_test_regex(request)
    return outcome
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public API — log observation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def add_log_path(
    socket_path: str,
    jail: str,
    req: AddLogPathRequest,
) -> None:
    """Start monitoring an additional log file in an existing jail.

    Sends ``set <jail> addlogpath <path> tail|head``; the last token is
    ``tail`` when ``req.tail`` is truthy and ``head`` otherwise.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.
        jail: Jail name to which the log path should be added.
        req: :class:`~app.models.config.AddLogPathRequest` with the path to add.

    Raises:
        JailNotFoundError: If *jail* is not a known jail.
        ConfigOperationError: If the command is rejected by fail2ban.
        ~app.utils.fail2ban_client.Fail2BanConnectionError: Socket unreachable.
    """
    client = Fail2BanClient(socket_path=socket_path, timeout=FAIL2BAN_SOCKET_TIMEOUT)

    # Existence probe: only a "not found" ValueError is translated.
    try:
        ok(await client.send(["status", jail, "short"]))
    except ValueError as exc:
        if not is_not_found_error(exc):
            raise
        raise JailNotFoundError(jail) from exc

    read_position = "tail" if req.tail else "head"
    command = ["set", jail, "addlogpath", req.log_path, read_position]
    try:
        ok(await client.send(command))
        log.info("log_path_added", jail=jail, path=req.log_path)
    except ValueError as exc:
        raise ConfigOperationError(f"Failed to add log path {req.log_path!r}: {exc}") from exc
|
|
|
|
|
|
async def delete_log_path(
    socket_path: str,
    jail: str,
    log_path: str,
) -> None:
    """Stop monitoring a log file in an existing jail.

    Issues ``set <jail> dellogpath <path>`` so the path is dropped at runtime
    without requiring a daemon restart.

    Args:
        socket_path: Path to the fail2ban Unix domain socket.
        jail: Jail name from which the log path should be removed.
        log_path: Absolute path of the log file to stop monitoring.

    Raises:
        JailNotFoundError: If *jail* is not a known jail.
        ConfigOperationError: If the command is rejected by fail2ban.
        ~app.utils.fail2ban_client.Fail2BanConnectionError: Socket unreachable.
    """
    client = Fail2BanClient(socket_path=socket_path, timeout=FAIL2BAN_SOCKET_TIMEOUT)

    # Existence probe: only a "not found" ValueError is translated.
    try:
        ok(await client.send(["status", jail, "short"]))
    except ValueError as exc:
        if not is_not_found_error(exc):
            raise
        raise JailNotFoundError(jail) from exc

    command = ["set", jail, "dellogpath", log_path]
    try:
        ok(await client.send(command))
        log.info("log_path_deleted", jail=jail, path=log_path)
    except ValueError as exc:
        raise ConfigOperationError(f"Failed to delete log path {log_path!r}: {exc}") from exc
|
|
|
|
|
|
async def preview_log(
    req: LogPreviewRequest,
    preview_fn: Callable[[LogPreviewRequest], Awaitable[LogPreviewResponse]] | None = None,
) -> LogPreviewResponse:
    """Produce a log preview through an injectable preview function.

    Args:
        req: The preview request, forwarded unchanged.
        preview_fn: Optional replacement for the log service's preview
            function; the default is used when this is ``None``.

    Returns:
        Whatever the selected preview function returns for *req*.
    """
    handler = util_preview_log if preview_fn is None else preview_fn
    return await handler(req)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Map color thresholds
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def get_map_color_thresholds(db: aiosqlite.Connection) -> MapColorThresholdsResponse:
    """Load the stored map color thresholds.

    Args:
        db: Active aiosqlite connection to the application database.

    Returns:
        A :class:`MapColorThresholdsResponse` built from the three values
        (high, medium, low — in that order) returned by the settings service.
    """
    stored = await util_get_map_color_thresholds(db)
    high, medium, low = stored
    return MapColorThresholdsResponse(
        threshold_high=high,
        threshold_medium=medium,
        threshold_low=low,
    )
|
|
|
|
|
|
async def update_map_color_thresholds(
    db: aiosqlite.Connection,
    update: MapColorThresholdsUpdate,
) -> None:
    """Store new map color thresholds via the settings service.

    Args:
        db: Active aiosqlite connection to the application database.
        update: The new threshold values.

    Raises:
        ValueError: If validation fails (thresholds must satisfy high > medium > low).
    """
    new_thresholds = {
        "threshold_high": update.threshold_high,
        "threshold_medium": update.threshold_medium,
        "threshold_low": update.threshold_low,
    }
    await util_set_map_color_thresholds(db, **new_thresholds)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# fail2ban log file reader
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Log target names that do not refer to a file on disk; log viewing is
# unavailable when fail2ban logs to one of these.
_NON_FILE_LOG_TARGETS: frozenset[str] = frozenset(
    ("STDOUT", "STDERR", "SYSLOG", "SYSTEMD-JOURNAL")
)

# Security allow-list: log files may only be read from beneath these base
# directories.
_SAFE_LOG_PREFIXES: tuple[str, ...] = (
    "/var/log",
    "/config/log",
)
|
|
|
|
|