Refactor map color threshold storage into dedicated settings service

This commit is contained in:
2026-04-17 15:13:07 +02:00
parent 13b3fde274
commit c21cf82e9e
11 changed files with 467 additions and 349 deletions

View File

@@ -13,10 +13,8 @@ routers can serialise them directly.
from __future__ import annotations
import asyncio
from app.utils.async_utils import run_blocking
import contextlib
import re
from pathlib import Path
from typing import TYPE_CHECKING, TypeVar, cast
import structlog
@@ -28,14 +26,10 @@ if TYPE_CHECKING:
import aiosqlite
from app import __version__
from app.exceptions import ConfigOperationError, ConfigValidationError, JailNotFoundError
from app.services.log_service import preview_log as util_preview_log
from app.services.log_service import test_regex as util_test_regex
from app.models.config import (
AddLogPathRequest,
BantimeEscalation,
Fail2BanLogResponse,
GlobalConfigResponse,
GlobalConfigUpdate,
JailConfig,
@@ -48,15 +42,16 @@ from app.models.config import (
MapColorThresholdsUpdate,
RegexTestRequest,
RegexTestResponse,
ServiceStatusResponse,
)
from app.utils.fail2ban_client import Fail2BanClient
from app.utils.setup_utils import (
from app.services.log_service import preview_log as util_preview_log
from app.services.log_service import test_regex as util_test_regex
from app.services.settings_service import (
get_map_color_thresholds as util_get_map_color_thresholds,
)
from app.utils.setup_utils import (
from app.services.settings_service import (
set_map_color_thresholds as util_set_map_color_thresholds,
)
from app.utils.fail2ban_client import Fail2BanClient
log: structlog.stdlib.BoundLogger = structlog.get_logger()
@@ -649,181 +644,3 @@ _NON_FILE_LOG_TARGETS: frozenset[str] = frozenset(
_SAFE_LOG_PREFIXES: tuple[str, ...] = ("/var/log", "/config/log")
def _count_file_lines(file_path: str) -> int:
"""Count the total number of lines in *file_path* synchronously."""
count = 0
with open(file_path, "rb") as fh:
for chunk in iter(lambda: fh.read(65536), b""):
count += chunk.count(b"\n")
return count
def _read_tail_lines(file_path: str, num_lines: int) -> list[str]:
"""Read the last *num_lines* from *file_path* in a memory-efficient way."""
chunk_size = 8192
raw_lines: list[bytes] = []
with open(file_path, "rb") as fh:
fh.seek(0, 2)
end_pos = fh.tell()
if end_pos == 0:
return []
buf = b""
pos = end_pos
while len(raw_lines) <= num_lines and pos > 0:
read_size = min(chunk_size, pos)
pos -= read_size
fh.seek(pos)
chunk = fh.read(read_size)
buf = chunk + buf
raw_lines = buf.split(b"\n")
if pos > 0 and len(raw_lines) > 1:
raw_lines = raw_lines[1:]
return [ln.decode("utf-8", errors="replace").rstrip() for ln in raw_lines[-num_lines:] if ln.strip()]
async def read_fail2ban_log(
socket_path: str,
lines: int,
filter_text: str | None = None,
) -> Fail2BanLogResponse:
"""Read the tail of the fail2ban daemon log file.
Queries the fail2ban socket for the current log target and log level,
validates that the target is a readable file, then returns the last
*lines* entries optionally filtered by *filter_text*.
Security: the resolved log path is rejected unless it starts with one of
the paths in :data:`_SAFE_LOG_PREFIXES`, preventing path traversal.
Args:
socket_path: Path to the fail2ban Unix domain socket.
lines: Number of lines to return from the tail of the file (12000).
filter_text: Optional plain-text substring — only matching lines are
returned. Applied server-side; does not affect *total_lines*.
Returns:
:class:`~app.models.config.Fail2BanLogResponse`.
Raises:
ConfigOperationError: When the log target is not a file, when the
resolved path is outside the allowed directories, or when the
file cannot be read.
~app.utils.fail2ban_client.Fail2BanConnectionError: Socket unreachable.
"""
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
log_level_raw, log_target_raw = await asyncio.gather(
_safe_get_typed(client, ["get", "loglevel"], "INFO"),
_safe_get_typed(client, ["get", "logtarget"], "STDOUT"),
)
log_level = str(log_level_raw or "INFO").upper()
log_target = str(log_target_raw or "STDOUT")
# Reject non-file targets up front.
if log_target.upper() in _NON_FILE_LOG_TARGETS:
raise ConfigOperationError(
f"fail2ban is logging to {log_target!r}. "
"File-based log viewing is only available when fail2ban logs to a file path."
)
# Resolve and validate (security: no path traversal outside safe dirs).
try:
resolved = Path(log_target).resolve()
except (ValueError, OSError) as exc:
raise ConfigOperationError(
f"Cannot resolve log target path {log_target!r}: {exc}"
) from exc
resolved_str = str(resolved)
if not any(resolved_str.startswith(safe) for safe in _SAFE_LOG_PREFIXES):
raise ConfigOperationError(
f"Log path {resolved_str!r} is outside the allowed directory. "
"Only paths under /var/log or /config/log are permitted."
)
if not resolved.is_file():
raise ConfigOperationError(f"Log file not found: {resolved_str!r}")
loop = asyncio.get_event_loop()
total_lines, raw_lines = await asyncio.gather(
run_blocking( _count_file_lines, resolved_str),
run_blocking( _read_tail_lines, resolved_str, lines),
)
filtered = (
[ln for ln in raw_lines if filter_text in ln]
if filter_text
else raw_lines
)
log.info(
"fail2ban_log_read",
log_path=resolved_str,
lines_requested=lines,
lines_returned=len(filtered),
filter_active=filter_text is not None,
)
return Fail2BanLogResponse(
log_path=resolved_str,
lines=filtered,
total_lines=total_lines,
log_level=log_level,
log_target=log_target,
)
async def get_service_status(
socket_path: str,
probe_fn: Callable[[str], Awaitable[ServiceStatusResponse]] | None = None,
) -> ServiceStatusResponse:
"""Return fail2ban service health status with log configuration.
Delegates to an injectable *probe_fn* (defaults to
:func:`~app.services.health_service.probe`). This avoids direct service-to-
service imports inside this module.
Args:
socket_path: Path to the fail2ban Unix domain socket.
probe_fn: Optional probe function.
Returns:
:class:`~app.models.config.ServiceStatusResponse`.
"""
if probe_fn is None:
raise ValueError("probe_fn is required to avoid service-to-service coupling")
server_status = await probe_fn(socket_path)
if server_status.online:
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
log_level_raw, log_target_raw = await asyncio.gather(
_safe_get_typed(client, ["get", "loglevel"], "INFO"),
_safe_get_typed(client, ["get", "logtarget"], "STDOUT"),
)
log_level = str(log_level_raw or "INFO").upper()
log_target = str(log_target_raw or "STDOUT")
else:
log_level = "UNKNOWN"
log_target = "UNKNOWN"
log.info(
"service_status_fetched",
online=server_status.online,
jail_count=server_status.active_jails,
)
return ServiceStatusResponse(
online=server_status.online,
version=__version__,
jail_count=server_status.active_jails,
total_bans=server_status.total_bans,
total_failures=server_status.total_failures,
log_level=log_level,
log_target=log_target,
)

View File

@@ -9,13 +9,18 @@ seconds by the background health-check task, not on every HTTP request.
from __future__ import annotations
from typing import cast
import asyncio
from collections.abc import Awaitable, Callable
from typing import TypeVar, cast
import structlog
from app import __version__
from app.models.config import ServiceStatusResponse
from app.models.server import ServerStatus
from app.utils.fail2ban_client import (
Fail2BanClient,
Fail2BanCommand,
Fail2BanConnectionError,
Fail2BanProtocolError,
Fail2BanResponse,
@@ -49,7 +54,9 @@ def _ok(response: object) -> object:
try:
code, data = cast("Fail2BanResponse", response)
except (TypeError, ValueError) as exc:
raise ValueError(f"Unexpected fail2ban response shape: {response!r}") from exc
raise ValueError(
f"Unexpected fail2ban response shape: {response!r}"
) from exc
if code != 0:
raise ValueError(f"fail2ban returned error code {code}: {data!r}")
@@ -81,13 +88,101 @@ def _to_dict(pairs: object) -> dict[str, object]:
return result
T = TypeVar("T")
async def _safe_get(
client: Fail2BanClient,
command: Fail2BanCommand,
default: object | None = None,
) -> object | None:
"""Send a command and return *default* if it fails."""
try:
return _ok(await client.send(command))
except (
Fail2BanConnectionError,
Fail2BanProtocolError,
ValueError,
OSError,
):
return default
async def _safe_get_typed(
client: Fail2BanClient,
command: Fail2BanCommand,
default: T,
) -> T:
"""Send a command and return the result typed as ``default``'s type."""
return cast("T", await _safe_get(client, command, default))
async def get_service_status(
socket_path: str,
probe_fn: Callable[[str], Awaitable[ServerStatus]] | None = None,
) -> ServiceStatusResponse:
"""Return fail2ban service health status with log configuration.
Delegates to an injectable *probe_fn* (defaults to
:func:`~app.services.health_service.probe`).
Args:
socket_path: Path to the fail2ban Unix domain socket.
probe_fn: Optional probe function.
Returns:
:class:`~app.models.config.ServiceStatusResponse`.
"""
if probe_fn is None:
raise ValueError(
"probe_fn is required to avoid service-to-service coupling"
)
server_status = await probe_fn(socket_path)
if server_status.online:
client = Fail2BanClient(
socket_path=socket_path,
timeout=_SOCKET_TIMEOUT,
)
log_level_raw, log_target_raw = await asyncio.gather(
_safe_get_typed(client, ["get", "loglevel"], "INFO"),
_safe_get_typed(client, ["get", "logtarget"], "STDOUT"),
)
log_level = str(log_level_raw or "INFO").upper()
log_target = str(log_target_raw or "STDOUT")
else:
log_level = "UNKNOWN"
log_target = "UNKNOWN"
log.info(
"service_status_fetched",
online=server_status.online,
jail_count=server_status.active_jails,
)
return ServiceStatusResponse(
online=server_status.online,
version=__version__,
jail_count=server_status.active_jails,
total_bans=server_status.total_bans,
total_failures=server_status.total_failures,
log_level=log_level,
log_target=log_target,
)
# ---------------------------------------------------------------------------
# Public interface
# ---------------------------------------------------------------------------
async def probe(socket_path: str, timeout: float = _SOCKET_TIMEOUT) -> ServerStatus:
"""Probe the fail2ban daemon and return a :class:`~app.models.server.ServerStatus`.
async def probe(
socket_path: str,
timeout: float = _SOCKET_TIMEOUT,
) -> ServerStatus:
"""Probe the fail2ban daemon and return a
:class:`~app.models.server.ServerStatus`.
Sends ``ping``, ``version``, ``status``, and per-jail ``status <jail>``
commands. Any socket or protocol error is caught and results in an
@@ -109,11 +204,14 @@ async def probe(socket_path: str, timeout: float = _SOCKET_TIMEOUT) -> ServerSta
# ------------------------------------------------------------------ #
ping_data = _ok(await client.send(["ping"]))
if ping_data != "pong":
log.warning("fail2ban_unexpected_ping_response", response=ping_data)
log.warning(
"fail2ban_unexpected_ping_response",
response=ping_data,
)
return ServerStatus(online=False)
# ------------------------------------------------------------------ #
# 2. Version #
# 2. Version
# ------------------------------------------------------------------ #
try:
version: str | None = str(_ok(await client.send(["version"])))
@@ -125,7 +223,9 @@ async def probe(socket_path: str, timeout: float = _SOCKET_TIMEOUT) -> ServerSta
# ------------------------------------------------------------------ #
status_data = _to_dict(_ok(await client.send(["status"])))
active_jails: int = int(str(status_data.get("Number of jail", 0) or 0))
jail_list_raw: str = str(status_data.get("Jail list", "") or "").strip()
jail_list_raw: str = str(
status_data.get("Jail list", "") or ""
).strip()
jail_names: list[str] = (
[j.strip() for j in jail_list_raw.split(",") if j.strip()]
if jail_list_raw
@@ -140,11 +240,17 @@ async def probe(socket_path: str, timeout: float = _SOCKET_TIMEOUT) -> ServerSta
for jail_name in jail_names:
try:
jail_resp = _to_dict(_ok(await client.send(["status", jail_name])))
jail_resp = _to_dict(
_ok(await client.send(["status", jail_name]))
)
filter_stats = _to_dict(jail_resp.get("Filter") or [])
action_stats = _to_dict(jail_resp.get("Actions") or [])
total_failures += int(str(filter_stats.get("Currently failed", 0) or 0))
total_bans += int(str(action_stats.get("Currently banned", 0) or 0))
total_failures += int(
str(filter_stats.get("Currently failed", 0) or 0)
)
total_bans += int(
str(action_stats.get("Currently banned", 0) or 0)
)
except (ValueError, TypeError, KeyError) as exc:
log.warning(
"fail2ban_jail_status_parse_error",
@@ -174,5 +280,3 @@ async def probe(socket_path: str, timeout: float = _SOCKET_TIMEOUT) -> ServerSta
except ValueError as exc:
log.error("fail2ban_probe_parse_error", error=str(exc))
return ServerStatus(online=False)

View File

@@ -1,22 +1,168 @@
"""Log helper service.
Contains regex test and log preview helpers that are independent of
fail2ban socket operations.
Contains regex test, log preview, and fail2ban log reading helpers.
"""
from __future__ import annotations
from app.utils.async_utils import run_blocking
import asyncio
import re
import structlog
from pathlib import Path
from typing import TypeVar, cast
from app.exceptions import ConfigOperationError
from app.models.config import (
Fail2BanLogResponse,
LogPreviewLine,
LogPreviewRequest,
LogPreviewResponse,
RegexTestRequest,
RegexTestResponse,
)
from app.utils.async_utils import run_blocking
from app.utils.fail2ban_client import (
Fail2BanClient,
Fail2BanConnectionError,
Fail2BanProtocolError,
Fail2BanResponse,
)
log: structlog.stdlib.BoundLogger = structlog.get_logger()
_SOCKET_TIMEOUT: float = 10.0
_NON_FILE_LOG_TARGETS: frozenset[str] = frozenset(
{"STDOUT", "STDERR", "SYSLOG", "SYSTEMD-JOURNAL"}
)
_SAFE_LOG_PREFIXES: tuple[str, ...] = ("/var/log", "/config/log")
def _ok(response: object) -> object:
"""Extract the payload from a fail2ban ``(return_code, data)`` response."""
try:
code, data = cast(Fail2BanResponse, response)
except (TypeError, ValueError) as exc:
raise ValueError(
f"Unexpected fail2ban response shape: {response!r}"
) from exc
if code != 0:
raise ValueError(f"fail2ban returned error code {code}: {data!r}")
return data
def _count_file_lines(file_path: str) -> int:
"""Count the total number of lines in *file_path* synchronously."""
count = 0
with open(file_path, "rb") as fh:
for chunk in iter(lambda: fh.read(65536), b""):
count += chunk.count(b"\n")
return count
async def _safe_get(
client: Fail2BanClient,
command: list[str],
default: object | None = None,
) -> object | None:
"""Send a command and return *default* if it fails."""
try:
return _ok(await client.send(command))
except (
Fail2BanConnectionError,
Fail2BanProtocolError,
OSError,
ValueError,
):
return default
T = TypeVar("T")
async def _safe_get_typed(
client: Fail2BanClient,
command: list[str],
default: T,
) -> T:
"""Send a command and return the result typed as ``default``'s type."""
return cast("T", await _safe_get(client, command, default))
async def read_fail2ban_log(
socket_path: str,
lines: int,
filter_text: str | None = None,
) -> Fail2BanLogResponse:
"""Read the tail of the fail2ban daemon log file.
Queries the fail2ban socket for the current log target and log level,
validates that the target is a readable file, then returns the last
*lines* entries optionally filtered by *filter_text*.
"""
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
log_level_raw, log_target_raw = await asyncio.gather(
_safe_get_typed(client, ["get", "loglevel"], "INFO"),
_safe_get_typed(client, ["get", "logtarget"], "STDOUT"),
)
log_level = str(log_level_raw or "INFO").upper()
log_target = str(log_target_raw or "STDOUT")
if log_target.upper() in _NON_FILE_LOG_TARGETS:
raise ConfigOperationError(
f"fail2ban is logging to {log_target!r}. "
"File-based log viewing is only available when fail2ban logs "
"to a file path."
)
try:
resolved = Path(log_target).resolve()
except (ValueError, OSError) as exc:
raise ConfigOperationError(
f"Cannot resolve log target path {log_target!r}: {exc}"
) from exc
resolved_str = str(resolved)
if not any(resolved_str.startswith(safe) for safe in _SAFE_LOG_PREFIXES):
raise ConfigOperationError(
f"Log path {resolved_str!r} is outside the allowed directory. "
"Only paths under /var/log or /config/log are permitted."
)
if not resolved.is_file():
raise ConfigOperationError(f"Log file not found: {resolved_str!r}")
total_lines, raw_lines = await asyncio.gather(
run_blocking(_count_file_lines, resolved_str),
run_blocking(_read_tail_lines, resolved_str, lines),
)
filtered = (
[ln for ln in raw_lines if filter_text in ln]
if filter_text
else raw_lines
)
log.info(
"fail2ban_log_read",
log_path=resolved_str,
lines_requested=lines,
lines_returned=len(filtered),
filter_active=filter_text is not None,
)
return Fail2BanLogResponse(
log_path=resolved_str,
lines=filtered,
total_lines=total_lines,
log_level=log_level,
log_target=log_target,
)
def test_regex(request: RegexTestRequest) -> RegexTestResponse:
@@ -38,7 +184,10 @@ def test_regex(request: RegexTestRequest) -> RegexTestResponse:
return RegexTestResponse(matched=False)
groups: list[str] = list(match.groups() or [])
return RegexTestResponse(matched=True, groups=[str(g) for g in groups if g is not None])
return RegexTestResponse(
matched=True,
groups=[str(g) for g in groups if g is not None],
)
async def preview_log(req: LogPreviewRequest) -> LogPreviewResponse:
@@ -87,7 +236,11 @@ async def preview_log(req: LogPreviewRequest) -> LogPreviewResponse:
matched_count = 0
for line in raw_lines:
m = compiled.search(line)
groups = [str(g) for g in (m.groups() or []) if g is not None] if m else []
groups = [
str(g)
for g in (m.groups() or [])
if g is not None
] if m else []
result_lines.append(
LogPreviewLine(line=line, matched=(m is not None), groups=groups),
)
@@ -124,4 +277,8 @@ def _read_tail_lines(file_path: str, num_lines: int) -> list[str]:
if pos > 0 and len(raw_lines) > 1:
raw_lines = raw_lines[1:]
return [ln.decode("utf-8", errors="replace").rstrip() for ln in raw_lines[-num_lines:] if ln.strip()]
return [
ln.decode("utf-8", errors="replace").rstrip()
for ln in raw_lines[-num_lines:]
if ln.strip()
]

View File

@@ -284,21 +284,6 @@ class ConfigService(Protocol):
) -> None:
...
async def read_fail2ban_log(
self,
socket_path: str,
lines: int,
filter_text: str | None = None,
) -> Fail2BanLogResponse:
...
async def get_service_status(
self,
socket_path: str,
probe_fn: Callable[[str], Awaitable[ServiceStatusResponse]] | None = None,
) -> ServiceStatusResponse:
...
@runtime_checkable
class HistoryService(Protocol):

View File

@@ -0,0 +1,89 @@
"""Shared settings persistence helpers.
This service centralises storage and validation for application settings that are
shared between setup and runtime configuration workflows.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import structlog
from app.repositories import settings_repo
if TYPE_CHECKING: # pragma: no cover
import aiosqlite
log: structlog.stdlib.BoundLogger = structlog.get_logger()
_KEY_MAP_COLOR_THRESHOLD_HIGH = "map_color_threshold_high"
_KEY_MAP_COLOR_THRESHOLD_MEDIUM = "map_color_threshold_medium"
_KEY_MAP_COLOR_THRESHOLD_LOW = "map_color_threshold_low"
_DEFAULT_MAP_COLOR_THRESHOLD_HIGH = 100
_DEFAULT_MAP_COLOR_THRESHOLD_MEDIUM = 50
_DEFAULT_MAP_COLOR_THRESHOLD_LOW = 20
async def get_map_color_thresholds(
db: aiosqlite.Connection,
) -> tuple[int, int, int]:
"""Return map color thresholds from stored settings.
Args:
db: Active aiosqlite connection.
Returns:
A tuple of ``(high, medium, low)`` thresholds.
"""
high = await settings_repo.get_setting(db, _KEY_MAP_COLOR_THRESHOLD_HIGH)
medium = await settings_repo.get_setting(db, _KEY_MAP_COLOR_THRESHOLD_MEDIUM)
low = await settings_repo.get_setting(db, _KEY_MAP_COLOR_THRESHOLD_LOW)
return (
int(high) if high else _DEFAULT_MAP_COLOR_THRESHOLD_HIGH,
int(medium) if medium else _DEFAULT_MAP_COLOR_THRESHOLD_MEDIUM,
int(low) if low else _DEFAULT_MAP_COLOR_THRESHOLD_LOW,
)
async def set_map_color_thresholds(
db: aiosqlite.Connection,
*,
threshold_high: int,
threshold_medium: int,
threshold_low: int,
) -> None:
"""Persist validated map color thresholds.
Args:
db: Active aiosqlite connection.
threshold_high: High threshold value.
threshold_medium: Medium threshold value.
threshold_low: Low threshold value.
Raises:
ValueError: If thresholds are non-positive or misordered.
"""
if threshold_high <= 0 or threshold_medium <= 0 or threshold_low <= 0:
raise ValueError("All thresholds must be positive integers.")
if not (threshold_high > threshold_medium > threshold_low):
raise ValueError("Thresholds must satisfy: high > medium > low.")
await settings_repo.set_setting(
db, _KEY_MAP_COLOR_THRESHOLD_HIGH, str(threshold_high)
)
await settings_repo.set_setting(
db, _KEY_MAP_COLOR_THRESHOLD_MEDIUM, str(threshold_medium)
)
await settings_repo.set_setting(
db, _KEY_MAP_COLOR_THRESHOLD_LOW, str(threshold_low)
)
log.info(
"map_color_thresholds_persisted",
high=threshold_high,
medium=threshold_medium,
low=threshold_low,
)

View File

@@ -15,16 +15,16 @@ import structlog
from app.db import init_db, open_db
from app.repositories import settings_repo
from app.utils.async_utils import run_blocking
from app.utils.setup_utils import (
from app.services.settings_service import (
get_map_color_thresholds as util_get_map_color_thresholds,
)
from app.services.settings_service import (
set_map_color_thresholds as util_set_map_color_thresholds,
)
from app.utils.async_utils import run_blocking
from app.utils.setup_utils import (
get_password_hash as util_get_password_hash,
)
from app.utils.setup_utils import (
set_map_color_thresholds as util_set_map_color_thresholds,
)
if TYPE_CHECKING:
import aiosqlite