Files
Aniworld/tests/unit/test_scheduler_service.py
Lukas 9a20541598 feat(NFO): add TMDB search fallback with alt_titles support
- New _search_with_fallback() method tries multiple strategies:
  1. Primary query with year filter (de-DE locale)
  2. Alternative titles with ja-JP / en-US locales
  3. English search (en-US)
  4. Search without year constraint
  5. Punctuation-normalized query
- create_nfo() accepts new alt_titles param for Japanese/title fallback
- Better match rate for anime with non-English titles

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-23 21:57:00 +02:00

562 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Unit tests for SchedulerService (APScheduler-based implementation).
Covers:
- _build_cron_trigger()
- start() / stop() with APScheduler mocking
- reload_config()
- get_status()
- _perform_rescan() with auto-download
- Error handling and edge cases
"""
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock, Mock, call, patch
import pytest
from apscheduler.triggers.cron import CronTrigger
from src.server.models.config import AppConfig, SchedulerConfig
from src.server.services.scheduler_service import (
_JOB_ID,
SchedulerService,
SchedulerServiceError,
get_scheduler_service,
reset_scheduler_service,
)
# ---------------------------------------------------------------------------
# Shared fixtures
# ---------------------------------------------------------------------------
# Day abbreviations used as ``schedule_days`` values throughout these tests,
# Monday through Sunday.
ALL_DAYS = [
    "mon",
    "tue",
    "wed",
    "thu",
    "fri",
    "sat",
    "sun",
]
def _make_app_config(**scheduler_kwargs) -> AppConfig:
    """Build an ``AppConfig`` whose scheduler section is created from the kwargs.

    Args:
        **scheduler_kwargs: Forwarded unchanged to ``SchedulerConfig``.

    Returns:
        An ``AppConfig`` with ``scheduler`` set to the new ``SchedulerConfig``.
    """
    scheduler_section = SchedulerConfig(**scheduler_kwargs)
    return AppConfig(scheduler=scheduler_section)
@pytest.fixture
def mock_config_service():
    """Patch ``get_config_service`` in the scheduler module and yield the stub.

    The stub's ``load_config()`` returns an enabled scheduler configuration
    that runs at 03:00 on every day in ``ALL_DAYS``.
    """
    config_stub = Mock()
    with patch(
        "src.server.services.scheduler_service.get_config_service",
        return_value=config_stub,
    ):
        config_stub.load_config.return_value = _make_app_config(
            enabled=True,
            schedule_time="03:00",
            schedule_days=ALL_DAYS,
        )
        yield config_stub
@pytest.fixture
def scheduler_service():
    """Yield a new ``SchedulerService``, resetting the singleton before and after."""
    reset_scheduler_service()
    service = SchedulerService()
    yield service
    reset_scheduler_service()
# ---------------------------------------------------------------------------
# 12.1 _build_cron_trigger
# ---------------------------------------------------------------------------
class TestBuildCronTrigger:
    """Tests for ``SchedulerService._build_cron_trigger()``."""

    @staticmethod
    def _fields_by_name(trigger):
        """Map each trigger field's name to its string expression."""
        return {field.name: str(field) for field in trigger.fields}

    def test_standard_case(self, scheduler_service):
        """A time plus a subset of days yields a CronTrigger with matching fields."""
        scheduler_service._config = SchedulerConfig(
            schedule_time="03:00",
            schedule_days=["mon", "wed", "fri"],
        )

        built = scheduler_service._build_cron_trigger()

        assert isinstance(built, CronTrigger)
        expressions = self._fields_by_name(built)
        assert expressions["hour"] == "3"
        assert expressions["minute"] == "0"
        day_expr = expressions["day_of_week"]
        assert "mon" in day_expr
        assert "fri" in day_expr

    def test_all_days(self, scheduler_service):
        """Every configured day appears in the trigger's day_of_week field."""
        scheduler_service._config = SchedulerConfig(
            schedule_time="23:59",
            schedule_days=ALL_DAYS,
        )

        built = scheduler_service._build_cron_trigger()

        assert built is not None
        day_expr = self._fields_by_name(built)["day_of_week"]
        absent = [day for day in ALL_DAYS if day not in day_expr]
        assert not absent

    def test_empty_days_returns_none(self, scheduler_service):
        """An empty schedule_days list produces no trigger."""
        scheduler_service._config = SchedulerConfig(
            schedule_time="03:00",
            schedule_days=[],
        )

        built = scheduler_service._build_cron_trigger()

        assert built is None

    def test_no_config_returns_none(self, scheduler_service):
        """Without any configuration loaded, no trigger is produced."""
        scheduler_service._config = None

        built = scheduler_service._build_cron_trigger()

        assert built is None
# ---------------------------------------------------------------------------
# 12.2 start() normal case
# ---------------------------------------------------------------------------
class TestStart:
    """Tests for the normal ``start()`` path and its already-running guard."""

    @pytest.mark.asyncio
    async def test_start_adds_job_and_starts_scheduler(
        self, scheduler_service, mock_config_service
    ):
        """start() registers one cron job with the expected options and starts."""
        fake_scheduler = MagicMock()
        fake_scheduler.running = False

        with patch(
            "src.server.services.scheduler_service.AsyncIOScheduler",
            return_value=fake_scheduler,
        ):
            await scheduler_service.start()

        fake_scheduler.add_job.assert_called_once()
        job_options = fake_scheduler.add_job.call_args.kwargs
        assert job_options["id"] == _JOB_ID
        assert isinstance(job_options["trigger"], CronTrigger)
        assert job_options["misfire_grace_time"] == 3600
        assert job_options["coalesce"] is True
        fake_scheduler.start.assert_called_once()
        assert scheduler_service._is_running is True

    @pytest.mark.asyncio
    async def test_start_raises_if_already_running(self, scheduler_service):
        """start() raises SchedulerServiceError when the service is flagged running."""
        scheduler_service._is_running = True

        with pytest.raises(SchedulerServiceError, match="already running"):
            await scheduler_service.start()
# ---------------------------------------------------------------------------
# 12.3 start() empty schedule_days
# ---------------------------------------------------------------------------
class TestStartEmptyDays:
    """Tests for ``start()`` when the configuration has no schedule days."""

    @pytest.mark.asyncio
    async def test_no_job_added_when_days_empty(self, scheduler_service):
        """The scheduler still starts, but no job is registered."""
        config_stub = Mock()
        fake_scheduler = MagicMock()

        with patch(
            "src.server.services.scheduler_service.get_config_service",
            return_value=config_stub,
        ), patch(
            "src.server.services.scheduler_service.AsyncIOScheduler",
            return_value=fake_scheduler,
        ):
            config_stub.load_config.return_value = _make_app_config(
                enabled=True, schedule_days=[]
            )
            await scheduler_service.start()

        fake_scheduler.add_job.assert_not_called()
        fake_scheduler.start.assert_called_once()
        assert scheduler_service._is_running is True
# ---------------------------------------------------------------------------
# 12.4 stop()
# ---------------------------------------------------------------------------
class TestStop:
    """Tests for ``stop()``."""

    @pytest.mark.asyncio
    async def test_stop_shuts_down_scheduler(self, scheduler_service):
        """stop() shuts the scheduler down without waiting and clears the flag."""
        fake_scheduler = MagicMock()
        fake_scheduler.running = True
        scheduler_service._is_running = True
        scheduler_service._scheduler = fake_scheduler

        await scheduler_service.stop()

        fake_scheduler.shutdown.assert_called_once_with(wait=False)
        assert scheduler_service._is_running is False

    @pytest.mark.asyncio
    async def test_stop_when_not_running_is_noop(self, scheduler_service):
        """stop() on a service that was never started completes without raising."""
        await scheduler_service.stop()

        assert scheduler_service._is_running is False
# ---------------------------------------------------------------------------
# 12.5 reload_config() reschedule
# ---------------------------------------------------------------------------
class TestReloadConfig:
    """Tests for ``reload_config()`` rescheduling an already-registered job."""

    @staticmethod
    def _running_scheduler_with_job():
        """Return a scheduler mock that reports running and has an existing job."""
        mock_sched = MagicMock()
        mock_sched.running = True
        mock_sched.get_job.return_value = Mock()  # job exists
        return mock_sched

    @staticmethod
    def _rescheduled_trigger(mock_sched):
        """Assert exactly one reschedule of ``_JOB_ID`` and return its trigger.

        The trigger is pulled out of the recorded call so the caller can
        inspect its fields. Comparing the recorded kwarg against itself would
        always pass and verify nothing about the new schedule.
        """
        mock_sched.reschedule_job.assert_called_once()
        recorded = mock_sched.reschedule_job.call_args
        assert recorded.args == (_JOB_ID,)
        assert set(recorded.kwargs) == {"trigger"}
        trigger = recorded.kwargs["trigger"]
        assert isinstance(trigger, CronTrigger)
        return trigger

    def test_reschedule_on_time_change(self, scheduler_service):
        """Changing schedule_time reschedules the job with the new hour/minute."""
        mock_sched = self._running_scheduler_with_job()
        scheduler_service._scheduler = mock_sched
        scheduler_service._config = SchedulerConfig(
            schedule_time="03:00", schedule_days=ALL_DAYS
        )

        new_config = SchedulerConfig(schedule_time="05:00", schedule_days=ALL_DAYS)
        scheduler_service.reload_config(new_config)

        trigger = self._rescheduled_trigger(mock_sched)
        fields = {f.name: str(f) for f in trigger.fields}
        assert fields["hour"] == "5"
        assert fields["minute"] == "0"
        assert scheduler_service._config.schedule_time == "05:00"

    def test_reschedule_on_days_change(self, scheduler_service):
        """Changing schedule_days reschedules the job with the new day set."""
        mock_sched = self._running_scheduler_with_job()
        scheduler_service._scheduler = mock_sched
        scheduler_service._config = SchedulerConfig(
            schedule_time="03:00", schedule_days=ALL_DAYS
        )

        new_config = SchedulerConfig(schedule_time="03:00", schedule_days=["mon"])
        scheduler_service.reload_config(new_config)

        trigger = self._rescheduled_trigger(mock_sched)
        fields = {f.name: str(f) for f in trigger.fields}
        assert "mon" in fields["day_of_week"]
        assert "tue" not in fields["day_of_week"]
# ---------------------------------------------------------------------------
# 12.6 reload_config() empty days removes job
# ---------------------------------------------------------------------------
class TestReloadConfigEmptyDays:
    """Tests for ``reload_config()`` when the new config has no schedule days."""

    def test_removes_job_when_days_empty(self, scheduler_service):
        """An existing job is removed rather than rescheduled."""
        fake_scheduler = MagicMock()
        fake_scheduler.running = True
        fake_scheduler.get_job.return_value = Mock()  # a job is already registered
        scheduler_service._scheduler = fake_scheduler
        scheduler_service._config = SchedulerConfig(
            schedule_time="03:00", schedule_days=ALL_DAYS
        )

        scheduler_service.reload_config(
            SchedulerConfig(schedule_time="03:00", schedule_days=[])
        )

        fake_scheduler.remove_job.assert_called_once_with(_JOB_ID)
        fake_scheduler.reschedule_job.assert_not_called()
# ---------------------------------------------------------------------------
# 12.7 _perform_rescan() with auto_download=True
# ---------------------------------------------------------------------------
class TestPerformRescanAutoDownload:
    """Tests for ``_perform_rescan()`` with auto-download enabled."""

    @pytest.mark.asyncio
    async def test_auto_download_queues_missing_episodes(self, scheduler_service):
        """Missing episodes are queued, processing starts, and a start event is sent."""
        scheduler_service._config = SchedulerConfig(
            auto_download_after_rescan=True,
            schedule_time="03:00",
            schedule_days=ALL_DAYS,
        )

        series_with_gaps = {
            "key": "series-a",
            "name": "Series A",
            "folder": "Series A",
            "episodeDict": {"1": [1, 2, 3]},
        }
        # Nothing missing for this series, so it should be skipped.
        series_complete = {
            "key": "series-b",
            "name": "Series B",
            "folder": "Series B",
            "episodeDict": {},
        }
        anime = MagicMock()
        anime.rescan = AsyncMock()
        anime._cached_list_missing.return_value = [series_with_gaps, series_complete]

        downloads = MagicMock()
        downloads.add_to_queue = AsyncMock(return_value=["id1", "id2", "id3"])
        downloads.start_queue_processing = AsyncMock(return_value=None)

        websocket = MagicMock()
        websocket.manager.broadcast = AsyncMock()

        with patch(
            "src.server.utils.dependencies.get_anime_service",
            return_value=anime,
        ), patch(
            "src.server.utils.dependencies.get_download_service",
            return_value=downloads,
        ), patch(
            "src.server.services.websocket_service.get_websocket_service",
            return_value=websocket,
        ):
            await scheduler_service._perform_rescan()

        downloads.add_to_queue.assert_called_once()
        queued = downloads.add_to_queue.call_args.kwargs
        assert len(queued["episodes"]) == 3
        downloads.start_queue_processing.assert_called_once()

        # The auto_download_started event must have been broadcast.
        assert any(
            "auto_download_started" in str(sent)
            for sent in websocket.manager.broadcast.call_args_list
        )
# ---------------------------------------------------------------------------
# 12.8 _perform_rescan() with auto_download=False
# ---------------------------------------------------------------------------
class TestPerformRescanNoAutoDownload:
    """Tests for ``_perform_rescan()`` with auto-download disabled."""

    @pytest.mark.asyncio
    async def test_no_download_when_disabled(self, scheduler_service):
        """Nothing is queued and no start event is broadcast."""
        scheduler_service._config = SchedulerConfig(
            auto_download_after_rescan=False,
        )

        anime = MagicMock()
        anime.rescan = AsyncMock()
        anime._cached_list_missing.return_value = [
            {
                "key": "series-a",
                "name": "Series A",
                "folder": "Series A",
                "episodeDict": {"1": [1, 2]},
            },
        ]

        downloads = MagicMock()
        downloads.add_to_queue = AsyncMock()

        websocket = MagicMock()
        websocket.manager.broadcast = AsyncMock()

        with patch(
            "src.server.utils.dependencies.get_anime_service",
            return_value=anime,
        ), patch(
            "src.server.utils.dependencies.get_download_service",
            return_value=downloads,
        ), patch(
            "src.server.services.websocket_service.get_websocket_service",
            return_value=websocket,
        ):
            await scheduler_service._perform_rescan()

        downloads.add_to_queue.assert_not_called()
        assert not any(
            "auto_download_started" in str(sent)
            for sent in websocket.manager.broadcast.call_args_list
        )
# ---------------------------------------------------------------------------
# 12.9 Auto-download error handling
# ---------------------------------------------------------------------------
class TestAutoDownloadErrorHandling:
    """Tests for ``_perform_rescan()`` when queuing downloads fails."""

    @pytest.mark.asyncio
    async def test_download_error_broadcasts_and_does_not_crash(
        self, scheduler_service
    ):
        """A queue failure is broadcast as an error and does not propagate."""
        scheduler_service._config = SchedulerConfig(
            auto_download_after_rescan=True,
        )

        anime = MagicMock()
        anime.rescan = AsyncMock()
        anime._cached_list_missing.return_value = [
            {
                "key": "series-a",
                "name": "Series A",
                "folder": "Series A",
                "episodeDict": {"1": [1]},
            },
        ]

        downloads = MagicMock()
        downloads.add_to_queue = AsyncMock(side_effect=RuntimeError("boom"))

        websocket = MagicMock()
        websocket.manager.broadcast = AsyncMock()

        with patch(
            "src.server.utils.dependencies.get_anime_service",
            return_value=anime,
        ), patch(
            "src.server.utils.dependencies.get_download_service",
            return_value=downloads,
        ), patch(
            "src.server.services.websocket_service.get_websocket_service",
            return_value=websocket,
        ):
            # The RuntimeError from add_to_queue must not escape.
            await scheduler_service._perform_rescan()

        assert any(
            "auto_download_error" in str(sent)
            for sent in websocket.manager.broadcast.call_args_list
        )
        # The rescan itself finished, so the in-progress flag must be cleared.
        assert scheduler_service._scan_in_progress is False
# ---------------------------------------------------------------------------
# 12.10 get_status() returns correct fields
# ---------------------------------------------------------------------------
class TestGetStatus:
    """Tests for ``get_status()``."""

    def test_get_status_fields_when_not_running(self, scheduler_service):
        """All status keys are present and mirror the config of a stopped service."""
        scheduler_service._config = SchedulerConfig(
            schedule_time="04:00",
            schedule_days=["mon"],
            auto_download_after_rescan=True,
            folder_scan_enabled=True,
        )

        report = scheduler_service.get_status()

        required_keys = (
            "is_running",
            "next_run",
            "last_run",
            "schedule_time",
            "schedule_days",
            "auto_download_after_rescan",
            "folder_scan_enabled",
        )
        for key in required_keys:
            assert key in report

        # Values taken from the configuration.
        assert report["schedule_time"] == "04:00"
        assert report["schedule_days"] == ["mon"]
        assert report["auto_download_after_rescan"] is True
        assert report["folder_scan_enabled"] is True

        # Values reflecting that the service was never started.
        assert report["is_running"] is False
        assert report["next_run"] is None
# ---------------------------------------------------------------------------
# 12.11 _perform_rescan() with folder_scan_enabled=True
# ---------------------------------------------------------------------------
class TestPerformRescanFolderScan:
    """Tests for the folder-scan step of ``_perform_rescan()``."""

    @staticmethod
    async def _rescan_with_folder_scan(service, folder_scan, *, enabled):
        """Run ``_perform_rescan()`` with collaborators mocked.

        Args:
            service: The ``SchedulerService`` under test.
            folder_scan: Async mock installed as ``run_folder_scan`` on the
                patched ``FolderScanService`` instance.
            enabled: Value for ``folder_scan_enabled`` in the config.

        Returns:
            The websocket service mock, so callers can inspect broadcasts.
        """
        service._config = SchedulerConfig(
            folder_scan_enabled=enabled,
            schedule_time="03:00",
            schedule_days=ALL_DAYS,
        )

        anime = MagicMock()
        anime.rescan = AsyncMock()
        anime._cached_list_missing.return_value = []

        websocket = MagicMock()
        websocket.manager.broadcast = AsyncMock()

        with patch(
            "src.server.utils.dependencies.get_anime_service",
            return_value=anime,
        ), patch(
            "src.server.services.websocket_service.get_websocket_service",
            return_value=websocket,
        ), patch(
            "src.server.services.folder_scan_service.FolderScanService"
        ) as scan_service_cls:
            scan_service_cls.return_value.run_folder_scan = folder_scan
            await service._perform_rescan()

        return websocket

    @pytest.mark.asyncio
    async def test_folder_scan_called_when_enabled(self, scheduler_service):
        """The folder scan is awaited exactly once when enabled."""
        folder_scan = AsyncMock()

        await self._rescan_with_folder_scan(
            scheduler_service, folder_scan, enabled=True
        )

        folder_scan.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_folder_scan_skipped_when_disabled(self, scheduler_service):
        """The folder scan is never invoked when disabled."""
        folder_scan = AsyncMock()

        await self._rescan_with_folder_scan(
            scheduler_service, folder_scan, enabled=False
        )

        folder_scan.assert_not_called()

    @pytest.mark.asyncio
    async def test_folder_scan_error_broadcasts_and_does_not_crash(self, scheduler_service):
        """A failing folder scan is broadcast as an error and does not propagate."""
        folder_scan = AsyncMock(side_effect=RuntimeError("folder scan boom"))

        # The RuntimeError from the folder scan must not escape.
        websocket = await self._rescan_with_folder_scan(
            scheduler_service, folder_scan, enabled=True
        )

        folder_scan.assert_awaited_once()
        assert any(
            "folder_scan_error" in str(sent)
            for sent in websocket.manager.broadcast.call_args_list
        )
        assert scheduler_service._scan_in_progress is False
# ---------------------------------------------------------------------------
# Singleton helpers
# ---------------------------------------------------------------------------
class TestSingletonHelpers:
    """Tests for ``get_scheduler_service()`` / ``reset_scheduler_service()``."""

    @pytest.fixture(autouse=True)
    def _isolated_singleton(self):
        """Reset the module-level singleton around every test in this class.

        These tests do not use the ``scheduler_service`` fixture, so without
        this the instance they create would stay registered after they finish.
        """
        reset_scheduler_service()
        yield
        reset_scheduler_service()

    def test_get_scheduler_service_returns_same_instance(self):
        """Two consecutive lookups return the identical object."""
        svc1 = get_scheduler_service()
        svc2 = get_scheduler_service()
        assert svc1 is svc2

    def test_reset_clears_singleton(self):
        """After a reset, the next lookup returns a different instance."""
        stale = get_scheduler_service()
        reset_scheduler_service()
        fresh = get_scheduler_service()
        assert fresh is not None
        # ``stale`` is still referenced here, so an identity match can only
        # mean the reset did not actually drop the cached instance.
        assert fresh is not stale
# ---------------------------------------------------------------------------
# 12.12 Persistent job store — SQLAlchemyJobStore passed to AsyncIOScheduler
# ---------------------------------------------------------------------------
class TestPersistentJobStore:
    """Tests that ``start()`` wires a persistent job store and job options."""

    @pytest.mark.asyncio
    async def test_start_creates_scheduler_with_sqlalchemy_jobstore(
        self, scheduler_service, mock_config_service
    ):
        """AsyncIOScheduler is constructed once with a SQLAlchemy default job store."""
        fake_scheduler = MagicMock()
        fake_scheduler.running = False

        with patch(
            "src.server.services.scheduler_service.AsyncIOScheduler",
            return_value=fake_scheduler,
        ) as scheduler_cls:
            await scheduler_service.start()

        scheduler_cls.assert_called_once()
        stores = scheduler_cls.call_args.kwargs["jobstores"]
        assert "default" in stores
        # Identify the store by its defining module rather than importing the class.
        store_module = type(stores["default"]).__module__
        assert "sqlalchemy" in store_module

    @pytest.mark.asyncio
    async def test_job_options_include_misfire_grace_and_coalesce(
        self, scheduler_service, mock_config_service
    ):
        """The job is added with a 3600 s misfire grace time and coalescing on."""
        fake_scheduler = MagicMock()
        fake_scheduler.running = False

        with patch(
            "src.server.services.scheduler_service.AsyncIOScheduler",
            return_value=fake_scheduler,
        ):
            await scheduler_service.start()

        job_options = fake_scheduler.add_job.call_args.kwargs
        assert job_options["misfire_grace_time"] == 3600
        assert job_options["coalesce"] is True
# ---------------------------------------------------------------------------
# 12.13 Startup recovery — next run logged after start()
# ---------------------------------------------------------------------------
class TestStartupRecovery:
    """Tests that ``start()`` logs the job's next run time."""

    @pytest.mark.asyncio
    async def test_start_logs_next_run_time(
        self, scheduler_service, mock_config_service
    ):
        """At least one info-level log call mentions ``next_run``."""
        pending_job = MagicMock()
        pending_job.next_run_time = datetime(2026, 5, 25, 3, 0, tzinfo=timezone.utc)

        fake_scheduler = MagicMock()
        fake_scheduler.running = False
        fake_scheduler.get_job.return_value = pending_job

        with patch(
            "src.server.services.scheduler_service.AsyncIOScheduler",
            return_value=fake_scheduler,
        ), patch(
            "src.server.services.scheduler_service.logger"
        ) as log_stub:
            await scheduler_service.start()

        # Search the rendered info() calls for the next-run message.
        assert any(
            "next_run" in str(logged) for logged in log_stub.info.call_args_list
        )