Files
Aniworld/tests/unit/test_scheduler_service.py
2026-06-05 17:18:00 +02:00

507 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Unit tests for SchedulerService (APScheduler-based implementation).
Covers:
- _build_cron_trigger()
- start() / stop() with APScheduler mocking
- ensure_started()
- reload_config()
- get_status()
- _perform_rescan() with auto-download
- get_scheduler_service() / reset_scheduler_service() singleton helpers
- Error handling and edge cases
"""
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock, Mock, call, patch
import pytest
from apscheduler.triggers.cron import CronTrigger
from src.server.models.config import AppConfig, SchedulerConfig
from src.server.services.scheduler.scheduler_service import (
_JOB_ID,
SchedulerService,
SchedulerServiceError,
get_scheduler_service,
reset_scheduler_service,
)
# ---------------------------------------------------------------------------
# Shared fixtures
# ---------------------------------------------------------------------------
# Weekday abbreviations, Monday first; passed as schedule_days by the tests
# that want a run scheduled on every day of the week.
ALL_DAYS = [
    "mon",
    "tue",
    "wed",
    "thu",
    "fri",
    "sat",
    "sun",
]
def _make_app_config(**scheduler_kwargs) -> AppConfig:
    """Return an AppConfig whose scheduler section is built from the given keyword arguments."""
    scheduler_section = SchedulerConfig(**scheduler_kwargs)
    return AppConfig(scheduler=scheduler_section)
@pytest.fixture
def mock_config_service():
    """Patch get_config_service in the scheduler module and yield the stub it returns.

    The stub's load_config() returns a config with the scheduler enabled and
    scheduled at 03:00 on every day in ALL_DAYS.
    """
    patch_target = "src.server.services.scheduler.scheduler_service.get_config_service"
    config_stub = Mock()
    with patch(patch_target, return_value=config_stub):
        config_stub.load_config.return_value = _make_app_config(
            enabled=True,
            schedule_time="03:00",
            schedule_days=ALL_DAYS,
        )
        yield config_stub
@pytest.fixture
def scheduler_service():
    """Yield a new SchedulerService, resetting the module singleton before and after the test."""
    reset_scheduler_service()
    service = SchedulerService()
    yield service
    reset_scheduler_service()
# ---------------------------------------------------------------------------
# 12.1 _build_cron_trigger
# ---------------------------------------------------------------------------
class TestBuildCronTrigger:
    """Tests for SchedulerService._build_cron_trigger()."""

    @staticmethod
    def _fields_by_name(trigger):
        """Map each cron field name of *trigger* to its string expression."""
        return {field.name: str(field) for field in trigger.fields}

    def test_standard_case(self, scheduler_service):
        """A time plus a subset of days yields a CronTrigger for exactly those days."""
        selected_days = ["mon", "wed", "fri"]
        scheduler_service._config = SchedulerConfig(
            schedule_time="03:00",
            schedule_days=selected_days,
        )
        trigger = scheduler_service._build_cron_trigger()
        assert isinstance(trigger, CronTrigger)
        fields = self._fields_by_name(trigger)
        assert fields["hour"] == "3"
        assert fields["minute"] == "0"
        # Every configured day must be present ...
        for day in selected_days:
            assert day in fields["day_of_week"]
        # ... and no unconfigured day may leak into the trigger.
        for day in ALL_DAYS:
            if day not in selected_days:
                assert day not in fields["day_of_week"]

    def test_all_days(self, scheduler_service):
        """All seven days at the latest minute of the day are carried into the trigger."""
        scheduler_service._config = SchedulerConfig(
            schedule_time="23:59",
            schedule_days=ALL_DAYS,
        )
        trigger = scheduler_service._build_cron_trigger()
        assert trigger is not None
        fields = self._fields_by_name(trigger)
        assert fields["hour"] == "23"
        assert fields["minute"] == "59"
        for day in ALL_DAYS:
            assert day in fields["day_of_week"]

    def test_empty_days_returns_none(self, scheduler_service):
        """An empty schedule_days list produces no trigger."""
        scheduler_service._config = SchedulerConfig(
            schedule_time="03:00",
            schedule_days=[],
        )
        assert scheduler_service._build_cron_trigger() is None

    def test_no_config_returns_none(self, scheduler_service):
        """With no config loaded there is nothing to build a trigger from."""
        scheduler_service._config = None
        assert scheduler_service._build_cron_trigger() is None
# ---------------------------------------------------------------------------
# 12.2 start() normal case
# ---------------------------------------------------------------------------
class TestStart:
    """start() in the normal case, plus the already-running guard."""

    @pytest.mark.asyncio
    async def test_start_adds_job_and_starts_scheduler(
        self, scheduler_service, mock_config_service
    ):
        """start() registers the cron job with the expected options and starts the scheduler."""
        fake_scheduler = MagicMock()
        fake_scheduler.running = False
        with patch(
            "src.server.services.scheduler.scheduler_service.AsyncIOScheduler",
            return_value=fake_scheduler,
        ):
            await scheduler_service.start()

        fake_scheduler.add_job.assert_called_once()
        job_options = fake_scheduler.add_job.call_args.kwargs
        assert job_options["id"] == _JOB_ID
        assert isinstance(job_options["trigger"], CronTrigger)
        assert job_options["misfire_grace_time"] == 3600
        assert job_options["coalesce"] is True
        fake_scheduler.start.assert_called_once()
        assert scheduler_service._is_running is True

    @pytest.mark.asyncio
    async def test_start_raises_if_already_running(self, scheduler_service):
        """A second start() while flagged as running raises SchedulerServiceError."""
        scheduler_service._is_running = True
        with pytest.raises(SchedulerServiceError, match="already running"):
            await scheduler_service.start()
# ---------------------------------------------------------------------------
# 12.3 start() empty schedule_days
# ---------------------------------------------------------------------------
class TestStartEmptyDays:
    """start() when the configured schedule_days list is empty."""

    @pytest.mark.asyncio
    async def test_no_job_added_when_days_empty(self, scheduler_service):
        """The scheduler is started, but no job is registered without any days."""
        config_stub = Mock()
        config_stub.load_config.return_value = _make_app_config(
            enabled=True, schedule_days=[]
        )
        fake_scheduler = MagicMock()
        # Match the other start() tests: model a scheduler that is not yet
        # running instead of leaving `running` as a truthy auto-created mock.
        fake_scheduler.running = False

        with patch(
            "src.server.services.scheduler.scheduler_service.get_config_service",
            return_value=config_stub,
        ), patch(
            "src.server.services.scheduler.scheduler_service.AsyncIOScheduler",
            return_value=fake_scheduler,
        ):
            await scheduler_service.start()

        fake_scheduler.add_job.assert_not_called()
        fake_scheduler.start.assert_called_once()
        assert scheduler_service._is_running is True
# ---------------------------------------------------------------------------
# 12.4 stop()
# ---------------------------------------------------------------------------
class TestStop:
    """stop() shuts the scheduler down and tolerates not having been started."""

    @pytest.mark.asyncio
    async def test_stop_shuts_down_scheduler(self, scheduler_service):
        """A running scheduler is shut down without waiting and the flag is cleared."""
        running_scheduler = MagicMock()
        running_scheduler.running = True
        scheduler_service._scheduler = running_scheduler
        scheduler_service._is_running = True

        await scheduler_service.stop()

        running_scheduler.shutdown.assert_called_once_with(wait=False)
        assert scheduler_service._is_running is False

    @pytest.mark.asyncio
    async def test_stop_when_not_running_is_noop(self, scheduler_service):
        """Calling stop() on a never-started service must not raise."""
        await scheduler_service.stop()
        assert scheduler_service._is_running is False
# ---------------------------------------------------------------------------
# 12.5 reload_config() reschedule
# ---------------------------------------------------------------------------
class TestReloadConfig:
    """reload_config() reschedules the existing job when time or days change."""

    @staticmethod
    def _running_scheduler_with_job():
        """Return a scheduler mock that reports as running and already has a job."""
        fake_scheduler = MagicMock()
        fake_scheduler.running = True
        fake_scheduler.get_job.return_value = Mock()  # job exists
        return fake_scheduler

    @staticmethod
    def _rescheduled_trigger_fields(fake_scheduler):
        """Check the single reschedule_job call and return its trigger's fields by name.

        Asserting the call happened first keeps a missing call a clean assertion
        failure instead of a TypeError from subscripting ``call_args`` (None).
        """
        fake_scheduler.reschedule_job.assert_called_once()
        args, kwargs = fake_scheduler.reschedule_job.call_args
        assert args == (_JOB_ID,)
        assert set(kwargs) == {"trigger"}
        trigger = kwargs["trigger"]
        assert isinstance(trigger, CronTrigger)
        return {field.name: str(field) for field in trigger.fields}

    def test_reschedule_on_time_change(self, scheduler_service):
        """A new schedule_time reschedules the job with a trigger for that time."""
        fake_scheduler = self._running_scheduler_with_job()
        scheduler_service._scheduler = fake_scheduler
        scheduler_service._config = SchedulerConfig(
            schedule_time="03:00", schedule_days=ALL_DAYS
        )

        new_config = SchedulerConfig(schedule_time="05:00", schedule_days=ALL_DAYS)
        scheduler_service.reload_config(new_config)

        # Verify the trigger itself rather than comparing the call with its own argument.
        fields = self._rescheduled_trigger_fields(fake_scheduler)
        assert fields["hour"] == "5"
        assert fields["minute"] == "0"
        assert scheduler_service._config.schedule_time == "05:00"

    def test_reschedule_on_days_change(self, scheduler_service):
        """A new schedule_days list reschedules the job for exactly those days."""
        fake_scheduler = self._running_scheduler_with_job()
        scheduler_service._scheduler = fake_scheduler
        scheduler_service._config = SchedulerConfig(
            schedule_time="03:00", schedule_days=ALL_DAYS
        )

        new_config = SchedulerConfig(schedule_time="03:00", schedule_days=["mon"])
        scheduler_service.reload_config(new_config)

        fields = self._rescheduled_trigger_fields(fake_scheduler)
        assert "mon" in fields["day_of_week"]
        for day in ALL_DAYS:
            if day != "mon":
                assert day not in fields["day_of_week"]
        assert scheduler_service._config.schedule_days == ["mon"]
# ---------------------------------------------------------------------------
# 12.6 reload_config() empty days removes job
# ---------------------------------------------------------------------------
class TestReloadConfigEmptyDays:
    """reload_config() with an empty schedule_days list."""

    def test_removes_job_when_days_empty(self, scheduler_service):
        """The existing job is removed, not rescheduled, when no days remain."""
        fake_scheduler = MagicMock()
        fake_scheduler.running = True
        fake_scheduler.get_job.return_value = Mock()  # job exists
        scheduler_service._scheduler = fake_scheduler
        scheduler_service._config = SchedulerConfig(
            schedule_time="03:00", schedule_days=ALL_DAYS
        )

        config_without_days = SchedulerConfig(schedule_time="03:00", schedule_days=[])
        scheduler_service.reload_config(config_without_days)

        fake_scheduler.remove_job.assert_called_once_with(_JOB_ID)
        fake_scheduler.reschedule_job.assert_not_called()
# ---------------------------------------------------------------------------
# 12.7 _perform_rescan() with auto_download=True
# ---------------------------------------------------------------------------
class TestPerformRescanAutoDownload:
    """_perform_rescan() with auto_download_after_rescan enabled."""

    @pytest.mark.asyncio
    async def test_auto_download_queues_missing_episodes(self, scheduler_service):
        """Missing episodes are queued, queue processing starts and the event is broadcast."""
        scheduler_service._config = SchedulerConfig(
            auto_download_after_rescan=True,
            schedule_time="03:00",
            schedule_days=ALL_DAYS,
        )

        series_with_gaps = {
            "key": "series-a",
            "name": "Series A",
            "folder": "Series A",
            "episodeDict": {"1": [1, 2, 3]},
        }
        # Empty episodeDict: nothing is missing, so this series should be skipped.
        series_complete = {
            "key": "series-b",
            "name": "Series B",
            "folder": "Series B",
            "episodeDict": {},
        }
        anime_stub = MagicMock()
        anime_stub.rescan = AsyncMock()
        anime_stub._cached_list_missing.return_value = [series_with_gaps, series_complete]

        download_stub = MagicMock()
        download_stub.add_to_queue = AsyncMock(return_value=["id1", "id2", "id3"])
        download_stub.start_queue_processing = AsyncMock(return_value=None)

        ws_stub = MagicMock()
        ws_stub.manager.broadcast = AsyncMock()

        anime_patch = patch(
            "src.server.utils.dependencies.get_anime_service", return_value=anime_stub
        )
        download_patch = patch(
            "src.server.utils.dependencies.get_download_service", return_value=download_stub
        )
        ws_patch = patch(
            "src.server.services.websocket_service.get_websocket_service",
            return_value=ws_stub,
        )
        with anime_patch, download_patch, ws_patch:
            await scheduler_service._perform_rescan()

        download_stub.add_to_queue.assert_called_once()
        queued = download_stub.add_to_queue.call_args.kwargs
        assert len(queued["episodes"]) == 3
        download_stub.start_queue_processing.assert_called_once()
        # The auto_download_started event must appear among the broadcasts.
        broadcasts = [str(entry) for entry in ws_stub.manager.broadcast.call_args_list]
        assert any("auto_download_started" in text for text in broadcasts)
# ---------------------------------------------------------------------------
# 12.8 _perform_rescan() with auto_download=False
# ---------------------------------------------------------------------------
class TestPerformRescanNoAutoDownload:
    """_perform_rescan() with auto_download_after_rescan disabled."""

    @pytest.mark.asyncio
    async def test_no_download_when_disabled(self, scheduler_service):
        """Nothing is queued and no auto_download_started event is broadcast."""
        scheduler_service._config = SchedulerConfig(
            auto_download_after_rescan=False,
        )

        anime_stub = MagicMock()
        anime_stub.rescan = AsyncMock()
        anime_stub._cached_list_missing.return_value = [
            {
                "key": "series-a",
                "name": "Series A",
                "folder": "Series A",
                "episodeDict": {"1": [1, 2]},
            },
        ]

        download_stub = MagicMock()
        download_stub.add_to_queue = AsyncMock()

        ws_stub = MagicMock()
        ws_stub.manager.broadcast = AsyncMock()

        anime_patch = patch(
            "src.server.utils.dependencies.get_anime_service", return_value=anime_stub
        )
        download_patch = patch(
            "src.server.utils.dependencies.get_download_service", return_value=download_stub
        )
        ws_patch = patch(
            "src.server.services.websocket_service.get_websocket_service",
            return_value=ws_stub,
        )
        with anime_patch, download_patch, ws_patch:
            await scheduler_service._perform_rescan()

        download_stub.add_to_queue.assert_not_called()
        broadcasts = [str(entry) for entry in ws_stub.manager.broadcast.call_args_list]
        assert not any("auto_download_started" in text for text in broadcasts)
# ---------------------------------------------------------------------------
# 12.9 Auto-download error handling
# ---------------------------------------------------------------------------
class TestAutoDownloadErrorHandling:
    """Failures while queueing auto-downloads must not escape _perform_rescan()."""

    @pytest.mark.asyncio
    async def test_download_error_broadcasts_and_does_not_crash(
        self, scheduler_service
    ):
        """An add_to_queue failure is broadcast as auto_download_error, not raised."""
        scheduler_service._config = SchedulerConfig(
            auto_download_after_rescan=True,
        )

        anime_stub = MagicMock()
        anime_stub.rescan = AsyncMock()
        anime_stub._cached_list_missing.return_value = [
            {
                "key": "series-a",
                "name": "Series A",
                "folder": "Series A",
                "episodeDict": {"1": [1]},
            },
        ]

        download_stub = MagicMock()
        download_stub.add_to_queue = AsyncMock(side_effect=RuntimeError("boom"))

        ws_stub = MagicMock()
        ws_stub.manager.broadcast = AsyncMock()

        anime_patch = patch(
            "src.server.utils.dependencies.get_anime_service", return_value=anime_stub
        )
        download_patch = patch(
            "src.server.utils.dependencies.get_download_service", return_value=download_stub
        )
        ws_patch = patch(
            "src.server.services.websocket_service.get_websocket_service",
            return_value=ws_stub,
        )
        with anime_patch, download_patch, ws_patch:
            # Must complete without raising even though queueing fails.
            await scheduler_service._perform_rescan()

        broadcasts = [str(entry) for entry in ws_stub.manager.broadcast.call_args_list]
        assert any("auto_download_error" in text for text in broadcasts)
        # The rescan itself succeeded, so the in-progress flag must be cleared.
        assert scheduler_service._scan_in_progress is False
# ---------------------------------------------------------------------------
# 12.10 get_status() returns correct fields
# ---------------------------------------------------------------------------
class TestGetStatus:
    """get_status() exposes the expected keys and values."""

    def test_get_status_fields_when_not_running(self, scheduler_service):
        """A configured but not started service reports its config and no next run."""
        scheduler_service._config = SchedulerConfig(
            schedule_time="04:00",
            schedule_days=["mon"],
            auto_download_after_rescan=True,
        )

        status = scheduler_service.get_status()

        required_keys = (
            "is_running",
            "next_run",
            "last_run",
            "schedule_time",
            "schedule_days",
            "auto_download_after_rescan",
        )
        for key in required_keys:
            assert key in status

        assert status["schedule_time"] == "04:00"
        assert status["schedule_days"] == ["mon"]
        assert status["auto_download_after_rescan"] is True
        assert status["is_running"] is False
        assert status["next_run"] is None
# ---------------------------------------------------------------------------
# 12.11 Singleton helpers
# ---------------------------------------------------------------------------
class TestSingletonHelpers:
    """get_scheduler_service() / reset_scheduler_service() singleton behaviour.

    Each test resets the singleton on exit so that no instance created here
    leaks into later tests.
    """

    def test_get_scheduler_service_returns_same_instance(self):
        """Repeated calls without a reset return the identical object."""
        reset_scheduler_service()
        try:
            first = get_scheduler_service()
            second = get_scheduler_service()
            assert first is second
        finally:
            reset_scheduler_service()

    def test_reset_clears_singleton(self):
        """After a reset, the next call returns a different, fresh instance."""
        reset_scheduler_service()
        try:
            before_reset = get_scheduler_service()
            reset_scheduler_service()
            after_reset = get_scheduler_service()
            assert after_reset is not None
            # `is not None` alone would also pass if reset did nothing; the
            # identity check is what proves a new instance was created.
            assert after_reset is not before_reset
        finally:
            reset_scheduler_service()
# ---------------------------------------------------------------------------
# 12.12 In-memory job store — no separate scheduler.db needed
# ---------------------------------------------------------------------------
class TestInMemoryJobStore:
    """The scheduler is built without an explicit job store; job options are set on add_job."""

    @pytest.mark.asyncio
    async def test_start_creates_scheduler_without_jobstore_arg(
        self, scheduler_service, mock_config_service
    ):
        """AsyncIOScheduler is constructed once and never receives a jobstores keyword."""
        fake_scheduler = MagicMock()
        fake_scheduler.running = False
        with patch(
            "src.server.services.scheduler.scheduler_service.AsyncIOScheduler",
            return_value=fake_scheduler,
        ) as scheduler_cls:
            await scheduler_service.start()

        scheduler_cls.assert_called_once()
        # No jobstores argument, so the scheduler's default job store is used.
        # Membership in an empty kwargs dict is trivially False, so no guard is needed.
        assert "jobstores" not in scheduler_cls.call_args.kwargs

    @pytest.mark.asyncio
    async def test_job_options_include_misfire_grace_and_coalesce(
        self, scheduler_service, mock_config_service
    ):
        """The job is added with a one-hour misfire grace time and coalescing enabled."""
        fake_scheduler = MagicMock()
        fake_scheduler.running = False
        with patch(
            "src.server.services.scheduler.scheduler_service.AsyncIOScheduler",
            return_value=fake_scheduler,
        ):
            await scheduler_service.start()

        job_options = fake_scheduler.add_job.call_args.kwargs
        assert job_options["misfire_grace_time"] == 3600
        assert job_options["coalesce"] is True
# ---------------------------------------------------------------------------
# 12.13 Startup recovery — next run logged after start()
# ---------------------------------------------------------------------------
class TestStartupRecovery:
    """start() emits an info-level log entry once the scheduler is up."""

    @pytest.mark.asyncio
    async def test_start_logs_next_run_time(
        self, scheduler_service, mock_config_service
    ):
        """At least one info log mentions "next_run" or "Scheduler" after start()."""
        scheduled_job = MagicMock()
        scheduled_job.next_run_time = datetime(2026, 5, 25, 3, 0, tzinfo=timezone.utc)

        fake_scheduler = MagicMock()
        fake_scheduler.running = False
        fake_scheduler.get_job.return_value = scheduled_job

        scheduler_patch = patch(
            "src.server.services.scheduler.scheduler_service.AsyncIOScheduler",
            return_value=fake_scheduler,
        )
        logger_patch = patch("src.server.services.scheduler.scheduler_service.logger")
        with scheduler_patch:
            with logger_patch as fake_logger:
                await scheduler_service.start()

        info_messages = [str(entry) for entry in fake_logger.info.call_args_list]
        assert any(
            "next_run" in text or "Scheduler" in text for text in info_messages
        )
# ---------------------------------------------------------------------------
# 12.14 ensure_started() idempotent startup
# ---------------------------------------------------------------------------
class TestEnsureStarted:
    """ensure_started() delegates to start() only when the service is not running."""

    @pytest.mark.asyncio
    async def test_ensure_started_when_not_running(
        self, scheduler_service, mock_config_service
    ):
        """ensure_started() calls start() when scheduler is not running."""
        start_stub = AsyncMock()
        scheduler_service.start = start_stub

        await scheduler_service.ensure_started()

        # start() must have been invoked exactly once.
        start_stub.assert_called_once()

    @pytest.mark.asyncio
    async def test_ensure_started_when_already_running(self, scheduler_service):
        """ensure_started() returns immediately when already running (idempotent)."""
        scheduler_service._is_running = True
        start_stub = AsyncMock()
        scheduler_service.start = start_stub

        await scheduler_service.ensure_started()

        # Already running, so start() must not have been touched.
        start_stub.assert_not_called()