import os
import re
import logging
import json
import shutil
import html
from urllib.parse import quote

import requests
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from yt_dlp import YoutubeDL

from server.infrastructure.providers.base_provider import Loader
from server.core.interfaces.providers import Providers

# Read timeout from environment variable, default to 600 seconds (10 minutes)
timeout = int(os.getenv("DOWNLOAD_TIMEOUT", 600))

download_error_logger = logging.getLogger("DownloadErrors")
download_error_handler = logging.FileHandler("../../download_errors.log")
download_error_handler.setLevel(logging.ERROR)
download_error_logger.addHandler(download_error_handler)

noKeyFound_logger = logging.getLogger("NoKeyFound")
noKeyFound_handler = logging.FileHandler("../../NoKeyFound.log")
noKeyFound_handler.setLevel(logging.ERROR)
noKeyFound_logger.addHandler(noKeyFound_handler)


class AniworldLoader(Loader):
    def __init__(self):
        self.SUPPORTED_PROVIDERS = ["VOE", "Doodstream", "Vidmoly", "Vidoza",
                                    "SpeedFiles", "Streamtape", "Luluvdo"]
        self.AniworldHeaders = {
            "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
            "accept-encoding": "gzip, deflate, br, zstd",
            "accept-language": "de,de-DE;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
            "cache-control": "max-age=0",
            "priority": "u=0, i",
            "sec-ch-ua": '"Chromium";v="136", "Microsoft Edge";v="136", "Not.A/Brand";v="99"',
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": '"Windows"',
            "sec-fetch-dest": "document",
            "sec-fetch-mode": "navigate",
            "sec-fetch-site": "none",
            "sec-fetch-user": "?1",
            "upgrade-insecure-requests": "1",
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36 Edg/136.0.0.0",
        }
        self.INVALID_PATH_CHARS = ['<', '>', ':', '"', '/', '\\', '|', '?', '*', '&']
        self.RANDOM_USER_AGENT = UserAgent().random
        self.LULUVDO_USER_AGENT = "Mozilla/5.0 (Android 15; Mobile; rv:132.0) Gecko/132.0 Firefox/132.0"
        self.PROVIDER_HEADERS = {
            "Vidmoly": ['Referer: "https://vidmoly.to"'],
            "Doodstream": ['Referer: "https://dood.li/"'],
            "VOE": [f'User-Agent: {self.RANDOM_USER_AGENT}'],
            "Luluvdo": [
                f'User-Agent: {self.LULUVDO_USER_AGENT}',
                'Accept-Language: de-DE,de;q=0.9,en-US;q=0.8,en;q=0.7',
                'Origin: "https://luluvdo.com"',
                'Referer: "https://luluvdo.com/"',
            ],
        }
        self.ANIWORLD_TO = "https://aniworld.to"

        self.session = requests.Session()
        # Configure retries with backoff
        retries = Retry(
            total=5,           # Number of retries
            backoff_factor=1,  # Delay multiplier (1s, 2s, 4s, ...)
            status_forcelist=[500, 502, 503, 504],  # Retry for specific HTTP errors
            allowed_methods=["GET"],
        )
        adapter = HTTPAdapter(max_retries=retries)
        self.session.mount("https://", adapter)
        self.DEFAULT_REQUEST_TIMEOUT = 30
        self._KeyHTMLDict = {}
        self._EpisodeHTMLDict = {}
        self.Providers = Providers()
    def ClearCache(self):
        self._KeyHTMLDict = {}
        self._EpisodeHTMLDict = {}

    def RemoveFromCache(self):
        self._EpisodeHTMLDict = {}

    def Search(self, word: str) -> list:
        search_url = f"{self.ANIWORLD_TO}/ajax/seriesSearch?keyword={quote(word)}"
        return self.fetch_anime_list(search_url)

    def fetch_anime_list(self, url: str) -> list:
        response = self.session.get(url, timeout=self.DEFAULT_REQUEST_TIMEOUT)
        response.raise_for_status()
        clean_text = response.text.strip()
        try:
            decoded_data = json.loads(html.unescape(clean_text))
            return decoded_data if isinstance(decoded_data, list) else []
        except json.JSONDecodeError:
            try:
                # Remove BOM, then strip control characters that break json.loads
                clean_text = clean_text.encode('utf-8').decode('utf-8-sig')
                clean_text = re.sub(r'[\x00-\x1F\x7F-\x9F]', '', clean_text)
                decoded_data = json.loads(clean_text)
                return decoded_data if isinstance(decoded_data, list) else []
            except json.JSONDecodeError as exc:
                raise ValueError(f"Could not parse search response from {url}") from exc

    def _GetLanguageKey(self, language: str) -> int:
        language_codes = {"German Dub": 1, "English Sub": 2, "German Sub": 3}
        return language_codes.get(language, 0)

    def IsLanguage(self, season: int, episode: int, key: str,
                   language: str = "German Dub") -> bool:
        """
        Language codes:
            1: German Dub
            2: English Sub
            3: German Sub
        """
        languageCode = self._GetLanguageKey(language)
        episode_soup = BeautifulSoup(
            self._GetEpisodeHTML(season, episode, key).content, 'html.parser')
        change_language_box_div = episode_soup.find('div', class_='changeLanguageBox')
        languages = []
        if change_language_box_div:
            for img in change_language_box_div.find_all('img'):
                lang_key = img.get('data-lang-key')
                if lang_key and lang_key.isdigit():
                    languages.append(int(lang_key))
        return languageCode in languages

    def Download(self, baseDirectory: str, serieFolder: str, season: int,
                 episode: int, key: str, language: str = "German Dub",
                 progress_callback: callable = None) -> bool:
        sanitized_anime_title = ''.join(
            char for char in self.GetTitle(key)
            if char not in self.INVALID_PATH_CHARS
        )
        if season == 0:
            output_file = (
                f"{sanitized_anime_title} - "
                f"Movie {episode:02} - "
                f"({language}).mp4"
            )
        else:
            output_file = (
                f"{sanitized_anime_title} - "
                f"S{season:02}E{episode:03} - "
                f"({language}).mp4"
            )
        folderPath = os.path.join(baseDirectory, serieFolder, f"Season {season}")
        output_path = os.path.join(folderPath, output_file)
        os.makedirs(folderPath, exist_ok=True)
        temp_dir = "./Temp/"
        os.makedirs(temp_dir, exist_ok=True)
        temp_path = os.path.join(temp_dir, output_file)

        success = False
        for _provider in self.SUPPORTED_PROVIDERS:
            # NOTE: direct-link resolution currently only goes through VOE
            # (see _get_direct_link_from_provider); this loop simply retries
            # until a temp file appears.
            link, header = self._get_direct_link_from_provider(
                season, episode, key, language)
            if link is None:
                continue
            ydl_opts = {
                'fragment_retries': float('inf'),
                'outtmpl': temp_path,
                'quiet': True,
                'no_warnings': True,
                'progress_with_newline': False,
                'nocheckcertificate': True,
            }
            if header:
                ydl_opts['http_headers'] = header
            if progress_callback:
                ydl_opts['progress_hooks'] = [progress_callback]
            try:
                with YoutubeDL(ydl_opts) as ydl:
                    ydl.download([link])
            except Exception as exc:  # log the failure and try the next attempt
                download_error_logger.error(
                    "Download failed for %s S%sE%s: %s", key, season, episode, exc)
                continue
            if os.path.exists(temp_path):
                shutil.copy(temp_path, output_path)
                os.remove(temp_path)
                success = True
                break
        self.ClearCache()
        return success
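    # Example progress_callback for Download() (illustrative sketch, not part
    # of the class API): yt-dlp invokes each progress hook with a status dict,
    # so a caller could pass something like:
    #
    #     def on_progress(d):
    #         if d.get('status') == 'downloading':
    #             print(d.get('downloaded_bytes'), '/', d.get('total_bytes'))
    #         elif d.get('status') == 'finished':
    #             print('Download finished:', d.get('filename'))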
    def GetSiteKey(self) -> str:
        return "aniworld.to"

    def GetTitle(self, key: str) -> str:
        soup = BeautifulSoup(self._GetKeyHTML(key).content, 'html.parser')
        title_div = soup.find('div', class_='series-title')
        if title_div:
            return title_div.find('h1').find('span').text
        return ""

    def _GetKeyHTML(self, key: str):
        if key in self._KeyHTMLDict:
            return self._KeyHTMLDict[key]
        self._KeyHTMLDict[key] = self.session.get(
            f"{self.ANIWORLD_TO}/anime/stream/{key}",
            timeout=self.DEFAULT_REQUEST_TIMEOUT
        )
        return self._KeyHTMLDict[key]

    def _GetEpisodeHTML(self, season: int, episode: int, key: str):
        # Cache key must match the lookup key: (key, season, episode)
        if (key, season, episode) in self._EpisodeHTMLDict:
            return self._EpisodeHTMLDict[(key, season, episode)]
        link = (
            f"{self.ANIWORLD_TO}/anime/stream/{key}/"
            f"staffel-{season}/episode-{episode}"
        )
        response = self.session.get(link, timeout=self.DEFAULT_REQUEST_TIMEOUT)
        self._EpisodeHTMLDict[(key, season, episode)] = response
        return self._EpisodeHTMLDict[(key, season, episode)]

    def _get_provider_from_html(self, season: int, episode: int, key: str) -> dict:
        """
        Parses the HTML content to extract streaming providers, their language
        keys, and redirect links.

        Returns a dictionary with provider names as keys and language
        key-to-redirect URL mappings as values.

        Example:
            {
                'VOE': {1: 'https://aniworld.to/redirect/1766412',
                        2: 'https://aniworld.to/redirect/1766405'},
                'Doodstream': {1: 'https://aniworld.to/redirect/1987922',
                               2: 'https://aniworld.to/redirect/2700342'},
                ...
            }

        Access a redirect link with: providers["VOE"][2]
        """
        soup = BeautifulSoup(
            self._GetEpisodeHTML(season, episode, key).content, 'html.parser')
        providers = {}
        episode_links = soup.find_all(
            'li', class_=lambda x: x and x.startswith('episodeLink')
        )
        if not episode_links:
            return providers
        for link in episode_links:
            provider_name_tag = link.find('h4')
            provider_name = provider_name_tag.text.strip() if provider_name_tag else None
            redirect_link_tag = link.find('a', class_='watchEpisode')
            redirect_link = redirect_link_tag['href'] if redirect_link_tag else None
            lang_key = link.get('data-lang-key')
            lang_key = int(lang_key) if lang_key and lang_key.isdigit() else None
            if provider_name and redirect_link and lang_key:
                providers.setdefault(provider_name, {})[lang_key] = (
                    f"{self.ANIWORLD_TO}{redirect_link}"
                )
        return providers

    def _get_redirect_link(self, season: int, episode: int, key: str,
                           language: str = "German Dub") -> tuple:
        languageCode = self._GetLanguageKey(language)
        if self.IsLanguage(season, episode, key, language):
            for provider_name, lang_dict in self._get_provider_from_html(
                    season, episode, key).items():
                if languageCode in lang_dict:
                    return lang_dict[languageCode], provider_name
        return None, None

    def _get_embeded_link(self, season: int, episode: int, key: str,
                          language: str = "German Dub"):
        redirect_link, _provider_name = self._get_redirect_link(
            season, episode, key, language)
        if redirect_link is None:
            return None
        # Follow the redirect to obtain the provider's embed URL
        return self.session.get(
            redirect_link,
            timeout=self.DEFAULT_REQUEST_TIMEOUT,
            headers={'User-Agent': self.RANDOM_USER_AGENT}
        ).url

    def _get_direct_link_from_provider(self, season: int, episode: int, key: str,
                                       language: str = "German Dub") -> tuple:
        """
        providers = {
            "Vidmoly": get_direct_link_from_vidmoly,
            "Vidoza": get_direct_link_from_vidoza,
            "VOE": get_direct_link_from_voe,
            "Doodstream": get_direct_link_from_doodstream,
            "SpeedFiles": get_direct_link_from_speedfiles,
            "Luluvdo": get_direct_link_from_luluvdo
        }
        """
        embeded_link = self._get_embeded_link(season, episode, key, language)
        if embeded_link is None:
            return None, None
        # GetLink is expected to return a (direct_link, headers) pair,
        # matching the unpacking in Download()
        return self.Providers.GetProvider("VOE").GetLink(
            embeded_link, self.DEFAULT_REQUEST_TIMEOUT)
    def get_season_episode_count(self, slug: str) -> dict:
        base_url = f"{self.ANIWORLD_TO}/anime/stream/{slug}/"
        # Use the shared session so the retry policy applies
        response = self.session.get(base_url, timeout=self.DEFAULT_REQUEST_TIMEOUT)
        soup = BeautifulSoup(response.content, 'html.parser')
        season_meta = soup.find('meta', itemprop='numberOfSeasons')
        number_of_seasons = int(season_meta['content']) if season_meta else 0

        episode_counts = {}
        for season in range(1, number_of_seasons + 1):
            season_url = f"{base_url}staffel-{season}"
            response = self.session.get(season_url, timeout=self.DEFAULT_REQUEST_TIMEOUT)
            soup = BeautifulSoup(response.content, 'html.parser')
            episode_links = soup.find_all('a', href=True)
            unique_links = {
                link['href'] for link in episode_links
                if f"staffel-{season}/episode-" in link['href']
            }
            episode_counts[season] = len(unique_links)
        return episode_counts
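
# Minimal usage sketch (illustrative only; assumes the server.* packages are
# importable and "example-slug" is a placeholder, not a real series key):
if __name__ == "__main__":
    loader = AniworldLoader()
    results = loader.Search("naruto")  # list of dicts from the AJAX search
    counts = loader.get_season_episode_count("example-slug")
    print(results, counts)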