Files
Aniworld/src/core/utils/image_downloader.py

355 lines
11 KiB
Python

"""Image downloader utility for NFO media files.
This module provides functions to download poster, logo, and fanart images
from TMDB and validate them.
Example:
>>> downloader = ImageDownloader()
>>> await downloader.download_poster(poster_url, "/path/to/poster.jpg")
"""
import asyncio
import logging
from pathlib import Path
from typing import Optional
import aiohttp
from PIL import Image
logger = logging.getLogger(__name__)
class ImageDownloadError(Exception):
"""Exception raised for image download failures."""
pass
class ImageDownloader:
"""Utility for downloading and validating images.
Supports async context manager protocol for proper resource cleanup.
Attributes:
max_retries: Maximum retry attempts for downloads
timeout: Request timeout in seconds
min_file_size: Minimum valid file size in bytes
session: Optional aiohttp session (managed internally)
Example:
>>> async with ImageDownloader() as downloader:
... await downloader.download_poster(url, path)
"""
def __init__(
self,
max_retries: int = 3,
timeout: int = 30,
min_file_size: int = 1024, # 1 KB
retry_delay: float = 1.0
):
"""Initialize image downloader.
Args:
max_retries: Maximum retry attempts
timeout: Request timeout in seconds
min_file_size: Minimum valid file size in bytes
retry_delay: Delay between retries in seconds
"""
self.max_retries = max_retries
self.timeout = timeout
self.min_file_size = min_file_size
self.retry_delay = retry_delay
self.session: Optional[aiohttp.ClientSession] = None
async def __aenter__(self):
"""Enter async context manager and create session."""
self._get_session() # Ensure session is created
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Exit async context manager and cleanup resources."""
await self.close()
return False
async def close(self):
"""Close aiohttp session if open."""
if self.session and not self.session.closed:
await self.session.close()
self.session = None
def _get_session(self) -> aiohttp.ClientSession:
"""Get or create aiohttp session.
Returns:
Active aiohttp session
"""
# If no session, create one
if self.session is None:
timeout = aiohttp.ClientTimeout(total=self.timeout)
self.session = aiohttp.ClientSession(timeout=timeout)
return self.session
# If session exists, check if it's closed (handle real sessions only)
# Mock sessions from tests won't have a boolean closed attribute
try:
if hasattr(self.session, 'closed') and self.session.closed is True:
timeout = aiohttp.ClientTimeout(total=self.timeout)
self.session = aiohttp.ClientSession(timeout=timeout)
except (AttributeError, TypeError):
# Mock session or unusual object, just use it as-is
pass
return self.session
async def download_image(
self,
url: str,
local_path: Path,
skip_existing: bool = True,
validate: bool = True
) -> bool:
"""Download an image from URL to local path.
Args:
url: Image URL
local_path: Local file path to save image
skip_existing: Skip download if file already exists
validate: Validate image after download
Returns:
True if download successful, False otherwise
Raises:
ImageDownloadError: If download fails after retries
"""
# Check if file already exists
if skip_existing and local_path.exists():
if local_path.stat().st_size >= self.min_file_size:
logger.debug("Image already exists: %s", local_path)
return True
# Ensure parent directory exists
local_path.parent.mkdir(parents=True, exist_ok=True)
delay = self.retry_delay
last_error = None
for attempt in range(self.max_retries):
try:
logger.debug(
"Downloading image from %s (attempt %d)",
url,
attempt + 1,
)
# Use persistent session
session = self._get_session()
async with session.get(url) as resp:
if resp.status == 404:
logger.warning("Image not found: %s", url)
return False
resp.raise_for_status()
# Download image data
data = await resp.read()
# Check file size
if len(data) < self.min_file_size:
raise ImageDownloadError(
f"Downloaded file too small: {len(data)} bytes"
)
# Write to file
with open(local_path, "wb") as f:
f.write(data)
# Validate image if requested
if validate and not self.validate_image(local_path):
local_path.unlink(missing_ok=True)
raise ImageDownloadError("Image validation failed")
logger.info("Downloaded image to %s", local_path)
return True
except (aiohttp.ClientError, IOError, ImageDownloadError) as e:
last_error = e
if attempt < self.max_retries - 1:
logger.warning(
"Download failed (attempt %d): %s, retrying in %s",
attempt + 1,
e,
delay,
)
await asyncio.sleep(delay)
delay *= 2
else:
logger.error(
"Download failed after %d attempts: %s",
self.max_retries,
e,
)
raise ImageDownloadError(
f"Failed to download image after {self.max_retries} attempts: {last_error}"
)
async def download_poster(
self,
url: str,
series_folder: Path,
filename: str = "poster.jpg",
skip_existing: bool = True
) -> bool:
"""Download poster image.
Args:
url: Poster URL
series_folder: Series folder path
filename: Output filename (default: poster.jpg)
skip_existing: Skip if file exists
Returns:
True if successful
"""
local_path = series_folder / filename
try:
return await self.download_image(url, local_path, skip_existing)
except ImageDownloadError as e:
logger.warning("Failed to download poster: %s", e)
return False
async def download_logo(
self,
url: str,
series_folder: Path,
filename: str = "logo.png",
skip_existing: bool = True
) -> bool:
"""Download logo image.
Args:
url: Logo URL
series_folder: Series folder path
filename: Output filename (default: logo.png)
skip_existing: Skip if file exists
Returns:
True if successful
"""
local_path = series_folder / filename
try:
return await self.download_image(url, local_path, skip_existing)
except ImageDownloadError as e:
logger.warning("Failed to download logo: %s", e)
return False
async def download_fanart(
self,
url: str,
series_folder: Path,
filename: str = "fanart.jpg",
skip_existing: bool = True
) -> bool:
"""Download fanart/backdrop image.
Args:
url: Fanart URL
series_folder: Series folder path
filename: Output filename (default: fanart.jpg)
skip_existing: Skip if file exists
Returns:
True if successful
"""
local_path = series_folder / filename
try:
return await self.download_image(url, local_path, skip_existing)
except ImageDownloadError as e:
logger.warning("Failed to download fanart: %s", e)
return False
def validate_image(self, image_path: Path) -> bool:
"""Validate that file is a valid image.
Args:
image_path: Path to image file
Returns:
True if valid image, False otherwise
"""
try:
with Image.open(image_path) as img:
# Verify it's a valid image
img.verify()
# Check file size
if image_path.stat().st_size < self.min_file_size:
logger.warning("Image file too small: %s", image_path)
return False
return True
except Exception as e:
logger.warning("Image validation failed for %s: %s", image_path, e)
return False
async def download_all_media(
self,
series_folder: Path,
poster_url: Optional[str] = None,
logo_url: Optional[str] = None,
fanart_url: Optional[str] = None,
skip_existing: bool = True
) -> dict[str, bool]:
"""Download all media files (poster, logo, fanart).
Args:
series_folder: Series folder path
poster_url: Poster URL (optional)
logo_url: Logo URL (optional)
fanart_url: Fanart URL (optional)
skip_existing: Skip existing files
Returns:
Dictionary with download status for each file type
"""
results = {
"poster": None,
"logo": None,
"fanart": None
}
tasks = []
if poster_url:
tasks.append(("poster", self.download_poster(
poster_url, series_folder, skip_existing=skip_existing
)))
if logo_url:
tasks.append(("logo", self.download_logo(
logo_url, series_folder, skip_existing=skip_existing
)))
if fanart_url:
tasks.append(("fanart", self.download_fanart(
fanart_url, series_folder, skip_existing=skip_existing
)))
# Download concurrently
if tasks:
task_results = await asyncio.gather(
*[task for _, task in tasks],
return_exceptions=True
)
for (media_type, _), result in zip(tasks, task_results):
if isinstance(result, Exception):
logger.error("Error downloading %s: %s", media_type, result)
results[media_type] = False
else:
results[media_type] = result
return results