- Use dynamic APP_VERSION instead of hardcoded v1.3.6 in test_template_helpers, test_health, and test_page_controller
- Add unresolved_folders to EXPECTED_TABLES in database/init.py
- Fix shallow copy bug in test_serie_scanner.py episodeDict comparison
- Update test_schema_constants to expect 6 tables instead of 5
631 lines
24 KiB
Python
631 lines
24 KiB
Python
"""Tests for SerieScanner class - file-based operations."""
|
||
|
||
import logging
|
||
import os
|
||
import tempfile
|
||
from unittest.mock import MagicMock, patch
|
||
|
||
import pytest
|
||
|
||
from src.server.database.models import AnimeSeries
|
||
from src.server.SerieScanner import SerieScanner
|
||
|
||
|
||
@pytest.fixture
def temp_directory():
    """Yield a throwaway library root that holds one anime folder.

    The root contains ``Attack on Titan (2013)/`` with a single placeholder
    ``.mp4`` file inside. Everything is deleted when the
    ``TemporaryDirectory`` context exits at fixture teardown.
    """
    with tempfile.TemporaryDirectory() as library_root:
        series_dir = os.path.join(library_root, "Attack on Titan (2013)")
        os.makedirs(series_dir, exist_ok=True)

        # Placeholder episode file; only its name and extension matter here.
        episode_file = os.path.join(
            series_dir, "Attack on Titan - S01E001 - (German Dub).mp4"
        )
        with open(episode_file, "w") as handle:
            handle.write("dummy mp4")

        yield library_root
|
||
|
||
|
||
@pytest.fixture
def mock_loader():
    """Return a ``MagicMock`` that stands in for a Loader.

    Two calls are pre-configured: ``get_season_episode_count`` returns
    ``{1: 25}`` and ``is_language`` returns ``True``.
    """
    stub = MagicMock()
    stub.configure_mock(
        get_season_episode_count=MagicMock(return_value={1: 25}),
        is_language=MagicMock(return_value=True),
    )
    return stub
|
||
|
||
|
||
@pytest.fixture
def sample_serie():
    """Return a ``MagicMock`` spec'd on ``AnimeSeries`` with preset fields.

    A fresh mock (and a fresh ``episodeDict``) is built on every call, so a
    test may mutate the returned object freely.
    """
    attributes = {
        "key": "attack-on-titan",
        "name": "Attack on Titan",
        "site": "aniworld.to",
        "folder": "Attack on Titan (2013)",
        "year": None,
        "nfo_path": None,
        "episodeDict": {1: [2, 3, 4]},
    }
    serie = MagicMock(spec=AnimeSeries)
    for attr_name, attr_value in attributes.items():
        setattr(serie, attr_name, attr_value)
    return serie
|
||
|
||
|
||
class TestSerieScannerInitialization:
    """Constructor behaviour of SerieScanner."""

    def test_init_success(self, temp_directory, mock_loader):
        """A valid directory yields a scanner with an empty keyDict."""
        expected_root = os.path.abspath(temp_directory)

        scanner = SerieScanner(temp_directory, mock_loader)

        assert scanner.directory == expected_root
        assert scanner.loader == mock_loader
        assert scanner.keyDict == {}

    def test_init_empty_path_raises_error(self, mock_loader):
        """An empty path is rejected with a ValueError mentioning 'empty'."""
        with pytest.raises(ValueError) as excinfo:
            SerieScanner("", mock_loader)
        excinfo.match("empty")

    def test_init_nonexistent_path_raises_error(self, mock_loader):
        """A missing path is rejected with a 'does not exist' ValueError."""
        with pytest.raises(ValueError) as excinfo:
            SerieScanner("/nonexistent/path", mock_loader)
        excinfo.match("does not exist")
|
||
|
||
|
||
class TestSerieScannerScan:
    """Test file-based scan operations with the scanner internals patched."""

    @staticmethod
    def _common_patchers(scanner, serie):
        """Build (but do not start) the patchers shared by both tests.

        Returned in the order they must be entered: folder count, mp4
        discovery, series lookup, missing-episode calculation.
        """
        discovered = iter([("Attack on Titan (2013)", ["S01E001.mp4"])])
        return (
            patch.object(scanner, 'get_total_to_scan', return_value=1),
            patch.object(
                scanner,
                '_SerieScanner__find_mp4_files',
                return_value=discovered,
            ),
            patch.object(
                scanner,
                '_SerieScanner__read_data_from_file',
                return_value=serie,
            ),
            patch.object(
                scanner,
                '_SerieScanner__get_missing_episodes_and_season',
                return_value=({1: [2, 3]}, "aniworld.to"),
            ),
        )

    def test_scan_persists_to_db(
        self, temp_directory, mock_loader, sample_serie
    ):
        """scan() hands the discovered series to _persist_serie_to_db."""
        scanner = SerieScanner(temp_directory, mock_loader)
        total, files, reader, missing = self._common_patchers(
            scanner, sample_serie
        )
        persist = patch.object(scanner, '_persist_serie_to_db')

        with total, files, reader, missing, persist as mock_persist:
            scanner.scan()

        # Exactly one persistence call, and it received our series.
        mock_persist.assert_called_once()
        persisted_serie = mock_persist.call_args[0][0]
        assert persisted_serie.key == "attack-on-titan"

    def test_keydict_populated_after_scan(
        self, temp_directory, mock_loader, sample_serie
    ):
        """scan() registers the discovered series in keyDict by its key."""
        scanner = SerieScanner(temp_directory, mock_loader)
        total, files, reader, missing = self._common_patchers(
            scanner, sample_serie
        )
        persist = patch.object(scanner, '_persist_serie_to_db')

        with total, files, reader, missing, persist:
            scanner.scan()

        assert sample_serie.key in scanner.keyDict
|
||
|
||
|
||
class TestSerieScannerSingleSeries:
    """Test scan_single_series method for targeted scanning."""

    @staticmethod
    def _stub_missing_episodes(scanner, **patch_kwargs):
        """Return a patcher for the private missing-episode calculation.

        ``patch_kwargs`` is forwarded to ``patch.object`` (for example
        ``return_value=...`` or ``side_effect=...``).
        """
        return patch.object(
            scanner,
            '_SerieScanner__get_missing_episodes_and_season',
            **patch_kwargs,
        )

    def test_scan_single_series_basic(self, temp_directory, mock_loader):
        """The mocked missing-episode mapping is returned unchanged."""
        scanner = SerieScanner(temp_directory, mock_loader)
        stub = self._stub_missing_episodes(
            scanner,
            return_value=({1: [5, 6, 7], 2: [1, 2]}, "aniworld.to"),
        )

        with stub:
            result = scanner.scan_single_series(
                key="attack-on-titan",
                folder="Attack on Titan (2013)",
            )

        # Result is a season -> episode-list mapping.
        assert isinstance(result, dict)
        assert 1 in result
        assert 2 in result
        assert result[1] == [5, 6, 7]
        assert result[2] == [1, 2]

    def test_scan_single_series_updates_keydict(
        self, temp_directory, mock_loader
    ):
        """A scanned key appears in keyDict with its episodeDict."""
        scanner = SerieScanner(temp_directory, mock_loader)
        stub = self._stub_missing_episodes(
            scanner, return_value=({1: [1, 2, 3]}, "aniworld.to")
        )

        with stub:
            scanner.scan_single_series(key="test-anime", folder="Test Anime")

        assert "test-anime" in scanner.keyDict
        entry = scanner.keyDict["test-anime"]
        assert entry.episodeDict == {1: [1, 2, 3]}

    def test_scan_single_series_existing_entry(
        self, temp_directory, mock_loader, sample_serie
    ):
        """Rescanning a known key merges new episodes into its entry."""
        import copy

        scanner = SerieScanner(temp_directory, mock_loader)
        scanner.keyDict[sample_serie.key] = sample_serie

        # Snapshot with deepcopy: episodeDict is modified in place, so a
        # plain reference (or shallow copy) would change along with it.
        episodes_before = copy.deepcopy(sample_serie.episodeDict)

        stub = self._stub_missing_episodes(
            scanner, return_value=({1: [10, 11, 12]}, "aniworld.to")
        )
        with stub:
            scanner.scan_single_series(
                key=sample_serie.key,
                folder=sample_serie.folder,
            )

        # Merged, not replaced: old [2, 3, 4] plus new [10, 11, 12].
        episodes_after = scanner.keyDict[sample_serie.key].episodeDict
        assert episodes_after != episodes_before
        assert episodes_after == {1: [2, 3, 4, 10, 11, 12]}

    def test_scan_single_series_empty_key_raises_error(
        self, temp_directory, mock_loader
    ):
        """An empty key is rejected with a ValueError."""
        scanner = SerieScanner(temp_directory, mock_loader)

        with pytest.raises(ValueError) as excinfo:
            scanner.scan_single_series(key="", folder="Test Folder")
        excinfo.match("key cannot be empty")

    def test_scan_single_series_empty_folder_raises_error(
        self, temp_directory, mock_loader
    ):
        """An empty folder is rejected with a ValueError."""
        scanner = SerieScanner(temp_directory, mock_loader)

        with pytest.raises(ValueError) as excinfo:
            scanner.scan_single_series(key="test-key", folder="")
        excinfo.match("folder cannot be empty")

    def test_scan_single_series_nonexistent_folder(
        self, temp_directory, mock_loader
    ):
        """A folder that is absent on disk still yields the mocked result."""
        scanner = SerieScanner(temp_directory, mock_loader)

        # Pretend the provider reports these episodes as missing.
        stub = self._stub_missing_episodes(
            scanner, return_value=({1: [1, 2, 3, 4, 5]}, "aniworld.to")
        )
        with stub:
            result = scanner.scan_single_series(
                key="new-anime",
                folder="NonExistent Folder",
            )

        assert result == {1: [1, 2, 3, 4, 5]}

    def test_scan_single_series_error_handling(
        self, temp_directory, mock_loader
    ):
        """An exception in the calculation yields an empty dict."""
        scanner = SerieScanner(temp_directory, mock_loader)
        stub = self._stub_missing_episodes(
            scanner, side_effect=Exception("Provider error")
        )

        with stub:
            result = scanner.scan_single_series(
                key="test-anime",
                folder="Test Folder",
            )

        assert result == {}

    def test_scan_single_series_no_missing_episodes(
        self, temp_directory, mock_loader
    ):
        """With nothing missing, the key is stored with an empty dict."""
        scanner = SerieScanner(temp_directory, mock_loader)
        stub = self._stub_missing_episodes(
            scanner, return_value=({}, "aniworld.to")
        )

        with stub:
            result = scanner.scan_single_series(
                key="complete-anime",
                folder="Complete Anime",
            )

        assert result == {}
        assert "complete-anime" in scanner.keyDict
        assert scanner.keyDict["complete-anime"].episodeDict == {}

    def test_scan_single_series_with_existing_files(
        self, temp_directory, mock_loader
    ):
        """Scan a folder that already holds MP4 files for episodes 1-3."""
        # makedirs creates the intermediate "Test Anime" folder as well.
        season_dir = os.path.join(temp_directory, "Test Anime", "Season 1")
        os.makedirs(season_dir, exist_ok=True)

        for ep in (1, 2, 3):
            episode_file = os.path.join(
                season_dir, f"Test Anime - S01E{ep:03d} - (German Dub).mp4"
            )
            with open(episode_file, "w") as handle:
                handle.write("dummy")

        scanner = SerieScanner(temp_directory, mock_loader)

        # The mocked calculation reports episodes 4, 5 and 6 as missing.
        stub = self._stub_missing_episodes(
            scanner, return_value=({1: [4, 5, 6]}, "aniworld.to")
        )
        with stub:
            result = scanner.scan_single_series(
                key="test-anime",
                folder="Test Anime",
            )

        assert result == {1: [4, 5, 6]}
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════════════════════════
|
||
# New coverage tests – events, year extraction, find_mp4, read_data
|
||
# ══════════════════════════════════════════════════════════════════════════════
|
||
|
||
|
||
class TestEventSubscription:
    """Subscribe/unsubscribe round-trips for each scanner event list."""

    def test_subscribe_on_progress(self, temp_directory, mock_loader):
        """A subscribed callback is listed in events.on_progress."""
        callback = MagicMock()
        scanner = SerieScanner(temp_directory, mock_loader)

        scanner.subscribe_on_progress(callback)

        assert callback in scanner.events.on_progress

    def test_unsubscribe_on_progress(self, temp_directory, mock_loader):
        """An unsubscribed callback is removed from events.on_progress."""
        callback = MagicMock()
        scanner = SerieScanner(temp_directory, mock_loader)

        scanner.subscribe_on_progress(callback)
        scanner.unsubscribe_on_progress(callback)

        assert callback not in scanner.events.on_progress

    def test_subscribe_duplicate_ignored(self, temp_directory, mock_loader):
        """Subscribing the same callback twice registers it only once."""
        callback = MagicMock()
        scanner = SerieScanner(temp_directory, mock_loader)

        for _ in range(2):
            scanner.subscribe_on_progress(callback)

        assert scanner.events.on_progress.count(callback) == 1

    def test_unsubscribe_missing_handler_noop(
        self, temp_directory, mock_loader
    ):
        """Unsubscribing a never-registered callback must not raise."""
        never_registered = MagicMock()
        scanner = SerieScanner(temp_directory, mock_loader)

        # No assertion needed: the test passes if this call returns.
        scanner.unsubscribe_on_progress(never_registered)

    def test_subscribe_on_error(self, temp_directory, mock_loader):
        """A subscribed callback is listed in events.on_error."""
        callback = MagicMock()
        scanner = SerieScanner(temp_directory, mock_loader)

        scanner.subscribe_on_error(callback)

        assert callback in scanner.events.on_error

    def test_unsubscribe_on_error(self, temp_directory, mock_loader):
        """An unsubscribed callback is removed from events.on_error."""
        callback = MagicMock()
        scanner = SerieScanner(temp_directory, mock_loader)

        scanner.subscribe_on_error(callback)
        scanner.unsubscribe_on_error(callback)

        assert callback not in scanner.events.on_error

    def test_subscribe_on_completion(self, temp_directory, mock_loader):
        """A subscribed callback is listed in events.on_completion."""
        callback = MagicMock()
        scanner = SerieScanner(temp_directory, mock_loader)

        scanner.subscribe_on_completion(callback)

        assert callback in scanner.events.on_completion

    def test_unsubscribe_on_completion(self, temp_directory, mock_loader):
        """An unsubscribed callback is removed from events.on_completion."""
        callback = MagicMock()
        scanner = SerieScanner(temp_directory, mock_loader)

        scanner.subscribe_on_completion(callback)
        scanner.unsubscribe_on_completion(callback)

        assert callback not in scanner.events.on_completion
|
||
|
||
|
||
class TestExtractYearFromFolderName:
    """Expectations for SerieScanner._extract_year_from_folder_name."""

    def test_extracts_year(self, temp_directory, mock_loader):
        """A trailing '(2025)' in the folder name is returned as 2025."""
        scanner = SerieScanner(temp_directory, mock_loader)
        year = scanner._extract_year_from_folder_name("Dororo (2025)")
        assert year == 2025

    def test_no_year_returns_none(self, temp_directory, mock_loader):
        """A folder name without a year gives None."""
        scanner = SerieScanner(temp_directory, mock_loader)
        year = scanner._extract_year_from_folder_name("Dororo")
        assert year is None

    def test_empty_string_returns_none(self, temp_directory, mock_loader):
        """An empty folder name gives None."""
        scanner = SerieScanner(temp_directory, mock_loader)
        year = scanner._extract_year_from_folder_name("")
        assert year is None

    def test_none_returns_none(self, temp_directory, mock_loader):
        """Passing None instead of a name gives None."""
        scanner = SerieScanner(temp_directory, mock_loader)
        year = scanner._extract_year_from_folder_name(None)
        assert year is None

    def test_year_out_of_range_returns_none(
        self, temp_directory, mock_loader
    ):
        """Years outside 1900-2100 (here 1800 and 2200) give None."""
        scanner = SerieScanner(temp_directory, mock_loader)
        for folder_name in ("Title (1800)", "Title (2200)"):
            assert scanner._extract_year_from_folder_name(folder_name) is None

    def test_year_in_middle(self, temp_directory, mock_loader):
        """A year followed by extra text is still extracted."""
        scanner = SerieScanner(temp_directory, mock_loader)
        year = scanner._extract_year_from_folder_name("Title (2020) - Extra")
        assert year == 2020
|
||
|
||
|
||
class TestSafeCallEvent:
    """Expectations for SerieScanner._safe_call_event."""

    def test_calls_handler(self, temp_directory, mock_loader):
        """A registered handler receives the payload exactly once."""
        listener = MagicMock()
        scanner = SerieScanner(temp_directory, mock_loader)
        scanner.events.on_progress = [listener]

        scanner._safe_call_event(scanner.events.on_progress, {"test": True})

        listener.assert_called_once_with({"test": True})

    def test_handler_error_swallowed(self, temp_directory, mock_loader):
        """An exception raised by a handler does not reach the caller."""
        failing_listener = MagicMock(side_effect=Exception("boom"))
        scanner = SerieScanner(temp_directory, mock_loader)
        scanner.events.on_progress = [failing_listener]

        # The test passes if this call returns without raising.
        scanner._safe_call_event(scanner.events.on_progress, {"test": True})

    def test_empty_handler_list_noop(self, temp_directory, mock_loader):
        """Dispatching to an empty handler list does not raise."""
        scanner = SerieScanner(temp_directory, mock_loader)
        scanner.events.on_progress = []

        scanner._safe_call_event(scanner.events.on_progress, {"test": True})
|
||
|
||
|
||
class TestFindMp4Files:
    """Test the private __find_mp4_files generator.

    The method is reached through its name-mangled attribute
    ``_SerieScanner__find_mp4_files``. Tests that need a custom layout build
    it in their own ``tempfile.TemporaryDirectory``; ``tempfile`` comes from
    the module-level import, so it is not re-imported inside each test.
    """

    def test_finds_mp4_files(self, temp_directory, mock_loader):
        """Should yield folders with mp4 files."""
        scanner = SerieScanner(temp_directory, mock_loader)
        result = list(scanner._SerieScanner__find_mp4_files())

        # temp_directory has "Attack on Titan (2013)" with one mp4
        assert len(result) >= 1
        folder, mp4s = result[0]
        assert folder == "Attack on Titan (2013)"
        assert len(mp4s) == 1

    def test_empty_directory(self, mock_loader):
        """Should yield nothing for empty directory."""
        with tempfile.TemporaryDirectory() as tmpdir:
            scanner = SerieScanner(tmpdir, mock_loader)
            result = list(scanner._SerieScanner__find_mp4_files())
            assert len(result) == 0

    def test_nested_mp4_files(self, mock_loader):
        """Should find mp4 files in subdirectories."""
        with tempfile.TemporaryDirectory() as tmpdir:
            # Create nested structure: <root>/Naruto/Season 1/ep1.mp4
            anime = os.path.join(tmpdir, "Naruto")
            season = os.path.join(anime, "Season 1")
            os.makedirs(season)
            with open(os.path.join(season, "ep1.mp4"), "w") as f:
                f.write("dummy")

            scanner = SerieScanner(tmpdir, mock_loader)
            result = list(scanner._SerieScanner__find_mp4_files())

            # The file is reported under the top-level folder name.
            assert len(result) == 1
            assert "Naruto" == result[0][0]
            assert len(result[0][1]) == 1

    def test_non_mp4_ignored(self, mock_loader):
        """Should ignore non-mp4 files."""
        with tempfile.TemporaryDirectory() as tmpdir:
            anime = os.path.join(tmpdir, "TestAnime")
            os.makedirs(anime)
            with open(os.path.join(anime, "readme.txt"), "w") as f:
                f.write("not a video")

            scanner = SerieScanner(tmpdir, mock_loader)
            result = list(scanner._SerieScanner__find_mp4_files())

            # The folder is yielded but with empty mp4 list
            assert len(result) == 1
            assert result[0][1] == []
|
||
|
||
|
||
class TestReadDataFromFile:
    """Test the private __read_data_from_file method.

    The method is reached through its name-mangled attribute
    ``_SerieScanner__read_data_from_file``.
    """

    def test_empty_folder_name_returns_none(self, temp_directory, mock_loader):
        """Empty folder name -> returns None (no DB lookup attempted)."""
        scanner = SerieScanner(temp_directory, mock_loader)
        result = scanner._SerieScanner__read_data_from_file("")
        assert result is None

    def test_nonexistent_folder_no_exception(self, temp_directory, mock_loader):
        """Folder doesn't exist -> returns None without raising."""
        scanner = SerieScanner(temp_directory, mock_loader)
        result = scanner._SerieScanner__read_data_from_file("Nonexistent Folder")
        assert result is None


# The class header below was missing: the reinit docstring and test had been
# absorbed into TestReadDataFromFile. They are restored as their own class.
class TestReinit:
    """Test reinit method."""

    def test_clears_keydict(self, temp_directory, mock_loader):
        """reinit should clear the keyDict."""
        scanner = SerieScanner(temp_directory, mock_loader)
        scanner.keyDict["test"] = MagicMock()
        scanner.reinit()
        assert scanner.keyDict == {}
|
||
|
||
|
||
class TestGetTotalToScan:
    """Test get_total_to_scan.

    ``tempfile`` comes from the module-level import, so it is not
    re-imported inside the test.
    """

    def test_counts_folders(self, temp_directory, mock_loader):
        """Should count number of folders."""
        scanner = SerieScanner(temp_directory, mock_loader)
        count = scanner.get_total_to_scan()
        # The temp_directory fixture creates one anime folder.
        assert count >= 1

    def test_empty_directory(self, mock_loader):
        """Should return 0 for empty directory."""
        with tempfile.TemporaryDirectory() as tmpdir:
            scanner = SerieScanner(tmpdir, mock_loader)
            assert scanner.get_total_to_scan() == 0
|
||
|
||
|
||
class TestScanProgressEvents:
    """Check the events that scan() emits to subscribed handlers."""

    def test_scan_emits_progress(self, temp_directory, mock_loader):
        """The first progress event of a scan has phase 'STARTING'."""
        scanner = SerieScanner(temp_directory, mock_loader)
        on_progress = MagicMock()
        scanner.subscribe_on_progress(on_progress)

        # Simulate a library with nothing to scan.
        no_folders = patch.object(
            scanner, 'get_total_to_scan', return_value=0
        )
        no_files = patch.object(
            scanner, '_SerieScanner__find_mp4_files', return_value=iter([])
        )
        with no_folders, no_files:
            scanner.scan()

        # At minimum, the STARTING event should fire.
        assert on_progress.call_count >= 1
        first_payload = on_progress.call_args_list[0][0][0]
        assert first_payload["phase"] == "STARTING"

    def test_scan_emits_completion(self, temp_directory, mock_loader):
        """A scan emits one completion event flagged as a success."""
        scanner = SerieScanner(temp_directory, mock_loader)
        on_completion = MagicMock()
        scanner.subscribe_on_completion(on_completion)

        no_folders = patch.object(
            scanner, 'get_total_to_scan', return_value=0
        )
        no_files = patch.object(
            scanner, '_SerieScanner__find_mp4_files', return_value=iter([])
        )
        with no_folders, no_files:
            scanner.scan()

        on_completion.assert_called_once()
        payload = on_completion.call_args[0][0]
        assert payload["success"] is True

    def test_scan_emits_error(self, temp_directory, mock_loader):
        """A failing series lookup emits one recoverable error event."""
        scanner = SerieScanner(temp_directory, mock_loader)
        on_error = MagicMock()
        scanner.subscribe_on_error(on_error)

        one_folder = patch.object(
            scanner, 'get_total_to_scan', return_value=1
        )
        one_file = patch.object(
            scanner,
            '_SerieScanner__find_mp4_files',
            return_value=iter([("BadFolder", ["e1.mp4"])]),
        )
        # Reading the series data raises, which scan() should report.
        failing_read = patch.object(
            scanner,
            '_SerieScanner__read_data_from_file',
            side_effect=RuntimeError("DB error"),
        )
        with one_folder, one_file, failing_read:
            scanner.scan()

        on_error.assert_called_once()
        payload = on_error.call_args[0][0]
        assert payload["recoverable"] is True
|
||
|
||
|
||
|