Implement database-backed scheduler lock for multi-worker safety
Enforce single-executor safety regardless of process launcher through a
robust database-backed lock mechanism that works reliably in container
orchestration environments.
Key changes:
1. Add scheduler_lock table to database schema (migration 4)
- Singleton row (id=1) prevents concurrent execution
- Stores PID, hostname, creation timestamp, heartbeat timestamp
- Atomic transaction prevents race conditions
2. Create scheduler lock utility (app/utils/scheduler_lock.py)
- acquire_scheduler_lock(): Atomically acquire or fail
- release_scheduler_lock(): Clean up on shutdown
- update_scheduler_lock_heartbeat(): Keep lock alive (every 10 seconds)
- get_scheduler_lock_info(): Debug/inspect lock status
- Stale lock detection: TTL-based (60 second expiry)
3. Reorder startup DAG stages
- DATABASE now comes first (required for lock acquisition)
- WORKER_MODE depends on DATABASE (performs lock check after initialization)
- Maintains all other stage dependencies intact
4. Update startup process (app/startup.py)
- Replace _check_single_worker_mode() with two-tier check:
* Fast check: BANGUI_WORKERS env var (if explicitly set to >1)
* Authoritative check: Database lock (catches misconfiguration)
- Return startup_db from startup_shared_resources() for lock management
5. Register scheduler lock heartbeat task
- New task: scheduler_lock_heartbeat (app/tasks/scheduler_lock_heartbeat.py)
- Updates lock heartbeat every 10 seconds (keeps lock alive)
- Prevents false positives from temporary load spikes
6. Add lock release to lifespan shutdown (app/main.py)
- Release lock before closing database
- Allows other instances to acquire during rolling deployments
- Graceful handoff between instances
7. Comprehensive test coverage (backend/tests/test_scheduler_lock.py)
- Lock acquisition success and failure cases
- Stale lock cleanup on startup
- Lock release and heartbeat updates
- Full lifecycle: acquire → heartbeat → release
8. Update documentation (Docs/Architekture.md § 9.3)
- Explain single-executor requirement
- Document database-backed locking mechanism
- Compare with alternative approaches (filesystem, env var)
- Include troubleshooting guide
- Container orchestration examples (Docker, Kubernetes, systemd)
Why database-backed instead of filesystem?
- Atomicity: SQLite transactions prevent TOCTOU race windows
- Container-safe: Works across containers with shared DB volumes
- No NFS/SMB edge cases
- Timestamp-based stale detection (PID reuse is unreliable)
- More reliable in rolling deployments
Benefits:
- Works with any process manager (uvicorn, gunicorn, etc.)
- Handles simultaneous startup attempts correctly
- Automatic failover on instance crash (stale lock cleanup)
- Clear error messages with troubleshooting steps
- No environment variable required (lock is authoritative)
- Scales to multi-worker deployments if combined with external job store
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -107,7 +107,7 @@ _SCHEMA_STATEMENTS: list[str] = [
|
||||
_CREATE_HISTORY_ARCHIVE,
|
||||
]
|
||||
|
||||
_CURRENT_SCHEMA_VERSION: int = 3
|
||||
_CURRENT_SCHEMA_VERSION: int = 4
|
||||
|
||||
_MIGRATIONS: dict[int, str] = {
|
||||
1: "\n".join(_SCHEMA_STATEMENTS),
|
||||
@@ -130,6 +130,19 @@ CREATE UNIQUE INDEX idx_sessions_token_hash ON sessions (token_hash);
|
||||
-- Tracks when each IP was last referenced to enable purging of stale entries.
|
||||
-- Default to current timestamp for existing rows.
|
||||
ALTER TABLE geo_cache ADD COLUMN last_seen TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'));
|
||||
""",
|
||||
4: """
|
||||
-- Migration 4: Add scheduler_lock table for multi-worker safety.
|
||||
-- Implements database-backed locking to ensure only one worker runs the scheduler.
|
||||
-- Uses atomic transactions to prevent race conditions in container orchestration.
|
||||
-- Lock is held by the process that successfully inserts the singleton row (id=1).
|
||||
CREATE TABLE scheduler_lock (
|
||||
id INTEGER PRIMARY KEY CHECK (id = 1),
|
||||
pid INTEGER NOT NULL,
|
||||
hostname TEXT NOT NULL,
|
||||
created_at REAL NOT NULL,
|
||||
heartbeat_at REAL NOT NULL
|
||||
);
|
||||
""",
|
||||
}
|
||||
|
||||
|
||||
@@ -59,6 +59,7 @@ from app.routers import (
|
||||
from app.startup import startup_shared_resources
|
||||
from app.utils.rate_limiter import RateLimiter
|
||||
from app.utils.runtime_state import ApplicationState, RuntimeState
|
||||
from app.utils.scheduler_lock import release_scheduler_lock
|
||||
from app.utils.session_cache import InMemorySessionCache, NoOpSessionCache
|
||||
from app.utils.setup_state import is_setup_complete_cached, set_setup_complete_cache
|
||||
|
||||
@@ -128,6 +129,9 @@ async def _lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
|
||||
order on shutdown. They are stored on ``app.state`` so they are
|
||||
accessible to dependency providers and tests.
|
||||
|
||||
The scheduler lock is released on shutdown to allow other instances to
|
||||
acquire it during rolling deployments or after a crash.
|
||||
|
||||
Args:
|
||||
app: The :class:`fastapi.FastAPI` instance being started.
|
||||
"""
|
||||
@@ -136,9 +140,10 @@ async def _lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
|
||||
|
||||
log.info("bangui_starting_up", database_path=settings.database_path)
|
||||
|
||||
http_session, scheduler = await startup_shared_resources(app, settings)
|
||||
http_session, scheduler, startup_db = await startup_shared_resources(app, settings)
|
||||
app.state.http_session = http_session
|
||||
app.state.scheduler = scheduler
|
||||
app.state.startup_db = startup_db
|
||||
|
||||
# Ensure session cache is initialized based on effective settings.
|
||||
# This cache is process-local and not cluster-safe. In multi-worker
|
||||
@@ -158,6 +163,13 @@ async def _lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
|
||||
log.info("bangui_shutting_down")
|
||||
scheduler.shutdown(wait=False)
|
||||
await http_session.close()
|
||||
# Release the scheduler lock to allow other instances to take over
|
||||
try:
|
||||
await release_scheduler_lock(startup_db)
|
||||
except Exception as e:
|
||||
log.error("scheduler_lock_release_failed", error=str(e))
|
||||
finally:
|
||||
await startup_db.close()
|
||||
log.info("bangui_shut_down")
|
||||
|
||||
|
||||
|
||||
@@ -42,11 +42,16 @@ from app.tasks import (
|
||||
health_check,
|
||||
history_sync,
|
||||
rate_limiter_cleanup,
|
||||
scheduler_lock_heartbeat,
|
||||
session_cleanup,
|
||||
)
|
||||
from app.utils.async_utils import run_blocking
|
||||
from app.utils.jail_config import ensure_jail_configs
|
||||
from app.utils.runtime_state import set_runtime_settings
|
||||
from app.utils.scheduler_lock import (
|
||||
acquire_scheduler_lock,
|
||||
release_scheduler_lock,
|
||||
)
|
||||
from app.utils.setup_state import set_setup_complete_cache
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -58,22 +63,19 @@ log: structlog.stdlib.BoundLogger = structlog.get_logger()
|
||||
|
||||
|
||||
def _check_single_worker_mode() -> None:
|
||||
"""Verify that the application is running with a single worker.
|
||||
"""Fast check: verify BANGUI_WORKERS environment variable if set.
|
||||
|
||||
APScheduler's AsyncIOScheduler is bound to a single asyncio event loop
|
||||
and cannot be safely shared across multiple worker processes. If each
|
||||
worker starts its own scheduler instance, all background jobs execute N
|
||||
times (where N is the number of workers), resulting in duplicate blocklist
|
||||
imports, duplicate ban operations, duplicate history writes, and SQLite
|
||||
lock contention.
|
||||
This is the first-line guard: if BANGUI_WORKERS is explicitly set to a
|
||||
value > 1, reject immediately without requiring database access. This
|
||||
catches obvious misconfiguration early.
|
||||
|
||||
This function detects multi-worker configurations and raises a clear
|
||||
RuntimeError with instructions.
|
||||
The authoritative check is the database-backed lock acquired in
|
||||
_stage_check_worker_mode_and_acquire_lock(), which handles the general
|
||||
case where multiple instances start without proper environment setup.
|
||||
|
||||
Raises:
|
||||
RuntimeError: If the app would run with multiple workers.
|
||||
RuntimeError: If BANGUI_WORKERS is explicitly set to > 1.
|
||||
"""
|
||||
# Check for explicit worker count env var (convention used in deployment)
|
||||
workers_env = os.environ.get("BANGUI_WORKERS")
|
||||
if workers_env is not None:
|
||||
try:
|
||||
@@ -124,7 +126,7 @@ def _create_http_session(settings: Settings) -> aiohttp.ClientSession:
|
||||
async def startup_shared_resources(
|
||||
app: FastAPI,
|
||||
settings: Settings,
|
||||
) -> tuple[aiohttp.ClientSession, AsyncIOScheduler]:
|
||||
) -> tuple[aiohttp.ClientSession, AsyncIOScheduler, Any]:
|
||||
"""Create shared resources needed during the application lifespan.
|
||||
|
||||
This function orchestrates the entire startup sequence through a StartupDAG,
|
||||
@@ -133,8 +135,8 @@ async def startup_shared_resources(
|
||||
rolled back.
|
||||
|
||||
The startup stages are:
|
||||
1. WORKER_MODE: Validate single-worker configuration
|
||||
2. DATABASE: Initialize database and load setup state
|
||||
1. DATABASE: Initialize database and load setup state
|
||||
2. WORKER_MODE: Validate single-worker configuration and acquire scheduler lock
|
||||
3. GEO_CACHE: Load IP geolocation cache
|
||||
4. HTTP_SESSION: Create shared aiohttp session
|
||||
5. SCHEDULER: Create and start APScheduler
|
||||
@@ -145,7 +147,7 @@ async def startup_shared_resources(
|
||||
settings: Resolved application settings.
|
||||
|
||||
Returns:
|
||||
A tuple of ``(http_session, scheduler)``.
|
||||
A tuple of ``(http_session, scheduler, startup_db)``.
|
||||
|
||||
Raises:
|
||||
RuntimeError: If any startup stage fails or prerequisites are not met.
|
||||
@@ -153,20 +155,21 @@ async def startup_shared_resources(
|
||||
dag = StartupDAG()
|
||||
|
||||
# Register all startup stages with their dependencies.
|
||||
dag.register_stage(
|
||||
StartupStage.WORKER_MODE,
|
||||
"Verify single-worker mode (scheduler must not run in multiple workers)",
|
||||
prerequisites=frozenset(),
|
||||
)
|
||||
# NOTE: DATABASE stage must come before WORKER_MODE for lock acquisition
|
||||
dag.register_stage(
|
||||
StartupStage.DATABASE,
|
||||
"Initialize database schema and load setup state",
|
||||
prerequisites=frozenset([StartupStage.WORKER_MODE]),
|
||||
prerequisites=frozenset(),
|
||||
)
|
||||
dag.register_stage(
|
||||
StartupStage.WORKER_MODE,
|
||||
"Verify single-worker mode and acquire scheduler lock",
|
||||
prerequisites=frozenset([StartupStage.DATABASE]),
|
||||
)
|
||||
dag.register_stage(
|
||||
StartupStage.GEO_CACHE,
|
||||
"Load IP geolocation cache from database",
|
||||
prerequisites=frozenset([StartupStage.DATABASE]),
|
||||
prerequisites=frozenset([StartupStage.WORKER_MODE]),
|
||||
)
|
||||
dag.register_stage(
|
||||
StartupStage.HTTP_SESSION,
|
||||
@@ -185,18 +188,18 @@ async def startup_shared_resources(
|
||||
)
|
||||
|
||||
try:
|
||||
# Stage 1: Validate single-worker mode
|
||||
await dag.execute_stage(
|
||||
StartupStage.WORKER_MODE,
|
||||
_stage_check_worker_mode,
|
||||
)
|
||||
|
||||
# Stage 2: Initialize database
|
||||
# Stage 1: Initialize database (must come first for lock acquisition)
|
||||
startup_db = await dag.execute_stage(
|
||||
StartupStage.DATABASE,
|
||||
lambda: _stage_init_database(app, settings),
|
||||
)
|
||||
|
||||
# Stage 2: Validate single-worker mode and acquire scheduler lock
|
||||
await dag.execute_stage(
|
||||
StartupStage.WORKER_MODE,
|
||||
lambda: _stage_check_worker_mode_and_acquire_lock(startup_db),
|
||||
)
|
||||
|
||||
# Stage 3: Load GeoCache
|
||||
geo_cache = await dag.execute_stage(
|
||||
StartupStage.GEO_CACHE,
|
||||
@@ -233,7 +236,7 @@ async def startup_shared_resources(
|
||||
stages=len(dag.context.completed_stages),
|
||||
)
|
||||
|
||||
return http_session, scheduler
|
||||
return http_session, scheduler, startup_db
|
||||
|
||||
except Exception:
|
||||
# Clean up on failure
|
||||
@@ -246,13 +249,42 @@ async def startup_shared_resources(
|
||||
raise
|
||||
|
||||
|
||||
async def _stage_check_worker_mode() -> None:
|
||||
"""Check that the application is running with a single worker.
|
||||
async def _stage_check_worker_mode_and_acquire_lock(startup_db: Any) -> None:
|
||||
"""Check single-worker mode and acquire the scheduler lock.
|
||||
|
||||
This is stage 1 of the startup DAG.
|
||||
This is stage 1 of the startup DAG. It performs two checks:
|
||||
1. Fast check: Verify BANGUI_WORKERS env var if explicitly set
|
||||
2. Authoritative check: Acquire database-backed scheduler lock
|
||||
|
||||
The database lock ensures that only one instance runs the scheduler, even
|
||||
in container orchestration scenarios where multiple instances may start
|
||||
simultaneously. This prevents duplicate background jobs, duplicate history
|
||||
entries, and SQLite lock contention.
|
||||
|
||||
Args:
|
||||
startup_db: The initialized database connection.
|
||||
|
||||
Raises:
|
||||
RuntimeError: If the env var check fails or the scheduler lock cannot
|
||||
be acquired (another instance is running the scheduler).
|
||||
"""
|
||||
# Fast check: verify BANGUI_WORKERS if explicitly set
|
||||
_check_single_worker_mode()
|
||||
|
||||
# Authoritative check: acquire the database-backed lock
|
||||
if not await acquire_scheduler_lock(startup_db):
|
||||
raise RuntimeError(
|
||||
"Could not acquire scheduler lock. Another BanGUI instance is already running the scheduler.\n"
|
||||
"This prevents duplicate background jobs (blocklist imports, history sync, etc.).\n"
|
||||
"\n"
|
||||
"To recover from a stale lock (e.g., after a crash):\n"
|
||||
" 1. Verify no other BanGUI instances are running\n"
|
||||
" 2. Inspect the lock: sqlite3 bangui.db 'SELECT * FROM scheduler_lock;'\n"
|
||||
" 3. If stale, clean it: sqlite3 bangui.db 'DELETE FROM scheduler_lock;'\n"
|
||||
"\n"
|
||||
"See Architekture.md § Deployment Constraints for details."
|
||||
)
|
||||
|
||||
|
||||
async def _stage_init_database(app: FastAPI, settings: Settings) -> Any:
|
||||
"""Initialize database schema and load setup state.
|
||||
@@ -389,6 +421,7 @@ async def _stage_register_tasks(app: FastAPI, scheduler: AsyncIOScheduler) -> No
|
||||
"""Register all background jobs.
|
||||
|
||||
This is stage 6 of the startup DAG. It registers:
|
||||
- scheduler_lock_heartbeat: Periodic update of scheduler lock (keeps it alive)
|
||||
- health_check: Periodic fail2ban connectivity probe
|
||||
- blocklist_import: Scheduled blocklist download and application
|
||||
- geo_cache_cleanup: Periodic purge of stale geo cache entries
|
||||
@@ -402,6 +435,7 @@ async def _stage_register_tasks(app: FastAPI, scheduler: AsyncIOScheduler) -> No
|
||||
app: The FastAPI application instance.
|
||||
scheduler: The APScheduler scheduler to register tasks with.
|
||||
"""
|
||||
scheduler_lock_heartbeat.register(app)
|
||||
health_check.register(app)
|
||||
await blocklist_import.register(app)
|
||||
geo_cache_cleanup.register(app)
|
||||
@@ -411,4 +445,4 @@ async def _stage_register_tasks(app: FastAPI, scheduler: AsyncIOScheduler) -> No
|
||||
session_cleanup.register(app)
|
||||
rate_limiter_cleanup.register(app)
|
||||
|
||||
log.info("startup_tasks_registered", count=8)
|
||||
log.info("startup_tasks_registered", count=9)
|
||||
|
||||
84
backend/app/tasks/scheduler_lock_heartbeat.py
Normal file
84
backend/app/tasks/scheduler_lock_heartbeat.py
Normal file
@@ -0,0 +1,84 @@
|
||||
"""Scheduler lock heartbeat background task.
|
||||
|
||||
Registers an APScheduler job that periodically updates the scheduler lock's
|
||||
heartbeat timestamp. This prevents the lock from being considered stale
|
||||
if the running instance experiences temporary delays or high load.
|
||||
|
||||
Without this heartbeat, stale lock detection (based on TTL) could incorrectly
|
||||
determine that the scheduler instance has crashed when it's merely busy, and
|
||||
a new instance could take over.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import structlog
|
||||
|
||||
from app.tasks.db import task_db
|
||||
from app.utils.runtime_state import get_effective_settings
|
||||
from app.utils.scheduler_lock import update_scheduler_lock_heartbeat
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from fastapi import FastAPI
|
||||
|
||||
from app.config import Settings
|
||||
|
||||
log: structlog.stdlib.BoundLogger = structlog.get_logger()
|
||||
|
||||
#: How often the heartbeat job fires (seconds). Must be less than the lock TTL.
|
||||
SCHEDULER_LOCK_HEARTBEAT_INTERVAL: int = 10
|
||||
|
||||
#: Stable APScheduler job ID — ensures re-registration replaces, not duplicates.
|
||||
JOB_ID: str = "scheduler_lock_heartbeat"
|
||||
|
||||
|
||||
async def _update_heartbeat_with_resources(settings: Settings) -> None:
|
||||
"""Update the scheduler lock heartbeat timestamp.
|
||||
|
||||
If the heartbeat update fails (e.g., we no longer hold the lock), log
|
||||
a warning but don't crash the scheduler. This allows the running
|
||||
application to continue even if something went wrong.
|
||||
|
||||
Args:
|
||||
settings: The resolved application settings used for database access.
|
||||
"""
|
||||
async with task_db(settings) as db:
|
||||
success = await update_scheduler_lock_heartbeat(db)
|
||||
|
||||
if success:
|
||||
log.debug("scheduler_lock_heartbeat_updated")
|
||||
else:
|
||||
log.warning(
|
||||
"scheduler_lock_heartbeat_failed",
|
||||
message="Failed to update heartbeat; we may have lost the lock.",
|
||||
)
|
||||
|
||||
|
||||
async def _update_heartbeat(app: FastAPI) -> None:
|
||||
await _update_heartbeat_with_resources(get_effective_settings(app))
|
||||
|
||||
|
||||
def register(app: FastAPI) -> None:
|
||||
"""Add (or replace) the scheduler lock heartbeat job.
|
||||
|
||||
Must be called after the scheduler has been started (i.e., inside the
|
||||
lifespan handler, after ``scheduler.start()``).
|
||||
|
||||
Args:
|
||||
app: The :class:`fastapi.FastAPI` application instance whose
|
||||
``app.state.scheduler`` will receive the job.
|
||||
"""
|
||||
settings = get_effective_settings(app)
|
||||
app.state.scheduler.add_job(
|
||||
_update_heartbeat_with_resources,
|
||||
trigger="interval",
|
||||
seconds=SCHEDULER_LOCK_HEARTBEAT_INTERVAL,
|
||||
kwargs={"settings": settings},
|
||||
id=JOB_ID,
|
||||
replace_existing=True,
|
||||
)
|
||||
log.info(
|
||||
"scheduler_lock_heartbeat_scheduled",
|
||||
interval_seconds=SCHEDULER_LOCK_HEARTBEAT_INTERVAL,
|
||||
)
|
||||
275
backend/app/utils/scheduler_lock.py
Normal file
275
backend/app/utils/scheduler_lock.py
Normal file
@@ -0,0 +1,275 @@
|
||||
"""Database-based scheduler lock for single-executor enforcement.
|
||||
|
||||
This module implements a database-backed lock mechanism that ensures only one
|
||||
BanGUI instance runs the background scheduler, even in container orchestration
|
||||
environments where multiple instances might start simultaneously.
|
||||
|
||||
The lock uses atomic database operations to prevent race conditions:
|
||||
- Lock acquisition is atomic: INSERT fails if the singleton row already exists
|
||||
- Lock release is atomic: DELETE with PID check ensures only the owner releases
|
||||
- Stale lock detection uses heartbeat timestamps: a lock older than TTL is
|
||||
considered abandoned and eligible for cleanup on the next startup
|
||||
|
||||
This approach is more reliable than filesystem-based locking in containerized
|
||||
environments because:
|
||||
1. Database transactions are atomic (no TOCTOU race windows)
|
||||
2. No NFS/network filesystem edge cases
|
||||
3. Stale lock detection is timestamp-based, not PID-based (PID reuse is unreliable)
|
||||
4. Works across container restarts and rolling deployments
|
||||
|
||||
The lock record stores:
|
||||
- id: Always 1 (singleton table)
|
||||
- pid: Process ID of the lock holder
|
||||
- hostname: Container/host name for debugging
|
||||
- created_at: When the lock was first acquired
|
||||
- heartbeat_at: When the lock was last confirmed alive (updated periodically)
|
||||
|
||||
On startup:
|
||||
1. Cleanup any stale locks (where heartbeat_at > TTL)
|
||||
2. Try to insert the lock for this instance
|
||||
3. If INSERT succeeds, lock is acquired
|
||||
4. If INSERT fails (IntegrityError), another instance holds the lock
|
||||
|
||||
On running (periodic):
|
||||
- Update heartbeat_at to keep the lock alive and prevent false positives
|
||||
|
||||
On shutdown:
|
||||
- Delete the lock (this instance is no longer running the scheduler)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import socket
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
import aiosqlite
|
||||
import structlog
|
||||
|
||||
log: structlog.stdlib.BoundLogger = structlog.get_logger()
|
||||
|
||||
# Lock record expires if heartbeat hasn't been updated for this many seconds.
|
||||
# This prevents stale locks from a crashed instance from blocking new startups.
|
||||
SCHEDULER_LOCK_TTL_SECONDS: int = 60
|
||||
|
||||
# Heartbeat interval: how often to update the lock's heartbeat_at timestamp.
|
||||
# Must be less than TTL to prevent premature expiration.
|
||||
SCHEDULER_LOCK_HEARTBEAT_INTERVAL_SECONDS: int = 10
|
||||
|
||||
|
||||
async def init_scheduler_lock_table(db: aiosqlite.Connection) -> None:
|
||||
"""Create the scheduler_lock table if it doesn't exist.
|
||||
|
||||
This is called during database schema initialization and is safe to call
|
||||
multiple times (CREATE TABLE IF NOT EXISTS is idempotent).
|
||||
|
||||
Args:
|
||||
db: The SQLite database connection.
|
||||
"""
|
||||
await db.execute(
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS scheduler_lock (
|
||||
id INTEGER PRIMARY KEY CHECK (id = 1),
|
||||
pid INTEGER NOT NULL,
|
||||
hostname TEXT NOT NULL,
|
||||
created_at REAL NOT NULL,
|
||||
heartbeat_at REAL NOT NULL
|
||||
);
|
||||
"""
|
||||
)
|
||||
await db.commit()
|
||||
|
||||
|
||||
async def acquire_scheduler_lock(db: aiosqlite.Connection) -> bool:
|
||||
"""Try to acquire the scheduler lock.
|
||||
|
||||
This function performs two operations:
|
||||
1. Clean up any stale locks (where heartbeat_at + TTL < now)
|
||||
2. Try to insert a lock record for this instance
|
||||
|
||||
If another instance already holds a valid lock, the INSERT will fail and
|
||||
this function returns False. The caller should reject startup with a clear
|
||||
error message.
|
||||
|
||||
Args:
|
||||
db: The SQLite database connection.
|
||||
|
||||
Returns:
|
||||
True if the lock was successfully acquired, False if held by another instance.
|
||||
|
||||
Raises:
|
||||
RuntimeError: If database operations fail for reasons other than the lock
|
||||
being held (e.g., database is corrupted or inaccessible).
|
||||
"""
|
||||
now = time.time()
|
||||
pid = os.getpid()
|
||||
hostname = socket.gethostname()
|
||||
|
||||
try:
|
||||
# Clean up stale locks first
|
||||
await db.execute(
|
||||
"""
|
||||
DELETE FROM scheduler_lock
|
||||
WHERE (? - heartbeat_at) > ?
|
||||
""",
|
||||
(now, SCHEDULER_LOCK_TTL_SECONDS),
|
||||
)
|
||||
|
||||
# Try to acquire the lock (atomic: INSERT fails if row exists)
|
||||
await db.execute(
|
||||
"""
|
||||
INSERT INTO scheduler_lock (id, pid, hostname, created_at, heartbeat_at)
|
||||
VALUES (1, ?, ?, ?, ?)
|
||||
""",
|
||||
(pid, hostname, now, now),
|
||||
)
|
||||
await db.commit()
|
||||
|
||||
log.info(
|
||||
"scheduler_lock_acquired",
|
||||
pid=pid,
|
||||
hostname=hostname,
|
||||
)
|
||||
return True
|
||||
|
||||
except aiosqlite.IntegrityError:
|
||||
# Lock is already held by another instance (INSERT failed due to UNIQUE constraint)
|
||||
# Log details about who holds the lock to help with debugging
|
||||
try:
|
||||
cursor = await db.execute(
|
||||
"SELECT pid, hostname, created_at, heartbeat_at FROM scheduler_lock WHERE id = 1"
|
||||
)
|
||||
row = await cursor.fetchone()
|
||||
if row:
|
||||
lock_pid, lock_hostname, lock_created, lock_heartbeat = row
|
||||
age_seconds = now - lock_created
|
||||
heartbeat_age = now - lock_heartbeat
|
||||
log.warning(
|
||||
"scheduler_lock_held_by_other_instance",
|
||||
our_pid=pid,
|
||||
lock_pid=lock_pid,
|
||||
lock_hostname=lock_hostname,
|
||||
lock_age_seconds=age_seconds,
|
||||
heartbeat_age_seconds=heartbeat_age,
|
||||
)
|
||||
except Exception as e:
|
||||
log.warning("scheduler_lock_held_but_could_not_read_holder", error=str(e))
|
||||
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
# Unexpected database error (not an IntegrityError)
|
||||
raise RuntimeError(
|
||||
f"Failed to acquire scheduler lock due to database error: {e}\n"
|
||||
"Check that the database is accessible and not corrupted."
|
||||
) from e
|
||||
|
||||
|
||||
async def release_scheduler_lock(db: aiosqlite.Connection) -> None:
|
||||
"""Release the scheduler lock.
|
||||
|
||||
This function should be called during application shutdown. It removes the
|
||||
lock record, allowing other instances to acquire it.
|
||||
|
||||
Args:
|
||||
db: The SQLite database connection.
|
||||
|
||||
Raises:
|
||||
RuntimeError: If database operations fail.
|
||||
"""
|
||||
pid = os.getpid()
|
||||
|
||||
try:
|
||||
cursor = await db.execute(
|
||||
"DELETE FROM scheduler_lock WHERE id = 1 AND pid = ?",
|
||||
(pid,),
|
||||
)
|
||||
await db.commit()
|
||||
|
||||
if cursor.rowcount == 0:
|
||||
# This shouldn't happen in normal operation, but log it for visibility
|
||||
log.warning(
|
||||
"scheduler_lock_release_mismatch",
|
||||
our_pid=pid,
|
||||
message="Tried to release lock but we don't hold it. Another instance may have replaced us.",
|
||||
)
|
||||
else:
|
||||
log.info("scheduler_lock_released", pid=pid)
|
||||
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Failed to release scheduler lock: {e}") from e
|
||||
|
||||
|
||||
async def update_scheduler_lock_heartbeat(db: aiosqlite.Connection) -> bool:
|
||||
"""Update the heartbeat timestamp to keep the lock alive.
|
||||
|
||||
This function should be called periodically (every ~10 seconds) to prevent
|
||||
the lock from being considered stale. It only succeeds if this process
|
||||
still holds the lock.
|
||||
|
||||
Args:
|
||||
db: The SQLite database connection.
|
||||
|
||||
Returns:
|
||||
True if the heartbeat was updated (we still hold the lock), False if
|
||||
we no longer hold the lock (another instance has taken over).
|
||||
|
||||
Raises:
|
||||
RuntimeError: If database operations fail.
|
||||
"""
|
||||
now = time.time()
|
||||
pid = os.getpid()
|
||||
|
||||
try:
|
||||
cursor = await db.execute(
|
||||
"UPDATE scheduler_lock SET heartbeat_at = ? WHERE id = 1 AND pid = ?",
|
||||
(now, pid),
|
||||
)
|
||||
await db.commit()
|
||||
|
||||
if cursor.rowcount == 0:
|
||||
# We no longer hold the lock
|
||||
log.warning(
|
||||
"scheduler_lock_heartbeat_lost",
|
||||
our_pid=pid,
|
||||
message="Heartbeat failed; we no longer hold the lock.",
|
||||
)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Failed to update scheduler lock heartbeat: {e}") from e
|
||||
|
||||
|
||||
async def get_scheduler_lock_info(db: aiosqlite.Connection) -> dict[str, Any] | None:
|
||||
"""Retrieve information about the current scheduler lock.
|
||||
|
||||
This function is useful for debugging and monitoring. Returns None if no
|
||||
lock is currently held.
|
||||
|
||||
Args:
|
||||
db: The SQLite database connection.
|
||||
|
||||
Returns:
|
||||
A dict with keys: pid, hostname, created_at, heartbeat_at, or None
|
||||
if no lock exists.
|
||||
"""
|
||||
try:
|
||||
cursor = await db.execute(
|
||||
"SELECT pid, hostname, created_at, heartbeat_at FROM scheduler_lock WHERE id = 1"
|
||||
)
|
||||
row = await cursor.fetchone()
|
||||
if row:
|
||||
pid, hostname, created_at, heartbeat_at = row
|
||||
return {
|
||||
"pid": pid,
|
||||
"hostname": hostname,
|
||||
"created_at": created_at,
|
||||
"heartbeat_at": heartbeat_at,
|
||||
}
|
||||
return None
|
||||
except Exception as e:
|
||||
log.warning("scheduler_lock_info_query_failed", error=str(e))
|
||||
return None
|
||||
222
backend/tests/test_scheduler_lock.py
Normal file
222
backend/tests/test_scheduler_lock.py
Normal file
@@ -0,0 +1,222 @@
|
||||
"""Tests for the scheduler lock mechanism.
|
||||
|
||||
These tests verify that the database-backed scheduler lock correctly enforces
|
||||
single-executor safety across multiple startup attempts, including stale lock
|
||||
cleanup and heartbeat updates.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
import aiosqlite
|
||||
import pytest
|
||||
|
||||
from app.utils.scheduler_lock import (
|
||||
SCHEDULER_LOCK_TTL_SECONDS,
|
||||
acquire_scheduler_lock,
|
||||
get_scheduler_lock_info,
|
||||
release_scheduler_lock,
|
||||
update_scheduler_lock_heartbeat,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def lock_db(tmp_path: Any) -> aiosqlite.Connection:
|
||||
"""Create a temporary database with scheduler_lock table."""
|
||||
db_path = tmp_path / "test.db"
|
||||
db = await aiosqlite.connect(str(db_path))
|
||||
await db.execute(
|
||||
"""
|
||||
CREATE TABLE scheduler_lock (
|
||||
id INTEGER PRIMARY KEY CHECK (id = 1),
|
||||
pid INTEGER NOT NULL,
|
||||
hostname TEXT NOT NULL,
|
||||
created_at REAL NOT NULL,
|
||||
heartbeat_at REAL NOT NULL
|
||||
);
|
||||
"""
|
||||
)
|
||||
await db.commit()
|
||||
yield db
|
||||
await db.close()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_acquire_scheduler_lock_success(lock_db: aiosqlite.Connection) -> None:
|
||||
"""Test successful lock acquisition."""
|
||||
result = await acquire_scheduler_lock(lock_db)
|
||||
assert result is True
|
||||
|
||||
# Verify the lock is in the database
|
||||
cursor = await lock_db.execute("SELECT COUNT(*) FROM scheduler_lock")
|
||||
count = await cursor.fetchone()
|
||||
assert count[0] == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_acquire_scheduler_lock_fails_when_held(
|
||||
lock_db: aiosqlite.Connection,
|
||||
) -> None:
|
||||
"""Test that lock acquisition fails if already held."""
|
||||
# First instance acquires the lock
|
||||
result1 = await acquire_scheduler_lock(lock_db)
|
||||
assert result1 is True
|
||||
|
||||
# Second instance tries to acquire, should fail
|
||||
result2 = await acquire_scheduler_lock(lock_db)
|
||||
assert result2 is False
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_acquire_scheduler_lock_cleans_stale_locks(
|
||||
lock_db: aiosqlite.Connection,
|
||||
) -> None:
|
||||
"""Test that stale locks are automatically cleaned up."""
|
||||
# Insert a stale lock manually (old heartbeat)
|
||||
now = time.time()
|
||||
stale_heartbeat = now - SCHEDULER_LOCK_TTL_SECONDS - 10
|
||||
await lock_db.execute(
|
||||
"""
|
||||
INSERT INTO scheduler_lock (id, pid, hostname, created_at, heartbeat_at)
|
||||
VALUES (1, 9999, 'stale-host', ?, ?)
|
||||
""",
|
||||
(now - 100, stale_heartbeat),
|
||||
)
|
||||
await lock_db.commit()
|
||||
|
||||
# New instance should clean up the stale lock and acquire
|
||||
result = await acquire_scheduler_lock(lock_db)
|
||||
assert result is True
|
||||
|
||||
# Verify the old lock is gone and new one is in place
|
||||
cursor = await lock_db.execute(
|
||||
"SELECT pid, hostname FROM scheduler_lock WHERE id = 1"
|
||||
)
|
||||
row = await cursor.fetchone()
|
||||
assert row is not None
|
||||
pid, hostname = row
|
||||
assert pid == os.getpid()
|
||||
assert hostname is not None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_release_scheduler_lock_success(
|
||||
lock_db: aiosqlite.Connection,
|
||||
) -> None:
|
||||
"""Test successful lock release."""
|
||||
# Acquire the lock
|
||||
await acquire_scheduler_lock(lock_db)
|
||||
|
||||
# Release it
|
||||
await release_scheduler_lock(lock_db)
|
||||
|
||||
# Verify the lock is gone
|
||||
cursor = await lock_db.execute("SELECT COUNT(*) FROM scheduler_lock")
|
||||
count = await cursor.fetchone()
|
||||
assert count[0] == 0
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_release_scheduler_lock_not_held(
|
||||
lock_db: aiosqlite.Connection,
|
||||
) -> None:
|
||||
"""Test that releasing a lock we don't hold is safe."""
|
||||
# Try to release without acquiring — should not crash
|
||||
await release_scheduler_lock(lock_db)
|
||||
|
||||
# Verify the lock is still empty
|
||||
cursor = await lock_db.execute("SELECT COUNT(*) FROM scheduler_lock")
|
||||
count = await cursor.fetchone()
|
||||
assert count[0] == 0
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_update_scheduler_lock_heartbeat_success(
|
||||
lock_db: aiosqlite.Connection,
|
||||
) -> None:
|
||||
"""Test successful heartbeat update."""
|
||||
# Acquire the lock
|
||||
await acquire_scheduler_lock(lock_db)
|
||||
|
||||
# Get the original heartbeat
|
||||
cursor = await lock_db.execute("SELECT heartbeat_at FROM scheduler_lock WHERE id = 1")
|
||||
original_row = await cursor.fetchone()
|
||||
original_heartbeat = original_row[0]
|
||||
|
||||
# Wait a moment and update the heartbeat
|
||||
time.sleep(0.01)
|
||||
result = await update_scheduler_lock_heartbeat(lock_db)
|
||||
assert result is True
|
||||
|
||||
# Verify the heartbeat was updated
|
||||
cursor = await lock_db.execute("SELECT heartbeat_at FROM scheduler_lock WHERE id = 1")
|
||||
new_row = await cursor.fetchone()
|
||||
new_heartbeat = new_row[0]
|
||||
assert new_heartbeat > original_heartbeat
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_update_scheduler_lock_heartbeat_fails_if_not_held(
|
||||
lock_db: aiosqlite.Connection,
|
||||
) -> None:
|
||||
"""Test that heartbeat update fails if we don't hold the lock."""
|
||||
result = await update_scheduler_lock_heartbeat(lock_db)
|
||||
assert result is False
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_scheduler_lock_info_returns_details(
|
||||
lock_db: aiosqlite.Connection,
|
||||
) -> None:
|
||||
"""Test that lock info includes all relevant fields."""
|
||||
await acquire_scheduler_lock(lock_db)
|
||||
|
||||
info = await get_scheduler_lock_info(lock_db)
|
||||
assert info is not None
|
||||
assert "pid" in info
|
||||
assert "hostname" in info
|
||||
assert "created_at" in info
|
||||
assert "heartbeat_at" in info
|
||||
assert info["pid"] == os.getpid()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_scheduler_lock_info_returns_none_when_empty(
|
||||
lock_db: aiosqlite.Connection,
|
||||
) -> None:
|
||||
"""Test that lock info returns None when no lock is held."""
|
||||
info = await get_scheduler_lock_info(lock_db)
|
||||
assert info is None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_scheduler_lock_full_lifecycle(
|
||||
lock_db: aiosqlite.Connection,
|
||||
) -> None:
|
||||
"""Test the full lifecycle: acquire, update heartbeat, release."""
|
||||
# Initially no lock
|
||||
info = await get_scheduler_lock_info(lock_db)
|
||||
assert info is None
|
||||
|
||||
# Acquire the lock
|
||||
result = await acquire_scheduler_lock(lock_db)
|
||||
assert result is True
|
||||
info = await get_scheduler_lock_info(lock_db)
|
||||
assert info is not None
|
||||
initial_heartbeat = info["heartbeat_at"]
|
||||
|
||||
# Update heartbeat multiple times
|
||||
time.sleep(0.01)
|
||||
result = await update_scheduler_lock_heartbeat(lock_db)
|
||||
assert result is True
|
||||
info = await get_scheduler_lock_info(lock_db)
|
||||
updated_heartbeat = info["heartbeat_at"]
|
||||
assert updated_heartbeat > initial_heartbeat
|
||||
|
||||
# Release the lock
|
||||
await release_scheduler_lock(lock_db)
|
||||
info = await get_scheduler_lock_info(lock_db)
|
||||
assert info is None
|
||||
Reference in New Issue
Block a user