Files
BanGUI/backend/app/routers/jails.py
Lukas 69e1726045 Refactor data fetching hooks, add page size lint test
- Simplify useFetchData: remove unused URL building logic
- Add usePolledData initial implementation
- Add router page_size param validation test
- Update API reference docs
- Clean up tasks doc
2026-05-04 06:48:24 +02:00

570 lines
19 KiB
Python

"""Jails router.

Provides CRUD and control operations for fail2ban jails. Every route is
served under the router prefix ``/api/v1/jails``:

* ``GET /api/v1/jails`` — list all jails
* ``GET /api/v1/jails/{name}`` — full detail for one jail
* ``GET /api/v1/jails/{name}/banned`` — paginated currently-banned IPs for one jail
* ``POST /api/v1/jails/{name}/start`` — start a jail
* ``POST /api/v1/jails/{name}/stop`` — stop a jail
* ``POST /api/v1/jails/{name}/idle`` — toggle idle mode
* ``POST /api/v1/jails/{name}/reload`` — reload a single jail
* ``POST /api/v1/jails/reload-all`` — reload every jail
* ``GET /api/v1/jails/{name}/ignoreip`` — ignore-list for a jail
* ``POST /api/v1/jails/{name}/ignoreip`` — add IP to ignore list
* ``DELETE /api/v1/jails/{name}/ignoreip`` — remove IP from ignore list
* ``POST /api/v1/jails/{name}/ignoreself`` — toggle ignoreself option
"""
from __future__ import annotations
import asyncio
from typing import Annotated
from fastapi import APIRouter, Body, Path, Query, status
from app.dependencies import (
AuthDep,
BanServiceContextDep,
Fail2BanSocketDep,
GeoCacheDep,
HttpSessionDep,
JailServiceStateDep,
)
from app.exceptions import BadRequestError
from app.mappers import jail_mappers
from app.models.ban import JailBannedIpsResponse
from app.models.jail import (
IgnoreIpRequest,
IgnoreListResponse,
JailCommandResponse,
JailDetailResponse,
JailListResponse,
)
from app.services import jail_service
from app.utils.constants import DEFAULT_PAGE_SIZE
# Router collecting every jail endpoint in this module; each route path
# declared below is relative to the ``/api/v1/jails`` prefix.
router: APIRouter = APIRouter(prefix="/api/v1/jails", tags=["Jails"])
# Shared annotation for the ``{name}`` path parameter used by the per-jail
# routes, so the OpenAPI description is defined in one place.
_NamePath = Annotated[str, Path(description="Jail name as configured in fail2ban.")]
# ---------------------------------------------------------------------------
# Jail listing & detail
# ---------------------------------------------------------------------------
@router.get(
    "",
    response_model=JailListResponse,
    summary="List all active fail2ban jails",
    responses={
        200: {"description": "Jails list returned", "model": JailListResponse},
        401: {"description": "Session missing, expired, or invalid"},
        502: {"description": "fail2ban unreachable"},
    },
)
async def get_jails(
    _auth: AuthDep,
    socket_path: Fail2BanSocketDep,
    state: JailServiceStateDep,
) -> JailListResponse:
    """List every active fail2ban jail in summary form.

    The handler delegates to ``jail_service.list_jails`` and converts the
    domain result into the API response model. Per the response model's
    contract, each entry carries runtime metrics (currently banned, total
    bans, failures) and key configuration (find time, ban time, max
    retries, backend, idle state).

    Args:
        _auth: Validated session — present only to enforce authentication.
        socket_path: Path to the fail2ban Unix domain socket.
        state: The jail service state holder passed through to the service.

    Returns:
        :class:`~app.models.jail.JailListResponse` with all active jails.

    Raises:
        Exception: Errors from ``jail_service.list_jails`` are not caught
            here; the route declares 502 for an unreachable fail2ban.
    """
    # Fetch and map in one expression; no intermediate state is needed.
    return jail_mappers.map_domain_jail_list_to_response(
        await jail_service.list_jails(socket_path, state)
    )
@router.get(
    "/{name}",
    response_model=JailDetailResponse,
    summary="Return full detail for a single jail",
    responses={
        200: {"description": "Jail detail returned", "model": JailDetailResponse},
        401: {"description": "Session missing, expired, or invalid"},
        404: {"description": "Jail not found"},
        502: {"description": "fail2ban unreachable"},
    },
)
async def get_jail(
    _auth: AuthDep,
    name: _NamePath,
    socket_path: Fail2BanSocketDep,
) -> JailDetailResponse:
    """Return the full configuration and runtime state of a single jail.

    Three service calls are issued concurrently — the jail detail, its
    ignore list, and its ``ignoreself`` flag — and the latter two are
    merged into a copy of the detail before mapping to the response model.
    Per the response model's contract the detail covers log paths, fail
    and ignore regex patterns, date pattern, log encoding, attached action
    names, ban-time settings, and runtime counters.

    Args:
        _auth: Validated session — present only to enforce authentication.
        name: Jail name.
        socket_path: Path to the fail2ban Unix domain socket.

    Returns:
        :class:`~app.models.jail.JailDetailResponse` with the full jail.

    Raises:
        Exception: Errors from the service calls are not caught here; the
            route declares 404 (jail not found) and 502 (fail2ban
            unreachable) for those cases.
    """
    detail, ignored_entries, ignores_self = await asyncio.gather(
        jail_service.get_jail(socket_path, name),
        jail_service.get_ignore_list(socket_path, name),
        jail_service.get_ignore_self(socket_path, name),
    )

    # The dedicated ignore-list / ignoreself lookups take precedence over
    # whatever the plain detail call returned for those two fields.
    overrides = {"ignore_list": ignored_entries, "ignore_self": ignores_self}
    merged_detail = detail.model_copy(update=overrides)
    return jail_mappers.map_domain_jail_detail_to_response(merged_detail)
# ---------------------------------------------------------------------------
# Jail control commands
# ---------------------------------------------------------------------------
@router.post(
    "/reload-all",
    response_model=JailCommandResponse,
    summary="Reload all fail2ban jails",
    responses={
        200: {"description": "All jails reloaded", "model": JailCommandResponse},
        401: {"description": "Session missing, expired, or invalid"},
        409: {"description": "fail2ban reports operation failed"},
        502: {"description": "fail2ban unreachable"},
    },
)
async def reload_all_jails(
    _auth: AuthDep,
    socket_path: Fail2BanSocketDep,
) -> JailCommandResponse:
    """Ask fail2ban to reload all jails so configuration changes apply.

    Delegates to ``jail_service.reload_all`` and, on success, answers with
    a confirmation whose ``jail`` field is the wildcard ``"*"``.

    Args:
        _auth: Validated session — present only to enforce authentication.
        socket_path: Path to the fail2ban Unix domain socket.

    Returns:
        :class:`~app.models.jail.JailCommandResponse` confirming the reload.

    Raises:
        Exception: Errors from ``jail_service.reload_all`` are not caught
            here; the route declares 409 (operation failed) and 502
            (fail2ban unreachable) for those cases.
    """
    await jail_service.reload_all(socket_path)
    confirmation = "All jails reloaded successfully."
    return JailCommandResponse(message=confirmation, jail="*")
@router.post(
    "/{name}/start",
    response_model=JailCommandResponse,
    summary="Start a stopped jail",
    responses={
        200: {"description": "Jail started", "model": JailCommandResponse},
        401: {"description": "Session missing, expired, or invalid"},
        404: {"description": "Jail not found"},
        409: {"description": "fail2ban reports operation failed"},
        502: {"description": "fail2ban unreachable"},
    },
)
async def start_jail(
    _auth: AuthDep,
    name: _NamePath,
    socket_path: Fail2BanSocketDep,
) -> JailCommandResponse:
    """Start the named fail2ban jail.

    Args:
        _auth: Validated session — present only to enforce authentication.
        name: Jail name.
        socket_path: Path to the fail2ban Unix domain socket.

    Returns:
        :class:`~app.models.jail.JailCommandResponse` confirming the start.

    Raises:
        Exception: Errors from ``jail_service.start_jail`` are not caught
            here; the route declares 404 (jail not found), 409 (operation
            failed) and 502 (fail2ban unreachable) for those cases.
    """
    await jail_service.start_jail(socket_path, name)
    confirmation = f"Jail {name!r} started."
    return JailCommandResponse(message=confirmation, jail=name)
@router.post(
    "/{name}/stop",
    response_model=JailCommandResponse,
    summary="Stop a running jail",
    responses={
        200: {"description": "Jail stopped", "model": JailCommandResponse},
        401: {"description": "Session missing, expired, or invalid"},
        409: {"description": "fail2ban reports operation failed"},
        502: {"description": "fail2ban unreachable"},
    },
)
async def stop_jail(
    _auth: AuthDep,
    name: _NamePath,
    socket_path: Fail2BanSocketDep,
) -> JailCommandResponse:
    """Stop the named fail2ban jail.

    Once stopped, the jail no longer monitors logs or issues new bans.
    Whether existing bans are lifted depends on the fail2ban configuration.
    Stopping a jail that is already stopped is documented as succeeding
    silently (idempotent), which is why the route declares no 404.

    Args:
        _auth: Validated session — present only to enforce authentication.
        name: Jail name.
        socket_path: Path to the fail2ban Unix domain socket.

    Returns:
        :class:`~app.models.jail.JailCommandResponse` confirming the stop.

    Raises:
        Exception: Errors from ``jail_service.stop_jail`` are not caught
            here; the route declares 409 (operation failed) and 502
            (fail2ban unreachable) for those cases.
    """
    await jail_service.stop_jail(socket_path, name)
    confirmation = f"Jail {name!r} stopped."
    return JailCommandResponse(message=confirmation, jail=name)
@router.post(
    "/{name}/idle",
    response_model=JailCommandResponse,
    summary="Toggle idle mode for a jail",
    responses={
        200: {"description": "Idle mode toggled", "model": JailCommandResponse},
        401: {"description": "Session missing, expired, or invalid"},
        404: {"description": "Jail not found"},
        409: {"description": "fail2ban reports operation failed"},
        502: {"description": "fail2ban unreachable"},
    },
)
async def toggle_idle(
    _auth: AuthDep,
    name: _NamePath,
    socket_path: Fail2BanSocketDep,
    on: bool = Body(..., description="``true`` to enable idle, ``false`` to disable."),
) -> JailCommandResponse:
    """Switch idle mode on or off for the named jail.

    Idle mode suspends log monitoring without fully stopping the jail, so
    existing bans are preserved.

    Args:
        _auth: Validated session — present only to enforce authentication.
        name: Jail name.
        socket_path: Path to the fail2ban Unix domain socket.
        on: ``true`` to enable idle, ``false`` to disable. Read from the
            request body.

    Returns:
        :class:`~app.models.jail.JailCommandResponse` confirming the change.

    Raises:
        Exception: Errors from ``jail_service.set_idle`` are not caught
            here; the route declares 404 (jail not found), 409 (operation
            failed) and 502 (fail2ban unreachable) for those cases.
    """
    await jail_service.set_idle(socket_path, name, on=on)

    # Human-readable rendering of the requested state for the message.
    if on:
        mode_label = "on"
    else:
        mode_label = "off"
    return JailCommandResponse(
        message=f"Jail {name!r} idle mode turned {mode_label}.",
        jail=name,
    )
@router.post(
    "/{name}/reload",
    response_model=JailCommandResponse,
    summary="Reload a single jail",
    responses={
        200: {"description": "Jail reloaded", "model": JailCommandResponse},
        401: {"description": "Session missing, expired, or invalid"},
        404: {"description": "Jail not found"},
        409: {"description": "fail2ban reports operation failed"},
        502: {"description": "fail2ban unreachable"},
    },
)
async def reload_jail(
    _auth: AuthDep,
    name: _NamePath,
    socket_path: Fail2BanSocketDep,
) -> JailCommandResponse:
    """Reload one fail2ban jail so it picks up configuration changes.

    Args:
        _auth: Validated session — present only to enforce authentication.
        name: Jail name.
        socket_path: Path to the fail2ban Unix domain socket.

    Returns:
        :class:`~app.models.jail.JailCommandResponse` confirming the reload.

    Raises:
        Exception: Errors from ``jail_service.reload_jail`` are not caught
            here; the route declares 404 (jail not found), 409 (operation
            failed) and 502 (fail2ban unreachable) for those cases.
    """
    await jail_service.reload_jail(socket_path, name)
    confirmation = f"Jail {name!r} reloaded."
    return JailCommandResponse(message=confirmation, jail=name)
# ---------------------------------------------------------------------------
# Ignore list (IP whitelist)
# ---------------------------------------------------------------------------
class _IgnoreSelfRequest(IgnoreIpRequest):
    """Placeholder request model for the ignoreself toggle endpoint.

    Subclasses :class:`~app.models.jail.IgnoreIpRequest` without adding or
    overriding any field, so it currently validates exactly like its
    parent. In particular it does NOT define a boolean ``on`` field.

    NOTE(review): no route in the visible part of this module references
    this class — ``toggle_ignore_self`` reads ``on`` from a bare ``Body``
    parameter instead. Confirm whether the class should be removed or be
    given a real ``on: bool`` field and wired into that endpoint.
    """
@router.get(
    "/{name}/ignoreip",
    response_model=IgnoreListResponse,
    summary="List the ignore IPs for a jail",
    responses={
        200: {"description": "Ignore list returned", "model": IgnoreListResponse},
        401: {"description": "Session missing, expired, or invalid"},
        404: {"description": "Jail not found"},
        502: {"description": "fail2ban unreachable"},
    },
)
async def get_ignore_list(
    _auth: AuthDep,
    name: _NamePath,
    socket_path: Fail2BanSocketDep,
) -> IgnoreListResponse:
    """Fetch the ignore list (IP whitelist) of the named jail.

    Args:
        _auth: Validated session — present only to enforce authentication.
        name: Jail name.
        socket_path: Path to the fail2ban Unix domain socket.

    Returns:
        :class:`~app.models.jail.IgnoreListResponse` whose ``items`` are the
        entries returned by the service and whose ``total`` is their count.

    Raises:
        Exception: Errors from ``jail_service.get_ignore_list`` are not
            caught here; the route declares 404 (jail not found) and 502
            (fail2ban unreachable) for those cases.
    """
    entries = await jail_service.get_ignore_list(socket_path, name)
    entry_count = len(entries)
    return IgnoreListResponse(items=entries, total=entry_count)
@router.post(
    "/{name}/ignoreip",
    status_code=status.HTTP_201_CREATED,
    response_model=JailCommandResponse,
    summary="Add an IP or network to the ignore list",
    responses={
        201: {"description": "IP added to ignore list", "model": JailCommandResponse},
        400: {"description": "IP or network invalid"},
        401: {"description": "Session missing, expired, or invalid"},
        404: {"description": "Jail not found"},
        409: {"description": "fail2ban reports operation failed"},
        502: {"description": "fail2ban unreachable"},
    },
)
async def add_ignore_ip(
    _auth: AuthDep,
    name: _NamePath,
    body: IgnoreIpRequest,
    socket_path: Fail2BanSocketDep,
) -> JailCommandResponse:
    """Put an IP address or CIDR network on a jail's ignore list.

    Entries on the ignore list are never banned by that jail, even when
    they match the configured fail regex.

    Args:
        _auth: Validated session — present only to enforce authentication.
        name: Jail name.
        body: Payload whose ``ip`` field holds the IP or CIDR to add.
        socket_path: Path to the fail2ban Unix domain socket.

    Returns:
        :class:`~app.models.jail.JailCommandResponse` confirming the
        addition (sent with status 201).

    Raises:
        Exception: Errors from ``jail_service.add_ignore_ip`` are not
            caught here; the route declares 400 (invalid IP or network),
            404 (jail not found), 409 (operation failed) and 502 (fail2ban
            unreachable) for those cases.
    """
    address = body.ip
    await jail_service.add_ignore_ip(socket_path, name, address)
    confirmation = f"IP {address!r} added to ignore list of jail {name!r}."
    return JailCommandResponse(message=confirmation, jail=name)
@router.delete(
    "/{name}/ignoreip",
    response_model=JailCommandResponse,
    summary="Remove an IP or network from the ignore list",
    responses={
        200: {"description": "IP removed from ignore list", "model": JailCommandResponse},
        401: {"description": "Session missing, expired, or invalid"},
        404: {"description": "Jail not found"},
        409: {"description": "fail2ban reports operation failed"},
        502: {"description": "fail2ban unreachable"},
    },
)
async def del_ignore_ip(
    _auth: AuthDep,
    name: _NamePath,
    body: IgnoreIpRequest,
    socket_path: Fail2BanSocketDep,
) -> JailCommandResponse:
    """Take an IP address or CIDR network off a jail's ignore list.

    Note that the entry to remove travels in the request body of the
    ``DELETE`` call, not in the URL.

    Args:
        _auth: Validated session — present only to enforce authentication.
        name: Jail name.
        body: Payload whose ``ip`` field holds the IP or CIDR to remove.
        socket_path: Path to the fail2ban Unix domain socket.

    Returns:
        :class:`~app.models.jail.JailCommandResponse` confirming the removal.

    Raises:
        Exception: Errors from ``jail_service.del_ignore_ip`` are not
            caught here; the route declares 404 (jail not found), 409
            (operation failed) and 502 (fail2ban unreachable) for those
            cases.
    """
    address = body.ip
    await jail_service.del_ignore_ip(socket_path, name, address)
    confirmation = f"IP {address!r} removed from ignore list of jail {name!r}."
    return JailCommandResponse(message=confirmation, jail=name)
@router.post(
    "/{name}/ignoreself",
    response_model=JailCommandResponse,
    summary="Toggle the ignoreself option for a jail",
    responses={
        200: {"description": "ignoreself toggled", "model": JailCommandResponse},
        401: {"description": "Session missing, expired, or invalid"},
        404: {"description": "Jail not found"},
        409: {"description": "fail2ban reports operation failed"},
        502: {"description": "fail2ban unreachable"},
    },
)
async def toggle_ignore_self(
    _auth: AuthDep,
    name: _NamePath,
    socket_path: Fail2BanSocketDep,
    on: bool = Body(..., description="``true`` to enable ignoreself, ``false`` to disable."),
) -> JailCommandResponse:
    """Enable or disable the ``ignoreself`` flag of the named jail.

    With ``ignoreself`` enabled, fail2ban automatically treats the server's
    own IP addresses as ignored so the host can never ban itself.

    Args:
        _auth: Validated session — present only to enforce authentication.
        name: Jail name.
        socket_path: Path to the fail2ban Unix domain socket.
        on: ``true`` to enable, ``false`` to disable. Read from the request
            body.

    Returns:
        :class:`~app.models.jail.JailCommandResponse` confirming the change.

    Raises:
        Exception: Errors from ``jail_service.set_ignore_self`` are not
            caught here; the route declares 404 (jail not found), 409
            (operation failed) and 502 (fail2ban unreachable) for those
            cases.
    """
    await jail_service.set_ignore_self(socket_path, name, on=on)

    # Human-readable rendering of the requested state for the message.
    if on:
        outcome = "enabled"
    else:
        outcome = "disabled"
    return JailCommandResponse(
        message=f"ignoreself {outcome} for jail {name!r}.",
        jail=name,
    )
# ---------------------------------------------------------------------------
# Currently banned IPs (paginated)
# ---------------------------------------------------------------------------
@router.get(
    "/{name}/banned",
    response_model=JailBannedIpsResponse,
    summary="Return paginated currently-banned IPs for a single jail",
    responses={
        200: {"description": "Banned IPs returned", "model": JailBannedIpsResponse},
        400: {"description": "page or page_size out of range"},
        401: {"description": "Session missing, expired, or invalid"},
        404: {"description": "Jail not found"},
        502: {"description": "fail2ban unreachable"},
    },
)
async def get_jail_banned_ips(
    _auth: AuthDep,
    ban_ctx: BanServiceContextDep,
    name: _NamePath,
    socket_path: Fail2BanSocketDep,
    http_session: HttpSessionDep,
    geo_cache: GeoCacheDep,
    page: int = Query(default=1, ge=1, description="1-based page number."),
    page_size: int = Query(
        default=DEFAULT_PAGE_SIZE,
        ge=1,
        le=100,
        description="Items per page (max 100).",
    ),
    search: Annotated[
        str | None,
        Query(description="Optional case-insensitive substring filter on the IP address."),
    ] = None,
) -> JailBannedIpsResponse:
    """Return one page of the IPs currently banned by a specific jail.

    The work is delegated to ``jail_service.get_jail_banned_ips``. As
    documented for that service, the full ban list is fetched from the
    fail2ban socket, filtered by the optional *search* substring, sliced to
    the requested page, and geo-enriched only for that page slice.

    Args:
        _auth: Validated session — present only to enforce authentication.
        ban_ctx: Ban service context; its ``db`` attribute is forwarded to
            the service as ``app_db``.
        name: Jail name.
        socket_path: Path to the fail2ban Unix domain socket.
        http_session: Shared HTTP session for geolocation.
        geo_cache: Geolocation cache instance.
        page: 1-based page number (default 1, min 1).
        page_size: Items per page (default ``DEFAULT_PAGE_SIZE``, min 1,
            max 100).
        search: Optional case-insensitive substring filter on the IP address.

    Returns:
        :class:`~app.models.ban.JailBannedIpsResponse` with the paginated bans.

    Raises:
        BadRequestError: When *page* is below 1 or *page_size* is outside
            1–100 and the value still reached the handler body.
        Exception: Errors from the service call are not caught here; the
            route declares 404 (jail not found) and 502 (fail2ban
            unreachable) for those cases.
    """
    # Defence in depth: the ``Query`` constraints above already reject
    # out-of-range values for HTTP requests before this body runs, so these
    # guards only matter when the coroutine is invoked directly with
    # explicit values.
    # NOTE(review): FastAPI's default response for a violated ``Query``
    # constraint is a 422 validation error, not the 400 declared on the
    # route — confirm whether the app installs a custom validation handler.
    if page < 1:
        raise BadRequestError("page must be >= 1.")
    if page_size < 1 or page_size > 100:
        raise BadRequestError("page_size must be between 1 and 100.")

    banned_page = await jail_service.get_jail_banned_ips(
        socket_path=socket_path,
        jail_name=name,
        page=page,
        page_size=page_size,
        search=search,
        geo_cache=geo_cache,
        http_session=http_session,
        app_db=ban_ctx.db,
    )
    return jail_mappers.map_domain_jail_banned_ips_to_response(banned_page)