"""
SeriesApp - Core application logic for anime series management.

This module provides the main application interface for searching,
downloading, and managing anime series with support for async callbacks,
progress reporting, and error handling.

Note: This module is pure domain logic with no database dependencies.
Database operations are handled by the service layer (AnimeService).
"""
import asyncio
import logging
import os
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Optional

from events import Events

from src.config.settings import settings
from src.core.entities.SerieList import SerieList
from src.core.entities.series import Serie
from src.core.providers.provider_factory import Loaders
from src.core.SerieScanner import SerieScanner
from src.core.services.nfo_service import NFOService
from src.core.services.tmdb_client import TMDBAPIError

logger = logging.getLogger(__name__)


class DownloadStatusEventArgs:
    """Event arguments for download status events."""

    def __init__(
        self,
        serie_folder: str,
        season: int,
        episode: int,
        status: str,
        key: Optional[str] = None,
        progress: float = 0.0,
        message: Optional[str] = None,
        error: Optional[Exception] = None,
        eta: Optional[int] = None,
        mbper_sec: Optional[float] = None,
        item_id: Optional[str] = None,
    ):
        """
        Initialize download status event arguments.

        Args:
            serie_folder: Serie folder name (metadata only, used for
                file paths)
            season: Season number
            episode: Episode number
            status: Status message (e.g., "started", "progress",
                "completed", "failed")
            key: Serie unique identifier (provider key, primary identifier)
            progress: Download progress (0.0 to 1.0)
            message: Optional status message
            error: Optional error if status is "failed"
            eta: Estimated time remaining in seconds
            mbper_sec: Download speed in MB/s
            item_id: Optional download queue item ID for tracking
        """
        self.serie_folder = serie_folder
        self.key = key
        self.season = season
        self.episode = episode
        self.status = status
        self.progress = progress
        self.message = message
        self.error = error
        self.eta = eta
        self.mbper_sec = mbper_sec
        self.item_id = item_id


class ScanStatusEventArgs:
    """Event arguments for scan status events."""

    def __init__(
        self,
        current: int,
        total: int,
        folder: str,
        status: str,
        key: Optional[str] = None,
        progress: float = 0.0,
        message: Optional[str] = None,
        error: Optional[Exception] = None,
    ):
        """
        Initialize scan status event arguments.

        Args:
            current: Current item being scanned
            total: Total items to scan
            folder: Current folder being scanned (metadata only)
            status: Status message (e.g., "started", "progress",
                "completed", "failed", "cancelled")
            key: Serie unique identifier if applicable (provider key,
                primary identifier)
            progress: Scan progress (0.0 to 1.0)
            message: Optional status message
            error: Optional error if status is "failed"
        """
        self.current = current
        self.total = total
        self.folder = folder
        self.key = key
        self.status = status
        self.progress = progress
        self.message = message
        self.error = error


class SeriesApp:
    """
    Main application class for anime series management.

    Provides functionality for:
    - Searching anime series
    - Downloading episodes
    - Scanning directories for missing episodes
    - Managing series lists

    Supports async callbacks for progress reporting.

    Note: This class is now pure domain logic with no database
    dependencies. Database operations are handled by the service layer
    (AnimeService).

    Events:
        download_status: Raised when download status changes.
            Handler signature: def handler(args: DownloadStatusEventArgs)
        scan_status: Raised when scan status changes.
            Handler signature: def handler(args: ScanStatusEventArgs)
    """

    def __init__(
        self,
        directory_to_search: str,
    ):
        """
        Initialize SeriesApp.

        Args:
            directory_to_search: Base directory for anime series
        """
        self.directory_to_search = directory_to_search

        # Thread pool used to run blocking loader/scanner calls off the
        # event loop (see search/download/rescan).
        self.executor = ThreadPoolExecutor(max_workers=3)

        # Initialize events
        self._events = Events()

        self.loaders = Loaders()
        self.loader = self.loaders.GetLoader(key="aniworld.to")
        self.serie_scanner = SerieScanner(
            directory_to_search, self.loader
        )

        # Skip automatic loading from data files - series will be loaded
        # from database by the service layer during application setup
        self.list = SerieList(self.directory_to_search, skip_load=True)

        # Empty until load_series_from_list()/rescan() populates it.
        self.series_list: List[Any] = []

        # Initialize NFO service if TMDB API key is configured
        self.nfo_service: Optional[NFOService] = None
        if settings.tmdb_api_key:
            try:
                from src.core.services.nfo_factory import get_nfo_factory
                factory = get_nfo_factory()
                self.nfo_service = factory.create()
                logger.info("NFO service initialized successfully")
            # FIX: the original caught (ValueError, Exception); Exception
            # already subsumes ValueError, so the tuple was redundant.
            except Exception as e:  # pylint: disable=broad-except
                logger.warning(
                    "Failed to initialize NFO service: %s", str(e)
                )
                self.nfo_service = None

        logger.info(
            "SeriesApp initialized for directory: %s", directory_to_search
        )

    @property
    def download_status(self):
        """
        Event raised when download status changes.

        Subscribe using: app.download_status += handler
        """
        return self._events.download_status

    @download_status.setter
    def download_status(self, value):
        """Set download_status event handler."""
        self._events.download_status = value

    @property
    def scan_status(self):
        """
        Event raised when scan status changes.

        Subscribe using: app.scan_status += handler
        """
        return self._events.scan_status

    @scan_status.setter
    def scan_status(self, value):
        """Set scan_status event handler."""
        self._events.scan_status = value

    def load_series_from_list(self, series: list) -> None:
        """
        Load series into the in-memory list.

        This method is called by the service layer after loading series
        from the database.

        Args:
            series: List of Serie objects to load
        """
        self.list.keyDict.clear()
        for serie in series:
            self.list.keyDict[serie.key] = serie
        self.series_list = self.list.GetMissingEpisode()
        logger.debug(
            "Loaded %d series with %d having missing episodes",
            len(series), len(self.series_list)
        )

    async def search(self, words: str) -> List[Dict[str, Any]]:
        """
        Search for anime series (async).

        Args:
            words: Search query

        Returns:
            List of search results

        Raises:
            RuntimeError: If search fails
        """
        logger.info("Searching for: %s", words)
        loop = asyncio.get_running_loop()
        results = await loop.run_in_executor(
            self.executor, self.loader.search, words
        )
        logger.info("Found %d results", len(results))
        return results

    async def download(
        self,
        serie_folder: str,
        season: int,
        episode: int,
        key: str,
        language: str = "German Dub",
        item_id: Optional[str] = None,
    ) -> bool:
        """
        Download an episode (async).

        Args:
            serie_folder: Serie folder name (metadata only, used for
                file path construction)
            season: Season number
            episode: Episode number
            key: Serie unique identifier (provider key, primary
                identifier for lookups)
            language: Language preference
            item_id: Optional download queue item ID for progress tracking

        Returns:
            True if download succeeded, False otherwise

        Note:
            The 'key' parameter is the primary identifier for series
            lookups. The 'serie_folder' parameter is only used for
            filesystem operations.
        """
        logger.info(
            "Starting download: %s (key: %s) S%02dE%02d",
            serie_folder, key, season, episode
        )

        # Fire download started event
        self._events.download_status(
            DownloadStatusEventArgs(
                serie_folder=serie_folder,
                key=key,
                season=season,
                episode=episode,
                status="started",
                message="Download started",
                item_id=item_id,
            )
        )

        # Create series folder if it doesn't exist
        folder_path = os.path.join(self.directory_to_search, serie_folder)
        if not os.path.exists(folder_path):
            try:
                os.makedirs(folder_path, exist_ok=True)
                logger.info(
                    "Created series folder: %s (key: %s)",
                    folder_path, key
                )
            except OSError as e:
                logger.error(
                    "Failed to create series folder %s: %s",
                    folder_path, str(e)
                )
                # Fire download failed event
                self._events.download_status(
                    DownloadStatusEventArgs(
                        serie_folder=serie_folder,
                        key=key,
                        season=season,
                        episode=episode,
                        status="failed",
                        message=f"Failed to create folder: {str(e)}",
                        item_id=item_id,
                    )
                )
                return False

        # Check and create NFO files if needed. NFO failures never abort
        # the episode download itself.
        if self.nfo_service and settings.nfo_auto_create:
            try:
                nfo_exists = await self.nfo_service.check_nfo_exists(
                    serie_folder
                )
                if not nfo_exists:
                    logger.info(
                        "NFO not found for %s, creating metadata...",
                        serie_folder
                    )
                    # Fire NFO creation started event
                    self._events.download_status(
                        DownloadStatusEventArgs(
                            serie_folder=serie_folder,
                            key=key,
                            season=season,
                            episode=episode,
                            status="nfo_creating",
                            message="Creating NFO metadata...",
                            item_id=item_id,
                        )
                    )
                    try:
                        # Use folder name as series name
                        await self.nfo_service.create_tvshow_nfo(
                            serie_name=serie_folder,
                            serie_folder=serie_folder,
                            download_poster=settings.nfo_download_poster,
                            download_logo=settings.nfo_download_logo,
                            download_fanart=settings.nfo_download_fanart
                        )
                        logger.info(
                            "NFO and media files created for %s",
                            serie_folder
                        )
                        # Fire NFO creation completed event
                        self._events.download_status(
                            DownloadStatusEventArgs(
                                serie_folder=serie_folder,
                                key=key,
                                season=season,
                                episode=episode,
                                status="nfo_completed",
                                message="NFO metadata created",
                                item_id=item_id,
                            )
                        )
                    except TMDBAPIError as tmdb_error:
                        logger.warning(
                            "Failed to create NFO for %s: %s",
                            serie_folder, str(tmdb_error)
                        )
                        # Fire failed event (but continue with download)
                        self._events.download_status(
                            DownloadStatusEventArgs(
                                serie_folder=serie_folder,
                                key=key,
                                season=season,
                                episode=episode,
                                status="nfo_failed",
                                message=(
                                    f"NFO creation failed: "
                                    f"{str(tmdb_error)}"
                                ),
                                item_id=item_id,
                            )
                        )
                else:
                    logger.debug("NFO already exists for %s", serie_folder)
            except Exception as nfo_error:  # pylint: disable=broad-except
                logger.error(
                    "Error checking/creating NFO for %s: %s",
                    serie_folder, str(nfo_error), exc_info=True
                )
                # Don't fail the download if NFO creation fails

        try:
            def download_progress_handler(progress_info):
                """Handle download progress events from loader."""
                logger.debug(
                    "download_progress_handler called with: %s",
                    progress_info
                )
                downloaded = progress_info.get('downloaded_bytes', 0)
                total_bytes = (
                    progress_info.get('total_bytes')
                    or progress_info.get('total_bytes_estimate', 0)
                )
                speed = progress_info.get('speed', 0)  # bytes/sec
                eta = progress_info.get('eta')  # seconds
                mbper_sec = speed / (1024 * 1024) if speed else None

                # NOTE(review): this emits progress as a 0-100 percentage,
                # while DownloadStatusEventArgs documents 0.0-1.0 and the
                # "completed" event below fires progress=1.0. Consumers
                # appear to depend on the percentage scale, so it is kept
                # as-is — confirm which scale subscribers expect.
                self._events.download_status(
                    DownloadStatusEventArgs(
                        serie_folder=serie_folder,
                        key=key,
                        season=season,
                        episode=episode,
                        status="progress",
                        message="Download progress",
                        progress=(
                            (downloaded / total_bytes) * 100
                            if total_bytes else 0
                        ),
                        eta=eta,
                        mbper_sec=mbper_sec,
                        item_id=item_id,
                    )
                )

            # Subscribe to loader's download progress events
            self.loader.subscribe_download_progress(download_progress_handler)

            try:
                # Perform download in thread to avoid blocking event loop
                loop = asyncio.get_running_loop()
                download_success = await loop.run_in_executor(
                    self.executor,
                    self.loader.download,
                    self.directory_to_search,
                    serie_folder,
                    season,
                    episode,
                    key,
                    language
                )
            finally:
                # Always unsubscribe after download completes or fails
                self.loader.unsubscribe_download_progress(
                    download_progress_handler
                )

            if download_success:
                logger.info(
                    "Download completed: %s (key: %s) S%02dE%02d",
                    serie_folder, key, season, episode
                )
                # Fire download completed event
                self._events.download_status(
                    DownloadStatusEventArgs(
                        serie_folder=serie_folder,
                        key=key,
                        season=season,
                        episode=episode,
                        status="completed",
                        progress=1.0,
                        message="Download completed successfully",
                        item_id=item_id,
                    )
                )
            else:
                logger.warning(
                    "Download failed: %s (key: %s) S%02dE%02d",
                    serie_folder, key, season, episode
                )
                # Fire download failed event
                self._events.download_status(
                    DownloadStatusEventArgs(
                        serie_folder=serie_folder,
                        key=key,
                        season=season,
                        episode=episode,
                        status="failed",
                        message="Download failed",
                        item_id=item_id,
                    )
                )

            return download_success

        except InterruptedError:
            # Download was cancelled - propagate the cancellation
            logger.info(
                "Download cancelled: %s (key: %s) S%02dE%02d",
                serie_folder, key, season, episode,
            )
            # Fire download cancelled event
            self._events.download_status(
                DownloadStatusEventArgs(
                    serie_folder=serie_folder,
                    key=key,
                    season=season,
                    episode=episode,
                    status="cancelled",
                    message="Download cancelled by user",
                    item_id=item_id,
                )
            )
            raise  # Re-raise to propagate cancellation
        except Exception as e:  # pylint: disable=broad-except
            logger.error(
                "Download error: %s (key: %s) S%02dE%02d - %s",
                serie_folder, key, season, episode, str(e),
                exc_info=True,
            )
            # Fire download error event
            self._events.download_status(
                DownloadStatusEventArgs(
                    serie_folder=serie_folder,
                    key=key,
                    season=season,
                    episode=episode,
                    status="failed",
                    error=e,
                    message=f"Download error: {str(e)}",
                    item_id=item_id,
                )
            )
            return False

    async def rescan(self) -> list:
        """
        Rescan directory for missing episodes (async).

        This method performs a file-based scan and returns the results.
        Database persistence is handled by the service layer (AnimeService).

        Returns:
            List of Serie objects found during scan with their missing
            episodes.

        Note:
            This method no longer saves to database directly. The returned
            list should be persisted by the caller (AnimeService).
        """
        logger.info("Starting directory rescan")
        total_to_scan = 0
        try:
            # Get total items to scan
            logger.info("Getting total items to scan...")
            loop = asyncio.get_running_loop()
            total_to_scan = await loop.run_in_executor(
                self.executor, self.serie_scanner.get_total_to_scan
            )
            logger.info("Total folders to scan: %d", total_to_scan)

            # Fire scan started event
            logger.info(
                "Firing scan_status 'started' event, handler=%s",
                self._events.scan_status
            )
            self._events.scan_status(
                ScanStatusEventArgs(
                    current=0,
                    total=total_to_scan,
                    folder="",
                    status="started",
                    progress=0.0,
                    message="Scan started",
                )
            )

            # Reinitialize scanner
            await loop.run_in_executor(
                self.executor, self.serie_scanner.reinit
            )

            def scan_progress_handler(progress_data):
                """Handle scan progress events from scanner."""
                # Fire scan progress event
                message = progress_data.get('message', '')
                folder = message.replace('Scanning: ', '')
                self._events.scan_status(
                    ScanStatusEventArgs(
                        current=progress_data.get('current', 0),
                        total=progress_data.get('total', total_to_scan),
                        folder=folder,
                        status="progress",
                        progress=(
                            progress_data.get('percentage', 0.0) / 100.0
                        ),
                        message=message,
                    )
                )

            # Subscribe to scanner's progress events
            self.serie_scanner.subscribe_on_progress(scan_progress_handler)

            try:
                # Perform scan (file-based, returns results in
                # scanner.keyDict)
                await loop.run_in_executor(
                    self.executor, self.serie_scanner.scan
                )
            finally:
                # Always unsubscribe after scan completes or fails
                self.serie_scanner.unsubscribe_on_progress(
                    scan_progress_handler
                )

            # Get scanned series from scanner
            scanned_series = list(self.serie_scanner.keyDict.values())

            # Update in-memory list with scan results
            self.list.keyDict.clear()
            for serie in scanned_series:
                self.list.keyDict[serie.key] = serie
            self.series_list = self.list.GetMissingEpisode()

            logger.info("Directory rescan completed successfully")

            # Fire scan completed event
            logger.info(
                "Firing scan_status 'completed' event, handler=%s",
                self._events.scan_status
            )
            self._events.scan_status(
                ScanStatusEventArgs(
                    current=total_to_scan,
                    total=total_to_scan,
                    folder="",
                    status="completed",
                    progress=1.0,
                    message=(
                        f"Scan completed. Found {len(self.series_list)} "
                        "series with missing episodes."
                    ),
                )
            )

            return scanned_series

        except InterruptedError:
            logger.warning("Scan cancelled by user")
            # Fire scan cancelled event
            self._events.scan_status(
                ScanStatusEventArgs(
                    current=0,
                    total=total_to_scan,
                    folder="",
                    status="cancelled",
                    message="Scan cancelled by user",
                )
            )
            raise
        except Exception as e:
            logger.error("Scan error: %s", str(e), exc_info=True)
            # Fire scan failed event
            self._events.scan_status(
                ScanStatusEventArgs(
                    current=0,
                    total=total_to_scan,
                    folder="",
                    status="failed",
                    error=e,
                    message=f"Scan error: {str(e)}",
                )
            )
            raise

    async def get_series_list(self) -> List[Any]:
        """
        Get the current series list (async).

        Returns:
            List of series with missing episodes
        """
        return self.series_list

    async def refresh_series_list(self) -> None:
        """
        Recompute the cached series list from the in-memory series.

        This is an async operation; the recomputation runs on the thread
        pool to keep the event loop responsive.
        """
        # BUG FIX: the original awaited self._init_list(), a method that
        # does not exist on this class (only a comment notes that
        # _init_list_sync is gone), so calling it raised AttributeError.
        # Refreshing now rebuilds series_list from the in-memory SerieList,
        # mirroring what load_series_from_list() and rescan() do.
        loop = asyncio.get_running_loop()
        self.series_list = await loop.run_in_executor(
            self.executor, self.list.GetMissingEpisode
        )

    def _get_serie_by_key(self, key: str) -> Optional[Serie]:
        """
        Get a series by its unique provider key.

        This is the primary method for series lookups within SeriesApp.

        Args:
            key: The unique provider identifier (e.g., "attack-on-titan")

        Returns:
            The Serie instance if found, None otherwise

        Note:
            This method uses the SerieList.get_by_key() method which looks
            up series by their unique key, not by folder name.
        """
        return self.list.get_by_key(key)

    def get_all_series_from_data_files(self) -> List[Serie]:
        """
        Get all series from data files in the anime directory.

        Scans the directory_to_search for all 'data' files and loads the
        Serie metadata from each file. This method is synchronous and can
        be wrapped with asyncio.to_thread if needed for async contexts.

        Returns:
            List of Serie objects found in data files. Returns an empty
            list if no data files are found or if the directory doesn't
            exist.

        Example:
            series_app = SeriesApp("/path/to/anime")
            all_series = series_app.get_all_series_from_data_files()
            for serie in all_series:
                print(f"Found: {serie.name} (key={serie.key})")
        """
        logger.info(
            "Scanning for data files in directory: %s",
            self.directory_to_search
        )

        # Create a fresh SerieList instance for file-based loading.
        # This ensures we get all series from data files without
        # interfering with the main instance's state
        try:
            temp_list = SerieList(
                self.directory_to_search,
                skip_load=False  # Allow automatic loading
            )
        except (OSError, ValueError) as e:
            logger.error(
                "Failed to scan directory for data files: %s",
                str(e), exc_info=True
            )
            return []

        # Get all series from the temporary list
        all_series = temp_list.get_all()

        logger.info(
            "Found %d series from data files in %s",
            len(all_series), self.directory_to_search
        )

        return all_series

    def shutdown(self) -> None:
        """
        Shutdown the thread pool executor.

        Should be called when the SeriesApp instance is no longer needed
        to properly clean up resources.
        """
        # hasattr guard: __init__ may have raised before the executor
        # attribute was assigned.
        if hasattr(self, 'executor'):
            self.executor.shutdown(wait=True)
            logger.info("ThreadPoolExecutor shut down successfully")