"""Application configuration loaded from environment variables and .env file. Follows pydantic-settings patterns: all values are prefixed with BANGUI_ and validated at startup via the Settings singleton. """ import ipaddress import os import shlex from pathlib import Path from typing import Literal from pydantic import Field, field_validator from pydantic_settings import BaseSettings, SettingsConfigDict from app.utils.constants import ( DEFAULT_DATABASE_PATH, DEFAULT_FAIL2BAN_SOCKET, DEFAULT_SESSION_DURATION_MINUTES, ) class Settings(BaseSettings): """BanGUI runtime configuration. All fields are loaded from environment variables prefixed with ``BANGUI_`` or from a ``.env`` file located next to the process working directory. The application will raise a :class:`pydantic.ValidationError` on startup if any required field is missing or has an invalid value. """ database_path: str = Field( default=DEFAULT_DATABASE_PATH, description="Filesystem path to the BanGUI SQLite application database.", ) fail2ban_socket: str = Field( default=DEFAULT_FAIL2BAN_SOCKET, description="Path to the fail2ban Unix domain socket.", ) session_secret: str = Field( ..., min_length=32, description=( "Secret key used when generating session tokens. " "Must be at least 32 characters. " "Must be unique and never committed to source control. " "Generate one with: python -c \"import secrets; print(secrets.token_hex(32))\"" ), ) session_secret_previous: str | None = Field( default=None, description=( "Previous session secret for rotation support. " "Set this to the old secret during a rotation to accept tokens signed " "with either the current or previous secret. Tokens valid with the " "previous secret will be re-signed with the current secret. " "After all old tokens have expired, unset this field to disable rotation." ), ) session_duration_minutes: int = Field( default=DEFAULT_SESSION_DURATION_MINUTES, ge=1, description="Number of minutes a session token remains valid after creation.", ) session_cache_enabled: bool = Field( default=False, description=( "Enable the in-memory session validation cache. " "Disable it in multi-worker deployments to avoid stale revoked sessions." ), ) session_cache_ttl_seconds: float = Field( default=10.0, ge=0.0, description=( "How long (seconds) a cached session validation entry remains fresh. " "Ignored when session_cache_enabled is false." ), ) http_request_timeout_seconds: float = Field( default=20.0, ge=0.0, description="Maximum total time in seconds for outbound external HTTP requests.", ) http_connect_timeout_seconds: float = Field( default=5.0, ge=0.0, description="Maximum time in seconds to establish outbound external HTTP connections.", ) http_max_connections: int = Field( default=10, ge=1, description="Maximum number of concurrent outbound HTTP connections.", ) http_keepalive_timeout_seconds: float = Field( default=15.0, ge=0.0, description="How long idle keepalive connections are retained by the HTTP connector.", ) timezone: str = Field( default="UTC", description="IANA timezone name used when displaying timestamps in the UI.", ) session_cookie_httponly: bool = Field( default=True, description=( "Mark the session cookie as HttpOnly so browser scripts cannot access it." ), ) session_cookie_samesite: Literal["lax", "strict", "none"] = Field( default="lax", description=( "SameSite policy for the session cookie. " "Use 'lax', 'strict', or 'none' depending on deployment requirements." ), ) session_cookie_secure: bool = Field( default=True, description=( "Set the session cookie Secure flag when the backend is served over HTTPS. " "Defaults to True for security. Set to False only for local development over HTTP." ), ) cors_allowed_origins: str | list[str] = Field( default_factory=lambda: [ "http://localhost:5173", "http://127.0.0.1:5173", "https://localhost:5173", "https://127.0.0.1:5173", ], description=( "Comma-separated list of allowed CORS origins when the frontend is " "served from a different origin than the backend. " "Defaults to common localhost development origins. " "Override in production with the specific frontend domain." ), ) @field_validator("database_path", mode="after") @classmethod def _validate_database_path(cls, value: str) -> str: """Validate database_path parent directory exists and is writable. Args: value: The database path string. Returns: The validated path string. Raises: ValueError: If parent directory does not exist or is not writable. """ path = Path(value) parent = path.parent if not parent.exists(): raise ValueError( f"Database parent directory does not exist: {parent}\n" f"Hint: Create it with: mkdir -p {parent}" ) if not os.access(parent, os.W_OK): raise ValueError( f"Database parent directory not writable: {parent}\n" f"Hint: Fix with: chmod 755 {parent}" ) return value @field_validator("fail2ban_socket", mode="after") @classmethod def _validate_fail2ban_socket(cls, value: str) -> str: """Validate fail2ban socket exists and is readable. Args: value: The fail2ban socket path string. Returns: The validated path string. Raises: ValueError: If the socket path exists but is not readable. """ path = Path(value) if path.exists() and not os.access(path, os.R_OK): raise ValueError( f"fail2ban socket not readable: {path}\n" f"Hint: Fix with: chmod 644 {path}" ) return value @field_validator("geoip_db_path", mode="after") @classmethod def _validate_geoip_db_path(cls, value: str | None) -> str | None: """Validate geoip_db_path exists if set. Args: value: The GeoIP database path or None. Returns: The validated path or None. Raises: ValueError: If the path is set but the file does not exist. """ if value is None: return value path = Path(value) if not path.exists(): raise ValueError( f"GeoIP database file does not exist: {path}\n" f"Hint: Download from https://dev.maxmind.com/geoip/geolite2-country" ) return value @field_validator("fail2ban_config_dir", mode="after") @classmethod def _validate_fail2ban_config_dir(cls, value: str) -> str: """Validate fail2ban_config_dir exists. Args: value: The fail2ban configuration directory path. Returns: The validated path string. Raises: ValueError: If the directory does not exist. """ path = Path(value) if not path.exists(): raise ValueError( f"fail2ban config directory does not exist: {path}\n" f"Hint: Mount the fail2ban config directory or adjust BANGUI_FAIL2BAN_CONFIG_DIR" ) return value @field_validator("session_secret", mode="after") @classmethod def _validate_session_secret(cls, value: str) -> str: """Validate session_secret is sufficiently long and non-trivial. Args: value: The session secret string. Returns: The validated secret string. Raises: ValueError: If the secret is too short or appears weak. """ if len(value) < 32: raise ValueError( f"session_secret must be at least 32 characters. Got {len(value)}.\n" f"Hint: Generate one with: python -c \"import secrets; print(secrets.token_hex(32))\"" ) weak_indicators = {"password", "secret", "123", "abc", "admin"} value_lower = value.lower() if any(value_lower.startswith(w) for w in weak_indicators): raise ValueError( "session_secret is too weak (found common word).\n" "Hint: Generate one with: python -c \"import secrets; print(secrets.token_hex(32))\"" ) return value @field_validator("cors_allowed_origins", mode="before") @classmethod def _normalize_cors_origins(cls, value: str | list[str] | None) -> list[str]: if value is None: return [] if isinstance(value, str): return [origin.strip() for origin in value.split(",") if origin.strip()] return value log_level: str = Field( default="info", description="Application log level: debug | info | warning | error | critical.", ) log_file: str | None = Field( default="/data/log/bangui.log", description="Optional file path for writing application logs. Set to null to disable file logging.", ) suppress_third_party_logs: bool = Field( default=True, description=( "When true, sets APScheduler and aiosqlite loggers to WARNING level. " "Set to false to allow third-party libraries to emit DEBUG/INFO logs." ), ) geoip_db_path: str | None = Field( default=None, description=( "Optional path to a MaxMind GeoLite2-Country .mmdb file. " "When set, it is used as the primary resolver for IP geolocation. " "The ip-api.com HTTP API is only used as a fallback when the MMDB is unavailable or returns no result." ), ) geoip_allow_http_fallback: bool = Field( default=False, description=( "Allow fallback to ip-api.com HTTP API when the MaxMind database is unavailable. " "WARNING: Enabling this sends unencrypted IP addresses over HTTP. " "Only use this flag when the MMDB cannot be mounted and you understand the security implications. " "Default is False (only use local MMDB, fail if unavailable)." ), ) fail2ban_config_dir: str = Field( default="/config/fail2ban", description=( "Path to the fail2ban configuration directory. " "Must contain subdirectories jail.d/, filter.d/, and action.d/. " "Used for listing, viewing, and editing configuration files through the web UI." ), ) allowed_log_dirs: list[str] = Field( default_factory=lambda: ["/var/log", "/config/log"], description=( "List of allowed directory prefixes for jail log paths. " "Any log path added must resolve to a path within one of these directories. " "Use absolute paths. Symlinks are resolved before validation." ), ) fail2ban_start_command: str = Field( default="fail2ban-client start", description=( "Shell command used to start (not reload) the fail2ban daemon during " "recovery rollback. Split by whitespace to build the argument list — " "no shell interpretation is performed. " "Example: 'systemctl start fail2ban' or 'fail2ban-client start'." ), ) enable_docs: bool = Field( default=False, description=( "Enable FastAPI interactive API documentation at /api/docs (Swagger UI) " "and /api/redoc (ReDoc). Should be true only in development environments. " "In production, leave unset (defaults to false) to avoid exposing API schema." ), ) trusted_proxies: str | list[str] = Field( default_factory=list, description=( "Comma-separated list of trusted reverse proxy IP addresses or CIDR ranges. " "Only requests from these IPs/ranges are allowed to set X-Forwarded-For and X-Real-IP headers. " "Examples: '192.168.1.1' or '10.0.0.0/8' or '192.168.1.1,10.0.0.0/8'. " "Leave empty to disable proxy header forwarding (default). " "This is critical for correct client IP extraction behind reverse proxies like nginx." ), ) @field_validator("trusted_proxies", mode="before") @classmethod def _normalize_trusted_proxies(cls, value: str | list[str] | None) -> list[str]: """Normalize trusted_proxies from comma-separated string to list. Args: value: A comma-separated string or list of trusted proxy IPs/CIDRs. Returns: A list of normalized proxy IP/CIDR strings. """ if value is None: return [] if isinstance(value, str): return [proxy.strip() for proxy in value.split(",") if proxy.strip()] return value @field_validator("trusted_proxies", mode="after") @classmethod def _validate_trusted_proxies(cls, value: list[str]) -> list[str]: """Validate trusted_proxies as valid IPs or CIDR ranges. Args: value: A list of proxy IP addresses or CIDR ranges. Returns: The validated list. Raises: ValueError: If any item is not a valid IP address or CIDR range. """ for proxy in value: try: # Try to parse as a CIDR network first ipaddress.ip_network(proxy, strict=False) except ValueError: try: # Fall back to parsing as a single IP address ipaddress.ip_address(proxy) except ValueError as exc: raise ValueError( f"Invalid IP address or CIDR range: {proxy!r}. " f"Expected format: '192.168.1.1' or '10.0.0.0/8'" ) from exc return value @field_validator("fail2ban_start_command", mode="after") @classmethod def _validate_fail2ban_start_command(cls, value: str) -> str: """Validate fail2ban_start_command by attempting to parse it with shlex. Ensures the command can be split into arguments without shell interpretation. Raises ValueError if the command contains mismatched quotes. Args: value: The fail2ban start command string. Returns: The validated command string. Raises: ValueError: If the command contains mismatched quotes. """ try: shlex.split(value) except ValueError as e: raise ValueError( f"fail2ban_start_command contains mismatched quotes or is otherwise " f"unparseable: {value!r} — {e}" ) from e return value external_logging_enabled: bool = Field( default=False, description=( "Enable sending logs to an external centralized logging platform. " "When disabled (default), logs are written to stdout only. " "When enabled, set external_logging_provider and provider-specific settings." ), ) external_logging_provider: Literal["datadog", "papertrail", "elasticsearch"] | None = Field( default=None, description=( "External logging platform provider. " "Set to 'datadog', 'papertrail', or 'elasticsearch'. " "Only used when external_logging_enabled is true." ), ) external_logging_buffer_size: int = Field( default=1000, ge=10, description=( "Maximum number of log records to buffer in memory before dropping oldest logs. " "Prevents unbounded memory growth if the external system is temporarily unavailable." ), ) external_logging_flush_interval_seconds: float = Field( default=5.0, gt=0.0, description=( "Maximum time in seconds to buffer logs before sending to the external system. " "Logs are sent earlier if the batch size is reached." ), ) external_log_required: bool = Field( default=False, description=( "When enabled and external logging is configured, startup aborts if the " "external log handler fails to initialize. When disabled (default), a failed " "handler is treated as a warning and the application continues without external " "logging. Set to true in production environments where logs must reach the " "monitoring system." ), ) datadog_api_key: str | None = Field( default=None, description=( "Datadog API key for sending logs. Required when external_logging_provider is 'datadog'. " "Obtain from Datadog organization settings." ), ) datadog_site: str = Field( default="datadoghq.com", description=( "Datadog site: 'datadoghq.com' for US or 'datadoghq.eu' for EU. " "Only used when external_logging_provider is 'datadog'." ), ) datadog_batch_size: int = Field( default=10, ge=1, description=( "Number of log records to batch before sending to Datadog. " "Smaller batches send logs faster; larger batches are more efficient." ), ) papertrail_host: str | None = Field( default=None, description=( "Papertrail host address (e.g., 'logs1.papertrailapp.com'). " "Required when external_logging_provider is 'papertrail'." ), ) papertrail_port: int | None = Field( default=None, ge=1, le=65535, description=( "Papertrail port number. Required when external_logging_provider is 'papertrail'. " "Typically 12345 or in range 10000-32768." ), ) papertrail_program_name: str = Field( default="bangui", description=( "Program name to include in Syslog messages sent to Papertrail. " "Useful for filtering logs by program in Papertrail UI." ), ) elasticsearch_hosts: str | list[str] = Field( default_factory=list, description=( "Elasticsearch host addresses. Can be comma-separated string or list. " "Examples: 'http://elasticsearch:9200' or 'http://es1:9200,http://es2:9200'. " "Required when external_logging_provider is 'elasticsearch'." ), ) elasticsearch_index_prefix: str = Field( default="bangui", description=( "Prefix for Elasticsearch indices where logs are stored. " "Final index names will be '{prefix}-{date}' or similar." ), ) elasticsearch_batch_size: int = Field( default=10, ge=1, description=( "Number of log documents to batch before sending to Elasticsearch. " "Larger batches are more efficient but introduce slight latency." ), ) # Rate limit configuration (per IP) rate_limit_bans_per_minute: int = Field( default=100, ge=1, description="Max ban/unban requests per IP per minute.", ) rate_limit_blocklist_import_per_hour: int = Field( default=10, ge=1, description="Max blocklist import requests per IP per hour.", ) rate_limit_config_update_per_minute: int = Field( default=50, ge=1, description="Max config update requests per IP per minute.", ) # ------------------------------------------------------------------------- # Pagination & display limits (configurable per deployment) # ------------------------------------------------------------------------- max_page_size: int = Field( default=500, ge=1, le=10000, description=( "Maximum number of records returned per paginated API response. " "Individual endpoints may further limit this value. " "Must be between 1 and 10000." ), ) blocklist_preview_max_lines: int = Field( default=100, ge=1, description=( "Maximum number of IP lines returned in a blocklist source preview. " "Must be at least 1." ), ) history_retention_days: int = Field( default=90, ge=1, description=( "Number of days historical ban records are retained before being " "archived or purged by the cleanup task. Must be at least 1." ), ) @field_validator("elasticsearch_hosts", mode="before") @classmethod def _normalize_elasticsearch_hosts(cls, value: str | list[str] | None) -> list[str]: """Normalize elasticsearch_hosts from comma-separated string to list. Args: value: A comma-separated string or list of host URLs. Returns: A list of normalized host URLs. """ if value is None or (isinstance(value, list) and len(value) == 0): return [] if isinstance(value, str): return [host.strip() for host in value.split(",") if host.strip()] return value model_config = SettingsConfigDict( env_prefix="BANGUI_", env_file=".env", env_file_encoding="utf-8", case_sensitive=False, extra="ignore", ) def get_settings() -> Settings: """Return a fresh :class:`Settings` instance loaded from the environment. Returns: A validated :class:`Settings` object. Raises :class:`pydantic.ValidationError` if required keys are absent or values fail validation. """ return Settings()