"""Integration tests for scheduler workflow. This module tests end-to-end scheduler workflows including: - Scheduler trigger → library rescan → database update workflow - Configuration changes apply immediately - Scheduler persistence after application restart - Concurrent manual and automated scan handling """ import asyncio from datetime import datetime, timezone from unittest.mock import AsyncMock, Mock, patch import pytest from src.server.models.config import AppConfig, SchedulerConfig from src.server.services.scheduler_service import ( SchedulerService, get_scheduler_service, reset_scheduler_service, ) @pytest.fixture def mock_config_service(): """Create a mock configuration service.""" with patch("src.server.services.scheduler_service.get_config_service") as mock: config_service = Mock() # Default configuration app_config = AppConfig( scheduler=SchedulerConfig( enabled=True, interval_minutes=1 # Short interval for testing ) ) config_service.load_config.return_value = app_config config_service.update_config = Mock() mock.return_value = config_service yield config_service @pytest.fixture def mock_anime_service(): """Create a mock anime service that simulates database updates.""" with patch("src.server.utils.dependencies.get_anime_service") as mock: service = Mock() service.rescan = AsyncMock() service.series_list = [] # Simulate database update during rescan async def rescan_side_effect(): # Simulate finding new series service.series_list = [ {"key": "series1", "name": "New Series 1"}, {"key": "series2", "name": "New Series 2"} ] await asyncio.sleep(0.1) # Simulate work service.rescan.side_effect = rescan_side_effect mock.return_value = service yield service @pytest.fixture def mock_websocket_service(): """Create a mock WebSocket service that tracks broadcasts.""" with patch("src.server.services.websocket_service.get_websocket_service") as mock: service = Mock() service.manager = Mock() service.broadcasts = [] # Track all broadcasts async def broadcast_side_effect(message): service.broadcasts.append(message) service.manager.broadcast = AsyncMock(side_effect=broadcast_side_effect) mock.return_value = service yield service @pytest.fixture async def scheduler_service(): """Create a fresh scheduler service instance for each test.""" reset_scheduler_service() service = SchedulerService() yield service # Cleanup if service._is_running: await service.stop() class TestSchedulerWorkflow: """Tests for end-to-end scheduler workflows.""" @pytest.mark.asyncio async def test_scheduled_rescan_updates_database( self, scheduler_service, mock_config_service, mock_anime_service, mock_websocket_service ): """Test that scheduled rescan updates the database with new series.""" # Start scheduler await scheduler_service.start() # Wait for at least one scan cycle (1 minute + buffer) await asyncio.sleep(65) # Verify database was updated assert mock_anime_service.rescan.call_count >= 1 assert len(mock_anime_service.series_list) == 2 # Verify WebSocket notifications were sent assert len(mock_websocket_service.broadcasts) >= 2 # Check for rescan events event_types = [b["type"] for b in mock_websocket_service.broadcasts] assert "scheduled_rescan_started" in event_types assert "scheduled_rescan_completed" in event_types # Cleanup await scheduler_service.stop() @pytest.mark.asyncio async def test_configuration_change_applies_immediately( self, scheduler_service, mock_config_service, mock_anime_service, mock_websocket_service ): """Test that configuration changes are applied immediately.""" # Start with 1 minute interval await scheduler_service.start() original_interval = scheduler_service._config.interval_minutes assert original_interval == 1 # Change interval to 2 minutes new_config = AppConfig( scheduler=SchedulerConfig( enabled=True, interval_minutes=2 ) ) mock_config_service.load_config.return_value = new_config # Reload configuration await scheduler_service.reload_config() # Verify new interval is applied assert scheduler_service._config.interval_minutes == 2 assert scheduler_service._is_running is True # Should still be running # Cleanup await scheduler_service.stop() @pytest.mark.asyncio async def test_disable_scheduler_stops_execution( self, scheduler_service, mock_config_service, mock_anime_service, mock_websocket_service ): """Test that disabling scheduler stops future rescans.""" # Start scheduler await scheduler_service.start() assert scheduler_service._is_running is True # Wait for one scan to complete await asyncio.sleep(65) initial_scan_count = mock_anime_service.rescan.call_count assert initial_scan_count >= 1 # Disable scheduler disabled_config = AppConfig( scheduler=SchedulerConfig( enabled=False, interval_minutes=1 ) ) mock_config_service.load_config.return_value = disabled_config await scheduler_service.reload_config() # Verify scheduler stopped assert scheduler_service._is_running is False # Wait another scan cycle await asyncio.sleep(65) # Verify no additional scans occurred assert mock_anime_service.rescan.call_count == initial_scan_count @pytest.mark.asyncio async def test_manual_scan_blocks_scheduled_scan( self, scheduler_service, mock_config_service, mock_anime_service, mock_websocket_service ): """Test that manual scan prevents concurrent scheduled scan.""" await scheduler_service.start() # Make rescan slow to simulate long-running operation async def slow_rescan(): await asyncio.sleep(2) mock_anime_service.rescan.side_effect = slow_rescan # Trigger manual scan task1 = asyncio.create_task(scheduler_service._perform_rescan()) # Wait a bit to ensure manual scan is in progress await asyncio.sleep(0.5) assert scheduler_service._scan_in_progress is True # Try to trigger another scan (simulating scheduled trigger) result = await scheduler_service.trigger_rescan() # Second scan should be blocked assert result is False # Wait for first scan to complete await task1 # Verify only one scan executed assert mock_anime_service.rescan.call_count == 1 # Cleanup await scheduler_service.stop() @pytest.mark.asyncio async def test_scheduler_state_persists_across_restart( self, mock_config_service, mock_anime_service, mock_websocket_service ): """Test that scheduler can restart with same configuration.""" # Create and start first scheduler instance reset_scheduler_service() scheduler1 = SchedulerService() await scheduler1.start() # Record configuration original_config = scheduler1._config assert scheduler1._is_running is True # Stop scheduler (simulating app shutdown) await scheduler1.stop() assert scheduler1._is_running is False # Create new scheduler instance (simulating app restart) reset_scheduler_service() scheduler2 = SchedulerService() # Start new scheduler with same configuration await scheduler2.start() # Verify it has same configuration and is running assert scheduler2._is_running is True assert scheduler2._config.enabled == original_config.enabled assert scheduler2._config.interval_minutes == original_config.interval_minutes # Cleanup await scheduler2.stop() @pytest.mark.asyncio async def test_scheduler_recovers_from_rescan_failure( self, scheduler_service, mock_config_service, mock_anime_service, mock_websocket_service ): """Test that scheduler continues after rescan failure.""" # Make first rescan fail, subsequent rescans succeed call_count = {"count": 0} async def failing_rescan(): call_count["count"] += 1 if call_count["count"] == 1: raise Exception("Database connection error") # Subsequent calls succeed mock_anime_service.rescan.side_effect = failing_rescan await scheduler_service.start() # Wait for multiple scan cycles (2 minutes + buffer) await asyncio.sleep(130) # Verify multiple scans were attempted despite failure assert mock_anime_service.rescan.call_count >= 2 # Verify error was broadcast error_broadcasts = [ b for b in mock_websocket_service.broadcasts if b.get("type") == "scheduled_rescan_error" ] assert len(error_broadcasts) >= 1 # Cleanup await scheduler_service.stop() @pytest.mark.asyncio async def test_full_workflow_trigger_rescan_update_notify( self, scheduler_service, mock_config_service, mock_anime_service, mock_websocket_service ): """Test complete workflow: trigger → rescan → update → notify.""" await scheduler_service.start() # Trigger manual rescan result = await scheduler_service.trigger_rescan() assert result is True # Verify workflow steps # 1. Rescan was performed assert mock_anime_service.rescan.call_count == 1 # 2. Database was updated with new series assert len(mock_anime_service.series_list) == 2 # 3. WebSocket notifications were sent assert len(mock_websocket_service.broadcasts) >= 2 # 4. Verify event sequence event_types = [b["type"] for b in mock_websocket_service.broadcasts] start_index = event_types.index("scheduled_rescan_started") complete_index = event_types.index("scheduled_rescan_completed") assert complete_index > start_index # Complete comes after start # 5. Verify scan time was recorded assert scheduler_service._last_scan_time is not None assert isinstance(scheduler_service._last_scan_time, datetime) # 6. Scan is no longer in progress assert scheduler_service._scan_in_progress is False # Cleanup await scheduler_service.stop() @pytest.mark.asyncio async def test_multiple_sequential_rescans( self, scheduler_service, mock_config_service, mock_anime_service, mock_websocket_service ): """Test multiple sequential rescans execute successfully.""" await scheduler_service.start() # Trigger 3 manual rescans sequentially for i in range(3): result = await scheduler_service.trigger_rescan() assert result is True # Small delay between rescans await asyncio.sleep(0.1) # Verify all 3 rescans executed assert mock_anime_service.rescan.call_count == 3 # Verify 6 WebSocket broadcasts (start + complete for each scan) assert len(mock_websocket_service.broadcasts) >= 6 # Cleanup await scheduler_service.stop() @pytest.mark.asyncio async def test_scheduler_status_accuracy_during_workflow( self, scheduler_service, mock_config_service, mock_anime_service, mock_websocket_service ): """Test that status accurately reflects scheduler state during workflow.""" # Initial status status = scheduler_service.get_status() assert status["is_running"] is False assert status["scan_in_progress"] is False # Start scheduler await scheduler_service.start() status = scheduler_service.get_status() assert status["is_running"] is True assert status["enabled"] is True assert status["interval_minutes"] == 1 # Make rescan slow to check in-progress status async def slow_rescan(): await asyncio.sleep(0.5) mock_anime_service.rescan.side_effect = slow_rescan # Start rescan task = asyncio.create_task(scheduler_service._perform_rescan()) # Check status during rescan await asyncio.sleep(0.1) status = scheduler_service.get_status() assert status["scan_in_progress"] is True # Wait for rescan to complete await task # Check status after rescan status = scheduler_service.get_status() assert status["scan_in_progress"] is False assert status["last_scan_time"] is not None # Cleanup await scheduler_service.stop() # Final status status = scheduler_service.get_status() assert status["is_running"] is False class TestSchedulerEdgeCases: """Tests for edge cases in scheduler workflows.""" @pytest.mark.asyncio async def test_rapid_enable_disable_cycles( self, mock_config_service, mock_anime_service, mock_websocket_service ): """Test rapid enable/disable cycles don't cause issues.""" reset_scheduler_service() scheduler = SchedulerService() # Rapidly enable and disable 5 times for i in range(5): enabled_config = AppConfig( scheduler=SchedulerConfig( enabled=True, interval_minutes=1 ) ) disabled_config = AppConfig( scheduler=SchedulerConfig( enabled=False, interval_minutes=1 ) ) if i % 2 == 0: mock_config_service.load_config.return_value = enabled_config await scheduler.reload_config() else: mock_config_service.load_config.return_value = disabled_config await scheduler.reload_config() await asyncio.sleep(0.1) # Final state should match last configuration (i=4 is even, so enabled) status = scheduler.get_status() assert status["is_running"] is True # Last config (i=4) was enabled # Cleanup if scheduler._is_running: await scheduler.stop() @pytest.mark.asyncio async def test_interval_change_during_active_scan( self, scheduler_service, mock_config_service, mock_anime_service, mock_websocket_service ): """Test configuration change during active scan.""" await scheduler_service.start() # Make rescan slow async def slow_rescan(): await asyncio.sleep(1) mock_anime_service.rescan.side_effect = slow_rescan # Start a rescan task = asyncio.create_task(scheduler_service._perform_rescan()) # Change interval while scan is in progress await asyncio.sleep(0.2) new_config = AppConfig( scheduler=SchedulerConfig( enabled=True, interval_minutes=5 ) ) mock_config_service.load_config.return_value = new_config # Reload config (should restart scheduler) await scheduler_service.reload_config() # Wait for scan to complete await task # Verify new interval is applied assert scheduler_service._config.interval_minutes == 5 # Cleanup await scheduler_service.stop()