"""NFO service for creating and managing tvshow.nfo files. This service orchestrates TMDB API calls, XML generation, and media downloads to create complete NFO metadata for TV series. Example: >>> nfo_service = NFOService(tmdb_api_key="key", anime_directory="/anime") >>> await nfo_service.create_tvshow_nfo("Attack on Titan", "/anime/aot", 2013) """ import logging import re from pathlib import Path from typing import Any, Dict, List, Optional, Tuple from lxml import etree from src.core.entities.nfo_models import ( ActorInfo, ImageInfo, RatingInfo, TVShowNFO, UniqueID, ) from src.core.services.tmdb_client import TMDBAPIError, TMDBClient from src.core.utils.image_downloader import ImageDownloader from src.core.utils.nfo_generator import generate_tvshow_nfo logger = logging.getLogger(__name__) class NFOService: """Service for creating and managing tvshow.nfo files. Attributes: tmdb_client: TMDB API client image_downloader: Image downloader utility anime_directory: Base directory for anime series """ def __init__( self, tmdb_api_key: str, anime_directory: str, image_size: str = "original", auto_create: bool = True ): """Initialize NFO service. Args: tmdb_api_key: TMDB API key anime_directory: Base anime directory path image_size: Image size to download (original, w500, etc.) auto_create: Whether to auto-create NFOs """ self.tmdb_client = TMDBClient(api_key=tmdb_api_key) self.image_downloader = ImageDownloader() self.anime_directory = Path(anime_directory) self.image_size = image_size self.auto_create = auto_create def has_nfo(self, serie_folder: str) -> bool: """Check if tvshow.nfo exists for a series. Args: serie_folder: Series folder name Returns: True if NFO file exists """ nfo_path = self.anime_directory / serie_folder / "tvshow.nfo" return nfo_path.exists() @staticmethod def _extract_year_from_name(serie_name: str) -> Tuple[str, Optional[int]]: """Extract year from series name if present in format 'Name (YYYY)'. Args: serie_name: Series name, possibly with year in parentheses Returns: Tuple of (clean_name, year) - clean_name: Series name without year - year: Extracted year or None Examples: >>> _extract_year_from_name("Attack on Titan (2013)") ("Attack on Titan", 2013) >>> _extract_year_from_name("Attack on Titan") ("Attack on Titan", None) """ # Match year in parentheses at the end: (YYYY) match = re.search(r'\((\d{4})\)\s*$', serie_name) if match: year = int(match.group(1)) clean_name = serie_name[:match.start()].strip() return clean_name, year return serie_name, None async def check_nfo_exists(self, serie_folder: str) -> bool: """Check if tvshow.nfo exists for a series. Args: serie_folder: Series folder name Returns: True if tvshow.nfo exists """ nfo_path = self.anime_directory / serie_folder / "tvshow.nfo" return nfo_path.exists() async def create_tvshow_nfo( self, serie_name: str, serie_folder: str, year: Optional[int] = None, download_poster: bool = True, download_logo: bool = True, download_fanart: bool = True ) -> Path: """Create tvshow.nfo by scraping TMDB. Args: serie_name: Name of the series to search (may include year in parentheses) serie_folder: Series folder name year: Release year (helps narrow search). If None and name contains year, year will be auto-extracted download_poster: Whether to download poster.jpg download_logo: Whether to download logo.png download_fanart: Whether to download fanart.jpg Returns: Path to created NFO file Raises: TMDBAPIError: If TMDB API fails FileNotFoundError: If series folder doesn't exist """ # Extract year from name if not provided clean_name, extracted_year = self._extract_year_from_name(serie_name) if year is None and extracted_year is not None: year = extracted_year logger.info(f"Extracted year {year} from series name") # Use clean name for search search_name = clean_name logger.info(f"Creating NFO for {search_name} (year: {year})") folder_path = self.anime_directory / serie_folder if not folder_path.exists(): logger.info(f"Creating series folder: {folder_path}") folder_path.mkdir(parents=True, exist_ok=True) async with self.tmdb_client: # Search for TV show with clean name (without year) logger.debug(f"Searching TMDB for: {search_name}") search_results = await self.tmdb_client.search_tv_show(search_name) if not search_results.get("results"): raise TMDBAPIError(f"No results found for: {search_name}") # Find best match (consider year if provided) tv_show = self._find_best_match(search_results["results"], search_name, year) tv_id = tv_show["id"] logger.info(f"Found match: {tv_show['name']} (ID: {tv_id})") # Get detailed information details = await self.tmdb_client.get_tv_show_details( tv_id, append_to_response="credits,external_ids,images" ) # Get content ratings for FSK content_ratings = await self.tmdb_client.get_tv_show_content_ratings(tv_id) # Convert TMDB data to TVShowNFO model nfo_model = self._tmdb_to_nfo_model(details, content_ratings) # Generate XML nfo_xml = generate_tvshow_nfo(nfo_model) # Save NFO file nfo_path = folder_path / "tvshow.nfo" nfo_path.write_text(nfo_xml, encoding="utf-8") logger.info(f"Created NFO: {nfo_path}") # Download media files await self._download_media_files( details, folder_path, download_poster=download_poster, download_logo=download_logo, download_fanart=download_fanart ) return nfo_path async def update_tvshow_nfo( self, serie_folder: str, download_media: bool = True ) -> Path: """Update existing tvshow.nfo with fresh data from TMDB. Args: serie_folder: Series folder name download_media: Whether to re-download media files Returns: Path to updated NFO file Raises: FileNotFoundError: If NFO file doesn't exist TMDBAPIError: If TMDB API fails or no TMDB ID found in NFO """ folder_path = self.anime_directory / serie_folder nfo_path = folder_path / "tvshow.nfo" if not nfo_path.exists(): raise FileNotFoundError(f"NFO file not found: {nfo_path}") logger.info(f"Updating NFO for {serie_folder}") # Parse existing NFO to extract TMDB ID try: tree = etree.parse(str(nfo_path)) root = tree.getroot() # Try to find TMDB ID from uniqueid elements tmdb_id = None for uniqueid in root.findall(".//uniqueid"): if uniqueid.get("type") == "tmdb": tmdb_id = int(uniqueid.text) break # Fallback: check for tmdbid element if tmdb_id is None: tmdbid_elem = root.find(".//tmdbid") if tmdbid_elem is not None and tmdbid_elem.text: tmdb_id = int(tmdbid_elem.text) if tmdb_id is None: raise TMDBAPIError( f"No TMDB ID found in existing NFO. " f"Delete the NFO and create a new one instead." ) logger.debug(f"Found TMDB ID: {tmdb_id}") except etree.XMLSyntaxError as e: raise TMDBAPIError(f"Invalid XML in NFO file: {e}") except ValueError as e: raise TMDBAPIError(f"Invalid TMDB ID format in NFO: {e}") # Fetch fresh data from TMDB async with self.tmdb_client: logger.debug(f"Fetching fresh data for TMDB ID: {tmdb_id}") details = await self.tmdb_client.get_tv_show_details( tmdb_id, append_to_response="credits,external_ids,images" ) # Get content ratings for FSK content_ratings = await self.tmdb_client.get_tv_show_content_ratings(tmdb_id) # Convert TMDB data to TVShowNFO model nfo_model = self._tmdb_to_nfo_model(details, content_ratings) # Generate XML nfo_xml = generate_tvshow_nfo(nfo_model) # Save updated NFO file nfo_path.write_text(nfo_xml, encoding="utf-8") logger.info(f"Updated NFO: {nfo_path}") # Re-download media files if requested if download_media: await self._download_media_files( details, folder_path, download_poster=True, download_logo=True, download_fanart=True ) return nfo_path def parse_nfo_ids(self, nfo_path: Path) -> Dict[str, Optional[int]]: """Parse TMDB ID and TVDB ID from an existing NFO file. Args: nfo_path: Path to tvshow.nfo file Returns: Dictionary with 'tmdb_id' and 'tvdb_id' keys. Values are integers if found, None otherwise. Example: >>> ids = nfo_service.parse_nfo_ids(Path("/anime/series/tvshow.nfo")) >>> print(ids) {'tmdb_id': 1429, 'tvdb_id': 79168} """ result = {"tmdb_id": None, "tvdb_id": None} if not nfo_path.exists(): logger.debug(f"NFO file not found: {nfo_path}") return result try: tree = etree.parse(str(nfo_path)) root = tree.getroot() # Try to find TMDB ID from uniqueid elements first for uniqueid in root.findall(".//uniqueid"): uid_type = uniqueid.get("type") uid_text = uniqueid.text if uid_type == "tmdb" and uid_text: try: result["tmdb_id"] = int(uid_text) except ValueError: logger.warning( f"Invalid TMDB ID format in NFO: {uid_text}" ) elif uid_type == "tvdb" and uid_text: try: result["tvdb_id"] = int(uid_text) except ValueError: logger.warning( f"Invalid TVDB ID format in NFO: {uid_text}" ) # Fallback: check for dedicated tmdbid/tvdbid elements if result["tmdb_id"] is None: tmdbid_elem = root.find(".//tmdbid") if tmdbid_elem is not None and tmdbid_elem.text: try: result["tmdb_id"] = int(tmdbid_elem.text) except ValueError: logger.warning( f"Invalid TMDB ID format in tmdbid element: " f"{tmdbid_elem.text}" ) if result["tvdb_id"] is None: tvdbid_elem = root.find(".//tvdbid") if tvdbid_elem is not None and tvdbid_elem.text: try: result["tvdb_id"] = int(tvdbid_elem.text) except ValueError: logger.warning( f"Invalid TVDB ID format in tvdbid element: " f"{tvdbid_elem.text}" ) logger.debug( f"Parsed IDs from NFO: {nfo_path.name} - " f"TMDB: {result['tmdb_id']}, TVDB: {result['tvdb_id']}" ) except etree.XMLSyntaxError as e: logger.error(f"Invalid XML in NFO file {nfo_path}: {e}") except Exception as e: # pylint: disable=broad-except logger.error(f"Error parsing NFO file {nfo_path}: {e}") return result def _find_best_match( self, results: List[Dict[str, Any]], query: str, year: Optional[int] = None ) -> Dict[str, Any]: """Find best matching TV show from search results. Args: results: TMDB search results query: Original search query year: Expected release year Returns: Best matching TV show data """ if not results: raise TMDBAPIError("No search results to match") # If year is provided, try to find exact match if year: for result in results: first_air_date = result.get("first_air_date", "") if first_air_date.startswith(str(year)): logger.debug(f"Found year match: {result['name']} ({first_air_date})") return result # Return first result (usually best match) return results[0] def _tmdb_to_nfo_model( self, tmdb_data: Dict[str, Any], content_ratings: Optional[Dict[str, Any]] = None ) -> TVShowNFO: """Convert TMDB API data to TVShowNFO model. Args: tmdb_data: TMDB TV show details content_ratings: TMDB content ratings data Returns: TVShowNFO Pydantic model """ # Extract basic info title = tmdb_data["name"] original_title = tmdb_data.get("original_name", title) year = None if tmdb_data.get("first_air_date"): year = int(tmdb_data["first_air_date"][:4]) # Extract ratings ratings = [] if tmdb_data.get("vote_average"): ratings.append(RatingInfo( name="themoviedb", value=float(tmdb_data["vote_average"]), votes=tmdb_data.get("vote_count", 0), max_rating=10, default=True )) # Extract external IDs external_ids = tmdb_data.get("external_ids", {}) imdb_id = external_ids.get("imdb_id") tvdb_id = external_ids.get("tvdb_id") # Extract images thumb_images = [] fanart_images = [] # Poster if tmdb_data.get("poster_path"): poster_url = self.tmdb_client.get_image_url( tmdb_data["poster_path"], self.image_size ) thumb_images.append(ImageInfo(url=poster_url, aspect="poster")) # Backdrop/Fanart if tmdb_data.get("backdrop_path"): fanart_url = self.tmdb_client.get_image_url( tmdb_data["backdrop_path"], self.image_size ) fanart_images.append(ImageInfo(url=fanart_url)) # Logo from images if available images_data = tmdb_data.get("images", {}) logos = images_data.get("logos", []) if logos: logo_url = self.tmdb_client.get_image_url( logos[0]["file_path"], self.image_size ) thumb_images.append(ImageInfo(url=logo_url, aspect="clearlogo")) # Extract cast actors = [] credits = tmdb_data.get("credits", {}) for cast_member in credits.get("cast", [])[:10]: # Top 10 actors actor_thumb = None if cast_member.get("profile_path"): actor_thumb = self.tmdb_client.get_image_url( cast_member["profile_path"], "h632" ) actors.append(ActorInfo( name=cast_member["name"], role=cast_member.get("character"), thumb=actor_thumb, tmdbid=cast_member["id"] )) # Create unique IDs unique_ids = [] if tmdb_data.get("id"): unique_ids.append(UniqueID( type="tmdb", value=str(tmdb_data["id"]), default=False )) if imdb_id: unique_ids.append(UniqueID( type="imdb", value=imdb_id, default=False )) if tvdb_id: unique_ids.append(UniqueID( type="tvdb", value=str(tvdb_id), default=True )) # Extract FSK rating from content ratings fsk_rating = self._extract_fsk_rating(content_ratings) if content_ratings else None # Create NFO model return TVShowNFO( title=title, originaltitle=original_title, year=year, plot=tmdb_data.get("overview"), runtime=tmdb_data.get("episode_run_time", [None])[0] if tmdb_data.get("episode_run_time") else None, premiered=tmdb_data.get("first_air_date"), status=tmdb_data.get("status"), genre=[g["name"] for g in tmdb_data.get("genres", [])], studio=[n["name"] for n in tmdb_data.get("networks", [])], country=[c["name"] for c in tmdb_data.get("production_countries", [])], ratings=ratings, fsk=fsk_rating, tmdbid=tmdb_data.get("id"), imdbid=imdb_id, tvdbid=tvdb_id, uniqueid=unique_ids, thumb=thumb_images, fanart=fanart_images, actors=actors ) async def _download_media_files( self, tmdb_data: Dict[str, Any], folder_path: Path, download_poster: bool = True, download_logo: bool = True, download_fanart: bool = True ) -> Dict[str, bool]: """Download media files (poster, logo, fanart). Args: tmdb_data: TMDB TV show details folder_path: Series folder path download_poster: Download poster.jpg download_logo: Download logo.png download_fanart: Download fanart.jpg Returns: Dictionary with download status for each file """ poster_url = None logo_url = None fanart_url = None # Get poster URL if download_poster and tmdb_data.get("poster_path"): poster_url = self.tmdb_client.get_image_url( tmdb_data["poster_path"], self.image_size ) # Get fanart URL if download_fanart and tmdb_data.get("backdrop_path"): fanart_url = self.tmdb_client.get_image_url( tmdb_data["backdrop_path"], "original" # Always use original for fanart ) # Get logo URL if download_logo: images_data = tmdb_data.get("images", {}) logos = images_data.get("logos", []) if logos: logo_url = self.tmdb_client.get_image_url( logos[0]["file_path"], "original" # Logos should be original size ) # Download all media concurrently results = await self.image_downloader.download_all_media( folder_path, poster_url=poster_url, logo_url=logo_url, fanart_url=fanart_url, skip_existing=True ) logger.info(f"Media download results: {results}") return results def _extract_fsk_rating(self, content_ratings: Dict[str, Any]) -> Optional[str]: """Extract German FSK rating from TMDB content ratings. Args: content_ratings: TMDB content ratings response Returns: FSK rating string (e.g., 'FSK 12') or None """ if not content_ratings or "results" not in content_ratings: return None # Find German rating (iso_3166_1: "DE") for rating in content_ratings["results"]: if rating.get("iso_3166_1") == "DE": rating_value = rating.get("rating", "") # Map TMDB German ratings to FSK format fsk_mapping = { "0": "FSK 0", "6": "FSK 6", "12": "FSK 12", "16": "FSK 16", "18": "FSK 18", } # Try direct mapping if rating_value in fsk_mapping: return fsk_mapping[rating_value] # Try to extract number from rating string (ordered from highest to lowest to avoid partial matches) for key in ["18", "16", "12", "6", "0"]: if key in rating_value: return fsk_mapping[key] # Return as-is if it already starts with FSK if rating_value.startswith("FSK"): return rating_value logger.debug(f"Unmapped German rating: {rating_value}") return None return None async def close(self): """Clean up resources.""" await self.tmdb_client.close()