Files
Aniworld/src/server/providers/aniworld_provider.py
Lukas e74b04c1ee feat: add NFO scan after rescan and year caching
- Add nfo_scan_after_rescan config option (default: true)
- Implement year caching in AniworldLoader and EnhancedAniWorldLoader
- Make get_year abstract method in base provider
- Run NFO validation/creation after scheduled rescan completes
- Add _YearDict cache to avoid re-extracting year from HTML
2026-06-05 18:15:41 +02:00

1090 lines
39 KiB
Python

import html
import json
import logging
import os
import re
import shutil
import threading
from pathlib import Path
from urllib.parse import quote
import chardet
import requests
from bs4 import BeautifulSoup
from events import Events
from fake_useragent import UserAgent
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from yt_dlp import YoutubeDL
from yt_dlp.utils import DownloadCancelled
from ..interfaces.providers import Providers
from .base_provider import Loader
def _cleanup_temp_file(temp_path: str) -> None:
    """Remove a temp download file together with its partial-download siblings.

    Deletes ``temp_path`` itself plus every entry in the same directory whose
    name is the temp file's name followed by a dot and any suffix (for
    example the ``.part`` file yt-dlp leaves behind). Removal is
    best-effort: failures are logged and never raised.

    Args:
        temp_path: Absolute or relative path to the temp file.
    """
    # Local import keeps this helper independent of the module import block.
    import glob

    target = Path(temp_path)
    candidates = [temp_path]
    if target.name:
        # File names are built from series titles and may contain glob
        # metacharacters ("[", "*", "?"); escape them so only real siblings
        # of this exact file are matched.
        sibling_pattern = glob.escape(target.name) + ".*"
        candidates.extend(str(p) for p in target.parent.glob(sibling_pattern))
    # An empty name would turn the pattern into ".*" and match unrelated
    # dotfiles in the parent directory, so siblings are skipped in that case.

    for path in candidates:
        try:
            os.remove(path)
        except FileNotFoundError:
            # Already gone (or never created): nothing to clean up.
            continue
        except OSError as exc:
            logger.warning("Failed to remove temp file %s: %s", path, exc)
        else:
            logger.debug("Removed temp file: %s", path)
# Imported shared provider configuration
from .provider_config import (
ANIWORLD_HEADERS,
DEFAULT_DOWNLOAD_TIMEOUT,
DEFAULT_PROVIDERS,
INVALID_PATH_CHARS,
LULUVDO_USER_AGENT,
ProviderType,
)
logger = logging.getLogger(__name__)

# Configure persistent loggers but don't add duplicate handlers when module
# is imported multiple times (common in test environments).
# Log file locations are derived from this module's own path rather than
# from the current working directory.
# The project root is taken to be three directories above this module's
# directory (e.g. when this file lives in src/server/providers/).
_module_dir = Path(__file__).parent
_project_root = _module_dir.parent.parent.parent
_logs_dir = _project_root / "logs"
# Ensure logs directory exists (this runs at import time).
_logs_dir.mkdir(parents=True, exist_ok=True)

# Dedicated logger for failed downloads; ERROR records go to
# logs/download_errors.log. The handler is attached only once.
download_error_logger = logging.getLogger("DownloadErrors")
if not download_error_logger.handlers:
    log_path = _logs_dir / "download_errors.log"
    download_error_handler = logging.FileHandler(str(log_path))
    download_error_handler.setLevel(logging.ERROR)
    download_error_logger.addHandler(download_error_handler)

# NOTE(review): getLogger() without a name returns the root logger, not a
# dedicated "no key found" logger - confirm this is intended.
noKeyFound_logger = logging.getLogger()
def _decode_html_content(content: bytes) -> str:
    """Decode raw HTML bytes using detected encoding, defaulting to UTF-8.

    chardet is asked for the most likely encoding. The guess is only used
    when its confidence is at least 0.7; otherwise, or when no encoding was
    reported, UTF-8 is used. Undecodable bytes are replaced rather than
    raising.

    Args:
        content: Raw HTML bytes from the response.

    Returns:
        Decoded string content.
    """
    detected = chardet.detect(content) or {}
    detected_encoding = detected.get('encoding')
    # The keys can be present with a None value, in which case a
    # ``.get(key, default)`` fallback would not apply; normalise explicitly.
    confidence = detected.get('confidence') or 0
    encoding = detected_encoding or 'utf-8'

    if confidence < 0.7:
        logger.debug(
            "Low encoding confidence (%.2f) for detected encoding '%s', using utf-8",
            confidence,
            detected_encoding
        )
        encoding = 'utf-8'

    try:
        return content.decode(encoding, errors='replace')
    except (LookupError, ValueError, TypeError) as exc:
        # Unknown codec name (LookupError) or a codec that rejects the input.
        logger.warning("Failed to decode content with %s: %s, using utf-8 replace", encoding, exc)
        return content.decode('utf-8', errors='replace')
class AniworldLoader(Loader):
def __init__(self) -> None:
    """Set up provider tables, the retrying HTTP session, caches and events."""
    self.SUPPORTED_PROVIDERS = DEFAULT_PROVIDERS
    # A private copy, so header tweaks on this instance stay local.
    self.AniworldHeaders = dict(ANIWORLD_HEADERS)
    self.INVALID_PATH_CHARS = INVALID_PATH_CHARS
    self.RANDOM_USER_AGENT = UserAgent().random
    self.LULUVDO_USER_AGENT = LULUVDO_USER_AGENT

    # Per-provider request headers in legacy "Name: value" string form.
    vidmoly_headers = ['Referer: "https://vidmoly.to"']
    doodstream_headers = [
        'Referer: "https://dood.li/"',
        'Referer: "https://playmogo.com/"',
    ]
    voe_headers = [f"User-Agent: {self.RANDOM_USER_AGENT}"]
    luluvdo_headers = [
        f"User-Agent: {self.LULUVDO_USER_AGENT}",
        "Accept-Language: de-DE,de;q=0.9,en-US;q=0.8,en;q=0.7",
        'Origin: "https://luluvdo.com"',
        'Referer: "https://luluvdo.com/"',
    ]
    self.PROVIDER_HEADERS = {
        ProviderType.VIDMOLY.value: vidmoly_headers,
        ProviderType.DOODSTREAM.value: doodstream_headers,
        ProviderType.VOE.value: voe_headers,
        ProviderType.LULUVDO.value: luluvdo_headers,
    }
    self.ANIWORLD_TO = "https://aniworld.to"

    self.session = requests.Session()
    # Set by callers to request a graceful stop of a running download.
    self._cancel_flag = threading.Event()

    # GET requests over https are retried up to five times with
    # exponential backoff on 5xx gateway/server errors.
    retry_policy = Retry(
        total=5,
        backoff_factor=1,
        status_forcelist=[500, 502, 503, 504],
        allowed_methods=["GET"]
    )
    self.session.mount("https://", HTTPAdapter(max_retries=retry_policy))

    # Request timeout for session calls; the DOWNLOAD_TIMEOUT environment
    # variable overrides the configured default when set and non-empty.
    timeout_setting = os.getenv("DOWNLOAD_TIMEOUT") or DEFAULT_DOWNLOAD_TIMEOUT
    self.DEFAULT_REQUEST_TIMEOUT = int(timeout_setting)

    # Response and year caches, keyed by series key / episode coordinates.
    self._KeyHTMLDict = {}
    self._EpisodeHTMLDict = {}
    self._YearDict = {}

    self.Providers = Providers()
    # download_progress is fired with the yt-dlp progress dict.
    self.events = Events()
def subscribe_download_progress(self, handler) -> None:
    """Register a handler on the ``download_progress`` event.

    Args:
        handler: Callable invoked with the progress dict each time the
            event fires during a download.
    """
    self.events.download_progress += handler
def unsubscribe_download_progress(self, handler) -> None:
    """Remove a handler from the ``download_progress`` event.

    Args:
        handler: Callable previously passed to
            ``subscribe_download_progress``.
    """
    self.events.download_progress -= handler
def clear_cache(self):
    """Drop all cached series pages and episode pages.

    The year cache is left untouched.
    """
    logger.debug("Clearing HTML cache")
    self._KeyHTMLDict, self._EpisodeHTMLDict = {}, {}
    logger.debug("HTML cache cleared successfully")
def remove_from_cache(self):
    """Drop every cached episode page; series pages stay cached."""
    logger.debug("Removing episode HTML from cache")
    self._EpisodeHTMLDict = dict()
    logger.debug("Episode HTML cache cleared")
def search(self, word: str) -> list:
    """Look up series on AniWorld by keyword.

    Args:
        word: Search term; it is URL-quoted before being sent.

    Returns:
        List of found series as returned by ``fetch_anime_list``.
    """
    logger.info("Searching for anime with keyword: '%s'", word)
    encoded_keyword = quote(word)
    search_url = f"{self.ANIWORLD_TO}/ajax/seriesSearch?keyword={encoded_keyword}"
    logger.debug("Search URL: %s", search_url)

    results = self.fetch_anime_list(search_url)
    logger.info("Found %s anime series for keyword '%s'", len(results), word)
    return results
def fetch_anime_list(self, url: str) -> list:
    """Fetch a search endpoint and parse its JSON payload.

    The body is first parsed after HTML-unescaping. If that fails, a second
    attempt strips a UTF-8 BOM and control characters before parsing.

    Args:
        url: Fully built search URL.

    Returns:
        The decoded list, or an empty list when the payload is valid JSON
        but not a list.

    Raises:
        ValueError: If the payload cannot be parsed even after cleanup.
    """
    logger.debug("Fetching anime list from URL: %s", url)
    response = self.session.get(url, timeout=self.DEFAULT_REQUEST_TIMEOUT)
    response.raise_for_status()
    logger.debug("Response status code: %s", response.status_code)
    raw_text = response.text.strip()

    try:
        payload = json.loads(html.unescape(raw_text))
        logger.debug("Successfully decoded JSON data on first attempt")
        if isinstance(payload, list):
            return payload
        return []
    except json.JSONDecodeError:
        logger.warning("Initial JSON decode failed, attempting cleanup")
        try:
            # Round-trip through utf-8-sig drops a leading BOM, then the
            # C0/C1 control characters are removed before re-parsing.
            without_bom = raw_text.encode('utf-8').decode('utf-8-sig')
            scrubbed = re.sub(r'[\x00-\x1F\x7F-\x9F]', '', without_bom)
            payload = json.loads(scrubbed)
            logger.debug("Successfully decoded JSON after cleanup")
            if isinstance(payload, list):
                return payload
            return []
        except (requests.RequestException, json.JSONDecodeError) as exc:
            logger.error("Failed to decode anime list from %s: %s", url, exc)
            raise ValueError("Could not get valid anime: ") from exc
def _get_language_key(self, language: str) -> int:
    """Map a language name to its numeric site code.

    Language Codes:
        1: German Dub
        2: English Sub
        3: German Sub

    Any other name maps to 0.
    """
    known_codes = (
        ("German Dub", 1),
        ("English Sub", 2),
        ("German Sub", 3),
    )
    language_code = 0
    for name, code in known_codes:
        if language == name:
            language_code = code
    logger.debug("Converted language '%s' to code %s", language, language_code)
    return language_code
def is_language(
    self,
    season: int,
    episode: int,
    key: str,
    language: str = "German Dub"
) -> bool:
    """Report whether the episode page advertises the requested language.

    The language codes are read from the ``data-lang-key`` attribute of the
    images inside the page's ``changeLanguageBox`` div.
    """
    logger.debug("Checking if S%02dE%03d (%s) is available in %s", season, episode, key, language)
    language_code = self._get_language_key(language)

    page = self._get_episode_html(season, episode, key)
    episode_soup = BeautifulSoup(_decode_html_content(page.content), 'html.parser')
    language_box = episode_soup.find('div', class_='changeLanguageBox')

    languages = []
    if language_box:
        raw_keys = (img.get('data-lang-key') for img in language_box.find_all('img'))
        # Only purely numeric keys are meaningful language codes.
        languages = [int(raw) for raw in raw_keys if raw and raw.isdigit()]

    is_available = language_code in languages
    logger.debug("Available languages for S%02dE%03d: %s, requested: %s, available: %s", season, episode, languages, language_code, is_available)
    return is_available
def _check_url_alive(
    self,
    url: str,
    headers: dict | None = None,
    timeout: int = 10,
) -> bool:
    """Cheaply test whether a provider URL answers before using yt-dlp.

    A HEAD request is sent first; when the server answers 405 the probe is
    repeated as a streaming GET whose body is never read.

    Args:
        url: URL to probe.
        headers: Optional headers to forward with the probe.
        timeout: Per-request timeout (seconds).

    Returns:
        False for any 4xx answer or request error, True otherwise.
    """
    probe_options = dict(headers=headers, timeout=timeout, allow_redirects=True)
    try:
        response = self.session.head(url, **probe_options)
        if response.status_code == 405:
            # HEAD not allowed upstream: fall back to GET, release at once.
            response = self.session.get(url, stream=True, **probe_options)
            response.close()
    except requests.RequestException as exc:
        logger.warning("Provider URL unreachable %s: %s", url, exc)
        return False

    status = response.status_code
    if status < 400 or status >= 500:
        return True
    logger.warning(
        "Provider URL returned HTTP %s: %s",
        status, url
    )
    return False
def _try_direct_stream(
self,
link: str,
output_path: str,
headers: dict | None,
timeout: int,
) -> bool:
"""Stream a direct video URL to disk without yt-dlp.
Used as a fast-path when the resolved provider link already points
at a downloadable video file (``Content-Type: video/*`` or
``application/octet-stream``). HLS and other non-video payloads
are rejected so the caller can fall back to yt-dlp.
Args:
link: Direct download URL.
output_path: Destination file path.
headers: Optional HTTP headers.
timeout: Per-request timeout (seconds).
Returns:
True on a successful save, False when the link is not a
direct video or the download fails.
"""
try:
with self.session.get(
link,
headers=headers,
timeout=timeout,
stream=True,
) as response:
if not response.ok:
logger.debug(
"Direct stream HEAD returned %s for %s",
response.status_code, link[:80]
)
return False
content_type = response.headers.get("Content-Type", "")
if not (
content_type.startswith("video/")
or content_type == "application/octet-stream"
):
logger.debug(
"Direct stream skipped, Content-Type=%s",
content_type
)
return False
logger.info(
"Direct stream download starting (type=%s)",
content_type
)
with open(output_path, "wb") as fh:
for chunk in response.iter_content(chunk_size=1024 * 1024):
if self._cancel_flag.is_set():
logger.info(
"Cancellation detected during direct stream"
)
return False
if chunk:
fh.write(chunk)
return True
except requests.RequestException as exc:
logger.warning("Direct stream download failed: %s", exc)
return False
def download(
    self,
    base_directory: str,
    serie_folder: str,
    season: int,
    episode: int,
    key: str,
    language: str = "German Dub"
) -> bool:
    """Download episode to specified directory.

    Iterates the providers advertised on the episode page (ordered by
    SUPPORTED_PROVIDERS preference). For each provider the redirect URL is
    probed, resolved to a stream link, tried as a plain HTTP download and
    finally handed to yt-dlp. The file is written to ``./Temp/`` first and
    copied to its final location only after a complete download.

    Args:
        base_directory: Base download directory path
        serie_folder: Filesystem folder name (metadata only, used for
            file path construction)
        season: Season number; 0 produces a "Movie NN" file name
        episode: Episode number
        key: Series unique identifier from provider (used for
            identification and API calls)
        language: Audio language preference (default: German Dub)

    Returns:
        bool: True if download succeeded, False otherwise (no provider
        worked, or the download was cancelled)
    """
    logger.info(
        "Starting download for S%02dE%03d (%s) in %s",
        season, episode, key, language
    )

    sanitized_anime_title = ''.join(
        char for char in self.get_title(key)
        if char not in self.INVALID_PATH_CHARS
    )
    logger.debug("Sanitized anime title: %s", sanitized_anime_title)

    if season == 0:
        episode_label = f"Movie {episode:02}"
    else:
        episode_label = f"S{season:02}E{episode:03}"
    output_file = f"{sanitized_anime_title} - {episode_label} - ({language}).mp4"

    folder_path = os.path.join(base_directory, serie_folder, f"Season {season}")
    output_path = os.path.join(folder_path, output_file)
    logger.debug("Output path: %s", output_path)
    os.makedirs(os.path.dirname(output_path), exist_ok=True)

    temp_dir = "./Temp/"
    os.makedirs(os.path.dirname(temp_dir), exist_ok=True)
    temp_path = os.path.join(temp_dir, output_file)
    logger.debug("Temporary path: %s", temp_path)

    def finish(success_message: str) -> bool:
        """Copy the temp file to its destination, tidy up, report success."""
        shutil.copyfile(temp_path, output_path)
        os.remove(temp_path)
        logger.info(success_message, output_file)
        self.clear_cache()
        return True

    def abort_cancelled(message: str) -> bool:
        """Discard partial data after a user cancellation."""
        logger.info(message)
        _cleanup_temp_file(temp_path)
        self.clear_cache()
        return False

    candidate_providers = self._select_providers_for_episode(
        season, episode, key, language
    )
    if not candidate_providers:
        logger.error(
            "No providers advertised for S%02dE%03d (%s) in %s",
            season, episode, key, language
        )
        self.clear_cache()
        return False

    cancel_flag = self._cancel_flag

    def events_progress_hook(d):
        # Raising DownloadCancelled is how a running yt-dlp job is stopped.
        if cancel_flag.is_set():
            logger.info("Cancellation detected in progress hook")
            raise DownloadCancelled("Download cancelled by user")
        self.events.download_progress(d)

    tried: list[str] = []
    for provider_name, redirect_url in candidate_providers:
        tried.append(provider_name)
        logger.debug("Attempting download with provider: %s", provider_name)

        # Skip dead providers quickly instead of waiting for yt-dlp.
        probe_headers = {"User-Agent": self.RANDOM_USER_AGENT}
        if not self._check_url_alive(
            redirect_url,
            headers=probe_headers,
            timeout=self.DEFAULT_REQUEST_TIMEOUT,
        ):
            logger.info(
                "Skipping provider %s, redirect URL not reachable",
                provider_name
            )
            continue

        try:
            resolved = self._resolve_direct_link(
                redirect_url, provider_name
            )
        except Exception as exc:
            logger.warning(
                "Provider %s link resolution failed: %s: %s",
                provider_name, type(exc).__name__, exc
            )
            continue
        if resolved is None:
            logger.info(
                "Provider %s returned no direct link", provider_name
            )
            continue
        link, header = resolved

        if cancel_flag.is_set():
            return abort_cancelled("Cancellation requested before download start")

        # Fast path: the link may already be a plain video file.
        if self._try_direct_stream(
            link,
            temp_path,
            header,
            self.DEFAULT_REQUEST_TIMEOUT,
        ) and os.path.exists(temp_path):
            logger.debug(
                "Direct stream succeeded with provider %s", provider_name
            )
            return finish("Download completed successfully (direct): %s")
        _cleanup_temp_file(temp_path)

        # The direct stream also stops on cancellation; do not start a
        # yt-dlp extraction in that case.
        if cancel_flag.is_set():
            return abort_cancelled("Cancellation requested before download start")

        ydl_opts = {
            'fragment_retries': float('inf'),
            'outtmpl': temp_path,
            'quiet': True,
            'no_warnings': True,
            'progress_with_newline': False,
            # NOTE(review): TLS certificate verification is disabled for
            # provider downloads - confirm this is still required.
            'nocheckcertificate': True,
            'logger': logger,
            'progress_hooks': [events_progress_hook],
            # yt-dlp defaults to native HLS downloader which warns about
            # "Live HLS streams are not supported" - disable to go
            # straight to ffmpeg, avoiding the warning
            'hls_prefer_native': False,
        }
        if header:
            ydl_opts['http_headers'] = header
            logger.debug("Using custom headers for download")

        try:
            logger.info(
                "Starting yt-dlp download with %s: %s",
                provider_name, output_file
            )
            logger.debug("Download link: %s...", link[:100])
            with YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(link, download=True)
                # Guard against a missing info dict so that a debug log
                # line can never turn a finished download into a failure.
                if info:
                    logger.debug(
                        "Download info: title=%s, filesize=%s",
                        info.get('title'), info.get('filesize')
                    )
            if os.path.exists(temp_path):
                logger.debug("Moving file from temp to final destination")
                return finish("Download completed successfully: %s")
            logger.error(
                "Download failed: temp file not found at %s", temp_path
            )
        except DownloadCancelled:
            return abort_cancelled("Download cancelled by user")
        except BrokenPipeError as exc:
            logger.error(
                "Broken pipe error with provider %s: %s",
                provider_name, exc
            )
            _cleanup_temp_file(temp_path)
            continue
        except Exception as exc:
            # Check if this is an HLS-related failure that might succeed
            # with additional ffmpeg options
            exc_str = str(exc).lower()
            is_hls_related = (
                'hls' in exc_str or
                'live' in exc_str or
                'native downloader' in exc_str
            )
            if is_hls_related:
                logger.info(
                    "HLS stream detected, retrying with ffmpeg options: %s",
                    output_file
                )
                retry_opts = ydl_opts.copy()
                # 'external_downloader' is the YoutubeDL option that picks
                # the downloader per protocol; a plain 'downloader' key is
                # not an option and would be ignored.
                retry_opts['external_downloader'] = {'m3u8': 'ffmpeg'}
                retry_opts['hls_use_mpegts'] = True
                try:
                    with YoutubeDL(retry_opts) as ydl:
                        ydl.extract_info(link, download=True)
                    if os.path.exists(temp_path):
                        return finish(
                            "Download completed successfully (retry): %s"
                        )
                except DownloadCancelled:
                    return abort_cancelled("Download cancelled by user")
                except Exception as retry_exc:
                    logger.warning(
                        "ffmpeg retry failed with provider %s: %s: %s",
                        provider_name, type(retry_exc).__name__, retry_exc
                    )
                    _cleanup_temp_file(temp_path)
                    # Continue to next provider if retry also fails
                    continue
            logger.error(
                "YoutubeDL download failed with provider %s: %s: %s",
                provider_name, type(exc).__name__, exc
            )
            _cleanup_temp_file(temp_path)
            continue

    logger.error(
        "All download providers failed for S%02dE%03d (%s) in %s. "
        "Tried: %s. Episode may be unavailable on the source site.",
        season, episode, key, language, ", ".join(tried) or "none"
    )
    download_error_logger.error(
        "All providers failed for %s S%02dE%03d (%s); tried=%s",
        key, season, episode, language, tried
    )
    _cleanup_temp_file(temp_path)
    self.clear_cache()
    return False
def _select_providers_for_episode(
    self,
    season: int,
    episode: int,
    key: str,
    language: str,
) -> list[tuple[str, str]]:
    """Build the provider failover chain for one episode.

    Returns ``(provider_name, redirect_url)`` pairs for the requested
    language. Providers named in ``SUPPORTED_PROVIDERS`` come first, in
    that order, followed by any other advertised providers in page order.
    The list is empty when the language is not advertised at all.
    """
    if not self.is_language(season, episode, key, language):
        logger.warning(
            "Language %s not advertised for S%02dE%03d (%s)",
            language, season, episode, key
        )
        return []

    language_code = self._get_language_key(language)
    providers = self._get_provider_from_html(season, episode, key)
    preferred = list(self.SUPPORTED_PROVIDERS)

    preferred_hits = [
        (name, providers[name][language_code])
        for name in preferred
        if providers.get(name) and language_code in providers[name]
    ]
    remaining_hits = [
        (name, lang_map[language_code])
        for name, lang_map in providers.items()
        if name not in preferred and language_code in lang_map
    ]
    return preferred_hits + remaining_hits
def _resolve_direct_link(
    self,
    redirect_url: str,
    provider_name: str,
) -> tuple[str, dict] | None:
    """Turn a provider redirect URL into a link that can be downloaded.

    The redirect is followed to the embedded player URL. When an extractor
    is registered for the provider, its result is returned; otherwise the
    embed URL is returned together with the provider's configured headers
    so yt-dlp can attempt the extraction itself.

    Args:
        redirect_url: AniWorld redirect URL.
        provider_name: Provider key (e.g. ``"VOE"``).

    Returns:
        ``(direct_link, headers)`` tuple or None when extraction fails.
    """
    try:
        landing = self.session.get(
            redirect_url,
            timeout=self.DEFAULT_REQUEST_TIMEOUT,
            headers={"User-Agent": self.RANDOM_USER_AGENT},
            allow_redirects=True,
        )
        embedded = landing.url
    except requests.RequestException as exc:
        logger.warning(
            "Failed resolving redirect for %s: %s", provider_name, exc
        )
        return None

    # A lookup failure simply means no custom extractor is available.
    extractor = None
    try:
        extractor = self.Providers.GetProvider(provider_name)
    except (KeyError, AttributeError):
        pass

    if extractor is None:
        configured = self.PROVIDER_HEADERS.get(provider_name)
        return embedded, self._parse_provider_headers(configured)

    try:
        return extractor.get_link(
            embedded, self.DEFAULT_REQUEST_TIMEOUT
        )
    except Exception as exc:
        logger.warning(
            "Custom extractor %s failed: %s",
            provider_name, exc
        )
        return None
@staticmethod
def _parse_provider_headers(
header_list: list | None,
) -> dict[str, str]:
"""Convert legacy ``"Name: value"`` header strings to a dict."""
if not header_list:
return {}
parsed: dict[str, str] = {}
for entry in header_list:
if not isinstance(entry, str) or ":" not in entry:
continue
name, _, value = entry.partition(":")
parsed[name.strip()] = value.strip().strip('"')
return parsed
def get_site_key(self) -> str:
    """Return the site identifier of this provider (``aniworld.to``)."""
    site_key = "aniworld.to"
    return site_key
def get_title(self, key: str) -> str:
    """Read the series title from the series page.

    The title is the text of the ``span`` inside the ``h1`` of the
    ``series-title`` div. When a title is found, the release year is also
    extracted from the same page and stored in the year cache.

    Returns:
        The title, or an empty string when the page has no title element.
    """
    logger.debug("Getting title for key: %s", key)
    page = self._get_key_html(key)
    soup = BeautifulSoup(_decode_html_content(page.content), 'html.parser')

    title_div = soup.find('div', class_='series-title')
    h1_tag = title_div.find('h1') if title_div else None
    span_tag = h1_tag.find('span') if h1_tag else None
    if not span_tag:
        logger.warning("No title found for key: %s", key)
        return ""

    title = span_tag.text
    logger.debug("Found title: %s", title)

    # The page is already parsed, so cache the year now and spare a later
    # get_year call the extraction.
    year = self._extract_year_from_soup(soup)
    if year is not None:
        self._YearDict[key] = year
        logger.debug("Cached year %d for key: %s", year, key)
    return title
def _extract_year_from_soup(self, soup: BeautifulSoup) -> int | None:
    """Find the release year in a parsed series page.

    The first ``p`` tag containing ``Jahr:`` or ``Year:`` together with a
    four-digit number wins. Otherwise the first 19xx/20xx number inside
    the ``series-info`` div is used.

    Args:
        soup: Parsed BeautifulSoup object

    Returns:
        Year as int or None if not found
    """
    paragraph_texts = (p_tag.get_text() for p_tag in soup.find_all('p'))
    for text in paragraph_texts:
        if not ('Jahr:' in text or 'Year:' in text):
            continue
        labelled = re.search(r'(\d{4})', text)
        if labelled:
            return int(labelled.group(1))

    # Fallback: any plausible year in the series-info block.
    info_div = soup.find('div', class_='series-info')
    if info_div:
        plausible = re.search(r'\b(19\d{2}|20\d{2})\b', info_div.get_text())
        if plausible:
            return int(plausible.group(1))
    return None
def get_year(self, key: str) -> int | None:
"""Get anime release year from series key.
Uses cached year from get_title if available,
otherwise extracts and caches it.
Args:
key: Series identifier
Returns:
Release year or None if not found
"""
logger.debug("Getting year for key: %s", key)
# Check cache first
if key in self._YearDict:
logger.debug("Using cached year %d for key: %s", self._YearDict[key], key)
return self._YearDict[key]
# Not cached - extract from HTML
try:
soup = BeautifulSoup(
_decode_html_content(self._get_key_html(key).content),
'html.parser'
)
year = self._extract_year_from_soup(soup)
if year is not None:
self._YearDict[key] = year
logger.debug("Found and cached year %d for key: %s", year, key)
return year
except Exception as e:
logger.warning("Error extracting year for key %s: %s", key, e)
return None
def _get_key_html(self, key: str):
    """Return the series page response, fetching it on first use.

    Args:
        key: Series identifier (URL-quoted before it is put in the URL)

    Returns:
        Cached or freshly fetched HTTP response
    """
    if key not in self._KeyHTMLDict:
        # Quote everything, including "/", so the key stays one path segment.
        url = f"{self.ANIWORLD_TO}/anime/stream/{quote(key, safe='')}"
        logger.debug("Fetching HTML for key: %s from %s", key, url)
        self._KeyHTMLDict[key] = self.session.get(
            url,
            timeout=self.DEFAULT_REQUEST_TIMEOUT
        )
        logger.debug("Cached HTML for key: %s", key)
    else:
        logger.debug("Using cached HTML for key: %s", key)
    return self._KeyHTMLDict[key]
def _get_episode_html(self, season: int, episode: int, key: str):
    """Return the episode page response, fetching it on first use.

    Responses are cached per ``(key, season, episode)`` so repeated
    lookups for the same episode do not hit the network again.

    Args:
        season: Season number (must be within 1..999)
        episode: Episode number (must be within 1..9999)
        key: Series identifier (URL-quoted before it is put in the URL)

    Returns:
        Cached or freshly fetched HTTP response

    Raises:
        ValueError: If season or episode are out of range
    """
    # NOTE(review): download() names season 0 as a movie, but season 0 is
    # rejected here - confirm how movies are meant to be fetched.
    if season < 1 or season > 999:
        logger.error("Invalid season number: %s", season)
        raise ValueError(f"Invalid season number: {season}")
    if episode < 1 or episode > 9999:
        logger.error("Invalid episode number: %s", episode)
        raise ValueError(f"Invalid episode number: {episode}")

    # The cache is keyed by the full episode coordinates, so the lookup
    # must use the same tuple; testing the bare series key never matches.
    cache_key = (key, season, episode)
    if cache_key in self._EpisodeHTMLDict:
        logger.debug("Using cached HTML for S%02dE%03d (%s)", season, episode, key)
        return self._EpisodeHTMLDict[cache_key]

    # Quote everything, including "/", so the key stays one path segment.
    safe_key = quote(key, safe='')
    link = (
        f"{self.ANIWORLD_TO}/anime/stream/{safe_key}/"
        f"staffel-{season}/episode-{episode}"
    )
    logger.debug("Fetching episode HTML from: %s", link)
    response = self.session.get(link, timeout=self.DEFAULT_REQUEST_TIMEOUT)
    self._EpisodeHTMLDict[cache_key] = response
    logger.debug("Cached episode HTML for S%02dE%03d (%s)", season, episode, key)
    return response
def _get_provider_from_html(
    self,
    season: int,
    episode: int,
    key: str
) -> dict:
    """Collect the streaming providers listed on an episode page.

    Every ``li`` whose class starts with ``episodeLink`` contributes its
    provider name (``h4`` text), language code (``data-lang-key``) and
    redirect path (``a.watchEpisode`` href). Items missing any of the
    three are skipped.

    Returns a dictionary with provider names as keys
    and language key-to-redirect URL mappings as values.

    Example:
        {
            'VOE': {1: 'https://aniworld.to/redirect/1766412',
                    2: 'https://aniworld.to/redirect/1766405'},
        }
    """
    logger.debug("Extracting providers from HTML for S%02dE%03d (%s)", season, episode, key)
    page = self._get_episode_html(season, episode, key)
    soup = BeautifulSoup(_decode_html_content(page.content), 'html.parser')

    providers: dict[str, dict[int, str]] = {}
    episode_links = soup.find_all(
        'li', class_=lambda x: x and x.startswith('episodeLink')
    )
    if not episode_links:
        logger.warning("No episode links found for S%02dE%03d (%s)", season, episode, key)
        return providers

    for item in episode_links:
        name_tag = item.find('h4')
        provider_name = name_tag.text.strip() if name_tag else None

        anchor = item.find('a', class_='watchEpisode')
        redirect_link = anchor.get('href') if anchor else None

        raw_lang = item.get('data-lang-key')
        lang_key = int(raw_lang) if raw_lang and raw_lang.isdigit() else None

        if not (provider_name and redirect_link and lang_key):
            continue
        providers.setdefault(provider_name, {})[lang_key] = (
            f"{self.ANIWORLD_TO}{redirect_link}"
        )
        logger.debug("Found provider: %s, lang_key: %s", provider_name, lang_key)

    logger.debug("Total providers found: %s", len(providers))
    return providers
def _get_redirect_link(
    self,
    season: int,
    episode: int,
    key: str,
    language: str = "German Dub"
):
    """Find the first provider offering the episode in the given language.

    Returns:
        ``(redirect_url, provider_name)`` for the first matching provider
        in page order, or None when the language is not offered.
    """
    logger.debug("Getting redirect link for S%02dE%03d (%s) in %s", season, episode, key, language)
    language_code = self._get_language_key(language)

    if self.is_language(season, episode, key, language):
        providers = self._get_provider_from_html(season, episode, key)
        first_match = next(
            (
                (lang_dict[language_code], provider_name)
                for provider_name, lang_dict in providers.items()
                if language_code in lang_dict
            ),
            None,
        )
        if first_match is not None:
            logger.debug("Found redirect link with provider: %s", first_match[1])
            return first_match

    logger.warning("No redirect link found for S%02dE%03d (%s) in %s", season, episode, key, language)
    return None
def _get_embeded_link(
    self,
    season: int,
    episode: int,
    key: str,
    language: str = "German Dub"
):
    """Follow the episode's redirect link to the embedded player URL.

    Returns:
        The final URL after following the redirect, or None when no
        redirect link exists for the requested language.
    """
    logger.debug("Getting embedded link for S%02dE%03d (%s) in %s", season, episode, key, language)
    redirect = self._get_redirect_link(season, episode, key, language)
    if redirect is None:
        # _get_redirect_link returns None when nothing matches; unpacking
        # it would raise TypeError, so report "no link" instead.
        logger.debug("No redirect link, cannot resolve embedded link")
        return None

    redirect_link, provider_name = redirect
    logger.debug("Redirect link: %s, provider: %s", redirect_link, provider_name)
    embeded_link = self.session.get(
        redirect_link,
        timeout=self.DEFAULT_REQUEST_TIMEOUT,
        headers={'User-Agent': self.RANDOM_USER_AGENT}
    ).url
    logger.debug("Embedded link: %s", embeded_link)
    return embeded_link
def _get_direct_link_from_provider(
    self,
    season: int,
    episode: int,
    key: str,
    language: str = "German Dub"
):
    """Resolve the episode's embedded link to a direct download link.

    Returns:
        The result of the VOE extractor's ``get_link`` for the embedded
        link, or None when no embedded link could be determined.
    """
    logger.debug("Getting direct link from provider for S%02dE%03d (%s) in %s", season, episode, key, language)
    embeded_link = self._get_embeded_link(
        season, episode, key, language
    )
    if embeded_link is not None:
        # NOTE(review): the VOE extractor is used regardless of which
        # provider the embedded link belongs to - confirm this is intended.
        logger.debug("Using VOE provider to extract direct link")
        voe_extractor = self.Providers.GetProvider("VOE")
        return voe_extractor.get_link(embeded_link, self.DEFAULT_REQUEST_TIMEOUT)

    logger.error("No embedded link found for S%02dE%03d (%s)", season, episode, key)
    return None
def get_season_episode_count(self, slug: str) -> dict:
    """Get episode count for each season.

    The number of seasons is read from the series page's
    ``numberOfSeasons`` meta tag. For each season the distinct links
    pointing at ``staffel-N/episode-`` on the season page are counted.

    Args:
        slug: Series identifier (URL-quoted before it is put in the URL)

    Returns:
        Dictionary mapping season numbers to episode counts; empty when
        the series page carries no ``numberOfSeasons`` meta tag.
    """
    logger.info("Getting season and episode count for slug: %s", slug)
    # Quote everything, including "/", so the slug stays one path segment.
    safe_slug = quote(slug, safe='')
    base_url = f"{self.ANIWORLD_TO}/anime/stream/{safe_slug}/"
    logger.debug("Base URL: %s", base_url)

    # Go through the shared session, like the other fetchers of this
    # class, so the configured retry/backoff policy applies here as well.
    response = self.session.get(base_url, timeout=self.DEFAULT_REQUEST_TIMEOUT)
    soup = BeautifulSoup(_decode_html_content(response.content), 'html.parser')
    season_meta = soup.find('meta', itemprop='numberOfSeasons')
    number_of_seasons = int(season_meta['content']) if season_meta else 0
    logger.info("Found %s seasons for '%s'", number_of_seasons, slug)

    episode_counts = {}
    for season in range(1, number_of_seasons + 1):
        season_url = f"{base_url}staffel-{season}"
        logger.debug("Fetching episodes for season %s from: %s", season, season_url)
        response = self.session.get(
            season_url,
            timeout=self.DEFAULT_REQUEST_TIMEOUT,
        )
        soup = BeautifulSoup(_decode_html_content(response.content), 'html.parser')

        # The same episode is linked several times per page; count each
        # distinct href once.
        episode_marker = f"staffel-{season}/episode-"
        unique_links = {
            anchor['href']
            for anchor in soup.find_all('a', href=True)
            if episode_marker in anchor['href']
        }
        episode_counts[season] = len(unique_links)
        logger.debug("Season %s has %s episodes", season, episode_counts[season])

    logger.info("Episode count retrieval complete for '%s': %s", slug, episode_counts)
    return episode_counts