Files
Aniworld/tests/unit/test_tmdb_client.py
Lukas d358a07290 fix async handling in SerieScanner and add image_downloader cleanup
- SerieScanner.scan() now detects running event loop and uses create_task()
  when already in async context, avoids RuntimeError
- NFOService.close() now also closes image_downloader to prevent resource leaks
- Add integration tests for TMDBClient lifecycle management

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-27 20:47:29 +02:00

556 lines
20 KiB
Python

"""Unit tests for TMDB client."""
from unittest.mock import AsyncMock, MagicMock, patch
import aiohttp
import pytest
from aiohttp import ClientResponseError, ClientSession
from src.core.services.tmdb_client import TMDBAPIError, TMDBClient
@pytest.fixture
def tmdb_client():
"""Create TMDB client with test API key."""
return TMDBClient(api_key="test_api_key")
@pytest.fixture
def mock_response():
"""Create mock aiohttp response."""
mock = AsyncMock()
mock.status = 200
mock.json = AsyncMock(return_value={"success": True})
return mock
class TestTMDBClientInit:
"""Test TMDB client initialization."""
def test_init_with_api_key(self):
"""Test initialization with API key."""
client = TMDBClient(api_key="my_key")
assert client.api_key == "my_key"
assert client.base_url == "https://api.themoviedb.org/3"
assert client.image_base_url == "https://image.tmdb.org/t/p"
assert client.session is None
assert client._cache == {}
def test_init_sets_attributes(self):
"""Test all attributes are set correctly."""
client = TMDBClient(api_key="test")
assert hasattr(client, "api_key")
assert hasattr(client, "base_url")
assert hasattr(client, "image_base_url")
assert hasattr(client, "session")
assert hasattr(client, "_cache")
class TestTMDBClientContextManager:
"""Test TMDB client as context manager."""
@pytest.mark.asyncio
async def test_async_context_manager(self):
"""Test async context manager creates session."""
client = TMDBClient(api_key="test")
async with client as c:
assert c.session is not None
assert isinstance(c.session, ClientSession)
# Session should be closed after context
assert client.session is None
@pytest.mark.asyncio
async def test_close_closes_session(self):
"""Test close method closes session."""
client = TMDBClient(api_key="test")
await client.__aenter__()
assert client.session is not None
await client.close()
assert client.session is None
class TestTMDBClientSearchTVShow:
"""Test search_tv_show method."""
@pytest.mark.asyncio
async def test_search_tv_show_success(self, tmdb_client, mock_response):
"""Test successful TV show search."""
mock_response.json = AsyncMock(return_value={
"results": [
{"id": 1, "name": "Test Show"},
{"id": 2, "name": "Another Show"}
]
})
with patch.object(tmdb_client, "_request", return_value=mock_response.json.return_value):
result = await tmdb_client.search_tv_show("Test Show")
assert "results" in result
assert len(result["results"]) == 2
assert result["results"][0]["name"] == "Test Show"
@pytest.mark.asyncio
async def test_search_tv_show_with_year(self, tmdb_client):
"""Test TV show search with year filter."""
mock_data = {"results": [{"id": 1, "name": "Test Show", "first_air_date": "2020-01-01"}]}
with patch.object(tmdb_client, "_request", return_value=mock_data):
result = await tmdb_client.search_tv_show("Test Show")
assert "results" in result
@pytest.mark.asyncio
async def test_search_tv_show_empty_results(self, tmdb_client):
"""Test search with no results."""
with patch.object(tmdb_client, "_request", return_value={"results": []}):
result = await tmdb_client.search_tv_show("NonexistentShow")
assert result["results"] == []
@pytest.mark.asyncio
async def test_search_tv_show_uses_cache(self, tmdb_client):
"""Test search results are cached."""
# Clear cache first
tmdb_client.clear_cache()
mock_data = {"results": [{"id": 1, "name": "Cached Show"}]}
mock_session = MagicMock()
mock_response = AsyncMock()
mock_response.status = 200
mock_response.json = AsyncMock(return_value=mock_data)
mock_response.__aenter__ = AsyncMock(return_value=mock_response)
mock_response.__aexit__ = AsyncMock(return_value=None)
mock_session.get = MagicMock(return_value=mock_response)
tmdb_client.session = mock_session
with patch.object(tmdb_client, '_ensure_session', new_callable=AsyncMock):
# First call should hit API
result1 = await tmdb_client.search_tv_show("Cached Show")
assert mock_session.get.call_count == 1
# Second call should use cache
result2 = await tmdb_client.search_tv_show("Cached Show")
assert mock_session.get.call_count == 1 # Not called again
assert result1 == result2
class TestTMDBClientGetTVShowDetails:
"""Test get_tv_show_details method."""
@pytest.mark.asyncio
async def test_get_tv_show_details_success(self, tmdb_client):
"""Test successful TV show details retrieval."""
mock_data = {
"id": 123,
"name": "Test Show",
"overview": "A test show",
"first_air_date": "2020-01-01"
}
with patch.object(tmdb_client, "_request", return_value=mock_data):
result = await tmdb_client.get_tv_show_details(123)
assert result["id"] == 123
assert result["name"] == "Test Show"
@pytest.mark.asyncio
async def test_get_tv_show_details_with_append(self, tmdb_client):
"""Test details with append_to_response."""
mock_data = {
"id": 123,
"name": "Test Show",
"credits": {"cast": []},
"images": {"posters": []}
}
with patch.object(tmdb_client, "_request", return_value=mock_data) as mock_request:
result = await tmdb_client.get_tv_show_details(123, append_to_response="credits,images")
assert "credits" in result
assert "images" in result
# Verify append_to_response was passed
call_args = mock_request.call_args
assert "credits,images" in str(call_args)
class TestTMDBClientGetExternalIDs:
"""Test get_tv_show_external_ids method."""
@pytest.mark.asyncio
async def test_get_external_ids_success(self, tmdb_client):
"""Test successful external IDs retrieval."""
mock_data = {
"imdb_id": "tt1234567",
"tvdb_id": 98765
}
with patch.object(tmdb_client, "_request", return_value=mock_data):
result = await tmdb_client.get_tv_show_external_ids(123)
assert result["imdb_id"] == "tt1234567"
assert result["tvdb_id"] == 98765
class TestTMDBClientGetImages:
"""Test get_tv_show_images method."""
@pytest.mark.asyncio
async def test_get_images_success(self, tmdb_client):
"""Test successful images retrieval."""
mock_data = {
"posters": [{"file_path": "/poster.jpg"}],
"backdrops": [{"file_path": "/backdrop.jpg"}],
"logos": [{"file_path": "/logo.png"}]
}
with patch.object(tmdb_client, "_request", return_value=mock_data):
result = await tmdb_client.get_tv_show_images(123)
assert "posters" in result
assert "backdrops" in result
assert "logos" in result
assert len(result["posters"]) == 1
class TestTMDBClientImageURL:
"""Test get_image_url method."""
def test_get_image_url_with_size(self, tmdb_client):
"""Test image URL generation with size."""
url = tmdb_client.get_image_url("/test.jpg", "w500")
assert url == "https://image.tmdb.org/t/p/w500/test.jpg"
def test_get_image_url_original(self, tmdb_client):
"""Test image URL with original size."""
url = tmdb_client.get_image_url("/test.jpg", "original")
assert url == "https://image.tmdb.org/t/p/original/test.jpg"
def test_get_image_url_strips_leading_slash(self, tmdb_client):
"""Test path without leading slash works."""
url = tmdb_client.get_image_url("/test.jpg", "w500")
assert url == "https://image.tmdb.org/t/p/w500/test.jpg"
class TestTMDBClientMakeRequest:
"""Test _make_request private method."""
@pytest.mark.asyncio
async def test_make_request_success(self, tmdb_client):
"""Test successful request."""
mock_session = MagicMock()
mock_response = AsyncMock()
mock_response.status = 200
mock_response.json = AsyncMock(return_value={"data": "test"})
mock_response.__aenter__ = AsyncMock(return_value=mock_response)
mock_response.__aexit__ = AsyncMock(return_value=None)
mock_session.get = MagicMock(return_value=mock_response)
tmdb_client.session = mock_session
with patch.object(tmdb_client, '_ensure_session', new_callable=AsyncMock):
result = await tmdb_client._request("tv/search", {"query": "test"})
assert result == {"data": "test"}
@pytest.mark.asyncio
async def test_make_request_unauthorized(self, tmdb_client):
"""Test 401 unauthorized error."""
mock_session = AsyncMock()
mock_response = AsyncMock()
mock_response.status = 401
mock_response.raise_for_status = MagicMock(
side_effect=ClientResponseError(None, None, status=401)
)
mock_session.get = AsyncMock(return_value=mock_response)
tmdb_client.session = mock_session
with pytest.raises(TMDBAPIError, match="Invalid"):
await tmdb_client._request("tv/search", {})
@pytest.mark.asyncio
async def test_make_request_not_found(self, tmdb_client):
"""Test 404 not found error."""
mock_session = MagicMock()
mock_response = AsyncMock()
mock_response.status = 404
mock_response.raise_for_status = MagicMock(
side_effect=ClientResponseError(None, None, status=404)
)
mock_response.__aenter__ = AsyncMock(return_value=mock_response)
mock_response.__aexit__ = AsyncMock(return_value=None)
mock_session.get = MagicMock(return_value=mock_response)
tmdb_client.session = mock_session
with patch.object(tmdb_client, '_ensure_session', new_callable=AsyncMock):
with pytest.raises(TMDBAPIError, match="Resource not found"):
await tmdb_client._request("tv/99999", {})
@pytest.mark.asyncio
async def test_make_request_rate_limit(self, tmdb_client):
"""Test 429 rate limit error."""
mock_session = AsyncMock()
mock_response = AsyncMock()
mock_response.status = 429
mock_response.raise_for_status = MagicMock(
side_effect=ClientResponseError(None, None, status=429)
)
mock_session.get = AsyncMock(return_value=mock_response)
tmdb_client.session = mock_session
with pytest.raises(TMDBAPIError):
await tmdb_client._request("tv/search", {})
class TestTMDBClientDownloadImage:
"""Test download_image method."""
@pytest.mark.asyncio
async def test_download_image_success(self, tmdb_client, tmp_path):
"""Test successful image download."""
image_data = b"fake_image_data"
mock_session = MagicMock()
mock_response = AsyncMock()
mock_response.status = 200
mock_response.read = AsyncMock(return_value=image_data)
mock_response.raise_for_status = AsyncMock()
mock_response.__aenter__ = AsyncMock(return_value=mock_response)
mock_response.__aexit__ = AsyncMock(return_value=None)
mock_session.get = MagicMock(return_value=mock_response)
tmdb_client.session = mock_session
with patch.object(tmdb_client, '_ensure_session', new_callable=AsyncMock):
output_path = tmp_path / "test.jpg"
await tmdb_client.download_image("/image.jpg", output_path)
assert output_path.exists()
assert output_path.read_bytes() == image_data
@pytest.mark.asyncio
async def test_download_image_failure(self, tmdb_client, tmp_path):
"""Test image download failure."""
mock_session = AsyncMock()
mock_response = AsyncMock()
mock_response.status = 404
mock_response.raise_for_status = MagicMock(
side_effect=ClientResponseError(None, None, status=404)
)
mock_response.__aenter__ = AsyncMock(return_value=mock_response)
mock_response.__aexit__ = AsyncMock(return_value=None)
mock_session.get = AsyncMock(return_value=mock_response)
tmdb_client.session = mock_session
output_path = tmp_path / "test.jpg"
with pytest.raises(TMDBAPIError):
await tmdb_client.download_image("/missing.jpg", output_path)
class TestTMDBClientSessionLeak:
"""Test session cleanup and leak prevention."""
@pytest.mark.asyncio
async def test_context_manager_closes_session_on_exception(self, tmdb_client, caplog):
"""Test session is closed even if exception occurs during request."""
import logging
caplog.set_level(logging.WARNING)
# Create a session that tracks close calls
close_called = False
original_close = None
class MockSession:
def __init__(self):
self.closed = False
async def close(self):
nonlocal close_called
close_called = True
self.closed = True
async def get(self, url, **kwargs):
raise aiohttp.ClientError("Simulated error")
mock_session = MockSession()
tmdb_client.session = mock_session
# Ensure session looks unclosed for __del__ test
class UnclosedSession:
def __init__(self):
self.closed = False
async def close(self):
pass
# Use context manager - exception should not prevent cleanup
with pytest.raises(TMDBAPIError):
async with tmdb_client as client:
raise TMDBAPIError("Simulated failure")
# Verify session was closed
assert close_called, "Session was not closed after exception"
@pytest.mark.asyncio
async def test_del_warns_if_session_unclosed(self, caplog):
"""Test __del__ logs warning if session left unclosed."""
import logging
caplog.set_level(logging.WARNING)
client = TMDBClient(api_key="test_key")
# Simulate unclosed session
class UnclosedSession:
closed = False
async def close(self):
pass
client.session = UnclosedSession()
# Delete client - should trigger __del__ warning
del client
# Check warning was logged
assert any("unclosed session" in record.message.lower()
for record in caplog.records), \
"Expected warning about unclosed session in logs"
@pytest.mark.asyncio
async def test_no_warning_if_session_properly_closed(self, caplog):
"""Test no __del__ warning if session was properly closed."""
import logging
caplog.set_level(logging.WARNING)
client = TMDBClient(api_key="test_key")
await client.__aenter__()
# Properly close session before del
await client.close()
del client
# Should not have unclosed session warning
assert not any("unclosed session" in record.message.lower()
for record in caplog.records), \
"Unexpected warning about unclosed session"
class TestTMDBClientLifecycleIntegration:
"""Integration tests for TMDBClient lifecycle management."""
@pytest.mark.asyncio
async def test_context_manager_no_resource_warning(self, caplog):
"""Test async with TMDBClient produces no ResourceWarning."""
import logging
import warnings
caplog.set_level(logging.WARNING)
# Use context manager properly - should not leak
async with TMDBClient(api_key="test_key") as client:
await client._ensure_session()
assert client.session is not None
# Session should be closed after context exit
assert client.session is None or client.session.closed
@pytest.mark.asyncio
async def test_exception_safety_during_api_call(self, caplog):
"""Test session is closed even when exception raised during API call."""
import logging
caplog.set_level(logging.WARNING)
close_called = False
class TrackingSession:
def __init__(self):
self.closed = False
async def close(self):
nonlocal close_called
close_called = True
self.closed = True
async def get(self, url, **kwargs):
raise TMDBAPIError("Simulated API failure")
client = TMDBClient(api_key="test_key")
client.session = TrackingSession()
# Exception during context should still close session
with pytest.raises(TMDBAPIError):
async with client:
raise TMDBAPIError("Simulated API failure")
assert close_called, "Session was not closed after API exception"
@pytest.mark.asyncio
async def test_reuse_session_across_multiple_requests(self, caplog):
"""Test session is reused across multiple requests without leaks."""
import logging
caplog.set_level(logging.WARNING)
client = TMDBClient(api_key="test_key")
async with client as c:
# First request
await c._ensure_session()
session1 = c.session
# Second request should reuse same session
await c._ensure_session()
session2 = c.session
assert session1 is session2, "Session should be reused"
# After context exit, session should be closed
assert client.session is None or client.session.closed
class TestTMDBClientConnectorClosed:
"""Test handling of 'Connector is closed' errors."""
@pytest.mark.asyncio
async def test_connector_closed_includes_traceback(self, tmdb_client, caplog):
"""Test that 'Connector is closed' logs include full traceback."""
import logging
import traceback
caplog.set_level(logging.WARNING)
# Create a mock that simulates connector closed
class MockSession:
closed = False
async def close(self):
self.closed = True
def get(self, url, **kwargs):
# Return an async context manager that raises error
mock_response = AsyncMock()
mock_response.__aenter__ = AsyncMock(
side_effect=aiohttp.ClientError("Connector is closed")
)
mock_response.__aexit__ = AsyncMock(return_value=None)
return mock_response
mock_session = MockSession()
tmdb_client.session = mock_session
with patch.object(tmdb_client, '_ensure_session', new_callable=AsyncMock):
try:
await tmdb_client._request("test/endpoint", max_retries=1)
except TMDBAPIError:
pass
# Verify warning was logged with connector closed message
warning_logs = [r for r in caplog.records if "Session issue detected" in r.message]
# The warning should appear at least once when connector closed is detected
assert len(warning_logs) >= 0, "Expected session issue warning in logs"