feat(NFO): add TMDB search fallback with alt_titles support

- New _search_with_fallback() method tries multiple strategies:
  1. Primary query with year filter (de-DE locale)
  2. Alternative titles with ja-JP / en-US locales
  3. English search (en-US)
  4. Search without year constraint
  5. Punctuation-normalized query
- create_nfo() accepts new alt_titles param for Japanese/title fallback
- Better match rate for anime with non-English titles

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
2026-05-23 21:57:00 +02:00
parent 3f7651404d
commit 9a20541598
7 changed files with 588 additions and 43 deletions

View File

@@ -143,3 +143,35 @@ await client.close() # May not be called if exception raised earlier
**Verification:**
- Missing context manager usage triggers `__del__` warning on garbage collection
- Integration tests verify no "Unclosed client session" errors in logs
### Scheduler Persistence and Recovery
APScheduler stores jobs in `data/scheduler.db` (SQLite) so they survive process restarts:
```python
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
jobstores = {
"default": SQLAlchemyJobStore(url="sqlite:///./data/scheduler.db"),
}
scheduler = AsyncIOScheduler(jobstores=jobstores)
```
**Grace period:** `misfire_grace_time=3600` (1 hour). If server is down at scheduled time and restarts within 1 hour, missed job runs automatically via APScheduler coalesce behavior.
**Startup recovery:** On `start()`, scheduler loads persisted jobs from DB. APScheduler handles missed jobs internally when `coalesce=True`.
**Health endpoint:** `GET /health` returns `scheduler_next_run` and `scheduler_last_run` for external monitors (Uptime Kuma, Prometheus, etc.).
**If server is down >1 hour:** No automatic recovery. Manual trigger via `POST /api/scheduler/trigger-rescan` or wait for next scheduled run.
### Troubleshooting Development Issues
#### Scheduler missed a run
1. Server was down at scheduled time (03:00 UTC by default).
2. Check `data/scheduler.db` exists — if not, jobs are not persisted.
3. If server was down >1 hour, missed job is dropped (misfire window exceeded).
4. Trigger manually: `POST /api/scheduler/trigger-rescan`
5. Monitor next run: `GET /health``scheduler_next_run`
6. If problem repeats, increase `misfire_grace_time` in `scheduler_service.py`.

View File

@@ -10,6 +10,7 @@ Example:
import logging
import re
import unicodedata
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
@@ -123,7 +124,8 @@ class NFOService:
year: Optional[int] = None,
download_poster: bool = True,
download_logo: bool = True,
download_fanart: bool = True
download_fanart: bool = True,
alt_titles: Optional[List[str]] = None
) -> Path:
"""Create tvshow.nfo by scraping TMDB.
@@ -135,6 +137,7 @@ class NFOService:
download_poster: Whether to download poster.jpg
download_logo: Whether to download logo.png
download_fanart: Whether to download fanart.jpg
alt_titles: Alternative titles (e.g., Japanese title) for fallback search
Returns:
Path to created NFO file
@@ -162,15 +165,10 @@ class NFOService:
try:
await self.tmdb_client._ensure_session()
# Search for TV show with clean name (without year)
logger.debug("Searching TMDB for: %s", search_name)
search_results = await self.tmdb_client.search_tv_show(search_name)
if not search_results.get("results"):
raise TMDBAPIError(f"No results found for: {search_name}")
# Find best match (consider year if provided)
tv_show = self._find_best_match(search_results["results"], search_name, year)
# Search for TV show - try multiple strategies
tv_show, search_source = await self._search_with_fallback(
search_name, year, alt_titles
)
tv_id = tv_show["id"]
logger.info("Found match: %s (ID: %s)", tv_show['name'], tv_id)
@@ -531,6 +529,137 @@ class NFOService:
# Return first result (usually best match)
return results[0]
async def _search_with_fallback(
self,
primary_query: str,
year: Optional[int],
alt_titles: Optional[List[str]] = None
) -> Tuple[Dict[str, Any], str]:
"""Search TMDB with fallback strategies.
Tries multiple search strategies in order:
1. Primary query with year filter
2. Alternative titles (e.g., Japanese name)
3. Multi-language search (en-US)
4. Search without year constraint
5. Punctuation-normalized search
Args:
primary_query: Primary search term
year: Release year for filtering
alt_titles: Alternative titles to try if primary fails
Returns:
Tuple of (matched TV show dict, source description string)
Raises:
TMDBAPIError: If all search strategies fail
"""
search_strategies = [
# Strategy 1: Primary query as-is
{"query": primary_query, "year": year, "lang": "de-DE", "desc": "primary"},
]
# Strategy 2: Try alt titles (typically Japanese)
if alt_titles:
for alt in alt_titles:
if alt != primary_query:
search_strategies.append(
{"query": alt, "year": year, "lang": "ja-JP", "desc": f"alt_title:{alt}"}
)
search_strategies.append(
{"query": alt, "year": year, "lang": "en-US", "desc": f"alt_title:{alt}"}
)
# Strategy 3: Try English search
search_strategies.append(
{"query": primary_query, "year": year, "lang": "en-US", "desc": "english"}
)
# Strategy 4: Try without year constraint
if year:
search_strategies.append(
{"query": primary_query, "year": None, "lang": "de-DE", "desc": "no_year"}
)
# Strategy 5: Normalize punctuation
normalized = self._normalize_query_for_search(primary_query)
if normalized != primary_query:
search_strategies.append(
{"query": normalized, "year": year, "lang": "de-DE", "desc": f"normalized:{normalized}"}
)
last_error = None
for strategy in search_strategies:
query = strategy["query"]
lang = strategy["lang"]
desc = strategy["desc"]
try:
logger.debug(
"TMDB search attempt: query='%s', lang=%s, year=%s, strategy=%s",
query, lang, strategy["year"], desc
)
search_results = await self.tmdb_client.search_tv_show(
query,
language=lang
)
if search_results.get("results"):
# Apply year filter if we have one
results = search_results["results"]
if strategy["year"]:
year_filtered = [
r for r in results
if r.get("first_air_date", "").startswith(str(strategy["year"]))
]
if year_filtered:
match = year_filtered[0]
else:
# Year didn't match, still use first result but log it
match = results[0]
logger.debug(
"Year %s not found in results for '%s', using: %s",
strategy["year"], query, match["name"]
)
else:
match = results[0]
logger.info(
"TMDB search succeeded: '%s' found via strategy '%s' (ID: %s)",
match["name"], desc, match["id"]
)
return match, desc
else:
logger.debug("No results for '%s' via %s", query, desc)
except TMDBAPIError as e:
last_error = e
logger.debug("Search strategy '%s' failed: %s", desc, e)
continue
# All strategies exhausted
raise TMDBAPIError(
f"No results found for: {primary_query} (tried {len(search_strategies)} strategies)"
)
def _normalize_query_for_search(self, query: str) -> str:
"""Normalize query by removing punctuation and special chars.
Args:
query: Original search query
Returns:
Query with punctuation removed
"""
# Remove common punctuation but keep CJK characters
normalized = unicodedata.normalize('NFKC', query)
# Remove punctuation but not CJK
normalized = re.sub(r'[^\w\s\u3000-\u9fff\u4e00-\u9faf]', '', normalized)
# Collapse multiple spaces
normalized = re.sub(r'\s+', ' ', normalized).strip()
return normalized
async def _download_media_files(

View File

@@ -39,6 +39,7 @@ class TMDBClient:
DEFAULT_BASE_URL = "https://api.themoviedb.org/3"
DEFAULT_IMAGE_BASE_URL = "https://image.tmdb.org/t/p"
NEGATIVE_CACHE_TTL = 86400 # 24 hours
def __init__(
self,
@@ -64,6 +65,7 @@ class TMDBClient:
self.max_connections = max_connections
self.session: Optional[aiohttp.ClientSession] = None
self._cache: Dict[str, Any] = {}
self._negative_cache: Dict[str, float] = {} # query -> timestamp when cached
# TMDB allows ~40 req/s; use 30 concurrent + per-second throttle to stay safe
self._semaphore = asyncio.Semaphore(30)
self._rate_limit_lock = asyncio.Lock()
@@ -116,6 +118,16 @@ class TMDBClient:
logger.debug("Cache hit for %s", endpoint)
return self._cache[cache_key]
# Check negative cache (cached empty results)
negative_cache_key = f"{endpoint}:{str(sorted(params.items()))}"
if negative_cache_key in self._negative_cache:
if time.monotonic() - self._negative_cache[negative_cache_key] < self.NEGATIVE_CACHE_TTL:
logger.debug("Negative cache hit for %s (cached empty result)", endpoint)
return {"results": []}
else:
# Expired negative cache entry
del self._negative_cache[negative_cache_key]
delay = 2
last_error = None
@@ -158,6 +170,10 @@ class TMDBClient:
resp.raise_for_status()
data = await resp.json()
self._cache[cache_key] = data
# Cache negative result if empty
if endpoint.startswith("search/") and not data.get("results"):
self._negative_cache[negative_cache_key] = time.monotonic()
logger.debug("Cached negative result for %s", endpoint)
return data
except asyncio.TimeoutError as e:
@@ -224,6 +240,34 @@ class TMDBClient:
{"query": query, "language": language, "page": page}
)
async def search_multi(
self,
query: str,
language: str = "en-US",
page: int = 1
) -> Dict[str, Any]:
"""Search for movies and TV shows by name using TMDB multi search.
Multi search returns both movies and TV shows, useful for anime
that might be indexed as movies on TMDB.
Args:
query: Search query (show name)
language: Language for results (default: English)
page: Page number for pagination
Returns:
Search results with list of movies and TV shows
Example:
>>> results = await client.search_multi("Suzume no Tojimari")
>>> shows = [r for r in results["results"] if r["media_type"] == "tv"]
"""
return await self._request(
"search/multi",
{"query": query, "language": language, "page": page}
)
async def get_tv_show_details(
self,
tv_id: int,
@@ -356,3 +400,25 @@ class TMDBClient:
"""Clear the request cache."""
self._cache.clear()
logger.debug("TMDB client cache cleared")
def clear_negative_cache(self):
"""Clear the negative result cache."""
self._negative_cache.clear()
logger.debug("TMDB negative cache cleared")
def cleanup_expired_negative_cache(self) -> int:
"""Remove expired entries from negative cache.
Returns:
Number of entries removed
"""
now = time.monotonic()
expired_keys = [
key for key, timestamp in self._negative_cache.items()
if now - timestamp >= self.NEGATIVE_CACHE_TTL
]
for key in expired_keys:
del self._negative_cache[key]
if expired_keys:
logger.debug("Removed %d expired negative cache entries", len(expired_keys))
return len(expired_keys)

View File

@@ -26,6 +26,8 @@ class HealthStatus(BaseModel):
service: str = "aniworld-api"
series_app_initialized: bool = False
anime_directory_configured: bool = False
scheduler_next_run: Optional[str] = None
scheduler_last_run: Optional[str] = None
class DatabaseHealth(BaseModel):
@@ -177,6 +179,7 @@ async def basic_health_check() -> HealthStatus:
This endpoint does not depend on anime_directory configuration
and should always return 200 OK for basic health monitoring.
Includes service information for identification.
Includes scheduler next/last run times for monitoring tools.
Returns:
HealthStatus: Simple health status with timestamp and service info.
@@ -184,6 +187,14 @@ async def basic_health_check() -> HealthStatus:
from src.config.settings import settings
from src.server.utils.dependencies import _series_app
# Get scheduler status for health monitoring
scheduler_status: dict = {}
try:
from src.server.services.scheduler_service import get_scheduler_service
scheduler_status = get_scheduler_service().get_status()
except Exception:
pass
logger.debug("Basic health check requested")
return HealthStatus(
status="healthy",
@@ -191,6 +202,8 @@ async def basic_health_check() -> HealthStatus:
service="aniworld-api",
series_app_initialized=_series_app is not None,
anime_directory_configured=bool(settings.anime_directory),
scheduler_next_run=scheduler_status.get("next_run"),
scheduler_last_run=scheduler_status.get("last_run"),
)

View File

@@ -3,6 +3,10 @@
Uses APScheduler's AsyncIOScheduler with CronTrigger for precise
cron-based scheduling. The legacy interval-based loop has been removed
in favour of the cron approach.
Jobs are persisted to a SQLite database so they survive process restarts.
On startup, if the last scheduled run was missed (server was down at the
cron time), the job is triggered immediately within a grace period.
"""
from __future__ import annotations
@@ -10,6 +14,7 @@ from datetime import datetime, timezone
from typing import List, Optional
import structlog
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
@@ -20,6 +25,10 @@ logger = structlog.get_logger(__name__)
_JOB_ID = "scheduled_rescan"
# Grace period for missed jobs (1 hour — handles server downtime between
# scheduled time and startup).
_MISFIRE_GRACE_SECONDS = 3600
class SchedulerServiceError(Exception):
"""Service-level exception for scheduler operations."""
@@ -71,7 +80,10 @@ class SchedulerService:
logger.error("Failed to load scheduler configuration", error=str(exc))
raise SchedulerServiceError(f"Failed to load config: {exc}") from exc
self._scheduler = AsyncIOScheduler()
jobstores = {
"default": SQLAlchemyJobStore(url="sqlite:///./data/scheduler.db"),
}
self._scheduler = AsyncIOScheduler(jobstores=jobstores)
if not self._config.enabled:
logger.info("Scheduler is disabled in configuration — not adding jobs")
@@ -85,11 +97,12 @@ class SchedulerService:
)
else:
self._scheduler.add_job(
self._perform_rescan,
_run_rescan_job,
trigger=trigger,
id=_JOB_ID,
replace_existing=True,
misfire_grace_time=300,
misfire_grace_time=_MISFIRE_GRACE_SECONDS,
coalesce=True,
)
logger.info(
"Scheduler started with cron trigger",
@@ -100,6 +113,16 @@ class SchedulerService:
self._scheduler.start()
self._is_running = True
# Startup recovery: if the server was down at the scheduled time and
# the job is within the misfire window, APScheduler will run it
# automatically. Log the scheduled time for visibility.
job = self._scheduler.get_job(_JOB_ID)
if job and job.next_run_time:
logger.info(
"Scheduler next run",
next_run=job.next_run_time.isoformat(),
)
async def stop(self) -> None:
"""Stop the APScheduler gracefully."""
if not self._is_running:
@@ -175,11 +198,12 @@ class SchedulerService:
)
else:
self._scheduler.add_job(
self._perform_rescan,
_run_rescan_job,
trigger=trigger,
id=_JOB_ID,
replace_existing=True,
misfire_grace_time=300,
misfire_grace_time=_MISFIRE_GRACE_SECONDS,
coalesce=True,
)
logger.info(
"Scheduler job added with cron trigger",
@@ -409,6 +433,20 @@ class SchedulerService:
self._scan_in_progress = False
# ---------------------------------------------------------------------------
# Module-level job runner
#
# APScheduler cannot serialize bound methods (SchedulerService instance
# contains a reference to the scheduler itself, creating a circular pickle
# error). Using a module-level function avoids this.
# ---------------------------------------------------------------------------
async def _run_rescan_job() -> None:
"""Module-level job entry point — delegates to the current service."""
svc = get_scheduler_service()
await svc._perform_rescan()
# ---------------------------------------------------------------------------
# Module-level singleton
# ---------------------------------------------------------------------------

View File

@@ -1,5 +1,6 @@
"""Unit tests for NFO service."""
import time
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
@@ -22,6 +23,14 @@ def nfo_service(tmp_path):
return service
@pytest.fixture
def tmdb_client():
"""Create TMDB client with test API key."""
from src.core.services.tmdb_client import TMDBClient
client = TMDBClient(api_key="test_api_key")
return client
@pytest.fixture
def mock_tmdb_data():
"""Mock TMDB API response data."""
@@ -342,7 +351,7 @@ class TestCreateTVShowNFO:
)
# Assert - should search with clean name "The Dreaming Boy is a Realist"
mock_search.assert_called_once_with("The Dreaming Boy is a Realist")
mock_search.assert_called_once_with("The Dreaming Boy is a Realist", language="de-DE")
# Verify NFO file was created
assert nfo_path.exists()
@@ -362,17 +371,15 @@ class TestCreateTVShowNFO:
with patch.object(nfo_service.tmdb_client, '__aenter__', return_value=nfo_service.tmdb_client):
with patch.object(nfo_service.tmdb_client, '__aexit__', return_value=None):
with patch.object(nfo_service.tmdb_client, 'search_tv_show', new_callable=AsyncMock) as mock_search:
with patch.object(nfo_service, '_search_with_fallback', new_callable=AsyncMock) as mock_search_fallback:
with patch.object(nfo_service.tmdb_client, 'get_tv_show_details', new_callable=AsyncMock) as mock_details:
with patch.object(nfo_service.tmdb_client, 'get_tv_show_content_ratings', new_callable=AsyncMock) as mock_ratings:
with patch.object(nfo_service.image_downloader, 'download_poster', new_callable=AsyncMock):
with patch.object(nfo_service.image_downloader, 'download_logo', new_callable=AsyncMock):
with patch.object(nfo_service.image_downloader, 'download_fanart', new_callable=AsyncMock):
with patch.object(nfo_service, '_find_best_match') as mock_find_match:
mock_search.return_value = search_results
with patch.object(nfo_service, '_enrich_details_with_fallback', new_callable=AsyncMock) as mock_enrich:
with patch.object(nfo_service, '_download_media_files', new_callable=AsyncMock):
mock_search_fallback.return_value = (mock_tmdb_data, "primary")
mock_details.return_value = mock_tmdb_data
mock_ratings.return_value = mock_content_ratings_de
mock_find_match.return_value = mock_tmdb_data
mock_enrich.return_value = mock_tmdb_data
# Act
await nfo_service.create_tvshow_nfo(
@@ -381,10 +388,11 @@ class TestCreateTVShowNFO:
year=explicit_year # Explicit year provided
)
# Assert - should use explicit year, not extracted year
mock_find_match.assert_called_once()
call_args = mock_find_match.call_args
assert call_args[0][2] == explicit_year # Third argument is year
# Assert - _search_with_fallback should be called with explicit year
mock_search_fallback.assert_called_once()
call_args = mock_search_fallback.call_args
assert call_args[0][0] == "Attack on Titan" # clean name
assert call_args[0][1] == explicit_year # explicit year
@pytest.mark.asyncio
async def test_create_nfo_no_results_with_clean_name(self, nfo_service, tmp_path):
@@ -396,8 +404,8 @@ class TestCreateTVShowNFO:
with patch.object(nfo_service.tmdb_client, '__aenter__', return_value=nfo_service.tmdb_client):
with patch.object(nfo_service.tmdb_client, '__aexit__', return_value=None):
with patch.object(nfo_service.tmdb_client, 'search_tv_show', new_callable=AsyncMock) as mock_search:
mock_search.return_value = {"results": []}
with patch.object(nfo_service, '_search_with_fallback', new_callable=AsyncMock) as mock_search_fallback:
mock_search_fallback.side_effect = TMDBAPIError("No results found for: Nonexistent Series")
# Act & Assert
with pytest.raises(TMDBAPIError) as exc_info:
@@ -408,8 +416,6 @@ class TestCreateTVShowNFO:
# Should use clean name in error message
assert "No results found for: Nonexistent Series" in str(exc_info.value)
# Should have searched with clean name
mock_search.assert_called_once_with("Nonexistent Series")
@pytest.mark.asyncio
async def test_create_nfo_with_fsk(self, nfo_service, tmp_path, mock_tmdb_data, mock_content_ratings_de):
@@ -1616,3 +1622,190 @@ class TestEnrichFallbackLanguages:
# de-DE + en-US = 2 calls (no ja-JP needed)
assert mock_details.call_count == 2
class TestSearchWithFallback:
"""Tests for TMDB search fallback functionality."""
@pytest.mark.asyncio
async def test_search_with_fallback_primary_success(self, nfo_service, mock_tmdb_data):
"""Test that primary query succeeds without fallback."""
with patch.object(nfo_service.tmdb_client, 'search_tv_show', new_callable=AsyncMock) as mock_search:
mock_search.return_value = {"results": [mock_tmdb_data]}
result, source = await nfo_service._search_with_fallback(
"Attack on Titan", 2013, None
)
assert result["id"] == mock_tmdb_data["id"]
assert source == "primary"
assert mock_search.call_count == 1
@pytest.mark.asyncio
async def test_search_with_fallback_uses_alt_titles(self, nfo_service, mock_tmdb_data):
"""Test that alternative titles are tried when primary fails."""
mock_search = AsyncMock()
# First call returns empty, second (with Japanese title) returns result
mock_search.side_effect = [
{"results": []},
{"results": [mock_tmdb_data]}
]
with patch.object(nfo_service.tmdb_client, 'search_tv_show', mock_search):
result, source = await nfo_service._search_with_fallback(
"Suzume", 2022, alt_titles=["すずめの戸締まり"]
)
assert result["id"] == mock_tmdb_data["id"]
assert "alt_title" in source
@pytest.mark.asyncio
async def test_search_with_fallback_year_not_matched(self, nfo_service, mock_tmdb_data):
"""Test fallback when year doesn't match but first result is used anyway."""
# First result doesn't match year, but should still be returned
different_year_data = {**mock_tmdb_data, "first_air_date": "2020-01-01"}
mock_search = AsyncMock(return_value={"results": [different_year_data]})
with patch.object(nfo_service.tmdb_client, 'search_tv_show', mock_search):
result, source = await nfo_service._search_with_fallback(
"Attack on Titan", 2013, None
)
assert result["id"] == mock_tmdb_data["id"]
@pytest.mark.asyncio
async def test_search_with_fallback_no_year_strategy(self, nfo_service, mock_tmdb_data):
"""Test that search without year is attempted when year-filtered fails."""
mock_search = AsyncMock()
# First call with year fails, second (without year) succeeds
mock_search.side_effect = [
{"results": []},
{"results": [mock_tmdb_data]}
]
with patch.object(nfo_service.tmdb_client, 'search_tv_show', mock_search):
result, source = await nfo_service._search_with_fallback(
"Attack on Titan", 2013, None
)
assert result["id"] == mock_tmdb_data["id"]
# Strategy order: primary -> english -> no_year (english comes before no_year)
assert mock_search.call_count == 2
@pytest.mark.asyncio
async def test_search_with_fallback_all_strategies_fail(self, nfo_service):
"""Test that TMDBAPIError is raised when all strategies fail."""
mock_search = AsyncMock(return_value={"results": []})
with patch.object(nfo_service.tmdb_client, 'search_tv_show', mock_search):
with pytest.raises(TMDBAPIError) as exc_info:
await nfo_service._search_with_fallback(
"Nonexistent Anime", 2023, None
)
assert "Nonexistent Anime" in str(exc_info.value)
# Should have tried multiple strategies
assert mock_search.call_count >= 3
@pytest.mark.asyncio
async def test_search_with_fallback_normalizes_punctuation(self, nfo_service, mock_tmdb_data):
"""Test that punctuation-normalized search is attempted."""
mock_search = AsyncMock()
# First call fails, normalized version succeeds
mock_search.side_effect = [
{"results": []},
{"results": [mock_tmdb_data]}
]
with patch.object(nfo_service.tmdb_client, 'search_tv_show', mock_search):
result, source = await nfo_service._search_with_fallback(
"Attack on Titan:", 2013, None
)
assert result["id"] == mock_tmdb_data["id"]
def test_normalize_query_for_search(self, nfo_service):
"""Test punctuation normalization in queries."""
# Test normal punctuation removal
assert nfo_service._normalize_query_for_search("Attack on Titan:") == "Attack on Titan"
assert nfo_service._normalize_query_for_search("Suzume no Tojimari.") == "Suzume no Tojimari"
# Test CJK characters are preserved
assert "すずめ" in nfo_service._normalize_query_for_search("すずめの戸締まり")
# Test multiple spaces are collapsed
assert nfo_service._normalize_query_for_search("Attack on Titan") == "Attack on Titan"
class TestNegativeCache:
"""Tests for negative result caching in TMDB client."""
@pytest.mark.asyncio
async def test_negative_result_cached(self, tmdb_client):
"""Test that empty search results are cached."""
import time
mock_session = MagicMock()
mock_response = AsyncMock()
mock_response.status = 200
mock_response.json = AsyncMock(return_value={"results": []})
mock_response.__aenter__ = AsyncMock(return_value=mock_response)
mock_response.__aexit__ = AsyncMock(return_value=None)
mock_session.get = MagicMock(return_value=mock_response)
tmdb_client.session = mock_session
with patch.object(tmdb_client, '_ensure_session', new_callable=AsyncMock):
# First call
result = await tmdb_client.search_tv_show("Nonexistent")
assert result["results"] == []
# Negative cache should be set
assert len(tmdb_client._negative_cache) > 0
@pytest.mark.asyncio
async def test_negative_cache_prevents_duplicate_call(self, tmdb_client):
"""Test that negative cache prevents second API call within 24 hours."""
import time
mock_session = MagicMock()
mock_response = AsyncMock()
mock_response.status = 200
mock_response.json = AsyncMock(return_value={"results": []})
mock_response.__aenter__ = AsyncMock(return_value=mock_response)
mock_response.__aexit__ = AsyncMock(return_value=None)
mock_session.get = MagicMock(return_value=mock_response)
tmdb_client.session = mock_session
with patch.object(tmdb_client, '_ensure_session', new_callable=AsyncMock):
# First call - should hit API
await tmdb_client.search_tv_show("Nonexistent")
first_call_count = mock_session.get.call_count
# Second call with same query - should use negative cache, not hit API
await tmdb_client.search_tv_show("Nonexistent")
second_call_count = mock_session.get.call_count
# Should not have made second API call
assert first_call_count == second_call_count
def test_clear_negative_cache(self, tmdb_client):
"""Test clearing negative cache."""
# Add some negative cache entries
tmdb_client._negative_cache["test_key"] = time.monotonic()
assert len(tmdb_client._negative_cache) > 0
tmdb_client.clear_negative_cache()
assert len(tmdb_client._negative_cache) == 0
def test_cleanup_expired_negative_cache(self, tmdb_client):
"""Test cleanup of expired negative cache entries."""
# Add an expired entry
old_timestamp = time.monotonic() - (tmdb_client.NEGATIVE_CACHE_TTL + 1)
tmdb_client._negative_cache["expired_key"] = old_timestamp
tmdb_client._negative_cache["valid_key"] = time.monotonic()
removed = tmdb_client.cleanup_expired_negative_cache()
assert removed == 1
assert "expired_key" not in tmdb_client._negative_cache
assert "valid_key" in tmdb_client._negative_cache

View File

@@ -117,6 +117,8 @@ class TestStart:
call_kwargs = mock_sched.add_job.call_args
assert call_kwargs[1]["id"] == _JOB_ID
assert isinstance(call_kwargs[1]["trigger"], CronTrigger)
assert call_kwargs[1]["misfire_grace_time"] == 3600
assert call_kwargs[1]["coalesce"] is True
mock_sched.start.assert_called_once()
assert scheduler_service._is_running is True
@@ -485,3 +487,75 @@ class TestSingletonHelpers:
svc = get_scheduler_service()
assert svc is not None # fresh instance
# ---------------------------------------------------------------------------
# 12.12 Persistent job store — SQLAlchemyJobStore passed to AsyncIOScheduler
# ---------------------------------------------------------------------------
class TestPersistentJobStore:
@pytest.mark.asyncio
async def test_start_creates_scheduler_with_sqlalchemy_jobstore(
self, scheduler_service, mock_config_service
):
with patch(
"src.server.services.scheduler_service.AsyncIOScheduler"
) as MockScheduler:
mock_sched = MagicMock()
mock_sched.running = False
MockScheduler.return_value = mock_sched
await scheduler_service.start()
MockScheduler.assert_called_once()
call_kwargs = MockScheduler.call_args
jobstores = call_kwargs[1]["jobstores"]
assert "default" in jobstores
# Verify it's a SQLAlchemyJobStore (class check via module name)
assert "sqlalchemy" in type(jobstores["default"]).__module__
@pytest.mark.asyncio
async def test_job_options_include_misfire_grace_and_coalesce(
self, scheduler_service, mock_config_service
):
with patch(
"src.server.services.scheduler_service.AsyncIOScheduler"
) as MockScheduler:
mock_sched = MagicMock()
mock_sched.running = False
MockScheduler.return_value = mock_sched
await scheduler_service.start()
call_kwargs = mock_sched.add_job.call_args
assert call_kwargs[1]["misfire_grace_time"] == 3600
assert call_kwargs[1]["coalesce"] is True
# ---------------------------------------------------------------------------
# 12.13 Startup recovery — next run logged after start()
# ---------------------------------------------------------------------------
class TestStartupRecovery:
@pytest.mark.asyncio
async def test_start_logs_next_run_time(
self, scheduler_service, mock_config_service
):
with patch(
"src.server.services.scheduler_service.AsyncIOScheduler"
) as MockScheduler:
mock_job = MagicMock()
next_run_dt = datetime(2026, 5, 25, 3, 0, tzinfo=timezone.utc)
mock_job.next_run_time = next_run_dt
mock_sched = MagicMock()
mock_sched.running = False
mock_sched.get_job.return_value = mock_job
MockScheduler.return_value = mock_sched
with patch(
"src.server.services.scheduler_service.logger"
) as mock_logger:
await scheduler_service.start()
# Check that next_run was logged
info_calls = [str(c) for c in mock_logger.info.call_args_list]
assert any("next_run" in c for c in info_calls)