"""TMDB API client for fetching TV show metadata.

This module provides an async client for The Movie Database (TMDB) API,
adapted from the scraper project to fit the AniworldMain architecture.

Example:
    >>> async with TMDBClient(api_key="your_key") as client:
    ...     results = await client.search_tv_show("Attack on Titan")
    ...     show_id = results["results"][0]["id"]
    ...     details = await client.get_tv_show_details(show_id)
"""

import asyncio
import logging
import time
from pathlib import Path
from typing import Any, Dict, List, Optional

import aiohttp

logger = logging.getLogger(__name__)


class TMDBAPIError(Exception):
    """Exception raised for TMDB API errors."""

    pass


class TMDBClient:
    """Async TMDB API client for TV show metadata.

    Responses are memoized per (endpoint, params) for the lifetime of the
    client. Empty ``search/*`` results are kept in a separate negative cache
    that expires after ``NEGATIVE_CACHE_TTL`` seconds, so a show that is
    added to TMDB later can eventually be found.

    Attributes:
        api_key: TMDB API key for authentication
        base_url: Base URL for TMDB API
        image_base_url: Base URL for TMDB images
        max_connections: Maximum concurrent connections
        session: aiohttp ClientSession for requests
    """

    DEFAULT_BASE_URL = "https://api.themoviedb.org/3"
    DEFAULT_IMAGE_BASE_URL = "https://image.tmdb.org/t/p"
    NEGATIVE_CACHE_TTL = 86400  # 24 hours

    def __init__(
        self,
        api_key: str,
        base_url: str = DEFAULT_BASE_URL,
        image_base_url: str = DEFAULT_IMAGE_BASE_URL,
        max_connections: int = 10
    ):
        """Initialize TMDB client.

        Args:
            api_key: TMDB API key
            base_url: TMDB API base URL
            image_base_url: TMDB image base URL
            max_connections: Maximum concurrent connections

        Raises:
            ValueError: If ``api_key`` is empty
        """
        if not api_key:
            raise ValueError("TMDB API key is required")

        self.api_key = api_key
        self.base_url = base_url.rstrip('/')
        self.image_base_url = image_base_url.rstrip('/')
        self.max_connections = max_connections
        self.session: Optional[aiohttp.ClientSession] = None
        self._cache: Dict[str, Any] = {}
        self._negative_cache: Dict[str, float] = {}  # query -> timestamp when cached
        # TMDB allows ~40 req/s; use 30 concurrent + per-second throttle to stay safe
        self._semaphore = asyncio.Semaphore(30)
        self._rate_limit_lock = asyncio.Lock()
        self._request_timestamps: List[float] = []
        self._max_requests_per_second = 35  # Stay under TMDB's ~40/s limit

    async def __aenter__(self):
        """Async context manager entry."""
        await self._ensure_session()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        await self.close()

    async def _ensure_session(self):
        """Ensure aiohttp session is created."""
        if self.session is None or self.session.closed:
            connector = aiohttp.TCPConnector(limit=self.max_connections)
            self.session = aiohttp.ClientSession(connector=connector)

    @staticmethod
    def _parse_retry_after(header_value: Optional[str], fallback: int) -> int:
        """Turn a ``Retry-After`` header into a non-negative number of seconds.

        Args:
            header_value: Raw header value, or None when the header is absent
            fallback: Seconds to wait when the header is absent or is not a
                plain integer (the header may also carry an HTTP-date)

        Returns:
            Seconds to wait before retrying
        """
        if header_value is None:
            return fallback
        try:
            return max(int(header_value), 0)
        except (TypeError, ValueError):
            return fallback

    async def _request(
        self,
        endpoint: str,
        params: Optional[Dict[str, Any]] = None,
        max_retries: int = 5
    ) -> Dict[str, Any]:
        """Make an async request to TMDB API with retries.

        Args:
            endpoint: API endpoint (e.g., 'search/tv')
            params: Query parameters (not modified)
            max_retries: Maximum retry attempts

        Returns:
            API response as dictionary. A negative-cache hit returns
            ``{"results": []}``.

        Raises:
            TMDBAPIError: If request fails after retries, the API key is
                rejected (401), the resource does not exist (404), or a
                non-transient connection error occurs
        """
        await self._ensure_session()

        url = f"{self.base_url}/{endpoint}"
        # Work on a copy so the caller's dict never receives the API key.
        params = dict(params or {})
        params["api_key"] = self.api_key

        # Cache key for deduplication (shared by both caches)
        cache_key = f"{endpoint}:{str(sorted(params.items()))}"
        if cache_key in self._cache:
            logger.debug("Cache hit for %s", endpoint)
            return self._cache[cache_key]

        # Check negative cache (cached empty results)
        negative_cache_key = cache_key
        cached_at = self._negative_cache.get(negative_cache_key)
        if cached_at is not None:
            if time.monotonic() - cached_at < self.NEGATIVE_CACHE_TTL:
                logger.debug("Negative cache hit for %s (cached empty result)", endpoint)
                return {"results": []}
            # Expired negative cache entry
            del self._negative_cache[negative_cache_key]

        delay = 2
        last_error = None

        # Rate limiting: ensure we don't exceed ~35 requests/second
        async with self._rate_limit_lock:
            now = time.monotonic()
            # Remove timestamps older than 1 second
            self._request_timestamps = [
                ts for ts in self._request_timestamps if now - ts < 1.0
            ]
            if len(self._request_timestamps) >= self._max_requests_per_second:
                sleep_time = 1.0 - (now - self._request_timestamps[0])
                if sleep_time > 0:
                    logger.debug("Rate throttling: waiting %.2fs", sleep_time)
                    await asyncio.sleep(sleep_time)
            self._request_timestamps.append(time.monotonic())

        async with self._semaphore:
            for attempt in range(max_retries):
                try:
                    # Re-ensure session before each attempt in case it was closed
                    await self._ensure_session()
                    if self.session is None:
                        raise TMDBAPIError("Session is not available")

                    logger.debug("TMDB API request: %s (attempt %s)", endpoint, attempt + 1)
                    async with self.session.get(
                        url,
                        params=params,
                        timeout=aiohttp.ClientTimeout(total=60),
                    ) as resp:
                        if resp.status == 401:
                            raise TMDBAPIError("Invalid TMDB API key")
                        elif resp.status == 404:
                            raise TMDBAPIError(f"Resource not found: {endpoint}")
                        elif resp.status == 429:
                            # Rate limit - honour Retry-After, else wait at least 10s
                            retry_after = self._parse_retry_after(
                                resp.headers.get('Retry-After'),
                                max(delay * 2, 10),
                            )
                            # Remember the cause so the final error is informative
                            # when every attempt was rate limited.
                            last_error = TMDBAPIError(
                                f"Rate limited by TMDB (HTTP 429): {endpoint}"
                            )
                            logger.warning("Rate limited, waiting %ss", retry_after)
                            await asyncio.sleep(retry_after)
                            continue

                        resp.raise_for_status()
                        data = await resp.json()

                        if endpoint.startswith("search/") and not data.get("results"):
                            # Empty search results go only into the negative
                            # cache so that they expire; storing them in the
                            # permanent cache would shadow the TTL forever.
                            self._negative_cache[negative_cache_key] = time.monotonic()
                            logger.debug("Cached negative result for %s", endpoint)
                        else:
                            self._cache[cache_key] = data

                        return data

                except asyncio.TimeoutError as e:
                    last_error = e
                    if attempt < max_retries - 1:
                        logger.warning(
                            "Request timeout (attempt %s), retrying in %ss",
                            attempt + 1,
                            delay,
                        )
                        await asyncio.sleep(delay)
                        delay = min(delay * 2, 30)
                    else:
                        logger.error("Request timed out after %s attempts", max_retries)

                except (aiohttp.ClientError, AttributeError) as e:
                    last_error = e
                    # If connector/session was closed, try to recreate it
                    if "Connector is closed" in str(e) or isinstance(e, AttributeError):
                        logger.warning(
                            "Session issue detected, recreating session: %s",
                            e,
                            exc_info=True,
                        )
                        self.session = None
                        await self._ensure_session()

                    # DNS / host-unreachable errors are not transient — abort immediately
                    error_str = str(e)
                    if "name resolution" in error_str.lower() or (
                        isinstance(e, aiohttp.ClientConnectorError)
                        and "Cannot connect to host" in error_str
                    ):
                        logger.error("Non-transient connection error, aborting retries: %s", e)
                        raise TMDBAPIError(
                            f"Request failed after {attempt + 1} attempts: {e}"
                        ) from e

                    if attempt < max_retries - 1:
                        logger.warning(
                            "Request failed (attempt %s): %s, retrying in %ss",
                            attempt + 1,
                            e,
                            delay,
                        )
                        await asyncio.sleep(delay)
                        delay = min(delay * 2, 30)
                    else:
                        logger.error("Request failed after %s attempts: %s", max_retries, e)

        raise TMDBAPIError(f"Request failed after {max_retries} attempts: {last_error}")

    async def search_tv_show(
        self,
        query: str,
        language: str = "de-DE",
        page: int = 1
    ) -> Dict[str, Any]:
        """Search for TV shows by name.

        Args:
            query: Search query (show name)
            language: Language for results (default: German)
            page: Page number for pagination

        Returns:
            Search results with list of shows

        Example:
            >>> results = await client.search_tv_show("Attack on Titan")
            >>> shows = results["results"]
        """
        return await self._request(
            "search/tv",
            {"query": query, "language": language, "page": page}
        )

    async def search_multi(
        self,
        query: str,
        language: str = "en-US",
        page: int = 1
    ) -> Dict[str, Any]:
        """Search for movies and TV shows by name using TMDB multi search.

        Multi search returns both movies and TV shows, useful for anime
        that might be indexed as movies on TMDB.

        Args:
            query: Search query (show name)
            language: Language for results (default: English)
            page: Page number for pagination

        Returns:
            Search results with list of movies and TV shows

        Example:
            >>> results = await client.search_multi("Suzume no Tojimari")
            >>> shows = [r for r in results["results"] if r["media_type"] == "tv"]
        """
        return await self._request(
            "search/multi",
            {"query": query, "language": language, "page": page}
        )

    async def get_tv_show_details(
        self,
        tv_id: int,
        language: str = "de-DE",
        append_to_response: Optional[str] = None
    ) -> Dict[str, Any]:
        """Get detailed information about a TV show.

        Args:
            tv_id: TMDB TV show ID
            language: Language for metadata
            append_to_response: Additional data to include (e.g., "credits,images")

        Returns:
            TV show details including metadata, cast, etc.
        """
        params = {"language": language}
        if append_to_response:
            params["append_to_response"] = append_to_response

        return await self._request(f"tv/{tv_id}", params)

    async def get_tv_show_content_ratings(self, tv_id: int) -> Dict[str, Any]:
        """Get content ratings for a TV show.

        Args:
            tv_id: TMDB TV show ID

        Returns:
            Content ratings by country
        """
        return await self._request(f"tv/{tv_id}/content_ratings")

    async def get_tv_show_external_ids(self, tv_id: int) -> Dict[str, Any]:
        """Get external IDs (IMDB, TVDB) for a TV show.

        Args:
            tv_id: TMDB TV show ID

        Returns:
            Dictionary with external IDs (imdb_id, tvdb_id, etc.)
        """
        return await self._request(f"tv/{tv_id}/external_ids")

    async def get_tv_show_images(
        self,
        tv_id: int,
        language: Optional[str] = None
    ) -> Dict[str, Any]:
        """Get images (posters, backdrops, logos) for a TV show.

        Args:
            tv_id: TMDB TV show ID
            language: Language filter for images (None = all languages)

        Returns:
            Dictionary with poster, backdrop, and logo lists
        """
        params = {}
        if language:
            params["language"] = language

        return await self._request(f"tv/{tv_id}/images", params)

    async def download_image(
        self,
        image_path: str,
        local_path: Path,
        size: str = "original"
    ) -> None:
        """Download an image from TMDB.

        Args:
            image_path: Image path from TMDB API (e.g., "/abc123.jpg")
            local_path: Local file path to save image
            size: Image size (w500, original, etc.)

        Raises:
            TMDBAPIError: If download fails
        """
        await self._ensure_session()

        url = f"{self.image_base_url}/{size}{image_path}"

        try:
            logger.debug("Downloading image from %s", url)
            async with self.session.get(url, timeout=aiohttp.ClientTimeout(total=60)) as resp:
                resp.raise_for_status()
                # Fetch the whole body before touching the filesystem so a
                # failed transfer does not leave an empty file behind.
                payload = await resp.read()

                # Ensure parent directory exists
                local_path.parent.mkdir(parents=True, exist_ok=True)

                # Write image data
                with open(local_path, "wb") as f:
                    f.write(payload)

                logger.info("Downloaded image to %s", local_path)

        except aiohttp.ClientError as e:
            # Chain the cause so the underlying network error stays visible.
            raise TMDBAPIError(f"Failed to download image: {e}") from e

    def get_image_url(self, image_path: str, size: str = "original") -> str:
        """Get full URL for an image.

        Args:
            image_path: Image path from TMDB API
            size: Image size (w500, original, etc.)

        Returns:
            Full image URL
        """
        return f"{self.image_base_url}/{size}{image_path}"

    async def close(self):
        """Close the aiohttp session and clean up resources."""
        if self.session and not self.session.closed:
            await self.session.close()
            self.session = None
            logger.debug("TMDB client session closed")

    def __del__(self):
        """Warn if session is unclosed during garbage collection."""
        # __init__ may have raised before ``session`` was assigned, so the
        # attribute is looked up defensively.
        session = getattr(self, "session", None)
        if session is not None and not session.closed:
            logger.warning(
                "TMDBClient: unclosed session detected. "
                "Use 'async with TMDBClient(...)' or call close() explicitly."
            )

    def clear_cache(self):
        """Clear the request cache."""
        self._cache.clear()
        logger.debug("TMDB client cache cleared")

    def clear_negative_cache(self):
        """Clear the negative result cache."""
        self._negative_cache.clear()
        logger.debug("TMDB negative cache cleared")

    def cleanup_expired_negative_cache(self) -> int:
        """Remove expired entries from negative cache.

        Returns:
            Number of entries removed
        """
        now = time.monotonic()
        expired_keys = [
            key for key, timestamp in self._negative_cache.items()
            if now - timestamp >= self.NEGATIVE_CACHE_TTL
        ]
        for key in expired_keys:
            del self._negative_cache[key]
        if expired_keys:
            logger.debug("Removed %d expired negative cache entries", len(expired_keys))
        return len(expired_keys)