"""TMDB API client for fetching TV show metadata. This module provides an async client for The Movie Database (TMDB) API, adapted from the scraper project to fit the AniworldMain architecture. Example: >>> async with TMDBClient(api_key="your_key") as client: ... results = await client.search_tv_show("Attack on Titan") ... show_id = results["results"][0]["id"] ... details = await client.get_tv_show_details(show_id) """ import asyncio import logging from pathlib import Path from typing import Any, Dict, List, Optional import aiohttp logger = logging.getLogger(__name__) class TMDBAPIError(Exception): """Exception raised for TMDB API errors.""" pass class TMDBClient: """Async TMDB API client for TV show metadata. Attributes: api_key: TMDB API key for authentication base_url: Base URL for TMDB API image_base_url: Base URL for TMDB images max_connections: Maximum concurrent connections session: aiohttp ClientSession for requests """ DEFAULT_BASE_URL = "https://api.themoviedb.org/3" DEFAULT_IMAGE_BASE_URL = "https://image.tmdb.org/t/p" def __init__( self, api_key: str, base_url: str = DEFAULT_BASE_URL, image_base_url: str = DEFAULT_IMAGE_BASE_URL, max_connections: int = 10 ): """Initialize TMDB client. Args: api_key: TMDB API key base_url: TMDB API base URL image_base_url: TMDB image base URL max_connections: Maximum concurrent connections """ if not api_key: raise ValueError("TMDB API key is required") self.api_key = api_key self.base_url = base_url.rstrip('/') self.image_base_url = image_base_url.rstrip('/') self.max_connections = max_connections self.session: Optional[aiohttp.ClientSession] = None self._cache: Dict[str, Any] = {} async def __aenter__(self): """Async context manager entry.""" await self._ensure_session() return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Async context manager exit.""" await self.close() async def _ensure_session(self): """Ensure aiohttp session is created.""" if self.session is None or self.session.closed: connector = aiohttp.TCPConnector(limit=self.max_connections) self.session = aiohttp.ClientSession(connector=connector) async def _request( self, endpoint: str, params: Optional[Dict[str, Any]] = None, max_retries: int = 3 ) -> Dict[str, Any]: """Make an async request to TMDB API with retries. Args: endpoint: API endpoint (e.g., 'search/tv') params: Query parameters max_retries: Maximum retry attempts Returns: API response as dictionary Raises: TMDBAPIError: If request fails after retries """ await self._ensure_session() url = f"{self.base_url}/{endpoint}" params = params or {} params["api_key"] = self.api_key # Cache key for deduplication cache_key = f"{endpoint}:{str(sorted(params.items()))}" if cache_key in self._cache: logger.debug(f"Cache hit for {endpoint}") return self._cache[cache_key] delay = 1 last_error = None for attempt in range(max_retries): try: # Re-ensure session before each attempt in case it was closed await self._ensure_session() if self.session is None: raise TMDBAPIError("Session is not available") logger.debug(f"TMDB API request: {endpoint} (attempt {attempt + 1})") async with self.session.get(url, params=params, timeout=aiohttp.ClientTimeout(total=60)) as resp: if resp.status == 401: raise TMDBAPIError("Invalid TMDB API key") elif resp.status == 404: raise TMDBAPIError(f"Resource not found: {endpoint}") elif resp.status == 429: # Rate limit - wait longer retry_after = int(resp.headers.get('Retry-After', delay * 2)) logger.warning(f"Rate limited, waiting {retry_after}s") await asyncio.sleep(retry_after) continue resp.raise_for_status() data = await resp.json() self._cache[cache_key] = data return data except asyncio.TimeoutError as e: last_error = e if attempt < max_retries - 1: logger.warning(f"Request timeout (attempt {attempt + 1}), retrying in {delay}s") await asyncio.sleep(delay) delay *= 2 else: logger.error(f"Request timed out after {max_retries} attempts") except (aiohttp.ClientError, AttributeError) as e: last_error = e # If connector/session was closed, try to recreate it if "Connector is closed" in str(e) or isinstance(e, AttributeError): logger.warning(f"Session issue detected, recreating session: {e}") self.session = None await self._ensure_session() if attempt < max_retries - 1: logger.warning(f"Request failed (attempt {attempt + 1}): {e}, retrying in {delay}s") await asyncio.sleep(delay) delay *= 2 else: logger.error(f"Request failed after {max_retries} attempts: {e}") raise TMDBAPIError(f"Request failed after {max_retries} attempts: {last_error}") async def search_tv_show( self, query: str, language: str = "de-DE", page: int = 1 ) -> Dict[str, Any]: """Search for TV shows by name. Args: query: Search query (show name) language: Language for results (default: German) page: Page number for pagination Returns: Search results with list of shows Example: >>> results = await client.search_tv_show("Attack on Titan") >>> shows = results["results"] """ return await self._request( "search/tv", {"query": query, "language": language, "page": page} ) async def get_tv_show_details( self, tv_id: int, language: str = "de-DE", append_to_response: Optional[str] = None ) -> Dict[str, Any]: """Get detailed information about a TV show. Args: tv_id: TMDB TV show ID language: Language for metadata append_to_response: Additional data to include (e.g., "credits,images") Returns: TV show details including metadata, cast, etc. """ params = {"language": language} if append_to_response: params["append_to_response"] = append_to_response return await self._request(f"tv/{tv_id}", params) async def get_tv_show_content_ratings(self, tv_id: int) -> Dict[str, Any]: """Get content ratings for a TV show. Args: tv_id: TMDB TV show ID Returns: Content ratings by country """ return await self._request(f"tv/{tv_id}/content_ratings") async def get_tv_show_external_ids(self, tv_id: int) -> Dict[str, Any]: """Get external IDs (IMDB, TVDB) for a TV show. Args: tv_id: TMDB TV show ID Returns: Dictionary with external IDs (imdb_id, tvdb_id, etc.) """ return await self._request(f"tv/{tv_id}/external_ids") async def get_tv_show_images( self, tv_id: int, language: Optional[str] = None ) -> Dict[str, Any]: """Get images (posters, backdrops, logos) for a TV show. Args: tv_id: TMDB TV show ID language: Language filter for images (None = all languages) Returns: Dictionary with poster, backdrop, and logo lists """ params = {} if language: params["language"] = language return await self._request(f"tv/{tv_id}/images", params) async def download_image( self, image_path: str, local_path: Path, size: str = "original" ) -> None: """Download an image from TMDB. Args: image_path: Image path from TMDB API (e.g., "/abc123.jpg") local_path: Local file path to save image size: Image size (w500, original, etc.) Raises: TMDBAPIError: If download fails """ await self._ensure_session() url = f"{self.image_base_url}/{size}{image_path}" try: logger.debug(f"Downloading image from {url}") async with self.session.get(url, timeout=aiohttp.ClientTimeout(total=60)) as resp: resp.raise_for_status() # Ensure parent directory exists local_path.parent.mkdir(parents=True, exist_ok=True) # Write image data with open(local_path, "wb") as f: f.write(await resp.read()) logger.info(f"Downloaded image to {local_path}") except aiohttp.ClientError as e: raise TMDBAPIError(f"Failed to download image: {e}") def get_image_url(self, image_path: str, size: str = "original") -> str: """Get full URL for an image. Args: image_path: Image path from TMDB API size: Image size (w500, original, etc.) Returns: Full image URL """ return f"{self.image_base_url}/{size}{image_path}" async def close(self): """Close the aiohttp session and clean up resources.""" if self.session and not self.session.closed: await self.session.close() self.session = None logger.debug("TMDB client session closed") def clear_cache(self): """Clear the request cache.""" self._cache.clear() logger.debug("TMDB client cache cleared")