"""Download queue service for managing anime episode downloads. This module provides a simplified queue management system for handling anime episode downloads with manual start/stop controls, progress tracking, database persistence, and retry functionality. The service uses SQLite database for persistent storage via QueueRepository while maintaining an in-memory cache for performance. """ from __future__ import annotations import asyncio import uuid from collections import deque from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timezone from typing import TYPE_CHECKING, Dict, List, Optional import structlog from src.server.models.download import ( DownloadItem, DownloadPriority, DownloadStatus, EpisodeIdentifier, QueueStats, QueueStatus, ) from src.server.services.anime_service import AnimeService, AnimeServiceError from src.server.services.progress_service import ProgressService, get_progress_service if TYPE_CHECKING: from src.server.services.queue_repository import QueueRepository logger = structlog.get_logger(__name__) class DownloadServiceError(Exception): """Service-level exception for download queue operations.""" class DownloadService: """Manages the download queue with manual start/stop controls. Features: - Manual download start/stop - FIFO queue processing - Real-time progress tracking - Database persistence via QueueRepository - Automatic retry logic - WebSocket broadcast support """ def __init__( self, anime_service: AnimeService, queue_repository: Optional["QueueRepository"] = None, max_retries: int = 3, progress_service: Optional[ProgressService] = None, ): """Initialize the download service. Args: anime_service: Service for anime operations queue_repository: Optional repository for database persistence. If not provided, will use default singleton. max_retries: Maximum retry attempts for failed downloads progress_service: Optional progress service for tracking """ self._anime_service = anime_service self._max_retries = max_retries self._progress_service = progress_service or get_progress_service() # Database repository for persistence self._queue_repository = queue_repository self._db_initialized = False # In-memory cache for performance (synced with database) self._pending_queue: deque[DownloadItem] = deque() # Helper dict for O(1) lookup of pending items by ID self._pending_items_by_id: Dict[str, DownloadItem] = {} self._active_download: Optional[DownloadItem] = None self._completed_items: deque[DownloadItem] = deque(maxlen=100) self._failed_items: deque[DownloadItem] = deque(maxlen=50) # Control flags self._is_stopped = True # Queue processing is stopped by default self._is_shutting_down = False # Flag to indicate shutdown # Executor for blocking operations self._executor = ThreadPoolExecutor(max_workers=1) # Track active download task for cancellation self._active_download_task: Optional[asyncio.Task] = None # Statistics tracking self._total_downloaded_mb: float = 0.0 self._download_speeds: deque[float] = deque(maxlen=10) # Track if queue progress has been initialized self._queue_progress_initialized: bool = False logger.info( "DownloadService initialized", max_retries=max_retries, ) def _get_repository(self) -> "QueueRepository": """Get the queue repository, initializing if needed. Returns: QueueRepository instance """ if self._queue_repository is None: from src.server.services.queue_repository import get_queue_repository self._queue_repository = get_queue_repository() return self._queue_repository async def initialize(self) -> None: """Initialize the service by loading queue state from database. Should be called after database is initialized during app startup. Note: With the simplified model, status/priority/progress are now managed in-memory only. The database stores the queue items for persistence across restarts. """ if self._db_initialized: return try: repository = self._get_repository() # Load all items from database - they all start as PENDING # since status is now managed in-memory only all_items = await repository.get_all_items() for item in all_items: # All items from database are treated as pending item.status = DownloadStatus.PENDING self._add_to_pending_queue(item) self._db_initialized = True logger.info( "Queue restored from database: pending_count=%d", len(self._pending_queue), ) except Exception as e: logger.error("Failed to load queue from database: %s", e, exc_info=True) # Continue without persistence - queue will work in memory only self._db_initialized = True async def _save_to_database(self, item: DownloadItem) -> DownloadItem: """Save or update an item in the database. Args: item: Download item to save Returns: Saved item with database ID """ try: repository = self._get_repository() return await repository.save_item(item) except Exception as e: logger.error("Failed to save item to database: %s", e) return item async def _set_error_in_database( self, item_id: str, error: str, ) -> bool: """Set error message on an item in the database. Args: item_id: Download item ID error: Error message Returns: True if update succeeded """ try: repository = self._get_repository() return await repository.set_error(item_id, error) except Exception as e: logger.error("Failed to set error in database: %s", e) return False async def _delete_from_database(self, item_id: str) -> bool: """Delete an item from the database. Args: item_id: Download item ID Returns: True if delete succeeded """ try: repository = self._get_repository() return await repository.delete_item(item_id) except Exception as e: logger.error("Failed to delete from database: %s", e) return False async def _remove_episode_from_missing_list( self, series_key: str, season: int, episode: int, ) -> bool: """Remove a downloaded episode from the missing episodes list. Called when a download completes successfully to update the database so the episode no longer appears as missing. Args: series_key: Unique provider key for the series season: Season number episode: Episode number within season Returns: True if episode was removed, False otherwise """ try: from src.server.database.connection import get_db_session from src.server.database.service import EpisodeService async with get_db_session() as db: deleted = await EpisodeService.delete_by_series_and_episode( db=db, series_key=series_key, season=season, episode_number=episode, ) if deleted: logger.info( "Removed episode from missing list: " "%s S%02dE%02d", series_key, season, episode, ) # Clear the anime service cache so list_missing # returns updated data try: self._anime_service._cached_list_missing.cache_clear() except Exception: pass return deleted except Exception as e: logger.error( "Failed to remove episode from missing list: %s", e ) return False async def _init_queue_progress(self) -> None: """Initialize the download queue progress tracking. This is called lazily when queue processing starts to ensure the event loop is running and the coroutine can be properly awaited. """ if self._queue_progress_initialized: return try: from src.server.services.progress_service import ProgressType await self._progress_service.start_progress( progress_id="download_queue", progress_type=ProgressType.QUEUE, title="Download Queue", message="Queue ready", ) self._queue_progress_initialized = True except Exception as e: logger.error("Failed to initialize queue progress: %s", e) def _add_to_pending_queue( self, item: DownloadItem, front: bool = False ) -> None: """Add item to pending queue and update helper dict. Args: item: Download item to add front: If True, add to front of queue (higher priority) """ if front: self._pending_queue.appendleft(item) else: self._pending_queue.append(item) self._pending_items_by_id[item.id] = item def _remove_from_pending_queue(self, item_or_id: str) -> Optional[DownloadItem]: # noqa: E501 """Remove item from pending queue and update helper dict. Args: item_or_id: Item ID to remove Returns: Removed item or None if not found """ if isinstance(item_or_id, str): item = self._pending_items_by_id.get(item_or_id) if not item: return None item_id = item_or_id else: item = item_or_id item_id = item.id try: self._pending_queue.remove(item) del self._pending_items_by_id[item_id] return item except (ValueError, KeyError): return None def _generate_item_id(self) -> str: """Generate unique identifier for download items.""" return str(uuid.uuid4()) async def add_to_queue( self, serie_id: str, serie_folder: str, serie_name: str, episodes: List[EpisodeIdentifier], priority: DownloadPriority = DownloadPriority.NORMAL, ) -> List[str]: """Add episodes to the download queue (FIFO order). Args: serie_id: Series identifier - provider key (e.g., 'attack-on-titan'). This is the unique identifier used for lookups and identification. serie_folder: Series folder name on disk (e.g., 'Attack on Titan (2013)'). Used for filesystem operations only. serie_name: Series display name for user interface episodes: List of episodes to download priority: Queue priority level (ignored, kept for compatibility) Returns: List of created download item IDs Raises: DownloadServiceError: If adding items fails """ # Initialize queue progress tracking if not already done await self._init_queue_progress() created_ids = [] try: for episode in episodes: item = DownloadItem( id=self._generate_item_id(), serie_id=serie_id, serie_folder=serie_folder, serie_name=serie_name, episode=episode, status=DownloadStatus.PENDING, priority=priority, added_at=datetime.now(timezone.utc), ) # Save to database first to get persistent ID saved_item = await self._save_to_database(item) # Add to in-memory cache self._add_to_pending_queue(saved_item, front=False) created_ids.append(saved_item.id) logger.info( "Item added to queue", item_id=saved_item.id, serie_key=serie_id, serie_name=serie_name, season=episode.season, episode=episode.episode, ) # Notify via progress service queue_status = await self.get_queue_status() await self._progress_service.update_progress( progress_id="download_queue", message=f"Added {len(created_ids)} items to queue", metadata={ "action": "items_added", "added_ids": created_ids, "queue_status": queue_status.model_dump(mode="json"), }, force_broadcast=True, ) return created_ids except Exception as e: logger.error("Failed to add items to queue: %s", e) raise DownloadServiceError(f"Failed to add items: {str(e)}") from e async def remove_from_queue(self, item_ids: List[str]) -> List[str]: """Remove items from the queue. Args: item_ids: List of download item IDs to remove Returns: List of successfully removed item IDs Raises: DownloadServiceError: If removal fails """ removed_ids = [] try: for item_id in item_ids: # Check if item is currently downloading active = self._active_download if active and active.id == item_id: item = active item.status = DownloadStatus.CANCELLED item.completed_at = datetime.now(timezone.utc) self._failed_items.append(item) self._active_download = None # Delete cancelled item from database await self._delete_from_database(item_id) removed_ids.append(item_id) logger.info("Cancelled active download: item_id=%s", item_id) continue # Check pending queue - O(1) lookup using helper dict if item_id in self._pending_items_by_id: item = self._pending_items_by_id[item_id] self._pending_queue.remove(item) del self._pending_items_by_id[item_id] # Delete from database await self._delete_from_database(item_id) removed_ids.append(item_id) logger.info( "Removed from pending queue", item_id=item_id ) if removed_ids: # Notify via progress service queue_status = await self.get_queue_status() await self._progress_service.update_progress( progress_id="download_queue", message=f"Removed {len(removed_ids)} items from queue", metadata={ "action": "items_removed", "removed_ids": removed_ids, "queue_status": queue_status.model_dump(mode="json"), }, force_broadcast=True, ) return removed_ids except Exception as e: logger.error("Failed to remove items: %s", e) raise DownloadServiceError( f"Failed to remove items: {str(e)}" ) from e async def reorder_queue(self, item_ids: List[str]) -> None: """Reorder pending queue items. Args: item_ids: List of item IDs in desired order. Items not in this list remain at end of queue. Raises: DownloadServiceError: If reordering fails Note: Reordering is done in-memory only. Database priority is not updated since the in-memory queue defines the actual order. """ try: # Build new queue based on specified order new_queue = deque() remaining_items = list(self._pending_queue) # Add items in specified order for item_id in item_ids: if item_id in self._pending_items_by_id: item = self._pending_items_by_id[item_id] new_queue.append(item) remaining_items.remove(item) # Add remaining items that weren't in the reorder list for item in remaining_items: new_queue.append(item) # Replace queue self._pending_queue = new_queue # Notify via progress service queue_status = await self.get_queue_status() await self._progress_service.update_progress( progress_id="download_queue", message=f"Queue reordered with {len(item_ids)} items", metadata={ "action": "queue_reordered", "reordered_count": len(item_ids), "queue_status": queue_status.model_dump(mode="json"), }, force_broadcast=True, ) logger.info("Queue reordered", reordered_count=len(item_ids)) except Exception as e: logger.error("Failed to reorder queue: %s", e) raise DownloadServiceError( f"Failed to reorder queue: {str(e)}" ) from e async def start_queue_processing(self) -> Optional[str]: """Start automatic queue processing of all pending downloads. This will process all pending downloads one by one until the queue is empty or stopped. The processing continues even if the browser is closed. Returns: Item ID of first started download, or None if queue is empty Raises: DownloadServiceError: If queue processing is already active """ try: # Initialize queue progress tracking if not already done await self._init_queue_progress() # Check if download already active if self._active_download: raise DownloadServiceError( "Queue processing is already active" ) # Check if queue is empty if not self._pending_queue: logger.info("No pending downloads to start") return None # Mark queue as running self._is_stopped = False # Start queue processing in background asyncio.create_task(self._process_queue()) logger.info("Queue processing started") return "queue_started" except Exception as e: logger.error("Failed to start queue processing: %s", e) raise DownloadServiceError( f"Failed to start queue processing: {str(e)}" ) from e async def _process_queue(self) -> None: """Process all items in the queue sequentially. This runs continuously until the queue is empty or stopped. Each download is processed one at a time, and the next one starts automatically after the previous one completes. """ logger.info("Queue processor started") while not self._is_stopped and len(self._pending_queue) > 0: try: # Get next item from queue item = self._pending_queue.popleft() del self._pending_items_by_id[item.id] logger.info( "Processing next item from queue", item_id=item.id, serie=item.serie_name, remaining=len(self._pending_queue) ) # Notify via progress service queue_status = await self.get_queue_status() msg = ( f"Started: {item.serie_name} " f"S{item.episode.season:02d}E{item.episode.episode:02d}" ) await self._progress_service.update_progress( progress_id="download_queue", message=msg, metadata={ "action": "download_started", "item_id": item.id, "serie_name": item.serie_name, "season": item.episode.season, "episode": item.episode.episode, "queue_status": queue_status.model_dump(mode="json"), }, force_broadcast=True, ) # Process the download (this will wait until complete) self._active_download_task = asyncio.create_task( self._process_download(item) ) await self._active_download_task self._active_download_task = None # Small delay between downloads await asyncio.sleep(1) except Exception as e: logger.error( "Error in queue processing loop", error=str(e), exc_info=True ) # Continue with next item even if one fails await asyncio.sleep(2) # Queue processing completed self._is_stopped = True if len(self._pending_queue) == 0: logger.info("Queue processing completed - all items processed") queue_status = await self.get_queue_status() await self._progress_service.complete_progress( progress_id="download_queue", message="All downloads completed", metadata={ "queue_status": queue_status.model_dump(mode="json") }, ) else: logger.info("Queue processing stopped by user") async def start_next_download(self) -> Optional[str]: """Legacy method - redirects to start_queue_processing. Returns: Item ID of started download, or None if queue is empty Raises: DownloadServiceError: If a download is already active """ return await self.start_queue_processing() async def stop_downloads(self) -> None: """Stop processing new downloads from queue. Current download will continue, but no new downloads will start. """ self._is_stopped = True logger.info("Download processing stopped") # Notify via progress service queue_status = await self.get_queue_status() await self._progress_service.update_progress( progress_id="download_queue", message="Queue processing stopped", metadata={ "action": "queue_stopped", "is_stopped": True, "queue_status": queue_status.model_dump(mode="json"), }, force_broadcast=True, ) async def get_queue_status(self) -> QueueStatus: """Get current status of all queues. Returns: Complete queue status with all items """ active_downloads = ( [self._active_download] if self._active_download else [] ) return QueueStatus( is_running=not self._is_stopped, is_paused=False, # Kept for compatibility active_downloads=active_downloads, pending_queue=list(self._pending_queue), completed_downloads=list(self._completed_items), failed_downloads=list(self._failed_items), ) async def get_queue_stats(self) -> QueueStats: """Calculate queue statistics. Returns: Statistics about the download queue """ active_count = 1 if self._active_download else 0 pending_count = len(self._pending_queue) completed_count = len(self._completed_items) failed_count = len(self._failed_items) # Calculate average speed avg_speed = None if self._download_speeds: avg_speed = ( sum(self._download_speeds) / len(self._download_speeds) ) # Estimate remaining time eta_seconds = None if avg_speed and avg_speed > 0 and pending_count > 0: # Rough estimation based on average file size estimated_size_per_episode = 500 # MB remaining_mb = pending_count * estimated_size_per_episode eta_seconds = int(remaining_mb / avg_speed) return QueueStats( total_items=( active_count + pending_count + completed_count + failed_count ), pending_count=pending_count, active_count=active_count, completed_count=completed_count, failed_count=failed_count, total_downloaded_mb=self._total_downloaded_mb, average_speed_mbps=avg_speed, estimated_time_remaining=eta_seconds, ) async def clear_completed(self) -> int: """Clear completed downloads from history. Returns: Number of items cleared """ count = len(self._completed_items) self._completed_items.clear() logger.info("Cleared completed items", count=count) # Notify via progress service if count > 0: queue_status = await self.get_queue_status() await self._progress_service.update_progress( progress_id="download_queue", message=f"Cleared {count} completed items", metadata={ "action": "completed_cleared", "cleared_count": count, "queue_status": queue_status.model_dump(mode="json"), }, force_broadcast=True, ) return count async def clear_failed(self) -> int: """Clear failed downloads from history. Returns: Number of items cleared """ count = len(self._failed_items) self._failed_items.clear() logger.info("Cleared failed items", count=count) # Notify via progress service if count > 0: queue_status = await self.get_queue_status() await self._progress_service.update_progress( progress_id="download_queue", message=f"Cleared {count} failed items", metadata={ "action": "failed_cleared", "cleared_count": count, "queue_status": queue_status.model_dump(mode="json"), }, force_broadcast=True, ) return count async def clear_pending(self) -> int: """Clear all pending downloads from the queue. Returns: Number of items cleared """ count = len(self._pending_queue) # Delete all pending items from database for item_id in list(self._pending_items_by_id.keys()): await self._delete_from_database(item_id) self._pending_queue.clear() self._pending_items_by_id.clear() logger.info("Cleared pending items", count=count) # Notify via progress service if count > 0: queue_status = await self.get_queue_status() await self._progress_service.update_progress( progress_id="download_queue", message=f"Cleared {count} pending items", metadata={ "action": "pending_cleared", "cleared_count": count, "queue_status": queue_status.model_dump(mode="json"), }, force_broadcast=True, ) return count async def retry_failed( self, item_ids: Optional[List[str]] = None ) -> List[str]: """Retry failed downloads. Args: item_ids: Specific item IDs to retry, or None for all failed items Returns: List of item IDs moved back to pending queue """ retried_ids = [] try: failed_list = list(self._failed_items) for item in failed_list: # Skip if specific IDs requested and this isn't one if item_ids and item.id not in item_ids: continue # Skip if max retries reached if item.retry_count >= self._max_retries: continue # Move back to pending self._failed_items.remove(item) item.status = DownloadStatus.PENDING item.retry_count += 1 item.error = None item.progress = None self._add_to_pending_queue(item) retried_ids.append(item.id) # Status is now managed in-memory only logger.info( "Retrying failed item: item_id=%s, retry_count=%d", item.id, item.retry_count, ) if retried_ids: # Notify via progress service queue_status = await self.get_queue_status() await self._progress_service.update_progress( progress_id="download_queue", message=f"Retried {len(retried_ids)} failed items", metadata={ "action": "items_retried", "retried_ids": retried_ids, "queue_status": queue_status.model_dump(mode="json"), }, force_broadcast=True, ) return retried_ids except Exception as e: logger.error("Failed to retry items: %s", e) raise DownloadServiceError( f"Failed to retry: {str(e)}" ) from e async def _process_download(self, item: DownloadItem) -> None: """Process a single download item. Args: item: Download item to process """ try: # Check if shutting down if self._is_shutting_down: logger.info("Skipping download due to shutdown") return # Update status in memory (status is now in-memory only) item.status = DownloadStatus.DOWNLOADING item.started_at = datetime.now(timezone.utc) self._active_download = item logger.info( "Starting download: item_id=%s, serie_key=%s, S%02dE%02d", item.id, item.serie_id, item.episode.season, item.episode.episode, ) # Execute download via anime service # AnimeService handles ALL progress via SeriesApp events: # - download started/progress/completed/failed events # - All updates forwarded to ProgressService # - ProgressService broadcasts to WebSocket clients # Use serie_folder for filesystem operations # and serie_id (key) for identification if not item.serie_folder: raise DownloadServiceError( f"Missing serie_folder for download item {item.id}. " "serie_folder is required for filesystem operations." ) success = await self._anime_service.download( serie_folder=item.serie_folder, season=item.episode.season, episode=item.episode.episode, key=item.serie_id, item_id=item.id, ) # Handle result if success: item.status = DownloadStatus.COMPLETED item.completed_at = datetime.now(timezone.utc) # Track downloaded size if item.progress and item.progress.downloaded_mb: self._total_downloaded_mb += item.progress.downloaded_mb self._completed_items.append(item) # Delete completed item from database (status is in-memory) await self._delete_from_database(item.id) # Remove episode from missing episodes list in database await self._remove_episode_from_missing_list( series_key=item.serie_id, season=item.episode.season, episode=item.episode.episode, ) logger.info( "Download completed successfully: item_id=%s", item.id ) else: raise AnimeServiceError("Download returned False") except asyncio.CancelledError: # Handle task cancellation during shutdown logger.info( "Download cancelled during shutdown: item_id=%s", item.id, ) item.status = DownloadStatus.CANCELLED item.completed_at = datetime.now(timezone.utc) # Delete cancelled item from database await self._delete_from_database(item.id) # Return item to pending queue if not shutting down if not self._is_shutting_down: self._add_to_pending_queue(item, front=True) # Re-save to database as pending await self._save_to_database(item) raise # Re-raise to properly cancel the task except Exception as e: # Handle failure item.status = DownloadStatus.FAILED item.completed_at = datetime.now(timezone.utc) item.error = str(e) self._failed_items.append(item) # Set error in database await self._set_error_in_database(item.id, str(e)) logger.error( "Download failed: item_id=%s, error=%s, retry_count=%d", item.id, str(e), item.retry_count, ) # Note: Failure is already broadcast by AnimeService # via ProgressService when SeriesApp fires failed event finally: # Remove from active downloads if self._active_download and self._active_download.id == item.id: self._active_download = None async def start(self) -> None: """Initialize the download queue service (compatibility method). Note: Downloads are started manually via start_next_download(). """ logger.info("Download queue service initialized") async def stop(self) -> None: """Stop the download queue service and cancel active downloads. Cancels any active download and shuts down the thread pool immediately. """ logger.info("Stopping download queue service...") # Set shutdown flag self._is_shutting_down = True self._is_stopped = True # Cancel active download task if running active_task = self._active_download_task if active_task and not active_task.done(): logger.info("Cancelling active download task...") active_task.cancel() try: await active_task except asyncio.CancelledError: logger.info("Active download task cancelled") # Shutdown executor immediately, don't wait for tasks logger.info("Shutting down thread pool executor...") self._executor.shutdown(wait=False, cancel_futures=True) logger.info("Download queue service stopped") # Singleton instance _download_service_instance: Optional[DownloadService] = None def get_download_service(anime_service: AnimeService) -> DownloadService: """Factory function for FastAPI dependency injection. Args: anime_service: AnimeService instance Returns: Singleton DownloadService instance """ global _download_service_instance if _download_service_instance is None: _download_service_instance = DownloadService(anime_service) return _download_service_instance