"""Tests for SerieScanner class - file-based operations.""" import os import tempfile from unittest.mock import MagicMock, patch import pytest from src.core.entities.series import Serie from src.core.SerieScanner import SerieScanner @pytest.fixture def temp_directory(): """Create a temporary directory with subdirectories for testing.""" with tempfile.TemporaryDirectory() as tmpdir: # Create an anime folder with an mp4 file anime_folder = os.path.join(tmpdir, "Attack on Titan (2013)") os.makedirs(anime_folder, exist_ok=True) # Create a dummy mp4 file mp4_path = os.path.join( anime_folder, "Attack on Titan - S01E001 - (German Dub).mp4" ) with open(mp4_path, "w") as f: f.write("dummy mp4") yield tmpdir @pytest.fixture def mock_loader(): """Create a mock Loader instance.""" loader = MagicMock() loader.get_season_episode_count = MagicMock(return_value={1: 25}) loader.is_language = MagicMock(return_value=True) return loader @pytest.fixture def sample_serie(): """Create a sample Serie for testing.""" return Serie( key="attack-on-titan", name="Attack on Titan", site="aniworld.to", folder="Attack on Titan (2013)", episodeDict={1: [2, 3, 4]} ) class TestSerieScannerInitialization: """Test SerieScanner initialization.""" def test_init_success(self, temp_directory, mock_loader): """Test successful initialization.""" scanner = SerieScanner(temp_directory, mock_loader) assert scanner.directory == os.path.abspath(temp_directory) assert scanner.loader == mock_loader assert scanner.keyDict == {} def test_init_empty_path_raises_error(self, mock_loader): """Test initialization with empty path raises ValueError.""" with pytest.raises(ValueError, match="empty"): SerieScanner("", mock_loader) def test_init_nonexistent_path_raises_error(self, mock_loader): """Test initialization with non-existent path raises ValueError.""" with pytest.raises(ValueError, match="does not exist"): SerieScanner("/nonexistent/path", mock_loader) class TestSerieScannerScan: """Test file-based scan operations.""" def test_file_based_scan_works( self, temp_directory, mock_loader, sample_serie ): """Test file-based scan works properly.""" scanner = SerieScanner(temp_directory, mock_loader) with patch.object(scanner, 'get_total_to_scan', return_value=1): with patch.object( scanner, '_SerieScanner__find_mp4_files', return_value=iter([ ("Attack on Titan (2013)", ["S01E001.mp4"]) ]) ): with patch.object( scanner, '_SerieScanner__read_data_from_file', return_value=sample_serie ): with patch.object( scanner, '_SerieScanner__get_missing_episodes_and_season', return_value=({1: [2, 3]}, "aniworld.to") ): with patch.object( sample_serie, 'save_to_file' ) as mock_save: scanner.scan() # Verify file was saved mock_save.assert_called_once() def test_keydict_populated_after_scan( self, temp_directory, mock_loader, sample_serie ): """Test keyDict is populated after scan.""" scanner = SerieScanner(temp_directory, mock_loader) with patch.object(scanner, 'get_total_to_scan', return_value=1): with patch.object( scanner, '_SerieScanner__find_mp4_files', return_value=iter([ ("Attack on Titan (2013)", ["S01E001.mp4"]) ]) ): with patch.object( scanner, '_SerieScanner__read_data_from_file', return_value=sample_serie ): with patch.object( scanner, '_SerieScanner__get_missing_episodes_and_season', return_value=({1: [2, 3]}, "aniworld.to") ): with patch.object(sample_serie, 'save_to_file'): scanner.scan() assert sample_serie.key in scanner.keyDict class TestSerieScannerSingleSeries: """Test scan_single_series method for targeted scanning.""" def test_scan_single_series_basic( self, temp_directory, mock_loader ): """Test basic scan_single_series functionality.""" scanner = SerieScanner(temp_directory, mock_loader) # Mock the missing episodes calculation with patch.object( scanner, '_SerieScanner__get_missing_episodes_and_season', return_value=({1: [5, 6, 7], 2: [1, 2]}, "aniworld.to") ): result = scanner.scan_single_series( key="attack-on-titan", folder="Attack on Titan (2013)" ) # Verify result structure assert isinstance(result, dict) assert 1 in result assert 2 in result assert result[1] == [5, 6, 7] assert result[2] == [1, 2] def test_scan_single_series_updates_keydict( self, temp_directory, mock_loader ): """Test that scan_single_series updates keyDict.""" scanner = SerieScanner(temp_directory, mock_loader) with patch.object( scanner, '_SerieScanner__get_missing_episodes_and_season', return_value=({1: [1, 2, 3]}, "aniworld.to") ): scanner.scan_single_series( key="test-anime", folder="Test Anime" ) # Verify keyDict was updated assert "test-anime" in scanner.keyDict assert scanner.keyDict["test-anime"].episodeDict == {1: [1, 2, 3]} def test_scan_single_series_existing_entry( self, temp_directory, mock_loader, sample_serie ): """Test scan_single_series updates existing entry in keyDict.""" scanner = SerieScanner(temp_directory, mock_loader) # Pre-populate keyDict scanner.keyDict[sample_serie.key] = sample_serie old_episode_dict = sample_serie.episodeDict.copy() with patch.object( scanner, '_SerieScanner__get_missing_episodes_and_season', return_value=({1: [10, 11, 12]}, "aniworld.to") ): scanner.scan_single_series( key=sample_serie.key, folder=sample_serie.folder ) # Verify existing entry was updated assert scanner.keyDict[sample_serie.key].episodeDict != old_episode_dict assert scanner.keyDict[sample_serie.key].episodeDict == {1: [10, 11, 12]} def test_scan_single_series_empty_key_raises_error( self, temp_directory, mock_loader ): """Test that empty key raises ValueError.""" scanner = SerieScanner(temp_directory, mock_loader) with pytest.raises(ValueError, match="key cannot be empty"): scanner.scan_single_series(key="", folder="Test Folder") def test_scan_single_series_empty_folder_raises_error( self, temp_directory, mock_loader ): """Test that empty folder raises ValueError.""" scanner = SerieScanner(temp_directory, mock_loader) with pytest.raises(ValueError, match="folder cannot be empty"): scanner.scan_single_series(key="test-key", folder="") def test_scan_single_series_nonexistent_folder( self, temp_directory, mock_loader ): """Test scanning a series with non-existent folder.""" scanner = SerieScanner(temp_directory, mock_loader) # Mock to return some episodes (as if from provider) with patch.object( scanner, '_SerieScanner__get_missing_episodes_and_season', return_value=({1: [1, 2, 3, 4, 5]}, "aniworld.to") ): result = scanner.scan_single_series( key="new-anime", folder="NonExistent Folder" ) # Should still return missing episodes from provider assert result == {1: [1, 2, 3, 4, 5]} def test_scan_single_series_error_handling( self, temp_directory, mock_loader ): """Test that errors during scan return empty dict.""" scanner = SerieScanner(temp_directory, mock_loader) with patch.object( scanner, '_SerieScanner__get_missing_episodes_and_season', side_effect=Exception("Provider error") ): result = scanner.scan_single_series( key="test-anime", folder="Test Folder" ) # Should return empty dict on error assert result == {} def test_scan_single_series_no_missing_episodes( self, temp_directory, mock_loader ): """Test scan when no episodes are missing.""" scanner = SerieScanner(temp_directory, mock_loader) with patch.object( scanner, '_SerieScanner__get_missing_episodes_and_season', return_value=({}, "aniworld.to") ): result = scanner.scan_single_series( key="complete-anime", folder="Complete Anime" ) assert result == {} assert "complete-anime" in scanner.keyDict assert scanner.keyDict["complete-anime"].episodeDict == {} def test_scan_single_series_with_existing_files( self, temp_directory, mock_loader ): """Test scan with existing MP4 files in folder.""" # Create folder with some files anime_folder = os.path.join(temp_directory, "Test Anime") os.makedirs(anime_folder, exist_ok=True) season_folder = os.path.join(anime_folder, "Season 1") os.makedirs(season_folder, exist_ok=True) # Create dummy MP4 files for ep in [1, 2, 3]: mp4_path = os.path.join( season_folder, f"Test Anime - S01E{ep:03d} - (German Dub).mp4" ) with open(mp4_path, "w") as f: f.write("dummy") scanner = SerieScanner(temp_directory, mock_loader) # Mock to return missing episodes (4, 5, 6) with patch.object( scanner, '_SerieScanner__get_missing_episodes_and_season', return_value=({1: [4, 5, 6]}, "aniworld.to") ): result = scanner.scan_single_series( key="test-anime", folder="Test Anime" ) # Should only show missing episodes assert result == {1: [4, 5, 6]} # ══════════════════════════════════════════════════════════════════════════════ # New coverage tests – events, year extraction, find_mp4, read_data # ══════════════════════════════════════════════════════════════════════════════ class TestEventSubscription: """Test subscribe/unsubscribe for all event types.""" def test_subscribe_on_progress(self, temp_directory, mock_loader): """Should add handler to on_progress.""" scanner = SerieScanner(temp_directory, mock_loader) handler = MagicMock() scanner.subscribe_on_progress(handler) assert handler in scanner.events.on_progress def test_unsubscribe_on_progress(self, temp_directory, mock_loader): """Should remove handler from on_progress.""" scanner = SerieScanner(temp_directory, mock_loader) handler = MagicMock() scanner.subscribe_on_progress(handler) scanner.unsubscribe_on_progress(handler) assert handler not in scanner.events.on_progress def test_subscribe_duplicate_ignored(self, temp_directory, mock_loader): """Subscribing same handler twice should not duplicate.""" scanner = SerieScanner(temp_directory, mock_loader) handler = MagicMock() scanner.subscribe_on_progress(handler) scanner.subscribe_on_progress(handler) assert scanner.events.on_progress.count(handler) == 1 def test_unsubscribe_missing_handler_noop( self, temp_directory, mock_loader ): """Unsubscribing unknown handler should not raise.""" scanner = SerieScanner(temp_directory, mock_loader) handler = MagicMock() scanner.unsubscribe_on_progress(handler) # should not raise def test_subscribe_on_error(self, temp_directory, mock_loader): """Should add handler to on_error.""" scanner = SerieScanner(temp_directory, mock_loader) handler = MagicMock() scanner.subscribe_on_error(handler) assert handler in scanner.events.on_error def test_unsubscribe_on_error(self, temp_directory, mock_loader): """Should remove handler from on_error.""" scanner = SerieScanner(temp_directory, mock_loader) handler = MagicMock() scanner.subscribe_on_error(handler) scanner.unsubscribe_on_error(handler) assert handler not in scanner.events.on_error def test_subscribe_on_completion(self, temp_directory, mock_loader): """Should add handler to on_completion.""" scanner = SerieScanner(temp_directory, mock_loader) handler = MagicMock() scanner.subscribe_on_completion(handler) assert handler in scanner.events.on_completion def test_unsubscribe_on_completion(self, temp_directory, mock_loader): """Should remove handler from on_completion.""" scanner = SerieScanner(temp_directory, mock_loader) handler = MagicMock() scanner.subscribe_on_completion(handler) scanner.unsubscribe_on_completion(handler) assert handler not in scanner.events.on_completion class TestExtractYearFromFolderName: """Test _extract_year_from_folder_name.""" def test_extracts_year(self, temp_directory, mock_loader): """Should extract year from folder like 'Title (2025)'.""" scanner = SerieScanner(temp_directory, mock_loader) assert scanner._extract_year_from_folder_name("Dororo (2025)") == 2025 def test_no_year_returns_none(self, temp_directory, mock_loader): """Folder without year returns None.""" scanner = SerieScanner(temp_directory, mock_loader) assert scanner._extract_year_from_folder_name("Dororo") is None def test_empty_string_returns_none(self, temp_directory, mock_loader): """Empty string returns None.""" scanner = SerieScanner(temp_directory, mock_loader) assert scanner._extract_year_from_folder_name("") is None def test_none_returns_none(self, temp_directory, mock_loader): """None input returns None.""" scanner = SerieScanner(temp_directory, mock_loader) assert scanner._extract_year_from_folder_name(None) is None def test_year_out_of_range_returns_none( self, temp_directory, mock_loader ): """Year outside 1900-2100 returns None.""" scanner = SerieScanner(temp_directory, mock_loader) assert scanner._extract_year_from_folder_name("Title (1800)") is None assert scanner._extract_year_from_folder_name("Title (2200)") is None def test_year_in_middle(self, temp_directory, mock_loader): """Year in the middle of folder name should be extracted.""" scanner = SerieScanner(temp_directory, mock_loader) assert ( scanner._extract_year_from_folder_name("Title (2020) - Extra") == 2020 ) class TestSafeCallEvent: """Test _safe_call_event method.""" def test_calls_handler(self, temp_directory, mock_loader): """Handler should be called with data.""" scanner = SerieScanner(temp_directory, mock_loader) handler = MagicMock() scanner.events.on_progress = [handler] scanner._safe_call_event(scanner.events.on_progress, {"test": True}) handler.assert_called_once_with({"test": True}) def test_handler_error_swallowed(self, temp_directory, mock_loader): """Handler exceptions should be swallowed.""" scanner = SerieScanner(temp_directory, mock_loader) handler = MagicMock(side_effect=Exception("boom")) scanner.events.on_progress = [handler] # Should not raise scanner._safe_call_event(scanner.events.on_progress, {"test": True}) def test_empty_handler_list_noop(self, temp_directory, mock_loader): """Empty handler list should not raise.""" scanner = SerieScanner(temp_directory, mock_loader) scanner.events.on_progress = [] scanner._safe_call_event(scanner.events.on_progress, {"test": True}) class TestFindMp4Files: """Test __find_mp4_files method.""" def test_finds_mp4_files(self, temp_directory, mock_loader): """Should yield folders with mp4 files.""" scanner = SerieScanner(temp_directory, mock_loader) result = list(scanner._SerieScanner__find_mp4_files()) # temp_directory has "Attack on Titan (2013)" with one mp4 assert len(result) >= 1 folder, mp4s = result[0] assert folder == "Attack on Titan (2013)" assert len(mp4s) == 1 def test_empty_directory(self, mock_loader): """Should yield nothing for empty directory.""" import tempfile with tempfile.TemporaryDirectory() as tmpdir: scanner = SerieScanner(tmpdir, mock_loader) result = list(scanner._SerieScanner__find_mp4_files()) assert len(result) == 0 def test_nested_mp4_files(self, mock_loader): """Should find mp4 files in subdirectories.""" import tempfile with tempfile.TemporaryDirectory() as tmpdir: # Create nested structure anime = os.path.join(tmpdir, "Naruto") season = os.path.join(anime, "Season 1") os.makedirs(season) with open(os.path.join(season, "ep1.mp4"), "w") as f: f.write("dummy") scanner = SerieScanner(tmpdir, mock_loader) result = list(scanner._SerieScanner__find_mp4_files()) assert len(result) == 1 assert "Naruto" == result[0][0] assert len(result[0][1]) == 1 def test_non_mp4_ignored(self, mock_loader): """Should ignore non-mp4 files.""" import tempfile with tempfile.TemporaryDirectory() as tmpdir: anime = os.path.join(tmpdir, "TestAnime") os.makedirs(anime) with open(os.path.join(anime, "readme.txt"), "w") as f: f.write("not a video") scanner = SerieScanner(tmpdir, mock_loader) result = list(scanner._SerieScanner__find_mp4_files()) # The folder is yielded but with empty mp4 list assert len(result) == 1 assert result[0][1] == [] class TestReadDataFromFile: """Test __read_data_from_file method.""" def test_reads_key_file(self, mock_loader): """Should read key from 'key' file.""" import tempfile with tempfile.TemporaryDirectory() as tmpdir: anime_folder = os.path.join(tmpdir, "SomeAnime") os.makedirs(anime_folder) with open(os.path.join(anime_folder, "key"), "w") as f: f.write("some-key") scanner = SerieScanner(tmpdir, mock_loader) result = scanner._SerieScanner__read_data_from_file("SomeAnime") assert result is not None assert result.key == "some-key" def test_reads_data_file(self, mock_loader): """Should read Serie from 'data' file when no 'key' file.""" import tempfile with tempfile.TemporaryDirectory() as tmpdir: anime_folder = os.path.join(tmpdir, "SomeAnime") os.makedirs(anime_folder) # Create a data file serie = Serie("test-key", "Test", "aniworld.to", "SomeAnime", {}) data_path = os.path.join(anime_folder, "data") serie.save_to_file(data_path) scanner = SerieScanner(tmpdir, mock_loader) result = scanner._SerieScanner__read_data_from_file("SomeAnime") assert result is not None assert result.key == "test-key" def test_no_files_returns_none(self, mock_loader): """Should return None when no key or data file exists.""" import tempfile with tempfile.TemporaryDirectory() as tmpdir: anime_folder = os.path.join(tmpdir, "Empty") os.makedirs(anime_folder) scanner = SerieScanner(tmpdir, mock_loader) result = scanner._SerieScanner__read_data_from_file("Empty") assert result is None class TestReinit: """Test reinit method.""" def test_clears_keydict(self, temp_directory, mock_loader): """reinit should clear the keyDict.""" scanner = SerieScanner(temp_directory, mock_loader) scanner.keyDict["test"] = MagicMock() scanner.reinit() assert scanner.keyDict == {} class TestGetTotalToScan: """Test get_total_to_scan.""" def test_counts_folders(self, temp_directory, mock_loader): """Should count number of folders.""" scanner = SerieScanner(temp_directory, mock_loader) count = scanner.get_total_to_scan() assert count >= 1 def test_empty_directory(self, mock_loader): """Should return 0 for empty directory.""" import tempfile with tempfile.TemporaryDirectory() as tmpdir: scanner = SerieScanner(tmpdir, mock_loader) assert scanner.get_total_to_scan() == 0 class TestScanProgressEvents: """Test that scan emits progress and completion events.""" def test_scan_emits_progress(self, temp_directory, mock_loader): """Should emit on_progress during scan.""" scanner = SerieScanner(temp_directory, mock_loader) progress_handler = MagicMock() scanner.subscribe_on_progress(progress_handler) with patch.object(scanner, 'get_total_to_scan', return_value=0), \ patch.object( scanner, '_SerieScanner__find_mp4_files', return_value=iter([]) ): scanner.scan() # At minimum, STARTING event should fire assert progress_handler.call_count >= 1 first_call = progress_handler.call_args_list[0][0][0] assert first_call["phase"] == "STARTING" def test_scan_emits_completion(self, temp_directory, mock_loader): """Should emit on_completion after scan.""" scanner = SerieScanner(temp_directory, mock_loader) completion_handler = MagicMock() scanner.subscribe_on_completion(completion_handler) with patch.object(scanner, 'get_total_to_scan', return_value=0), \ patch.object( scanner, '_SerieScanner__find_mp4_files', return_value=iter([]) ): scanner.scan() completion_handler.assert_called_once() call_data = completion_handler.call_args[0][0] assert call_data["success"] is True def test_scan_emits_error_on_no_key( self, temp_directory, mock_loader ): """Should emit on_error when NoKeyFoundException occurs.""" from src.core.exceptions.Exceptions import NoKeyFoundException scanner = SerieScanner(temp_directory, mock_loader) error_handler = MagicMock() scanner.subscribe_on_error(error_handler) with patch.object(scanner, 'get_total_to_scan', return_value=1), \ patch.object( scanner, '_SerieScanner__find_mp4_files', return_value=iter([("BadFolder", ["e1.mp4"])]) ), \ patch.object( scanner, '_SerieScanner__read_data_from_file', side_effect=NoKeyFoundException("no key"), ): scanner.scan() error_handler.assert_called_once() call_data = error_handler.call_args[0][0] assert call_data["recoverable"] is True