Aniworld/tests/unit/test_serie_scanner.py
Lukas 46ca4c9aac Task 5: Update SerieScanner to use database storage
- Add db_session parameter to SerieScanner.__init__
- Add async scan_async() method for database-backed scanning
- Add _save_serie_to_db() helper for creating/updating series
- Add _update_serie_in_db() helper for updating existing series
- Add deprecation warning to file-based scan() method
- Maintain backward compatibility for CLI usage
- Add comprehensive unit tests (15 tests, all passing)
- Update instructions.md to mark Task 5 complete
2025-12-01 19:25:28 +01:00

422 lines
16 KiB
Python

"""Tests for SerieScanner class - database and file-based operations."""
import os
import tempfile
import warnings
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from src.core.entities.series import Serie
from src.core.SerieScanner import SerieScanner
@pytest.fixture
def temp_directory():
"""Create a temporary directory with subdirectories for testing."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create an anime folder with an mp4 file
anime_folder = os.path.join(tmpdir, "Attack on Titan (2013)")
os.makedirs(anime_folder, exist_ok=True)
# Create a dummy mp4 file
mp4_path = os.path.join(
anime_folder, "Attack on Titan - S01E001 - (German Dub).mp4"
)
with open(mp4_path, "w") as f:
f.write("dummy mp4")
yield tmpdir
@pytest.fixture
def mock_loader():
"""Create a mock Loader instance."""
loader = MagicMock()
loader.get_season_episode_count = MagicMock(return_value={1: 25})
loader.is_language = MagicMock(return_value=True)
return loader
@pytest.fixture
def mock_db_session():
"""Create a mock async database session."""
session = AsyncMock()
return session
@pytest.fixture
def sample_serie():
"""Create a sample Serie for testing."""
return Serie(
key="attack-on-titan",
name="Attack on Titan",
site="aniworld.to",
folder="Attack on Titan (2013)",
episodeDict={1: [2, 3, 4]}
)
class TestSerieScannerInitialization:
"""Test SerieScanner initialization."""
def test_init_success(self, temp_directory, mock_loader):
"""Test successful initialization."""
scanner = SerieScanner(temp_directory, mock_loader)
assert scanner.directory == os.path.abspath(temp_directory)
assert scanner.loader == mock_loader
assert scanner.keyDict == {}
def test_init_with_db_session(
self, temp_directory, mock_loader, mock_db_session
):
"""Test initialization with database session."""
scanner = SerieScanner(
temp_directory,
mock_loader,
db_session=mock_db_session
)
assert scanner._db_session == mock_db_session
def test_init_empty_path_raises_error(self, mock_loader):
"""Test initialization with empty path raises ValueError."""
with pytest.raises(ValueError, match="empty"):
SerieScanner("", mock_loader)
def test_init_nonexistent_path_raises_error(self, mock_loader):
"""Test initialization with non-existent path raises ValueError."""
with pytest.raises(ValueError, match="does not exist"):
SerieScanner("/nonexistent/path", mock_loader)
class TestSerieScannerScanDeprecation:
"""Test scan() deprecation warning."""
def test_scan_raises_deprecation_warning(
self, temp_directory, mock_loader
):
"""Test that scan() raises a deprecation warning."""
scanner = SerieScanner(temp_directory, mock_loader)
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
# Mock the internal methods to avoid actual scanning
with patch.object(scanner, 'get_total_to_scan', return_value=0):
with patch.object(
scanner, '_SerieScanner__find_mp4_files',
return_value=iter([])
):
scanner.scan()
# Check deprecation warning was raised
assert len(w) >= 1
deprecation_warnings = [
warning for warning in w
if issubclass(warning.category, DeprecationWarning)
]
assert len(deprecation_warnings) >= 1
assert "scan_async()" in str(deprecation_warnings[0].message)
class TestSerieScannerAsyncScan:
"""Test async database scanning methods."""
@pytest.mark.asyncio
async def test_scan_async_saves_to_database(
self, temp_directory, mock_loader, mock_db_session, sample_serie
):
"""Test scan_async saves results to database."""
scanner = SerieScanner(temp_directory, mock_loader)
# Mock the internal methods
with patch.object(scanner, 'get_total_to_scan', return_value=1):
with patch.object(
scanner,
'_SerieScanner__find_mp4_files',
return_value=iter([
("Attack on Titan (2013)", ["S01E001.mp4"])
])
):
with patch.object(
scanner,
'_SerieScanner__read_data_from_file',
return_value=sample_serie
):
with patch.object(
scanner,
'_SerieScanner__get_missing_episodes_and_season',
return_value=({1: [2, 3]}, "aniworld.to")
):
with patch(
'src.server.database.service.AnimeSeriesService'
) as mock_service:
mock_service.get_by_key = AsyncMock(
return_value=None
)
mock_created = MagicMock()
mock_created.id = 1
mock_service.create = AsyncMock(
return_value=mock_created
)
await scanner.scan_async(mock_db_session)
# Verify database create was called
mock_service.create.assert_called_once()
@pytest.mark.asyncio
async def test_scan_async_updates_existing_series(
self, temp_directory, mock_loader, mock_db_session, sample_serie
):
"""Test scan_async updates existing series in database."""
scanner = SerieScanner(temp_directory, mock_loader)
# Mock existing series in database
existing = MagicMock()
existing.id = 1
existing.episode_dict = {1: [5, 6]} # Different from sample_serie
with patch.object(scanner, 'get_total_to_scan', return_value=1):
with patch.object(
scanner,
'_SerieScanner__find_mp4_files',
return_value=iter([
("Attack on Titan (2013)", ["S01E001.mp4"])
])
):
with patch.object(
scanner,
'_SerieScanner__read_data_from_file',
return_value=sample_serie
):
with patch.object(
scanner,
'_SerieScanner__get_missing_episodes_and_season',
return_value=({1: [2, 3]}, "aniworld.to")
):
with patch(
'src.server.database.service.AnimeSeriesService'
) as mock_service:
mock_service.get_by_key = AsyncMock(
return_value=existing
)
mock_service.update = AsyncMock(
return_value=existing
)
await scanner.scan_async(mock_db_session)
# Verify database update was called
mock_service.update.assert_called_once()
@pytest.mark.asyncio
async def test_scan_async_handles_errors_gracefully(
self, temp_directory, mock_loader, mock_db_session
):
"""Test scan_async handles folder processing errors gracefully."""
scanner = SerieScanner(temp_directory, mock_loader)
with patch.object(scanner, 'get_total_to_scan', return_value=1):
with patch.object(
scanner,
'_SerieScanner__find_mp4_files',
return_value=iter([
("Error Folder", ["S01E001.mp4"])
])
):
with patch.object(
scanner,
'_SerieScanner__read_data_from_file',
side_effect=Exception("Test error")
):
# Should not raise, should continue
await scanner.scan_async(mock_db_session)
class TestSerieScannerDatabaseHelpers:
"""Test database helper methods."""
@pytest.mark.asyncio
async def test_save_serie_to_db_creates_new(
self, temp_directory, mock_loader, mock_db_session, sample_serie
):
"""Test _save_serie_to_db creates new series."""
scanner = SerieScanner(temp_directory, mock_loader)
with patch(
'src.server.database.service.AnimeSeriesService'
) as mock_service:
mock_service.get_by_key = AsyncMock(return_value=None)
mock_created = MagicMock()
mock_created.id = 1
mock_service.create = AsyncMock(return_value=mock_created)
result = await scanner._save_serie_to_db(
sample_serie, mock_db_session
)
assert result is mock_created
mock_service.create.assert_called_once()
@pytest.mark.asyncio
async def test_save_serie_to_db_updates_existing(
self, temp_directory, mock_loader, mock_db_session, sample_serie
):
"""Test _save_serie_to_db updates existing series."""
scanner = SerieScanner(temp_directory, mock_loader)
existing = MagicMock()
existing.id = 1
existing.episode_dict = {1: [5, 6]} # Different episodes
with patch(
'src.server.database.service.AnimeSeriesService'
) as mock_service:
mock_service.get_by_key = AsyncMock(return_value=existing)
mock_service.update = AsyncMock(return_value=existing)
result = await scanner._save_serie_to_db(
sample_serie, mock_db_session
)
assert result is existing
mock_service.update.assert_called_once()
@pytest.mark.asyncio
async def test_save_serie_to_db_skips_unchanged(
self, temp_directory, mock_loader, mock_db_session, sample_serie
):
"""Test _save_serie_to_db skips update if unchanged."""
scanner = SerieScanner(temp_directory, mock_loader)
existing = MagicMock()
existing.id = 1
existing.episode_dict = sample_serie.episodeDict # Same episodes
with patch(
'src.server.database.service.AnimeSeriesService'
) as mock_service:
mock_service.get_by_key = AsyncMock(return_value=existing)
result = await scanner._save_serie_to_db(
sample_serie, mock_db_session
)
assert result is None
mock_service.update.assert_not_called()
@pytest.mark.asyncio
async def test_update_serie_in_db_updates_existing(
self, temp_directory, mock_loader, mock_db_session, sample_serie
):
"""Test _update_serie_in_db updates existing series."""
scanner = SerieScanner(temp_directory, mock_loader)
existing = MagicMock()
existing.id = 1
with patch(
'src.server.database.service.AnimeSeriesService'
) as mock_service:
mock_service.get_by_key = AsyncMock(return_value=existing)
mock_service.update = AsyncMock(return_value=existing)
result = await scanner._update_serie_in_db(
sample_serie, mock_db_session
)
assert result is existing
mock_service.update.assert_called_once()
@pytest.mark.asyncio
async def test_update_serie_in_db_returns_none_if_not_found(
self, temp_directory, mock_loader, mock_db_session, sample_serie
):
"""Test _update_serie_in_db returns None if series not found."""
scanner = SerieScanner(temp_directory, mock_loader)
with patch(
'src.server.database.service.AnimeSeriesService'
) as mock_service:
mock_service.get_by_key = AsyncMock(return_value=None)
result = await scanner._update_serie_in_db(
sample_serie, mock_db_session
)
assert result is None
class TestSerieScannerBackwardCompatibility:
"""Test backward compatibility of file-based operations."""
def test_file_based_scan_still_works(
self, temp_directory, mock_loader, sample_serie
):
"""Test file-based scan still works with deprecation warning."""
scanner = SerieScanner(temp_directory, mock_loader)
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
with patch.object(scanner, 'get_total_to_scan', return_value=1):
with patch.object(
scanner,
'_SerieScanner__find_mp4_files',
return_value=iter([
("Attack on Titan (2013)", ["S01E001.mp4"])
])
):
with patch.object(
scanner,
'_SerieScanner__read_data_from_file',
return_value=sample_serie
):
with patch.object(
scanner,
'_SerieScanner__get_missing_episodes_and_season',
return_value=({1: [2, 3]}, "aniworld.to")
):
with patch.object(
sample_serie, 'save_to_file'
) as mock_save:
scanner.scan()
# Verify file was saved
mock_save.assert_called_once()
def test_keydict_populated_after_scan(
self, temp_directory, mock_loader, sample_serie
):
"""Test keyDict is populated after scan."""
scanner = SerieScanner(temp_directory, mock_loader)
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
with patch.object(scanner, 'get_total_to_scan', return_value=1):
with patch.object(
scanner,
'_SerieScanner__find_mp4_files',
return_value=iter([
("Attack on Titan (2013)", ["S01E001.mp4"])
])
):
with patch.object(
scanner,
'_SerieScanner__read_data_from_file',
return_value=sample_serie
):
with patch.object(
scanner,
'_SerieScanner__get_missing_episodes_and_season',
return_value=({1: [2, 3]}, "aniworld.to")
):
with patch.object(sample_serie, 'save_to_file'):
scanner.scan()
assert sample_serie.key in scanner.keyDict