- Move src/core/ → src/server/ - Split SerieList.py (531 lines) and series.py (414 lines) into src/server/database/ - Add database/models.py for SQLAlchemy models - Update all test imports to reflect new structure - Remove deprecated test files (test_serie_class.py, test_serie_folder_with_year.py)
758 lines
28 KiB
Python
758 lines
28 KiB
Python
"""Unit tests for aniworld_provider.py - Anime catalog scraping, episode listing, streaming link extraction."""
|
|
|
|
import json
|
|
import os
|
|
from unittest.mock import MagicMock, Mock, patch
|
|
|
|
import pytest
|
|
import requests
|
|
|
|
from src.server.providers.aniworld_provider import AniworldLoader
|
|
|
|
|
|
@pytest.fixture
def loader():
    """Provide an AniworldLoader that cannot perform real HTTP requests.

    ``UserAgent`` in the provider module is patched while the loader is
    constructed, and the loader's ``session`` is swapped for a ``MagicMock``
    so every test controls the responses it receives.
    """
    ua_patch = patch("src.server.providers.aniworld_provider.UserAgent")
    with ua_patch as fake_user_agent:
        fake_user_agent.return_value.random = "MockUserAgent/1.0"
        aniworld = AniworldLoader()
        aniworld.session = MagicMock()
    return aniworld
|
|
|
|
|
|
@pytest.fixture
def sample_search_response():
    """Return a JSON string holding two anime search hits."""
    hits = [
        {"link": "/anime/stream/naruto", "title": "Naruto"},
        {"link": "/anime/stream/one-piece", "title": "One Piece"},
    ]
    return json.dumps(hits)
|
|
|
|
|
|
@pytest.fixture
def sample_episode_html():
    """Return episode-page markup with a language box and one VOE provider.

    The language box advertises language keys 1 and 2; the single provider
    entry carries language key 1.
    """
    episode_page = """
    <html>
    <body>
        <div class="changeLanguageBox">
            <img data-lang-key="1" src="/flags/de.png" />
            <img data-lang-key="2" src="/flags/en.png" />
        </div>
        <li class="episodeLink1">
            <h4>VOE</h4>
            <a class="watchEpisode" href="/redirect/12345"></a>
            <span data-lang-key="1"></span>
        </li>
    </body>
    </html>
    """
    return episode_page
|
|
|
|
|
|
@pytest.fixture
def sample_series_html():
    """Return series main-page markup with a title, a 'Jahr:' line and air dates."""
    series_page = """
    <html>
    <body>
        <div class="series-title">
            <h1><span>Naruto Shippuden</span></h1>
        </div>
        <p>Jahr: 2007</p>
        <div class="series-info">Aired: 2007-2017</div>
    </body>
    </html>
    """
    return series_page
|
|
|
|
|
|
@pytest.fixture
def sample_season_html():
    """Return season-page markup: a season-count meta tag and three episode links."""
    season_page = """
    <html>
    <body>
        <meta itemprop="numberOfSeasons" content="2" />
        <a href="/anime/stream/naruto/staffel-1/episode-1">Ep 1</a>
        <a href="/anime/stream/naruto/staffel-1/episode-2">Ep 2</a>
        <a href="/anime/stream/naruto/staffel-1/episode-3">Ep 3</a>
    </body>
    </html>
    """
    return season_page
|
|
|
|
|
|
class TestAniworldLoaderInit:
    """Checks on the attributes of a freshly built AniworldLoader."""

    def test_loader_initializes(self, loader):
        """Base URL must match and the supported-provider list must be non-empty."""
        assert loader.ANIWORLD_TO == "https://aniworld.to"
        providers = loader.SUPPORTED_PROVIDERS
        assert isinstance(providers, list)
        assert len(providers) > 0

    def test_loader_has_session(self, loader):
        """The loader exposes a session object."""
        session = loader.session
        assert session is not None

    def test_loader_has_caches(self, loader):
        """Both HTML caches are dictionaries."""
        key_cache = loader._KeyHTMLDict
        assert isinstance(key_cache, dict)
        episode_cache = loader._EpisodeHTMLDict
        assert isinstance(episode_cache, dict)

    def test_loader_site_key(self, loader):
        """get_site_key reports 'aniworld.to'."""
        site_key = loader.get_site_key()
        assert site_key == "aniworld.to"

    def test_loader_provider_headers_initialized(self, loader):
        """PROVIDER_HEADERS is a dict that contains a 'VOE' entry."""
        headers = loader.PROVIDER_HEADERS
        assert isinstance(headers, dict)
        assert "VOE" in headers
|
|
|
|
|
|
class TestAniworldSearch:
    """Test anime search functionality."""

    @staticmethod
    def _stub_search_response(loader, body):
        """Make ``loader.session.get`` return a 200 response whose text is *body*.

        Args:
            loader: The loader fixture whose session is a ``MagicMock``.
            body: Raw response text the search endpoint should appear to return.

        Returns:
            The mocked response object that was installed.
        """
        response = MagicMock()
        response.text = body
        response.status_code = 200
        response.raise_for_status = MagicMock()
        loader.session.get.return_value = response
        return response

    def test_search_parses_json_response(self, loader, sample_search_response):
        """search() should parse JSON response into list."""
        self._stub_search_response(loader, sample_search_response)

        result = loader.search("naruto")

        assert isinstance(result, list)
        assert len(result) == 2
        assert result[0]["title"] == "Naruto"

    def test_search_calls_correct_url(self, loader, sample_search_response):
        """search() should call the correct search URL."""
        self._stub_search_response(loader, sample_search_response)

        loader.search("naruto")

        # The URL is the first positional argument of session.get.
        called_url = loader.session.get.call_args[0][0]
        assert "seriesSearch" in called_url
        assert "naruto" in called_url

    def test_search_handles_empty_response(self, loader):
        """search() with empty JSON array should return empty list."""
        self._stub_search_response(loader, "[]")

        result = loader.search("nonexistent")

        assert result == []

    def test_search_handles_html_escaped_json(self, loader):
        """search() should handle HTML-escaped JSON response."""
        # The payload must really contain an HTML entity; a plain "&" would
        # pass even if search() never un-escaped anything.
        # NOTE(review): assumes search() un-escapes entities before parsing,
        # as this test's stated contract implies - confirm against provider.
        escaped_json = '[{"title": "Naruto &amp; Friends"}]'
        self._stub_search_response(loader, escaped_json)

        result = loader.search("naruto")

        assert len(result) == 1
        assert result[0]["title"] == "Naruto & Friends"

    def test_search_url_encodes_special_characters(self, loader, sample_search_response):
        """search() should URL-encode special characters in search term."""
        self._stub_search_response(loader, sample_search_response)

        loader.search("attack on titan")

        call_url = loader.session.get.call_args[0][0]
        assert "attack" in call_url
        # An encoded keyword (%20 or +) can never leave a raw space behind.
        assert " " not in call_url

    def test_search_raises_on_invalid_json(self, loader):
        """search() should raise when response is not valid JSON."""
        self._stub_search_response(loader, "<html>Not JSON</html>")

        with pytest.raises((ValueError, json.JSONDecodeError)):
            loader.search("naruto")
|
|
|
|
|
|
class TestAniworldLanguageCheck:
    """Language-key mapping and per-episode language availability."""

    def test_get_language_key_german_dub(self, loader):
        """'German Dub' maps to key 1."""
        key = loader._get_language_key("German Dub")
        assert key == 1

    def test_get_language_key_english_sub(self, loader):
        """'English Sub' maps to key 2."""
        key = loader._get_language_key("English Sub")
        assert key == 2

    def test_get_language_key_german_sub(self, loader):
        """'German Sub' maps to key 3."""
        key = loader._get_language_key("German Sub")
        assert key == 3

    def test_get_language_key_unknown(self, loader):
        """An unrecognised language name maps to key 0."""
        key = loader._get_language_key("French Dub")
        assert key == 0

    def test_is_language_with_available_language(self, loader, sample_episode_html):
        """is_language is True for German Dub, which the sample page offers."""
        page = MagicMock(content=sample_episode_html.encode("utf-8"))
        loader.session.get.return_value = page

        assert loader.is_language(1, 1, "naruto", "German Dub") is True

    def test_is_language_english_sub_available(self, loader, sample_episode_html):
        """is_language is True for English Sub, which the sample page offers."""
        page = MagicMock(content=sample_episode_html.encode("utf-8"))
        loader.session.get.return_value = page

        assert loader.is_language(1, 1, "naruto", "English Sub") is True

    def test_is_language_unavailable_language(self, loader, sample_episode_html):
        """is_language is False for German Sub, which the sample page lacks."""
        page = MagicMock(content=sample_episode_html.encode("utf-8"))
        loader.session.get.return_value = page

        assert loader.is_language(1, 1, "naruto", "German Sub") is False

    def test_is_language_no_language_box(self, loader):
        """is_language is False when the page has no language box at all."""
        bare_page = "<html><body><div></div></body></html>"
        loader.session.get.return_value = MagicMock(
            content=bare_page.encode("utf-8")
        )

        assert loader.is_language(1, 1, "naruto", "German Dub") is False
|
|
|
|
|
|
class TestAniworldTitle:
    """Title extraction from a cached series page."""

    def test_get_title_extracts_correctly(self, loader, sample_series_html):
        """get_title returns the title text found in the series markup."""
        cached_page = MagicMock(content=sample_series_html.encode("utf-8"))
        loader._KeyHTMLDict["naruto"] = cached_page

        title = loader.get_title("naruto")

        assert title == "Naruto Shippuden"

    def test_get_title_missing_title_div(self, loader):
        """get_title yields an empty string when the title div is absent."""
        empty_page = "<html><body></body></html>"
        loader._KeyHTMLDict["unknown"] = MagicMock(
            content=empty_page.encode("utf-8")
        )

        title = loader.get_title("unknown")

        assert title == ""

    def test_get_title_caches_html(self, loader, sample_series_html):
        """Repeated get_title calls with a pre-filled cache never hit the session."""
        cached_page = MagicMock(content=sample_series_html.encode("utf-8"))
        loader._KeyHTMLDict["naruto"] = cached_page

        for _ in range(2):
            loader.get_title("naruto")

        # The page was already cached, so no HTTP request may be issued.
        loader.session.get.assert_not_called()
|
|
|
|
|
|
class TestAniworldYear:
    """Release-year extraction from a cached series page."""

    def test_get_year_extracts_from_metadata(self, loader, sample_series_html):
        """get_year returns 2007 for the sample page containing 'Jahr: 2007'."""
        cached_page = MagicMock(content=sample_series_html.encode("utf-8"))
        loader._KeyHTMLDict["naruto"] = cached_page

        year = loader.get_year("naruto")

        assert year == 2007

    def test_get_year_returns_none_when_not_found(self, loader):
        """get_year returns None when the page carries no year information."""
        yearless_page = "<html><body><div class='series-title'></div></body></html>"
        loader._KeyHTMLDict["unknown"] = MagicMock(
            content=yearless_page.encode("utf-8")
        )

        year = loader.get_year("unknown")

        assert year is None
|
|
|
|
|
|
class TestAniworldEpisodeHtml:
    """Fetching of episode pages and validation of season/episode numbers."""

    def test_get_episode_html_fetches_from_session(self, loader):
        """_get_episode_html returns the session's response after one GET."""
        page = MagicMock(content=b"<html></html>")
        loader.session.get.return_value = page

        fetched = loader._get_episode_html(1, 1, "naruto")

        assert fetched is page
        loader.session.get.assert_called_once()

    def test_get_episode_html_invalid_season(self, loader):
        """Season 0 is rejected with a ValueError."""
        expected = pytest.raises(ValueError, match="Invalid season number")
        with expected:
            loader._get_episode_html(0, 1, "naruto")

    def test_get_episode_html_invalid_episode(self, loader):
        """Episode 0 is rejected with a ValueError."""
        expected = pytest.raises(ValueError, match="Invalid episode number")
        with expected:
            loader._get_episode_html(1, 0, "naruto")

    def test_get_episode_html_season_too_large(self, loader):
        """Season 1000 is rejected with a ValueError."""
        expected = pytest.raises(ValueError, match="Invalid season number")
        with expected:
            loader._get_episode_html(1000, 1, "naruto")

    def test_get_episode_html_episode_too_large(self, loader):
        """Episode 10000 is rejected with a ValueError."""
        expected = pytest.raises(ValueError, match="Invalid episode number")
        with expected:
            loader._get_episode_html(1, 10000, "naruto")
|
|
|
|
|
|
class TestAniworldProviderParsing:
    """Extraction of provider entries from episode-page markup."""

    def test_parse_providers_from_html(self, loader):
        """Two providers with language keys 1 and 2 are both reported."""
        page = """
        <html><body>
            <li class="episodeLink1" data-lang-key="1">
                <h4>VOE</h4>
                <a class="watchEpisode" href="/redirect/111"></a>
            </li>
            <li class="episodeLink2" data-lang-key="2">
                <h4>Vidmoly</h4>
                <a class="watchEpisode" href="/redirect/222"></a>
            </li>
        </body></html>
        """
        loader.session.get.return_value = MagicMock(content=page.encode("utf-8"))

        providers = loader._get_provider_from_html(1, 1, "naruto")

        assert "VOE" in providers
        assert "Vidmoly" in providers
        assert 1 in providers["VOE"]
        assert 2 in providers["Vidmoly"]

    def test_parse_providers_empty_html(self, loader):
        """A page without provider entries yields an empty dict."""
        page = "<html><body></body></html>"
        loader.session.get.return_value = MagicMock(content=page.encode("utf-8"))

        providers = loader._get_provider_from_html(1, 1, "naruto")

        assert providers == {}

    def test_parse_providers_missing_lang_key(self, loader):
        """A provider entry lacking data-lang-key is left out of the result."""
        page = """
        <html><body>
            <li class="episodeLink1">
                <h4>VOE</h4>
                <a class="watchEpisode" href="/redirect/111"></a>
            </li>
        </body></html>
        """
        loader.session.get.return_value = MagicMock(content=page.encode("utf-8"))

        providers = loader._get_provider_from_html(1, 1, "naruto")

        assert providers == {}
|
|
|
|
|
|
class TestAniworldSeasonEpisodeCount:
    """Season and episode counting with ``requests.get`` patched in the provider."""

    @patch("src.server.providers.aniworld_provider.requests.get")
    def test_get_season_episode_count(self, mock_get, loader):
        """Two seasons with 3 and 2 episode links produce {1: 3, 2: 2}."""
        # Served in order: main page (2 seasons), season 1, season 2.
        pages = [
            '<html><body><meta itemprop="numberOfSeasons" content="2" /></body></html>',
            """
            <html><body>
                <a href="/anime/stream/naruto/staffel-1/episode-1">Ep1</a>
                <a href="/anime/stream/naruto/staffel-1/episode-2">Ep2</a>
                <a href="/anime/stream/naruto/staffel-1/episode-3">Ep3</a>
            </body></html>
            """,
            """
            <html><body>
                <a href="/anime/stream/naruto/staffel-2/episode-1">Ep1</a>
                <a href="/anime/stream/naruto/staffel-2/episode-2">Ep2</a>
            </body></html>
            """,
        ]
        mock_get.side_effect = [MagicMock(content=page.encode()) for page in pages]

        counts = loader.get_season_episode_count("naruto")

        assert counts == {1: 3, 2: 2}

    @patch("src.server.providers.aniworld_provider.requests.get")
    def test_get_season_episode_count_no_seasons(self, mock_get, loader):
        """A page without season metadata produces an empty dict."""
        blank_page = "<html><body></body></html>"
        mock_get.return_value = MagicMock(content=blank_page.encode())

        counts = loader.get_season_episode_count("nonexistent")

        assert counts == {}
|
|
|
|
|
|
class TestAniworldCache:
    """Behaviour of the two cache-clearing methods."""

    def test_clear_cache(self, loader):
        """clear_cache leaves both the key cache and the episode cache empty."""
        loader._KeyHTMLDict["key1"] = "data"
        loader._EpisodeHTMLDict[("key1", 1, 1)] = "data"

        loader.clear_cache()

        remaining = (len(loader._KeyHTMLDict), len(loader._EpisodeHTMLDict))
        assert remaining == (0, 0)

    def test_remove_from_cache(self, loader):
        """remove_from_cache empties the episode cache but keeps the key cache."""
        loader._KeyHTMLDict["key1"] = "data"
        loader._EpisodeHTMLDict[("key1", 1, 1)] = "data"

        loader.remove_from_cache()

        remaining = (len(loader._KeyHTMLDict), len(loader._EpisodeHTMLDict))
        assert remaining == (1, 0)
|
|
|
|
|
|
class TestAniworldEvents:
    """Subscribing to and unsubscribing from download-progress events."""

    def test_subscribe_download_progress(self, loader):
        """A subscribed handler receives the fired event payload exactly once."""
        listener = MagicMock()
        payload = {"status": "downloading"}

        loader.subscribe_download_progress(listener)
        # Firing the event proves the handler was actually registered.
        loader.events.download_progress(payload)

        listener.assert_called_once_with({"status": "downloading"})

    def test_unsubscribe_download_progress(self, loader):
        """A handler that was unsubscribed is not invoked when the event fires."""
        listener = MagicMock()

        loader.subscribe_download_progress(listener)
        loader.unsubscribe_download_progress(listener)
        # The handler has been removed, so this must not reach it.
        loader.events.download_progress({"status": "downloading"})

        listener.assert_not_called()
|
|
|
|
|
|
class TestAniworldHealthCheck:
    """Tests for the _check_url_alive HEAD probe."""

    def test_returns_true_on_200(self, loader):
        """A HEAD response with status 200 counts as alive."""
        head_reply = MagicMock(status_code=200)
        loader.session.head.return_value = head_reply
        assert loader._check_url_alive("https://provider/x") is True

    def test_returns_false_on_404(self, loader):
        """A HEAD response with status 404 counts as dead."""
        head_reply = MagicMock(status_code=404)
        loader.session.head.return_value = head_reply
        assert loader._check_url_alive("https://provider/x") is False

    def test_returns_false_on_403(self, loader):
        """A HEAD response with status 403 counts as dead."""
        head_reply = MagicMock(status_code=403)
        loader.session.head.return_value = head_reply
        assert loader._check_url_alive("https://provider/x") is False

    def test_falls_back_to_get_when_head_disallowed(self, loader):
        """When HEAD answers 405, a single GET answering 200 counts as alive."""
        loader.session.head.return_value = MagicMock(status_code=405)
        fallback_reply = MagicMock(status_code=200)
        fallback_reply.close = MagicMock()
        loader.session.get.return_value = fallback_reply

        alive = loader._check_url_alive("https://provider/x")

        assert alive is True
        loader.session.get.assert_called_once()

    def test_returns_false_on_connection_error(self, loader):
        """A ConnectionError raised by HEAD counts as dead."""
        loader.session.head.side_effect = requests.ConnectionError("boom")
        assert loader._check_url_alive("https://provider/x") is False
|
|
|
|
|
|
class TestAniworldDirectStream:
    """Tests for the _try_direct_stream fast-path."""

    def _build_response(self, status, content_type, body=b""):
        """Build a mocked streaming response usable as a context manager.

        Args:
            status: HTTP status code; ``ok`` is derived as ``status < 400``.
            content_type: Value placed in the ``Content-Type`` header.
            body: Bytes returned as the only chunk from ``iter_content``.
        """
        resp = MagicMock()
        resp.status_code = status
        resp.ok = status < 400
        resp.headers = {"Content-Type": content_type}
        resp.iter_content = MagicMock(return_value=[body])
        # Entering the response as a context manager hands back itself.
        resp.__enter__ = MagicMock(return_value=resp)
        resp.__exit__ = MagicMock(return_value=False)
        return resp

    def test_skips_non_video_content(self, loader, tmp_path):
        """A text/html response is refused and no file is created."""
        target = tmp_path / "out.mp4"
        loader.session.get.return_value = self._build_response(200, "text/html")

        streamed = loader._try_direct_stream("https://x", str(target), None, 10)

        assert streamed is False
        assert not target.exists()

    def test_writes_video_content(self, loader, tmp_path):
        """A video/mp4 response is accepted and its body lands in the target file."""
        target = tmp_path / "out.mp4"
        loader.session.get.return_value = self._build_response(
            200, "video/mp4", body=b"abc123"
        )

        streamed = loader._try_direct_stream("https://x", str(target), None, 10)

        assert streamed is True
        assert target.read_bytes() == b"abc123"

    def test_returns_false_on_http_error(self, loader, tmp_path):
        """A 404 response makes the fast-path report failure."""
        target = tmp_path / "out.mp4"
        loader.session.get.return_value = self._build_response(404, "video/mp4")

        streamed = loader._try_direct_stream("https://x", str(target), None, 10)

        assert streamed is False

    def test_returns_false_on_request_exception(self, loader, tmp_path):
        """A RequestException raised by the session makes the fast-path report failure."""
        loader.session.get.side_effect = requests.RequestException("nope")
        destination = str(tmp_path / "out.mp4")

        streamed = loader._try_direct_stream("https://x", destination, None, 10)

        assert streamed is False
|
|
|
|
|
|
class TestAniworldProviderSelection:
    """Tests for _select_providers_for_episode ordering and filtering."""

    def test_orders_by_supported_preference(self, loader):
        """VOE is listed before Vidoza even though the page lists Vidoza first."""
        advertised = {
            "Vidoza": {1: "https://aniworld.to/redirect/2"},
            "VOE": {1: "https://aniworld.to/redirect/1"},
        }
        loader.is_language = MagicMock(return_value=True)
        loader._get_provider_from_html = MagicMock(return_value=advertised)

        selected = loader._select_providers_for_episode(1, 1, "k", "German Dub")

        names = [entry[0] for entry in selected]
        assert names == ["VOE", "Vidoza"]

    def test_filters_by_language(self, loader):
        """A provider offering only another language key is dropped."""
        advertised = {
            "VOE": {2: "https://aniworld.to/redirect/1"},  # English only
        }
        loader.is_language = MagicMock(return_value=True)
        loader._get_provider_from_html = MagicMock(return_value=advertised)

        selected = loader._select_providers_for_episode(1, 1, "k", "German Dub")

        assert selected == []

    def test_returns_empty_when_language_unavailable(self, loader):
        """If is_language is False the result is empty and providers are never parsed."""
        loader.is_language = MagicMock(return_value=False)
        loader._get_provider_from_html = MagicMock()

        selected = loader._select_providers_for_episode(1, 1, "k", "German Dub")

        assert selected == []
        loader._get_provider_from_html.assert_not_called()
|
|
|
|
|
|
class TestAniworldDownloadFailover:
    """Tests for the failover rotation in download()."""

    # Import path under which download() looks up YoutubeDL.
    _YDL_TARGET = "src.server.providers.aniworld_provider.YoutubeDL"

    @staticmethod
    def _fake_ydl(opts, payload):
        """Stand in for a successful YoutubeDL run.

        Writes *payload* to the path in ``opts["outtmpl"]`` (creating parent
        directories) and returns a mock usable as a context manager whose
        ``extract_info`` returns a minimal info dict.
        """
        outpath = opts["outtmpl"]
        os.makedirs(os.path.dirname(outpath), exist_ok=True)
        with open(outpath, "wb") as fh:
            fh.write(payload)
        ydl = MagicMock()
        ydl.__enter__ = MagicMock(return_value=ydl)
        ydl.__exit__ = MagicMock(return_value=False)
        ydl.extract_info = MagicMock(return_value={"title": "t"})
        return ydl

    @pytest.fixture
    def patched_loader(self, loader, tmp_path):
        """Loader with side-effect heavy methods stubbed.

        Two providers (VOE, then Doodstream) are advertised, every URL is
        reported alive, the direct-stream fast-path declines, and link
        resolution returns a fixed m3u8 URL with a Referer header.
        """
        loader.get_title = MagicMock(return_value="Anime")
        loader._select_providers_for_episode = MagicMock(return_value=[
            ("VOE", "https://aniworld.to/redirect/1"),
            ("Doodstream", "https://aniworld.to/redirect/2"),
        ])
        loader._check_url_alive = MagicMock(return_value=True)
        loader._try_direct_stream = MagicMock(return_value=False)
        loader.clear_cache = MagicMock()
        loader._resolve_direct_link = MagicMock(
            return_value=("https://cdn/video.m3u8", {"Referer": "https://x"})
        )
        return loader

    def test_skips_provider_when_url_dead(self, patched_loader, tmp_path):
        """A provider failing the health check is skipped without resolving it."""
        # First provider URL fails health check, second succeeds and downloads
        patched_loader._check_url_alive.side_effect = [False, True]

        def fake_ytdl(opts):
            return self._fake_ydl(opts, b"data")

        with patch(self._YDL_TARGET, side_effect=fake_ytdl):
            result = patched_loader.download(
                str(tmp_path), "Anime", 1, 1, "k", "German Dub"
            )

        assert result is True
        assert patched_loader._check_url_alive.call_count == 2
        # Only second provider (Doodstream) attempted resolve
        patched_loader._resolve_direct_link.assert_called_once_with(
            "https://aniworld.to/redirect/2", "Doodstream"
        )

    def test_falls_back_to_next_provider_on_ytdl_error(
        self, patched_loader, tmp_path
    ):
        """When YoutubeDL fails for the first provider the second one is tried."""
        calls = {"n": 0}

        def fake_ytdl(opts):
            calls["n"] += 1
            # The first construction fails, simulating a broken provider.
            if calls["n"] == 1:
                raise Exception("HTTP 404 from VOE")
            return self._fake_ydl(opts, b"ok")

        with patch(self._YDL_TARGET, side_effect=fake_ytdl):
            result = patched_loader.download(
                str(tmp_path), "Anime", 1, 1, "k", "German Dub"
            )

        assert result is True
        assert calls["n"] == 2

    def test_uses_direct_stream_when_available(
        self, patched_loader, tmp_path
    ):
        """A successful direct stream means YoutubeDL is never constructed."""

        def write_direct(link, output, headers, timeout):
            os.makedirs(os.path.dirname(output), exist_ok=True)
            with open(output, "wb") as fh:
                fh.write(b"vid")
            return True

        patched_loader._try_direct_stream.side_effect = write_direct

        with patch(self._YDL_TARGET) as mock_ydl:
            result = patched_loader.download(
                str(tmp_path), "Anime", 1, 1, "k", "German Dub"
            )

        assert result is True
        mock_ydl.assert_not_called()

    def test_returns_false_when_all_providers_fail(
        self, patched_loader, tmp_path, caplog
    ):
        """download() returns False and logs once every provider has failed."""
        with patch(self._YDL_TARGET, side_effect=Exception("HTTP 404")):
            result = patched_loader.download(
                str(tmp_path), "Anime", 1, 1, "k", "German Dub"
            )

        assert result is False
        assert "All download providers failed" in caplog.text
        # Both providers attempted
        assert patched_loader._resolve_direct_link.call_count == 2

    def test_returns_false_when_no_providers_advertised(
        self, patched_loader, tmp_path, caplog
    ):
        """download() returns False and logs when no provider is advertised."""
        patched_loader._select_providers_for_episode.return_value = []

        result = patched_loader.download(
            str(tmp_path), "Anime", 1, 1, "k", "German Dub"
        )

        assert result is False
        assert "No providers advertised" in caplog.text
|
|
|
|
|
|
class TestAniworldHeaderParsing:
    """_parse_provider_headers normalizes legacy strings to dict."""

    def test_parses_referer(self):
        """A quoted 'Referer' entry becomes a dict entry without the quotes."""
        legacy = ['Referer: "https://vidmoly.to"']
        parsed = AniworldLoader._parse_provider_headers(legacy)
        assert parsed == {"Referer": "https://vidmoly.to"}

    def test_handles_none(self):
        """None is turned into an empty dict."""
        parsed = AniworldLoader._parse_provider_headers(None)
        assert parsed == {}

    def test_skips_malformed_entries(self):
        """Entries that are not 'Key: value' pairs are ignored."""
        legacy = ["not-a-header", "Key: value"]
        parsed = AniworldLoader._parse_provider_headers(legacy)
        assert parsed == {"Key": "value"}
|
|
|
|
|
|
class TestDecodeHtmlContent:
    """Tests for the module-level _decode_html_content helper."""

    def test_decodes_utf8_content(self):
        """UTF-8 bytes containing umlauts decode to the original text."""
        from src.server.providers.aniworld_provider import _decode_html_content

        markup = '<html><body><h1>Titel mit Ümläüten</h1></body></html>'
        decoded = _decode_html_content(markup.encode('utf-8'))

        assert 'Titel mit Ümläüten' in decoded

    def test_decodes_latin1_content(self):
        """Latin-1 bytes decode to text that still contains the ASCII prefix."""
        from src.server.providers.aniworld_provider import _decode_html_content

        # A longer document gives encoding detection more to work with.
        markup = '<html><body><h1>CafÉ and more text here</h1></body></html>'
        decoded = _decode_html_content(markup.encode('latin-1'))

        # Only the ASCII part is checked.
        assert 'Caf' in decoded

    def test_replaces_invalid_bytes(self):
        """Bytes that are not valid text still produce a str result."""
        from src.server.providers.aniworld_provider import _decode_html_content

        raw = b'\xff\xfe Invalid \x80\x81'
        decoded = _decode_html_content(raw)

        assert isinstance(decoded, str)

    def test_handles_empty_content(self):
        """Empty bytes decode to an empty string."""
        from src.server.providers.aniworld_provider import _decode_html_content

        decoded = _decode_html_content(b'')

        assert decoded == ''
|