refactoring

This commit is contained in:
2025-09-29 21:18:42 +02:00
parent 1719a36f57
commit e477780ed6
105 changed files with 568 additions and 6646 deletions

131
src/core/SerieScanner.py Normal file
View File

@@ -0,0 +1,131 @@
import os
import re
import logging
from server.core.entities.series import Serie
import traceback
from server.infrastructure.logging.GlobalLogger import error_logger, noKeyFound_logger
from server.core.exceptions.Exceptions import NoKeyFoundException, MatchNotFoundError
from server.infrastructure.providers.base_provider import Loader
class SerieScanner:
def __init__(self, basePath: str, loader: Loader):
self.directory = basePath
        self.folderDict: dict[str, Serie] = {}
self.loader = loader
logging.info(f"Initialized Loader with base path: {self.directory}")
    def Reinit(self):
        self.folderDict = {}
def is_null_or_whitespace(self, s):
return s is None or s.strip() == ""
def GetTotalToScan(self):
result = self.__find_mp4_files()
return sum(1 for _ in result)
def Scan(self, callback):
logging.info("Starting process to load missing episodes")
result = self.__find_mp4_files()
counter = 0
for folder, mp4_files in result:
try:
counter += 1
callback(folder, counter)
serie = self.__ReadDataFromFile(folder)
                if serie is not None and not self.is_null_or_whitespace(serie.key):
                    missings, site = self.__GetMissingEpisodesAndSeason(serie.key, mp4_files)
                    serie.episodeDict = missings
                    serie.folder = folder
                    serie.save_to_file(os.path.join(self.directory, folder, 'data'))
                    if serie.key in self.folderDict:
                        logging.error(f"Duplicate series key found: {serie.key}")
                    self.folderDict[serie.key] = serie
                    logging.info(f"Saved Serie: '{serie}'")
            except NoKeyFoundException as nkfe:
                noKeyFound_logger.error(f"Error processing folder '{folder}': {nkfe}")
            except Exception as e:
                error_logger.error(f"Unexpected error processing folder '{folder}': {e}\n{traceback.format_exc()}")
    def __find_mp4_files(self):
        logging.info("Scanning for .mp4 files")
        for anime_name in os.listdir(self.directory):
            anime_path = os.path.join(self.directory, anime_name)
            if os.path.isdir(anime_path):
                mp4_files = []
                for root, _, files in os.walk(anime_path):
                    for file in files:
                        if file.endswith(".mp4"):
                            mp4_files.append(os.path.join(root, file))
                yield anime_name, mp4_files
def __remove_year(self, input_string: str):
cleaned_string = re.sub(r'\(\d{4}\)', '', input_string).strip()
logging.debug(f"Removed year from '{input_string}' -> '{cleaned_string}'")
return cleaned_string
def __ReadDataFromFile(self, folder_name: str):
folder_path = os.path.join(self.directory, folder_name)
key = None
key_file = os.path.join(folder_path, 'key')
serie_file = os.path.join(folder_path, 'data')
if os.path.exists(key_file):
with open(key_file, 'r') as file:
key = file.read().strip()
logging.info(f"Key found for folder '{folder_name}': {key}")
return Serie(key, "", "aniworld.to", folder_name, dict())
        if os.path.exists(serie_file):
            logging.info(f"Loading serie_file from '{folder_name}': {serie_file}")
            return Serie.load_from_file(serie_file)
return None
def __GetEpisodeAndSeason(self, filename: str):
pattern = r'S(\d+)E(\d+)'
match = re.search(pattern, filename)
if match:
season = match.group(1)
episode = match.group(2)
logging.debug(f"Extracted season {season}, episode {episode} from '{filename}'")
return int(season), int(episode)
else:
logging.error(f"Failed to find season/episode pattern in '{filename}'")
raise MatchNotFoundError("Season and episode pattern not found in the filename.")
    def __GetEpisodesAndSeasons(self, mp4_files: list):
episodes_dict = {}
for file in mp4_files:
season, episode = self.__GetEpisodeAndSeason(file)
if season in episodes_dict:
episodes_dict[season].append(episode)
else:
episodes_dict[season] = [episode]
return episodes_dict
    def __GetMissingEpisodesAndSeason(self, key: str, mp4_files: list):
        expected_dict = self.loader.get_season_episode_count(key)  # maps season -> expected episode count
filedict = self.__GetEpisodesAndSeasons(mp4_files)
episodes_dict = {}
for season, expected_count in expected_dict.items():
existing_episodes = filedict.get(season, [])
missing_episodes = [ep for ep in range(1, expected_count + 1) if ep not in existing_episodes and self.loader.IsLanguage(season, ep, key)]
if missing_episodes:
episodes_dict[season] = missing_episodes
return episodes_dict, "aniworld.to"
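A minimal usage sketch for the scanner (illustrative only: DummyLoader and the base path are hypothetical stand-ins for a real Loader implementation):

class DummyLoader:
    def get_season_episode_count(self, key: str) -> dict:
        return {1: 12, 2: 12}  # hypothetical: season -> expected episode count
    def IsLanguage(self, season: int, episode: int, key: str, language: str = "German Dub") -> bool:
        return True  # pretend every episode is available in the requested language

scanner = SerieScanner("/media/serien", DummyLoader())
print(scanner.GetTotalToScan())
scanner.Scan(lambda folder, count: print(f"{count}: scanning {folder}"))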

11
src/core/__init__.py Normal file
View File

@@ -0,0 +1,11 @@
"""
Core module for AniWorld application.
Contains domain entities, interfaces, use cases, and exceptions.
"""
from . import entities
from . import exceptions
from . import interfaces
from . import use_cases
__all__ = ['entities', 'exceptions', 'interfaces', 'use_cases']

View File

@@ -0,0 +1,56 @@
import os
import json
import logging
from .series import Serie
class SerieList:
def __init__(self, basePath: str):
self.directory = basePath
        self.folderDict: dict[str, Serie] = {}
self.load_series()
    def add(self, serie: Serie):
        if not self.contains(serie.key):
            dataPath = os.path.join(self.directory, serie.folder, "data")
            animePath = os.path.join(self.directory, serie.folder)
            os.makedirs(animePath, exist_ok=True)
            if not os.path.isfile(dataPath):
                serie.save_to_file(dataPath)
            self.folderDict[serie.folder] = serie
    def contains(self, key: str) -> bool:
        return any(serie.key == key for serie in self.folderDict.values())
def load_series(self):
""" Scan folders and load data files """
logging.info(f"Scanning anime folders in: {self.directory}")
for anime_folder in os.listdir(self.directory):
anime_path = os.path.join(self.directory, anime_folder, "data")
            if os.path.isfile(anime_path):
                logging.debug(f"Found data file: {anime_path}")
                self.load_data(anime_folder, anime_path)
            else:
                logging.warning(f"Skipping {anime_folder} - no data file found")
    def load_data(self, anime_folder, data_path):
        """Load the serialized Serie (JSON) from the data file."""
try:
self.folderDict[anime_folder] = Serie.load_from_file(data_path)
logging.debug(f"Successfully loaded {data_path} for {anime_folder}")
except Exception as e:
logging.error(f"Failed to load {data_path} in {anime_folder}: {e}")
def GetMissingEpisode(self):
"""Find all series with a non-empty episodeDict"""
return [serie for serie in self.folderDict.values() if len(serie.episodeDict) > 0]
def GetList(self):
"""Get all series in the list"""
return list(self.folderDict.values())
# Example usage:
# series_list = SerieList("/media/serien/Serien")
# print(series_list.GetMissingEpisode())

View File

@@ -0,0 +1,82 @@
import json
class Serie:
def __init__(self, key: str, name: str, site: str, folder: str, episodeDict: dict[int, list[int]]):
self._key = key
self._name = name
self._site = site
self._folder = folder
self._episodeDict = episodeDict
def __str__(self):
"""String representation of Serie object"""
return f"Serie(key='{self.key}', name='{self.name}', site='{self.site}', folder='{self.folder}', episodeDict={self.episodeDict})"
@property
def key(self) -> str:
return self._key
@key.setter
def key(self, value: str):
self._key = value
@property
def name(self) -> str:
return self._name
@name.setter
def name(self, value: str):
self._name = value
@property
def site(self) -> str:
return self._site
@site.setter
def site(self, value: str):
self._site = value
@property
def folder(self) -> str:
return self._folder
@folder.setter
def folder(self, value: str):
self._folder = value
@property
def episodeDict(self) -> dict[int, list[int]]:
return self._episodeDict
@episodeDict.setter
def episodeDict(self, value: dict[int, list[int]]):
self._episodeDict = value
def to_dict(self):
"""Convert Serie object to dictionary for JSON serialization."""
return {
"key": self.key,
"name": self.name,
"site": self.site,
"folder": self.folder,
"episodeDict": {str(k): list(v) for k, v in self.episodeDict.items()}
}
@staticmethod
def from_dict(data: dict):
"""Create a Serie object from dictionary."""
episode_dict = {int(k): v for k, v in data["episodeDict"].items()} # Convert keys to int
return Serie(data["key"], data["name"], data["site"], data["folder"], episode_dict)
def save_to_file(self, filename: str):
"""Save Serie object to JSON file."""
with open(filename, "w") as file:
json.dump(self.to_dict(), file, indent=4)
@classmethod
def load_from_file(cls, filename: str) -> "Serie":
"""Load Serie object from JSON file."""
with open(filename, "r") as file:
data = json.load(file)
return cls.from_dict(data)
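A short round-trip sketch of the JSON persistence (path and values illustrative):

serie = Serie("some-key", "Some Show", "aniworld.to", "Some Show (2020)", {1: [3, 4]})
serie.save_to_file("/tmp/data")  # writes the to_dict() representation as JSON
restored = Serie.load_from_file("/tmp/data")
assert restored.episodeDict == {1: [3, 4]}  # from_dict converts the keys back to int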

View File

@@ -0,0 +1,7 @@
class NoKeyFoundException(Exception):
"""Exception raised when an anime key cannot be found."""
pass
class MatchNotFoundError(Exception):
    """Exception raised when the season/episode pattern cannot be found in a filename."""
    pass

View File

@@ -0,0 +1,12 @@
from server.infrastructure.providers.streaming.Provider import Provider
from server.infrastructure.providers.streaming.voe import VOE
class Providers:
def __init__(self):
self.dict = {"VOE": VOE()}
def GetProvider(self, key: str) -> Provider:
return self.dict[key]

View File

@@ -0,0 +1,343 @@
import os
import re
import logging
import json
import requests
import html
from urllib.parse import quote
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from server.infrastructure.providers.base_provider import Loader
from server.core.interfaces.providers import Providers
from yt_dlp import YoutubeDL
import shutil
# Read timeout from environment variable, default to 600 seconds (10 minutes)
timeout = int(os.getenv("DOWNLOAD_TIMEOUT", 600))
download_error_logger = logging.getLogger("DownloadErrors")
download_error_handler = logging.FileHandler("../../download_errors.log")
download_error_handler.setLevel(logging.ERROR)
download_error_logger.addHandler(download_error_handler)
noKeyFound_logger = logging.getLogger("NoKeyFound")
noKeyFound_handler = logging.FileHandler("../../NoKeyFound.log")
noKeyFound_handler.setLevel(logging.ERROR)
noKeyFound_logger.addHandler(noKeyFound_handler)
class AniworldLoader(Loader):
def __init__(self):
self.SUPPORTED_PROVIDERS = ["VOE", "Doodstream", "Vidmoly", "Vidoza", "SpeedFiles", "Streamtape", "Luluvdo"]
self.AniworldHeaders = {
"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
"accept-encoding": "gzip, deflate, br, zstd",
"accept-language": "de,de-DE;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
"cache-control": "max-age=0",
"priority": "u=0, i",
"sec-ch-ua": '"Chromium";v="136", "Microsoft Edge";v="136", "Not.A/Brand";v="99"',
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"Windows"',
"sec-fetch-dest": "document",
"sec-fetch-mode": "navigate",
"sec-fetch-site": "none",
"sec-fetch-user": "?1",
"upgrade-insecure-requests": "1",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36 Edg/136.0.0.0"
}
self.INVALID_PATH_CHARS = ['<', '>', ':', '"', '/', '\\', '|', '?', '*', '&']
self.RANDOM_USER_AGENT = UserAgent().random
self.LULUVDO_USER_AGENT = "Mozilla/5.0 (Android 15; Mobile; rv:132.0) Gecko/132.0 Firefox/132.0"
self.PROVIDER_HEADERS = {
"Vidmoly": ['Referer: "https://vidmoly.to"'],
"Doodstream": ['Referer: "https://dood.li/"'],
"VOE": [f'User-Agent: {self.RANDOM_USER_AGENT}'],
"Luluvdo": [
f'User-Agent: {self.LULUVDO_USER_AGENT}',
'Accept-Language: de-DE,de;q=0.9,en-US;q=0.8,en;q=0.7',
'Origin: "https://luluvdo.com"',
'Referer: "https://luluvdo.com/"'
]}
self.ANIWORLD_TO = "https://aniworld.to"
self.session = requests.Session()
# Configure retries with backoff
retries = Retry(
total=5, # Number of retries
backoff_factor=1, # Delay multiplier (1s, 2s, 4s, ...)
status_forcelist=[500, 502, 503, 504], # Retry for specific HTTP errors
allowed_methods=["GET"]
)
adapter = HTTPAdapter(max_retries=retries)
self.session.mount("https://", adapter)
self.DEFAULT_REQUEST_TIMEOUT = 30
self._KeyHTMLDict = {}
self._EpisodeHTMLDict = {}
self.Providers = Providers()
def ClearCache(self):
self._KeyHTMLDict = {}
self._EpisodeHTMLDict = {}
def RemoveFromCache(self):
self._EpisodeHTMLDict = {}
def Search(self, word: str) -> list:
search_url = f"{self.ANIWORLD_TO}/ajax/seriesSearch?keyword={quote(word)}"
anime_list = self.fetch_anime_list(search_url)
return anime_list
def fetch_anime_list(self, url: str) -> list:
response = self.session.get(url, timeout=self.DEFAULT_REQUEST_TIMEOUT)
response.raise_for_status()
clean_text = response.text.strip()
try:
decoded_data = json.loads(html.unescape(clean_text))
return decoded_data if isinstance(decoded_data, list) else []
except json.JSONDecodeError:
try:
# Remove BOM and problematic characters
clean_text = clean_text.encode('utf-8').decode('utf-8-sig')
# Remove problematic characters
clean_text = re.sub(r'[\x00-\x1F\x7F-\x9F]', '', clean_text)
# Parse the new text
decoded_data = json.loads(clean_text)
return decoded_data if isinstance(decoded_data, list) else []
except (requests.RequestException, json.JSONDecodeError) as exc:
raise ValueError("Could not get valid anime: ") from exc
    def _GetLanguageKey(self, language: str) -> int:
        language_map = {"German Dub": 1, "English Sub": 2, "German Sub": 3}
        return language_map.get(language, 0)
def IsLanguage(self, season: int, episode: int, key: str, language: str = "German Dub") -> bool:
"""
Language Codes:
1: German Dub
2: English Sub
3: German Sub
"""
languageCode = self._GetLanguageKey(language)
episode_soup = BeautifulSoup(self._GetEpisodeHTML(season, episode, key).content, 'html.parser')
change_language_box_div = episode_soup.find(
'div', class_='changeLanguageBox')
languages = []
if change_language_box_div:
img_tags = change_language_box_div.find_all('img')
for img in img_tags:
lang_key = img.get('data-lang-key')
if lang_key and lang_key.isdigit():
languages.append(int(lang_key))
return languageCode in languages
def Download(self, baseDirectory: str, serieFolder: str, season: int, episode: int, key: str, language: str = "German Dub", progress_callback: callable = None) -> bool:
sanitized_anime_title = ''.join(
char for char in self.GetTitle(key) if char not in self.INVALID_PATH_CHARS
)
if season == 0:
output_file = (
f"{sanitized_anime_title} - "
f"Movie {episode:02} - "
f"({language}).mp4"
)
else:
output_file = (
f"{sanitized_anime_title} - "
f"S{season:02}E{episode:03} - "
f"({language}).mp4"
)
        folderPath = os.path.join(baseDirectory, serieFolder, f"Season {season}")
        output_path = os.path.join(folderPath, output_file)
        os.makedirs(folderPath, exist_ok=True)
        temp_dir = "./Temp/"
        os.makedirs(temp_dir, exist_ok=True)
        temp_Path = os.path.join(temp_dir, output_file)
        for provider in self.SUPPORTED_PROVIDERS:
            result = self._get_direct_link_from_provider(season, episode, key, language)
            if not result:
                continue
            link, header = result
            ydl_opts = {
                'fragment_retries': float('inf'),
                'outtmpl': temp_Path,
                'quiet': True,
                'no_warnings': True,
                'progress_with_newline': False,
                'nocheckcertificate': True,
            }
            if header:
                ydl_opts['http_headers'] = header
            if progress_callback:
                ydl_opts['progress_hooks'] = [progress_callback]
            with YoutubeDL(ydl_opts) as ydl:
                ydl.download([link])
            if os.path.exists(temp_Path):
                shutil.copy(temp_Path, output_path)
                os.remove(temp_Path)
                break
        self.ClearCache()
        return os.path.exists(output_path)
def GetSiteKey(self) -> str:
return "aniworld.to"
def GetTitle(self, key: str) -> str:
soup = BeautifulSoup(self._GetKeyHTML(key).content, 'html.parser')
title_div = soup.find('div', class_='series-title')
if title_div:
return title_div.find('h1').find('span').text
return ""
def _GetKeyHTML(self, key: str):
if key in self._KeyHTMLDict:
return self._KeyHTMLDict[key]
self._KeyHTMLDict[key] = self.session.get(
f"{self.ANIWORLD_TO}/anime/stream/{key}",
timeout=self.DEFAULT_REQUEST_TIMEOUT
)
return self._KeyHTMLDict[key]
    def _GetEpisodeHTML(self, season: int, episode: int, key: str):
        cache_key = (key, season, episode)
        if cache_key in self._EpisodeHTMLDict:
            return self._EpisodeHTMLDict[cache_key]
        link = (
            f"{self.ANIWORLD_TO}/anime/stream/{key}/"
            f"staffel-{season}/episode-{episode}"
        )
        response = self.session.get(link, timeout=self.DEFAULT_REQUEST_TIMEOUT)
        self._EpisodeHTMLDict[cache_key] = response
        return response
def _get_provider_from_html(self, season: int, episode: int, key: str) -> dict:
"""
Parses the HTML content to extract streaming providers,
their language keys, and redirect links.
Returns a dictionary with provider names as keys
and language key-to-redirect URL mappings as values.
Example:
{
'VOE': {1: 'https://aniworld.to/redirect/1766412',
2: 'https://aniworld.to/redirect/1766405'},
'Doodstream': {1: 'https://aniworld.to/redirect/1987922',
2: 'https://aniworld.to/redirect/2700342'},
...
}
Access redirect link with:
print(self.provider["VOE"][2])
"""
soup = BeautifulSoup(self._GetEpisodeHTML(season, episode, key).content, 'html.parser')
providers = {}
episode_links = soup.find_all(
'li', class_=lambda x: x and x.startswith('episodeLink')
)
if not episode_links:
return providers
for link in episode_links:
provider_name_tag = link.find('h4')
provider_name = provider_name_tag.text.strip() if provider_name_tag else None
redirect_link_tag = link.find('a', class_='watchEpisode')
redirect_link = redirect_link_tag['href'] if redirect_link_tag else None
lang_key = link.get('data-lang-key')
lang_key = int(
lang_key) if lang_key and lang_key.isdigit() else None
if provider_name and redirect_link and lang_key:
if provider_name not in providers:
providers[provider_name] = {}
providers[provider_name][lang_key] = f"{self.ANIWORLD_TO}{redirect_link}"
return providers
    def _get_redirect_link(self, season: int, episode: int, key: str, language: str = "German Dub"):
        languageCode = self._GetLanguageKey(language)
        if self.IsLanguage(season, episode, key, language):
            for provider_name, lang_dict in self._get_provider_from_html(season, episode, key).items():
                if languageCode in lang_dict:
                    return lang_dict[languageCode], provider_name
        return None
    def _get_embeded_link(self, season: int, episode: int, key: str, language: str = "German Dub"):
        redirect = self._get_redirect_link(season, episode, key, language)
        if redirect is None:
            return None
        redirect_link, provider_name = redirect
        embeded_link = self.session.get(
            redirect_link, timeout=self.DEFAULT_REQUEST_TIMEOUT,
            headers={'User-Agent': self.RANDOM_USER_AGENT}).url
        return embeded_link
    def _get_direct_link_from_provider(self, season: int, episode: int, key: str, language: str = "German Dub"):
"""
providers = {
"Vidmoly": get_direct_link_from_vidmoly,
"Vidoza": get_direct_link_from_vidoza,
"VOE": get_direct_link_from_voe,
"Doodstream": get_direct_link_from_doodstream,
"SpeedFiles": get_direct_link_from_speedfiles,
"Luluvdo": get_direct_link_from_luluvdo
}
"""
embeded_link = self._get_embeded_link(season, episode, key, language)
if embeded_link is None:
return None
return self.Providers.GetProvider("VOE").GetLink(embeded_link, self.DEFAULT_REQUEST_TIMEOUT)
    def get_season_episode_count(self, slug: str) -> dict:
        base_url = f"{self.ANIWORLD_TO}/anime/stream/{slug}/"
        response = self.session.get(base_url, timeout=self.DEFAULT_REQUEST_TIMEOUT)
soup = BeautifulSoup(response.content, 'html.parser')
season_meta = soup.find('meta', itemprop='numberOfSeasons')
number_of_seasons = int(season_meta['content']) if season_meta else 0
episode_counts = {}
for season in range(1, number_of_seasons + 1):
season_url = f"{base_url}staffel-{season}"
            response = self.session.get(season_url, timeout=self.DEFAULT_REQUEST_TIMEOUT)
soup = BeautifulSoup(response.content, 'html.parser')
episode_links = soup.find_all('a', href=True)
unique_links = set(
link['href']
for link in episode_links
if f"staffel-{season}/episode-" in link['href']
)
episode_counts[season] = len(unique_links)
return episode_counts
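The retry/backoff session configured in __init__ is a self-contained pattern; a standalone sketch of the same idea:

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

session = requests.Session()
retries = Retry(
    total=5,                                # up to five retries
    backoff_factor=1,                       # sleeps roughly 1s, 2s, 4s, ... between attempts
    status_forcelist=[500, 502, 503, 504],  # retry only on these HTTP errors
    allowed_methods=["GET"],
)
session.mount("https://", HTTPAdapter(max_retries=retries))
response = session.get("https://aniworld.to", timeout=30)  # retried transparently on 5xx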

View File

@@ -0,0 +1,27 @@
from abc import ABC, abstractmethod
class Loader(ABC):
@abstractmethod
def Search(self, word: str) -> list:
pass
@abstractmethod
def IsLanguage(self, season: int, episode: int, key: str, language: str = "German Dub") -> bool:
pass
    @abstractmethod
    def Download(self, baseDirectory: str, serieFolder: str, season: int, episode: int, key: str, language: str = "German Dub", progress_callback: callable = None) -> bool:
        pass
@abstractmethod
def GetSiteKey(self) -> str:
pass
    @abstractmethod
    def GetTitle(self, key: str) -> str:
        pass
@abstractmethod
def get_season_episode_count(self, slug: str) -> dict:
pass
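A minimal do-nothing implementation of the interface (a sketch for tests, assuming the corrected signatures above):

class NullLoader(Loader):
    def Search(self, word: str) -> list:
        return []
    def IsLanguage(self, season: int, episode: int, key: str, language: str = "German Dub") -> bool:
        return False
    def Download(self, baseDirectory: str, serieFolder: str, season: int, episode: int, key: str, language: str = "German Dub", progress_callback: callable = None) -> bool:
        return False
    def GetSiteKey(self) -> str:
        return "null"
    def GetTitle(self, key: str) -> str:
        return ""
    def get_season_episode_count(self, slug: str) -> dict:
        return {}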

View File

@@ -0,0 +1,671 @@
"""
Enhanced AniWorld Loader with Error Handling and Recovery
This module extends the original AniWorldLoader with comprehensive
error handling, retry mechanisms, and recovery strategies.
"""
import os
import re
import logging
import json
import requests
import html
from urllib.parse import quote
from typing import Any, Callable, Dict
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from yt_dlp import YoutubeDL
import shutil
from server.infrastructure.providers.base_provider import Loader
from server.core.interfaces.providers import Providers
from error_handler import (
with_error_recovery,
recovery_strategies,
NetworkError,
DownloadError,
RetryableError,
NonRetryableError,
file_corruption_detector
)
class EnhancedAniWorldLoader(Loader):
"""Enhanced AniWorld loader with comprehensive error handling."""
def __init__(self):
super().__init__()
self.logger = logging.getLogger(__name__)
self.SUPPORTED_PROVIDERS = ["VOE", "Doodstream", "Vidmoly", "Vidoza", "SpeedFiles", "Streamtape", "Luluvdo"]
self.AniworldHeaders = {
"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
"accept-encoding": "gzip, deflate, br, zstd",
"accept-language": "de,de-DE;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
"cache-control": "max-age=0",
"priority": "u=0, i",
"sec-ch-ua": '"Chromium";v="136", "Microsoft Edge";v="136", "Not.A/Brand";v="99"',
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"Windows"',
"sec-fetch-dest": "document",
"sec-fetch-mode": "navigate",
"sec-fetch-site": "none",
"sec-fetch-user": "?1",
"upgrade-insecure-requests": "1",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36 Edg/136.0.0.0"
}
self.INVALID_PATH_CHARS = ['<', '>', ':', '"', '/', '\\', '|', '?', '*', '&']
self.RANDOM_USER_AGENT = UserAgent().random
self.LULUVDO_USER_AGENT = "Mozilla/5.0 (Android 15; Mobile; rv:132.0) Gecko/132.0 Firefox/132.0"
self.PROVIDER_HEADERS = {
"Vidmoly": ['Referer: "https://vidmoly.to"'],
"Doodstream": ['Referer: "https://dood.li/"'],
"VOE": [f'User-Agent: {self.RANDOM_USER_AGENT}'],
"Luluvdo": [
f'User-Agent: {self.LULUVDO_USER_AGENT}',
'Accept-Language: de-DE,de;q=0.9,en-US;q=0.8,en;q=0.7',
'Origin: "https://luluvdo.com"',
'Referer: "https://luluvdo.com/"'
]
}
self.ANIWORLD_TO = "https://aniworld.to"
self.DEFAULT_REQUEST_TIMEOUT = 30
# Initialize session with enhanced retry configuration
self.session = self._create_robust_session()
# Cache dictionaries
self._KeyHTMLDict = {}
self._EpisodeHTMLDict = {}
# Provider manager
self.Providers = Providers()
# Download statistics
self.download_stats = {
'total_downloads': 0,
'successful_downloads': 0,
'failed_downloads': 0,
'retried_downloads': 0
}
# Read timeout from environment variable
self.download_timeout = int(os.getenv("DOWNLOAD_TIMEOUT", 600))
# Setup logging
self._setup_logging()
def _create_robust_session(self) -> requests.Session:
"""Create a session with robust retry and error handling configuration."""
session = requests.Session()
# Enhanced retry strategy
retries = Retry(
total=5,
backoff_factor=2, # More aggressive backoff
status_forcelist=[408, 429, 500, 502, 503, 504, 520, 521, 522, 523, 524],
allowed_methods=["GET", "POST", "HEAD"],
raise_on_status=False # Handle status errors manually
)
adapter = HTTPAdapter(
max_retries=retries,
pool_connections=10,
pool_maxsize=20,
pool_block=True
)
session.mount("https://", adapter)
session.mount("http://", adapter)
# Set default headers
session.headers.update(self.AniworldHeaders)
return session
def _setup_logging(self):
"""Setup specialized logging for download errors and missing keys."""
# Download error logger
self.download_error_logger = logging.getLogger("DownloadErrors")
download_error_handler = logging.FileHandler("../../download_errors.log")
download_error_handler.setLevel(logging.ERROR)
download_error_formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
download_error_handler.setFormatter(download_error_formatter)
if not self.download_error_logger.handlers:
self.download_error_logger.addHandler(download_error_handler)
self.download_error_logger.setLevel(logging.ERROR)
# No key found logger
self.nokey_logger = logging.getLogger("NoKeyFound")
nokey_handler = logging.FileHandler("../../NoKeyFound.log")
nokey_handler.setLevel(logging.ERROR)
nokey_handler.setFormatter(download_error_formatter)
if not self.nokey_logger.handlers:
self.nokey_logger.addHandler(nokey_handler)
self.nokey_logger.setLevel(logging.ERROR)
def ClearCache(self):
"""Clear all cached data."""
self._KeyHTMLDict.clear()
self._EpisodeHTMLDict.clear()
self.logger.debug("Cache cleared")
def RemoveFromCache(self):
"""Remove episode HTML cache."""
self._EpisodeHTMLDict.clear()
self.logger.debug("Episode cache cleared")
@with_error_recovery(max_retries=3, context="anime_search")
def Search(self, word: str) -> list:
"""Search for anime with error handling."""
if not word or not word.strip():
raise ValueError("Search term cannot be empty")
search_url = f"{self.ANIWORLD_TO}/ajax/seriesSearch?keyword={quote(word)}"
try:
return self._fetch_anime_list_with_recovery(search_url)
except Exception as e:
self.logger.error(f"Search failed for term '{word}': {e}")
raise RetryableError(f"Search failed: {e}") from e
def _fetch_anime_list_with_recovery(self, url: str) -> list:
"""Fetch anime list with comprehensive error handling."""
try:
response = recovery_strategies.handle_network_failure(
self.session.get,
url,
timeout=self.DEFAULT_REQUEST_TIMEOUT
)
if not response.ok:
if response.status_code == 404:
raise NonRetryableError(f"URL not found: {url}")
elif response.status_code == 403:
raise NonRetryableError(f"Access forbidden: {url}")
elif response.status_code >= 500:
raise RetryableError(f"Server error {response.status_code}")
else:
raise RetryableError(f"HTTP error {response.status_code}")
return self._parse_anime_response(response.text)
except (requests.RequestException, ConnectionError) as e:
raise NetworkError(f"Network error during anime search: {e}") from e
def _parse_anime_response(self, response_text: str) -> list:
"""Parse anime search response with error handling."""
if not response_text or not response_text.strip():
raise ValueError("Empty response from server")
clean_text = response_text.strip()
# Try multiple parsing strategies
parsing_strategies = [
lambda text: json.loads(html.unescape(text)),
lambda text: json.loads(text.encode('utf-8').decode('utf-8-sig')),
lambda text: json.loads(re.sub(r'[\x00-\x1F\x7F-\x9F]', '', text))
]
for i, strategy in enumerate(parsing_strategies):
try:
decoded_data = strategy(clean_text)
if isinstance(decoded_data, list):
self.logger.debug(f"Successfully parsed anime response with strategy {i + 1}")
return decoded_data
else:
self.logger.warning(f"Strategy {i + 1} returned non-list data: {type(decoded_data)}")
except json.JSONDecodeError as e:
self.logger.debug(f"Parsing strategy {i + 1} failed: {e}")
continue
raise ValueError("Could not parse anime search response with any strategy")
def _GetLanguageKey(self, language: str) -> int:
"""Get numeric language code."""
language_map = {
"German Dub": 1,
"English Sub": 2,
"German Sub": 3
}
return language_map.get(language, 0)
@with_error_recovery(max_retries=2, context="language_check")
def IsLanguage(self, season: int, episode: int, key: str, language: str = "German Dub") -> bool:
"""Check if episode is available in specified language with error handling."""
try:
languageCode = self._GetLanguageKey(language)
if languageCode == 0:
raise ValueError(f"Unknown language: {language}")
episode_response = self._GetEpisodeHTML(season, episode, key)
soup = BeautifulSoup(episode_response.content, 'html.parser')
change_language_box_div = soup.find('div', class_='changeLanguageBox')
if not change_language_box_div:
self.logger.debug(f"No language box found for {key} S{season}E{episode}")
return False
img_tags = change_language_box_div.find_all('img')
available_languages = []
for img in img_tags:
lang_key = img.get('data-lang-key')
if lang_key and lang_key.isdigit():
available_languages.append(int(lang_key))
is_available = languageCode in available_languages
self.logger.debug(f"Language check for {key} S{season}E{episode} - "
f"Requested: {languageCode}, Available: {available_languages}, "
f"Result: {is_available}")
return is_available
except Exception as e:
self.logger.error(f"Language check failed for {key} S{season}E{episode}: {e}")
raise RetryableError(f"Language check failed: {e}") from e
def Download(self, baseDirectory: str, serieFolder: str, season: int, episode: int,
key: str, language: str = "German Dub", progress_callback: Callable = None) -> bool:
"""Download episode with comprehensive error handling and recovery."""
self.download_stats['total_downloads'] += 1
try:
# Validate inputs
if not all([baseDirectory, serieFolder, key]):
raise ValueError("Missing required parameters for download")
if season < 0 or episode < 0:
raise ValueError("Season and episode must be non-negative")
# Prepare file paths
sanitized_anime_title = ''.join(
char for char in self.GetTitle(key) if char not in self.INVALID_PATH_CHARS
)
if not sanitized_anime_title:
sanitized_anime_title = f"Unknown_{key}"
# Generate output filename
if season == 0:
output_file = f"{sanitized_anime_title} - Movie {episode:02} - ({language}).mp4"
else:
output_file = f"{sanitized_anime_title} - S{season:02}E{episode:03} - ({language}).mp4"
# Create directory structure
folder_path = os.path.join(baseDirectory, serieFolder, f"Season {season}")
output_path = os.path.join(folder_path, output_file)
# Check if file already exists and is valid
if os.path.exists(output_path):
if file_corruption_detector.is_valid_video_file(output_path):
self.logger.info(f"File already exists and is valid: {output_file}")
self.download_stats['successful_downloads'] += 1
return True
else:
self.logger.warning(f"Existing file appears corrupted, removing: {output_path}")
try:
os.remove(output_path)
except Exception as e:
self.logger.error(f"Failed to remove corrupted file: {e}")
os.makedirs(folder_path, exist_ok=True)
# Create temp directory
temp_dir = "./Temp/"
os.makedirs(temp_dir, exist_ok=True)
temp_path = os.path.join(temp_dir, output_file)
# Attempt download with recovery strategies
success = self._download_with_recovery(
season, episode, key, language, temp_path, output_path, progress_callback
)
if success:
self.download_stats['successful_downloads'] += 1
self.logger.info(f"Successfully downloaded: {output_file}")
else:
self.download_stats['failed_downloads'] += 1
self.download_error_logger.error(
f"Download failed for {key} S{season}E{episode} ({language})"
)
return success
except Exception as e:
self.download_stats['failed_downloads'] += 1
self.download_error_logger.error(
f"Download error for {key} S{season}E{episode}: {e}", exc_info=True
)
raise DownloadError(f"Download failed: {e}") from e
finally:
self.ClearCache()
def _download_with_recovery(self, season: int, episode: int, key: str, language: str,
temp_path: str, output_path: str, progress_callback: Callable) -> bool:
"""Attempt download with multiple providers and recovery strategies."""
for provider_name in self.SUPPORTED_PROVIDERS:
try:
self.logger.info(f"Attempting download with provider: {provider_name}")
# Get download link and headers for provider
link, headers = recovery_strategies.handle_network_failure(
self._get_direct_link_from_provider,
season, episode, key, language
)
if not link:
self.logger.warning(f"No download link found for provider: {provider_name}")
continue
# Configure yt-dlp options
ydl_opts = {
'fragment_retries': float('inf'),
'outtmpl': temp_path,
'quiet': True,
'no_warnings': True,
'progress_with_newline': False,
'nocheckcertificate': True,
'socket_timeout': self.download_timeout,
'http_chunk_size': 1024 * 1024, # 1MB chunks
}
if headers:
ydl_opts['http_headers'] = headers
if progress_callback:
ydl_opts['progress_hooks'] = [progress_callback]
# Perform download with recovery
success = recovery_strategies.handle_download_failure(
self._perform_ytdl_download,
temp_path,
ydl_opts,
link
)
if success and os.path.exists(temp_path):
# Verify downloaded file
if file_corruption_detector.is_valid_video_file(temp_path):
# Move to final location
shutil.copy2(temp_path, output_path)
# Clean up temp file
try:
os.remove(temp_path)
except Exception as e:
self.logger.warning(f"Failed to remove temp file: {e}")
return True
else:
self.logger.warning(f"Downloaded file failed validation: {temp_path}")
try:
os.remove(temp_path)
except Exception:
pass
except Exception as e:
self.logger.warning(f"Provider {provider_name} failed: {e}")
self.download_stats['retried_downloads'] += 1
continue
return False
def _perform_ytdl_download(self, ydl_opts: Dict[str, Any], link: str) -> bool:
"""Perform actual download using yt-dlp."""
try:
with YoutubeDL(ydl_opts) as ydl:
ydl.download([link])
return True
except Exception as e:
self.logger.error(f"yt-dlp download failed: {e}")
raise DownloadError(f"Download failed: {e}") from e
@with_error_recovery(max_retries=2, context="get_title")
def GetTitle(self, key: str) -> str:
"""Get anime title with error handling."""
try:
soup = BeautifulSoup(self._GetKeyHTML(key).content, 'html.parser')
title_div = soup.find('div', class_='series-title')
if title_div:
title_span = title_div.find('h1')
if title_span:
span = title_span.find('span')
if span:
return span.text.strip()
self.logger.warning(f"Could not extract title for key: {key}")
return f"Unknown_Title_{key}"
except Exception as e:
self.logger.error(f"Failed to get title for key {key}: {e}")
raise RetryableError(f"Title extraction failed: {e}") from e
def GetSiteKey(self) -> str:
"""Get site identifier."""
return "aniworld.to"
@with_error_recovery(max_retries=2, context="get_key_html")
def _GetKeyHTML(self, key: str):
"""Get cached HTML for anime key."""
if key in self._KeyHTMLDict:
return self._KeyHTMLDict[key]
try:
url = f"{self.ANIWORLD_TO}/anime/stream/{key}"
response = recovery_strategies.handle_network_failure(
self.session.get,
url,
timeout=self.DEFAULT_REQUEST_TIMEOUT
)
if not response.ok:
if response.status_code == 404:
self.nokey_logger.error(f"Anime key not found: {key}")
raise NonRetryableError(f"Anime key not found: {key}")
else:
raise RetryableError(f"HTTP error {response.status_code} for key {key}")
self._KeyHTMLDict[key] = response
return self._KeyHTMLDict[key]
except Exception as e:
self.logger.error(f"Failed to get HTML for key {key}: {e}")
raise
@with_error_recovery(max_retries=2, context="get_episode_html")
def _GetEpisodeHTML(self, season: int, episode: int, key: str):
"""Get cached HTML for specific episode."""
cache_key = (key, season, episode)
if cache_key in self._EpisodeHTMLDict:
return self._EpisodeHTMLDict[cache_key]
try:
url = f"{self.ANIWORLD_TO}/anime/stream/{key}/staffel-{season}/episode-{episode}"
response = recovery_strategies.handle_network_failure(
self.session.get,
url,
timeout=self.DEFAULT_REQUEST_TIMEOUT
)
if not response.ok:
if response.status_code == 404:
raise NonRetryableError(f"Episode not found: {key} S{season}E{episode}")
else:
raise RetryableError(f"HTTP error {response.status_code} for episode")
self._EpisodeHTMLDict[cache_key] = response
return self._EpisodeHTMLDict[cache_key]
except Exception as e:
self.logger.error(f"Failed to get episode HTML for {key} S{season}E{episode}: {e}")
raise
def _get_provider_from_html(self, season: int, episode: int, key: str) -> dict:
"""Extract providers from HTML with error handling."""
try:
soup = BeautifulSoup(self._GetEpisodeHTML(season, episode, key).content, 'html.parser')
providers = {}
episode_links = soup.find_all(
'li', class_=lambda x: x and x.startswith('episodeLink')
)
if not episode_links:
self.logger.warning(f"No episode links found for {key} S{season}E{episode}")
return providers
for link in episode_links:
provider_name_tag = link.find('h4')
provider_name = provider_name_tag.text.strip() if provider_name_tag else None
redirect_link_tag = link.find('a', class_='watchEpisode')
redirect_link = redirect_link_tag['href'] if redirect_link_tag else None
lang_key = link.get('data-lang-key')
lang_key = int(lang_key) if lang_key and lang_key.isdigit() else None
if provider_name and redirect_link and lang_key:
if provider_name not in providers:
providers[provider_name] = {}
providers[provider_name][lang_key] = f"{self.ANIWORLD_TO}{redirect_link}"
self.logger.debug(f"Found {len(providers)} providers for {key} S{season}E{episode}")
return providers
except Exception as e:
self.logger.error(f"Failed to parse providers from HTML: {e}")
raise RetryableError(f"Provider parsing failed: {e}") from e
def _get_redirect_link(self, season: int, episode: int, key: str, language: str = "German Dub"):
"""Get redirect link for episode with error handling."""
languageCode = self._GetLanguageKey(language)
if not self.IsLanguage(season, episode, key, language):
raise NonRetryableError(f"Language {language} not available for {key} S{season}E{episode}")
providers = self._get_provider_from_html(season, episode, key)
for provider_name, lang_dict in providers.items():
if languageCode in lang_dict:
return lang_dict[languageCode], provider_name
raise NonRetryableError(f"No provider found for {language} in {key} S{season}E{episode}")
def _get_embeded_link(self, season: int, episode: int, key: str, language: str = "German Dub"):
"""Get embedded link with error handling."""
try:
redirect_link, provider_name = self._get_redirect_link(season, episode, key, language)
response = recovery_strategies.handle_network_failure(
self.session.get,
redirect_link,
timeout=self.DEFAULT_REQUEST_TIMEOUT,
headers={'User-Agent': self.RANDOM_USER_AGENT}
)
return response.url
except Exception as e:
self.logger.error(f"Failed to get embedded link: {e}")
raise
def _get_direct_link_from_provider(self, season: int, episode: int, key: str, language: str = "German Dub"):
"""Get direct download link from provider with error handling."""
try:
embedded_link = self._get_embeded_link(season, episode, key, language)
if not embedded_link:
raise NonRetryableError("No embedded link found")
# Use VOE provider as default (could be made configurable)
provider = self.Providers.GetProvider("VOE")
if not provider:
raise NonRetryableError("VOE provider not available")
return provider.GetLink(embedded_link, self.DEFAULT_REQUEST_TIMEOUT)
except Exception as e:
self.logger.error(f"Failed to get direct link from provider: {e}")
raise
@with_error_recovery(max_retries=2, context="get_season_episode_count")
def get_season_episode_count(self, slug: str) -> dict:
"""Get episode count per season with error handling."""
try:
base_url = f"{self.ANIWORLD_TO}/anime/stream/{slug}/"
response = recovery_strategies.handle_network_failure(
requests.get,
base_url,
timeout=self.DEFAULT_REQUEST_TIMEOUT
)
soup = BeautifulSoup(response.content, 'html.parser')
season_meta = soup.find('meta', itemprop='numberOfSeasons')
number_of_seasons = int(season_meta['content']) if season_meta else 0
episode_counts = {}
for season in range(1, number_of_seasons + 1):
season_url = f"{base_url}staffel-{season}"
season_response = recovery_strategies.handle_network_failure(
requests.get,
season_url,
timeout=self.DEFAULT_REQUEST_TIMEOUT
)
season_soup = BeautifulSoup(season_response.content, 'html.parser')
episode_links = season_soup.find_all('a', href=True)
unique_links = set(
link['href']
for link in episode_links
if f"staffel-{season}/episode-" in link['href']
)
episode_counts[season] = len(unique_links)
return episode_counts
except Exception as e:
self.logger.error(f"Failed to get episode counts for {slug}: {e}")
raise RetryableError(f"Episode count retrieval failed: {e}") from e
def get_download_statistics(self) -> Dict[str, Any]:
"""Get download statistics."""
stats = self.download_stats.copy()
stats['success_rate'] = (
(stats['successful_downloads'] / stats['total_downloads'] * 100)
if stats['total_downloads'] > 0 else 0
)
return stats
def reset_statistics(self):
"""Reset download statistics."""
self.download_stats = {
'total_downloads': 0,
'successful_downloads': 0,
'failed_downloads': 0,
'retried_downloads': 0
}
# For backward compatibility, create wrapper that uses enhanced loader
class AniworldLoader(EnhancedAniWorldLoader):
"""Backward compatibility wrapper for the enhanced loader."""
pass
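A usage sketch for the statistics helpers (output values illustrative):

loader = EnhancedAniWorldLoader()
# ... perform some downloads ...
stats = loader.get_download_statistics()
print(f"{stats['successful_downloads']}/{stats['total_downloads']} downloads succeeded "
      f"({stats['success_rate']:.1f}%)")
loader.reset_statistics()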

View File

@@ -0,0 +1,10 @@
from server.infrastructure.providers.aniworld_provider import AniworldLoader
from server.infrastructure.providers.base_provider import Loader
class Loaders:
def __init__(self):
self.dict = {"aniworld.to": AniworldLoader()}
def GetLoader(self, key: str) -> Loader:
return self.dict[key]
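A quick lookup sketch for the registry:

loaders = Loaders()
aniworld = loaders.GetLoader("aniworld.to")
print(aniworld.GetSiteKey())  # -> "aniworld.to"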

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,7 @@
from abc import ABC, abstractmethod
class Provider(ABC):
@abstractmethod
    def GetLink(self, embededLink: str, DEFAULT_REQUEST_TIMEOUT: int) -> tuple[str, dict]:
pass

View File

@@ -0,0 +1,59 @@
import re
import random
import time
from fake_useragent import UserAgent
import requests
from .Provider import Provider
class Doodstream(Provider):
def __init__(self):
self.RANDOM_USER_AGENT = UserAgent().random
    def GetLink(self, embededLink: str, DEFAULT_REQUEST_TIMEOUT: int) -> tuple[str, dict]:
headers = {
'User-Agent': self.RANDOM_USER_AGENT,
'Referer': 'https://dood.li/'
}
def extract_data(pattern, content):
match = re.search(pattern, content)
return match.group(1) if match else None
def generate_random_string(length=10):
characters = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
return ''.join(random.choice(characters) for _ in range(length))
response = requests.get(
embededLink,
headers=headers,
timeout=DEFAULT_REQUEST_TIMEOUT,
verify=False
)
response.raise_for_status()
pass_md5_pattern = r"\$\.get\('([^']*\/pass_md5\/[^']*)'"
pass_md5_url = extract_data(pass_md5_pattern, response.text)
if not pass_md5_url:
raise ValueError(
f'pass_md5 URL not found using {embededLink}.')
full_md5_url = f"https://dood.li{pass_md5_url}"
token_pattern = r"token=([a-zA-Z0-9]+)"
token = extract_data(token_pattern, response.text)
if not token:
raise ValueError(f'Token not found using {embededLink}.')
md5_response = requests.get(
full_md5_url, headers=headers, timeout=DEFAULT_REQUEST_TIMEOUT, verify=False)
md5_response.raise_for_status()
video_base_url = md5_response.text.strip()
random_string = generate_random_string(10)
expiry = int(time.time())
        direct_link = f"{video_base_url}{random_string}?token={token}&expiry={expiry}"
        return direct_link, headers  # return the headers alongside the link, matching the Provider interface

View File

@@ -0,0 +1,51 @@
import re
import requests
import jsbeautifier.unpackers.packer as packer  # needed below for packer.detect/unpack
from aniworld import config
REDIRECT_REGEX = re.compile(
r'<iframe *(?:[^>]+ )?src=(?:\'([^\']+)\'|"([^"]+)")[^>]*>')
SCRIPT_REGEX = re.compile(
r'(?s)<script\s+[^>]*?data-cfasync=["\']?false["\']?[^>]*>(.+?)</script>')
VIDEO_URL_REGEX = re.compile(r'file:\s*"([^"]+\.m3u8[^"]*)"')
# TODO Implement this script fully
def get_direct_link_from_filemoon(embeded_filemoon_link: str):
session = requests.Session()
session.verify = False
headers = {
"User-Agent": config.RANDOM_USER_AGENT,
"Referer": embeded_filemoon_link,
}
response = session.get(embeded_filemoon_link, headers=headers)
source = response.text
match = REDIRECT_REGEX.search(source)
if match:
redirect_url = match.group(1) or match.group(2)
response = session.get(redirect_url, headers=headers)
source = response.text
for script_match in SCRIPT_REGEX.finditer(source):
script_content = script_match.group(1).strip()
if not script_content.startswith("eval("):
continue
if packer.detect(script_content):
unpacked = packer.unpack(script_content)
video_match = VIDEO_URL_REGEX.search(unpacked)
if video_match:
return video_match.group(1)
raise Exception("No Video link found!")
if __name__ == '__main__':
url = input("Enter Filemoon Link: ")
print(get_direct_link_from_filemoon(url))

View File

@@ -0,0 +1,90 @@
import re
import json
import sys
import requests
from aniworld.config import DEFAULT_REQUEST_TIMEOUT
def fetch_page_content(url):
try:
response = requests.get(url, timeout=DEFAULT_REQUEST_TIMEOUT)
response.raise_for_status()
return response.text
except requests.exceptions.RequestException as e:
print(f"Failed to fetch the page content: {e}")
return None
def extract_video_data(page_content):
match = re.search(r'^.*videos_manifest.*$', page_content, re.MULTILINE)
if not match:
raise ValueError("Failed to extract video manifest from the response.")
json_str = match.group(0)[match.group(0).find(
'{'):match.group(0).rfind('}') + 1]
return json.loads(json_str)
def get_streams(url):
    page_content = fetch_page_content(url)
    if page_content is None:
        raise ValueError("Could not fetch the page content.")
    data = extract_video_data(page_content)
video_info = data['state']['data']['video']
name = video_info['hentai_video']['name']
streams = video_info['videos_manifest']['servers'][0]['streams']
return {"name": name, "streams": streams}
def display_streams(streams):
if not streams:
print("No streams available.")
return
print("Available qualities:")
for i, stream in enumerate(streams, 1):
premium_tag = "(Premium)" if not stream['is_guest_allowed'] else ""
print(
f"{i}. {stream['width']}x{stream['height']}\t"
f"({stream['filesize_mbs']}MB) {premium_tag}")
def get_user_selection(streams):
try:
selected_index = int(input("Select a stream: ").strip()) - 1
if 0 <= selected_index < len(streams):
return selected_index
print("Invalid selection.")
return None
except ValueError:
print("Invalid input.")
return None
def get_direct_link_from_hanime(url=None):
try:
if url is None:
if len(sys.argv) > 1:
url = sys.argv[1]
else:
url = input("Please enter the hanime.tv video URL: ").strip()
try:
video_data = get_streams(url)
print(f"Video: {video_data['name']}")
print('*' * 40)
display_streams(video_data['streams'])
selected_index = None
while selected_index is None:
selected_index = get_user_selection(video_data['streams'])
print(f"M3U8 URL: {video_data['streams'][selected_index]['url']}")
except ValueError as e:
print(f"Error: {e}")
except KeyboardInterrupt:
pass
if __name__ == "__main__":
get_direct_link_from_hanime()

View File

@@ -0,0 +1,35 @@
import requests
import json
from urllib.parse import urlparse
# TODO Doesn't work on download yet and has to be implemented
def get_direct_link_from_loadx(embeded_loadx_link: str):
response = requests.head(
embeded_loadx_link, allow_redirects=True, verify=False)
parsed_url = urlparse(response.url)
path_parts = parsed_url.path.split("/")
if len(path_parts) < 3:
raise ValueError("Invalid path!")
id_hash = path_parts[2]
host = parsed_url.netloc
post_url = f"https://{host}/player/index.php?data={id_hash}&do=getVideo"
headers = {"X-Requested-With": "XMLHttpRequest"}
response = requests.post(post_url, headers=headers, verify=False)
    data = json.loads(response.text)
video_url = data.get("videoSource")
if not video_url:
raise ValueError("No Video link found!")
return video_url
if __name__ == '__main__':
url = input("Enter Loadx Link: ")
print(get_direct_link_from_loadx(url))

View File

@@ -0,0 +1,39 @@
import re
import requests
from aniworld import config
def get_direct_link_from_luluvdo(embeded_luluvdo_link, arguments=None):
luluvdo_id = embeded_luluvdo_link.split('/')[-1]
filelink = (
f"https://luluvdo.com/dl?op=embed&file_code={luluvdo_id}&embed=1&referer=luluvdo.com&adb=0"
)
# The User-Agent needs to be the same as the direct-link ones to work
headers = {
"Origin": "https://luluvdo.com",
"Referer": "https://luluvdo.com/",
"User-Agent": config.LULUVDO_USER_AGENT
}
    if arguments is not None and getattr(arguments, "action", None) == "Download":
        headers["Accept-Language"] = "de-DE,de;q=0.9,en-US;q=0.8,en;q=0.7"
response = requests.get(filelink, headers=headers,
timeout=config.DEFAULT_REQUEST_TIMEOUT)
if response.status_code == 200:
pattern = r'file:\s*"([^"]+)"'
matches = re.findall(pattern, str(response.text))
if matches:
return matches[0]
raise ValueError("No match found")
if __name__ == '__main__':
url = input("Enter Luluvdo Link: ")
print(get_direct_link_from_luluvdo(url))

View File

@@ -0,0 +1,43 @@
import re
import base64
import requests
from aniworld.config import DEFAULT_REQUEST_TIMEOUT, RANDOM_USER_AGENT
SPEEDFILES_PATTERN = re.compile(r'var _0x5opu234 = "(?P<encoded_data>.*?)";')
def get_direct_link_from_speedfiles(embeded_speedfiles_link):
response = requests.get(
embeded_speedfiles_link,
timeout=DEFAULT_REQUEST_TIMEOUT,
headers={'User-Agent': RANDOM_USER_AGENT}
)
if "<span class=\"inline-block\">Web server is down</span>" in response.text:
raise ValueError(
"The SpeedFiles server is currently down.\n"
"Please try again later or choose a different hoster."
)
match = SPEEDFILES_PATTERN.search(response.text)
if not match:
raise ValueError("Pattern not found in the response.")
encoded_data = match.group("encoded_data")
decoded = base64.b64decode(encoded_data).decode()
decoded = decoded.swapcase()[::-1]
decoded = base64.b64decode(decoded).decode()[::-1]
decoded_hex = ''.join(chr(int(decoded[i:i + 2], 16))
for i in range(0, len(decoded), 2))
shifted = ''.join(chr(ord(char) - 3) for char in decoded_hex)
result = base64.b64decode(shifted.swapcase()[::-1]).decode()
return result
if __name__ == '__main__':
speedfiles_link = input("Enter Speedfiles Link: ")
print(get_direct_link_from_speedfiles(
embeded_speedfiles_link=speedfiles_link))

View File

@@ -0,0 +1,2 @@
# TODO: Streamtape extraction is not implemented yet.
def get_direct_link_from_streamtape(embeded_streamtape_link: str) -> str:
    pass

View File

@@ -0,0 +1,34 @@
import re
import requests
from bs4 import BeautifulSoup
from aniworld.config import DEFAULT_REQUEST_TIMEOUT, RANDOM_USER_AGENT
def get_direct_link_from_vidmoly(embeded_vidmoly_link: str):
response = requests.get(
embeded_vidmoly_link,
headers={'User-Agent': RANDOM_USER_AGENT},
timeout=DEFAULT_REQUEST_TIMEOUT
)
html_content = response.text
soup = BeautifulSoup(html_content, 'html.parser')
scripts = soup.find_all('script')
file_link_pattern = r'file:\s*"(https?://.*?)"'
for script in scripts:
if script.string:
match = re.search(file_link_pattern, script.string)
if match:
file_link = match.group(1)
return file_link
raise ValueError("No direct link found.")
if __name__ == '__main__':
link = input("Enter Vidmoly Link: ")
print('Note: --referer "https://vidmoly.to"')
print(get_direct_link_from_vidmoly(embeded_vidmoly_link=link))

View File

@@ -0,0 +1,29 @@
import re
import requests
from bs4 import BeautifulSoup
from aniworld.config import DEFAULT_REQUEST_TIMEOUT, RANDOM_USER_AGENT
def get_direct_link_from_vidoza(embeded_vidoza_link: str) -> str:
response = requests.get(
embeded_vidoza_link,
headers={'User-Agent': RANDOM_USER_AGENT},
timeout=DEFAULT_REQUEST_TIMEOUT
)
soup = BeautifulSoup(response.content, "html.parser")
for tag in soup.find_all('script'):
if 'sourcesCode:' in tag.text:
match = re.search(r'src: "(.*?)"', tag.text)
if match:
return match.group(1)
raise ValueError("No direct link found.")
if __name__ == '__main__':
link = input("Enter Vidoza Link: ")
print(get_direct_link_from_vidoza(embeded_vidoza_link=link))

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,113 @@
import re
import base64
import json
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import requests
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from .Provider import Provider
# Compile regex patterns once for better performance
REDIRECT_PATTERN = re.compile(r"https?://[^'\"<>]+")
B64_PATTERN = re.compile(r"var a168c='([^']+)'")
HLS_PATTERN = re.compile(r"'hls': '(?P<hls>[^']+)'")
class VOE(Provider):
def __init__(self):
self.RANDOM_USER_AGENT = UserAgent().random
self.Header = {
"User-Agent": self.RANDOM_USER_AGENT
}
    def GetLink(self, embededLink: str, DEFAULT_REQUEST_TIMEOUT: int) -> tuple[str, dict]:
        self.session = requests.Session()
        # Configure retries with backoff
        retries = Retry(
            total=5,  # Number of retries
            backoff_factor=1,  # Delay multiplier (1s, 2s, 4s, ...)
            status_forcelist=[500, 502, 503, 504],  # Retry for specific HTTP errors
            allowed_methods=["GET"]
        )
        adapter = HTTPAdapter(max_retries=retries)
        self.session.mount("https://", adapter)
response = self.session.get(
embededLink,
headers={'User-Agent': self.RANDOM_USER_AGENT},
timeout=DEFAULT_REQUEST_TIMEOUT
)
        redirect = REDIRECT_PATTERN.search(response.text)
if not redirect:
raise ValueError("No redirect found.")
redirect_url = redirect.group(0)
parts = redirect_url.strip().split("/")
self.Header["Referer"] = f"{parts[0]}//{parts[2]}/"
        response = self.session.get(redirect_url, headers={'User-Agent': self.RANDOM_USER_AGENT}, timeout=DEFAULT_REQUEST_TIMEOUT)
html = response.content
# Method 1: Extract from script tag
extracted = self.extract_voe_from_script(html)
if extracted:
return extracted, self.Header
# Method 2: Extract from base64 encoded variable
htmlText = html.decode('utf-8')
b64_match = B64_PATTERN.search(htmlText)
if b64_match:
decoded = base64.b64decode(b64_match.group(1)).decode()[::-1]
source = json.loads(decoded).get("source")
if source:
return source, self.Header
        # Method 3: Extract HLS source
        hls_match = HLS_PATTERN.search(htmlText)
        if hls_match:
            return base64.b64decode(hls_match.group("hls")).decode(), self.Header
        raise ValueError("No video source found in the VOE page.")
    def shift_letters(self, input_str):
        """Apply ROT13 to ASCII letters only."""
result = ''
for c in input_str:
code = ord(c)
if 65 <= code <= 90:
code = (code - 65 + 13) % 26 + 65
elif 97 <= code <= 122:
code = (code - 97 + 13) % 26 + 97
result += chr(code)
return result
def replace_junk(self, input_str):
junk_parts = ['@$', '^^', '~@', '%?', '*~', '!!', '#&']
for part in junk_parts:
input_str = re.sub(re.escape(part), '_', input_str)
return input_str
def shift_back(self, s, n):
return ''.join(chr(ord(c) - n) for c in s)
    def decode_voe_string(self, encoded):
        """Reverse VOE's obfuscation: ROT13 -> strip junk -> base64 -> shift -> reversed base64 -> JSON."""
        step1 = self.shift_letters(encoded)
step2 = self.replace_junk(step1).replace('_', '')
step3 = base64.b64decode(step2).decode()
step4 = self.shift_back(step3, 3)
step5 = base64.b64decode(step4[::-1]).decode()
return json.loads(step5)
    def extract_voe_from_script(self, html):
        soup = BeautifulSoup(html, "html.parser")
        script = soup.find("script", type="application/json")
        if script is None or not script.text:
            return None
        try:
            return self.decode_voe_string(script.text[2:-2])["source"]
        except (ValueError, KeyError):
            return None