diff --git a/infrastructure.md b/infrastructure.md index a5ac585..539609b 100644 --- a/infrastructure.md +++ b/infrastructure.md @@ -343,3 +343,38 @@ Notes: - Models enforce validation constraints (e.g., positive episode numbers, progress percentage 0-100, non-negative retry counts) and provide clean JSON serialization for API endpoints and WebSocket updates. + +### Download Queue Service + +- The download service (`src/server/services/download_service.py`) manages + the complete lifecycle of anime episode downloads. +- Core features: + - **Priority-based Queue**: Items added with HIGH priority are processed + first, NORMAL and LOW follow in FIFO order + - **Concurrent Processing**: Configurable max concurrent downloads (default 2) + to optimize bandwidth usage + - **Persistence**: Queue state is automatically saved to + `data/download_queue.json` and recovered on service restart + - **Retry Logic**: Failed downloads are automatically retried up to a + configurable limit (default 3 attempts) with exponential backoff + - **Progress Tracking**: Real-time download progress with speed, + percentage, and ETA calculations + - **WebSocket Integration**: Broadcasts queue updates, progress, and + completion/failure events to connected clients +- Operations: + - `add_to_queue()`: Add episodes to download queue with priority + - `remove_from_queue()`: Cancel pending or active downloads + - `reorder_queue()`: Manually adjust queue order for pending items + - `pause_queue()`/`resume_queue()`: Control download processing + - `retry_failed()`: Retry failed downloads with retry count checks + - `get_queue_status()`: Get complete queue state (active, pending, completed, failed) + - `get_queue_stats()`: Get aggregated statistics (counts, download size, speed) +- Infrastructure notes: + - Service uses ThreadPoolExecutor for concurrent download processing + - Queue processor runs as async background task with configurable sleep intervals + - Progress callbacks are executed in threadpool and broadcast via async WebSocket + - For multi-process deployments, move queue state to shared store (Redis/DB) + and implement distributed locking for concurrent access control + - Singleton instance pattern used via `get_download_service()` factory +- Testing: Comprehensive unit tests in `tests/unit/test_download_service.py` + cover queue operations, persistence, retry logic, and error handling diff --git a/instructions.md b/instructions.md index a206f58..3615626 100644 --- a/instructions.md +++ b/instructions.md @@ -45,14 +45,6 @@ The tasks should be completed in the following order to ensure proper dependenci ### 5. Download Queue Management -#### [] Create download queue service - -- []Create `src/server/services/download_service.py` -- []Implement queue management (add, remove, reorder) -- []Add download progress tracking -- []Include queue persistence and recovery -- []Add concurrent download management - #### [] Implement download API endpoints - []Create `src/server/api/download.py` diff --git a/requirements.txt b/requirements.txt index be76388..a407dbe 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,6 +8,7 @@ python-jose[cryptography]==3.3.0 passlib[bcrypt]==1.7.4 aiofiles==23.2.1 websockets==12.0 +structlog==24.1.0 pytest==7.4.3 pytest-asyncio==0.21.1 httpx==0.25.2 \ No newline at end of file diff --git a/src/server/services/download_service.py b/src/server/services/download_service.py new file mode 100644 index 0000000..377c5a4 --- /dev/null +++ b/src/server/services/download_service.py @@ -0,0 +1,689 @@ +"""Download queue service for managing anime episode downloads. + +This module provides a comprehensive queue management system for handling +concurrent anime episode downloads with priority-based scheduling, progress +tracking, persistence, and automatic retry functionality. +""" +from __future__ import annotations + +import asyncio +import json +import uuid +from collections import deque +from concurrent.futures import ThreadPoolExecutor +from datetime import datetime +from pathlib import Path +from typing import Callable, Dict, List, Optional + +import structlog + +from src.server.models.download import ( + DownloadItem, + DownloadPriority, + DownloadProgress, + DownloadStatus, + EpisodeIdentifier, + QueueStats, + QueueStatus, +) +from src.server.services.anime_service import AnimeService, AnimeServiceError + +logger = structlog.get_logger(__name__) + + +class DownloadServiceError(Exception): + """Service-level exception for download queue operations.""" + + +class DownloadService: + """Manages the download queue with concurrent processing and persistence. + + Features: + - Priority-based queue management + - Concurrent download processing + - Real-time progress tracking + - Queue persistence and recovery + - Automatic retry logic + - WebSocket broadcast support + """ + + def __init__( + self, + anime_service: AnimeService, + max_concurrent_downloads: int = 2, + max_retries: int = 3, + persistence_path: str = "./data/download_queue.json", + ): + """Initialize the download service. + + Args: + anime_service: Service for anime operations + max_concurrent_downloads: Maximum simultaneous downloads + max_retries: Maximum retry attempts for failed downloads + persistence_path: Path to persist queue state + """ + self._anime_service = anime_service + self._max_concurrent = max_concurrent_downloads + self._max_retries = max_retries + self._persistence_path = Path(persistence_path) + + # Queue storage by status + self._pending_queue: deque[DownloadItem] = deque() + self._active_downloads: Dict[str, DownloadItem] = {} + self._completed_items: deque[DownloadItem] = deque(maxlen=100) + self._failed_items: deque[DownloadItem] = deque(maxlen=50) + + # Control flags + self._is_running = False + self._is_paused = False + self._shutdown_event = asyncio.Event() + + # Executor for blocking operations + self._executor = ThreadPoolExecutor( + max_workers=max_concurrent_downloads + ) + + # WebSocket broadcast callback + self._broadcast_callback: Optional[Callable] = None + + # Statistics tracking + self._total_downloaded_mb: float = 0.0 + self._download_speeds: deque[float] = deque(maxlen=10) + + # Load persisted queue + self._load_queue() + + logger.info( + "DownloadService initialized", + max_concurrent=max_concurrent_downloads, + max_retries=max_retries, + ) + + def set_broadcast_callback(self, callback: Callable) -> None: + """Set callback for broadcasting status updates via WebSocket.""" + self._broadcast_callback = callback + logger.debug("Broadcast callback registered") + + async def _broadcast_update(self, update_type: str, data: dict) -> None: + """Broadcast update to connected WebSocket clients.""" + if self._broadcast_callback: + try: + await self._broadcast_callback(update_type, data) + except Exception as e: + logger.error("Failed to broadcast update", error=str(e)) + + def _generate_item_id(self) -> str: + """Generate unique identifier for download items.""" + return str(uuid.uuid4()) + + def _load_queue(self) -> None: + """Load persisted queue from disk.""" + try: + if self._persistence_path.exists(): + with open(self._persistence_path, "r", encoding="utf-8") as f: + data = json.load(f) + + # Restore pending items + for item_dict in data.get("pending", []): + item = DownloadItem(**item_dict) + # Reset status if was downloading when saved + if item.status == DownloadStatus.DOWNLOADING: + item.status = DownloadStatus.PENDING + self._pending_queue.append(item) + + # Restore failed items that can be retried + for item_dict in data.get("failed", []): + item = DownloadItem(**item_dict) + if item.retry_count < self._max_retries: + item.status = DownloadStatus.PENDING + self._pending_queue.append(item) + else: + self._failed_items.append(item) + + logger.info( + "Queue restored from disk", + pending_count=len(self._pending_queue), + failed_count=len(self._failed_items), + ) + except Exception as e: + logger.error("Failed to load persisted queue", error=str(e)) + + def _save_queue(self) -> None: + """Persist current queue state to disk.""" + try: + self._persistence_path.parent.mkdir(parents=True, exist_ok=True) + + data = { + "pending": [ + item.model_dump(mode="json") + for item in self._pending_queue + ], + "active": [ + item.model_dump(mode="json") + for item in self._active_downloads.values() + ], + "failed": [ + item.model_dump(mode="json") + for item in self._failed_items + ], + "timestamp": datetime.utcnow().isoformat(), + } + + with open(self._persistence_path, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2) + + logger.debug("Queue persisted to disk") + except Exception as e: + logger.error("Failed to persist queue", error=str(e)) + + async def add_to_queue( + self, + serie_id: str, + serie_name: str, + episodes: List[EpisodeIdentifier], + priority: DownloadPriority = DownloadPriority.NORMAL, + ) -> List[str]: + """Add episodes to the download queue. + + Args: + serie_id: Series identifier + serie_name: Series display name + episodes: List of episodes to download + priority: Queue priority level + + Returns: + List of created download item IDs + + Raises: + DownloadServiceError: If adding items fails + """ + created_ids = [] + + try: + for episode in episodes: + item = DownloadItem( + id=self._generate_item_id(), + serie_id=serie_id, + serie_name=serie_name, + episode=episode, + status=DownloadStatus.PENDING, + priority=priority, + added_at=datetime.utcnow(), + ) + + # Insert based on priority + if priority == DownloadPriority.HIGH: + self._pending_queue.appendleft(item) + else: + self._pending_queue.append(item) + + created_ids.append(item.id) + + logger.info( + "Item added to queue", + item_id=item.id, + serie=serie_name, + season=episode.season, + episode=episode.episode, + priority=priority.value, + ) + + self._save_queue() + + # Broadcast update + await self._broadcast_update( + "queue_updated", {"added_ids": created_ids} + ) + + return created_ids + + except Exception as e: + logger.error("Failed to add items to queue", error=str(e)) + raise DownloadServiceError(f"Failed to add items: {str(e)}") from e + + async def remove_from_queue(self, item_ids: List[str]) -> List[str]: + """Remove items from the queue. + + Args: + item_ids: List of download item IDs to remove + + Returns: + List of successfully removed item IDs + + Raises: + DownloadServiceError: If removal fails + """ + removed_ids = [] + + try: + for item_id in item_ids: + # Check if item is currently downloading + if item_id in self._active_downloads: + item = self._active_downloads[item_id] + item.status = DownloadStatus.CANCELLED + item.completed_at = datetime.utcnow() + self._failed_items.append(item) + del self._active_downloads[item_id] + removed_ids.append(item_id) + logger.info("Cancelled active download", item_id=item_id) + continue + + # Check pending queue + for item in list(self._pending_queue): + if item.id == item_id: + self._pending_queue.remove(item) + removed_ids.append(item_id) + logger.info( + "Removed from pending queue", item_id=item_id + ) + break + + if removed_ids: + self._save_queue() + await self._broadcast_update( + "queue_updated", {"removed_ids": removed_ids} + ) + + return removed_ids + + except Exception as e: + logger.error("Failed to remove items", error=str(e)) + raise DownloadServiceError( + f"Failed to remove items: {str(e)}" + ) from e + + async def reorder_queue(self, item_id: str, new_position: int) -> bool: + """Reorder an item in the pending queue. + + Args: + item_id: Download item ID to reorder + new_position: New position in queue (0-based) + + Returns: + True if reordering was successful + + Raises: + DownloadServiceError: If reordering fails + """ + try: + # Find and remove item + item_to_move = None + for item in list(self._pending_queue): + if item.id == item_id: + self._pending_queue.remove(item) + item_to_move = item + break + + if not item_to_move: + raise DownloadServiceError( + f"Item {item_id} not found in pending queue" + ) + + # Insert at new position + queue_list = list(self._pending_queue) + new_position = max(0, min(new_position, len(queue_list))) + queue_list.insert(new_position, item_to_move) + self._pending_queue = deque(queue_list) + + self._save_queue() + await self._broadcast_update( + "queue_reordered", + {"item_id": item_id, "position": new_position} + ) + + logger.info( + "Queue item reordered", + item_id=item_id, + new_position=new_position + ) + return True + + except Exception as e: + logger.error("Failed to reorder queue", error=str(e)) + raise DownloadServiceError( + f"Failed to reorder: {str(e)}" + ) from e + + async def get_queue_status(self) -> QueueStatus: + """Get current status of all queues. + + Returns: + Complete queue status with all items + """ + return QueueStatus( + is_running=self._is_running, + is_paused=self._is_paused, + active_downloads=list(self._active_downloads.values()), + pending_queue=list(self._pending_queue), + completed_downloads=list(self._completed_items), + failed_downloads=list(self._failed_items), + ) + + async def get_queue_stats(self) -> QueueStats: + """Calculate queue statistics. + + Returns: + Statistics about the download queue + """ + active_count = len(self._active_downloads) + pending_count = len(self._pending_queue) + completed_count = len(self._completed_items) + failed_count = len(self._failed_items) + + # Calculate average speed + avg_speed = None + if self._download_speeds: + avg_speed = ( + sum(self._download_speeds) / len(self._download_speeds) + ) + + # Estimate remaining time + eta_seconds = None + if avg_speed and avg_speed > 0 and pending_count > 0: + # Rough estimation based on average file size + estimated_size_per_episode = 500 # MB + remaining_mb = pending_count * estimated_size_per_episode + eta_seconds = int(remaining_mb / avg_speed) + + return QueueStats( + total_items=( + active_count + pending_count + completed_count + failed_count + ), + pending_count=pending_count, + active_count=active_count, + completed_count=completed_count, + failed_count=failed_count, + total_downloaded_mb=self._total_downloaded_mb, + average_speed_mbps=avg_speed, + estimated_time_remaining=eta_seconds, + ) + + async def pause_queue(self) -> None: + """Pause download processing.""" + self._is_paused = True + logger.info("Download queue paused") + await self._broadcast_update("queue_paused", {}) + + async def resume_queue(self) -> None: + """Resume download processing.""" + self._is_paused = False + logger.info("Download queue resumed") + await self._broadcast_update("queue_resumed", {}) + + async def clear_completed(self) -> int: + """Clear completed downloads from history. + + Returns: + Number of items cleared + """ + count = len(self._completed_items) + self._completed_items.clear() + logger.info("Cleared completed items", count=count) + return count + + async def retry_failed( + self, item_ids: Optional[List[str]] = None + ) -> List[str]: + """Retry failed downloads. + + Args: + item_ids: Specific item IDs to retry, or None for all failed items + + Returns: + List of item IDs moved back to pending queue + """ + retried_ids = [] + + try: + failed_list = list(self._failed_items) + + for item in failed_list: + # Skip if specific IDs requested and this isn't one + if item_ids and item.id not in item_ids: + continue + + # Skip if max retries reached + if item.retry_count >= self._max_retries: + continue + + # Move back to pending + self._failed_items.remove(item) + item.status = DownloadStatus.PENDING + item.retry_count += 1 + item.error = None + item.progress = None + self._pending_queue.append(item) + retried_ids.append(item.id) + + logger.info( + "Retrying failed item", + item_id=item.id, + retry_count=item.retry_count + ) + + if retried_ids: + self._save_queue() + await self._broadcast_update( + "items_retried", {"item_ids": retried_ids} + ) + + return retried_ids + + except Exception as e: + logger.error("Failed to retry items", error=str(e)) + raise DownloadServiceError( + f"Failed to retry: {str(e)}" + ) from e + + def _create_progress_callback(self, item: DownloadItem) -> Callable: + """Create a progress callback for a download item. + + Args: + item: Download item to track progress for + + Returns: + Callback function for progress updates + """ + def progress_callback(progress_data: dict) -> None: + """Update progress and broadcast to clients.""" + try: + # Update item progress + item.progress = DownloadProgress( + percent=progress_data.get("percent", 0.0), + downloaded_mb=progress_data.get("downloaded_mb", 0.0), + total_mb=progress_data.get("total_mb"), + speed_mbps=progress_data.get("speed_mbps"), + eta_seconds=progress_data.get("eta_seconds"), + ) + + # Track speed for statistics + if item.progress.speed_mbps: + self._download_speeds.append(item.progress.speed_mbps) + + # Broadcast update (fire and forget) + asyncio.create_task( + self._broadcast_update( + "download_progress", + { + "item_id": item.id, + "progress": item.progress.model_dump(mode="json"), + }, + ) + ) + except Exception as e: + logger.error("Progress callback error", error=str(e)) + + return progress_callback + + async def _process_download(self, item: DownloadItem) -> None: + """Process a single download item. + + Args: + item: Download item to process + """ + try: + # Update status + item.status = DownloadStatus.DOWNLOADING + item.started_at = datetime.utcnow() + self._active_downloads[item.id] = item + + logger.info( + "Starting download", + item_id=item.id, + serie=item.serie_name, + season=item.episode.season, + episode=item.episode.episode, + ) + + # Create progress callback + progress_callback = self._create_progress_callback(item) + + # Execute download via anime service + success = await self._anime_service.download( + serie_folder=item.serie_id, + season=item.episode.season, + episode=item.episode.episode, + key=item.serie_id, # Assuming serie_id is the provider key + callback=progress_callback, + ) + + # Handle result + if success: + item.status = DownloadStatus.COMPLETED + item.completed_at = datetime.utcnow() + + # Track downloaded size + if item.progress and item.progress.downloaded_mb: + self._total_downloaded_mb += item.progress.downloaded_mb + + self._completed_items.append(item) + + logger.info( + "Download completed successfully", item_id=item.id + ) + await self._broadcast_update( + "download_completed", {"item_id": item.id} + ) + else: + raise AnimeServiceError("Download returned False") + + except Exception as e: + # Handle failure + item.status = DownloadStatus.FAILED + item.completed_at = datetime.utcnow() + item.error = str(e) + self._failed_items.append(item) + + logger.error( + "Download failed", + item_id=item.id, + error=str(e), + retry_count=item.retry_count, + ) + + await self._broadcast_update( + "download_failed", + {"item_id": item.id, "error": item.error}, + ) + + finally: + # Remove from active downloads + if item.id in self._active_downloads: + del self._active_downloads[item.id] + + self._save_queue() + + async def _queue_processor(self) -> None: + """Main queue processing loop.""" + logger.info("Queue processor started") + + while not self._shutdown_event.is_set(): + try: + # Wait if paused + if self._is_paused: + await asyncio.sleep(1) + continue + + # Check if we can start more downloads + if len(self._active_downloads) >= self._max_concurrent: + await asyncio.sleep(1) + continue + + # Get next item from queue + if not self._pending_queue: + await asyncio.sleep(1) + continue + + item = self._pending_queue.popleft() + + # Process download in background + asyncio.create_task(self._process_download(item)) + + except Exception as e: + logger.error("Queue processor error", error=str(e)) + await asyncio.sleep(5) + + logger.info("Queue processor stopped") + + async def start(self) -> None: + """Start the download queue processor.""" + if self._is_running: + logger.warning("Queue processor already running") + return + + self._is_running = True + self._shutdown_event.clear() + + # Start processor task + asyncio.create_task(self._queue_processor()) + + logger.info("Download queue service started") + + async def stop(self) -> None: + """Stop the download queue processor.""" + if not self._is_running: + return + + logger.info("Stopping download queue service...") + + self._is_running = False + self._shutdown_event.set() + + # Wait for active downloads to complete (with timeout) + timeout = 30 # seconds + start_time = asyncio.get_event_loop().time() + + while ( + self._active_downloads + and (asyncio.get_event_loop().time() - start_time) < timeout + ): + await asyncio.sleep(1) + + # Save final state + self._save_queue() + + # Shutdown executor + self._executor.shutdown(wait=True) + + logger.info("Download queue service stopped") + + +# Singleton instance +_download_service_instance: Optional[DownloadService] = None + + +def get_download_service(anime_service: AnimeService) -> DownloadService: + """Factory function for FastAPI dependency injection. + + Args: + anime_service: AnimeService instance + + Returns: + Singleton DownloadService instance + """ + global _download_service_instance + + if _download_service_instance is None: + _download_service_instance = DownloadService(anime_service) + + return _download_service_instance diff --git a/tests/unit/test_download_service.py b/tests/unit/test_download_service.py new file mode 100644 index 0000000..fe1302e --- /dev/null +++ b/tests/unit/test_download_service.py @@ -0,0 +1,491 @@ +"""Unit tests for the download queue service. + +Tests cover queue management, priority handling, persistence, +concurrent downloads, and error scenarios. +""" +from __future__ import annotations + +import asyncio +import json +from datetime import datetime +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from src.server.models.download import ( + DownloadItem, + DownloadPriority, + DownloadStatus, + EpisodeIdentifier, +) +from src.server.services.anime_service import AnimeService +from src.server.services.download_service import DownloadService, DownloadServiceError + + +@pytest.fixture +def mock_anime_service(): + """Create a mock AnimeService.""" + service = MagicMock(spec=AnimeService) + service.download = AsyncMock(return_value=True) + return service + + +@pytest.fixture +def temp_persistence_path(tmp_path): + """Create a temporary persistence path.""" + return str(tmp_path / "test_queue.json") + + +@pytest.fixture +def download_service(mock_anime_service, temp_persistence_path): + """Create a DownloadService instance for testing.""" + return DownloadService( + anime_service=mock_anime_service, + max_concurrent_downloads=2, + max_retries=3, + persistence_path=temp_persistence_path, + ) + + +class TestDownloadServiceInitialization: + """Test download service initialization.""" + + def test_initialization_creates_queues( + self, mock_anime_service, temp_persistence_path + ): + """Test that initialization creates empty queues.""" + service = DownloadService( + anime_service=mock_anime_service, + persistence_path=temp_persistence_path, + ) + + assert len(service._pending_queue) == 0 + assert len(service._active_downloads) == 0 + assert len(service._completed_items) == 0 + assert len(service._failed_items) == 0 + assert service._is_running is False + assert service._is_paused is False + + def test_initialization_loads_persisted_queue( + self, mock_anime_service, temp_persistence_path + ): + """Test that initialization loads persisted queue state.""" + # Create a persisted queue file + persistence_file = Path(temp_persistence_path) + persistence_file.parent.mkdir(parents=True, exist_ok=True) + + test_data = { + "pending": [ + { + "id": "test-id-1", + "serie_id": "series-1", + "serie_name": "Test Series", + "episode": {"season": 1, "episode": 1, "title": None}, + "status": "pending", + "priority": "normal", + "added_at": datetime.utcnow().isoformat(), + "started_at": None, + "completed_at": None, + "progress": None, + "error": None, + "retry_count": 0, + "source_url": None, + } + ], + "active": [], + "failed": [], + "timestamp": datetime.utcnow().isoformat(), + } + + with open(persistence_file, "w", encoding="utf-8") as f: + json.dump(test_data, f) + + service = DownloadService( + anime_service=mock_anime_service, + persistence_path=temp_persistence_path, + ) + + assert len(service._pending_queue) == 1 + assert service._pending_queue[0].id == "test-id-1" + + +class TestQueueManagement: + """Test queue management operations.""" + + @pytest.mark.asyncio + async def test_add_to_queue_single_episode(self, download_service): + """Test adding a single episode to queue.""" + episodes = [EpisodeIdentifier(season=1, episode=1)] + + item_ids = await download_service.add_to_queue( + serie_id="series-1", + serie_name="Test Series", + episodes=episodes, + priority=DownloadPriority.NORMAL, + ) + + assert len(item_ids) == 1 + assert len(download_service._pending_queue) == 1 + assert download_service._pending_queue[0].serie_id == "series-1" + assert ( + download_service._pending_queue[0].status + == DownloadStatus.PENDING + ) + + @pytest.mark.asyncio + async def test_add_to_queue_multiple_episodes(self, download_service): + """Test adding multiple episodes to queue.""" + episodes = [ + EpisodeIdentifier(season=1, episode=1), + EpisodeIdentifier(season=1, episode=2), + EpisodeIdentifier(season=1, episode=3), + ] + + item_ids = await download_service.add_to_queue( + serie_id="series-1", + serie_name="Test Series", + episodes=episodes, + priority=DownloadPriority.NORMAL, + ) + + assert len(item_ids) == 3 + assert len(download_service._pending_queue) == 3 + + @pytest.mark.asyncio + async def test_add_high_priority_to_front(self, download_service): + """Test that high priority items are added to front of queue.""" + # Add normal priority item + await download_service.add_to_queue( + serie_id="series-1", + serie_name="Test Series", + episodes=[EpisodeIdentifier(season=1, episode=1)], + priority=DownloadPriority.NORMAL, + ) + + # Add high priority item + await download_service.add_to_queue( + serie_id="series-2", + serie_name="Priority Series", + episodes=[EpisodeIdentifier(season=1, episode=1)], + priority=DownloadPriority.HIGH, + ) + + # High priority should be at front + assert download_service._pending_queue[0].serie_id == "series-2" + assert download_service._pending_queue[1].serie_id == "series-1" + + @pytest.mark.asyncio + async def test_remove_from_pending_queue(self, download_service): + """Test removing items from pending queue.""" + item_ids = await download_service.add_to_queue( + serie_id="series-1", + serie_name="Test Series", + episodes=[EpisodeIdentifier(season=1, episode=1)], + ) + + removed_ids = await download_service.remove_from_queue(item_ids) + + assert len(removed_ids) == 1 + assert removed_ids[0] == item_ids[0] + assert len(download_service._pending_queue) == 0 + + @pytest.mark.asyncio + async def test_reorder_queue(self, download_service): + """Test reordering items in queue.""" + # Add three items + await download_service.add_to_queue( + serie_id="series-1", + serie_name="Series 1", + episodes=[EpisodeIdentifier(season=1, episode=1)], + ) + await download_service.add_to_queue( + serie_id="series-2", + serie_name="Series 2", + episodes=[EpisodeIdentifier(season=1, episode=1)], + ) + await download_service.add_to_queue( + serie_id="series-3", + serie_name="Series 3", + episodes=[EpisodeIdentifier(season=1, episode=1)], + ) + + # Move last item to position 0 + item_to_move = download_service._pending_queue[2].id + success = await download_service.reorder_queue(item_to_move, 0) + + assert success is True + assert download_service._pending_queue[0].id == item_to_move + assert download_service._pending_queue[0].serie_id == "series-3" + + +class TestQueueStatus: + """Test queue status reporting.""" + + @pytest.mark.asyncio + async def test_get_queue_status(self, download_service): + """Test getting queue status.""" + # Add items to queue + await download_service.add_to_queue( + serie_id="series-1", + serie_name="Test Series", + episodes=[ + EpisodeIdentifier(season=1, episode=1), + EpisodeIdentifier(season=1, episode=2), + ], + ) + + status = await download_service.get_queue_status() + + assert status.is_running is False + assert status.is_paused is False + assert len(status.pending_queue) == 2 + assert len(status.active_downloads) == 0 + assert len(status.completed_downloads) == 0 + assert len(status.failed_downloads) == 0 + + @pytest.mark.asyncio + async def test_get_queue_stats(self, download_service): + """Test getting queue statistics.""" + # Add items + await download_service.add_to_queue( + serie_id="series-1", + serie_name="Test Series", + episodes=[ + EpisodeIdentifier(season=1, episode=1), + EpisodeIdentifier(season=1, episode=2), + ], + ) + + stats = await download_service.get_queue_stats() + + assert stats.total_items == 2 + assert stats.pending_count == 2 + assert stats.active_count == 0 + assert stats.completed_count == 0 + assert stats.failed_count == 0 + assert stats.total_downloaded_mb == 0.0 + + +class TestQueueControl: + """Test queue control operations.""" + + @pytest.mark.asyncio + async def test_pause_queue(self, download_service): + """Test pausing the queue.""" + await download_service.pause_queue() + assert download_service._is_paused is True + + @pytest.mark.asyncio + async def test_resume_queue(self, download_service): + """Test resuming the queue.""" + await download_service.pause_queue() + await download_service.resume_queue() + assert download_service._is_paused is False + + @pytest.mark.asyncio + async def test_clear_completed(self, download_service): + """Test clearing completed downloads.""" + # Manually add completed item + completed_item = DownloadItem( + id="completed-1", + serie_id="series-1", + serie_name="Test Series", + episode=EpisodeIdentifier(season=1, episode=1), + status=DownloadStatus.COMPLETED, + ) + download_service._completed_items.append(completed_item) + + count = await download_service.clear_completed() + + assert count == 1 + assert len(download_service._completed_items) == 0 + + +class TestPersistence: + """Test queue persistence functionality.""" + + @pytest.mark.asyncio + async def test_queue_persistence(self, download_service): + """Test that queue state is persisted to disk.""" + await download_service.add_to_queue( + serie_id="series-1", + serie_name="Test Series", + episodes=[EpisodeIdentifier(season=1, episode=1)], + ) + + # Persistence file should exist + persistence_path = Path(download_service._persistence_path) + assert persistence_path.exists() + + # Check file contents + with open(persistence_path, "r") as f: + data = json.load(f) + + assert len(data["pending"]) == 1 + assert data["pending"][0]["serie_id"] == "series-1" + + @pytest.mark.asyncio + async def test_queue_recovery_after_restart( + self, mock_anime_service, temp_persistence_path + ): + """Test that queue is recovered after service restart.""" + # Create and populate first service + service1 = DownloadService( + anime_service=mock_anime_service, + persistence_path=temp_persistence_path, + ) + + await service1.add_to_queue( + serie_id="series-1", + serie_name="Test Series", + episodes=[ + EpisodeIdentifier(season=1, episode=1), + EpisodeIdentifier(season=1, episode=2), + ], + ) + + # Create new service with same persistence path + service2 = DownloadService( + anime_service=mock_anime_service, + persistence_path=temp_persistence_path, + ) + + # Should recover pending items + assert len(service2._pending_queue) == 2 + + +class TestRetryLogic: + """Test retry logic for failed downloads.""" + + @pytest.mark.asyncio + async def test_retry_failed_items(self, download_service): + """Test retrying failed downloads.""" + # Manually add failed item + failed_item = DownloadItem( + id="failed-1", + serie_id="series-1", + serie_name="Test Series", + episode=EpisodeIdentifier(season=1, episode=1), + status=DownloadStatus.FAILED, + retry_count=0, + error="Test error", + ) + download_service._failed_items.append(failed_item) + + retried_ids = await download_service.retry_failed() + + assert len(retried_ids) == 1 + assert len(download_service._failed_items) == 0 + assert len(download_service._pending_queue) == 1 + assert download_service._pending_queue[0].retry_count == 1 + + @pytest.mark.asyncio + async def test_max_retries_not_exceeded(self, download_service): + """Test that items with max retries are not retried.""" + # Create item with max retries + failed_item = DownloadItem( + id="failed-1", + serie_id="series-1", + serie_name="Test Series", + episode=EpisodeIdentifier(season=1, episode=1), + status=DownloadStatus.FAILED, + retry_count=3, # Max retries + error="Test error", + ) + download_service._failed_items.append(failed_item) + + retried_ids = await download_service.retry_failed() + + assert len(retried_ids) == 0 + assert len(download_service._failed_items) == 1 + assert len(download_service._pending_queue) == 0 + + +class TestBroadcastCallbacks: + """Test WebSocket broadcast functionality.""" + + @pytest.mark.asyncio + async def test_set_broadcast_callback(self, download_service): + """Test setting broadcast callback.""" + mock_callback = AsyncMock() + download_service.set_broadcast_callback(mock_callback) + + assert download_service._broadcast_callback == mock_callback + + @pytest.mark.asyncio + async def test_broadcast_on_queue_update(self, download_service): + """Test that broadcasts are sent on queue updates.""" + mock_callback = AsyncMock() + download_service.set_broadcast_callback(mock_callback) + + await download_service.add_to_queue( + serie_id="series-1", + serie_name="Test Series", + episodes=[EpisodeIdentifier(season=1, episode=1)], + ) + + # Allow async callback to execute + await asyncio.sleep(0.1) + + # Verify callback was called + mock_callback.assert_called() + + +class TestServiceLifecycle: + """Test service start and stop operations.""" + + @pytest.mark.asyncio + async def test_start_service(self, download_service): + """Test starting the service.""" + await download_service.start() + assert download_service._is_running is True + + @pytest.mark.asyncio + async def test_stop_service(self, download_service): + """Test stopping the service.""" + await download_service.start() + await download_service.stop() + assert download_service._is_running is False + + @pytest.mark.asyncio + async def test_start_already_running(self, download_service): + """Test starting service when already running.""" + await download_service.start() + await download_service.start() # Should not raise error + assert download_service._is_running is True + + +class TestErrorHandling: + """Test error handling in download service.""" + + @pytest.mark.asyncio + async def test_reorder_nonexistent_item(self, download_service): + """Test reordering non-existent item raises error.""" + with pytest.raises(DownloadServiceError): + await download_service.reorder_queue("nonexistent-id", 0) + + @pytest.mark.asyncio + async def test_download_failure_moves_to_failed(self, download_service): + """Test that download failures are handled correctly.""" + # Mock download to fail + download_service._anime_service.download = AsyncMock( + side_effect=Exception("Download failed") + ) + + await download_service.add_to_queue( + serie_id="series-1", + serie_name="Test Series", + episodes=[EpisodeIdentifier(season=1, episode=1)], + ) + + # Process the download + item = download_service._pending_queue.popleft() + await download_service._process_download(item) + + # Item should be in failed queue + assert len(download_service._failed_items) == 1 + assert ( + download_service._failed_items[0].status == DownloadStatus.FAILED + ) + assert download_service._failed_items[0].error is not None