"""Unit tests for download queue operations and logic. Tests queue management logic including FIFO ordering, single download enforcement, queue statistics, reordering, and concurrent modification handling. """ import asyncio from datetime import datetime, timezone from typing import List from unittest.mock import AsyncMock, Mock, patch import pytest from src.server.models.download import ( DownloadItem, DownloadPriority, DownloadStatus, EpisodeIdentifier, ) from src.server.services.download_service import DownloadService, DownloadServiceError @pytest.fixture def mock_anime_service(): """Create mock anime service.""" service = AsyncMock() service.get_missing_episodes = AsyncMock(return_value=[]) return service @pytest.fixture def mock_queue_repository(): """Create mock queue repository.""" repo = Mock() repo.get_all = AsyncMock(return_value=[]) repo.save = AsyncMock(return_value=None) repo.update = AsyncMock(return_value=None) repo.delete = AsyncMock(return_value=True) repo.delete_batch = AsyncMock(return_value=None) return repo @pytest.fixture def mock_progress_service(): """Create mock progress service.""" service = Mock() service.start_download = AsyncMock() service.update_download = AsyncMock() service.complete_download = AsyncMock() service.fail_download = AsyncMock() service.update_queue = AsyncMock() return service @pytest.fixture async def download_service(mock_anime_service, mock_queue_repository, mock_progress_service): """Create download service with mocked dependencies.""" with patch('src.server.services.download_service.get_progress_service', return_value=mock_progress_service): service = DownloadService( anime_service=mock_anime_service, queue_repository=mock_queue_repository ) await service.initialize() yield service class TestFIFOQueueOrdering: """Tests for FIFO queue ordering validation.""" @pytest.mark.asyncio async def test_items_processed_in_fifo_order(self, download_service): """Test that queue items are processed in first-in-first-out order.""" # Add items to queue episodes = [ EpisodeIdentifier(serie_key="serie1", season=1, episode=i) for i in range(1, 6) ] for i, ep in enumerate(episodes): await download_service.add_to_queue( episodes=[ep], serie_name=f"Series {i+1}", priority=DownloadPriority.NORMAL ) # Get queue status status = await download_service.get_queue_status() # Verify FIFO order (first added should be first in queue) assert len(status.pending) == 5 for i, item in enumerate(status.pending): assert item.episode.episode == i + 1 @pytest.mark.asyncio async def test_high_priority_items_go_to_front(self, download_service): """Test that high priority items are placed at the front of the queue.""" # Add normal priority items for i in range(1, 4): await download_service.add_to_queue( episodes=[EpisodeIdentifier(serie_key="serie1", season=1, episode=i)], serie_name="Series 1", priority=DownloadPriority.NORMAL ) # Add high priority item await download_service.add_to_queue( episodes=[EpisodeIdentifier(serie_key="serie1", season=1, episode=99)], serie_name="Series 1", priority=DownloadPriority.HIGH ) status = await download_service.get_queue_status() # High priority item should be first assert status.pending[0].episode.episode == 99 assert status.pending[0].priority == DownloadPriority.HIGH # Normal items follow in original order assert status.pending[1].episode.episode == 1 assert status.pending[2].episode.episode == 2 assert status.pending[3].episode.episode == 3 @pytest.mark.asyncio async def test_fifo_maintained_after_removal(self, download_service): """Test that FIFO order is maintained after removing items.""" # Add items for i in range(1, 6): await download_service.add_to_queue( episodes=[EpisodeIdentifier(serie_key="serie1", season=1, episode=i)], serie_name="Series 1", priority=DownloadPriority.NORMAL ) status = await download_service.get_queue_status() middle_item_id = status.pending[2].id # Episode 3 # Remove middle item await download_service.remove_from_queue([middle_item_id]) # Verify order maintained status = await download_service.get_queue_status() assert len(status.pending) == 4 assert status.pending[0].episode.episode == 1 assert status.pending[1].episode.episode == 2 assert status.pending[2].episode.episode == 4 # Episode 3 removed assert status.pending[3].episode.episode == 5 @pytest.mark.asyncio async def test_reordering_changes_processing_order(self, download_service): """Test that reordering changes the processing order.""" # Add items for i in range(1, 5): await download_service.add_to_queue( episodes=[EpisodeIdentifier(serie_key="serie1", season=1, episode=i)], serie_name="Series 1", priority=DownloadPriority.NORMAL ) status = await download_service.get_queue_status() item_ids = [item.id for item in status.pending] # Reverse order reversed_ids = list(reversed(item_ids)) await download_service.reorder_queue(reversed_ids) # Verify new order status = await download_service.get_queue_status() assert status.pending[0].episode.episode == 4 assert status.pending[1].episode.episode == 3 assert status.pending[2].episode.episode == 2 assert status.pending[3].episode.episode == 1 class TestSingleDownloadEnforcement: """Tests for single download mode enforcement.""" @pytest.mark.asyncio async def test_only_one_download_active_at_time(self, download_service): """Test that only one download can be active at a time.""" # Add multiple items for i in range(1, 4): await download_service.add_to_queue( episodes=[EpisodeIdentifier(serie_key="serie1", season=1, episode=i)], serie_name="Series 1", priority=DownloadPriority.NORMAL ) # Start processing (but don't actually download) with patch.object(download_service, '_process_download', new_callable=AsyncMock): await download_service.start_queue_processing() # Small delay to let processing start await asyncio.sleep(0.1) status = await download_service.get_queue_status() # Should have exactly 1 active download (or 0 if completed quickly) active_count = len([item for item in status.active if item.status == DownloadStatus.DOWNLOADING]) assert active_count <= 1 @pytest.mark.asyncio async def test_starting_queue_twice_returns_error(self, download_service): """Test that starting queue processing twice is rejected.""" # Add item await download_service.add_to_queue( episodes=[EpisodeIdentifier(serie_key="serie1", season=1, episode=1)], serie_name="Series 1", priority=DownloadPriority.NORMAL ) # Start first time with patch.object(download_service, '_process_download', new_callable=AsyncMock): result1 = await download_service.start_queue_processing() assert result1 is not None # Returns message # Try to start again result2 = await download_service.start_queue_processing() assert result2 is not None assert "already" in result2.lower() # Error message about already running @pytest.mark.asyncio async def test_next_download_starts_after_current_completes(self, download_service): """Test that next download starts automatically after current completes.""" # Add multiple items for i in range(1, 3): await download_service.add_to_queue( episodes=[EpisodeIdentifier(serie_key="serie1", season=1, episode=i)], serie_name="Series 1", priority=DownloadPriority.NORMAL ) # Mock download to complete quickly async def quick_download(item): item.status = DownloadStatus.COMPLETED item.completed_at = datetime.now(timezone.utc) with patch.object(download_service, '_process_download', side_effect=quick_download): await download_service.start_queue_processing() # Wait for both to complete await asyncio.sleep(0.5) status = await download_service.get_queue_status() # Both should be completed assert len(status.completed) == 2 assert len(status.pending) == 0 class TestQueueStatistics: """Tests for queue statistics accuracy.""" @pytest.mark.asyncio async def test_stats_accurate_for_pending_items(self, download_service): """Test that statistics accurately reflect pending item counts.""" # Add 5 items for i in range(1, 6): await download_service.add_to_queue( episodes=[EpisodeIdentifier(serie_key="serie1", season=1, episode=i)], serie_name="Series 1", priority=DownloadPriority.NORMAL ) stats = await download_service.get_queue_stats() assert stats.pending_count == 5 assert stats.active_count == 0 assert stats.completed_count == 0 assert stats.failed_count == 0 assert stats.total_count == 5 @pytest.mark.asyncio async def test_stats_updated_after_removal(self, download_service): """Test that statistics update correctly after removing items.""" # Add items for i in range(1, 6): await download_service.add_to_queue( episodes=[EpisodeIdentifier(serie_key="serie1", season=1, episode=i)], serie_name="Series 1", priority=DownloadPriority.NORMAL ) status = await download_service.get_queue_status() item_ids = [item.id for item in status.pending[:3]] # Remove 3 items await download_service.remove_from_queue(item_ids) stats = await download_service.get_queue_stats() assert stats.pending_count == 2 assert stats.total_count == 2 @pytest.mark.asyncio async def test_stats_reflect_completed_and_failed_counts(self, download_service): """Test that statistics accurately track completed and failed downloads.""" # Add items for i in range(1, 6): await download_service.add_to_queue( episodes=[EpisodeIdentifier(serie_key="serie1", season=1, episode=i)], serie_name="Series 1", priority=DownloadPriority.NORMAL ) # Manually move some to completed/failed for testing async with download_service._lock: # Move 2 to completed for _ in range(2): item = download_service._pending_queue.popleft() item.status = DownloadStatus.COMPLETED download_service._completed.append(item) # Move 1 to failed item = download_service._pending_queue.popleft() item.status = DownloadStatus.FAILED download_service._failed.append(item) stats = await download_service.get_queue_stats() assert stats.pending_count == 2 assert stats.completed_count == 2 assert stats.failed_count == 1 assert stats.total_count == 5 @pytest.mark.asyncio async def test_stats_include_high_priority_count(self, download_service): """Test that statistics include high priority item counts.""" # Add normal priority items for i in range(1, 4): await download_service.add_to_queue( episodes=[EpisodeIdentifier(serie_key="serie1", season=1, episode=i)], serie_name="Series 1", priority=DownloadPriority.NORMAL ) # Add high priority items for i in range(4, 6): await download_service.add_to_queue( episodes=[EpisodeIdentifier(serie_key="serie1", season=1, episode=i)], serie_name="Series 1", priority=DownloadPriority.HIGH ) stats = await download_service.get_queue_stats() # Should have 2 high priority items at front of queue status = await download_service.get_queue_status() high_priority_count = len([item for item in status.pending if item.priority == DownloadPriority.HIGH]) assert high_priority_count == 2 class TestQueueReordering: """Tests for queue reordering functionality.""" @pytest.mark.asyncio async def test_reorder_with_valid_ids(self, download_service): """Test reordering queue with valid item IDs.""" # Add items for i in range(1, 5): await download_service.add_to_queue( episodes=[EpisodeIdentifier(serie_key="serie1", season=1, episode=i)], serie_name="Series 1", priority=DownloadPriority.NORMAL ) status = await download_service.get_queue_status() item_ids = [item.id for item in status.pending] # Reorder: move last to first new_order = [item_ids[3], item_ids[0], item_ids[1], item_ids[2]] await download_service.reorder_queue(new_order) # Verify new order status = await download_service.get_queue_status() assert status.pending[0].id == item_ids[3] assert status.pending[1].id == item_ids[0] assert status.pending[2].id == item_ids[1] assert status.pending[3].id == item_ids[2] @pytest.mark.asyncio async def test_reorder_with_invalid_ids_raises_error(self, download_service): """Test that reordering with invalid IDs raises an error.""" # Add items for i in range(1, 4): await download_service.add_to_queue( episodes=[EpisodeIdentifier(serie_key="serie1", season=1, episode=i)], serie_name="Series 1", priority=DownloadPriority.NORMAL ) # Try to reorder with invalid ID with pytest.raises(DownloadServiceError, match="Invalid item IDs"): await download_service.reorder_queue(["invalid-id-1", "invalid-id-2"]) @pytest.mark.asyncio async def test_reorder_with_partial_ids_raises_error(self, download_service): """Test that reordering with partial list of IDs raises an error.""" # Add items for i in range(1, 5): await download_service.add_to_queue( episodes=[EpisodeIdentifier(serie_key="serie1", season=1, episode=i)], serie_name="Series 1", priority=DownloadPriority.NORMAL ) status = await download_service.get_queue_status() item_ids = [item.id for item in status.pending] # Try to reorder with only some IDs with pytest.raises(DownloadServiceError, match="Invalid item IDs"): await download_service.reorder_queue([item_ids[0], item_ids[1]]) # Missing 2 items @pytest.mark.asyncio async def test_reorder_empty_queue_succeeds(self, download_service): """Test that reordering an empty queue succeeds (no-op).""" # Don't add any items # Reorder empty queue await download_service.reorder_queue([]) # Verify still empty status = await download_service.get_queue_status() assert len(status.pending) == 0 class TestConcurrentModifications: """Tests for concurrent queue modification handling and race condition prevention.""" @pytest.mark.asyncio async def test_concurrent_add_operations_all_succeed(self, download_service): """Test that concurrent add operations don't lose items.""" # Add items concurrently tasks = [] for i in range(1, 11): task = download_service.add_to_queue( episodes=[EpisodeIdentifier(serie_key="serie1", season=1, episode=i)], serie_name="Series 1", priority=DownloadPriority.NORMAL ) tasks.append(task) await asyncio.gather(*tasks) # All 10 items should be in queue status = await download_service.get_queue_status() assert len(status.pending) == 10 @pytest.mark.asyncio async def test_concurrent_remove_operations_all_succeed(self, download_service): """Test that concurrent remove operations don't cause errors.""" # Add items for i in range(1, 11): await download_service.add_to_queue( episodes=[EpisodeIdentifier(serie_key="serie1", season=1, episode=i)], serie_name="Series 1", priority=DownloadPriority.NORMAL ) status = await download_service.get_queue_status() item_ids = [item.id for item in status.pending] # Remove items concurrently tasks = [] for item_id in item_ids[:5]: task = download_service.remove_from_queue([item_id]) tasks.append(task) await asyncio.gather(*tasks) # 5 items should remain status = await download_service.get_queue_status() assert len(status.pending) == 5 @pytest.mark.asyncio async def test_add_while_processing_maintains_integrity(self, download_service): """Test that adding items while processing maintains queue integrity.""" # Add initial items for i in range(1, 3): await download_service.add_to_queue( episodes=[EpisodeIdentifier(serie_key="serie1", season=1, episode=i)], serie_name="Series 1", priority=DownloadPriority.NORMAL ) # Start processing (mock slow download) async def slow_download(item): await asyncio.sleep(0.2) item.status = DownloadStatus.COMPLETED with patch.object(download_service, '_process_download', side_effect=slow_download): await download_service.start_queue_processing() # Add more items while processing await asyncio.sleep(0.1) for i in range(3, 6): await download_service.add_to_queue( episodes=[EpisodeIdentifier(serie_key="serie1", season=1, episode=i)], serie_name="Series 1", priority=DownloadPriority.NORMAL ) # Wait for processing to finish await asyncio.sleep(0.5) # All items should be processed status = await download_service.get_queue_status() total_items = len(status.pending) + len(status.completed) assert total_items == 5 @pytest.mark.asyncio async def test_remove_while_processing_maintains_integrity(self, download_service): """Test that removing items while processing maintains queue integrity.""" # Add items for i in range(1, 6): await download_service.add_to_queue( episodes=[EpisodeIdentifier(serie_key="serie1", season=1, episode=i)], serie_name="Series 1", priority=DownloadPriority.NORMAL ) status = await download_service.get_queue_status() items_to_remove = [item.id for item in status.pending[2:4]] # Remove items 3 and 4 # Start processing (mock slow download) async def slow_download(item): await asyncio.sleep(0.2) item.status = DownloadStatus.COMPLETED with patch.object(download_service, '_process_download', side_effect=slow_download): await download_service.start_queue_processing() # Remove items while processing await asyncio.sleep(0.1) await download_service.remove_from_queue(items_to_remove) # Wait for processing await asyncio.sleep(0.5) # Should have 3 items total (5 - 2 removed) status = await download_service.get_queue_status() total_items = len(status.pending) + len(status.completed) assert total_items == 3 @pytest.mark.asyncio async def test_reorder_while_empty_queue_succeeds(self, download_service): """Test that concurrent reorder on empty queue doesn't cause errors.""" # Try to reorder empty queue multiple times concurrently tasks = [download_service.reorder_queue([]) for _ in range(5)] # Should not raise any errors await asyncio.gather(*tasks) # Verify still empty status = await download_service.get_queue_status() assert len(status.pending) == 0 @pytest.mark.asyncio async def test_clear_operations_during_processing(self, download_service): """Test that clear operations during processing don't cause errors.""" # Add items for i in range(1, 6): await download_service.add_to_queue( episodes=[EpisodeIdentifier(serie_key="serie1", season=1, episode=i)], serie_name="Series 1", priority=DownloadPriority.NORMAL ) # Start processing async def slow_download(item): await asyncio.sleep(0.2) item.status = DownloadStatus.COMPLETED with patch.object(download_service, '_process_download', side_effect=slow_download): await download_service.start_queue_processing() # Clear pending while processing await asyncio.sleep(0.1) await download_service.clear_pending() # Wait for processing await asyncio.sleep(0.5) # Verify cleared (only currently processing item might complete) status = await download_service.get_queue_status() assert len(status.pending) == 0 # At most 1 completed (the one that was processing) assert len(status.completed) <= 1