Files
BanGUI/backend/app/routers/bans.py
Lukas 5480dce221 refactor: Remove duplicate router-level exception helpers
All routers now let domain exceptions propagate to the global handlers in main.py
instead of catching and converting them to HTTPException. This eliminates:

- Duplicate exception-to-HTTP-status mappings across 8 routers
- Duplicate helper functions (_bad_gateway, _not_found, _conflict, etc.)
- Inconsistent error response formats

Changes:
- Removed all try/except blocks from routers that catch domain exceptions
- Removed duplicate helper functions from all routers
- Added missing exception handlers to main.py for:
  * ActionNameError
  * FilterNameError
  * JailNameError
  * JailNotFoundInConfigError
  * FilterInvalidRegexError
- Removed unused imports from affected routers

All domain exceptions now propagate to the single authoritative mapping in
main.py, ensuring consistent error codes, messages, and logging across the API.

Affected routers:
- action_config.py: Removed _action_not_found, _bad_request, _not_found helpers
- bans.py: Removed try/except in ban/unban endpoints
- config_misc.py: Removed try/except blocks
- file_config.py: Removed 6 try/except blocks and _service_unavailable helper
- filter_config.py: Removed try/except blocks
- geo.py: Removed try/except in lookup_ip endpoint
- jail_config.py: Removed try/except blocks
- jails.py: Removed try/except blocks
- server.py: Removed try/except blocks

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-04-23 16:00:37 +02:00

179 lines
5.7 KiB
Python

"""Bans router.
Manual ban and unban operations and the active-bans overview:
* ``GET /api/bans/active`` — list all currently banned IPs
* ``POST /api/bans`` — ban an IP in a specific jail
* ``DELETE /api/bans`` — unban an IP from one or all jails
* ``DELETE /api/bans/all`` — unban every currently banned IP across all jails
"""
from __future__ import annotations
from fastapi import APIRouter, HTTPException, Request, status
from app.dependencies import (
AuthDep,
DbDep,
Fail2BanSocketDep,
GeoBatchLookupDep,
HttpSessionDep,
)
from app.exceptions import JailNotFoundError, JailOperationError
from app.models.ban import ActiveBanListResponse, BanRequest, UnbanAllResponse, UnbanRequest
from app.models.jail import JailCommandResponse
from app.services import ban_service, jail_service
from app.exceptions import Fail2BanConnectionError
# Router collecting the ban endpoints defined in this module; every route
# registered on it is served under the ``/api/bans`` prefix.
router: APIRouter = APIRouter(prefix="/api/bans", tags=["Bans"])
@router.get(
    "/active",
    response_model=ActiveBanListResponse,
    summary="List all currently banned IPs across all jails",
)
async def get_active_bans(
    request: Request,
    _auth: AuthDep,
    db: DbDep,
    socket_path: Fail2BanSocketDep,
    http_session: HttpSessionDep,
    geo_batch_lookup: GeoBatchLookupDep,
) -> ActiveBanListResponse:
    """List the bans that are currently active.

    Thin wrapper: all work is delegated to ``ban_service.get_active_bans``
    and its result is returned unchanged.

    Args:
        request: Incoming request. Not referenced in this function body.
        _auth: Authentication dependency. Its value is not used here;
            presumably declaring it is what enforces a valid session.
        db: Injected database dependency, forwarded as ``app_db``.
        socket_path: Injected fail2ban socket dependency, passed as the
            first positional argument to the service.
        http_session: Injected HTTP session, forwarded to the service.
        geo_batch_lookup: Injected geo lookup dependency, forwarded to the
            service (presumably used for geolocation enrichment).

    Returns:
        The :class:`~app.models.ban.ActiveBanListResponse` produced by the
        service.

    Raises:
        Nothing is caught here: any exception raised by the service
        propagates to the caller. NOTE(review): the mapping of such errors
        to HTTP responses (e.g. 502 when fail2ban is unreachable) is not
        defined in this module — confirm against the application's
        exception handlers.
    """
    active_bans = await ban_service.get_active_bans(
        socket_path,
        geo_batch_lookup=geo_batch_lookup,
        http_session=http_session,
        app_db=db,
    )
    return active_bans
@router.post(
    "",
    status_code=status.HTTP_201_CREATED,
    response_model=JailCommandResponse,
    summary="Ban an IP address in a specific jail",
)
async def ban_ip(
    request: Request,
    _auth: AuthDep,
    body: BanRequest,
    socket_path: Fail2BanSocketDep,
) -> JailCommandResponse:
    """Ban ``body.ip`` in the jail named by ``body.jail``.

    The ban itself is carried out by ``ban_service.ban_ip``; this handler
    only forwards the request fields and builds the confirmation payload.
    No IP validation happens in this function — presumably it is done by
    the request model or the service (TODO confirm).

    Args:
        request: Incoming request. Not referenced in this function body.
        _auth: Authentication dependency. Its value is not used here;
            presumably declaring it is what enforces a valid session.
        body: Payload providing the ``ip`` and ``jail`` fields.
        socket_path: Injected fail2ban socket dependency, passed to the
            service.

    Returns:
        A :class:`~app.models.jail.JailCommandResponse` whose message names
        the banned IP and whose ``jail`` is the requested jail. The route
        is registered with status code 201.

    Raises:
        Nothing is caught here: any exception raised by the service
        propagates to the caller. NOTE(review): the HTTP status codes
        clients see for invalid IPs, unknown jails, failed bans, or an
        unreachable fail2ban are not defined in this module — confirm
        against the application's exception handlers.
    """
    await ban_service.ban_ip(socket_path, body.jail, body.ip)

    confirmation = f"IP {body.ip!r} banned in jail {body.jail!r}."
    return JailCommandResponse(message=confirmation, jail=body.jail)
@router.delete(
    "",
    response_model=JailCommandResponse,
    summary="Unban an IP address from one or all jails",
)
async def unban_ip(
    request: Request,
    _auth: AuthDep,
    body: UnbanRequest,
    socket_path: Fail2BanSocketDep,
) -> JailCommandResponse:
    """Unban an IP address from a specific jail or from all jails.

    When ``body.unban_all`` is truthy, or when ``body.jail`` is ``None``,
    the service is called with ``jail=None`` (meaning "all jails").
    Otherwise only ``body.jail`` is targeted.

    Args:
        request: Incoming request. Not referenced in this function body.
        _auth: Authentication dependency. Its value is not used here;
            presumably declaring it is what enforces a valid session.
        body: Payload with the ``ip``, optional ``jail``, and ``unban_all``
            fields.
        socket_path: Injected fail2ban socket dependency, passed to the
            service.

    Returns:
        A :class:`~app.models.jail.JailCommandResponse` confirming the
        unban. ``jail`` is the targeted jail name, or ``"*"`` when all
        jails were targeted.

    Raises:
        Nothing is caught here: any exception raised by the service
        propagates to the caller. NOTE(review): the HTTP status codes
        clients see for invalid IPs, unknown jails, failed unbans, or an
        unreachable fail2ban are not defined in this module — confirm
        against the application's exception handlers.
    """
    # Determine target jail (None means all jails).
    target_jail: str | None = None if (body.unban_all or body.jail is None) else body.jail
    await ban_service.unban_ip(socket_path, body.ip, jail=target_jail)

    # Describe the scope with the same ``is None`` test used to choose the
    # target above. A truthiness test would report "all jails" / "*" for an
    # empty-string jail even though the service was asked to act on that
    # specific (empty) jail name.
    if target_jail is None:
        scope = "all jails"
        reported_jail = "*"
    else:
        scope = f"jail {target_jail!r}"
        reported_jail = target_jail

    return JailCommandResponse(
        message=f"IP {body.ip!r} unbanned from {scope}.",
        jail=reported_jail,
    )
@router.delete(
    "/all",
    response_model=UnbanAllResponse,
    summary="Unban every currently banned IP across all jails",
)
async def unban_all(
    request: Request,
    _auth: AuthDep,
    socket_path: Fail2BanSocketDep,
) -> UnbanAllResponse:
    """Clear every active ban across all jails and report how many were removed.

    The operation is delegated to ``jail_service.unban_all_ips``; the integer
    it returns is used as the number of unbanned IPs. NOTE(review): the
    original documentation states this uses fail2ban's ``unban --all``
    command — that is implemented in the service, not visible here.

    Args:
        request: Incoming request. Not referenced in this function body.
        _auth: Authentication dependency. Its value is not used here;
            presumably declaring it is what enforces a valid session.
        socket_path: Injected fail2ban socket dependency, passed to the
            service.

    Returns:
        A :class:`~app.models.ban.UnbanAllResponse` carrying the count and a
        human-readable message (pluralised as "addresses" unless the count
        is exactly 1).

    Raises:
        Nothing is caught here: any exception raised by the service
        propagates to the caller. NOTE(review): the HTTP response for an
        unreachable fail2ban is not defined in this module — confirm
        against the application's exception handlers.
    """
    count: int = await jail_service.unban_all_ips(socket_path)

    summary_text = f"All bans cleared. {count} IP address{'es' if count != 1 else ''} unbanned."
    return UnbanAllResponse(message=summary_text, count=count)