diff --git a/src/core/services/tmdb_client.py b/src/core/services/tmdb_client.py index b5370f9..00bf5dc 100644 --- a/src/core/services/tmdb_client.py +++ b/src/core/services/tmdb_client.py @@ -12,6 +12,7 @@ Example: import asyncio import logging +import time from pathlib import Path from typing import Any, Dict, List, Optional @@ -63,6 +64,11 @@ class TMDBClient: self.max_connections = max_connections self.session: Optional[aiohttp.ClientSession] = None self._cache: Dict[str, Any] = {} + # TMDB allows ~40 req/s; use 30 concurrent + per-second throttle to stay safe + self._semaphore = asyncio.Semaphore(30) + self._rate_limit_lock = asyncio.Lock() + self._request_timestamps: List[float] = [] + self._max_requests_per_second = 35 # Stay under TMDB's ~40/s limit async def __aenter__(self): """Async context manager entry.""" @@ -83,7 +89,7 @@ class TMDBClient: self, endpoint: str, params: Optional[Dict[str, Any]] = None, - max_retries: int = 3 + max_retries: int = 5 ) -> Dict[str, Any]: """Make an async request to TMDB API with retries. @@ -110,58 +116,82 @@ class TMDBClient: logger.debug("Cache hit for %s", endpoint) return self._cache[cache_key] - delay = 1 + delay = 2 last_error = None - for attempt in range(max_retries): - try: - # Re-ensure session before each attempt in case it was closed - await self._ensure_session() - - if self.session is None: - raise TMDBAPIError("Session is not available") - - logger.debug("TMDB API request: %s (attempt %s)", endpoint, attempt + 1) - async with self.session.get(url, params=params, timeout=aiohttp.ClientTimeout(total=60)) as resp: - if resp.status == 401: - raise TMDBAPIError("Invalid TMDB API key") - elif resp.status == 404: - raise TMDBAPIError(f"Resource not found: {endpoint}") - elif resp.status == 429: - # Rate limit - wait longer - retry_after = int(resp.headers.get('Retry-After', delay * 2)) - logger.warning("Rate limited, waiting %ss", retry_after) - await asyncio.sleep(retry_after) - continue - - resp.raise_for_status() - data = await resp.json() - self._cache[cache_key] = data - return data - - except asyncio.TimeoutError as e: - last_error = e - if attempt < max_retries - 1: - logger.warning("Request timeout (attempt %s), retrying in %ss", attempt + 1, delay) - await asyncio.sleep(delay) - delay *= 2 - else: - logger.error("Request timed out after %s attempts", max_retries) - - except (aiohttp.ClientError, AttributeError) as e: - last_error = e - # If connector/session was closed, try to recreate it - if "Connector is closed" in str(e) or isinstance(e, AttributeError): - logger.warning("Session issue detected, recreating session: %s", e) - self.session = None + # Rate limiting: ensure we don't exceed ~35 requests/second + async with self._rate_limit_lock: + now = time.monotonic() + # Remove timestamps older than 1 second + self._request_timestamps = [ + ts for ts in self._request_timestamps if now - ts < 1.0 + ] + if len(self._request_timestamps) >= self._max_requests_per_second: + sleep_time = 1.0 - (now - self._request_timestamps[0]) + if sleep_time > 0: + logger.debug("Rate throttling: waiting %.2fs", sleep_time) + await asyncio.sleep(sleep_time) + self._request_timestamps.append(time.monotonic()) + + async with self._semaphore: + for attempt in range(max_retries): + try: + # Re-ensure session before each attempt in case it was closed await self._ensure_session() - - if attempt < max_retries - 1: - logger.warning("Request failed (attempt %s): %s, retrying in %ss", attempt + 1, e, delay) - await asyncio.sleep(delay) - delay *= 2 - else: - logger.error("Request failed after %s attempts: %s", max_retries, e) + + if self.session is None: + raise TMDBAPIError("Session is not available") + + logger.debug("TMDB API request: %s (attempt %s)", endpoint, attempt + 1) + async with self.session.get(url, params=params, timeout=aiohttp.ClientTimeout(total=60)) as resp: + if resp.status == 401: + raise TMDBAPIError("Invalid TMDB API key") + elif resp.status == 404: + raise TMDBAPIError(f"Resource not found: {endpoint}") + elif resp.status == 429: + # Rate limit - wait longer with exponential backoff + retry_after = int(resp.headers.get('Retry-After', max(delay * 2, 10))) + logger.warning("Rate limited, waiting %ss", retry_after) + await asyncio.sleep(retry_after) + continue + + resp.raise_for_status() + data = await resp.json() + self._cache[cache_key] = data + return data + + except asyncio.TimeoutError as e: + last_error = e + if attempt < max_retries - 1: + logger.warning("Request timeout (attempt %s), retrying in %ss", attempt + 1, delay) + await asyncio.sleep(delay) + delay = min(delay * 2, 30) + else: + logger.error("Request timed out after %s attempts", max_retries) + + except (aiohttp.ClientError, AttributeError) as e: + last_error = e + # If connector/session was closed, try to recreate it + if "Connector is closed" in str(e) or isinstance(e, AttributeError): + logger.warning("Session issue detected, recreating session: %s", e) + self.session = None + await self._ensure_session() + + # DNS / host-unreachable errors are not transient — abort immediately + error_str = str(e) + if "name resolution" in error_str.lower() or ( + isinstance(e, aiohttp.ClientConnectorError) and + "Cannot connect to host" in error_str + ): + logger.error("Non-transient connection error, aborting retries: %s", e) + raise TMDBAPIError(f"Request failed after {attempt + 1} attempts: {e}") from e + + if attempt < max_retries - 1: + logger.warning("Request failed (attempt %s): %s, retrying in %ss", attempt + 1, e, delay) + await asyncio.sleep(delay) + delay = min(delay * 2, 30) + else: + logger.error("Request failed after %s attempts: %s", max_retries, e) raise TMDBAPIError(f"Request failed after {max_retries} attempts: {last_error}")