"""Download queue service for managing anime episode downloads. This module provides a comprehensive queue management system for handling concurrent anime episode downloads with priority-based scheduling, progress tracking, persistence, and automatic retry functionality. """ from __future__ import annotations import asyncio import json import uuid from collections import deque from concurrent.futures import ThreadPoolExecutor from datetime import datetime from pathlib import Path from typing import Callable, Dict, List, Optional import structlog from src.server.models.download import ( DownloadItem, DownloadPriority, DownloadProgress, DownloadStatus, EpisodeIdentifier, QueueStats, QueueStatus, ) from src.server.services.anime_service import AnimeService, AnimeServiceError from src.server.services.progress_service import ( ProgressService, ProgressType, get_progress_service, ) logger = structlog.get_logger(__name__) class DownloadServiceError(Exception): """Service-level exception for download queue operations.""" class DownloadService: """Manages the download queue with concurrent processing and persistence. Features: - Priority-based queue management - Concurrent download processing - Real-time progress tracking - Queue persistence and recovery - Automatic retry logic - WebSocket broadcast support """ def __init__( self, anime_service: AnimeService, max_concurrent_downloads: int = 2, max_retries: int = 3, persistence_path: str = "./data/download_queue.json", progress_service: Optional[ProgressService] = None, ): """Initialize the download service. Args: anime_service: Service for anime operations max_concurrent_downloads: Maximum simultaneous downloads max_retries: Maximum retry attempts for failed downloads persistence_path: Path to persist queue state progress_service: Optional progress service for tracking """ self._anime_service = anime_service self._max_concurrent = max_concurrent_downloads self._max_retries = max_retries self._persistence_path = Path(persistence_path) self._progress_service = progress_service or get_progress_service() # Queue storage by status self._pending_queue: deque[DownloadItem] = deque() self._active_downloads: Dict[str, DownloadItem] = {} self._completed_items: deque[DownloadItem] = deque(maxlen=100) self._failed_items: deque[DownloadItem] = deque(maxlen=50) # Control flags self._is_running = False self._is_paused = False self._shutdown_event = asyncio.Event() # Executor for blocking operations self._executor = ThreadPoolExecutor( max_workers=max_concurrent_downloads ) # WebSocket broadcast callback self._broadcast_callback: Optional[Callable] = None # Statistics tracking self._total_downloaded_mb: float = 0.0 self._download_speeds: deque[float] = deque(maxlen=10) # Load persisted queue self._load_queue() logger.info( "DownloadService initialized", max_concurrent=max_concurrent_downloads, max_retries=max_retries, ) def set_broadcast_callback(self, callback: Callable) -> None: """Set callback for broadcasting status updates via WebSocket.""" self._broadcast_callback = callback logger.debug("Broadcast callback registered") async def _broadcast_update(self, update_type: str, data: dict) -> None: """Broadcast update to connected WebSocket clients.""" if self._broadcast_callback: try: await self._broadcast_callback(update_type, data) except Exception as e: logger.error("Failed to broadcast update", error=str(e)) def _generate_item_id(self) -> str: """Generate unique identifier for download items.""" return str(uuid.uuid4()) def _load_queue(self) -> None: """Load persisted queue from disk.""" try: if self._persistence_path.exists(): with open(self._persistence_path, "r", encoding="utf-8") as f: data = json.load(f) # Restore pending items for item_dict in data.get("pending", []): item = DownloadItem(**item_dict) # Reset status if was downloading when saved if item.status == DownloadStatus.DOWNLOADING: item.status = DownloadStatus.PENDING self._pending_queue.append(item) # Restore failed items that can be retried for item_dict in data.get("failed", []): item = DownloadItem(**item_dict) if item.retry_count < self._max_retries: item.status = DownloadStatus.PENDING self._pending_queue.append(item) else: self._failed_items.append(item) logger.info( "Queue restored from disk", pending_count=len(self._pending_queue), failed_count=len(self._failed_items), ) except Exception as e: logger.error("Failed to load persisted queue", error=str(e)) def _save_queue(self) -> None: """Persist current queue state to disk.""" try: self._persistence_path.parent.mkdir(parents=True, exist_ok=True) data = { "pending": [ item.model_dump(mode="json") for item in self._pending_queue ], "active": [ item.model_dump(mode="json") for item in self._active_downloads.values() ], "failed": [ item.model_dump(mode="json") for item in self._failed_items ], "timestamp": datetime.utcnow().isoformat(), } with open(self._persistence_path, "w", encoding="utf-8") as f: json.dump(data, f, indent=2) logger.debug("Queue persisted to disk") except Exception as e: logger.error("Failed to persist queue", error=str(e)) async def add_to_queue( self, serie_id: str, serie_name: str, episodes: List[EpisodeIdentifier], priority: DownloadPriority = DownloadPriority.NORMAL, ) -> List[str]: """Add episodes to the download queue. Args: serie_id: Series identifier serie_name: Series display name episodes: List of episodes to download priority: Queue priority level Returns: List of created download item IDs Raises: DownloadServiceError: If adding items fails """ created_ids = [] try: for episode in episodes: item = DownloadItem( id=self._generate_item_id(), serie_id=serie_id, serie_name=serie_name, episode=episode, status=DownloadStatus.PENDING, priority=priority, added_at=datetime.utcnow(), ) # Insert based on priority if priority == DownloadPriority.HIGH: self._pending_queue.appendleft(item) else: self._pending_queue.append(item) created_ids.append(item.id) logger.info( "Item added to queue", item_id=item.id, serie=serie_name, season=episode.season, episode=episode.episode, priority=priority.value, ) self._save_queue() # Broadcast update await self._broadcast_update( "queue_updated", {"added_ids": created_ids} ) return created_ids except Exception as e: logger.error("Failed to add items to queue", error=str(e)) raise DownloadServiceError(f"Failed to add items: {str(e)}") from e async def remove_from_queue(self, item_ids: List[str]) -> List[str]: """Remove items from the queue. Args: item_ids: List of download item IDs to remove Returns: List of successfully removed item IDs Raises: DownloadServiceError: If removal fails """ removed_ids = [] try: for item_id in item_ids: # Check if item is currently downloading if item_id in self._active_downloads: item = self._active_downloads[item_id] item.status = DownloadStatus.CANCELLED item.completed_at = datetime.utcnow() self._failed_items.append(item) del self._active_downloads[item_id] removed_ids.append(item_id) logger.info("Cancelled active download", item_id=item_id) continue # Check pending queue for item in list(self._pending_queue): if item.id == item_id: self._pending_queue.remove(item) removed_ids.append(item_id) logger.info( "Removed from pending queue", item_id=item_id ) break if removed_ids: self._save_queue() await self._broadcast_update( "queue_updated", {"removed_ids": removed_ids} ) return removed_ids except Exception as e: logger.error("Failed to remove items", error=str(e)) raise DownloadServiceError( f"Failed to remove items: {str(e)}" ) from e async def reorder_queue(self, item_id: str, new_position: int) -> bool: """Reorder an item in the pending queue. Args: item_id: Download item ID to reorder new_position: New position in queue (0-based) Returns: True if reordering was successful Raises: DownloadServiceError: If reordering fails """ try: # Find and remove item item_to_move = None for item in list(self._pending_queue): if item.id == item_id: self._pending_queue.remove(item) item_to_move = item break if not item_to_move: raise DownloadServiceError( f"Item {item_id} not found in pending queue" ) # Insert at new position queue_list = list(self._pending_queue) new_position = max(0, min(new_position, len(queue_list))) queue_list.insert(new_position, item_to_move) self._pending_queue = deque(queue_list) self._save_queue() await self._broadcast_update( "queue_reordered", {"item_id": item_id, "position": new_position} ) logger.info( "Queue item reordered", item_id=item_id, new_position=new_position ) return True except Exception as e: logger.error("Failed to reorder queue", error=str(e)) raise DownloadServiceError( f"Failed to reorder: {str(e)}" ) from e async def get_queue_status(self) -> QueueStatus: """Get current status of all queues. Returns: Complete queue status with all items """ return QueueStatus( is_running=self._is_running, is_paused=self._is_paused, active_downloads=list(self._active_downloads.values()), pending_queue=list(self._pending_queue), completed_downloads=list(self._completed_items), failed_downloads=list(self._failed_items), ) async def get_queue_stats(self) -> QueueStats: """Calculate queue statistics. Returns: Statistics about the download queue """ active_count = len(self._active_downloads) pending_count = len(self._pending_queue) completed_count = len(self._completed_items) failed_count = len(self._failed_items) # Calculate average speed avg_speed = None if self._download_speeds: avg_speed = ( sum(self._download_speeds) / len(self._download_speeds) ) # Estimate remaining time eta_seconds = None if avg_speed and avg_speed > 0 and pending_count > 0: # Rough estimation based on average file size estimated_size_per_episode = 500 # MB remaining_mb = pending_count * estimated_size_per_episode eta_seconds = int(remaining_mb / avg_speed) return QueueStats( total_items=( active_count + pending_count + completed_count + failed_count ), pending_count=pending_count, active_count=active_count, completed_count=completed_count, failed_count=failed_count, total_downloaded_mb=self._total_downloaded_mb, average_speed_mbps=avg_speed, estimated_time_remaining=eta_seconds, ) async def pause_queue(self) -> None: """Pause download processing.""" self._is_paused = True logger.info("Download queue paused") await self._broadcast_update("queue_paused", {}) async def resume_queue(self) -> None: """Resume download processing.""" self._is_paused = False logger.info("Download queue resumed") await self._broadcast_update("queue_resumed", {}) async def clear_completed(self) -> int: """Clear completed downloads from history. Returns: Number of items cleared """ count = len(self._completed_items) self._completed_items.clear() logger.info("Cleared completed items", count=count) return count async def retry_failed( self, item_ids: Optional[List[str]] = None ) -> List[str]: """Retry failed downloads. Args: item_ids: Specific item IDs to retry, or None for all failed items Returns: List of item IDs moved back to pending queue """ retried_ids = [] try: failed_list = list(self._failed_items) for item in failed_list: # Skip if specific IDs requested and this isn't one if item_ids and item.id not in item_ids: continue # Skip if max retries reached if item.retry_count >= self._max_retries: continue # Move back to pending self._failed_items.remove(item) item.status = DownloadStatus.PENDING item.retry_count += 1 item.error = None item.progress = None self._pending_queue.append(item) retried_ids.append(item.id) logger.info( "Retrying failed item", item_id=item.id, retry_count=item.retry_count ) if retried_ids: self._save_queue() await self._broadcast_update( "items_retried", {"item_ids": retried_ids} ) return retried_ids except Exception as e: logger.error("Failed to retry items", error=str(e)) raise DownloadServiceError( f"Failed to retry: {str(e)}" ) from e def _create_progress_callback(self, item: DownloadItem) -> Callable: """Create a progress callback for a download item. Args: item: Download item to track progress for Returns: Callback function for progress updates """ def progress_callback(progress_data: dict) -> None: """Update progress and broadcast to clients.""" try: # Update item progress item.progress = DownloadProgress( percent=progress_data.get("percent", 0.0), downloaded_mb=progress_data.get("downloaded_mb", 0.0), total_mb=progress_data.get("total_mb"), speed_mbps=progress_data.get("speed_mbps"), eta_seconds=progress_data.get("eta_seconds"), ) # Track speed for statistics if item.progress.speed_mbps: self._download_speeds.append(item.progress.speed_mbps) # Update progress service if item.progress.total_mb and item.progress.total_mb > 0: current_mb = int(item.progress.downloaded_mb) total_mb = int(item.progress.total_mb) asyncio.create_task( self._progress_service.update_progress( progress_id=f"download_{item.id}", current=current_mb, total=total_mb, metadata={ "speed_mbps": item.progress.speed_mbps, "eta_seconds": item.progress.eta_seconds, }, ) ) # Broadcast update (fire and forget) asyncio.create_task( self._broadcast_update( "download_progress", { "item_id": item.id, "progress": item.progress.model_dump(mode="json"), }, ) ) except Exception as e: logger.error("Progress callback error", error=str(e)) return progress_callback async def _process_download(self, item: DownloadItem) -> None: """Process a single download item. Args: item: Download item to process """ try: # Update status item.status = DownloadStatus.DOWNLOADING item.started_at = datetime.utcnow() self._active_downloads[item.id] = item logger.info( "Starting download", item_id=item.id, serie=item.serie_name, season=item.episode.season, episode=item.episode.episode, ) # Start progress tracking await self._progress_service.start_progress( progress_id=f"download_{item.id}", progress_type=ProgressType.DOWNLOAD, title=f"Downloading {item.serie_name}", message=( f"S{item.episode.season:02d}E{item.episode.episode:02d}" ), metadata={ "item_id": item.id, "serie_name": item.serie_name, "season": item.episode.season, "episode": item.episode.episode, }, ) # Create progress callback progress_callback = self._create_progress_callback(item) # Execute download via anime service success = await self._anime_service.download( serie_folder=item.serie_id, season=item.episode.season, episode=item.episode.episode, key=item.serie_id, # Assuming serie_id is the provider key callback=progress_callback, ) # Handle result if success: item.status = DownloadStatus.COMPLETED item.completed_at = datetime.utcnow() # Track downloaded size if item.progress and item.progress.downloaded_mb: self._total_downloaded_mb += item.progress.downloaded_mb self._completed_items.append(item) logger.info( "Download completed successfully", item_id=item.id ) # Complete progress tracking await self._progress_service.complete_progress( progress_id=f"download_{item.id}", message="Download completed successfully", metadata={ "downloaded_mb": item.progress.downloaded_mb if item.progress else 0, }, ) await self._broadcast_update( "download_completed", {"item_id": item.id} ) else: raise AnimeServiceError("Download returned False") except Exception as e: # Handle failure item.status = DownloadStatus.FAILED item.completed_at = datetime.utcnow() item.error = str(e) self._failed_items.append(item) logger.error( "Download failed", item_id=item.id, error=str(e), retry_count=item.retry_count, ) # Fail progress tracking await self._progress_service.fail_progress( progress_id=f"download_{item.id}", error_message=str(e), metadata={"retry_count": item.retry_count}, ) await self._broadcast_update( "download_failed", {"item_id": item.id, "error": item.error}, ) finally: # Remove from active downloads if item.id in self._active_downloads: del self._active_downloads[item.id] self._save_queue() async def _queue_processor(self) -> None: """Main queue processing loop.""" logger.info("Queue processor started") while not self._shutdown_event.is_set(): try: # Wait if paused if self._is_paused: await asyncio.sleep(1) continue # Check if we can start more downloads if len(self._active_downloads) >= self._max_concurrent: await asyncio.sleep(1) continue # Get next item from queue if not self._pending_queue: await asyncio.sleep(1) continue item = self._pending_queue.popleft() # Process download in background asyncio.create_task(self._process_download(item)) except Exception as e: logger.error("Queue processor error", error=str(e)) await asyncio.sleep(5) logger.info("Queue processor stopped") async def start(self) -> None: """Start the download queue processor.""" if self._is_running: logger.warning("Queue processor already running") return self._is_running = True self._shutdown_event.clear() # Start processor task asyncio.create_task(self._queue_processor()) logger.info("Download queue service started") async def stop(self) -> None: """Stop the download queue processor.""" if not self._is_running: return logger.info("Stopping download queue service...") self._is_running = False self._shutdown_event.set() # Wait for active downloads to complete (with timeout) timeout = 30 # seconds start_time = asyncio.get_event_loop().time() while ( self._active_downloads and (asyncio.get_event_loop().time() - start_time) < timeout ): await asyncio.sleep(1) # Save final state self._save_queue() # Shutdown executor self._executor.shutdown(wait=True) logger.info("Download queue service stopped") # Singleton instance _download_service_instance: Optional[DownloadService] = None def get_download_service(anime_service: AnimeService) -> DownloadService: """Factory function for FastAPI dependency injection. Args: anime_service: AnimeService instance Returns: Singleton DownloadService instance """ global _download_service_instance if _download_service_instance is None: _download_service_instance = DownloadService(anime_service) return _download_service_instance