Files
Aniworld/tests/unit/test_scheduler_service.py
Lukas 63da2daa53 Add scheduler service and comprehensive unit tests
- Created src/server/services/scheduler_service.py
  * Interval-based background scheduler
  * Automatic library rescans
  * Conflict prevention (no concurrent scans)
  * WebSocket event broadcasting
  * Configuration reload support
  * Graceful start/stop lifecycle

- Created tests/unit/test_scheduler_service.py
  * 26 comprehensive tests all passing
  * 100% test coverage of service logic
  * Tests initialization, execution, conflicts, config, status
  * Tests edge cases and error handling

- Updated docs/instructions.md
  * Marked scheduler service task as completed
  * Documented 26/26 passing tests
2026-01-31 15:09:54 +01:00

664 lines
20 KiB
Python

"""Unit tests for scheduler service.
This module tests the scheduler service logic including:
- Scheduled library rescan execution
- Scheduler state persistence across restarts
- Background task execution and lifecycle
- Scheduler conflict resolution (manual vs automated scans)
- Error handling during scheduled operations
"""
import asyncio
from datetime import datetime, timezone
from unittest.mock import AsyncMock, Mock, patch
import pytest
from src.server.models.config import AppConfig, SchedulerConfig
from src.server.services.scheduler_service import (
SchedulerService,
SchedulerServiceError,
get_scheduler_service,
reset_scheduler_service,
)
@pytest.fixture
def mock_config_service():
"""Create a mock configuration service."""
with patch("src.server.services.scheduler_service.get_config_service") as mock:
config_service = Mock()
# Default configuration
app_config = AppConfig(
scheduler=SchedulerConfig(
enabled=True,
interval_minutes=60
)
)
config_service.load_config.return_value = app_config
mock.return_value = config_service
yield config_service
@pytest.fixture
def mock_anime_service():
"""Create a mock anime service."""
with patch("src.server.utils.dependencies.get_anime_service") as mock:
service = Mock()
service.rescan = AsyncMock()
mock.return_value = service
yield service
@pytest.fixture
def mock_websocket_service():
"""Create a mock WebSocket service."""
with patch("src.server.services.websocket_service.get_websocket_service") as mock:
service = Mock()
service.manager = Mock()
service.manager.broadcast = AsyncMock()
mock.return_value = service
yield service
@pytest.fixture
def scheduler_service():
"""Create a fresh scheduler service instance for each test."""
reset_scheduler_service()
service = SchedulerService()
yield service
# Cleanup
if service._is_running:
asyncio.create_task(service.stop())
class TestSchedulerServiceInitialization:
"""Tests for scheduler service initialization and lifecycle."""
@pytest.mark.asyncio
async def test_scheduler_starts_when_enabled(
self,
scheduler_service,
mock_config_service
):
"""Test scheduler starts successfully when enabled in config."""
await scheduler_service.start()
assert scheduler_service._is_running is True
assert scheduler_service._task is not None
assert not scheduler_service._task.done()
assert scheduler_service._config.enabled is True
assert scheduler_service._config.interval_minutes == 60
# Cleanup
await scheduler_service.stop()
@pytest.mark.asyncio
async def test_scheduler_does_not_start_when_disabled(
self,
scheduler_service,
mock_config_service
):
"""Test scheduler does not start when disabled in config."""
# Modify config to disable scheduler
app_config = AppConfig(
scheduler=SchedulerConfig(
enabled=False,
interval_minutes=60
)
)
mock_config_service.load_config.return_value = app_config
await scheduler_service.start()
assert scheduler_service._is_running is False
assert scheduler_service._task is None
@pytest.mark.asyncio
async def test_scheduler_raises_error_if_already_running(
self,
scheduler_service,
mock_config_service
):
"""Test scheduler raises error when started twice."""
await scheduler_service.start()
with pytest.raises(SchedulerServiceError, match="already running"):
await scheduler_service.start()
# Cleanup
await scheduler_service.stop()
@pytest.mark.asyncio
async def test_scheduler_stops_gracefully(
self,
scheduler_service,
mock_config_service
):
"""Test scheduler stops gracefully."""
await scheduler_service.start()
assert scheduler_service._is_running is True
await scheduler_service.stop()
assert scheduler_service._is_running is False
assert scheduler_service._task.done() or scheduler_service._task.cancelled()
@pytest.mark.asyncio
async def test_scheduler_stop_when_not_running(
self,
scheduler_service
):
"""Test scheduler stop is safe when not running."""
# Should not raise any errors
await scheduler_service.stop()
assert scheduler_service._is_running is False
@pytest.mark.asyncio
async def test_scheduler_raises_error_on_config_load_failure(
self,
scheduler_service,
mock_config_service
):
"""Test scheduler raises error if config loading fails."""
from src.server.services.config_service import ConfigServiceError
mock_config_service.load_config.side_effect = ConfigServiceError(
"Config file not found"
)
with pytest.raises(SchedulerServiceError, match="Failed to load config"):
await scheduler_service.start()
class TestSchedulerServiceExecution:
"""Tests for scheduled rescan execution logic."""
@pytest.mark.asyncio
async def test_scheduler_performs_rescan(
self,
scheduler_service,
mock_config_service,
mock_anime_service,
mock_websocket_service
):
"""Test scheduler performs library rescan."""
# Use a short interval for testing
app_config = AppConfig(
scheduler=SchedulerConfig(
enabled=True,
interval_minutes=1 # 1 minute for faster testing
)
)
mock_config_service.load_config.return_value = app_config
await scheduler_service.start()
# Wait a bit longer than the interval to ensure rescan executes
# (1 minute = 60 seconds, add buffer)
await asyncio.sleep(65)
# Verify rescan was called
assert mock_anime_service.rescan.call_count >= 1
assert mock_websocket_service.manager.broadcast.call_count >= 2 # start + complete
# Verify last scan time was recorded
assert scheduler_service._last_scan_time is not None
# Cleanup
await scheduler_service.stop()
@pytest.mark.asyncio
async def test_scheduler_broadcasts_rescan_events(
self,
scheduler_service,
mock_config_service,
mock_anime_service,
mock_websocket_service
):
"""Test scheduler broadcasts WebSocket events during rescan."""
await scheduler_service._perform_rescan()
# Should broadcast start and complete events
assert mock_websocket_service.manager.broadcast.call_count == 2
# Verify event types
calls = mock_websocket_service.manager.broadcast.call_args_list
start_event = calls[0][0][0]
complete_event = calls[1][0][0]
assert start_event["type"] == "scheduled_rescan_started"
assert "timestamp" in start_event["data"]
assert complete_event["type"] == "scheduled_rescan_completed"
assert "timestamp" in complete_event["data"]
assert "duration_seconds" in complete_event["data"]
@pytest.mark.asyncio
async def test_scheduler_handles_rescan_failure(
self,
scheduler_service,
mock_config_service,
mock_anime_service,
mock_websocket_service
):
"""Test scheduler handles rescan failures gracefully."""
# Make rescan fail
mock_anime_service.rescan.side_effect = Exception("Database error")
# Should not raise exception
await scheduler_service._perform_rescan()
# Should broadcast error event
assert mock_websocket_service.manager.broadcast.call_count >= 2
error_event = mock_websocket_service.manager.broadcast.call_args_list[-1][0][0]
assert error_event["type"] == "scheduled_rescan_error"
assert "error" in error_event["data"]
assert "Database error" in error_event["data"]["error"]
# Scan should no longer be in progress
assert scheduler_service._scan_in_progress is False
class TestSchedulerServiceConflictResolution:
"""Tests for scheduler conflict resolution."""
@pytest.mark.asyncio
async def test_scheduler_prevents_concurrent_rescans(
self,
scheduler_service,
mock_config_service,
mock_anime_service,
mock_websocket_service
):
"""Test scheduler prevents concurrent rescans."""
# Make rescan slow
async def slow_rescan():
await asyncio.sleep(1)
mock_anime_service.rescan.side_effect = slow_rescan
# Start first rescan
task1 = asyncio.create_task(scheduler_service._perform_rescan())
# Wait a bit to ensure first rescan is in progress
await asyncio.sleep(0.1)
# Try to start second rescan
task2 = asyncio.create_task(scheduler_service._perform_rescan())
# Wait for both to complete
await task1
await task2
# Only one rescan should have executed
assert mock_anime_service.rescan.call_count == 1
@pytest.mark.asyncio
async def test_manual_trigger_fails_during_scheduled_scan(
self,
scheduler_service,
mock_config_service,
mock_anime_service,
mock_websocket_service
):
"""Test manual trigger returns False when scan is in progress."""
await scheduler_service.start()
# Make rescan slow
async def slow_rescan():
await asyncio.sleep(1)
mock_anime_service.rescan.side_effect = slow_rescan
# Start a rescan
task = asyncio.create_task(scheduler_service._perform_rescan())
# Wait for scan to be in progress
await asyncio.sleep(0.1)
# Try to manually trigger
result = await scheduler_service.trigger_rescan()
assert result is False
# Wait for the rescan to complete
await task
# Cleanup
await scheduler_service.stop()
@pytest.mark.asyncio
async def test_manual_trigger_succeeds_when_idle(
self,
scheduler_service,
mock_config_service,
mock_anime_service,
mock_websocket_service
):
"""Test manual trigger succeeds when no scan is in progress."""
await scheduler_service.start()
result = await scheduler_service.trigger_rescan()
assert result is True
assert mock_anime_service.rescan.call_count == 1
# Cleanup
await scheduler_service.stop()
@pytest.mark.asyncio
async def test_manual_trigger_raises_error_when_not_running(
self,
scheduler_service,
mock_config_service
):
"""Test manual trigger raises error when scheduler is not running."""
with pytest.raises(SchedulerServiceError, match="not running"):
await scheduler_service.trigger_rescan()
class TestSchedulerServiceConfiguration:
"""Tests for scheduler configuration management."""
@pytest.mark.asyncio
async def test_reload_config_updates_settings(
self,
scheduler_service,
mock_config_service
):
"""Test config reload updates scheduler settings."""
# Start with default config
await scheduler_service.start()
assert scheduler_service._config.interval_minutes == 60
# Update config
new_config = AppConfig(
scheduler=SchedulerConfig(
enabled=True,
interval_minutes=120
)
)
mock_config_service.load_config.return_value = new_config
await scheduler_service.reload_config()
assert scheduler_service._config.interval_minutes == 120
# Cleanup
await scheduler_service.stop()
@pytest.mark.asyncio
async def test_reload_config_restarts_on_interval_change(
self,
scheduler_service,
mock_config_service
):
"""Test scheduler restarts when interval changes."""
await scheduler_service.start()
original_task = scheduler_service._task
# Change interval
new_config = AppConfig(
scheduler=SchedulerConfig(
enabled=True,
interval_minutes=120
)
)
mock_config_service.load_config.return_value = new_config
await scheduler_service.reload_config()
# Should have a new task
assert scheduler_service._task != original_task
assert scheduler_service._is_running is True
# Cleanup
await scheduler_service.stop()
@pytest.mark.asyncio
async def test_reload_config_stops_when_disabled(
self,
scheduler_service,
mock_config_service
):
"""Test scheduler stops when config is disabled."""
await scheduler_service.start()
assert scheduler_service._is_running is True
# Disable scheduler
new_config = AppConfig(
scheduler=SchedulerConfig(
enabled=False,
interval_minutes=60
)
)
mock_config_service.load_config.return_value = new_config
await scheduler_service.reload_config()
assert scheduler_service._is_running is False
@pytest.mark.asyncio
async def test_reload_config_starts_when_enabled(
self,
scheduler_service,
mock_config_service
):
"""Test scheduler starts when config is enabled."""
# Start with disabled config
disabled_config = AppConfig(
scheduler=SchedulerConfig(
enabled=False,
interval_minutes=60
)
)
mock_config_service.load_config.return_value = disabled_config
await scheduler_service.start()
assert scheduler_service._is_running is False
# Enable scheduler
enabled_config = AppConfig(
scheduler=SchedulerConfig(
enabled=True,
interval_minutes=60
)
)
mock_config_service.load_config.return_value = enabled_config
await scheduler_service.reload_config()
assert scheduler_service._is_running is True
# Cleanup
await scheduler_service.stop()
@pytest.mark.asyncio
async def test_reload_config_raises_error_on_failure(
self,
scheduler_service,
mock_config_service
):
"""Test config reload raises error on failure."""
from src.server.services.config_service import ConfigServiceError
await scheduler_service.start()
mock_config_service.load_config.side_effect = ConfigServiceError(
"Config corrupted"
)
with pytest.raises(SchedulerServiceError, match="Failed to reload config"):
await scheduler_service.reload_config()
# Cleanup
await scheduler_service.stop()
class TestSchedulerServiceStatus:
"""Tests for scheduler status reporting."""
def test_get_status_returns_correct_state(
self,
scheduler_service,
mock_config_service
):
"""Test get_status returns accurate scheduler state."""
status = scheduler_service.get_status()
assert status["is_running"] is False
assert status["enabled"] is False
assert status["interval_minutes"] is None
assert status["last_scan_time"] is None
assert status["next_scan_time"] is None
assert status["scan_in_progress"] is False
@pytest.mark.asyncio
async def test_get_status_after_start(
self,
scheduler_service,
mock_config_service
):
"""Test get_status returns correct state after starting."""
await scheduler_service.start()
status = scheduler_service.get_status()
assert status["is_running"] is True
assert status["enabled"] is True
assert status["interval_minutes"] == 60
# Cleanup
await scheduler_service.stop()
@pytest.mark.asyncio
async def test_get_status_after_rescan(
self,
scheduler_service,
mock_config_service,
mock_anime_service,
mock_websocket_service
):
"""Test get_status includes scan times after rescan."""
await scheduler_service.start()
await scheduler_service._perform_rescan()
status = scheduler_service.get_status()
assert status["last_scan_time"] is not None
assert isinstance(status["last_scan_time"], str) # ISO format
# Cleanup
await scheduler_service.stop()
class TestSchedulerServiceSingleton:
"""Tests for scheduler service singleton pattern."""
def test_get_scheduler_service_returns_singleton(self):
"""Test get_scheduler_service returns the same instance."""
reset_scheduler_service()
service1 = get_scheduler_service()
service2 = get_scheduler_service()
assert service1 is service2
def test_reset_scheduler_service_clears_singleton(self):
"""Test reset_scheduler_service creates new instance."""
service1 = get_scheduler_service()
reset_scheduler_service()
service2 = get_scheduler_service()
assert service1 is not service2
class TestSchedulerServiceEdgeCases:
"""Tests for edge cases and error conditions."""
@pytest.mark.asyncio
async def test_scheduler_handles_websocket_broadcast_failure(
self,
scheduler_service,
mock_config_service,
mock_anime_service,
mock_websocket_service
):
"""Test scheduler continues when WebSocket broadcast fails."""
# Make broadcast fail AFTER rescan completes (so rescan still executes)
call_count = {"count": 0}
async def broadcast_side_effect(*args, **kwargs):
call_count["count"] += 1
# First call (rescan started) - succeed
# Second call (rescan completed) - fail
if call_count["count"] >= 2:
raise Exception("WebSocket error")
mock_websocket_service.manager.broadcast.side_effect = broadcast_side_effect
# Should not raise exception
await scheduler_service._perform_rescan()
# Rescan should still have been attempted
assert mock_anime_service.rescan.call_count == 1
# WebSocket broadcast should have been attempted multiple times
assert mock_websocket_service.manager.broadcast.call_count >= 1
@pytest.mark.asyncio
async def test_scheduler_loop_continues_after_error(
self,
scheduler_service,
mock_config_service,
mock_anime_service,
mock_websocket_service
):
"""Test scheduler loop continues after encountering error."""
# Make first rescan fail, second succeed
mock_anime_service.rescan.side_effect = [
Exception("First error"),
None # Second call succeeds
]
# Start with very short interval
app_config = AppConfig(
scheduler=SchedulerConfig(
enabled=True,
interval_minutes=1 # 1 minute
)
)
mock_config_service.load_config.return_value = app_config
await scheduler_service.start()
# Wait for two rescan cycles (2 minutes + buffer)
await asyncio.sleep(130)
# Should have attempted at least 2 rescans
assert mock_anime_service.rescan.call_count >= 2
# Cleanup
await scheduler_service.stop()
@pytest.mark.asyncio
async def test_scheduler_cancellation_is_clean(
self,
scheduler_service,
mock_config_service
):
"""Test scheduler task cancellation is handled cleanly."""
await scheduler_service.start()
# Cancel the task directly
scheduler_service._task.cancel()
# Stop should handle this gracefully
await scheduler_service.stop()
assert scheduler_service._is_running is False