"""Shared domain exception classes used across routers and services. Exception Taxonomy ================== All domain exceptions inherit from one of these base categories: - **NotFoundError** (404): Domain entity not found - **BadRequestError** (400): Invalid input, validation failure, invalid identifiers - **ConflictError** (409): State conflict, resource already exists, invalid state transition - **OperationError** (500): Operation failure, write errors - **ServiceUnavailableError** (503): Infrastructure/external service issues - **AuthenticationError** (401): Authentication or authorization failure - **RateLimitError** (429): Rate limit exceeded Service exceptions inherit from the appropriate category, allowing routers to handle categories rather than individual exception types. Exception handlers in main.py register only base category types. Every exception class has: - **error_code**: A machine-readable error code for client-side branching - **get_error_metadata()**: Returns structured metadata for the API response Example: def get_jail(name: str) -> Jail: # Raises JailNotFoundError (subclass of NotFoundError) ... @app.exception_handler(NotFoundError) async def handle_not_found(request, exc): return JSONResponse(status_code=404, content=ErrorResponse( code=exc.error_code, detail=str(exc), metadata=exc.get_error_metadata() ).model_dump()) See Backend-Development.md for the complete exception contract. """ from __future__ import annotations from typing import TYPE_CHECKING if TYPE_CHECKING: from app.models.response import ErrorMetadata # --------------------------------------------------------------------------- # Exception Base Classes (Categories) # --------------------------------------------------------------------------- class DomainError(Exception): """Base class for all domain exceptions. All domain exceptions must: 1. Define an `error_code` class attribute (machine-readable error code) 2. Implement `get_error_metadata()` to return structured error context """ error_code: str = "internal_error" def get_error_metadata(self) -> ErrorMetadata: """Return structured metadata for the API error response. Subclasses should override to expose only safe, relevant metadata. Returns: A dictionary of metadata key-value pairs safe for client consumption. """ return {} class NotFoundError(DomainError): """Raised when a requested domain entity is not found. HTTP 404.""" error_code: str = "not_found" class BadRequestError(DomainError): """Raised for invalid input, validation failures, or invalid identifiers. HTTP 400.""" error_code: str = "invalid_input" class ConflictError(DomainError): """Raised for state conflicts or resource constraints. HTTP 409.""" error_code: str = "conflict" class OperationError(DomainError): """Raised when a domain operation fails (write, update, etc.). HTTP 500.""" error_code: str = "operation_failed" class ServiceUnavailableError(DomainError): """Raised for infrastructure or external service issues. HTTP 503.""" error_code: str = "service_unavailable" class AuthenticationError(DomainError): """Raised for authentication or authorization failures. HTTP 401.""" error_code: str = "authentication_required" class RateLimitError(DomainError): """Raised when a client exceeds rate limits. HTTP 429.""" error_code: str = "rate_limit_exceeded" def __init__(self, message: str, retry_after_seconds: float = 60.0) -> None: """Initialize with a message and optional retry-after time. Args: message: Description of the rate limit violation. retry_after_seconds: Estimated seconds to wait before retrying (default 60). """ self.retry_after_seconds: float = retry_after_seconds super().__init__(message) def get_error_metadata(self) -> ErrorMetadata: return {"retry_after_seconds": self.retry_after_seconds} # --------------------------------------------------------------------------- # Jail-Specific Exceptions # --------------------------------------------------------------------------- class JailNotFoundError(NotFoundError): """Raised when a requested jail name does not exist.""" error_code: str = "jail_not_found" def __init__(self, name: str) -> None: self.name = name super().__init__(f"Jail not found: {name!r}") def get_error_metadata(self) -> ErrorMetadata: return {"jail_name": self.name} class JailOperationError(ConflictError): """Raised when a jail state operation fails (e.g. start/stop already in progress).""" error_code: str = "jail_operation_failed" class ConfigValidationError(BadRequestError): """Raised when config values fail validation before applying.""" error_code: str = "config_validation_failed" class ConfigOperationError(BadRequestError): """Raised when a config payload update or command fails.""" error_code: str = "config_operation_failed" class ConfigDirError(ServiceUnavailableError): """Raised when the fail2ban config directory is missing or inaccessible.""" error_code: str = "config_dir_unavailable" class ConfigFileNotFoundError(NotFoundError): """Raised when a requested config file does not exist.""" error_code: str = "config_file_not_found" def __init__(self, filename: str) -> None: """Initialize with the filename that was not found. Args: filename: The filename that could not be located. """ self.filename = filename super().__init__(f"Config file not found: {filename!r}") def get_error_metadata(self) -> ErrorMetadata: return {"filename": self.filename} class ConfigFileExistsError(ConflictError): """Raised when trying to create a file that already exists.""" error_code: str = "config_file_exists" def __init__(self, filename: str) -> None: """Initialize with the filename that already exists. Args: filename: The filename that conflicts. """ self.filename = filename super().__init__(f"Config file already exists: {filename!r}") def get_error_metadata(self) -> ErrorMetadata: return {"filename": self.filename} class ConfigFileWriteError(OperationError): """Raised when a file cannot be written (permissions, disk full, etc.).""" error_code: str = "config_file_write_failed" class ConfigFileNameError(BadRequestError): """Raised when a supplied filename is invalid or unsafe.""" error_code: str = "config_file_name_invalid" class ServerOperationError(BadRequestError): """Raised when a server control command (e.g. refresh) fails.""" error_code: str = "server_operation_failed" class Fail2BanConnectionError(ServiceUnavailableError): """Raised when the fail2ban socket is unreachable or returns an error.""" error_code: str = "fail2ban_unreachable" def __init__(self, message: str, socket_path: str) -> None: """Initialize with a human-readable message and the socket path. Args: message: Description of the connection problem. socket_path: The fail2ban socket path that was targeted. """ self.socket_path: str = socket_path super().__init__(f"{message} (socket: {socket_path})") def get_error_metadata(self) -> ErrorMetadata: return {"socket_path": self.socket_path} class Fail2BanProtocolError(ServiceUnavailableError): """Raised when the response from fail2ban cannot be parsed.""" error_code: str = "fail2ban_protocol_error" class FilterInvalidRegexError(BadRequestError): """Raised when a regex pattern fails to compile.""" error_code: str = "filter_invalid_regex" def __init__(self, pattern: str, error: str) -> None: """Initialize with the invalid pattern and compile error.""" self.pattern = pattern self.error = error super().__init__(f"Invalid regex {pattern!r}: {error}") def get_error_metadata(self) -> ErrorMetadata: return {"pattern": self.pattern, "error": self.error} class FilterRegexTooLongError(BadRequestError): """Raised when a regex pattern exceeds the maximum length.""" error_code: str = "filter_regex_too_long" def __init__(self, pattern: str, max_length: int) -> None: """Initialize with the pattern and maximum allowed length. Args: pattern: The regex pattern that is too long. max_length: The maximum allowed length. """ self.pattern = pattern self.max_length = max_length self.actual_length = len(pattern) super().__init__( f"Regex pattern exceeds maximum length of {max_length} characters: " f"{self.actual_length} provided" ) def get_error_metadata(self) -> ErrorMetadata: return { "pattern_length": self.actual_length, "max_length": self.max_length, } class FilterRegexTimeoutError(BadRequestError): """Raised when a regex pattern compilation times out (possible ReDoS attack).""" error_code: str = "filter_regex_timeout" def __init__(self, pattern: str, timeout_seconds: int) -> None: """Initialize with the pattern and timeout value. Args: pattern: The regex pattern that timed out. timeout_seconds: The timeout value in seconds. """ self.pattern = pattern self.timeout_seconds = timeout_seconds super().__init__( f"Regex pattern compilation timed out after {timeout_seconds}s " f"(possible ReDoS attack). Pattern is too complex or causes catastrophic backtracking." ) def get_error_metadata(self) -> ErrorMetadata: return {"timeout_seconds": self.timeout_seconds} class JailNotFoundInConfigError(NotFoundError): """Raised when the requested jail name is not defined in any config file.""" error_code: str = "jail_not_in_config" def __init__(self, name: str) -> None: self.name = name super().__init__(f"Jail not found in config: {name!r}") def get_error_metadata(self) -> ErrorMetadata: return {"jail_name": self.name} class ConfigWriteError(OperationError): """Raised when writing a configuration file modification fails.""" error_code: str = "config_write_failed" def __init__(self, message: str) -> None: self.message = message super().__init__(message) def get_error_metadata(self) -> ErrorMetadata: return {"message": self.message} class JailNameError(BadRequestError): """Raised when a jail name contains invalid characters.""" error_code: str = "jail_name_invalid" class JailAlreadyActiveError(ConflictError): """Raised when trying to activate a jail that is already active.""" error_code: str = "jail_already_active" def __init__(self, name: str) -> None: self.name = name super().__init__(f"Jail is already active: {name!r}") def get_error_metadata(self) -> ErrorMetadata: return {"jail_name": self.name} class JailAlreadyInactiveError(ConflictError): """Raised when trying to deactivate a jail that is already inactive.""" error_code: str = "jail_already_inactive" def __init__(self, name: str) -> None: self.name = name super().__init__(f"Jail is already inactive: {name!r}") def get_error_metadata(self) -> ErrorMetadata: return {"jail_name": self.name} class FilterNotFoundError(NotFoundError): """Raised when the requested filter name is not found.""" error_code: str = "filter_not_found" def __init__(self, name: str) -> None: self.name = name super().__init__(f"Filter not found: {name!r}") def get_error_metadata(self) -> ErrorMetadata: return {"filter_name": self.name} class FilterAlreadyExistsError(ConflictError): """Raised when trying to create a filter whose `.conf` or `.local` already exists.""" error_code: str = "filter_already_exists" def __init__(self, name: str) -> None: self.name = name super().__init__(f"Filter already exists: {name!r}") def get_error_metadata(self) -> ErrorMetadata: return {"filter_name": self.name} class FilterNameError(BadRequestError): """Raised when a filter name contains invalid characters.""" error_code: str = "filter_name_invalid" class FilterReadonlyError(ConflictError): """Raised when trying to delete a shipped `.conf` filter with no `.local` override.""" error_code: str = "filter_readonly" def __init__(self, name: str) -> None: self.name = name super().__init__( f"Filter {name!r} is a shipped default (.conf only); only user-created .local files can be deleted." ) def get_error_metadata(self) -> ErrorMetadata: return {"filter_name": self.name} class ActionNotFoundError(NotFoundError): """Raised when the requested action name is not found.""" error_code: str = "action_not_found" def __init__(self, name: str) -> None: self.name = name super().__init__(f"Action not found: {name!r}") def get_error_metadata(self) -> ErrorMetadata: return {"action_name": self.name} class ActionAlreadyExistsError(ConflictError): """Raised when trying to create an action whose `.conf` or `.local` already exists.""" error_code: str = "action_already_exists" def __init__(self, name: str) -> None: self.name = name super().__init__(f"Action already exists: {name!r}") def get_error_metadata(self) -> ErrorMetadata: return {"action_name": self.name} class ActionNameError(BadRequestError): """Raised when an action name contains invalid characters.""" error_code: str = "action_name_invalid" class ActionReadonlyError(ConflictError): """Raised when trying to delete a shipped `.conf` action with no `.local` override.""" error_code: str = "action_readonly" def __init__(self, name: str) -> None: self.name = name super().__init__( f"Action {name!r} is a shipped default (.conf only); only user-created .local files can be deleted." ) def get_error_metadata(self) -> ErrorMetadata: return {"action_name": self.name} class SetupAlreadyCompleteError(ConflictError): """Raised when attempting to run setup when it has already been completed.""" error_code: str = "setup_already_complete" def __init__(self) -> None: super().__init__("Setup has already been completed.") class BlocklistSourceNotFoundError(NotFoundError): """Raised when a blocklist source is not found.""" error_code: str = "blocklist_source_not_found" def __init__(self, source_id: int) -> None: self.source_id = source_id super().__init__(f"Blocklist source not found: {source_id}") def get_error_metadata(self) -> ErrorMetadata: return {"source_id": self.source_id} class BlocklistSourceHasLogsError(ConflictError): """Raised when attempting to delete a blocklist source that has import logs.""" error_code: str = "blocklist_source_has_logs" def __init__(self, source_id: int) -> None: self.source_id = source_id super().__init__( f"Blocklist source {source_id} cannot be deleted because it has import logs. " "Delete the import logs first." ) def get_error_metadata(self) -> ErrorMetadata: return {"source_id": self.source_id} class BlocklistSourceAlreadyExistsError(ConflictError): """Raised when a blocklist source with the same URL already exists.""" error_code: str = "blocklist_source_already_exists" def __init__(self, url: str) -> None: self.url = url super().__init__(f"Blocklist source with URL already exists: {url}") def get_error_metadata(self) -> ErrorMetadata: return {"url": self.url} class HistoryNotFoundError(NotFoundError): """Raised when no history is found for the given IP.""" error_code: str = "history_not_found" def __init__(self, ip: str) -> None: self.ip = ip super().__init__(f"No history found for IP: {ip}") def get_error_metadata(self) -> ErrorMetadata: return {"ip": self.ip}