Improve docs and security defaults

This commit is contained in:
2025-10-22 15:22:58 +02:00
parent ebb0769ed4
commit 92795cf9b3
16 changed files with 283 additions and 180 deletions

View File

@@ -124,6 +124,9 @@ class SerieScanner:
total_to_scan = self.get_total_to_scan()
logger.info("Total folders to scan: %d", total_to_scan)
# The scanner enumerates folders with mp4 files, loads existing
# metadata, calculates the missing episodes via the provider, and
# persists the refreshed metadata while emitting progress events.
result = self.__find_mp4_files()
counter = 0
@@ -137,6 +140,9 @@ class SerieScanner:
else:
percentage = 0.0
# Progress is surfaced both through the callback manager
# (for the web/UI layer) and, for compatibility, through a
# legacy callback that updates CLI progress bars.
# Notify progress
self._callback_manager.notify_progress(
ProgressContext(
@@ -160,12 +166,16 @@ class SerieScanner:
serie is not None
and not self.is_null_or_whitespace(serie.key)
):
missings, site = (
# Delegate the provider to compare local files with
# remote metadata, yielding missing episodes per
# season. Results are saved back to disk so that both
# CLI and API consumers see consistent state.
missing_episodes, site = (
self.__get_missing_episodes_and_season(
serie.key, mp4_files
)
)
serie.episodeDict = missings
serie.episodeDict = missing_episodes
serie.folder = folder
data_path = os.path.join(
self.directory, folder, 'data'

View File

@@ -241,7 +241,9 @@ class SeriesApp:
message="Download cancelled before starting"
)
# Wrap callback to check for cancellation and report progress
# Wrap callback to enforce cancellation checks and bridge the new
# event-driven progress reporting with the legacy callback API that
# the CLI still relies on.
def wrapped_callback(progress: float):
if self._is_cancelled():
raise InterruptedError("Download cancelled by user")
@@ -268,6 +270,9 @@ class SeriesApp:
if callback:
callback(progress)
# Propagate progress into the legacy callback chain so existing
# UI surfaces continue to receive updates without rewriting the
# old interfaces.
# Call legacy progress_callback if provided
if self.progress_callback:
self.progress_callback(ProgressInfo(
@@ -403,7 +408,9 @@ class SeriesApp:
# Reinitialize scanner
self.SerieScanner.reinit()
# Wrap callback for progress reporting and cancellation
# Wrap the scanner callback so we can surface progress through the
# new ProgressInfo pipeline while maintaining backwards
# compatibility with the legacy tuple-based callback signature.
def wrapped_callback(folder: str, current: int):
if self._is_cancelled():
raise InterruptedError("Scan cancelled by user")

View File

@@ -1,56 +1,99 @@
import os
import json
"""Utilities for loading and managing stored anime series metadata."""
import logging
from .series import Serie
import os
from json import JSONDecodeError
from typing import Dict, Iterable, List
from src.core.entities.series import Serie
class SerieList:
    """In-memory collection of cached series metadata stored on disk.

    Each series lives under ``<base_path>/<folder>/data``; the constructor
    eagerly loads every metadata file it can find below ``base_path``.
    """

    def __init__(self, base_path: str) -> None:
        self.directory: str = base_path
        # Maps the on-disk folder name to its loaded Serie instance.
        self.folderDict: Dict[str, "Serie"] = {}
        self.load_series()

    def add(self, serie: "Serie") -> None:
        """Persist a new series if it is not already present.

        Creates the series folder if needed and writes the metadata file
        only when one does not already exist on disk.
        """
        if self.contains(serie.key):
            return
        data_path = os.path.join(self.directory, serie.folder, "data")
        anime_path = os.path.join(self.directory, serie.folder)
        os.makedirs(anime_path, exist_ok=True)
        if not os.path.isfile(data_path):
            serie.save_to_file(data_path)
        self.folderDict[serie.folder] = serie

    def contains(self, key: str) -> bool:
        """Return True when a series identified by ``key`` already exists."""
        return any(value.key == key for value in self.folderDict.values())

    def load_series(self) -> None:
        """Populate the in-memory map with metadata discovered on disk."""
        logging.info("Scanning anime folders in %s", self.directory)
        try:
            entries: Iterable[str] = os.listdir(self.directory)
        except OSError as error:
            # A missing or unreadable base directory is not fatal; the
            # collection simply starts out empty.
            logging.error(
                "Unable to scan directory %s: %s",
                self.directory,
                error,
            )
            return
        for anime_folder in entries:
            anime_path = os.path.join(self.directory, anime_folder, "data")
            if os.path.isfile(anime_path):
                logging.debug("Found data file for folder %s", anime_folder)
                self._load_data(anime_folder, anime_path)
                continue
            logging.warning(
                "Skipping folder %s because no metadata file was found",
                anime_folder,
            )

    def _load_data(self, anime_folder: str, data_path: str) -> None:
        """Load a single series metadata file into the in-memory collection.

        A corrupt or unreadable file only skips this folder; other series
        still load.
        """
        try:
            self.folderDict[anime_folder] = Serie.load_from_file(data_path)
            logging.debug("Successfully loaded metadata for %s", anime_folder)
        except (OSError, JSONDecodeError, KeyError, ValueError) as error:
            logging.error(
                "Failed to load metadata for folder %s from %s: %s",
                anime_folder,
                data_path,
                error,
            )

    def GetMissingEpisode(self) -> List["Serie"]:
        """Return all series that still contain missing episodes."""
        return [
            serie
            for serie in self.folderDict.values()
            if serie.episodeDict
        ]

    def get_missing_episodes(self) -> List["Serie"]:
        """PEP8-friendly alias for :meth:`GetMissingEpisode`."""
        return self.GetMissingEpisode()

    def GetList(self) -> List["Serie"]:
        """Return all series instances stored in the list."""
        return list(self.folderDict.values())

    def get_all(self) -> List["Serie"]:
        """PEP8-friendly alias for :meth:`GetList`."""
        return self.GetList()

View File

@@ -37,7 +37,10 @@ from .base_provider import Loader
class EnhancedAniWorldLoader(Loader):
"""Enhanced AniWorld loader with comprehensive error handling."""
"""Aniworld provider with retry and recovery strategies.
Also exposes metrics hooks for download statistics.
"""
def __init__(self):
super().__init__()
@@ -211,7 +214,9 @@ class EnhancedAniWorldLoader(Loader):
if not word or not word.strip():
raise ValueError("Search term cannot be empty")
search_url = f"{self.ANIWORLD_TO}/ajax/seriesSearch?keyword={quote(word)}"
search_url = (
f"{self.ANIWORLD_TO}/ajax/seriesSearch?keyword={quote(word)}"
)
try:
return self._fetch_anime_list_with_recovery(search_url)
@@ -250,7 +255,9 @@ class EnhancedAniWorldLoader(Loader):
clean_text = response_text.strip()
# Try multiple parsing strategies
# Try multiple parsing strategies. We progressively relax the parsing
# requirements to handle HTML-escaped payloads, stray BOM markers, and
# control characters injected by the upstream service.
parsing_strategies = [
lambda text: json.loads(html.unescape(text)),
lambda text: json.loads(text.encode('utf-8').decode('utf-8-sig')),

View File

@@ -1,5 +1,8 @@
"""Resolve Doodstream embed players into direct download URLs."""
import random
import re
import string
import time
from typing import Any
@@ -8,6 +11,12 @@ from fake_useragent import UserAgent
from .Provider import Provider
# Precompiled regex patterns to extract the ``pass_md5`` endpoint and the
# session token embedded in the obfuscated player script. Compiling once keeps
# repeated invocations fast and documents the parsing intent.
# Group 1 captures the relative ``/pass_md5/...`` URL from the ``$.get('…')``
# call in the embed page's player script.
PASS_MD5_PATTERN = re.compile(r"\$\.get\('([^']*/pass_md5/[^']*)'")
# Group 1 captures the alphanumeric session token from a ``token=…`` query
# parameter in the same script.
TOKEN_PATTERN = re.compile(r"token=([a-zA-Z0-9]+)")
class Doodstream(Provider):
"""Doodstream video provider implementation."""
@@ -33,17 +42,15 @@ class Doodstream(Provider):
"Referer": "https://dood.li/",
}
def extract_data(pattern: str, content: str) -> str | None:
"""Extract data using regex pattern."""
match = re.search(pattern, content)
def extract_data(pattern: re.Pattern[str], content: str) -> str | None:
"""Extract data using a compiled regex pattern."""
match = pattern.search(content)
return match.group(1) if match else None
def generate_random_string(length: int = 10) -> str:
    """Return a random alphanumeric string of ``length`` characters.

    NOTE: uses the non-cryptographic ``random`` module, which is fine for
    cache-busting URL suffixes but must never be used for secrets or
    security tokens (use ``secrets`` for those).
    """
    charset = string.ascii_letters + string.digits
    return "".join(random.choices(charset, k=length))
response = requests.get(
embedded_link,
@@ -53,15 +60,13 @@ class Doodstream(Provider):
)
response.raise_for_status()
pass_md5_pattern = r"\$\.get\('([^']*\/pass_md5\/[^']*)'"
pass_md5_url = extract_data(pass_md5_pattern, response.text)
pass_md5_url = extract_data(PASS_MD5_PATTERN, response.text)
if not pass_md5_url:
raise ValueError(f"pass_md5 URL not found using {embedded_link}.")
full_md5_url = f"https://dood.li{pass_md5_url}"
token_pattern = r"token=([a-zA-Z0-9]+)"
token = extract_data(token_pattern, response.text)
token = extract_data(TOKEN_PATTERN, response.text)
if not token:
raise ValueError(f"Token not found using {embedded_link}.")

View File

@@ -1,8 +1,12 @@
"""Resolve Filemoon embed pages into direct streaming asset URLs."""
import re
import requests
from aniworld import config
# import jsbeautifier.unpackers.packer as packer
from aniworld import config
# Captures the redirect target from the first <iframe> tag on the page:
# group 1 holds the src value when it is single-quoted, group 2 when it is
# double-quoted (exactly one of the two groups matches).
REDIRECT_REGEX = re.compile(
    r'<iframe *(?:[^>]+ )?src=(?:\'([^\']+)\'|"([^"]+)")[^>]*>')

View File

@@ -1,6 +1,9 @@
import re
"""Helpers for extracting direct stream URLs from hanime.tv pages."""
import json
import re
import sys
import requests
from aniworld.config import DEFAULT_REQUEST_TIMEOUT
@@ -83,7 +86,7 @@ def get_direct_link_from_hanime(url=None):
except ValueError as e:
print(f"Error: {e}")
except KeyboardInterrupt:
pass
print("\nOperation cancelled by user.")
if __name__ == "__main__":