"""Unit tests for download progress WebSocket updates.

This module tests the integration between download service progress
tracking and WebSocket broadcasting to ensure real-time updates are
properly sent to connected clients.
"""
import asyncio
from typing import Any, Dict, List
from unittest.mock import MagicMock, Mock, patch

import pytest

from src.server.models.download import DownloadPriority, EpisodeIdentifier
from src.server.services.anime_service import AnimeService
from src.server.services.download_service import DownloadService
from src.server.services.progress_service import ProgressService

# Seconds to wait for the background queue processor to finish one
# mocked download (the mock itself sleeps 4 x 0.01 s).
_SINGLE_DOWNLOAD_WAIT = 0.5
# Longer wait used when two episodes are queued.
_MULTI_DOWNLOAD_WAIT = 2.0


class _MockDownloadArgs:
    """Stand-in that mimics SeriesApp's DownloadStatusEventArgs."""

    def __init__(
        self,
        status,
        serie_folder,
        season,
        episode,
        key=None,
        progress=None,
        message=None,
        error=None,
        item_id=None,
    ):
        self.status = status
        self.serie_folder = serie_folder
        self.key = key
        self.season = season
        self.episode = episode
        self.progress = progress
        self.message = message
        self.error = error
        self.item_id = item_id


def _record_progress_events(progress_svc) -> List[Dict[str, Any]]:
    """Subscribe a capturing handler to ``progress_updated`` events.

    Args:
        progress_svc: Service exposing ``subscribe(event_name, handler)``.

    Returns:
        A list that grows as events arrive. Each entry is a dict with the
        event's ``event_type`` under ``"type"`` and the serialized
        progress (``event.progress.to_dict()``) under ``"data"``.
    """
    captured: List[Dict[str, Any]] = []

    async def capture(event):
        captured.append({
            "type": event.event_type,
            "data": event.progress.to_dict(),
        })

    # NOTE(review): assumes subscribe() keeps a strong reference to the
    # handler; confirm against ProgressService.
    progress_svc.subscribe("progress_updated", capture)
    return captured


def _download_progress_only(
    broadcasts: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Return only the captured entries typed ``"download_progress"``."""
    return [b for b in broadcasts if b["type"] == "download_progress"]


@pytest.fixture
def mock_series_app():
    """Mock SeriesApp for testing.

    The returned mock exposes ``download`` as a ``Mock`` whose side effect
    is an async function that fires ``started``, four ``progress`` and one
    ``completed`` status event through ``app.download_status``.
    """
    app = MagicMock()
    app.series_list = []
    app.search = Mock(return_value=[])
    app.ReScan = Mock()

    # Handler slots start empty so that code under test can assign them.
    app.download_status = None
    app.scan_status = None

    async def mock_download(serie_folder, season, episode, key, **kwargs):
        """Simulate a download, emitting status events if a handler is set.

        Returns:
            Always ``True``.
        """
        if app.download_status:
            app.download_status(_MockDownloadArgs(
                "started", serie_folder, season, episode, key=key
            ))

        for progress in (25.0, 50.0, 75.0, 100.0):
            # Re-check on every step: the handler may be (un)assigned
            # while the simulated download is in flight.
            if app.download_status:
                await asyncio.sleep(0.01)  # Small delay
                app.download_status(_MockDownloadArgs(
                    "progress",
                    serie_folder,
                    season,
                    episode,
                    key=key,
                    progress=progress,
                    message=f"Downloading... {progress}%",
                ))

        if app.download_status:
            app.download_status(_MockDownloadArgs(
                "completed", serie_folder, season, episode, key=key
            ))

        return True

    # Calling the Mock returns the coroutine produced by mock_download,
    # which the caller can then await.
    app.download = Mock(side_effect=mock_download)
    return app


@pytest.fixture
def progress_service():
    """Create a ProgressService instance for testing."""
    return ProgressService()


@pytest.fixture
async def anime_service(mock_series_app, progress_service):
    """Create an AnimeService with mocked dependencies."""
    service = AnimeService(
        series_app=mock_series_app,
        progress_service=progress_service,
    )
    yield service


@pytest.fixture
async def download_service(anime_service, progress_service, tmp_path):
    """Create a DownloadService with dependencies.

    The queue file lives in pytest's per-test ``tmp_path`` directory, so
    tests never share state through a fixed location and no manual file
    cleanup is needed, even when ``stop()`` fails.

    Yields:
        A ``(download_service, progress_service)`` tuple.
    """
    persistence_path = str(tmp_path / "test_download_progress_queue.json")

    service = DownloadService(
        anime_service=anime_service,
        progress_service=progress_service,
        persistence_path=persistence_path,
    )
    yield service, progress_service
    await service.stop()


class TestDownloadProgressWebSocket:
    """Test download progress WebSocket broadcasting."""

    @pytest.mark.asyncio
    async def test_progress_callback_broadcasts_updates(
        self, download_service
    ):
        """Test that progress updates are emitted via events."""
        download_svc, progress_svc = download_service
        broadcasts = _record_progress_events(progress_svc)

        # Add item to queue
        item_ids = await download_svc.add_to_queue(
            serie_id="test_serie_1",
            serie_folder="test_serie_1",
            serie_name="Test Anime",
            episodes=[EpisodeIdentifier(season=1, episode=1)],
            priority=DownloadPriority.NORMAL,
        )
        assert len(item_ids) == 1

        # Start processing - this should trigger download with progress
        result = await download_svc.start_queue_processing()
        assert result is not None

        # Wait for download to process
        await asyncio.sleep(_SINGLE_DOWNLOAD_WAIT)

        progress_broadcasts = _download_progress_only(broadcasts)

        # Should have received multiple progress updates
        assert len(progress_broadcasts) >= 2

        # Verify progress data structure (Progress model format)
        for broadcast in progress_broadcasts:
            data = broadcast["data"]
            assert "id" in data  # Progress ID
            assert "type" in data  # Progress type
            # Progress events use 'current' and 'total'
            assert "current" in data or "message" in data

    @pytest.mark.asyncio
    async def test_progress_updates_include_episode_info(
        self, download_service
    ):
        """Test that progress updates include episode information."""
        download_svc, progress_svc = download_service
        broadcasts = _record_progress_events(progress_svc)

        # Add item with specific episode info
        await download_svc.add_to_queue(
            serie_id="test_serie_2",
            serie_folder="test_serie_2",
            serie_name="My Test Anime",
            episodes=[EpisodeIdentifier(season=2, episode=5)],
            priority=DownloadPriority.HIGH,
        )

        # Start processing
        await download_svc.start_queue_processing()
        await asyncio.sleep(_SINGLE_DOWNLOAD_WAIT)

        progress_broadcasts = _download_progress_only(broadcasts)
        assert len(progress_broadcasts) > 0

        # Verify progress info is included
        data = progress_broadcasts[0]["data"]
        assert "id" in data
        # ID should contain folder name: download_test_serie_2_2_5
        assert "test_serie_2" in data["id"]

    @pytest.mark.asyncio
    async def test_progress_percent_increases(self, download_service):
        """Test that progress percentage increases over time."""
        download_svc, progress_svc = download_service
        broadcasts = _record_progress_events(progress_svc)

        await download_svc.add_to_queue(
            serie_id="test_serie_3",
            serie_folder="test_serie_3",
            serie_name="Progress Test",
            episodes=[EpisodeIdentifier(season=1, episode=1)],
        )

        await download_svc.start_queue_processing()
        await asyncio.sleep(_SINGLE_DOWNLOAD_WAIT)

        # Progress broadcasts, in arrival order
        progress_broadcasts = _download_progress_only(broadcasts)

        # Verify we have multiple updates
        assert len(progress_broadcasts) >= 2

        # Verify progress increases (using current value)
        current_values = [
            b["data"].get("current", 0) for b in progress_broadcasts
        ]

        # Each current value should be >= the previous one
        for previous, current in zip(current_values, current_values[1:]):
            assert current >= previous

    @pytest.mark.asyncio
    async def test_progress_includes_speed_and_eta(self, download_service):
        """Test that download progress events carry core progress fields.

        Despite the test name, speed and ETA are not asserted here; only
        the presence of ``id`` and ``type`` and the ``"download"`` type
        value are checked.
        """
        download_svc, progress_svc = download_service
        broadcasts = _record_progress_events(progress_svc)

        await download_svc.add_to_queue(
            serie_id="test_serie_4",
            serie_folder="test_serie_4",
            serie_name="Speed Test",
            episodes=[EpisodeIdentifier(season=1, episode=1)],
        )

        await download_svc.start_queue_processing()
        await asyncio.sleep(_SINGLE_DOWNLOAD_WAIT)

        progress_broadcasts = _download_progress_only(broadcasts)
        assert len(progress_broadcasts) > 0

        # Check that progress data is present
        progress_data = progress_broadcasts[0]["data"]
        assert "id" in progress_data
        assert "type" in progress_data
        assert progress_data["type"] == "download"

    @pytest.mark.asyncio
    async def test_no_broadcast_without_callback(self, download_service):
        """Test that no errors occur when no event handlers subscribed."""
        download_svc, progress_svc = download_service

        # Don't subscribe to any events
        await download_svc.add_to_queue(
            serie_id="test_serie_5",
            serie_folder="test_serie_5",
            serie_name="No Broadcast Test",
            episodes=[EpisodeIdentifier(season=1, episode=1)],
        )

        # Should complete without errors
        await download_svc.start_queue_processing()
        await asyncio.sleep(_SINGLE_DOWNLOAD_WAIT)

        # Verify download completed successfully
        status = await download_svc.get_queue_status()
        assert len(status.completed_downloads) == 1

    @pytest.mark.asyncio
    async def test_broadcast_error_handling(self, download_service):
        """Test that event handler errors don't break download process."""
        download_svc, progress_svc = download_service

        error_count = 0

        async def failing_handler(event):
            """Event handler that always fails."""
            nonlocal error_count
            error_count += 1
            raise RuntimeError("Event handler failed")

        progress_svc.subscribe("progress_updated", failing_handler)

        await download_svc.add_to_queue(
            serie_id="test_serie_6",
            serie_folder="test_serie_6",
            serie_name="Error Handling Test",
            episodes=[EpisodeIdentifier(season=1, episode=1)],
        )

        # Should complete despite handler errors
        await download_svc.start_queue_processing()
        await asyncio.sleep(_SINGLE_DOWNLOAD_WAIT)

        # Verify download still completed
        status = await download_svc.get_queue_status()
        assert len(status.completed_downloads) == 1

        # Verify handler was attempted
        assert error_count > 0

    @pytest.mark.asyncio
    async def test_multiple_downloads_broadcast_separately(
        self, download_service
    ):
        """Test that multiple downloads emit progress separately."""
        download_svc, progress_svc = download_service
        broadcasts = _record_progress_events(progress_svc)

        # Add multiple episodes
        item_ids = await download_svc.add_to_queue(
            serie_id="test_serie_7",
            serie_folder="test_serie_7",
            serie_name="Multi Episode Test",
            episodes=[
                EpisodeIdentifier(season=1, episode=1),
                EpisodeIdentifier(season=1, episode=2),
            ],
        )
        assert len(item_ids) == 2

        # Start processing, then give time for both downloads
        await download_svc.start_queue_processing()
        await asyncio.sleep(_MULTI_DOWNLOAD_WAIT)

        progress_broadcasts = _download_progress_only(broadcasts)

        # Should have progress for both episodes
        assert len(progress_broadcasts) >= 4  # At least 2 updates per ep

        # Verify different download IDs
        progress_ids = (
            broadcast["data"].get("id", "")
            for broadcast in progress_broadcasts
        )
        download_ids = {
            progress_id
            for progress_id in progress_ids
            if "download_" in progress_id
        }

        # Should have at least 2 unique download progress IDs
        assert len(download_ids) >= 2

    @pytest.mark.asyncio
    async def test_progress_data_format_matches_model(self, download_service):
        """Test that event data matches Progress model."""
        download_svc, progress_svc = download_service
        broadcasts = _record_progress_events(progress_svc)

        await download_svc.add_to_queue(
            serie_id="test_serie_8",
            serie_folder="test_serie_8",
            serie_name="Model Test",
            episodes=[EpisodeIdentifier(season=1, episode=1)],
        )

        await download_svc.start_queue_processing()
        await asyncio.sleep(_SINGLE_DOWNLOAD_WAIT)

        progress_broadcasts = _download_progress_only(broadcasts)
        assert len(progress_broadcasts) > 0

        # Verify progress follows Progress model structure
        progress_data = progress_broadcasts[0]["data"]

        # Verify required fields from Progress model
        assert "id" in progress_data
        assert "type" in progress_data
        assert "status" in progress_data
        assert progress_data["type"] == "download"