"""Integration tests for WebSocket functionality. This module tests the complete WebSocket integration including: - WebSocket connection establishment and authentication - Real-time message broadcasting - Room-based messaging - Connection lifecycle management - Integration with download and progress services - Error handling and reconnection - Concurrent client management """ import asyncio import json from typing import Any, Dict, List from unittest.mock import AsyncMock, Mock, patch import pytest from httpx import ASGITransport, AsyncClient from starlette.websockets import WebSocketDisconnect from src.server.fastapi_app import app from src.server.models.download import DownloadPriority, DownloadStatus from src.server.services.auth_service import auth_service from src.server.services.progress_service import ProgressType from src.server.services.websocket_service import ( ConnectionManager, get_websocket_service, ) @pytest.fixture(autouse=True) def reset_auth(): """Reset authentication state before each test.""" original_hash = auth_service._hash auth_service._hash = None auth_service._failed.clear() yield auth_service._hash = original_hash auth_service._failed.clear() @pytest.fixture async def client(): """Create an async test client.""" transport = ASGITransport(app=app) async with AsyncClient(transport=transport, base_url="http://test") as ac: yield ac @pytest.fixture async def auth_token(client): """Get a valid authentication token.""" password = "WebSocketTestPassword123!" await client.post( "/api/auth/setup", json={"master_password": password} ) response = await client.post( "/api/auth/login", json={"password": password} ) return response.json()["access_token"] @pytest.fixture def websocket_service(): """Get the WebSocket service instance.""" return get_websocket_service() @pytest.fixture def mock_websocket(): """Create a mock WebSocket connection.""" ws = AsyncMock() ws.send_text = AsyncMock() ws.send_json = AsyncMock() ws.receive_text = AsyncMock() ws.accept = AsyncMock() ws.close = AsyncMock() return ws class TestWebSocketConnection: """Test WebSocket connection establishment and lifecycle.""" async def test_websocket_endpoint_exists(self, client, auth_token): """Test that WebSocket endpoint is available.""" # This test verifies the endpoint exists # Full WebSocket testing requires WebSocket client # Verify the WebSocket route is registered routes = [route.path for route in app.routes] websocket_routes = [ path for path in routes if "ws" in path or "websocket" in path ] assert len(websocket_routes) > 0 async def test_connection_manager_tracks_connections( self, websocket_service, mock_websocket ): """Test that connection manager tracks active connections.""" manager = websocket_service.manager # Initially no connections initial_count = len(manager.active_connections) # Add a connection await manager.connect(mock_websocket, room="test-room") assert len(manager.active_connections) == initial_count + 1 assert mock_websocket in manager.active_connections async def test_disconnect_removes_connection( self, websocket_service, mock_websocket ): """Test that disconnecting removes connection from manager.""" manager = websocket_service.manager # Connect await manager.connect(mock_websocket, room="test-room") assert mock_websocket in manager.active_connections # Disconnect manager.disconnect(mock_websocket) assert mock_websocket not in manager.active_connections async def test_room_assignment_on_connection( self, websocket_service, mock_websocket ): """Test that connections are assigned to rooms.""" manager = websocket_service.manager room = "test-room-1" await manager.connect(mock_websocket, room=room) # Verify connection is in the room assert room in manager._rooms assert mock_websocket in manager._rooms[room] async def test_multiple_rooms_support( self, websocket_service ): """Test that multiple rooms can exist simultaneously.""" manager = websocket_service.manager ws1 = AsyncMock() ws2 = AsyncMock() ws3 = AsyncMock() # Connect to different rooms await manager.connect(ws1, room="room-1") await manager.connect(ws2, room="room-2") await manager.connect(ws3, room="room-1") # Verify room structure assert "room-1" in manager._rooms assert "room-2" in manager._rooms assert len(manager._rooms["room-1"]) == 2 assert len(manager._rooms["room-2"]) == 1 class TestMessageBroadcasting: """Test message broadcasting functionality.""" async def test_broadcast_to_all_connections( self, websocket_service ): """Test broadcasting message to all connected clients.""" manager = websocket_service.manager # Create mock connections ws1 = AsyncMock() ws2 = AsyncMock() ws3 = AsyncMock() await manager.connect(ws1, room="room-1") await manager.connect(ws2, room="room-1") await manager.connect(ws3, room="room-2") # Broadcast to all message = {"type": "test", "data": "broadcast to all"} await manager.broadcast(message) # All connections should receive message ws1.send_json.assert_called_once() ws2.send_json.assert_called_once() ws3.send_json.assert_called_once() async def test_broadcast_to_specific_room( self, websocket_service ): """Test broadcasting message to specific room only.""" manager = websocket_service.manager ws1 = AsyncMock() ws2 = AsyncMock() ws3 = AsyncMock() await manager.connect(ws1, room="downloads") await manager.connect(ws2, room="downloads") await manager.connect(ws3, room="system") # Broadcast to specific room message = {"type": "download_progress", "data": {}} await manager.broadcast_to_room(message, room="downloads") # Only room members should receive assert ws1.send_json.call_count == 1 assert ws2.send_json.call_count == 1 assert ws3.send_json.call_count == 0 async def test_broadcast_with_json_message( self, websocket_service ): """Test broadcasting JSON-formatted messages.""" manager = websocket_service.manager ws = AsyncMock() await manager.connect(ws, room="test") message = { "type": "queue_update", "data": { "pending": 5, "active": 2, "completed": 10 }, "timestamp": "2025-10-19T10:00:00" } await manager.broadcast(message) ws.send_json.assert_called_once_with(message) async def test_broadcast_handles_disconnected_clients( self, websocket_service ): """Test that broadcasting handles disconnected clients gracefully.""" manager = websocket_service.manager # Mock connection that will fail failing_ws = AsyncMock() failing_ws.send_json.side_effect = RuntimeError("Connection closed") working_ws = AsyncMock() await manager.connect(failing_ws, room="test") await manager.connect(working_ws, room="test") # Broadcast should handle failure message = {"type": "test", "data": "test"} await manager.broadcast(message) # Working connection should still receive working_ws.send_json.assert_called_once() class TestProgressIntegration: """Test integration with progress service.""" async def test_download_progress_broadcasts_to_websocket( self, websocket_service ): """Test that download progress updates broadcast via WebSocket.""" manager = websocket_service.manager ws = AsyncMock() await manager.connect(ws, room="downloads") # Simulate progress update broadcast message = { "type": "download_progress", "data": { "item_id": "test-download-1", "percentage": 45.5, "current_mb": 45.5, "total_mb": 100.0, "speed_mbps": 2.5 } } await manager.broadcast_to_room(message, room="downloads") ws.send_json.assert_called_once_with(message) async def test_download_complete_notification( self, websocket_service ): """Test download completion notification via WebSocket.""" manager = websocket_service.manager ws = AsyncMock() await manager.connect(ws, room="downloads") message = { "type": "download_complete", "data": { "item_id": "test-download-1", "serie_name": "Test Anime", "episode": {"season": 1, "episode": 1} } } await manager.broadcast_to_room(message, room="downloads") ws.send_json.assert_called_once() async def test_download_failed_notification( self, websocket_service ): """Test download failure notification via WebSocket.""" manager = websocket_service.manager ws = AsyncMock() await manager.connect(ws, room="downloads") message = { "type": "download_failed", "data": { "item_id": "test-download-1", "error": "Network timeout", "retry_count": 2 } } await manager.broadcast_to_room(message, room="downloads") ws.send_json.assert_called_once() class TestQueueStatusBroadcasting: """Test queue status broadcasting via WebSocket.""" async def test_queue_status_update_broadcast( self, websocket_service ): """Test broadcasting queue status updates.""" manager = websocket_service.manager ws = AsyncMock() await manager.connect(ws, room="queue") message = { "type": "queue_status", "data": { "pending_count": 5, "active_count": 2, "completed_count": 10, "failed_count": 1, "total_items": 18 } } await manager.broadcast_to_room(message, room="queue") ws.send_json.assert_called_once_with(message) async def test_queue_item_added_notification( self, websocket_service ): """Test notification when item is added to queue.""" manager = websocket_service.manager ws = AsyncMock() await manager.connect(ws, room="queue") message = { "type": "queue_item_added", "data": { "item_id": "new-item-1", "serie_name": "New Series", "episode_count": 3, "priority": "normal" } } await manager.broadcast_to_room(message, room="queue") ws.send_json.assert_called_once() async def test_queue_item_removed_notification( self, websocket_service ): """Test notification when item is removed from queue.""" manager = websocket_service.manager ws = AsyncMock() await manager.connect(ws, room="queue") message = { "type": "queue_item_removed", "data": { "item_id": "removed-item-1", "reason": "user_cancelled" } } await manager.broadcast_to_room(message, room="queue") ws.send_json.assert_called_once() class TestSystemMessaging: """Test system-wide messaging via WebSocket.""" async def test_system_notification_broadcast( self, websocket_service ): """Test broadcasting system notifications.""" manager = websocket_service.manager ws1 = AsyncMock() ws2 = AsyncMock() await manager.connect(ws1, room="system") await manager.connect(ws2, room="system") message = { "type": "system_notification", "data": { "level": "info", "message": "System maintenance scheduled", "timestamp": "2025-10-19T10:00:00" } } await manager.broadcast_to_room(message, room="system") ws1.send_json.assert_called_once() ws2.send_json.assert_called_once() async def test_error_message_broadcast( self, websocket_service ): """Test broadcasting error messages.""" manager = websocket_service.manager ws = AsyncMock() await manager.connect(ws, room="errors") message = { "type": "error", "data": { "error_code": "DOWNLOAD_FAILED", "message": "Failed to download episode", "details": "Connection timeout" } } await manager.broadcast_to_room(message, room="errors") ws.send_json.assert_called_once() class TestConcurrentConnections: """Test handling of concurrent WebSocket connections.""" async def test_multiple_clients_in_same_room( self, websocket_service ): """Test multiple clients receiving broadcasts in same room.""" manager = websocket_service.manager # Create multiple connections clients = [AsyncMock() for _ in range(5)] for client in clients: await manager.connect(client, room="shared-room") # Broadcast to all message = {"type": "test", "data": "multi-client test"} await manager.broadcast_to_room(message, room="shared-room") # All clients should receive for client in clients: client.send_json.assert_called_once_with(message) async def test_concurrent_broadcasts_to_different_rooms( self, websocket_service ): """Test concurrent broadcasts to different rooms.""" manager = websocket_service.manager # Setup rooms with clients downloads_ws = AsyncMock() queue_ws = AsyncMock() system_ws = AsyncMock() await manager.connect(downloads_ws, room="downloads") await manager.connect(queue_ws, room="queue") await manager.connect(system_ws, room="system") # Concurrent broadcasts await asyncio.gather( manager.broadcast_to_room( {"type": "download_progress"}, "downloads" ), manager.broadcast_to_room( {"type": "queue_update"}, "queue" ), manager.broadcast_to_room( {"type": "system_message"}, "system" ) ) # Each client should receive only their room's message downloads_ws.send_json.assert_called_once() queue_ws.send_json.assert_called_once() system_ws.send_json.assert_called_once() class TestConnectionErrorHandling: """Test error handling in WebSocket connections.""" async def test_handle_send_failure( self, websocket_service ): """Test handling of message send failures.""" manager = websocket_service.manager # Connection that will fail on send failing_ws = AsyncMock() failing_ws.send_json.side_effect = RuntimeError("Send failed") await manager.connect(failing_ws, room="test") # Should handle error gracefully message = {"type": "test", "data": "test"} try: await manager.broadcast_to_room(message, room="test") except RuntimeError: pytest.fail("Should handle send failure gracefully") async def test_handle_multiple_send_failures( self, websocket_service ): """Test handling multiple concurrent send failures.""" manager = websocket_service.manager # Multiple failing connections failing_clients = [] for i in range(3): ws = AsyncMock() ws.send_json.side_effect = RuntimeError(f"Failed {i}") failing_clients.append(ws) await manager.connect(ws, room="test") # Add one working connection working_ws = AsyncMock() await manager.connect(working_ws, room="test") # Broadcast should continue despite failures message = {"type": "test", "data": "test"} await manager.broadcast_to_room(message, room="test") # Working connection should still receive working_ws.send_json.assert_called_once() async def test_cleanup_after_disconnect( self, websocket_service ): """Test proper cleanup after client disconnect.""" manager = websocket_service.manager ws = AsyncMock() room = "test-room" # Connect and then disconnect await manager.connect(ws, room=room) manager.disconnect(ws) # Verify cleanup assert ws not in manager.active_connections if room in manager._rooms: assert ws not in manager._rooms[room] class TestMessageFormatting: """Test message formatting and validation.""" async def test_message_structure_validation( self, websocket_service ): """Test that messages have required structure.""" manager = websocket_service.manager ws = AsyncMock() await manager.connect(ws, room="test") # Valid message structure valid_message = { "type": "test_message", "data": {"key": "value"}, } await manager.broadcast(valid_message) ws.send_json.assert_called_once_with(valid_message) async def test_different_message_types( self, websocket_service ): """Test broadcasting different message types.""" manager = websocket_service.manager ws = AsyncMock() await manager.connect(ws, room="test") message_types = [ "download_progress", "download_complete", "download_failed", "queue_status", "system_notification", "error" ] for msg_type in message_types: message = {"type": msg_type, "data": {}} await manager.broadcast(message) # Should have received all message types assert ws.send_json.call_count == len(message_types) class TestWebSocketServiceIntegration: """Test WebSocket service integration with other services.""" async def test_websocket_service_singleton(self): """Test that WebSocket service is a singleton.""" service1 = get_websocket_service() service2 = get_websocket_service() assert service1 is service2 async def test_service_has_connection_manager(self): """Test that service has connection manager.""" service = get_websocket_service() assert hasattr(service, 'manager') assert isinstance(service.manager, ConnectionManager) async def test_service_broadcast_methods_exist(self): """Test that service has required broadcast methods.""" service = get_websocket_service() required_methods = [ 'broadcast_download_progress', 'broadcast_download_complete', 'broadcast_download_failed', 'broadcast_queue_status', 'broadcast_system_message', 'send_error' ] for method in required_methods: assert hasattr(service, method) class TestRoomManagement: """Test room management functionality.""" async def test_room_creation_on_first_connection( self, websocket_service ): """Test that room is created when first client connects.""" manager = websocket_service.manager ws = AsyncMock() room = "new-room" # Room should not exist initially assert room not in manager._rooms # Connect to room await manager.connect(ws, room=room) # Room should now exist assert room in manager._rooms async def test_room_cleanup_when_empty( self, websocket_service ): """Test that empty rooms are cleaned up.""" manager = websocket_service.manager ws = AsyncMock() room = "temp-room" # Connect and disconnect await manager.connect(ws, room=room) manager.disconnect(ws) # Room should be cleaned up if empty # (Implementation may vary) if room in manager._rooms: assert len(manager._rooms[room]) == 0 async def test_client_can_be_in_one_room( self, websocket_service ): """Test client room membership.""" manager = websocket_service.manager ws = AsyncMock() # Connect to room await manager.connect(ws, room="room-1") # Verify in room assert "room-1" in manager._rooms assert ws in manager._rooms["room-1"] class TestCompleteWebSocketWorkflow: """Test complete WebSocket workflows.""" async def test_full_download_notification_workflow( self, websocket_service ): """Test complete workflow of download notifications.""" manager = websocket_service.manager ws = AsyncMock() await manager.connect(ws, room="downloads") # Simulate download lifecycle # 1. Download started await manager.broadcast_to_room( {"type": "download_started", "data": {"item_id": "dl-1"}}, "downloads" ) # 2. Progress updates for progress in [25, 50, 75]: await manager.broadcast_to_room( { "type": "download_progress", "data": {"item_id": "dl-1", "percentage": progress} }, "downloads" ) # 3. Download complete await manager.broadcast_to_room( {"type": "download_complete", "data": {"item_id": "dl-1"}}, "downloads" ) # Client should have received all notifications assert ws.send_json.call_count == 5 async def test_multi_room_workflow( self, websocket_service ): """Test workflow involving multiple rooms.""" manager = websocket_service.manager # Setup clients in different rooms download_ws = AsyncMock() queue_ws = AsyncMock() system_ws = AsyncMock() await manager.connect(download_ws, room="downloads") await manager.connect(queue_ws, room="queue") await manager.connect(system_ws, room="system") # Broadcast to each room await manager.broadcast_to_room( {"type": "download_update"}, "downloads" ) await manager.broadcast_to_room( {"type": "queue_update"}, "queue" ) await manager.broadcast_to_room( {"type": "system_update"}, "system" ) # Each client should only receive their room's messages download_ws.send_json.assert_called_once() queue_ws.send_json.assert_called_once() system_ws.send_json.assert_called_once()