"""Centralise mutable runtime application state. Runtime state is kept outside of Starlette's raw ``app.state`` storage and exposed through a controlled state manager object. This keeps the FastAPI framework state bag limited to shared infrastructure handles and immutable configuration while still allowing existing code to access runtime values via attribute proxying. RuntimeState is designed for a single-process, single-worker asyncio deployment. Mutations must complete without awaiting across read-modify-write sequences. If BanGUI is ever deployed with multiple workers or across processes, this module must be replaced with a shared backend such as Redis or shared memory. """ from __future__ import annotations import datetime from dataclasses import dataclass, field from typing import TYPE_CHECKING, Any import structlog from starlette.datastructures import State from app.models.config import PendingRecovery from app.models.server import ServerStatus from app.utils.session_cache import InMemorySessionCache, NoOpSessionCache if TYPE_CHECKING: # pragma: no cover from app.config import Settings log: structlog.stdlib.BoundLogger = structlog.get_logger() ActivationRecord = dict[str, datetime.datetime] # Maximum seconds since an activation for a subsequent crash to be # attributed to that activation. _ACTIVATION_CRASH_WINDOW: int = 60 _RUNTIME_ATTRIBUTES: frozenset[str] = frozenset( { "setup_complete_cached", "server_status", "pending_recovery", "last_activation", "runtime_settings", } ) @dataclass class RuntimeState: """Mutable runtime state for the current application instance.""" setup_complete_cached: bool = False server_status: ServerStatus = field(default_factory=lambda: ServerStatus(online=False)) pending_recovery: PendingRecovery | None = None last_activation: ActivationRecord | None = None runtime_settings: Settings | None = None class ApplicationState(State): """Application state wrapper that delegates runtime state access. This allows runtime values to be stored in a dedicated :class:`RuntimeState` instance while preserving the familiar attribute-based ``app.state`` API for the rest of the application. """ def __init__(self, runtime_state: RuntimeState, state: dict[str, Any] | None = None): super().__init__(state) object.__setattr__(self, "_runtime_state", runtime_state) @property def runtime_state(self) -> RuntimeState: """Return the dedicated runtime state manager.""" return object.__getattribute__(self, "_runtime_state") def __getattr__(self, key: str) -> Any: if key in _RUNTIME_ATTRIBUTES: return getattr(self.runtime_state, key) return super().__getattr__(key) def __setattr__(self, key: str, value: Any) -> None: if key in _RUNTIME_ATTRIBUTES: setattr(self.runtime_state, key, value) return super().__setattr__(key, value) def __delattr__(self, key: str) -> None: if key in _RUNTIME_ATTRIBUTES: delattr(self.runtime_state, key) return super().__delattr__(key) def get_runtime_state(app: Any) -> RuntimeState: """Return the runtime state manager for the current FastAPI application.""" state = getattr(app, "state", None) if state is None or not hasattr(state, "runtime_state"): raise AttributeError("Runtime state has not been initialised on the application.") return state.runtime_state def get_app_settings(app: Any) -> Settings: """Return the bootstrap settings loaded at startup.""" settings = getattr(app.state, "settings", None) if settings is None: raise AttributeError("Application settings are not available on the app state.") return settings def get_effective_settings(app: Any) -> Settings: """Return the effective settings for the current application instance.""" runtime_settings = getattr(app.state, "runtime_settings", None) if runtime_settings is not None: return runtime_settings return get_app_settings(app) def set_runtime_settings(app: Any, settings: Settings) -> None: """Store the resolved runtime settings separately from bootstrap config. Also updates the session cache backend if the session cache configuration has changed, replacing it with InMemorySessionCache or NoOpSessionCache as appropriate. Args: app: The FastAPI application instance. settings: The new effective settings. """ runtime_state = get_runtime_state(app) runtime_state.runtime_settings = settings # Update session cache if settings changed cache_enabled = settings.session_cache_enabled and settings.session_cache_ttl_seconds > 0.0 new_cache = InMemorySessionCache() if cache_enabled else NoOpSessionCache() app.state.session_cache = new_cache log.debug("session_cache_updated", cache_type=type(new_cache).__name__) def update_app_settings(app: Any, **overrides: Any) -> None: """Update the current effective settings immutably.""" settings = get_app_settings(app) updated = settings.model_copy(update=overrides) set_runtime_settings(app, updated) def record_activation(app: Any, jail_name: str, at: datetime.datetime | None = None) -> datetime.datetime: """Record a jail activation timestamp in runtime state.""" now = at if at is not None else datetime.datetime.now(tz=datetime.UTC) runtime_state = get_runtime_state(app) runtime_state.last_activation = { "jail_name": jail_name, "at": now, } return now def create_pending_recovery( app: Any, jail_name: str, activated_at: datetime.datetime, detected_at: datetime.datetime | None = None, ) -> None: """Create a pending recovery record in runtime state.""" runtime_state = get_runtime_state(app) runtime_state.pending_recovery = PendingRecovery( jail_name=jail_name, activated_at=activated_at, detected_at=detected_at if detected_at is not None else datetime.datetime.now(tz=datetime.UTC), ) def clear_pending_recovery(app: Any) -> None: """Clear the current pending recovery record.""" get_runtime_state(app).pending_recovery = None def clear_activation_record(app: Any) -> None: """Clear the current activation tracking record.""" get_runtime_state(app).last_activation = None def process_health_probe_result( runtime_state: RuntimeState, status: ServerStatus, now: datetime.datetime | None = None, ) -> None: """Process a new health probe result and update runtime state. This function tracks fail2ban transitions and creates or resolves pending recovery records when the daemon goes offline shortly after a jail activation. Args: runtime_state: The mutable runtime state manager. status: The latest fail2ban server status. now: The current timestamp used for time-based decisions. """ prev_status = getattr(runtime_state, "server_status", ServerStatus(online=False)) runtime_state.server_status = status now = now if now is not None else datetime.datetime.now(tz=datetime.UTC) if status.online and not prev_status.online: log.info("fail2ban_came_online", version=status.version) existing = runtime_state.pending_recovery if existing is not None and not existing.recovered: runtime_state.pending_recovery = PendingRecovery( jail_name=existing.jail_name, activated_at=existing.activated_at, detected_at=existing.detected_at, recovered=True, ) log.info( "pending_recovery_resolved", jail=existing.jail_name, ) elif not status.online and prev_status.online: log.warning("fail2ban_went_offline") last_activation = runtime_state.last_activation if last_activation is not None: activated_at = last_activation["at"] seconds_since = (now - activated_at).total_seconds() if seconds_since <= _ACTIVATION_CRASH_WINDOW: jail_name = last_activation["jail_name"] current = runtime_state.pending_recovery if current is None or current.recovered: runtime_state.pending_recovery = PendingRecovery( jail_name=jail_name, activated_at=activated_at, detected_at=now, ) log.warning( "activation_crash_detected", jail=jail_name, seconds_since_activation=seconds_since, ) log.debug( "health_check_complete", online=status.online, version=status.version, active_jails=status.active_jails, )