593 lines
21 KiB
Python
593 lines
21 KiB
Python
"""Performance tests for WebSocket load and broadcasting.
|
||
|
||
This module tests the performance characteristics of WebSocket connections
|
||
including concurrent clients, message throughput, and progress update throttling.
|
||
"""
|
||
import asyncio
|
||
import logging
|
||
import time
|
||
from typing import List
|
||
from unittest.mock import AsyncMock, Mock
|
||
|
||
import pytest
|
||
|
||
from src.server.services.websocket_service import WebSocketService
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class MockWebSocket:
|
||
"""Mock WebSocket client for testing."""
|
||
|
||
def __init__(self):
|
||
self.received_messages: List[dict] = []
|
||
self.send_count = 0
|
||
|
||
async def accept(self):
|
||
"""Accept connection."""
|
||
pass
|
||
|
||
async def send_json(self, data: dict):
|
||
"""Send JSON data."""
|
||
self.received_messages.append(data)
|
||
self.send_count += 1
|
||
await asyncio.sleep(0.001) # Simulate network latency
|
||
|
||
async def receive_json(self):
|
||
"""Keep connection open."""
|
||
await asyncio.sleep(100)
|
||
|
||
def clear_messages(self):
|
||
"""Clear received messages."""
|
||
self.received_messages = []
|
||
|
||
|
||
class TestWebSocketConcurrentClients:
|
||
"""Test WebSocket performance with many concurrent clients."""
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_100_concurrent_clients_receive_broadcast(self):
|
||
"""Test broadcasting to 100 concurrent clients."""
|
||
# Target: Broadcast should complete in < 2 seconds
|
||
max_broadcast_time = 2.0
|
||
num_clients = 100
|
||
|
||
websocket_service = WebSocketService()
|
||
|
||
# Create and connect 100 clients
|
||
clients = []
|
||
for i in range(num_clients):
|
||
client = MockWebSocket()
|
||
await websocket_service.connect(client, f"client_{i:03d}")
|
||
await websocket_service.manager.join_room(f"client_{i:03d}", "test_room")
|
||
clients.append(client)
|
||
|
||
# Broadcast message
|
||
message = {
|
||
"type": "test_broadcast",
|
||
"data": "Performance test message"
|
||
}
|
||
|
||
start_time = time.time()
|
||
await websocket_service.manager.broadcast_to_room(message, "test_room")
|
||
elapsed_time = time.time() - start_time
|
||
|
||
# Verify all clients received message
|
||
received_count = sum(1 for c in clients if len(c.received_messages) > 0)
|
||
assert received_count == num_clients, \
|
||
f"Only {received_count}/{num_clients} clients received message"
|
||
|
||
# Verify performance
|
||
assert elapsed_time < max_broadcast_time, \
|
||
f"Broadcast took {elapsed_time:.2f}s, exceeds limit of {max_broadcast_time}s"
|
||
|
||
# Cleanup
|
||
for i in range(num_clients):
|
||
await websocket_service.disconnect(f"client_{i:03d}")
|
||
|
||
logger.info("Broadcast completed for %d clients", num_clients, extra={"elapsed_s": elapsed_time})
|
||
logger.debug(
|
||
"Broadcast performance per client",
|
||
extra={
|
||
"num_clients": num_clients,
|
||
"avg_ms_per_client": elapsed_time / num_clients * 1000,
|
||
},
|
||
)
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_200_concurrent_clients_scalability(self):
|
||
"""Test scalability with 200 concurrent clients."""
|
||
max_broadcast_time = 3.0
|
||
num_clients = 200
|
||
|
||
websocket_service = WebSocketService()
|
||
|
||
clients = []
|
||
for i in range(num_clients):
|
||
client = MockWebSocket()
|
||
await websocket_service.connect(client, f"client_{i:03d}")
|
||
await websocket_service.manager.join_room(f"client_{i:03d}", "test_room")
|
||
clients.append(client)
|
||
|
||
message = {"type": "scalability_test", "data": "Test"}
|
||
|
||
start_time = time.time()
|
||
await websocket_service.manager.broadcast_to_room(message, "test_room")
|
||
elapsed_time = time.time() - start_time
|
||
|
||
received_count = sum(1 for c in clients if len(c.received_messages) > 0)
|
||
assert received_count == num_clients
|
||
assert elapsed_time < max_broadcast_time
|
||
|
||
# Cleanup
|
||
for i in range(num_clients):
|
||
await websocket_service.disconnect(f"client_{i:03d}")
|
||
|
||
logger.info("Broadcast completed for %d clients", num_clients, extra={"elapsed_s": elapsed_time})
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_connection_pool_efficiency(self):
|
||
"""Test efficient handling of connection pool."""
|
||
websocket_service = WebSocketService()
|
||
|
||
# Connect 50 clients rapidly
|
||
num_clients = 50
|
||
start_time = time.time()
|
||
|
||
clients = []
|
||
for i in range(num_clients):
|
||
client = MockWebSocket()
|
||
await websocket_service.connect(client, f"client_{i:02d}")
|
||
clients.append(client)
|
||
|
||
connection_time = time.time() - start_time
|
||
|
||
# Connection should be fast (< 1 second for 50 clients)
|
||
assert connection_time < 1.0, \
|
||
f"Connection time {connection_time:.2f}s too slow"
|
||
|
||
# Verify all connected
|
||
assert len(websocket_service.manager._active_connections) == num_clients
|
||
|
||
# Cleanup
|
||
for i in range(num_clients):
|
||
await websocket_service.disconnect(f"client_{i:02d}")
|
||
|
||
logger.info("Connected %d clients in %.3fs", num_clients, connection_time)
|
||
logger.info("Average: %.2fms per connection", connection_time / num_clients * 1000)
|
||
|
||
|
||
class TestMessageThroughput:
|
||
"""Test message throughput and rate performance."""
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_messages_per_second_baseline(self):
|
||
"""Test baseline message throughput."""
|
||
websocket_service = WebSocketService()
|
||
|
||
# Connect 10 clients
|
||
num_clients = 10
|
||
clients = []
|
||
for i in range(num_clients):
|
||
client = MockWebSocket()
|
||
await websocket_service.connect(client, f"client_{i}")
|
||
await websocket_service.manager.join_room(f"client_{i}", "test_room")
|
||
clients.append(client)
|
||
|
||
# Send 50 messages
|
||
num_messages = 50
|
||
start_time = time.time()
|
||
|
||
for i in range(num_messages):
|
||
message = {
|
||
"type": "throughput_test",
|
||
"sequence": i,
|
||
"data": f"Message {i}"
|
||
}
|
||
await websocket_service.manager.broadcast_to_room(message, "test_room")
|
||
|
||
elapsed_time = time.time() - start_time
|
||
messages_per_second = num_messages / elapsed_time
|
||
|
||
# Should handle at least 10 messages/second
|
||
assert messages_per_second >= 10, \
|
||
f"Only {messages_per_second:.2f} messages/sec, too slow"
|
||
|
||
# Verify all clients received all messages
|
||
for client in clients:
|
||
assert len(client.received_messages) == num_messages
|
||
|
||
# Cleanup
|
||
for i in range(num_clients):
|
||
await websocket_service.disconnect(f"client_{i}")
|
||
|
||
logger.info("Throughput: %.2f messages/second", messages_per_second)
|
||
logger.info(
|
||
"Total: %d messages to %d clients in %.2fs",
|
||
num_messages,
|
||
num_clients,
|
||
elapsed_time,
|
||
)
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_high_frequency_updates(self):
|
||
"""Test high-frequency progress updates."""
|
||
websocket_service = WebSocketService()
|
||
|
||
# Connect 5 clients
|
||
clients = []
|
||
for i in range(5):
|
||
client = MockWebSocket()
|
||
await websocket_service.connect(client, f"client_{i}")
|
||
await websocket_service.manager.join_room(f"client_{i}", "downloads")
|
||
clients.append(client)
|
||
|
||
# Send 100 progress updates rapidly
|
||
num_updates = 100
|
||
start_time = time.time()
|
||
|
||
for progress in range(num_updates):
|
||
message = {
|
||
"type": "download_progress",
|
||
"data": {
|
||
"download_id": "test_download",
|
||
"percent": progress,
|
||
"speed_mbps": 2.5
|
||
}
|
||
}
|
||
await websocket_service.manager.broadcast_to_room(message, "downloads")
|
||
|
||
elapsed_time = time.time() - start_time
|
||
updates_per_second = num_updates / elapsed_time
|
||
|
||
# Should handle at least 20 updates/second
|
||
assert updates_per_second >= 20, \
|
||
f"Only {updates_per_second:.2f} updates/sec"
|
||
|
||
# Cleanup
|
||
for i in range(5):
|
||
await websocket_service.disconnect(f"client_{i}")
|
||
|
||
logger.info("High-frequency: %.2f updates/second", updates_per_second)
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_burst_message_handling(self):
|
||
"""Test handling of burst message traffic."""
|
||
websocket_service = WebSocketService()
|
||
|
||
# Connect 20 clients
|
||
num_clients = 20
|
||
clients = []
|
||
for i in range(num_clients):
|
||
client = MockWebSocket()
|
||
await websocket_service.connect(client, f"client_{i:02d}")
|
||
await websocket_service.manager.join_room(f"client_{i:02d}", "test_room")
|
||
clients.append(client)
|
||
|
||
# Send 30 messages in rapid burst
|
||
num_messages = 30
|
||
start_time = time.time()
|
||
|
||
tasks = []
|
||
for i in range(num_messages):
|
||
message = {"type": "burst_test", "id": i}
|
||
tasks.append(
|
||
websocket_service.manager.broadcast_to_room(message, "test_room")
|
||
)
|
||
|
||
await asyncio.gather(*tasks)
|
||
elapsed_time = time.time() - start_time
|
||
|
||
# Burst should complete quickly
|
||
assert elapsed_time < 2.0, f"Burst took {elapsed_time:.2f}s, too slow"
|
||
|
||
# Verify all messages delivered
|
||
for client in clients:
|
||
assert len(client.received_messages) == num_messages
|
||
|
||
# Cleanup
|
||
for i in range(num_clients):
|
||
await websocket_service.disconnect(f"client_{i:02d}")
|
||
|
||
logger.info("Burst: %d messages in %.2fs", num_messages, elapsed_time)
|
||
|
||
|
||
class TestProgressUpdateThrottling:
|
||
"""Test progress update throttling to avoid flooding."""
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_throttled_progress_updates(self):
|
||
"""Test that progress updates are properly throttled."""
|
||
websocket_service = WebSocketService()
|
||
|
||
# Connect client
|
||
client = MockWebSocket()
|
||
await websocket_service.connect(client, "test_client")
|
||
await websocket_service.manager.join_room("test_client", "downloads")
|
||
|
||
# Send 100 progress updates rapidly (simulating 1% increments)
|
||
# Only significant changes (>= 1%) should be broadcast
|
||
for progress in range(100):
|
||
message = {
|
||
"type": "download_progress",
|
||
"data": {
|
||
"download_id": "throttle_test",
|
||
"percent": progress + 0.1, # Sub-1% increments
|
||
"speed_mbps": 2.5
|
||
}
|
||
}
|
||
# In real implementation, throttling happens in ProgressService
|
||
# Here we simulate by only sending every 5%
|
||
if progress % 5 == 0:
|
||
await websocket_service.manager.broadcast_to_room(message, "downloads")
|
||
|
||
# With 5% throttling, should receive ~20 updates
|
||
assert len(client.received_messages) <= 25, "Too many updates sent"
|
||
assert len(client.received_messages) >= 15, "Too few updates sent"
|
||
|
||
await websocket_service.disconnect("test_client")
|
||
|
||
logger.info(
|
||
"Throttling: %d updates sent (100 possible)",
|
||
len(client.received_messages),
|
||
)
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_throttling_reduces_network_load(self):
|
||
"""Test that throttling significantly reduces message count."""
|
||
websocket_service = WebSocketService()
|
||
|
||
# Connect 10 clients
|
||
clients = []
|
||
for i in range(10):
|
||
client = MockWebSocket()
|
||
await websocket_service.connect(client, f"client_{i}")
|
||
await websocket_service.manager.join_room(f"client_{i}", "downloads")
|
||
clients.append(client)
|
||
|
||
# Without throttling: 1000 updates (simulate)
|
||
# With throttling: Only significant changes
|
||
throttled_updates = 0
|
||
|
||
for i in range(1000):
|
||
percent = i * 0.1 # 0.1% increments
|
||
|
||
# Simulate throttling: only send when percent changes by >= 1%
|
||
if i % 10 == 0:
|
||
message = {
|
||
"type": "download_progress",
|
||
"data": {"percent": percent}
|
||
}
|
||
await websocket_service.manager.broadcast_to_room(message, "downloads")
|
||
throttled_updates += 1
|
||
|
||
# Verify significant reduction
|
||
assert throttled_updates <= 110, "Throttling ineffective"
|
||
|
||
for client in clients:
|
||
assert len(client.received_messages) == throttled_updates
|
||
|
||
reduction_percent = (1 - throttled_updates / 1000) * 100
|
||
|
||
# Cleanup
|
||
for i in range(10):
|
||
await websocket_service.disconnect(f"client_{i}")
|
||
|
||
logger.info(
|
||
"Throttling: %d/1000 updates sent (%.1f%% reduction)",
|
||
throttled_updates,
|
||
reduction_percent,
|
||
)
|
||
|
||
|
||
class TestRoomIsolation:
|
||
"""Test performance of room-based message isolation."""
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_room_isolation_performance(self):
|
||
"""Test that room isolation doesn't impact performance."""
|
||
websocket_service = WebSocketService()
|
||
|
||
# Create 3 rooms with 30 clients each
|
||
rooms = ["downloads", "queue", "system"]
|
||
clients_per_room = 30
|
||
|
||
all_clients = []
|
||
for room in rooms:
|
||
for i in range(clients_per_room):
|
||
client = MockWebSocket()
|
||
conn_id = f"{room}_client_{i:02d}"
|
||
await websocket_service.connect(client, conn_id)
|
||
await websocket_service.manager.join_room(conn_id, room)
|
||
all_clients.append((client, room))
|
||
|
||
# Broadcast to each room
|
||
start_time = time.time()
|
||
|
||
for room in rooms:
|
||
message = {"type": f"{room}_message", "data": f"Message for {room}"}
|
||
await websocket_service.manager.broadcast_to_room(message, room)
|
||
|
||
elapsed_time = time.time() - start_time
|
||
|
||
# Should be fast even with room isolation
|
||
assert elapsed_time < 1.0, f"Room broadcasts took {elapsed_time:.2f}s"
|
||
|
||
# Verify isolation: each client received exactly 1 message
|
||
for client, room in all_clients:
|
||
assert len(client.received_messages) == 1
|
||
assert client.received_messages[0]["type"] == f"{room}_message"
|
||
|
||
# Cleanup
|
||
for room in rooms:
|
||
for i in range(clients_per_room):
|
||
await websocket_service.disconnect(f"{room}_client_{i:02d}")
|
||
|
||
logger.info("Room isolation: 3 rooms × 30 clients in %.2fs", elapsed_time)
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_selective_room_broadcast_performance(self):
|
||
"""Test performance of selective room broadcasting."""
|
||
websocket_service = WebSocketService()
|
||
|
||
# Connect 100 clients across 4 rooms
|
||
rooms = ["room_a", "room_b", "room_c", "room_d"]
|
||
clients_per_room = 25
|
||
|
||
for room in rooms:
|
||
for i in range(clients_per_room):
|
||
client = MockWebSocket()
|
||
conn_id = f"{room}_{i:02d}"
|
||
await websocket_service.connect(client, conn_id)
|
||
await websocket_service.manager.join_room(conn_id, room)
|
||
|
||
# Broadcast to room_b only (25 clients)
|
||
message = {"type": "selective_broadcast", "target": "room_b"}
|
||
|
||
start_time = time.time()
|
||
await websocket_service.manager.broadcast_to_room(message, "room_b")
|
||
elapsed_time = time.time() - start_time
|
||
|
||
# Should be fast and not broadcast to other rooms
|
||
assert elapsed_time < 0.5, f"Selective broadcast took {elapsed_time:.2f}s"
|
||
|
||
# Cleanup
|
||
for room in rooms:
|
||
for i in range(clients_per_room):
|
||
await websocket_service.disconnect(f"{room}_{i:02d}")
|
||
|
||
logger.info("Selective broadcast: 25/100 clients in %.3fs", elapsed_time)
|
||
|
||
|
||
class TestConnectionStability:
|
||
"""Test connection stability under load."""
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_rapid_connect_disconnect_cycles(self):
|
||
"""Test rapid connection and disconnection cycles."""
|
||
websocket_service = WebSocketService()
|
||
|
||
# Perform 50 rapid connect/disconnect cycles
|
||
num_cycles = 50
|
||
start_time = time.time()
|
||
|
||
for i in range(num_cycles):
|
||
client = MockWebSocket()
|
||
conn_id = f"cycle_client_{i:02d}"
|
||
|
||
await websocket_service.connect(client, conn_id)
|
||
|
||
# Send a message
|
||
message = {"type": "cycle_test", "id": i}
|
||
await websocket_service.manager.broadcast_to_room(message, "default")
|
||
|
||
# Disconnect
|
||
await websocket_service.disconnect(conn_id)
|
||
|
||
elapsed_time = time.time() - start_time
|
||
cycles_per_second = num_cycles / elapsed_time
|
||
|
||
# Should handle rapid cycling
|
||
assert elapsed_time < 5.0, f"Cycles took {elapsed_time:.2f}s, too slow"
|
||
|
||
# All connections should be cleaned up
|
||
assert len(websocket_service.manager._active_connections) == 0
|
||
|
||
logger.info("Rapid cycles: %.2f cycles/second", cycles_per_second)
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_concurrent_connect_disconnect(self):
|
||
"""Test concurrent connection and disconnection operations."""
|
||
websocket_service = WebSocketService()
|
||
|
||
# Connect 30 clients concurrently
|
||
async def connect_client(client_id: int):
|
||
client = MockWebSocket()
|
||
conn_id = f"concurrent_{client_id:02d}"
|
||
await websocket_service.connect(client, conn_id)
|
||
await asyncio.sleep(0.1) # Keep connection briefly
|
||
await websocket_service.disconnect(conn_id)
|
||
|
||
start_time = time.time()
|
||
await asyncio.gather(*[connect_client(i) for i in range(30)])
|
||
elapsed_time = time.time() - start_time
|
||
|
||
# Should handle concurrent operations efficiently
|
||
assert elapsed_time < 2.0, f"Concurrent ops took {elapsed_time:.2f}s"
|
||
|
||
# All should be cleaned up
|
||
assert len(websocket_service.manager._active_connections) == 0
|
||
|
||
logger.info("Concurrent ops: 30 clients in %.2fs", elapsed_time)
|
||
|
||
|
||
class TestMemoryEfficiency:
|
||
"""Test memory efficiency of WebSocket operations."""
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_memory_usage_with_many_connections(self):
|
||
"""Test memory usage with many concurrent connections."""
|
||
import psutil
|
||
|
||
process = psutil.Process()
|
||
baseline_memory_mb = process.memory_info().rss / 1024 / 1024
|
||
|
||
websocket_service = WebSocketService()
|
||
|
||
# Connect 100 clients
|
||
clients = []
|
||
for i in range(100):
|
||
client = MockWebSocket()
|
||
await websocket_service.connect(client, f"mem_client_{i:03d}")
|
||
clients.append(client)
|
||
|
||
current_memory_mb = process.memory_info().rss / 1024 / 1024
|
||
memory_increase_mb = current_memory_mb - baseline_memory_mb
|
||
|
||
# Memory increase should be reasonable (< 50MB for 100 connections)
|
||
assert memory_increase_mb < 50, \
|
||
f"Memory increased by {memory_increase_mb:.2f}MB, too much"
|
||
|
||
per_connection_kb = (memory_increase_mb * 1024) / 100
|
||
|
||
# Cleanup
|
||
for i in range(100):
|
||
await websocket_service.disconnect(f"mem_client_{i:03d}")
|
||
|
||
logger.info("Memory: %.2fMB for 100 connections", memory_increase_mb)
|
||
logger.info("Per connection: %.2fKB", per_connection_kb)
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_message_queue_memory_efficiency(self):
|
||
"""Test that message queues don't accumulate excessively."""
|
||
import sys
|
||
|
||
websocket_service = WebSocketService()
|
||
|
||
# Connect client
|
||
client = MockWebSocket()
|
||
await websocket_service.connect(client, "queue_test")
|
||
await websocket_service.manager.join_room("queue_test", "test_room")
|
||
|
||
# Send 100 messages
|
||
messages = []
|
||
for i in range(100):
|
||
message = {
|
||
"type": "memory_test",
|
||
"id": i,
|
||
"data": "x" * 100 # 100 bytes of data
|
||
}
|
||
messages.append(message)
|
||
await websocket_service.manager.broadcast_to_room(message, "test_room")
|
||
|
||
# Calculate approximate size
|
||
total_size = sum(sys.getsizeof(msg) for msg in client.received_messages)
|
||
|
||
# Size should be reasonable
|
||
assert total_size < 100000, f"Message queue size {total_size} bytes too large"
|
||
|
||
await websocket_service.disconnect("queue_test")
|
||
|
||
logger.info("Message queue: %d bytes for 100 messages", total_size)
|
||
logger.info("Average: %.2f bytes/message", total_size / 100)
|