BanGUI/backend/app/tasks/scheduler_lock_heartbeat.py
Lukas 187cd8250d Implement database-backed scheduler lock for multi-worker safety
Enforce single-executor safety regardless of process launcher through a
robust database-backed lock mechanism that works reliably in container
orchestration environments.

Key changes:
1. Add scheduler_lock table to database schema (migration 4)
   - Singleton row (id=1) prevents concurrent execution
   - Stores PID, hostname, creation timestamp, heartbeat timestamp
   - Atomic transaction prevents race conditions
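
A minimal sketch of how a singleton lock row can be enforced in SQLite, assuming a `CHECK (id = 1)` constraint on the primary key. The column names are guesses based on the fields listed above; the actual migration 4 schema is not shown here.

```python
import sqlite3

# Hypothetical schema: column names are illustrative, not taken from the migration.
SCHEMA = """
CREATE TABLE IF NOT EXISTS scheduler_lock (
    id           INTEGER PRIMARY KEY CHECK (id = 1),  -- at most one row can exist
    pid          INTEGER NOT NULL,
    hostname     TEXT    NOT NULL,
    created_at   REAL    NOT NULL,
    heartbeat_at REAL    NOT NULL
)
"""


def second_row_is_rejected() -> bool:
    """Return True if SQLite refuses a second lock row."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(SCHEMA)
        conn.execute("INSERT INTO scheduler_lock VALUES (1, 100, 'host-a', 0.0, 0.0)")
        try:
            # id=2 violates the CHECK constraint, so this insert must fail.
            conn.execute("INSERT INTO scheduler_lock VALUES (2, 200, 'host-b', 0.0, 0.0)")
        except sqlite3.IntegrityError:
            return True
        return False
    finally:
        conn.close()
```

With the constraint in place, "who holds the lock" reduces to "who wrote row 1", which the database can arbitrate.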

2. Create scheduler lock utility (app/utils/scheduler_lock.py)
   - acquire_scheduler_lock(): Atomically acquire or fail
   - release_scheduler_lock(): Clean up on shutdown
   - update_scheduler_lock_heartbeat(): Keep lock alive (every 10 seconds)
   - get_scheduler_lock_info(): Debug/inspect lock status
   - Stale lock detection: TTL-based (60 second expiry)

3. Reorder startup DAG stages
   - DATABASE now comes first (required for lock acquisition)
   - WORKER_MODE depends on DATABASE (performs lock check after initialization)
   - Maintains all other stage dependencies intact
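
One way to picture the reordered stages is as a dependency graph resolved by a topological sort. The sketch below uses the standard-library `graphlib`; `DATABASE` and `WORKER_MODE` are the stage names from the text, while `SCHEDULER` and the dictionary layout are hypothetical and do not reflect how the project actually declares its DAG.

```python
from graphlib import TopologicalSorter

# Each stage maps to the set of stages it depends on.
# SCHEDULER is a made-up downstream stage for illustration.
STAGE_DEPENDENCIES: dict[str, set[str]] = {
    "DATABASE": set(),
    "WORKER_MODE": {"DATABASE"},   # lock check needs an initialized database
    "SCHEDULER": {"WORKER_MODE"},
}


def startup_order(dependencies: dict[str, set[str]]) -> list[str]:
    """Return the stages in an order that satisfies every dependency."""
    return list(TopologicalSorter(dependencies).static_order())
```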

4. Update startup process (app/startup.py)
   - Replace _check_single_worker_mode() with two-tier check:
     * Fast check: BANGUI_WORKERS env var (if explicitly set to >1)
     * Authoritative check: Database lock (catches misconfiguration)
   - Return startup_db from startup_shared_resources() for lock management
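
The two-tier check might be structured as in the following sketch. `BANGUI_WORKERS` is the variable named above; `check_single_executor`, `MultiWorkerError`, and the injected `try_acquire_lock` callable are hypothetical stand-ins, not the code in `app/startup.py`.

```python
from typing import Callable, Mapping


class MultiWorkerError(RuntimeError):
    """Raised when more than one scheduler executor would run (hypothetical)."""


def check_single_executor(
    env: Mapping[str, str],
    try_acquire_lock: Callable[[], bool],
) -> None:
    """Fail fast on an explicit worker count, then defer to the database lock."""
    # Tier 1: cheap check, only meaningful if the variable is explicitly set.
    raw = env.get("BANGUI_WORKERS", "").strip()
    if raw.isdigit() and int(raw) > 1:
        raise MultiWorkerError(f"BANGUI_WORKERS={raw}; only one worker is supported")

    # Tier 2: authoritative check, catches launchers that ignore the variable.
    if not try_acquire_lock():
        raise MultiWorkerError("scheduler lock is held by another live instance")
```

Injecting the lock function keeps the environment check testable without a database.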

5. Register scheduler lock heartbeat task
   - New task: scheduler_lock_heartbeat (app/tasks/scheduler_lock_heartbeat.py)
   - Updates lock heartbeat every 10 seconds (keeps lock alive)
   - Prevents a busy but healthy instance from being mistaken for a crashed one
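
A heartbeat update that also reports whether the lock is still ours could be sketched as below. It assumes the update is scoped to the holder's PID and hostname and that "zero rows changed" means the lock was lost; the real `update_scheduler_lock_heartbeat()` may use a different ownership test. Table and column names are illustrative.

```python
import sqlite3


def make_demo_db(pid: int, hostname: str) -> sqlite3.Connection:
    """Create an in-memory lock table already held by (pid, hostname)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE scheduler_lock ("
        "id INTEGER PRIMARY KEY CHECK (id = 1), pid INTEGER, hostname TEXT, "
        "created_at REAL, heartbeat_at REAL)"
    )
    conn.execute(
        "INSERT INTO scheduler_lock VALUES (1, ?, ?, 0.0, 0.0)", (pid, hostname)
    )
    conn.commit()
    return conn


def update_heartbeat(
    conn: sqlite3.Connection, pid: int, hostname: str, now: float
) -> bool:
    """Refresh the heartbeat; return False if this process no longer holds the lock."""
    cursor = conn.execute(
        "UPDATE scheduler_lock SET heartbeat_at = ? "
        "WHERE id = 1 AND pid = ? AND hostname = ?",
        (now, pid, hostname),
    )
    conn.commit()
    # rowcount is 0 when the row belongs to someone else or is missing.
    return cursor.rowcount == 1
```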

6. Add lock release to lifespan shutdown (app/main.py)
   - Release lock before closing database
   - Allows other instances to acquire during rolling deployments
   - Graceful handoff between instances
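
The shutdown ordering (release the lock, then close the database) can be illustrated with a small async context manager. `FakeDb` and its methods are stand-ins invented for this sketch; the real lifespan handler in `app/main.py` is not shown in this file.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator


class FakeDb:
    """Stand-in for the startup database connection; records call order."""

    def __init__(self, events: list[str]) -> None:
        self.events = events
        self.closed = False

    async def release_lock(self) -> None:
        if self.closed:
            raise RuntimeError("cannot release the lock on a closed database")
        self.events.append("lock_released")

    async def close(self) -> None:
        self.closed = True
        self.events.append("db_closed")


@asynccontextmanager
async def lifespan(db: FakeDb) -> AsyncIterator[None]:
    try:
        yield
    finally:
        try:
            await db.release_lock()  # must run while the connection is open
        finally:
            await db.close()  # always close, even if the release failed


async def run_demo() -> list[str]:
    events: list[str] = []
    async with lifespan(FakeDb(events)):
        events.append("serving")
    return events
```

Running `asyncio.run(run_demo())` yields the events in the order serving, lock released, database closed.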

7. Comprehensive test coverage (backend/tests/test_scheduler_lock.py)
   - Lock acquisition success and failure cases
   - Stale lock cleanup on startup
   - Lock release and heartbeat updates
   - Full lifecycle: acquire → heartbeat → release

8. Update documentation (Docs/Architekture.md § 9.3)
   - Explain single-executor requirement
   - Document database-backed locking mechanism
   - Compare with alternative approaches (filesystem, env var)
   - Include troubleshooting guide
   - Container orchestration examples (Docker, Kubernetes, systemd)

Why database-backed instead of filesystem?
   - Atomicity: SQLite transactions prevent TOCTOU race windows
   - Container-safe: Works across containers with shared DB volumes
   - No NFS/SMB edge cases
   - Timestamp-based stale detection (PID reuse is unreliable)
   - More reliable in rolling deployments

Benefits:
   - Works with any process manager (uvicorn, gunicorn, etc.)
   - Handles simultaneous startup attempts correctly
   - Automatic failover on instance crash (stale lock cleanup)
   - Clear error messages with troubleshooting steps
   - No environment variable required (lock is authoritative)
   - Scales to multi-worker deployments if combined with external job store

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-04-29 20:10:53 +02:00

"""Scheduler lock heartbeat background task.

Registers an APScheduler job that periodically updates the scheduler lock's
heartbeat timestamp. This prevents the lock from being considered stale
if the running instance experiences temporary delays or high load.

Without this heartbeat, stale lock detection (based on TTL) could incorrectly
determine that the scheduler instance has crashed when it's merely busy, and
a new instance could take over.
"""

from __future__ import annotations

from typing import TYPE_CHECKING

import structlog

from app.tasks.db import task_db
from app.utils.runtime_state import get_effective_settings
from app.utils.scheduler_lock import update_scheduler_lock_heartbeat

if TYPE_CHECKING:
    from fastapi import FastAPI

    from app.config import Settings

log: structlog.stdlib.BoundLogger = structlog.get_logger()

#: How often the heartbeat job fires (seconds). Must be less than the lock TTL.
SCHEDULER_LOCK_HEARTBEAT_INTERVAL: int = 10

#: Stable APScheduler job ID; ensures re-registration replaces, not duplicates.
JOB_ID: str = "scheduler_lock_heartbeat"


async def _update_heartbeat_with_resources(settings: Settings) -> None:
    """Update the scheduler lock heartbeat timestamp.

    If the heartbeat update fails (e.g., we no longer hold the lock), log
    a warning but don't crash the scheduler. This allows the running
    application to continue even if something went wrong.

    Args:
        settings: The resolved application settings used for database access.
    """
    async with task_db(settings) as db:
        success = await update_scheduler_lock_heartbeat(db)
        if success:
            log.debug("scheduler_lock_heartbeat_updated")
        else:
            log.warning(
                "scheduler_lock_heartbeat_failed",
                message="Failed to update heartbeat; we may have lost the lock.",
            )


async def _update_heartbeat(app: FastAPI) -> None:
    """Resolve the effective settings from ``app`` and update the heartbeat."""
    await _update_heartbeat_with_resources(get_effective_settings(app))


def register(app: FastAPI) -> None:
    """Add (or replace) the scheduler lock heartbeat job.

    Must be called after the scheduler has been started (i.e., inside the
    lifespan handler, after ``scheduler.start()``).

    Args:
        app: The :class:`fastapi.FastAPI` application instance whose
            ``app.state.scheduler`` will receive the job.
    """
    settings = get_effective_settings(app)
    app.state.scheduler.add_job(
        _update_heartbeat_with_resources,
        trigger="interval",
        seconds=SCHEDULER_LOCK_HEARTBEAT_INTERVAL,
        kwargs={"settings": settings},
        id=JOB_ID,
        replace_existing=True,
    )
    log.info(
        "scheduler_lock_heartbeat_scheduled",
        interval_seconds=SCHEDULER_LOCK_HEARTBEAT_INTERVAL,
    )
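
The comment on `SCHEDULER_LOCK_HEARTBEAT_INTERVAL` requires the interval to stay below the lock TTL. The sketch below works out how much slack the stated values (10-second interval, 60-second TTL) leave, assuming the lock counts as stale once its age reaches the TTL. The helper name and that comparison rule are assumptions, not part of the module above.

```python
HEARTBEAT_INTERVAL_SECONDS = 10  # value of SCHEDULER_LOCK_HEARTBEAT_INTERVAL
LOCK_TTL_SECONDS = 60            # TTL stated in the commit message


def missed_heartbeats_tolerated(interval: int, ttl: int) -> int:
    """Count consecutive heartbeats that can be skipped before the lock looks stale.

    Assumes the lock is stale once (now - last_heartbeat) >= ttl.
    """
    if interval <= 0:
        raise ValueError("interval must be positive")
    if interval >= ttl:
        raise ValueError("heartbeat interval must be less than the lock TTL")
    # Scheduled beats at interval, 2*interval, ... that land strictly before
    # the TTL expires can all be missed; the one after them must not be.
    return (ttl - 1) // interval
```

Under these assumptions the default configuration tolerates five consecutive missed heartbeats, which is the margin that absorbs temporary load spikes.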