Implement periodic cleanup of expired rate-limiter entries to prevent unbounded memory growth during long runtimes.

Changes:
- Create a rate_limiter_cleanup task that calls cleanup_expired() every 30 minutes
- Register the task in the startup DAG alongside the other background jobs
- Update the rate_limiter module documentation with operational notes about the cleanup lifecycle and memory-management strategy

The cleanup is conservative: it only removes IPs with no recent attempts (all timestamps outside the rate-limit window), so active IPs are preserved.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
415 lines
14 KiB
Python
415 lines
14 KiB
Python
"""Application startup helpers.
|
|
|
|
This module contains shared startup logic extracted from ``app.main`` so that
initialization is easier to reason about and unit test. The lifespan handler
in ``app.main`` delegates resource creation and task registration here.

The startup process is orchestrated by StartupDAG, which ensures all resources
are initialized in the correct order with explicit dependency tracking, and
cleanly rolls back on failure.

Startup Stages (in order):

1. WORKER_MODE: Verify single-worker mode (no multi-worker scheduler conflicts)
2. DATABASE: Initialize the database schema, cache the setup completion state
   and load persisted runtime settings
3. GEO_CACHE: Load and configure the IP geolocation cache
4. HTTP_SESSION: Create the shared aiohttp session with timeouts
5. SCHEDULER: Create and start the APScheduler instance
6. TASKS: Register all background tasks

See StartupDAG in app.startup_dag for the full dependency graph and rollback logic.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING, Any
|
|
|
|
import aiohttp
|
|
import structlog
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler # type: ignore[import-untyped]
|
|
|
|
from app.db import init_db, open_db
|
|
from app.services import setup_service
|
|
from app.services.dns_validated_connector import create_dns_validated_socket_factory
|
|
from app.services.geo_cache import GeoCache
|
|
from app.startup_dag import StartupDAG, StartupStage
|
|
from app.tasks import (
|
|
blocklist_import,
|
|
geo_cache_cleanup,
|
|
geo_cache_flush,
|
|
geo_re_resolve,
|
|
health_check,
|
|
history_sync,
|
|
rate_limiter_cleanup,
|
|
session_cleanup,
|
|
)
|
|
from app.utils.async_utils import run_blocking
|
|
from app.utils.jail_config import ensure_jail_configs
|
|
from app.utils.runtime_state import set_runtime_settings
|
|
from app.utils.setup_state import set_setup_complete_cache
|
|
|
|
if TYPE_CHECKING:
|
|
from fastapi import FastAPI
|
|
|
|
from app.config import Settings
|
|
|
|
# Module-level structured logger shared by every startup helper in this file.
log: structlog.stdlib.BoundLogger = structlog.get_logger()
|
|
|
|
|
|
def _check_single_worker_mode() -> None:
    """Verify that the application is configured to run with a single worker.

    APScheduler's AsyncIOScheduler is bound to a single asyncio event loop
    and cannot be safely shared across multiple worker processes. If each
    worker starts its own scheduler instance, all background jobs execute N
    times (where N is the number of workers), resulting in duplicate blocklist
    imports, duplicate ban operations, duplicate history writes, and SQLite
    lock contention.

    Only the ``BANGUI_WORKERS`` environment variable is inspected. When it is
    unset, the application is assumed to run in single-worker mode.

    Raises:
        RuntimeError: If ``BANGUI_WORKERS`` is not an integer, or is an
            integer greater than 1.
    """
    # Explicit worker count env var (convention used in deployment).
    workers_env = os.environ.get("BANGUI_WORKERS")
    if workers_env is None:
        return

    # Keep the try minimal: only the conversion can raise ValueError.
    try:
        worker_count = int(workers_env)
    except ValueError as e:
        raise RuntimeError(
            f"BANGUI_WORKERS environment variable must be an integer, got: {workers_env}"
        ) from e

    if worker_count > 1:
        raise RuntimeError(
            "BanGUI background scheduler cannot run with multiple workers.\n"
            f"BANGUI_WORKERS is set to {worker_count}. Set it to 1 or remove it.\n"
            "See Architekture.md § Deployment Constraints for details."
        )
|
|
|
|
|
|
async def _ensure_database_schema(database_path: str) -> None:
    """Create the database at *database_path* if needed and apply the schema.

    The parent directory is created first, the same way ``_stage_init_database``
    does for the startup database. This lets a runtime database configured
    during setup live in a directory that does not exist yet. The connection
    opened here is always closed, even if schema initialization fails.

    Args:
        database_path: Filesystem path of the database to prepare.
    """
    # SQLite cannot create a database file inside a missing directory, so
    # ensure it exists off the event loop before opening the connection.
    await run_blocking(Path(database_path).parent.mkdir, parents=True, exist_ok=True)

    db = await open_db(database_path)
    try:
        await init_db(db)
    finally:
        await db.close()
|
|
|
|
|
|
def _create_http_session(settings: Settings) -> aiohttp.ClientSession:
    """Return a shared aiohttp session configured from *settings*.

    The connector receives the socket factory built by
    ``create_dns_validated_socket_factory``. That factory is intended to
    validate every resolved IP at connection time, guarding against DNS
    rebinding: a blocklist URL that first resolves to a public IP but to a
    private one when the connection is actually made. Pool size, keep-alive
    and the total/connect/read timeouts all come from *settings*; the same
    value bounds both the whole request and each socket read.
    """
    request_timeout = settings.http_request_timeout_seconds
    max_connections = settings.http_max_connections

    client_timeout = aiohttp.ClientTimeout(
        total=request_timeout,
        connect=settings.http_connect_timeout_seconds,
        sock_read=request_timeout,
    )

    # Per-host limit equals the global limit, so a single host may use the whole pool.
    tcp_connector = aiohttp.TCPConnector(
        limit=max_connections,
        limit_per_host=max_connections,
        keepalive_timeout=settings.http_keepalive_timeout_seconds,
        enable_cleanup_closed=True,
        socket_factory=create_dns_validated_socket_factory(),
    )

    return aiohttp.ClientSession(connector=tcp_connector, timeout=client_timeout)
|
|
|
|
|
|
async def startup_shared_resources(
    app: FastAPI,
    settings: Settings,
) -> tuple[aiohttp.ClientSession, AsyncIOScheduler]:
    """Create shared resources needed during the application lifespan.

    This function orchestrates the entire startup sequence through a StartupDAG,
    ensuring all resources are initialized in the correct order with explicit
    dependency tracking. If any stage fails, ``dag.rollback()`` is invoked and
    the startup database connection is closed before the error is re-raised.

    The startup stages are:
    1. WORKER_MODE: Validate single-worker configuration
    2. DATABASE: Initialize database and load setup state
    3. GEO_CACHE: Load IP geolocation cache
    4. HTTP_SESSION: Create shared aiohttp session
    5. SCHEDULER: Create and start APScheduler
    6. TASKS: Register all background jobs

    The database connection opened in stage 2 is only needed while starting
    up. It is closed once the health check passes and is not returned.

    Args:
        app: The FastAPI application instance.
        settings: Resolved application settings.

    Returns:
        A tuple of ``(http_session, scheduler)``.

    Raises:
        RuntimeError: If the startup health check fails. Exceptions raised by
            individual stages are re-raised after rollback.
    """
    dag = StartupDAG()

    # Register all startup stages with their dependencies (a linear chain).
    dag.register_stage(
        StartupStage.WORKER_MODE,
        "Verify single-worker mode (scheduler must not run in multiple workers)",
        prerequisites=frozenset(),
    )
    dag.register_stage(
        StartupStage.DATABASE,
        "Initialize database schema and load setup state",
        prerequisites=frozenset([StartupStage.WORKER_MODE]),
    )
    dag.register_stage(
        StartupStage.GEO_CACHE,
        "Load IP geolocation cache from database",
        prerequisites=frozenset([StartupStage.DATABASE]),
    )
    dag.register_stage(
        StartupStage.HTTP_SESSION,
        "Create shared aiohttp session with configured timeouts",
        prerequisites=frozenset([StartupStage.GEO_CACHE]),
    )
    dag.register_stage(
        StartupStage.SCHEDULER,
        "Create and start APScheduler for background jobs",
        prerequisites=frozenset([StartupStage.HTTP_SESSION]),
    )
    dag.register_stage(
        StartupStage.TASKS,
        "Register all background jobs (import, cleanup, health checks)",
        prerequisites=frozenset([StartupStage.SCHEDULER]),
    )

    # Tracked locally so the failure path can close the connection without
    # depending on what rollback() leaves in dag.context.
    startup_db: Any = None
    try:
        # Stage 1: Validate single-worker mode
        await dag.execute_stage(StartupStage.WORKER_MODE, _stage_check_worker_mode)

        # Stage 2: Initialize database
        startup_db = await dag.execute_stage(
            StartupStage.DATABASE,
            lambda: _stage_init_database(app, settings),
        )

        # Stage 3: Load GeoCache
        geo_cache = await dag.execute_stage(
            StartupStage.GEO_CACHE,
            lambda: _stage_init_geo_cache(settings, startup_db),
        )

        # Stage 4: Create HTTP session
        http_session = await dag.execute_stage(
            StartupStage.HTTP_SESSION,
            lambda: _stage_create_http_session(settings),
        )

        # Stage 5: Create and start scheduler
        scheduler = await dag.execute_stage(StartupStage.SCHEDULER, _stage_create_scheduler)

        # Stage 6: Register tasks
        await dag.execute_stage(
            StartupStage.TASKS,
            lambda: _stage_register_tasks(app, scheduler),
        )

        # Verify all resources are healthy
        if not await dag.health_check():
            raise RuntimeError("Startup health check failed")

        # Nothing returned from here keeps a reference to the startup
        # connection, so release it now instead of leaking it for the whole
        # process lifetime. Clear the local first so a failing close() is not
        # retried by the except-branch below.
        db_to_close, startup_db = startup_db, None
        await db_to_close.close()
    except Exception:
        # log.exception keeps the traceback of the stage that failed.
        log.exception("startup_failed_rolling_back_resources")
        try:
            await dag.rollback()
        finally:
            # Runs even if rollback() itself raises.
            if startup_db is not None:
                await startup_db.close()
        raise

    # Store the geo_cache on app state for dependency injection
    app.state.geo_cache = geo_cache

    log.info(
        "startup_completed_successfully",
        stages=len(dag.context.completed_stages),
    )

    return http_session, scheduler
|
|
|
|
|
|
async def _stage_check_worker_mode() -> None:
    """Stage 1 of the startup DAG: refuse to start with more than one worker.

    This is a coroutine wrapper around ``_check_single_worker_mode`` so it can
    be passed to ``StartupDAG.execute_stage``. Any RuntimeError raised by the
    check propagates unchanged.
    """
    return _check_single_worker_mode()
|
|
|
|
|
|
async def _stage_init_database(app: FastAPI, settings: Settings) -> Any:
    """Stage 2 of the startup DAG: prepare the startup database and setup state.

    Steps performed:

    1. Ensure the parent directory of ``settings.database_path`` exists.
    2. Open that database and apply the schema with ``init_db``.
    3. Cache on *app* whether initial setup has been completed.
    4. If setup is complete, delegate to ``_apply_persisted_runtime_settings``
       to prepare the recorded runtime database and overlay persisted settings.

    Returns:
        The open startup database connection. If anything in this function
        fails, the connection is closed before the exception propagates.
    """
    configured_path: Path = Path(settings.database_path)
    await run_blocking(configured_path.parent.mkdir, parents=True, exist_ok=True)
    log.debug("database_directory_ensured", directory=str(configured_path.parent))

    resolved_configured_path = configured_path.resolve()
    conn = await open_db(settings.database_path)

    try:
        await init_db(conn)
        is_complete = await setup_service.is_setup_complete(conn)
        set_setup_complete_cache(app, is_complete)
        log.debug("setup_completion_cached", completed=is_complete)

        if is_complete:
            await _apply_persisted_runtime_settings(
                app, settings, conn, resolved_configured_path
            )
    except Exception:
        await conn.close()
        raise

    return conn


async def _apply_persisted_runtime_settings(
    app: FastAPI,
    settings: Settings,
    startup_db: Any,
    original_db_path: Path,
) -> None:
    """Overlay runtime settings persisted during setup onto *settings*.

    Does nothing if setup recorded no runtime database path. If the recorded
    path points at a different file than *original_db_path*, its schema is
    created first. Persisted settings are read from the runtime database
    through a short-lived connection. If any exist, a copy of *settings*
    updated with them is stored via ``set_runtime_settings``.
    """
    runtime_path = await setup_service.get_runtime_database_path(startup_db)
    if not runtime_path:
        return

    if Path(runtime_path).resolve() != original_db_path:
        await _ensure_database_schema(runtime_path)

    runtime_conn = await open_db(runtime_path)
    try:
        overrides = await setup_service.get_persisted_runtime_settings(runtime_conn)
    finally:
        await runtime_conn.close()

    if not overrides:
        return

    set_runtime_settings(app, settings.model_copy(update=overrides))
    log.info(
        "runtime_settings_overridden_from_setup",
        overrides=overrides,
    )
|
|
|
|
|
|
async def _stage_init_geo_cache(settings: Settings, startup_db: Any) -> GeoCache:
    """Load the IP geolocation cache.

    This is stage 3 of the startup DAG. It:

    1. Creates a GeoCache instance with the configured HTTP-fallback policy.
    2. Loads cached entries and counts unresolved IPs. If setup is complete and
       recorded a runtime database that is a different file from
       ``settings.database_path``, that runtime database is read. Otherwise
       *startup_db* is read.
    3. Calls ``ensure_jail_configs`` on ``<fail2ban_config_dir>/jail.d``.
    4. Logs a warning if any IPs are still unresolved.
    5. Initializes the GeoIP database from ``settings.geoip_db_path``.
    6. Logs a warning if the plaintext HTTP fallback is enabled.

    Args:
        settings: Application settings as passed to ``startup_shared_resources``.
        startup_db: Open connection to ``settings.database_path`` from stage 2.

    Returns:
        The GeoCache instance.
    """
    geo_cache = GeoCache(allow_http_fallback=settings.geoip_allow_http_fallback)

    # *settings* is not the runtime-overridden copy (stage 2 stores that on the
    # app), so comparing settings.database_path with itself can never detect a
    # separate runtime DB. Look the runtime path up the same way stage 2 does.
    runtime_database_path = await _get_distinct_runtime_database_path(settings, startup_db)

    if runtime_database_path is not None:
        runtime_db = await open_db(runtime_database_path)
        try:
            await geo_cache.load_cache_from_db(runtime_db)
            unresolved_count = await geo_cache.count_unresolved(runtime_db)
        finally:
            await runtime_db.close()
    else:
        await geo_cache.load_cache_from_db(startup_db)
        unresolved_count = await geo_cache.count_unresolved(startup_db)

    await run_blocking(ensure_jail_configs, Path(settings.fail2ban_config_dir) / "jail.d")

    if unresolved_count > 0:
        log.warning("geo_cache_unresolved_ips", unresolved=unresolved_count)

    geo_cache.init_geoip(settings.geoip_db_path)

    if settings.geoip_allow_http_fallback:
        log.warning(
            "geoip_http_fallback_enabled",
            message=(
                "WARNING: IP geolocation HTTP fallback is enabled. "
                "IP addresses will be sent unencrypted to ip-api.com if the MaxMind database is unavailable. "
                "This is a security and privacy risk. Disable BANGUI_GEOIP_ALLOW_HTTP_FALLBACK in production."
            ),
        )

    return geo_cache


async def _get_distinct_runtime_database_path(settings: Settings, startup_db: Any) -> str | None:
    """Return the setup-recorded runtime DB path if it is a different file, else ``None``.

    Uses the same ``setup_service`` queries as ``_stage_init_database``:
    returns ``None`` when setup is not complete, when no runtime path is
    recorded, or when the recorded path resolves to ``settings.database_path``.
    """
    if not await setup_service.is_setup_complete(startup_db):
        return None
    runtime_database_path = await setup_service.get_runtime_database_path(startup_db)
    if not runtime_database_path:
        return None
    if Path(runtime_database_path).resolve() == Path(settings.database_path).resolve():
        return None
    return runtime_database_path
|
|
|
|
|
|
async def _stage_create_http_session(settings: Settings) -> aiohttp.ClientSession:
    """Stage 4 of the startup DAG: build the shared aiohttp session.

    Coroutine wrapper around the synchronous ``_create_http_session`` factory,
    matching the shape of the other stage callables.

    Returns:
        The newly created aiohttp ClientSession.
    """
    session = _create_http_session(settings)
    return session
|
|
|
|
|
|
async def _stage_create_scheduler() -> AsyncIOScheduler:
    """Create and start the APScheduler instance.

    This is stage 5 of the startup DAG. The scheduler runs in UTC and is
    started immediately. Background jobs are registered afterwards, in stage 6.

    Returns:
        The started AsyncIOScheduler instance.

    Raises:
        Exception: Any error raised by ``AsyncIOScheduler(...)`` or
            ``scheduler.start()`` propagates unchanged. This function does not
            wrap it in a RuntimeError.
    """
    scheduler: AsyncIOScheduler = AsyncIOScheduler(timezone="UTC")
    # Called from a coroutine so the scheduler binds to the running event loop.
    scheduler.start()
    return scheduler
|
|
|
|
|
|
async def _stage_register_tasks(app: FastAPI, scheduler: AsyncIOScheduler) -> None:
    """Register all background jobs.

    This is stage 6 of the startup DAG. It calls ``register(app)`` on each task
    module, in this order:

    - health_check: Periodic fail2ban connectivity probe
    - blocklist_import: Scheduled blocklist download and application
    - geo_cache_cleanup: Periodic purge of stale geo cache entries
    - geo_cache_flush: Periodic geo cache persistence
    - geo_re_resolve: Periodic re-resolution of stale records
    - history_sync: Periodic synchronization of ban history
    - session_cleanup: Periodic cleanup of expired sessions
    - rate_limiter_cleanup: Periodic cleanup of expired rate-limiter entries

    ``blocklist_import.register`` returns an awaitable and is awaited. Any
    other registrar that returns an awaitable is awaited too. The logged
    ``count`` is derived from the registrar tuple, so it stays correct when
    tasks are added or removed.

    Args:
        app: The FastAPI application instance passed to every registrar.
        scheduler: The scheduler created in stage 5. It is not referenced
            directly here; the registrars receive only ``app``.
    """
    import inspect

    task_modules = (
        health_check,
        blocklist_import,
        geo_cache_cleanup,
        geo_cache_flush,
        geo_re_resolve,
        history_sync,
        session_cleanup,
        rate_limiter_cleanup,
    )

    for task_module in task_modules:
        outcome = task_module.register(app)
        # Mixed sync/async registrars: await only those that return an awaitable.
        if inspect.isawaitable(outcome):
            await outcome

    log.info("startup_tasks_registered", count=len(task_modules))
|