"""Tests for the scheduler lock mechanism. These tests verify that the database-backed scheduler lock correctly enforces single-executor safety across multiple startup attempts, including stale lock cleanup and heartbeat updates. """ from __future__ import annotations import os import time from typing import Any import aiosqlite import pytest from app.utils.scheduler_lock import ( SCHEDULER_LOCK_TTL_SECONDS, acquire_scheduler_lock, get_scheduler_lock_info, release_scheduler_lock, update_scheduler_lock_heartbeat, ) @pytest.fixture async def lock_db(tmp_path: Any) -> aiosqlite.Connection: """Create a temporary database with scheduler_lock table.""" db_path = tmp_path / "test.db" db = await aiosqlite.connect(str(db_path)) await db.execute( """ CREATE TABLE scheduler_lock ( id INTEGER PRIMARY KEY CHECK (id = 1), pid INTEGER NOT NULL, hostname TEXT NOT NULL, created_at REAL NOT NULL, heartbeat_at REAL NOT NULL ); """ ) await db.commit() yield db await db.close() @pytest.mark.asyncio async def test_acquire_scheduler_lock_success(lock_db: aiosqlite.Connection) -> None: """Test successful lock acquisition.""" result = await acquire_scheduler_lock(lock_db) assert result is True # Verify the lock is in the database cursor = await lock_db.execute("SELECT COUNT(*) FROM scheduler_lock") count = await cursor.fetchone() assert count[0] == 1 @pytest.mark.asyncio async def test_acquire_scheduler_lock_fails_when_held( lock_db: aiosqlite.Connection, ) -> None: """Test that lock acquisition fails if already held.""" # First instance acquires the lock result1 = await acquire_scheduler_lock(lock_db) assert result1 is True # Second instance tries to acquire, should fail result2 = await acquire_scheduler_lock(lock_db) assert result2 is False @pytest.mark.asyncio async def test_acquire_scheduler_lock_cleans_stale_locks( lock_db: aiosqlite.Connection, ) -> None: """Test that stale locks are automatically cleaned up.""" # Insert a stale lock manually (old heartbeat) now = time.time() stale_heartbeat = now - SCHEDULER_LOCK_TTL_SECONDS - 10 await lock_db.execute( """ INSERT INTO scheduler_lock (id, pid, hostname, created_at, heartbeat_at) VALUES (1, 9999, 'stale-host', ?, ?) """, (now - 100, stale_heartbeat), ) await lock_db.commit() # New instance should clean up the stale lock and acquire result = await acquire_scheduler_lock(lock_db) assert result is True # Verify the old lock is gone and new one is in place cursor = await lock_db.execute( "SELECT pid, hostname FROM scheduler_lock WHERE id = 1" ) row = await cursor.fetchone() assert row is not None pid, hostname = row assert pid == os.getpid() assert hostname is not None @pytest.mark.asyncio async def test_release_scheduler_lock_success( lock_db: aiosqlite.Connection, ) -> None: """Test successful lock release.""" # Acquire the lock await acquire_scheduler_lock(lock_db) # Release it await release_scheduler_lock(lock_db) # Verify the lock is gone cursor = await lock_db.execute("SELECT COUNT(*) FROM scheduler_lock") count = await cursor.fetchone() assert count[0] == 0 @pytest.mark.asyncio async def test_release_scheduler_lock_not_held( lock_db: aiosqlite.Connection, ) -> None: """Test that releasing a lock we don't hold is safe.""" # Try to release without acquiring — should not crash await release_scheduler_lock(lock_db) # Verify the lock is still empty cursor = await lock_db.execute("SELECT COUNT(*) FROM scheduler_lock") count = await cursor.fetchone() assert count[0] == 0 @pytest.mark.asyncio async def test_update_scheduler_lock_heartbeat_success( lock_db: aiosqlite.Connection, ) -> None: """Test successful heartbeat update.""" # Acquire the lock await acquire_scheduler_lock(lock_db) # Get the original heartbeat cursor = await lock_db.execute("SELECT heartbeat_at FROM scheduler_lock WHERE id = 1") original_row = await cursor.fetchone() original_heartbeat = original_row[0] # Wait a moment and update the heartbeat time.sleep(0.01) result = await update_scheduler_lock_heartbeat(lock_db) assert result is True # Verify the heartbeat was updated cursor = await lock_db.execute("SELECT heartbeat_at FROM scheduler_lock WHERE id = 1") new_row = await cursor.fetchone() new_heartbeat = new_row[0] assert new_heartbeat > original_heartbeat @pytest.mark.asyncio async def test_update_scheduler_lock_heartbeat_fails_if_not_held( lock_db: aiosqlite.Connection, ) -> None: """Test that heartbeat update fails if we don't hold the lock.""" result = await update_scheduler_lock_heartbeat(lock_db) assert result is False @pytest.mark.asyncio async def test_get_scheduler_lock_info_returns_details( lock_db: aiosqlite.Connection, ) -> None: """Test that lock info includes all relevant fields.""" await acquire_scheduler_lock(lock_db) info = await get_scheduler_lock_info(lock_db) assert info is not None assert "pid" in info assert "hostname" in info assert "created_at" in info assert "heartbeat_at" in info assert info["pid"] == os.getpid() @pytest.mark.asyncio async def test_get_scheduler_lock_info_returns_none_when_empty( lock_db: aiosqlite.Connection, ) -> None: """Test that lock info returns None when no lock is held.""" info = await get_scheduler_lock_info(lock_db) assert info is None @pytest.mark.asyncio async def test_scheduler_lock_full_lifecycle( lock_db: aiosqlite.Connection, ) -> None: """Test the full lifecycle: acquire, update heartbeat, release.""" # Initially no lock info = await get_scheduler_lock_info(lock_db) assert info is None # Acquire the lock result = await acquire_scheduler_lock(lock_db) assert result is True info = await get_scheduler_lock_info(lock_db) assert info is not None initial_heartbeat = info["heartbeat_at"] # Update heartbeat multiple times time.sleep(0.01) result = await update_scheduler_lock_heartbeat(lock_db) assert result is True info = await get_scheduler_lock_info(lock_db) updated_heartbeat = info["heartbeat_at"] assert updated_heartbeat > initial_heartbeat # Release the lock await release_scheduler_lock(lock_db) info = await get_scheduler_lock_info(lock_db) assert info is None