"""Application startup helpers.

This module contains shared startup logic extracted from ``app.main`` so that
initialisation is easier to reason about and unit test. The lifespan handler in
``app.main`` delegates resource creation and task registration here.
"""

from __future__ import annotations

import os
from contextlib import suppress
from pathlib import Path
from typing import TYPE_CHECKING

import aiohttp
import structlog
from apscheduler.schedulers.asyncio import AsyncIOScheduler  # type: ignore[import-untyped]

from app.db import init_db, open_db
from app.services import setup_service
from app.services.geo_cache import GeoCache
from app.tasks import blocklist_import, geo_cache_flush, geo_re_resolve, health_check, history_sync
from app.utils.async_utils import run_blocking
from app.utils.jail_config import ensure_jail_configs
from app.utils.runtime_state import set_runtime_settings
from app.utils.setup_state import set_setup_complete_cache

if TYPE_CHECKING:
    from fastapi import FastAPI

    from app.config import Settings

log: structlog.stdlib.BoundLogger = structlog.get_logger()


def _check_single_worker_mode() -> None:
    """Verify that the application is running with a single worker.

    APScheduler's AsyncIOScheduler is bound to a single asyncio event loop and
    cannot be safely shared across multiple worker processes. If each worker
    starts its own scheduler instance, all background jobs execute N times
    (where N is the number of workers), resulting in duplicate blocklist
    imports, duplicate ban operations, duplicate history writes, and SQLite
    lock contention.

    The only signal inspected is the ``BANGUI_WORKERS`` environment variable
    (the deployment convention). When it is unset, the check passes.

    Raises:
        RuntimeError: If ``BANGUI_WORKERS`` is not an integer, or is set to a
            value greater than 1.
    """
    workers_env = os.environ.get("BANGUI_WORKERS")
    if workers_env is None:
        return

    # Only the conversion can raise ValueError; keep the try body minimal.
    try:
        worker_count = int(workers_env)
    except ValueError as e:
        raise RuntimeError(
            f"BANGUI_WORKERS environment variable must be an integer, got: {workers_env}"
        ) from e

    if worker_count > 1:
        raise RuntimeError(
            "BanGUI background scheduler cannot run with multiple workers.\n"
            f"BANGUI_WORKERS is set to {worker_count}. Set it to 1 or remove it.\n"
            "See Architekture.md § Deployment Constraints for details."
        )


async def _ensure_database_schema(database_path: str) -> None:
    """Create the database at *database_path* if needed and initialise it.

    The parent directory is created first, mirroring what
    ``startup_shared_resources`` does for the startup database. Without this,
    a runtime database configured in a directory that does not exist yet
    could not be opened.

    Args:
        database_path: Filesystem path of the database to open with
            ``open_db`` and initialise with ``init_db``.
    """
    await run_blocking(Path(database_path).parent.mkdir, parents=True, exist_ok=True)
    db = await open_db(database_path)
    try:
        await init_db(db)
    finally:
        await db.close()


def _create_http_session(settings: Settings) -> aiohttp.ClientSession:
    """Build a shared aiohttp session with global connection limits and timeouts.

    Args:
        settings: Application settings. The ``http_*`` timeout, connection
            limit and keep-alive values configure the session.

    Returns:
        A new ``aiohttp.ClientSession``. The caller must close it.
    """
    timeout = aiohttp.ClientTimeout(
        total=settings.http_request_timeout_seconds,
        connect=settings.http_connect_timeout_seconds,
        sock_read=settings.http_request_timeout_seconds,
    )
    connector = aiohttp.TCPConnector(
        limit=settings.http_max_connections,
        limit_per_host=settings.http_max_connections,
        keepalive_timeout=settings.http_keepalive_timeout_seconds,
        enable_cleanup_closed=True,
    )
    return aiohttp.ClientSession(timeout=timeout, connector=connector)


async def startup_shared_resources(
    app: FastAPI,
    settings: Settings,
) -> tuple[aiohttp.ClientSession, AsyncIOScheduler]:
    """Create shared resources needed during the application lifespan.

    Steps, in order:

    1. Refuse to start in multi-worker mode.
    2. Ensure the startup database directory exists and initialise its schema.
    3. Cache the setup-completion flag on *app*. When setup is complete, read
       persisted runtime settings from the runtime database. If any exist,
       store an overridden copy of *settings* via ``set_runtime_settings`` and
       use it for the remaining steps.
    4. Load a ``GeoCache`` from the database the effective settings point at.
    5. Ensure jail configs under ``<fail2ban_config_dir>/jail.d``.
    6. Create the shared HTTP session, initialise GeoIP, publish the geo cache
       on ``app.state.geo_cache``, start the scheduler and register the
       background tasks.

    Args:
        app: The FastAPI application instance.
        settings: Resolved application settings.

    Returns:
        A tuple of ``(http_session, scheduler)``. On success neither is closed
        or shut down here; the caller takes ownership of both.

    Raises:
        RuntimeError: If a multi-worker configuration is detected.
        Exception: Any error raised while creating resources is re-raised. If
            the error occurs after the HTTP session is created, the session
            is closed and any started scheduler is shut down before
            re-raising.
    """
    _check_single_worker_mode()

    db_path: Path = Path(settings.database_path)
    await run_blocking(db_path.parent.mkdir, parents=True, exist_ok=True)
    log.debug("database_directory_ensured", directory=str(db_path.parent))
    # Resolved once so later comparisons can tell whether another configured
    # path refers to a different file than the startup database.
    original_db_path = db_path.resolve()

    startup_db = await open_db(settings.database_path)
    try:
        await init_db(startup_db)
        setup_complete = await setup_service.is_setup_complete(startup_db)
        set_setup_complete_cache(app, setup_complete)
        log.debug("setup_completion_cached", completed=setup_complete)

        if setup_complete:
            runtime_database_path = await setup_service.get_runtime_database_path(startup_db)
            if runtime_database_path:
                # The startup database was initialised above; only a distinct
                # runtime database needs its schema ensured separately.
                if Path(runtime_database_path).resolve() != original_db_path:
                    await _ensure_database_schema(runtime_database_path)
                runtime_db = await open_db(runtime_database_path)
                try:
                    persisted_runtime_settings = (
                        await setup_service.get_persisted_runtime_settings(runtime_db)
                    )
                finally:
                    await runtime_db.close()
                if persisted_runtime_settings:
                    updated_settings = settings.model_copy(update=persisted_runtime_settings)
                    set_runtime_settings(app, updated_settings)
                    # Everything below uses the overridden settings.
                    settings = updated_settings
                    log.info(
                        "runtime_settings_overridden_from_setup",
                        overrides=persisted_runtime_settings,
                    )

        # Create and initialise the GeoCache. If the effective database is the
        # startup database, reuse the already-open startup connection.
        geo_cache = GeoCache()
        if Path(settings.database_path).resolve() != original_db_path:
            runtime_db = await open_db(settings.database_path)
            try:
                await geo_cache.load_cache_from_db(runtime_db)
                unresolved_count = await geo_cache.count_unresolved(runtime_db)
            finally:
                await runtime_db.close()
        else:
            await geo_cache.load_cache_from_db(startup_db)
            unresolved_count = await geo_cache.count_unresolved(startup_db)
    finally:
        await startup_db.close()

    await run_blocking(ensure_jail_configs, Path(settings.fail2ban_config_dir) / "jail.d")

    if unresolved_count > 0:
        log.warning("geo_cache_unresolved_ips", unresolved=unresolved_count)

    http_session: aiohttp.ClientSession = _create_http_session(settings)
    scheduler: AsyncIOScheduler | None = None
    try:
        # Every step after the session is created runs inside this ``try``.
        # A failure here, including in ``init_geoip``, then closes the
        # session instead of leaking it.
        geo_cache.init_geoip(settings.geoip_db_path)
        app.state.geo_cache = geo_cache

        scheduler = AsyncIOScheduler(timezone="UTC")
        scheduler.start()
        # NOTE(review): the register helpers receive only ``app``. Confirm
        # they do not expect the scheduler or HTTP session on ``app.state``,
        # because this function does not set those attributes.
        health_check.register(app)
        await blocklist_import.register(app)
        geo_cache_flush.register(app)
        geo_re_resolve.register(app)
        history_sync.register(app)
        return http_session, scheduler
    except Exception:
        # Best-effort cleanup; a cleanup failure must not mask the original error.
        with suppress(Exception):
            await http_session.close()
        if scheduler is not None:
            with suppress(Exception):
                scheduler.shutdown(wait=False)
        raise