## 27) Error response body shape is inconsistent
This commit is contained in:
@@ -10,11 +10,17 @@ All domain exceptions inherit from one of these base categories:
|
||||
- **ConflictError** (409): State conflict, resource already exists, invalid state transition
|
||||
- **OperationError** (500): Operation failure, write errors
|
||||
- **ServiceUnavailableError** (503): Infrastructure/external service issues
|
||||
- **AuthenticationError** (401): Authentication or authorization failure
|
||||
- **RateLimitError** (429): Rate limit exceeded
|
||||
|
||||
Service exceptions inherit from the appropriate category, allowing routers to
|
||||
handle categories rather than individual exception types. Exception handlers in
|
||||
main.py register only base category types.
|
||||
|
||||
Every exception class has:
|
||||
- **error_code**: A machine-readable error code for client-side branching
|
||||
- **get_error_metadata()**: Returns structured metadata for the API response
|
||||
|
||||
Example:
|
||||
def get_jail(name: str) -> Jail:
|
||||
# Raises JailNotFoundError (subclass of NotFoundError)
|
||||
@@ -22,52 +28,84 @@ Example:
|
||||
|
||||
@app.exception_handler(NotFoundError)
|
||||
async def handle_not_found(request, exc):
|
||||
return JSONResponse(status_code=404, content={"detail": str(exc)})
|
||||
return JSONResponse(status_code=404, content=ErrorResponse(
|
||||
code=exc.error_code,
|
||||
detail=str(exc),
|
||||
metadata=exc.get_error_metadata()
|
||||
).model_dump())
|
||||
|
||||
See Backend-Development.md for the complete exception contract.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Exception Base Classes (Categories)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class DomainError(Exception):
|
||||
"""Base class for all domain exceptions."""
|
||||
"""Base class for all domain exceptions.
|
||||
|
||||
All domain exceptions must:
|
||||
1. Define an `error_code` class attribute (machine-readable error code)
|
||||
2. Implement `get_error_metadata()` to return structured error context
|
||||
"""
|
||||
|
||||
pass
|
||||
error_code: str = "internal_error"
|
||||
|
||||
def get_error_metadata(self) -> dict[str, str | int | float | bool | None]:
|
||||
"""Return structured metadata for the API error response.
|
||||
|
||||
Subclasses should override to expose only safe, relevant metadata.
|
||||
|
||||
Returns:
|
||||
A dictionary of metadata key-value pairs safe for client consumption.
|
||||
"""
|
||||
return {}
|
||||
|
||||
|
||||
class NotFoundError(DomainError):
|
||||
"""Raised when a requested domain entity is not found. HTTP 404."""
|
||||
|
||||
pass
|
||||
error_code: str = "not_found"
|
||||
|
||||
|
||||
class BadRequestError(DomainError):
|
||||
"""Raised for invalid input, validation failures, or invalid identifiers. HTTP 400."""
|
||||
|
||||
pass
|
||||
error_code: str = "invalid_input"
|
||||
|
||||
|
||||
class ConflictError(DomainError):
|
||||
"""Raised for state conflicts or resource constraints. HTTP 409."""
|
||||
|
||||
pass
|
||||
error_code: str = "conflict"
|
||||
|
||||
|
||||
class OperationError(DomainError):
|
||||
"""Raised when a domain operation fails (write, update, etc.). HTTP 500."""
|
||||
|
||||
pass
|
||||
error_code: str = "operation_failed"
|
||||
|
||||
|
||||
class ServiceUnavailableError(DomainError):
|
||||
"""Raised for infrastructure or external service issues. HTTP 503."""
|
||||
|
||||
pass
|
||||
error_code: str = "service_unavailable"
|
||||
|
||||
|
||||
class AuthenticationError(DomainError):
|
||||
"""Raised for authentication or authorization failures. HTTP 401."""
|
||||
|
||||
error_code: str = "authentication_required"
|
||||
|
||||
|
||||
class RateLimitError(DomainError):
|
||||
"""Raised when a client exceeds rate limits. HTTP 429."""
|
||||
|
||||
error_code: str = "rate_limit_exceeded"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -78,30 +116,45 @@ class ServiceUnavailableError(DomainError):
|
||||
class JailNotFoundError(NotFoundError):
|
||||
"""Raised when a requested jail name does not exist."""
|
||||
|
||||
error_code: str = "jail_not_found"
|
||||
|
||||
def __init__(self, name: str) -> None:
|
||||
self.name = name
|
||||
super().__init__(f"Jail not found: {name!r}")
|
||||
|
||||
def get_error_metadata(self) -> dict[str, str | int | float | bool | None]:
|
||||
return {"jail_name": self.name}
|
||||
|
||||
|
||||
class JailOperationError(ConflictError):
|
||||
"""Raised when a jail state operation fails (e.g. start/stop already in progress)."""
|
||||
|
||||
error_code: str = "jail_operation_failed"
|
||||
|
||||
|
||||
class ConfigValidationError(BadRequestError):
|
||||
"""Raised when config values fail validation before applying."""
|
||||
|
||||
error_code: str = "config_validation_failed"
|
||||
|
||||
|
||||
class ConfigOperationError(BadRequestError):
|
||||
"""Raised when a config payload update or command fails."""
|
||||
|
||||
error_code: str = "config_operation_failed"
|
||||
|
||||
|
||||
class ConfigDirError(ServiceUnavailableError):
|
||||
"""Raised when the fail2ban config directory is missing or inaccessible."""
|
||||
|
||||
error_code: str = "config_dir_unavailable"
|
||||
|
||||
|
||||
class ConfigFileNotFoundError(NotFoundError):
|
||||
"""Raised when a requested config file does not exist."""
|
||||
|
||||
error_code: str = "config_file_not_found"
|
||||
|
||||
def __init__(self, filename: str) -> None:
|
||||
"""Initialize with the filename that was not found.
|
||||
|
||||
@@ -111,10 +164,15 @@ class ConfigFileNotFoundError(NotFoundError):
|
||||
self.filename = filename
|
||||
super().__init__(f"Config file not found: {filename!r}")
|
||||
|
||||
def get_error_metadata(self) -> dict[str, str | int | float | bool | None]:
|
||||
return {"filename": self.filename}
|
||||
|
||||
|
||||
class ConfigFileExistsError(ConflictError):
|
||||
"""Raised when trying to create a file that already exists."""
|
||||
|
||||
error_code: str = "config_file_exists"
|
||||
|
||||
def __init__(self, filename: str) -> None:
|
||||
"""Initialize with the filename that already exists.
|
||||
|
||||
@@ -124,22 +182,33 @@ class ConfigFileExistsError(ConflictError):
|
||||
self.filename = filename
|
||||
super().__init__(f"Config file already exists: {filename!r}")
|
||||
|
||||
def get_error_metadata(self) -> dict[str, str | int | float | bool | None]:
|
||||
return {"filename": self.filename}
|
||||
|
||||
|
||||
class ConfigFileWriteError(OperationError):
|
||||
"""Raised when a file cannot be written (permissions, disk full, etc.)."""
|
||||
|
||||
error_code: str = "config_file_write_failed"
|
||||
|
||||
|
||||
class ConfigFileNameError(BadRequestError):
|
||||
"""Raised when a supplied filename is invalid or unsafe."""
|
||||
|
||||
error_code: str = "config_file_name_invalid"
|
||||
|
||||
|
||||
class ServerOperationError(BadRequestError):
|
||||
"""Raised when a server control command (e.g. refresh) fails."""
|
||||
|
||||
error_code: str = "server_operation_failed"
|
||||
|
||||
|
||||
class Fail2BanConnectionError(ServiceUnavailableError):
|
||||
"""Raised when the fail2ban socket is unreachable or returns an error."""
|
||||
|
||||
error_code: str = "fail2ban_unreachable"
|
||||
|
||||
def __init__(self, message: str, socket_path: str) -> None:
|
||||
"""Initialize with a human-readable message and the socket path.
|
||||
|
||||
@@ -150,112 +219,215 @@ class Fail2BanConnectionError(ServiceUnavailableError):
|
||||
self.socket_path: str = socket_path
|
||||
super().__init__(f"{message} (socket: {socket_path})")
|
||||
|
||||
def get_error_metadata(self) -> dict[str, str | int | float | bool | None]:
|
||||
return {"socket_path": self.socket_path}
|
||||
|
||||
|
||||
class Fail2BanProtocolError(ServiceUnavailableError):
|
||||
"""Raised when the response from fail2ban cannot be parsed."""
|
||||
|
||||
error_code: str = "fail2ban_protocol_error"
|
||||
|
||||
|
||||
class FilterInvalidRegexError(BadRequestError):
|
||||
"""Raised when a regex pattern fails to compile."""
|
||||
|
||||
error_code: str = "filter_invalid_regex"
|
||||
|
||||
def __init__(self, pattern: str, error: str) -> None:
|
||||
"""Initialize with the invalid pattern and compile error."""
|
||||
self.pattern = pattern
|
||||
self.error = error
|
||||
super().__init__(f"Invalid regex {pattern!r}: {error}")
|
||||
|
||||
def get_error_metadata(self) -> dict[str, str | int | float | bool | None]:
|
||||
return {"pattern": self.pattern, "error": self.error}
|
||||
|
||||
|
||||
class JailNotFoundInConfigError(NotFoundError):
|
||||
"""Raised when the requested jail name is not defined in any config file."""
|
||||
|
||||
error_code: str = "jail_not_in_config"
|
||||
|
||||
def __init__(self, name: str) -> None:
|
||||
self.name = name
|
||||
super().__init__(f"Jail not found in config: {name!r}")
|
||||
|
||||
def get_error_metadata(self) -> dict[str, str | int | float | bool | None]:
|
||||
return {"jail_name": self.name}
|
||||
|
||||
|
||||
class ConfigWriteError(OperationError):
|
||||
"""Raised when writing a configuration file modification fails."""
|
||||
|
||||
error_code: str = "config_write_failed"
|
||||
|
||||
def __init__(self, message: str) -> None:
|
||||
self.message = message
|
||||
super().__init__(message)
|
||||
|
||||
def get_error_metadata(self) -> dict[str, str | int | float | bool | None]:
|
||||
return {"message": self.message}
|
||||
|
||||
|
||||
class JailNameError(BadRequestError):
|
||||
"""Raised when a jail name contains invalid characters."""
|
||||
|
||||
error_code: str = "jail_name_invalid"
|
||||
|
||||
|
||||
class JailAlreadyActiveError(ConflictError):
|
||||
"""Raised when trying to activate a jail that is already active."""
|
||||
|
||||
error_code: str = "jail_already_active"
|
||||
|
||||
def __init__(self, name: str) -> None:
|
||||
self.name = name
|
||||
super().__init__(f"Jail is already active: {name!r}")
|
||||
|
||||
def get_error_metadata(self) -> dict[str, str | int | float | bool | None]:
|
||||
return {"jail_name": self.name}
|
||||
|
||||
|
||||
class JailAlreadyInactiveError(ConflictError):
|
||||
"""Raised when trying to deactivate a jail that is already inactive."""
|
||||
|
||||
error_code: str = "jail_already_inactive"
|
||||
|
||||
def __init__(self, name: str) -> None:
|
||||
self.name = name
|
||||
super().__init__(f"Jail is already inactive: {name!r}")
|
||||
|
||||
def get_error_metadata(self) -> dict[str, str | int | float | bool | None]:
|
||||
return {"jail_name": self.name}
|
||||
|
||||
|
||||
class FilterNotFoundError(NotFoundError):
|
||||
"""Raised when the requested filter name is not found."""
|
||||
|
||||
error_code: str = "filter_not_found"
|
||||
|
||||
def __init__(self, name: str) -> None:
|
||||
self.name = name
|
||||
super().__init__(f"Filter not found: {name!r}")
|
||||
|
||||
def get_error_metadata(self) -> dict[str, str | int | float | bool | None]:
|
||||
return {"filter_name": self.name}
|
||||
|
||||
|
||||
class FilterAlreadyExistsError(ConflictError):
|
||||
"""Raised when trying to create a filter whose `.conf` or `.local` already exists."""
|
||||
|
||||
error_code: str = "filter_already_exists"
|
||||
|
||||
def __init__(self, name: str) -> None:
|
||||
self.name = name
|
||||
super().__init__(f"Filter already exists: {name!r}")
|
||||
|
||||
def get_error_metadata(self) -> dict[str, str | int | float | bool | None]:
|
||||
return {"filter_name": self.name}
|
||||
|
||||
|
||||
class FilterNameError(BadRequestError):
|
||||
"""Raised when a filter name contains invalid characters."""
|
||||
|
||||
error_code: str = "filter_name_invalid"
|
||||
|
||||
|
||||
class FilterReadonlyError(ConflictError):
|
||||
"""Raised when trying to delete a shipped `.conf` filter with no `.local` override."""
|
||||
|
||||
error_code: str = "filter_readonly"
|
||||
|
||||
def __init__(self, name: str) -> None:
|
||||
self.name = name
|
||||
super().__init__(
|
||||
f"Filter {name!r} is a shipped default (.conf only); only user-created .local files can be deleted."
|
||||
)
|
||||
|
||||
def get_error_metadata(self) -> dict[str, str | int | float | bool | None]:
|
||||
return {"filter_name": self.name}
|
||||
|
||||
|
||||
class ActionNotFoundError(NotFoundError):
|
||||
"""Raised when the requested action name is not found."""
|
||||
|
||||
error_code: str = "action_not_found"
|
||||
|
||||
def __init__(self, name: str) -> None:
|
||||
self.name = name
|
||||
super().__init__(f"Action not found: {name!r}")
|
||||
|
||||
def get_error_metadata(self) -> dict[str, str | int | float | bool | None]:
|
||||
return {"action_name": self.name}
|
||||
|
||||
|
||||
class ActionAlreadyExistsError(ConflictError):
|
||||
"""Raised when trying to create an action whose `.conf` or `.local` already exists."""
|
||||
|
||||
error_code: str = "action_already_exists"
|
||||
|
||||
def __init__(self, name: str) -> None:
|
||||
self.name = name
|
||||
super().__init__(f"Action already exists: {name!r}")
|
||||
|
||||
def get_error_metadata(self) -> dict[str, str | int | float | bool | None]:
|
||||
return {"action_name": self.name}
|
||||
|
||||
|
||||
class ActionNameError(BadRequestError):
|
||||
"""Raised when an action name contains invalid characters."""
|
||||
|
||||
error_code: str = "action_name_invalid"
|
||||
|
||||
|
||||
class ActionReadonlyError(ConflictError):
|
||||
"""Raised when trying to delete a shipped `.conf` action with no `.local` override."""
|
||||
|
||||
error_code: str = "action_readonly"
|
||||
|
||||
def __init__(self, name: str) -> None:
|
||||
self.name = name
|
||||
super().__init__(
|
||||
f"Action {name!r} is a shipped default (.conf only); only user-created .local files can be deleted."
|
||||
)
|
||||
|
||||
def get_error_metadata(self) -> dict[str, str | int | float | bool | None]:
|
||||
return {"action_name": self.name}
|
||||
|
||||
|
||||
|
||||
class SetupAlreadyCompleteError(ConflictError):
|
||||
"""Raised when attempting to run setup when it has already been completed."""
|
||||
|
||||
error_code: str = "setup_already_complete"
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__("Setup has already been completed.")
|
||||
|
||||
|
||||
|
||||
class BlocklistSourceNotFoundError(NotFoundError):
|
||||
"""Raised when a blocklist source is not found."""
|
||||
|
||||
error_code: str = "blocklist_source_not_found"
|
||||
|
||||
def __init__(self, source_id: int) -> None:
|
||||
self.source_id = source_id
|
||||
super().__init__(f"Blocklist source not found: {source_id}")
|
||||
|
||||
def get_error_metadata(self) -> dict[str, str | int | float | bool | None]:
|
||||
return {"source_id": self.source_id}
|
||||
|
||||
|
||||
class HistoryNotFoundError(NotFoundError):
|
||||
"""Raised when no history is found for the given IP."""
|
||||
|
||||
error_code: str = "history_not_found"
|
||||
|
||||
def __init__(self, ip: str) -> None:
|
||||
self.ip = ip
|
||||
super().__init__(f"No history found for IP: {ip}")
|
||||
|
||||
def get_error_metadata(self) -> dict[str, str | int | float | bool | None]:
|
||||
return {"ip": self.ip}
|
||||
|
||||
Reference in New Issue
Block a user