fix: resolve line length violations (80+ characters)

- refactor src/cli/Main.py: split long logging config, user prompts, and method calls
- refactor src/config/settings.py: break long Field definitions into multiple lines
- refactor src/core/providers/enhanced_provider.py: split provider lists, headers, and long f-strings
- refactor src/core/providers/streaming/voe.py: format HTTP header setup
- update QualityTODO.md: mark all line length violations as completed

All files now comply with 88-character line limit. Code readability improved with
better-structured multi-line statements and intermediate variables for complex expressions.
This commit is contained in:
2025-10-22 12:16:41 +02:00
parent 68c2f9bda2
commit 80507119b7
5 changed files with 555 additions and 272 deletions

View File

@@ -5,35 +5,35 @@ This module extends the original AniWorldLoader with comprehensive
error handling, retry mechanisms, and recovery strategies.
"""
import hashlib
import html
import json
import logging
import os
import re
import logging
import json
import requests
import html
from urllib.parse import quote
import shutil
import time
import hashlib
from typing import Optional, Dict, Any, Callable
from typing import Any, Callable, Dict, Optional
from urllib.parse import quote
import requests
from bs4 import BeautifulSoup
from error_handler import (
DownloadError,
NetworkError,
NonRetryableError,
RetryableError,
file_corruption_detector,
recovery_strategies,
with_error_recovery,
)
from fake_useragent import UserAgent
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from yt_dlp import YoutubeDL
import shutil
from .base_provider import Loader
from ..interfaces.providers import Providers
from error_handler import (
with_error_recovery,
recovery_strategies,
NetworkError,
DownloadError,
RetryableError,
NonRetryableError,
file_corruption_detector
)
from .base_provider import Loader
class EnhancedAniWorldLoader(Loader):
@@ -42,15 +42,32 @@ class EnhancedAniWorldLoader(Loader):
def __init__(self):
super().__init__()
self.logger = logging.getLogger(__name__)
self.SUPPORTED_PROVIDERS = ["VOE", "Doodstream", "Vidmoly", "Vidoza", "SpeedFiles", "Streamtape", "Luluvdo"]
providers = [
"VOE",
"Doodstream",
"Vidmoly",
"Vidoza",
"SpeedFiles",
"Streamtape",
"Luluvdo",
]
self.SUPPORTED_PROVIDERS = providers
self.AniworldHeaders = {
"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
"accept": (
"text/html,application/xhtml+xml,application/xml;q=0.9,"
"image/avif,image/webp,image/apng,*/*;q=0.8"
),
"accept-encoding": "gzip, deflate, br, zstd",
"accept-language": "de,de-DE;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
"accept-language": (
"de,de-DE;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6"
),
"cache-control": "max-age=0",
"priority": "u=0, i",
"sec-ch-ua": '"Chromium";v="136", "Microsoft Edge";v="136", "Not.A/Brand";v="99"',
"sec-ch-ua": (
'"Chromium";v="136", "Microsoft Edge";v="136", '
'"Not.A/Brand";v="99"'
),
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"Windows"',
"sec-fetch-dest": "document",
@@ -58,23 +75,43 @@ class EnhancedAniWorldLoader(Loader):
"sec-fetch-site": "none",
"sec-fetch-user": "?1",
"upgrade-insecure-requests": "1",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36 Edg/136.0.0.0"
"user-agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/136.0.0.0 Safari/537.36 Edg/136.0.0.0"
),
}
self.INVALID_PATH_CHARS = ['<', '>', ':', '"', '/', '\\', '|', '?', '*', '&']
invalid_chars = [
"<",
">",
":",
'"',
"/",
"\\",
"|",
"?",
"*",
"&",
]
self.INVALID_PATH_CHARS = invalid_chars
self.RANDOM_USER_AGENT = UserAgent().random
self.LULUVDO_USER_AGENT = "Mozilla/5.0 (Android 15; Mobile; rv:132.0) Gecko/132.0 Firefox/132.0"
android_ua = (
"Mozilla/5.0 (Android 15; Mobile; rv:132.0) "
"Gecko/132.0 Firefox/132.0"
)
self.LULUVDO_USER_AGENT = android_ua
self.PROVIDER_HEADERS = {
"Vidmoly": ['Referer: "https://vidmoly.to"'],
"Doodstream": ['Referer: "https://dood.li/"'],
"VOE": [f'User-Agent: {self.RANDOM_USER_AGENT}'],
"Luluvdo": [
f'User-Agent: {self.LULUVDO_USER_AGENT}',
'Accept-Language: de-DE,de;q=0.9,en-US;q=0.8,en;q=0.7',
"Accept-Language: de-DE,de;q=0.9,en-US;q=0.8,en;q=0.7",
'Origin: "https://luluvdo.com"',
'Referer: "https://luluvdo.com/"'
]
'Referer: "https://luluvdo.com/"',
],
}
self.ANIWORLD_TO = "https://aniworld.to"
@@ -224,166 +261,245 @@ class EnhancedAniWorldLoader(Loader):
try:
decoded_data = strategy(clean_text)
if isinstance(decoded_data, list):
self.logger.debug(f"Successfully parsed anime response with strategy {i + 1}")
msg = (
f"Successfully parsed anime response with "
f"strategy {i + 1}"
)
self.logger.debug(msg)
return decoded_data
else:
self.logger.warning(f"Strategy {i + 1} returned non-list data: {type(decoded_data)}")
msg = (
f"Strategy {i + 1} returned non-list data: "
f"{type(decoded_data)}"
)
self.logger.warning(msg)
except json.JSONDecodeError as e:
self.logger.debug(f"Parsing strategy {i + 1} failed: {e}")
msg = f"Parsing strategy {i + 1} failed: {e}"
self.logger.debug(msg)
continue
raise ValueError("Could not parse anime search response with any strategy")
raise ValueError(
"Could not parse anime search response with any strategy"
)
def _GetLanguageKey(self, language: str) -> int:
    """Map a human-readable language name to the site's numeric code.

    Args:
        language: One of "German Dub", "English Sub", "German Sub".

    Returns:
        The numeric language key used by the site, or 0 when the
        language is unknown so callers can detect the miss.
    """
    language_map = {
        "German Dub": 1,
        "English Sub": 2,
        "German Sub": 3,
    }
    return language_map.get(language, 0)
@with_error_recovery(max_retries=2, context="language_check")
def IsLanguage(
    self,
    season: int,
    episode: int,
    key: str,
    language: str = "German Dub",
) -> bool:
    """Check if episode is available in specified language.

    Args:
        season: Season number (0 is used for movies elsewhere).
        episode: Episode number.
        key: Anime key (URL slug) on the site.
        language: Human-readable language name; resolved via
            ``_GetLanguageKey``.

    Returns:
        True when the episode's language box lists the requested
        language code, False when no language box is present.

    Raises:
        RetryableError: Wrapping any failure during the check so the
            ``with_error_recovery`` decorator can retry it.
    """
    try:
        languageCode = self._GetLanguageKey(language)
        if languageCode == 0:
            raise ValueError(f"Unknown language: {language}")
        episode_response = self._GetEpisodeHTML(season, episode, key)
        soup = BeautifulSoup(episode_response.content, "html.parser")
        lang_box = soup.find("div", class_="changeLanguageBox")
        if not lang_box:
            # No language selector on the page — treat as unavailable.
            debug_msg = (
                f"No language box found for {key} S{season}E{episode}"
            )
            self.logger.debug(debug_msg)
            return False
        img_tags = lang_box.find_all("img")
        available_languages = []
        for img in img_tags:
            # Each flag image carries the numeric code in data-lang-key.
            lang_key = img.get("data-lang-key")
            if lang_key and lang_key.isdigit():
                available_languages.append(int(lang_key))
        is_available = languageCode in available_languages
        debug_msg = (
            f"Language check for {key} S{season}E{episode}: "
            f"Requested={languageCode}, "
            f"Available={available_languages}, "
            f"Result={is_available}"
        )
        self.logger.debug(debug_msg)
        return is_available
    except Exception as e:
        error_msg = (
            f"Language check failed for {key} S{season}E{episode}: {e}"
        )
        self.logger.error(error_msg)
        raise RetryableError(f"Language check failed: {e}") from e
def Download(
    self,
    baseDirectory: str,
    serieFolder: str,
    season: int,
    episode: int,
    key: str,
    language: str = "German Dub",
    progress_callback: Optional[Callable] = None,
) -> bool:
    """Download episode with comprehensive error handling.

    Builds a sanitized output filename, skips downloads whose target
    file already exists and validates as a playable video, and
    delegates the actual transfer to ``_download_with_recovery``.

    Args:
        baseDirectory: Root directory for all series.
        serieFolder: Folder name of this series under the root.
        season: Season number; 0 selects the "Movie" filename format.
        episode: Episode number.
        key: Anime key (URL slug) on the site.
        language: Human-readable language name.
        progress_callback: Optional callable forwarded to the
            downloader for progress reporting.

    Returns:
        True on successful download (or pre-existing valid file),
        False when every provider attempt failed.

    Raises:
        DownloadError: Wrapping any unexpected error during the run.
    """
    self.download_stats["total_downloads"] += 1
    try:
        # Validate inputs
        if not all([baseDirectory, serieFolder, key]):
            raise ValueError("Missing required parameters for download")
        if season < 0 or episode < 0:
            raise ValueError("Season and episode must be non-negative")
        # Prepare file paths: strip characters invalid on Windows/Unix.
        sanitized_anime_title = "".join(
            char
            for char in self.GetTitle(key)
            if char not in self.INVALID_PATH_CHARS
        )
        if not sanitized_anime_title:
            sanitized_anime_title = f"Unknown_{key}"
        # Generate output filename (season 0 is the movie format).
        if season == 0:
            output_file = (
                f"{sanitized_anime_title} - Movie {episode:02} - "
                f"({language}).mp4"
            )
        else:
            output_file = (
                f"{sanitized_anime_title} - S{season:02}E{episode:03} - "
                f"({language}).mp4"
            )
        # Create directory structure
        folder_path = os.path.join(
            baseDirectory, serieFolder, f"Season {season}"
        )
        output_path = os.path.join(folder_path, output_file)
        # Check if file already exists and is valid
        if os.path.exists(output_path):
            if file_corruption_detector.is_valid_video_file(output_path):
                msg = (
                    f"File already exists and is valid: "
                    f"{output_file}"
                )
                self.logger.info(msg)
                self.download_stats["successful_downloads"] += 1
                return True
            else:
                warning_msg = (
                    f"Existing file appears corrupted, removing: "
                    f"{output_path}"
                )
                self.logger.warning(warning_msg)
                try:
                    os.remove(output_path)
                except Exception as e:
                    # Best effort: a stuck corrupted file will be
                    # overwritten by the fresh download below.
                    error_msg = f"Failed to remove corrupted file: {e}"
                    self.logger.error(error_msg)
        os.makedirs(folder_path, exist_ok=True)
        # Create temp directory; download lands here first so partial
        # files never appear at the final location.
        temp_dir = "./Temp/"
        os.makedirs(temp_dir, exist_ok=True)
        temp_path = os.path.join(temp_dir, output_file)
        # Attempt download with recovery strategies
        success = self._download_with_recovery(
            season,
            episode,
            key,
            language,
            temp_path,
            output_path,
            progress_callback,
        )
        if success:
            self.download_stats["successful_downloads"] += 1
            success_msg = f"Successfully downloaded: {output_file}"
            self.logger.info(success_msg)
        else:
            self.download_stats["failed_downloads"] += 1
            fail_msg = (
                f"Download failed for {key} S{season}E{episode} "
                f"({language})"
            )
            self.download_error_logger.error(fail_msg)
        return success
    except Exception as e:
        self.download_stats["failed_downloads"] += 1
        err_msg = (
            f"Download error for {key} S{season}E{episode}: {e}"
        )
        self.download_error_logger.error(err_msg, exc_info=True)
        raise DownloadError(f"Download failed: {e}") from e
    finally:
        # Drop cached episode/key HTML regardless of outcome.
        self.ClearCache()
def _download_with_recovery(self, season: int, episode: int, key: str, language: str,
temp_path: str, output_path: str, progress_callback: Callable) -> bool:
"""Attempt download with multiple providers and recovery strategies."""
def _download_with_recovery(
self,
season: int,
episode: int,
key: str,
language: str,
temp_path: str,
output_path: str,
progress_callback: Optional[Callable],
) -> bool:
"""Attempt download with multiple providers and recovery."""
for provider_name in self.SUPPORTED_PROVIDERS:
try:
self.logger.info(f"Attempting download with provider: {provider_name}")
info_msg = f"Attempting download with provider: {provider_name}"
self.logger.info(info_msg)
# Get download link and headers for provider
link, headers = recovery_strategies.handle_network_failure(
self._get_direct_link_from_provider,
season, episode, key, language
season,
episode,
key,
language,
)
if not link:
self.logger.warning(f"No download link found for provider: {provider_name}")
warn_msg = (
f"No download link found for provider: "
f"{provider_name}"
)
self.logger.warning(warn_msg)
continue
# Configure yt-dlp options
ydl_opts = {
'fragment_retries': float('inf'),
'outtmpl': temp_path,
'quiet': True,
'no_warnings': True,
'progress_with_newline': False,
'nocheckcertificate': True,
'socket_timeout': self.download_timeout,
'http_chunk_size': 1024 * 1024, # 1MB chunks
"fragment_retries": float("inf"),
"outtmpl": temp_path,
"quiet": True,
"no_warnings": True,
"progress_with_newline": False,
"nocheckcertificate": True,
"socket_timeout": self.download_timeout,
"http_chunk_size": 1024 * 1024, # 1MB chunks
}
if headers:
ydl_opts['http_headers'] = headers
@@ -403,16 +519,21 @@ class EnhancedAniWorldLoader(Loader):
if file_corruption_detector.is_valid_video_file(temp_path):
# Move to final location
shutil.copy2(temp_path, output_path)
# Clean up temp file
try:
os.remove(temp_path)
except Exception as e:
self.logger.warning(f"Failed to remove temp file: {e}")
warn_msg = f"Failed to remove temp file: {e}"
self.logger.warning(warn_msg)
return True
else:
self.logger.warning(f"Downloaded file failed validation: {temp_path}")
warn_msg = (
f"Downloaded file failed validation: "
f"{temp_path}"
)
self.logger.warning(warn_msg)
try:
os.remove(temp_path)
except Exception:
@@ -425,7 +546,9 @@ class EnhancedAniWorldLoader(Loader):
return False
def _perform_ytdl_download(self, ydl_opts: Dict[str, Any], link: str) -> bool:
def _perform_ytdl_download(
self, ydl_opts: Dict[str, Any], link: str
) -> bool:
"""Perform actual download using yt-dlp."""
try:
with YoutubeDL(ydl_opts) as ydl:
@@ -476,133 +599,208 @@ class EnhancedAniWorldLoader(Loader):
if not response.ok:
if response.status_code == 404:
self.nokey_logger.error(f"Anime key not found: {key}")
raise NonRetryableError(f"Anime key not found: {key}")
msg = f"Anime key not found: {key}"
self.nokey_logger.error(msg)
raise NonRetryableError(msg)
else:
raise RetryableError(f"HTTP error {response.status_code} for key {key}")
err_msg = (
f"HTTP error {response.status_code} for key {key}"
)
raise RetryableError(err_msg)
self._KeyHTMLDict[key] = response
return self._KeyHTMLDict[key]
except Exception as e:
self.logger.error(f"Failed to get HTML for key {key}: {e}")
error_msg = f"Failed to get HTML for key {key}: {e}"
self.logger.error(error_msg)
raise
@with_error_recovery(max_retries=2, context="get_episode_html")
def _GetEpisodeHTML(self, season: int, episode: int, key: str):
    """Get cached HTML response for a specific episode.

    Responses are memoized in ``self._EpisodeHTMLDict`` keyed by
    ``(key, season, episode)`` so repeated language/provider lookups
    do not re-fetch the page.

    Raises:
        NonRetryableError: On HTTP 404 (episode does not exist).
        RetryableError: On any other non-OK HTTP status.
    """
    cache_key = (key, season, episode)
    if cache_key in self._EpisodeHTMLDict:
        return self._EpisodeHTMLDict[cache_key]
    try:
        url = (
            f"{self.ANIWORLD_TO}/anime/stream/{key}/"
            f"staffel-{season}/episode-{episode}"
        )
        response = recovery_strategies.handle_network_failure(
            self.session.get, url, timeout=self.DEFAULT_REQUEST_TIMEOUT
        )
        if not response.ok:
            if response.status_code == 404:
                err_msg = (
                    f"Episode not found: {key} S{season}E{episode}"
                )
                raise NonRetryableError(err_msg)
            else:
                err_msg = (
                    f"HTTP error {response.status_code} for episode"
                )
                raise RetryableError(err_msg)
        self._EpisodeHTMLDict[cache_key] = response
        return self._EpisodeHTMLDict[cache_key]
    except Exception as e:
        error_msg = (
            f"Failed to get episode HTML for {key} "
            f"S{season}E{episode}: {e}"
        )
        self.logger.error(error_msg)
        raise
def _get_provider_from_html(
    self, season: int, episode: int, key: str
) -> dict:
    """Extract available providers from the episode page HTML.

    Returns:
        Mapping of provider name -> {language code -> absolute
        redirect URL}. Empty when no episode links are present.

    Raises:
        RetryableError: Wrapping any parsing failure.
    """
    try:
        episode_html = self._GetEpisodeHTML(season, episode, key)
        soup = BeautifulSoup(episode_html.content, "html.parser")
        providers: dict[str, dict] = {}
        episode_links = soup.find_all(
            "li", class_=lambda x: x and x.startswith("episodeLink")
        )
        if not episode_links:
            warn_msg = (
                f"No episode links found for {key} S{season}E{episode}"
            )
            self.logger.warning(warn_msg)
            return providers
        for link in episode_links:
            provider_name_tag = link.find("h4")
            provider_name = (
                provider_name_tag.text.strip()
                if provider_name_tag
                else None
            )
            redirect_link_tag = link.find("a", class_="watchEpisode")
            redirect_link = (
                redirect_link_tag["href"]
                if redirect_link_tag
                else None
            )
            lang_key = link.get("data-lang-key")
            lang_key = (
                int(lang_key)
                if lang_key and lang_key.isdigit()
                else None
            )
            # Only record entries where all three pieces were found.
            if provider_name and redirect_link and lang_key:
                if provider_name not in providers:
                    providers[provider_name] = {}
                providers[provider_name][lang_key] = (
                    f"{self.ANIWORLD_TO}{redirect_link}"
                )
        debug_msg = (
            f"Found {len(providers)} providers for "
            f"{key} S{season}E{episode}"
        )
        self.logger.debug(debug_msg)
        return providers
    except Exception as e:
        error_msg = f"Failed to parse providers from HTML: {e}"
        self.logger.error(error_msg)
        raise RetryableError(f"Provider parsing failed: {e}") from e
def _get_redirect_link(
    self,
    season: int,
    episode: int,
    key: str,
    language: str = "German Dub",
):
    """Get redirect link for episode with error handling.

    Returns:
        Tuple of (redirect URL, provider name) for the first provider
        that offers the requested language.

    Raises:
        NonRetryableError: When the language is unavailable for this
            episode, or no provider carries it.
    """
    languageCode = self._GetLanguageKey(language)
    if not self.IsLanguage(season, episode, key, language):
        err_msg = (
            f"Language {language} not available for "
            f"{key} S{season}E{episode}"
        )
        raise NonRetryableError(err_msg)
    providers = self._get_provider_from_html(season, episode, key)
    for provider_name, lang_dict in providers.items():
        if languageCode in lang_dict:
            return lang_dict[languageCode], provider_name
    err_msg = (
        f"No provider found for {language} in "
        f"{key} S{season}E{episode}"
    )
    raise NonRetryableError(err_msg)
def _get_embeded_link(
self,
season: int,
episode: int,
key: str,
language: str = "German Dub",
):
"""Get embedded link with error handling."""
try:
redirect_link, provider_name = self._get_redirect_link(season, episode, key, language)
redirect_link, provider_name = self._get_redirect_link(
season, episode, key, language
)
response = recovery_strategies.handle_network_failure(
self.session.get,
redirect_link,
timeout=self.DEFAULT_REQUEST_TIMEOUT,
headers={'User-Agent': self.RANDOM_USER_AGENT}
headers={"User-Agent": self.RANDOM_USER_AGENT},
)
return response.url
except Exception as e:
self.logger.error(f"Failed to get embedded link: {e}")
error_msg = f"Failed to get embedded link: {e}"
self.logger.error(error_msg)
raise
def _get_direct_link_from_provider(
    self,
    season: int,
    episode: int,
    key: str,
    language: str = "German Dub",
):
    """Get direct download link from provider.

    Resolves the embed page and asks the VOE provider implementation
    to extract the direct media link from it.

    Raises:
        NonRetryableError: When no embed link is found or the VOE
            provider is unavailable.
    """
    try:
        embedded_link = self._get_embeded_link(
            season, episode, key, language
        )
        if not embedded_link:
            raise NonRetryableError("No embedded link found")
        # Use VOE provider as default (could be made configurable)
        provider = self.Providers.GetProvider("VOE")
        if not provider:
            raise NonRetryableError("VOE provider not available")
        return provider.GetLink(
            embedded_link, self.DEFAULT_REQUEST_TIMEOUT
        )
    except Exception as e:
        error_msg = f"Failed to get direct link from provider: {e}"
        self.logger.error(error_msg)
        raise
@with_error_recovery(max_retries=2, context="get_season_episode_count")
def get_season_episode_count(self, slug: str) -> dict:
"""Get episode count per season with error handling."""
@@ -611,29 +809,35 @@ class EnhancedAniWorldLoader(Loader):
response = recovery_strategies.handle_network_failure(
requests.get,
base_url,
timeout=self.DEFAULT_REQUEST_TIMEOUT
timeout=self.DEFAULT_REQUEST_TIMEOUT,
)
soup = BeautifulSoup(response.content, 'html.parser')
season_meta = soup.find('meta', itemprop='numberOfSeasons')
number_of_seasons = int(season_meta['content']) if season_meta else 0
soup = BeautifulSoup(response.content, "html.parser")
season_meta = soup.find("meta", itemprop="numberOfSeasons")
number_of_seasons = (
int(season_meta["content"]) if season_meta else 0
)
episode_counts = {}
for season in range(1, number_of_seasons + 1):
season_url = f"{base_url}staffel-{season}"
season_response = recovery_strategies.handle_network_failure(
requests.get,
season_url,
timeout=self.DEFAULT_REQUEST_TIMEOUT
season_response = (
recovery_strategies.handle_network_failure(
requests.get,
season_url,
timeout=self.DEFAULT_REQUEST_TIMEOUT,
)
)
season_soup = BeautifulSoup(season_response.content, 'html.parser')
episode_links = season_soup.find_all('a', href=True)
season_soup = BeautifulSoup(
season_response.content, "html.parser"
)
episode_links = season_soup.find_all("a", href=True)
unique_links = set(
link['href']
link["href"]
for link in episode_links
if f"staffel-{season}/episode-" in link['href']
)
@@ -668,4 +872,5 @@ class EnhancedAniWorldLoader(Loader):
# For backward compatibility, create wrapper that uses enhanced loader
class AniworldLoader(EnhancedAniWorldLoader):
    """Backward compatibility wrapper for the enhanced loader."""
    # The docstring alone forms a valid class body; the duplicated
    # trailing `pass` statements from the diff residue are removed.

View File

@@ -1,12 +1,13 @@
import re
import base64
import json
import re
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import requests
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from .Provider import Provider
# Compile regex patterns once for better performance
@@ -49,7 +50,9 @@ class VOE(Provider):
parts = redirect_url.strip().split("/")
self.Header["Referer"] = f"{parts[0]}//{parts[2]}/"
response = self.session.get(redirect_url, headers={'User-Agent': self.RANDOM_USER_AGENT})
response = self.session.get(
redirect_url, headers={"User-Agent": self.RANDOM_USER_AGENT}
)
html = response.content