"""Integration tests for scheduler workflow. Tests end-to-end scheduler workflows with the APScheduler-based SchedulerService, covering lifecycle, manual triggers, config reloading, WebSocket broadcasting, auto-download, and concurrency protection. """ import asyncio from datetime import datetime, timezone from unittest.mock import AsyncMock, Mock, patch import pytest from src.server.models.config import AppConfig, SchedulerConfig from src.server.services.scheduler_service import ( SchedulerService, SchedulerServiceError, _JOB_ID, get_scheduler_service, reset_scheduler_service, ) # --------------------------------------------------------------------------- # Shared fixtures # --------------------------------------------------------------------------- @pytest.fixture def mock_config_service(): """Patch get_config_service used by SchedulerService.start().""" with patch("src.server.services.scheduler_service.get_config_service") as mock: config_service = Mock() app_config = AppConfig( scheduler=SchedulerConfig( enabled=True, schedule_time="03:00", schedule_days=["mon", "tue", "wed", "thu", "fri", "sat", "sun"], auto_download_after_rescan=False, ) ) config_service.load_config.return_value = app_config mock.return_value = config_service yield config_service @pytest.fixture def mock_anime_service(): """Patch get_anime_service used inside _perform_rescan.""" with patch("src.server.utils.dependencies.get_anime_service") as mock: service = Mock() service.rescan = AsyncMock() mock.return_value = service yield service @pytest.fixture def mock_websocket_service(): """Patch get_websocket_service to capture broadcasts.""" with patch("src.server.services.websocket_service.get_websocket_service") as mock: service = Mock() service.manager = Mock() service.broadcasts = [] async def broadcast_side_effect(message): service.broadcasts.append(message) service.manager.broadcast = AsyncMock(side_effect=broadcast_side_effect) mock.return_value = service yield service @pytest.fixture async def scheduler_service(mock_config_service): """Fresh SchedulerService instance; stopped automatically after each test.""" reset_scheduler_service() svc = SchedulerService() yield svc if svc._is_running: await svc.stop() # --------------------------------------------------------------------------- # TestSchedulerLifecycle # --------------------------------------------------------------------------- class TestSchedulerLifecycle: """Tests for SchedulerService start/stop lifecycle.""" @pytest.mark.asyncio async def test_start_sets_is_running(self, scheduler_service): """start() sets _is_running to True.""" await scheduler_service.start() assert scheduler_service._is_running is True @pytest.mark.asyncio async def test_stop_clears_is_running(self, scheduler_service): """stop() sets _is_running to False.""" await scheduler_service.start() await scheduler_service.stop() assert scheduler_service._is_running is False @pytest.mark.asyncio async def test_start_twice_raises(self, scheduler_service): """Calling start() when already running raises SchedulerServiceError.""" await scheduler_service.start() with pytest.raises(SchedulerServiceError, match="already running"): await scheduler_service.start() @pytest.mark.asyncio async def test_stop_when_not_running_is_noop(self, scheduler_service): """stop() when not started does not raise.""" await scheduler_service.stop() # should not raise assert scheduler_service._is_running is False @pytest.mark.asyncio async def test_start_loads_config(self, scheduler_service, mock_config_service): """start() loads configuration via config_service.""" await scheduler_service.start() mock_config_service.load_config.assert_called_once() @pytest.mark.asyncio async def test_start_disabled_scheduler_no_job(self, mock_config_service): """Disabled scheduler starts but does not add an APScheduler job.""" mock_config_service.load_config.return_value = AppConfig( scheduler=SchedulerConfig(enabled=False) ) reset_scheduler_service() svc = SchedulerService() await svc.start() assert svc._is_running is True # No job should be registered assert svc._scheduler.get_job(_JOB_ID) is None await svc.stop() @pytest.mark.asyncio async def test_start_registers_apscheduler_job(self, scheduler_service): """Enabled scheduler registers a job with _JOB_ID.""" await scheduler_service.start() job = scheduler_service._scheduler.get_job(_JOB_ID) assert job is not None @pytest.mark.asyncio async def test_restart_after_stop(self, scheduler_service): """Service can be started again after being stopped.""" await scheduler_service.start() await scheduler_service.stop() await scheduler_service.start() assert scheduler_service._is_running is True # --------------------------------------------------------------------------- # TestSchedulerTriggerRescan # --------------------------------------------------------------------------- class TestSchedulerTriggerRescan: """Tests for manual trigger_rescan workflow.""" @pytest.mark.asyncio async def test_trigger_rescan_calls_anime_service( self, scheduler_service, mock_anime_service, mock_websocket_service ): """trigger_rescan() calls anime_service.rescan().""" await scheduler_service.start() result = await scheduler_service.trigger_rescan() assert result is True mock_anime_service.rescan.assert_called_once() @pytest.mark.asyncio async def test_trigger_rescan_records_last_run( self, scheduler_service, mock_anime_service, mock_websocket_service ): """trigger_rescan() updates _last_scan_time.""" await scheduler_service.start() await scheduler_service.trigger_rescan() assert scheduler_service._last_scan_time is not None assert isinstance(scheduler_service._last_scan_time, datetime) @pytest.mark.asyncio async def test_trigger_rescan_when_not_running_raises(self, scheduler_service): """trigger_rescan() without start() raises SchedulerServiceError.""" with pytest.raises(SchedulerServiceError, match="not running"): await scheduler_service.trigger_rescan() @pytest.mark.asyncio async def test_trigger_rescan_blocked_during_scan( self, scheduler_service, mock_anime_service, mock_websocket_service ): """Second trigger_rescan() returns False while a scan is in progress.""" async def slow_rescan(): await asyncio.sleep(0.3) mock_anime_service.rescan.side_effect = slow_rescan await scheduler_service.start() task = asyncio.create_task(scheduler_service._perform_rescan()) await asyncio.sleep(0.05) assert scheduler_service._scan_in_progress is True result = await scheduler_service.trigger_rescan() assert result is False await task @pytest.mark.asyncio async def test_trigger_rescan_scan_in_progress_false_after_completion( self, scheduler_service, mock_anime_service, mock_websocket_service ): """scan_in_progress returns to False after trigger_rescan completes.""" await scheduler_service.start() await scheduler_service.trigger_rescan() assert scheduler_service._scan_in_progress is False @pytest.mark.asyncio async def test_multiple_sequential_rescans( self, scheduler_service, mock_anime_service, mock_websocket_service ): """Three sequential manual rescans all execute successfully.""" await scheduler_service.start() for _ in range(3): result = await scheduler_service.trigger_rescan() assert result is True assert mock_anime_service.rescan.call_count == 3 # --------------------------------------------------------------------------- # TestSchedulerWebSocketBroadcasts # --------------------------------------------------------------------------- class TestSchedulerWebSocketBroadcasts: """Tests for WebSocket event emission during rescan.""" @pytest.mark.asyncio async def test_rescan_broadcasts_started_event( self, scheduler_service, mock_anime_service, mock_websocket_service ): """_perform_rescan() broadcasts 'scheduled_rescan_started'.""" await scheduler_service.start() await scheduler_service.trigger_rescan() event_types = [b["type"] for b in mock_websocket_service.broadcasts] assert "scheduled_rescan_started" in event_types @pytest.mark.asyncio async def test_rescan_broadcasts_completed_event( self, scheduler_service, mock_anime_service, mock_websocket_service ): """_perform_rescan() broadcasts 'scheduled_rescan_completed'.""" await scheduler_service.start() await scheduler_service.trigger_rescan() event_types = [b["type"] for b in mock_websocket_service.broadcasts] assert "scheduled_rescan_completed" in event_types @pytest.mark.asyncio async def test_rescan_broadcasts_error_on_failure( self, scheduler_service, mock_anime_service, mock_websocket_service ): """_perform_rescan() broadcasts 'scheduled_rescan_error' when rescan raises.""" mock_anime_service.rescan.side_effect = RuntimeError("DB failure") await scheduler_service.start() await scheduler_service._perform_rescan() error_events = [ b for b in mock_websocket_service.broadcasts if b["type"] == "scheduled_rescan_error" ] assert len(error_events) >= 1 @pytest.mark.asyncio async def test_rescan_completed_event_order( self, scheduler_service, mock_anime_service, mock_websocket_service ): """'started' event precedes 'completed' event in broadcast sequence.""" await scheduler_service.start() await scheduler_service.trigger_rescan() types = [b["type"] for b in mock_websocket_service.broadcasts] started_idx = types.index("scheduled_rescan_started") completed_idx = types.index("scheduled_rescan_completed") assert completed_idx > started_idx # --------------------------------------------------------------------------- # TestSchedulerGetStatus # --------------------------------------------------------------------------- class TestSchedulerGetStatus: """Tests for get_status() accuracy.""" @pytest.mark.asyncio async def test_status_not_running_before_start(self, scheduler_service): """is_running is False before start().""" status = scheduler_service.get_status() assert status["is_running"] is False assert status["scan_in_progress"] is False @pytest.mark.asyncio async def test_status_is_running_after_start(self, scheduler_service): """is_running is True after start().""" await scheduler_service.start() status = scheduler_service.get_status() assert status["is_running"] is True assert status["enabled"] is True @pytest.mark.asyncio async def test_status_last_run_populated_after_rescan( self, scheduler_service, mock_anime_service, mock_websocket_service ): """last_run is not None after a successful rescan.""" await scheduler_service.start() await scheduler_service.trigger_rescan() status = scheduler_service.get_status() assert status["last_run"] is not None @pytest.mark.asyncio async def test_status_scan_in_progress_during_slow_rescan( self, scheduler_service, mock_anime_service, mock_websocket_service ): """scan_in_progress is True while rescan is executing.""" async def slow_rescan(): await asyncio.sleep(0.3) mock_anime_service.rescan.side_effect = slow_rescan await scheduler_service.start() task = asyncio.create_task(scheduler_service._perform_rescan()) await asyncio.sleep(0.05) assert scheduler_service.get_status()["scan_in_progress"] is True await task @pytest.mark.asyncio async def test_status_is_running_false_after_stop(self, scheduler_service): """is_running is False after stop().""" await scheduler_service.start() await scheduler_service.stop() assert scheduler_service.get_status()["is_running"] is False @pytest.mark.asyncio async def test_status_includes_cron_fields(self, scheduler_service): """get_status() includes schedule_time, schedule_days, auto_download keys.""" await scheduler_service.start() status = scheduler_service.get_status() for key in ("schedule_time", "schedule_days", "auto_download_after_rescan", "next_run"): assert key in status # --------------------------------------------------------------------------- # TestReloadConfig # --------------------------------------------------------------------------- class TestReloadConfig: """Tests for reload_config() live reconfiguration.""" @pytest.mark.asyncio async def test_reload_reschedules_job_on_time_change(self, scheduler_service): """Changing schedule_time reschedules the existing job.""" await scheduler_service.start() assert scheduler_service._scheduler.get_job(_JOB_ID) is not None new_config = SchedulerConfig(enabled=True, schedule_time="08:00") scheduler_service.reload_config(new_config) job = scheduler_service._scheduler.get_job(_JOB_ID) assert job is not None assert scheduler_service._config.schedule_time == "08:00" @pytest.mark.asyncio async def test_reload_removes_job_when_disabled(self, scheduler_service): """Setting enabled=False removes the APScheduler job.""" await scheduler_service.start() assert scheduler_service._scheduler.get_job(_JOB_ID) is not None scheduler_service.reload_config( SchedulerConfig(enabled=False) ) assert scheduler_service._scheduler.get_job(_JOB_ID) is None @pytest.mark.asyncio async def test_reload_removes_job_when_days_empty(self, scheduler_service): """Empty schedule_days removes the APScheduler job.""" await scheduler_service.start() scheduler_service.reload_config( SchedulerConfig(enabled=True, schedule_days=[]) ) assert scheduler_service._scheduler.get_job(_JOB_ID) is None @pytest.mark.asyncio async def test_reload_adds_job_when_reenabling(self, scheduler_service): """Re-enabling after disable adds a new job.""" await scheduler_service.start() scheduler_service.reload_config(SchedulerConfig(enabled=False)) assert scheduler_service._scheduler.get_job(_JOB_ID) is None scheduler_service.reload_config( SchedulerConfig(enabled=True, schedule_time="09:00") ) assert scheduler_service._scheduler.get_job(_JOB_ID) is not None @pytest.mark.asyncio async def test_reload_updates_config_attribute(self, scheduler_service): """reload_config() updates self._config with the supplied instance.""" await scheduler_service.start() new = SchedulerConfig(enabled=True, schedule_time="14:30", schedule_days=["mon"]) scheduler_service.reload_config(new) assert scheduler_service._config.schedule_time == "14:30" assert scheduler_service._config.schedule_days == ["mon"] def test_reload_before_start_stores_config(self, scheduler_service): """reload_config() before start() stores config without raising.""" new = SchedulerConfig(enabled=True, schedule_time="22:00") scheduler_service.reload_config(new) assert scheduler_service._config.schedule_time == "22:00" # --------------------------------------------------------------------------- # TestAutoDownloadWorkflow # --------------------------------------------------------------------------- class TestAutoDownloadWorkflow: """Tests for auto-download-after-rescan integration.""" @pytest.mark.asyncio async def test_auto_download_triggered_when_enabled( self, scheduler_service, mock_anime_service, mock_websocket_service ): """_auto_download_missing() is called when auto_download_after_rescan=True.""" scheduler_service._config = SchedulerConfig( enabled=True, auto_download_after_rescan=True, ) scheduler_service._is_running = True called = [] async def fake_auto_download(): called.append(True) scheduler_service._auto_download_missing = fake_auto_download await scheduler_service._perform_rescan() assert called == [True] @pytest.mark.asyncio async def test_auto_download_not_called_when_disabled( self, scheduler_service, mock_anime_service, mock_websocket_service ): """_auto_download_missing() is NOT called when auto_download_after_rescan=False.""" scheduler_service._config = SchedulerConfig( enabled=True, auto_download_after_rescan=False, ) scheduler_service._is_running = True called = [] async def fake_auto_download(): called.append(True) scheduler_service._auto_download_missing = fake_auto_download await scheduler_service._perform_rescan() assert called == [] @pytest.mark.asyncio async def test_auto_download_error_broadcasts_event( self, scheduler_service, mock_anime_service, mock_websocket_service ): """Error in _auto_download_missing broadcasts 'auto_download_error'.""" scheduler_service._config = SchedulerConfig( enabled=True, auto_download_after_rescan=True, ) scheduler_service._is_running = True async def failing_auto_download(): raise RuntimeError("download failed") scheduler_service._auto_download_missing = failing_auto_download await scheduler_service._perform_rescan() error_events = [ b for b in mock_websocket_service.broadcasts if b["type"] == "auto_download_error" ] assert len(error_events) == 1 # --------------------------------------------------------------------------- # TestSchedulerSingletonHelpers # --------------------------------------------------------------------------- class TestSchedulerSingletonHelpers: """Tests for module-level singleton helpers.""" def test_get_scheduler_service_returns_same_instance(self): """get_scheduler_service() returns the same object on repeated calls.""" svc1 = get_scheduler_service() svc2 = get_scheduler_service() assert svc1 is svc2 def test_reset_clears_singleton(self): """reset_scheduler_service() causes get_scheduler_service() to return a new instance.""" svc1 = get_scheduler_service() reset_scheduler_service() svc2 = get_scheduler_service() assert svc1 is not svc2 @pytest.mark.asyncio async def test_state_persists_across_restart(self, mock_config_service): """Stopping and restarting loads config from service each time.""" reset_scheduler_service() svc = SchedulerService() await svc.start() original_time = svc._config.schedule_time assert svc._is_running is True await svc.stop() assert svc._is_running is False reset_scheduler_service() svc2 = SchedulerService() await svc2.start() assert svc2._is_running is True assert svc2._config.schedule_time == original_time await svc2.stop()