|
|
|
|
@@ -12,6 +12,7 @@ Example:
|
|
|
|
|
|
|
|
|
|
import asyncio
|
|
|
|
|
import logging
|
|
|
|
|
import time
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
|
|
|
|
|
|
@@ -63,6 +64,11 @@ class TMDBClient:
|
|
|
|
|
self.max_connections = max_connections
|
|
|
|
|
self.session: Optional[aiohttp.ClientSession] = None
|
|
|
|
|
self._cache: Dict[str, Any] = {}
|
|
|
|
|
# TMDB allows ~40 req/s; use 30 concurrent + per-second throttle to stay safe
|
|
|
|
|
self._semaphore = asyncio.Semaphore(30)
|
|
|
|
|
self._rate_limit_lock = asyncio.Lock()
|
|
|
|
|
self._request_timestamps: List[float] = []
|
|
|
|
|
self._max_requests_per_second = 35 # Stay under TMDB's ~40/s limit
|
|
|
|
|
|
|
|
|
|
async def __aenter__(self):
|
|
|
|
|
"""Async context manager entry."""
|
|
|
|
|
@@ -83,7 +89,7 @@ class TMDBClient:
|
|
|
|
|
self,
|
|
|
|
|
endpoint: str,
|
|
|
|
|
params: Optional[Dict[str, Any]] = None,
|
|
|
|
|
max_retries: int = 3
|
|
|
|
|
max_retries: int = 5
|
|
|
|
|
) -> Dict[str, Any]:
|
|
|
|
|
"""Make an async request to TMDB API with retries.
|
|
|
|
|
|
|
|
|
|
@@ -110,58 +116,82 @@ class TMDBClient:
|
|
|
|
|
logger.debug("Cache hit for %s", endpoint)
|
|
|
|
|
return self._cache[cache_key]
|
|
|
|
|
|
|
|
|
|
delay = 1
|
|
|
|
|
delay = 2
|
|
|
|
|
last_error = None
|
|
|
|
|
|
|
|
|
|
for attempt in range(max_retries):
|
|
|
|
|
try:
|
|
|
|
|
# Re-ensure session before each attempt in case it was closed
|
|
|
|
|
await self._ensure_session()
|
|
|
|
|
|
|
|
|
|
if self.session is None:
|
|
|
|
|
raise TMDBAPIError("Session is not available")
|
|
|
|
|
|
|
|
|
|
logger.debug("TMDB API request: %s (attempt %s)", endpoint, attempt + 1)
|
|
|
|
|
async with self.session.get(url, params=params, timeout=aiohttp.ClientTimeout(total=60)) as resp:
|
|
|
|
|
if resp.status == 401:
|
|
|
|
|
raise TMDBAPIError("Invalid TMDB API key")
|
|
|
|
|
elif resp.status == 404:
|
|
|
|
|
raise TMDBAPIError(f"Resource not found: {endpoint}")
|
|
|
|
|
elif resp.status == 429:
|
|
|
|
|
# Rate limit - wait longer
|
|
|
|
|
retry_after = int(resp.headers.get('Retry-After', delay * 2))
|
|
|
|
|
logger.warning("Rate limited, waiting %ss", retry_after)
|
|
|
|
|
await asyncio.sleep(retry_after)
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
resp.raise_for_status()
|
|
|
|
|
data = await resp.json()
|
|
|
|
|
self._cache[cache_key] = data
|
|
|
|
|
return data
|
|
|
|
|
|
|
|
|
|
except asyncio.TimeoutError as e:
|
|
|
|
|
last_error = e
|
|
|
|
|
if attempt < max_retries - 1:
|
|
|
|
|
logger.warning("Request timeout (attempt %s), retrying in %ss", attempt + 1, delay)
|
|
|
|
|
await asyncio.sleep(delay)
|
|
|
|
|
delay *= 2
|
|
|
|
|
else:
|
|
|
|
|
logger.error("Request timed out after %s attempts", max_retries)
|
|
|
|
|
|
|
|
|
|
except (aiohttp.ClientError, AttributeError) as e:
|
|
|
|
|
last_error = e
|
|
|
|
|
# If connector/session was closed, try to recreate it
|
|
|
|
|
if "Connector is closed" in str(e) or isinstance(e, AttributeError):
|
|
|
|
|
logger.warning("Session issue detected, recreating session: %s", e)
|
|
|
|
|
self.session = None
|
|
|
|
|
# Rate limiting: ensure we don't exceed ~35 requests/second
|
|
|
|
|
async with self._rate_limit_lock:
|
|
|
|
|
now = time.monotonic()
|
|
|
|
|
# Remove timestamps older than 1 second
|
|
|
|
|
self._request_timestamps = [
|
|
|
|
|
ts for ts in self._request_timestamps if now - ts < 1.0
|
|
|
|
|
]
|
|
|
|
|
if len(self._request_timestamps) >= self._max_requests_per_second:
|
|
|
|
|
sleep_time = 1.0 - (now - self._request_timestamps[0])
|
|
|
|
|
if sleep_time > 0:
|
|
|
|
|
logger.debug("Rate throttling: waiting %.2fs", sleep_time)
|
|
|
|
|
await asyncio.sleep(sleep_time)
|
|
|
|
|
self._request_timestamps.append(time.monotonic())
|
|
|
|
|
|
|
|
|
|
async with self._semaphore:
|
|
|
|
|
for attempt in range(max_retries):
|
|
|
|
|
try:
|
|
|
|
|
# Re-ensure session before each attempt in case it was closed
|
|
|
|
|
await self._ensure_session()
|
|
|
|
|
|
|
|
|
|
if attempt < max_retries - 1:
|
|
|
|
|
logger.warning("Request failed (attempt %s): %s, retrying in %ss", attempt + 1, e, delay)
|
|
|
|
|
await asyncio.sleep(delay)
|
|
|
|
|
delay *= 2
|
|
|
|
|
else:
|
|
|
|
|
logger.error("Request failed after %s attempts: %s", max_retries, e)
|
|
|
|
|
|
|
|
|
|
if self.session is None:
|
|
|
|
|
raise TMDBAPIError("Session is not available")
|
|
|
|
|
|
|
|
|
|
logger.debug("TMDB API request: %s (attempt %s)", endpoint, attempt + 1)
|
|
|
|
|
async with self.session.get(url, params=params, timeout=aiohttp.ClientTimeout(total=60)) as resp:
|
|
|
|
|
if resp.status == 401:
|
|
|
|
|
raise TMDBAPIError("Invalid TMDB API key")
|
|
|
|
|
elif resp.status == 404:
|
|
|
|
|
raise TMDBAPIError(f"Resource not found: {endpoint}")
|
|
|
|
|
elif resp.status == 429:
|
|
|
|
|
# Rate limit - wait longer with exponential backoff
|
|
|
|
|
retry_after = int(resp.headers.get('Retry-After', max(delay * 2, 10)))
|
|
|
|
|
logger.warning("Rate limited, waiting %ss", retry_after)
|
|
|
|
|
await asyncio.sleep(retry_after)
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
resp.raise_for_status()
|
|
|
|
|
data = await resp.json()
|
|
|
|
|
self._cache[cache_key] = data
|
|
|
|
|
return data
|
|
|
|
|
|
|
|
|
|
except asyncio.TimeoutError as e:
|
|
|
|
|
last_error = e
|
|
|
|
|
if attempt < max_retries - 1:
|
|
|
|
|
logger.warning("Request timeout (attempt %s), retrying in %ss", attempt + 1, delay)
|
|
|
|
|
await asyncio.sleep(delay)
|
|
|
|
|
delay = min(delay * 2, 30)
|
|
|
|
|
else:
|
|
|
|
|
logger.error("Request timed out after %s attempts", max_retries)
|
|
|
|
|
|
|
|
|
|
except (aiohttp.ClientError, AttributeError) as e:
|
|
|
|
|
last_error = e
|
|
|
|
|
# If connector/session was closed, try to recreate it
|
|
|
|
|
if "Connector is closed" in str(e) or isinstance(e, AttributeError):
|
|
|
|
|
logger.warning("Session issue detected, recreating session: %s", e)
|
|
|
|
|
self.session = None
|
|
|
|
|
await self._ensure_session()
|
|
|
|
|
|
|
|
|
|
# DNS / host-unreachable errors are not transient — abort immediately
|
|
|
|
|
error_str = str(e)
|
|
|
|
|
if "name resolution" in error_str.lower() or (
|
|
|
|
|
isinstance(e, aiohttp.ClientConnectorError) and
|
|
|
|
|
"Cannot connect to host" in error_str
|
|
|
|
|
):
|
|
|
|
|
logger.error("Non-transient connection error, aborting retries: %s", e)
|
|
|
|
|
raise TMDBAPIError(f"Request failed after {attempt + 1} attempts: {e}") from e
|
|
|
|
|
|
|
|
|
|
if attempt < max_retries - 1:
|
|
|
|
|
logger.warning("Request failed (attempt %s): %s, retrying in %ss", attempt + 1, e, delay)
|
|
|
|
|
await asyncio.sleep(delay)
|
|
|
|
|
delay = min(delay * 2, 30)
|
|
|
|
|
else:
|
|
|
|
|
logger.error("Request failed after %s attempts: %s", max_retries, e)
|
|
|
|
|
|
|
|
|
|
raise TMDBAPIError(f"Request failed after {max_retries} attempts: {last_error}")
|
|
|
|
|
|
|
|
|
|
|