- SerieScanner: Remove key file fallback, keep data file fallback - SystemSettings: Add legacy_key_cleanup_completed flag - initialization_service: Add cleanup task to remove key files from folders with DB entries - Tests updated to reflect key file removal from legacy path Key files caused duplicate key errors on folder rename. DB is now sole source of truth.
832 lines
32 KiB
Python
832 lines
32 KiB
Python
"""Tests for SerieScanner class - file-based operations."""
|
||
|
||
import logging
|
||
import os
|
||
import tempfile
|
||
from unittest.mock import MagicMock, patch
|
||
|
||
import pytest
|
||
|
||
from src.core.entities.series import Serie
|
||
from src.core.SerieScanner import SerieScanner
|
||
|
||
|
||
@pytest.fixture
|
||
def temp_directory():
|
||
"""Create a temporary directory with subdirectories for testing."""
|
||
with tempfile.TemporaryDirectory() as tmpdir:
|
||
# Create an anime folder with an mp4 file
|
||
anime_folder = os.path.join(tmpdir, "Attack on Titan (2013)")
|
||
os.makedirs(anime_folder, exist_ok=True)
|
||
|
||
# Create a dummy mp4 file
|
||
mp4_path = os.path.join(
|
||
anime_folder, "Attack on Titan - S01E001 - (German Dub).mp4"
|
||
)
|
||
with open(mp4_path, "w") as f:
|
||
f.write("dummy mp4")
|
||
|
||
yield tmpdir
|
||
|
||
|
||
@pytest.fixture
|
||
def mock_loader():
|
||
"""Create a mock Loader instance."""
|
||
loader = MagicMock()
|
||
loader.get_season_episode_count = MagicMock(return_value={1: 25})
|
||
loader.is_language = MagicMock(return_value=True)
|
||
return loader
|
||
|
||
|
||
@pytest.fixture
|
||
def sample_serie():
|
||
"""Create a sample Serie for testing."""
|
||
return Serie(
|
||
key="attack-on-titan",
|
||
name="Attack on Titan",
|
||
site="aniworld.to",
|
||
folder="Attack on Titan (2013)",
|
||
episodeDict={1: [2, 3, 4]}
|
||
)
|
||
|
||
|
||
class TestSerieScannerInitialization:
|
||
"""Test SerieScanner initialization."""
|
||
|
||
def test_init_success(self, temp_directory, mock_loader):
|
||
"""Test successful initialization."""
|
||
scanner = SerieScanner(temp_directory, mock_loader)
|
||
|
||
assert scanner.directory == os.path.abspath(temp_directory)
|
||
assert scanner.loader == mock_loader
|
||
assert scanner.keyDict == {}
|
||
|
||
def test_init_empty_path_raises_error(self, mock_loader):
|
||
"""Test initialization with empty path raises ValueError."""
|
||
with pytest.raises(ValueError, match="empty"):
|
||
SerieScanner("", mock_loader)
|
||
|
||
def test_init_nonexistent_path_raises_error(self, mock_loader):
|
||
"""Test initialization with non-existent path raises ValueError."""
|
||
with pytest.raises(ValueError, match="does not exist"):
|
||
SerieScanner("/nonexistent/path", mock_loader)
|
||
|
||
|
||
class TestSerieScannerScan:
|
||
"""Test file-based scan operations."""
|
||
|
||
def test_scan_persists_to_db(
|
||
self, temp_directory, mock_loader, sample_serie
|
||
):
|
||
"""Test scan persists series to database."""
|
||
scanner = SerieScanner(temp_directory, mock_loader)
|
||
|
||
with patch.object(scanner, 'get_total_to_scan', return_value=1):
|
||
with patch.object(
|
||
scanner,
|
||
'_SerieScanner__find_mp4_files',
|
||
return_value=iter([
|
||
("Attack on Titan (2013)", ["S01E001.mp4"])
|
||
])
|
||
):
|
||
with patch.object(
|
||
scanner,
|
||
'_SerieScanner__read_data_from_file',
|
||
return_value=sample_serie
|
||
):
|
||
with patch.object(
|
||
scanner,
|
||
'_SerieScanner__get_missing_episodes_and_season',
|
||
return_value=({1: [2, 3]}, "aniworld.to")
|
||
):
|
||
with patch.object(
|
||
scanner, '_persist_serie_to_db'
|
||
) as mock_persist:
|
||
scanner.scan()
|
||
|
||
# Verify DB persistence was called
|
||
mock_persist.assert_called_once()
|
||
# Check the serie passed matches
|
||
call_args = mock_persist.call_args
|
||
assert call_args[0][0].key == "attack-on-titan"
|
||
|
||
def test_keydict_populated_after_scan(
|
||
self, temp_directory, mock_loader, sample_serie
|
||
):
|
||
"""Test keyDict is populated after scan."""
|
||
scanner = SerieScanner(temp_directory, mock_loader)
|
||
|
||
with patch.object(scanner, 'get_total_to_scan', return_value=1):
|
||
with patch.object(
|
||
scanner,
|
||
'_SerieScanner__find_mp4_files',
|
||
return_value=iter([
|
||
("Attack on Titan (2013)", ["S01E001.mp4"])
|
||
])
|
||
):
|
||
with patch.object(
|
||
scanner,
|
||
'_SerieScanner__read_data_from_file',
|
||
return_value=sample_serie
|
||
):
|
||
with patch.object(
|
||
scanner,
|
||
'_SerieScanner__get_missing_episodes_and_season',
|
||
return_value=({1: [2, 3]}, "aniworld.to")
|
||
):
|
||
with patch.object(sample_serie, 'save_to_file'):
|
||
scanner.scan()
|
||
|
||
assert sample_serie.key in scanner.keyDict
|
||
|
||
|
||
class TestSerieScannerSingleSeries:
|
||
"""Test scan_single_series method for targeted scanning."""
|
||
|
||
def test_scan_single_series_basic(
|
||
self, temp_directory, mock_loader
|
||
):
|
||
"""Test basic scan_single_series functionality."""
|
||
scanner = SerieScanner(temp_directory, mock_loader)
|
||
|
||
# Mock the missing episodes calculation
|
||
with patch.object(
|
||
scanner,
|
||
'_SerieScanner__get_missing_episodes_and_season',
|
||
return_value=({1: [5, 6, 7], 2: [1, 2]}, "aniworld.to")
|
||
):
|
||
result = scanner.scan_single_series(
|
||
key="attack-on-titan",
|
||
folder="Attack on Titan (2013)"
|
||
)
|
||
|
||
# Verify result structure
|
||
assert isinstance(result, dict)
|
||
assert 1 in result
|
||
assert 2 in result
|
||
assert result[1] == [5, 6, 7]
|
||
assert result[2] == [1, 2]
|
||
|
||
def test_scan_single_series_updates_keydict(
|
||
self, temp_directory, mock_loader
|
||
):
|
||
"""Test that scan_single_series updates keyDict."""
|
||
scanner = SerieScanner(temp_directory, mock_loader)
|
||
|
||
with patch.object(
|
||
scanner,
|
||
'_SerieScanner__get_missing_episodes_and_season',
|
||
return_value=({1: [1, 2, 3]}, "aniworld.to")
|
||
):
|
||
scanner.scan_single_series(
|
||
key="test-anime",
|
||
folder="Test Anime"
|
||
)
|
||
|
||
# Verify keyDict was updated
|
||
assert "test-anime" in scanner.keyDict
|
||
assert scanner.keyDict["test-anime"].episodeDict == {1: [1, 2, 3]}
|
||
|
||
def test_scan_single_series_existing_entry(
|
||
self, temp_directory, mock_loader, sample_serie
|
||
):
|
||
"""Test scan_single_series updates existing entry in keyDict."""
|
||
scanner = SerieScanner(temp_directory, mock_loader)
|
||
|
||
# Pre-populate keyDict
|
||
scanner.keyDict[sample_serie.key] = sample_serie
|
||
old_episode_dict = sample_serie.episodeDict.copy()
|
||
|
||
with patch.object(
|
||
scanner,
|
||
'_SerieScanner__get_missing_episodes_and_season',
|
||
return_value=({1: [10, 11, 12]}, "aniworld.to")
|
||
):
|
||
scanner.scan_single_series(
|
||
key=sample_serie.key,
|
||
folder=sample_serie.folder
|
||
)
|
||
|
||
# Verify existing entry was updated
|
||
assert scanner.keyDict[sample_serie.key].episodeDict != old_episode_dict
|
||
assert scanner.keyDict[sample_serie.key].episodeDict == {1: [10, 11, 12]}
|
||
|
||
def test_scan_single_series_empty_key_raises_error(
|
||
self, temp_directory, mock_loader
|
||
):
|
||
"""Test that empty key raises ValueError."""
|
||
scanner = SerieScanner(temp_directory, mock_loader)
|
||
|
||
with pytest.raises(ValueError, match="key cannot be empty"):
|
||
scanner.scan_single_series(key="", folder="Test Folder")
|
||
|
||
def test_scan_single_series_empty_folder_raises_error(
|
||
self, temp_directory, mock_loader
|
||
):
|
||
"""Test that empty folder raises ValueError."""
|
||
scanner = SerieScanner(temp_directory, mock_loader)
|
||
|
||
with pytest.raises(ValueError, match="folder cannot be empty"):
|
||
scanner.scan_single_series(key="test-key", folder="")
|
||
|
||
def test_scan_single_series_nonexistent_folder(
|
||
self, temp_directory, mock_loader
|
||
):
|
||
"""Test scanning a series with non-existent folder."""
|
||
scanner = SerieScanner(temp_directory, mock_loader)
|
||
|
||
# Mock to return some episodes (as if from provider)
|
||
with patch.object(
|
||
scanner,
|
||
'_SerieScanner__get_missing_episodes_and_season',
|
||
return_value=({1: [1, 2, 3, 4, 5]}, "aniworld.to")
|
||
):
|
||
result = scanner.scan_single_series(
|
||
key="new-anime",
|
||
folder="NonExistent Folder"
|
||
)
|
||
|
||
# Should still return missing episodes from provider
|
||
assert result == {1: [1, 2, 3, 4, 5]}
|
||
|
||
def test_scan_single_series_error_handling(
|
||
self, temp_directory, mock_loader
|
||
):
|
||
"""Test that errors during scan return empty dict."""
|
||
scanner = SerieScanner(temp_directory, mock_loader)
|
||
|
||
with patch.object(
|
||
scanner,
|
||
'_SerieScanner__get_missing_episodes_and_season',
|
||
side_effect=Exception("Provider error")
|
||
):
|
||
result = scanner.scan_single_series(
|
||
key="test-anime",
|
||
folder="Test Folder"
|
||
)
|
||
|
||
# Should return empty dict on error
|
||
assert result == {}
|
||
|
||
def test_scan_single_series_no_missing_episodes(
|
||
self, temp_directory, mock_loader
|
||
):
|
||
"""Test scan when no episodes are missing."""
|
||
scanner = SerieScanner(temp_directory, mock_loader)
|
||
|
||
with patch.object(
|
||
scanner,
|
||
'_SerieScanner__get_missing_episodes_and_season',
|
||
return_value=({}, "aniworld.to")
|
||
):
|
||
result = scanner.scan_single_series(
|
||
key="complete-anime",
|
||
folder="Complete Anime"
|
||
)
|
||
|
||
assert result == {}
|
||
assert "complete-anime" in scanner.keyDict
|
||
assert scanner.keyDict["complete-anime"].episodeDict == {}
|
||
|
||
def test_scan_single_series_with_existing_files(
|
||
self, temp_directory, mock_loader
|
||
):
|
||
"""Test scan with existing MP4 files in folder."""
|
||
# Create folder with some files
|
||
anime_folder = os.path.join(temp_directory, "Test Anime")
|
||
os.makedirs(anime_folder, exist_ok=True)
|
||
season_folder = os.path.join(anime_folder, "Season 1")
|
||
os.makedirs(season_folder, exist_ok=True)
|
||
|
||
# Create dummy MP4 files
|
||
for ep in [1, 2, 3]:
|
||
mp4_path = os.path.join(
|
||
season_folder, f"Test Anime - S01E{ep:03d} - (German Dub).mp4"
|
||
)
|
||
with open(mp4_path, "w") as f:
|
||
f.write("dummy")
|
||
|
||
scanner = SerieScanner(temp_directory, mock_loader)
|
||
|
||
# Mock to return missing episodes (4, 5, 6)
|
||
with patch.object(
|
||
scanner,
|
||
'_SerieScanner__get_missing_episodes_and_season',
|
||
return_value=({1: [4, 5, 6]}, "aniworld.to")
|
||
):
|
||
result = scanner.scan_single_series(
|
||
key="test-anime",
|
||
folder="Test Anime"
|
||
)
|
||
|
||
# Should only show missing episodes
|
||
assert result == {1: [4, 5, 6]}
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════════════════════════
|
||
# New coverage tests – events, year extraction, find_mp4, read_data
|
||
# ══════════════════════════════════════════════════════════════════════════════
|
||
|
||
|
||
class TestEventSubscription:
|
||
"""Test subscribe/unsubscribe for all event types."""
|
||
|
||
def test_subscribe_on_progress(self, temp_directory, mock_loader):
|
||
"""Should add handler to on_progress."""
|
||
scanner = SerieScanner(temp_directory, mock_loader)
|
||
handler = MagicMock()
|
||
scanner.subscribe_on_progress(handler)
|
||
assert handler in scanner.events.on_progress
|
||
|
||
def test_unsubscribe_on_progress(self, temp_directory, mock_loader):
|
||
"""Should remove handler from on_progress."""
|
||
scanner = SerieScanner(temp_directory, mock_loader)
|
||
handler = MagicMock()
|
||
scanner.subscribe_on_progress(handler)
|
||
scanner.unsubscribe_on_progress(handler)
|
||
assert handler not in scanner.events.on_progress
|
||
|
||
def test_subscribe_duplicate_ignored(self, temp_directory, mock_loader):
|
||
"""Subscribing same handler twice should not duplicate."""
|
||
scanner = SerieScanner(temp_directory, mock_loader)
|
||
handler = MagicMock()
|
||
scanner.subscribe_on_progress(handler)
|
||
scanner.subscribe_on_progress(handler)
|
||
assert scanner.events.on_progress.count(handler) == 1
|
||
|
||
def test_unsubscribe_missing_handler_noop(
|
||
self, temp_directory, mock_loader
|
||
):
|
||
"""Unsubscribing unknown handler should not raise."""
|
||
scanner = SerieScanner(temp_directory, mock_loader)
|
||
handler = MagicMock()
|
||
scanner.unsubscribe_on_progress(handler) # should not raise
|
||
|
||
def test_subscribe_on_error(self, temp_directory, mock_loader):
|
||
"""Should add handler to on_error."""
|
||
scanner = SerieScanner(temp_directory, mock_loader)
|
||
handler = MagicMock()
|
||
scanner.subscribe_on_error(handler)
|
||
assert handler in scanner.events.on_error
|
||
|
||
def test_unsubscribe_on_error(self, temp_directory, mock_loader):
|
||
"""Should remove handler from on_error."""
|
||
scanner = SerieScanner(temp_directory, mock_loader)
|
||
handler = MagicMock()
|
||
scanner.subscribe_on_error(handler)
|
||
scanner.unsubscribe_on_error(handler)
|
||
assert handler not in scanner.events.on_error
|
||
|
||
def test_subscribe_on_completion(self, temp_directory, mock_loader):
|
||
"""Should add handler to on_completion."""
|
||
scanner = SerieScanner(temp_directory, mock_loader)
|
||
handler = MagicMock()
|
||
scanner.subscribe_on_completion(handler)
|
||
assert handler in scanner.events.on_completion
|
||
|
||
def test_unsubscribe_on_completion(self, temp_directory, mock_loader):
|
||
"""Should remove handler from on_completion."""
|
||
scanner = SerieScanner(temp_directory, mock_loader)
|
||
handler = MagicMock()
|
||
scanner.subscribe_on_completion(handler)
|
||
scanner.unsubscribe_on_completion(handler)
|
||
assert handler not in scanner.events.on_completion
|
||
|
||
|
||
class TestExtractYearFromFolderName:
|
||
"""Test _extract_year_from_folder_name."""
|
||
|
||
def test_extracts_year(self, temp_directory, mock_loader):
|
||
"""Should extract year from folder like 'Title (2025)'."""
|
||
scanner = SerieScanner(temp_directory, mock_loader)
|
||
assert scanner._extract_year_from_folder_name("Dororo (2025)") == 2025
|
||
|
||
def test_no_year_returns_none(self, temp_directory, mock_loader):
|
||
"""Folder without year returns None."""
|
||
scanner = SerieScanner(temp_directory, mock_loader)
|
||
assert scanner._extract_year_from_folder_name("Dororo") is None
|
||
|
||
def test_empty_string_returns_none(self, temp_directory, mock_loader):
|
||
"""Empty string returns None."""
|
||
scanner = SerieScanner(temp_directory, mock_loader)
|
||
assert scanner._extract_year_from_folder_name("") is None
|
||
|
||
def test_none_returns_none(self, temp_directory, mock_loader):
|
||
"""None input returns None."""
|
||
scanner = SerieScanner(temp_directory, mock_loader)
|
||
assert scanner._extract_year_from_folder_name(None) is None
|
||
|
||
def test_year_out_of_range_returns_none(
|
||
self, temp_directory, mock_loader
|
||
):
|
||
"""Year outside 1900-2100 returns None."""
|
||
scanner = SerieScanner(temp_directory, mock_loader)
|
||
assert scanner._extract_year_from_folder_name("Title (1800)") is None
|
||
assert scanner._extract_year_from_folder_name("Title (2200)") is None
|
||
|
||
def test_year_in_middle(self, temp_directory, mock_loader):
|
||
"""Year in the middle of folder name should be extracted."""
|
||
scanner = SerieScanner(temp_directory, mock_loader)
|
||
assert (
|
||
scanner._extract_year_from_folder_name("Title (2020) - Extra")
|
||
== 2020
|
||
)
|
||
|
||
|
||
class TestSafeCallEvent:
|
||
"""Test _safe_call_event method."""
|
||
|
||
def test_calls_handler(self, temp_directory, mock_loader):
|
||
"""Handler should be called with data."""
|
||
scanner = SerieScanner(temp_directory, mock_loader)
|
||
handler = MagicMock()
|
||
scanner.events.on_progress = [handler]
|
||
scanner._safe_call_event(scanner.events.on_progress, {"test": True})
|
||
handler.assert_called_once_with({"test": True})
|
||
|
||
def test_handler_error_swallowed(self, temp_directory, mock_loader):
|
||
"""Handler exceptions should be swallowed."""
|
||
scanner = SerieScanner(temp_directory, mock_loader)
|
||
handler = MagicMock(side_effect=Exception("boom"))
|
||
scanner.events.on_progress = [handler]
|
||
# Should not raise
|
||
scanner._safe_call_event(scanner.events.on_progress, {"test": True})
|
||
|
||
def test_empty_handler_list_noop(self, temp_directory, mock_loader):
|
||
"""Empty handler list should not raise."""
|
||
scanner = SerieScanner(temp_directory, mock_loader)
|
||
scanner.events.on_progress = []
|
||
scanner._safe_call_event(scanner.events.on_progress, {"test": True})
|
||
|
||
|
||
class TestFindMp4Files:
|
||
"""Test __find_mp4_files method."""
|
||
|
||
def test_finds_mp4_files(self, temp_directory, mock_loader):
|
||
"""Should yield folders with mp4 files."""
|
||
scanner = SerieScanner(temp_directory, mock_loader)
|
||
result = list(scanner._SerieScanner__find_mp4_files())
|
||
# temp_directory has "Attack on Titan (2013)" with one mp4
|
||
assert len(result) >= 1
|
||
folder, mp4s = result[0]
|
||
assert folder == "Attack on Titan (2013)"
|
||
assert len(mp4s) == 1
|
||
|
||
def test_empty_directory(self, mock_loader):
|
||
"""Should yield nothing for empty directory."""
|
||
import tempfile
|
||
|
||
with tempfile.TemporaryDirectory() as tmpdir:
|
||
scanner = SerieScanner(tmpdir, mock_loader)
|
||
result = list(scanner._SerieScanner__find_mp4_files())
|
||
assert len(result) == 0
|
||
|
||
def test_nested_mp4_files(self, mock_loader):
|
||
"""Should find mp4 files in subdirectories."""
|
||
import tempfile
|
||
|
||
with tempfile.TemporaryDirectory() as tmpdir:
|
||
# Create nested structure
|
||
anime = os.path.join(tmpdir, "Naruto")
|
||
season = os.path.join(anime, "Season 1")
|
||
os.makedirs(season)
|
||
with open(os.path.join(season, "ep1.mp4"), "w") as f:
|
||
f.write("dummy")
|
||
|
||
scanner = SerieScanner(tmpdir, mock_loader)
|
||
result = list(scanner._SerieScanner__find_mp4_files())
|
||
assert len(result) == 1
|
||
assert "Naruto" == result[0][0]
|
||
assert len(result[0][1]) == 1
|
||
|
||
def test_non_mp4_ignored(self, mock_loader):
|
||
"""Should ignore non-mp4 files."""
|
||
import tempfile
|
||
|
||
with tempfile.TemporaryDirectory() as tmpdir:
|
||
anime = os.path.join(tmpdir, "TestAnime")
|
||
os.makedirs(anime)
|
||
with open(os.path.join(anime, "readme.txt"), "w") as f:
|
||
f.write("not a video")
|
||
|
||
scanner = SerieScanner(tmpdir, mock_loader)
|
||
result = list(scanner._SerieScanner__find_mp4_files())
|
||
# The folder is yielded but with empty mp4 list
|
||
assert len(result) == 1
|
||
assert result[0][1] == []
|
||
|
||
|
||
class TestReadDataFromFile:
|
||
"""Test __read_data_from_file method."""
|
||
|
||
def test_reads_data_file(self, mock_loader):
|
||
"""Should read Serie from 'data' file when no DB entry exists."""
|
||
import tempfile
|
||
|
||
with tempfile.TemporaryDirectory() as tmpdir:
|
||
anime_folder = os.path.join(tmpdir, "SomeAnime")
|
||
os.makedirs(anime_folder)
|
||
|
||
# Create a data file
|
||
serie = Serie("test-key", "Test", "aniworld.to", "SomeAnime", {})
|
||
data_path = os.path.join(anime_folder, "data")
|
||
serie.save_to_file(data_path)
|
||
|
||
scanner = SerieScanner(tmpdir, mock_loader)
|
||
result = scanner._SerieScanner__read_data_from_file("SomeAnime")
|
||
assert result is not None
|
||
assert result.key == "test-key"
|
||
|
||
def test_no_files_returns_serie_with_generated_key(self, mock_loader):
|
||
"""Should return Serie with generated key when no key or data file exists."""
|
||
import tempfile
|
||
|
||
with tempfile.TemporaryDirectory() as tmpdir:
|
||
anime_folder = os.path.join(tmpdir, "Empty")
|
||
os.makedirs(anime_folder)
|
||
|
||
scanner = SerieScanner(tmpdir, mock_loader)
|
||
result = scanner._SerieScanner__read_data_from_file("Empty")
|
||
# Step 4 generates key from folder name when no files exist
|
||
assert result is not None
|
||
assert isinstance(result, Serie)
|
||
assert result.key == "empty"
|
||
|
||
|
||
class TestReinit:
|
||
"""Test reinit method."""
|
||
|
||
def test_clears_keydict(self, temp_directory, mock_loader):
|
||
"""reinit should clear the keyDict."""
|
||
scanner = SerieScanner(temp_directory, mock_loader)
|
||
scanner.keyDict["test"] = MagicMock()
|
||
scanner.reinit()
|
||
assert scanner.keyDict == {}
|
||
|
||
|
||
class TestGetTotalToScan:
|
||
"""Test get_total_to_scan."""
|
||
|
||
def test_counts_folders(self, temp_directory, mock_loader):
|
||
"""Should count number of folders."""
|
||
scanner = SerieScanner(temp_directory, mock_loader)
|
||
count = scanner.get_total_to_scan()
|
||
assert count >= 1
|
||
|
||
def test_empty_directory(self, mock_loader):
|
||
"""Should return 0 for empty directory."""
|
||
import tempfile
|
||
|
||
with tempfile.TemporaryDirectory() as tmpdir:
|
||
scanner = SerieScanner(tmpdir, mock_loader)
|
||
assert scanner.get_total_to_scan() == 0
|
||
|
||
|
||
class TestScanProgressEvents:
|
||
"""Test that scan emits progress and completion events."""
|
||
|
||
def test_scan_emits_progress(self, temp_directory, mock_loader):
|
||
"""Should emit on_progress during scan."""
|
||
scanner = SerieScanner(temp_directory, mock_loader)
|
||
progress_handler = MagicMock()
|
||
scanner.subscribe_on_progress(progress_handler)
|
||
|
||
with patch.object(scanner, 'get_total_to_scan', return_value=0), \
|
||
patch.object(
|
||
scanner, '_SerieScanner__find_mp4_files',
|
||
return_value=iter([])
|
||
):
|
||
scanner.scan()
|
||
|
||
# At minimum, STARTING event should fire
|
||
assert progress_handler.call_count >= 1
|
||
first_call = progress_handler.call_args_list[0][0][0]
|
||
assert first_call["phase"] == "STARTING"
|
||
|
||
def test_scan_emits_completion(self, temp_directory, mock_loader):
|
||
"""Should emit on_completion after scan."""
|
||
scanner = SerieScanner(temp_directory, mock_loader)
|
||
completion_handler = MagicMock()
|
||
scanner.subscribe_on_completion(completion_handler)
|
||
|
||
with patch.object(scanner, 'get_total_to_scan', return_value=0), \
|
||
patch.object(
|
||
scanner, '_SerieScanner__find_mp4_files',
|
||
return_value=iter([])
|
||
):
|
||
scanner.scan()
|
||
|
||
completion_handler.assert_called_once()
|
||
call_data = completion_handler.call_args[0][0]
|
||
assert call_data["success"] is True
|
||
|
||
def test_scan_emits_error_on_no_key(
|
||
self, temp_directory, mock_loader
|
||
):
|
||
"""Should emit on_error when NoKeyFoundException occurs."""
|
||
from src.core.exceptions.Exceptions import NoKeyFoundException
|
||
|
||
scanner = SerieScanner(temp_directory, mock_loader)
|
||
error_handler = MagicMock()
|
||
scanner.subscribe_on_error(error_handler)
|
||
|
||
with patch.object(scanner, 'get_total_to_scan', return_value=1), \
|
||
patch.object(
|
||
scanner, '_SerieScanner__find_mp4_files',
|
||
return_value=iter([("BadFolder", ["e1.mp4"])])
|
||
), \
|
||
patch.object(
|
||
scanner, '_SerieScanner__read_data_from_file',
|
||
side_effect=NoKeyFoundException("no key"),
|
||
):
|
||
scanner.scan()
|
||
|
||
error_handler.assert_called_once()
|
||
call_data = error_handler.call_args[0][0]
|
||
assert call_data["recoverable"] is True
|
||
|
||
|
||
class TestDbLookupFallback:
|
||
"""Tests for the db_lookup callback in SerieScanner."""
|
||
|
||
def _make_scanner(self, tmp_dir, mock_loader, db_lookup=None):
|
||
"""Create a scanner with an optional db_lookup."""
|
||
# Create a folder with an mp4 but NO key/data file
|
||
folder = os.path.join(tmp_dir, "Rooster Fighter (2026)")
|
||
os.makedirs(folder, exist_ok=True)
|
||
mp4 = os.path.join(folder, "Rooster Fighter - S01E001 - (German Dub).mp4")
|
||
with open(mp4, "w") as f:
|
||
f.write("dummy")
|
||
return SerieScanner(tmp_dir, mock_loader, db_lookup=db_lookup)
|
||
|
||
def test_db_lookup_stored_on_init(self, temp_directory, mock_loader):
|
||
"""db_lookup callable should be stored as _db_lookup."""
|
||
lookup = MagicMock(return_value=None)
|
||
scanner = SerieScanner(temp_directory, mock_loader, db_lookup=lookup)
|
||
assert scanner._db_lookup is lookup
|
||
|
||
def test_no_db_lookup_defaults_to_none(self, temp_directory, mock_loader):
|
||
"""Without db_lookup, _db_lookup should be None."""
|
||
scanner = SerieScanner(temp_directory, mock_loader)
|
||
assert scanner._db_lookup is None
|
||
|
||
def test_db_lookup_called_when_no_files(self, mock_loader):
|
||
"""db_lookup is called when neither key nor data file exists."""
|
||
import tempfile
|
||
|
||
with tempfile.TemporaryDirectory() as tmp_dir:
|
||
lookup = MagicMock(return_value=None)
|
||
scanner = self._make_scanner(tmp_dir, mock_loader, db_lookup=lookup)
|
||
|
||
with patch.object(scanner, 'get_total_to_scan', return_value=1), \
|
||
patch.object(
|
||
scanner,
|
||
'_SerieScanner__get_missing_episodes_and_season',
|
||
return_value=({}, "aniworld.to"),
|
||
):
|
||
scanner.scan()
|
||
|
||
lookup.assert_called_once_with("Rooster Fighter (2026)")
|
||
|
||
def test_db_lookup_not_called_when_key_file_exists(self, mock_loader):
|
||
"""db_lookup is NOT called when a key file is present."""
|
||
import tempfile
|
||
|
||
with tempfile.TemporaryDirectory() as tmp_dir:
|
||
folder = os.path.join(tmp_dir, "Rooster Fighter (2026)")
|
||
os.makedirs(folder, exist_ok=True)
|
||
mp4 = os.path.join(folder, "S01E001.mp4")
|
||
with open(mp4, "w") as f:
|
||
f.write("dummy")
|
||
with open(os.path.join(folder, "key"), "w") as f:
|
||
f.write("rooster-fighter")
|
||
|
||
lookup = MagicMock(return_value=None)
|
||
scanner = SerieScanner(tmp_dir, mock_loader, db_lookup=lookup)
|
||
|
||
with patch.object(scanner, 'get_total_to_scan', return_value=1), \
|
||
patch.object(
|
||
scanner,
|
||
'_SerieScanner__get_missing_episodes_and_season',
|
||
return_value=({1: []}, "aniworld.to"),
|
||
), \
|
||
patch.object(
|
||
SerieScanner,
|
||
'_SerieScanner__read_data_from_file',
|
||
return_value=Serie(
|
||
key="rooster-fighter", name="", site="aniworld.to",
|
||
folder="Rooster Fighter (2026)", episodeDict={},
|
||
),
|
||
):
|
||
scanner.scan()
|
||
|
||
lookup.assert_not_called()
|
||
|
||
def test_db_lookup_resolves_serie_and_scans(self, mock_loader):
|
||
"""When db_lookup returns a Serie, scanning continues normally."""
|
||
import tempfile
|
||
|
||
with tempfile.TemporaryDirectory() as tmp_dir:
|
||
resolved = Serie(
|
||
key="rooster-fighter",
|
||
name="Rooster Fighter",
|
||
site="aniworld.to",
|
||
folder="Rooster Fighter (2026)",
|
||
episodeDict={},
|
||
year=2026,
|
||
)
|
||
lookup = MagicMock(return_value=resolved)
|
||
scanner = self._make_scanner(tmp_dir, mock_loader, db_lookup=lookup)
|
||
|
||
with patch.object(scanner, 'get_total_to_scan', return_value=1), \
|
||
patch.object(
|
||
scanner,
|
||
'_SerieScanner__get_missing_episodes_and_season',
|
||
return_value=({1: [1, 2, 3]}, "aniworld.to"),
|
||
), \
|
||
patch.object(resolved, 'save_to_file'):
|
||
scanner.scan()
|
||
|
||
assert "rooster-fighter" in scanner.keyDict
|
||
assert scanner.keyDict["rooster-fighter"].episodeDict == {1: [1, 2, 3]}
|
||
|
||
def test_db_lookup_returns_none_folder_skipped(self, mock_loader):
|
||
"""When db_lookup returns None, Step 4 fallback generates key from folder name."""
|
||
import tempfile
|
||
|
||
with tempfile.TemporaryDirectory() as tmp_dir:
|
||
lookup = MagicMock(return_value=None)
|
||
scanner = self._make_scanner(tmp_dir, mock_loader, db_lookup=lookup)
|
||
|
||
with patch.object(scanner, 'get_total_to_scan', return_value=1):
|
||
scanner.scan()
|
||
|
||
# Step 4 generates key from folder name, so keyDict is not empty
|
||
assert len(scanner.keyDict) == 1
|
||
|
||
def test_db_lookup_exception_skips_folder(self, mock_loader):
|
||
"""When db_lookup raises, Step 4 fallback generates key from folder name."""
|
||
import tempfile
|
||
|
||
with tempfile.TemporaryDirectory() as tmp_dir:
|
||
lookup = MagicMock(side_effect=RuntimeError("DB offline"))
|
||
scanner = self._make_scanner(tmp_dir, mock_loader, db_lookup=lookup)
|
||
|
||
with patch.object(scanner, 'get_total_to_scan', return_value=1):
|
||
scanner.scan() # should not raise
|
||
|
||
# Step 4 generates key from folder name, so keyDict is not empty
|
||
assert len(scanner.keyDict) == 1
|
||
|
||
def test_db_lookup_warning_logged_when_no_files(
|
||
self, mock_loader, caplog
|
||
):
|
||
"""A warning is logged for folders without key/data file."""
|
||
import tempfile
|
||
|
||
with tempfile.TemporaryDirectory() as tmp_dir:
|
||
scanner = self._make_scanner(tmp_dir, mock_loader, db_lookup=None)
|
||
|
||
with caplog.at_level(logging.WARNING, logger="src.core.SerieScanner"):
|
||
with patch.object(scanner, 'get_total_to_scan', return_value=1):
|
||
scanner.scan()
|
||
|
||
assert any(
|
||
"Rooster Fighter (2026)" in record.message
|
||
for record in caplog.records
|
||
if record.levelname == "WARNING"
|
||
)
|
||
|
||
def test_db_lookup_info_logged_on_resolution(
|
||
self, mock_loader, caplog
|
||
):
|
||
"""An INFO log is emitted when db_lookup resolves a folder."""
|
||
import tempfile
|
||
|
||
with tempfile.TemporaryDirectory() as tmp_dir:
|
||
resolved = Serie(
|
||
key="rooster-fighter",
|
||
name="",
|
||
site="aniworld.to",
|
||
folder="Rooster Fighter (2026)",
|
||
episodeDict={},
|
||
)
|
||
lookup = MagicMock(return_value=resolved)
|
||
scanner = self._make_scanner(tmp_dir, mock_loader, db_lookup=lookup)
|
||
|
||
with caplog.at_level(logging.INFO, logger="src.core.SerieScanner"), \
|
||
patch.object(scanner, 'get_total_to_scan', return_value=1), \
|
||
patch.object(
|
||
scanner,
|
||
'_SerieScanner__get_missing_episodes_and_season',
|
||
return_value=({}, "aniworld.to"),
|
||
), \
|
||
patch.object(resolved, 'save_to_file'):
|
||
scanner.scan()
|
||
|
||
assert any(
|
||
"rooster-fighter" in record.message
|
||
for record in caplog.records
|
||
if record.levelname == "INFO"
|
||
)
|