- Add _check_single_worker_mode() to startup.py that detects and rejects multi-worker configurations, raising a clear RuntimeError with instructions - Set BANGUI_WORKERS=1 as default in Dockerfile.backend - Document single-worker requirement in compose.prod.yml - Add 'Deployment Constraints' section to Architekture.md explaining why single-worker mode is required and detailing future multi-worker support - Add '9.1 Background Tasks and Scheduler Architecture' section to Backend-Development.md documenting task structure and single-worker requirement - Add comprehensive test suite (test_startup.py) covering all scenarios: allows single worker, rejects multi-worker, validates config format, and verifies informative error messages This fix addresses TASK-002 which identified that in-process APScheduler is unsafe in multi-worker deployments due to each worker creating independent scheduler instances, causing duplicate background job execution. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
188 lines
6.9 KiB
Python
188 lines
6.9 KiB
Python
"""Application startup helpers.
|
|
|
|
This module contains shared startup logic extracted from ``app.main`` so that
|
|
initialisation is easier to reason about and unit test. The lifespan handler
|
|
in ``app.main`` delegates resource creation and task registration here.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
from contextlib import suppress
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING
|
|
|
|
import aiohttp
|
|
import structlog
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler # type: ignore[import-untyped]
|
|
|
|
from app.db import init_db, open_db
|
|
from app.services import setup_service
|
|
from app.services.geo_cache import GeoCache
|
|
from app.tasks import blocklist_import, geo_cache_flush, geo_re_resolve, health_check, history_sync
|
|
from app.utils.async_utils import run_blocking
|
|
from app.utils.jail_config import ensure_jail_configs
|
|
from app.utils.runtime_state import set_runtime_settings
|
|
from app.utils.setup_state import set_setup_complete_cache
|
|
|
|
if TYPE_CHECKING:
|
|
from fastapi import FastAPI
|
|
|
|
from app.config import Settings
|
|
|
|
log: structlog.stdlib.BoundLogger = structlog.get_logger()
|
|
|
|
|
|
def _check_single_worker_mode() -> None:
    """Refuse to start when more than one worker process is configured.

    The background scheduler used by this module (APScheduler's
    ``AsyncIOScheduler``) lives inside a single asyncio event loop and is
    not shared between worker processes.  With N workers every background
    job would therefore run N times, producing duplicate blocklist imports,
    duplicate ban operations, duplicate history writes and SQLite lock
    contention.

    The configured worker count is read from the ``BANGUI_WORKERS``
    environment variable.  An unset variable, or any integer value not
    greater than 1, passes the check.

    Raises:
        RuntimeError: If ``BANGUI_WORKERS`` is greater than 1, or if it is
            set to something that cannot be parsed as an integer (the
            original ``ValueError`` is chained as the cause).
    """
    configured = os.environ.get("BANGUI_WORKERS")
    if configured is None:
        # Nothing configured explicitly: nothing to validate.
        return

    # Only the conversion itself can raise ValueError, so keep it alone in
    # the try block.
    try:
        requested_workers = int(configured)
    except ValueError as exc:
        raise RuntimeError(
            f"BANGUI_WORKERS environment variable must be an integer, got: {configured}"
        ) from exc

    if requested_workers <= 1:
        return

    raise RuntimeError(
        "BanGUI background scheduler cannot run with multiple workers.\n"
        f"BANGUI_WORKERS is set to {requested_workers}. Set it to 1 or remove it.\n"
        "See Architekture.md § Deployment Constraints for details."
    )
|
|
|
|
|
|
async def _ensure_database_schema(database_path: str) -> None:
    """Initialise the database located at ``database_path``.

    A connection is opened with ``open_db``, passed to ``init_db`` and then
    closed again; the close happens even when initialisation raises.

    Args:
        database_path: Path of the database to open and initialise.
    """
    connection = await open_db(database_path)
    try:
        await init_db(connection)
    finally:
        # Always release the connection, including on init failure.
        await connection.close()
|
|
|
|
|
|
def _create_http_session(settings: Settings) -> aiohttp.ClientSession:
    """Construct the shared aiohttp client session from ``settings``.

    Args:
        settings: Application settings providing the ``http_*`` timeout,
            connection-limit and keep-alive values used below.

    Returns:
        A new ``aiohttp.ClientSession`` configured with an explicit timeout
        and TCP connector.  The caller is responsible for closing it.
    """
    return aiohttp.ClientSession(
        timeout=aiohttp.ClientTimeout(
            total=settings.http_request_timeout_seconds,
            connect=settings.http_connect_timeout_seconds,
            # The per-read socket timeout reuses the overall request timeout.
            sock_read=settings.http_request_timeout_seconds,
        ),
        connector=aiohttp.TCPConnector(
            # The same value bounds both the global and the per-host pool.
            limit=settings.http_max_connections,
            limit_per_host=settings.http_max_connections,
            keepalive_timeout=settings.http_keepalive_timeout_seconds,
            enable_cleanup_closed=True,
        ),
    )
|
|
|
|
|
|
async def startup_shared_resources(
    app: FastAPI,
    settings: Settings,
) -> tuple[aiohttp.ClientSession, AsyncIOScheduler]:
    """Create shared resources needed during the application lifespan.

    Steps, in order:

    1. Reject multi-worker configurations (``_check_single_worker_mode``).
    2. Ensure the database directory exists, open the configured database
       and initialise it.
    3. Cache the setup-complete flag on ``app``.  When setup is complete and
       a runtime database path is stored, initialise that database if it is
       a different file, read the persisted runtime settings from it and,
       if any exist, apply them on top of ``settings``.
    4. Load the geo cache from whichever database ``settings`` now points
       at and count unresolved entries.
    5. Ensure the fail2ban ``jail.d`` configuration files exist.
    6. Initialise GeoIP, store the geo cache on ``app.state``, create the
       HTTP session, start the scheduler and register the background tasks.

    Args:
        app: The FastAPI application instance.
        settings: Resolved application settings.

    Returns:
        A tuple of ``(http_session, scheduler)``.

    Raises:
        RuntimeError: If a multi-worker configuration is detected.
        Exception: Any error raised while starting the scheduler or
            registering tasks is re-raised after the HTTP session has been
            closed and the scheduler shut down on a best-effort basis.
    """
    _check_single_worker_mode()

    db_path: Path = Path(settings.database_path)
    await run_blocking(db_path.parent.mkdir, parents=True, exist_ok=True)

    log.debug("database_directory_ensured", directory=str(db_path.parent))

    # Remember the initially configured database so we can tell later
    # whether persisted runtime settings redirected us to a different file.
    original_db_path = db_path.resolve()
    startup_db = await open_db(settings.database_path)
    try:
        await init_db(startup_db)
        setup_complete = await setup_service.is_setup_complete(startup_db)
        set_setup_complete_cache(app, setup_complete)
        log.debug("setup_completion_cached", completed=setup_complete)

        if setup_complete:
            runtime_database_path = await setup_service.get_runtime_database_path(startup_db)
            if runtime_database_path:
                if Path(runtime_database_path).resolve() != original_db_path:
                    await _ensure_database_schema(runtime_database_path)

                runtime_db = await open_db(runtime_database_path)
                try:
                    persisted_runtime_settings = (
                        await setup_service.get_persisted_runtime_settings(runtime_db)
                    )
                finally:
                    await runtime_db.close()

                if persisted_runtime_settings:
                    updated_settings = settings.model_copy(update=persisted_runtime_settings)
                    set_runtime_settings(app, updated_settings)
                    # Everything below must see the overridden values.
                    settings = updated_settings
                    log.info(
                        "runtime_settings_overridden_from_setup",
                        overrides=persisted_runtime_settings,
                    )

        # Create and initialize the GeoCache instance
        geo_cache = GeoCache()
        if Path(settings.database_path).resolve() != original_db_path:
            # Settings now point at a different database: load from it.
            runtime_db = await open_db(settings.database_path)
            try:
                await geo_cache.load_cache_from_db(runtime_db)
                unresolved_count = await geo_cache.count_unresolved(runtime_db)
            finally:
                await runtime_db.close()
        else:
            # Same database: reuse the connection that is already open.
            await geo_cache.load_cache_from_db(startup_db)
            unresolved_count = await geo_cache.count_unresolved(startup_db)
    finally:
        await startup_db.close()

    await run_blocking(ensure_jail_configs, Path(settings.fail2ban_config_dir) / "jail.d")

    if unresolved_count > 0:
        log.warning("geo_cache_unresolved_ips", unresolved=unresolved_count)

    # Finish the geo-cache setup BEFORE the HTTP session exists: if GeoIP
    # initialisation raised after the session had been created (but outside
    # the guarded block below) the session would never be closed.
    geo_cache.init_geoip(settings.geoip_db_path)
    app.state.geo_cache = geo_cache

    # Nothing that can fail may sit between creating the session and
    # entering the try block that is responsible for closing it.
    http_session: aiohttp.ClientSession = _create_http_session(settings)
    scheduler: AsyncIOScheduler | None = None
    try:
        scheduler = AsyncIOScheduler(timezone="UTC")
        scheduler.start()

        # NOTE(review): the scheduler is not attached to ``app`` in this
        # function; presumably the register() helpers obtain it some other
        # way — confirm against the task modules and the caller.
        health_check.register(app)
        await blocklist_import.register(app)
        geo_cache_flush.register(app)
        geo_re_resolve.register(app)
        history_sync.register(app)

        return http_session, scheduler
    except Exception:
        # Best-effort cleanup; errors here must not mask the original one.
        with suppress(Exception):
            await http_session.close()
        if scheduler is not None:
            with suppress(Exception):
                scheduler.shutdown(wait=False)
        raise
|