diff --git a/QualityTODO.md b/QualityTODO.md index 7188b3a..500a6f1 100644 --- a/QualityTODO.md +++ b/QualityTODO.md @@ -106,19 +106,12 @@ conda run -n AniWorld python -m pytest tests/ -v -s **Class Docstrings** -- [ ] `src/server/utils/dependencies.py` - `CommonQueryParams` class lacks docstring - **Method/Function Docstrings** -- [ ] `src/server/utils/logging.py` - Check helper functions -- [ ] `src/server/utils/template_helpers.py` - Check helper functions - #### Unclear Variable Names #### Unclear Comments or Missing Context -- [ ] `src/core/providers/enhanced_provider.py` line 231 - - Comment style inconsistent with rest of codebase - [ ] `src/server/api/download.py` line 51 - Backward compatibility comment clear but needs more detail @@ -128,32 +121,14 @@ conda run -n AniWorld python -m pytest tests/ -v -s #### Complex Algorithms Without Comments -**Episode Discovery Pattern Matching** - -- [ ] `src/core/providers/streaming/*.py` files - - Regex patterns for extracting streams (complex) - - Each provider has different extraction logic - - Need comments explaining regex patterns - - Example: `src/core/providers/streaming/doodstream.py` line 35 - **JSON/HTML Parsing Logic** -**Session Retry Configuration** - -- [ ] `src/core/providers/enhanced_provider.py` lines 108-125 - - Session retry configuration with backoff - - Line 115: status codes list needs explanation - - Comment should explain why these specific codes need retry - --- ### 5️⃣ No Shortcuts or Hacks Used #### Code Smells and Shortcuts -- [ ] `src/core/providers/streaming/Provider.py` line 7 - Abstract method implementation should not have pass - **Bare Pass Statements (Incomplete Implementation)** - **Duplicate Code** - [ ] `src/cli/Main.py` vs `src/core/SeriesApp.py` diff --git a/src/cli/Main.py b/src/cli/Main.py index eb2812d..1a3a32a 100644 --- a/src/cli/Main.py +++ b/src/cli/Main.py @@ -1,341 +1,316 @@ -"""Command-line interface for the Aniworld anime download manager. +"""Command-line interface for the Aniworld anime download manager.""" -This module provides an interactive CLI for searching, selecting, and -downloading anime series. It coordinates between the SerieScanner for -finding missing episodes and the provider loaders for downloading content. -""" import logging import os -import time -from typing import Any, Callable, Mapping, Optional, Sequence +from typing import Optional, Sequence from rich.progress import Progress -from ..core.entities import SerieList -from ..core.entities.series import Serie -from ..core.providers.provider_factory import Loaders -from ..core.SerieScanner import SerieScanner +from src.core.entities.series import Serie +from src.core.SeriesApp import SeriesApp as CoreSeriesApp -# Configure logging -log_format = "%(asctime)s - %(levelname)s - %(funcName)s - %(message)s" -logging.basicConfig(level=logging.FATAL, format=log_format) -console_handler = logging.StreamHandler() -console_handler.setLevel(logging.ERROR) -console_handler.setFormatter(logging.Formatter(log_format)) -for h in logging.root.handlers: - logging.root.removeHandler(h) +LOG_FORMAT = "%(asctime)s - %(levelname)s - %(name)s - %(message)s" -logging.getLogger("urllib3.connectionpool").setLevel(logging.ERROR) -logging.getLogger('charset_normalizer').setLevel(logging.ERROR) -logging.getLogger().setLevel(logging.ERROR) -for h in logging.getLogger().handlers: - logging.getLogger().removeHandler(h) +logger = logging.getLogger(__name__) -class NoKeyFoundException(Exception): - """Exception raised when an anime key cannot be found.""" - pass +class SeriesCLI: + """Thin wrapper around :class:`SeriesApp` providing an interactive CLI.""" - -class MatchNotFoundError(Exception): - """Exception raised when an anime key cannot be found.""" - pass - - -class SeriesApp: - """Interactive CLI controller orchestrating scanning and downloads.""" - - _initialization_count = 0 # Track initialization calls - def __init__(self, directory_to_search: str) -> None: - SeriesApp._initialization_count += 1 - - # Only show initialization message for the first instance - if SeriesApp._initialization_count <= 1: - print("Please wait while initializing...") - - self.progress: Optional[Progress] = None + print("Please wait while initializing...") self.directory_to_search = directory_to_search - self.Loaders: Loaders = Loaders() - loader = self.Loaders.GetLoader(key="aniworld.to") - self.SerieScanner = SerieScanner(directory_to_search, loader) + self.series_app = CoreSeriesApp(directory_to_search) - self.List = SerieList(self.directory_to_search) - self.__init_list__() + self._progress: Optional[Progress] = None + self._overall_task_id: Optional[int] = None + self._series_task_id: Optional[int] = None + self._episode_task_id: Optional[int] = None + self._scan_task_id: Optional[int] = None - def __init_list__(self) -> None: - """Initialize the series list by fetching missing episodes.""" - self.series_list: Sequence[Serie] = self.List.GetMissingEpisode() + # ------------------------------------------------------------------ + # Utility helpers + # ------------------------------------------------------------------ + def _get_series_list(self) -> Sequence[Serie]: + """Return the currently cached series with missing episodes.""" + return self.series_app.get_series_list() + # ------------------------------------------------------------------ + # Display & selection + # ------------------------------------------------------------------ def display_series(self) -> None: """Print all series with assigned numbers.""" - print("\nCurrent result:") - for i, serie in enumerate(self.series_list, 1): - name = serie.name # Access the property on the instance - if name is None or str(name).strip() == "": - print(f"{i}. {serie.folder}") - else: - print(f"{i}. {serie.name}") - - def search(self, words: str) -> list[dict[str, Any]]: - """Search for anime series by name.""" - loader = self.Loaders.GetLoader(key="aniworld.to") - return loader.search(words) - - def get_user_selection(self) -> Optional[Sequence[Serie]]: - """Handle user input for selecting series.""" - self.display_series() - while True: - prompt = ( - "\nSelect series by number (e.g. '1', '1,2' or 'all') " - "or type 'exit' to return: " - ) - selection = input(prompt).strip().lower() - - if selection == "exit": - return None - - selected_series: list[Serie] = [] - if selection == "all": - selected_series = list(self.series_list) - else: - try: - indexes = [ - int(num) - 1 for num in selection.split(",") - ] - selected_series = [ - self.series_list[i] - for i in indexes - if 0 <= i < len(self.series_list) - ] - except ValueError: - msg = ( - "Invalid selection. " - "Going back to the result display." - ) - print(msg) - self.display_series() - continue - - if selected_series: - return selected_series - else: - msg = ( - "No valid series selected. " - "Going back to the result display." - ) - print(msg) - return None - - def retry( - self, - func: Callable[..., Any], - max_retries: int = 3, - delay: float = 2, - *args: Any, - **kwargs: Any, - ) -> bool: - """Retry a function with exponential backoff. - - Args: - func: Function to retry - max_retries: Maximum number of retry attempts - delay: Delay in seconds between retries - *args: Positional arguments for the function - **kwargs: Keyword arguments for the function - - Returns: - True if function succeeded, False otherwise - """ - for attempt in range(1, max_retries + 1): - try: - func(*args, **kwargs) - return True - except Exception as e: - print(e) - time.sleep(delay) - return False - - def download_series(self, series: Sequence[Serie]) -> None: - """Simulate the downloading process with a progress bar.""" - total_downloaded = 0 - total_episodes = sum( - sum(len(ep) for ep in serie.episodeDict.values()) - for serie in series - ) - self.progress = Progress() - task1 = self.progress.add_task( - "[red]Processing...", total=total_episodes - ) - task2 = self.progress.add_task("[green]...", total=0) - # Set total to 100 for percentage display - self.download_progress_task = self.progress.add_task( - "[Gray]...", total=100 - ) - self.progress.start() - - for serie in series: - serie_episodes = sum( - len(ep) for ep in serie.episodeDict.values() - ) - self.progress.update( - task2, - description=f"[green]{serie.folder}", - total=serie_episodes, - ) - downloaded = 0 - for season, episodes in serie.episodeDict.items(): - for episode in episodes: - loader = self.Loaders.GetLoader(key="aniworld.to") - if loader.is_language(season, episode, serie.key): - self.retry( - loader.download, - 3, - 1, - self.directory_to_search, - serie.folder, - season, - episode, - serie.key, - "German Dub", - self.print_download_progress, - ) - - downloaded += 1 - total_downloaded += 1 - - self.progress.update(task1, advance=1) - self.progress.update(task2, advance=1) - time.sleep(0.02) - - self.progress.stop() - self.progress = None - - def print_download_progress(self, d: Mapping[str, Any]) -> None: - """Update download progress in the UI. - - Args: - d: Dictionary containing download status information - """ - # Use self.progress and self.download_progress_task to display progress - if ( - self.progress is None - or not hasattr(self, "download_progress_task") - ): + series = self._get_series_list() + if not series: + print("\nNo series with missing episodes were found.") return - if d["status"] == "downloading": - total = ( - d.get("total_bytes") - or d.get("total_bytes_estimate") + print("\nCurrent result:") + for index, serie in enumerate(series, start=1): + name = (serie.name or "").strip() + label = name if name else serie.folder + print(f"{index}. {label}") + + def get_user_selection(self) -> Optional[Sequence[Serie]]: + """Prompt the user to select one or more series for download.""" + series = list(self._get_series_list()) + if not series: + print("No series available for download.") + return None + + self.display_series() + prompt = ( + "\nSelect series by number (e.g. '1', '1,2' or 'all') " + "or type 'exit' to return: " + ) + selection = input(prompt).strip().lower() + + if selection in {"exit", ""}: + return None + + if selection == "all": + return series + + try: + indexes = [ + int(value.strip()) - 1 + for value in selection.split(",") + ] + except ValueError: + print("Invalid selection. Returning to main menu.") + return None + + chosen = [ + series[i] + for i in indexes + if 0 <= i < len(series) + ] + + if not chosen: + print("No valid series selected.") + return None + + return chosen + + # ------------------------------------------------------------------ + # Download logic + # ------------------------------------------------------------------ + def download_series(self, series: Sequence[Serie]) -> None: + """Download all missing episodes for the provided series list.""" + total_episodes = sum( + len(episodes) + for serie in series + for episodes in serie.episodeDict.values() + ) + + if total_episodes == 0: + print("Selected series do not contain missing episodes.") + return + + self._progress = Progress() + with self._progress: + self._overall_task_id = self._progress.add_task( + "[red]Processing...", total=total_episodes ) - downloaded = d.get("downloaded_bytes", 0) - if total: - percent = downloaded / total * 100 - desc = f"[gray]Download: {percent:.1f}%" - self.progress.update( - self.download_progress_task, - completed=percent, - description=desc - ) - else: - mb_downloaded = downloaded / 1024 / 1024 - desc = f"[gray]{mb_downloaded:.2f}MB geladen" - self.progress.update( - self.download_progress_task, description=desc - ) - elif d["status"] == "finished": - desc = "[gray]Download abgeschlossen." - self.progress.update( - self.download_progress_task, - completed=100, - description=desc + self._series_task_id = self._progress.add_task( + "[green]Current series", total=1 + ) + self._episode_task_id = self._progress.add_task( + "[gray]Download", total=100 ) + for serie in series: + serie_total = sum(len(eps) for eps in serie.episodeDict.values()) + self._progress.update( + self._series_task_id, + total=max(serie_total, 1), + completed=0, + description=f"[green]{serie.folder}", + ) + + for season, episodes in serie.episodeDict.items(): + for episode in episodes: + if not self.series_app.loader.is_language( + season, episode, serie.key + ): + logger.info( + "Skipping %s S%02dE%02d because the desired language is unavailable", + serie.folder, + season, + episode, + ) + continue + + result = self.series_app.download( + serieFolder=serie.folder, + season=season, + episode=episode, + key=serie.key, + callback=self._update_download_progress, + ) + + if not result.success: + logger.error("Download failed: %s", result.message) + + self._progress.advance(self._overall_task_id) + self._progress.advance(self._series_task_id) + self._progress.update( + self._episode_task_id, + completed=0, + description="[gray]Waiting...", + ) + + self._progress = None + self.series_app.refresh_series_list() + + def _update_download_progress(self, percent: float) -> None: + """Update the episode progress bar based on download progress.""" + if not self._progress or self._episode_task_id is None: + return + + description = f"[gray]Download: {percent:.1f}%" + self._progress.update( + self._episode_task_id, + completed=percent, + description=description, + ) + + # ------------------------------------------------------------------ + # Rescan logic + # ------------------------------------------------------------------ + def rescan(self) -> None: + """Trigger a rescan of the anime directory using the core app.""" + total_to_scan = self.series_app.SerieScanner.get_total_to_scan() + total_to_scan = max(total_to_scan, 1) + + self._progress = Progress() + with self._progress: + self._scan_task_id = self._progress.add_task( + "[red]Scanning folders...", + total=total_to_scan, + ) + + result = self.series_app.ReScan( + callback=self._wrap_scan_callback(total_to_scan) + ) + + self._progress = None + self._scan_task_id = None + + if result.success: + print(result.message) + else: + print(f"Scan failed: {result.message}") + + def _wrap_scan_callback(self, total: int): + """Create a callback that updates the scan progress bar.""" + + def _callback(folder: str, current: int) -> None: + if not self._progress or self._scan_task_id is None: + return + + self._progress.update( + self._scan_task_id, + completed=min(current, total), + description=f"[green]{folder}", + ) + + return _callback + + # ------------------------------------------------------------------ + # Search & add logic + # ------------------------------------------------------------------ def search_mode(self) -> None: - """Search for a series and allow user to select an option.""" - search_string = input("Enter search string: ").strip() - results = self.search(search_string) + """Search for a series and add it to the local list if chosen.""" + query = input("Enter search string: ").strip() + if not query: + return + results = self.series_app.search(query) if not results: - print("No results found. Returning to start.") + print("No results found. Returning to main menu.") return print("\nSearch results:") - for i, result in enumerate(results, 1): - print(f"{i}. {result.get('name')}") + for index, result in enumerate(results, start=1): + print(f"{index}. {result.get('name', 'Unknown')}") - while True: - prompt = ( - "\nSelect an option by number or type '' to return: " - ) - selection = input(prompt).strip().lower() + selection = input( + "\nSelect an option by number or press to cancel: " + ).strip() - if selection == "": - return + if selection == "": + return - try: - index = int(selection) - 1 - if 0 <= index < len(results): - chosen_name = results[index] - serie = Serie( - chosen_name["link"], - chosen_name["name"], - "aniworld.to", - chosen_name["link"], - {}, - ) - self.List.add(serie) - return - else: - print("Invalid selection. Try again.") - except ValueError: - print("Invalid input. Try again.") + try: + chosen_index = int(selection) - 1 + except ValueError: + print("Invalid input. Returning to main menu.") + return - def updateFromReinit(self, folder: str, counter: int) -> None: - self.progress.update(self.task1, advance=1) + if not (0 <= chosen_index < len(results)): + print("Invalid selection. Returning to main menu.") + return + chosen = results[chosen_index] + serie = Serie( + chosen.get("link", ""), + chosen.get("name", "Unknown"), + "aniworld.to", + chosen.get("link", ""), + {}, + ) + self.series_app.List.add(serie) + self.series_app.refresh_series_list() + print(f"Added '{serie.name}' to the local catalogue.") + + # ------------------------------------------------------------------ + # Main loop + # ------------------------------------------------------------------ def run(self) -> None: - """Main function to run the app.""" + """Run the interactive CLI loop.""" while True: - prompt = ( - "\nChoose action ('s' for search, 'i' for init " - "or 'd' for download): " - ) - action = input(prompt).strip().lower() + action = input( + "\nChoose action ('s' for search, 'i' for rescan, 'd' for download, 'q' to quit): " + ).strip().lower() if action == "s": self.search_mode() - if action == "i": + elif action == "i": print("\nRescanning series...\n") - - self.progress = Progress() - task1 = self.progress.add_task( - "[red]items processed...", total=300 - ) - self.task1 = task1 - self.progress.start() - - self.SerieScanner.reinit() - self.SerieScanner.scan(self.updateFromReinit) - - self.List = SerieList(self.directory_to_search) - self.__InitList__() - - self.progress.stop() - self.progress = None - + self.rescan() elif action == "d": selected_series = self.get_user_selection() if selected_series: self.download_series(selected_series) + elif action in {"q", "quit", "exit"}: + print("Goodbye!") + break + else: + print("Unknown command. Please choose 's', 'i', 'd', or 'q'.") + + +def configure_logging() -> None: + """Set up a basic logging configuration for the CLI.""" + logging.basicConfig(level=logging.INFO, format=LOG_FORMAT) + logging.getLogger("urllib3.connectionpool").setLevel(logging.ERROR) + logging.getLogger("charset_normalizer").setLevel(logging.ERROR) + + +def main() -> None: + """Entry point for the CLI application.""" + configure_logging() + + default_dir = os.getenv("ANIME_DIRECTORY") + if not default_dir: + print( + "Environment variable ANIME_DIRECTORY is not set. Please configure it to the base anime directory." + ) + return + + app = SeriesCLI(default_dir) + app.run() if __name__ == "__main__": - # Read the base directory from an environment variable - default_dir = ( - "\\\\sshfs.r\\ubuntu@192.168.178.43\\media\\serien\\Serien" - ) - directory_to_search = os.getenv("ANIME_DIRECTORY", default_dir) - app = SeriesApp(directory_to_search) - app.run() + main() diff --git a/src/core/SeriesApp.py b/src/core/SeriesApp.py index da5742d..d5f37b7 100644 --- a/src/core/SeriesApp.py +++ b/src/core/SeriesApp.py @@ -577,6 +577,10 @@ class SeriesApp: """ return self.series_list + def refresh_series_list(self) -> None: + """Reload the cached series list from the underlying data store.""" + self.__InitList__() + def get_operation_status(self) -> OperationStatus: """ Get the current operation status. diff --git a/src/core/providers/enhanced_provider.py b/src/core/providers/enhanced_provider.py index 73f3adc..ec53885 100644 --- a/src/core/providers/enhanced_provider.py +++ b/src/core/providers/enhanced_provider.py @@ -5,14 +5,12 @@ This module extends the original AniWorldLoader with comprehensive error handling, retry mechanisms, and recovery strategies. """ -import hashlib import html import json import logging import os import re import shutil -import time from typing import Any, Callable, Dict, Optional from urllib.parse import quote @@ -148,13 +146,28 @@ class EnhancedAniWorldLoader(Loader): """Create a session with robust retry and error handling configuration.""" session = requests.Session() - # Enhanced retry strategy + # Configure retries so transient network problems are retried while we + # still fail fast on permanent errors. The status codes cover + # timeouts, rate limits, and the Cloudflare-origin 52x responses that + # AniWorld occasionally emits under load. retries = Retry( total=5, backoff_factor=2, # More aggressive backoff - status_forcelist=[408, 429, 500, 502, 503, 504, 520, 521, 522, 523, 524], + status_forcelist=[ + 408, + 429, + 500, + 502, + 503, + 504, + 520, + 521, + 522, + 523, + 524, + ], allowed_methods=["GET", "POST", "HEAD"], - raise_on_status=False # Handle status errors manually + raise_on_status=False, # Handle status errors manually ) adapter = HTTPAdapter( @@ -255,9 +268,9 @@ class EnhancedAniWorldLoader(Loader): clean_text = response_text.strip() - # Try multiple parsing strategies. We progressively relax the parsing - # requirements to handle HTML-escaped payloads, stray BOM markers, and - # control characters injected by the upstream service. + # Attempt increasingly permissive parsing strategies to cope with + # upstream anomalies such as HTML escaping, stray BOM markers, and + # injected control characters. parsing_strategies = [ lambda text: json.loads(html.unescape(text)), lambda text: json.loads(text.encode('utf-8').decode('utf-8-sig')), diff --git a/src/core/providers/streaming/Provider.py b/src/core/providers/streaming/Provider.py index 766251a..e7ef382 100644 --- a/src/core/providers/streaming/Provider.py +++ b/src/core/providers/streaming/Provider.py @@ -21,4 +21,7 @@ class Provider(ABC): - direct_link: Direct URL to download resource - headers: Dictionary of HTTP headers to use for download """ + raise NotImplementedError( + "Streaming providers must implement get_link" + ) diff --git a/src/core/providers/streaming/filemoon.py b/src/core/providers/streaming/filemoon.py index 6a69a9d..0481e13 100644 --- a/src/core/providers/streaming/filemoon.py +++ b/src/core/providers/streaming/filemoon.py @@ -8,10 +8,14 @@ from aniworld import config # import jsbeautifier.unpackers.packer as packer +# Match the embedded ``iframe`` pointing to the actual Filemoon player. REDIRECT_REGEX = re.compile( r'