Files
BanGUI/backend/app/routers/config_misc.py

430 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
from typing import Annotated
from fastapi import APIRouter, HTTPException, Query, Request, status
from app.dependencies import AuthDep, DbDep, Fail2BanSocketDep, Fail2BanStartCommandDep
from app.models.config import (
Fail2BanLogResponse,
GlobalConfigResponse,
GlobalConfigUpdate,
LogPreviewRequest,
LogPreviewResponse,
MapColorThresholdsResponse,
MapColorThresholdsUpdate,
RegexTestRequest,
RegexTestResponse,
ServiceStatusResponse,
)
from app.services import config_file_service, config_service, jail_service, log_service, setup_service
from app.utils.fail2ban_client import Fail2BanConnectionError
# Router grouping the miscellaneous configuration endpoints defined below.
# No prefix is set here, so every route path in this module is relative to
# whatever prefix the application applies when it includes this router
# (presumably ``/api/config`` — confirm against the app factory).
router: APIRouter = APIRouter(tags=["Config Misc"])
def _bad_gateway(exc: Exception) -> HTTPException:
    """Build the 502 error used when the fail2ban daemon cannot be reached.

    The exception is returned, not raised, so that call sites can write
    ``raise _bad_gateway(exc) from exc`` and keep the original cause chained.

    Args:
        exc: The underlying error; its string form is embedded in the detail.

    Returns:
        An :class:`fastapi.HTTPException` with status 502.
    """
    return HTTPException(
        status_code=status.HTTP_502_BAD_GATEWAY,
        detail=f"Cannot reach fail2ban: {exc}",
    )


def _bad_request(message: str) -> HTTPException:
    """Build the 400 error used when a request is rejected as invalid.

    Handlers in this module call ``_bad_request(str(exc))`` when a service
    refuses an operation; the helper mirrors :func:`_bad_gateway` so those
    call sites can ``raise ... from exc``.

    Args:
        message: Human-readable reason, used verbatim as the response detail.

    Returns:
        An :class:`fastapi.HTTPException` with status 400.
    """
    return HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail=message,
    )
@router.get(
    "/global",
    response_model=GlobalConfigResponse,
    summary="Return global fail2ban settings",
)
async def get_global_config(
    request: Request,
    _auth: AuthDep,
    socket_path: Fail2BanSocketDep,
) -> GlobalConfigResponse:
    """Fetch the global fail2ban settings (log level, log target, database config).

    The handler is a thin wrapper around
    ``config_service.get_global_config`` that converts connection failures
    into an HTTP error.

    Args:
        request: Incoming request (not used by the handler body).
        _auth: Validated session.
        socket_path: Injected value forwarded to the config service.

    Returns:
        :class:`~app.models.config.GlobalConfigResponse`.

    Raises:
        HTTPException: 502 when fail2ban is unreachable.
    """
    try:
        global_config = await config_service.get_global_config(socket_path)
    except Fail2BanConnectionError as exc:
        raise _bad_gateway(exc) from exc
    return global_config
@router.put(
    "/global",
    status_code=status.HTTP_204_NO_CONTENT,
    summary="Update global fail2ban settings",
)
async def update_global_config(
    request: Request,
    _auth: AuthDep,
    socket_path: Fail2BanSocketDep,
    body: GlobalConfigUpdate,
) -> None:
    """Update global fail2ban settings.

    Forwards the update to ``config_service.update_global_config`` and maps
    service errors onto HTTP status codes.

    Args:
        request: Incoming request (not used by the handler body).
        _auth: Validated session.
        socket_path: Injected value forwarded to the config service.
        body: Partial update — only non-None fields are written.

    Raises:
        HTTPException: 400 when a set command is rejected.
        HTTPException: 502 when fail2ban is unreachable.
    """
    try:
        await config_service.update_global_config(socket_path, body)
    # Qualified through the service module, matching the fail2ban-log handler
    # in this file; the bare name is not imported at module level.
    except config_service.ConfigOperationError as exc:
        raise _bad_request(str(exc)) from exc
    except Fail2BanConnectionError as exc:
        raise _bad_gateway(exc) from exc
# ---------------------------------------------------------------------------
# Reload endpoint
# ---------------------------------------------------------------------------
@router.post(
    "/reload",
    status_code=status.HTTP_204_NO_CONTENT,
    summary="Reload fail2ban to apply configuration changes",
)
async def reload_fail2ban(
    request: Request,
    _auth: AuthDep,
    socket_path: Fail2BanSocketDep,
) -> None:
    """Ask fail2ban to perform a full reload.

    All jails are stopped and restarted with the current configuration.
    The work is delegated to ``jail_service.reload_all``.

    Args:
        request: Incoming request (not used by the handler body).
        _auth: Validated session.
        socket_path: Injected value forwarded to the jail service.

    Raises:
        HTTPException: 409 when fail2ban reports the reload failed.
        HTTPException: 502 when fail2ban is unreachable.
    """
    try:
        await jail_service.reload_all(socket_path)
    # NOTE(review): JailOperationError is not imported in the visible part of
    # this module — confirm it is in scope.
    except JailOperationError as exc:
        conflict = HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=f"fail2ban reload failed: {exc}",
        )
        raise conflict from exc
    except Fail2BanConnectionError as exc:
        raise _bad_gateway(exc) from exc
# ---------------------------------------------------------------------------
# Restart endpoint
# ---------------------------------------------------------------------------
@router.post(
    "/restart",
    status_code=status.HTTP_204_NO_CONTENT,
    summary="Restart the fail2ban service",
)
async def restart_fail2ban(
    request: Request,
    _auth: AuthDep,
    socket_path: Fail2BanSocketDep,
    start_cmd: Fail2BanStartCommandDep,
) -> None:
    """Trigger a full fail2ban service restart.

    Stops the fail2ban daemon via the Unix domain socket, then starts it
    again using the configured ``fail2ban_start_command``. After starting,
    probes the socket for up to 10 seconds to confirm the daemon came back
    online.

    Args:
        request: Incoming request (not used by the handler body).
        _auth: Validated session.
        socket_path: Injected value forwarded to the services.
        start_cmd: Configured start command as a single string; it is split
            into an argument list with shell-style quoting rules.

    Raises:
        HTTPException: 409 when fail2ban reports the stop command failed.
        HTTPException: 500 when the configured start command cannot be
            tokenised (for example, unbalanced quotes).
        HTTPException: 502 when fail2ban is unreachable for the stop command.
        HTTPException: 503 when fail2ban does not come back online within
            10 seconds after being started. Check the fail2ban log for
            initialisation errors. Use
            ``POST /api/config/jails/{name}/rollback`` if a specific jail
            is suspect.
    """
    import shlex

    # Tokenise with shell quoting rules so quoted arguments (e.g. paths with
    # spaces) stay intact. This happens before the daemon is stopped, so a
    # malformed command can never leave fail2ban down.
    try:
        start_cmd_parts: list[str] = shlex.split(start_cmd)
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Invalid fail2ban start command: {exc}",
        ) from exc

    # Step 1: stop the daemon via socket.
    try:
        await jail_service.restart(socket_path)
    # NOTE(review): JailOperationError is not imported in the visible part of
    # this module — confirm it is in scope.
    except JailOperationError as exc:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=f"fail2ban stop command failed: {exc}",
        ) from exc
    except Fail2BanConnectionError as exc:
        raise _bad_gateway(exc) from exc

    # Step 2: start the daemon via subprocess.
    await config_file_service.start_daemon(start_cmd_parts)

    # Step 3: probe the socket until fail2ban is responsive or the budget expires.
    fail2ban_running: bool = await config_file_service.wait_for_fail2ban(socket_path, max_wait_seconds=10.0)
    if not fail2ban_running:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=(
                "fail2ban was stopped but did not come back online within 10 seconds. "
                "Check the fail2ban log for initialisation errors. "
                "Use POST /api/config/jails/{name}/rollback if a specific jail is suspect."
            ),
        )

    # NOTE(review): ``log`` is not defined in the visible part of this module
    # — confirm a module-level logger exists.
    log.info("fail2ban_restarted")
# ---------------------------------------------------------------------------
# Regex tester (stateless)
# ---------------------------------------------------------------------------
@router.post(
    "/regex-test",
    response_model=RegexTestResponse,
    summary="Test a fail regex pattern against a sample log line",
)
async def regex_test(
    _auth: AuthDep,
    body: RegexTestRequest,
) -> RegexTestResponse:
    """Check whether a regex pattern matches a sample log line.

    The work is done in-process by ``log_service.test_regex`` — no fail2ban
    socket call is made. The response carries the match result and any
    captured groups.

    Args:
        _auth: Validated session.
        body: Sample log line and regex pattern.

    Returns:
        :class:`~app.models.config.RegexTestResponse` with match result and groups.
    """
    outcome = log_service.test_regex(body)
    return outcome
# ---------------------------------------------------------------------------
# Log preview
# ---------------------------------------------------------------------------
@router.post(
    "/preview-log",
    response_model=LogPreviewResponse,
    summary="Preview log file lines against a regex pattern",
)
async def preview_log(
    _auth: AuthDep,
    body: LogPreviewRequest,
) -> LogPreviewResponse:
    """Test a regex against the last N lines of a log file.

    Each returned line carries a flag telling whether the regex matched and,
    for matching lines, the captured groups. The log file is read from the
    server's local filesystem by ``log_service.preview_log``.

    Args:
        _auth: Validated session.
        body: Log file path, regex pattern, and number of lines to read.

    Returns:
        :class:`~app.models.config.LogPreviewResponse` with per-line results.
    """
    preview = await log_service.preview_log(body)
    return preview
# ---------------------------------------------------------------------------
# Map color thresholds
# ---------------------------------------------------------------------------
@router.get(
    "/map-color-thresholds",
    response_model=MapColorThresholdsResponse,
    summary="Get map color threshold configuration",
)
async def get_map_color_thresholds(
    request: Request,
    _auth: AuthDep,
    db: DbDep,
) -> MapColorThresholdsResponse:
    """Return the configured map color thresholds.

    Reads the three thresholds through
    ``setup_service.get_map_color_thresholds`` and wraps them in the
    response model.

    Args:
        request: FastAPI request object (not used by the handler body).
        _auth: Validated session.
        db: Injected database handle forwarded to the setup service.

    Returns:
        :class:`~app.models.config.MapColorThresholdsResponse` with
        current thresholds.
    """
    # ``setup_service`` is already imported at module level, so no
    # function-local import is needed here.
    high, medium, low = await setup_service.get_map_color_thresholds(db)
    return MapColorThresholdsResponse(
        threshold_high=high,
        threshold_medium=medium,
        threshold_low=low,
    )
@router.put(
    "/map-color-thresholds",
    response_model=MapColorThresholdsResponse,
    summary="Update map color threshold configuration",
)
async def update_map_color_thresholds(
    request: Request,
    _auth: AuthDep,
    db: DbDep,
    body: MapColorThresholdsUpdate,
) -> MapColorThresholdsResponse:
    """Update the map color threshold configuration.

    Persists the new values through
    ``setup_service.set_map_color_thresholds`` and echoes them back in the
    response.

    Args:
        request: FastAPI request object (not used by the handler body).
        _auth: Validated session.
        db: Injected database handle forwarded to the setup service.
        body: New threshold values.

    Returns:
        :class:`~app.models.config.MapColorThresholdsResponse` with
        updated thresholds.

    Raises:
        HTTPException: 400 if validation fails (thresholds not
            properly ordered).
    """
    # ``setup_service`` is already imported at module level, so no
    # function-local import is needed here.
    try:
        await setup_service.set_map_color_thresholds(
            db,
            threshold_high=body.threshold_high,
            threshold_medium=body.threshold_medium,
            threshold_low=body.threshold_low,
        )
    except ValueError as exc:
        # The service signals rejected values with ValueError.
        raise _bad_request(str(exc)) from exc

    return MapColorThresholdsResponse(
        threshold_high=body.threshold_high,
        threshold_medium=body.threshold_medium,
        threshold_low=body.threshold_low,
    )
@router.get(
    "/fail2ban-log",
    response_model=Fail2BanLogResponse,
    summary="Read the tail of the fail2ban daemon log file",
)
async def get_fail2ban_log(
    request: Request,
    _auth: AuthDep,
    socket_path: Fail2BanSocketDep,
    lines: Annotated[int, Query(ge=1, le=2000, description="Number of lines to return from the tail.")] = 200,
    filter: Annotated[  # noqa: A002
        str | None,
        Query(description="Plain-text substring filter; only matching lines are returned."),
    ] = None,
) -> Fail2BanLogResponse:
    """Return the tail of the fail2ban daemon log file.

    Delegates to ``config_service.read_fail2ban_log``, which queries the
    fail2ban socket for the current log target and log level, reads the
    last *lines* entries from the file, and optionally filters them by
    *filter*. Only file-based log targets are supported.

    Args:
        request: Incoming request (not used by the handler body).
        _auth: Validated session — enforces authentication.
        socket_path: Injected value forwarded to the config service.
        lines: Number of tail lines to return (1-2000, default 200).
        filter: Optional plain-text substring — only matching lines returned.

    Returns:
        :class:`~app.models.config.Fail2BanLogResponse`.

    Raises:
        HTTPException: 400 when the log target is not a file or path is outside
            the allowed directory.
        HTTPException: 502 when fail2ban is unreachable.
    """
    try:
        log_tail = await config_service.read_fail2ban_log(socket_path, lines, filter)
    except config_service.ConfigOperationError as exc:
        raise _bad_request(str(exc)) from exc
    except Fail2BanConnectionError as exc:
        raise _bad_gateway(exc) from exc
    return log_tail
@router.get(
    "/service-status",
    response_model=ServiceStatusResponse,
    summary="Return fail2ban service health status with log configuration",
)
async def get_service_status(
    request: Request,
    _auth: AuthDep,
    socket_path: Fail2BanSocketDep,
) -> ServiceStatusResponse:
    """Report fail2ban service health together with its log configuration.

    ``config_service.get_service_status`` probes the daemon (using
    ``health_service.probe`` as the probe function) to determine
    online/offline state and augments the result with the current log
    level and log target values.

    Args:
        request: Incoming request (not used by the handler body).
        _auth: Validated session — enforces authentication.
        socket_path: Injected value forwarded to the config service.

    Returns:
        :class:`~app.models.config.ServiceStatusResponse`.

    Raises:
        HTTPException: 502 when fail2ban is unreachable (the service itself
            handles this gracefully and returns ``online=False``).
    """
    # Imported at call time, as in the rest of this handler's history; the
    # module is not imported at the top of the file.
    from app.services import health_service

    probe = health_service.probe
    try:
        service_status = await config_service.get_service_status(socket_path, probe_fn=probe)
    except Fail2BanConnectionError as exc:
        raise _bad_gateway(exc) from exc
    return service_status