from __future__ import annotations import asyncio import time from functools import lru_cache from typing import Optional import structlog from src.core.SeriesApp import SeriesApp from src.server.services.progress_service import ( ProgressService, ProgressType, get_progress_service, ) from src.server.services.websocket_service import ( WebSocketService, get_websocket_service, ) logger = structlog.get_logger(__name__) class AnimeServiceError(Exception): """Service-level exception for anime operations.""" class AnimeService: """Wraps SeriesApp for use in the FastAPI web layer. This service provides a clean interface to anime operations, using 'key' as the primary series identifier (provider-assigned, URL-safe) and 'folder' as metadata only (filesystem folder name for display purposes). - SeriesApp methods are now async, no need for threadpool - Subscribes to SeriesApp events for progress tracking - Exposes async methods using 'key' for all series identification - Adds simple in-memory caching for read operations """ def __init__( self, series_app: SeriesApp, progress_service: Optional[ProgressService] = None, websocket_service: Optional[WebSocketService] = None, ): self._app = series_app self._directory = series_app.directory_to_search self._progress_service = progress_service or get_progress_service() self._websocket_service = websocket_service or get_websocket_service() self._event_loop: Optional[asyncio.AbstractEventLoop] = None # Track scan progress for WebSocket updates self._scan_start_time: Optional[float] = None self._scan_directories_count: int = 0 self._scan_files_count: int = 0 self._scan_total_items: int = 0 self._is_scanning: bool = False self._scan_current_directory: str = "" # Lock to prevent concurrent rescans self._scan_lock = asyncio.Lock() # Subscribe to SeriesApp events # Note: Events library uses assignment (=), not += operator try: self._app.download_status = self._on_download_status self._app.scan_status = self._on_scan_status logger.info( "Subscribed to SeriesApp events", scan_status_handler=str(self._app.scan_status), series_app_id=id(self._app), ) except Exception as e: logger.exception("Failed to subscribe to SeriesApp events") raise AnimeServiceError("Initialization failed") from e def _on_download_status(self, args) -> None: """Handle download status events from SeriesApp. Events include both 'key' (primary identifier) and 'serie_folder' (metadata for display and filesystem operations). Args: args: DownloadStatusEventArgs from SeriesApp containing key, serie_folder, season, episode, status, and progress info """ try: # Get event loop - try running loop first, then stored loop loop = None try: loop = asyncio.get_running_loop() except RuntimeError: # No running loop in this thread - use stored loop loop = self._event_loop if not loop: logger.debug( "No event loop available for download status event", status=args.status ) return # Use item_id if available, otherwise fallback to constructing ID progress_id = ( args.item_id if args.item_id else ( f"download_{args.serie_folder}_" f"{args.season}_{args.episode}" ) ) # Map SeriesApp download events to progress service if args.status == "started": asyncio.run_coroutine_threadsafe( self._progress_service.start_progress( progress_id=progress_id, progress_type=ProgressType.DOWNLOAD, title=f"Downloading {args.serie_folder}", message=f"S{args.season:02d}E{args.episode:02d}", metadata=( {"item_id": args.item_id} if args.item_id else None ), ), loop ) elif args.status == "progress": # Build metadata with item_id and speed progress_metadata = {} if args.item_id: progress_metadata["item_id"] = args.item_id if args.mbper_sec is not None: progress_metadata["speed_mbps"] = round(args.mbper_sec, 2) if args.eta is not None: progress_metadata["eta"] = args.eta asyncio.run_coroutine_threadsafe( self._progress_service.update_progress( progress_id=progress_id, current=args.progress, total=100, message=args.message or "Downloading...", metadata=( progress_metadata if progress_metadata else None ), ), loop ) elif args.status == "completed": asyncio.run_coroutine_threadsafe( self._progress_service.complete_progress( progress_id=progress_id, message="Download completed", ), loop ) elif args.status == "failed": asyncio.run_coroutine_threadsafe( self._progress_service.fail_progress( progress_id=progress_id, error_message=args.message or str(args.error), ), loop ) except Exception as exc: # pylint: disable=broad-except logger.error( "Error handling download status event", error=str(exc) ) def _on_scan_status(self, args) -> None: """Handle scan status events from SeriesApp. Events include both 'key' (primary identifier) and 'folder' (metadata for display purposes). Also broadcasts via WebSocket for real-time UI updates. Args: args: ScanStatusEventArgs from SeriesApp containing key, folder, current, total, status, and progress info """ try: scan_id = "library_scan" logger.info( "Scan status event received", status=args.status, current=args.current, total=args.total, folder=args.folder, ) # Get event loop - try running loop first, then stored loop loop = None try: loop = asyncio.get_running_loop() logger.debug("Using running event loop for scan status") except RuntimeError: # No running loop in this thread - use stored loop loop = self._event_loop logger.debug( "Using stored event loop for scan status", has_loop=loop is not None ) if not loop: logger.warning( "No event loop available for scan status event", status=args.status ) return logger.info( "Processing scan status event", status=args.status, loop_id=id(loop), ) # Map SeriesApp scan events to progress service if args.status == "started": # Track scan start time and reset counters self._scan_start_time = time.time() self._scan_directories_count = 0 self._scan_files_count = 0 self._scan_total_items = args.total self._is_scanning = True self._scan_current_directory = "" asyncio.run_coroutine_threadsafe( self._progress_service.start_progress( progress_id=scan_id, progress_type=ProgressType.SCAN, title="Scanning anime library", message=args.message or "Initializing scan...", ), loop ) # Broadcast scan started via WebSocket with total items asyncio.run_coroutine_threadsafe( self._broadcast_scan_started_safe(total_items=args.total), loop ) elif args.status == "progress": # Update scan counters self._scan_directories_count = args.current self._scan_current_directory = args.folder or "" # Estimate files found (use current as proxy since detailed # file count isn't available from SerieScanner) asyncio.run_coroutine_threadsafe( self._progress_service.update_progress( progress_id=scan_id, current=args.current, total=args.total, message=args.message or f"Scanning: {args.folder}", ), loop ) # Broadcast scan progress via WebSocket asyncio.run_coroutine_threadsafe( self._broadcast_scan_progress_safe( directories_scanned=args.current, files_found=args.current, # Use folder count as proxy current_directory=args.folder or "", total_items=args.total, ), loop ) elif args.status == "completed": # Calculate elapsed time elapsed = 0.0 if self._scan_start_time: elapsed = time.time() - self._scan_start_time # Mark scan as complete self._is_scanning = False asyncio.run_coroutine_threadsafe( self._progress_service.complete_progress( progress_id=scan_id, message=args.message or "Scan completed", ), loop ) # Broadcast scan completed via WebSocket asyncio.run_coroutine_threadsafe( self._broadcast_scan_completed_safe( total_directories=args.total, total_files=args.total, # Use folder count as proxy elapsed_seconds=elapsed, ), loop ) elif args.status == "failed": self._is_scanning = False asyncio.run_coroutine_threadsafe( self._progress_service.fail_progress( progress_id=scan_id, error_message=args.message or str(args.error), ), loop ) elif args.status == "cancelled": self._is_scanning = False asyncio.run_coroutine_threadsafe( self._progress_service.fail_progress( progress_id=scan_id, error_message=args.message or "Scan cancelled", ), loop ) except Exception as exc: # pylint: disable=broad-except logger.error("Error handling scan status event: %s", exc) async def _broadcast_scan_started_safe(self, total_items: int = 0) -> None: """Safely broadcast scan started event via WebSocket. Wraps the WebSocket broadcast in try/except to ensure scan continues even if WebSocket fails. Args: total_items: Total number of items to scan """ try: logger.info( "Broadcasting scan_started via WebSocket", directory=self._directory, total_items=total_items, ) await self._websocket_service.broadcast_scan_started( directory=self._directory, total_items=total_items, ) logger.info("scan_started broadcast sent successfully") except Exception as exc: logger.warning( "Failed to broadcast scan_started via WebSocket", error=str(exc) ) async def _broadcast_scan_progress_safe( self, directories_scanned: int, files_found: int, current_directory: str, total_items: int = 0, ) -> None: """Safely broadcast scan progress event via WebSocket. Wraps the WebSocket broadcast in try/except to ensure scan continues even if WebSocket fails. Args: directories_scanned: Number of directories scanned so far files_found: Number of files found so far current_directory: Current directory being scanned total_items: Total number of items to scan """ try: await self._websocket_service.broadcast_scan_progress( directories_scanned=directories_scanned, files_found=files_found, current_directory=current_directory, total_items=total_items, ) except Exception as exc: logger.warning( "Failed to broadcast scan_progress via WebSocket", error=str(exc) ) async def _broadcast_scan_completed_safe( self, total_directories: int, total_files: int, elapsed_seconds: float, ) -> None: """Safely broadcast scan completed event via WebSocket. Wraps the WebSocket broadcast in try/except to ensure scan cleanup continues even if WebSocket fails. Args: total_directories: Total directories scanned total_files: Total files found elapsed_seconds: Time taken for the scan """ try: await self._websocket_service.broadcast_scan_completed( total_directories=total_directories, total_files=total_files, elapsed_seconds=elapsed_seconds, ) except Exception as exc: logger.warning( "Failed to broadcast scan_completed via WebSocket", error=str(exc) ) def get_scan_status(self) -> dict: """Get the current scan status. Returns: Dictionary with scan status information including: - is_scanning: Whether a scan is currently in progress - total_items: Total number of items to scan - directories_scanned: Number of directories scanned so far - current_directory: Current directory being scanned - directory: Root directory being scanned """ status = { "is_scanning": self._is_scanning, "total_items": self._scan_total_items, "directories_scanned": self._scan_directories_count, "current_directory": self._scan_current_directory, "directory": self._directory, } logger.debug( "Scan status requested", is_scanning=self._is_scanning, total_items=self._scan_total_items, directories_scanned=self._scan_directories_count, ) return status @lru_cache(maxsize=128) def _cached_list_missing(self) -> list[dict]: # Synchronous cached call - SeriesApp.series_list is populated # during initialization try: series = self._app.series_list # normalize to simple dicts result: list[dict] = [] for s in series: if hasattr(s, "to_dict"): result.append(s.to_dict()) else: result.append(s) # type: ignore return result except Exception: logger.exception("Failed to get missing episodes list") raise async def list_missing(self) -> list[dict]: """Return list of series with missing episodes. Each series dictionary includes 'key' as the primary identifier and 'folder' as metadata for display purposes. Returns: List of series dictionaries with 'key', 'name', 'site', 'folder', and 'episodeDict' fields """ try: # series_list is already populated, just access it return self._cached_list_missing() except AnimeServiceError: raise except Exception as exc: logger.exception("list_missing failed") raise AnimeServiceError("Failed to list missing series") from exc async def search(self, query: str) -> list[dict]: """Search for series using underlying provider. Args: query: Search query string Returns: List of search results as dictionaries, each containing 'key' as the primary identifier and other metadata fields """ if not query: return [] try: # SeriesApp.search is now async result = await self._app.search(query) return result except Exception as exc: logger.exception("search failed") raise AnimeServiceError("Search failed") from exc async def rescan(self) -> None: """Trigger a re-scan of the anime library directory. Scans the filesystem for anime series and updates the series list. The SeriesApp handles progress tracking via events which are forwarded to the ProgressService through event handlers. After scanning, results are persisted to the database. All series are identified by their 'key' (provider identifier), with 'folder' stored as metadata. Note: Only one scan can run at a time. If a scan is already in progress, this method returns immediately without starting a new scan. """ # Check if a scan is already running (non-blocking) if self._scan_lock.locked(): logger.info("Rescan already in progress, ignoring request") return async with self._scan_lock: try: # Store event loop for event handlers self._event_loop = asyncio.get_running_loop() logger.info( "Rescan started, event loop stored", loop_id=id(self._event_loop), series_app_id=id(self._app), scan_handler=str(self._app.scan_status), ) # SeriesApp.rescan returns scanned series list scanned_series = await self._app.rescan() # Persist scan results to database if scanned_series: await self._save_scan_results_to_db(scanned_series) # Reload series from database to ensure consistency await self._load_series_from_db() # invalidate cache try: self._cached_list_missing.cache_clear() except Exception: # pylint: disable=broad-except pass except Exception as exc: # pylint: disable=broad-except logger.exception("rescan failed") raise AnimeServiceError("Rescan failed") from exc async def _save_scan_results_to_db(self, series_list: list) -> int: """ Save scan results to the database. Creates or updates series records in the database based on scan results. Args: series_list: List of Serie objects from scan Returns: Number of series saved/updated """ from src.server.database.connection import get_db_session from src.server.database.service import AnimeSeriesService saved_count = 0 async with get_db_session() as db: for serie in series_list: try: # Check if series already exists existing = await AnimeSeriesService.get_by_key( db, serie.key ) if existing: # Update existing series await self._update_series_in_db( serie, existing, db ) else: # Create new series await self._create_series_in_db(serie, db) saved_count += 1 except Exception as e: # pylint: disable=broad-except logger.warning( "Failed to save series to database: %s (key=%s) - %s", serie.name, serie.key, str(e) ) logger.info( "Saved %d series to database from scan results", saved_count ) return saved_count async def _create_series_in_db(self, serie, db) -> None: """Create a new series in the database.""" from src.server.database.service import AnimeSeriesService, EpisodeService anime_series = await AnimeSeriesService.create( db=db, key=serie.key, name=serie.name, site=serie.site, folder=serie.folder, year=serie.year if hasattr(serie, 'year') else None, ) # Create Episode records if serie.episodeDict: for season, episode_numbers in serie.episodeDict.items(): for ep_num in episode_numbers: await EpisodeService.create( db=db, series_id=anime_series.id, season=season, episode_number=ep_num, ) logger.debug( "Created series in database: %s (key=%s, year=%s)", serie.name, serie.key, serie.year if hasattr(serie, 'year') else None ) async def _update_series_in_db(self, serie, existing, db) -> None: """Update an existing series in the database. Syncs the database episodes with the current missing episodes from scan. - Adds new missing episodes that are not in the database - Removes episodes from database that are no longer missing (i.e., the file has been added to the filesystem) """ from src.server.database.service import AnimeSeriesService, EpisodeService # Get existing episodes from database existing_episodes = await EpisodeService.get_by_series(db, existing.id) # Build dict of existing episodes: {season: {ep_num: episode_id}} existing_dict: dict[int, dict[int, int]] = {} for ep in existing_episodes: if ep.season not in existing_dict: existing_dict[ep.season] = {} existing_dict[ep.season][ep.episode_number] = ep.id # Get new missing episodes from scan new_dict = serie.episodeDict or {} # Build set of new missing episodes for quick lookup new_missing_set: set[tuple[int, int]] = set() for season, episode_numbers in new_dict.items(): for ep_num in episode_numbers: new_missing_set.add((season, ep_num)) # Add new missing episodes that are not in the database for season, episode_numbers in new_dict.items(): existing_season_eps = existing_dict.get(season, {}) for ep_num in episode_numbers: if ep_num not in existing_season_eps: await EpisodeService.create( db=db, series_id=existing.id, season=season, episode_number=ep_num, ) logger.debug( "Added missing episode to database: %s S%02dE%02d", serie.key, season, ep_num ) # Remove episodes from database that are no longer missing # (i.e., the episode file now exists on the filesystem) for season, eps_dict in existing_dict.items(): for ep_num, episode_id in eps_dict.items(): if (season, ep_num) not in new_missing_set: await EpisodeService.delete(db, episode_id) logger.info( "Removed episode from database (no longer missing): " "%s S%02dE%02d", serie.key, season, ep_num ) # Update folder if changed if existing.folder != serie.folder: await AnimeSeriesService.update( db, existing.id, folder=serie.folder ) logger.debug( "Updated series in database: %s (key=%s)", serie.name, serie.key ) async def _load_series_from_db(self) -> None: """ Load series from the database into SeriesApp. This method is called during initialization and after rescans to ensure the in-memory series list is in sync with the database. """ from src.core.entities.series import Serie from src.server.database.connection import get_db_session from src.server.database.service import AnimeSeriesService async with get_db_session() as db: anime_series_list = await AnimeSeriesService.get_all( db, with_episodes=True ) # Convert to Serie objects series_list = [] for anime_series in anime_series_list: # Build episode_dict from episodes relationship episode_dict: dict[int, list[int]] = {} if anime_series.episodes: for episode in anime_series.episodes: season = episode.season if season not in episode_dict: episode_dict[season] = [] episode_dict[season].append(episode.episode_number) # Sort episode numbers for season in episode_dict: episode_dict[season].sort() serie = Serie( key=anime_series.key, name=anime_series.name, site=anime_series.site, folder=anime_series.folder, episodeDict=episode_dict ) series_list.append(serie) # Load into SeriesApp self._app.load_series_from_list(series_list) async def add_series_to_db( self, serie, db ): """ Add a series to the database if it doesn't already exist. Uses serie.key for identification. Creates a new AnimeSeries record in the database if it doesn't already exist. Args: serie: The Serie instance to add db: Database session for async operations Returns: Created AnimeSeries instance, or None if already exists """ from src.server.database.service import AnimeSeriesService, EpisodeService # Check if series already exists in DB existing = await AnimeSeriesService.get_by_key(db, serie.key) if existing: logger.debug( "Series already exists in database: %s (key=%s)", serie.name, serie.key ) return None # Create new series in database anime_series = await AnimeSeriesService.create( db=db, key=serie.key, name=serie.name, site=serie.site, folder=serie.folder, year=serie.year if hasattr(serie, 'year') else None, ) # Create Episode records for each episode in episodeDict if serie.episodeDict: for season, episode_numbers in serie.episodeDict.items(): for episode_number in episode_numbers: await EpisodeService.create( db=db, series_id=anime_series.id, season=season, episode_number=episode_number, ) logger.info( "Added series to database: %s (key=%s, year=%s)", serie.name, serie.key, serie.year if hasattr(serie, 'year') else None ) return anime_series async def contains_in_db(self, key: str, db) -> bool: """ Check if a series with the given key exists in the database. Args: key: The unique provider identifier for the series db: Database session for async operations Returns: True if the series exists in the database """ from src.server.database.service import AnimeSeriesService existing = await AnimeSeriesService.get_by_key(db, key) return existing is not None async def download( self, serie_folder: str, season: int, episode: int, key: str, item_id: Optional[str] = None, ) -> bool: """Start a download for a specific episode. The SeriesApp handles progress tracking via events which are forwarded to the ProgressService through event handlers. Args: serie_folder: Serie folder name (metadata only, used for filesystem operations and display) season: Season number episode: Episode number key: Serie unique identifier (primary identifier for series lookup, provider-assigned) item_id: Optional download queue item ID for tracking Returns: True on success Raises: AnimeServiceError: If download fails InterruptedError: If download was cancelled Note: The 'key' parameter is the primary identifier used for all series lookups. The 'serie_folder' is only used for filesystem path construction and display purposes. """ try: # Store event loop for event handlers self._event_loop = asyncio.get_running_loop() # SeriesApp.download is now async and handles events internally return await self._app.download( serie_folder=serie_folder, season=season, episode=episode, key=key, item_id=item_id, ) except InterruptedError: # Download was cancelled - re-raise for proper handling logger.info("Download cancelled, propagating cancellation") raise except Exception as exc: logger.exception("download failed") raise AnimeServiceError("Download failed") from exc def get_anime_service(series_app: SeriesApp) -> AnimeService: """Factory used for creating AnimeService with a SeriesApp instance.""" return AnimeService(series_app) async def sync_series_from_data_files( anime_directory: str, log_instance=None # pylint: disable=unused-argument ) -> int: """ Sync series from data files to the database. Scans the anime directory for data files and adds any new series to the database. Existing series are skipped (no duplicates). This function is typically called during application startup to ensure series metadata stored in filesystem data files is available in the database. Args: anime_directory: Path to the anime directory with data files log_instance: Optional logger instance (unused, kept for API compatibility). This function always uses structlog internally. Returns: Number of new series added to the database """ # Always use structlog for structured logging with keyword arguments log = structlog.get_logger(__name__) try: from src.server.database.connection import get_db_session from src.server.database.service import AnimeSeriesService, EpisodeService log.info( "Starting data file to database sync", directory=anime_directory ) # Get all series from data files using SeriesApp series_app = SeriesApp(anime_directory) all_series = await asyncio.to_thread( series_app.get_all_series_from_data_files ) if not all_series: log.info("No series found in data files to sync") return 0 log.info( "Found series in data files, syncing to database", count=len(all_series) ) async with get_db_session() as db: added_count = 0 skipped_count = 0 for serie in all_series: # Handle series with empty name - use folder as fallback if not serie.name or not serie.name.strip(): if serie.folder and serie.folder.strip(): serie.name = serie.folder.strip() log.debug( "Using folder as name fallback", key=serie.key, folder=serie.folder ) else: log.warning( "Skipping series with empty name and folder", key=serie.key ) skipped_count += 1 continue try: # Check if series already exists in DB existing = await AnimeSeriesService.get_by_key(db, serie.key) if existing: log.debug( "Series already exists in database", name=serie.name, key=serie.key ) continue # Create new series in database anime_series = await AnimeSeriesService.create( db=db, key=serie.key, name=serie.name, site=serie.site, folder=serie.folder, ) # Create Episode records for each episode in episodeDict if serie.episodeDict: for season, episode_numbers in serie.episodeDict.items(): for episode_number in episode_numbers: await EpisodeService.create( db=db, series_id=anime_series.id, season=season, episode_number=episode_number, ) added_count += 1 log.debug( "Added series to database", name=serie.name, key=serie.key ) except Exception as e: # pylint: disable=broad-except log.warning( "Failed to add series to database", key=serie.key, name=serie.name, error=str(e) ) skipped_count += 1 log.info( "Data file sync complete", added=added_count, skipped=len(all_series) - added_count ) return added_count except Exception as e: # pylint: disable=broad-except log.warning( "Failed to sync series to database", error=str(e), exc_info=True ) return 0