BanGUI/backend/app/startup.py
Lukas 187cd8250d Implement database-backed scheduler lock for multi-worker safety
Enforce single-executor safety regardless of the process launcher through a
database-backed lock that works reliably in container
orchestration environments.

Key changes:
1. Add scheduler_lock table to database schema (migration 4)
   - Singleton row (id=1) prevents concurrent execution
   - Stores PID, hostname, creation timestamp, heartbeat timestamp
   - Atomic transaction prevents race conditions

2. Create scheduler lock utility (app/utils/scheduler_lock.py)
   - acquire_scheduler_lock(): Atomically acquire or fail
   - release_scheduler_lock(): Clean up on shutdown
   - update_scheduler_lock_heartbeat(): Keep lock alive (every 10 seconds)
   - get_scheduler_lock_info(): Debug/inspect lock status
   - Stale lock detection: TTL-based (60 second expiry)

3. Reorder startup DAG stages
   - DATABASE now comes first (required for lock acquisition)
   - WORKER_MODE depends on DATABASE (performs lock check after initialization)
   - Maintains all other stage dependencies intact

4. Update startup process (app/startup.py)
   - Replace _check_single_worker_mode() with two-tier check:
     * Fast check: BANGUI_WORKERS env var (if explicitly set to >1)
     * Authoritative check: Database lock (catches misconfiguration)
   - Return startup_db from startup_shared_resources() for lock management

5. Register scheduler lock heartbeat task
   - New task: scheduler_lock_heartbeat (app/tasks/scheduler_lock_heartbeat.py)
   - Updates lock heartbeat every 10 seconds (keeps lock alive)
   - A 60-second TTL against a 10-second heartbeat tolerates several missed beats, so a temporary load spike does not make a live lock look stale

6. Add lock release to lifespan shutdown (app/main.py)
   - Release lock before closing database
   - Allows other instances to acquire during rolling deployments
   - Graceful handoff between instances

7. Comprehensive test coverage (backend/tests/test_scheduler_lock.py)
   - Lock acquisition success and failure cases
   - Stale lock cleanup on startup
   - Lock release and heartbeat updates
   - Full lifecycle: acquire → heartbeat → release

8. Update documentation (Docs/Architekture.md § 9.3)
   - Explain single-executor requirement
   - Document database-backed locking mechanism
   - Compare with alternative approaches (filesystem, env var)
   - Include troubleshooting guide
   - Container orchestration examples (Docker, Kubernetes, systemd)
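
The lock utility described in items 1, 2 and 5 can be sketched with the standard-library `sqlite3` module. This is a minimal, synchronous illustration under stated assumptions, not the code in `app/utils/scheduler_lock.py`, which is awaited in the async startup path. The column names, the helper names `acquire_lock`, `heartbeat` and `release_lock`, and the use of `BEGIN IMMEDIATE` are all hypothetical choices. The sketch enforces a single row with `CHECK (id = 1)`. A lock whose heartbeat is older than the 60-second TTL is deleted and re-taken inside the same write transaction. The connection must be opened with `isolation_level=None` (autocommit), so that the explicit `BEGIN IMMEDIATE` is the only transaction control.

```python
import os
import socket
import sqlite3
import time
from typing import Optional

# Hypothetical schema mirroring the columns listed in item 1.
# CHECK (id = 1) lets the table hold at most one row: the lock itself.
SCHEMA = """
CREATE TABLE IF NOT EXISTS scheduler_lock (
    id           INTEGER PRIMARY KEY CHECK (id = 1),
    pid          INTEGER NOT NULL,
    hostname     TEXT    NOT NULL,
    created_at   REAL    NOT NULL,
    heartbeat_at REAL    NOT NULL
)
"""

LOCK_TTL_SECONDS = 60.0  # a heartbeat older than this marks the lock as stale


def _owner() -> tuple:
    # Identify the holder by PID and hostname; PIDs alone collide across containers.
    return (os.getpid(), socket.gethostname())


def acquire_lock(conn: sqlite3.Connection, now: Optional[float] = None) -> bool:
    """Take the lock if no row exists or the existing row is stale."""
    now = time.time() if now is None else now
    # BEGIN IMMEDIATE takes SQLite's write lock up front, so the stale check
    # and the insert cannot interleave with another instance (no TOCTOU gap).
    conn.execute("BEGIN IMMEDIATE")
    try:
        conn.execute(
            "DELETE FROM scheduler_lock WHERE heartbeat_at < ?",
            (now - LOCK_TTL_SECONDS,),
        )
        cur = conn.execute(
            "INSERT OR IGNORE INTO scheduler_lock "
            "(id, pid, hostname, created_at, heartbeat_at) VALUES (1, ?, ?, ?, ?)",
            (*_owner(), now, now),
        )
        conn.execute("COMMIT")
    except BaseException:
        if conn.in_transaction:
            conn.execute("ROLLBACK")
        raise
    # rowcount is 1 only if our INSERT wrote the row; 0 means a live holder exists.
    return cur.rowcount == 1


def heartbeat(conn: sqlite3.Connection, now: Optional[float] = None) -> bool:
    """Refresh heartbeat_at; return False if this process no longer holds the lock."""
    now = time.time() if now is None else now
    cur = conn.execute(
        "UPDATE scheduler_lock SET heartbeat_at = ? "
        "WHERE id = 1 AND pid = ? AND hostname = ?",
        (now, *_owner()),
    )
    return cur.rowcount == 1


def release_lock(conn: sqlite3.Connection) -> None:
    """Delete the row only if this process owns it (clean shutdown, rolling deploy)."""
    conn.execute(
        "DELETE FROM scheduler_lock WHERE id = 1 AND pid = ? AND hostname = ?",
        _owner(),
    )
```

In this sketch the lock is not re-entrant: a second `acquire_lock` call fails even from the holding process. A `False` return from `heartbeat` tells the holder that it has lost the lock. With a 10-second heartbeat and a 60-second TTL, a live holder can miss several consecutive heartbeats before another instance is allowed to take over.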

Why database-backed instead of filesystem?
   - Atomicity: SQLite transactions prevent TOCTOU race windows
   - Container-safe: Works across containers with shared DB volumes
   - No NFS/SMB edge cases
   - Timestamp-based stale detection (PID reuse is unreliable)
   - More reliable in rolling deployments
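
A small experiment can check the atomicity point. In the hedged sketch below, several threads race to insert the lock row at the same moment. Each thread opens its own connection and stands in for a separate BanGUI instance. SQLite serializes the writes, so exactly one `INSERT OR IGNORE` reports a written row. The two-column table and the `race` helper are illustrative assumptions. Threads inside one process only approximate separate containers that share a volume.

```python
import os
import sqlite3
import tempfile
import threading


def _contend(db_path: str, slot: int, results: list, barrier: threading.Barrier) -> None:
    # Each contender opens its own connection, as a separate instance would.
    # timeout=5.0 lets SQLite's busy handler wait for the write lock instead of failing.
    conn = sqlite3.connect(db_path, timeout=5.0, isolation_level=None)
    try:
        barrier.wait()  # release every contender at (nearly) the same instant
        cur = conn.execute(
            "INSERT OR IGNORE INTO scheduler_lock (id, owner) VALUES (1, ?)", (slot,)
        )
        # A single statement is one atomic transaction: no check-then-act gap.
        results[slot] = cur.rowcount == 1
    finally:
        conn.close()


def race(contenders: int = 8) -> list:
    """Return one bool per contender: True for whoever inserted the lock row."""
    results = [False] * contenders
    with tempfile.TemporaryDirectory() as tmp:
        db_path = os.path.join(tmp, "lock.db")
        setup = sqlite3.connect(db_path, isolation_level=None)
        setup.execute(
            "CREATE TABLE scheduler_lock ("
            "id INTEGER PRIMARY KEY CHECK (id = 1), owner INTEGER NOT NULL)"
        )
        setup.close()
        barrier = threading.Barrier(contenders)
        threads = [
            threading.Thread(target=_contend, args=(db_path, i, results, barrier))
            for i in range(contenders)
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
    return results


if __name__ == "__main__":
    winners = race()
    print(f"{sum(winners)} of {len(winners)} contenders acquired the lock")
```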

Benefits:
   - Works with any process manager (uvicorn, gunicorn, etc.)
   - Handles simultaneous startup attempts correctly
   - Automatic recovery after an instance crash (a stale lock is cleaned up on the next startup)
   - Clear error messages with troubleshooting steps
   - No environment variable required (lock is authoritative)
   - Scales to multi-worker deployments if combined with an external job store

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-04-29 20:10:53 +02:00

449 lines
16 KiB
Python

"""Application startup helpers.

This module contains shared startup logic extracted from ``app.main`` so that
initialisation is easier to reason about and unit test. The lifespan handler
in ``app.main`` delegates resource creation and task registration here.

The startup process is orchestrated by StartupDAG, which ensures all resources
are initialized in the correct order with explicit dependency tracking, and
cleanly rolls back on failure.

Startup Stages (in order):
1. DATABASE: Initialize database schema and cache setup completion state
2. WORKER_MODE: Verify single-worker mode and acquire the database-backed
   scheduler lock (no multi-worker scheduler conflicts)
3. GEO_CACHE: Load and configure IP geolocation cache
4. HTTP_SESSION: Create shared aiohttp session with timeouts
5. SCHEDULER: Create and start the APScheduler instance
6. TASKS: Register all background tasks

See StartupDAG in app.startup_dag for full dependency graph and rollback logic.
"""

from __future__ import annotations

import os
from pathlib import Path
from typing import TYPE_CHECKING, Any

import aiohttp
import structlog
from apscheduler.schedulers.asyncio import AsyncIOScheduler  # type: ignore[import-untyped]

from app.db import init_db, open_db
from app.services import setup_service
from app.services.dns_validated_connector import create_dns_validated_socket_factory
from app.services.geo_cache import GeoCache
from app.startup_dag import StartupDAG, StartupStage
from app.tasks import (
    blocklist_import,
    geo_cache_cleanup,
    geo_cache_flush,
    geo_re_resolve,
    health_check,
    history_sync,
    rate_limiter_cleanup,
    scheduler_lock_heartbeat,
    session_cleanup,
)
from app.utils.async_utils import run_blocking
from app.utils.jail_config import ensure_jail_configs
from app.utils.runtime_state import set_runtime_settings
from app.utils.scheduler_lock import (
    acquire_scheduler_lock,
    release_scheduler_lock,
)
from app.utils.setup_state import set_setup_complete_cache

if TYPE_CHECKING:
    from fastapi import FastAPI

    from app.config import Settings

log: structlog.stdlib.BoundLogger = structlog.get_logger()


def _check_single_worker_mode() -> None:
    """Fast check: verify BANGUI_WORKERS environment variable if set.

    This is the first-line guard: if BANGUI_WORKERS is explicitly set to a
    value > 1, reject immediately without requiring database access. This
    catches obvious misconfiguration early.

    The authoritative check is the database-backed lock acquired in
    _stage_check_worker_mode_and_acquire_lock(), which handles the general
    case where multiple instances start without proper environment setup.

    Raises:
        RuntimeError: If BANGUI_WORKERS is explicitly set to > 1 or is not
            an integer.
    """
    workers_env = os.environ.get("BANGUI_WORKERS")
    if workers_env is not None:
        try:
            worker_count = int(workers_env)
            if worker_count > 1:
                raise RuntimeError(
                    "BanGUI background scheduler cannot run with multiple workers.\n"
                    f"BANGUI_WORKERS is set to {worker_count}. Set it to 1 or remove it.\n"
                    "See Architekture.md § Deployment Constraints for details."
                )
        except ValueError as e:
            raise RuntimeError(
                f"BANGUI_WORKERS environment variable must be an integer, got: {workers_env}"
            ) from e


async def _ensure_database_schema(database_path: str) -> None:
    """Create the configured runtime database if it does not already exist."""
    db = await open_db(database_path)
    try:
        await init_db(db)
    finally:
        await db.close()


def _create_http_session(settings: Settings) -> aiohttp.ClientSession:
    """Build a shared aiohttp session with DNS-rebinding protection and reasonable limits.

    Uses a custom socket factory that validates all resolved IPs at connection time,
    preventing DNS-rebinding attacks where a blocklist URL initially resolves to
    a public IP but later resolves to a private IP during the actual connection.
    """
    timeout = aiohttp.ClientTimeout(
        total=settings.http_request_timeout_seconds,
        connect=settings.http_connect_timeout_seconds,
        sock_read=settings.http_request_timeout_seconds,
    )
    connector = aiohttp.TCPConnector(
        limit=settings.http_max_connections,
        limit_per_host=settings.http_max_connections,
        keepalive_timeout=settings.http_keepalive_timeout_seconds,
        enable_cleanup_closed=True,
        socket_factory=create_dns_validated_socket_factory(),
    )
    return aiohttp.ClientSession(timeout=timeout, connector=connector)


async def startup_shared_resources(
    app: FastAPI,
    settings: Settings,
) -> tuple[aiohttp.ClientSession, AsyncIOScheduler, Any]:
    """Create shared resources needed during the application lifespan.

    This function orchestrates the entire startup sequence through a StartupDAG,
    ensuring all resources are initialized in the correct order with explicit
    dependency tracking. If any stage fails, all completed resources are cleanly
    rolled back.

    The startup stages are:
    1. DATABASE: Initialize database and load setup state
    2. WORKER_MODE: Validate single-worker configuration and acquire scheduler lock
    3. GEO_CACHE: Load IP geolocation cache
    4. HTTP_SESSION: Create shared aiohttp session
    5. SCHEDULER: Create and start APScheduler
    6. TASKS: Register all background jobs

    Args:
        app: The FastAPI application instance.
        settings: Resolved application settings.

    Returns:
        A tuple of ``(http_session, scheduler, startup_db)``.

    Raises:
        RuntimeError: If any startup stage fails or prerequisites are not met.
    """
    dag = StartupDAG()

    # Register all startup stages with their dependencies.
    # NOTE: DATABASE stage must come before WORKER_MODE for lock acquisition
    dag.register_stage(
        StartupStage.DATABASE,
        "Initialize database schema and load setup state",
        prerequisites=frozenset(),
    )
    dag.register_stage(
        StartupStage.WORKER_MODE,
        "Verify single-worker mode and acquire scheduler lock",
        prerequisites=frozenset([StartupStage.DATABASE]),
    )
    dag.register_stage(
        StartupStage.GEO_CACHE,
        "Load IP geolocation cache from database",
        prerequisites=frozenset([StartupStage.WORKER_MODE]),
    )
    dag.register_stage(
        StartupStage.HTTP_SESSION,
        "Create shared aiohttp session with configured timeouts",
        prerequisites=frozenset([StartupStage.GEO_CACHE]),
    )
    dag.register_stage(
        StartupStage.SCHEDULER,
        "Create and start APScheduler for background jobs",
        prerequisites=frozenset([StartupStage.HTTP_SESSION]),
    )
    dag.register_stage(
        StartupStage.TASKS,
        "Register all background jobs (import, cleanup, health checks)",
        prerequisites=frozenset([StartupStage.SCHEDULER]),
    )

    try:
        # Stage 1: Initialize database (must come first for lock acquisition)
        startup_db = await dag.execute_stage(
            StartupStage.DATABASE,
            lambda: _stage_init_database(app, settings),
        )

        # Stage 2: Validate single-worker mode and acquire scheduler lock
        await dag.execute_stage(
            StartupStage.WORKER_MODE,
            lambda: _stage_check_worker_mode_and_acquire_lock(startup_db),
        )

        # Stage 3: Load GeoCache
        geo_cache = await dag.execute_stage(
            StartupStage.GEO_CACHE,
            lambda: _stage_init_geo_cache(settings, startup_db),
        )

        # Stage 4: Create HTTP session
        http_session = await dag.execute_stage(
            StartupStage.HTTP_SESSION,
            lambda: _stage_create_http_session(settings),
        )

        # Stage 5: Create and start scheduler
        scheduler = await dag.execute_stage(
            StartupStage.SCHEDULER,
            lambda: _stage_create_scheduler(),
        )

        # Stage 6: Register tasks
        await dag.execute_stage(
            StartupStage.TASKS,
            lambda: _stage_register_tasks(app, scheduler),
        )

        # Verify all resources are healthy
        if not await dag.health_check():
            raise RuntimeError("Startup health check failed")

        # Store the geo_cache on app state for dependency injection
        app.state.geo_cache = geo_cache

        log.info(
            "startup_completed_successfully",
            stages=len(dag.context.completed_stages),
        )
        return http_session, scheduler, startup_db

    except Exception:
        # Clean up on failure
        log.error("startup_failed_rolling_back_resources")
        await dag.rollback()

        # Ensure database is closed if it was initialized
        if StartupStage.DATABASE in dag.context.completed_stages:
            startup_db = dag.context.get_resource(StartupStage.DATABASE)
            await startup_db.close()
        raise


async def _stage_check_worker_mode_and_acquire_lock(startup_db: Any) -> None:
    """Check single-worker mode and acquire the scheduler lock.

    This is stage 2 of the startup DAG (it runs after DATABASE). It performs
    two checks:
    1. Fast check: Verify BANGUI_WORKERS env var if explicitly set
    2. Authoritative check: Acquire database-backed scheduler lock

    The database lock ensures that only one instance runs the scheduler, even
    in container orchestration scenarios where multiple instances may start
    simultaneously. This prevents duplicate background jobs, duplicate history
    entries, and SQLite lock contention.

    Args:
        startup_db: The initialized database connection.

    Raises:
        RuntimeError: If the env var check fails or the scheduler lock cannot
            be acquired (another instance is running the scheduler).
    """
    # Fast check: verify BANGUI_WORKERS if explicitly set
    _check_single_worker_mode()

    # Authoritative check: acquire the database-backed lock
    if not await acquire_scheduler_lock(startup_db):
        raise RuntimeError(
            "Could not acquire scheduler lock. Another BanGUI instance is already running the scheduler.\n"
            "This prevents duplicate background jobs (blocklist imports, history sync, etc.).\n"
            "\n"
            "To recover from a stale lock (e.g., after a crash):\n"
            " 1. Verify no other BanGUI instances are running\n"
            " 2. Inspect the lock: sqlite3 bangui.db 'SELECT * FROM scheduler_lock;'\n"
            " 3. If stale, clean it: sqlite3 bangui.db 'DELETE FROM scheduler_lock;'\n"
            "\n"
            "See Architekture.md § Deployment Constraints for details."
        )


async def _stage_init_database(app: FastAPI, settings: Settings) -> Any:
    """Initialize database schema and load setup state.

    This is stage 1 of the startup DAG. It must run before WORKER_MODE because
    the scheduler lock is stored in this database. It:
    1. Creates database directory if needed
    2. Opens the database connection
    3. Initializes schema
    4. Caches setup completion state
    5. Loads persisted runtime settings

    Returns:
        The database connection object.
    """
    db_path: Path = Path(settings.database_path)
    await run_blocking(db_path.parent.mkdir, parents=True, exist_ok=True)
    log.debug("database_directory_ensured", directory=str(db_path.parent))
    original_db_path = db_path.resolve()

    startup_db = await open_db(settings.database_path)
    try:
        await init_db(startup_db)
        setup_complete = await setup_service.is_setup_complete(startup_db)
        set_setup_complete_cache(app, setup_complete)
        log.debug("setup_completion_cached", completed=setup_complete)

        if setup_complete:
            runtime_database_path = await setup_service.get_runtime_database_path(startup_db)
            if runtime_database_path:
                # A runtime database at a different path needs its own schema.
                if Path(runtime_database_path).resolve() != original_db_path:
                    await _ensure_database_schema(runtime_database_path)
                runtime_db = await open_db(runtime_database_path)
                try:
                    persisted_runtime_settings = (
                        await setup_service.get_persisted_runtime_settings(runtime_db)
                    )
                finally:
                    await runtime_db.close()
                if persisted_runtime_settings:
                    updated_settings = settings.model_copy(update=persisted_runtime_settings)
                    set_runtime_settings(app, updated_settings)
                    log.info(
                        "runtime_settings_overridden_from_setup",
                        overrides=persisted_runtime_settings,
                    )
    except Exception:
        await startup_db.close()
        raise
    return startup_db


async def _stage_init_geo_cache(settings: Settings, startup_db: Any) -> GeoCache:
    """Load IP geolocation cache.

    This is stage 3 of the startup DAG. It:
    1. Creates GeoCache instance with configured settings
    2. Loads cache from database
    3. Counts unresolved IPs
    4. Ensures the fail2ban jail configuration files exist
    5. Initializes GeoIP database
    6. Logs warnings if necessary

    Returns:
        The GeoCache instance.
    """
    geo_cache = GeoCache(allow_http_fallback=settings.geoip_allow_http_fallback)

    # Load through the already-open startup connection, which points at
    # ``settings.database_path``.
    await geo_cache.load_cache_from_db(startup_db)
    unresolved_count = await geo_cache.count_unresolved(startup_db)

    await run_blocking(ensure_jail_configs, Path(settings.fail2ban_config_dir) / "jail.d")

    if unresolved_count > 0:
        log.warning("geo_cache_unresolved_ips", unresolved=unresolved_count)

    geo_cache.init_geoip(settings.geoip_db_path)

    if settings.geoip_allow_http_fallback:
        log.warning(
            "geoip_http_fallback_enabled",
            message=(
                "WARNING: IP geolocation HTTP fallback is enabled. "
                "IP addresses will be sent unencrypted to ip-api.com if the MaxMind database is unavailable. "
                "This is a security and privacy risk. Disable BANGUI_GEOIP_ALLOW_HTTP_FALLBACK in production."
            ),
        )

    return geo_cache


async def _stage_create_http_session(settings: Settings) -> aiohttp.ClientSession:
    """Create shared aiohttp session with configured timeouts.

    This is stage 4 of the startup DAG.

    Returns:
        The aiohttp ClientSession instance.
    """
    return _create_http_session(settings)


async def _stage_create_scheduler() -> AsyncIOScheduler:
    """Create and start APScheduler.

    This is stage 5 of the startup DAG.

    Returns:
        The AsyncIOScheduler instance.

    Raises:
        RuntimeError: If scheduler creation or startup fails.
    """
    scheduler: AsyncIOScheduler = AsyncIOScheduler(timezone="UTC")
    scheduler.start()
    return scheduler


async def _stage_register_tasks(app: FastAPI, scheduler: AsyncIOScheduler) -> None:
    """Register all background jobs.

    This is stage 6 of the startup DAG. It registers:
    - scheduler_lock_heartbeat: Periodic update of scheduler lock (keeps it alive)
    - health_check: Periodic fail2ban connectivity probe
    - blocklist_import: Scheduled blocklist download and application
    - geo_cache_cleanup: Periodic purge of stale geo cache entries
    - geo_cache_flush: Periodic geo cache persistence
    - geo_re_resolve: Periodic re-resolution of stale records
    - history_sync: Periodic synchronization of ban history
    - session_cleanup: Periodic cleanup of expired sessions
    - rate_limiter_cleanup: Periodic cleanup of expired rate-limiter entries

    Args:
        app: The FastAPI application instance.
        scheduler: The APScheduler scheduler to register tasks with.
    """
    scheduler_lock_heartbeat.register(app)
    health_check.register(app)
    await blocklist_import.register(app)
    geo_cache_cleanup.register(app)
    geo_cache_flush.register(app)
    geo_re_resolve.register(app)
    history_sync.register(app)
    session_cleanup.register(app)
    rate_limiter_cleanup.register(app)
    log.info("startup_tasks_registered", count=9)