from __future__ import annotations import asyncio import time from datetime import datetime, timezone from functools import lru_cache from typing import Optional import structlog from src.core.SeriesApp import SeriesApp from src.server.services.progress_service import ( ProgressService, ProgressType, get_progress_service, ) from src.server.services.websocket_service import ( WebSocketService, get_websocket_service, ) logger = structlog.get_logger(__name__) class AnimeServiceError(Exception): """Service-level exception for anime operations.""" class AnimeService: """Wraps SeriesApp for use in the FastAPI web layer. This service provides a clean interface to anime operations, using 'key' as the primary series identifier (provider-assigned, URL-safe) and 'folder' as metadata only (filesystem folder name for display purposes). - SeriesApp methods are now async, no need for threadpool - Subscribes to SeriesApp events for progress tracking - Exposes async methods using 'key' for all series identification - Adds simple in-memory caching for read operations """ def __init__( self, series_app: SeriesApp, progress_service: Optional[ProgressService] = None, websocket_service: Optional[WebSocketService] = None, ): self._app = series_app self._directory = series_app.directory_to_search self._progress_service = progress_service or get_progress_service() self._websocket_service = websocket_service or get_websocket_service() self._event_loop: Optional[asyncio.AbstractEventLoop] = None # Track scan progress for WebSocket updates self._scan_start_time: Optional[float] = None self._scan_directories_count: int = 0 self._scan_files_count: int = 0 self._scan_total_items: int = 0 self._is_scanning: bool = False self._scan_current_directory: str = "" # Lock to prevent concurrent rescans self._scan_lock = asyncio.Lock() # Subscribe to SeriesApp events # Note: Events library uses assignment (=), not += operator try: self._app.download_status = self._on_download_status self._app.scan_status = self._on_scan_status logger.info( "Subscribed to SeriesApp events", scan_status_handler=str(self._app.scan_status), series_app_id=id(self._app), ) except Exception as e: logger.exception("Failed to subscribe to SeriesApp events") raise AnimeServiceError("Initialization failed") from e def _on_download_status(self, args) -> None: """Handle download status events from SeriesApp. Events include both 'key' (primary identifier) and 'serie_folder' (metadata for display and filesystem operations). Args: args: DownloadStatusEventArgs from SeriesApp containing key, serie_folder, season, episode, status, and progress info """ try: # Get event loop - try running loop first, then stored loop loop = None try: loop = asyncio.get_running_loop() except RuntimeError: # No running loop in this thread - use stored loop loop = self._event_loop if not loop: logger.debug( "No event loop available for download status event", status=args.status ) return # Use item_id if available, otherwise fallback to constructing ID progress_id = ( args.item_id if args.item_id else ( f"download_{args.serie_folder}_" f"{args.season}_{args.episode}" ) ) # Map SeriesApp download events to progress service if args.status == "started": asyncio.run_coroutine_threadsafe( self._progress_service.start_progress( progress_id=progress_id, progress_type=ProgressType.DOWNLOAD, title=f"Downloading {args.serie_folder}", message=f"S{args.season:02d}E{args.episode:02d}", metadata=( {"item_id": args.item_id} if args.item_id else None ), ), loop ) elif args.status == "progress": # Build metadata with item_id and speed progress_metadata = {} if args.item_id: progress_metadata["item_id"] = args.item_id if args.mbper_sec is not None: progress_metadata["speed_mbps"] = round(args.mbper_sec, 2) if args.eta is not None: progress_metadata["eta"] = args.eta asyncio.run_coroutine_threadsafe( self._progress_service.update_progress( progress_id=progress_id, current=args.progress, total=100, message=args.message or "Downloading...", metadata=( progress_metadata if progress_metadata else None ), ), loop ) elif args.status == "completed": asyncio.run_coroutine_threadsafe( self._progress_service.complete_progress( progress_id=progress_id, message="Download completed", ), loop ) elif args.status == "failed": asyncio.run_coroutine_threadsafe( self._progress_service.fail_progress( progress_id=progress_id, error_message=args.message or str(args.error), ), loop ) except Exception as exc: # pylint: disable=broad-except logger.error( "Error handling download status event", error=str(exc) ) def _on_scan_status(self, args) -> None: """Handle scan status events from SeriesApp. Events include both 'key' (primary identifier) and 'folder' (metadata for display purposes). Also broadcasts via WebSocket for real-time UI updates. Args: args: ScanStatusEventArgs from SeriesApp containing key, folder, current, total, status, and progress info """ try: scan_id = "library_scan" logger.info( "Scan status event received", status=args.status, current=args.current, total=args.total, folder=args.folder, ) # Get event loop - try running loop first, then stored loop loop = None try: loop = asyncio.get_running_loop() logger.debug("Using running event loop for scan status") except RuntimeError: # No running loop in this thread - use stored loop loop = self._event_loop logger.debug( "Using stored event loop for scan status", has_loop=loop is not None ) if not loop: logger.warning( "No event loop available for scan status event", status=args.status ) return logger.info( "Processing scan status event", status=args.status, loop_id=id(loop), ) # Map SeriesApp scan events to progress service if args.status == "started": # Track scan start time and reset counters self._scan_start_time = time.time() self._scan_directories_count = 0 self._scan_files_count = 0 self._scan_total_items = args.total self._is_scanning = True self._scan_current_directory = "" asyncio.run_coroutine_threadsafe( self._progress_service.start_progress( progress_id=scan_id, progress_type=ProgressType.SCAN, title="Scanning anime library", message=args.message or "Initializing scan...", ), loop ) # Broadcast scan started via WebSocket with total items asyncio.run_coroutine_threadsafe( self._broadcast_scan_started_safe(total_items=args.total), loop ) elif args.status == "progress": # Update scan counters self._scan_directories_count = args.current self._scan_current_directory = args.folder or "" # Estimate files found (use current as proxy since detailed # file count isn't available from SerieScanner) asyncio.run_coroutine_threadsafe( self._progress_service.update_progress( progress_id=scan_id, current=args.current, total=args.total, message=args.message or f"Scanning: {args.folder}", ), loop ) # Broadcast scan progress via WebSocket asyncio.run_coroutine_threadsafe( self._broadcast_scan_progress_safe( directories_scanned=args.current, files_found=args.current, # Use folder count as proxy current_directory=args.folder or "", total_items=args.total, ), loop ) elif args.status == "completed": # Calculate elapsed time elapsed = 0.0 if self._scan_start_time: elapsed = time.time() - self._scan_start_time # Mark scan as complete self._is_scanning = False asyncio.run_coroutine_threadsafe( self._progress_service.complete_progress( progress_id=scan_id, message=args.message or "Scan completed", ), loop ) # Broadcast scan completed via WebSocket asyncio.run_coroutine_threadsafe( self._broadcast_scan_completed_safe( total_directories=args.total, total_files=args.total, # Use folder count as proxy elapsed_seconds=elapsed, ), loop ) elif args.status == "failed": self._is_scanning = False asyncio.run_coroutine_threadsafe( self._progress_service.fail_progress( progress_id=scan_id, error_message=args.message or str(args.error), ), loop ) elif args.status == "cancelled": self._is_scanning = False asyncio.run_coroutine_threadsafe( self._progress_service.fail_progress( progress_id=scan_id, error_message=args.message or "Scan cancelled", ), loop ) except Exception as exc: # pylint: disable=broad-except logger.error("Error handling scan status event: %s", exc) async def _broadcast_scan_started_safe(self, total_items: int = 0) -> None: """Safely broadcast scan started event via WebSocket. Wraps the WebSocket broadcast in try/except to ensure scan continues even if WebSocket fails. Args: total_items: Total number of items to scan """ try: logger.info( "Broadcasting scan_started via WebSocket", directory=self._directory, total_items=total_items, ) await self._websocket_service.broadcast_scan_started( directory=self._directory, total_items=total_items, ) logger.info("scan_started broadcast sent successfully") except Exception as exc: logger.warning( "Failed to broadcast scan_started via WebSocket", error=str(exc) ) async def _broadcast_scan_progress_safe( self, directories_scanned: int, files_found: int, current_directory: str, total_items: int = 0, ) -> None: """Safely broadcast scan progress event via WebSocket. Wraps the WebSocket broadcast in try/except to ensure scan continues even if WebSocket fails. Args: directories_scanned: Number of directories scanned so far files_found: Number of files found so far current_directory: Current directory being scanned total_items: Total number of items to scan """ try: await self._websocket_service.broadcast_scan_progress( directories_scanned=directories_scanned, files_found=files_found, current_directory=current_directory, total_items=total_items, ) except Exception as exc: logger.warning( "Failed to broadcast scan_progress via WebSocket", error=str(exc) ) async def _broadcast_scan_completed_safe( self, total_directories: int, total_files: int, elapsed_seconds: float, ) -> None: """Safely broadcast scan completed event via WebSocket. Wraps the WebSocket broadcast in try/except to ensure scan cleanup continues even if WebSocket fails. Args: total_directories: Total directories scanned total_files: Total files found elapsed_seconds: Time taken for the scan """ try: await self._websocket_service.broadcast_scan_completed( total_directories=total_directories, total_files=total_files, elapsed_seconds=elapsed_seconds, ) except Exception as exc: logger.warning( "Failed to broadcast scan_completed via WebSocket", error=str(exc) ) def get_scan_status(self) -> dict: """Get the current scan status. Returns: Dictionary with scan status information including: - is_scanning: Whether a scan is currently in progress - total_items: Total number of items to scan - directories_scanned: Number of directories scanned so far - current_directory: Current directory being scanned - directory: Root directory being scanned """ status = { "is_scanning": self._is_scanning, "total_items": self._scan_total_items, "directories_scanned": self._scan_directories_count, "current_directory": self._scan_current_directory, "directory": self._directory, } logger.debug( "Scan status requested", is_scanning=self._is_scanning, total_items=self._scan_total_items, directories_scanned=self._scan_directories_count, ) return status @lru_cache(maxsize=128) def _cached_list_missing(self) -> list[dict]: # Synchronous cached call - SeriesApp.series_list is populated # during initialization try: series = self._app.series_list # normalize to simple dicts result: list[dict] = [] for s in series: if hasattr(s, "to_dict"): result.append(s.to_dict()) else: result.append(s) # type: ignore return result except Exception: logger.exception("Failed to get missing episodes list") raise async def list_missing(self) -> list[dict]: """Return list of series with missing episodes. Each series dictionary includes 'key' as the primary identifier and 'folder' as metadata for display purposes. Returns: List of series dictionaries with 'key', 'name', 'site', 'folder', and 'episodeDict' fields """ try: # series_list is already populated, just access it return self._cached_list_missing() except AnimeServiceError: raise except Exception as exc: logger.exception("list_missing failed") raise AnimeServiceError("Failed to list missing series") from exc async def list_series_with_filters( self, filter_type: Optional[str] = None ) -> list[dict]: """Return all series with NFO metadata from database. Retrieves series from SeriesApp and enriches them with NFO metadata from the database. Supports filtering options like 'no_episodes'. Args: filter_type: Optional filter. Supported values: - "no_episodes": Only series with no downloaded episodes - None: All series Returns: List of series dictionaries with 'key', 'name', 'site', 'folder', 'episodeDict', and NFO metadata fields (has_nfo, nfo_created_at, nfo_updated_at, tmdb_id, tvdb_id, series_id) Raises: AnimeServiceError: If operation fails """ try: from src.server.database.connection import get_db_session from src.server.database.service import AnimeSeriesService # Get all series from SeriesApp if not hasattr(self._app, "list"): logger.warning("SeriesApp has no list attribute") return [] series = self._app.list.GetList() if not series: logger.info("No series found in SeriesApp") return [] # Build NFO metadata map and filter data from database nfo_map = {} series_with_no_episodes = set() async with get_db_session() as db: # Get all series NFO metadata using service layer db_series_list = await AnimeSeriesService.get_all(db) for db_series in db_series_list: nfo_created = ( db_series.nfo_created_at.isoformat() if db_series.nfo_created_at else None ) nfo_updated = ( db_series.nfo_updated_at.isoformat() if db_series.nfo_updated_at else None ) nfo_map[db_series.folder] = { "has_nfo": db_series.has_nfo or False, "nfo_created_at": nfo_created, "nfo_updated_at": nfo_updated, "tmdb_id": db_series.tmdb_id, "tvdb_id": db_series.tvdb_id, "series_id": db_series.id, } # If filter is "no_episodes", get series with no # downloaded episodes if filter_type == "no_episodes": # Use service method to get series with # undownloaded episodes series_no_downloads = ( await AnimeSeriesService .get_series_with_no_episodes(db) ) series_with_no_episodes = { s.folder for s in series_no_downloads } # Build result list with enriched metadata result_list = [] for serie in series: key = getattr(serie, "key", "") name = getattr(serie, "name", "") site = getattr(serie, "site", "") folder = getattr(serie, "folder", "") episode_dict = getattr(serie, "episodeDict", {}) or {} # Apply filter if specified if filter_type == "no_episodes": if folder not in series_with_no_episodes: continue # Get NFO data from map nfo_data = nfo_map.get(folder, {}) # Build enriched series dict series_dict = { "key": key, "name": name, "site": site, "folder": folder, "episodeDict": episode_dict, "has_nfo": nfo_data.get("has_nfo", False), "nfo_created_at": nfo_data.get("nfo_created_at"), "nfo_updated_at": nfo_data.get("nfo_updated_at"), "tmdb_id": nfo_data.get("tmdb_id"), "tvdb_id": nfo_data.get("tvdb_id"), "series_id": nfo_data.get("series_id"), } result_list.append(series_dict) logger.info( "Listed series with filters", total_count=len(result_list), filter_type=filter_type ) return result_list except AnimeServiceError: raise except Exception as exc: logger.exception("list_series_with_filters failed") raise AnimeServiceError( "Failed to list series with metadata" ) from exc async def search(self, query: str) -> list[dict]: """Search for series using underlying provider. Args: query: Search query string Returns: List of search results as dictionaries, each containing 'key' as the primary identifier and other metadata fields """ if not query: return [] try: # SeriesApp.search is now async result = await self._app.search(query) return result except Exception as exc: logger.exception("search failed") raise AnimeServiceError("Search failed") from exc async def rescan(self) -> None: """Trigger a re-scan of the anime library directory. Scans the filesystem for anime series and updates the series list. The SeriesApp handles progress tracking via events which are forwarded to the ProgressService through event handlers. After scanning, results are persisted to the database. All series are identified by their 'key' (provider identifier), with 'folder' stored as metadata. Note: Only one scan can run at a time. If a scan is already in progress, this method returns immediately without starting a new scan. """ # Check if a scan is already running (non-blocking) if self._scan_lock.locked(): logger.info("Rescan already in progress, ignoring request") return async with self._scan_lock: try: # Store event loop for event handlers self._event_loop = asyncio.get_running_loop() logger.info( "Rescan started, event loop stored", loop_id=id(self._event_loop), series_app_id=id(self._app), scan_handler=str(self._app.scan_status), ) # SeriesApp.rescan returns scanned series list scanned_series = await self._app.rescan() # Persist scan results to database if scanned_series: await self._save_scan_results_to_db(scanned_series) # Reload series from database to ensure consistency await self._load_series_from_db() # invalidate cache try: self._cached_list_missing.cache_clear() except Exception: # pylint: disable=broad-except pass except Exception as exc: # pylint: disable=broad-except logger.exception("rescan failed") raise AnimeServiceError("Rescan failed") from exc async def sync_single_series_after_scan(self, series_key: str) -> None: """Persist a single scanned series and refresh cached state. Reuses the same save/reload/cache invalidation flow as `rescan` to keep the database, in-memory list, and UI in sync. Args: series_key: Series key to persist and refresh. """ # Get serie from scanner's keyDict, not series_app.list.keyDict # scan_single_series updates serie_scanner.keyDict with episodeDict if not hasattr(self._app, "serie_scanner") or not hasattr(self._app.serie_scanner, "keyDict"): logger.warning( "Serie scanner not available for single-series sync: %s", series_key, ) return serie = self._app.serie_scanner.keyDict.get(series_key) if not serie: logger.warning( "Series not found in scanner keyDict for single-series sync: %s", series_key, ) return total_episodes = sum(len(eps) for eps in (serie.episodeDict or {}).values()) logger.info( "Syncing series %s with %d missing episodes. episodeDict: %s", series_key, total_episodes, serie.episodeDict ) await self._save_scan_results_to_db([serie]) await self._load_series_from_db() try: self._cached_list_missing.cache_clear() except Exception: # pylint: disable=broad-except pass try: await self._broadcast_series_updated(series_key) except Exception as exc: # pylint: disable=broad-except logger.warning( "Failed to broadcast series update for %s: %s", series_key, exc, ) async def _save_scan_results_to_db(self, series_list: list) -> int: """ Save scan results to the database. Creates or updates series records in the database based on scan results. Args: series_list: List of Serie objects from scan Returns: Number of series saved/updated """ from src.server.database.connection import get_db_session from src.server.database.service import AnimeSeriesService saved_count = 0 async with get_db_session() as db: for serie in series_list: try: # Check if series already exists existing = await AnimeSeriesService.get_by_key( db, serie.key ) total_episodes = sum(len(eps) for eps in (serie.episodeDict or {}).values()) if existing: # Update existing series logger.info( "Updating existing series %s with %d episodes. episodeDict: %s", serie.key, total_episodes, serie.episodeDict ) await self._update_series_in_db( serie, existing, db ) else: # Create new series logger.info( "Creating new series %s with %d episodes. episodeDict: %s", serie.key, total_episodes, serie.episodeDict ) await self._create_series_in_db(serie, db) saved_count += 1 except Exception as e: # pylint: disable=broad-except logger.warning( "Failed to save series to database: %s (key=%s) - %s", serie.name, serie.key, str(e) ) logger.info( "Saved %d series to database from scan results", saved_count ) return saved_count async def _create_series_in_db(self, serie, db) -> None: """Create a new series in the database.""" from src.server.database.service import AnimeSeriesService, EpisodeService anime_series = await AnimeSeriesService.create( db=db, key=serie.key, name=serie.name, site=serie.site, folder=serie.folder, year=serie.year if hasattr(serie, 'year') else None, ) # Create Episode records if serie.episodeDict: for season, episode_numbers in serie.episodeDict.items(): for ep_num in episode_numbers: await EpisodeService.create( db=db, series_id=anime_series.id, season=season, episode_number=ep_num, ) logger.debug( "Created series in database: %s (key=%s, year=%s)", serie.name, serie.key, serie.year if hasattr(serie, 'year') else None ) async def _update_series_in_db(self, serie, existing, db) -> None: """Update an existing series in the database. Syncs the database episodes with the current missing episodes from scan. - Adds new missing episodes that are not in the database - Removes episodes from database that are no longer missing (i.e., the file has been added to the filesystem) """ from src.server.database.service import AnimeSeriesService, EpisodeService # Get existing episodes from database existing_episodes = await EpisodeService.get_by_series(db, existing.id) # Build dict of existing episodes: {season: {ep_num: episode_id}} existing_dict: dict[int, dict[int, int]] = {} for ep in existing_episodes: if ep.season not in existing_dict: existing_dict[ep.season] = {} existing_dict[ep.season][ep.episode_number] = ep.id # Get new missing episodes from scan new_dict = serie.episodeDict or {} # Build set of new missing episodes for quick lookup new_missing_set: set[tuple[int, int]] = set() for season, episode_numbers in new_dict.items(): for ep_num in episode_numbers: new_missing_set.add((season, ep_num)) # Add new missing episodes that are not in the database for season, episode_numbers in new_dict.items(): existing_season_eps = existing_dict.get(season, {}) for ep_num in episode_numbers: if ep_num not in existing_season_eps: await EpisodeService.create( db=db, series_id=existing.id, season=season, episode_number=ep_num, ) logger.debug( "Added missing episode to database: %s S%02dE%02d", serie.key, season, ep_num ) # Remove episodes from database that are no longer missing # (i.e., the episode file now exists on the filesystem) for season, eps_dict in existing_dict.items(): for ep_num, episode_id in eps_dict.items(): if (season, ep_num) not in new_missing_set: await EpisodeService.delete(db, episode_id) logger.info( "Removed episode from database (no longer missing): " "%s S%02dE%02d", serie.key, season, ep_num ) # Update folder if changed if existing.folder != serie.folder: await AnimeSeriesService.update( db, existing.id, folder=serie.folder ) logger.debug( "Updated series in database: %s (key=%s)", serie.name, serie.key ) async def _load_series_from_db(self) -> None: """ Load series from the database into SeriesApp. This method is called during initialization and after rescans to ensure the in-memory series list is in sync with the database. """ from src.core.entities.series import Serie from src.server.database.connection import get_db_session from src.server.database.service import AnimeSeriesService async with get_db_session() as db: anime_series_list = await AnimeSeriesService.get_all( db, with_episodes=True ) # Convert to Serie objects series_list = [] for anime_series in anime_series_list: # Build episode_dict from episodes relationship episode_dict: dict[int, list[int]] = {} if anime_series.episodes: for episode in anime_series.episodes: season = episode.season if season not in episode_dict: episode_dict[season] = [] episode_dict[season].append(episode.episode_number) # Sort episode numbers for season in episode_dict: episode_dict[season].sort() serie = Serie( key=anime_series.key, name=anime_series.name, site=anime_series.site, folder=anime_series.folder, episodeDict=episode_dict ) series_list.append(serie) # Load into SeriesApp self._app.load_series_from_list(series_list) async def sync_episodes_to_db(self, series_key: str) -> int: """ Sync episodes from in-memory SeriesApp to database for a specific series. This method reads the episodeDict from the in-memory series (populated by scanner) and syncs it to the database. Called after scanning for missing episodes. Args: series_key: The series key to sync episodes for Returns: Number of episodes synced to database """ from src.server.database.connection import get_db_session from src.server.database.service import AnimeSeriesService, EpisodeService # Get the serie from in-memory cache if not hasattr(self._app, 'list') or not hasattr(self._app.list, 'keyDict'): logger.warning(f"Series list not available for episode sync: {series_key}") return 0 serie = self._app.list.keyDict.get(series_key) if not serie: logger.warning(f"Series not found in memory for episode sync: {series_key}") return 0 episodes_added = 0 async with get_db_session() as db: # Get series from database series_db = await AnimeSeriesService.get_by_key(db, series_key) if not series_db: logger.warning(f"Series not found in database: {series_key}") return 0 # Get existing episodes from database existing_episodes = await EpisodeService.get_by_series(db, series_db.id) # Build dict of existing episodes: {season: {ep_num: episode_id}} existing_dict: dict[int, dict[int, int]] = {} for ep in existing_episodes: if ep.season not in existing_dict: existing_dict[ep.season] = {} existing_dict[ep.season][ep.episode_number] = ep.id # Get new missing episodes from in-memory serie new_dict = serie.episodeDict or {} # Add new missing episodes that are not in the database for season, episode_numbers in new_dict.items(): existing_season_eps = existing_dict.get(season, {}) for ep_num in episode_numbers: if ep_num not in existing_season_eps: await EpisodeService.create( db=db, series_id=series_db.id, season=season, episode_number=ep_num, ) episodes_added += 1 logger.debug( f"Added missing episode to database: {series_key} S{season:02d}E{ep_num:02d}" ) if episodes_added > 0: logger.info( f"Synced {episodes_added} missing episodes to database for {series_key}" ) # Broadcast update to frontend to refresh series list try: await self._broadcast_series_updated(series_key) except Exception as e: logger.warning(f"Failed to broadcast series update: {e}") return episodes_added async def _broadcast_series_updated(self, series_key: str) -> None: """Broadcast series update event to WebSocket clients with full data.""" if not self._websocket_service: return # Get updated series data to send to frontend series_data = None if hasattr(self._app, 'list') and hasattr(self._app.list, 'keyDict'): serie = self._app.list.keyDict.get(series_key) if serie: # Convert episode dict keys to strings for JSON missing_episodes = {str(k): v for k, v in (serie.episodeDict or {}).items()} total_missing = sum(len(eps) for eps in missing_episodes.values()) # Fetch NFO metadata from database has_nfo = False nfo_created_at = None nfo_updated_at = None tmdb_id = None tvdb_id = None try: from src.server.database.connection import get_db_session from src.server.database.service import AnimeSeriesService async with get_db_session() as db: db_series = await AnimeSeriesService.get_by_key(db, series_key) if db_series: has_nfo = db_series.has_nfo or False nfo_created_at = ( db_series.nfo_created_at.isoformat() if db_series.nfo_created_at else None ) nfo_updated_at = ( db_series.nfo_updated_at.isoformat() if db_series.nfo_updated_at else None ) tmdb_id = db_series.tmdb_id tvdb_id = db_series.tvdb_id except Exception as e: logger.warning( "Could not fetch NFO data for %s: %s", series_key, str(e) ) series_data = { "key": serie.key, "name": serie.name, "folder": serie.folder, "site": serie.site, "missing_episodes": missing_episodes, "has_missing": total_missing > 0, "has_nfo": has_nfo, "nfo_created_at": nfo_created_at, "nfo_updated_at": nfo_updated_at, "tmdb_id": tmdb_id, "tvdb_id": tvdb_id, } payload = { "type": "series_updated", "key": series_key, "data": series_data, "message": "Series episodes updated", "timestamp": datetime.now(timezone.utc).isoformat() } logger.info( "Broadcasting series update for %s with %d missing episodes", series_key, sum(len(eps) for eps in (series_data.get("missing_episodes", {}).values())) if series_data else 0 ) await self._websocket_service.broadcast(payload) async def add_series_to_db( self, serie, db ): """ Add a series to the database if it doesn't already exist. Uses serie.key for identification. Creates a new AnimeSeries record in the database if it doesn't already exist. Args: serie: The Serie instance to add db: Database session for async operations Returns: Created AnimeSeries instance, or None if already exists """ from src.server.database.service import AnimeSeriesService, EpisodeService # Check if series already exists in DB existing = await AnimeSeriesService.get_by_key(db, serie.key) if existing: logger.debug( "Series already exists in database: %s (key=%s)", serie.name, serie.key ) return None # Create new series in database anime_series = await AnimeSeriesService.create( db=db, key=serie.key, name=serie.name, site=serie.site, folder=serie.folder, year=serie.year if hasattr(serie, 'year') else None, ) # Create Episode records for each episode in episodeDict if serie.episodeDict: for season, episode_numbers in serie.episodeDict.items(): for episode_number in episode_numbers: await EpisodeService.create( db=db, series_id=anime_series.id, season=season, episode_number=episode_number, ) logger.info( "Added series to database: %s (key=%s, year=%s)", serie.name, serie.key, serie.year if hasattr(serie, 'year') else None ) return anime_series async def contains_in_db(self, key: str, db) -> bool: """ Check if a series with the given key exists in the database. Args: key: The unique provider identifier for the series db: Database session for async operations Returns: True if the series exists in the database """ from src.server.database.service import AnimeSeriesService existing = await AnimeSeriesService.get_by_key(db, key) return existing is not None async def download( self, serie_folder: str, season: int, episode: int, key: str, item_id: Optional[str] = None, ) -> bool: """Start a download for a specific episode. The SeriesApp handles progress tracking via events which are forwarded to the ProgressService through event handlers. Args: serie_folder: Serie folder name (metadata only, used for filesystem operations and display) season: Season number episode: Episode number key: Serie unique identifier (primary identifier for series lookup, provider-assigned) item_id: Optional download queue item ID for tracking Returns: True on success Raises: AnimeServiceError: If download fails InterruptedError: If download was cancelled Note: The 'key' parameter is the primary identifier used for all series lookups. The 'serie_folder' is only used for filesystem path construction and display purposes. """ try: # Store event loop for event handlers self._event_loop = asyncio.get_running_loop() # SeriesApp.download is now async and handles events internally return await self._app.download( serie_folder=serie_folder, season=season, episode=episode, key=key, item_id=item_id, ) except InterruptedError: # Download was cancelled - re-raise for proper handling logger.info("Download cancelled, propagating cancellation") raise except Exception as exc: logger.exception("download failed") raise AnimeServiceError("Download failed") from exc async def update_nfo_status( self, key: str, has_nfo: bool, tmdb_id: Optional[int] = None, tvdb_id: Optional[int] = None, db=None ) -> None: """Update NFO status for a series in the database. Args: key: Serie unique identifier has_nfo: Whether tvshow.nfo exists tmdb_id: Optional TMDB ID tvdb_id: Optional TVDB ID db: Optional database session (will create if not provided) Raises: AnimeServiceError: If update fails """ from src.server.database.connection import get_db_session from src.server.database.service import AnimeSeriesService try: # Get or create database session if db is None: async with get_db_session() as db: # Find series by key using service layer series = await AnimeSeriesService.get_by_key(db, key) if not series: logger.warning( "Series not found in database for NFO update", key=key ) return # Prepare update fields now = datetime.now(timezone.utc) update_fields = {"has_nfo": has_nfo} if has_nfo: if series.nfo_created_at is None: update_fields["nfo_created_at"] = now update_fields["nfo_updated_at"] = now if tmdb_id is not None: update_fields["tmdb_id"] = tmdb_id if tvdb_id is not None: update_fields["tvdb_id"] = tvdb_id # Use service layer for update await AnimeSeriesService.update(db, series.id, **update_fields) await db.commit() logger.info( "Updated NFO status in database", key=key, has_nfo=has_nfo, tmdb_id=tmdb_id, tvdb_id=tvdb_id ) else: # Use provided session series = await AnimeSeriesService.get_by_key(db, key) if not series: logger.warning( "Series not found in database for NFO update", key=key ) return # Update fields directly on the ORM object now = datetime.now(timezone.utc) series.has_nfo = has_nfo if has_nfo: if series.nfo_created_at is None: series.nfo_created_at = now series.nfo_updated_at = now if tmdb_id is not None: series.tmdb_id = tmdb_id if tvdb_id is not None: series.tvdb_id = tvdb_id await db.commit() logger.info( "Updated NFO status in database", key=key, has_nfo=has_nfo, tmdb_id=tmdb_id, tvdb_id=tvdb_id ) except Exception as exc: logger.exception( "Failed to update NFO status", key=key, has_nfo=has_nfo ) raise AnimeServiceError("NFO status update failed") from exc async def get_series_without_nfo(self, db=None) -> list[dict]: """Get list of series that don't have NFO files. Args: db: Optional database session Returns: List of series dictionaries with keys: - key: Series unique identifier - name: Series name - folder: Series folder name - has_nfo: Always False - tmdb_id: TMDB ID if available - tvdb_id: TVDB ID if available Raises: AnimeServiceError: If query fails """ from sqlalchemy import select from src.server.database.connection import get_db_session from src.server.database.service import AnimeSeriesService try: # Get or create database session if db is None: async with get_db_session() as db: # Query series without NFO using service layer series_list = await AnimeSeriesService.get_series_without_nfo(db) result = [] for series in series_list: result.append({ "key": series.key, "name": series.name, "folder": series.folder, "has_nfo": False, "tmdb_id": series.tmdb_id, "tvdb_id": series.tvdb_id, "nfo_created_at": None, "nfo_updated_at": None }) logger.info( "Retrieved series without NFO", count=len(result) ) return result else: # Use provided session series_list = await AnimeSeriesService.get_series_without_nfo(db) result = [] for series in series_list: result.append({ "key": series.key, "name": series.name, "folder": series.folder, "has_nfo": False, "tmdb_id": series.tmdb_id, "tvdb_id": series.tvdb_id, "nfo_created_at": None, "nfo_updated_at": None }) logger.info( "Retrieved series without NFO", count=len(result) ) return result except Exception as exc: logger.exception("Failed to query series without NFO") raise AnimeServiceError( "Query for series without NFO failed" ) from exc async def get_nfo_statistics(self, db=None) -> dict: """Get NFO statistics for all series. Args: db: Optional database session Returns: Dictionary with statistics: - total: Total series count - with_nfo: Series with NFO files - without_nfo: Series without NFO files - with_tmdb_id: Series with TMDB ID - with_tvdb_id: Series with TVDB ID Raises: AnimeServiceError: If query fails """ from sqlalchemy import func, select from src.server.database.connection import get_db_session from src.server.database.service import AnimeSeriesService try: # Get or create database session if db is None: async with get_db_session() as db: # Use service layer count methods total = await AnimeSeriesService.count_all(db) with_nfo = await AnimeSeriesService.count_with_nfo(db) with_tmdb = await AnimeSeriesService.count_with_tmdb_id(db) with_tvdb = await AnimeSeriesService.count_with_tvdb_id(db) stats = { "total": total, "with_nfo": with_nfo, "without_nfo": total - with_nfo, "with_tmdb_id": with_tmdb, "with_tvdb_id": with_tvdb } logger.info("Retrieved NFO statistics", **stats) return stats else: # Use provided session and service layer count methods total = await AnimeSeriesService.count_all(db) with_nfo = await AnimeSeriesService.count_with_nfo(db) with_tmdb = await AnimeSeriesService.count_with_tmdb_id(db) with_tvdb = await AnimeSeriesService.count_with_tvdb_id(db) stats = { "total": total, "with_nfo": with_nfo, "without_nfo": total - with_nfo, "with_tmdb_id": with_tmdb, "with_tvdb_id": with_tvdb } logger.info("Retrieved NFO statistics", **stats) return stats except Exception as exc: logger.exception("Failed to get NFO statistics") raise AnimeServiceError("NFO statistics query failed") from exc def get_anime_service(series_app: SeriesApp) -> AnimeService: """Factory used for creating AnimeService with a SeriesApp instance.""" return AnimeService(series_app) async def sync_series_from_data_files( anime_directory: str, log_instance=None # pylint: disable=unused-argument ) -> int: """ Sync series from data files to the database. Scans the anime directory for data files and adds any new series to the database. Existing series are skipped (no duplicates). This function is typically called during application startup to ensure series metadata stored in filesystem data files is available in the database. Args: anime_directory: Path to the anime directory with data files log_instance: Optional logger instance (unused, kept for API compatibility). This function always uses structlog internally. Returns: Number of new series added to the database """ # Always use structlog for structured logging with keyword arguments log = structlog.get_logger(__name__) try: from src.server.database.connection import get_db_session from src.server.database.service import AnimeSeriesService, EpisodeService log.info( "Starting data file to database sync", directory=anime_directory ) # Get all series from data files using SeriesApp series_app = SeriesApp(anime_directory) all_series = await asyncio.to_thread( series_app.get_all_series_from_data_files ) if not all_series: log.info("No series found in data files to sync") return 0 log.info( "Found series in data files, syncing to database", count=len(all_series) ) async with get_db_session() as db: added_count = 0 skipped_count = 0 for serie in all_series: # Handle series with empty name - use folder as fallback if not serie.name or not serie.name.strip(): if serie.folder and serie.folder.strip(): serie.name = serie.folder.strip() log.debug( "Using folder as name fallback", key=serie.key, folder=serie.folder ) else: log.warning( "Skipping series with empty name and folder", key=serie.key ) skipped_count += 1 continue try: # Check if series already exists in DB existing = await AnimeSeriesService.get_by_key(db, serie.key) if existing: log.debug( "Series already exists in database", name=serie.name, key=serie.key ) continue # Create new series in database anime_series = await AnimeSeriesService.create( db=db, key=serie.key, name=serie.name, site=serie.site, folder=serie.folder, ) # Create Episode records for each episode in episodeDict if serie.episodeDict: for season, episode_numbers in serie.episodeDict.items(): for episode_number in episode_numbers: await EpisodeService.create( db=db, series_id=anime_series.id, season=season, episode_number=episode_number, ) added_count += 1 log.debug( "Added series to database", name=serie.name, key=serie.key ) except Exception as e: # pylint: disable=broad-except log.warning( "Failed to add series to database", key=serie.key, name=serie.name, error=str(e) ) skipped_count += 1 log.info( "Data file sync complete", added=added_count, skipped=len(all_series) - added_count ) return added_count except Exception as e: # pylint: disable=broad-except log.warning( "Failed to sync series to database", error=str(e), exc_info=True ) return 0