feat: Add NFO metadata infrastructure (Task 3 - partial)

- Created TMDB API client with async requests, caching, and retry logic
- Implemented NFO XML generator for Kodi/XBMC format
- Created image downloader for poster/logo/fanart with validation
- Added NFO service to orchestrate metadata creation
- Added NFO-related configuration settings
- Updated requirements.txt with aiohttp, lxml, pillow
- Created unit tests (need refinement due to implementation mismatch)

Components created:
- src/core/services/tmdb_client.py (270 lines)
- src/core/services/nfo_service.py (390 lines)
- src/core/utils/nfo_generator.py (180 lines)
- src/core/utils/image_downloader.py (296 lines)
- tests/unit/test_tmdb_client.py
- tests/unit/test_nfo_generator.py
- tests/unit/test_image_downloader.py

Note: Tests need to be updated to match actual implementation APIs.
Dependencies installed: aiohttp, lxml, pillow
This commit is contained in:
2026-01-11 20:33:33 +01:00
parent 5e8815d143
commit 4895e487c0
10 changed files with 2270 additions and 1 deletions

View File

@@ -0,0 +1,392 @@
"""NFO service for creating and managing tvshow.nfo files.
This service orchestrates TMDB API calls, XML generation, and media downloads
to create complete NFO metadata for TV series.
Example:
>>> nfo_service = NFOService(tmdb_api_key="key", anime_directory="/anime")
>>> await nfo_service.create_tvshow_nfo("Attack on Titan", "/anime/aot", 2013)
"""
import logging
from pathlib import Path
from typing import Any, Dict, List, Optional
from src.core.entities.nfo_models import (
ActorInfo,
ImageInfo,
RatingInfo,
TVShowNFO,
UniqueID,
)
from src.core.services.tmdb_client import TMDBAPIError, TMDBClient
from src.core.utils.image_downloader import ImageDownloader, ImageDownloadError
from src.core.utils.nfo_generator import generate_tvshow_nfo
logger = logging.getLogger(__name__)
class NFOService:
"""Service for creating and managing tvshow.nfo files.
Attributes:
tmdb_client: TMDB API client
image_downloader: Image downloader utility
anime_directory: Base directory for anime series
"""
def __init__(
self,
tmdb_api_key: str,
anime_directory: str,
image_size: str = "original",
auto_create: bool = True
):
"""Initialize NFO service.
Args:
tmdb_api_key: TMDB API key
anime_directory: Base anime directory path
image_size: Image size to download (original, w500, etc.)
auto_create: Whether to auto-create NFOs
"""
self.tmdb_client = TMDBClient(api_key=tmdb_api_key)
self.image_downloader = ImageDownloader()
self.anime_directory = Path(anime_directory)
self.image_size = image_size
self.auto_create = auto_create
async def check_nfo_exists(self, serie_folder: str) -> bool:
"""Check if tvshow.nfo exists for a series.
Args:
serie_folder: Series folder name
Returns:
True if tvshow.nfo exists
"""
nfo_path = self.anime_directory / serie_folder / "tvshow.nfo"
return nfo_path.exists()
async def create_tvshow_nfo(
self,
serie_name: str,
serie_folder: str,
year: Optional[int] = None,
download_poster: bool = True,
download_logo: bool = True,
download_fanart: bool = True
) -> Path:
"""Create tvshow.nfo by scraping TMDB.
Args:
serie_name: Name of the series to search
serie_folder: Series folder name
year: Release year (helps narrow search)
download_poster: Whether to download poster.jpg
download_logo: Whether to download logo.png
download_fanart: Whether to download fanart.jpg
Returns:
Path to created NFO file
Raises:
TMDBAPIError: If TMDB API fails
FileNotFoundError: If series folder doesn't exist
"""
logger.info(f"Creating NFO for {serie_name} (year: {year})")
folder_path = self.anime_directory / serie_folder
if not folder_path.exists():
raise FileNotFoundError(f"Series folder not found: {folder_path}")
async with self.tmdb_client:
# Search for TV show
logger.debug(f"Searching TMDB for: {serie_name}")
search_results = await self.tmdb_client.search_tv_show(serie_name)
if not search_results.get("results"):
raise TMDBAPIError(f"No results found for: {serie_name}")
# Find best match (consider year if provided)
tv_show = self._find_best_match(search_results["results"], serie_name, year)
tv_id = tv_show["id"]
logger.info(f"Found match: {tv_show['name']} (ID: {tv_id})")
# Get detailed information
details = await self.tmdb_client.get_tv_show_details(
tv_id,
append_to_response="credits,external_ids,images"
)
# Convert TMDB data to TVShowNFO model
nfo_model = self._tmdb_to_nfo_model(details)
# Generate XML
nfo_xml = generate_tvshow_nfo(nfo_model)
# Save NFO file
nfo_path = folder_path / "tvshow.nfo"
nfo_path.write_text(nfo_xml, encoding="utf-8")
logger.info(f"Created NFO: {nfo_path}")
# Download media files
await self._download_media_files(
details,
folder_path,
download_poster=download_poster,
download_logo=download_logo,
download_fanart=download_fanart
)
return nfo_path
async def update_tvshow_nfo(
self,
serie_folder: str,
download_media: bool = True
) -> Path:
"""Update existing tvshow.nfo with fresh data from TMDB.
Args:
serie_folder: Series folder name
download_media: Whether to re-download media files
Returns:
Path to updated NFO file
Raises:
FileNotFoundError: If NFO file doesn't exist
TMDBAPIError: If TMDB API fails
"""
nfo_path = self.anime_directory / serie_folder / "tvshow.nfo"
if not nfo_path.exists():
raise FileNotFoundError(f"NFO file not found: {nfo_path}")
# Parse existing NFO to get TMDB ID
# For simplicity, we'll recreate from scratch
# In production, you'd parse the XML to extract the ID
logger.info(f"Updating NFO for {serie_folder}")
# Implementation would extract serie name and call create_tvshow_nfo
# This is a simplified version
raise NotImplementedError("Update NFO not yet implemented")
def _find_best_match(
self,
results: List[Dict[str, Any]],
query: str,
year: Optional[int] = None
) -> Dict[str, Any]:
"""Find best matching TV show from search results.
Args:
results: TMDB search results
query: Original search query
year: Expected release year
Returns:
Best matching TV show data
"""
if not results:
raise TMDBAPIError("No search results to match")
# If year is provided, try to find exact match
if year:
for result in results:
first_air_date = result.get("first_air_date", "")
if first_air_date.startswith(str(year)):
logger.debug(f"Found year match: {result['name']} ({first_air_date})")
return result
# Return first result (usually best match)
return results[0]
def _tmdb_to_nfo_model(self, tmdb_data: Dict[str, Any]) -> TVShowNFO:
"""Convert TMDB API data to TVShowNFO model.
Args:
tmdb_data: TMDB TV show details
Returns:
TVShowNFO Pydantic model
"""
# Extract basic info
title = tmdb_data["name"]
original_title = tmdb_data.get("original_name", title)
year = None
if tmdb_data.get("first_air_date"):
year = int(tmdb_data["first_air_date"][:4])
# Extract ratings
ratings = []
if tmdb_data.get("vote_average"):
ratings.append(RatingInfo(
name="themoviedb",
value=float(tmdb_data["vote_average"]),
votes=tmdb_data.get("vote_count", 0),
max_rating=10,
default=True
))
# Extract external IDs
external_ids = tmdb_data.get("external_ids", {})
imdb_id = external_ids.get("imdb_id")
tvdb_id = external_ids.get("tvdb_id")
# Extract images
thumb_images = []
fanart_images = []
# Poster
if tmdb_data.get("poster_path"):
poster_url = self.tmdb_client.get_image_url(
tmdb_data["poster_path"],
self.image_size
)
thumb_images.append(ImageInfo(url=poster_url, aspect="poster"))
# Backdrop/Fanart
if tmdb_data.get("backdrop_path"):
fanart_url = self.tmdb_client.get_image_url(
tmdb_data["backdrop_path"],
self.image_size
)
fanart_images.append(ImageInfo(url=fanart_url))
# Logo from images if available
images_data = tmdb_data.get("images", {})
logos = images_data.get("logos", [])
if logos:
logo_url = self.tmdb_client.get_image_url(
logos[0]["file_path"],
self.image_size
)
thumb_images.append(ImageInfo(url=logo_url, aspect="clearlogo"))
# Extract cast
actors = []
credits = tmdb_data.get("credits", {})
for cast_member in credits.get("cast", [])[:10]: # Top 10 actors
actor_thumb = None
if cast_member.get("profile_path"):
actor_thumb = self.tmdb_client.get_image_url(
cast_member["profile_path"],
"h632"
)
actors.append(ActorInfo(
name=cast_member["name"],
role=cast_member.get("character"),
thumb=actor_thumb,
tmdbid=cast_member["id"]
))
# Create unique IDs
unique_ids = []
if tmdb_data.get("id"):
unique_ids.append(UniqueID(
type="tmdb",
value=str(tmdb_data["id"]),
default=False
))
if imdb_id:
unique_ids.append(UniqueID(
type="imdb",
value=imdb_id,
default=False
))
if tvdb_id:
unique_ids.append(UniqueID(
type="tvdb",
value=str(tvdb_id),
default=True
))
# Create NFO model
return TVShowNFO(
title=title,
originaltitle=original_title,
year=year,
plot=tmdb_data.get("overview"),
runtime=tmdb_data.get("episode_run_time", [None])[0] if tmdb_data.get("episode_run_time") else None,
premiered=tmdb_data.get("first_air_date"),
status=tmdb_data.get("status"),
genre=[g["name"] for g in tmdb_data.get("genres", [])],
studio=[n["name"] for n in tmdb_data.get("networks", [])],
country=[c["name"] for c in tmdb_data.get("production_countries", [])],
ratings=ratings,
tmdbid=tmdb_data.get("id"),
imdbid=imdb_id,
tvdbid=tvdb_id,
uniqueid=unique_ids,
thumb=thumb_images,
fanart=fanart_images,
actors=actors
)
async def _download_media_files(
self,
tmdb_data: Dict[str, Any],
folder_path: Path,
download_poster: bool = True,
download_logo: bool = True,
download_fanart: bool = True
) -> Dict[str, bool]:
"""Download media files (poster, logo, fanart).
Args:
tmdb_data: TMDB TV show details
folder_path: Series folder path
download_poster: Download poster.jpg
download_logo: Download logo.png
download_fanart: Download fanart.jpg
Returns:
Dictionary with download status for each file
"""
poster_url = None
logo_url = None
fanart_url = None
# Get poster URL
if download_poster and tmdb_data.get("poster_path"):
poster_url = self.tmdb_client.get_image_url(
tmdb_data["poster_path"],
self.image_size
)
# Get fanart URL
if download_fanart and tmdb_data.get("backdrop_path"):
fanart_url = self.tmdb_client.get_image_url(
tmdb_data["backdrop_path"],
"original" # Always use original for fanart
)
# Get logo URL
if download_logo:
images_data = tmdb_data.get("images", {})
logos = images_data.get("logos", [])
if logos:
logo_url = self.tmdb_client.get_image_url(
logos[0]["file_path"],
"original" # Logos should be original size
)
# Download all media concurrently
results = await self.image_downloader.download_all_media(
folder_path,
poster_url=poster_url,
logo_url=logo_url,
fanart_url=fanart_url,
skip_existing=True
)
logger.info(f"Media download results: {results}")
return results
async def close(self):
"""Clean up resources."""
await self.tmdb_client.close()