"""Integration tests for WebSocket integration with core services.

This module tests the integration between WebSocket broadcasting and
core services (DownloadService, AnimeService, ProgressService) to ensure
real-time updates are properly broadcasted to connected clients.

The tests never open a real WebSocket: each one registers an in-memory
recording coroutine through ``set_broadcast_callback`` and asserts on the
payloads that the service hands to it.
"""
import asyncio
from datetime import datetime, timezone
from typing import Any, Awaitable, Callable, Dict, List
from unittest.mock import Mock, patch

import pytest

from src.server.models.download import (
    DownloadItem,
    DownloadPriority,
    DownloadStatus,
    EpisodeIdentifier,
)
from src.server.services.anime_service import AnimeService
from src.server.services.download_service import DownloadService
from src.server.services.progress_service import ProgressService, ProgressType
from src.server.services.websocket_service import WebSocketService


def _download_broadcast_recorder(
    sink: List[Dict[str, Any]],
) -> Callable[..., Awaitable[None]]:
    """Build a two-argument broadcast callback that records into *sink*.

    Each call appends ``{"type": update_type, "data": data}``. This is the
    callback shape these tests register on ``DownloadService``.
    """

    async def mock_broadcast(update_type: str, data: dict):
        sink.append({"type": update_type, "data": data})

    return mock_broadcast


def _progress_broadcast_recorder(
    sink: List[Dict[str, Any]],
) -> Callable[..., Awaitable[None]]:
    """Build a three-argument broadcast callback that records into *sink*.

    Each call appends ``{"type": message_type, "data": data, "room": room}``.
    This is the callback shape these tests register on ``ProgressService``.
    """

    async def mock_broadcast(message_type: str, data: dict, room: str):
        sink.append({
            "type": message_type,
            "data": data,
            "room": room,
        })

    return mock_broadcast


@pytest.fixture
def mock_series_app():
    """Mock SeriesApp for testing.

    Provides an empty ``series_list`` plus mocked ``search``, ``ReScan``
    and ``download`` callables (``download`` reports success).
    """
    app = Mock()
    app.series_list = []
    app.search = Mock(return_value=[])
    app.ReScan = Mock()
    app.download = Mock(return_value=True)
    return app


@pytest.fixture
def progress_service():
    """Create a ProgressService instance for testing."""
    return ProgressService()


@pytest.fixture
def websocket_service():
    """Create a WebSocketService instance for testing."""
    return WebSocketService()


@pytest.fixture
async def anime_service(mock_series_app, progress_service):
    """Create an AnimeService with mocked dependencies."""
    service = AnimeService(
        series_app=mock_series_app,
        progress_service=progress_service,
    )
    yield service


@pytest.fixture
async def download_service(anime_service, progress_service, tmp_path):
    """Create a DownloadService with dependencies.

    The persistence file lives under pytest's per-test ``tmp_path`` rather
    than a fixed location, so whatever the service stores at that path is
    not shared between tests or between concurrent test runs. The service
    is stopped on teardown.
    """
    service = DownloadService(
        anime_service=anime_service,
        progress_service=progress_service,
        persistence_path=str(tmp_path / "test_queue.json"),
    )
    yield service
    await service.stop()


class TestWebSocketDownloadIntegration:
    """Test WebSocket integration with DownloadService."""

    @pytest.mark.asyncio
    async def test_download_progress_broadcast(
        self, download_service, websocket_service
    ):
        """Test that adding an item broadcasts a queue status update."""
        broadcasts: List[Dict[str, Any]] = []
        download_service.set_broadcast_callback(
            _download_broadcast_recorder(broadcasts)
        )

        # Add item to queue
        item_ids = await download_service.add_to_queue(
            serie_id="test_serie",
            serie_folder="test_serie",
            serie_name="Test Anime",
            episodes=[EpisodeIdentifier(season=1, episode=1)],
            priority=DownloadPriority.HIGH,
        )

        assert len(item_ids) == 1
        # Exactly one broadcast is expected for a single add operation.
        assert len(broadcasts) == 1
        assert broadcasts[0]["type"] == "queue_status"
        assert broadcasts[0]["data"]["action"] == "items_added"
        assert item_ids[0] in broadcasts[0]["data"]["added_ids"]

    @pytest.mark.asyncio
    async def test_queue_operations_broadcast(self, download_service):
        """Test that queue operations broadcast status updates."""
        broadcasts: List[Dict[str, Any]] = []
        download_service.set_broadcast_callback(
            _download_broadcast_recorder(broadcasts)
        )

        # Add items
        item_ids = await download_service.add_to_queue(
            serie_id="test",
            serie_folder="test",
            serie_name="Test",
            episodes=[
                EpisodeIdentifier(season=1, episode=i) for i in range(1, 4)
            ],
            priority=DownloadPriority.NORMAL,
        )

        # Remove items
        removed = await download_service.remove_from_queue([item_ids[0]])
        assert len(removed) == 1

        # Check broadcasts; next() without a default raises StopIteration
        # (failing the test) when the expected broadcast is missing.
        add_broadcast = next(
            b for b in broadcasts
            if b["data"].get("action") == "items_added"
        )
        remove_broadcast = next(
            b for b in broadcasts
            if b["data"].get("action") == "items_removed"
        )

        assert add_broadcast["type"] == "queue_status"
        assert len(add_broadcast["data"]["added_ids"]) == 3
        assert remove_broadcast["type"] == "queue_status"
        assert item_ids[0] in remove_broadcast["data"]["removed_ids"]

    @pytest.mark.asyncio
    async def test_queue_start_stop_broadcast(self, download_service):
        """Test that start/stop operations broadcast updates."""
        broadcasts: List[Dict[str, Any]] = []
        download_service.set_broadcast_callback(
            _download_broadcast_recorder(broadcasts)
        )

        # Start queue
        await download_service.start()
        # Brief pause between start and stop
        await asyncio.sleep(0.1)

        # Stop queue
        await download_service.stop()

        # Find start/stop broadcasts
        start_broadcast = next(
            (b for b in broadcasts if b["type"] == "queue_started"),
            None,
        )
        stop_broadcast = next(
            (b for b in broadcasts if b["type"] == "queue_stopped"),
            None,
        )

        assert start_broadcast is not None
        assert start_broadcast["data"]["is_running"] is True
        assert stop_broadcast is not None
        assert stop_broadcast["data"]["is_running"] is False

    @pytest.mark.asyncio
    async def test_clear_completed_broadcast(self, download_service):
        """Test that clearing completed items broadcasts update."""
        broadcasts: List[Dict[str, Any]] = []
        download_service.set_broadcast_callback(
            _download_broadcast_recorder(broadcasts)
        )

        # Manually add a completed item to test; this writes to the
        # service's private _completed_items list directly instead of
        # running a real download.
        completed_item = DownloadItem(
            id="test_completed",
            serie_id="test",
            serie_name="Test",
            episode=EpisodeIdentifier(season=1, episode=1),
            status=DownloadStatus.COMPLETED,
            priority=DownloadPriority.NORMAL,
            added_at=datetime.now(timezone.utc),
        )
        download_service._completed_items.append(completed_item)

        # Clear completed
        count = await download_service.clear_completed()
        assert count == 1

        # Find clear broadcast
        clear_broadcast = next(
            (
                b for b in broadcasts
                if b["data"].get("action") == "completed_cleared"
            ),
            None,
        )

        assert clear_broadcast is not None
        assert clear_broadcast["data"]["cleared_count"] == 1


class TestWebSocketScanIntegration:
    """Test WebSocket integration with AnimeService scan operations."""

    @pytest.mark.asyncio
    async def test_scan_progress_broadcast(
        self, anime_service, progress_service, mock_series_app
    ):
        """Test that scan progress updates are broadcasted."""
        broadcasts: List[Dict[str, Any]] = []
        progress_service.set_broadcast_callback(
            _progress_broadcast_recorder(broadcasts)
        )

        # Mock scan callback to simulate progress
        def mock_scan_callback(callback):
            """Simulate scan progress."""
            if callback:
                callback({"current": 5, "total": 10, "message": "Scanning..."})
                callback({"current": 10, "total": 10, "message": "Complete"})

        mock_series_app.ReScan = mock_scan_callback

        # Run scan
        await anime_service.rescan()

        # Verify broadcasts were made
        assert len(broadcasts) >= 2  # At least start and complete

        # Check for scan progress broadcasts
        scan_broadcasts = [
            b for b in broadcasts if b["room"] == "scan_progress"
        ]
        assert len(scan_broadcasts) >= 2

        # Verify start broadcast
        start_broadcast = scan_broadcasts[0]
        assert start_broadcast["data"]["status"] == "started"
        assert start_broadcast["data"]["type"] == ProgressType.SCAN.value

        # Verify completion broadcast
        complete_broadcast = scan_broadcasts[-1]
        assert complete_broadcast["data"]["status"] == "completed"

    @pytest.mark.asyncio
    async def test_scan_failure_broadcast(
        self, anime_service, progress_service, mock_series_app
    ):
        """Test that scan failures are broadcasted."""
        broadcasts: List[Dict[str, Any]] = []
        progress_service.set_broadcast_callback(
            _progress_broadcast_recorder(broadcasts)
        )

        # Mock scan to raise error
        def mock_scan_error(callback):
            raise RuntimeError("Scan failed")

        mock_series_app.ReScan = mock_scan_error

        # Run scan (should fail)
        with pytest.raises(Exception):
            await anime_service.rescan()

        # Verify failure broadcast
        scan_broadcasts = [
            b for b in broadcasts if b["room"] == "scan_progress"
        ]
        assert len(scan_broadcasts) >= 2  # Start and fail

        # The last scan broadcast must report the failure. The status is
        # the whole check: a follow-up "error in message OR status is
        # failed" assertion would be implied by this one.
        fail_broadcast = scan_broadcasts[-1]
        assert fail_broadcast["data"]["status"] == "failed"


class TestWebSocketProgressIntegration:
    """Test WebSocket integration with ProgressService."""

    @pytest.mark.asyncio
    async def test_progress_lifecycle_broadcast(self, progress_service):
        """Test that progress lifecycle events are broadcasted."""
        broadcasts: List[Dict[str, Any]] = []
        progress_service.set_broadcast_callback(
            _progress_broadcast_recorder(broadcasts)
        )

        # Start progress
        await progress_service.start_progress(
            progress_id="test_progress",
            progress_type=ProgressType.DOWNLOAD,
            title="Test Download",
            total=100,
        )

        # Update progress
        await progress_service.update_progress(
            progress_id="test_progress",
            current=50,
            force_broadcast=True,
        )

        # Complete progress
        await progress_service.complete_progress(
            progress_id="test_progress",
            message="Download complete",
        )

        # Verify broadcasts: one each for start, update and complete,
        # in that order.
        assert len(broadcasts) == 3

        start_broadcast = broadcasts[0]
        assert start_broadcast["data"]["status"] == "started"
        assert start_broadcast["room"] == "download_progress"

        update_broadcast = broadcasts[1]
        assert update_broadcast["data"]["status"] == "in_progress"
        assert update_broadcast["data"]["percent"] == 50.0

        complete_broadcast = broadcasts[2]
        assert complete_broadcast["data"]["status"] == "completed"
        assert complete_broadcast["data"]["percent"] == 100.0


class TestWebSocketEndToEnd:
    """End-to-end integration tests with all services."""

    @pytest.mark.asyncio
    async def test_complete_download_flow_with_broadcasts(
        self, download_service, anime_service, progress_service
    ):
        """Test complete download flow with all broadcasts."""
        all_broadcasts: List[Dict[str, Any]] = []

        # These two recorders tag each entry with its originating service
        # so the shared list can be filtered by source afterwards.
        async def capture_download_broadcast(update_type: str, data: dict):
            all_broadcasts.append({
                "source": "download",
                "type": update_type,
                "data": data,
            })

        async def capture_progress_broadcast(
            message_type: str, data: dict, room: str
        ):
            all_broadcasts.append({
                "source": "progress",
                "type": message_type,
                "data": data,
                "room": room,
            })

        download_service.set_broadcast_callback(capture_download_broadcast)
        progress_service.set_broadcast_callback(capture_progress_broadcast)

        # Add items to queue
        item_ids = await download_service.add_to_queue(
            serie_id="test",
            serie_folder="test",
            serie_name="Test Anime",
            episodes=[EpisodeIdentifier(season=1, episode=1)],
            priority=DownloadPriority.HIGH,
        )

        # Start queue
        await download_service.start()
        # Brief pause before pausing the queue
        await asyncio.sleep(0.1)

        # Pause queue
        await download_service.pause_queue()

        # Resume queue
        await download_service.resume_queue()

        # Stop queue
        await download_service.stop()

        # Verify broadcasts from the download service
        download_broadcasts = [
            b for b in all_broadcasts if b["source"] == "download"
        ]
        # add, start, pause, resume, stop were issued; at least 4
        # download broadcasts are required.
        assert len(download_broadcasts) >= 4
        assert len(item_ids) == 1

        # Verify queue status broadcasts
        queue_status_broadcasts = [
            b for b in download_broadcasts if b["type"] == "queue_status"
        ]
        assert len(queue_status_broadcasts) >= 1


if __name__ == "__main__":
    pytest.main([__file__, "-v"])