# Aniworld/src/core/services/nfo_service.py
"""NFO service for creating and managing tvshow.nfo files.
This service orchestrates TMDB API calls, XML generation, and media downloads
to create complete NFO metadata for TV series.
Example:
>>> nfo_service = NFOService(tmdb_api_key="key", anime_directory="/anime")
>>> await nfo_service.create_tvshow_nfo("Attack on Titan", "/anime/aot", 2013)
"""
import logging
import re
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from lxml import etree
from src.core.entities.nfo_models import (
ActorInfo,
ImageInfo,
RatingInfo,
TVShowNFO,
UniqueID,
)
from src.core.services.tmdb_client import TMDBAPIError, TMDBClient
from src.core.utils.image_downloader import ImageDownloader
from src.core.utils.nfo_generator import generate_tvshow_nfo
logger = logging.getLogger(__name__)
class NFOService:
    """Service for creating and managing tvshow.nfo files.

    Attributes:
        tmdb_client: TMDB API client
        image_downloader: Image downloader utility
        anime_directory: Base directory for anime series
    """

    def __init__(
        self,
        tmdb_api_key: str,
        anime_directory: str,
        image_size: str = "original",
        auto_create: bool = True
    ):
        """Initialize the NFO service and its collaborators.

        Args:
            tmdb_api_key: TMDB API key
            anime_directory: Base anime directory path
            image_size: Image size to download (original, w500, etc.)
            auto_create: Whether to auto-create NFOs
        """
        # Plain configuration first, then the heavier collaborator objects.
        self.auto_create = auto_create
        self.image_size = image_size
        self.anime_directory = Path(anime_directory)
        self.image_downloader = ImageDownloader()
        self.tmdb_client = TMDBClient(api_key=tmdb_api_key)
def has_nfo(self, serie_folder: str) -> bool:
"""Check if tvshow.nfo exists for a series.
Args:
serie_folder: Series folder name
Returns:
True if NFO file exists
"""
nfo_path = self.anime_directory / serie_folder / "tvshow.nfo"
return nfo_path.exists()
@staticmethod
def _extract_year_from_name(serie_name: str) -> Tuple[str, Optional[int]]:
"""Extract year from series name if present in format 'Name (YYYY)'.
Args:
serie_name: Series name, possibly with year in parentheses
Returns:
Tuple of (clean_name, year)
- clean_name: Series name without year
- year: Extracted year or None
Examples:
>>> _extract_year_from_name("Attack on Titan (2013)")
("Attack on Titan", 2013)
>>> _extract_year_from_name("Attack on Titan")
("Attack on Titan", None)
"""
# Match year in parentheses at the end: (YYYY)
match = re.search(r'\((\d{4})\)\s*$', serie_name)
if match:
year = int(match.group(1))
clean_name = serie_name[:match.start()].strip()
return clean_name, year
return serie_name, None
async def check_nfo_exists(self, serie_folder: str) -> bool:
"""Check if tvshow.nfo exists for a series.
Args:
serie_folder: Series folder name
Returns:
True if tvshow.nfo exists
"""
nfo_path = self.anime_directory / serie_folder / "tvshow.nfo"
return nfo_path.exists()
    async def create_tvshow_nfo(
        self,
        serie_name: str,
        serie_folder: str,
        year: Optional[int] = None,
        download_poster: bool = True,
        download_logo: bool = True,
        download_fanart: bool = True
    ) -> Path:
        """Create tvshow.nfo by scraping TMDB.

        Searches TMDB for the series, converts the result into a TVShowNFO
        model, writes tvshow.nfo into the series folder and downloads the
        associated artwork.

        Args:
            serie_name: Name of the series to search (may include year in parentheses)
            serie_folder: Series folder name (relative to anime_directory)
            year: Release year (helps narrow search). If None and the name
                contains a trailing '(YYYY)', the year is auto-extracted.
            download_poster: Whether to download poster.jpg
            download_logo: Whether to download logo.png
            download_fanart: Whether to download fanart.jpg

        Returns:
            Path to created NFO file

        Raises:
            TMDBAPIError: If TMDB API fails or returns no results
        """
        # Extract year from name if not provided explicitly by the caller.
        clean_name, extracted_year = self._extract_year_from_name(serie_name)
        if year is None and extracted_year is not None:
            year = extracted_year
            logger.info(f"Extracted year {year} from series name")
        # Always search with the year-free name; the year only ranks results.
        search_name = clean_name
        logger.info(f"Creating NFO for {search_name} (year: {year})")
        folder_path = self.anime_directory / serie_folder
        if not folder_path.exists():
            # Series folder is created on demand (exist_ok guards races).
            logger.info(f"Creating series folder: {folder_path}")
            folder_path.mkdir(parents=True, exist_ok=True)
        async with self.tmdb_client:
            # Search for TV show with clean name (without year)
            logger.debug(f"Searching TMDB for: {search_name}")
            search_results = await self.tmdb_client.search_tv_show(search_name)
            if not search_results.get("results"):
                raise TMDBAPIError(f"No results found for: {search_name}")
            # Find best match (year narrows the pick when provided)
            tv_show = self._find_best_match(search_results["results"], search_name, year)
            tv_id = tv_show["id"]
            logger.info(f"Found match: {tv_show['name']} (ID: {tv_id})")
            # Single details call; credits/external_ids/images are appended to
            # avoid extra round trips.
            details = await self.tmdb_client.get_tv_show_details(
                tv_id,
                append_to_response="credits,external_ids,images"
            )
            # Content ratings are a separate endpoint (used for FSK mapping).
            content_ratings = await self.tmdb_client.get_tv_show_content_ratings(tv_id)
        # Convert TMDB data to TVShowNFO model
        nfo_model = self._tmdb_to_nfo_model(details, content_ratings)
        # Generate XML
        nfo_xml = generate_tvshow_nfo(nfo_model)
        # Save NFO file
        nfo_path = folder_path / "tvshow.nfo"
        nfo_path.write_text(nfo_xml, encoding="utf-8")
        logger.info(f"Created NFO: {nfo_path}")
        # Download media files (poster/logo/fanart) next to the NFO.
        await self._download_media_files(
            details,
            folder_path,
            download_poster=download_poster,
            download_logo=download_logo,
            download_fanart=download_fanart
        )
        return nfo_path
async def update_tvshow_nfo(
self,
serie_folder: str,
download_media: bool = True
) -> Path:
"""Update existing tvshow.nfo with fresh data from TMDB.
Args:
serie_folder: Series folder name
download_media: Whether to re-download media files
Returns:
Path to updated NFO file
Raises:
FileNotFoundError: If NFO file doesn't exist
TMDBAPIError: If TMDB API fails or no TMDB ID found in NFO
"""
folder_path = self.anime_directory / serie_folder
nfo_path = folder_path / "tvshow.nfo"
if not nfo_path.exists():
raise FileNotFoundError(f"NFO file not found: {nfo_path}")
logger.info(f"Updating NFO for {serie_folder}")
# Parse existing NFO to extract TMDB ID
try:
tree = etree.parse(str(nfo_path))
root = tree.getroot()
# Try to find TMDB ID from uniqueid elements
tmdb_id = None
for uniqueid in root.findall(".//uniqueid"):
if uniqueid.get("type") == "tmdb":
tmdb_id = int(uniqueid.text)
break
# Fallback: check for tmdbid element
if tmdb_id is None:
tmdbid_elem = root.find(".//tmdbid")
if tmdbid_elem is not None and tmdbid_elem.text:
tmdb_id = int(tmdbid_elem.text)
if tmdb_id is None:
raise TMDBAPIError(
f"No TMDB ID found in existing NFO. "
f"Delete the NFO and create a new one instead."
)
logger.debug(f"Found TMDB ID: {tmdb_id}")
except etree.XMLSyntaxError as e:
raise TMDBAPIError(f"Invalid XML in NFO file: {e}")
except ValueError as e:
raise TMDBAPIError(f"Invalid TMDB ID format in NFO: {e}")
# Fetch fresh data from TMDB
async with self.tmdb_client:
logger.debug(f"Fetching fresh data for TMDB ID: {tmdb_id}")
details = await self.tmdb_client.get_tv_show_details(
tmdb_id,
append_to_response="credits,external_ids,images"
)
# Get content ratings for FSK
content_ratings = await self.tmdb_client.get_tv_show_content_ratings(tmdb_id)
# Convert TMDB data to TVShowNFO model
nfo_model = self._tmdb_to_nfo_model(details, content_ratings)
# Generate XML
nfo_xml = generate_tvshow_nfo(nfo_model)
# Save updated NFO file
nfo_path.write_text(nfo_xml, encoding="utf-8")
logger.info(f"Updated NFO: {nfo_path}")
# Re-download media files if requested
if download_media:
await self._download_media_files(
details,
folder_path,
download_poster=True,
download_logo=True,
download_fanart=True
)
return nfo_path
def _find_best_match(
self,
results: List[Dict[str, Any]],
query: str,
year: Optional[int] = None
) -> Dict[str, Any]:
"""Find best matching TV show from search results.
Args:
results: TMDB search results
query: Original search query
year: Expected release year
Returns:
Best matching TV show data
"""
if not results:
raise TMDBAPIError("No search results to match")
# If year is provided, try to find exact match
if year:
for result in results:
first_air_date = result.get("first_air_date", "")
if first_air_date.startswith(str(year)):
logger.debug(f"Found year match: {result['name']} ({first_air_date})")
return result
# Return first result (usually best match)
return results[0]
    def _tmdb_to_nfo_model(
        self,
        tmdb_data: Dict[str, Any],
        content_ratings: Optional[Dict[str, Any]] = None
    ) -> TVShowNFO:
        """Convert TMDB API data to TVShowNFO model.

        Args:
            tmdb_data: TMDB TV show details (expects credits, external_ids
                and images appended via append_to_response)
            content_ratings: TMDB content ratings data (for FSK extraction)

        Returns:
            TVShowNFO Pydantic model
        """
        # Extract basic info; original_name falls back to the localized name.
        title = tmdb_data["name"]
        original_title = tmdb_data.get("original_name", title)
        year = None
        if tmdb_data.get("first_air_date"):
            # first_air_date is "YYYY-MM-DD"; keep only the year.
            year = int(tmdb_data["first_air_date"][:4])
        # Extract ratings (TMDB community score, scale 0-10).
        ratings = []
        if tmdb_data.get("vote_average"):
            ratings.append(RatingInfo(
                name="themoviedb",
                value=float(tmdb_data["vote_average"]),
                votes=tmdb_data.get("vote_count", 0),
                max_rating=10,
                default=True
            ))
        # Extract external IDs (IMDB/TVDB cross references).
        external_ids = tmdb_data.get("external_ids", {})
        imdb_id = external_ids.get("imdb_id")
        tvdb_id = external_ids.get("tvdb_id")
        # Extract images
        thumb_images = []
        fanart_images = []
        # Poster
        if tmdb_data.get("poster_path"):
            poster_url = self.tmdb_client.get_image_url(
                tmdb_data["poster_path"],
                self.image_size
            )
            thumb_images.append(ImageInfo(url=poster_url, aspect="poster"))
        # Backdrop/Fanart
        if tmdb_data.get("backdrop_path"):
            fanart_url = self.tmdb_client.get_image_url(
                tmdb_data["backdrop_path"],
                self.image_size
            )
            fanart_images.append(ImageInfo(url=fanart_url))
        # Logo from appended images, if available (first logo only).
        images_data = tmdb_data.get("images", {})
        logos = images_data.get("logos", [])
        if logos:
            logo_url = self.tmdb_client.get_image_url(
                logos[0]["file_path"],
                self.image_size
            )
            thumb_images.append(ImageInfo(url=logo_url, aspect="clearlogo"))
        # Extract cast
        actors = []
        credits = tmdb_data.get("credits", {})
        for cast_member in credits.get("cast", [])[:10]:  # Top 10 actors
            actor_thumb = None
            if cast_member.get("profile_path"):
                # h632 is a fixed TMDB profile-image size used for headshots.
                actor_thumb = self.tmdb_client.get_image_url(
                    cast_member["profile_path"],
                    "h632"
                )
            actors.append(ActorInfo(
                name=cast_member["name"],
                role=cast_member.get("character"),
                thumb=actor_thumb,
                tmdbid=cast_member["id"]
            ))
        # Create unique IDs for the scraper cross-reference block.
        # NOTE(review): tvdb is flagged default=True while tmdb is not —
        # confirm this matches the intended scraper priority.
        unique_ids = []
        if tmdb_data.get("id"):
            unique_ids.append(UniqueID(
                type="tmdb",
                value=str(tmdb_data["id"]),
                default=False
            ))
        if imdb_id:
            unique_ids.append(UniqueID(
                type="imdb",
                value=imdb_id,
                default=False
            ))
        if tvdb_id:
            unique_ids.append(UniqueID(
                type="tvdb",
                value=str(tvdb_id),
                default=True
            ))
        # Extract FSK rating from content ratings (German age rating).
        fsk_rating = self._extract_fsk_rating(content_ratings) if content_ratings else None
        # Create NFO model
        return TVShowNFO(
            title=title,
            originaltitle=original_title,
            year=year,
            plot=tmdb_data.get("overview"),
            # episode_run_time is a list of typical runtimes; use the first.
            runtime=tmdb_data.get("episode_run_time", [None])[0] if tmdb_data.get("episode_run_time") else None,
            premiered=tmdb_data.get("first_air_date"),
            status=tmdb_data.get("status"),
            genre=[g["name"] for g in tmdb_data.get("genres", [])],
            studio=[n["name"] for n in tmdb_data.get("networks", [])],
            country=[c["name"] for c in tmdb_data.get("production_countries", [])],
            ratings=ratings,
            fsk=fsk_rating,
            tmdbid=tmdb_data.get("id"),
            imdbid=imdb_id,
            tvdbid=tvdb_id,
            uniqueid=unique_ids,
            thumb=thumb_images,
            fanart=fanart_images,
            actors=actors
        )
async def _download_media_files(
self,
tmdb_data: Dict[str, Any],
folder_path: Path,
download_poster: bool = True,
download_logo: bool = True,
download_fanart: bool = True
) -> Dict[str, bool]:
"""Download media files (poster, logo, fanart).
Args:
tmdb_data: TMDB TV show details
folder_path: Series folder path
download_poster: Download poster.jpg
download_logo: Download logo.png
download_fanart: Download fanart.jpg
Returns:
Dictionary with download status for each file
"""
poster_url = None
logo_url = None
fanart_url = None
# Get poster URL
if download_poster and tmdb_data.get("poster_path"):
poster_url = self.tmdb_client.get_image_url(
tmdb_data["poster_path"],
self.image_size
)
# Get fanart URL
if download_fanart and tmdb_data.get("backdrop_path"):
fanart_url = self.tmdb_client.get_image_url(
tmdb_data["backdrop_path"],
"original" # Always use original for fanart
)
# Get logo URL
if download_logo:
images_data = tmdb_data.get("images", {})
logos = images_data.get("logos", [])
if logos:
logo_url = self.tmdb_client.get_image_url(
logos[0]["file_path"],
"original" # Logos should be original size
)
# Download all media concurrently
results = await self.image_downloader.download_all_media(
folder_path,
poster_url=poster_url,
logo_url=logo_url,
fanart_url=fanart_url,
skip_existing=True
)
logger.info(f"Media download results: {results}")
return results
def _extract_fsk_rating(self, content_ratings: Dict[str, Any]) -> Optional[str]:
"""Extract German FSK rating from TMDB content ratings.
Args:
content_ratings: TMDB content ratings response
Returns:
FSK rating string (e.g., 'FSK 12') or None
"""
if not content_ratings or "results" not in content_ratings:
return None
# Find German rating (iso_3166_1: "DE")
for rating in content_ratings["results"]:
if rating.get("iso_3166_1") == "DE":
rating_value = rating.get("rating", "")
# Map TMDB German ratings to FSK format
fsk_mapping = {
"0": "FSK 0",
"6": "FSK 6",
"12": "FSK 12",
"16": "FSK 16",
"18": "FSK 18",
}
# Try direct mapping
if rating_value in fsk_mapping:
return fsk_mapping[rating_value]
# Try to extract number from rating string (ordered from highest to lowest to avoid partial matches)
for key in ["18", "16", "12", "6", "0"]:
if key in rating_value:
return fsk_mapping[key]
# Return as-is if it already starts with FSK
if rating_value.startswith("FSK"):
return rating_value
logger.debug(f"Unmapped German rating: {rating_value}")
return None
return None
    async def close(self):
        """Clean up resources by closing the underlying TMDB client."""
        await self.tmdb_client.close()