"""Unit tests for SchedulerService (APScheduler-based implementation). Covers: - _build_cron_trigger() - start() / stop() with APScheduler mocking - reload_config() - get_status() - _perform_rescan() with auto-download - Error handling and edge cases """ from datetime import datetime, timezone from unittest.mock import AsyncMock, MagicMock, Mock, call, patch import pytest from apscheduler.triggers.cron import CronTrigger from src.server.models.config import AppConfig, SchedulerConfig from src.server.services.scheduler_service import ( _JOB_ID, SchedulerService, SchedulerServiceError, get_scheduler_service, reset_scheduler_service, ) # --------------------------------------------------------------------------- # Shared fixtures # --------------------------------------------------------------------------- ALL_DAYS = ["mon", "tue", "wed", "thu", "fri", "sat", "sun"] def _make_app_config(**scheduler_kwargs) -> AppConfig: return AppConfig(scheduler=SchedulerConfig(**scheduler_kwargs)) @pytest.fixture def mock_config_service(): with patch("src.server.services.scheduler_service.get_config_service") as mock: svc = Mock() svc.load_config.return_value = _make_app_config( enabled=True, schedule_time="03:00", schedule_days=ALL_DAYS, ) mock.return_value = svc yield svc @pytest.fixture def scheduler_service(): reset_scheduler_service() yield SchedulerService() reset_scheduler_service() # --------------------------------------------------------------------------- # 12.1 _build_cron_trigger # --------------------------------------------------------------------------- class TestBuildCronTrigger: def test_standard_case(self, scheduler_service): scheduler_service._config = SchedulerConfig( schedule_time="03:00", schedule_days=["mon", "wed", "fri"], ) trigger = scheduler_service._build_cron_trigger() assert isinstance(trigger, CronTrigger) fields = {f.name: str(f) for f in trigger.fields} assert fields["hour"] == "3" assert fields["minute"] == "0" assert "mon" in fields["day_of_week"] assert "fri" in fields["day_of_week"] def test_all_days(self, scheduler_service): scheduler_service._config = SchedulerConfig( schedule_time="23:59", schedule_days=ALL_DAYS, ) trigger = scheduler_service._build_cron_trigger() assert trigger is not None fields = {f.name: str(f) for f in trigger.fields} for day in ALL_DAYS: assert day in fields["day_of_week"] def test_empty_days_returns_none(self, scheduler_service): scheduler_service._config = SchedulerConfig( schedule_time="03:00", schedule_days=[], ) assert scheduler_service._build_cron_trigger() is None def test_no_config_returns_none(self, scheduler_service): scheduler_service._config = None assert scheduler_service._build_cron_trigger() is None # --------------------------------------------------------------------------- # 12.2 start() – normal case # --------------------------------------------------------------------------- class TestStart: @pytest.mark.asyncio async def test_start_adds_job_and_starts_scheduler( self, scheduler_service, mock_config_service ): with patch( "src.server.services.scheduler_service.AsyncIOScheduler" ) as MockScheduler: mock_sched = MagicMock() mock_sched.running = False MockScheduler.return_value = mock_sched await scheduler_service.start() mock_sched.add_job.assert_called_once() call_kwargs = mock_sched.add_job.call_args assert call_kwargs[1]["id"] == _JOB_ID assert isinstance(call_kwargs[1]["trigger"], CronTrigger) assert call_kwargs[1]["misfire_grace_time"] == 3600 assert call_kwargs[1]["coalesce"] is True mock_sched.start.assert_called_once() assert scheduler_service._is_running is True @pytest.mark.asyncio async def test_start_raises_if_already_running(self, scheduler_service): scheduler_service._is_running = True with pytest.raises(SchedulerServiceError, match="already running"): await scheduler_service.start() # --------------------------------------------------------------------------- # 12.3 start() – empty schedule_days # --------------------------------------------------------------------------- class TestStartEmptyDays: @pytest.mark.asyncio async def test_no_job_added_when_days_empty(self, scheduler_service): with patch( "src.server.services.scheduler_service.get_config_service" ) as mock_cs, patch( "src.server.services.scheduler_service.AsyncIOScheduler" ) as MockScheduler: svc = Mock() svc.load_config.return_value = _make_app_config( enabled=True, schedule_days=[] ) mock_cs.return_value = svc mock_sched = MagicMock() MockScheduler.return_value = mock_sched await scheduler_service.start() mock_sched.add_job.assert_not_called() mock_sched.start.assert_called_once() assert scheduler_service._is_running is True # --------------------------------------------------------------------------- # 12.4 stop() # --------------------------------------------------------------------------- class TestStop: @pytest.mark.asyncio async def test_stop_shuts_down_scheduler(self, scheduler_service): mock_sched = MagicMock() mock_sched.running = True scheduler_service._scheduler = mock_sched scheduler_service._is_running = True await scheduler_service.stop() mock_sched.shutdown.assert_called_once_with(wait=False) assert scheduler_service._is_running is False @pytest.mark.asyncio async def test_stop_when_not_running_is_noop(self, scheduler_service): # Should not raise await scheduler_service.stop() assert scheduler_service._is_running is False # --------------------------------------------------------------------------- # 12.5 reload_config() – reschedule # --------------------------------------------------------------------------- class TestReloadConfig: def test_reschedule_on_time_change(self, scheduler_service): mock_sched = MagicMock() mock_sched.running = True mock_sched.get_job.return_value = Mock() # job exists scheduler_service._scheduler = mock_sched scheduler_service._config = SchedulerConfig( schedule_time="03:00", schedule_days=ALL_DAYS ) new_config = SchedulerConfig(schedule_time="05:00", schedule_days=ALL_DAYS) scheduler_service.reload_config(new_config) mock_sched.reschedule_job.assert_called_once_with( _JOB_ID, trigger=mock_sched.reschedule_job.call_args[1]["trigger"] ) assert scheduler_service._config.schedule_time == "05:00" def test_reschedule_on_days_change(self, scheduler_service): mock_sched = MagicMock() mock_sched.running = True mock_sched.get_job.return_value = Mock() scheduler_service._scheduler = mock_sched scheduler_service._config = SchedulerConfig( schedule_time="03:00", schedule_days=ALL_DAYS ) new_config = SchedulerConfig(schedule_time="03:00", schedule_days=["mon"]) scheduler_service.reload_config(new_config) mock_sched.reschedule_job.assert_called_once() # --------------------------------------------------------------------------- # 12.6 reload_config() – empty days removes job # --------------------------------------------------------------------------- class TestReloadConfigEmptyDays: def test_removes_job_when_days_empty(self, scheduler_service): mock_sched = MagicMock() mock_sched.running = True mock_sched.get_job.return_value = Mock() # job exists scheduler_service._scheduler = mock_sched scheduler_service._config = SchedulerConfig( schedule_time="03:00", schedule_days=ALL_DAYS ) new_config = SchedulerConfig(schedule_time="03:00", schedule_days=[]) scheduler_service.reload_config(new_config) mock_sched.remove_job.assert_called_once_with(_JOB_ID) mock_sched.reschedule_job.assert_not_called() # --------------------------------------------------------------------------- # 12.7 _perform_rescan() with auto_download=True # --------------------------------------------------------------------------- class TestPerformRescanAutoDownload: @pytest.mark.asyncio async def test_auto_download_queues_missing_episodes(self, scheduler_service): scheduler_service._config = SchedulerConfig( auto_download_after_rescan=True, schedule_time="03:00", schedule_days=ALL_DAYS, ) mock_anime = MagicMock() mock_anime.rescan = AsyncMock() mock_anime._cached_list_missing.return_value = [ {"key": "series-a", "name": "Series A", "folder": "Series A", "episodeDict": {"1": [1, 2, 3]}}, {"key": "series-b", "name": "Series B", "folder": "Series B", "episodeDict": {}}, # no missing → should be skipped ] mock_dl = MagicMock() mock_dl.add_to_queue = AsyncMock(return_value=["id1", "id2", "id3"]) mock_dl.start_queue_processing = AsyncMock(return_value=None) mock_ws = MagicMock() mock_ws.manager.broadcast = AsyncMock() with patch("src.server.utils.dependencies.get_anime_service", return_value=mock_anime), \ patch("src.server.utils.dependencies.get_download_service", return_value=mock_dl), \ patch("src.server.services.websocket_service.get_websocket_service", return_value=mock_ws): await scheduler_service._perform_rescan() mock_dl.add_to_queue.assert_called_once() call_kwargs = mock_dl.add_to_queue.call_args[1] assert len(call_kwargs["episodes"]) == 3 mock_dl.start_queue_processing.assert_called_once() # Check auto_download_started was broadcast calls = [str(c) for c in mock_ws.manager.broadcast.call_args_list] assert any("auto_download_started" in c for c in calls) # --------------------------------------------------------------------------- # 12.8 _perform_rescan() with auto_download=False # --------------------------------------------------------------------------- class TestPerformRescanNoAutoDownload: @pytest.mark.asyncio async def test_no_download_when_disabled(self, scheduler_service): scheduler_service._config = SchedulerConfig( auto_download_after_rescan=False, ) mock_anime = MagicMock() mock_anime.rescan = AsyncMock() mock_anime._cached_list_missing.return_value = [ {"key": "series-a", "name": "Series A", "folder": "Series A", "episodeDict": {"1": [1, 2]}}, ] mock_dl = MagicMock() mock_dl.add_to_queue = AsyncMock() mock_ws = MagicMock() mock_ws.manager.broadcast = AsyncMock() with patch("src.server.utils.dependencies.get_anime_service", return_value=mock_anime), \ patch("src.server.utils.dependencies.get_download_service", return_value=mock_dl), \ patch("src.server.services.websocket_service.get_websocket_service", return_value=mock_ws): await scheduler_service._perform_rescan() mock_dl.add_to_queue.assert_not_called() calls = [str(c) for c in mock_ws.manager.broadcast.call_args_list] assert not any("auto_download_started" in c for c in calls) # --------------------------------------------------------------------------- # 12.9 Auto-download error handling # --------------------------------------------------------------------------- class TestAutoDownloadErrorHandling: @pytest.mark.asyncio async def test_download_error_broadcasts_and_does_not_crash( self, scheduler_service ): scheduler_service._config = SchedulerConfig( auto_download_after_rescan=True, ) mock_anime = MagicMock() mock_anime.rescan = AsyncMock() mock_anime._cached_list_missing.return_value = [ {"key": "series-a", "name": "Series A", "folder": "Series A", "episodeDict": {"1": [1]}}, ] mock_dl = MagicMock() mock_dl.add_to_queue = AsyncMock(side_effect=RuntimeError("boom")) mock_ws = MagicMock() mock_ws.manager.broadcast = AsyncMock() with patch("src.server.utils.dependencies.get_anime_service", return_value=mock_anime), \ patch("src.server.utils.dependencies.get_download_service", return_value=mock_dl), \ patch("src.server.services.websocket_service.get_websocket_service", return_value=mock_ws): # Should NOT raise await scheduler_service._perform_rescan() calls = [str(c) for c in mock_ws.manager.broadcast.call_args_list] assert any("auto_download_error" in c for c in calls) # Rescan itself succeeded — scan_in_progress must be False assert scheduler_service._scan_in_progress is False # --------------------------------------------------------------------------- # 12.10 get_status() returns correct fields # --------------------------------------------------------------------------- class TestGetStatus: def test_get_status_fields_when_not_running(self, scheduler_service): scheduler_service._config = SchedulerConfig( schedule_time="04:00", schedule_days=["mon"], auto_download_after_rescan=True, folder_scan_enabled=True, ) status = scheduler_service.get_status() assert "is_running" in status assert "next_run" in status assert "last_run" in status assert "schedule_time" in status assert "schedule_days" in status assert "auto_download_after_rescan" in status assert "folder_scan_enabled" in status assert status["schedule_time"] == "04:00" assert status["schedule_days"] == ["mon"] assert status["auto_download_after_rescan"] is True assert status["folder_scan_enabled"] is True assert status["is_running"] is False assert status["next_run"] is None # --------------------------------------------------------------------------- # 12.11 _perform_rescan() with folder_scan_enabled=True # --------------------------------------------------------------------------- class TestPerformRescanFolderScan: @pytest.mark.asyncio async def test_folder_scan_called_when_enabled(self, scheduler_service): scheduler_service._config = SchedulerConfig( folder_scan_enabled=True, schedule_time="03:00", schedule_days=ALL_DAYS, ) mock_anime = MagicMock() mock_anime.rescan = AsyncMock() mock_anime._cached_list_missing.return_value = [] mock_ws = MagicMock() mock_ws.manager.broadcast = AsyncMock() mock_folder_scan = AsyncMock() with patch("src.server.utils.dependencies.get_anime_service", return_value=mock_anime), \ patch("src.server.services.websocket_service.get_websocket_service", return_value=mock_ws), \ patch("src.server.services.folder_scan_service.FolderScanService") as MockFSS: MockFSS.return_value.run_folder_scan = mock_folder_scan await scheduler_service._perform_rescan() mock_folder_scan.assert_awaited_once() @pytest.mark.asyncio async def test_folder_scan_skipped_when_disabled(self, scheduler_service): scheduler_service._config = SchedulerConfig( folder_scan_enabled=False, schedule_time="03:00", schedule_days=ALL_DAYS, ) mock_anime = MagicMock() mock_anime.rescan = AsyncMock() mock_anime._cached_list_missing.return_value = [] mock_ws = MagicMock() mock_ws.manager.broadcast = AsyncMock() mock_folder_scan = AsyncMock() with patch("src.server.utils.dependencies.get_anime_service", return_value=mock_anime), \ patch("src.server.services.websocket_service.get_websocket_service", return_value=mock_ws), \ patch("src.server.services.folder_scan_service.FolderScanService") as MockFSS: MockFSS.return_value.run_folder_scan = mock_folder_scan await scheduler_service._perform_rescan() mock_folder_scan.assert_not_called() @pytest.mark.asyncio async def test_folder_scan_error_broadcasts_and_does_not_crash(self, scheduler_service): scheduler_service._config = SchedulerConfig( folder_scan_enabled=True, schedule_time="03:00", schedule_days=ALL_DAYS, ) mock_anime = MagicMock() mock_anime.rescan = AsyncMock() mock_anime._cached_list_missing.return_value = [] mock_ws = MagicMock() mock_ws.manager.broadcast = AsyncMock() mock_folder_scan = AsyncMock(side_effect=RuntimeError("folder scan boom")) with patch("src.server.utils.dependencies.get_anime_service", return_value=mock_anime), \ patch("src.server.services.websocket_service.get_websocket_service", return_value=mock_ws), \ patch("src.server.services.folder_scan_service.FolderScanService") as MockFSS: MockFSS.return_value.run_folder_scan = mock_folder_scan # Should NOT raise await scheduler_service._perform_rescan() mock_folder_scan.assert_awaited_once() calls = [str(c) for c in mock_ws.manager.broadcast.call_args_list] assert any("folder_scan_error" in c for c in calls) assert scheduler_service._scan_in_progress is False # --------------------------------------------------------------------------- # Singleton helpers # --------------------------------------------------------------------------- class TestSingletonHelpers: def test_get_scheduler_service_returns_same_instance(self): reset_scheduler_service() svc1 = get_scheduler_service() svc2 = get_scheduler_service() assert svc1 is svc2 def test_reset_clears_singleton(self): get_scheduler_service() reset_scheduler_service() svc = get_scheduler_service() assert svc is not None # fresh instance # --------------------------------------------------------------------------- # 12.12 Persistent job store — SQLAlchemyJobStore passed to AsyncIOScheduler # --------------------------------------------------------------------------- class TestPersistentJobStore: @pytest.mark.asyncio async def test_start_creates_scheduler_with_sqlalchemy_jobstore( self, scheduler_service, mock_config_service ): with patch( "src.server.services.scheduler_service.AsyncIOScheduler" ) as MockScheduler: mock_sched = MagicMock() mock_sched.running = False MockScheduler.return_value = mock_sched await scheduler_service.start() MockScheduler.assert_called_once() call_kwargs = MockScheduler.call_args jobstores = call_kwargs[1]["jobstores"] assert "default" in jobstores # Verify it's a SQLAlchemyJobStore (class check via module name) assert "sqlalchemy" in type(jobstores["default"]).__module__ @pytest.mark.asyncio async def test_job_options_include_misfire_grace_and_coalesce( self, scheduler_service, mock_config_service ): with patch( "src.server.services.scheduler_service.AsyncIOScheduler" ) as MockScheduler: mock_sched = MagicMock() mock_sched.running = False MockScheduler.return_value = mock_sched await scheduler_service.start() call_kwargs = mock_sched.add_job.call_args assert call_kwargs[1]["misfire_grace_time"] == 3600 assert call_kwargs[1]["coalesce"] is True # --------------------------------------------------------------------------- # 12.13 Startup recovery — next run logged after start() # --------------------------------------------------------------------------- class TestStartupRecovery: @pytest.mark.asyncio async def test_start_logs_next_run_time( self, scheduler_service, mock_config_service ): with patch( "src.server.services.scheduler_service.AsyncIOScheduler" ) as MockScheduler: mock_job = MagicMock() next_run_dt = datetime(2026, 5, 25, 3, 0, tzinfo=timezone.utc) mock_job.next_run_time = next_run_dt mock_sched = MagicMock() mock_sched.running = False mock_sched.get_job.return_value = mock_job MockScheduler.return_value = mock_sched with patch( "src.server.services.scheduler_service.logger" ) as mock_logger: await scheduler_service.start() info_calls = [str(c) for c in mock_logger.info.call_args_list] assert any("next_run" in str(c) or "Scheduler" in str(c) for c in info_calls) # --------------------------------------------------------------------------- # 12.8 ensure_started() – idempotent startup # --------------------------------------------------------------------------- class TestEnsureStarted: @pytest.mark.asyncio async def test_ensure_started_when_not_running( self, scheduler_service, mock_config_service ): """ensure_started() calls start() when scheduler is not running.""" # Mock start method scheduler_service.start = AsyncMock() # Call ensure_started await scheduler_service.ensure_started() # Verify start() was called exactly once scheduler_service.start.assert_called_once() @pytest.mark.asyncio async def test_ensure_started_when_already_running(self, scheduler_service): """ensure_started() returns immediately when already running (idempotent).""" # Set up as already running scheduler_service._is_running = True # Mock start method scheduler_service.start = AsyncMock() # Call ensure_started await scheduler_service.ensure_started() # Verify start() was NOT called scheduler_service.start.assert_not_called()