Files
BanGUI/backend/app/routers/jails.py
Lukas ebec5e0f58 Stage 6: jail management — backend service, routers, tests, and frontend
- jail_service.py: list/detail/control/ban/unban/ignore-list/IP-lookup
- jails.py router: 11 endpoints including ignore list management
- bans.py router: active bans, ban, unban
- geo.py router: IP lookup with geo enrichment
- models: Jail.actions, ActiveBan.country/.banned_at optional, GeoDetail
- 217 tests pass (40 service + 36 router + 141 existing), 76% coverage
- Frontend: types/jail.ts, api/jails.ts, hooks/useJails.ts
- JailsPage: jail overview table with controls, ban/unban forms,
  active bans table, IP lookup
- JailDetailPage: full detail, start/stop/idle/reload, patterns,
  ignore list management
2026-03-01 14:09:02 +01:00

545 lines
17 KiB
Python

"""Jails router.
Provides read, control, and ignore-list operations for fail2ban jails:
* ``GET /api/jails`` — list all jails
* ``GET /api/jails/{name}`` — full detail for one jail
* ``POST /api/jails/{name}/start`` — start a jail
* ``POST /api/jails/{name}/stop`` — stop a jail
* ``POST /api/jails/{name}/idle`` — toggle idle mode
* ``POST /api/jails/{name}/reload`` — reload a single jail
* ``POST /api/jails/reload-all`` — reload every jail
* ``GET /api/jails/{name}/ignoreip`` — ignore-list for a jail
* ``POST /api/jails/{name}/ignoreip`` — add IP to ignore list
* ``DELETE /api/jails/{name}/ignoreip`` — remove IP from ignore list
* ``POST /api/jails/{name}/ignoreself`` — toggle ignoreself option
"""
from __future__ import annotations
from typing import Annotated
from fastapi import APIRouter, Body, HTTPException, Path, Request, status
from app.dependencies import AuthDep
from app.models.jail import (
IgnoreIpRequest,
JailCommandResponse,
JailDetailResponse,
JailListResponse,
)
from app.services import jail_service
from app.services.jail_service import JailNotFoundError, JailOperationError
from app.utils.fail2ban_client import Fail2BanConnectionError
# Router collecting every jail endpoint in this module; all routes declared
# on it are served under the ``/api/jails`` prefix.
router: APIRouter = APIRouter(
    prefix="/api/jails",
    tags=["Jails"],
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
# Shared annotated type for the ``{name}`` path segment used by jail routes.
_NamePath = Annotated[
    str,
    Path(description="Jail name as configured in fail2ban."),
]
def _not_found(name: str) -> HTTPException:
    """Build the 404 error used when a jail name is unknown.

    The exception is only constructed here; the caller raises it.

    Args:
        name: Jail name that could not be resolved.

    Returns:
        :class:`fastapi.HTTPException` with status 404.
    """
    message = f"Jail not found: {name!r}"
    return HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=message)
def _bad_gateway(exc: Exception) -> HTTPException:
    """Build the 502 error used when fail2ban cannot be reached.

    The exception is only constructed here; the caller raises it. The text
    of ``exc`` is embedded in the response detail.

    Args:
        exc: The underlying connection error.

    Returns:
        :class:`fastapi.HTTPException` with status 502.
    """
    message = f"Cannot reach fail2ban: {exc}"
    return HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=message)
def _conflict(message: str) -> HTTPException:
    """Build the 409 error used when a jail operation is rejected.

    The exception is only constructed here; the caller raises it.

    Args:
        message: Human-readable description of the conflict, passed through
            unchanged as the response detail.

    Returns:
        :class:`fastapi.HTTPException` with status 409.
    """
    conflict_error = HTTPException(
        status_code=status.HTTP_409_CONFLICT,
        detail=message,
    )
    return conflict_error
# ---------------------------------------------------------------------------
# Jail listing & detail
# ---------------------------------------------------------------------------
@router.get(
    "",
    response_model=JailListResponse,
    summary="List all active fail2ban jails",
)
async def get_jails(
    request: Request,
    _auth: AuthDep,
) -> JailListResponse:
    """Return the jail listing produced by ``jail_service.list_jails``.

    Args:
        request: Incoming request; the fail2ban socket path is read from
            ``request.app.state.settings``.
        _auth: Validated session — enforces authentication.

    Returns:
        :class:`~app.models.jail.JailListResponse` as returned by the
        service layer.

    Raises:
        HTTPException: 502 when fail2ban is unreachable.
    """
    fail2ban_socket: str = request.app.state.settings.fail2ban_socket
    try:
        jail_list = await jail_service.list_jails(fail2ban_socket)
    except Fail2BanConnectionError as err:
        raise _bad_gateway(err) from err
    return jail_list
@router.get(
    "/{name}",
    response_model=JailDetailResponse,
    summary="Return full detail for a single jail",
)
async def get_jail(
    request: Request,
    _auth: AuthDep,
    name: _NamePath,
) -> JailDetailResponse:
    """Return the detail record for one jail from ``jail_service.get_jail``.

    Args:
        request: Incoming request; the fail2ban socket path is read from
            ``request.app.state.settings``.
        _auth: Validated session — enforces authentication.
        name: Jail name taken from the URL path.

    Returns:
        :class:`~app.models.jail.JailDetailResponse` as returned by the
        service layer.

    Raises:
        HTTPException: 404 when the jail does not exist.
        HTTPException: 502 when fail2ban is unreachable.
    """
    fail2ban_socket: str = request.app.state.settings.fail2ban_socket
    try:
        jail_detail = await jail_service.get_jail(fail2ban_socket, name)
    except JailNotFoundError:
        # The original error adds nothing for the client; drop the chain.
        raise _not_found(name) from None
    except Fail2BanConnectionError as err:
        raise _bad_gateway(err) from err
    return jail_detail
# ---------------------------------------------------------------------------
# Jail control commands
# ---------------------------------------------------------------------------
@router.post(
    "/reload-all",
    response_model=JailCommandResponse,
    summary="Reload all fail2ban jails",
)
async def reload_all_jails(
    request: Request,
    _auth: AuthDep,
) -> JailCommandResponse:
    """Ask fail2ban to reload every jail via ``jail_service.reload_all``.

    Args:
        request: Incoming request; the fail2ban socket path is read from
            ``request.app.state.settings``.
        _auth: Validated session — enforces authentication.

    Returns:
        :class:`~app.models.jail.JailCommandResponse` confirming the reload;
        its ``jail`` field is the wildcard ``"*"``.

    Raises:
        HTTPException: 409 when fail2ban reports the operation failed.
        HTTPException: 502 when fail2ban is unreachable.
    """
    fail2ban_socket: str = request.app.state.settings.fail2ban_socket
    try:
        await jail_service.reload_all(fail2ban_socket)
    except JailOperationError as err:
        raise _conflict(str(err)) from err
    except Fail2BanConnectionError as err:
        raise _bad_gateway(err) from err
    return JailCommandResponse(message="All jails reloaded successfully.", jail="*")
@router.post(
    "/{name}/start",
    response_model=JailCommandResponse,
    summary="Start a stopped jail",
)
async def start_jail(
    request: Request,
    _auth: AuthDep,
    name: _NamePath,
) -> JailCommandResponse:
    """Start the named jail via ``jail_service.start_jail``.

    Args:
        request: Incoming request; the fail2ban socket path is read from
            ``request.app.state.settings``.
        _auth: Validated session — enforces authentication.
        name: Jail name taken from the URL path.

    Returns:
        :class:`~app.models.jail.JailCommandResponse` confirming the start.

    Raises:
        HTTPException: 404 when the jail does not exist.
        HTTPException: 409 when fail2ban reports the operation failed.
        HTTPException: 502 when fail2ban is unreachable.
    """
    fail2ban_socket: str = request.app.state.settings.fail2ban_socket
    try:
        await jail_service.start_jail(fail2ban_socket, name)
    except JailNotFoundError:
        raise _not_found(name) from None
    except JailOperationError as err:
        raise _conflict(str(err)) from err
    except Fail2BanConnectionError as err:
        raise _bad_gateway(err) from err
    return JailCommandResponse(message=f"Jail {name!r} started.", jail=name)
@router.post(
    "/{name}/stop",
    response_model=JailCommandResponse,
    summary="Stop a running jail",
)
async def stop_jail(
    request: Request,
    _auth: AuthDep,
    name: _NamePath,
) -> JailCommandResponse:
    """Stop the named jail via ``jail_service.stop_jail``.

    Args:
        request: Incoming request; the fail2ban socket path is read from
            ``request.app.state.settings``.
        _auth: Validated session — enforces authentication.
        name: Jail name taken from the URL path.

    Returns:
        :class:`~app.models.jail.JailCommandResponse` confirming the stop.

    Raises:
        HTTPException: 404 when the jail does not exist.
        HTTPException: 409 when fail2ban reports the operation failed.
        HTTPException: 502 when fail2ban is unreachable.
    """
    fail2ban_socket: str = request.app.state.settings.fail2ban_socket
    try:
        await jail_service.stop_jail(fail2ban_socket, name)
    except JailNotFoundError:
        raise _not_found(name) from None
    except JailOperationError as err:
        raise _conflict(str(err)) from err
    except Fail2BanConnectionError as err:
        raise _bad_gateway(err) from err
    return JailCommandResponse(message=f"Jail {name!r} stopped.", jail=name)
@router.post(
    "/{name}/idle",
    response_model=JailCommandResponse,
    summary="Toggle idle mode for a jail",
)
async def toggle_idle(
    request: Request,
    _auth: AuthDep,
    name: _NamePath,
    on: bool = Body(..., description="``true`` to enable idle, ``false`` to disable."),
) -> JailCommandResponse:
    """Switch idle mode on or off for a jail via ``jail_service.set_idle``.

    Args:
        request: Incoming request; the fail2ban socket path is read from
            ``request.app.state.settings``.
        _auth: Validated session — enforces authentication.
        name: Jail name taken from the URL path.
        on: Required boolean from the request body; ``true`` enables idle
            mode, ``false`` disables it.

    Returns:
        :class:`~app.models.jail.JailCommandResponse` confirming the change.

    Raises:
        HTTPException: 404 when the jail does not exist.
        HTTPException: 409 when fail2ban reports the operation failed.
        HTTPException: 502 when fail2ban is unreachable.
    """
    fail2ban_socket: str = request.app.state.settings.fail2ban_socket
    try:
        await jail_service.set_idle(fail2ban_socket, name, on=on)
    except JailNotFoundError:
        raise _not_found(name) from None
    except JailOperationError as err:
        raise _conflict(str(err)) from err
    except Fail2BanConnectionError as err:
        raise _bad_gateway(err) from err

    # Human-readable state used only in the confirmation message.
    if on:
        mode_label = "on"
    else:
        mode_label = "off"
    confirmation = f"Jail {name!r} idle mode turned {mode_label}."
    return JailCommandResponse(message=confirmation, jail=name)
@router.post(
    "/{name}/reload",
    response_model=JailCommandResponse,
    summary="Reload a single jail",
)
async def reload_jail(
    request: Request,
    _auth: AuthDep,
    name: _NamePath,
) -> JailCommandResponse:
    """Reload the named jail via ``jail_service.reload_jail``.

    Args:
        request: Incoming request; the fail2ban socket path is read from
            ``request.app.state.settings``.
        _auth: Validated session — enforces authentication.
        name: Jail name taken from the URL path.

    Returns:
        :class:`~app.models.jail.JailCommandResponse` confirming the reload.

    Raises:
        HTTPException: 404 when the jail does not exist.
        HTTPException: 409 when fail2ban reports the operation failed.
        HTTPException: 502 when fail2ban is unreachable.
    """
    fail2ban_socket: str = request.app.state.settings.fail2ban_socket
    try:
        await jail_service.reload_jail(fail2ban_socket, name)
    except JailNotFoundError:
        raise _not_found(name) from None
    except JailOperationError as err:
        raise _conflict(str(err)) from err
    except Fail2BanConnectionError as err:
        raise _bad_gateway(err) from err
    return JailCommandResponse(message=f"Jail {name!r} reloaded.", jail=name)
# ---------------------------------------------------------------------------
# Ignore list (IP whitelist)
# ---------------------------------------------------------------------------
class _IgnoreSelfRequest(IgnoreIpRequest):
    """Request model nominally intended for the ignoreself toggle endpoint.

    NOTE(review): this subclass declares no fields of its own, so it exposes
    exactly the fields of :class:`~app.models.jail.IgnoreIpRequest` — the
    ``ip`` field is not replaced by a boolean ``on`` field. The ignoreself
    endpoint visible in this module reads a bare boolean request body and
    does not use this class; confirm whether it is still needed.
    """
@router.get(
    "/{name}/ignoreip",
    response_model=list[str],
    summary="List the ignore IPs for a jail",
)
async def get_ignore_list(
    request: Request,
    _auth: AuthDep,
    name: _NamePath,
) -> list[str]:
    """Return a jail's ignore list from ``jail_service.get_ignore_list``.

    Args:
        request: Incoming request; the fail2ban socket path is read from
            ``request.app.state.settings``.
        _auth: Validated session — enforces authentication.
        name: Jail name taken from the URL path.

    Returns:
        The ignore-list entries as strings, exactly as returned by the
        service layer.

    Raises:
        HTTPException: 404 when the jail does not exist.
        HTTPException: 502 when fail2ban is unreachable.
    """
    fail2ban_socket: str = request.app.state.settings.fail2ban_socket
    try:
        ignored = await jail_service.get_ignore_list(fail2ban_socket, name)
    except JailNotFoundError:
        raise _not_found(name) from None
    except Fail2BanConnectionError as err:
        raise _bad_gateway(err) from err
    return ignored
@router.post(
    "/{name}/ignoreip",
    status_code=status.HTTP_201_CREATED,
    response_model=JailCommandResponse,
    summary="Add an IP or network to the ignore list",
)
async def add_ignore_ip(
    request: Request,
    _auth: AuthDep,
    name: _NamePath,
    body: IgnoreIpRequest,
) -> JailCommandResponse:
    """Add an entry to a jail's ignore list via ``jail_service.add_ignore_ip``.

    Args:
        request: Incoming request; the fail2ban socket path is read from
            ``request.app.state.settings``.
        _auth: Validated session — enforces authentication.
        name: Jail name taken from the URL path.
        body: Payload whose ``ip`` field holds the IP or CIDR to add.

    Returns:
        :class:`~app.models.jail.JailCommandResponse` confirming the addition.

    Raises:
        HTTPException: 400 when the service rejects the value with
            ``ValueError``.
        HTTPException: 404 when the jail does not exist.
        HTTPException: 409 when fail2ban reports the operation failed.
        HTTPException: 502 when fail2ban is unreachable.
    """
    socket_path: str = request.app.state.settings.fail2ban_socket
    # Only the service call is guarded. Building the response inside the
    # ``try`` would let ``except ValueError`` also swallow a pydantic
    # validation error (a ``ValueError`` subclass) raised while constructing
    # the response, misreporting a server-side fault as a client 400.
    try:
        await jail_service.add_ignore_ip(socket_path, name, body.ip)
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(exc),
        ) from exc
    except JailNotFoundError:
        raise _not_found(name) from None
    except JailOperationError as exc:
        raise _conflict(str(exc)) from exc
    except Fail2BanConnectionError as exc:
        raise _bad_gateway(exc) from exc
    return JailCommandResponse(
        message=f"IP {body.ip!r} added to ignore list of jail {name!r}.",
        jail=name,
    )
@router.delete(
    "/{name}/ignoreip",
    response_model=JailCommandResponse,
    summary="Remove an IP or network from the ignore list",
)
async def del_ignore_ip(
    request: Request,
    _auth: AuthDep,
    name: _NamePath,
    body: IgnoreIpRequest,
) -> JailCommandResponse:
    """Remove an entry from a jail's ignore list via ``jail_service.del_ignore_ip``.

    Args:
        request: Incoming request; the fail2ban socket path is read from
            ``request.app.state.settings``.
        _auth: Validated session — enforces authentication.
        name: Jail name taken from the URL path.
        body: Payload whose ``ip`` field holds the IP or CIDR to remove.

    Returns:
        :class:`~app.models.jail.JailCommandResponse` confirming the removal.

    Raises:
        HTTPException: 404 when the jail does not exist.
        HTTPException: 409 when fail2ban reports the operation failed.
        HTTPException: 502 when fail2ban is unreachable.
    """
    fail2ban_socket: str = request.app.state.settings.fail2ban_socket
    target_ip = body.ip
    try:
        await jail_service.del_ignore_ip(fail2ban_socket, name, target_ip)
    except JailNotFoundError:
        raise _not_found(name) from None
    except JailOperationError as err:
        raise _conflict(str(err)) from err
    except Fail2BanConnectionError as err:
        raise _bad_gateway(err) from err
    confirmation = f"IP {target_ip!r} removed from ignore list of jail {name!r}."
    return JailCommandResponse(message=confirmation, jail=name)
@router.post(
    "/{name}/ignoreself",
    response_model=JailCommandResponse,
    summary="Toggle the ignoreself option for a jail",
)
async def toggle_ignore_self(
    request: Request,
    _auth: AuthDep,
    name: _NamePath,
    on: bool = Body(..., description="``true`` to enable ignoreself, ``false`` to disable."),
) -> JailCommandResponse:
    """Set a jail's ``ignoreself`` flag via ``jail_service.set_ignore_self``.

    Args:
        request: Incoming request; the fail2ban socket path is read from
            ``request.app.state.settings``.
        _auth: Validated session — enforces authentication.
        name: Jail name taken from the URL path.
        on: Required boolean from the request body; ``true`` enables the
            flag, ``false`` disables it.

    Returns:
        :class:`~app.models.jail.JailCommandResponse` confirming the change.

    Raises:
        HTTPException: 404 when the jail does not exist.
        HTTPException: 409 when fail2ban reports the operation failed.
        HTTPException: 502 when fail2ban is unreachable.
    """
    fail2ban_socket: str = request.app.state.settings.fail2ban_socket
    try:
        await jail_service.set_ignore_self(fail2ban_socket, name, on=on)
    except JailNotFoundError:
        raise _not_found(name) from None
    except JailOperationError as err:
        raise _conflict(str(err)) from err
    except Fail2BanConnectionError as err:
        raise _bad_gateway(err) from err

    # Human-readable state used only in the confirmation message.
    if on:
        flag_label = "enabled"
    else:
        flag_label = "disabled"
    confirmation = f"ignoreself {flag_label} for jail {name!r}."
    return JailCommandResponse(message=confirmation, jail=name)