From 2de3317aee391d98a726aebc35bb534e9a623c6a Mon Sep 17 00:00:00 2001 From: Lukas Date: Sun, 2 Nov 2025 09:52:43 +0100 Subject: [PATCH] refactoring backup --- infrastructure.md | 2 - src/core/SeriesApp.py | 668 ++++-------------------- src/server/services/download_service.py | 4 +- tests/unit/test_series_app.py | 53 -- 4 files changed, 90 insertions(+), 637 deletions(-) diff --git a/infrastructure.md b/infrastructure.md index 4f2722d..ac8a2ac 100644 --- a/infrastructure.md +++ b/infrastructure.md @@ -771,8 +771,6 @@ The `SeriesApp` class (`src/core/SeriesApp.py`) is the main application engine f - `search(words)`: Search for anime series - `download()`: Download episodes with progress tracking - `ReScan()`: Scan directory for missing episodes -- `async_download()`: Async version of download -- `async_rescan()`: Async version of rescan - `cancel_operation()`: Cancel current operation - `get_operation_status()`: Get current status - `get_series_list()`: Get series with missing episodes diff --git a/src/core/SeriesApp.py b/src/core/SeriesApp.py index 57705dd..031fa83 100644 --- a/src/core/SeriesApp.py +++ b/src/core/SeriesApp.py @@ -3,59 +3,20 @@ SeriesApp - Core application logic for anime series management. This module provides the main application interface for searching, downloading, and managing anime series with support for async callbacks, -progress reporting, error handling, and operation cancellation. +progress reporting, and error handling. """ import asyncio import logging -import uuid -from dataclasses import dataclass -from enum import Enum from typing import Any, Callable, Dict, List, Optional from src.core.entities.SerieList import SerieList -from src.core.interfaces.callbacks import ( - CallbackManager, - CompletionContext, - ErrorContext, - OperationType, - ProgressContext, - ProgressPhase, -) from src.core.providers.provider_factory import Loaders from src.core.SerieScanner import SerieScanner logger = logging.getLogger(__name__) -class OperationStatus(Enum): - """Status of an operation.""" - IDLE = "idle" - RUNNING = "running" - COMPLETED = "completed" - CANCELLED = "cancelled" - FAILED = "failed" - - -@dataclass -class ProgressInfo: - """Progress information for long-running operations.""" - current: int - total: int - message: str - percentage: float - status: OperationStatus - - -@dataclass -class OperationResult: - """Result of an operation.""" - success: bool - message: str - data: Optional[Any] = None - error: Optional[Exception] = None - - class SeriesApp: """ Main application class for anime series management. @@ -66,7 +27,7 @@ class SeriesApp: - Scanning directories for missing episodes - Managing series lists - Supports async callbacks for progress reporting and cancellation. + Supports async callbacks for progress reporting. """ _initialization_count = 0 @@ -74,18 +35,12 @@ class SeriesApp: def __init__( self, directory_to_search: str, - progress_callback: Optional[Callable[[ProgressInfo], None]] = None, - error_callback: Optional[Callable[[Exception], None]] = None, - callback_manager: Optional[CallbackManager] = None ): """ Initialize SeriesApp. Args: directory_to_search: Base directory for anime series - progress_callback: Optional legacy callback for progress updates - error_callback: Optional legacy callback for error notifications - callback_manager: Optional callback manager for new callback system """ SeriesApp._initialization_count += 1 @@ -94,60 +49,41 @@ class SeriesApp: logger.info("Initializing SeriesApp...") self.directory_to_search = directory_to_search - self.progress_callback = progress_callback - self.error_callback = error_callback - # Initialize new callback system - self._callback_manager = callback_manager or CallbackManager() + self.loaders = Loaders() + self.loader = self.loaders.GetLoader(key="aniworld.to") + self.serie_scanner = SerieScanner( + directory_to_search, + self.loader + ) + self.list = SerieList(self.directory_to_search) + # Synchronous init used during constructor to avoid awaiting in __init__ + self._init_list_sync() - # Cancellation support - self._cancel_flag = False - self._current_operation: Optional[str] = None - self._current_operation_id: Optional[str] = None - self._operation_status = OperationStatus.IDLE - - # Initialize components - try: - self.Loaders = Loaders() - self.loader = self.Loaders.GetLoader(key="aniworld.to") - self.SerieScanner = SerieScanner( - directory_to_search, - self.loader, - self._callback_manager - ) - self.List = SerieList(self.directory_to_search) - self.__InitList__() - - logger.info( - "SeriesApp initialized for directory: %s", - directory_to_search - ) - except (IOError, OSError, RuntimeError) as e: - logger.error("Failed to initialize SeriesApp: %s", e) - self._handle_error(e) - raise + logger.info( + "SeriesApp initialized for directory: %s", + directory_to_search + ) - @property - def callback_manager(self) -> CallbackManager: - """Get the callback manager instance.""" - return self._callback_manager + def _init_list_sync(self) -> None: + """Synchronous initialization helper for constructor.""" + self.series_list = self.list.GetMissingEpisode() + logger.debug( + "Loaded %d series with missing episodes", + len(self.series_list) + ) - def __InitList__(self): - """Initialize the series list with missing episodes.""" - try: - self.series_list = self.List.GetMissingEpisode() - logger.debug( - "Loaded %d series with missing episodes", - len(self.series_list) - ) - except (IOError, OSError, RuntimeError) as e: - logger.error("Failed to initialize series list: %s", e) - self._handle_error(e) - raise + async def _init_list(self) -> None: + """Initialize the series list with missing episodes (async).""" + self.series_list = await asyncio.to_thread(self.list.GetMissingEpisode) + logger.debug( + "Loaded %d series with missing episodes", + len(self.series_list) + ) - def search(self, words: str) -> List[Dict[str, Any]]: + async def search(self, words: str) -> List[Dict[str, Any]]: """ - Search for anime series. + Search for anime series (async). Args: words: Search query @@ -158,525 +94,97 @@ class SeriesApp: Raises: RuntimeError: If search fails """ - try: - logger.info("Searching for: %s", words) - results = self.loader.search(words) - logger.info("Found %d results", len(results)) - return results - except (IOError, OSError, RuntimeError) as e: - logger.error("Search failed for '%s': %s", words, e) - self._handle_error(e) - raise + logger.info("Searching for: %s", words) + results = await asyncio.to_thread(self.loader.search, words) + logger.info("Found %d results", len(results)) + return results - def download( + async def download( self, - serieFolder: str, + serie_folder: str, season: int, episode: int, key: str, - callback: Optional[Callable[[float], None]] = None, language: str = "German Dub" - ) -> OperationResult: + ) -> bool: """ - Download an episode. + Download an episode (async). Args: - serieFolder: Serie folder name + serie_folder: Serie folder name season: Season number episode: Episode number key: Serie key - callback: Optional legacy progress callback language: Language preference Returns: - OperationResult with download status + True if download succeeded, False otherwise """ - self._current_operation = f"download_S{season:02d}E{episode:02d}" - self._current_operation_id = str(uuid.uuid4()) - self._operation_status = OperationStatus.RUNNING - self._cancel_flag = False - - try: - logger.info( - "Starting download: %s S%02dE%02d", - serieFolder, season, episode - ) - - # Notify download starting - start_msg = ( - f"Starting download: {serieFolder} " - f"S{season:02d}E{episode:02d}" - ) - self._callback_manager.notify_progress( - ProgressContext( - operation_type=OperationType.DOWNLOAD, - operation_id=self._current_operation_id, - phase=ProgressPhase.STARTING, - current=0, - total=100, - percentage=0.0, - message=start_msg, - metadata={ - "series": serieFolder, - "season": season, - "episode": episode, - "key": key, - "language": language - } - ) - ) - - # Check for cancellation before starting - if self._is_cancelled(): - self._callback_manager.notify_completion( - CompletionContext( - operation_type=OperationType.DOWNLOAD, - operation_id=self._current_operation_id, - success=False, - message="Download cancelled before starting" - ) - ) - return OperationResult( - success=False, - message="Download cancelled before starting" - ) - - # Wrap callback to enforce cancellation checks and bridge the new - # event-driven progress reporting with the legacy callback API that - # the CLI still relies on. - def wrapped_callback(progress_info): - logger.debug(f"wrapped_callback called with: {progress_info}") - - if self._is_cancelled(): - raise InterruptedError("Download cancelled by user") - - # yt-dlp passes a dict with progress information - # Only process progress updates when status is 'downloading' - # (yt-dlp also sends 'finished', 'error', etc.) - if isinstance(progress_info, dict): - status = progress_info.get('status') - if status and status != 'downloading': - logger.debug( - f"Skipping progress update with status: {status}" - ) - return - - # Extract percentage from the dict - # Calculate percentage based on downloaded/total bytes - downloaded = progress_info.get('downloaded_bytes', 0) - total_bytes = ( - progress_info.get('total_bytes') - or progress_info.get('total_bytes_estimate', 0) - ) - - if total_bytes > 0: - progress = (downloaded / total_bytes) * 100 - else: - progress = 0 - - # Extract speed and ETA from yt-dlp progress dict - speed = progress_info.get('speed', 0) # bytes/sec - eta = progress_info.get('eta') # seconds - - logger.debug( - f"Progress calculation: {downloaded}/{total_bytes} = " - f"{progress:.1f}%, speed={speed}, eta={eta}" - ) - - # Convert to expected format for web API callback - # Web API expects: percent, downloaded_mb, total_mb, - # speed_mbps, eta_seconds - web_progress_dict = { - 'percent': progress, - # Convert bytes to MB - 'downloaded_mb': downloaded / (1024 * 1024), - 'total_mb': ( - total_bytes / (1024 * 1024) - if total_bytes > 0 - else None - ), - # Convert bytes/sec to MB/sec - 'speed_mbps': ( - speed / (1024 * 1024) if speed else None - ), - 'eta_seconds': eta, - } - else: - # Fallback for old-style float progress - progress = float(progress_info) - web_progress_dict = { - 'percent': progress, - 'downloaded_mb': 0.0, - 'total_mb': None, - 'speed_mbps': None, - 'eta_seconds': None, - } - logger.debug(f"Old-style progress: {progress}%") - - # Notify progress via new callback system - self._callback_manager.notify_progress( - ProgressContext( - operation_type=OperationType.DOWNLOAD, - operation_id=self._current_operation_id, - phase=ProgressPhase.IN_PROGRESS, - current=int(progress), - total=100, - percentage=progress, - message=f"Downloading: {progress:.1f}%", - metadata={ - "series": serieFolder, - "season": season, - "episode": episode - } - ) - ) - - # Call callback with web API format - # (dict with detailed progress info) - if callback: - logger.debug( - f"Calling progress callback: {web_progress_dict}" - ) - try: - callback(web_progress_dict) - logger.debug("Progress callback executed successfully") - except Exception as e: - logger.error( - f"Error in progress callback: {e}", - exc_info=True - ) - - # Propagate progress into the legacy callback chain so - # existing UI surfaces continue to receive updates without - # rewriting the old interfaces. - # Call legacy progress_callback if provided - if self.progress_callback: - self.progress_callback(ProgressInfo( - current=int(progress), - total=100, - message=f"Downloading S{season:02d}E{episode:02d}", - percentage=progress, - status=OperationStatus.RUNNING - )) - - # Perform download - download_success = self.loader.download( - self.directory_to_search, - serieFolder, - season, - episode, - key, - language, - wrapped_callback - ) - - # Check if download was successful - if not download_success: - raise RuntimeError( - f"Download failed for S{season:02d}E{episode:02d}" - ) - - self._operation_status = OperationStatus.COMPLETED - logger.info( - "Download completed: %s S%02dE%02d", - serieFolder, season, episode - ) - - # Notify completion - msg = f"Successfully downloaded S{season:02d}E{episode:02d}" - self._callback_manager.notify_completion( - CompletionContext( - operation_type=OperationType.DOWNLOAD, - operation_id=self._current_operation_id, - success=True, - message=msg, - statistics={ - "series": serieFolder, - "season": season, - "episode": episode - } - ) - ) - - return OperationResult( - success=True, - message=msg - ) - - except InterruptedError as e: - self._operation_status = OperationStatus.CANCELLED - logger.warning("Download cancelled: %s", e) - - # Notify cancellation - self._callback_manager.notify_completion( - CompletionContext( - operation_type=OperationType.DOWNLOAD, - operation_id=self._current_operation_id, - success=False, - message="Download cancelled" - ) - ) - - return OperationResult( - success=False, - message="Download cancelled", - error=e - ) - except (IOError, OSError, RuntimeError) as e: - self._operation_status = OperationStatus.FAILED - logger.error("Download failed: %s", e) - - # Notify error - error_msg = f"Download failed: {str(e)}" - self._callback_manager.notify_error( - ErrorContext( - operation_type=OperationType.DOWNLOAD, - operation_id=self._current_operation_id, - error=e, - message=error_msg, - recoverable=False, - metadata={ - "series": serieFolder, - "season": season, - "episode": episode - } - ) - ) - - # Notify completion with failure - self._callback_manager.notify_completion( - CompletionContext( - operation_type=OperationType.DOWNLOAD, - operation_id=self._current_operation_id, - success=False, - message=error_msg - ) - ) - - self._handle_error(e) - return OperationResult( - success=False, - message=error_msg, - error=e - ) - finally: - self._current_operation = None - self._current_operation_id = None - - def ReScan( - self, - callback: Optional[Callable[[str, int], None]] = None - ) -> OperationResult: - """ - Rescan directory for missing episodes. - - Args: - callback: Optional progress callback (folder, current_count) - - Returns: - OperationResult with scan status - """ - self._current_operation = "rescan" - self._operation_status = OperationStatus.RUNNING - self._cancel_flag = False - - try: - logger.info("Starting directory rescan") - - # Get total items to scan - total_to_scan = self.SerieScanner.get_total_to_scan() - logger.info("Total folders to scan: %d", total_to_scan) - - # Reinitialize scanner - self.SerieScanner.reinit() - - # Wrap the scanner callback so we can surface progress through the - # new ProgressInfo pipeline while maintaining backwards - # compatibility with the legacy tuple-based callback signature. - def wrapped_callback(folder: str, current: int): - if self._is_cancelled(): - raise InterruptedError("Scan cancelled by user") - - # Calculate progress - if total_to_scan > 0: - percentage = (current / total_to_scan * 100) - else: - percentage = 0 - - # Report progress - if self.progress_callback: - progress_info = ProgressInfo( - current=current, - total=total_to_scan, - message=f"Scanning: {folder}", - percentage=percentage, - status=OperationStatus.RUNNING - ) - self.progress_callback(progress_info) - - # Call original callback if provided - if callback: - callback(folder, current) - - # Perform scan - self.SerieScanner.scan(wrapped_callback) - - # Reinitialize list - self.List = SerieList(self.directory_to_search) - self.__InitList__() - - self._operation_status = OperationStatus.COMPLETED - logger.info("Directory rescan completed successfully") - - msg = ( - f"Scan completed. Found {len(self.series_list)} " - f"series." - ) - return OperationResult( - success=True, - message=msg, - data={"series_count": len(self.series_list)} - ) - - except InterruptedError as e: - self._operation_status = OperationStatus.CANCELLED - logger.warning("Scan cancelled: %s", e) - return OperationResult( - success=False, - message="Scan cancelled", - error=e - ) - except (IOError, OSError, RuntimeError) as e: - self._operation_status = OperationStatus.FAILED - logger.error("Scan failed: %s", e) - self._handle_error(e) - return OperationResult( - success=False, - message=f"Scan failed: {str(e)}", - error=e - ) - finally: - self._current_operation = None - - async def async_download( - self, - serieFolder: str, - season: int, - episode: int, - key: str, - callback: Optional[Callable[[float], None]] = None, - language: str = "German Dub" - ) -> OperationResult: - """ - Async version of download method. - - Args: - serieFolder: Serie folder name - season: Season number - episode: Episode number - key: Serie key - callback: Optional progress callback - language: Language preference - - Returns: - OperationResult with download status - """ - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, - self.download, - serieFolder, + logger.info( + "Starting download: %s S%02dE%02d", + serie_folder, + season, + episode + ) + # Perform download in thread to avoid blocking event loop + download_success = await asyncio.to_thread( + self.loader.download, + self.directory_to_search, + serie_folder, season, episode, key, - callback, language ) - async def async_rescan( - self, - callback: Optional[Callable[[str, int], None]] = None - ) -> OperationResult: - """ - Async version of ReScan method. - - Args: - callback: Optional progress callback - - Returns: - OperationResult with scan status - """ - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, - self.ReScan, - callback + logger.info( + "Download completed: %s S%02dE%02d", + serie_folder, + season, + episode ) + + return download_success + - def cancel_operation(self) -> bool: + async def re_scan( + self + ) -> int: """ - Cancel the current operation. + Rescan directory for missing episodes (async). Returns: - True if operation cancelled, False if no operation running + Number of series with missing episodes after rescan. """ - if (self._current_operation and - self._operation_status == OperationStatus.RUNNING): - logger.info( - "Cancelling operation: %s", - self._current_operation - ) - self._cancel_flag = True - return True - return False - - def _is_cancelled(self) -> bool: - """Check if the current operation has been cancelled.""" - return self._cancel_flag - - def _handle_error(self, error: Exception) -> None: - """ - Handle errors and notify via callback. + logger.info("Starting directory rescan") - Args: - error: Exception that occurred - """ - if self.error_callback: - try: - self.error_callback(error) - except (RuntimeError, ValueError) as callback_error: - logger.error( - "Error in error callback: %s", - callback_error - ) + # Get total items to scan + total_to_scan = await asyncio.to_thread(self.serie_scanner.get_total_to_scan) + logger.info("Total folders to scan: %d", total_to_scan) + + # Reinitialize scanner + await asyncio.to_thread(self.serie_scanner.reinit) - def get_series_list(self) -> List[Any]: + # Perform scan + await asyncio.to_thread(self.serie_scanner.scan) + + # Reinitialize list + self.list = SerieList(self.directory_to_search) + await self._init_list() + + logger.info("Directory rescan completed successfully") + + return len(self.series_list) + + async def get_series_list(self) -> List[Any]: """ - Get the current series list. + Get the current series list (async). Returns: List of series with missing episodes """ return self.series_list - def refresh_series_list(self) -> None: - """Reload the cached series list from the underlying data store.""" - self.__InitList__() - - def get_operation_status(self) -> OperationStatus: - """ - Get the current operation status. - - Returns: - Current operation status - """ - return self._operation_status - - def get_current_operation(self) -> Optional[str]: - """ - Get the current operation name. - - Returns: - Name of current operation or None - """ - return self._current_operation + async def refresh_series_list(self) -> None: + """Reload the cached series list from the underlying data store (async).""" + await self._init_list() diff --git a/src/server/services/download_service.py b/src/server/services/download_service.py index 61aef92..ca68c8a 100644 --- a/src/server/services/download_service.py +++ b/src/server/services/download_service.py @@ -71,7 +71,7 @@ class DownloadService: self._max_retries = max_retries self._persistence_path = Path(persistence_path) self._progress_service = progress_service or get_progress_service() - + # Queue storage by status self._pending_queue: deque[DownloadItem] = deque() # Helper dict for O(1) lookup of pending items by ID @@ -92,7 +92,7 @@ class DownloadService: # Statistics tracking self._total_downloaded_mb: float = 0.0 self._download_speeds: deque[float] = deque(maxlen=10) - + # Load persisted queue self._load_queue() diff --git a/tests/unit/test_series_app.py b/tests/unit/test_series_app.py index a39acff..4819a3b 100644 --- a/tests/unit/test_series_app.py +++ b/tests/unit/test_series_app.py @@ -353,59 +353,6 @@ class TestSeriesAppReScan: assert "cancelled" in result.message.lower() -class TestSeriesAppAsync: - """Test async operations.""" - - @pytest.mark.asyncio - @patch('src.core.SeriesApp.Loaders') - @patch('src.core.SeriesApp.SerieScanner') - @patch('src.core.SeriesApp.SerieList') - async def test_async_download( - self, mock_serie_list, mock_scanner, mock_loaders - ): - """Test async download.""" - test_dir = "/test/anime" - app = SeriesApp(test_dir) - - # Mock download - app.loader.Download = Mock() - - # Perform async download - result = await app.async_download( - "anime_folder", - season=1, - episode=1, - key="anime_key" - ) - - # Verify result - assert isinstance(result, OperationResult) - assert result.success is True - - @pytest.mark.asyncio - @patch('src.core.SeriesApp.Loaders') - @patch('src.core.SeriesApp.SerieScanner') - @patch('src.core.SeriesApp.SerieList') - async def test_async_rescan( - self, mock_serie_list, mock_scanner, mock_loaders - ): - """Test async rescan.""" - test_dir = "/test/anime" - app = SeriesApp(test_dir) - - # Mock scanner - app.SerieScanner.GetTotalToScan = Mock(return_value=5) - app.SerieScanner.Reinit = Mock() - app.SerieScanner.Scan = Mock() - - # Perform async rescan - result = await app.async_rescan() - - # Verify result - assert isinstance(result, OperationResult) - assert result.success is True - - class TestSeriesAppCancellation: """Test operation cancellation."""