New exceptions: DatabaseBusyError, DatabasePermissionDeniedError, DatabasePathInvalidError, DatabaseCorruptedError, DatabaseUnavailableError. open_db creates parent directory if missing. Catches all aiosqlite errors and maps to specific exception types. get_db retries up to 3x on locked database with backoff. Propagates specific exceptions instead of generic HTTPException. Tests for all new error types and retry behavior.
597 lines
19 KiB
Python
597 lines
19 KiB
Python
"""Shared domain exception classes used across routers and services.
|
|
|
|
Exception Taxonomy
|
|
==================
|
|
|
|
All domain exceptions inherit from one of these base categories:
|
|
|
|
- **NotFoundError** (404): Domain entity not found
|
|
- **BadRequestError** (400): Invalid input, validation failure, invalid identifiers
|
|
- **ConflictError** (409): State conflict, resource already exists, invalid state transition
|
|
- **OperationError** (500): Operation failure, write errors
|
|
- **ServiceUnavailableError** (503): Infrastructure/external service issues
|
|
- **AuthenticationError** (401): Authentication or authorization failure
|
|
- **RateLimitError** (429): Rate limit exceeded
|
|
|
|
Service exceptions inherit from the appropriate category, allowing routers to
|
|
handle categories rather than individual exception types. Exception handlers in
|
|
main.py register only base category types.
|
|
|
|
Every exception class has:
|
|
- **error_code**: A machine-readable error code for client-side branching
|
|
- **get_error_metadata()**: Returns structured metadata for the API response
|
|
|
|
Example:
|
|
def get_jail(name: str) -> Jail:
|
|
# Raises JailNotFoundError (subclass of NotFoundError)
|
|
...
|
|
|
|
@app.exception_handler(NotFoundError)
|
|
async def handle_not_found(request, exc):
|
|
return JSONResponse(status_code=404, content=ErrorResponse(
|
|
code=exc.error_code,
|
|
detail=str(exc),
|
|
metadata=exc.get_error_metadata()
|
|
).model_dump())
|
|
|
|
See Backend-Development.md for the complete exception contract.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import TYPE_CHECKING
|
|
|
|
from app.utils.display_sanitizer import sanitize_for_display
|
|
|
|
if TYPE_CHECKING:
|
|
from app.models.response import ErrorMetadata
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Exception Base Classes (Categories)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class DomainError(Exception):
    """Root of the domain exception hierarchy.

    Each subclass is expected to provide:

    * ``error_code`` -- a class-level, machine-readable identifier.
    * ``get_error_metadata()`` -- structured context describing the error.
    """

    # Code inherited by any subclass that does not declare its own.
    error_code: str = "internal_error"

    def get_error_metadata(self) -> ErrorMetadata:
        """Build the metadata mapping attached to the API error response.

        The base implementation contributes nothing; subclasses override it
        to expose only values that are safe and relevant for clients.

        Returns:
            An empty mapping.
        """
        return dict()
|
|
|
|
|
|
class NotFoundError(DomainError):
    """Category for a requested domain entity that does not exist (HTTP 404)."""

    error_code: str = "not_found"
|
|
|
|
|
|
class BadRequestError(DomainError):
    """Category for bad input: validation failures or invalid identifiers (HTTP 400)."""

    error_code: str = "invalid_input"
|
|
|
|
|
|
class ConflictError(DomainError):
    """Category for state conflicts and resource constraints (HTTP 409)."""

    error_code: str = "conflict"
|
|
|
|
|
|
class OperationError(DomainError):
    """Category for a domain operation (write, update, ...) that failed (HTTP 500)."""

    error_code: str = "operation_failed"
|
|
|
|
|
|
class ServiceUnavailableError(DomainError):
    """Category for infrastructure or external-service problems (HTTP 503)."""

    error_code: str = "service_unavailable"
|
|
|
|
|
|
class AuthenticationError(DomainError):
    """Category for authentication or authorization failures (HTTP 401)."""

    error_code: str = "authentication_required"
|
|
|
|
|
|
class RateLimitError(DomainError):
    """Signals that a client has gone over its rate limit (HTTP 429)."""

    error_code: str = "rate_limit_exceeded"

    def __init__(self, message: str, retry_after_seconds: float = 60.0) -> None:
        """Store the retry hint and forward the message to the base class.

        Args:
            message: Description of the rate limit violation.
            retry_after_seconds: Estimated wait, in seconds, before the client
                should retry. Defaults to 60.
        """
        super().__init__(message)
        self.retry_after_seconds: float = retry_after_seconds

    def get_error_metadata(self) -> ErrorMetadata:
        """Return the retry hint under the ``retry_after_seconds`` key."""
        metadata = {"retry_after_seconds": self.retry_after_seconds}
        return metadata
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Jail-Specific Exceptions
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class JailNotFoundError(NotFoundError):
    """Signals that no jail exists under the requested name."""

    error_code: str = "jail_not_found"

    def __init__(self, name: str) -> None:
        """Record the jail name and build the error message.

        Args:
            name: The jail name that was looked up. The value is stored on the
                instance as given; the message embeds the result of
                ``sanitize_for_display(name)``.
        """
        shown = sanitize_for_display(name)
        super().__init__(f"Jail not found: {shown!r}")
        self.name = name

    def get_error_metadata(self) -> ErrorMetadata:
        """Return the stored jail name under the ``jail_name`` key."""
        metadata = {"jail_name": self.name}
        return metadata
|
|
|
|
|
|
class JailOperationError(ConflictError):
    """Signals a failed jail state operation, e.g. a start/stop already in progress."""

    error_code: str = "jail_operation_failed"
|
|
|
|
|
|
class ConfigValidationError(BadRequestError):
    """Signals that config values were rejected by validation before being applied."""

    error_code: str = "config_validation_failed"
|
|
|
|
|
|
class ConfigOperationError(BadRequestError):
    """Signals that a config payload update or config command failed."""

    error_code: str = "config_operation_failed"
|
|
|
|
|
|
class ConfigDirError(ServiceUnavailableError):
    """Signals that the fail2ban config directory is missing or cannot be accessed."""

    error_code: str = "config_dir_unavailable"
|
|
|
|
|
|
class ConfigFileNotFoundError(NotFoundError):
    """Signals that the requested config file does not exist."""

    error_code: str = "config_file_not_found"

    def __init__(self, filename: str) -> None:
        """Record the missing filename and build the error message.

        Args:
            filename: The filename that could not be located.
        """
        text = f"Config file not found: {filename!r}"
        super().__init__(text)
        self.filename = filename

    def get_error_metadata(self) -> ErrorMetadata:
        """Return the stored filename under the ``filename`` key."""
        metadata = {"filename": self.filename}
        return metadata
|
|
|
|
|
|
class ConfigFileExistsError(ConflictError):
    """Signals an attempt to create a config file that is already present."""

    error_code: str = "config_file_exists"

    def __init__(self, filename: str) -> None:
        """Record the conflicting filename and build the error message.

        Args:
            filename: The filename that conflicts.
        """
        text = f"Config file already exists: {filename!r}"
        super().__init__(text)
        self.filename = filename

    def get_error_metadata(self) -> ErrorMetadata:
        """Return the stored filename under the ``filename`` key."""
        metadata = {"filename": self.filename}
        return metadata
|
|
|
|
|
|
class ConfigFileWriteError(OperationError):
    """Signals that a file could not be written (permissions, disk full, etc.)."""

    error_code: str = "config_file_write_failed"
|
|
|
|
|
|
class ConfigFileNameError(BadRequestError):
    """Signals that a caller-supplied filename is invalid or unsafe."""

    error_code: str = "config_file_name_invalid"
|
|
|
|
|
|
class ServerOperationError(BadRequestError):
    """Signals that a server control command (e.g. refresh) failed."""

    error_code: str = "server_operation_failed"
|
|
|
|
|
|
class Fail2BanConnectionError(ServiceUnavailableError):
    """Signals that the fail2ban socket is unreachable or returned an error."""

    error_code: str = "fail2ban_unreachable"

    def __init__(self, message: str, socket_path: str) -> None:
        """Record the socket path and build a message that mentions it.

        Args:
            message: Description of the connection problem.
            socket_path: The fail2ban socket path that was targeted; it is
                appended to the message and kept on the instance.
        """
        text = f"{message} (socket: {socket_path})"
        super().__init__(text)
        self.socket_path: str = socket_path

    def get_error_metadata(self) -> ErrorMetadata:
        """Return the stored socket path under the ``socket_path`` key."""
        metadata = {"socket_path": self.socket_path}
        return metadata
|
|
|
|
|
|
class Fail2BanProtocolError(ServiceUnavailableError):
    """Signals that a response from fail2ban could not be parsed."""

    error_code: str = "fail2ban_protocol_error"
|
|
|
|
|
|
class FilterInvalidRegexError(BadRequestError):
    """Signals that a regex pattern failed to compile."""

    error_code: str = "filter_invalid_regex"

    def __init__(self, pattern: str, error: str) -> None:
        """Record the offending pattern and the compile error text.

        Args:
            pattern: The regex pattern that was rejected.
            error: Text describing the compile error.
        """
        text = f"Invalid regex {pattern!r}: {error}"
        super().__init__(text)
        self.pattern = pattern
        self.error = error

    def get_error_metadata(self) -> ErrorMetadata:
        """Return the stored pattern and error under ``pattern`` and ``error``."""
        metadata = {"pattern": self.pattern, "error": self.error}
        return metadata
|
|
|
|
|
|
class FilterRegexTooLongError(BadRequestError):
    """Signals that a regex pattern is longer than the permitted maximum."""

    error_code: str = "filter_regex_too_long"

    def __init__(self, pattern: str, max_length: int) -> None:
        """Record the pattern, the limit, and the pattern's measured length.

        Args:
            pattern: The regex pattern that is too long.
            max_length: The maximum allowed length.
        """
        length = len(pattern)
        text = (
            f"Regex pattern exceeds maximum length of {max_length} characters: "
            f"{length} provided"
        )
        super().__init__(text)
        self.pattern = pattern
        self.max_length = max_length
        self.actual_length = length

    def get_error_metadata(self) -> ErrorMetadata:
        """Return the measured and maximum lengths.

        The pattern itself is not part of the returned mapping.
        """
        metadata = {
            "pattern_length": self.actual_length,
            "max_length": self.max_length,
        }
        return metadata
|
|
|
|
|
|
class FilterRegexTimeoutError(BadRequestError):
    """Signals that compiling a regex pattern timed out (possible ReDoS attack)."""

    error_code: str = "filter_regex_timeout"

    def __init__(self, pattern: str, timeout_seconds: int) -> None:
        """Record the pattern and the timeout, and build the error message.

        Args:
            pattern: The regex pattern that timed out.
            timeout_seconds: The timeout value in seconds.
        """
        text = (
            f"Regex pattern compilation timed out after {timeout_seconds}s "
            "(possible ReDoS attack). "
            "Pattern is too complex or causes catastrophic backtracking."
        )
        super().__init__(text)
        self.pattern = pattern
        self.timeout_seconds = timeout_seconds

    def get_error_metadata(self) -> ErrorMetadata:
        """Return the timeout under ``timeout_seconds``; the pattern is omitted."""
        metadata = {"timeout_seconds": self.timeout_seconds}
        return metadata
|
|
|
|
|
|
class JailNotFoundInConfigError(NotFoundError):
    """Signals that the requested jail name is not defined in any config file."""

    error_code: str = "jail_not_in_config"

    def __init__(self, name: str) -> None:
        """Record the jail name and build the error message.

        Args:
            name: The jail name that was looked up. The value is stored as
                given; the message embeds ``sanitize_for_display(name)``.
        """
        shown = sanitize_for_display(name)
        super().__init__(f"Jail not found in config: {shown!r}")
        self.name = name

    def get_error_metadata(self) -> ErrorMetadata:
        """Return the stored jail name under the ``jail_name`` key."""
        metadata = {"jail_name": self.name}
        return metadata
|
|
|
|
|
|
class ConfigWriteError(OperationError):
    """Signals that writing a configuration file modification failed."""

    error_code: str = "config_write_failed"

    def __init__(self, message: str) -> None:
        """Keep the message on the instance and pass it to the base class.

        Args:
            message: Description of the write failure.
        """
        super().__init__(message)
        self.message = message

    def get_error_metadata(self) -> ErrorMetadata:
        """Return the stored message under the ``message`` key."""
        metadata = {"message": self.message}
        return metadata
|
|
|
|
|
|
class JailNameError(BadRequestError):
    """Signals that a jail name contains invalid characters."""

    error_code: str = "jail_name_invalid"
|
|
|
|
|
|
class JailAlreadyActiveError(ConflictError):
    """Signals an attempt to activate a jail that is already active."""

    error_code: str = "jail_already_active"

    def __init__(self, name: str) -> None:
        """Record the jail name and build the error message.

        Args:
            name: The jail name. The value is stored as given; the message
                embeds ``sanitize_for_display(name)``.
        """
        shown = sanitize_for_display(name)
        super().__init__(f"Jail is already active: {shown!r}")
        self.name = name

    def get_error_metadata(self) -> ErrorMetadata:
        """Return the stored jail name under the ``jail_name`` key."""
        metadata = {"jail_name": self.name}
        return metadata
|
|
|
|
|
|
class JailAlreadyInactiveError(ConflictError):
    """Signals an attempt to deactivate a jail that is already inactive."""

    error_code: str = "jail_already_inactive"

    def __init__(self, name: str) -> None:
        """Record the jail name and build the error message.

        Args:
            name: The jail name. The value is stored as given; the message
                embeds ``sanitize_for_display(name)``.
        """
        shown = sanitize_for_display(name)
        super().__init__(f"Jail is already inactive: {shown!r}")
        self.name = name

    def get_error_metadata(self) -> ErrorMetadata:
        """Return the stored jail name under the ``jail_name`` key."""
        metadata = {"jail_name": self.name}
        return metadata
|
|
|
|
|
|
class FilterNotFoundError(NotFoundError):
    """Signals that the requested filter name was not found."""

    error_code: str = "filter_not_found"

    def __init__(self, name: str) -> None:
        """Record the filter name and build the error message.

        Args:
            name: The filter name that was looked up.
        """
        text = f"Filter not found: {name!r}"
        super().__init__(text)
        self.name = name

    def get_error_metadata(self) -> ErrorMetadata:
        """Return the stored filter name under the ``filter_name`` key."""
        metadata = {"filter_name": self.name}
        return metadata
|
|
|
|
|
|
class FilterAlreadyExistsError(ConflictError):
    """Signals an attempt to create a filter whose `.conf` or `.local` already exists."""

    error_code: str = "filter_already_exists"

    def __init__(self, name: str) -> None:
        """Record the filter name and build the error message.

        Args:
            name: The filter name that conflicts.
        """
        text = f"Filter already exists: {name!r}"
        super().__init__(text)
        self.name = name

    def get_error_metadata(self) -> ErrorMetadata:
        """Return the stored filter name under the ``filter_name`` key."""
        metadata = {"filter_name": self.name}
        return metadata
|
|
|
|
|
|
class FilterNameError(BadRequestError):
    """Signals that a filter name contains invalid characters."""

    error_code: str = "filter_name_invalid"
|
|
|
|
|
|
class FilterReadonlyError(ConflictError):
    """Signals an attempt to delete a shipped `.conf` filter with no `.local` override."""

    error_code: str = "filter_readonly"

    def __init__(self, name: str) -> None:
        """Record the filter name and build the explanatory message.

        Args:
            name: The filter name whose deletion was refused.
        """
        text = (
            f"Filter {name!r} is a shipped default (.conf only); "
            "only user-created .local files can be deleted."
        )
        super().__init__(text)
        self.name = name

    def get_error_metadata(self) -> ErrorMetadata:
        """Return the stored filter name under the ``filter_name`` key."""
        metadata = {"filter_name": self.name}
        return metadata
|
|
|
|
|
|
class ActionNotFoundError(NotFoundError):
    """Signals that the requested action name was not found."""

    error_code: str = "action_not_found"

    def __init__(self, name: str) -> None:
        """Record the action name and build the error message.

        Args:
            name: The action name that was looked up.
        """
        text = f"Action not found: {name!r}"
        super().__init__(text)
        self.name = name

    def get_error_metadata(self) -> ErrorMetadata:
        """Return the stored action name under the ``action_name`` key."""
        metadata = {"action_name": self.name}
        return metadata
|
|
|
|
|
|
class ActionAlreadyExistsError(ConflictError):
    """Signals an attempt to create an action whose `.conf` or `.local` already exists."""

    error_code: str = "action_already_exists"

    def __init__(self, name: str) -> None:
        """Record the action name and build the error message.

        Args:
            name: The action name that conflicts.
        """
        text = f"Action already exists: {name!r}"
        super().__init__(text)
        self.name = name

    def get_error_metadata(self) -> ErrorMetadata:
        """Return the stored action name under the ``action_name`` key."""
        metadata = {"action_name": self.name}
        return metadata
|
|
|
|
|
|
class ActionNameError(BadRequestError):
    """Signals that an action name contains invalid characters."""

    error_code: str = "action_name_invalid"
|
|
|
|
|
|
class ActionReadonlyError(ConflictError):
    """Signals an attempt to delete a shipped `.conf` action with no `.local` override."""

    error_code: str = "action_readonly"

    def __init__(self, name: str) -> None:
        """Record the action name and build the explanatory message.

        Args:
            name: The action name whose deletion was refused.
        """
        text = (
            f"Action {name!r} is a shipped default (.conf only); "
            "only user-created .local files can be deleted."
        )
        super().__init__(text)
        self.name = name

    def get_error_metadata(self) -> ErrorMetadata:
        """Return the stored action name under the ``action_name`` key."""
        metadata = {"action_name": self.name}
        return metadata
|
|
|
|
|
|
class SetupAlreadyCompleteError(ConflictError):
    """Signals an attempt to run setup after it has already been completed."""

    error_code: str = "setup_already_complete"

    def __init__(self) -> None:
        """Initialize with a fixed message; the error takes no arguments."""
        text = "Setup has already been completed."
        super().__init__(text)
|
|
|
|
|
|
class DatabaseBusyError(ServiceUnavailableError):
    """Signals that the SQLite database stayed locked or busy after all retries."""

    error_code: str = "database_busy"

    def __init__(self, database_path: str, retries: int) -> None:
        """Record the database path and retry count.

        Args:
            database_path: Path of the database; stored on the instance but
                not included in the message.
            retries: Number of retries reported in the message.
        """
        text = f"Database is temporarily busy after {retries} retries."
        super().__init__(text)
        self.database_path = database_path
        self.retries = retries

    def get_error_metadata(self) -> ErrorMetadata:
        """Return the stored path and retry count as ``database_path`` and ``retries``."""
        metadata = {"database_path": self.database_path, "retries": self.retries}
        return metadata
|
|
|
|
|
|
class DatabasePermissionDeniedError(ServiceUnavailableError):
    """Signals that the database file is inaccessible due to insufficient permissions."""

    error_code: str = "database_permission_denied"

    def __init__(self, database_path: str) -> None:
        """Record the database path; the message itself is fixed.

        Args:
            database_path: Path of the database; stored on the instance but
                not included in the message.
        """
        text = "Insufficient permissions to access the database file."
        super().__init__(text)
        self.database_path = database_path

    def get_error_metadata(self) -> ErrorMetadata:
        """Return the stored path under the ``database_path`` key."""
        metadata = {"database_path": self.database_path}
        return metadata
|
|
|
|
|
|
class DatabasePathInvalidError(ServiceUnavailableError):
    """Signals that the database directory does not exist or the path is invalid."""

    error_code: str = "database_path_invalid"

    def __init__(self, database_path: str) -> None:
        """Record the database path; the message itself is fixed.

        Args:
            database_path: Path of the database; stored on the instance but
                not included in the message.
        """
        text = "Database directory does not exist or path is invalid."
        super().__init__(text)
        self.database_path = database_path

    def get_error_metadata(self) -> ErrorMetadata:
        """Return the stored path under the ``database_path`` key."""
        metadata = {"database_path": self.database_path}
        return metadata
|
|
|
|
|
|
class DatabaseCorruptedError(ServiceUnavailableError):
    """Signals that the database file is corrupted."""

    error_code: str = "database_corrupted"

    def __init__(self, database_path: str) -> None:
        """Record the database path; the message itself is fixed.

        Args:
            database_path: Path of the database; stored on the instance but
                not included in the message.
        """
        text = "Database file is corrupted."
        super().__init__(text)
        self.database_path = database_path

    def get_error_metadata(self) -> ErrorMetadata:
        """Return the stored path under the ``database_path`` key."""
        metadata = {"database_path": self.database_path}
        return metadata
|
|
|
|
|
|
class DatabaseUnavailableError(ServiceUnavailableError):
    """Signals any other unexpected database error."""

    error_code: str = "database_unavailable"

    def __init__(self, database_path: str, error: str) -> None:
        """Record the database path and the underlying error text.

        Args:
            database_path: Path of the database; stored on the instance but
                not included in the message.
            error: Error text; appended to the message and stored.
        """
        text = f"Database is not available: {error}"
        super().__init__(text)
        self.database_path = database_path
        self.error = error

    def get_error_metadata(self) -> ErrorMetadata:
        """Return the stored path and error as ``database_path`` and ``error``."""
        metadata = {"database_path": self.database_path, "error": self.error}
        return metadata
|
|
|
|
|
|
class BlocklistSourceNotFoundError(NotFoundError):
    """Signals that a blocklist source was not found."""

    error_code: str = "blocklist_source_not_found"

    def __init__(self, source_id: int) -> None:
        """Record the source id and build the error message.

        Args:
            source_id: Identifier of the blocklist source that was looked up.
        """
        text = f"Blocklist source not found: {source_id}"
        super().__init__(text)
        self.source_id = source_id

    def get_error_metadata(self) -> ErrorMetadata:
        """Return the stored id under the ``source_id`` key."""
        metadata = {"source_id": self.source_id}
        return metadata
|
|
|
|
|
|
class BlocklistSourceHasLogsError(ConflictError):
    """Signals an attempt to delete a blocklist source that still has import logs."""

    error_code: str = "blocklist_source_has_logs"

    def __init__(self, source_id: int) -> None:
        """Record the source id and build the explanatory message.

        Args:
            source_id: Identifier of the blocklist source whose deletion was
                refused.
        """
        text = (
            f"Blocklist source {source_id} cannot be deleted because it has import logs. "
            "Delete the import logs first."
        )
        super().__init__(text)
        self.source_id = source_id

    def get_error_metadata(self) -> ErrorMetadata:
        """Return the stored id under the ``source_id`` key."""
        metadata = {"source_id": self.source_id}
        return metadata
|
|
|
|
|
|
class BlocklistSourceAlreadyExistsError(ConflictError):
    """Signals that a blocklist source with the same URL already exists."""

    error_code: str = "blocklist_source_already_exists"

    def __init__(self, url: str) -> None:
        """Record the URL and build the error message.

        Args:
            url: The URL that conflicts; it is embedded in the message as given.
        """
        text = f"Blocklist source with URL already exists: {url}"
        super().__init__(text)
        self.url = url

    def get_error_metadata(self) -> ErrorMetadata:
        """Return the stored URL under the ``url`` key."""
        metadata = {"url": self.url}
        return metadata
|
|
|
|
|
|
class HistoryNotFoundError(NotFoundError):
    """Signals that no history exists for the given IP."""

    error_code: str = "history_not_found"

    def __init__(self, ip: str) -> None:
        """Record the IP and build the error message.

        Args:
            ip: The IP that was looked up; it is embedded in the message as given.
        """
        text = f"No history found for IP: {ip}"
        super().__init__(text)
        self.ip = ip

    def get_error_metadata(self) -> ErrorMetadata:
        """Return the stored IP under the ``ip`` key."""
        metadata = {"ip": self.ip}
        return metadata
|