Aniworld/tests/integration/test_websocket.py

766 lines
24 KiB
Python

"""Integration tests for WebSocket functionality.
This module tests the complete WebSocket integration including:
- WebSocket connection establishment and authentication
- Real-time message broadcasting
- Room-based messaging
- Connection lifecycle management
- Integration with download and progress services
- Error handling and reconnection
- Concurrent client management
"""
import asyncio
import json
from typing import Any, Dict, List
from unittest.mock import AsyncMock, Mock, patch
import pytest
from httpx import ASGITransport, AsyncClient
from starlette.websockets import WebSocketDisconnect
from src.server.fastapi_app import app
from src.server.models.download import DownloadPriority, DownloadStatus
from src.server.services.auth_service import auth_service
from src.server.services.progress_service import ProgressType
from src.server.services.websocket_service import (
ConnectionManager,
get_websocket_service,
)
@pytest.fixture(autouse=True)
def reset_auth():
"""Reset authentication state before each test."""
original_hash = auth_service._hash
auth_service._hash = None
auth_service._failed.clear()
yield
auth_service._hash = original_hash
auth_service._failed.clear()
@pytest.fixture
async def client():
"""Create an async test client."""
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
yield ac
@pytest.fixture
async def auth_token(client):
"""Get a valid authentication token."""
password = "WebSocketTestPassword123!"
await client.post(
"/api/auth/setup",
json={"master_password": password}
)
response = await client.post(
"/api/auth/login",
json={"password": password}
)
return response.json()["access_token"]
@pytest.fixture
def websocket_service():
"""Get the WebSocket service instance."""
return get_websocket_service()
@pytest.fixture
def mock_websocket():
"""Create a mock WebSocket connection."""
ws = AsyncMock()
ws.send_text = AsyncMock()
ws.send_json = AsyncMock()
ws.receive_text = AsyncMock()
ws.accept = AsyncMock()
ws.close = AsyncMock()
return ws
class TestWebSocketConnection:
"""Test WebSocket connection establishment and lifecycle."""
async def test_websocket_endpoint_exists(self, client, auth_token):
"""Test that WebSocket endpoint is available."""
# This test verifies the endpoint exists
# Full WebSocket testing requires WebSocket client
# Verify the WebSocket route is registered
routes = [route.path for route in app.routes]
websocket_routes = [
path for path in routes if "ws" in path or "websocket" in path
]
assert len(websocket_routes) > 0
async def test_connection_manager_tracks_connections(
self, websocket_service, mock_websocket
):
"""Test that connection manager tracks active connections."""
manager = websocket_service.manager
# Initially no connections
initial_count = len(manager.active_connections)
# Add a connection
await manager.connect(mock_websocket, room="test-room")
assert len(manager.active_connections) == initial_count + 1
assert mock_websocket in manager.active_connections
async def test_disconnect_removes_connection(
self, websocket_service, mock_websocket
):
"""Test that disconnecting removes connection from manager."""
manager = websocket_service.manager
# Connect
await manager.connect(mock_websocket, room="test-room")
assert mock_websocket in manager.active_connections
# Disconnect
manager.disconnect(mock_websocket)
assert mock_websocket not in manager.active_connections
async def test_room_assignment_on_connection(
self, websocket_service, mock_websocket
):
"""Test that connections are assigned to rooms."""
manager = websocket_service.manager
room = "test-room-1"
await manager.connect(mock_websocket, room=room)
# Verify connection is in the room
assert room in manager._rooms
assert mock_websocket in manager._rooms[room]
async def test_multiple_rooms_support(
self, websocket_service
):
"""Test that multiple rooms can exist simultaneously."""
manager = websocket_service.manager
ws1 = AsyncMock()
ws2 = AsyncMock()
ws3 = AsyncMock()
# Connect to different rooms
await manager.connect(ws1, room="room-1")
await manager.connect(ws2, room="room-2")
await manager.connect(ws3, room="room-1")
# Verify room structure
assert "room-1" in manager._rooms
assert "room-2" in manager._rooms
assert len(manager._rooms["room-1"]) == 2
assert len(manager._rooms["room-2"]) == 1
class TestMessageBroadcasting:
"""Test message broadcasting functionality."""
async def test_broadcast_to_all_connections(
self, websocket_service
):
"""Test broadcasting message to all connected clients."""
manager = websocket_service.manager
# Create mock connections
ws1 = AsyncMock()
ws2 = AsyncMock()
ws3 = AsyncMock()
await manager.connect(ws1, room="room-1")
await manager.connect(ws2, room="room-1")
await manager.connect(ws3, room="room-2")
# Broadcast to all
message = {"type": "test", "data": "broadcast to all"}
await manager.broadcast(message)
# All connections should receive message
ws1.send_json.assert_called_once()
ws2.send_json.assert_called_once()
ws3.send_json.assert_called_once()
async def test_broadcast_to_specific_room(
self, websocket_service
):
"""Test broadcasting message to specific room only."""
manager = websocket_service.manager
ws1 = AsyncMock()
ws2 = AsyncMock()
ws3 = AsyncMock()
await manager.connect(ws1, room="downloads")
await manager.connect(ws2, room="downloads")
await manager.connect(ws3, room="system")
# Broadcast to specific room
message = {"type": "download_progress", "data": {}}
await manager.broadcast_to_room(message, room="downloads")
# Only room members should receive
assert ws1.send_json.call_count == 1
assert ws2.send_json.call_count == 1
assert ws3.send_json.call_count == 0
async def test_broadcast_with_json_message(
self, websocket_service
):
"""Test broadcasting JSON-formatted messages."""
manager = websocket_service.manager
ws = AsyncMock()
await manager.connect(ws, room="test")
message = {
"type": "queue_update",
"data": {
"pending": 5,
"active": 2,
"completed": 10
},
"timestamp": "2025-10-19T10:00:00"
}
await manager.broadcast(message)
ws.send_json.assert_called_once_with(message)
async def test_broadcast_handles_disconnected_clients(
self, websocket_service
):
"""Test that broadcasting handles disconnected clients gracefully."""
manager = websocket_service.manager
# Mock connection that will fail
failing_ws = AsyncMock()
failing_ws.send_json.side_effect = RuntimeError("Connection closed")
working_ws = AsyncMock()
await manager.connect(failing_ws, room="test")
await manager.connect(working_ws, room="test")
# Broadcast should handle failure
message = {"type": "test", "data": "test"}
await manager.broadcast(message)
# Working connection should still receive
working_ws.send_json.assert_called_once()
class TestProgressIntegration:
"""Test integration with progress service."""
async def test_download_progress_broadcasts_to_websocket(
self, websocket_service
):
"""Test that download progress updates broadcast via WebSocket."""
manager = websocket_service.manager
ws = AsyncMock()
await manager.connect(ws, room="downloads")
# Simulate progress update broadcast
message = {
"type": "download_progress",
"data": {
"item_id": "test-download-1",
"percentage": 45.5,
"current_mb": 45.5,
"total_mb": 100.0,
"speed_mbps": 2.5
}
}
await manager.broadcast_to_room(message, room="downloads")
ws.send_json.assert_called_once_with(message)
async def test_download_complete_notification(
self, websocket_service
):
"""Test download completion notification via WebSocket."""
manager = websocket_service.manager
ws = AsyncMock()
await manager.connect(ws, room="downloads")
message = {
"type": "download_complete",
"data": {
"item_id": "test-download-1",
"serie_name": "Test Anime",
"episode": {"season": 1, "episode": 1}
}
}
await manager.broadcast_to_room(message, room="downloads")
ws.send_json.assert_called_once()
async def test_download_failed_notification(
self, websocket_service
):
"""Test download failure notification via WebSocket."""
manager = websocket_service.manager
ws = AsyncMock()
await manager.connect(ws, room="downloads")
message = {
"type": "download_failed",
"data": {
"item_id": "test-download-1",
"error": "Network timeout",
"retry_count": 2
}
}
await manager.broadcast_to_room(message, room="downloads")
ws.send_json.assert_called_once()
class TestQueueStatusBroadcasting:
"""Test queue status broadcasting via WebSocket."""
async def test_queue_status_update_broadcast(
self, websocket_service
):
"""Test broadcasting queue status updates."""
manager = websocket_service.manager
ws = AsyncMock()
await manager.connect(ws, room="queue")
message = {
"type": "queue_status",
"data": {
"pending_count": 5,
"active_count": 2,
"completed_count": 10,
"failed_count": 1,
"total_items": 18
}
}
await manager.broadcast_to_room(message, room="queue")
ws.send_json.assert_called_once_with(message)
async def test_queue_item_added_notification(
self, websocket_service
):
"""Test notification when item is added to queue."""
manager = websocket_service.manager
ws = AsyncMock()
await manager.connect(ws, room="queue")
message = {
"type": "queue_item_added",
"data": {
"item_id": "new-item-1",
"serie_name": "New Series",
"episode_count": 3,
"priority": "normal"
}
}
await manager.broadcast_to_room(message, room="queue")
ws.send_json.assert_called_once()
async def test_queue_item_removed_notification(
self, websocket_service
):
"""Test notification when item is removed from queue."""
manager = websocket_service.manager
ws = AsyncMock()
await manager.connect(ws, room="queue")
message = {
"type": "queue_item_removed",
"data": {
"item_id": "removed-item-1",
"reason": "user_cancelled"
}
}
await manager.broadcast_to_room(message, room="queue")
ws.send_json.assert_called_once()
class TestSystemMessaging:
"""Test system-wide messaging via WebSocket."""
async def test_system_notification_broadcast(
self, websocket_service
):
"""Test broadcasting system notifications."""
manager = websocket_service.manager
ws1 = AsyncMock()
ws2 = AsyncMock()
await manager.connect(ws1, room="system")
await manager.connect(ws2, room="system")
message = {
"type": "system_notification",
"data": {
"level": "info",
"message": "System maintenance scheduled",
"timestamp": "2025-10-19T10:00:00"
}
}
await manager.broadcast_to_room(message, room="system")
ws1.send_json.assert_called_once()
ws2.send_json.assert_called_once()
async def test_error_message_broadcast(
self, websocket_service
):
"""Test broadcasting error messages."""
manager = websocket_service.manager
ws = AsyncMock()
await manager.connect(ws, room="errors")
message = {
"type": "error",
"data": {
"error_code": "DOWNLOAD_FAILED",
"message": "Failed to download episode",
"details": "Connection timeout"
}
}
await manager.broadcast_to_room(message, room="errors")
ws.send_json.assert_called_once()
class TestConcurrentConnections:
"""Test handling of concurrent WebSocket connections."""
async def test_multiple_clients_in_same_room(
self, websocket_service
):
"""Test multiple clients receiving broadcasts in same room."""
manager = websocket_service.manager
# Create multiple connections
clients = [AsyncMock() for _ in range(5)]
for client in clients:
await manager.connect(client, room="shared-room")
# Broadcast to all
message = {"type": "test", "data": "multi-client test"}
await manager.broadcast_to_room(message, room="shared-room")
# All clients should receive
for client in clients:
client.send_json.assert_called_once_with(message)
async def test_concurrent_broadcasts_to_different_rooms(
self, websocket_service
):
"""Test concurrent broadcasts to different rooms."""
manager = websocket_service.manager
# Setup rooms with clients
downloads_ws = AsyncMock()
queue_ws = AsyncMock()
system_ws = AsyncMock()
await manager.connect(downloads_ws, room="downloads")
await manager.connect(queue_ws, room="queue")
await manager.connect(system_ws, room="system")
# Concurrent broadcasts
await asyncio.gather(
manager.broadcast_to_room(
{"type": "download_progress"}, "downloads"
),
manager.broadcast_to_room(
{"type": "queue_update"}, "queue"
),
manager.broadcast_to_room(
{"type": "system_message"}, "system"
)
)
# Each client should receive only their room's message
downloads_ws.send_json.assert_called_once()
queue_ws.send_json.assert_called_once()
system_ws.send_json.assert_called_once()
class TestConnectionErrorHandling:
"""Test error handling in WebSocket connections."""
async def test_handle_send_failure(
self, websocket_service
):
"""Test handling of message send failures."""
manager = websocket_service.manager
# Connection that will fail on send
failing_ws = AsyncMock()
failing_ws.send_json.side_effect = RuntimeError("Send failed")
await manager.connect(failing_ws, room="test")
# Should handle error gracefully
message = {"type": "test", "data": "test"}
try:
await manager.broadcast_to_room(message, room="test")
except RuntimeError:
pytest.fail("Should handle send failure gracefully")
async def test_handle_multiple_send_failures(
self, websocket_service
):
"""Test handling multiple concurrent send failures."""
manager = websocket_service.manager
# Multiple failing connections
failing_clients = []
for i in range(3):
ws = AsyncMock()
ws.send_json.side_effect = RuntimeError(f"Failed {i}")
failing_clients.append(ws)
await manager.connect(ws, room="test")
# Add one working connection
working_ws = AsyncMock()
await manager.connect(working_ws, room="test")
# Broadcast should continue despite failures
message = {"type": "test", "data": "test"}
await manager.broadcast_to_room(message, room="test")
# Working connection should still receive
working_ws.send_json.assert_called_once()
async def test_cleanup_after_disconnect(
self, websocket_service
):
"""Test proper cleanup after client disconnect."""
manager = websocket_service.manager
ws = AsyncMock()
room = "test-room"
# Connect and then disconnect
await manager.connect(ws, room=room)
manager.disconnect(ws)
# Verify cleanup
assert ws not in manager.active_connections
if room in manager._rooms:
assert ws not in manager._rooms[room]
class TestMessageFormatting:
"""Test message formatting and validation."""
async def test_message_structure_validation(
self, websocket_service
):
"""Test that messages have required structure."""
manager = websocket_service.manager
ws = AsyncMock()
await manager.connect(ws, room="test")
# Valid message structure
valid_message = {
"type": "test_message",
"data": {"key": "value"},
}
await manager.broadcast(valid_message)
ws.send_json.assert_called_once_with(valid_message)
async def test_different_message_types(
self, websocket_service
):
"""Test broadcasting different message types."""
manager = websocket_service.manager
ws = AsyncMock()
await manager.connect(ws, room="test")
message_types = [
"download_progress",
"download_complete",
"download_failed",
"queue_status",
"system_notification",
"error"
]
for msg_type in message_types:
message = {"type": msg_type, "data": {}}
await manager.broadcast(message)
# Should have received all message types
assert ws.send_json.call_count == len(message_types)
class TestWebSocketServiceIntegration:
"""Test WebSocket service integration with other services."""
async def test_websocket_service_singleton(self):
"""Test that WebSocket service is a singleton."""
service1 = get_websocket_service()
service2 = get_websocket_service()
assert service1 is service2
async def test_service_has_connection_manager(self):
"""Test that service has connection manager."""
service = get_websocket_service()
assert hasattr(service, 'manager')
assert isinstance(service.manager, ConnectionManager)
async def test_service_broadcast_methods_exist(self):
"""Test that service has required broadcast methods."""
service = get_websocket_service()
required_methods = [
'broadcast_download_progress',
'broadcast_download_complete',
'broadcast_download_failed',
'broadcast_queue_status',
'broadcast_system_message',
'send_error'
]
for method in required_methods:
assert hasattr(service, method)
class TestRoomManagement:
"""Test room management functionality."""
async def test_room_creation_on_first_connection(
self, websocket_service
):
"""Test that room is created when first client connects."""
manager = websocket_service.manager
ws = AsyncMock()
room = "new-room"
# Room should not exist initially
assert room not in manager._rooms
# Connect to room
await manager.connect(ws, room=room)
# Room should now exist
assert room in manager._rooms
async def test_room_cleanup_when_empty(
self, websocket_service
):
"""Test that empty rooms are cleaned up."""
manager = websocket_service.manager
ws = AsyncMock()
room = "temp-room"
# Connect and disconnect
await manager.connect(ws, room=room)
manager.disconnect(ws)
# Room should be cleaned up if empty
# (Implementation may vary)
if room in manager._rooms:
assert len(manager._rooms[room]) == 0
async def test_client_can_be_in_one_room(
self, websocket_service
):
"""Test client room membership."""
manager = websocket_service.manager
ws = AsyncMock()
# Connect to room
await manager.connect(ws, room="room-1")
# Verify in room
assert "room-1" in manager._rooms
assert ws in manager._rooms["room-1"]
class TestCompleteWebSocketWorkflow:
"""Test complete WebSocket workflows."""
async def test_full_download_notification_workflow(
self, websocket_service
):
"""Test complete workflow of download notifications."""
manager = websocket_service.manager
ws = AsyncMock()
await manager.connect(ws, room="downloads")
# Simulate download lifecycle
# 1. Download started
await manager.broadcast_to_room(
{"type": "download_started", "data": {"item_id": "dl-1"}},
"downloads"
)
# 2. Progress updates
for progress in [25, 50, 75]:
await manager.broadcast_to_room(
{
"type": "download_progress",
"data": {"item_id": "dl-1", "percentage": progress}
},
"downloads"
)
# 3. Download complete
await manager.broadcast_to_room(
{"type": "download_complete", "data": {"item_id": "dl-1"}},
"downloads"
)
# Client should have received all notifications
assert ws.send_json.call_count == 5
async def test_multi_room_workflow(
self, websocket_service
):
"""Test workflow involving multiple rooms."""
manager = websocket_service.manager
# Setup clients in different rooms
download_ws = AsyncMock()
queue_ws = AsyncMock()
system_ws = AsyncMock()
await manager.connect(download_ws, room="downloads")
await manager.connect(queue_ws, room="queue")
await manager.connect(system_ws, room="system")
# Broadcast to each room
await manager.broadcast_to_room(
{"type": "download_update"}, "downloads"
)
await manager.broadcast_to_room(
{"type": "queue_update"}, "queue"
)
await manager.broadcast_to_room(
{"type": "system_update"}, "system"
)
# Each client should only receive their room's messages
download_ws.send_json.assert_called_once()
queue_ws.send_json.assert_called_once()
system_ws.send_json.assert_called_once()