"""Scheduler lock heartbeat background task. Registers an APScheduler job that periodically updates the scheduler lock's heartbeat timestamp. This prevents the lock from being considered stale if the running instance experiences temporary delays or high load. Without this heartbeat, stale lock detection (based on TTL) could incorrectly determine that the scheduler instance has crashed when it's merely busy, and a new instance could take over. """ from __future__ import annotations from typing import TYPE_CHECKING import structlog from app.tasks.db import task_db from app.utils.runtime_state import get_effective_settings from app.utils.scheduler_lock import update_scheduler_lock_heartbeat if TYPE_CHECKING: from fastapi import FastAPI from app.config import Settings log: structlog.stdlib.BoundLogger = structlog.get_logger() #: How often the heartbeat job fires (seconds). Must be less than the lock TTL. SCHEDULER_LOCK_HEARTBEAT_INTERVAL: int = 10 #: Stable APScheduler job ID — ensures re-registration replaces, not duplicates. JOB_ID: str = "scheduler_lock_heartbeat" async def _update_heartbeat_with_resources(settings: Settings) -> None: """Update the scheduler lock heartbeat timestamp. If the heartbeat update fails (e.g., we no longer hold the lock), log a warning but don't crash the scheduler. This allows the running application to continue even if something went wrong. Args: settings: The resolved application settings used for database access. """ async with task_db(settings) as db: success = await update_scheduler_lock_heartbeat(db) if success: log.debug("scheduler_lock_heartbeat_updated") else: log.warning( "scheduler_lock_heartbeat_failed", message="Failed to update heartbeat; we may have lost the lock.", ) async def _update_heartbeat(app: FastAPI) -> None: await _update_heartbeat_with_resources(get_effective_settings(app)) def register(app: FastAPI) -> None: """Add (or replace) the scheduler lock heartbeat job. Must be called after the scheduler has been started (i.e., inside the lifespan handler, after ``scheduler.start()``). Args: app: The :class:`fastapi.FastAPI` application instance whose ``app.state.scheduler`` will receive the job. """ settings = get_effective_settings(app) app.state.scheduler.add_job( _update_heartbeat_with_resources, trigger="interval", seconds=SCHEDULER_LOCK_HEARTBEAT_INTERVAL, kwargs={"settings": settings}, id=JOB_ID, replace_existing=True, ) log.info( "scheduler_lock_heartbeat_scheduled", interval_seconds=SCHEDULER_LOCK_HEARTBEAT_INTERVAL, )