"""Background loader service for asynchronous series data loading. This service orchestrates background loading of series metadata (episodes, NFO files, logos, images) without blocking the user. It provides a task queue system for managing loading operations and real-time status updates via WebSocket. Key Features: - Asynchronous task queue for series data loading - Reuses existing services (AnimeService, NFOService) to avoid code duplication - Real-time progress updates via WebSocket - Graceful startup and shutdown handling - Error handling with retry logic """ from __future__ import annotations import asyncio from dataclasses import dataclass, field from datetime import datetime, timezone from enum import Enum from pathlib import Path from typing import Any, Dict, List, Optional import structlog from src.server.services.websocket_service import WebSocketService logger = structlog.get_logger(__name__) class LoadingStatus(str, Enum): """Status of a series loading task.""" PENDING = "pending" LOADING_EPISODES = "loading_episodes" LOADING_NFO = "loading_nfo" LOADING_LOGO = "loading_logo" LOADING_IMAGES = "loading_images" COMPLETED = "completed" FAILED = "failed" @dataclass class SeriesLoadingTask: """Represents a series loading task with progress tracking. Attributes: key: Series unique identifier (primary key) folder: Series folder name (metadata only) name: Series display name year: Series release year status: Current loading status progress: Dict tracking what data has been loaded started_at: When loading started completed_at: When loading completed error: Error message if loading failed """ key: str folder: str name: str year: Optional[int] = None status: LoadingStatus = LoadingStatus.PENDING progress: Dict[str, bool] = field(default_factory=lambda: { "episodes": False, "nfo": False, "logo": False, "images": False }) started_at: Optional[datetime] = None completed_at: Optional[datetime] = None error: Optional[str] = None class BackgroundLoaderService: """Service for managing background loading of series metadata. This service orchestrates asynchronous loading by delegating to existing services (AnimeService for episodes, NFOService for NFO/images) rather than reimplementing logic. It provides task queuing, status tracking, and WebSocket notifications. Supports concurrent processing of multiple series simultaneously for improved performance when adding multiple anime. Attributes: websocket_service: Service for broadcasting status updates anime_service: Service for episode scanning (reused) series_app: Core SeriesApp instance for NFO service access task_queue: Queue of pending loading tasks active_tasks: Dict of currently processing tasks processing_tasks: Dict of asyncio tasks being processed worker_tasks: List of background worker tasks max_concurrent_loads: Maximum number of series to load concurrently """ def __init__( self, websocket_service: WebSocketService, anime_service: Any, # AnimeService - avoiding circular import series_app: Any, # SeriesApp - avoiding circular import max_concurrent_loads: int = 5, ): """Initialize the background loader service. Args: websocket_service: WebSocket service for status broadcasts anime_service: AnimeService instance for episode operations series_app: SeriesApp instance for NFO operations max_concurrent_loads: Maximum number of series to load concurrently (default: 5) """ self.websocket_service = websocket_service self.anime_service = anime_service self.series_app = series_app self.max_concurrent_loads = max_concurrent_loads # Task management self.task_queue: asyncio.Queue[SeriesLoadingTask] = asyncio.Queue() self.active_tasks: Dict[str, SeriesLoadingTask] = {} self.processing_tasks: Dict[str, asyncio.Task] = {} self.worker_tasks: List[asyncio.Task] = [] self._shutdown = False logger.info( "BackgroundLoaderService initialized", extra={"max_concurrent_loads": max_concurrent_loads} ) async def start(self) -> None: """Start the background worker tasks for concurrent processing.""" if self.worker_tasks and any(not task.done() for task in self.worker_tasks): logger.warning("Background workers already running") return self._shutdown = False # Start multiple workers for concurrent processing self.worker_tasks = [] for i in range(self.max_concurrent_loads): worker = asyncio.create_task(self._worker(worker_id=i)) self.worker_tasks.append(worker) logger.info( "Background workers started", extra={"num_workers": len(self.worker_tasks)} ) async def stop(self) -> None: """Stop all background workers gracefully.""" if not self.worker_tasks: return logger.info("Stopping background workers...") self._shutdown = True # Cancel all worker tasks for worker_task in self.worker_tasks: if not worker_task.done(): worker_task.cancel() # Wait for all workers to finish results = await asyncio.gather(*self.worker_tasks, return_exceptions=True) # Log any unexpected exceptions (ignore CancelledError) for i, result in enumerate(results): if isinstance(result, Exception) and not isinstance(result, asyncio.CancelledError): logger.error( f"Worker {i} stopped with exception", extra={"exception": str(result)} ) self.worker_tasks = [] logger.info("All background workers stopped") async def add_series_loading_task( self, key: str, folder: str, name: str, year: Optional[int] = None ) -> None: """Add a series to the loading queue. Args: key: Series unique identifier (primary key) folder: Series folder name (metadata only) name: Series display name year: Series release year """ # Check if task already exists if key in self.active_tasks: logger.debug(f"Task for series {key} already exists, skipping") return task = SeriesLoadingTask( key=key, folder=folder, name=name, year=year, started_at=datetime.now(timezone.utc) ) self.active_tasks[key] = task await self.task_queue.put(task) logger.info(f"Added loading task for series: {key}") # Broadcast initial status await self._broadcast_status(task) async def check_missing_data( self, key: str, folder: str, anime_directory: str, db: Any ) -> Dict[str, bool]: """Check what data is missing for a series. Args: key: Series unique identifier folder: Series folder name anime_directory: Base anime directory path db: Database session Returns: Dict indicating what data is missing (True = missing, False = exists) """ missing = { "episodes": False, "nfo": False, "logo": False, "images": False } # Check database for series info from src.server.database.service import AnimeSeriesService from src.server.utils.media import check_media_files series_db = await AnimeSeriesService.get_by_key(db, key) if not series_db: # Series doesn't exist in DB, need everything missing = {k: True for k in missing} return missing # Check episodes missing["episodes"] = not series_db.episodes_loaded # Check files using utility function folder_path = Path(anime_directory) / folder media_status = check_media_files( folder_path, check_poster=True, check_logo=True, check_fanart=True, check_nfo=True ) # Check NFO file missing["nfo"] = not media_status.get("nfo", False) or not series_db.has_nfo # Check logo missing["logo"] = not media_status.get("logo", False) or not series_db.logo_loaded # Check images (poster and fanart) missing["images"] = ( not (media_status.get("poster", False) and media_status.get("fanart", False)) or not series_db.images_loaded ) return missing async def _worker(self, worker_id: int = 0) -> None: """Background worker that processes loading tasks from the queue. Multiple workers can run concurrently to process tasks in parallel. Args: worker_id: Unique identifier for this worker instance """ logger.info(f"Background worker {worker_id} started processing tasks") while not self._shutdown: try: # Wait for a task with timeout to allow shutdown checks task = await asyncio.wait_for( self.task_queue.get(), timeout=1.0 ) logger.info( f"Worker {worker_id} processing loading task for series: {task.key}" ) # Process the task await self._load_series_data(task) # Mark task as done self.task_queue.task_done() except asyncio.TimeoutError: # No task available, continue loop continue except asyncio.CancelledError: logger.info(f"Worker {worker_id} task cancelled") break except Exception as e: logger.exception(f"Error in background worker {worker_id}: {e}") # Continue processing other tasks continue logger.info(f"Background worker {worker_id} stopped") async def _load_series_data(self, task: SeriesLoadingTask) -> None: """Load all missing data for a series. Orchestrates loading by calling existing services (AnimeService, NFOService) rather than reimplementing logic. Updates status and broadcasts progress. Args: task: The loading task to process """ try: # Get database session from src.server.database.connection import get_db_session from src.server.database.service import AnimeSeriesService async with get_db_session() as db: try: # Check what data is missing missing = await self.check_missing_data( task.key, task.folder, self.series_app.directory_to_search, db ) # Load NFO and images if missing if missing["nfo"] or missing["logo"] or missing["images"]: await self._load_nfo_and_images(task, db) else: task.progress["nfo"] = True task.progress["logo"] = True task.progress["images"] = True # Scan for missing episodes # This discovers seasons/episodes from provider and compares with filesystem # to populate episodeDict with episodes available for download await self._scan_missing_episodes(task, db) # Mark as completed task.status = LoadingStatus.COMPLETED task.completed_at = datetime.now(timezone.utc) # Update database series_db = await AnimeSeriesService.get_by_key(db, task.key) if series_db: series_db.loading_status = "completed" series_db.loading_completed_at = task.completed_at series_db.loading_error = None await db.commit() # Broadcast completion await self._broadcast_status(task) logger.info(f"Successfully loaded all data for series: {task.key}") except Exception as e: logger.exception(f"Error loading series data: {e}") task.status = LoadingStatus.FAILED task.error = str(e) task.completed_at = datetime.now(timezone.utc) # Update database with error series_db = await AnimeSeriesService.get_by_key(db, task.key) if series_db: series_db.loading_status = "failed" series_db.loading_error = str(e) series_db.loading_completed_at = task.completed_at await db.commit() # Broadcast error await self._broadcast_status(task) finally: # Remove from active tasks self.active_tasks.pop(task.key, None) async def _find_series_directory(self, task: SeriesLoadingTask) -> Optional[Path]: """Find the series directory without triggering full rescan. Args: task: The loading task with series information Returns: Path to series directory if found, None otherwise """ try: # Construct expected directory path series_dir = Path(self.series_app.directory_to_search) / task.folder # Check if directory exists if series_dir.exists() and series_dir.is_dir(): logger.debug(f"Found series directory: {series_dir}") return series_dir else: logger.warning(f"Series directory not found: {series_dir}") return None except Exception as e: logger.error(f"Error finding series directory for {task.key}: {e}") return None async def _scan_series_episodes(self, series_dir: Path, task: SeriesLoadingTask) -> Dict[str, List[str]]: """Scan episodes for a specific series directory only. This method scans only the given series directory instead of the entire anime library, making it much more efficient for single series operations. Args: series_dir: Path to the series directory task: The loading task Returns: Dict mapping season names to lists of episode files """ episodes_by_season = {} try: # Scan for season directories for item in sorted(series_dir.iterdir()): if not item.is_dir(): continue season_name = item.name episodes = [] # Scan for .mp4 files in season directory for episode_file in sorted(item.glob("*.mp4")): episodes.append(episode_file.name) if episodes: episodes_by_season[season_name] = episodes logger.debug(f"Found {len(episodes)} episodes in {season_name}") logger.info(f"Scanned {len(episodes_by_season)} seasons for {task.key}") return episodes_by_season except Exception as e: logger.error(f"Error scanning episodes for {task.key}: {e}") return {} async def _load_episodes(self, task: SeriesLoadingTask, db: Any) -> None: """Load episodes for a series by scanning only its directory. This optimized version scans only the specific series directory instead of triggering a full library rescan. Args: task: The loading task db: Database session """ task.status = LoadingStatus.LOADING_EPISODES await self._broadcast_status(task, "Loading episodes...") try: # Find series directory without full rescan series_dir = await self._find_series_directory(task) if not series_dir: logger.error(f"Cannot load episodes - directory not found for {task.key}") task.progress["episodes"] = False return # Scan episodes in this specific series directory only episodes_by_season = await self._scan_series_episodes(series_dir, task) if not episodes_by_season: logger.warning(f"No episodes found for {task.key}") task.progress["episodes"] = False return # Update task progress task.progress["episodes"] = True # Update database from src.server.database.service import AnimeSeriesService series_db = await AnimeSeriesService.get_by_key(db, task.key) if series_db: series_db.episodes_loaded = True series_db.loading_status = "loading_episodes" await db.commit() logger.info(f"Episodes loaded for series: {task.key} ({len(episodes_by_season)} seasons)") except Exception as e: logger.exception(f"Failed to load episodes for {task.key}: {e}") raise async def _load_nfo_and_images(self, task: SeriesLoadingTask, db: Any) -> bool: """Load NFO file and images for a series by reusing NFOService. Args: task: The loading task db: Database session Returns: bool: True if NFO was created, False if it already existed or failed """ task.status = LoadingStatus.LOADING_NFO await self._broadcast_status(task, "Checking NFO file...") try: # Check if NFOService is available if not self.series_app.nfo_service: logger.warning( f"NFOService not available, skipping NFO/images for {task.key}" ) task.progress["nfo"] = False task.progress["logo"] = False task.progress["images"] = False return False # Check if NFO already exists if self.series_app.nfo_service.has_nfo(task.folder): logger.info(f"NFO already exists for {task.key}, skipping creation") # Update task progress task.progress["nfo"] = True task.progress["logo"] = True # Assume logo exists if NFO exists task.progress["images"] = True # Assume images exist if NFO exists # Update database with existing NFO info from src.server.database.service import AnimeSeriesService series_db = await AnimeSeriesService.get_by_key(db, task.key) if series_db: # Only update if not already marked if not series_db.has_nfo: series_db.has_nfo = True series_db.nfo_created_at = datetime.now(timezone.utc) logger.info(f"Updated database with existing NFO for {task.key}") if not series_db.logo_loaded: series_db.logo_loaded = True if not series_db.images_loaded: series_db.images_loaded = True await db.commit() logger.info(f"Existing NFO found and database updated for series: {task.key}") return False # NFO doesn't exist, create it await self._broadcast_status(task, "Generating NFO file...") logger.info(f"Creating new NFO for {task.key}") # Use existing NFOService to create NFO with all images # This reuses all existing TMDB API logic and image downloading nfo_path = await self.series_app.nfo_service.create_tvshow_nfo( serie_name=task.name, serie_folder=task.folder, year=task.year, download_poster=True, download_logo=True, download_fanart=True ) # Update task progress task.progress["nfo"] = True task.progress["logo"] = True task.progress["images"] = True # Update database from src.server.database.service import AnimeSeriesService series_db = await AnimeSeriesService.get_by_key(db, task.key) if series_db: series_db.has_nfo = True series_db.nfo_created_at = datetime.now(timezone.utc) series_db.logo_loaded = True series_db.images_loaded = True series_db.loading_status = "loading_nfo" await db.commit() logger.info(f"NFO and images created and loaded for series: {task.key}") return True except Exception as e: logger.exception(f"Failed to load NFO/images for {task.key}: {e}") # Don't fail the entire task if NFO fails task.progress["nfo"] = False task.progress["logo"] = False task.progress["images"] = False return False async def _scan_missing_episodes(self, task: SeriesLoadingTask, db: Any) -> None: """Scan for missing episodes after NFO creation. This method calls SerieScanner.scan_single_series() to populate the episodeDict with available episodes that can be downloaded. Args: task: The loading task db: Database session """ task.status = LoadingStatus.LOADING_EPISODES await self._broadcast_status(task, "Scanning for missing episodes...") try: # Get scanner from SeriesApp if not hasattr(self.series_app, 'serie_scanner'): logger.warning( f"Scanner not available, skipping episode scan for {task.key}" ) return # Scan for missing episodes using the targeted scan method # This populates the episodeDict without triggering a full rescan logger.info(f"Scanning missing episodes for {task.key}") missing_episodes = self.series_app.serie_scanner.scan_single_series( key=task.key, folder=task.folder ) # Log the results total_missing = sum(len(eps) for eps in missing_episodes.values()) if total_missing > 0: logger.info( f"Found {total_missing} missing episodes across " f"{len(missing_episodes)} seasons for {task.key}" ) # Notify anime_service to sync episodes to database if self.anime_service: logger.debug(f"Calling anime_service.sync_episodes_to_db for {task.key}") await self.anime_service.sync_episodes_to_db(task.key) else: logger.warning(f"anime_service not available, episodes will not be synced to DB for {task.key}") else: logger.info(f"No missing episodes found for {task.key}") # Update series status in database from src.server.database.service import AnimeSeriesService series_db = await AnimeSeriesService.get_by_key(db, task.key) if series_db: series_db.episodes_loaded = True series_db.loading_status = "loading_episodes" await db.commit() # Mark progress as complete task.progress["episodes"] = True task.progress["episodes"] = True except Exception as e: logger.exception(f"Failed to scan missing episodes for {task.key}: {e}") task.progress["episodes"] = False async def _broadcast_status( self, task: SeriesLoadingTask, message: Optional[str] = None ) -> None: """Broadcast loading status update via WebSocket. Args: task: The loading task message: Optional status message """ if not message: if task.status == LoadingStatus.PENDING: message = "Queued for loading..." elif task.status == LoadingStatus.LOADING_EPISODES: message = "Loading episodes..." elif task.status == LoadingStatus.LOADING_NFO: message = "Generating NFO file..." elif task.status == LoadingStatus.COMPLETED: message = "All data loaded successfully" elif task.status == LoadingStatus.FAILED: message = f"Loading failed: {task.error}" else: message = "Loading..." payload = { "type": "series_loading_update", "key": task.key, "series_key": task.key, # For frontend compatibility "folder": task.folder, "status": task.status.value, # For frontend compatibility "loading_status": task.status.value, "progress": task.progress, "message": message, "timestamp": datetime.now(timezone.utc).isoformat(), "error": task.error } await self.websocket_service.broadcast(payload) # Singleton instance _background_loader_service: Optional[BackgroundLoaderService] = None def init_background_loader_service( websocket_service: WebSocketService, anime_service: Any, series_app: Any, max_concurrent_loads: int = 5, ) -> BackgroundLoaderService: """Initialize the background loader service singleton. Args: websocket_service: WebSocket service for broadcasts anime_service: AnimeService instance series_app: SeriesApp instance max_concurrent_loads: Maximum number of series to load concurrently (default: 5) Returns: BackgroundLoaderService instance """ global _background_loader_service if _background_loader_service is None: _background_loader_service = BackgroundLoaderService( websocket_service=websocket_service, anime_service=anime_service, series_app=series_app, max_concurrent_loads=max_concurrent_loads, ) return _background_loader_service def get_background_loader_service() -> BackgroundLoaderService: """Get the background loader service singleton. Returns: BackgroundLoaderService instance Raises: RuntimeError: If service not initialized """ if _background_loader_service is None: raise RuntimeError( "BackgroundLoaderService not initialized. " "Call init_background_loader_service() first." ) return _background_loader_service