"""Integration tests for WebSocket functionality. This module tests the complete WebSocket integration including: - WebSocket connection establishment and authentication - Real-time message broadcasting - Room-based messaging - Connection lifecycle management - Integration with download and progress services - Error handling and reconnection - Concurrent client management """ import asyncio import uuid from unittest.mock import AsyncMock import pytest from httpx import ASGITransport, AsyncClient from src.server.fastapi_app import app from src.server.services.auth_service import auth_service from src.server.services.websocket_service import ( ConnectionManager, get_websocket_service, ) @pytest.fixture(autouse=True) def reset_auth(): """Reset authentication state before each test.""" original_hash = auth_service._hash auth_service._hash = None auth_service._failed.clear() yield auth_service._hash = original_hash auth_service._failed.clear() @pytest.fixture async def client(): """Create an async test client.""" transport = ASGITransport(app=app) async with AsyncClient(transport=transport, base_url="http://test") as ac: yield ac @pytest.fixture async def auth_token(client): """Get a valid authentication token.""" password = "WebSocketTestPassword123!" await client.post( "/api/auth/setup", json={"master_password": password} ) response = await client.post( "/api/auth/login", json={"password": password} ) return response.json()["access_token"] @pytest.fixture def websocket_service(): """Get the WebSocket service instance.""" return get_websocket_service() @pytest.fixture def mock_websocket(): """Create a mock WebSocket connection.""" ws = AsyncMock() ws.send_text = AsyncMock() ws.send_json = AsyncMock() ws.receive_text = AsyncMock() ws.accept = AsyncMock() ws.close = AsyncMock() return ws class TestWebSocketConnection: """Test WebSocket connection establishment and lifecycle.""" async def test_websocket_endpoint_exists(self, client, auth_token): """Test that WebSocket endpoint is available.""" routes = [route.path for route in app.routes] websocket_routes = [ path for path in routes if "ws" in path or "websocket" in path ] assert len(websocket_routes) > 0 async def test_connection_manager_tracks_connections( self, websocket_service, mock_websocket ): """Test that connection manager tracks active connections.""" manager = websocket_service.manager connection_id = "test-conn-1" initial_count = await manager.get_connection_count() mock_websocket.accept = AsyncMock() await manager.connect(mock_websocket, connection_id) assert await manager.get_connection_count() == initial_count + 1 assert connection_id in manager._active_connections async def test_disconnect_removes_connection( self, websocket_service, mock_websocket ): """Test that disconnecting removes connection from manager.""" manager = websocket_service.manager connection_id = "test-conn-2" mock_websocket.accept = AsyncMock() await manager.connect(mock_websocket, connection_id) assert connection_id in manager._active_connections await manager.disconnect(connection_id) assert connection_id not in manager._active_connections async def test_room_assignment_on_connection( self, websocket_service, mock_websocket ): """Test that connections can join rooms.""" manager = websocket_service.manager connection_id = "test-conn-3" room = "test-room-1" mock_websocket.accept = AsyncMock() await manager.connect(mock_websocket, connection_id) await manager.join_room(connection_id, room) assert room in manager._rooms assert connection_id in manager._rooms[room] async def test_multiple_rooms_support( self, websocket_service ): """Test that multiple rooms can exist simultaneously.""" manager = websocket_service.manager ws1 = AsyncMock() ws1.accept = AsyncMock() ws2 = AsyncMock() ws2.accept = AsyncMock() ws3 = AsyncMock() ws3.accept = AsyncMock() await manager.connect(ws1, "conn-1") await manager.connect(ws2, "conn-2") await manager.connect(ws3, "conn-3") await manager.join_room("conn-1", "room-1") await manager.join_room("conn-2", "room-2") await manager.join_room("conn-3", "room-1") assert "room-1" in manager._rooms assert "room-2" in manager._rooms assert len(manager._rooms["room-1"]) == 2 assert len(manager._rooms["room-2"]) == 1 class TestMessageBroadcasting: """Test message broadcasting functionality.""" async def test_broadcast_to_all_connections( self, websocket_service ): """Test broadcasting message to all connected clients.""" manager = websocket_service.manager ws1 = AsyncMock() ws1.accept = AsyncMock() ws2 = AsyncMock() ws2.accept = AsyncMock() ws3 = AsyncMock() ws3.accept = AsyncMock() await manager.connect(ws1, "conn-1") await manager.connect(ws2, "conn-2") await manager.connect(ws3, "conn-3") message = {"type": "test", "data": "broadcast to all"} await manager.broadcast(message) ws1.send_json.assert_called_once() ws2.send_json.assert_called_once() ws3.send_json.assert_called_once() async def test_broadcast_to_specific_room( self, websocket_service ): """Test broadcasting message to specific room only.""" manager = websocket_service.manager ws1 = AsyncMock() ws1.accept = AsyncMock() ws2 = AsyncMock() ws2.accept = AsyncMock() ws3 = AsyncMock() ws3.accept = AsyncMock() await manager.connect(ws1, "conn-1") await manager.connect(ws2, "conn-2") await manager.connect(ws3, "conn-3") await manager.join_room("conn-1", "downloads") await manager.join_room("conn-2", "downloads") await manager.join_room("conn-3", "system") message = {"type": "download_progress", "data": {}} await manager.broadcast_to_room(message, room="downloads") assert ws1.send_json.call_count == 1 assert ws2.send_json.call_count == 1 assert ws3.send_json.call_count == 0 async def test_broadcast_with_json_message( self, websocket_service ): """Test broadcasting JSON-formatted messages.""" manager = websocket_service.manager ws = AsyncMock() ws.accept = AsyncMock() await manager.connect(ws, "conn-1") message = { "type": "queue_update", "data": { "pending": 5, "active": 2, "completed": 10 }, "timestamp": "2025-10-19T10:00:00" } await manager.broadcast(message) ws.send_json.assert_called_once_with(message) async def test_broadcast_handles_disconnected_clients( self, websocket_service ): """Test that broadcasting handles disconnected clients gracefully.""" manager = websocket_service.manager failing_ws = AsyncMock() failing_ws.accept = AsyncMock() failing_ws.send_json.side_effect = RuntimeError("Connection closed") working_ws = AsyncMock() working_ws.accept = AsyncMock() await manager.connect(failing_ws, "conn-1") await manager.connect(working_ws, "conn-2") message = {"type": "test", "data": "test"} await manager.broadcast(message) working_ws.send_json.assert_called_once() class TestProgressIntegration: """Test integration with progress service.""" async def test_download_progress_broadcasts_to_websocket( self, websocket_service ): """Test that download progress updates broadcast via WebSocket.""" manager = websocket_service.manager ws = AsyncMock() ws.accept = AsyncMock() await manager.connect(ws, "conn-1") await manager.join_room("conn-1", "downloads") message = { "type": "download_progress", "data": { "item_id": "test-download-1", "percentage": 45.5, "current_mb": 45.5, "total_mb": 100.0, "speed_mbps": 2.5 } } await manager.broadcast_to_room(message, room="downloads") ws.send_json.assert_called_once_with(message) async def test_download_complete_notification( self, websocket_service ): """Test download completion notification via WebSocket.""" manager = websocket_service.manager ws = AsyncMock() ws.accept = AsyncMock() await manager.connect(ws, "conn-1") await manager.join_room("conn-1", "downloads") message = { "type": "download_complete", "data": { "item_id": "test-download-1", "serie_name": "Test Anime", "episode": {"season": 1, "episode": 1} } } await manager.broadcast_to_room(message, room="downloads") ws.send_json.assert_called_once() async def test_download_failed_notification( self, websocket_service ): """Test download failure notification via WebSocket.""" manager = websocket_service.manager ws = AsyncMock() ws.accept = AsyncMock() await manager.connect(ws, "conn-1") await manager.join_room("conn-1", "downloads") message = { "type": "download_failed", "data": { "item_id": "test-download-1", "error": "Network timeout", "retry_count": 2 } } await manager.broadcast_to_room(message, room="downloads") ws.send_json.assert_called_once() class TestQueueStatusBroadcasting: """Test queue status broadcasting via WebSocket.""" async def test_queue_status_update_broadcast( self, websocket_service ): """Test broadcasting queue status updates.""" manager = websocket_service.manager ws = AsyncMock() ws.accept = AsyncMock() await manager.connect(ws, "conn-1") await manager.join_room("conn-1", "queue") message = { "type": "queue_status", "data": { "pending_count": 5, "active_count": 2, "completed_count": 10, "failed_count": 1, "total_items": 18 } } await manager.broadcast_to_room(message, room="queue") ws.send_json.assert_called_once_with(message) async def test_queue_item_added_notification( self, websocket_service ): """Test notification when item is added to queue.""" manager = websocket_service.manager ws = AsyncMock() ws.accept = AsyncMock() await manager.connect(ws, "conn-1") await manager.join_room("conn-1", "queue") message = { "type": "queue_item_added", "data": { "item_id": "new-item-1", "serie_name": "New Series", "episode_count": 3, "priority": "normal" } } await manager.broadcast_to_room(message, room="queue") ws.send_json.assert_called_once() async def test_queue_item_removed_notification( self, websocket_service ): """Test notification when item is removed from queue.""" manager = websocket_service.manager ws = AsyncMock() ws.accept = AsyncMock() await manager.connect(ws, "conn-1") await manager.join_room("conn-1", "queue") message = { "type": "queue_item_removed", "data": { "item_id": "removed-item-1", "reason": "user_cancelled" } } await manager.broadcast_to_room(message, room="queue") ws.send_json.assert_called_once() class TestSystemMessaging: """Test system-wide messaging via WebSocket.""" async def test_system_notification_broadcast( self, websocket_service ): """Test broadcasting system notifications.""" manager = websocket_service.manager ws1 = AsyncMock() ws1.accept = AsyncMock() ws2 = AsyncMock() ws2.accept = AsyncMock() await manager.connect(ws1, "conn-1") await manager.connect(ws2, "conn-2") await manager.join_room("conn-1", "system") await manager.join_room("conn-2", "system") message = { "type": "system_notification", "data": { "level": "info", "message": "System maintenance scheduled", "timestamp": "2025-10-19T10:00:00" } } await manager.broadcast_to_room(message, room="system") ws1.send_json.assert_called_once() ws2.send_json.assert_called_once() async def test_error_message_broadcast( self, websocket_service ): """Test broadcasting error messages.""" manager = websocket_service.manager ws = AsyncMock() ws.accept = AsyncMock() await manager.connect(ws, "conn-1") await manager.join_room("conn-1", "errors") message = { "type": "error", "data": { "error_code": "DOWNLOAD_FAILED", "message": "Failed to download episode", "details": "Connection timeout" } } await manager.broadcast_to_room(message, room="errors") ws.send_json.assert_called_once() class TestConcurrentConnections: """Test handling of concurrent WebSocket connections.""" async def test_multiple_clients_in_same_room( self, websocket_service ): """Test multiple clients receiving broadcasts in same room.""" manager = websocket_service.manager clients = [] for i in range(5): ws = AsyncMock() ws.accept = AsyncMock() clients.append(ws) await manager.connect(ws, f"conn-{i}") await manager.join_room(f"conn-{i}", "shared-room") message = {"type": "test", "data": "multi-client test"} await manager.broadcast_to_room(message, room="shared-room") for client in clients: client.send_json.assert_called_once_with(message) async def test_concurrent_broadcasts_to_different_rooms( self, websocket_service ): """Test concurrent broadcasts to different rooms.""" manager = websocket_service.manager downloads_ws = AsyncMock() downloads_ws.accept = AsyncMock() queue_ws = AsyncMock() queue_ws.accept = AsyncMock() system_ws = AsyncMock() system_ws.accept = AsyncMock() await manager.connect(downloads_ws, "conn-1") await manager.connect(queue_ws, "conn-2") await manager.connect(system_ws, "conn-3") await manager.join_room("conn-1", "downloads") await manager.join_room("conn-2", "queue") await manager.join_room("conn-3", "system") await asyncio.gather( manager.broadcast_to_room( {"type": "download_progress"}, "downloads" ), manager.broadcast_to_room( {"type": "queue_update"}, "queue" ), manager.broadcast_to_room( {"type": "system_message"}, "system" ) ) downloads_ws.send_json.assert_called_once() queue_ws.send_json.assert_called_once() system_ws.send_json.assert_called_once() class TestConnectionErrorHandling: """Test error handling in WebSocket connections.""" async def test_handle_send_failure( self, websocket_service ): """Test handling of message send failures.""" manager = websocket_service.manager failing_ws = AsyncMock() failing_ws.accept = AsyncMock() failing_ws.send_json.side_effect = RuntimeError("Send failed") await manager.connect(failing_ws, "conn-1") message = {"type": "test", "data": "test"} try: await manager.broadcast_to_room(message, room="test") except RuntimeError: pytest.fail("Should handle send failure gracefully") async def test_handle_multiple_send_failures( self, websocket_service ): """Test handling multiple concurrent send failures.""" manager = websocket_service.manager failing_clients = [] for i in range(3): ws = AsyncMock() ws.accept = AsyncMock() ws.send_json.side_effect = RuntimeError(f"Failed {i}") failing_clients.append(ws) await manager.connect(ws, f"conn-{i}") await manager.join_room(f"conn-{i}", "test") working_ws = AsyncMock() working_ws.accept = AsyncMock() await manager.connect(working_ws, "conn-working") await manager.join_room("conn-working", "test") message = {"type": "test", "data": "test"} await manager.broadcast_to_room(message, room="test") working_ws.send_json.assert_called_once() async def test_cleanup_after_disconnect( self, websocket_service ): """Test proper cleanup after client disconnect.""" manager = websocket_service.manager ws = AsyncMock() ws.accept = AsyncMock() room = "test-room" connection_id = "test-conn" await manager.connect(ws, connection_id) await manager.join_room(connection_id, room) await manager.disconnect(connection_id) assert connection_id not in manager._active_connections if room in manager._rooms: assert connection_id not in manager._rooms[room] class TestMessageFormatting: """Test message formatting and validation.""" async def test_message_structure_validation( self, websocket_service ): """Test that messages have required structure.""" manager = websocket_service.manager ws = AsyncMock() ws.accept = AsyncMock() await manager.connect(ws, "conn-1") valid_message = { "type": "test_message", "data": {"key": "value"}, } await manager.broadcast(valid_message) ws.send_json.assert_called_once_with(valid_message) async def test_different_message_types( self, websocket_service ): """Test broadcasting different message types.""" manager = websocket_service.manager ws = AsyncMock() ws.accept = AsyncMock() await manager.connect(ws, "conn-1") message_types = [ "download_progress", "download_complete", "download_failed", "queue_status", "system_notification", "error" ] for msg_type in message_types: message = {"type": msg_type, "data": {}} await manager.broadcast(message) assert ws.send_json.call_count == len(message_types) class TestWebSocketServiceIntegration: """Test WebSocket service integration with other services.""" async def test_websocket_service_singleton(self): """Test that WebSocket service is a singleton.""" service1 = get_websocket_service() service2 = get_websocket_service() assert service1 is service2 async def test_service_has_connection_manager(self): """Test that service has connection manager.""" service = get_websocket_service() assert hasattr(service, 'manager') assert isinstance(service.manager, ConnectionManager) async def test_service_broadcast_methods_exist(self): """Test that service has required broadcast methods.""" service = get_websocket_service() required_methods = [ 'broadcast_download_progress', 'broadcast_download_complete', 'broadcast_download_failed', 'broadcast_queue_status', 'broadcast_system_message', 'send_error' ] for method in required_methods: assert hasattr(service, method) class TestRoomManagement: """Test room management functionality.""" async def test_room_creation_on_first_connection( self, websocket_service ): """Test that room is created when first client connects.""" manager = websocket_service.manager ws = AsyncMock() ws.accept = AsyncMock() room = "new-room" assert room not in manager._rooms await manager.connect(ws, "conn-1") await manager.join_room("conn-1", room) assert room in manager._rooms async def test_room_cleanup_when_empty( self, websocket_service ): """Test that empty rooms are cleaned up.""" manager = websocket_service.manager ws = AsyncMock() ws.accept = AsyncMock() room = "temp-room" connection_id = "conn-1" await manager.connect(ws, connection_id) await manager.join_room(connection_id, room) await manager.disconnect(connection_id) if room in manager._rooms: assert len(manager._rooms[room]) == 0 async def test_client_can_be_in_one_room( self, websocket_service ): """Test client room membership.""" manager = websocket_service.manager ws = AsyncMock() ws.accept = AsyncMock() await manager.connect(ws, "conn-1") await manager.join_room("conn-1", "room-1") assert "room-1" in manager._rooms assert "conn-1" in manager._rooms["room-1"] class TestCompleteWebSocketWorkflow: """Test complete WebSocket workflows.""" async def test_full_download_notification_workflow( self, websocket_service ): """Test complete workflow of download notifications.""" manager = websocket_service.manager ws = AsyncMock() ws.accept = AsyncMock() await manager.connect(ws, "conn-1") await manager.join_room("conn-1", "downloads") await manager.broadcast_to_room( {"type": "download_started", "data": {"item_id": "dl-1"}}, "downloads" ) for progress in [25, 50, 75]: await manager.broadcast_to_room( { "type": "download_progress", "data": {"item_id": "dl-1", "percentage": progress} }, "downloads" ) await manager.broadcast_to_room( {"type": "download_complete", "data": {"item_id": "dl-1"}}, "downloads" ) assert ws.send_json.call_count == 5 async def test_multi_room_workflow( self, websocket_service ): """Test workflow involving multiple rooms.""" manager = websocket_service.manager download_ws = AsyncMock() download_ws.accept = AsyncMock() queue_ws = AsyncMock() queue_ws.accept = AsyncMock() system_ws = AsyncMock() system_ws.accept = AsyncMock() await manager.connect(download_ws, "conn-1") await manager.connect(queue_ws, "conn-2") await manager.connect(system_ws, "conn-3") await manager.join_room("conn-1", "downloads") await manager.join_room("conn-2", "queue") await manager.join_room("conn-3", "system") await manager.broadcast_to_room( {"type": "download_update"}, "downloads" ) await manager.broadcast_to_room( {"type": "queue_update"}, "queue" ) await manager.broadcast_to_room( {"type": "system_update"}, "system" ) download_ws.send_json.assert_called_once() queue_ws.send_json.assert_called_once() system_ws.send_json.assert_called_once()