"""Bans router. Manual ban and unban operations and the active-bans overview: * ``GET /api/bans/active`` — list all currently banned IPs * ``POST /api/bans`` — ban an IP in a specific jail * ``DELETE /api/bans`` — unban an IP from one or all jails * ``DELETE /api/bans/all`` — unban every currently banned IP across all jails """ from __future__ import annotations from typing import TYPE_CHECKING if TYPE_CHECKING: import aiohttp from fastapi import APIRouter, HTTPException, Request, status from app.dependencies import AuthDep from app.models.ban import ActiveBanListResponse, BanRequest, UnbanAllResponse, UnbanRequest from app.models.jail import JailCommandResponse from app.services import jail_service from app.services.jail_service import JailNotFoundError, JailOperationError from app.utils.fail2ban_client import Fail2BanConnectionError router: APIRouter = APIRouter(prefix="/api/bans", tags=["Bans"]) def _bad_gateway(exc: Exception) -> HTTPException: """Return a 502 response when fail2ban is unreachable. Args: exc: The underlying connection error. Returns: :class:`fastapi.HTTPException` with status 502. """ return HTTPException( status_code=status.HTTP_502_BAD_GATEWAY, detail=f"Cannot reach fail2ban: {exc}", ) @router.get( "/active", response_model=ActiveBanListResponse, summary="List all currently banned IPs across all jails", ) async def get_active_bans( request: Request, _auth: AuthDep, ) -> ActiveBanListResponse: """Return every IP that is currently banned across all fail2ban jails. Each entry includes the jail name, ban start time, expiry time, and enriched geolocation data (country code). Args: request: Incoming request (used to access ``app.state``). _auth: Validated session — enforces authentication. Returns: :class:`~app.models.ban.ActiveBanListResponse` with all active bans. Raises: HTTPException: 502 when fail2ban is unreachable. """ socket_path: str = request.app.state.settings.fail2ban_socket http_session: aiohttp.ClientSession = request.app.state.http_session app_db = request.app.state.db try: return await jail_service.get_active_bans( socket_path, http_session=http_session, app_db=app_db, ) except Fail2BanConnectionError as exc: raise _bad_gateway(exc) from exc @router.post( "", status_code=status.HTTP_201_CREATED, response_model=JailCommandResponse, summary="Ban an IP address in a specific jail", ) async def ban_ip( request: Request, _auth: AuthDep, body: BanRequest, ) -> JailCommandResponse: """Ban an IP address in the specified fail2ban jail. The IP address is validated before the command is sent. IPv4 and IPv6 addresses are both accepted. Args: request: Incoming request (used to access ``app.state``). _auth: Validated session — enforces authentication. body: Payload containing the IP address and target jail. Returns: :class:`~app.models.jail.JailCommandResponse` confirming the ban. Raises: HTTPException: 400 when the IP address is invalid. HTTPException: 404 when the specified jail does not exist. HTTPException: 409 when fail2ban reports the ban failed. HTTPException: 502 when fail2ban is unreachable. """ socket_path: str = request.app.state.settings.fail2ban_socket try: await jail_service.ban_ip(socket_path, body.jail, body.ip) return JailCommandResponse( message=f"IP {body.ip!r} banned in jail {body.jail!r}.", jail=body.jail, ) except ValueError as exc: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc), ) from exc except JailNotFoundError: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Jail not found: {body.jail!r}", ) from None except JailOperationError as exc: raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail=str(exc), ) from exc except Fail2BanConnectionError as exc: raise _bad_gateway(exc) from exc @router.delete( "", response_model=JailCommandResponse, summary="Unban an IP address from one or all jails", ) async def unban_ip( request: Request, _auth: AuthDep, body: UnbanRequest, ) -> JailCommandResponse: """Unban an IP address from a specific jail or all jails. When ``unban_all`` is ``true`` the IP is removed from every jail using fail2ban's global unban command. When ``jail`` is specified only that jail is targeted. If neither ``unban_all`` nor ``jail`` is provided the IP is unbanned from all jails (equivalent to ``unban_all=true``). Args: request: Incoming request (used to access ``app.state``). _auth: Validated session — enforces authentication. body: Payload with the IP address, optional jail, and unban_all flag. Returns: :class:`~app.models.jail.JailCommandResponse` confirming the unban. Raises: HTTPException: 400 when the IP address is invalid. HTTPException: 404 when the specified jail does not exist. HTTPException: 409 when fail2ban reports the unban failed. HTTPException: 502 when fail2ban is unreachable. """ socket_path: str = request.app.state.settings.fail2ban_socket # Determine target jail (None means all jails). target_jail: str | None = None if (body.unban_all or body.jail is None) else body.jail try: await jail_service.unban_ip(socket_path, body.ip, jail=target_jail) scope = f"jail {target_jail!r}" if target_jail else "all jails" return JailCommandResponse( message=f"IP {body.ip!r} unbanned from {scope}.", jail=target_jail or "*", ) except ValueError as exc: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc), ) from exc except JailNotFoundError: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Jail not found: {target_jail!r}", ) from None except JailOperationError as exc: raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail=str(exc), ) from exc except Fail2BanConnectionError as exc: raise _bad_gateway(exc) from exc @router.delete( "/all", response_model=UnbanAllResponse, summary="Unban every currently banned IP across all jails", ) async def unban_all( request: Request, _auth: AuthDep, ) -> UnbanAllResponse: """Remove all active bans from every fail2ban jail in a single operation. Uses fail2ban's ``unban --all`` command to atomically clear every active ban across all jails. Returns the number of IPs that were unbanned. Args: request: Incoming request (used to access ``app.state``). _auth: Validated session — enforces authentication. Returns: :class:`~app.models.ban.UnbanAllResponse` with the count of unbanned IPs. Raises: HTTPException: 502 when fail2ban is unreachable. """ socket_path: str = request.app.state.settings.fail2ban_socket try: count: int = await jail_service.unban_all_ips(socket_path) return UnbanAllResponse( message=f"All bans cleared. {count} IP address{'es' if count != 1 else ''} unbanned.", count=count, ) except Fail2BanConnectionError as exc: raise _bad_gateway(exc) from exc