"""Background loader service for asynchronous series data loading. This service orchestrates background loading of series metadata (episodes, NFO files, logos, images) without blocking the user. It provides a task queue system for managing loading operations and real-time status updates via WebSocket. Key Features: - Asynchronous task queue for series data loading - Reuses existing services (AnimeService, NFOService) to avoid code duplication - Real-time progress updates via WebSocket - Graceful startup and shutdown handling - Error handling with retry logic """ from __future__ import annotations import asyncio from dataclasses import dataclass, field from datetime import datetime, timezone from enum import Enum from pathlib import Path from typing import Any, Dict, List, Optional import structlog from src.server.services.websocket_service import WebSocketService logger = structlog.get_logger(__name__) class LoadingStatus(str, Enum): """Status of a series loading task.""" PENDING = "pending" LOADING_EPISODES = "loading_episodes" LOADING_NFO = "loading_nfo" LOADING_LOGO = "loading_logo" LOADING_IMAGES = "loading_images" COMPLETED = "completed" FAILED = "failed" @dataclass class SeriesLoadingTask: """Represents a series loading task with progress tracking. Attributes: key: Series unique identifier (primary key) folder: Series folder name (metadata only) name: Series display name year: Series release year status: Current loading status progress: Dict tracking what data has been loaded started_at: When loading started completed_at: When loading completed error: Error message if loading failed """ key: str folder: str name: str year: Optional[int] = None status: LoadingStatus = LoadingStatus.PENDING progress: Dict[str, bool] = field(default_factory=lambda: { "episodes": False, "nfo": False, "logo": False, "images": False }) started_at: Optional[datetime] = None completed_at: Optional[datetime] = None error: Optional[str] = None class BackgroundLoaderService: """Service for managing background loading of series metadata. This service orchestrates asynchronous loading by delegating to existing services (AnimeService for episodes, NFOService for NFO/images) rather than reimplementing logic. It provides task queuing, status tracking, and WebSocket notifications. Attributes: websocket_service: Service for broadcasting status updates anime_service: Service for episode scanning (reused) series_app: Core SeriesApp instance for NFO service access task_queue: Queue of pending loading tasks active_tasks: Dict of currently processing tasks worker_task: Background worker task """ def __init__( self, websocket_service: WebSocketService, anime_service: Any, # AnimeService - avoiding circular import series_app: Any, # SeriesApp - avoiding circular import ): """Initialize the background loader service. Args: websocket_service: WebSocket service for status broadcasts anime_service: AnimeService instance for episode operations series_app: SeriesApp instance for NFO operations """ self.websocket_service = websocket_service self.anime_service = anime_service self.series_app = series_app # Task management self.task_queue: asyncio.Queue[SeriesLoadingTask] = asyncio.Queue() self.active_tasks: Dict[str, SeriesLoadingTask] = {} self.worker_task: Optional[asyncio.Task] = None self._shutdown = False logger.info("BackgroundLoaderService initialized") async def start(self) -> None: """Start the background worker task.""" if self.worker_task is not None and not self.worker_task.done(): logger.warning("Background worker already running") return self._shutdown = False self.worker_task = asyncio.create_task(self._worker()) logger.info("Background worker started") async def stop(self) -> None: """Stop the background worker gracefully.""" if self.worker_task is None: return logger.info("Stopping background worker...") self._shutdown = True # Cancel the worker task if not self.worker_task.done(): self.worker_task.cancel() try: await self.worker_task except asyncio.CancelledError: pass logger.info("Background worker stopped") async def add_series_loading_task( self, key: str, folder: str, name: str, year: Optional[int] = None ) -> None: """Add a series to the loading queue. Args: key: Series unique identifier (primary key) folder: Series folder name (metadata only) name: Series display name year: Series release year """ # Check if task already exists if key in self.active_tasks: logger.debug(f"Task for series {key} already exists, skipping") return task = SeriesLoadingTask( key=key, folder=folder, name=name, year=year, started_at=datetime.now(timezone.utc) ) self.active_tasks[key] = task await self.task_queue.put(task) logger.info(f"Added loading task for series: {key}") # Broadcast initial status await self._broadcast_status(task) async def check_missing_data( self, key: str, folder: str, anime_directory: str, db: Any ) -> Dict[str, bool]: """Check what data is missing for a series. Args: key: Series unique identifier folder: Series folder name anime_directory: Base anime directory path db: Database session Returns: Dict indicating what data is missing (True = missing, False = exists) """ missing = { "episodes": False, "nfo": False, "logo": False, "images": False } # Check database for series info from src.server.database.service import AnimeSeriesService series_db = await AnimeSeriesService.get_by_key(db, key) if not series_db: # Series doesn't exist in DB, need everything missing = {k: True for k in missing} return missing # Check episodes missing["episodes"] = not series_db.episodes_loaded # Check NFO file nfo_path = Path(anime_directory) / folder / "tvshow.nfo" missing["nfo"] = not nfo_path.exists() or not series_db.has_nfo # Check logo logo_path = Path(anime_directory) / folder / "logo.png" missing["logo"] = not logo_path.exists() or not series_db.logo_loaded # Check images (poster and fanart) poster_path = Path(anime_directory) / folder / "poster.jpg" fanart_path = Path(anime_directory) / folder / "fanart.jpg" missing["images"] = ( not (poster_path.exists() and fanart_path.exists()) or not series_db.images_loaded ) return missing async def _worker(self) -> None: """Background worker that processes loading tasks from the queue.""" logger.info("Background worker started processing tasks") while not self._shutdown: try: # Wait for a task with timeout to allow shutdown checks task = await asyncio.wait_for( self.task_queue.get(), timeout=1.0 ) logger.info(f"Processing loading task for series: {task.key}") # Process the task await self._load_series_data(task) # Mark task as done self.task_queue.task_done() except asyncio.TimeoutError: # No task available, continue loop continue except asyncio.CancelledError: logger.info("Worker task cancelled") break except Exception as e: logger.exception(f"Error in background worker: {e}") # Continue processing other tasks continue logger.info("Background worker stopped") async def _load_series_data(self, task: SeriesLoadingTask) -> None: """Load all missing data for a series. Orchestrates loading by calling existing services (AnimeService, NFOService) rather than reimplementing logic. Updates status and broadcasts progress. Args: task: The loading task to process """ try: # Get database session from src.server.database.connection import get_db_session from src.server.database.service import AnimeSeriesService async with get_db_session() as db: try: # Check what data is missing missing = await self.check_missing_data( task.key, task.folder, self.series_app.directory_to_search, db ) # Load episodes if missing if missing["episodes"]: await self._load_episodes(task, db) else: task.progress["episodes"] = True # Load NFO and images if missing if missing["nfo"] or missing["logo"] or missing["images"]: await self._load_nfo_and_images(task, db) else: task.progress["nfo"] = True task.progress["logo"] = True task.progress["images"] = True # Mark as completed task.status = LoadingStatus.COMPLETED task.completed_at = datetime.now(timezone.utc) # Update database series_db = await AnimeSeriesService.get_by_key(db, task.key) if series_db: series_db.loading_status = "completed" series_db.loading_completed_at = task.completed_at series_db.loading_error = None await db.commit() # Broadcast completion await self._broadcast_status(task) logger.info(f"Successfully loaded all data for series: {task.key}") except Exception as e: logger.exception(f"Error loading series data: {e}") task.status = LoadingStatus.FAILED task.error = str(e) task.completed_at = datetime.now(timezone.utc) # Update database with error series_db = await AnimeSeriesService.get_by_key(db, task.key) if series_db: series_db.loading_status = "failed" series_db.loading_error = str(e) series_db.loading_completed_at = task.completed_at await db.commit() # Broadcast error await self._broadcast_status(task) finally: # Remove from active tasks self.active_tasks.pop(task.key, None) async def _find_series_directory(self, task: SeriesLoadingTask) -> Optional[Path]: """Find the series directory without triggering full rescan. Args: task: The loading task with series information Returns: Path to series directory if found, None otherwise """ try: # Construct expected directory path series_dir = Path(self.series_app.directory_to_search) / task.folder # Check if directory exists if series_dir.exists() and series_dir.is_dir(): logger.debug(f"Found series directory: {series_dir}") return series_dir else: logger.warning(f"Series directory not found: {series_dir}") return None except Exception as e: logger.error(f"Error finding series directory for {task.key}: {e}") return None async def _scan_series_episodes(self, series_dir: Path, task: SeriesLoadingTask) -> Dict[str, List[str]]: """Scan episodes for a specific series directory only. This method scans only the given series directory instead of the entire anime library, making it much more efficient for single series operations. Args: series_dir: Path to the series directory task: The loading task Returns: Dict mapping season names to lists of episode files """ episodes_by_season = {} try: # Scan for season directories for item in sorted(series_dir.iterdir()): if not item.is_dir(): continue season_name = item.name episodes = [] # Scan for .mp4 files in season directory for episode_file in sorted(item.glob("*.mp4")): episodes.append(episode_file.name) if episodes: episodes_by_season[season_name] = episodes logger.debug(f"Found {len(episodes)} episodes in {season_name}") logger.info(f"Scanned {len(episodes_by_season)} seasons for {task.key}") return episodes_by_season except Exception as e: logger.error(f"Error scanning episodes for {task.key}: {e}") return {} async def _load_episodes(self, task: SeriesLoadingTask, db: Any) -> None: """Load episodes for a series by scanning only its directory. This optimized version scans only the specific series directory instead of triggering a full library rescan. Args: task: The loading task db: Database session """ task.status = LoadingStatus.LOADING_EPISODES await self._broadcast_status(task, "Loading episodes...") try: # Find series directory without full rescan series_dir = await self._find_series_directory(task) if not series_dir: logger.error(f"Cannot load episodes - directory not found for {task.key}") task.progress["episodes"] = False return # Scan episodes in this specific series directory only episodes_by_season = await self._scan_series_episodes(series_dir, task) if not episodes_by_season: logger.warning(f"No episodes found for {task.key}") task.progress["episodes"] = False return # Update task progress task.progress["episodes"] = True # Update database from src.server.database.service import AnimeSeriesService series_db = await AnimeSeriesService.get_by_key(db, task.key) if series_db: series_db.episodes_loaded = True series_db.loading_status = "loading_episodes" await db.commit() logger.info(f"Episodes loaded for series: {task.key} ({len(episodes_by_season)} seasons)") except Exception as e: logger.exception(f"Failed to load episodes for {task.key}: {e}") raise async def _load_nfo_and_images(self, task: SeriesLoadingTask, db: Any) -> None: """Load NFO file and images for a series by reusing NFOService. Args: task: The loading task db: Database session """ task.status = LoadingStatus.LOADING_NFO await self._broadcast_status(task, "Checking NFO file...") try: # Check if NFOService is available if not self.series_app.nfo_service: logger.warning( f"NFOService not available, skipping NFO/images for {task.key}" ) task.progress["nfo"] = False task.progress["logo"] = False task.progress["images"] = False return # Check if NFO already exists if self.series_app.nfo_service.has_nfo(task.folder): logger.info(f"NFO already exists for {task.key}, skipping creation") # Update task progress task.progress["nfo"] = True task.progress["logo"] = True # Assume logo exists if NFO exists task.progress["images"] = True # Assume images exist if NFO exists # Update database with existing NFO info from src.server.database.service import AnimeSeriesService series_db = await AnimeSeriesService.get_by_key(db, task.key) if series_db: # Only update if not already marked if not series_db.has_nfo: series_db.has_nfo = True series_db.nfo_created_at = datetime.now(timezone.utc) logger.info(f"Updated database with existing NFO for {task.key}") if not series_db.logo_loaded: series_db.logo_loaded = True if not series_db.images_loaded: series_db.images_loaded = True await db.commit() logger.info(f"Existing NFO found and database updated for series: {task.key}") return # NFO doesn't exist, create it await self._broadcast_status(task, "Generating NFO file...") logger.info(f"Creating new NFO for {task.key}") # Use existing NFOService to create NFO with all images # This reuses all existing TMDB API logic and image downloading nfo_path = await self.series_app.nfo_service.create_tvshow_nfo( serie_name=task.name, serie_folder=task.folder, year=task.year, download_poster=True, download_logo=True, download_fanart=True ) # Update task progress task.progress["nfo"] = True task.progress["logo"] = True task.progress["images"] = True # Update database from src.server.database.service import AnimeSeriesService series_db = await AnimeSeriesService.get_by_key(db, task.key) if series_db: series_db.has_nfo = True series_db.nfo_created_at = datetime.now(timezone.utc) series_db.logo_loaded = True series_db.images_loaded = True series_db.loading_status = "loading_nfo" await db.commit() logger.info(f"NFO and images created and loaded for series: {task.key}") except Exception as e: logger.exception(f"Failed to load NFO/images for {task.key}: {e}") # Don't fail the entire task if NFO fails task.progress["nfo"] = False task.progress["logo"] = False task.progress["images"] = False async def _broadcast_status( self, task: SeriesLoadingTask, message: Optional[str] = None ) -> None: """Broadcast loading status update via WebSocket. Args: task: The loading task message: Optional status message """ if not message: if task.status == LoadingStatus.PENDING: message = "Queued for loading..." elif task.status == LoadingStatus.LOADING_EPISODES: message = "Loading episodes..." elif task.status == LoadingStatus.LOADING_NFO: message = "Generating NFO file..." elif task.status == LoadingStatus.COMPLETED: message = "All data loaded successfully" elif task.status == LoadingStatus.FAILED: message = f"Loading failed: {task.error}" else: message = "Loading..." payload = { "type": "series_loading_update", "key": task.key, "folder": task.folder, "loading_status": task.status.value, "progress": task.progress, "message": message, "timestamp": datetime.now(timezone.utc).isoformat(), "error": task.error } await self.websocket_service.broadcast(payload) # Singleton instance _background_loader_service: Optional[BackgroundLoaderService] = None def init_background_loader_service( websocket_service: WebSocketService, anime_service: Any, series_app: Any ) -> BackgroundLoaderService: """Initialize the background loader service singleton. Args: websocket_service: WebSocket service for broadcasts anime_service: AnimeService instance series_app: SeriesApp instance Returns: BackgroundLoaderService instance """ global _background_loader_service if _background_loader_service is None: _background_loader_service = BackgroundLoaderService( websocket_service=websocket_service, anime_service=anime_service, series_app=series_app ) return _background_loader_service def get_background_loader_service() -> BackgroundLoaderService: """Get the background loader service singleton. Returns: BackgroundLoaderService instance Raises: RuntimeError: If service not initialized """ if _background_loader_service is None: raise RuntimeError( "BackgroundLoaderService not initialized. " "Call init_background_loader_service() first." ) return _background_loader_service