import html
import json
import logging
import os
import re
import shutil
from urllib.parse import quote

import requests
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from yt_dlp import YoutubeDL

from server.core.interfaces.providers import Providers
from server.infrastructure.providers.base_provider import Loader

# Read timeout from environment variable, default to 600 seconds (10 minutes).
timeout = int(os.getenv("DOWNLOAD_TIMEOUT", 600))

# Attach the file handlers, otherwise log records never reach the files.
download_error_logger = logging.getLogger("DownloadErrors")
download_error_handler = logging.FileHandler("../../download_errors.log")
download_error_handler.setLevel(logging.ERROR)
download_error_logger.addHandler(download_error_handler)

noKeyFound_logger = logging.getLogger("NoKeyFound")
noKeyFound_handler = logging.FileHandler("../../NoKeyFound.log")
noKeyFound_handler.setLevel(logging.ERROR)
noKeyFound_logger.addHandler(noKeyFound_handler)


class AniworldLoader(Loader):
    """Loader for aniworld.to: searches series, resolves stream providers,
    and downloads episodes via yt-dlp."""

    def __init__(self):
        self.SUPPORTED_PROVIDERS = ["VOE", "Doodstream", "Vidmoly", "Vidoza", "SpeedFiles", "Streamtape", "Luluvdo"]
        self.AniworldHeaders = {
            "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
            "accept-encoding": "gzip, deflate, br, zstd",
            "accept-language": "de,de-DE;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
            "cache-control": "max-age=0",
            "priority": "u=0, i",
            "sec-ch-ua": '"Chromium";v="136", "Microsoft Edge";v="136", "Not.A/Brand";v="99"',
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": '"Windows"',
            "sec-fetch-dest": "document",
            "sec-fetch-mode": "navigate",
            "sec-fetch-site": "none",
            "sec-fetch-user": "?1",
            "upgrade-insecure-requests": "1",
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36 Edg/136.0.0.0"
        }
        self.INVALID_PATH_CHARS = ['<', '>', ':', '"', '/', '\\', '|', '?', '*', '&']
        self.RANDOM_USER_AGENT = UserAgent().random
        self.LULUVDO_USER_AGENT = "Mozilla/5.0 (Android 15; Mobile; rv:132.0) Gecko/132.0 Firefox/132.0"
        self.PROVIDER_HEADERS = {
            "Vidmoly": ['Referer: "https://vidmoly.to"'],
            "Doodstream": ['Referer: "https://dood.li/"'],
            "VOE": [f'User-Agent: {self.RANDOM_USER_AGENT}'],
            "Luluvdo": [
                f'User-Agent: {self.LULUVDO_USER_AGENT}',
                'Accept-Language: de-DE,de;q=0.9,en-US;q=0.8,en;q=0.7',
                'Origin: "https://luluvdo.com"',
                'Referer: "https://luluvdo.com/"'
            ]
        }
        self.ANIWORLD_TO = "https://aniworld.to"
        self.session = requests.Session()

        # Configure retries with exponential backoff.
        retries = Retry(
            total=5,  # Number of retries
            backoff_factor=1,  # Delay multiplier (1s, 2s, 4s, ...)
            status_forcelist=[500, 502, 503, 504],  # Retry for specific HTTP errors
            allowed_methods=["GET"]
        )

        adapter = HTTPAdapter(max_retries=retries)
        self.session.mount("https://", adapter)
        self.DEFAULT_REQUEST_TIMEOUT = 30

        # Response caches keyed by series slug and by (slug, season, episode).
        self._KeyHTMLDict = {}
        self._EpisodeHTMLDict = {}
        self.Providers = Providers()

    def ClearCache(self):
        """Drop all cached series and episode HTML responses."""
        self._KeyHTMLDict = {}
        self._EpisodeHTMLDict = {}

    def RemoveFromCache(self):
        """Drop only the cached episode HTML responses."""
        self._EpisodeHTMLDict = {}

    def Search(self, word: str) -> list:
        search_url = f"{self.ANIWORLD_TO}/ajax/seriesSearch?keyword={quote(word)}"
        return self.fetch_anime_list(search_url)

    def fetch_anime_list(self, url: str) -> list:
        response = self.session.get(url, timeout=self.DEFAULT_REQUEST_TIMEOUT)
        response.raise_for_status()

        clean_text = response.text.strip()

        try:
            decoded_data = json.loads(html.unescape(clean_text))
            return decoded_data if isinstance(decoded_data, list) else []
        except json.JSONDecodeError:
            try:
                # Remove a BOM and control characters that break json.loads,
                # then retry the parse.
                clean_text = clean_text.encode('utf-8').decode('utf-8-sig')
                clean_text = re.sub(r'[\x00-\x1F\x7F-\x9F]', '', clean_text)
                decoded_data = json.loads(clean_text)
                return decoded_data if isinstance(decoded_data, list) else []
            except json.JSONDecodeError as exc:
                raise ValueError("Could not parse anime search results") from exc

    def _GetLanguageKey(self, language: str) -> int:
        # Aniworld language keys: 1 = German Dub, 2 = English Sub, 3 = German Sub.
        # Returns 0 for unknown languages.
        language_keys = {"German Dub": 1, "English Sub": 2, "German Sub": 3}
        return language_keys.get(language, 0)

    def IsLanguage(self, season: int, episode: int, key: str, language: str = "German Dub") -> bool:
        """
        Language Codes:
            1: German Dub
            2: English Sub
            3: German Sub
        """
        languageCode = self._GetLanguageKey(language)

        episode_soup = BeautifulSoup(self._GetEpisodeHTML(season, episode, key).content, 'html.parser')
        change_language_box_div = episode_soup.find('div', class_='changeLanguageBox')
        languages = []

        if change_language_box_div:
            img_tags = change_language_box_div.find_all('img')
            for img in img_tags:
                lang_key = img.get('data-lang-key')
                if lang_key and lang_key.isdigit():
                    languages.append(int(lang_key))

        return languageCode in languages

    def Download(self, baseDirectory: str, serieFolder: str, season: int, episode: int, key: str, language: str = "German Dub", progress_callback: callable = None) -> bool:
        sanitized_anime_title = ''.join(
            char for char in self.GetTitle(key) if char not in self.INVALID_PATH_CHARS
        )

        if season == 0:
            output_file = (
                f"{sanitized_anime_title} - "
                f"Movie {episode:02} - "
                f"({language}).mp4"
            )
        else:
            output_file = (
                f"{sanitized_anime_title} - "
                f"S{season:02}E{episode:03} - "
                f"({language}).mp4"
            )

        folderPath = os.path.join(baseDirectory, serieFolder, f"Season {season}")
        output_path = os.path.join(folderPath, output_file)
        os.makedirs(os.path.dirname(output_path), exist_ok=True)

        temp_dir = "./Temp/"
        os.makedirs(temp_dir, exist_ok=True)
        temp_Path = os.path.join(temp_dir, output_file)

        success = False
        for provider in self.SUPPORTED_PROVIDERS:
            # Note: link resolution currently always goes through the VOE
            # provider (see _get_direct_link_from_provider), regardless of
            # the loop variable.
            result = self._get_direct_link_from_provider(season, episode, key, language)
            if not result:
                continue
            link, header = result

            ydl_opts = {
                'fragment_retries': float('inf'),
                'outtmpl': temp_Path,
                'quiet': True,
                'no_warnings': True,
                'progress_with_newline': False,
                'nocheckcertificate': True,
            }

            if header:
                ydl_opts['http_headers'] = header
            if progress_callback:
                ydl_opts['progress_hooks'] = [progress_callback]

            with YoutubeDL(ydl_opts) as ydl:
                ydl.download([link])

            if os.path.exists(temp_Path):
                shutil.copy(temp_Path, output_path)
                os.remove(temp_Path)
                success = True
                break

        self.ClearCache()
        return success

    def GetSiteKey(self) -> str:
        return "aniworld.to"

    def GetTitle(self, key: str) -> str:
        soup = BeautifulSoup(self._GetKeyHTML(key).content, 'html.parser')
        title_div = soup.find('div', class_='series-title')

        if title_div:
            return title_div.find('h1').find('span').text

        return ""

    def _GetKeyHTML(self, key: str):
        if key in self._KeyHTMLDict:
            return self._KeyHTMLDict[key]

        self._KeyHTMLDict[key] = self.session.get(
            f"{self.ANIWORLD_TO}/anime/stream/{key}",
            timeout=self.DEFAULT_REQUEST_TIMEOUT
        )
        return self._KeyHTMLDict[key]

    def _GetEpisodeHTML(self, season: int, episode: int, key: str):
        # Cache entries are keyed by (key, season, episode), so the
        # membership test must use the full tuple, not just the slug.
        if (key, season, episode) in self._EpisodeHTMLDict:
            return self._EpisodeHTMLDict[(key, season, episode)]

        link = (
            f"{self.ANIWORLD_TO}/anime/stream/{key}/"
            f"staffel-{season}/episode-{episode}"
        )
        # Named "response" so it does not shadow the imported html module.
        response = self.session.get(link, timeout=self.DEFAULT_REQUEST_TIMEOUT)
        self._EpisodeHTMLDict[(key, season, episode)] = response
        return self._EpisodeHTMLDict[(key, season, episode)]

    def _get_provider_from_html(self, season: int, episode: int, key: str) -> dict:
        """
        Parses the HTML content to extract streaming providers,
        their language keys, and redirect links.

        Returns a dictionary with provider names as keys
        and language key-to-redirect URL mappings as values.

        Example:

        {
            'VOE': {1: 'https://aniworld.to/redirect/1766412',
                    2: 'https://aniworld.to/redirect/1766405'},
            'Doodstream': {1: 'https://aniworld.to/redirect/1987922',
                           2: 'https://aniworld.to/redirect/2700342'},
            ...
        }

        Access a redirect link with:
            providers = self._get_provider_from_html(season, episode, key)
            print(providers["VOE"][2])
        """
        soup = BeautifulSoup(self._GetEpisodeHTML(season, episode, key).content, 'html.parser')
        providers = {}

        episode_links = soup.find_all(
            'li', class_=lambda x: x and x.startswith('episodeLink')
        )

        if not episode_links:
            return providers

        for link in episode_links:
            provider_name_tag = link.find('h4')
            provider_name = provider_name_tag.text.strip() if provider_name_tag else None

            redirect_link_tag = link.find('a', class_='watchEpisode')
            redirect_link = redirect_link_tag['href'] if redirect_link_tag else None

            lang_key = link.get('data-lang-key')
            lang_key = int(lang_key) if lang_key and lang_key.isdigit() else None

            if provider_name and redirect_link and lang_key:
                if provider_name not in providers:
                    providers[provider_name] = {}
                providers[provider_name][lang_key] = f"{self.ANIWORLD_TO}{redirect_link}"

        return providers

    def _get_redirect_link(self, season: int, episode: int, key: str, language: str = "German Dub"):
        """Return a (redirect_url, provider_name) tuple, or None if the
        episode is not available in the requested language."""
        languageCode = self._GetLanguageKey(language)
        if self.IsLanguage(season, episode, key, language):
            for provider_name, lang_dict in self._get_provider_from_html(season, episode, key).items():
                if languageCode in lang_dict:
                    return (lang_dict[languageCode], provider_name)
        return None

    def _get_embeded_link(self, season: int, episode: int, key: str, language: str = "German Dub"):
        redirect = self._get_redirect_link(season, episode, key, language)
        if redirect is None:
            return None
        redirect_link, provider_name = redirect

        # Follow the aniworld redirect to the provider's embedded player URL.
        embeded_link = self.session.get(
            redirect_link, timeout=self.DEFAULT_REQUEST_TIMEOUT,
            headers={'User-Agent': self.RANDOM_USER_AGENT}).url
        return embeded_link

    def _get_direct_link_from_provider(self, season: int, episode: int, key: str, language: str = "German Dub"):
        """
        Resolve the embedded player page to a direct stream link.

        Intended provider mapping (only VOE is wired up so far):

        providers = {
            "Vidmoly": get_direct_link_from_vidmoly,
            "Vidoza": get_direct_link_from_vidoza,
            "VOE": get_direct_link_from_voe,
            "Doodstream": get_direct_link_from_doodstream,
            "SpeedFiles": get_direct_link_from_speedfiles,
            "Luluvdo": get_direct_link_from_luluvdo
        }
        """
        embeded_link = self._get_embeded_link(season, episode, key, language)
        if embeded_link is None:
            return None

        # GetLink is expected to yield a (link, headers) pair, matching the
        # unpacking in Download().
        return self.Providers.GetProvider("VOE").GetLink(embeded_link, self.DEFAULT_REQUEST_TIMEOUT)

    def get_season_episode_count(self, slug: str) -> dict:
        base_url = f"{self.ANIWORLD_TO}/anime/stream/{slug}/"
        # Use the shared session so the retry/backoff policy applies here too.
        response = self.session.get(base_url, timeout=self.DEFAULT_REQUEST_TIMEOUT)
        soup = BeautifulSoup(response.content, 'html.parser')

        season_meta = soup.find('meta', itemprop='numberOfSeasons')
        number_of_seasons = int(season_meta['content']) if season_meta else 0

        episode_counts = {}

        for season in range(1, number_of_seasons + 1):
            season_url = f"{base_url}staffel-{season}"
            response = self.session.get(season_url, timeout=self.DEFAULT_REQUEST_TIMEOUT)
            soup = BeautifulSoup(response.content, 'html.parser')

            # Count distinct episode URLs for this season.
            episode_links = soup.find_all('a', href=True)
            unique_links = set(
                link['href']
                for link in episode_links
                if f"staffel-{season}/episode-" in link['href']
            )

            episode_counts[season] = len(unique_links)

        return episode_counts
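

# --- Minimal usage sketch (illustrative only) ---
# Assumes aniworld.to is reachable and that the AJAX search payload contains
# a 'link' field holding the series slug; that field name is a guess, adjust
# it if the real payload differs.
if __name__ == "__main__":
    loader = AniworldLoader()
    results = loader.Search("one piece")
    if results:
        slug = results[0].get("link", "")  # hypothetical payload field
        print(loader.GetTitle(slug))
        print(loader.get_season_episode_count(slug))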