"""Geo cache flush background task. Registers an APScheduler job that periodically persists newly resolved IP geo entries from the in-memory dirty set to the ``geo_cache`` table. After Task 2 removed geo cache writes from GET requests, newly resolved IPs are only held in the in-memory cache until this task flushes them. With the default 60-second interval, at most one minute of new resolution results is at risk on an unexpected process restart. Correlation IDs are propagated through the task using :mod:`app.utils.correlation` so that task logs can be correlated across runs. """ from __future__ import annotations import uuid from typing import TYPE_CHECKING import structlog from app.tasks.db import task_db from app.tasks.timeout_utils import run_with_timeout from app.utils.correlation import get_correlation_id, reset_correlation_id, set_correlation_id from app.utils.runtime_state import get_effective_settings if TYPE_CHECKING: from fastapi import FastAPI from app.config import Settings from app.services.geo_cache import GeoCache log: structlog.stdlib.BoundLogger = structlog.get_logger() #: How often the flush job fires (seconds). Configurable tuning constant. GEO_FLUSH_INTERVAL: int = 60 #: Stable APScheduler job ID — ensures re-registration replaces, not duplicates. JOB_ID: str = "geo_cache_flush" #: Maximum seconds to allow for geo cache flush to complete. TASK_TIMEOUT_SECONDS: int = 60 async def _run_flush_with_resources( geo_cache: GeoCache, settings: Settings, correlation_id: str | None = None, ) -> None: """Flush the geo cache dirty set to the application database. Args: geo_cache: The application's GeoCache instance. settings: The resolved application settings used for database access. correlation_id: Optional correlation ID for log correlation. """ if correlation_id is None: correlation_id = str(uuid.uuid4()) token = set_correlation_id(correlation_id) try: await _do_flush_with_resources(geo_cache, settings) finally: reset_correlation_id(token) async def _do_flush_with_resources(geo_cache: GeoCache, settings: Settings) -> None: """Inner flush logic that runs with correlation context set.""" async def _do_flush() -> None: async with task_db(settings) as db: count = await geo_cache.flush_dirty(db) if count > 0: log.debug("geo_cache_flush_ran", correlation_id=get_correlation_id(), flushed=count) await run_with_timeout("geo_cache_flush", _do_flush(), TASK_TIMEOUT_SECONDS) async def _run_flush(app: FastAPI) -> None: geo_cache: GeoCache = app.state.geo_cache await _run_flush_with_resources(geo_cache, get_effective_settings(app)) def register(app: FastAPI) -> None: """Add (or replace) the geo cache flush job in the application scheduler. Must be called after the scheduler has been started (i.e., inside the lifespan handler, after ``scheduler.start()``). Args: app: The :class:`fastapi.FastAPI` application instance whose ``app.state.scheduler`` will receive the job. """ geo_cache: GeoCache = app.state.geo_cache settings = get_effective_settings(app) app.state.scheduler.add_job( _run_flush_with_resources, trigger="interval", seconds=GEO_FLUSH_INTERVAL, kwargs={"geo_cache": geo_cache, "settings": settings}, id=JOB_ID, replace_existing=True, ) log.info("geo_cache_flush_scheduled", interval_seconds=GEO_FLUSH_INTERVAL)