Add global domain exception handlers in main.py

Register consistent HTTP error mappings for common domain exceptions and add regression tests for 404/400/500 handler behavior.
This commit is contained in:
2026-04-17 16:42:18 +02:00
parent 900d111a5d
commit 04b2e2f700
2 changed files with 162 additions and 1 deletions

View File

@@ -29,6 +29,26 @@ from starlette.middleware.base import BaseHTTPMiddleware
from app import __version__ from app import __version__
from app.config import Settings, get_settings from app.config import Settings, get_settings
from app.exceptions import (
ActionAlreadyExistsError,
ActionNotFoundError,
ActionReadonlyError,
ConfigFileExistsError,
ConfigFileNameError,
ConfigFileNotFoundError,
ConfigOperationError,
ConfigValidationError,
ConfigWriteError,
Fail2BanConnectionError,
Fail2BanProtocolError,
FilterAlreadyExistsError,
FilterNotFoundError,
FilterReadonlyError,
JailAlreadyActiveError,
JailAlreadyInactiveError,
JailNotFoundError,
JailOperationError,
)
from app.routers import ( from app.routers import (
auth, auth,
bans, bans,
@@ -44,7 +64,6 @@ from app.routers import (
setup, setup,
) )
from app.startup import startup_shared_resources from app.startup import startup_shared_resources
from app.exceptions import Fail2BanConnectionError, Fail2BanProtocolError
from app.utils.runtime_state import ApplicationState, RuntimeState from app.utils.runtime_state import ApplicationState, RuntimeState
from app.utils.session_cache import InMemorySessionCache, NoOpSessionCache from app.utils.session_cache import InMemorySessionCache, NoOpSessionCache
from app.utils.setup_state import is_setup_complete_cached, set_setup_complete_cache from app.utils.setup_state import is_setup_complete_cached, set_setup_complete_cache
@@ -204,6 +223,91 @@ async def _fail2ban_protocol_handler(
) )
async def _not_found_handler(
request: Request,
exc: Exception,
) -> JSONResponse:
"""Return a ``404 Not Found`` response for missing domain entities.
Args:
request: The incoming FastAPI request.
exc: The not-found exception.
Returns:
A :class:`fastapi.responses.JSONResponse` with status 404.
"""
log.warning(
"domain_not_found",
path=request.url.path,
method=request.method,
error=str(exc),
)
return JSONResponse(
status_code=status.HTTP_404_NOT_FOUND,
content={"detail": str(exc)},
)
async def _bad_request_handler(
request: Request,
exc: Exception,
) -> JSONResponse:
"""Return a ``400 Bad Request`` response for validation and domain contract errors.
Args:
request: The incoming FastAPI request.
exc: The validation exception.
Returns:
A :class:`fastapi.responses.JSONResponse` with status 400.
"""
log.warning(
"domain_bad_request",
path=request.url.path,
method=request.method,
error=str(exc),
)
return JSONResponse(
status_code=status.HTTP_400_BAD_REQUEST,
content={"detail": str(exc)},
)
async def _conflict_handler(
request: Request,
exc: Exception,
) -> JSONResponse:
"""Return a ``409 Conflict`` response for domain state conflicts."""
log.warning(
"domain_conflict",
path=request.url.path,
method=request.method,
error=str(exc),
)
return JSONResponse(
status_code=status.HTTP_409_CONFLICT,
content={"detail": str(exc)},
)
async def _domain_error_handler(
request: Request,
exc: Exception,
) -> JSONResponse:
"""Return a ``500 Internal Server Error`` response for domain write failures."""
log.error(
"domain_internal_error",
path=request.url.path,
method=request.method,
error=str(exc),
exc_info=exc,
)
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={"detail": str(exc)},
)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Setup-redirect middleware # Setup-redirect middleware
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -321,6 +425,22 @@ def create_app(settings: Settings | None = None) -> FastAPI:
# rather than falling through to the generic 500 handler. # rather than falling through to the generic 500 handler.
app.add_exception_handler(Fail2BanConnectionError, _fail2ban_connection_handler) # type: ignore[arg-type] app.add_exception_handler(Fail2BanConnectionError, _fail2ban_connection_handler) # type: ignore[arg-type]
app.add_exception_handler(Fail2BanProtocolError, _fail2ban_protocol_handler) # type: ignore[arg-type] app.add_exception_handler(Fail2BanProtocolError, _fail2ban_protocol_handler) # type: ignore[arg-type]
app.add_exception_handler(JailNotFoundError, _not_found_handler) # type: ignore[arg-type]
app.add_exception_handler(FilterNotFoundError, _not_found_handler) # type: ignore[arg-type]
app.add_exception_handler(ActionNotFoundError, _not_found_handler) # type: ignore[arg-type]
app.add_exception_handler(ConfigFileNotFoundError, _not_found_handler) # type: ignore[arg-type]
app.add_exception_handler(ConfigValidationError, _bad_request_handler) # type: ignore[arg-type]
app.add_exception_handler(ConfigFileNameError, _bad_request_handler) # type: ignore[arg-type]
app.add_exception_handler(ConfigOperationError, _bad_request_handler) # type: ignore[arg-type]
app.add_exception_handler(JailOperationError, _conflict_handler) # type: ignore[arg-type]
app.add_exception_handler(JailAlreadyActiveError, _conflict_handler) # type: ignore[arg-type]
app.add_exception_handler(JailAlreadyInactiveError, _conflict_handler) # type: ignore[arg-type]
app.add_exception_handler(FilterAlreadyExistsError, _conflict_handler) # type: ignore[arg-type]
app.add_exception_handler(ActionAlreadyExistsError, _conflict_handler) # type: ignore[arg-type]
app.add_exception_handler(FilterReadonlyError, _conflict_handler) # type: ignore[arg-type]
app.add_exception_handler(ActionReadonlyError, _conflict_handler) # type: ignore[arg-type]
app.add_exception_handler(ConfigFileExistsError, _conflict_handler) # type: ignore[arg-type]
app.add_exception_handler(ConfigWriteError, _domain_error_handler) # type: ignore[arg-type]
app.add_exception_handler(Exception, _unhandled_exception_handler) app.add_exception_handler(Exception, _unhandled_exception_handler)
# --- Routers --- # --- Routers ---

View File

@@ -10,6 +10,7 @@ from httpx import ASGITransport, AsyncClient
from app.config import Settings from app.config import Settings
from app.db import init_db from app.db import init_db
from app.exceptions import ConfigValidationError, ConfigWriteError, JailNotFoundError
from app.main import CORSMiddleware, _lifespan, create_app from app.main import CORSMiddleware, _lifespan, create_app
from app.services import setup_service from app.services import setup_service
@@ -82,6 +83,46 @@ def test_create_app_initialises_runtime_state_manager() -> None:
assert app.state.server_status.online is False assert app.state.server_status.online is False
async def test_create_app_global_domain_exception_handlers() -> None:
"""Global exception handlers map domain exceptions to consistent HTTP responses."""
settings = Settings(
database_path="/tmp/test.db",
fail2ban_socket="/tmp/fake_fail2ban.sock",
fail2ban_config_dir="/tmp/fail2ban",
session_secret="test-secret-key-do-not-use-in-production",
session_duration_minutes=60,
timezone="UTC",
log_level="debug",
)
app = create_app(settings=settings)
@app.get("/not-found")
async def raise_not_found() -> None:
raise JailNotFoundError("ssh")
@app.get("/bad-request")
async def raise_bad_request() -> None:
raise ConfigValidationError("invalid payload")
@app.get("/server-error")
async def raise_server_error() -> None:
raise ConfigWriteError("write failed")
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
response = await client.get("/not-found")
assert response.status_code == 404
assert response.json() == {"detail": "Jail not found: 'ssh'"}
response = await client.get("/bad-request")
assert response.status_code == 400
assert response.json() == {"detail": "invalid payload"}
response = await client.get("/server-error")
assert response.status_code == 500
assert response.json() == {"detail": "write failed"}
def test_create_app_disables_cors_by_default() -> None: def test_create_app_disables_cors_by_default() -> None:
"""The FastAPI app does not add CORS middleware when no origins are configured by environment.""" """The FastAPI app does not add CORS middleware when no origins are configured by environment."""
settings = Settings( settings = Settings(