""" SerieScanner - Scans directories for anime series and missing episodes. This module provides functionality to scan anime directories, identify missing episodes, and report progress through callback interfaces. Note: This module is pure domain logic. Database operations are handled by the service layer (AnimeService). """ from __future__ import annotations import logging import os import re import traceback import uuid from typing import Iterable, Iterator, Optional from events import Events from src.core.entities.series import Serie from src.core.exceptions.Exceptions import MatchNotFoundError, NoKeyFoundException from src.core.providers.base_provider import Loader logger = logging.getLogger(__name__) error_logger = logging.getLogger("error") no_key_found_logger = logging.getLogger("series.nokey") class SerieScanner: """ Scans directories for anime series and identifies missing episodes. Supports progress callbacks for real-time scanning updates. Note: This class is pure domain logic. Database operations are handled by the service layer (AnimeService). Scan results are stored in keyDict and can be retrieved after scanning. Example: scanner = SerieScanner("/path/to/anime", loader) scanner.scan() # Results are in scanner.keyDict """ def __init__( self, basePath: str, loader: Loader, ) -> None: """ Initialize the SerieScanner. Args: basePath: Base directory containing anime series loader: Loader instance for fetching series information callback_manager: Optional callback manager for progress updates Raises: ValueError: If basePath is invalid or doesn't exist """ # Validate basePath to prevent directory traversal attacks if not basePath or not basePath.strip(): raise ValueError("Base path cannot be empty") # Resolve to absolute path and validate it exists abs_path = os.path.abspath(basePath) if not os.path.exists(abs_path): raise ValueError(f"Base path does not exist: {abs_path}") if not os.path.isdir(abs_path): raise ValueError(f"Base path is not a directory: {abs_path}") self.directory: str = abs_path self.keyDict: dict[str, Serie] = {} self.loader: Loader = loader self._current_operation_id: Optional[str] = None self.events = Events() self.events.on_progress = [] self.events.on_error = [] self.events.on_completion = [] logger.info("Initialized SerieScanner with base path: %s", abs_path) def _safe_call_event(self, event_handler, data: dict) -> None: """Safely call an event handler if it exists. Args: event_handler: Event handler attribute (e.g., self.events.on_progress) data: Data dictionary to pass to the event handler """ if event_handler: try: # Event handlers are stored as lists, iterate over them for handler in event_handler: handler(data) except Exception as e: logger.error("Error calling event handler: %s", e, exc_info=True) def subscribe_on_progress(self, handler): """ Subscribe a handler to an event. Args: handler: Callable to handle the event """ if handler not in self.events.on_progress: self.events.on_progress.append(handler) def unsubscribe_on_progress(self, handler): """ Unsubscribe a handler from an event. Args: handler: Callable to remove """ if handler in self.events.on_progress: self.events.on_progress.remove(handler) def _extract_year_from_folder_name(self, folder_name: str) -> int | None: """Extract year from folder name if present. Looks for year in format "(YYYY)" at the end of folder name. Args: folder_name: The folder name to check Returns: int or None: Year if found, None otherwise Example: >>> _extract_year_from_folder_name("Dororo (2025)") 2025 >>> _extract_year_from_folder_name("Dororo") None """ if not folder_name: return None # Look for year in format (YYYY) - typically at end of name match = re.search(r'\((\d{4})\)', folder_name) if match: try: year = int(match.group(1)) # Validate year is reasonable (between 1900 and 2100) if 1900 <= year <= 2100: logger.debug( "Extracted year from folder name: %s -> %d", folder_name, year ) return year except ValueError: pass return None def subscribe_on_error(self, handler): """ Subscribe a handler to an event. Args: handler: Callable to handle the event """ if handler not in self.events.on_error: self.events.on_error.append(handler) def unsubscribe_on_error(self, handler): """ Unsubscribe a handler from an event. Args: handler: Callable to remove """ if handler in self.events.on_error: self.events.on_error.remove(handler) def subscribe_on_completion(self, handler): """ Subscribe a handler to an event. Args: handler: Callable to handle the event """ if handler not in self.events.on_completion: self.events.on_completion.append(handler) def unsubscribe_on_completion(self, handler): """ Unsubscribe a handler from an event. Args: handler: Callable to remove """ if handler in self.events.on_completion: self.events.on_completion.remove(handler) def reinit(self) -> None: """Reinitialize the series dictionary (keyed by serie.key).""" self.keyDict: dict[str, Serie] = {} def get_total_to_scan(self) -> int: """Get the total number of folders to scan. Returns: Total count of folders with MP4 files """ result = self.__find_mp4_files() return sum(1 for _ in result) def scan(self) -> None: """ Scan directories for anime series and missing episodes. Results are stored in self.keyDict and can be retrieved after scanning. Data files are also saved to disk for persistence. Raises: Exception: If scan fails critically """ # Generate unique operation ID self._current_operation_id = str(uuid.uuid4()) logger.info("Starting scan for missing episodes") # Notify scan starting self._safe_call_event( self.events.on_progress, { "operation_id": self._current_operation_id, "phase": "STARTING", "current": 0, "total": 0, "percentage": 0.0, "message": "Initializing scan" } ) try: # Get total items to process total_to_scan = self.get_total_to_scan() logger.info("Total folders to scan: %d", total_to_scan) # The scanner enumerates folders with mp4 files, loads existing # metadata, calculates the missing episodes via the provider, and # persists the refreshed metadata while emitting progress events. result = self.__find_mp4_files() counter = 0 for folder, mp4_files in result: try: counter += 1 # Calculate progress if total_to_scan > 0: percentage = (counter / total_to_scan) * 100 else: percentage = 0.0 # Notify progress self._safe_call_event( self.events.on_progress, { "operation_id": self._current_operation_id, "phase": "IN_PROGRESS", "current": counter, "total": total_to_scan, "percentage": percentage, "message": f"Scanning: {folder}", "details": f"Found {len(mp4_files)} episodes" } ) serie = self.__read_data_from_file(folder) if ( serie is not None and serie.key and serie.key.strip() ): # Try to extract year from folder name first if not hasattr(serie, 'year') or not serie.year: year_from_folder = self._extract_year_from_folder_name(folder) if year_from_folder: serie.year = year_from_folder logger.info( "Using year from folder name: %s (year=%d)", folder, year_from_folder ) else: # If not in folder name, fetch from provider try: serie.year = self.loader.get_year(serie.key) if serie.year: logger.info( "Fetched year from provider: %s (year=%d)", serie.key, serie.year ) except Exception as e: logger.warning( "Could not fetch year for %s: %s", serie.key, str(e) ) # Delegate the provider to compare local files with # remote metadata, yielding missing episodes per # season. Results are saved back to disk so that both # CLI and API consumers see consistent state. missing_episodes, _site = ( self.__get_missing_episodes_and_season( serie.key, mp4_files ) ) serie.episodeDict = missing_episodes serie.folder = folder data_path = os.path.join( self.directory, folder, 'data' ) serie.save_to_file(data_path) # Store by key (primary identifier), not folder if serie.key in self.keyDict: logger.error( "Duplicate series found with key '%s' " "(folder: '%s')", serie.key, folder ) else: self.keyDict[serie.key] = serie logger.debug( "Stored series with key '%s' (folder: '%s')", serie.key, folder ) no_key_found_logger.info( "Saved Serie: '%s'", str(serie) ) except NoKeyFoundException as nkfe: # Log error and notify via callback error_msg = f"Error processing folder '{folder}': {nkfe}" logger.error(error_msg) self._safe_call_event( self.events.on_error, { "operation_id": self._current_operation_id, "error": nkfe, "message": error_msg, "recoverable": True, "metadata": {"folder": folder, "key": None} } ) except Exception as e: # Log error and notify via callback error_msg = ( f"Folder: '{folder}' - " f"Unexpected error: {e}" ) error_logger.error( "%s\n%s", error_msg, traceback.format_exc() ) self._safe_call_event( self.events.on_error, { "operation_id": self._current_operation_id, "error": e, "message": error_msg, "recoverable": True, "metadata": {"folder": folder, "key": None} } ) continue # Notify scan completion self._safe_call_event( self.events.on_completion, { "operation_id": self._current_operation_id, "success": True, "message": f"Scan completed. Processed {counter} folders.", "statistics": { "total_folders": counter, "series_found": len(self.keyDict) } } ) logger.info( "Scan completed. Processed %d folders, found %d series", counter, len(self.keyDict) ) except Exception as e: # Critical error - notify and re-raise error_msg = f"Critical scan error: {e}" logger.error("%s\n%s", error_msg, traceback.format_exc()) self._safe_call_event( self.events.on_error, { "operation_id": self._current_operation_id, "error": e, "message": error_msg, "recoverable": False } ) self._safe_call_event( self.events.on_completion, { "operation_id": self._current_operation_id, "success": False, "message": error_msg } ) raise def __find_mp4_files(self) -> Iterator[tuple[str, list[str]]]: """Find all .mp4 files in the directory structure.""" logger.info("Scanning for .mp4 files") for anime_name in os.listdir(self.directory): anime_path = os.path.join(self.directory, anime_name) if os.path.isdir(anime_path): mp4_files: list[str] = [] has_files = False for root, _, files in os.walk(anime_path): for file in files: if file.endswith(".mp4"): mp4_files.append(os.path.join(root, file)) has_files = True yield anime_name, mp4_files if has_files else [] def __read_data_from_file(self, folder_name: str) -> Optional[Serie]: """Read serie data from file or key file. Args: folder_name: Filesystem folder name (used only to locate data files) Returns: Serie object with valid key if found, None otherwise Note: The returned Serie will have its 'key' as the primary identifier. The 'folder' field is metadata only. """ folder_path = os.path.join(self.directory, folder_name) key = None key_file = os.path.join(folder_path, 'key') serie_file = os.path.join(folder_path, 'data') if os.path.exists(key_file): with open(key_file, 'r', encoding='utf-8') as file: key = file.read().strip() logger.info( "Key found for folder '%s': %s", folder_name, key ) return Serie(key, "", "aniworld.to", folder_name, dict()) if os.path.exists(serie_file): with open(serie_file, "rb") as file: logger.info( "load serie_file from '%s': %s", folder_name, serie_file ) return Serie.load_from_file(serie_file) return None def __get_episode_and_season(self, filename: str) -> tuple[int, int]: """Extract season and episode numbers from filename. Args: filename: Filename to parse Returns: Tuple of (season, episode) as integers Raises: MatchNotFoundError: If pattern not found """ pattern = r'S(\d+)E(\d+)' match = re.search(pattern, filename) if match: season = match.group(1) episode = match.group(2) logger.debug( "Extracted season %s, episode %s from '%s'", season, episode, filename ) return int(season), int(episode) else: logger.error( "Failed to find season/episode pattern in '%s'", filename ) raise MatchNotFoundError( "Season and episode pattern not found in the filename." ) def __get_episodes_and_seasons( self, mp4_files: Iterable[str] ) -> dict[int, list[int]]: """Get episodes grouped by season from mp4 files. Args: mp4_files: List of MP4 filenames Returns: Dictionary mapping season to list of episode numbers """ episodes_dict: dict[int, list[int]] = {} for file in mp4_files: season, episode = self.__get_episode_and_season(file) if season in episodes_dict: episodes_dict[season].append(episode) else: episodes_dict[season] = [episode] return episodes_dict def __get_missing_episodes_and_season( self, key: str, mp4_files: Iterable[str] ) -> tuple[dict[int, list[int]], str]: """Get missing episodes for a serie. Args: key: Series key mp4_files: List of MP4 filenames Returns: Tuple of (episodes_dict, site_name) """ # key season , value count of episodes expected_dict = self.loader.get_season_episode_count(key) filedict = self.__get_episodes_and_seasons(mp4_files) episodes_dict: dict[int, list[int]] = {} for season, expected_count in expected_dict.items(): existing_episodes = filedict.get(season, []) missing_episodes = [ ep for ep in range(1, expected_count + 1) if ep not in existing_episodes and self.loader.is_language(season, ep, key) ] if missing_episodes: episodes_dict[season] = missing_episodes return episodes_dict, "aniworld.to" def scan_single_series( self, key: str, folder: str, ) -> dict[int, list[int]]: """ Scan a single series for missing episodes. This method performs a targeted scan for only the specified series, without triggering a full library rescan. It fetches available episodes from the provider and compares with local files. Args: key: The unique provider key for the series folder: The filesystem folder name where the series is stored Returns: dict[int, list[int]]: Dictionary mapping season numbers to lists of missing episode numbers. Empty dict if no missing episodes. Raises: ValueError: If key or folder is empty Example: >>> scanner = SerieScanner("/path/to/anime", loader) >>> missing = scanner.scan_single_series( ... "attack-on-titan", ... "Attack on Titan" ... ) >>> print(missing) {1: [5, 6, 7], 2: [1, 2]} """ if not key or not key.strip(): raise ValueError("Series key cannot be empty") if not folder or not folder.strip(): raise ValueError("Series folder cannot be empty") logger.info( "Starting targeted scan for series: %s (folder: %s)", key, folder ) # Generate unique operation ID for this targeted scan operation_id = str(uuid.uuid4()) # Notify scan starting self._safe_call_event( self.events.on_progress, { "operation_id": operation_id, "phase": "STARTING", "current": 0, "total": 1, "percentage": 0.0, "message": f"Scanning series: {folder}", "details": f"Key: {key}" } ) try: # Get the folder path folder_path = os.path.join(self.directory, folder) # Check if folder exists if not os.path.isdir(folder_path): logger.info( "Series folder does not exist yet: %s - " "will scan for available episodes from provider", folder_path ) mp4_files: list[str] = [] else: # Find existing MP4 files in the folder mp4_files = [] for root, _, files in os.walk(folder_path): for file in files: if file.endswith(".mp4"): mp4_files.append(os.path.join(root, file)) logger.debug( "Found %d existing MP4 files in folder %s", len(mp4_files), folder ) # Get missing episodes from provider missing_episodes, site = self.__get_missing_episodes_and_season( key, mp4_files ) # Update progress self._safe_call_event( self.events.on_progress, { "operation_id": operation_id, "phase": "IN_PROGRESS", "current": 1, "total": 1, "percentage": 100.0, "message": f"Scanned: {folder}", "details": f"Found {sum(len(eps) for eps in missing_episodes.values())} missing episodes" } ) # Create or update Serie in keyDict if key in self.keyDict: # Update existing serie self.keyDict[key].episodeDict = missing_episodes logger.debug( "Updated existing series %s with %d missing episodes", key, sum(len(eps) for eps in missing_episodes.values()) ) else: # Try to extract year from folder name first year = self._extract_year_from_folder_name(folder) if year: logger.info( "Using year from folder name: %s (year=%d)", folder, year ) else: # If not in folder name, fetch from provider try: year = self.loader.get_year(key) if year: logger.info( "Fetched year from provider: %s (year=%d)", key, year ) except Exception as e: logger.warning( "Could not fetch year for %s: %s", key, str(e) ) # Create new serie entry serie = Serie( key=key, name="", # Will be populated by caller if needed site=site, folder=folder, episodeDict=missing_episodes, year=year ) self.keyDict[key] = serie logger.debug( "Created new series entry for %s with %d missing episodes (year=%s)", key, sum(len(eps) for eps in missing_episodes.values()), year ) # Notify completion self._safe_call_event( self.events.on_completion, { "operation_id": operation_id, "success": True, "message": f"Scan completed for {folder}", "statistics": { "missing_episodes": sum( len(eps) for eps in missing_episodes.values() ), "seasons_with_missing": len(missing_episodes) } } ) logger.info( "Targeted scan completed for %s: %d missing episodes across %d seasons", key, sum(len(eps) for eps in missing_episodes.values()), len(missing_episodes) ) return missing_episodes except Exception as e: error_msg = f"Failed to scan series {key}: {e}" logger.error(error_msg, exc_info=True) # Notify error self._safe_call_event( self.events.on_error, { "operation_id": operation_id, "error": e, "message": error_msg, "recoverable": True, "metadata": {"key": key, "folder": folder} } ) # Notify completion with failure self._safe_call_event( self.events.on_completion, { "operation_id": operation_id, "success": False, "message": error_msg } ) # Return empty dict on error (scan failed but not critical) return {}