- Remove structlog dependency from backend/pyproject.toml - Add app.utils.logging_compat shim for keyword-arg logging API - Add app.utils.json_formatter for JSON log output with extra fields - Update all backend modules to use logging_compat.get_logger() - Update docstrings in log_sanitizer.py and json_formatter.py - Update test comment in test_async_utils.py - Record 406 failing tests in Docs/Tasks.md for tracking
485 lines
18 KiB
Python
485 lines
18 KiB
Python
"""Application startup helpers.
|
||
|
||
This module contains shared startup logic extracted from ``app.main`` so that
|
||
initialisation is easier to reason about and unit test. The lifespan handler
|
||
in ``app.main`` delegates resource creation and task registration here.
|
||
|
||
The startup process is orchestrated by StartupDAG, which ensures all resources
|
||
are initialized in the correct order with explicit dependency tracking, and
|
||
cleanly rolls back on failure.
|
||
|
||
Startup Stages (in order):
|
||
1. DATABASE: Initialize database schema and cache setup completion state
2. WORKER_MODE: Verify single-worker mode and acquire the scheduler lock
|
||
3. GEO_CACHE: Load and configure IP geolocation cache
|
||
4. HTTP_SESSION: Create shared aiohttp session with timeouts
|
||
5. SCHEDULER: Create and start the APScheduler instance
6. TASKS: Register all background tasks on the scheduler
|
||
|
||
See StartupDAG in app.startup_dag for full dependency graph and rollback logic.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import os
|
||
from pathlib import Path
|
||
from typing import TYPE_CHECKING, Any
|
||
|
||
import aiohttp
|
||
from app.utils.logging_compat import get_logger
|
||
from apscheduler.schedulers.asyncio import AsyncIOScheduler # type: ignore[import-untyped]
|
||
|
||
from app.db import init_db, open_db
|
||
from app.services import setup_service
|
||
from app.services.dns_validated_connector import create_dns_validated_socket_factory
|
||
from app.services.geo_cache import GeoCache
|
||
from app.startup_dag import StartupDAG, StartupStage
|
||
from app.tasks import (
|
||
blocklist_import,
|
||
geo_cache_cleanup,
|
||
geo_cache_flush,
|
||
geo_re_resolve,
|
||
health_check,
|
||
history_sync,
|
||
rate_limiter_cleanup,
|
||
scheduler_lock_heartbeat,
|
||
session_cleanup,
|
||
)
|
||
from app.utils.async_utils import run_blocking
|
||
from app.utils.fail2ban_db_utils import ensure_fail2ban_indexes
|
||
from app.utils.jail_config import ensure_jail_configs
|
||
from app.utils.runtime_state import set_runtime_settings
|
||
from app.utils.scheduler_lock import (
|
||
acquire_scheduler_lock,
|
||
)
|
||
from app.utils.setup_state import set_setup_complete_cache
|
||
|
||
if TYPE_CHECKING:
|
||
from fastapi import FastAPI
|
||
|
||
from app.config import Settings
|
||
|
||
# Module-level logger; call sites in this module pass structured fields as keyword arguments.
log = get_logger(__name__)
|
||
|
||
|
||
def _check_single_worker_mode() -> None:
    """Reject an explicit multi-worker configuration without touching the database.

    Reads the ``BANGUI_WORKERS`` environment variable. When the variable is
    unset the function returns immediately; when it is set it must parse as an
    integer that is not greater than 1.

    This is only the cheap, first-line guard. The authoritative check is the
    database-backed lock acquired in
    ``_stage_check_worker_mode_and_acquire_lock()``, which also covers
    instances that were started without the environment variable.

    Raises:
        RuntimeError: If ``BANGUI_WORKERS`` is not an integer, or is an
            integer greater than 1.
    """
    configured = os.environ.get("BANGUI_WORKERS")
    if configured is None:
        return

    # Only the parse can raise ValueError, so it is the only guarded statement.
    try:
        requested_workers = int(configured)
    except ValueError as parse_error:
        raise RuntimeError(
            f"BANGUI_WORKERS environment variable must be an integer, got: {configured}"
        ) from parse_error

    if requested_workers <= 1:
        return

    raise RuntimeError(
        "BanGUI background scheduler cannot run with multiple workers.\n"
        f"BANGUI_WORKERS is set to {requested_workers}. Set it to 1 or remove it.\n"
        "\n"
        "Why this matters:\n"
        " - Session cache is process-local; users may be randomly logged out\n"
        " - Background jobs (blocklist imports, history sync) would run N times\n"
        " - Database lock contention will cause timeouts\n"
        "\n"
        "To fix:\n"
        " 1. Remove BANGUI_WORKERS=N from your environment\n"
        " 2. Don't pass --workers to uvicorn or -w to gunicorn\n"
        " 3. Deploy as a single process (use container orchestration for HA)\n"
        "\n"
        "See Docs/Architekture.md § Deployment Constraints for details."
    )
|
||
|
||
|
||
async def _ensure_database_schema(database_path: str) -> None:
    """Open the database at *database_path*, initialise its schema, then close it.

    The connection exists only for the duration of ``init_db`` and is closed
    again even when initialisation raises.

    Args:
        database_path: Location handed to ``open_db``.
    """
    connection = await open_db(database_path)
    try:
        await init_db(connection)
    finally:
        # Release the short-lived connection on success and on failure alike.
        await connection.close()
|
||
|
||
|
||
def _create_http_session(settings: Settings) -> aiohttp.ClientSession:
    """Build the shared aiohttp session from the HTTP-related settings.

    The connector is given the socket factory returned by
    ``create_dns_validated_socket_factory()``, which is intended to validate
    resolved IPs at connection time and so guard against DNS rebinding (a URL
    that first resolves to a public address and later to a private one).

    Args:
        settings: Provides the ``http_*`` timeout, connection-limit and
            keep-alive values used below.

    Returns:
        A new ``aiohttp.ClientSession``; the caller owns it and must close it.
    """
    # The same value bounds both the whole request and each socket read.
    request_timeout = settings.http_request_timeout_seconds
    # The global and per-host connection limits are deliberately identical.
    max_connections = settings.http_max_connections

    return aiohttp.ClientSession(
        timeout=aiohttp.ClientTimeout(
            total=request_timeout,
            connect=settings.http_connect_timeout_seconds,
            sock_read=request_timeout,
        ),
        connector=aiohttp.TCPConnector(
            limit=max_connections,
            limit_per_host=max_connections,
            keepalive_timeout=settings.http_keepalive_timeout_seconds,
            enable_cleanup_closed=True,
            socket_factory=create_dns_validated_socket_factory(),
        ),
    )
|
||
|
||
|
||
async def startup_shared_resources(
    app: FastAPI,
    settings: Settings,
) -> tuple[aiohttp.ClientSession, AsyncIOScheduler, Any]:
    """Create shared resources needed during the application lifespan.

    The startup sequence is driven through a ``StartupDAG``: every stage is
    registered with its prerequisites and then executed in order. If any stage
    (or the final health check) fails, the DAG is rolled back, the startup
    database connection is closed, and the original exception is re-raised.

    The startup stages are:
    1. DATABASE: Initialize database and load setup state
    2. WORKER_MODE: Validate single-worker configuration and acquire scheduler lock
    3. GEO_CACHE: Load IP geolocation cache
    4. HTTP_SESSION: Create shared aiohttp session
    5. SCHEDULER: Create and start APScheduler
    6. TASKS: Register all background jobs

    Args:
        app: The FastAPI application instance. ``app.state.http_session``,
            ``app.state.geo_cache`` and ``app.state.scheduler`` are assigned
            before the TASKS stage runs.
        settings: Resolved application settings.

    Returns:
        A tuple of ``(http_session, scheduler, startup_db)``.

    Raises:
        RuntimeError: If the post-startup health check fails.
        Exception: Any exception raised by a startup stage is propagated
            after rollback.
    """
    dag = StartupDAG()

    # Each stage depends on the one before it, forming a linear chain.
    # NOTE: DATABASE must come before WORKER_MODE because the scheduler lock
    # is acquired through the startup database connection.
    stage_plan = (
        (
            StartupStage.DATABASE,
            "Initialize database schema and load setup state",
            frozenset(),
        ),
        (
            StartupStage.WORKER_MODE,
            "Verify single-worker mode and acquire scheduler lock",
            frozenset([StartupStage.DATABASE]),
        ),
        (
            StartupStage.GEO_CACHE,
            "Load IP geolocation cache from database",
            frozenset([StartupStage.WORKER_MODE]),
        ),
        (
            StartupStage.HTTP_SESSION,
            "Create shared aiohttp session with configured timeouts",
            frozenset([StartupStage.GEO_CACHE]),
        ),
        (
            StartupStage.SCHEDULER,
            "Create and start APScheduler for background jobs",
            frozenset([StartupStage.HTTP_SESSION]),
        ),
        (
            StartupStage.TASKS,
            "Register all background jobs (import, cleanup, health checks)",
            frozenset([StartupStage.SCHEDULER]),
        ),
    )
    for stage, description, prerequisites in stage_plan:
        dag.register_stage(stage, description, prerequisites=prerequisites)

    try:
        # Stage 1: Initialize database (must come first for lock acquisition)
        startup_db = await dag.execute_stage(
            StartupStage.DATABASE,
            lambda: _stage_init_database(app, settings),
        )

        # Stage 2: Validate single-worker mode and acquire scheduler lock
        await dag.execute_stage(
            StartupStage.WORKER_MODE,
            lambda: _stage_check_worker_mode_and_acquire_lock(startup_db),
        )

        # Stage 3: Load GeoCache
        geo_cache = await dag.execute_stage(
            StartupStage.GEO_CACHE,
            lambda: _stage_init_geo_cache(settings, startup_db),
        )

        # Stage 4: Create HTTP session
        http_session = await dag.execute_stage(
            StartupStage.HTTP_SESSION,
            lambda: _stage_create_http_session(settings),
        )

        # Stage 5: Create and start scheduler
        scheduler = await dag.execute_stage(
            StartupStage.SCHEDULER,
            lambda: _stage_create_scheduler(),
        )

        # Store resources on app.state BEFORE registering tasks (tasks depend on them)
        app.state.http_session = http_session
        app.state.geo_cache = geo_cache
        app.state.scheduler = scheduler

        # Stage 6: Register tasks
        await dag.execute_stage(
            StartupStage.TASKS,
            lambda: _stage_register_tasks(app, scheduler),
        )

        # Verify all resources are healthy
        if not await dag.health_check():
            raise RuntimeError("Startup health check failed")

        log.info(
            "startup_completed_successfully",
            stages=len(dag.context.completed_stages),
        )

        return http_session, scheduler, startup_db

    except Exception:
        log.error("startup_failed_rolling_back_resources")
        try:
            await dag.rollback()
        finally:
            # Close the startup connection even when rollback itself raises;
            # otherwise a failing rollback would leak the open connection.
            # A failed DATABASE stage closes its own connection, so only a
            # completed stage needs closing here.
            if StartupStage.DATABASE in dag.context.completed_stages:
                startup_db = dag.context.get_resource(StartupStage.DATABASE)
                await startup_db.close()
        raise
|
||
|
||
|
||
async def _stage_check_worker_mode_and_acquire_lock(startup_db: Any) -> None:
    """Validate single-worker mode and take the scheduler lock.

    This is stage 2 of the startup DAG (WORKER_MODE); it runs after the
    DATABASE stage because the lock is acquired through *startup_db*.

    Two checks are performed in order:
    1. Fast check: ``_check_single_worker_mode()`` inspects ``BANGUI_WORKERS``.
    2. Authoritative check: ``acquire_scheduler_lock(startup_db)`` must
       return a truthy value.

    The lock is meant to guarantee that only one instance runs the scheduler,
    preventing duplicate background jobs and database lock contention when
    several instances start at once. After the lock is taken a warning is
    logged to remind operators that rate limiting is process-local.

    Args:
        startup_db: The initialized database connection.

    Raises:
        RuntimeError: If the environment check fails or the scheduler lock
            cannot be acquired.
    """
    # Cheap environment-variable guard first; it needs no database access.
    _check_single_worker_mode()

    lock_acquired = await acquire_scheduler_lock(startup_db)
    if not lock_acquired:
        raise RuntimeError(
            "Could not acquire scheduler lock. Another BanGUI instance is already running the scheduler.\n"
            "\n"
            "This prevents duplicate background jobs (blocklist imports, history sync, etc.).\n"
            "\n"
            "IMPORTANT: This also indicates a possible multi-worker misconfiguration:\n"
            " - If BANGUI_WORKERS > 1, multiple workers are trying to acquire the lock\n"
            " - If --workers or -w was passed to uvicorn/gunicorn, remove it\n"
            " - BanGUI must run with exactly 1 worker process (use HA at container level)\n"
            "\n"
            "To recover from a stale lock (e.g., after a crash):\n"
            " 1. Verify no other BanGUI instances are running\n"
            " 2. Inspect the lock: sqlite3 bangui.db 'SELECT * FROM scheduler_lock;'\n"
            " 3. If stale, clean it: sqlite3 bangui.db 'DELETE FROM scheduler_lock;'\n"
            "\n"
            "See Docs/Architekture.md § Deployment Constraints for details."
        )

    rate_limit_notice = (
        "Rate limiting is process-local. With multiple workers, each worker enforces "
        "its own independent limit — an attacker can send N × limit requests before "
        "any worker triggers a block. Deploy with a single worker, or replace the "
        "in-process store with a shared backend (e.g., Redis) for multi-worker setups."
    )
    log.warning("rate_limiting_process_local_only", message=rate_limit_notice)
|
||
|
||
|
||
async def _stage_init_database(app: FastAPI, settings: Settings) -> Any:
    """Open the startup database, initialise it, and apply persisted settings.

    This is stage 1 of the startup DAG (DATABASE). It:
    1. Creates the database's parent directory if needed
    2. Opens the startup database connection
    3. Initializes the schema
    4. Caches the setup-completion flag on *app*
    5. When setup is complete and a runtime database path is stored, ensures
       that database's schema (if it is a different file), ensures the
       fail2ban index set, and applies any persisted runtime settings

    Args:
        app: The FastAPI application; receives the cached setup state and
            any overridden runtime settings.
        settings: Resolved application settings; ``database_path`` selects
            the startup database.

    Returns:
        The open startup database connection. The caller owns it.

    Raises:
        Exception: Anything raised after the connection was opened is
            re-raised once the connection has been closed.
    """
    configured_path: Path = Path(settings.database_path)
    await run_blocking(configured_path.parent.mkdir, parents=True, exist_ok=True)
    log.debug("database_directory_ensured", directory=str(configured_path.parent))

    # Resolved once so it can be compared with the runtime database location.
    startup_db_location = configured_path.resolve()
    connection = await open_db(settings.database_path)

    try:
        await init_db(connection)
        is_complete = await setup_service.is_setup_complete(connection)
        set_setup_complete_cache(app, is_complete)
        log.debug("setup_completion_cached", completed=is_complete)

        # The runtime database path is only consulted once setup has finished.
        runtime_path = (
            await setup_service.get_runtime_database_path(connection) if is_complete else None
        )
        if runtime_path:
            # A runtime database that lives in a different file needs its own
            # schema initialisation; the startup file was initialised above.
            if Path(runtime_path).resolve() != startup_db_location:
                await _ensure_database_schema(runtime_path)

            runtime_db = await open_db(runtime_path)
            try:
                # Make sure the fail2ban database has its indexes before any
                # ban query runs; this happens on every startup.
                fail2ban_db_path = await setup_service.get_fail2ban_db_path(runtime_db)
                if fail2ban_db_path:
                    await run_blocking(ensure_fail2ban_indexes, fail2ban_db_path)

                overrides = await setup_service.get_persisted_runtime_settings(runtime_db)
            finally:
                await runtime_db.close()

            if overrides:
                set_runtime_settings(app, settings.model_copy(update=overrides))
                log.info("runtime_settings_overridden_from_setup", overrides=overrides)
    except Exception:
        # Do not leak the startup connection when initialisation fails.
        await connection.close()
        raise

    return connection
|
||
|
||
|
||
async def _stage_init_geo_cache(settings: Settings, startup_db: Any) -> GeoCache:
    """Load the IP geolocation cache.

    This is stage 3 of the startup DAG (GEO_CACHE). It:
    1. Creates a ``GeoCache`` with the configured HTTP-fallback flag
    2. Loads the cache from the startup database
    3. Counts unresolved IPs and warns when there are any
    4. Ensures the jail configs under ``<fail2ban_config_dir>/jail.d``
    5. Initializes the GeoIP database from ``settings.geoip_db_path``
    6. Warns when the HTTP fallback is enabled

    Args:
        settings: Resolved application settings.
        startup_db: The open startup database connection; the cache is read
            from it.

    Returns:
        The GeoCache instance.
    """
    geo_cache = GeoCache(allow_http_fallback=settings.geoip_allow_http_fallback)

    # NOTE(review): the previous code compared Path(settings.database_path).resolve()
    # with itself to decide whether to open a separate runtime database, so that
    # branch could never run and the cache was always read from startup_db.
    # The unreachable branch is removed here; behaviour is unchanged. If the
    # cache should come from the runtime database selected during setup, this
    # stage needs the overridden runtime settings — confirm the intent.
    await geo_cache.load_cache_from_db(startup_db)
    unresolved_count = await geo_cache.count_unresolved(startup_db)

    await run_blocking(ensure_jail_configs, Path(settings.fail2ban_config_dir) / "jail.d")

    if unresolved_count > 0:
        log.warning("geo_cache_unresolved_ips", unresolved=unresolved_count)

    geo_cache.init_geoip(settings.geoip_db_path)

    if settings.geoip_allow_http_fallback:
        log.warning(
            "geoip_http_fallback_enabled",
            message=(
                "WARNING: IP geolocation HTTP fallback is enabled. "
                "IP addresses will be sent unencrypted to ip-api.com if the MaxMind database is unavailable. "
                "This is a security and privacy risk. Disable BANGUI_GEOIP_ALLOW_HTTP_FALLBACK in production."
            ),
        )

    return geo_cache
|
||
|
||
|
||
async def _stage_create_http_session(settings: Settings) -> aiohttp.ClientSession:
    """Create the shared aiohttp session.

    This is stage 4 of the startup DAG (HTTP_SESSION). It is a thin async
    wrapper around ``_create_http_session``, which is synchronous.

    Args:
        settings: Resolved application settings, passed through unchanged.

    Returns:
        The aiohttp ClientSession instance.
    """
    session = _create_http_session(settings)
    return session
|
||
|
||
|
||
async def _stage_create_scheduler() -> AsyncIOScheduler:
    """Create an APScheduler instance in UTC and start it.

    This is stage 5 of the startup DAG (SCHEDULER).

    Returns:
        The started AsyncIOScheduler instance.

    Raises:
        Exception: Whatever ``AsyncIOScheduler(...)`` or ``start()`` raises
            is propagated unchanged; nothing is caught here.
    """
    utc_scheduler: AsyncIOScheduler = AsyncIOScheduler(timezone="UTC")
    utc_scheduler.start()
    return utc_scheduler
|
||
|
||
|
||
async def _stage_register_tasks(app: FastAPI, scheduler: AsyncIOScheduler) -> None:
    """Register every background job on the application.

    This is stage 6 of the startup DAG (TASKS). Each task module's
    ``register(app)`` is called in this order:
    - scheduler_lock_heartbeat: Periodic update of scheduler lock (keeps it alive)
    - health_check: Periodic fail2ban connectivity probe
    - blocklist_import: Scheduled blocklist download and application
    - geo_cache_cleanup: Periodic purge of stale geo cache entries
    - geo_cache_flush: Periodic geo cache persistence
    - geo_re_resolve: Periodic re-resolution of stale records
    - history_sync: Periodic synchronization of ban history
    - session_cleanup: Periodic cleanup of expired sessions
    - rate_limiter_cleanup: Periodic cleanup of expired rate-limiter entries

    Args:
        app: The FastAPI application instance handed to every ``register``.
        scheduler: The APScheduler scheduler. It is not used directly here;
            presumably the task modules reach it through ``app.state``.
    """
    # blocklist_import.register is the only awaited registration, so the
    # synchronous ones are grouped around it to keep the original order.
    before_blocklist = (scheduler_lock_heartbeat, health_check)
    after_blocklist = (
        geo_cache_cleanup,
        geo_cache_flush,
        geo_re_resolve,
        history_sync,
        session_cleanup,
        rate_limiter_cleanup,
    )

    for task_module in before_blocklist:
        task_module.register(app)

    await blocklist_import.register(app)

    for task_module in after_blocklist:
        task_module.register(app)

    # 2 + 1 + 6 registrations.
    registered_total = len(before_blocklist) + 1 + len(after_blocklist)
    log.info("startup_tasks_registered", count=registered_total)
|