feat: write all required NFO tags on creation

This commit is contained in:
2026-02-22 11:07:19 +01:00
parent 228964e928
commit e1abf90c81
7 changed files with 592 additions and 208 deletions

View File

@@ -15,16 +15,10 @@ from typing import Any, Dict, List, Optional, Tuple
from lxml import etree
from src.core.entities.nfo_models import (
ActorInfo,
ImageInfo,
RatingInfo,
TVShowNFO,
UniqueID,
)
from src.core.services.tmdb_client import TMDBAPIError, TMDBClient
from src.core.utils.image_downloader import ImageDownloader
from src.core.utils.nfo_generator import generate_tvshow_nfo
from src.core.utils.nfo_mapper import tmdb_to_nfo_model
logger = logging.getLogger(__name__)
@@ -176,7 +170,12 @@ class NFOService:
content_ratings = await self.tmdb_client.get_tv_show_content_ratings(tv_id)
# Convert TMDB data to TVShowNFO model
nfo_model = self._tmdb_to_nfo_model(details, content_ratings)
nfo_model = tmdb_to_nfo_model(
details,
content_ratings,
self.tmdb_client.get_image_url,
self.image_size,
)
# Generate XML
nfo_xml = generate_tvshow_nfo(nfo_model)
@@ -266,7 +265,12 @@ class NFOService:
content_ratings = await self.tmdb_client.get_tv_show_content_ratings(tmdb_id)
# Convert TMDB data to TVShowNFO model
nfo_model = self._tmdb_to_nfo_model(details, content_ratings)
nfo_model = tmdb_to_nfo_model(
details,
content_ratings,
self.tmdb_client.get_image_url,
self.image_size,
)
# Generate XML
nfo_xml = generate_tvshow_nfo(nfo_model)
@@ -398,137 +402,7 @@ class NFOService:
# Return first result (usually best match)
return results[0]
def _tmdb_to_nfo_model(
self,
tmdb_data: Dict[str, Any],
content_ratings: Optional[Dict[str, Any]] = None
) -> TVShowNFO:
"""Convert TMDB API data to TVShowNFO model.
Args:
tmdb_data: TMDB TV show details
content_ratings: TMDB content ratings data
Returns:
TVShowNFO Pydantic model
"""
# Extract basic info
title = tmdb_data["name"]
original_title = tmdb_data.get("original_name", title)
year = None
if tmdb_data.get("first_air_date"):
year = int(tmdb_data["first_air_date"][:4])
# Extract ratings
ratings = []
if tmdb_data.get("vote_average"):
ratings.append(RatingInfo(
name="themoviedb",
value=float(tmdb_data["vote_average"]),
votes=tmdb_data.get("vote_count", 0),
max_rating=10,
default=True
))
# Extract external IDs
external_ids = tmdb_data.get("external_ids", {})
imdb_id = external_ids.get("imdb_id")
tvdb_id = external_ids.get("tvdb_id")
# Extract images
thumb_images = []
fanart_images = []
# Poster
if tmdb_data.get("poster_path"):
poster_url = self.tmdb_client.get_image_url(
tmdb_data["poster_path"],
self.image_size
)
thumb_images.append(ImageInfo(url=poster_url, aspect="poster"))
# Backdrop/Fanart
if tmdb_data.get("backdrop_path"):
fanart_url = self.tmdb_client.get_image_url(
tmdb_data["backdrop_path"],
self.image_size
)
fanart_images.append(ImageInfo(url=fanart_url))
# Logo from images if available
images_data = tmdb_data.get("images", {})
logos = images_data.get("logos", [])
if logos:
logo_url = self.tmdb_client.get_image_url(
logos[0]["file_path"],
self.image_size
)
thumb_images.append(ImageInfo(url=logo_url, aspect="clearlogo"))
# Extract cast
actors = []
credits = tmdb_data.get("credits", {})
for cast_member in credits.get("cast", [])[:10]: # Top 10 actors
actor_thumb = None
if cast_member.get("profile_path"):
actor_thumb = self.tmdb_client.get_image_url(
cast_member["profile_path"],
"h632"
)
actors.append(ActorInfo(
name=cast_member["name"],
role=cast_member.get("character"),
thumb=actor_thumb,
tmdbid=cast_member["id"]
))
# Create unique IDs
unique_ids = []
if tmdb_data.get("id"):
unique_ids.append(UniqueID(
type="tmdb",
value=str(tmdb_data["id"]),
default=False
))
if imdb_id:
unique_ids.append(UniqueID(
type="imdb",
value=imdb_id,
default=False
))
if tvdb_id:
unique_ids.append(UniqueID(
type="tvdb",
value=str(tvdb_id),
default=True
))
# Extract FSK rating from content ratings
fsk_rating = self._extract_fsk_rating(content_ratings) if content_ratings else None
# Create NFO model
return TVShowNFO(
title=title,
originaltitle=original_title,
year=year,
plot=tmdb_data.get("overview"),
runtime=tmdb_data.get("episode_run_time", [None])[0] if tmdb_data.get("episode_run_time") else None,
premiered=tmdb_data.get("first_air_date"),
status=tmdb_data.get("status"),
genre=[g["name"] for g in tmdb_data.get("genres", [])],
studio=[n["name"] for n in tmdb_data.get("networks", [])],
country=[c["name"] for c in tmdb_data.get("production_countries", [])],
ratings=ratings,
fsk=fsk_rating,
tmdbid=tmdb_data.get("id"),
imdbid=imdb_id,
tvdbid=tvdb_id,
uniqueid=unique_ids,
thumb=thumb_images,
fanart=fanart_images,
actors=actors
)
async def _download_media_files(
self,
@@ -590,49 +464,7 @@ class NFOService:
logger.info(f"Media download results: {results}")
return results
def _extract_fsk_rating(self, content_ratings: Dict[str, Any]) -> Optional[str]:
"""Extract German FSK rating from TMDB content ratings.
Args:
content_ratings: TMDB content ratings response
Returns:
FSK rating string (e.g., 'FSK 12') or None
"""
if not content_ratings or "results" not in content_ratings:
return None
# Find German rating (iso_3166_1: "DE")
for rating in content_ratings["results"]:
if rating.get("iso_3166_1") == "DE":
rating_value = rating.get("rating", "")
# Map TMDB German ratings to FSK format
fsk_mapping = {
"0": "FSK 0",
"6": "FSK 6",
"12": "FSK 12",
"16": "FSK 16",
"18": "FSK 18",
}
# Try direct mapping
if rating_value in fsk_mapping:
return fsk_mapping[rating_value]
# Try to extract number from rating string (ordered from highest to lowest to avoid partial matches)
for key in ["18", "16", "12", "6", "0"]:
if key in rating_value:
return fsk_mapping[key]
# Return as-is if it already starts with FSK
if rating_value.startswith("FSK"):
return rating_value
logger.debug(f"Unmapped German rating: {rating_value}")
return None
return None
async def close(self):
"""Clean up resources."""

View File

@@ -0,0 +1,220 @@
"""TMDB to NFO model mapper.
This module converts TMDB API data to TVShowNFO Pydantic models,
keeping the mapping logic separate from the service orchestration.
Example:
>>> model = tmdb_to_nfo_model(tmdb_data, content_ratings, get_image_url, "original")
"""
import logging
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional
from src.core.entities.nfo_models import (
ActorInfo,
ImageInfo,
RatingInfo,
TVShowNFO,
UniqueID,
)
logger = logging.getLogger(__name__)
def _extract_rating_by_country(
content_ratings: Dict[str, Any],
country_code: str,
) -> Optional[str]:
"""Extract content rating for a specific country from TMDB content ratings.
Args:
content_ratings: TMDB content ratings response dict with "results" list.
country_code: ISO 3166-1 alpha-2 country code (e.g., "DE", "US").
Returns:
Raw rating string for the requested country, or None if not found.
Example:
>>> _extract_rating_by_country({"results": [{"iso_3166_1": "US", "rating": "TV-14"}]}, "US")
'TV-14'
"""
if not content_ratings or "results" not in content_ratings:
return None
for rating in content_ratings["results"]:
if rating.get("iso_3166_1") == country_code:
return rating.get("rating") or None
return None
def _extract_fsk_rating(content_ratings: Dict[str, Any]) -> Optional[str]:
"""Extract German FSK rating from TMDB content ratings.
Delegates to :func:`_extract_rating_by_country` and then normalises the
raw TMDB string into the 'FSK XX' format expected by Kodi/Jellyfin.
Args:
content_ratings: TMDB content ratings response.
Returns:
Formatted FSK string (e.g., 'FSK 12') or None.
"""
raw = _extract_rating_by_country(content_ratings, "DE")
if raw is None:
return None
fsk_mapping: Dict[str, str] = {
"0": "FSK 0",
"6": "FSK 6",
"12": "FSK 12",
"16": "FSK 16",
"18": "FSK 18",
}
if raw in fsk_mapping:
return fsk_mapping[raw]
# Try to extract numeric part (ordered high→low to avoid partial matches)
for key in ["18", "16", "12", "6", "0"]:
if key in raw:
return fsk_mapping[key]
if raw.startswith("FSK"):
return raw
logger.debug("Unmapped German rating: %s", raw)
return None
def tmdb_to_nfo_model(
tmdb_data: Dict[str, Any],
content_ratings: Optional[Dict[str, Any]],
get_image_url: Callable[[str, str], str],
image_size: str = "original",
) -> TVShowNFO:
"""Convert TMDB API data to a fully-populated TVShowNFO model.
All required NFO tags are explicitly set in this function so that newly
created files are complete without a subsequent repair pass.
Args:
tmdb_data: TMDB TV show details (with credits, external_ids, images
appended via ``append_to_response``).
content_ratings: TMDB content ratings response, or None.
get_image_url: Callable ``(path, size) -> url`` for TMDB images.
image_size: TMDB image size parameter (e.g., ``"original"``, ``"w500"``).
Returns:
TVShowNFO Pydantic model with all available fields populated.
"""
title: str = tmdb_data["name"]
original_title: str = tmdb_data.get("original_name") or title
# --- Year and dates ---
first_air_date: Optional[str] = tmdb_data.get("first_air_date") or None
year: Optional[int] = int(first_air_date[:4]) if first_air_date else None
# --- Ratings ---
ratings: List[RatingInfo] = []
if tmdb_data.get("vote_average"):
ratings.append(RatingInfo(
name="themoviedb",
value=float(tmdb_data["vote_average"]),
votes=tmdb_data.get("vote_count", 0),
max_rating=10,
default=True,
))
# --- External IDs ---
external_ids: Dict[str, Any] = tmdb_data.get("external_ids", {})
imdb_id: Optional[str] = external_ids.get("imdb_id")
tvdb_id: Optional[int] = external_ids.get("tvdb_id")
# --- Images ---
thumb_images: List[ImageInfo] = []
fanart_images: List[ImageInfo] = []
if tmdb_data.get("poster_path"):
thumb_images.append(ImageInfo(
url=get_image_url(tmdb_data["poster_path"], image_size),
aspect="poster",
))
if tmdb_data.get("backdrop_path"):
fanart_images.append(ImageInfo(
url=get_image_url(tmdb_data["backdrop_path"], image_size),
))
logos: List[Dict[str, Any]] = tmdb_data.get("images", {}).get("logos", [])
if logos:
thumb_images.append(ImageInfo(
url=get_image_url(logos[0]["file_path"], image_size),
aspect="clearlogo",
))
# --- Cast (top 10) ---
actors: List[ActorInfo] = []
for member in tmdb_data.get("credits", {}).get("cast", [])[:10]:
actor_thumb: Optional[str] = None
if member.get("profile_path"):
actor_thumb = get_image_url(member["profile_path"], "h632")
actors.append(ActorInfo(
name=member["name"],
role=member.get("character"),
thumb=actor_thumb,
tmdbid=member["id"],
))
# --- Unique IDs ---
unique_ids: List[UniqueID] = []
if tmdb_data.get("id"):
unique_ids.append(UniqueID(type="tmdb", value=str(tmdb_data["id"]), default=False))
if imdb_id:
unique_ids.append(UniqueID(type="imdb", value=imdb_id, default=False))
if tvdb_id:
unique_ids.append(UniqueID(type="tvdb", value=str(tvdb_id), default=True))
# --- Content ratings ---
fsk_rating: Optional[str] = _extract_fsk_rating(content_ratings) if content_ratings else None
mpaa_rating: Optional[str] = (
_extract_rating_by_country(content_ratings, "US") if content_ratings else None
)
# --- Country: prefer origin_country codes; fall back to production_countries names ---
country_list: List[str] = list(tmdb_data.get("origin_country", []))
if not country_list:
country_list = [c["name"] for c in tmdb_data.get("production_countries", [])]
# --- Runtime ---
runtime_list: List[int] = tmdb_data.get("episode_run_time", [])
runtime: Optional[int] = runtime_list[0] if runtime_list else None
return TVShowNFO(
title=title,
originaltitle=original_title,
sorttitle=title,
year=year,
plot=tmdb_data.get("overview") or None,
outline=tmdb_data.get("overview") or None,
tagline=tmdb_data.get("tagline") or None,
runtime=runtime,
premiered=first_air_date,
status=tmdb_data.get("status"),
genre=[g["name"] for g in tmdb_data.get("genres", [])],
studio=[n["name"] for n in tmdb_data.get("networks", [])],
country=country_list,
ratings=ratings,
fsk=fsk_rating,
mpaa=mpaa_rating,
tmdbid=tmdb_data.get("id"),
imdbid=imdb_id,
tvdbid=tvdb_id,
uniqueid=unique_ids,
thumb=thumb_images,
fanart=fanart_images,
actors=actors,
watched=False,
dateadded=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
)