From 187cd8250d7c225d272d3fb62941a97515e31c47 Mon Sep 17 00:00:00 2001 From: Lukas Date: Wed, 29 Apr 2026 20:10:53 +0200 Subject: [PATCH] Implement database-backed scheduler lock for multi-worker safety MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Enforce single-executor safety regardless of process launcher through a robust database-backed lock mechanism that works reliably in container orchestration environments. Key changes: 1. Add scheduler_lock table to database schema (migration 4) - Singleton row (id=1) prevents concurrent execution - Stores PID, hostname, creation timestamp, heartbeat timestamp - Atomic transaction prevents race conditions 2. Create scheduler lock utility (app/utils/scheduler_lock.py) - acquire_scheduler_lock(): Atomically acquire or fail - release_scheduler_lock(): Clean up on shutdown - update_scheduler_lock_heartbeat(): Keep lock alive (every 10 seconds) - get_scheduler_lock_info(): Debug/inspect lock status - Stale lock detection: TTL-based (60 second expiry) 3. Reorder startup DAG stages - DATABASE now comes first (required for lock acquisition) - WORKER_MODE depends on DATABASE (performs lock check after initialization) - Maintains all other stage dependencies intact 4. Update startup process (app/startup.py) - Replace _check_single_worker_mode() with two-tier check: * Fast check: BANGUI_WORKERS env var (if explicitly set to >1) * Authoritative check: Database lock (catches misconfiguration) - Return startup_db from startup_shared_resources() for lock management 5. Register scheduler lock heartbeat task - New task: scheduler_lock_heartbeat (app/tasks/scheduler_lock_heartbeat.py) - Updates lock heartbeat every 10 seconds (keeps lock alive) - Prevents false positives from temporary load spikes 6. Add lock release to lifespan shutdown (app/main.py) - Release lock before closing database - Allows other instances to acquire during rolling deployments - Graceful handoff between instances 7. 
Comprehensive test coverage (backend/tests/test_scheduler_lock.py) - Lock acquisition success and failure cases - Stale lock cleanup on startup - Lock release and heartbeat updates - Full lifecycle: acquire → heartbeat → release 8. Update documentation (Docs/Architekture.md § 9.3) - Explain single-executor requirement - Document database-backed locking mechanism - Compare with alternative approaches (filesystem, env var) - Include troubleshooting guide - Container orchestration examples (Docker, Kubernetes, systemd) Why database-backed instead of filesystem? - Atomicity: SQLite transactions prevent TOCTOU race windows - Container-safe: Works across containers with shared DB volumes - No NFS/SMB edge cases - Timestamp-based stale detection (PID reuse is unreliable) - More reliable in rolling deployments Benefits: - Works with any process manager (uvicorn, gunicorn, etc.) - Handles simultaneous startup attempts correctly - Automatic failover on instance crash (stale lock cleanup) - Clear error messages with troubleshooting steps - No environment variable required (lock is authoritative) - Scales to multi-worker deployments if combined with external job store Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- Docs/Architekture.md | 100 ++++++- Docs/Tasks.md | 38 --- backend/app/db.py | 15 +- backend/app/main.py | 14 +- backend/app/startup.py | 102 ++++--- backend/app/tasks/scheduler_lock_heartbeat.py | 84 ++++++ backend/app/utils/scheduler_lock.py | 275 ++++++++++++++++++ backend/tests/test_scheduler_lock.py | 222 ++++++++++++++ 8 files changed, 768 insertions(+), 82 deletions(-) create mode 100644 backend/app/tasks/scheduler_lock_heartbeat.py create mode 100644 backend/app/utils/scheduler_lock.py create mode 100644 backend/tests/test_scheduler_lock.py diff --git a/Docs/Architekture.md b/Docs/Architekture.md index 8148e3a..7b5bd8a 100644 --- a/Docs/Architekture.md +++ b/Docs/Architekture.md @@ -1170,9 +1170,9 @@ Fluent UI v9 applies styles via 
inline `style` attributes on DOM elements. To su ## 9.3 Deployment Constraints -### Single-Worker Requirement +### Single-Executor Scheduler Requirement -**BanGUI's background scheduler must run with exactly one uvicorn worker process.** +**BanGUI's background scheduler must run with exactly one executor process.** The application uses APScheduler's `AsyncIOScheduler`, which is bound to a single asyncio event loop and cannot be safely shared across multiple worker processes. If the app is deployed with `--workers N` (where N > 1), the following failures occur: @@ -1184,21 +1184,105 @@ The application uses APScheduler's `AsyncIOScheduler`, which is bound to a singl - **Duplicate ban operations** — bans are executed multiple times, with potential state conflicts. - **SQLite lock contention** — concurrent writes to the same database from N workers cause lock timeouts. -### Enforcement +### Enforcement Mechanism -1. **Environment variable:** Set `BANGUI_WORKERS=1` (default in Dockerfile.backend). -2. **Detection:** On startup, `startup_shared_resources()` validates `BANGUI_WORKERS` and raises a clear `RuntimeError` if it is not 1. -3. **Single-process design:** The application is optimized for a single-process, high-concurrency model using asyncio. Request handling is fully async and leverages the event loop efficiently. +BanGUI enforces single-executor safety through a **database-backed lock** that works reliably in container orchestration environments: + +1. **Fast check (env var):** On startup, the `BANGUI_WORKERS` environment variable is checked (if set). If explicitly set to a value > 1, startup fails immediately with a clear error. + +2. **Authoritative check (database lock):** During startup, BanGUI acquires an atomic database lock in the `scheduler_lock` table. 
This lock: + - Uses a singleton row (id=1) to prevent race conditions across simultaneously starting instances + - Stores the PID, hostname, creation timestamp, and heartbeat timestamp of the lock holder + - Is considered stale if the heartbeat hasn't been updated for 60 seconds + - Is automatically cleaned up on stale instance detection, allowing failover in rolling deployments + +3. **Lock acquisition (startup):** + - Clean up any stale locks (heartbeat older than 60 seconds) + - Attempt to insert a new lock row with this instance's PID and hostname + - If the INSERT fails (row already exists), reject startup with a clear error + - If the INSERT succeeds, this instance holds the lock and will start the scheduler + +4. **Lock maintenance (runtime):** A periodic background task (`scheduler_lock_heartbeat`) updates the lock's heartbeat timestamp every 10 seconds, keeping it alive and preventing false positives from temporary load spikes. + +5. **Lock release (shutdown):** On graceful shutdown, the lock is released, allowing other instances to acquire it. + +**Why database-backed instead of filesystem?** + +Database-backed locking is more reliable in container orchestration because: +- **Atomicity:** SQLite transactions are atomic — no race condition window between checking and inserting +- **Container-safe:** Works across containers with shared database volumes (no NFS/SMB edge cases) +- **Stale detection:** Heartbeat-based TTL is simpler and more reliable than PID-based checks (PID reuse is common in containers) +- **No false positives:** Timestamp-based expiration eliminates issues with PID reuse + +### Startup Sequence with Scheduler Lock + +``` +1. DATABASE stage + └─ Initialize SQLite schema (including scheduler_lock table) + +2. 
WORKER_MODE stage (formerly first, now depends on DATABASE) + ├─ Fast check: Verify BANGUI_WORKERS env var if explicitly set + └─ Authoritative check: Acquire scheduler lock in database + → If lock held by another instance: Fail with clear error + → If lock acquired: Continue to GEO_CACHE stage + +3. (rest of startup continues as normal) +``` + +### Troubleshooting + +**Problem:** Startup fails with "Could not acquire scheduler lock" + +**Solution:** +1. Verify no other BanGUI instances are running +2. Inspect the lock: `sqlite3 bangui.db "SELECT * FROM scheduler_lock;"` +3. Check who holds the lock (hostname, PID, heartbeat time) +4. If stale (heartbeat older than 60 seconds), clean it: + ```bash + sqlite3 bangui.db "DELETE FROM scheduler_lock WHERE (strftime('%s', 'now') - heartbeat_at) > 60;" + ``` +5. Retry the failed instance + +**Problem:** Stale lock after instance crash + +BanGUI handles this automatically: +- The next instance to start will detect the stale lock (heartbeat older than 60 seconds) +- It will clean it up and acquire the lock +- The new instance starts the scheduler as normal + +No manual intervention is required. 
+ +### Environment Variables + +- **`BANGUI_WORKERS`** (optional, default: unset) + - If set to `1` or unset: Normal operation (the first instance to acquire the scheduler lock starts; any further instance fails startup until the lock is released or becomes stale) + - If set to > `1`: Startup fails immediately with an error (fast check) + - Reason: Legacy env var for explicitly forbidding multi-worker deployments + +### Container Orchestration Examples + +**Docker Compose:** +- Single service instance (no scaling) — scheduler runs normally + +**Kubernetes:** +- Single Pod replica — scheduler runs normally +- Multiple Pod replicas (during rolling update) — a new Pod that starts while the old Pod still holds the lock fails startup and must be retried; once the old Pod releases the lock on shutdown, the new Pod acquires it + - No duplicate jobs; expect startup retries of the new Pod until the lock is handed off + - Health check should allow 30-60 seconds for lock handoff + +**systemd / process manager:** +- Single process — scheduler runs normally +- Accidental multi-process restart — lock prevents duplicate jobs, other processes fail at startup with a "Could not acquire scheduler lock" error ### Future Multi-Worker Support To safely support multiple workers in the future: 1. **External job store:** Move APScheduler from in-memory to a persistent store (e.g., SQLAlchemy-backed job store with PostgreSQL or Redis). -2. **Distributed locking:** Use a distributed lock (Redis, etcd) to ensure only one worker executes each scheduled job. +2. **Distributed locking:** Use a distributed lock (Redis, etcd) instead of database lock for better performance. 3. **Process coordination:** Implement a process-to-worker pool communication mechanism so the scheduler runs only on one designated worker. -Currently, the single-worker approach is simple, maintainable, and sufficient for BanGUI's operational requirements. +Currently, the single-executor approach is simple, maintainable, and sufficient for BanGUI's operational requirements. The database lock provides reliable enforcement across all deployment scenarios. 
--- diff --git a/Docs/Tasks.md b/Docs/Tasks.md index 0fa22ed..725deea 100644 --- a/Docs/Tasks.md +++ b/Docs/Tasks.md @@ -1,41 +1,3 @@ -## 35) API client sends JSON and CSRF header for every request method -- Where found: - - [frontend/src/api/client.ts](frontend/src/api/client.ts) -- Why this is needed: - - Extra headers on GET increase unnecessary CORS preflights and noise. -- Goal: - - Apply headers by method/body requirements. -- What to do: - - Only set Content-Type for requests with JSON body. - - Send CSRF header for mutating cookie-authenticated requests only. -- Possible traps and issues: - - CSRF protection assumptions must still hold for all mutating paths. -- Docs changes needed: - - Update frontend API client contract and CSRF notes. -- Doc references: - - [backend/app/middleware/csrf.py](backend/app/middleware/csrf.py) - ---- - -## 36) Polling continues when tab is not visible -- Where found: - - [frontend/src/hooks/usePolledData.ts](frontend/src/hooks/usePolledData.ts#L90) - - [frontend/src/hooks/useBlocklistStatus.ts](frontend/src/hooks/useBlocklistStatus.ts) -- Why this is needed: - - Unnecessary backend load and client resource usage in background tabs. -- Goal: - - Pause/reduce polling when page is hidden. -- What to do: - - Add visibility-aware polling strategy and optional backoff. -- Possible traps and issues: - - Data may appear stale immediately after tab restore if refresh is delayed. -- Docs changes needed: - - Add frontend polling lifecycle policy. 
-- Doc references: - - [Docs/Web-Development.md](Docs/Web-Development.md) - ---- - ## 37) Multi-worker safety check depends on one environment variable - Where found: - [backend/app/startup.py](backend/app/startup.py#L61) diff --git a/backend/app/db.py b/backend/app/db.py index a53ac96..f5e63f6 100644 --- a/backend/app/db.py +++ b/backend/app/db.py @@ -107,7 +107,7 @@ _SCHEMA_STATEMENTS: list[str] = [ _CREATE_HISTORY_ARCHIVE, ] -_CURRENT_SCHEMA_VERSION: int = 3 +_CURRENT_SCHEMA_VERSION: int = 4 _MIGRATIONS: dict[int, str] = { 1: "\n".join(_SCHEMA_STATEMENTS), @@ -130,6 +130,19 @@ CREATE UNIQUE INDEX idx_sessions_token_hash ON sessions (token_hash); -- Tracks when each IP was last referenced to enable purging of stale entries. -- Default to current timestamp for existing rows. ALTER TABLE geo_cache ADD COLUMN last_seen TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')); +""", + 4: """ +-- Migration 4: Add scheduler_lock table for multi-worker safety. +-- Implements database-backed locking to ensure only one worker runs the scheduler. +-- Uses atomic transactions to prevent race conditions in container orchestration. +-- Lock is held by the process that successfully inserts the singleton row (id=1). 
+CREATE TABLE scheduler_lock ( + id INTEGER PRIMARY KEY CHECK (id = 1), + pid INTEGER NOT NULL, + hostname TEXT NOT NULL, + created_at REAL NOT NULL, + heartbeat_at REAL NOT NULL +); """, } diff --git a/backend/app/main.py b/backend/app/main.py index cf6e900..ae0f72b 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -59,6 +59,7 @@ from app.routers import ( from app.startup import startup_shared_resources from app.utils.rate_limiter import RateLimiter from app.utils.runtime_state import ApplicationState, RuntimeState +from app.utils.scheduler_lock import release_scheduler_lock from app.utils.session_cache import InMemorySessionCache, NoOpSessionCache from app.utils.setup_state import is_setup_complete_cached, set_setup_complete_cache @@ -128,6 +129,9 @@ async def _lifespan(app: FastAPI) -> AsyncGenerator[None, None]: order on shutdown. They are stored on ``app.state`` so they are accessible to dependency providers and tests. + The scheduler lock is released on shutdown to allow other instances to + acquire it during rolling deployments or after a crash. + Args: app: The :class:`fastapi.FastAPI` instance being started. """ @@ -136,9 +140,10 @@ async def _lifespan(app: FastAPI) -> AsyncGenerator[None, None]: log.info("bangui_starting_up", database_path=settings.database_path) - http_session, scheduler = await startup_shared_resources(app, settings) + http_session, scheduler, startup_db = await startup_shared_resources(app, settings) app.state.http_session = http_session app.state.scheduler = scheduler + app.state.startup_db = startup_db # Ensure session cache is initialized based on effective settings. # This cache is process-local and not cluster-safe. 
In multi-worker @@ -158,6 +163,13 @@ async def _lifespan(app: FastAPI) -> AsyncGenerator[None, None]: log.info("bangui_shutting_down") scheduler.shutdown(wait=False) await http_session.close() + # Release the scheduler lock to allow other instances to take over + try: + await release_scheduler_lock(startup_db) + except Exception as e: + log.error("scheduler_lock_release_failed", error=str(e)) + finally: + await startup_db.close() log.info("bangui_shut_down") diff --git a/backend/app/startup.py b/backend/app/startup.py index cebf138..ee05a3c 100644 --- a/backend/app/startup.py +++ b/backend/app/startup.py @@ -42,11 +42,16 @@ from app.tasks import ( health_check, history_sync, rate_limiter_cleanup, + scheduler_lock_heartbeat, session_cleanup, ) from app.utils.async_utils import run_blocking from app.utils.jail_config import ensure_jail_configs from app.utils.runtime_state import set_runtime_settings +from app.utils.scheduler_lock import ( + acquire_scheduler_lock, + release_scheduler_lock, +) from app.utils.setup_state import set_setup_complete_cache if TYPE_CHECKING: @@ -58,22 +63,19 @@ log: structlog.stdlib.BoundLogger = structlog.get_logger() def _check_single_worker_mode() -> None: - """Verify that the application is running with a single worker. + """Fast check: verify BANGUI_WORKERS environment variable if set. - APScheduler's AsyncIOScheduler is bound to a single asyncio event loop - and cannot be safely shared across multiple worker processes. If each - worker starts its own scheduler instance, all background jobs execute N - times (where N is the number of workers), resulting in duplicate blocklist - imports, duplicate ban operations, duplicate history writes, and SQLite - lock contention. + This is the first-line guard: if BANGUI_WORKERS is explicitly set to a + value > 1, reject immediately without requiring database access. This + catches obvious misconfiguration early. 
- This function detects multi-worker configurations and raises a clear - RuntimeError with instructions. + The authoritative check is the database-backed lock acquired in + _stage_check_worker_mode_and_acquire_lock(), which handles the general + case where multiple instances start without proper environment setup. Raises: - RuntimeError: If the app would run with multiple workers. + RuntimeError: If BANGUI_WORKERS is explicitly set to > 1. """ - # Check for explicit worker count env var (convention used in deployment) workers_env = os.environ.get("BANGUI_WORKERS") if workers_env is not None: try: @@ -124,7 +126,7 @@ def _create_http_session(settings: Settings) -> aiohttp.ClientSession: async def startup_shared_resources( app: FastAPI, settings: Settings, -) -> tuple[aiohttp.ClientSession, AsyncIOScheduler]: +) -> tuple[aiohttp.ClientSession, AsyncIOScheduler, Any]: """Create shared resources needed during the application lifespan. This function orchestrates the entire startup sequence through a StartupDAG, @@ -133,8 +135,8 @@ async def startup_shared_resources( rolled back. The startup stages are: - 1. WORKER_MODE: Validate single-worker configuration - 2. DATABASE: Initialize database and load setup state + 1. DATABASE: Initialize database and load setup state + 2. WORKER_MODE: Validate single-worker configuration and acquire scheduler lock 3. GEO_CACHE: Load IP geolocation cache 4. HTTP_SESSION: Create shared aiohttp session 5. SCHEDULER: Create and start APScheduler @@ -145,7 +147,7 @@ async def startup_shared_resources( settings: Resolved application settings. Returns: - A tuple of ``(http_session, scheduler)``. + A tuple of ``(http_session, scheduler, startup_db)``. Raises: RuntimeError: If any startup stage fails or prerequisites are not met. @@ -153,20 +155,21 @@ async def startup_shared_resources( dag = StartupDAG() # Register all startup stages with their dependencies. 
- dag.register_stage( - StartupStage.WORKER_MODE, - "Verify single-worker mode (scheduler must not run in multiple workers)", - prerequisites=frozenset(), - ) + # NOTE: DATABASE stage must come before WORKER_MODE for lock acquisition dag.register_stage( StartupStage.DATABASE, "Initialize database schema and load setup state", - prerequisites=frozenset([StartupStage.WORKER_MODE]), + prerequisites=frozenset(), + ) + dag.register_stage( + StartupStage.WORKER_MODE, + "Verify single-worker mode and acquire scheduler lock", + prerequisites=frozenset([StartupStage.DATABASE]), ) dag.register_stage( StartupStage.GEO_CACHE, "Load IP geolocation cache from database", - prerequisites=frozenset([StartupStage.DATABASE]), + prerequisites=frozenset([StartupStage.WORKER_MODE]), ) dag.register_stage( StartupStage.HTTP_SESSION, @@ -185,18 +188,18 @@ async def startup_shared_resources( ) try: - # Stage 1: Validate single-worker mode - await dag.execute_stage( - StartupStage.WORKER_MODE, - _stage_check_worker_mode, - ) - - # Stage 2: Initialize database + # Stage 1: Initialize database (must come first for lock acquisition) startup_db = await dag.execute_stage( StartupStage.DATABASE, lambda: _stage_init_database(app, settings), ) + # Stage 2: Validate single-worker mode and acquire scheduler lock + await dag.execute_stage( + StartupStage.WORKER_MODE, + lambda: _stage_check_worker_mode_and_acquire_lock(startup_db), + ) + # Stage 3: Load GeoCache geo_cache = await dag.execute_stage( StartupStage.GEO_CACHE, @@ -233,7 +236,7 @@ async def startup_shared_resources( stages=len(dag.context.completed_stages), ) - return http_session, scheduler + return http_session, scheduler, startup_db except Exception: # Clean up on failure @@ -246,13 +249,42 @@ async def startup_shared_resources( raise -async def _stage_check_worker_mode() -> None: - """Check that the application is running with a single worker. 
+async def _stage_check_worker_mode_and_acquire_lock(startup_db: Any) -> None: + """Check single-worker mode and acquire the scheduler lock. - This is stage 1 of the startup DAG. + This is stage 2 of the startup DAG. It performs two checks: + 1. Fast check: Verify BANGUI_WORKERS env var if explicitly set + 2. Authoritative check: Acquire database-backed scheduler lock + + The database lock ensures that only one instance runs the scheduler, even + in container orchestration scenarios where multiple instances may start + simultaneously. This prevents duplicate background jobs, duplicate history + entries, and SQLite lock contention. + + Args: + startup_db: The initialized database connection. + + Raises: + RuntimeError: If the env var check fails or the scheduler lock cannot + be acquired (another instance is running the scheduler). """ + # Fast check: verify BANGUI_WORKERS if explicitly set _check_single_worker_mode() + # Authoritative check: acquire the database-backed lock + if not await acquire_scheduler_lock(startup_db): + raise RuntimeError( + "Could not acquire scheduler lock. Another BanGUI instance is already running the scheduler.\n" + "This prevents duplicate background jobs (blocklist imports, history sync, etc.).\n" + "\n" + "To recover from a stale lock (e.g., after a crash):\n" + " 1. Verify no other BanGUI instances are running\n" + " 2. Inspect the lock: sqlite3 bangui.db 'SELECT * FROM scheduler_lock;'\n" + " 3. If stale, clean it: sqlite3 bangui.db 'DELETE FROM scheduler_lock;'\n" + "\n" + "See Architekture.md § Deployment Constraints for details." + ) + async def _stage_init_database(app: FastAPI, settings: Settings) -> Any: """Initialize database schema and load setup state. @@ -389,6 +421,7 @@ async def _stage_register_tasks(app: FastAPI, scheduler: AsyncIOScheduler) -> No """Register all background jobs. This is stage 6 of the startup DAG. 
It registers: + - scheduler_lock_heartbeat: Periodic update of scheduler lock (keeps it alive) - health_check: Periodic fail2ban connectivity probe - blocklist_import: Scheduled blocklist download and application - geo_cache_cleanup: Periodic purge of stale geo cache entries @@ -402,6 +435,7 @@ async def _stage_register_tasks(app: FastAPI, scheduler: AsyncIOScheduler) -> No app: The FastAPI application instance. scheduler: The APScheduler scheduler to register tasks with. """ + scheduler_lock_heartbeat.register(app) health_check.register(app) await blocklist_import.register(app) geo_cache_cleanup.register(app) @@ -411,4 +445,4 @@ async def _stage_register_tasks(app: FastAPI, scheduler: AsyncIOScheduler) -> No session_cleanup.register(app) rate_limiter_cleanup.register(app) - log.info("startup_tasks_registered", count=8) + log.info("startup_tasks_registered", count=9) diff --git a/backend/app/tasks/scheduler_lock_heartbeat.py b/backend/app/tasks/scheduler_lock_heartbeat.py new file mode 100644 index 0000000..62f169d --- /dev/null +++ b/backend/app/tasks/scheduler_lock_heartbeat.py @@ -0,0 +1,84 @@ +"""Scheduler lock heartbeat background task. + +Registers an APScheduler job that periodically updates the scheduler lock's +heartbeat timestamp. This prevents the lock from being considered stale +if the running instance experiences temporary delays or high load. + +Without this heartbeat, stale lock detection (based on TTL) could incorrectly +determine that the scheduler instance has crashed when it's merely busy, and +a new instance could take over. 
+""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import structlog + +from app.tasks.db import task_db +from app.utils.runtime_state import get_effective_settings +from app.utils.scheduler_lock import update_scheduler_lock_heartbeat + +if TYPE_CHECKING: + from fastapi import FastAPI + + from app.config import Settings + +log: structlog.stdlib.BoundLogger = structlog.get_logger() + +#: How often the heartbeat job fires (seconds). Must be less than the lock TTL. +SCHEDULER_LOCK_HEARTBEAT_INTERVAL: int = 10 + +#: Stable APScheduler job ID — ensures re-registration replaces, not duplicates. +JOB_ID: str = "scheduler_lock_heartbeat" + + +async def _update_heartbeat_with_resources(settings: Settings) -> None: + """Update the scheduler lock heartbeat timestamp. + + If the heartbeat update fails (e.g., we no longer hold the lock), log + a warning but don't crash the scheduler. This allows the running + application to continue even if something went wrong. + + Args: + settings: The resolved application settings used for database access. + """ + async with task_db(settings) as db: + success = await update_scheduler_lock_heartbeat(db) + + if success: + log.debug("scheduler_lock_heartbeat_updated") + else: + log.warning( + "scheduler_lock_heartbeat_failed", + message="Failed to update heartbeat; we may have lost the lock.", + ) + + +async def _update_heartbeat(app: FastAPI) -> None: + await _update_heartbeat_with_resources(get_effective_settings(app)) + + +def register(app: FastAPI) -> None: + """Add (or replace) the scheduler lock heartbeat job. + + Must be called after the scheduler has been started (i.e., inside the + lifespan handler, after ``scheduler.start()``). + + Args: + app: The :class:`fastapi.FastAPI` application instance whose + ``app.state.scheduler`` will receive the job. 
+ """ + settings = get_effective_settings(app) + app.state.scheduler.add_job( + _update_heartbeat_with_resources, + trigger="interval", + seconds=SCHEDULER_LOCK_HEARTBEAT_INTERVAL, + kwargs={"settings": settings}, + id=JOB_ID, + replace_existing=True, + ) + log.info( + "scheduler_lock_heartbeat_scheduled", + interval_seconds=SCHEDULER_LOCK_HEARTBEAT_INTERVAL, + ) diff --git a/backend/app/utils/scheduler_lock.py b/backend/app/utils/scheduler_lock.py new file mode 100644 index 0000000..a610858 --- /dev/null +++ b/backend/app/utils/scheduler_lock.py @@ -0,0 +1,275 @@ +"""Database-based scheduler lock for single-executor enforcement. + +This module implements a database-backed lock mechanism that ensures only one +BanGUI instance runs the background scheduler, even in container orchestration +environments where multiple instances might start simultaneously. + +The lock uses atomic database operations to prevent race conditions: +- Lock acquisition is atomic: INSERT fails if the singleton row already exists +- Lock release is atomic: DELETE with PID check ensures only the owner releases +- Stale lock detection uses heartbeat timestamps: a lock older than TTL is + considered abandoned and eligible for cleanup on the next startup + +This approach is more reliable than filesystem-based locking in containerized +environments because: +1. Database transactions are atomic (no TOCTOU race windows) +2. No NFS/network filesystem edge cases +3. Stale lock detection is timestamp-based, not PID-based (PID reuse is unreliable) +4. Works across container restarts and rolling deployments + +The lock record stores: + - id: Always 1 (singleton table) + - pid: Process ID of the lock holder + - hostname: Container/host name for debugging + - created_at: When the lock was first acquired + - heartbeat_at: When the lock was last confirmed alive (updated periodically) + +On startup: +1. Cleanup any stale locks (where heartbeat_at > TTL) +2. Try to insert the lock for this instance +3. 
If INSERT succeeds, lock is acquired +4. If INSERT fails (IntegrityError), another instance holds the lock + +On running (periodic): + - Update heartbeat_at to keep the lock alive and prevent false positives + +On shutdown: + - Delete the lock (this instance is no longer running the scheduler) +""" + +from __future__ import annotations + +import os +import socket +import time +from typing import Any + +import aiosqlite +import structlog + +log: structlog.stdlib.BoundLogger = structlog.get_logger() + +# Lock record expires if heartbeat hasn't been updated for this many seconds. +# This prevents stale locks from a crashed instance from blocking new startups. +SCHEDULER_LOCK_TTL_SECONDS: int = 60 + +# Heartbeat interval: how often to update the lock's heartbeat_at timestamp. +# Must be less than TTL to prevent premature expiration. +SCHEDULER_LOCK_HEARTBEAT_INTERVAL_SECONDS: int = 10 + + +async def init_scheduler_lock_table(db: aiosqlite.Connection) -> None: + """Create the scheduler_lock table if it doesn't exist. + + This is called during database schema initialization and is safe to call + multiple times (CREATE TABLE IF NOT EXISTS is idempotent). + + Args: + db: The SQLite database connection. + """ + await db.execute( + """ + CREATE TABLE IF NOT EXISTS scheduler_lock ( + id INTEGER PRIMARY KEY CHECK (id = 1), + pid INTEGER NOT NULL, + hostname TEXT NOT NULL, + created_at REAL NOT NULL, + heartbeat_at REAL NOT NULL + ); + """ + ) + await db.commit() + + +async def acquire_scheduler_lock(db: aiosqlite.Connection) -> bool: + """Try to acquire the scheduler lock. + + This function performs two operations: + 1. Clean up any stale locks (where heartbeat_at + TTL < now) + 2. Try to insert a lock record for this instance + + If another instance already holds a valid lock, the INSERT will fail and + this function returns False. The caller should reject startup with a clear + error message. + + Args: + db: The SQLite database connection. 
async def release_scheduler_lock(db: aiosqlite.Connection) -> None:
    """Release the scheduler lock if this process owns it.

    Intended to be called during application shutdown. Removing the lock row
    lets another instance acquire the lock immediately instead of waiting for
    it to go stale.

    Ownership is decided by PID *and* hostname, the same pair that
    ``acquire_scheduler_lock`` stores. PID alone is not unique across hosts or
    containers sharing one database (containerised processes commonly all run
    as PID 1), so matching on PID only would let an instance that never
    acquired the lock delete the row held by another instance.

    Args:
        db: The SQLite database connection.

    Raises:
        RuntimeError: If database operations fail.
    """
    pid = os.getpid()
    hostname = socket.gethostname()

    try:
        cursor = await db.execute(
            """
            DELETE FROM scheduler_lock
            WHERE id = 1 AND pid = ? AND hostname = ?
            """,
            (pid, hostname),
        )
        await db.commit()
    except Exception as e:
        raise RuntimeError(f"Failed to release scheduler lock: {e}") from e

    if cursor.rowcount == 0:
        # Nothing was deleted: either no lock exists or it belongs to a
        # different (pid, hostname). Not fatal, but worth surfacing.
        log.warning(
            "scheduler_lock_release_mismatch",
            our_pid=pid,
            our_hostname=hostname,
            message="Tried to release lock but we don't hold it. Another instance may have replaced us.",
        )
    else:
        log.info("scheduler_lock_released", pid=pid, hostname=hostname)


async def update_scheduler_lock_heartbeat(db: aiosqlite.Connection) -> bool:
    """Update the heartbeat timestamp to keep the lock alive.

    Should be called periodically (every ~10 seconds) so the lock is not
    considered stale. The update only applies to a row owned by this process,
    identified by PID *and* hostname; see ``release_scheduler_lock`` for why
    PID alone is insufficient. Without the hostname check, an instance that
    lost the startup race could keep another instance's lock alive forever.

    Args:
        db: The SQLite database connection.

    Returns:
        True if the heartbeat was updated (we still hold the lock), False if
        no row owned by this process exists (the lock was never acquired, or
        another instance has taken over).

    Raises:
        RuntimeError: If database operations fail.
    """
    now = time.time()
    pid = os.getpid()
    hostname = socket.gethostname()

    try:
        cursor = await db.execute(
            """
            UPDATE scheduler_lock
            SET heartbeat_at = ?
            WHERE id = 1 AND pid = ? AND hostname = ?
            """,
            (now, pid, hostname),
        )
        await db.commit()
    except Exception as e:
        raise RuntimeError(f"Failed to update scheduler lock heartbeat: {e}") from e

    if cursor.rowcount == 0:
        # No row matched our identity, so we no longer hold the lock.
        log.warning(
            "scheduler_lock_heartbeat_lost",
            our_pid=pid,
            our_hostname=hostname,
            message="Heartbeat failed; we no longer hold the lock.",
        )
        return False

    return True


async def get_scheduler_lock_info(db: aiosqlite.Connection) -> dict[str, Any] | None:
    """Retrieve information about the current scheduler lock.

    Best-effort helper for debugging and monitoring: it never raises. A failed
    query is logged as a warning and reported as ``None``, so callers cannot
    distinguish "no lock" from "could not read the lock" by the return value
    alone.

    Args:
        db: The SQLite database connection.

    Returns:
        A dict with keys ``pid``, ``hostname``, ``created_at`` and
        ``heartbeat_at``, or None if no lock exists or the query failed.
    """
    try:
        cursor = await db.execute(
            "SELECT pid, hostname, created_at, heartbeat_at FROM scheduler_lock WHERE id = 1"
        )
        row = await cursor.fetchone()
    except Exception as e:
        log.warning("scheduler_lock_info_query_failed", error=str(e))
        return None

    if row is None:
        return None

    pid, hostname, created_at, heartbeat_at = row
    return {
        "pid": pid,
        "hostname": hostname,
        "created_at": created_at,
        "heartbeat_at": heartbeat_at,
    }
+""" + +from __future__ import annotations + +import os +import time +from typing import Any + +import aiosqlite +import pytest + +from app.utils.scheduler_lock import ( + SCHEDULER_LOCK_TTL_SECONDS, + acquire_scheduler_lock, + get_scheduler_lock_info, + release_scheduler_lock, + update_scheduler_lock_heartbeat, +) + + +@pytest.fixture +async def lock_db(tmp_path: Any) -> aiosqlite.Connection: + """Create a temporary database with scheduler_lock table.""" + db_path = tmp_path / "test.db" + db = await aiosqlite.connect(str(db_path)) + await db.execute( + """ + CREATE TABLE scheduler_lock ( + id INTEGER PRIMARY KEY CHECK (id = 1), + pid INTEGER NOT NULL, + hostname TEXT NOT NULL, + created_at REAL NOT NULL, + heartbeat_at REAL NOT NULL + ); + """ + ) + await db.commit() + yield db + await db.close() + + +@pytest.mark.asyncio +async def test_acquire_scheduler_lock_success(lock_db: aiosqlite.Connection) -> None: + """Test successful lock acquisition.""" + result = await acquire_scheduler_lock(lock_db) + assert result is True + + # Verify the lock is in the database + cursor = await lock_db.execute("SELECT COUNT(*) FROM scheduler_lock") + count = await cursor.fetchone() + assert count[0] == 1 + + +@pytest.mark.asyncio +async def test_acquire_scheduler_lock_fails_when_held( + lock_db: aiosqlite.Connection, +) -> None: + """Test that lock acquisition fails if already held.""" + # First instance acquires the lock + result1 = await acquire_scheduler_lock(lock_db) + assert result1 is True + + # Second instance tries to acquire, should fail + result2 = await acquire_scheduler_lock(lock_db) + assert result2 is False + + +@pytest.mark.asyncio +async def test_acquire_scheduler_lock_cleans_stale_locks( + lock_db: aiosqlite.Connection, +) -> None: + """Test that stale locks are automatically cleaned up.""" + # Insert a stale lock manually (old heartbeat) + now = time.time() + stale_heartbeat = now - SCHEDULER_LOCK_TTL_SECONDS - 10 + await lock_db.execute( + """ + INSERT INTO 
scheduler_lock (id, pid, hostname, created_at, heartbeat_at) + VALUES (1, 9999, 'stale-host', ?, ?) + """, + (now - 100, stale_heartbeat), + ) + await lock_db.commit() + + # New instance should clean up the stale lock and acquire + result = await acquire_scheduler_lock(lock_db) + assert result is True + + # Verify the old lock is gone and new one is in place + cursor = await lock_db.execute( + "SELECT pid, hostname FROM scheduler_lock WHERE id = 1" + ) + row = await cursor.fetchone() + assert row is not None + pid, hostname = row + assert pid == os.getpid() + assert hostname is not None + + +@pytest.mark.asyncio +async def test_release_scheduler_lock_success( + lock_db: aiosqlite.Connection, +) -> None: + """Test successful lock release.""" + # Acquire the lock + await acquire_scheduler_lock(lock_db) + + # Release it + await release_scheduler_lock(lock_db) + + # Verify the lock is gone + cursor = await lock_db.execute("SELECT COUNT(*) FROM scheduler_lock") + count = await cursor.fetchone() + assert count[0] == 0 + + +@pytest.mark.asyncio +async def test_release_scheduler_lock_not_held( + lock_db: aiosqlite.Connection, +) -> None: + """Test that releasing a lock we don't hold is safe.""" + # Try to release without acquiring — should not crash + await release_scheduler_lock(lock_db) + + # Verify the lock is still empty + cursor = await lock_db.execute("SELECT COUNT(*) FROM scheduler_lock") + count = await cursor.fetchone() + assert count[0] == 0 + + +@pytest.mark.asyncio +async def test_update_scheduler_lock_heartbeat_success( + lock_db: aiosqlite.Connection, +) -> None: + """Test successful heartbeat update.""" + # Acquire the lock + await acquire_scheduler_lock(lock_db) + + # Get the original heartbeat + cursor = await lock_db.execute("SELECT heartbeat_at FROM scheduler_lock WHERE id = 1") + original_row = await cursor.fetchone() + original_heartbeat = original_row[0] + + # Wait a moment and update the heartbeat + time.sleep(0.01) + result = await 
update_scheduler_lock_heartbeat(lock_db) + assert result is True + + # Verify the heartbeat was updated + cursor = await lock_db.execute("SELECT heartbeat_at FROM scheduler_lock WHERE id = 1") + new_row = await cursor.fetchone() + new_heartbeat = new_row[0] + assert new_heartbeat > original_heartbeat + + +@pytest.mark.asyncio +async def test_update_scheduler_lock_heartbeat_fails_if_not_held( + lock_db: aiosqlite.Connection, +) -> None: + """Test that heartbeat update fails if we don't hold the lock.""" + result = await update_scheduler_lock_heartbeat(lock_db) + assert result is False + + +@pytest.mark.asyncio +async def test_get_scheduler_lock_info_returns_details( + lock_db: aiosqlite.Connection, +) -> None: + """Test that lock info includes all relevant fields.""" + await acquire_scheduler_lock(lock_db) + + info = await get_scheduler_lock_info(lock_db) + assert info is not None + assert "pid" in info + assert "hostname" in info + assert "created_at" in info + assert "heartbeat_at" in info + assert info["pid"] == os.getpid() + + +@pytest.mark.asyncio +async def test_get_scheduler_lock_info_returns_none_when_empty( + lock_db: aiosqlite.Connection, +) -> None: + """Test that lock info returns None when no lock is held.""" + info = await get_scheduler_lock_info(lock_db) + assert info is None + + +@pytest.mark.asyncio +async def test_scheduler_lock_full_lifecycle( + lock_db: aiosqlite.Connection, +) -> None: + """Test the full lifecycle: acquire, update heartbeat, release.""" + # Initially no lock + info = await get_scheduler_lock_info(lock_db) + assert info is None + + # Acquire the lock + result = await acquire_scheduler_lock(lock_db) + assert result is True + info = await get_scheduler_lock_info(lock_db) + assert info is not None + initial_heartbeat = info["heartbeat_at"] + + # Update heartbeat multiple times + time.sleep(0.01) + result = await update_scheduler_lock_heartbeat(lock_db) + assert result is True + info = await get_scheduler_lock_info(lock_db) + 
updated_heartbeat = info["heartbeat_at"] + assert updated_heartbeat > initial_heartbeat + + # Release the lock + await release_scheduler_lock(lock_db) + info = await get_scheduler_lock_info(lock_db) + assert info is None