feat(tmdb): improve rate limiting and retry resilience
- Increase max_retries from 3 to 5 with exponential backoff capped at 30s - Add per-second rate limiter (~35 req/s) to stay under TMDB's ~40/s limit - Replace small semaphore (4) with larger one (30) + token-bucket throttle - Abort retries immediately on DNS/name-resolution failures - Increase rate-limit fallback wait from default to max(delay*2, 10)s
This commit is contained in:
@@ -12,6 +12,7 @@ Example:
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
@@ -63,6 +64,11 @@ class TMDBClient:
|
||||
self.max_connections = max_connections
|
||||
self.session: Optional[aiohttp.ClientSession] = None
|
||||
self._cache: Dict[str, Any] = {}
|
||||
# TMDB allows ~40 req/s; use 30 concurrent + per-second throttle to stay safe
|
||||
self._semaphore = asyncio.Semaphore(30)
|
||||
self._rate_limit_lock = asyncio.Lock()
|
||||
self._request_timestamps: List[float] = []
|
||||
self._max_requests_per_second = 35 # Stay under TMDB's ~40/s limit
|
||||
|
||||
async def __aenter__(self):
|
||||
"""Async context manager entry."""
|
||||
@@ -83,7 +89,7 @@ class TMDBClient:
|
||||
self,
|
||||
endpoint: str,
|
||||
params: Optional[Dict[str, Any]] = None,
|
||||
max_retries: int = 3
|
||||
max_retries: int = 5
|
||||
) -> Dict[str, Any]:
|
||||
"""Make an async request to TMDB API with retries.
|
||||
|
||||
@@ -110,58 +116,82 @@ class TMDBClient:
|
||||
logger.debug("Cache hit for %s", endpoint)
|
||||
return self._cache[cache_key]
|
||||
|
||||
delay = 1
|
||||
delay = 2
|
||||
last_error = None
|
||||
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
# Re-ensure session before each attempt in case it was closed
|
||||
await self._ensure_session()
|
||||
# Rate limiting: ensure we don't exceed ~35 requests/second
|
||||
async with self._rate_limit_lock:
|
||||
now = time.monotonic()
|
||||
# Remove timestamps older than 1 second
|
||||
self._request_timestamps = [
|
||||
ts for ts in self._request_timestamps if now - ts < 1.0
|
||||
]
|
||||
if len(self._request_timestamps) >= self._max_requests_per_second:
|
||||
sleep_time = 1.0 - (now - self._request_timestamps[0])
|
||||
if sleep_time > 0:
|
||||
logger.debug("Rate throttling: waiting %.2fs", sleep_time)
|
||||
await asyncio.sleep(sleep_time)
|
||||
self._request_timestamps.append(time.monotonic())
|
||||
|
||||
if self.session is None:
|
||||
raise TMDBAPIError("Session is not available")
|
||||
|
||||
logger.debug("TMDB API request: %s (attempt %s)", endpoint, attempt + 1)
|
||||
async with self.session.get(url, params=params, timeout=aiohttp.ClientTimeout(total=60)) as resp:
|
||||
if resp.status == 401:
|
||||
raise TMDBAPIError("Invalid TMDB API key")
|
||||
elif resp.status == 404:
|
||||
raise TMDBAPIError(f"Resource not found: {endpoint}")
|
||||
elif resp.status == 429:
|
||||
# Rate limit - wait longer
|
||||
retry_after = int(resp.headers.get('Retry-After', delay * 2))
|
||||
logger.warning("Rate limited, waiting %ss", retry_after)
|
||||
await asyncio.sleep(retry_after)
|
||||
continue
|
||||
|
||||
resp.raise_for_status()
|
||||
data = await resp.json()
|
||||
self._cache[cache_key] = data
|
||||
return data
|
||||
|
||||
except asyncio.TimeoutError as e:
|
||||
last_error = e
|
||||
if attempt < max_retries - 1:
|
||||
logger.warning("Request timeout (attempt %s), retrying in %ss", attempt + 1, delay)
|
||||
await asyncio.sleep(delay)
|
||||
delay *= 2
|
||||
else:
|
||||
logger.error("Request timed out after %s attempts", max_retries)
|
||||
|
||||
except (aiohttp.ClientError, AttributeError) as e:
|
||||
last_error = e
|
||||
# If connector/session was closed, try to recreate it
|
||||
if "Connector is closed" in str(e) or isinstance(e, AttributeError):
|
||||
logger.warning("Session issue detected, recreating session: %s", e)
|
||||
self.session = None
|
||||
async with self._semaphore:
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
# Re-ensure session before each attempt in case it was closed
|
||||
await self._ensure_session()
|
||||
|
||||
if attempt < max_retries - 1:
|
||||
logger.warning("Request failed (attempt %s): %s, retrying in %ss", attempt + 1, e, delay)
|
||||
await asyncio.sleep(delay)
|
||||
delay *= 2
|
||||
else:
|
||||
logger.error("Request failed after %s attempts: %s", max_retries, e)
|
||||
if self.session is None:
|
||||
raise TMDBAPIError("Session is not available")
|
||||
|
||||
logger.debug("TMDB API request: %s (attempt %s)", endpoint, attempt + 1)
|
||||
async with self.session.get(url, params=params, timeout=aiohttp.ClientTimeout(total=60)) as resp:
|
||||
if resp.status == 401:
|
||||
raise TMDBAPIError("Invalid TMDB API key")
|
||||
elif resp.status == 404:
|
||||
raise TMDBAPIError(f"Resource not found: {endpoint}")
|
||||
elif resp.status == 429:
|
||||
# Rate limit - wait longer with exponential backoff
|
||||
retry_after = int(resp.headers.get('Retry-After', max(delay * 2, 10)))
|
||||
logger.warning("Rate limited, waiting %ss", retry_after)
|
||||
await asyncio.sleep(retry_after)
|
||||
continue
|
||||
|
||||
resp.raise_for_status()
|
||||
data = await resp.json()
|
||||
self._cache[cache_key] = data
|
||||
return data
|
||||
|
||||
except asyncio.TimeoutError as e:
|
||||
last_error = e
|
||||
if attempt < max_retries - 1:
|
||||
logger.warning("Request timeout (attempt %s), retrying in %ss", attempt + 1, delay)
|
||||
await asyncio.sleep(delay)
|
||||
delay = min(delay * 2, 30)
|
||||
else:
|
||||
logger.error("Request timed out after %s attempts", max_retries)
|
||||
|
||||
except (aiohttp.ClientError, AttributeError) as e:
|
||||
last_error = e
|
||||
# If connector/session was closed, try to recreate it
|
||||
if "Connector is closed" in str(e) or isinstance(e, AttributeError):
|
||||
logger.warning("Session issue detected, recreating session: %s", e)
|
||||
self.session = None
|
||||
await self._ensure_session()
|
||||
|
||||
# DNS / host-unreachable errors are not transient — abort immediately
|
||||
error_str = str(e)
|
||||
if "name resolution" in error_str.lower() or (
|
||||
isinstance(e, aiohttp.ClientConnectorError) and
|
||||
"Cannot connect to host" in error_str
|
||||
):
|
||||
logger.error("Non-transient connection error, aborting retries: %s", e)
|
||||
raise TMDBAPIError(f"Request failed after {attempt + 1} attempts: {e}") from e
|
||||
|
||||
if attempt < max_retries - 1:
|
||||
logger.warning("Request failed (attempt %s): %s, retrying in %ss", attempt + 1, e, delay)
|
||||
await asyncio.sleep(delay)
|
||||
delay = min(delay * 2, 30)
|
||||
else:
|
||||
logger.error("Request failed after %s attempts: %s", max_retries, e)
|
||||
|
||||
raise TMDBAPIError(f"Request failed after {max_retries} attempts: {last_error}")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user