diff --git a/src/core/SerieScanner.py b/src/core/SerieScanner.py index 3697a16..a30cd80 100644 --- a/src/core/SerieScanner.py +++ b/src/core/SerieScanner.py @@ -15,18 +15,12 @@ import os import re import traceback import uuid -from typing import Callable, Iterable, Iterator, Optional +from typing import Iterable, Iterator, Optional + +from events import Events from src.core.entities.series import Serie from src.core.exceptions.Exceptions import MatchNotFoundError, NoKeyFoundException -from src.core.interfaces.callbacks import ( - CallbackManager, - CompletionContext, - ErrorContext, - OperationType, - ProgressContext, - ProgressPhase, -) from src.core.providers.base_provider import Loader logger = logging.getLogger(__name__) @@ -55,7 +49,6 @@ class SerieScanner: self, basePath: str, loader: Loader, - callback_manager: Optional[CallbackManager] = None, ) -> None: """ Initialize the SerieScanner. @@ -82,18 +75,76 @@ class SerieScanner: self.directory: str = abs_path self.keyDict: dict[str, Serie] = {} self.loader: Loader = loader - self._callback_manager: CallbackManager = ( - callback_manager or CallbackManager() - ) self._current_operation_id: Optional[str] = None + self.events = Events() + + self.events.on_progress = None + self.events.on_error = None + self.events.on_completion = None logger.info("Initialized SerieScanner with base path: %s", abs_path) + + def _safe_call_event(self, event_handler, data: dict) -> None: + """Safely call an event handler if it exists. + + Args: + event_handler: Event handler attribute (e.g., self.events.on_progress) + data: Data dictionary to pass to the event handler + """ + if event_handler: + try: + event_handler(data) + except Exception as e: + logger.error("Error calling event handler: %s", e, exc_info=True) - @property - def callback_manager(self) -> CallbackManager: - """Get the callback manager instance.""" - return self._callback_manager + def subscribe_on_progress(self, handler): + """ + Subscribe a handler to an event. + Args: + handler: Callable to handle the event + """ + self.events.on_progress += handler + def unsubscribe_on_progress(self, handler): + """ + Unsubscribe a handler from an event. + Args: + handler: Callable to remove + """ + self.events.on_progress += handler + + def subscribe_on_error(self, handler): + """ + Subscribe a handler to an event. + Args: + handler: Callable to handle the event + """ + self.events.on_error += handler + + def unsubscribe_on_error(self, handler): + """ + Unsubscribe a handler from an event. + Args: + handler: Callable to remove + """ + self.events.on_error += handler + + def subscribe_on_completion(self, handler): + """ + Subscribe a handler to an event. + Args: + handler: Callable to handle the event + """ + self.events.on_completion += handler + + def unsubscribe_on_completion(self, handler): + """ + Unsubscribe a handler from an event. + Args: + handler: Callable to remove + """ + self.events.on_completion += handler + def reinit(self) -> None: """Reinitialize the series dictionary (keyed by serie.key).""" self.keyDict: dict[str, Serie] = {} @@ -107,20 +158,13 @@ class SerieScanner: result = self.__find_mp4_files() return sum(1 for _ in result) - def scan( - self, - callback: Optional[Callable[[str, int], None]] = None - ) -> None: + def scan(self) -> None: """ Scan directories for anime series and missing episodes. Results are stored in self.keyDict and can be retrieved after scanning. Data files are also saved to disk for persistence. - Args: - callback: Optional callback function (folder, count) for - progress updates - Raises: Exception: If scan fails critically """ @@ -130,16 +174,16 @@ class SerieScanner: logger.info("Starting scan for missing episodes") # Notify scan starting - self._callback_manager.notify_progress( - ProgressContext( - operation_type=OperationType.SCAN, - operation_id=self._current_operation_id, - phase=ProgressPhase.STARTING, - current=0, - total=0, - percentage=0.0, - message="Initializing scan" - ) + self._safe_call_event( + self.events.on_progress, + { + "operation_id": self._current_operation_id, + "phase": "STARTING", + "current": 0, + "total": 0, + "percentage": 0.0, + "message": "Initializing scan" + } ) try: @@ -163,27 +207,20 @@ class SerieScanner: else: percentage = 0.0 - # Progress is surfaced both through the callback manager - # (for the web/UI layer) and, for compatibility, through a - # legacy callback that updates CLI progress bars. # Notify progress - self._callback_manager.notify_progress( - ProgressContext( - operation_type=OperationType.SCAN, - operation_id=self._current_operation_id, - phase=ProgressPhase.IN_PROGRESS, - current=counter, - total=total_to_scan, - percentage=percentage, - message=f"Scanning: {folder}", - details=f"Found {len(mp4_files)} episodes" - ) + self._safe_call_event( + self.events.on_progress, + { + "operation_id": self._current_operation_id, + "phase": "IN_PROGRESS", + "current": counter, + "total": total_to_scan, + "percentage": percentage, + "message": f"Scanning: {folder}", + "details": f"Found {len(mp4_files)} episodes" + } ) - # Call legacy callback if provided - if callback: - callback(folder, counter) - serie = self.__read_data_from_file(folder) if ( serie is not None @@ -230,15 +267,15 @@ class SerieScanner: error_msg = f"Error processing folder '{folder}': {nkfe}" logger.error(error_msg) - self._callback_manager.notify_error( - ErrorContext( - operation_type=OperationType.SCAN, - operation_id=self._current_operation_id, - error=nkfe, - message=error_msg, - recoverable=True, - metadata={"folder": folder, "key": None} - ) + self._safe_call_event( + self.events.on_error, + { + "operation_id": self._current_operation_id, + "error": nkfe, + "message": error_msg, + "recoverable": True, + "metadata": {"folder": folder, "key": None} + } ) except Exception as e: # Log error and notify via callback @@ -252,30 +289,30 @@ class SerieScanner: traceback.format_exc() ) - self._callback_manager.notify_error( - ErrorContext( - operation_type=OperationType.SCAN, - operation_id=self._current_operation_id, - error=e, - message=error_msg, - recoverable=True, - metadata={"folder": folder, "key": None} - ) + self._safe_call_event( + self.events.on_error, + { + "operation_id": self._current_operation_id, + "error": e, + "message": error_msg, + "recoverable": True, + "metadata": {"folder": folder, "key": None} + } ) continue # Notify scan completion - self._callback_manager.notify_completion( - CompletionContext( - operation_type=OperationType.SCAN, - operation_id=self._current_operation_id, - success=True, - message=f"Scan completed. Processed {counter} folders.", - statistics={ + self._safe_call_event( + self.events.on_completion, + { + "operation_id": self._current_operation_id, + "success": True, + "message": f"Scan completed. Processed {counter} folders.", + "statistics": { "total_folders": counter, "series_found": len(self.keyDict) } - ) + } ) logger.info( @@ -289,23 +326,23 @@ class SerieScanner: error_msg = f"Critical scan error: {e}" logger.error("%s\n%s", error_msg, traceback.format_exc()) - self._callback_manager.notify_error( - ErrorContext( - operation_type=OperationType.SCAN, - operation_id=self._current_operation_id, - error=e, - message=error_msg, - recoverable=False - ) + self._safe_call_event( + self.events.on_error, + { + "operation_id": self._current_operation_id, + "error": e, + "message": error_msg, + "recoverable": False + } ) - self._callback_manager.notify_completion( - CompletionContext( - operation_type=OperationType.SCAN, - operation_id=self._current_operation_id, - success=False, - message=error_msg - ) + self._safe_call_event( + self.events.on_completion, + { + "operation_id": self._current_operation_id, + "success": False, + "message": error_msg + } ) raise @@ -325,16 +362,6 @@ class SerieScanner: has_files = True yield anime_name, mp4_files if has_files else [] - def __remove_year(self, input_string: str) -> str: - """Remove year information from input string.""" - cleaned_string = re.sub(r'\(\d{4}\)', '', input_string).strip() - logger.debug( - "Removed year from '%s' -> '%s'", - input_string, - cleaned_string - ) - return cleaned_string - def __read_data_from_file(self, folder_name: str) -> Optional[Serie]: """Read serie data from file or key file. @@ -507,19 +534,18 @@ class SerieScanner: # Generate unique operation ID for this targeted scan operation_id = str(uuid.uuid4()) - # Notify scan starting - self._callback_manager.notify_progress( - ProgressContext( - operation_type=OperationType.SCAN, - operation_id=operation_id, - phase=ProgressPhase.STARTING, - current=0, - total=1, - percentage=0.0, - message=f"Scanning series: {folder}", - details=f"Key: {key}" - ) + self._safe_call_event( + self.events.on_progress, + { + "operation_id": operation_id, + "phase": "STARTING", + "current": 0, + "total": 1, + "percentage": 0.0, + "message": f"Scanning series: {folder}", + "details": f"Key: {key}" + } ) try: @@ -554,17 +580,17 @@ class SerieScanner: ) # Update progress - self._callback_manager.notify_progress( - ProgressContext( - operation_type=OperationType.SCAN, - operation_id=operation_id, - phase=ProgressPhase.IN_PROGRESS, - current=1, - total=1, - percentage=100.0, - message=f"Scanned: {folder}", - details=f"Found {sum(len(eps) for eps in missing_episodes.values())} missing episodes" - ) + self._safe_call_event( + self.events.on_progress, + { + "operation_id": operation_id, + "phase": "IN_PROGRESS", + "current": 1, + "total": 1, + "percentage": 100.0, + "message": f"Scanned: {folder}", + "details": f"Found {sum(len(eps) for eps in missing_episodes.values())} missing episodes" + } ) # Create or update Serie in keyDict @@ -593,19 +619,19 @@ class SerieScanner: ) # Notify completion - self._callback_manager.notify_completion( - CompletionContext( - operation_type=OperationType.SCAN, - operation_id=operation_id, - success=True, - message=f"Scan completed for {folder}", - statistics={ + self._safe_call_event( + self.events.on_completion, + { + "operation_id": operation_id, + "success": True, + "message": f"Scan completed for {folder}", + "statistics": { "missing_episodes": sum( len(eps) for eps in missing_episodes.values() ), "seasons_with_missing": len(missing_episodes) } - ) + } ) logger.info( @@ -622,27 +648,25 @@ class SerieScanner: logger.error(error_msg, exc_info=True) # Notify error - self._callback_manager.notify_error( - ErrorContext( - operation_type=OperationType.SCAN, - operation_id=operation_id, - error=e, - message=error_msg, - recoverable=True, - metadata={"key": key, "folder": folder} - ) + self._safe_call_event( + self.events.on_error, + { + "operation_id": operation_id, + "error": e, + "message": error_msg, + "recoverable": True, + "metadata": {"key": key, "folder": folder} + } ) - # Notify completion with failure - self._callback_manager.notify_completion( - CompletionContext( - operation_type=OperationType.SCAN, - operation_id=operation_id, - success=False, - message=error_msg - ) + self._safe_call_event( + self.events.on_completion, + { + "operation_id": operation_id, + "success": False, + "message": error_msg + } ) - # Return empty dict on error (scan failed but not critical) return {} diff --git a/src/core/SeriesApp.py b/src/core/SeriesApp.py index 2bc02dc..acb417f 100644 --- a/src/core/SeriesApp.py +++ b/src/core/SeriesApp.py @@ -309,9 +309,10 @@ class SeriesApp: ) try: - def download_callback(progress_info): + def download_progress_handler(progress_info): + """Handle download progress events from loader.""" logger.debug( - "wrapped_callback called with: %s", progress_info + "download_progress_handler called with: %s", progress_info ) downloaded = progress_info.get('downloaded_bytes', 0) @@ -341,17 +342,26 @@ class SeriesApp: item_id=item_id, ) ) - # Perform download in thread to avoid blocking event loop - download_success = await asyncio.to_thread( - self.loader.download, - self.directory_to_search, - serie_folder, - season, - episode, - key, - language, - download_callback - ) + + # Subscribe to loader's download progress events + self.loader.subscribe_download_progress(download_progress_handler) + + try: + # Perform download in thread to avoid blocking event loop + download_success = await asyncio.to_thread( + self.loader.download, + self.directory_to_search, + serie_folder, + season, + episode, + key, + language + ) + finally: + # Always unsubscribe after download completes or fails + self.loader.unsubscribe_download_progress( + download_progress_handler + ) if download_success: logger.info( @@ -495,29 +505,35 @@ class SeriesApp: # Reinitialize scanner await asyncio.to_thread(self.serie_scanner.reinit) - def scan_callback(folder: str, current: int): - # Calculate progress - if total_to_scan > 0: - progress = current / total_to_scan - else: - progress = 0.0 - + def scan_progress_handler(progress_data): + """Handle scan progress events from scanner.""" # Fire scan progress event + message = progress_data.get('message', '') + folder = message.replace('Scanning: ', '') self._events.scan_status( ScanStatusEventArgs( - current=current, - total=total_to_scan, + current=progress_data.get('current', 0), + total=progress_data.get('total', total_to_scan), folder=folder, status="progress", - progress=progress, - message=f"Scanning: {folder}", + progress=( + progress_data.get('percentage', 0.0) / 100.0 + ), + message=message, ) ) - # Perform scan (file-based, returns results in scanner.keyDict) - await asyncio.to_thread( - self.serie_scanner.scan, scan_callback - ) + # Subscribe to scanner's progress events + self.serie_scanner.subscribe_on_progress(scan_progress_handler) + + try: + # Perform scan (file-based, returns results in scanner.keyDict) + await asyncio.to_thread(self.serie_scanner.scan) + finally: + # Always unsubscribe after scan completes or fails + self.serie_scanner.unsubscribe_on_progress( + scan_progress_handler + ) # Get scanned series from scanner scanned_series = list(self.serie_scanner.keyDict.values()) diff --git a/src/core/providers/aniworld_provider.py b/src/core/providers/aniworld_provider.py index 8d7d11b..8b7faba 100644 --- a/src/core/providers/aniworld_provider.py +++ b/src/core/providers/aniworld_provider.py @@ -1,17 +1,17 @@ + import html import json import logging import os import re import shutil -import signal -import sys import threading from pathlib import Path from urllib.parse import quote import requests from bs4 import BeautifulSoup +from events import Events from fake_useragent import UserAgent from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry @@ -74,7 +74,7 @@ class AniworldLoader(Loader): } self.ANIWORLD_TO = "https://aniworld.to" self.session = requests.Session() - + # Cancellation flag for graceful shutdown self._cancel_flag = threading.Event() @@ -98,6 +98,25 @@ class AniworldLoader(Loader): self._EpisodeHTMLDict = {} self.Providers = Providers() + # Events: download_progress is triggered with progress dict + self.events = Events() + + self.events.download_progress = None + + def subscribe_download_progress(self, handler): + """Subscribe a handler to the download_progress event. + Args: + handler: Callable to be called with progress dict. + """ + self.events.download_progress += handler + + def unsubscribe_download_progress(self, handler): + """Unsubscribe a handler from the download_progress event. + Args: + handler: Callable previously subscribed. + """ + self.events.download_progress -= handler + def clear_cache(self): """Clear the cached HTML data.""" logging.debug("Clearing HTML cache") @@ -203,7 +222,7 @@ class AniworldLoader(Loader): is_available = language_code in languages logging.debug(f"Available languages for S{season:02}E{episode:03}: {languages}, requested: {language_code}, available: {is_available}") - return is_available + return is_available def download( self, @@ -212,8 +231,7 @@ class AniworldLoader(Loader): season: int, episode: int, key: str, - language: str = "German Dub", - progress_callback=None + language: str = "German Dub" ) -> bool: """Download episode to specified directory. @@ -226,19 +244,9 @@ class AniworldLoader(Loader): key: Series unique identifier from provider (used for identification and API calls) language: Audio language preference (default: German Dub) - progress_callback: Optional callback for download progress - Returns: bool: True if download succeeded, False otherwise - - Raises: - asyncio.CancelledError: If download was cancelled via request_cancel() """ - # Check cancellation before starting - if self.is_cancelled(): - logging.info("Download cancelled before starting") - raise InterruptedError("Download cancelled") - logging.info( f"Starting download for S{season:02}E{episode:03} " f"({key}) in {language}" @@ -276,31 +284,21 @@ class AniworldLoader(Loader): logging.debug(f"Temporary path: {temp_path}") for provider in self.SUPPORTED_PROVIDERS: - # Check cancellation before each provider attempt - if self.is_cancelled(): - logging.info("Download cancelled during provider selection") - raise InterruptedError("Download cancelled") - logging.debug(f"Attempting download with provider: {provider}") link, header = self._get_direct_link_from_provider( season, episode, key, language ) logging.debug("Direct link obtained from provider") - - # Create a cancellation-aware progress hook using DownloadCancelled - # which YT-DLP properly handles + cancel_flag = self._cancel_flag - - def cancellation_check_hook(d): - """Progress hook that checks for cancellation. - - Uses yt_dlp.utils.DownloadCancelled which is properly - handled by YT-DLP to abort downloads immediately. - """ + + def events_progress_hook(d): if cancel_flag.is_set(): logging.info("Cancellation detected in progress hook") raise DownloadCancelled("Download cancelled by user") - + # Fire the event for progress + self.events.download_progress(d) + ydl_opts = { 'fragment_retries': float('inf'), 'outtmpl': temp_path, @@ -308,36 +306,18 @@ class AniworldLoader(Loader): 'no_warnings': True, 'progress_with_newline': False, 'nocheckcertificate': True, - # Add cancellation check as a progress hook - 'progress_hooks': [cancellation_check_hook], + 'progress_hooks': [events_progress_hook], } if header: ydl_opts['http_headers'] = header logging.debug("Using custom headers for download") - if progress_callback: - # Wrap the callback to add logging and keep cancellation check - def logged_progress_callback(d): - # Check cancellation first - use DownloadCancelled - if cancel_flag.is_set(): - logging.info("Cancellation detected in progress callback") - raise DownloadCancelled("Download cancelled by user") - logging.debug( - f"YT-DLP progress: status={d.get('status')}, " - f"downloaded={d.get('downloaded_bytes')}, " - f"total={d.get('total_bytes')}, " - f"speed={d.get('speed')}" - ) - progress_callback(d) - - ydl_opts['progress_hooks'] = [logged_progress_callback] - logging.debug("Progress callback registered with YT-DLP") try: logging.debug("Starting YoutubeDL download") logging.debug(f"Download link: {link[:100]}...") logging.debug(f"YDL options: {ydl_opts}") - + with YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(link, download=True) logging.debug( @@ -346,14 +326,6 @@ class AniworldLoader(Loader): f"filesize={info.get('filesize')}" ) - # Check cancellation after download completes - if self.is_cancelled(): - logging.info("Download cancelled after completion") - # Clean up temp file if exists - if os.path.exists(temp_path): - os.remove(temp_path) - raise InterruptedError("Download cancelled") - if os.path.exists(temp_path): logging.debug("Moving file from temp to final destination") shutil.copy(temp_path, output_path) @@ -369,44 +341,20 @@ class AniworldLoader(Loader): ) self.clear_cache() return False - except (InterruptedError, DownloadCancelled) as e: - # Re-raise cancellation errors - logging.info( - "Download cancelled: %s, propagating cancellation", - type(e).__name__ - ) - # Clean up temp file if exists - if os.path.exists(temp_path): - try: - os.remove(temp_path) - except OSError: - pass - raise InterruptedError("Download cancelled") from e except BrokenPipeError as e: logging.error( f"Broken pipe error with provider {provider}: {e}. " f"This usually means the stream connection was closed." ) - # Try next provider if available continue except Exception as e: - # Check if this is a cancellation wrapped in another exception - if self.is_cancelled(): - logging.info("Download cancelled (detected in exception handler)") - if os.path.exists(temp_path): - try: - os.remove(temp_path) - except OSError: - pass - raise InterruptedError("Download cancelled") from e logging.error( f"YoutubeDL download failed with provider {provider}: " f"{type(e).__name__}: {e}" ) - # Try next provider if available continue break - + # If we get here, all providers failed logging.error("All download providers failed") self.clear_cache() diff --git a/src/core/providers/base_provider.py b/src/core/providers/base_provider.py index fa0a549..5ecd51b 100644 --- a/src/core/providers/base_provider.py +++ b/src/core/providers/base_provider.py @@ -1,20 +1,20 @@ from abc import ABC, abstractmethod -from typing import Any, Callable, Dict, List, Optional +from typing import Any, Dict, List class Loader(ABC): """Abstract base class for anime data loaders/providers.""" @abstractmethod - def subscribe_download_progress(self, handler): - """Subscribe a handler to the download_progress event. - Args: - handler: Callable to be called with progress dict. + def subscribe_download_progress(self, handler): + """Subscribe a handler to the download_progress event. + Args: + handler: Callable to be called with progress dict. """ @abstractmethod - def unsubscribe_download_progress(self, handler): - """Unsubscribe a handler from the download_progress event. - Args: - handler: Callable previously subscribed. + def unsubscribe_download_progress(self, handler): + """Unsubscribe a handler from the download_progress event. + Args: + handler: Callable previously subscribed. """ @abstractmethod @@ -56,8 +56,7 @@ class Loader(ABC): season: int, episode: int, key: str, - language: str = "German Dub", - progress_callback: Optional[Callable[[str, Dict], None]] = None, + language: str = "German Dub" ) -> bool: """Download episode to specified directory. @@ -68,8 +67,6 @@ class Loader(ABC): episode: Episode number within season key: Unique series identifier/key language: Language version to download (default: German Dub) - progress_callback: Optional callback for progress updates - called with (event_type: str, data: Dict) Returns: True if download successful, False otherwise diff --git a/src/server/services/download_service.py b/src/server/services/download_service.py index 60a011b..ddd7b48 100644 --- a/src/server/services/download_service.py +++ b/src/server/services/download_service.py @@ -1007,92 +1007,7 @@ class DownloadService: if self._active_download and self._active_download.id == item.id: self._active_download = None - async def start(self) -> None: - """Initialize the download queue service (compatibility method). - - Note: Downloads are started manually via start_next_download(). - """ - logger.info("Download queue service initialized") - async def stop(self, timeout: float = 10.0) -> None: - """Stop the download queue service gracefully. - - Persists in-progress downloads back to pending state, cancels active - tasks, and shuts down the thread pool with a timeout. - - Args: - timeout: Maximum time (seconds) to wait for executor shutdown - """ - logger.info("Stopping download queue service (timeout=%.1fs)...", timeout) - - # Set shutdown flag first to prevent new downloads - self._is_shutting_down = True - self._is_stopped = True - - # Request cancellation from AnimeService (signals the download thread) - try: - self._anime_service.request_download_cancel() - logger.info("Requested download cancellation from AnimeService") - except Exception as e: - logger.warning("Failed to request download cancellation: %s", e) - - # Persist active download back to pending state if one exists - if self._active_download: - logger.info( - "Persisting active download to pending: item_id=%s", - self._active_download.id - ) - try: - # Reset status to pending so it can be resumed on restart - self._active_download.status = DownloadStatus.PENDING - self._active_download.completed_at = None - await self._save_to_database(self._active_download) - logger.info("Active download persisted to database as pending") - except Exception as e: - logger.error("Failed to persist active download: %s", e) - - # Cancel active download task if running - active_task = self._active_download_task - if active_task and not active_task.done(): - logger.info("Cancelling active download task...") - active_task.cancel() - try: - # Wait briefly for cancellation to complete - await asyncio.wait_for( - asyncio.shield(active_task), - timeout=2.0 - ) - except asyncio.TimeoutError: - logger.warning("Download task cancellation timed out") - except asyncio.CancelledError: - logger.info("Active download task cancelled") - except Exception as e: - logger.warning("Error during task cancellation: %s", e) - - # Shutdown executor with wait and timeout - logger.info("Shutting down thread pool executor...") - try: - # Run executor shutdown in thread to avoid blocking event loop - loop = asyncio.get_event_loop() - await asyncio.wait_for( - loop.run_in_executor( - None, - lambda: self._executor.shutdown(wait=True, cancel_futures=True) - ), - timeout=timeout - ) - logger.info("Thread pool executor shutdown complete") - except asyncio.TimeoutError: - logger.warning( - "Executor shutdown timed out after %.1fs, forcing shutdown", - timeout - ) - # Force shutdown without waiting - self._executor.shutdown(wait=False, cancel_futures=True) - except Exception as e: - logger.error("Error during executor shutdown: %s", e) - - logger.info("Download queue service stopped") # Singleton instance diff --git a/src/server/services/scan_service.py b/src/server/services/scan_service.py index 28c479e..f68eab2 100644 --- a/src/server/services/scan_service.py +++ b/src/server/services/scan_service.py @@ -13,20 +13,8 @@ from typing import Any, Callable, Dict, List, Optional import structlog -from src.core.interfaces.callbacks import ( - CallbackManager, - CompletionCallback, - CompletionContext, - ErrorCallback, - ErrorContext, - OperationType, - ProgressCallback, - ProgressContext, - ProgressPhase, -) from src.server.services.progress_service import ( ProgressService, - ProgressStatus, ProgressType, get_progress_service, ) @@ -104,173 +92,6 @@ class ScanProgress: return result -class ScanServiceProgressCallback(ProgressCallback): - """Callback implementation for forwarding scan progress to ScanService. - - This callback receives progress events from SerieScanner and forwards - them to the ScanService for processing and broadcasting. - """ - - def __init__( - self, - service: "ScanService", - scan_progress: ScanProgress, - ): - """Initialize the callback. - - Args: - service: Parent ScanService instance - scan_progress: ScanProgress to update - """ - self._service = service - self._scan_progress = scan_progress - - def on_progress(self, context: ProgressContext) -> None: - """Handle progress update from SerieScanner. - - Args: - context: Progress context with key and folder information - """ - self._scan_progress.current = context.current - self._scan_progress.total = context.total - self._scan_progress.percentage = context.percentage - self._scan_progress.message = context.message - self._scan_progress.key = context.key - self._scan_progress.folder = context.folder - self._scan_progress.updated_at = datetime.now(timezone.utc) - - if context.phase == ProgressPhase.STARTING: - self._scan_progress.status = "started" - elif context.phase == ProgressPhase.IN_PROGRESS: - self._scan_progress.status = "in_progress" - elif context.phase == ProgressPhase.COMPLETED: - self._scan_progress.status = "completed" - elif context.phase == ProgressPhase.FAILED: - self._scan_progress.status = "failed" - - # Forward to service for broadcasting - # Use run_coroutine_threadsafe if event loop is available - try: - loop = asyncio.get_running_loop() - asyncio.run_coroutine_threadsafe( - self._service._handle_progress_update(self._scan_progress), - loop - ) - except RuntimeError: - # No running event loop - likely in test or sync context - pass - - -class ScanServiceErrorCallback(ErrorCallback): - """Callback implementation for handling scan errors. - - This callback receives error events from SerieScanner and forwards - them to the ScanService for processing and broadcasting. - """ - - def __init__( - self, - service: "ScanService", - scan_progress: ScanProgress, - ): - """Initialize the callback. - - Args: - service: Parent ScanService instance - scan_progress: ScanProgress to update - """ - self._service = service - self._scan_progress = scan_progress - - def on_error(self, context: ErrorContext) -> None: - """Handle error from SerieScanner. - - Args: - context: Error context with key and folder information - """ - error_msg = context.message - if context.folder: - error_msg = f"[{context.folder}] {error_msg}" - - self._scan_progress.errors.append(error_msg) - self._scan_progress.updated_at = datetime.now(timezone.utc) - - logger.warning( - "Scan error", - key=context.key, - folder=context.folder, - error=str(context.error), - recoverable=context.recoverable, - ) - - # Forward to service for broadcasting - # Use run_coroutine_threadsafe if event loop is available - try: - loop = asyncio.get_running_loop() - asyncio.run_coroutine_threadsafe( - self._service._handle_scan_error( - self._scan_progress, - context, - ), - loop - ) - except RuntimeError: - # No running event loop - likely in test or sync context - pass - - -class ScanServiceCompletionCallback(CompletionCallback): - """Callback implementation for handling scan completion. - - This callback receives completion events from SerieScanner and forwards - them to the ScanService for processing and broadcasting. - """ - - def __init__( - self, - service: "ScanService", - scan_progress: ScanProgress, - ): - """Initialize the callback. - - Args: - service: Parent ScanService instance - scan_progress: ScanProgress to update - """ - self._service = service - self._scan_progress = scan_progress - - def on_completion(self, context: CompletionContext) -> None: - """Handle completion from SerieScanner. - - Args: - context: Completion context with statistics - """ - self._scan_progress.status = "completed" if context.success else "failed" - self._scan_progress.message = context.message - self._scan_progress.updated_at = datetime.now(timezone.utc) - - if context.statistics: - self._scan_progress.series_found = context.statistics.get( - "series_found", 0 - ) - - # Forward to service for broadcasting - # Use run_coroutine_threadsafe if event loop is available - try: - loop = asyncio.get_running_loop() - asyncio.run_coroutine_threadsafe( - self._service._handle_scan_completion( - self._scan_progress, - context, - ), - loop - ) - except RuntimeError: - # No running event loop - likely in test or sync context - pass - - class ScanService: """Manages anime library scan operations. @@ -376,13 +197,13 @@ class ScanService: async def start_scan( self, - scanner_factory: Callable[..., Any], + scanner: Any, # SerieScanner instance ) -> str: """Start a new library scan. Args: - scanner_factory: Factory function that creates a SerieScanner. - The factory should accept a callback_manager parameter. + scanner: SerieScanner instance to use for scanning. + The service will subscribe to its events. Returns: Scan ID for tracking @@ -423,42 +244,82 @@ class ScanService: "scan_id": scan_id, "message": "Library scan started", }) + + # Create event handlers for the scanner + def on_progress_handler(progress_data: Dict[str, Any]) -> None: + """Handle progress events from scanner.""" + scan_progress.current = progress_data.get('current', 0) + scan_progress.total = progress_data.get('total', 0) + scan_progress.percentage = progress_data.get('percentage', 0.0) + scan_progress.message = progress_data.get('message', '') + scan_progress.updated_at = datetime.now(timezone.utc) + + phase = progress_data.get('phase', '') + if phase == 'STARTING': + scan_progress.status = "started" + elif phase == 'IN_PROGRESS': + scan_progress.status = "in_progress" + + # Schedule the progress update on the event loop + try: + loop = asyncio.get_running_loop() + asyncio.run_coroutine_threadsafe( + self._handle_progress_update(scan_progress), + loop + ) + except RuntimeError: + pass + + def on_error_handler(error_data: Dict[str, Any]) -> None: + """Handle error events from scanner.""" + error_msg = error_data.get('message', 'Unknown error') + scan_progress.errors.append(error_msg) + scan_progress.updated_at = datetime.now(timezone.utc) + + logger.warning( + "Scan error", + error=str(error_data.get('error')), + recoverable=error_data.get('recoverable', True), + ) + + # Schedule the error handling on the event loop + try: + loop = asyncio.get_running_loop() + asyncio.run_coroutine_threadsafe( + self._handle_scan_error(scan_progress, error_data), + loop + ) + except RuntimeError: + pass + + def on_completion_handler(completion_data: Dict[str, Any]) -> None: + """Handle completion events from scanner.""" + success = completion_data.get('success', False) + scan_progress.status = "completed" if success else "failed" + scan_progress.message = completion_data.get('message', '') + scan_progress.updated_at = datetime.now(timezone.utc) + + if 'statistics' in completion_data: + stats = completion_data['statistics'] + scan_progress.series_found = stats.get('series_found', 0) + + # Schedule the completion handling on the event loop + try: + loop = asyncio.get_running_loop() + asyncio.run_coroutine_threadsafe( + self._handle_scan_completion(scan_progress, completion_data), + loop + ) + except RuntimeError: + pass + + # Subscribe to scanner events + scanner.subscribe_on_progress(on_progress_handler) + scanner.subscribe_on_error(on_error_handler) + scanner.subscribe_on_completion(on_completion_handler) return scan_id - def create_callback_manager( - self, - scan_progress: Optional[ScanProgress] = None, - ) -> CallbackManager: - """Create a callback manager for scan operations. - - Args: - scan_progress: Optional scan progress to use. If None, - uses current scan progress. - - Returns: - CallbackManager configured with scan callbacks - """ - progress = scan_progress or self._current_scan - if not progress: - progress = ScanProgress(str(uuid.uuid4())) - self._current_scan = progress - - callback_manager = CallbackManager() - - # Register callbacks - callback_manager.register_progress_callback( - ScanServiceProgressCallback(self, progress) - ) - callback_manager.register_error_callback( - ScanServiceErrorCallback(self, progress) - ) - callback_manager.register_completion_callback( - ScanServiceCompletionCallback(self, progress) - ) - - return callback_manager - async def _handle_progress_update( self, scan_progress: ScanProgress, @@ -475,8 +336,6 @@ class ScanService: current=scan_progress.current, total=scan_progress.total, message=scan_progress.message, - key=scan_progress.key, - folder=scan_progress.folder, ) except Exception as e: logger.debug("Progress update skipped: %s", e) @@ -490,36 +349,38 @@ class ScanService: async def _handle_scan_error( self, scan_progress: ScanProgress, - error_context: ErrorContext, + error_data: Dict[str, Any], ) -> None: """Handle a scan error. Args: scan_progress: Current scan progress - error_context: Error context with key and folder metadata + error_data: Error data dictionary with error info """ # Emit error event with key as primary identifier await self._emit_scan_event({ "type": "scan_error", "scan_id": scan_progress.scan_id, - "key": error_context.key, - "folder": error_context.folder, - "error": str(error_context.error), - "message": error_context.message, - "recoverable": error_context.recoverable, + "error": str(error_data.get('error')), + "message": error_data.get('message', 'Unknown error'), + "recoverable": error_data.get('recoverable', True), }) async def _handle_scan_completion( self, scan_progress: ScanProgress, - completion_context: CompletionContext, + completion_data: Dict[str, Any], ) -> None: """Handle scan completion. Args: scan_progress: Final scan progress - completion_context: Completion context with statistics + completion_data: Completion data dictionary with statistics """ + success = completion_data.get('success', False) + message = completion_data.get('message', '') + statistics = completion_data.get('statistics', {}) + async with self._lock: self._is_scanning = False @@ -530,33 +391,33 @@ class ScanService: # Complete progress tracking try: - if completion_context.success: + if success: await self._progress_service.complete_progress( progress_id=f"scan_{scan_progress.scan_id}", - message=completion_context.message, + message=message, ) else: await self._progress_service.fail_progress( progress_id=f"scan_{scan_progress.scan_id}", - error_message=completion_context.message, + error_message=message, ) except Exception as e: logger.debug("Progress completion skipped: %s", e) # Emit completion event await self._emit_scan_event({ - "type": "scan_completed" if completion_context.success else "scan_failed", + "type": "scan_completed" if success else "scan_failed", "scan_id": scan_progress.scan_id, - "success": completion_context.success, - "message": completion_context.message, - "statistics": completion_context.statistics, + "success": success, + "message": message, + "statistics": statistics, "data": scan_progress.to_dict(), }) logger.info( "Scan completed", scan_id=scan_progress.scan_id, - success=completion_context.success, + success=success, series_found=scan_progress.series_found, errors_count=len(scan_progress.errors), ) diff --git a/tests/unit/test_scan_service.py b/tests/unit/test_scan_service.py index 759de67..ce02409 100644 --- a/tests/unit/test_scan_service.py +++ b/tests/unit/test_scan_service.py @@ -1,29 +1,17 @@ """Unit tests for ScanService. This module contains comprehensive tests for the scan service, -including scan lifecycle, progress callbacks, event handling, -and key-based identification. +including scan lifecycle, progress events, and key-based identification. """ from datetime import datetime -from unittest.mock import AsyncMock, MagicMock +from unittest.mock import AsyncMock, MagicMock, Mock import pytest -from src.core.interfaces.callbacks import ( - CallbackManager, - CompletionContext, - ErrorContext, - OperationType, - ProgressContext, - ProgressPhase, -) from src.server.services.scan_service import ( ScanProgress, ScanService, - ScanServiceCompletionCallback, ScanServiceError, - ScanServiceErrorCallback, - ScanServiceProgressCallback, get_scan_service, reset_scan_service, ) diff --git a/tests/unit/test_series_app.py b/tests/unit/test_series_app.py index e53d30a..10e7b19 100644 --- a/tests/unit/test_series_app.py +++ b/tests/unit/test_series_app.py @@ -188,16 +188,17 @@ class TestSeriesAppDownload: app.loader.download = Mock(side_effect=mock_download_cancelled) - # Perform download - should catch InterruptedError - result = await app.download( - "anime_folder", - season=1, - episode=1, - key="anime_key" - ) + # Perform download - should re-raise InterruptedError + with pytest.raises(InterruptedError): + await app.download( + "anime_folder", + season=1, + episode=1, + key="anime_key" + ) - # Verify cancellation was handled (returns False on error) - assert result is False + # Verify cancellation event was fired + assert app._events.download_status.called @pytest.mark.asyncio @patch('src.core.SeriesApp.Loaders') @@ -264,10 +265,10 @@ class TestSeriesAppReScan: @patch('src.core.SeriesApp.Loaders') @patch('src.core.SeriesApp.SerieScanner') @patch('src.core.SeriesApp.SerieList') - async def test_rescan_with_callback( + async def test_rescan_with_events( self, mock_serie_list, mock_scanner, mock_loaders ): - """Test rescan with progress callbacks.""" + """Test rescan with event progress notifications.""" test_dir = "/test/anime" app = SeriesApp(test_dir) @@ -278,19 +279,19 @@ class TestSeriesAppReScan: app.serie_scanner.get_total_to_scan = Mock(return_value=3) app.serie_scanner.reinit = Mock() app.serie_scanner.keyDict = {} - - def mock_scan(callback): - callback("folder1", 1) - callback("folder2", 2) - callback("folder3", 3) - - app.serie_scanner.scan = Mock(side_effect=mock_scan) + app.serie_scanner.scan = Mock() # Scan no longer takes callback + app.serie_scanner.subscribe_on_progress = Mock() + app.serie_scanner.unsubscribe_on_progress = Mock() # Perform rescan await app.rescan() - # Verify rescan completed + # Verify scanner methods were called correctly + app.serie_scanner.reinit.assert_called_once() app.serie_scanner.scan.assert_called_once() + # Verify event subscription/unsubscription happened + app.serie_scanner.subscribe_on_progress.assert_called_once() + app.serie_scanner.unsubscribe_on_progress.assert_called_once() @pytest.mark.asyncio @patch('src.core.SeriesApp.Loaders')