Move session cache initialization from per-request _build_app_context to startup lifespan handler.

The session cache type is now decided once at app startup based on settings, making _build_app_context pure (read-only).

Changes:
- Move cache initialization logic to new _update_session_cache() in main.py
- Call _update_session_cache() during lifespan startup to initialize cache
- Remove three if/elif/elif branches mutating state.session_cache from _build_app_context
- Add cache swap logic to set_runtime_settings() in runtime_state.py to handle runtime settings changes (e.g., setup wizard updates)
- Keep app.state.session_cache initialization in create_app() for test compatibility

This ensures:
- _build_app_context is pure and doesn't mutate app state on each request
- Session cache configuration decisions are centralized at startup
- Settings changes during runtime (via setup wizard) also trigger cache swap
- Cache initialization logic is isolated in one place

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
247 lines
8.8 KiB
Python
247 lines
8.8 KiB
Python
"""Centralise mutable runtime application state.
|
|
|
|
Runtime state is kept outside of Starlette's raw ``app.state`` storage and
|
|
exposed through a controlled state manager object. This keeps the FastAPI
|
|
framework state bag limited to shared infrastructure handles and immutable
|
|
configuration while still allowing existing code to access runtime values via
|
|
attribute proxying.
|
|
|
|
RuntimeState is designed for a single-process, single-worker asyncio
|
|
deployment. Mutations must complete without awaiting across read-modify-write
|
|
sequences. If BanGUI is ever deployed with multiple workers or across processes,
|
|
this module must be replaced with a shared backend such as Redis or shared
|
|
memory.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import datetime
|
|
from dataclasses import dataclass, field
|
|
from typing import TYPE_CHECKING, Any
|
|
|
|
import structlog
|
|
from starlette.datastructures import State
|
|
|
|
from app.models.config import PendingRecovery
|
|
from app.models.server import ServerStatus
|
|
from app.utils.session_cache import InMemorySessionCache, NoOpSessionCache
|
|
|
|
if TYPE_CHECKING: # pragma: no cover
|
|
from app.config import Settings
|
|
|
|
# Module-wide structlog logger; the functions in this file use it to emit
# settings, cache and fail2ban state-transition events.
log: structlog.stdlib.BoundLogger = structlog.get_logger()
|
|
|
|
# Shape of ``RuntimeState.last_activation`` as written by ``record_activation``:
# ``{"jail_name": <str>, "at": <datetime.datetime>}``.
# The values are heterogeneous (a jail name and a timestamp), so the value
# type cannot be ``datetime.datetime`` alone.
ActivationRecord = dict[str, Any]
|
|
|
|
# A fail2ban crash is attributed to a jail activation only when it is
# observed at most this many seconds after that activation was recorded.
_ACTIVATION_CRASH_WINDOW: int = 60

# Attribute names that ``ApplicationState`` forwards to its ``RuntimeState``
# instead of handling them through the regular Starlette ``State`` storage.
_RUNTIME_ATTRIBUTES: frozenset[str] = frozenset(
    (
        "setup_complete_cached",
        "server_status",
        "pending_recovery",
        "last_activation",
        "runtime_settings",
    )
)
|
|
|
|
|
|
@dataclass
class RuntimeState:
    """Hold the values that change while the application instance is running.

    Every field has a default, so ``RuntimeState()`` produces a valid
    "nothing known yet" state. The field names mirror the entries of
    ``_RUNTIME_ATTRIBUTES``.
    """

    # Starts out False. Presumably caches whether first-run setup has
    # finished -- confirm against the callers that set it.
    setup_complete_cached: bool = False

    # Latest fail2ban status; defaults to "offline". Built through a factory
    # so that each RuntimeState gets its own ServerStatus object.
    server_status: ServerStatus = field(default_factory=lambda: ServerStatus(online=False))

    # Crash-recovery record, or None when there is none.
    pending_recovery: PendingRecovery | None = None

    # Most recent jail activation, or None when nothing has been recorded.
    last_activation: ActivationRecord | None = None

    # Settings resolved at runtime; None until they have been set.
    runtime_settings: Settings | None = None
|
|
|
|
|
|
class ApplicationState(State):
    """``app.state`` object that routes runtime attributes to a :class:`RuntimeState`.

    Names listed in ``_RUNTIME_ATTRIBUTES`` are read from, written to and
    deleted on the wrapped :class:`RuntimeState`. Every other name is handled
    by the Starlette ``State`` base class, so existing ``app.state.<name>``
    call sites keep working unchanged.
    """

    def __init__(self, runtime_state: RuntimeState, state: dict[str, Any] | None = None):
        """Initialise the base ``State`` and attach the runtime state manager.

        Args:
            runtime_state: The object that will hold all runtime attributes.
            state: Optional initial mapping passed through to ``State``.
        """
        super().__init__(state)
        # Use object.__setattr__ so the handle is stored directly on the
        # instance and does not go through the overridden __setattr__ below.
        object.__setattr__(self, "_runtime_state", runtime_state)

    @property
    def runtime_state(self) -> RuntimeState:
        """Return the dedicated runtime state manager."""
        return object.__getattribute__(self, "_runtime_state")

    def __getattr__(self, key: str) -> Any:
        """Resolve *key* on the runtime state if it is a runtime attribute."""
        # __getattr__ only runs after normal attribute lookup has failed.
        if key not in _RUNTIME_ATTRIBUTES:
            return super().__getattr__(key)
        return getattr(self.runtime_state, key)

    def __setattr__(self, key: str, value: Any) -> None:
        """Store *value* on the runtime state if *key* is a runtime attribute."""
        if key in _RUNTIME_ATTRIBUTES:
            setattr(self.runtime_state, key, value)
        else:
            super().__setattr__(key, value)

    def __delattr__(self, key: str) -> None:
        """Delete *key* from the runtime state if it is a runtime attribute."""
        if key in _RUNTIME_ATTRIBUTES:
            delattr(self.runtime_state, key)
        else:
            super().__delattr__(key)
|
|
|
|
|
|
def get_runtime_state(app: Any) -> RuntimeState:
    """Fetch the runtime state manager attached to *app*.

    Args:
        app: The FastAPI application instance.

    Returns:
        The object exposed as ``app.state.runtime_state``.

    Raises:
        AttributeError: If *app* has no ``state`` (or it is ``None``), or the
            state object does not expose ``runtime_state``.
    """
    app_state = getattr(app, "state", None)
    if app_state is not None and hasattr(app_state, "runtime_state"):
        return app_state.runtime_state
    raise AttributeError("Runtime state has not been initialised on the application.")
|
|
|
|
|
|
def get_app_settings(app: Any) -> Settings:
    """Return the bootstrap settings stored as ``app.state.settings``.

    Args:
        app: The FastAPI application instance.

    Returns:
        The settings object found on the application state.

    Raises:
        AttributeError: If ``app.state.settings`` is missing or ``None``.
    """
    bootstrap = getattr(app.state, "settings", None)
    if bootstrap is not None:
        return bootstrap
    raise AttributeError("Application settings are not available on the app state.")
|
|
|
|
|
|
def get_effective_settings(app: Any) -> Settings:
    """Return the settings currently in force for *app*.

    Runtime settings win when they have been set; otherwise the bootstrap
    settings from :func:`get_app_settings` are returned.

    Args:
        app: The FastAPI application instance.

    Returns:
        ``app.state.runtime_settings`` if it is not ``None``, else the
        bootstrap settings.
    """
    override = getattr(app.state, "runtime_settings", None)
    return get_app_settings(app) if override is None else override
|
|
|
|
|
|
def set_runtime_settings(app: Any, settings: Settings) -> None:
    """Store the resolved runtime settings separately from bootstrap config.

    Also reconciles the session cache backend with the new settings. The
    cache is replaced only when the required backend type differs from the
    one installed on ``app.state.session_cache``: ``InMemorySessionCache``
    when caching is enabled with a positive TTL, ``NoOpSessionCache``
    otherwise. If the backend type is unchanged the existing instance is
    kept, so an unrelated settings update does not discard cached sessions.

    Args:
        app: The FastAPI application instance.
        settings: The new effective settings.

    Raises:
        AttributeError: If the runtime state has not been initialised on
            *app* (propagated from ``get_runtime_state``).
    """
    runtime_state = get_runtime_state(app)
    runtime_state.runtime_settings = settings

    cache_enabled = settings.session_cache_enabled and settings.session_cache_ttl_seconds > 0.0
    wanted_type = InMemorySessionCache if cache_enabled else NoOpSessionCache

    # A missing cache attribute reads as None and so always triggers a swap.
    current_cache = getattr(app.state, "session_cache", None)
    if type(current_cache) is wanted_type:
        # Same backend as before: keep the live instance and its entries.
        # NOTE(review): assumes the TTL is not baked into the cache instance
        # (it is constructed without arguments here) -- confirm.
        return

    new_cache = wanted_type()
    app.state.session_cache = new_cache
    log.debug("session_cache_updated", cache_type=type(new_cache).__name__)
|
|
|
|
|
|
def update_app_settings(app: Any, **overrides: Any) -> None:
    """Update the current effective settings immutably.

    A copy of the effective settings with *overrides* applied is stored as
    the new runtime settings; the existing settings object is not mutated.

    Args:
        app: The FastAPI application instance.
        **overrides: Field names and replacement values for the copy.
            NOTE(review): ``model_copy(update=...)`` looks like the pydantic
            API, which does not validate the updated values -- confirm.

    Raises:
        AttributeError: If neither runtime nor bootstrap settings are
            available, or the runtime state has not been initialised.
    """
    # Start from the effective settings (runtime settings when present,
    # bootstrap settings otherwise). Copying from the bootstrap settings would
    # silently drop overrides applied by an earlier call.
    current = get_effective_settings(app)
    set_runtime_settings(app, current.model_copy(update=overrides))
|
|
|
|
|
|
def record_activation(app: Any, jail_name: str, at: datetime.datetime | None = None) -> datetime.datetime:
    """Remember when *jail_name* was activated.

    Overwrites ``last_activation`` in the runtime state with a record holding
    the jail name and the activation timestamp.

    Args:
        app: The FastAPI application instance.
        jail_name: Name of the jail that was activated.
        at: Activation time; the current UTC time is used when omitted.

    Returns:
        The timestamp that was stored.
    """
    timestamp = datetime.datetime.now(tz=datetime.UTC) if at is None else at
    state = get_runtime_state(app)
    state.last_activation = {"jail_name": jail_name, "at": timestamp}
    return timestamp
|
|
|
|
|
|
def create_pending_recovery(
    app: Any,
    jail_name: str,
    activated_at: datetime.datetime,
    detected_at: datetime.datetime | None = None,
) -> None:
    """Store a new :class:`PendingRecovery` record in the runtime state.

    Args:
        app: The FastAPI application instance.
        jail_name: Name of the jail the recovery refers to.
        activated_at: When that jail was activated.
        detected_at: When the problem was detected; the current UTC time is
            used when omitted.
    """
    state = get_runtime_state(app)
    if detected_at is None:
        detected_at = datetime.datetime.now(tz=datetime.UTC)
    state.pending_recovery = PendingRecovery(
        jail_name=jail_name,
        activated_at=activated_at,
        detected_at=detected_at,
    )
|
|
|
|
|
|
def clear_pending_recovery(app: Any) -> None:
    """Reset ``pending_recovery`` in the runtime state of *app* to ``None``."""
    state = get_runtime_state(app)
    state.pending_recovery = None
|
|
|
|
|
|
def clear_activation_record(app: Any) -> None:
    """Reset ``last_activation`` in the runtime state of *app* to ``None``."""
    state = get_runtime_state(app)
    state.last_activation = None
|
|
|
|
|
|
def process_health_probe_result(
    runtime_state: RuntimeState,
    status: ServerStatus,
    now: datetime.datetime | None = None,
) -> None:
    """Record a health probe result and react to online/offline transitions.

    The previous status is captured first, then ``server_status`` is
    overwritten with *status*. Two transitions are handled:

    * offline -> online: an unrecovered pending recovery record is replaced
      by a copy marked ``recovered=True``.
    * online -> offline: if the last recorded jail activation is at most
      ``_ACTIVATION_CRASH_WINDOW`` seconds old and there is no unrecovered
      pending recovery, a new pending recovery record is created.

    Args:
        runtime_state: The mutable runtime state manager.
        status: The latest fail2ban server status.
        now: Timestamp used for the time-window check; the current UTC time
            is used when omitted.
    """
    previous = getattr(runtime_state, "server_status", ServerStatus(online=False))
    runtime_state.server_status = status
    if now is None:
        now = datetime.datetime.now(tz=datetime.UTC)

    came_online = status.online and not previous.online
    went_offline = previous.online and not status.online

    if came_online:
        log.info("fail2ban_came_online", version=status.version)
        unresolved = runtime_state.pending_recovery
        if unresolved is not None and not unresolved.recovered:
            # Replace the record with a recovered copy; it is not mutated.
            runtime_state.pending_recovery = PendingRecovery(
                jail_name=unresolved.jail_name,
                activated_at=unresolved.activated_at,
                detected_at=unresolved.detected_at,
                recovered=True,
            )
            log.info("pending_recovery_resolved", jail=unresolved.jail_name)
    elif went_offline:
        log.warning("fail2ban_went_offline")
        activation = runtime_state.last_activation
        if activation is not None:
            activated_at = activation["at"]
            elapsed = (now - activated_at).total_seconds()
            if elapsed <= _ACTIVATION_CRASH_WINDOW:
                jail_name = activation["jail_name"]
                pending = runtime_state.pending_recovery
                # An unrecovered record from an earlier crash is left as is.
                if pending is None or pending.recovered:
                    runtime_state.pending_recovery = PendingRecovery(
                        jail_name=jail_name,
                        activated_at=activated_at,
                        detected_at=now,
                    )
                    # NOTE(review): assumed to be logged only when a new
                    # record is created -- confirm against the original nesting.
                    log.warning(
                        "activation_crash_detected",
                        jail=jail_name,
                        seconds_since_activation=elapsed,
                    )

    log.debug(
        "health_check_complete",
        online=status.online,
        version=status.version,
        active_jails=status.active_jails,
    )
|