445 lines
16 KiB
Python
445 lines
16 KiB
Python
"""Unit tests for enhanced_provider.py - Caching, recovery, download logic."""
|
|
|
|
import json
|
|
import os
|
|
from typing import Any, Dict, List
|
|
from unittest.mock import MagicMock, Mock, PropertyMock, patch
|
|
|
|
import pytest
|
|
|
|
from src.core.error_handler import (
|
|
DownloadError,
|
|
NetworkError,
|
|
NonRetryableError,
|
|
RetryableError,
|
|
)
|
|
from src.core.providers.base_provider import Loader
|
|
|
|
# Import the class but we need a concrete subclass to test it
|
|
from src.core.providers.enhanced_provider import EnhancedAniWorldLoader
|
|
|
|
|
|
class ConcreteEnhancedLoader(EnhancedAniWorldLoader):
|
|
"""Concrete subclass that bridges PascalCase methods to abstract interface."""
|
|
|
|
def subscribe_download_progress(self, handler):
|
|
pass
|
|
|
|
def unsubscribe_download_progress(self, handler):
|
|
pass
|
|
|
|
def search(self, word: str) -> List[Dict[str, Any]]:
|
|
return self.Search(word)
|
|
|
|
def is_language(self, season, episode, key, language="German Dub"):
|
|
return self.IsLanguage(season, episode, key, language)
|
|
|
|
def download(self, base_directory, serie_folder, season, episode, key,
|
|
language="German Dub", **kwargs):
|
|
return self.Download(base_directory, serie_folder, season, episode,
|
|
key, language)
|
|
|
|
def get_site_key(self) -> str:
|
|
return self.GetSiteKey()
|
|
|
|
def get_title(self, key: str) -> str:
|
|
return self.GetTitle(key)
|
|
|
|
|
|
@pytest.fixture
|
|
def enhanced_loader():
|
|
"""Create ConcreteEnhancedLoader with mocked externals."""
|
|
with patch(
|
|
"src.core.providers.enhanced_provider.UserAgent"
|
|
) as mock_ua, patch(
|
|
"src.core.providers.enhanced_provider.get_integrity_manager"
|
|
):
|
|
mock_ua.return_value.random = "MockAgent/1.0"
|
|
loader = ConcreteEnhancedLoader()
|
|
loader.session = MagicMock()
|
|
return loader
|
|
|
|
|
|
class TestEnhancedLoaderInit:
|
|
"""Test EnhancedAniWorldLoader initialization."""
|
|
|
|
def test_initializes_with_download_stats(self, enhanced_loader):
|
|
"""Should initialize download statistics at zero."""
|
|
stats = enhanced_loader.download_stats
|
|
assert stats["total_downloads"] == 0
|
|
assert stats["successful_downloads"] == 0
|
|
assert stats["failed_downloads"] == 0
|
|
assert stats["retried_downloads"] == 0
|
|
|
|
def test_initializes_with_caches(self, enhanced_loader):
|
|
"""Should initialize empty caches."""
|
|
assert enhanced_loader._KeyHTMLDict == {}
|
|
assert enhanced_loader._EpisodeHTMLDict == {}
|
|
|
|
def test_site_key(self, enhanced_loader):
|
|
"""GetSiteKey should return 'aniworld.to'."""
|
|
assert enhanced_loader.GetSiteKey() == "aniworld.to"
|
|
|
|
def test_has_supported_providers(self, enhanced_loader):
|
|
"""Should have a list of supported providers."""
|
|
assert isinstance(enhanced_loader.SUPPORTED_PROVIDERS, list)
|
|
assert len(enhanced_loader.SUPPORTED_PROVIDERS) > 0
|
|
assert "VOE" in enhanced_loader.SUPPORTED_PROVIDERS
|
|
|
|
|
|
class TestEnhancedSearch:
|
|
"""Test enhanced search with error recovery."""
|
|
|
|
def test_search_empty_term_raises(self, enhanced_loader):
|
|
"""Search with empty term should raise ValueError."""
|
|
with pytest.raises(ValueError, match="empty"):
|
|
enhanced_loader.Search("")
|
|
|
|
def test_search_whitespace_only_raises(self, enhanced_loader):
|
|
"""Search with whitespace-only term should raise ValueError."""
|
|
with pytest.raises(ValueError, match="empty"):
|
|
enhanced_loader.Search(" ")
|
|
|
|
def test_search_successful(self, enhanced_loader):
|
|
"""Successful search should return parsed list."""
|
|
mock_response = MagicMock()
|
|
mock_response.ok = True
|
|
mock_response.text = json.dumps([
|
|
{"title": "Naruto", "link": "/anime/stream/naruto"}
|
|
])
|
|
enhanced_loader.session.get.return_value = mock_response
|
|
|
|
result = enhanced_loader.Search("Naruto")
|
|
assert len(result) == 1
|
|
assert result[0]["title"] == "Naruto"
|
|
|
|
|
|
class TestParseAnimeResponse:
|
|
"""Test JSON parsing strategies."""
|
|
|
|
def test_parse_valid_json_list(self, enhanced_loader):
|
|
"""Should parse valid JSON list."""
|
|
text = '[{"title": "Naruto"}]'
|
|
result = enhanced_loader._parse_anime_response(text)
|
|
assert len(result) == 1
|
|
|
|
def test_parse_html_escaped_json(self, enhanced_loader):
|
|
"""Should handle HTML-escaped JSON."""
|
|
text = '[{"title": "Naruto & Boruto"}]'
|
|
result = enhanced_loader._parse_anime_response(text)
|
|
assert result[0]["title"] == "Naruto & Boruto"
|
|
|
|
def test_parse_empty_response_raises(self, enhanced_loader):
|
|
"""Empty response should raise ValueError."""
|
|
with pytest.raises(ValueError, match="Empty response"):
|
|
enhanced_loader._parse_anime_response("")
|
|
|
|
def test_parse_whitespace_only_raises(self, enhanced_loader):
|
|
"""Whitespace-only response should raise ValueError."""
|
|
with pytest.raises(ValueError, match="Empty response"):
|
|
enhanced_loader._parse_anime_response(" ")
|
|
|
|
def test_parse_html_response_raises(self, enhanced_loader):
|
|
"""HTML response instead of JSON should raise ValueError."""
|
|
with pytest.raises(ValueError):
|
|
enhanced_loader._parse_anime_response(
|
|
"<!DOCTYPE html><html></html>"
|
|
)
|
|
|
|
def test_parse_bom_json(self, enhanced_loader):
|
|
"""Should handle BOM-prefixed JSON."""
|
|
text = '\ufeff[{"title": "Test"}]'
|
|
result = enhanced_loader._parse_anime_response(text)
|
|
assert len(result) == 1
|
|
|
|
def test_parse_control_characters(self, enhanced_loader):
|
|
"""Should strip control characters and parse."""
|
|
text = '[{"title": "Na\x00ruto"}]'
|
|
result = enhanced_loader._parse_anime_response(text)
|
|
assert len(result) == 1
|
|
|
|
def test_parse_non_list_result_raises(self, enhanced_loader):
|
|
"""Non-list JSON should raise ValueError."""
|
|
with pytest.raises(ValueError):
|
|
enhanced_loader._parse_anime_response('{"key": "value"}')
|
|
|
|
|
|
class TestLanguageKey:
|
|
"""Test language code mapping."""
|
|
|
|
def test_german_dub(self, enhanced_loader):
|
|
"""German Dub should map to 1."""
|
|
assert enhanced_loader._GetLanguageKey("German Dub") == 1
|
|
|
|
def test_english_sub(self, enhanced_loader):
|
|
"""English Sub should map to 2."""
|
|
assert enhanced_loader._GetLanguageKey("English Sub") == 2
|
|
|
|
def test_german_sub(self, enhanced_loader):
|
|
"""German Sub should map to 3."""
|
|
assert enhanced_loader._GetLanguageKey("German Sub") == 3
|
|
|
|
def test_unknown_language(self, enhanced_loader):
|
|
"""Unknown language should map to 0."""
|
|
assert enhanced_loader._GetLanguageKey("French Dub") == 0
|
|
|
|
|
|
class TestEnhancedIsLanguage:
|
|
"""Test language availability checking with recovery."""
|
|
|
|
def test_is_language_available(self, enhanced_loader):
|
|
"""Should return True when language is available."""
|
|
html = """
|
|
<html><body>
|
|
<div class="changeLanguageBox">
|
|
<img data-lang-key="1" />
|
|
<img data-lang-key="2" />
|
|
</div>
|
|
</body></html>
|
|
"""
|
|
mock_response = MagicMock()
|
|
mock_response.content = html.encode("utf-8")
|
|
enhanced_loader._EpisodeHTMLDict[(
|
|
"naruto", 1, 1
|
|
)] = mock_response
|
|
|
|
result = enhanced_loader.IsLanguage(1, 1, "naruto", "German Dub")
|
|
assert result is True
|
|
|
|
def test_is_language_not_available(self, enhanced_loader):
|
|
"""Should return False when language is not available."""
|
|
html = """
|
|
<html><body>
|
|
<div class="changeLanguageBox">
|
|
<img data-lang-key="1" />
|
|
</div>
|
|
</body></html>
|
|
"""
|
|
mock_response = MagicMock()
|
|
mock_response.content = html.encode("utf-8")
|
|
enhanced_loader._EpisodeHTMLDict[(
|
|
"naruto", 1, 1
|
|
)] = mock_response
|
|
|
|
result = enhanced_loader.IsLanguage(1, 1, "naruto", "German Sub")
|
|
assert result is False
|
|
|
|
def test_is_language_no_language_box(self, enhanced_loader):
|
|
"""Should return False when no language box in HTML."""
|
|
html = "<html><body></body></html>"
|
|
mock_response = MagicMock()
|
|
mock_response.content = html.encode("utf-8")
|
|
enhanced_loader._EpisodeHTMLDict[(
|
|
"naruto", 1, 1
|
|
)] = mock_response
|
|
|
|
result = enhanced_loader.IsLanguage(1, 1, "naruto", "German Dub")
|
|
assert result is False
|
|
|
|
|
|
class TestEnhancedGetTitle:
|
|
"""Test title extraction with error recovery."""
|
|
|
|
def test_get_title_successful(self, enhanced_loader):
|
|
"""Should extract title from HTML."""
|
|
html = """
|
|
<html><body>
|
|
<div class="series-title">
|
|
<h1><span>Attack on Titan</span></h1>
|
|
</div>
|
|
</body></html>
|
|
"""
|
|
mock_response = MagicMock()
|
|
mock_response.content = html.encode("utf-8")
|
|
enhanced_loader._KeyHTMLDict["aot"] = mock_response
|
|
|
|
result = enhanced_loader.GetTitle("aot")
|
|
assert result == "Attack on Titan"
|
|
|
|
def test_get_title_missing_returns_fallback(self, enhanced_loader):
|
|
"""Should return fallback title when not found in HTML."""
|
|
html = "<html><body></body></html>"
|
|
mock_response = MagicMock()
|
|
mock_response.content = html.encode("utf-8")
|
|
enhanced_loader._KeyHTMLDict["unknown"] = mock_response
|
|
|
|
result = enhanced_loader.GetTitle("unknown")
|
|
assert "Unknown_Title" in result
|
|
|
|
|
|
class TestEnhancedCache:
|
|
"""Test cache operations."""
|
|
|
|
def test_clear_cache(self, enhanced_loader):
|
|
"""ClearCache should empty all caches."""
|
|
enhanced_loader._KeyHTMLDict["key"] = "data"
|
|
enhanced_loader._EpisodeHTMLDict[("key", 1, 1)] = "data"
|
|
|
|
enhanced_loader.ClearCache()
|
|
|
|
assert len(enhanced_loader._KeyHTMLDict) == 0
|
|
assert len(enhanced_loader._EpisodeHTMLDict) == 0
|
|
|
|
def test_remove_from_cache(self, enhanced_loader):
|
|
"""RemoveFromCache should only clear episode cache."""
|
|
enhanced_loader._KeyHTMLDict["key"] = "data"
|
|
enhanced_loader._EpisodeHTMLDict[("key", 1, 1)] = "data"
|
|
|
|
enhanced_loader.RemoveFromCache()
|
|
|
|
assert len(enhanced_loader._KeyHTMLDict) == 1
|
|
assert len(enhanced_loader._EpisodeHTMLDict) == 0
|
|
|
|
|
|
class TestEnhancedGetEpisodeHTML:
|
|
"""Test episode HTML fetching with validation."""
|
|
|
|
def test_empty_key_raises(self, enhanced_loader):
|
|
"""Empty key should raise ValueError."""
|
|
with pytest.raises(ValueError, match="empty"):
|
|
enhanced_loader._GetEpisodeHTML(1, 1, "")
|
|
|
|
def test_whitespace_key_raises(self, enhanced_loader):
|
|
"""Whitespace key should raise ValueError."""
|
|
with pytest.raises(ValueError, match="empty"):
|
|
enhanced_loader._GetEpisodeHTML(1, 1, " ")
|
|
|
|
def test_invalid_season_zero_raises(self, enhanced_loader):
|
|
"""Season 0 should raise ValueError."""
|
|
with pytest.raises(ValueError, match="Invalid season"):
|
|
enhanced_loader._GetEpisodeHTML(0, 1, "naruto")
|
|
|
|
def test_invalid_season_negative_raises(self, enhanced_loader):
|
|
"""Negative season should raise ValueError."""
|
|
with pytest.raises(ValueError, match="Invalid season"):
|
|
enhanced_loader._GetEpisodeHTML(-1, 1, "naruto")
|
|
|
|
def test_invalid_episode_zero_raises(self, enhanced_loader):
|
|
"""Episode 0 should raise ValueError."""
|
|
with pytest.raises(ValueError, match="Invalid episode"):
|
|
enhanced_loader._GetEpisodeHTML(1, 0, "naruto")
|
|
|
|
def test_cached_episode_returned(self, enhanced_loader):
|
|
"""Should return cached response without HTTP call."""
|
|
mock_response = MagicMock()
|
|
enhanced_loader._EpisodeHTMLDict[("naruto", 1, 1)] = mock_response
|
|
|
|
result = enhanced_loader._GetEpisodeHTML(1, 1, "naruto")
|
|
assert result is mock_response
|
|
enhanced_loader.session.get.assert_not_called()
|
|
|
|
|
|
class TestDownloadStatistics:
|
|
"""Test download statistics tracking."""
|
|
|
|
def test_get_download_statistics(self, enhanced_loader):
|
|
"""Should return stats with calculated success rate."""
|
|
enhanced_loader.download_stats["total_downloads"] = 10
|
|
enhanced_loader.download_stats["successful_downloads"] = 8
|
|
enhanced_loader.download_stats["failed_downloads"] = 2
|
|
|
|
stats = enhanced_loader.get_download_statistics()
|
|
assert stats["success_rate"] == 80.0
|
|
|
|
def test_statistics_zero_downloads(self, enhanced_loader):
|
|
"""Success rate should be 0 with no downloads."""
|
|
stats = enhanced_loader.get_download_statistics()
|
|
assert stats["success_rate"] == 0
|
|
|
|
def test_reset_statistics(self, enhanced_loader):
|
|
"""reset_statistics should zero all counters."""
|
|
enhanced_loader.download_stats["total_downloads"] = 10
|
|
enhanced_loader.download_stats["successful_downloads"] = 8
|
|
|
|
enhanced_loader.reset_statistics()
|
|
|
|
assert enhanced_loader.download_stats["total_downloads"] == 0
|
|
assert enhanced_loader.download_stats["successful_downloads"] == 0
|
|
|
|
|
|
class TestEnhancedDownloadValidation:
|
|
"""Test download input validation."""
|
|
|
|
@patch("src.core.providers.enhanced_provider.get_integrity_manager")
|
|
def test_download_missing_base_directory_raises(
|
|
self, mock_integrity, enhanced_loader
|
|
):
|
|
"""Download with empty base_directory should raise."""
|
|
with pytest.raises((ValueError, DownloadError)):
|
|
enhanced_loader.Download("", "folder", 1, 1, "key")
|
|
|
|
@patch("src.core.providers.enhanced_provider.get_integrity_manager")
|
|
def test_download_missing_serie_folder_raises(
|
|
self, mock_integrity, enhanced_loader
|
|
):
|
|
"""Download with empty serie_folder should raise."""
|
|
with pytest.raises((ValueError, DownloadError)):
|
|
enhanced_loader.Download("/base", "", 1, 1, "key")
|
|
|
|
@patch("src.core.providers.enhanced_provider.get_integrity_manager")
|
|
def test_download_negative_season_raises(
|
|
self, mock_integrity, enhanced_loader
|
|
):
|
|
"""Download with negative season should raise."""
|
|
with pytest.raises((ValueError, DownloadError)):
|
|
enhanced_loader.Download("/base", "folder", -1, 1, "key")
|
|
|
|
@patch("src.core.providers.enhanced_provider.get_integrity_manager")
|
|
def test_download_negative_episode_raises(
|
|
self, mock_integrity, enhanced_loader
|
|
):
|
|
"""Download with negative episode should raise."""
|
|
with pytest.raises((ValueError, DownloadError)):
|
|
enhanced_loader.Download("/base", "folder", 1, -1, "key")
|
|
|
|
@patch("src.core.providers.enhanced_provider.get_integrity_manager")
|
|
def test_download_increments_total_count(
|
|
self, mock_integrity, enhanced_loader
|
|
):
|
|
"""Download should increment total_downloads counter."""
|
|
# Make it fail fast so we don't need to mock everything
|
|
enhanced_loader._KeyHTMLDict["key"] = MagicMock(
|
|
content=b"<html><body><div class='series-title'><h1><span>Test</span></h1></div></body></html>"
|
|
)
|
|
try:
|
|
enhanced_loader.Download("/base", "folder", 1, 1, "key")
|
|
except Exception:
|
|
pass
|
|
assert enhanced_loader.download_stats["total_downloads"] >= 1
|
|
|
|
|
|
class TestEnhancedProviderFromHTML:
|
|
"""Test provider extraction from episode HTML."""
|
|
|
|
def test_extract_providers(self, enhanced_loader):
|
|
"""Should extract providers with language keys from HTML."""
|
|
html = """
|
|
<html><body>
|
|
<li class="episodeLink1" data-lang-key="1">
|
|
<h4>VOE</h4>
|
|
<a class="watchEpisode" href="/redirect/100"></a>
|
|
</li>
|
|
<li class="episodeLink2" data-lang-key="2">
|
|
<h4>Vidmoly</h4>
|
|
<a class="watchEpisode" href="/redirect/200"></a>
|
|
</li>
|
|
</body></html>
|
|
"""
|
|
mock_response = MagicMock()
|
|
mock_response.content = html.encode("utf-8")
|
|
enhanced_loader._EpisodeHTMLDict[("test", 1, 1)] = mock_response
|
|
|
|
result = enhanced_loader._get_provider_from_html(1, 1, "test")
|
|
assert "VOE" in result
|
|
assert "Vidmoly" in result
|
|
|
|
def test_extract_providers_empty_page(self, enhanced_loader):
|
|
"""Should return empty dict when no providers found."""
|
|
html = "<html><body></body></html>"
|
|
mock_response = MagicMock()
|
|
mock_response.content = html.encode("utf-8")
|
|
enhanced_loader._EpisodeHTMLDict[("test", 1, 1)] = mock_response
|
|
|
|
result = enhanced_loader._get_provider_from_html(1, 1, "test")
|
|
assert result == {}
|