- Add db_session parameter to SerieScanner.__init__ - Add async scan_async() method for database-backed scanning - Add _save_serie_to_db() helper for creating/updating series - Add _update_serie_in_db() helper for updating existing series - Add deprecation warning to file-based scan() method - Maintain backward compatibility for CLI usage - Add comprehensive unit tests (15 tests, all passing) - Update instructions.md to mark Task 5 complete
422 lines
16 KiB
Python
422 lines
16 KiB
Python
"""Tests for SerieScanner class - database and file-based operations."""
|
|
|
|
import os
|
|
import tempfile
|
|
import warnings
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from src.core.entities.series import Serie
|
|
from src.core.SerieScanner import SerieScanner
|
|
|
|
|
|
@pytest.fixture
|
|
def temp_directory():
|
|
"""Create a temporary directory with subdirectories for testing."""
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
# Create an anime folder with an mp4 file
|
|
anime_folder = os.path.join(tmpdir, "Attack on Titan (2013)")
|
|
os.makedirs(anime_folder, exist_ok=True)
|
|
|
|
# Create a dummy mp4 file
|
|
mp4_path = os.path.join(
|
|
anime_folder, "Attack on Titan - S01E001 - (German Dub).mp4"
|
|
)
|
|
with open(mp4_path, "w") as f:
|
|
f.write("dummy mp4")
|
|
|
|
yield tmpdir
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_loader():
|
|
"""Create a mock Loader instance."""
|
|
loader = MagicMock()
|
|
loader.get_season_episode_count = MagicMock(return_value={1: 25})
|
|
loader.is_language = MagicMock(return_value=True)
|
|
return loader
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_db_session():
|
|
"""Create a mock async database session."""
|
|
session = AsyncMock()
|
|
return session
|
|
|
|
|
|
@pytest.fixture
|
|
def sample_serie():
|
|
"""Create a sample Serie for testing."""
|
|
return Serie(
|
|
key="attack-on-titan",
|
|
name="Attack on Titan",
|
|
site="aniworld.to",
|
|
folder="Attack on Titan (2013)",
|
|
episodeDict={1: [2, 3, 4]}
|
|
)
|
|
|
|
|
|
class TestSerieScannerInitialization:
|
|
"""Test SerieScanner initialization."""
|
|
|
|
def test_init_success(self, temp_directory, mock_loader):
|
|
"""Test successful initialization."""
|
|
scanner = SerieScanner(temp_directory, mock_loader)
|
|
|
|
assert scanner.directory == os.path.abspath(temp_directory)
|
|
assert scanner.loader == mock_loader
|
|
assert scanner.keyDict == {}
|
|
|
|
def test_init_with_db_session(
|
|
self, temp_directory, mock_loader, mock_db_session
|
|
):
|
|
"""Test initialization with database session."""
|
|
scanner = SerieScanner(
|
|
temp_directory,
|
|
mock_loader,
|
|
db_session=mock_db_session
|
|
)
|
|
|
|
assert scanner._db_session == mock_db_session
|
|
|
|
def test_init_empty_path_raises_error(self, mock_loader):
|
|
"""Test initialization with empty path raises ValueError."""
|
|
with pytest.raises(ValueError, match="empty"):
|
|
SerieScanner("", mock_loader)
|
|
|
|
def test_init_nonexistent_path_raises_error(self, mock_loader):
|
|
"""Test initialization with non-existent path raises ValueError."""
|
|
with pytest.raises(ValueError, match="does not exist"):
|
|
SerieScanner("/nonexistent/path", mock_loader)
|
|
|
|
|
|
class TestSerieScannerScanDeprecation:
|
|
"""Test scan() deprecation warning."""
|
|
|
|
def test_scan_raises_deprecation_warning(
|
|
self, temp_directory, mock_loader
|
|
):
|
|
"""Test that scan() raises a deprecation warning."""
|
|
scanner = SerieScanner(temp_directory, mock_loader)
|
|
|
|
with warnings.catch_warnings(record=True) as w:
|
|
warnings.simplefilter("always")
|
|
|
|
# Mock the internal methods to avoid actual scanning
|
|
with patch.object(scanner, 'get_total_to_scan', return_value=0):
|
|
with patch.object(
|
|
scanner, '_SerieScanner__find_mp4_files',
|
|
return_value=iter([])
|
|
):
|
|
scanner.scan()
|
|
|
|
# Check deprecation warning was raised
|
|
assert len(w) >= 1
|
|
deprecation_warnings = [
|
|
warning for warning in w
|
|
if issubclass(warning.category, DeprecationWarning)
|
|
]
|
|
assert len(deprecation_warnings) >= 1
|
|
assert "scan_async()" in str(deprecation_warnings[0].message)
|
|
|
|
|
|
class TestSerieScannerAsyncScan:
|
|
"""Test async database scanning methods."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_scan_async_saves_to_database(
|
|
self, temp_directory, mock_loader, mock_db_session, sample_serie
|
|
):
|
|
"""Test scan_async saves results to database."""
|
|
scanner = SerieScanner(temp_directory, mock_loader)
|
|
|
|
# Mock the internal methods
|
|
with patch.object(scanner, 'get_total_to_scan', return_value=1):
|
|
with patch.object(
|
|
scanner,
|
|
'_SerieScanner__find_mp4_files',
|
|
return_value=iter([
|
|
("Attack on Titan (2013)", ["S01E001.mp4"])
|
|
])
|
|
):
|
|
with patch.object(
|
|
scanner,
|
|
'_SerieScanner__read_data_from_file',
|
|
return_value=sample_serie
|
|
):
|
|
with patch.object(
|
|
scanner,
|
|
'_SerieScanner__get_missing_episodes_and_season',
|
|
return_value=({1: [2, 3]}, "aniworld.to")
|
|
):
|
|
with patch(
|
|
'src.server.database.service.AnimeSeriesService'
|
|
) as mock_service:
|
|
mock_service.get_by_key = AsyncMock(
|
|
return_value=None
|
|
)
|
|
mock_created = MagicMock()
|
|
mock_created.id = 1
|
|
mock_service.create = AsyncMock(
|
|
return_value=mock_created
|
|
)
|
|
|
|
await scanner.scan_async(mock_db_session)
|
|
|
|
# Verify database create was called
|
|
mock_service.create.assert_called_once()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_scan_async_updates_existing_series(
|
|
self, temp_directory, mock_loader, mock_db_session, sample_serie
|
|
):
|
|
"""Test scan_async updates existing series in database."""
|
|
scanner = SerieScanner(temp_directory, mock_loader)
|
|
|
|
# Mock existing series in database
|
|
existing = MagicMock()
|
|
existing.id = 1
|
|
existing.episode_dict = {1: [5, 6]} # Different from sample_serie
|
|
|
|
with patch.object(scanner, 'get_total_to_scan', return_value=1):
|
|
with patch.object(
|
|
scanner,
|
|
'_SerieScanner__find_mp4_files',
|
|
return_value=iter([
|
|
("Attack on Titan (2013)", ["S01E001.mp4"])
|
|
])
|
|
):
|
|
with patch.object(
|
|
scanner,
|
|
'_SerieScanner__read_data_from_file',
|
|
return_value=sample_serie
|
|
):
|
|
with patch.object(
|
|
scanner,
|
|
'_SerieScanner__get_missing_episodes_and_season',
|
|
return_value=({1: [2, 3]}, "aniworld.to")
|
|
):
|
|
with patch(
|
|
'src.server.database.service.AnimeSeriesService'
|
|
) as mock_service:
|
|
mock_service.get_by_key = AsyncMock(
|
|
return_value=existing
|
|
)
|
|
mock_service.update = AsyncMock(
|
|
return_value=existing
|
|
)
|
|
|
|
await scanner.scan_async(mock_db_session)
|
|
|
|
# Verify database update was called
|
|
mock_service.update.assert_called_once()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_scan_async_handles_errors_gracefully(
|
|
self, temp_directory, mock_loader, mock_db_session
|
|
):
|
|
"""Test scan_async handles folder processing errors gracefully."""
|
|
scanner = SerieScanner(temp_directory, mock_loader)
|
|
|
|
with patch.object(scanner, 'get_total_to_scan', return_value=1):
|
|
with patch.object(
|
|
scanner,
|
|
'_SerieScanner__find_mp4_files',
|
|
return_value=iter([
|
|
("Error Folder", ["S01E001.mp4"])
|
|
])
|
|
):
|
|
with patch.object(
|
|
scanner,
|
|
'_SerieScanner__read_data_from_file',
|
|
side_effect=Exception("Test error")
|
|
):
|
|
# Should not raise, should continue
|
|
await scanner.scan_async(mock_db_session)
|
|
|
|
|
|
class TestSerieScannerDatabaseHelpers:
|
|
"""Test database helper methods."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_save_serie_to_db_creates_new(
|
|
self, temp_directory, mock_loader, mock_db_session, sample_serie
|
|
):
|
|
"""Test _save_serie_to_db creates new series."""
|
|
scanner = SerieScanner(temp_directory, mock_loader)
|
|
|
|
with patch(
|
|
'src.server.database.service.AnimeSeriesService'
|
|
) as mock_service:
|
|
mock_service.get_by_key = AsyncMock(return_value=None)
|
|
mock_created = MagicMock()
|
|
mock_created.id = 1
|
|
mock_service.create = AsyncMock(return_value=mock_created)
|
|
|
|
result = await scanner._save_serie_to_db(
|
|
sample_serie, mock_db_session
|
|
)
|
|
|
|
assert result is mock_created
|
|
mock_service.create.assert_called_once()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_save_serie_to_db_updates_existing(
|
|
self, temp_directory, mock_loader, mock_db_session, sample_serie
|
|
):
|
|
"""Test _save_serie_to_db updates existing series."""
|
|
scanner = SerieScanner(temp_directory, mock_loader)
|
|
|
|
existing = MagicMock()
|
|
existing.id = 1
|
|
existing.episode_dict = {1: [5, 6]} # Different episodes
|
|
|
|
with patch(
|
|
'src.server.database.service.AnimeSeriesService'
|
|
) as mock_service:
|
|
mock_service.get_by_key = AsyncMock(return_value=existing)
|
|
mock_service.update = AsyncMock(return_value=existing)
|
|
|
|
result = await scanner._save_serie_to_db(
|
|
sample_serie, mock_db_session
|
|
)
|
|
|
|
assert result is existing
|
|
mock_service.update.assert_called_once()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_save_serie_to_db_skips_unchanged(
|
|
self, temp_directory, mock_loader, mock_db_session, sample_serie
|
|
):
|
|
"""Test _save_serie_to_db skips update if unchanged."""
|
|
scanner = SerieScanner(temp_directory, mock_loader)
|
|
|
|
existing = MagicMock()
|
|
existing.id = 1
|
|
existing.episode_dict = sample_serie.episodeDict # Same episodes
|
|
|
|
with patch(
|
|
'src.server.database.service.AnimeSeriesService'
|
|
) as mock_service:
|
|
mock_service.get_by_key = AsyncMock(return_value=existing)
|
|
|
|
result = await scanner._save_serie_to_db(
|
|
sample_serie, mock_db_session
|
|
)
|
|
|
|
assert result is None
|
|
mock_service.update.assert_not_called()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_update_serie_in_db_updates_existing(
|
|
self, temp_directory, mock_loader, mock_db_session, sample_serie
|
|
):
|
|
"""Test _update_serie_in_db updates existing series."""
|
|
scanner = SerieScanner(temp_directory, mock_loader)
|
|
|
|
existing = MagicMock()
|
|
existing.id = 1
|
|
|
|
with patch(
|
|
'src.server.database.service.AnimeSeriesService'
|
|
) as mock_service:
|
|
mock_service.get_by_key = AsyncMock(return_value=existing)
|
|
mock_service.update = AsyncMock(return_value=existing)
|
|
|
|
result = await scanner._update_serie_in_db(
|
|
sample_serie, mock_db_session
|
|
)
|
|
|
|
assert result is existing
|
|
mock_service.update.assert_called_once()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_update_serie_in_db_returns_none_if_not_found(
|
|
self, temp_directory, mock_loader, mock_db_session, sample_serie
|
|
):
|
|
"""Test _update_serie_in_db returns None if series not found."""
|
|
scanner = SerieScanner(temp_directory, mock_loader)
|
|
|
|
with patch(
|
|
'src.server.database.service.AnimeSeriesService'
|
|
) as mock_service:
|
|
mock_service.get_by_key = AsyncMock(return_value=None)
|
|
|
|
result = await scanner._update_serie_in_db(
|
|
sample_serie, mock_db_session
|
|
)
|
|
|
|
assert result is None
|
|
|
|
|
|
class TestSerieScannerBackwardCompatibility:
|
|
"""Test backward compatibility of file-based operations."""
|
|
|
|
def test_file_based_scan_still_works(
|
|
self, temp_directory, mock_loader, sample_serie
|
|
):
|
|
"""Test file-based scan still works with deprecation warning."""
|
|
scanner = SerieScanner(temp_directory, mock_loader)
|
|
|
|
with warnings.catch_warnings():
|
|
warnings.simplefilter("ignore", DeprecationWarning)
|
|
|
|
with patch.object(scanner, 'get_total_to_scan', return_value=1):
|
|
with patch.object(
|
|
scanner,
|
|
'_SerieScanner__find_mp4_files',
|
|
return_value=iter([
|
|
("Attack on Titan (2013)", ["S01E001.mp4"])
|
|
])
|
|
):
|
|
with patch.object(
|
|
scanner,
|
|
'_SerieScanner__read_data_from_file',
|
|
return_value=sample_serie
|
|
):
|
|
with patch.object(
|
|
scanner,
|
|
'_SerieScanner__get_missing_episodes_and_season',
|
|
return_value=({1: [2, 3]}, "aniworld.to")
|
|
):
|
|
with patch.object(
|
|
sample_serie, 'save_to_file'
|
|
) as mock_save:
|
|
scanner.scan()
|
|
|
|
# Verify file was saved
|
|
mock_save.assert_called_once()
|
|
|
|
def test_keydict_populated_after_scan(
|
|
self, temp_directory, mock_loader, sample_serie
|
|
):
|
|
"""Test keyDict is populated after scan."""
|
|
scanner = SerieScanner(temp_directory, mock_loader)
|
|
|
|
with warnings.catch_warnings():
|
|
warnings.simplefilter("ignore", DeprecationWarning)
|
|
|
|
with patch.object(scanner, 'get_total_to_scan', return_value=1):
|
|
with patch.object(
|
|
scanner,
|
|
'_SerieScanner__find_mp4_files',
|
|
return_value=iter([
|
|
("Attack on Titan (2013)", ["S01E001.mp4"])
|
|
])
|
|
):
|
|
with patch.object(
|
|
scanner,
|
|
'_SerieScanner__read_data_from_file',
|
|
return_value=sample_serie
|
|
):
|
|
with patch.object(
|
|
scanner,
|
|
'_SerieScanner__get_missing_episodes_and_season',
|
|
return_value=({1: [2, 3]}, "aniworld.to")
|
|
):
|
|
with patch.object(sample_serie, 'save_to_file'):
|
|
scanner.scan()
|
|
|
|
assert sample_serie.key in scanner.keyDict
|