"""Download queue service for managing anime episode downloads. This module provides a simplified queue management system for handling anime episode downloads with manual start/stop controls, progress tracking, persistence, and retry functionality. """ from __future__ import annotations import asyncio import json import uuid from collections import deque from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timezone from pathlib import Path from typing import Dict, List, Optional import structlog from src.server.models.download import ( DownloadItem, DownloadPriority, DownloadStatus, EpisodeIdentifier, QueueStats, QueueStatus, ) from src.server.services.anime_service import AnimeService, AnimeServiceError from src.server.services.progress_service import ProgressService, get_progress_service logger = structlog.get_logger(__name__) class DownloadServiceError(Exception): """Service-level exception for download queue operations.""" class DownloadService: """Manages the download queue with manual start/stop controls. Features: - Manual download start/stop - FIFO queue processing - Real-time progress tracking - Queue persistence and recovery - Automatic retry logic - WebSocket broadcast support """ def __init__( self, anime_service: AnimeService, max_retries: int = 3, persistence_path: str = "./data/download_queue.json", progress_service: Optional[ProgressService] = None, ): """Initialize the download service. Args: anime_service: Service for anime operations max_retries: Maximum retry attempts for failed downloads persistence_path: Path to persist queue state progress_service: Optional progress service for tracking """ self._anime_service = anime_service self._max_retries = max_retries self._persistence_path = Path(persistence_path) self._progress_service = progress_service or get_progress_service() # Queue storage by status self._pending_queue: deque[DownloadItem] = deque() # Helper dict for O(1) lookup of pending items by ID self._pending_items_by_id: Dict[str, DownloadItem] = {} self._active_download: Optional[DownloadItem] = None self._completed_items: deque[DownloadItem] = deque(maxlen=100) self._failed_items: deque[DownloadItem] = deque(maxlen=50) # Control flags self._is_stopped = True # Queue processing is stopped by default self._is_shutting_down = False # Flag to indicate shutdown # Executor for blocking operations self._executor = ThreadPoolExecutor(max_workers=1) # Track active download task for cancellation self._active_download_task: Optional[asyncio.Task] = None # Statistics tracking self._total_downloaded_mb: float = 0.0 self._download_speeds: deque[float] = deque(maxlen=10) # Track if queue progress has been initialized self._queue_progress_initialized: bool = False # Load persisted queue self._load_queue() logger.info( "DownloadService initialized", max_retries=max_retries, ) async def _init_queue_progress(self) -> None: """Initialize the download queue progress tracking. This is called lazily when queue processing starts to ensure the event loop is running and the coroutine can be properly awaited. """ if self._queue_progress_initialized: return try: from src.server.services.progress_service import ProgressType await self._progress_service.start_progress( progress_id="download_queue", progress_type=ProgressType.QUEUE, title="Download Queue", message="Queue ready", ) self._queue_progress_initialized = True except Exception as e: logger.error("Failed to initialize queue progress", error=str(e)) def _add_to_pending_queue( self, item: DownloadItem, front: bool = False ) -> None: """Add item to pending queue and update helper dict. Args: item: Download item to add front: If True, add to front of queue (higher priority) """ if front: self._pending_queue.appendleft(item) else: self._pending_queue.append(item) self._pending_items_by_id[item.id] = item def _remove_from_pending_queue(self, item_or_id: str) -> Optional[DownloadItem]: # noqa: E501 """Remove item from pending queue and update helper dict. Args: item_or_id: Item ID to remove Returns: Removed item or None if not found """ if isinstance(item_or_id, str): item = self._pending_items_by_id.get(item_or_id) if not item: return None item_id = item_or_id else: item = item_or_id item_id = item.id try: self._pending_queue.remove(item) del self._pending_items_by_id[item_id] return item except (ValueError, KeyError): return None def _generate_item_id(self) -> str: """Generate unique identifier for download items.""" return str(uuid.uuid4()) def _load_queue(self) -> None: """Load persisted queue from disk.""" try: if self._persistence_path.exists(): with open(self._persistence_path, "r", encoding="utf-8") as f: data = json.load(f) # Restore pending items for item_dict in data.get("pending", []): item = DownloadItem(**item_dict) # Reset status if was downloading when saved if item.status == DownloadStatus.DOWNLOADING: item.status = DownloadStatus.PENDING self._add_to_pending_queue(item) # Restore failed items that can be retried for item_dict in data.get("failed", []): item = DownloadItem(**item_dict) if item.retry_count < self._max_retries: item.status = DownloadStatus.PENDING self._add_to_pending_queue(item) else: self._failed_items.append(item) logger.info( "Queue restored from disk", pending_count=len(self._pending_queue), failed_count=len(self._failed_items), ) except Exception as e: logger.error("Failed to load persisted queue", error=str(e)) def _save_queue(self) -> None: """Persist current queue state to disk.""" try: self._persistence_path.parent.mkdir(parents=True, exist_ok=True) active_items = ( [self._active_download] if self._active_download else [] ) data = { "pending": [ item.model_dump(mode="json") for item in self._pending_queue ], "active": [ item.model_dump(mode="json") for item in active_items ], "failed": [ item.model_dump(mode="json") for item in self._failed_items ], "timestamp": datetime.now(timezone.utc).isoformat(), } with open(self._persistence_path, "w", encoding="utf-8") as f: json.dump(data, f, indent=2) logger.debug("Queue persisted to disk") except Exception as e: logger.error("Failed to persist queue", error=str(e)) async def add_to_queue( self, serie_id: str, serie_folder: str, serie_name: str, episodes: List[EpisodeIdentifier], priority: DownloadPriority = DownloadPriority.NORMAL, ) -> List[str]: """Add episodes to the download queue (FIFO order). Args: serie_id: Series identifier (provider key) serie_folder: Series folder name on disk serie_name: Series display name episodes: List of episodes to download priority: Queue priority level (ignored, kept for compatibility) Returns: List of created download item IDs Raises: DownloadServiceError: If adding items fails """ # Initialize queue progress tracking if not already done await self._init_queue_progress() created_ids = [] try: for episode in episodes: item = DownloadItem( id=self._generate_item_id(), serie_id=serie_id, serie_folder=serie_folder, serie_name=serie_name, episode=episode, status=DownloadStatus.PENDING, priority=priority, added_at=datetime.now(timezone.utc), ) # Always append to end (FIFO order) self._add_to_pending_queue(item, front=False) created_ids.append(item.id) logger.info( "Item added to queue", item_id=item.id, serie=serie_name, season=episode.season, episode=episode.episode, ) self._save_queue() # Notify via progress service queue_status = await self.get_queue_status() await self._progress_service.update_progress( progress_id="download_queue", message=f"Added {len(created_ids)} items to queue", metadata={ "action": "items_added", "added_ids": created_ids, "queue_status": queue_status.model_dump(mode="json"), }, force_broadcast=True, ) return created_ids except Exception as e: logger.error("Failed to add items to queue", error=str(e)) raise DownloadServiceError(f"Failed to add items: {str(e)}") from e async def remove_from_queue(self, item_ids: List[str]) -> List[str]: """Remove items from the queue. Args: item_ids: List of download item IDs to remove Returns: List of successfully removed item IDs Raises: DownloadServiceError: If removal fails """ removed_ids = [] try: for item_id in item_ids: # Check if item is currently downloading active = self._active_download if active and active.id == item_id: item = active item.status = DownloadStatus.CANCELLED item.completed_at = datetime.now(timezone.utc) self._failed_items.append(item) self._active_download = None removed_ids.append(item_id) logger.info("Cancelled active download", item_id=item_id) continue # Check pending queue - O(1) lookup using helper dict if item_id in self._pending_items_by_id: item = self._pending_items_by_id[item_id] self._pending_queue.remove(item) del self._pending_items_by_id[item_id] removed_ids.append(item_id) logger.info( "Removed from pending queue", item_id=item_id ) if removed_ids: self._save_queue() # Notify via progress service queue_status = await self.get_queue_status() await self._progress_service.update_progress( progress_id="download_queue", message=f"Removed {len(removed_ids)} items from queue", metadata={ "action": "items_removed", "removed_ids": removed_ids, "queue_status": queue_status.model_dump(mode="json"), }, force_broadcast=True, ) return removed_ids except Exception as e: logger.error("Failed to remove items", error=str(e)) raise DownloadServiceError( f"Failed to remove items: {str(e)}" ) from e async def reorder_queue(self, item_ids: List[str]) -> None: """Reorder pending queue items. Args: item_ids: List of item IDs in desired order. Items not in this list remain at end of queue. Raises: DownloadServiceError: If reordering fails """ try: # Build new queue based on specified order new_queue = deque() remaining_items = list(self._pending_queue) # Add items in specified order for item_id in item_ids: if item_id in self._pending_items_by_id: item = self._pending_items_by_id[item_id] new_queue.append(item) remaining_items.remove(item) # Add remaining items that weren't in the reorder list for item in remaining_items: new_queue.append(item) # Replace queue self._pending_queue = new_queue # Save updated queue self._save_queue() # Notify via progress service queue_status = await self.get_queue_status() await self._progress_service.update_progress( progress_id="download_queue", message=f"Queue reordered with {len(item_ids)} items", metadata={ "action": "queue_reordered", "reordered_count": len(item_ids), "queue_status": queue_status.model_dump(mode="json"), }, force_broadcast=True, ) logger.info("Queue reordered", reordered_count=len(item_ids)) except Exception as e: logger.error("Failed to reorder queue", error=str(e)) raise DownloadServiceError( f"Failed to reorder queue: {str(e)}" ) from e async def start_queue_processing(self) -> Optional[str]: """Start automatic queue processing of all pending downloads. This will process all pending downloads one by one until the queue is empty or stopped. The processing continues even if the browser is closed. Returns: Item ID of first started download, or None if queue is empty Raises: DownloadServiceError: If queue processing is already active """ try: # Initialize queue progress tracking if not already done await self._init_queue_progress() # Check if download already active if self._active_download: raise DownloadServiceError( "Queue processing is already active" ) # Check if queue is empty if not self._pending_queue: logger.info("No pending downloads to start") return None # Mark queue as running self._is_stopped = False # Start queue processing in background asyncio.create_task(self._process_queue()) logger.info("Queue processing started") return "queue_started" except Exception as e: logger.error("Failed to start queue processing", error=str(e)) raise DownloadServiceError( f"Failed to start queue processing: {str(e)}" ) from e async def _process_queue(self) -> None: """Process all items in the queue sequentially. This runs continuously until the queue is empty or stopped. Each download is processed one at a time, and the next one starts automatically after the previous one completes. """ logger.info("Queue processor started") while not self._is_stopped and len(self._pending_queue) > 0: try: # Get next item from queue item = self._pending_queue.popleft() del self._pending_items_by_id[item.id] logger.info( "Processing next item from queue", item_id=item.id, serie=item.serie_name, remaining=len(self._pending_queue) ) # Notify via progress service queue_status = await self.get_queue_status() msg = ( f"Started: {item.serie_name} " f"S{item.episode.season:02d}E{item.episode.episode:02d}" ) await self._progress_service.update_progress( progress_id="download_queue", message=msg, metadata={ "action": "download_started", "item_id": item.id, "serie_name": item.serie_name, "season": item.episode.season, "episode": item.episode.episode, "queue_status": queue_status.model_dump(mode="json"), }, force_broadcast=True, ) # Process the download (this will wait until complete) self._active_download_task = asyncio.create_task( self._process_download(item) ) await self._active_download_task self._active_download_task = None # Small delay between downloads await asyncio.sleep(1) except Exception as e: logger.error( "Error in queue processing loop", error=str(e), exc_info=True ) # Continue with next item even if one fails await asyncio.sleep(2) # Queue processing completed self._is_stopped = True if len(self._pending_queue) == 0: logger.info("Queue processing completed - all items processed") queue_status = await self.get_queue_status() await self._progress_service.complete_progress( progress_id="download_queue", message="All downloads completed", metadata={ "queue_status": queue_status.model_dump(mode="json") }, ) else: logger.info("Queue processing stopped by user") async def start_next_download(self) -> Optional[str]: """Legacy method - redirects to start_queue_processing. Returns: Item ID of started download, or None if queue is empty Raises: DownloadServiceError: If a download is already active """ return await self.start_queue_processing() async def stop_downloads(self) -> None: """Stop processing new downloads from queue. Current download will continue, but no new downloads will start. """ self._is_stopped = True logger.info("Download processing stopped") # Notify via progress service queue_status = await self.get_queue_status() await self._progress_service.update_progress( progress_id="download_queue", message="Queue processing stopped", metadata={ "action": "queue_stopped", "is_stopped": True, "queue_status": queue_status.model_dump(mode="json"), }, force_broadcast=True, ) async def get_queue_status(self) -> QueueStatus: """Get current status of all queues. Returns: Complete queue status with all items """ active_downloads = ( [self._active_download] if self._active_download else [] ) return QueueStatus( is_running=not self._is_stopped, is_paused=False, # Kept for compatibility active_downloads=active_downloads, pending_queue=list(self._pending_queue), completed_downloads=list(self._completed_items), failed_downloads=list(self._failed_items), ) async def get_queue_stats(self) -> QueueStats: """Calculate queue statistics. Returns: Statistics about the download queue """ active_count = 1 if self._active_download else 0 pending_count = len(self._pending_queue) completed_count = len(self._completed_items) failed_count = len(self._failed_items) # Calculate average speed avg_speed = None if self._download_speeds: avg_speed = ( sum(self._download_speeds) / len(self._download_speeds) ) # Estimate remaining time eta_seconds = None if avg_speed and avg_speed > 0 and pending_count > 0: # Rough estimation based on average file size estimated_size_per_episode = 500 # MB remaining_mb = pending_count * estimated_size_per_episode eta_seconds = int(remaining_mb / avg_speed) return QueueStats( total_items=( active_count + pending_count + completed_count + failed_count ), pending_count=pending_count, active_count=active_count, completed_count=completed_count, failed_count=failed_count, total_downloaded_mb=self._total_downloaded_mb, average_speed_mbps=avg_speed, estimated_time_remaining=eta_seconds, ) async def clear_completed(self) -> int: """Clear completed downloads from history. Returns: Number of items cleared """ count = len(self._completed_items) self._completed_items.clear() logger.info("Cleared completed items", count=count) # Notify via progress service if count > 0: queue_status = await self.get_queue_status() await self._progress_service.update_progress( progress_id="download_queue", message=f"Cleared {count} completed items", metadata={ "action": "completed_cleared", "cleared_count": count, "queue_status": queue_status.model_dump(mode="json"), }, force_broadcast=True, ) return count async def clear_failed(self) -> int: """Clear failed downloads from history. Returns: Number of items cleared """ count = len(self._failed_items) self._failed_items.clear() logger.info("Cleared failed items", count=count) # Notify via progress service if count > 0: queue_status = await self.get_queue_status() await self._progress_service.update_progress( progress_id="download_queue", message=f"Cleared {count} failed items", metadata={ "action": "failed_cleared", "cleared_count": count, "queue_status": queue_status.model_dump(mode="json"), }, force_broadcast=True, ) return count async def clear_pending(self) -> int: """Clear all pending downloads from the queue. Returns: Number of items cleared """ count = len(self._pending_queue) self._pending_queue.clear() self._pending_items_by_id.clear() logger.info("Cleared pending items", count=count) # Save queue state self._save_queue() # Notify via progress service if count > 0: queue_status = await self.get_queue_status() await self._progress_service.update_progress( progress_id="download_queue", message=f"Cleared {count} pending items", metadata={ "action": "pending_cleared", "cleared_count": count, "queue_status": queue_status.model_dump(mode="json"), }, force_broadcast=True, ) return count async def retry_failed( self, item_ids: Optional[List[str]] = None ) -> List[str]: """Retry failed downloads. Args: item_ids: Specific item IDs to retry, or None for all failed items Returns: List of item IDs moved back to pending queue """ retried_ids = [] try: failed_list = list(self._failed_items) for item in failed_list: # Skip if specific IDs requested and this isn't one if item_ids and item.id not in item_ids: continue # Skip if max retries reached if item.retry_count >= self._max_retries: continue # Move back to pending self._failed_items.remove(item) item.status = DownloadStatus.PENDING item.retry_count += 1 item.error = None item.progress = None self._add_to_pending_queue(item) retried_ids.append(item.id) logger.info( "Retrying failed item", item_id=item.id, retry_count=item.retry_count ) if retried_ids: self._save_queue() # Notify via progress service queue_status = await self.get_queue_status() await self._progress_service.update_progress( progress_id="download_queue", message=f"Retried {len(retried_ids)} failed items", metadata={ "action": "items_retried", "retried_ids": retried_ids, "queue_status": queue_status.model_dump(mode="json"), }, force_broadcast=True, ) return retried_ids except Exception as e: logger.error("Failed to retry items", error=str(e)) raise DownloadServiceError( f"Failed to retry: {str(e)}" ) from e async def _process_download(self, item: DownloadItem) -> None: """Process a single download item. Args: item: Download item to process """ try: # Check if shutting down if self._is_shutting_down: logger.info("Skipping download due to shutdown") return # Update status item.status = DownloadStatus.DOWNLOADING item.started_at = datetime.now(timezone.utc) self._active_download = item logger.info( "Starting download", item_id=item.id, serie=item.serie_name, season=item.episode.season, episode=item.episode.episode, ) # Execute download via anime service # AnimeService handles ALL progress via SeriesApp events: # - download started/progress/completed/failed events # - All updates forwarded to ProgressService # - ProgressService broadcasts to WebSocket clients folder = item.serie_folder if item.serie_folder else item.serie_id success = await self._anime_service.download( serie_folder=folder, season=item.episode.season, episode=item.episode.episode, key=item.serie_id, item_id=item.id, ) # Handle result if success: item.status = DownloadStatus.COMPLETED item.completed_at = datetime.now(timezone.utc) # Track downloaded size if item.progress and item.progress.downloaded_mb: self._total_downloaded_mb += item.progress.downloaded_mb self._completed_items.append(item) logger.info( "Download completed successfully", item_id=item.id ) else: raise AnimeServiceError("Download returned False") except asyncio.CancelledError: # Handle task cancellation during shutdown logger.info( "Download cancelled during shutdown", item_id=item.id, ) item.status = DownloadStatus.CANCELLED item.completed_at = datetime.now(timezone.utc) # Return item to pending queue if not shutting down if not self._is_shutting_down: self._add_to_pending_queue(item, front=True) raise # Re-raise to properly cancel the task except Exception as e: # Handle failure item.status = DownloadStatus.FAILED item.completed_at = datetime.now(timezone.utc) item.error = str(e) self._failed_items.append(item) logger.error( "Download failed", item_id=item.id, error=str(e), retry_count=item.retry_count, ) # Note: Failure is already broadcast by AnimeService # via ProgressService when SeriesApp fires failed event finally: # Remove from active downloads if self._active_download and self._active_download.id == item.id: self._active_download = None self._save_queue() async def start(self) -> None: """Initialize the download queue service (compatibility method). Note: Downloads are started manually via start_next_download(). """ logger.info("Download queue service initialized") async def stop(self) -> None: """Stop the download queue service and cancel active downloads. Cancels any active download and shuts down the thread pool immediately. """ logger.info("Stopping download queue service...") # Set shutdown flag self._is_shutting_down = True self._is_stopped = True # Cancel active download task if running if self._active_download_task and not self._active_download_task.done(): logger.info("Cancelling active download task...") self._active_download_task.cancel() try: await self._active_download_task except asyncio.CancelledError: logger.info("Active download task cancelled") # Save final state self._save_queue() # Shutdown executor immediately, don't wait for tasks logger.info("Shutting down thread pool executor...") self._executor.shutdown(wait=False, cancel_futures=True) logger.info("Download queue service stopped") # Singleton instance _download_service_instance: Optional[DownloadService] = None def get_download_service(anime_service: AnimeService) -> DownloadService: """Factory function for FastAPI dependency injection. Args: anime_service: AnimeService instance Returns: Singleton DownloadService instance """ global _download_service_instance if _download_service_instance is None: _download_service_instance = DownloadService(anime_service) return _download_service_instance