Files
Aniworld/tests/unit/test_serie_scanner.py
Lukas e84a220f55 Expand test coverage: ~188 new tests across 6 critical files
- Fix failing test_authenticated_request_succeeds (dependency override)
- Expand test_anime_service.py (+35 tests: status events, DB, broadcasts)
- Create test_queue_repository.py (27 tests: CRUD, model conversion)
- Expand test_enhanced_provider.py (+24 tests: fetch, download, redirect)
- Expand test_serie_scanner.py (+25 tests: events, year extract, mp4 scan)
- Create test_database_connection.py (38 tests: sessions, transactions)
- Expand test_anime_endpoints.py (+39 tests: status, search, loading)
- Clean up docs/instructions.md TODO list
2026-02-15 17:49:12 +01:00

654 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Tests for SerieScanner class - file-based operations."""
import os
import tempfile
from unittest.mock import MagicMock, patch
import pytest
from src.core.entities.series import Serie
from src.core.SerieScanner import SerieScanner
@pytest.fixture
def temp_directory():
    """Yield a throwaway root directory holding one anime folder.

    The root contains ``Attack on Titan (2013)`` with a single placeholder
    ``.mp4`` file. The directory is removed when the fixture is torn down.
    """
    series_folder = "Attack on Titan (2013)"
    episode_file = "Attack on Titan - S01E001 - (German Dub).mp4"

    with tempfile.TemporaryDirectory() as root:
        os.makedirs(os.path.join(root, series_folder), exist_ok=True)

        # Placeholder text, not real video data.
        episode_path = os.path.join(root, series_folder, episode_file)
        with open(episode_path, "w") as handle:
            handle.write("dummy mp4")

        yield root
@pytest.fixture
def mock_loader():
    """Return a ``MagicMock`` standing in for a Loader.

    ``get_season_episode_count`` returns ``{1: 25}`` and ``is_language``
    returns ``True``.
    """
    canned_returns = {
        "get_season_episode_count": {1: 25},
        "is_language": True,
    }
    stub = MagicMock()
    for method_name, value in canned_returns.items():
        setattr(stub, method_name, MagicMock(return_value=value))
    return stub
@pytest.fixture
def sample_serie():
    """Return a ``Serie`` for "Attack on Titan" with episodes {1: [2, 3, 4]}."""
    fields = {
        "key": "attack-on-titan",
        "name": "Attack on Titan",
        "site": "aniworld.to",
        "folder": "Attack on Titan (2013)",
        "episodeDict": {1: [2, 3, 4]},
    }
    return Serie(**fields)
class TestSerieScannerInitialization:
    """Constructor behaviour of SerieScanner."""

    def test_init_success(self, temp_directory, mock_loader):
        """A valid directory produces a scanner with an empty keyDict."""
        expected_directory = os.path.abspath(temp_directory)

        scanner = SerieScanner(temp_directory, mock_loader)

        assert scanner.directory == expected_directory
        assert scanner.loader == mock_loader
        assert scanner.keyDict == {}

    def test_init_empty_path_raises_error(self, mock_loader):
        """An empty path is rejected with a ValueError mentioning 'empty'."""
        empty_path = ""
        with pytest.raises(ValueError, match="empty"):
            SerieScanner(empty_path, mock_loader)

    def test_init_nonexistent_path_raises_error(self, mock_loader):
        """A missing path is rejected with a 'does not exist' ValueError."""
        missing_path = "/nonexistent/path"
        with pytest.raises(ValueError, match="does not exist"):
            SerieScanner(missing_path, mock_loader)
class TestSerieScannerScan:
    """Test file-based scan operations.

    Both tests call ``scanner.scan()`` while the scanner's folder count,
    mp4 discovery, data-file reader and missing-episode lookup are replaced
    by canned values.
    """

    @staticmethod
    def _scan_with_stubbed_internals(scanner, serie):
        """Run ``scanner.scan()`` with its internal steps patched.

        Args:
            scanner: The SerieScanner under test.
            serie: Object returned by the patched data-file reader; its
                ``save_to_file`` is replaced by a mock during the scan.

        Returns:
            The mock that replaced ``serie.save_to_file`` during the scan.
        """
        discovered = iter([("Attack on Titan (2013)", ["S01E001.mp4"])])

        # A single flat ``with`` keeps all five patches at one indent level.
        with patch.object(scanner, "get_total_to_scan", return_value=1), \
                patch.object(
                    scanner,
                    "_SerieScanner__find_mp4_files",
                    return_value=discovered,
                ), \
                patch.object(
                    scanner,
                    "_SerieScanner__read_data_from_file",
                    return_value=serie,
                ), \
                patch.object(
                    scanner,
                    "_SerieScanner__get_missing_episodes_and_season",
                    return_value=({1: [2, 3]}, "aniworld.to"),
                ), \
                patch.object(serie, "save_to_file") as save_mock:
            scanner.scan()

        return save_mock

    def test_file_based_scan_works(
        self, temp_directory, mock_loader, sample_serie
    ):
        """Test file-based scan works properly."""
        scanner = SerieScanner(temp_directory, mock_loader)

        save_mock = self._scan_with_stubbed_internals(scanner, sample_serie)

        # Verify file was saved
        save_mock.assert_called_once()

    def test_keydict_populated_after_scan(
        self, temp_directory, mock_loader, sample_serie
    ):
        """Test keyDict is populated after scan."""
        scanner = SerieScanner(temp_directory, mock_loader)

        self._scan_with_stubbed_internals(scanner, sample_serie)

        assert sample_serie.key in scanner.keyDict
class TestSerieScannerSingleSeries:
    """Targeted scanning of one series via ``scan_single_series``."""

    # Name-mangled private method that these tests replace with a stub.
    _MISSING_LOOKUP = "_SerieScanner__get_missing_episodes_and_season"

    @classmethod
    def _stub_missing_lookup(cls, scanner, **mock_options):
        """Return a patcher replacing the scanner's missing-episode lookup.

        ``mock_options`` are forwarded to ``patch.object`` (for example
        ``return_value`` or ``side_effect``).
        """
        return patch.object(scanner, cls._MISSING_LOOKUP, **mock_options)

    def test_scan_single_series_basic(
        self, temp_directory, mock_loader
    ):
        """The stubbed per-season missing episodes are returned as a dict."""
        scanner = SerieScanner(temp_directory, mock_loader)
        stubbed = ({1: [5, 6, 7], 2: [1, 2]}, "aniworld.to")

        # Replace the missing-episode calculation with a canned answer.
        with self._stub_missing_lookup(scanner, return_value=stubbed):
            result = scanner.scan_single_series(
                key="attack-on-titan",
                folder="Attack on Titan (2013)"
            )

        # Check the shape and content of the result.
        assert isinstance(result, dict)
        assert 1 in result
        assert 2 in result
        assert result[1] == [5, 6, 7]
        assert result[2] == [1, 2]

    def test_scan_single_series_updates_keydict(
        self, temp_directory, mock_loader
    ):
        """The scanned key is stored in keyDict with its episodeDict."""
        scanner = SerieScanner(temp_directory, mock_loader)
        stubbed = ({1: [1, 2, 3]}, "aniworld.to")

        with self._stub_missing_lookup(scanner, return_value=stubbed):
            scanner.scan_single_series(
                key="test-anime",
                folder="Test Anime"
            )

        # keyDict must now carry the new entry.
        assert "test-anime" in scanner.keyDict
        assert scanner.keyDict["test-anime"].episodeDict == {1: [1, 2, 3]}

    def test_scan_single_series_existing_entry(
        self, temp_directory, mock_loader, sample_serie
    ):
        """A key already present in keyDict gets its episodeDict replaced."""
        scanner = SerieScanner(temp_directory, mock_loader)

        # Seed keyDict and remember the episode data it started with.
        scanner.keyDict[sample_serie.key] = sample_serie
        old_episode_dict = sample_serie.episodeDict.copy()
        stubbed = ({1: [10, 11, 12]}, "aniworld.to")

        with self._stub_missing_lookup(scanner, return_value=stubbed):
            scanner.scan_single_series(
                key=sample_serie.key,
                folder=sample_serie.folder
            )

        # The stored entry must differ from before and match the stub.
        stored = scanner.keyDict[sample_serie.key].episodeDict
        assert stored != old_episode_dict
        assert (
            scanner.keyDict[sample_serie.key].episodeDict
            == {1: [10, 11, 12]}
        )

    def test_scan_single_series_empty_key_raises_error(
        self, temp_directory, mock_loader
    ):
        """An empty key is rejected with a ValueError."""
        scanner = SerieScanner(temp_directory, mock_loader)

        with pytest.raises(ValueError, match="key cannot be empty"):
            scanner.scan_single_series(key="", folder="Test Folder")

    def test_scan_single_series_empty_folder_raises_error(
        self, temp_directory, mock_loader
    ):
        """An empty folder is rejected with a ValueError."""
        scanner = SerieScanner(temp_directory, mock_loader)

        with pytest.raises(ValueError, match="folder cannot be empty"):
            scanner.scan_single_series(key="test-key", folder="")

    def test_scan_single_series_nonexistent_folder(
        self, temp_directory, mock_loader
    ):
        """A folder that does not exist on disk still yields a result."""
        scanner = SerieScanner(temp_directory, mock_loader)
        # Canned episodes, as if they had come from the provider.
        stubbed = ({1: [1, 2, 3, 4, 5]}, "aniworld.to")

        with self._stub_missing_lookup(scanner, return_value=stubbed):
            result = scanner.scan_single_series(
                key="new-anime",
                folder="NonExistent Folder"
            )

        # The stubbed missing episodes are returned unchanged.
        assert result == {1: [1, 2, 3, 4, 5]}

    def test_scan_single_series_error_handling(
        self, temp_directory, mock_loader
    ):
        """An exception in the lookup results in an empty dict."""
        scanner = SerieScanner(temp_directory, mock_loader)
        failure = Exception("Provider error")

        with self._stub_missing_lookup(scanner, side_effect=failure):
            result = scanner.scan_single_series(
                key="test-anime",
                folder="Test Folder"
            )

        # The error is not propagated; an empty dict comes back instead.
        assert result == {}

    def test_scan_single_series_no_missing_episodes(
        self, temp_directory, mock_loader
    ):
        """With nothing missing, the result and stored episodeDict are empty."""
        scanner = SerieScanner(temp_directory, mock_loader)
        stubbed = ({}, "aniworld.to")

        with self._stub_missing_lookup(scanner, return_value=stubbed):
            result = scanner.scan_single_series(
                key="complete-anime",
                folder="Complete Anime"
            )

        assert result == {}
        assert "complete-anime" in scanner.keyDict
        assert scanner.keyDict["complete-anime"].episodeDict == {}

    def test_scan_single_series_with_existing_files(
        self, temp_directory, mock_loader
    ):
        """Scanning a folder that already holds MP4 files returns the stub."""
        # Lay out "Test Anime/Season 1" with three placeholder episodes.
        season_folder = os.path.join(temp_directory, "Test Anime", "Season 1")
        os.makedirs(os.path.join(temp_directory, "Test Anime"), exist_ok=True)
        os.makedirs(season_folder, exist_ok=True)

        for ep in (1, 2, 3):
            file_name = f"Test Anime - S01E{ep:03d} - (German Dub).mp4"
            with open(os.path.join(season_folder, file_name), "w") as handle:
                handle.write("dummy")

        scanner = SerieScanner(temp_directory, mock_loader)
        # The stub reports episodes 4, 5 and 6 as missing.
        stubbed = ({1: [4, 5, 6]}, "aniworld.to")

        with self._stub_missing_lookup(scanner, return_value=stubbed):
            result = scanner.scan_single_series(
                key="test-anime",
                folder="Test Anime"
            )

        # Only the missing episodes are reported.
        assert result == {1: [4, 5, 6]}
# ══════════════════════════════════════════════════════════════════════════════
# New coverage tests: events, year extraction, find_mp4, read_data
# ══════════════════════════════════════════════════════════════════════════════
class TestEventSubscription:
    """Subscribe/unsubscribe behaviour for progress, error and completion."""

    @staticmethod
    def _scanner_and_handler(directory, loader):
        """Build a fresh scanner plus a mock handler to (un)subscribe."""
        return SerieScanner(directory, loader), MagicMock()

    def test_subscribe_on_progress(self, temp_directory, mock_loader):
        """Subscribing registers the handler in ``events.on_progress``."""
        scanner, handler = self._scanner_and_handler(
            temp_directory, mock_loader
        )

        scanner.subscribe_on_progress(handler)

        assert handler in scanner.events.on_progress

    def test_unsubscribe_on_progress(self, temp_directory, mock_loader):
        """Unsubscribing removes the handler from ``events.on_progress``."""
        scanner, handler = self._scanner_and_handler(
            temp_directory, mock_loader
        )
        scanner.subscribe_on_progress(handler)

        scanner.unsubscribe_on_progress(handler)

        assert handler not in scanner.events.on_progress

    def test_subscribe_duplicate_ignored(self, temp_directory, mock_loader):
        """Registering the same handler twice keeps a single entry."""
        scanner, handler = self._scanner_and_handler(
            temp_directory, mock_loader
        )

        for _ in range(2):
            scanner.subscribe_on_progress(handler)

        assert scanner.events.on_progress.count(handler) == 1

    def test_unsubscribe_missing_handler_noop(
        self, temp_directory, mock_loader
    ):
        """Removing a handler that was never added must not raise."""
        scanner, handler = self._scanner_and_handler(
            temp_directory, mock_loader
        )

        # Passing means no exception was raised.
        scanner.unsubscribe_on_progress(handler)

    def test_subscribe_on_error(self, temp_directory, mock_loader):
        """Subscribing registers the handler in ``events.on_error``."""
        scanner, handler = self._scanner_and_handler(
            temp_directory, mock_loader
        )

        scanner.subscribe_on_error(handler)

        assert handler in scanner.events.on_error

    def test_unsubscribe_on_error(self, temp_directory, mock_loader):
        """Unsubscribing removes the handler from ``events.on_error``."""
        scanner, handler = self._scanner_and_handler(
            temp_directory, mock_loader
        )
        scanner.subscribe_on_error(handler)

        scanner.unsubscribe_on_error(handler)

        assert handler not in scanner.events.on_error

    def test_subscribe_on_completion(self, temp_directory, mock_loader):
        """Subscribing registers the handler in ``events.on_completion``."""
        scanner, handler = self._scanner_and_handler(
            temp_directory, mock_loader
        )

        scanner.subscribe_on_completion(handler)

        assert handler in scanner.events.on_completion

    def test_unsubscribe_on_completion(self, temp_directory, mock_loader):
        """Unsubscribing removes the handler from ``events.on_completion``."""
        scanner, handler = self._scanner_and_handler(
            temp_directory, mock_loader
        )
        scanner.subscribe_on_completion(handler)

        scanner.unsubscribe_on_completion(handler)

        assert handler not in scanner.events.on_completion
class TestExtractYearFromFolderName:
    """Behaviour of ``_extract_year_from_folder_name``."""

    def test_extracts_year(self, temp_directory, mock_loader):
        """'Dororo (2025)' yields the integer 2025."""
        extract = SerieScanner(
            temp_directory, mock_loader
        )._extract_year_from_folder_name

        year = extract("Dororo (2025)")

        assert year == 2025

    def test_no_year_returns_none(self, temp_directory, mock_loader):
        """A folder name without a year yields None."""
        extract = SerieScanner(
            temp_directory, mock_loader
        )._extract_year_from_folder_name

        assert extract("Dororo") is None

    def test_empty_string_returns_none(self, temp_directory, mock_loader):
        """An empty folder name yields None."""
        extract = SerieScanner(
            temp_directory, mock_loader
        )._extract_year_from_folder_name

        assert extract("") is None

    def test_none_returns_none(self, temp_directory, mock_loader):
        """Passing None yields None."""
        extract = SerieScanner(
            temp_directory, mock_loader
        )._extract_year_from_folder_name

        assert extract(None) is None

    def test_year_out_of_range_returns_none(
        self, temp_directory, mock_loader
    ):
        """The years 1800 and 2200 are rejected and yield None."""
        extract = SerieScanner(
            temp_directory, mock_loader
        )._extract_year_from_folder_name

        assert extract("Title (1800)") is None
        assert extract("Title (2200)") is None

    def test_year_in_middle(self, temp_directory, mock_loader):
        """A year followed by extra text is still extracted."""
        extract = SerieScanner(
            temp_directory, mock_loader
        )._extract_year_from_folder_name

        year = extract("Title (2020) - Extra")

        assert year == 2020
class TestSafeCallEvent:
    """Behaviour of ``_safe_call_event``."""

    def test_calls_handler(self, temp_directory, mock_loader):
        """A registered handler receives the payload exactly once."""
        scanner = SerieScanner(temp_directory, mock_loader)
        listener = MagicMock()
        scanner.events.on_progress = [listener]

        scanner._safe_call_event(scanner.events.on_progress, {"test": True})

        listener.assert_called_once_with({"test": True})

    def test_handler_error_swallowed(self, temp_directory, mock_loader):
        """An exception raised by a handler does not reach the caller."""
        scanner = SerieScanner(temp_directory, mock_loader)
        failing_listener = MagicMock(side_effect=Exception("boom"))
        scanner.events.on_progress = [failing_listener]

        # Passing means no exception was raised.
        scanner._safe_call_event(scanner.events.on_progress, {"test": True})

    def test_empty_handler_list_noop(self, temp_directory, mock_loader):
        """Dispatching to an empty handler list does not raise."""
        scanner = SerieScanner(temp_directory, mock_loader)
        scanner.events.on_progress = []

        scanner._safe_call_event(scanner.events.on_progress, {"test": True})
class TestFindMp4Files:
    """Test the private ``__find_mp4_files`` generator.

    The method is reached through its name-mangled attribute
    ``_SerieScanner__find_mp4_files``. Each result item is unpacked as a
    ``(folder_name, mp4_list)`` pair.
    """

    def test_finds_mp4_files(self, temp_directory, mock_loader):
        """Should yield folders with mp4 files."""
        scanner = SerieScanner(temp_directory, mock_loader)

        result = list(scanner._SerieScanner__find_mp4_files())

        # temp_directory has "Attack on Titan (2013)" with one mp4
        assert len(result) >= 1
        folder, mp4s = result[0]
        assert folder == "Attack on Titan (2013)"
        assert len(mp4s) == 1

    def test_empty_directory(self, mock_loader):
        """Should yield nothing for empty directory."""
        with tempfile.TemporaryDirectory() as tmpdir:
            scanner = SerieScanner(tmpdir, mock_loader)

            result = list(scanner._SerieScanner__find_mp4_files())

            assert len(result) == 0

    def test_nested_mp4_files(self, mock_loader):
        """Should find mp4 files in subdirectories."""
        with tempfile.TemporaryDirectory() as tmpdir:
            # Create nested structure: <root>/Naruto/Season 1/ep1.mp4
            anime = os.path.join(tmpdir, "Naruto")
            season = os.path.join(anime, "Season 1")
            os.makedirs(season)
            with open(os.path.join(season, "ep1.mp4"), "w") as f:
                f.write("dummy")

            scanner = SerieScanner(tmpdir, mock_loader)
            result = list(scanner._SerieScanner__find_mp4_files())

            # The top-level folder is reported, not the season subfolder.
            assert len(result) == 1
            assert "Naruto" == result[0][0]
            assert len(result[0][1]) == 1

    def test_non_mp4_ignored(self, mock_loader):
        """Should ignore non-mp4 files."""
        with tempfile.TemporaryDirectory() as tmpdir:
            anime = os.path.join(tmpdir, "TestAnime")
            os.makedirs(anime)
            with open(os.path.join(anime, "readme.txt"), "w") as f:
                f.write("not a video")

            scanner = SerieScanner(tmpdir, mock_loader)
            result = list(scanner._SerieScanner__find_mp4_files())

            # The folder is yielded but with empty mp4 list
            assert len(result) == 1
            assert result[0][1] == []
class TestReadDataFromFile:
    """Test the private ``__read_data_from_file`` method.

    The method is reached through its name-mangled attribute
    ``_SerieScanner__read_data_from_file`` and is given a folder name
    relative to the scanner's directory.
    """

    def test_reads_key_file(self, mock_loader):
        """Should read key from 'key' file."""
        with tempfile.TemporaryDirectory() as tmpdir:
            anime_folder = os.path.join(tmpdir, "SomeAnime")
            os.makedirs(anime_folder)
            with open(os.path.join(anime_folder, "key"), "w") as f:
                f.write("some-key")

            scanner = SerieScanner(tmpdir, mock_loader)
            result = scanner._SerieScanner__read_data_from_file("SomeAnime")

            assert result is not None
            assert result.key == "some-key"

    def test_reads_data_file(self, mock_loader):
        """Should read Serie from 'data' file when no 'key' file."""
        with tempfile.TemporaryDirectory() as tmpdir:
            anime_folder = os.path.join(tmpdir, "SomeAnime")
            os.makedirs(anime_folder)

            # Create a data file by saving a Serie into the folder.
            serie = Serie("test-key", "Test", "aniworld.to", "SomeAnime", {})
            data_path = os.path.join(anime_folder, "data")
            serie.save_to_file(data_path)

            scanner = SerieScanner(tmpdir, mock_loader)
            result = scanner._SerieScanner__read_data_from_file("SomeAnime")

            assert result is not None
            assert result.key == "test-key"

    def test_no_files_returns_none(self, mock_loader):
        """Should return None when no key or data file exists."""
        with tempfile.TemporaryDirectory() as tmpdir:
            anime_folder = os.path.join(tmpdir, "Empty")
            os.makedirs(anime_folder)

            scanner = SerieScanner(tmpdir, mock_loader)
            result = scanner._SerieScanner__read_data_from_file("Empty")

            assert result is None
class TestReinit:
    """Behaviour of ``reinit``."""

    def test_clears_keydict(self, temp_directory, mock_loader):
        """After ``reinit`` the keyDict is empty again."""
        scanner = SerieScanner(temp_directory, mock_loader)
        placeholder_entry = MagicMock()
        scanner.keyDict["test"] = placeholder_entry

        scanner.reinit()

        assert scanner.keyDict == {}
class TestGetTotalToScan:
    """Test ``get_total_to_scan``."""

    def test_counts_folders(self, temp_directory, mock_loader):
        """Should count number of folders."""
        scanner = SerieScanner(temp_directory, mock_loader)

        count = scanner.get_total_to_scan()

        # The temp_directory fixture creates one anime folder.
        assert count >= 1

    def test_empty_directory(self, mock_loader):
        """Should return 0 for empty directory."""
        with tempfile.TemporaryDirectory() as tmpdir:
            scanner = SerieScanner(tmpdir, mock_loader)

            assert scanner.get_total_to_scan() == 0
class TestScanProgressEvents:
    """Events emitted by ``scan``: progress, completion and error."""

    @staticmethod
    def _discovery_patches(scanner, total, folders):
        """Return patchers for the folder count and the mp4 discovery.

        The patchers are not active until entered with ``with``.
        """
        count_patch = patch.object(
            scanner, 'get_total_to_scan', return_value=total
        )
        files_patch = patch.object(
            scanner, '_SerieScanner__find_mp4_files',
            return_value=iter(folders)
        )
        return count_patch, files_patch

    def test_scan_emits_progress(self, temp_directory, mock_loader):
        """A scan over nothing still reports a STARTING progress event."""
        scanner = SerieScanner(temp_directory, mock_loader)
        progress_handler = MagicMock()
        scanner.subscribe_on_progress(progress_handler)

        count_patch, files_patch = self._discovery_patches(scanner, 0, [])
        with count_patch, files_patch:
            scanner.scan()

        # At least one event is emitted, and the first is STARTING.
        assert progress_handler.call_count >= 1
        positional_args, _ = progress_handler.call_args_list[0]
        first_payload = positional_args[0]
        assert first_payload["phase"] == "STARTING"

    def test_scan_emits_completion(self, temp_directory, mock_loader):
        """A scan over nothing reports one successful completion event."""
        scanner = SerieScanner(temp_directory, mock_loader)
        completion_handler = MagicMock()
        scanner.subscribe_on_completion(completion_handler)

        count_patch, files_patch = self._discovery_patches(scanner, 0, [])
        with count_patch, files_patch:
            scanner.scan()

        completion_handler.assert_called_once()
        positional_args, _ = completion_handler.call_args
        payload = positional_args[0]
        assert payload["success"] is True

    def test_scan_emits_error_on_no_key(
        self, temp_directory, mock_loader
    ):
        """A NoKeyFoundException while reading data emits a recoverable error."""
        from src.core.exceptions.Exceptions import NoKeyFoundException

        scanner = SerieScanner(temp_directory, mock_loader)
        error_handler = MagicMock()
        scanner.subscribe_on_error(error_handler)

        count_patch, files_patch = self._discovery_patches(
            scanner, 1, [("BadFolder", ["e1.mp4"])]
        )
        read_patch = patch.object(
            scanner, '_SerieScanner__read_data_from_file',
            side_effect=NoKeyFoundException("no key"),
        )
        with count_patch, files_patch, read_patch:
            scanner.scan()

        error_handler.assert_called_once()
        positional_args, _ = error_handler.call_args
        payload = positional_args[0]
        assert payload["recoverable"] is True