Addresses: Backend session cache not cluster-safe (multi-worker issue)

Problem:
- Session cache is process-local (InMemorySessionCache)
- Multi-worker deployments (uvicorn --workers N) create separate processes
- Each process has its own independent session cache
- Sessions cached in Worker A are invisible to Workers B, C, D
- Users randomly logged out when requests land on different workers
- Also affects RuntimeState, rate limiter, and background jobs

Solution (Option A - Strict single-worker enforcement):
- Enhance startup validation with clearer error messages
- Update error messages to explain the problem and how to fix it
- Document single-worker requirement prominently in Docker configs
- Update module docstrings to clarify constraints

Changes:
1. app/startup.py:
   - Enhanced _check_single_worker_mode() error message with troubleshooting
   - Enhanced _stage_check_worker_mode_and_acquire_lock() error message
   - Removed unused import
2. app/utils/session_cache.py:
   - Updated module docstring to explain constraints more clearly
   - Added references to deployment documentation
   - Clarified multi-worker solution for future implementation
3. app/utils/runtime_state.py:
   - Updated module docstring with deployment constraint references
   - Aligned messaging with session_cache.py
4. Docker/Dockerfile.backend:
   - Added comprehensive comments about single-worker requirement
   - Explained impact in multi-worker deployments
   - Referenced deployment constraints documentation
5. Docker/docker-compose.yml, compose.prod.yml, compose.debug.yml:
   - Added documentation comments about BANGUI_WORKERS constraint
   - Explained why single-worker is required
6. backend/tests/test_startup_integration.py:
   - Fixed test unpacking to match function return signature (3 values, not 2)

This ensures multi-worker deployments fail loudly at startup with clear
guidance on what went wrong and how to fix it. The database-backed scheduler
lock provides defense-in-depth for container orchestration scenarios.

For future multi-worker support, implement:
- Redis or database-backed session cache
- Shared RuntimeState coordination
- Distributed APScheduler backend

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
315 lines
12 KiB
Python
"""Centralise mutable runtime application state.

Runtime state is kept outside of Starlette's raw ``app.state`` storage and
exposed through a controlled state manager object. This keeps the FastAPI
framework state bag limited to shared infrastructure handles and immutable
configuration while still allowing existing code to access runtime values via
attribute proxying.

⚠️ SINGLE-PROCESS CONSTRAINT
==============================

RuntimeState is designed for a single-process, single-worker asyncio deployment.
This means:
- Each process has its own independent copy of all runtime state.
- Changes to runtime_state in one process are NOT visible to other processes.
- Read-modify-write sequences on runtime state must not ``await`` between the
  read and the write; within a single event loop, cooperative scheduling then
  guarantees that no other task interleaves with them.
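
A minimal, self-contained illustration of the read-modify-write rule. It uses
plain asyncio and is not BanGUI code. The dict ``state`` is an assumption that
stands in for a runtime-state field. ``unsafe_increment`` yields to the event
loop between its read and its write, so concurrent tasks overwrite each
other's updates. ``safe_increment`` may await earlier, but never between the
read and the write.

```python
import asyncio
from collections.abc import Awaitable, Callable


async def unsafe_increment(state: dict[str, int]) -> None:
    current = state["count"]      # read
    await asyncio.sleep(0)        # yield to other tasks mid-update (the bug)
    state["count"] = current + 1  # write based on a possibly stale read


async def safe_increment(state: dict[str, int]) -> None:
    await asyncio.sleep(0)        # awaiting before the update is harmless
    state["count"] += 1           # read and write with no await in between


async def run(worker: Callable[[dict[str, int]], Awaitable[None]], tasks: int = 100) -> int:
    # Assumption: this dict stands in for a RuntimeState field shared by tasks.
    state = {"count": 0}
    await asyncio.gather(*(worker(state) for _ in range(tasks)))
    return state["count"]


unsafe_total = asyncio.run(run(unsafe_increment))
safe_total = asyncio.run(run(safe_increment))
print(f"unsafe={unsafe_total} safe={safe_total}")
```

On CPython's default event loop every unsafe task reads 0 before any task
writes, so the unsafe total ends at 1. The safe total is exactly 100.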

IMPACT IN MULTI-WORKER DEPLOYMENTS:
- Logout processed by worker A clears the session from A's in-memory cache,
  but worker B still has that session in its own cache and will accept it.
- Health status updates (server_status) received by worker A are invisible
  to worker B's dashboard responses, so each worker reports its own, possibly
  stale, data.
- fail2ban activation/recovery tracking (pending_recovery, last_activation)
  is per-worker and unreliable across processes.

SINGLE-WORKER ENFORCEMENT:
BanGUI enforces single-worker mode at startup:
1. Environment variable check: BANGUI_WORKERS must be 1 or unset
2. Database lock: Only one instance can run the scheduler at a time
3. Startup validation: Fails loudly if a multi-worker scenario is detected
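
A sketch of check 1. It assumes nothing beyond the rule stated above:
``BANGUI_WORKERS`` must be unset or equal to ``1``. ``check_single_worker``
and ``MultiWorkerConfigError`` are hypothetical names for illustration and do
not describe BanGUI's actual startup code.

```python
import os
from collections.abc import Mapping


class MultiWorkerConfigError(RuntimeError):
    # Hypothetical error type raised when more than one worker is configured.
    pass


def check_single_worker(environ: Mapping[str, str] = os.environ) -> None:
    # Hypothetical helper: accept BANGUI_WORKERS only when unset or "1".
    raw = environ.get("BANGUI_WORKERS")
    if raw is None or raw.strip() == "1":
        return
    raise MultiWorkerConfigError(
        f"BANGUI_WORKERS={raw!r} is not supported: the session cache and "
        "runtime state are process-local, so run exactly one worker."
    )


check_single_worker({})                       # unset: accepted
check_single_worker({"BANGUI_WORKERS": "1"})  # explicit 1: accepted
try:
    check_single_worker({"BANGUI_WORKERS": "4"})
except MultiWorkerConfigError as exc:
    print(exc)
```

The check reads from an injected mapping rather than from ``os.environ``
directly. This keeps it easy to unit-test without touching the real process
environment.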

See Docs/Architekture.md § Deployment Constraints for full details.

MULTI-WORKER SOLUTION (Future):
To deploy BanGUI with multiple workers in the future (e.g., via gunicorn -w 4):
1. Replace RuntimeState with a shared store (Redis, shared memory, database)
2. Replace InMemorySessionCache with a shared backend (Redis, database)
3. Replace APScheduler with a distributed scheduler backend
4. Ensure all workers use the same backend for coordination

CURRENT STATUS:
For now, BanGUI supports single-worker deployments only. This constraint is
acceptable and keeps the implementation simple. The database-backed scheduler
lock ensures only one instance runs background jobs, even in container
orchestration scenarios where multiple instances may start.
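
A rough sketch of how a database-backed lock can ensure a single scheduler
owner. It uses ``sqlite3`` and a one-row table whose fixed primary key makes a
second ``INSERT`` fail. The table name, the owner identifiers and the lack of
lock expiry are simplifying assumptions; this is not BanGUI's actual lock
implementation.

```python
import sqlite3
import tempfile
from pathlib import Path


def try_acquire_scheduler_lock(conn: sqlite3.Connection, owner: str) -> bool:
    # Hypothetical schema: at most one row, enforced by the fixed primary key.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS scheduler_lock ("
        "id INTEGER PRIMARY KEY CHECK (id = 1), owner TEXT NOT NULL)"
    )
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute("INSERT INTO scheduler_lock (id, owner) VALUES (1, ?)", (owner,))
        return True
    except sqlite3.IntegrityError:
        return False  # another instance already owns the scheduler


with tempfile.TemporaryDirectory() as tmp:
    db = Path(tmp) / "bangui.db"
    first = sqlite3.connect(db)
    second = sqlite3.connect(db)
    first_ok = try_acquire_scheduler_lock(first, "instance-a")
    second_ok = try_acquire_scheduler_lock(second, "instance-b")
    first.close()
    second.close()

print(first_ok, second_ok)
```

A production lock would also need a heartbeat or expiry. Otherwise an owner
that crashes without releasing the row blocks every later instance from
running the scheduler.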
"""

from __future__ import annotations

import asyncio
import datetime
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, TypedDict

import structlog
from starlette.datastructures import State

from app.models.config import PendingRecovery
from app.models.server import ServerStatus
from app.utils.session_cache import InMemorySessionCache, NoOpSessionCache

if TYPE_CHECKING:  # pragma: no cover
    from app.config import Settings

log: structlog.stdlib.BoundLogger = structlog.get_logger()


class ActivationRecord(TypedDict):
    """Most recent jail activation: the jail's name and when it was activated."""

    jail_name: str
    at: datetime.datetime


# Maximum seconds since an activation for a subsequent crash to be
# attributed to that activation.
_ACTIVATION_CRASH_WINDOW: int = 60

_RUNTIME_ATTRIBUTES: frozenset[str] = frozenset(
    {
        "setup_complete_cached",
        "server_status",
        "pending_recovery",
        "last_activation",
        "runtime_settings",
        "jail_service_state",
    }
)


@dataclass
class JailServiceState:
    """Mutable runtime state for the jail service.

    Stores capability detection results and synchronization primitives used by
    jail operations. This state is initialized once and shared across all
    service calls within a single worker process.
    """

    backend_cmd_supported: bool | None = None
    backend_cmd_lock: asyncio.Lock | None = None

    def get_backend_cmd_lock(self) -> asyncio.Lock:
        """Return the shared backend capability probe lock, initialising lazily.

        The caller must already be running inside the event loop when the lock
        is created, which is true for all service entry points.
        """
        if self.backend_cmd_lock is None:
            self.backend_cmd_lock = asyncio.Lock()
        return self.backend_cmd_lock

    async def reset_backend_capability_cache(self) -> None:
        """Reset the cached backend/idle capability detection state.

        This is intended for test isolation and scenarios where the cached
        probe result must be invalidated before the next detection attempt.
        """
        async with self.get_backend_cmd_lock():
            self.backend_cmd_supported = None


@dataclass
class RuntimeState:
    """Mutable runtime state for the current application instance."""

    setup_complete_cached: bool = False
    server_status: ServerStatus = field(default_factory=lambda: ServerStatus(online=False))
    pending_recovery: PendingRecovery | None = None
    last_activation: ActivationRecord | None = None
    runtime_settings: Settings | None = None
    jail_service_state: JailServiceState = field(default_factory=JailServiceState)


class ApplicationState(State):
    """Application state wrapper that delegates runtime state access.

    This allows runtime values to be stored in a dedicated
    :class:`RuntimeState` instance while preserving the familiar attribute-based
    ``app.state`` API for the rest of the application.
    """

    def __init__(self, runtime_state: RuntimeState, state: dict[str, Any] | None = None):
        super().__init__(state)
        object.__setattr__(self, "_runtime_state", runtime_state)

    @property
    def runtime_state(self) -> RuntimeState:
        """Return the dedicated runtime state manager."""
        return object.__getattribute__(self, "_runtime_state")

    def __getattr__(self, key: str) -> Any:
        if key in _RUNTIME_ATTRIBUTES:
            return getattr(self.runtime_state, key)
        return super().__getattr__(key)

    def __setattr__(self, key: str, value: Any) -> None:
        if key in _RUNTIME_ATTRIBUTES:
            setattr(self.runtime_state, key, value)
            return
        super().__setattr__(key, value)

    def __delattr__(self, key: str) -> None:
        if key in _RUNTIME_ATTRIBUTES:
            delattr(self.runtime_state, key)
            return
        super().__delattr__(key)


def get_runtime_state(app: Any) -> RuntimeState:
    """Return the runtime state manager for the current FastAPI application."""
    state = getattr(app, "state", None)
    if state is None or not hasattr(state, "runtime_state"):
        raise AttributeError("Runtime state has not been initialised on the application.")
    return state.runtime_state


def get_app_settings(app: Any) -> Settings:
    """Return the bootstrap settings loaded at startup."""
    settings = getattr(app.state, "settings", None)
    if settings is None:
        raise AttributeError("Application settings are not available on the app state.")
    return settings


def get_effective_settings(app: Any) -> Settings:
    """Return the effective settings for the current application instance."""
    runtime_settings = getattr(app.state, "runtime_settings", None)
    if runtime_settings is not None:
        return runtime_settings
    return get_app_settings(app)


def set_runtime_settings(app: Any, settings: Settings) -> None:
    """Store the resolved runtime settings separately from bootstrap config.

    Also rebuilds the session cache backend from the new settings:
    InMemorySessionCache when caching is enabled with a positive TTL,
    NoOpSessionCache otherwise. The cache is replaced on every call, so any
    sessions cached before the call are discarded.

    Args:
        app: The FastAPI application instance.
        settings: The new effective settings.
    """
    runtime_state = get_runtime_state(app)
    runtime_state.runtime_settings = settings

    # Rebuild the session cache from the new settings (always a fresh instance).
    cache_enabled = settings.session_cache_enabled and settings.session_cache_ttl_seconds > 0.0
    new_cache = InMemorySessionCache() if cache_enabled else NoOpSessionCache()
    app.state.session_cache = new_cache
    log.debug("session_cache_updated", cache_type=type(new_cache).__name__)


def update_app_settings(app: Any, **overrides: Any) -> None:
    """Apply *overrides* on top of the current effective settings.

    The existing settings object is never mutated; a copy with the overrides
    applied becomes the new effective settings.
    """
    settings = get_effective_settings(app)
    updated = settings.model_copy(update=overrides)
    set_runtime_settings(app, updated)


def record_activation(app: Any, jail_name: str, at: datetime.datetime | None = None) -> datetime.datetime:
    """Record a jail activation timestamp in runtime state."""
    now = at if at is not None else datetime.datetime.now(tz=datetime.UTC)
    runtime_state = get_runtime_state(app)
    runtime_state.last_activation = {
        "jail_name": jail_name,
        "at": now,
    }
    return now


def create_pending_recovery(
    app: Any,
    jail_name: str,
    activated_at: datetime.datetime,
    detected_at: datetime.datetime | None = None,
) -> None:
    """Create a pending recovery record in runtime state."""
    runtime_state = get_runtime_state(app)
    runtime_state.pending_recovery = PendingRecovery(
        jail_name=jail_name,
        activated_at=activated_at,
        detected_at=detected_at if detected_at is not None else datetime.datetime.now(tz=datetime.UTC),
    )


def clear_pending_recovery(app: Any) -> None:
    """Clear the current pending recovery record."""
    get_runtime_state(app).pending_recovery = None


def clear_activation_record(app: Any) -> None:
    """Clear the current activation tracking record."""
    get_runtime_state(app).last_activation = None


def process_health_probe_result(
    runtime_state: RuntimeState,
    status: ServerStatus,
    now: datetime.datetime | None = None,
) -> None:
    """Process a new health probe result and update runtime state.

    Tracks fail2ban online/offline transitions. When the daemon goes offline
    within ``_ACTIVATION_CRASH_WINDOW`` seconds of the last jail activation,
    a pending recovery record is created for that jail (unless an unresolved
    one already exists). When the daemon comes back online, an unresolved
    pending recovery is marked as recovered.

    Args:
        runtime_state: The mutable runtime state manager.
        status: The latest fail2ban server status.
        now: Timestamp used for time-based decisions; defaults to the current
            UTC time.
    """
    prev_status = getattr(runtime_state, "server_status", ServerStatus(online=False))
    runtime_state.server_status = status
    now = now if now is not None else datetime.datetime.now(tz=datetime.UTC)

    if status.online and not prev_status.online:
        log.info("fail2ban_came_online", version=status.version)
        existing = runtime_state.pending_recovery
        if existing is not None and not existing.recovered:
            runtime_state.pending_recovery = PendingRecovery(
                jail_name=existing.jail_name,
                activated_at=existing.activated_at,
                detected_at=existing.detected_at,
                recovered=True,
            )
            log.info(
                "pending_recovery_resolved",
                jail=existing.jail_name,
            )

    elif not status.online and prev_status.online:
        log.warning("fail2ban_went_offline")
        last_activation = runtime_state.last_activation
        if last_activation is not None:
            activated_at = last_activation["at"]
            seconds_since = (now - activated_at).total_seconds()
            if seconds_since <= _ACTIVATION_CRASH_WINDOW:
                jail_name = last_activation["jail_name"]
                current = runtime_state.pending_recovery
                if current is None or current.recovered:
                    runtime_state.pending_recovery = PendingRecovery(
                        jail_name=jail_name,
                        activated_at=activated_at,
                        detected_at=now,
                    )
                    log.warning(
                        "activation_crash_detected",
                        jail=jail_name,
                        seconds_since_activation=seconds_since,
                    )

    log.debug(
        "health_check_complete",
        online=status.online,
        version=status.version,
        active_jails=status.active_jails,
    )