Files
BanGUI/backend/tests/test_scheduler_lock.py
Lukas 0d5882b32f Fix HIGH priority issues: unbounded queries, rate limiting, health checks
Issue #3 - Unbounded Query Results (OOM):
- get_all_archived_history() now uses keyset pagination with bounded max_rows (50k default)
- Added 'id' field to records from get_archived_history() and get_archived_history_keyset()
- Protocol signature updated with page_size, max_rows, last_ban_id params

Issue #7 - Docker Health Check Fails:
- Added curl to Dockerfile.backend runtime image
- HEALTHCHECK now uses 'curl -f http://localhost:8000/api/health'
- compose.prod.yml: increased start_period to 40s, timeout to 10s
- Frontend healthcheck proxies to backend /api/health

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-01 21:47:36 +02:00

533 lines
18 KiB
Python

"""Tests for the scheduler lock mechanism.
These tests verify that the database-backed scheduler lock correctly enforces
single-executor safety across multiple startup attempts, including stale lock
cleanup, heartbeat updates, and multi-process race condition prevention.
"""
from __future__ import annotations

import asyncio
import os
import time
from collections.abc import AsyncIterator
from typing import Any

import aiosqlite
import pytest

from app.utils.scheduler_lock import (
    SCHEDULER_LOCK_HEARTBEAT_INTERVAL_SECONDS,
    SCHEDULER_LOCK_HEARTBEAT_TIMEOUT_SECONDS,
    SCHEDULER_LOCK_TTL_SECONDS,
    acquire_scheduler_lock,
    get_lock_health,
    get_scheduler_lock_info,
    is_lock_stale,
    release_scheduler_lock,
    update_scheduler_lock_heartbeat,
)
@pytest.fixture
async def lock_db(tmp_path: Any) -> AsyncIterator[aiosqlite.Connection]:
    """Yield a connection to a fresh database holding an empty ``scheduler_lock`` table.

    The database file is created under pytest's per-test ``tmp_path`` and the
    table schema is declared here by hand.

    Args:
        tmp_path: Per-test temporary directory supplied by pytest.

    Yields:
        An open ``aiosqlite`` connection. It is closed once the test finishes.
    """
    # NOTE(review): a plain ``@pytest.fixture`` on an async generator relies on
    # pytest-asyncio running in auto mode -- confirm the project's asyncio_mode.
    #
    # ``async with`` guarantees the connection is closed even when the schema
    # setup below raises before the fixture ever reaches ``yield``.
    async with aiosqlite.connect(str(tmp_path / "test.db")) as db:
        # DDL cannot take bound parameters, so the default timeout is
        # interpolated from the module constant.
        await db.execute(
            f"""
            CREATE TABLE scheduler_lock (
                id INTEGER PRIMARY KEY CHECK (id = 1),
                pid INTEGER NOT NULL,
                hostname TEXT NOT NULL,
                created_at REAL NOT NULL,
                heartbeat_at REAL NOT NULL,
                heartbeat_timeout REAL NOT NULL DEFAULT {SCHEDULER_LOCK_HEARTBEAT_TIMEOUT_SECONDS}
            );
            """
        )
        await db.commit()
        yield db
@pytest.mark.asyncio
async def test_acquire_scheduler_lock_success(lock_db: aiosqlite.Connection) -> None:
    """Acquiring on an empty table returns True and leaves exactly one lock row."""
    acquired = await acquire_scheduler_lock(lock_db)
    assert acquired is True

    # The acquisition must be persisted as a single row.
    cursor = await lock_db.execute("SELECT COUNT(*) FROM scheduler_lock")
    (row_count,) = await cursor.fetchone()
    assert row_count == 1
@pytest.mark.asyncio
async def test_acquire_scheduler_lock_fails_when_held(
    lock_db: aiosqlite.Connection,
) -> None:
    """Test that lock acquisition fails if already held by another process.

    A same-PID re-acquire is expected to succeed (refresh), so rejection is
    exercised against a separate in-memory database whose lock row carries a
    PID that cannot belong to this test process.
    """
    # First acquisition succeeds on the empty table.
    assert await acquire_scheduler_lock(lock_db) is True

    # Same process, same PID: re-acquire is treated as a refresh.
    assert await acquire_scheduler_lock(lock_db) is True

    # ``async with`` closes the extra connection even if an assertion below
    # fails, so a failing test does not leave the connection open.
    async with aiosqlite.connect(":memory:") as db_other:
        await db_other.execute(
            f"""
            CREATE TABLE scheduler_lock (
                id INTEGER PRIMARY KEY CHECK (id = 1),
                pid INTEGER NOT NULL,
                hostname TEXT NOT NULL,
                created_at REAL NOT NULL,
                heartbeat_at REAL NOT NULL,
                heartbeat_timeout REAL NOT NULL DEFAULT {SCHEDULER_LOCK_HEARTBEAT_TIMEOUT_SECONDS}
            )
            """
        )

        # Lock row with PID -1 and a fresh heartbeat: a live foreign holder.
        now = time.time()
        await db_other.execute(
            """
            INSERT INTO scheduler_lock (id, pid, hostname, created_at, heartbeat_at, heartbeat_timeout)
            VALUES (1, -1, 'other-host', ?, ?, ?)
            """,
            (now, now, SCHEDULER_LOCK_HEARTBEAT_TIMEOUT_SECONDS),
        )
        await db_other.commit()

        # Acquisition must be refused while the foreign lock is still fresh.
        assert await acquire_scheduler_lock(db_other) is False
@pytest.mark.asyncio
async def test_acquire_scheduler_lock_cleans_stale_locks(
    lock_db: aiosqlite.Connection,
) -> None:
    """A lock row whose heartbeat is past its timeout is replaced on acquire."""
    # Seed a lock owned by PID 9999 whose heartbeat is 10 s older than the timeout.
    current_time = time.time()
    expired_heartbeat = current_time - SCHEDULER_LOCK_HEARTBEAT_TIMEOUT_SECONDS - 10
    seed_values = (
        current_time - 100,
        expired_heartbeat,
        SCHEDULER_LOCK_HEARTBEAT_TIMEOUT_SECONDS,
    )
    await lock_db.execute(
        """
        INSERT INTO scheduler_lock (id, pid, hostname, created_at, heartbeat_at, heartbeat_timeout)
        VALUES (1, 9999, 'stale-host', ?, ?, ?)
        """,
        seed_values,
    )
    await lock_db.commit()

    # Acquisition must succeed despite the pre-existing (stale) row.
    assert await acquire_scheduler_lock(lock_db) is True

    # The row must now be owned by this process.
    cursor = await lock_db.execute(
        "SELECT pid, hostname FROM scheduler_lock WHERE id = 1"
    )
    owner = await cursor.fetchone()
    assert owner is not None
    owner_pid, owner_host = owner
    assert owner_pid == os.getpid()
    assert owner_host is not None
@pytest.mark.asyncio
async def test_acquire_scheduler_lock_cleans_stale_locks_with_new_schema(
    lock_db: aiosqlite.Connection,
) -> None:
    """Stealing a stale lock also rewrites ``heartbeat_timeout`` to the default."""
    # Seed a lock owned by PID 9999 whose heartbeat is 10 s older than the timeout.
    current_time = time.time()
    expired_heartbeat = current_time - SCHEDULER_LOCK_HEARTBEAT_TIMEOUT_SECONDS - 10
    seed_values = (
        current_time - 100,
        expired_heartbeat,
        SCHEDULER_LOCK_HEARTBEAT_TIMEOUT_SECONDS,
    )
    await lock_db.execute(
        """
        INSERT INTO scheduler_lock (id, pid, hostname, created_at, heartbeat_at, heartbeat_timeout)
        VALUES (1, 9999, 'stale-host', ?, ?, ?)
        """,
        seed_values,
    )
    await lock_db.commit()

    # Acquisition must succeed by taking over the stale row.
    assert await acquire_scheduler_lock(lock_db) is True

    # The row must now carry this process's PID and the configured timeout.
    cursor = await lock_db.execute(
        "SELECT pid, hostname, heartbeat_timeout FROM scheduler_lock WHERE id = 1"
    )
    owner = await cursor.fetchone()
    assert owner is not None
    owner_pid, owner_host, stored_timeout = owner
    assert owner_pid == os.getpid()
    assert owner_host is not None
    assert stored_timeout == SCHEDULER_LOCK_HEARTBEAT_TIMEOUT_SECONDS
@pytest.mark.asyncio
async def test_release_scheduler_lock_success(
    lock_db: aiosqlite.Connection,
) -> None:
    """Releasing after acquiring leaves the ``scheduler_lock`` table empty."""
    await acquire_scheduler_lock(lock_db)
    await release_scheduler_lock(lock_db)

    # No lock row may remain after the release.
    cursor = await lock_db.execute("SELECT COUNT(*) FROM scheduler_lock")
    (remaining_rows,) = await cursor.fetchone()
    assert remaining_rows == 0
@pytest.mark.asyncio
async def test_release_scheduler_lock_not_held(
    lock_db: aiosqlite.Connection,
) -> None:
    """Releasing without ever acquiring must not raise and must not create a row."""
    # Nothing was acquired; the call is expected to complete without error.
    await release_scheduler_lock(lock_db)

    # The table must still be empty afterwards.
    cursor = await lock_db.execute("SELECT COUNT(*) FROM scheduler_lock")
    (remaining_rows,) = await cursor.fetchone()
    assert remaining_rows == 0
@pytest.mark.asyncio
async def test_update_scheduler_lock_heartbeat_success(
    lock_db: aiosqlite.Connection,
) -> None:
    """Test that a heartbeat update by the holder advances ``heartbeat_at``."""
    heartbeat_query = "SELECT heartbeat_at FROM scheduler_lock WHERE id = 1"

    # Acquire the lock and record the heartbeat written at acquisition time.
    await acquire_scheduler_lock(lock_db)
    cursor = await lock_db.execute(heartbeat_query)
    original_row = await cursor.fetchone()
    assert original_row is not None
    original_heartbeat = original_row[0]

    # Let the clock advance before refreshing. ``asyncio.sleep`` is used
    # instead of ``time.sleep`` so the event loop is not blocked inside a
    # coroutine.
    await asyncio.sleep(0.01)
    assert await update_scheduler_lock_heartbeat(lock_db) is True

    # The stored heartbeat must be strictly newer than the original one.
    cursor = await lock_db.execute(heartbeat_query)
    new_row = await cursor.fetchone()
    assert new_row is not None
    assert new_row[0] > original_heartbeat
@pytest.mark.asyncio
async def test_update_scheduler_lock_heartbeat_fails_if_not_held(
    lock_db: aiosqlite.Connection,
) -> None:
    """A heartbeat update on an empty lock table reports False."""
    # No lock was acquired, so there is no row for the heartbeat to refresh.
    assert await update_scheduler_lock_heartbeat(lock_db) is False
@pytest.mark.asyncio
async def test_get_scheduler_lock_info_returns_details(
    lock_db: aiosqlite.Connection,
) -> None:
    """Lock info for a held lock exposes the expected keys and this process's PID."""
    await acquire_scheduler_lock(lock_db)

    lock_info = await get_scheduler_lock_info(lock_db)
    assert lock_info is not None

    # Every field the callers rely on must be present in the mapping.
    for expected_key in ("pid", "hostname", "created_at", "heartbeat_at"):
        assert expected_key in lock_info
    assert lock_info["pid"] == os.getpid()
@pytest.mark.asyncio
async def test_get_scheduler_lock_info_returns_none_when_empty(
    lock_db: aiosqlite.Connection,
) -> None:
    """With no lock row present, the info lookup yields None."""
    assert await get_scheduler_lock_info(lock_db) is None
@pytest.mark.asyncio
async def test_scheduler_lock_full_lifecycle(
    lock_db: aiosqlite.Connection,
) -> None:
    """Test the full lifecycle: acquire, update heartbeat, release."""
    # Initially no lock exists.
    assert await get_scheduler_lock_info(lock_db) is None

    # Acquire the lock and remember the heartbeat written at acquisition.
    assert await acquire_scheduler_lock(lock_db) is True
    info = await get_scheduler_lock_info(lock_db)
    assert info is not None
    initial_heartbeat = info["heartbeat_at"]

    # Let the clock advance before refreshing. ``asyncio.sleep`` is used
    # instead of ``time.sleep`` so the event loop is not blocked inside a
    # coroutine.
    await asyncio.sleep(0.01)
    assert await update_scheduler_lock_heartbeat(lock_db) is True

    # Guard against a missing row so a regression fails on this assertion
    # rather than with a TypeError on the subscript below.
    info = await get_scheduler_lock_info(lock_db)
    assert info is not None
    assert info["heartbeat_at"] > initial_heartbeat

    # Releasing removes the lock again.
    await release_scheduler_lock(lock_db)
    assert await get_scheduler_lock_info(lock_db) is None
@pytest.mark.asyncio
async def test_scheduler_lock_heartbeat_interval_sanity(
    lock_db: aiosqlite.Connection,
) -> None:
    """Check that the heartbeat interval leaves ample headroom below the lock TTL.

    The TTL must be at least 12 heartbeat intervals long, so that a handful of
    delayed or missed heartbeats cannot expire a healthy lock. The original
    author's notes cite a 5 s interval against a 60 s TTL; the real values are
    whatever ``app.utils.scheduler_lock`` defines.
    """
    interval = SCHEDULER_LOCK_HEARTBEAT_INTERVAL_SECONDS
    ttl = SCHEDULER_LOCK_TTL_SECONDS

    # Baseline: a heartbeat must arrive before the TTL elapses.
    assert interval < ttl

    # Number of heartbeat intervals that fit inside one TTL window.
    safe_ratio = ttl / interval
    assert safe_ratio >= 12, (
        f"Heartbeat interval too long: lock can only survive {safe_ratio:.1f} missed heartbeats. "
        "Should be at least 12 for safety."
    )
@pytest.mark.asyncio
async def test_scheduler_lock_two_instances_cannot_both_hold(
    tmp_path: Any,
) -> None:
    """Test that two different processes cannot both hold the lock.

    Two connections to the same database file stand in for two instances.
    Because both run inside this test process (same PID) and a same-PID
    re-acquire is expected to succeed as a refresh, the conflicting holder is
    simulated by writing a lock row with a foreign PID directly.

    Args:
        tmp_path: Per-test temporary directory supplied by pytest.
    """
    db_path = str(tmp_path / "test.db")

    # ``async with`` closes both connections even when an assertion fails,
    # so a failing test does not leave connections open.
    async with aiosqlite.connect(db_path) as db_a:
        await db_a.execute(
            f"""
            CREATE TABLE scheduler_lock (
                id INTEGER PRIMARY KEY CHECK (id = 1),
                pid INTEGER NOT NULL,
                hostname TEXT NOT NULL,
                created_at REAL NOT NULL,
                heartbeat_at REAL NOT NULL,
                heartbeat_timeout REAL NOT NULL DEFAULT {SCHEDULER_LOCK_HEARTBEAT_TIMEOUT_SECONDS}
            );
            """
        )
        await db_a.commit()

        # Instance A acquires, and a same-PID re-acquire refreshes.
        assert await acquire_scheduler_lock(db_a) is True
        assert await acquire_scheduler_lock(db_a) is True

        # Overwrite the row with a fresh lock owned by a foreign PID. Values
        # are bound as parameters rather than formatted into the SQL text.
        now = time.time()
        await db_a.execute(
            """
            INSERT OR REPLACE INTO scheduler_lock (id, pid, hostname, created_at, heartbeat_at, heartbeat_timeout)
            VALUES (1, -999, 'other-host', ?, ?, ?)
            """,
            (now, now, SCHEDULER_LOCK_HEARTBEAT_TIMEOUT_SECONDS),
        )
        await db_a.commit()

        async with aiosqlite.connect(db_path) as db_b:
            # Instance B is refused while PID -999 holds a fresh lock.
            assert await acquire_scheduler_lock(db_b) is False

            # Remove the conflicting row (simulating the other process dying).
            await db_a.execute("DELETE FROM scheduler_lock")
            await db_a.commit()

            # With the table empty, instance B can acquire.
            assert await acquire_scheduler_lock(db_b) is True
@pytest.mark.asyncio
async def test_acquire_scheduler_lock_steals_stale_lock(
    lock_db: aiosqlite.Connection,
) -> None:
    """An orphaned lock with an expired heartbeat can be taken over.

    Scenario: a holder acquires the lock and then "crashes" without releasing
    it. Its heartbeat is back-dated past the timeout, after which a new acquire
    attempt must succeed instead of being blocked forever.
    """
    # The would-be crashed holder takes the lock first.
    assert await acquire_scheduler_lock(lock_db) is True

    # Read back the timeout stored with the lock row.
    held = await get_scheduler_lock_info(lock_db)
    assert held is not None
    timeout_seconds = held["heartbeat_timeout"]

    # Back-date the heartbeat to 10 s beyond the timeout.
    expired_heartbeat = time.time() - timeout_seconds - 10
    await lock_db.execute(
        "UPDATE scheduler_lock SET heartbeat_at = ? WHERE id = 1",
        (expired_heartbeat,),
    )
    await lock_db.commit()

    # The next acquire attempt must succeed on the stale row.
    assert await acquire_scheduler_lock(lock_db) is True

    # The row is owned by this process afterwards.
    taken_over = await get_scheduler_lock_info(lock_db)
    assert taken_over is not None
    assert taken_over["pid"] == os.getpid()
@pytest.mark.asyncio
async def test_is_lock_stale_function() -> None:
    """Exercise ``is_lock_stale`` for fresh, expired and boundary heartbeats."""
    now = time.time()
    timeout = 300.0

    # (heartbeat age in seconds, expected staleness)
    scenarios = (
        (10, False),   # well inside the timeout
        (400, True),   # past the timeout
        (300, False),  # exactly at the timeout: not yet stale
    )
    for age_seconds, expected in scenarios:
        heartbeat_at = now - age_seconds
        assert await is_lock_stale(heartbeat_at, timeout, now) is expected
@pytest.mark.asyncio
async def test_get_lock_health_no_lock(lock_db: aiosqlite.Connection) -> None:
    """With no lock row, the health report shows no lock, not stale, and no owner."""
    report = await get_lock_health(lock_db)

    assert report["has_lock"] is False
    assert report["is_stale"] is False
    # Owner and stale reason are both absent when nothing is held.
    assert report["pid"] is None
    assert report["stale_reason"] is None
@pytest.mark.asyncio
async def test_get_lock_health_active_lock(lock_db: aiosqlite.Connection) -> None:
    """A freshly acquired lock is reported as present, healthy and owned by us."""
    await acquire_scheduler_lock(lock_db)
    report = await get_lock_health(lock_db)

    # Presence and freshness.
    assert report["has_lock"] is True
    assert report["is_stale"] is False

    # Ownership details and the configured timeout.
    assert report["pid"] == os.getpid()
    assert report["hostname"] is not None
    assert report["heartbeat_timeout"] == SCHEDULER_LOCK_HEARTBEAT_TIMEOUT_SECONDS

    # A healthy lock carries no stale reason.
    assert report["stale_reason"] is None
@pytest.mark.asyncio
async def test_get_lock_health_stale_lock(lock_db: aiosqlite.Connection) -> None:
    """A lock with an expired heartbeat is reported as stale, with a reason."""
    await acquire_scheduler_lock(lock_db)

    # Back-date the heartbeat to 10 s beyond the timeout stored with the lock.
    reference_time = time.time()
    held = await get_scheduler_lock_info(lock_db)
    expired_heartbeat = reference_time - held["heartbeat_timeout"] - 10
    await lock_db.execute(
        "UPDATE scheduler_lock SET heartbeat_at = ? WHERE id = 1",
        (expired_heartbeat,),
    )
    await lock_db.commit()

    report = await get_lock_health(lock_db)
    assert report["has_lock"] is True
    assert report["is_stale"] is True

    # The reason text must mention both the heartbeat age and the timeout.
    reason = report["stale_reason"]
    assert reason is not None
    assert "heartbeat_age" in reason
    assert "timeout" in reason
@pytest.mark.asyncio
async def test_heartbeat_update_error_returns_false(
    lock_db: aiosqlite.Connection,
) -> None:
    """Heartbeat update reports False before the lock is held and True after.

    Despite the test's name, no database error is injected here; only the
    not-held and held paths are exercised.
    """
    # Without a lock row the update must report failure rather than raise.
    assert await update_scheduler_lock_heartbeat(lock_db) is False

    # Once the lock is held, the same call succeeds.
    await acquire_scheduler_lock(lock_db)
    assert await update_scheduler_lock_heartbeat(lock_db) is True
@pytest.mark.asyncio
async def test_concurrent_acquire_from_same_process(lock_db: aiosqlite.Connection) -> None:
    """Repeated acquires from the same process succeed, as does re-acquire after release.

    The calls below are issued one after another on a single connection; they
    are not run in parallel.
    """
    # Initial acquisition.
    assert await acquire_scheduler_lock(lock_db) is True

    # A second acquisition by the same process is accepted (refresh).
    assert await acquire_scheduler_lock(lock_db) is True

    # The lock row is still present after the refresh.
    current = await get_scheduler_lock_info(lock_db)
    assert current is not None

    # After a release, the lock can be taken again.
    await release_scheduler_lock(lock_db)
    assert await acquire_scheduler_lock(lock_db) is True