Aniworld/tests/integration/test_websocket.py
2025-10-19 20:27:30 +02:00

793 lines
25 KiB
Python

"""Integration tests for WebSocket functionality.
This module tests the complete WebSocket integration including:
- WebSocket connection establishment and authentication
- Real-time message broadcasting
- Room-based messaging
- Connection lifecycle management
- Integration with download and progress services
- Error handling and reconnection
- Concurrent client management
"""
import asyncio
import uuid
from unittest.mock import AsyncMock
import pytest
from httpx import ASGITransport, AsyncClient
from src.server.fastapi_app import app
from src.server.services.auth_service import auth_service
from src.server.services.websocket_service import (
ConnectionManager,
get_websocket_service,
)
@pytest.fixture(autouse=True)
def reset_auth():
    """Blank the auth service's stored hash around every test.

    The hash found on ``auth_service`` before the test is remembered and put
    back afterwards; ``auth_service._failed`` is emptied on both sides.
    """
    # Right-hand side is evaluated first, so the old hash is captured
    # before the attribute is overwritten with ``None``.
    saved_hash, auth_service._hash = auth_service._hash, None
    auth_service._failed.clear()

    yield

    auth_service._hash = saved_hash
    auth_service._failed.clear()
@pytest.fixture
async def client():
    """Yield an httpx ``AsyncClient`` that talks to ``app`` over ASGI."""
    async with AsyncClient(
        transport=ASGITransport(app=app),
        base_url="http://test",
    ) as http_client:
        yield http_client
@pytest.fixture
async def auth_token(client):
    """Set a master password, log in with it and return the access token.

    Args:
        client: The async HTTP test client fixture.

    Returns:
        The ``access_token`` value from the login response body.

    Raises:
        httpx.HTTPStatusError: If the login request comes back with an
            error status.
    """
    password = "WebSocketTestPassword123!"

    # Setup is best-effort; only the login outcome is checked below.
    await client.post(
        "/api/auth/setup",
        json={"master_password": password},
    )

    response = await client.post(
        "/api/auth/login",
        json={"password": password},
    )
    # Surface a failed login as an HTTP error carrying the status code,
    # instead of an opaque KeyError on the missing "access_token" key.
    response.raise_for_status()
    return response.json()["access_token"]
@pytest.fixture
async def websocket_service():
    """Yield the WebSocket service and remove connections the test added.

    ``get_websocket_service()`` is expected to return one shared instance
    (this module asserts as much), and the tests reuse ids such as
    ``"conn-1"``. Without teardown, connections and room memberships
    registered by one test would still be present in the next.

    Yields:
        The WebSocket service instance.
    """
    service = get_websocket_service()
    manager = service.manager

    # Remember what was registered before the test so that only the
    # connections this test created are torn down afterwards.
    # NOTE(review): assumes iterating ``_active_connections`` yields
    # connection ids (e.g. a dict keyed by id) - confirm against
    # ConnectionManager.
    known_ids = set(manager._active_connections)

    yield service

    for connection_id in set(manager._active_connections) - known_ids:
        await manager.disconnect(connection_id)
@pytest.fixture
def mock_websocket():
    """Build an ``AsyncMock`` standing in for a WebSocket connection.

    Each method listed below is given its own explicit ``AsyncMock``.
    """
    fake_socket = AsyncMock()
    method_names = ("send_text", "send_json", "receive_text", "accept", "close")
    for method_name in method_names:
        setattr(fake_socket, method_name, AsyncMock())
    return fake_socket
class TestWebSocketConnection:
    """Connection registration, removal and room membership on the manager."""

    @staticmethod
    def _fake_socket():
        """Return an ``AsyncMock`` socket with an explicit ``accept`` mock."""
        sock = AsyncMock()
        sock.accept = AsyncMock()
        return sock

    async def test_websocket_endpoint_exists(self, client, auth_token):
        """At least one app route path contains ``ws`` or ``websocket``."""
        all_paths = [route.path for route in app.routes]
        ws_paths = [p for p in all_paths if "ws" in p or "websocket" in p]
        assert ws_paths

    async def test_connection_manager_tracks_connections(
        self, websocket_service, mock_websocket
    ):
        """Connecting raises the connection count by one and records the id."""
        manager = websocket_service.manager
        conn_id = "test-conn-1"
        count_before = await manager.get_connection_count()

        mock_websocket.accept = AsyncMock()
        await manager.connect(mock_websocket, conn_id)

        count_after = await manager.get_connection_count()
        assert count_after == count_before + 1
        assert conn_id in manager._active_connections

    async def test_disconnect_removes_connection(
        self, websocket_service, mock_websocket
    ):
        """A disconnected id is no longer among the active connections."""
        manager = websocket_service.manager
        conn_id = "test-conn-2"

        mock_websocket.accept = AsyncMock()
        await manager.connect(mock_websocket, conn_id)
        assert conn_id in manager._active_connections

        await manager.disconnect(conn_id)
        assert conn_id not in manager._active_connections

    async def test_room_assignment_on_connection(
        self, websocket_service, mock_websocket
    ):
        """Joining a room lists the connection id under that room."""
        manager = websocket_service.manager
        conn_id, room_name = "test-conn-3", "test-room-1"

        mock_websocket.accept = AsyncMock()
        await manager.connect(mock_websocket, conn_id)
        await manager.join_room(conn_id, room_name)

        assert room_name in manager._rooms
        assert conn_id in manager._rooms[room_name]

    async def test_multiple_rooms_support(
        self, websocket_service
    ):
        """Two rooms coexist and each holds the expected number of members."""
        manager = websocket_service.manager
        room_of = {"conn-1": "room-1", "conn-2": "room-2", "conn-3": "room-1"}
        sockets = {conn_id: self._fake_socket() for conn_id in room_of}

        for conn_id, sock in sockets.items():
            await manager.connect(sock, conn_id)
        for conn_id, room_name in room_of.items():
            await manager.join_room(conn_id, room_name)

        expected_sizes = {"room-1": 2, "room-2": 1}
        for room_name in expected_sizes:
            assert room_name in manager._rooms
        for room_name, size in expected_sizes.items():
            assert len(manager._rooms[room_name]) == size
class TestMessageBroadcasting:
    """Delivery of broadcast messages to all clients and to single rooms."""

    @staticmethod
    def _fake_socket():
        """Return an ``AsyncMock`` socket with an explicit ``accept`` mock."""
        sock = AsyncMock()
        sock.accept = AsyncMock()
        return sock

    async def test_broadcast_to_all_connections(
        self, websocket_service
    ):
        """A plain broadcast reaches each of three connected sockets once."""
        manager = websocket_service.manager
        sockets = [self._fake_socket() for _ in range(3)]
        for number, sock in enumerate(sockets, start=1):
            await manager.connect(sock, f"conn-{number}")

        await manager.broadcast({"type": "test", "data": "broadcast to all"})

        for sock in sockets:
            sock.send_json.assert_called_once()

    async def test_broadcast_to_specific_room(
        self, websocket_service
    ):
        """A room broadcast reaches members of that room and nobody else."""
        manager = websocket_service.manager
        room_of = {
            "conn-1": "downloads",
            "conn-2": "downloads",
            "conn-3": "system",
        }
        sockets = {conn_id: self._fake_socket() for conn_id in room_of}

        for conn_id, sock in sockets.items():
            await manager.connect(sock, conn_id)
        for conn_id, room_name in room_of.items():
            await manager.join_room(conn_id, room_name)

        payload = {"type": "download_progress", "data": {}}
        await manager.broadcast_to_room(payload, room="downloads")

        send_counts = [
            sockets[conn_id].send_json.call_count for conn_id in room_of
        ]
        assert send_counts == [1, 1, 0]

    async def test_broadcast_with_json_message(
        self, websocket_service
    ):
        """A nested dict payload is handed to ``send_json`` unchanged."""
        manager = websocket_service.manager
        sock = self._fake_socket()
        await manager.connect(sock, "conn-1")

        counts = {"pending": 5, "active": 2, "completed": 10}
        payload = {
            "type": "queue_update",
            "data": counts,
            "timestamp": "2025-10-19T10:00:00",
        }
        await manager.broadcast(payload)

        sock.send_json.assert_called_once_with(payload)

    async def test_broadcast_handles_disconnected_clients(
        self, websocket_service
    ):
        """A socket whose send raises does not block delivery to another."""
        manager = websocket_service.manager

        broken = self._fake_socket()
        broken.send_json.side_effect = RuntimeError("Connection closed")
        healthy = self._fake_socket()

        for conn_id, sock in (("conn-1", broken), ("conn-2", healthy)):
            await manager.connect(sock, conn_id)

        await manager.broadcast({"type": "test", "data": "test"})

        healthy.send_json.assert_called_once()
class TestProgressIntegration:
    """Download progress, completion and failure messages over WebSocket."""

    @staticmethod
    async def _downloads_subscriber(manager):
        """Connect a mock socket as ``conn-1`` and join room ``downloads``."""
        sock = AsyncMock()
        sock.accept = AsyncMock()
        await manager.connect(sock, "conn-1")
        await manager.join_room("conn-1", "downloads")
        return sock

    async def test_download_progress_broadcasts_to_websocket(
        self, websocket_service
    ):
        """A progress message sent to ``downloads`` arrives unchanged."""
        manager = websocket_service.manager
        sock = await self._downloads_subscriber(manager)

        progress = {
            "item_id": "test-download-1",
            "percentage": 45.5,
            "current_mb": 45.5,
            "total_mb": 100.0,
            "speed_mbps": 2.5,
        }
        payload = {"type": "download_progress", "data": progress}
        await manager.broadcast_to_room(payload, room="downloads")

        sock.send_json.assert_called_once_with(payload)

    async def test_download_complete_notification(
        self, websocket_service
    ):
        """A completion message sent to ``downloads`` triggers one send."""
        manager = websocket_service.manager
        sock = await self._downloads_subscriber(manager)

        details = {
            "item_id": "test-download-1",
            "serie_name": "Test Anime",
            "episode": {"season": 1, "episode": 1},
        }
        payload = {"type": "download_complete", "data": details}
        await manager.broadcast_to_room(payload, room="downloads")

        sock.send_json.assert_called_once()

    async def test_download_failed_notification(
        self, websocket_service
    ):
        """A failure message sent to ``downloads`` triggers one send."""
        manager = websocket_service.manager
        sock = await self._downloads_subscriber(manager)

        details = {
            "item_id": "test-download-1",
            "error": "Network timeout",
            "retry_count": 2,
        }
        payload = {"type": "download_failed", "data": details}
        await manager.broadcast_to_room(payload, room="downloads")

        sock.send_json.assert_called_once()
class TestQueueStatusBroadcasting:
    """Queue status and queue item messages sent to the ``queue`` room."""

    @staticmethod
    async def _queue_subscriber(manager):
        """Connect a mock socket as ``conn-1`` and join room ``queue``."""
        sock = AsyncMock()
        sock.accept = AsyncMock()
        await manager.connect(sock, "conn-1")
        await manager.join_room("conn-1", "queue")
        return sock

    async def test_queue_status_update_broadcast(
        self, websocket_service
    ):
        """A queue status message arrives at the subscriber unchanged."""
        manager = websocket_service.manager
        sock = await self._queue_subscriber(manager)

        counters = {
            "pending_count": 5,
            "active_count": 2,
            "completed_count": 10,
            "failed_count": 1,
            "total_items": 18,
        }
        payload = {"type": "queue_status", "data": counters}
        await manager.broadcast_to_room(payload, room="queue")

        sock.send_json.assert_called_once_with(payload)

    async def test_queue_item_added_notification(
        self, websocket_service
    ):
        """An item-added message triggers one send to the subscriber."""
        manager = websocket_service.manager
        sock = await self._queue_subscriber(manager)

        item = {
            "item_id": "new-item-1",
            "serie_name": "New Series",
            "episode_count": 3,
            "priority": "normal",
        }
        payload = {"type": "queue_item_added", "data": item}
        await manager.broadcast_to_room(payload, room="queue")

        sock.send_json.assert_called_once()

    async def test_queue_item_removed_notification(
        self, websocket_service
    ):
        """An item-removed message triggers one send to the subscriber."""
        manager = websocket_service.manager
        sock = await self._queue_subscriber(manager)

        item = {"item_id": "removed-item-1", "reason": "user_cancelled"}
        payload = {"type": "queue_item_removed", "data": item}
        await manager.broadcast_to_room(payload, room="queue")

        sock.send_json.assert_called_once()
class TestSystemMessaging:
    """System notifications and error messages sent to their rooms."""

    @staticmethod
    def _fake_socket():
        """Return an ``AsyncMock`` socket with an explicit ``accept`` mock."""
        sock = AsyncMock()
        sock.accept = AsyncMock()
        return sock

    async def test_system_notification_broadcast(
        self, websocket_service
    ):
        """Both members of ``system`` get a notification exactly once."""
        manager = websocket_service.manager
        sockets = {
            conn_id: self._fake_socket() for conn_id in ("conn-1", "conn-2")
        }

        for conn_id, sock in sockets.items():
            await manager.connect(sock, conn_id)
        for conn_id in sockets:
            await manager.join_room(conn_id, "system")

        notice = {
            "level": "info",
            "message": "System maintenance scheduled",
            "timestamp": "2025-10-19T10:00:00",
        }
        payload = {"type": "system_notification", "data": notice}
        await manager.broadcast_to_room(payload, room="system")

        for sock in sockets.values():
            sock.send_json.assert_called_once()

    async def test_error_message_broadcast(
        self, websocket_service
    ):
        """An error message sent to ``errors`` triggers one send."""
        manager = websocket_service.manager
        sock = self._fake_socket()
        await manager.connect(sock, "conn-1")
        await manager.join_room("conn-1", "errors")

        failure = {
            "error_code": "DOWNLOAD_FAILED",
            "message": "Failed to download episode",
            "details": "Connection timeout",
        }
        payload = {"type": "error", "data": failure}
        await manager.broadcast_to_room(payload, room="errors")

        sock.send_json.assert_called_once()
class TestConcurrentConnections:
    """Several clients at once, in one room and across different rooms."""

    @staticmethod
    def _fake_socket():
        """Return an ``AsyncMock`` socket with an explicit ``accept`` mock."""
        sock = AsyncMock()
        sock.accept = AsyncMock()
        return sock

    async def test_multiple_clients_in_same_room(
        self, websocket_service
    ):
        """Five clients sharing a room each get the same message once."""
        manager = websocket_service.manager
        sockets = []
        for conn_id in (f"conn-{number}" for number in range(5)):
            sock = self._fake_socket()
            sockets.append(sock)
            await manager.connect(sock, conn_id)
            await manager.join_room(conn_id, "shared-room")

        payload = {"type": "test", "data": "multi-client test"}
        await manager.broadcast_to_room(payload, room="shared-room")

        for sock in sockets:
            sock.send_json.assert_called_once_with(payload)

    async def test_concurrent_broadcasts_to_different_rooms(
        self, websocket_service
    ):
        """Three room broadcasts run via ``asyncio.gather`` each land once."""
        manager = websocket_service.manager
        # (connection id, room it joins, message type sent to that room)
        plan = (
            ("conn-1", "downloads", "download_progress"),
            ("conn-2", "queue", "queue_update"),
            ("conn-3", "system", "system_message"),
        )
        sockets = [self._fake_socket() for _ in plan]

        for (conn_id, _, _), sock in zip(plan, sockets):
            await manager.connect(sock, conn_id)
        for conn_id, room_name, _ in plan:
            await manager.join_room(conn_id, room_name)

        await asyncio.gather(
            *(
                manager.broadcast_to_room({"type": message_type}, room_name)
                for _, room_name, message_type in plan
            )
        )

        for sock in sockets:
            sock.send_json.assert_called_once()
class TestConnectionErrorHandling:
    """Test error handling in WebSocket connections."""

    async def test_handle_send_failure(
        self, websocket_service
    ):
        """Test that a failing send during a room broadcast is not raised."""
        manager = websocket_service.manager
        failing_ws = AsyncMock()
        failing_ws.accept = AsyncMock()
        failing_ws.send_json.side_effect = RuntimeError("Send failed")

        await manager.connect(failing_ws, "conn-1")
        # The socket has to be a member of the room being broadcast to;
        # otherwise nothing is sent to it and the failure path is never
        # exercised.
        await manager.join_room("conn-1", "test")

        message = {"type": "test", "data": "test"}
        try:
            await manager.broadcast_to_room(message, room="test")
        except RuntimeError:
            pytest.fail("Should handle send failure gracefully")

        # Prove the broadcast really attempted the failing send.
        failing_ws.send_json.assert_called()

    async def test_handle_multiple_send_failures(
        self, websocket_service
    ):
        """Test that several failing sends do not block a working client."""
        manager = websocket_service.manager

        failing_clients = []
        for i in range(3):
            ws = AsyncMock()
            ws.accept = AsyncMock()
            ws.send_json.side_effect = RuntimeError(f"Failed {i}")
            failing_clients.append(ws)
            await manager.connect(ws, f"conn-{i}")
            await manager.join_room(f"conn-{i}", "test")

        working_ws = AsyncMock()
        working_ws.accept = AsyncMock()
        await manager.connect(working_ws, "conn-working")
        await manager.join_room("conn-working", "test")

        message = {"type": "test", "data": "test"}
        await manager.broadcast_to_room(message, room="test")

        working_ws.send_json.assert_called_once()
        # Each failing member must have been attempted too, so the test
        # cannot pass by skipping them.
        for failing_ws in failing_clients:
            failing_ws.send_json.assert_called()

    async def test_cleanup_after_disconnect(
        self, websocket_service
    ):
        """Test that disconnecting clears the id from connections and room."""
        manager = websocket_service.manager
        ws = AsyncMock()
        ws.accept = AsyncMock()
        room = "test-room"
        connection_id = "test-conn"

        await manager.connect(ws, connection_id)
        await manager.join_room(connection_id, room)
        await manager.disconnect(connection_id)

        assert connection_id not in manager._active_connections
        # The room entry may or may not survive the disconnect; if it does,
        # it must no longer list this connection.
        if room in manager._rooms:
            assert connection_id not in manager._rooms[room]
class TestMessageFormatting:
    """Shape of broadcast messages and the range of message types."""

    @staticmethod
    async def _connected_socket(manager):
        """Connect a fresh mock socket as ``conn-1`` and return it."""
        sock = AsyncMock()
        sock.accept = AsyncMock()
        await manager.connect(sock, "conn-1")
        return sock

    async def test_message_structure_validation(
        self, websocket_service
    ):
        """A message with ``type`` and ``data`` keys is sent unchanged."""
        manager = websocket_service.manager
        sock = await self._connected_socket(manager)

        payload = {"type": "test_message", "data": {"key": "value"}}
        await manager.broadcast(payload)

        sock.send_json.assert_called_once_with(payload)

    async def test_different_message_types(
        self, websocket_service
    ):
        """One broadcast per message type results in one send per type."""
        manager = websocket_service.manager
        sock = await self._connected_socket(manager)

        kinds = (
            "download_progress",
            "download_complete",
            "download_failed",
            "queue_status",
            "system_notification",
            "error",
        )
        for kind in kinds:
            await manager.broadcast({"type": kind, "data": {}})

        assert sock.send_json.call_count == len(kinds)
class TestWebSocketServiceIntegration:
    """Shape of the object returned by ``get_websocket_service``."""

    async def test_websocket_service_singleton(self):
        """Two calls to the accessor return the very same object."""
        first, second = get_websocket_service(), get_websocket_service()
        assert first is second

    async def test_service_has_connection_manager(self):
        """The service exposes a ``manager`` that is a ``ConnectionManager``."""
        service = get_websocket_service()
        assert hasattr(service, 'manager')
        assert isinstance(service.manager, ConnectionManager)

    async def test_service_broadcast_methods_exist(self):
        """Every expected broadcast attribute is present on the service."""
        service = get_websocket_service()
        expected_names = (
            'broadcast_download_progress',
            'broadcast_download_complete',
            'broadcast_download_failed',
            'broadcast_queue_status',
            'broadcast_system_message',
            'send_error',
        )
        absent = [
            name for name in expected_names if not hasattr(service, name)
        ]
        assert not absent
class TestRoomManagement:
    """Room creation, emptying and membership bookkeeping."""

    @staticmethod
    def _fake_socket():
        """Return an ``AsyncMock`` socket with an explicit ``accept`` mock."""
        sock = AsyncMock()
        sock.accept = AsyncMock()
        return sock

    async def test_room_creation_on_first_connection(
        self, websocket_service
    ):
        """A room that did not exist appears once a client joins it."""
        manager = websocket_service.manager
        room_name = "new-room"
        assert room_name not in manager._rooms

        await manager.connect(self._fake_socket(), "conn-1")
        await manager.join_room("conn-1", room_name)

        assert room_name in manager._rooms

    async def test_room_cleanup_when_empty(
        self, websocket_service
    ):
        """After its only member leaves, a room is gone or has no members."""
        manager = websocket_service.manager
        conn_id, room_name = "conn-1", "temp-room"

        await manager.connect(self._fake_socket(), conn_id)
        await manager.join_room(conn_id, room_name)
        await manager.disconnect(conn_id)

        # Only checked when the room entry still exists.
        if room_name in manager._rooms:
            assert len(manager._rooms[room_name]) == 0

    async def test_client_can_be_in_one_room(
        self, websocket_service
    ):
        """A client that joins ``room-1`` is listed as a member of it."""
        manager = websocket_service.manager
        conn_id, room_name = "conn-1", "room-1"

        await manager.connect(self._fake_socket(), conn_id)
        await manager.join_room(conn_id, room_name)

        assert room_name in manager._rooms
        assert conn_id in manager._rooms[room_name]
class TestCompleteWebSocketWorkflow:
    """End-to-end message sequences through the connection manager."""

    @staticmethod
    def _fake_socket():
        """Return an ``AsyncMock`` socket with an explicit ``accept`` mock."""
        sock = AsyncMock()
        sock.accept = AsyncMock()
        return sock

    async def test_full_download_notification_workflow(
        self, websocket_service
    ):
        """Started, three progress and complete messages yield five sends."""
        manager = websocket_service.manager
        sock = self._fake_socket()
        await manager.connect(sock, "conn-1")
        await manager.join_room("conn-1", "downloads")

        events = [{"type": "download_started", "data": {"item_id": "dl-1"}}]
        events.extend(
            {
                "type": "download_progress",
                "data": {"item_id": "dl-1", "percentage": percent},
            }
            for percent in (25, 50, 75)
        )
        events.append(
            {"type": "download_complete", "data": {"item_id": "dl-1"}}
        )

        for event in events:
            await manager.broadcast_to_room(event, "downloads")

        assert sock.send_json.call_count == 5

    async def test_multi_room_workflow(
        self, websocket_service
    ):
        """One update per room reaches that room's single member once."""
        manager = websocket_service.manager
        # (connection id, room it joins, message type sent to that room)
        plan = (
            ("conn-1", "downloads", "download_update"),
            ("conn-2", "queue", "queue_update"),
            ("conn-3", "system", "system_update"),
        )
        sockets = [self._fake_socket() for _ in plan]

        for (conn_id, _, _), sock in zip(plan, sockets):
            await manager.connect(sock, conn_id)
        for conn_id, room_name, _ in plan:
            await manager.join_room(conn_id, room_name)
        for _, room_name, message_type in plan:
            await manager.broadcast_to_room({"type": message_type}, room_name)

        for sock in sockets:
            sock.send_json.assert_called_once()