- Migrate settings.py to Pydantic V2 (SettingsConfigDict, validation_alias) - Update config models to use @field_validator with @classmethod - Replace deprecated datetime.utcnow() with datetime.now(timezone.utc) - Migrate FastAPI app from @app.on_event to lifespan context manager - Implement comprehensive rate limiting middleware with: * Endpoint-specific rate limits (login: 5/min, register: 3/min) * IP-based and user-based tracking * Authenticated user multiplier (2x limits) * Bypass paths for health, docs, static, websocket endpoints * Rate limit headers in responses - Add 13 comprehensive tests for rate limiting (all passing) - Update instructions.md to mark completed tasks - Fix asyncio.create_task usage in anime_service.py All 714 tests passing. No deprecation warnings.
471 lines
15 KiB
Python
471 lines
15 KiB
Python
"""Integration tests for WebSocket integration with core services.
|
|
|
|
This module tests the integration between WebSocket broadcasting and
|
|
core services (DownloadService, AnimeService, ProgressService) to ensure
|
|
real-time updates are properly broadcasted to connected clients.
|
|
"""
|
|
import asyncio
|
|
from typing import Any, Dict, List
|
|
from unittest.mock import Mock, patch
|
|
|
|
import pytest
|
|
|
|
from src.server.models.download import (
|
|
DownloadPriority,
|
|
DownloadStatus,
|
|
EpisodeIdentifier,
|
|
)
|
|
from src.server.services.anime_service import AnimeService
|
|
from src.server.services.download_service import DownloadService
|
|
from src.server.services.progress_service import ProgressService, ProgressType
|
|
from src.server.services.websocket_service import WebSocketService
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_series_app():
|
|
"""Mock SeriesApp for testing."""
|
|
app = Mock()
|
|
app.series_list = []
|
|
app.search = Mock(return_value=[])
|
|
app.ReScan = Mock()
|
|
app.download = Mock(return_value=True)
|
|
return app
|
|
|
|
|
|
@pytest.fixture
|
|
def progress_service():
|
|
"""Create a ProgressService instance for testing."""
|
|
return ProgressService()
|
|
|
|
|
|
@pytest.fixture
|
|
def websocket_service():
|
|
"""Create a WebSocketService instance for testing."""
|
|
return WebSocketService()
|
|
|
|
|
|
@pytest.fixture
|
|
async def anime_service(mock_series_app, progress_service):
|
|
"""Create an AnimeService with mocked dependencies."""
|
|
with patch("src.server.services.anime_service.SeriesApp", return_value=mock_series_app):
|
|
service = AnimeService(
|
|
directory="/test/anime",
|
|
progress_service=progress_service,
|
|
)
|
|
yield service
|
|
|
|
|
|
@pytest.fixture
|
|
async def download_service(anime_service, progress_service):
|
|
"""Create a DownloadService with dependencies."""
|
|
service = DownloadService(
|
|
anime_service=anime_service,
|
|
max_concurrent_downloads=2,
|
|
progress_service=progress_service,
|
|
persistence_path="/tmp/test_queue.json",
|
|
)
|
|
yield service
|
|
await service.stop()
|
|
|
|
|
|
class TestWebSocketDownloadIntegration:
|
|
"""Test WebSocket integration with DownloadService."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_download_progress_broadcast(
|
|
self, download_service, websocket_service
|
|
):
|
|
"""Test that download progress updates are broadcasted."""
|
|
broadcasts: List[Dict[str, Any]] = []
|
|
|
|
async def mock_broadcast(update_type: str, data: dict):
|
|
"""Capture broadcast calls."""
|
|
broadcasts.append({"type": update_type, "data": data})
|
|
|
|
download_service.set_broadcast_callback(mock_broadcast)
|
|
|
|
# Add item to queue
|
|
item_ids = await download_service.add_to_queue(
|
|
serie_id="test_serie",
|
|
serie_name="Test Anime",
|
|
episodes=[EpisodeIdentifier(season=1, episode=1)],
|
|
priority=DownloadPriority.HIGH,
|
|
)
|
|
|
|
assert len(item_ids) == 1
|
|
assert len(broadcasts) == 1
|
|
assert broadcasts[0]["type"] == "queue_status"
|
|
assert broadcasts[0]["data"]["action"] == "items_added"
|
|
assert item_ids[0] in broadcasts[0]["data"]["added_ids"]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_queue_operations_broadcast(
|
|
self, download_service
|
|
):
|
|
"""Test that queue operations broadcast status updates."""
|
|
broadcasts: List[Dict[str, Any]] = []
|
|
|
|
async def mock_broadcast(update_type: str, data: dict):
|
|
broadcasts.append({"type": update_type, "data": data})
|
|
|
|
download_service.set_broadcast_callback(mock_broadcast)
|
|
|
|
# Add items
|
|
item_ids = await download_service.add_to_queue(
|
|
serie_id="test",
|
|
serie_name="Test",
|
|
episodes=[EpisodeIdentifier(season=1, episode=i) for i in range(1, 4)],
|
|
priority=DownloadPriority.NORMAL,
|
|
)
|
|
|
|
# Remove items
|
|
removed = await download_service.remove_from_queue([item_ids[0]])
|
|
assert len(removed) == 1
|
|
|
|
# Check broadcasts
|
|
add_broadcast = next(
|
|
b for b in broadcasts
|
|
if b["data"].get("action") == "items_added"
|
|
)
|
|
remove_broadcast = next(
|
|
b for b in broadcasts
|
|
if b["data"].get("action") == "items_removed"
|
|
)
|
|
|
|
assert add_broadcast["type"] == "queue_status"
|
|
assert len(add_broadcast["data"]["added_ids"]) == 3
|
|
|
|
assert remove_broadcast["type"] == "queue_status"
|
|
assert item_ids[0] in remove_broadcast["data"]["removed_ids"]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_queue_start_stop_broadcast(
|
|
self, download_service
|
|
):
|
|
"""Test that start/stop operations broadcast updates."""
|
|
broadcasts: List[Dict[str, Any]] = []
|
|
|
|
async def mock_broadcast(update_type: str, data: dict):
|
|
broadcasts.append({"type": update_type, "data": data})
|
|
|
|
download_service.set_broadcast_callback(mock_broadcast)
|
|
|
|
# Start queue
|
|
await download_service.start()
|
|
await asyncio.sleep(0.1)
|
|
|
|
# Stop queue
|
|
await download_service.stop()
|
|
|
|
# Find start/stop broadcasts
|
|
start_broadcast = next(
|
|
(b for b in broadcasts if b["type"] == "queue_started"),
|
|
None,
|
|
)
|
|
stop_broadcast = next(
|
|
(b for b in broadcasts if b["type"] == "queue_stopped"),
|
|
None,
|
|
)
|
|
|
|
assert start_broadcast is not None
|
|
assert start_broadcast["data"]["is_running"] is True
|
|
|
|
assert stop_broadcast is not None
|
|
assert stop_broadcast["data"]["is_running"] is False
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_queue_pause_resume_broadcast(
|
|
self, download_service
|
|
):
|
|
"""Test that pause/resume operations broadcast updates."""
|
|
broadcasts: List[Dict[str, Any]] = []
|
|
|
|
async def mock_broadcast(update_type: str, data: dict):
|
|
broadcasts.append({"type": update_type, "data": data})
|
|
|
|
download_service.set_broadcast_callback(mock_broadcast)
|
|
|
|
# Pause queue
|
|
await download_service.pause_queue()
|
|
|
|
# Resume queue
|
|
await download_service.resume_queue()
|
|
|
|
# Find pause/resume broadcasts
|
|
pause_broadcast = next(
|
|
(b for b in broadcasts if b["type"] == "queue_paused"),
|
|
None,
|
|
)
|
|
resume_broadcast = next(
|
|
(b for b in broadcasts if b["type"] == "queue_resumed"),
|
|
None,
|
|
)
|
|
|
|
assert pause_broadcast is not None
|
|
assert pause_broadcast["data"]["is_paused"] is True
|
|
|
|
assert resume_broadcast is not None
|
|
assert resume_broadcast["data"]["is_paused"] is False
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_clear_completed_broadcast(
|
|
self, download_service
|
|
):
|
|
"""Test that clearing completed items broadcasts update."""
|
|
broadcasts: List[Dict[str, Any]] = []
|
|
|
|
async def mock_broadcast(update_type: str, data: dict):
|
|
broadcasts.append({"type": update_type, "data": data})
|
|
|
|
download_service.set_broadcast_callback(mock_broadcast)
|
|
|
|
# Manually add a completed item to test
|
|
from datetime import datetime, timezone
|
|
|
|
from src.server.models.download import DownloadItem
|
|
|
|
completed_item = DownloadItem(
|
|
id="test_completed",
|
|
serie_id="test",
|
|
serie_name="Test",
|
|
episode=EpisodeIdentifier(season=1, episode=1),
|
|
status=DownloadStatus.COMPLETED,
|
|
priority=DownloadPriority.NORMAL,
|
|
added_at=datetime.now(timezone.utc),
|
|
)
|
|
download_service._completed_items.append(completed_item)
|
|
|
|
# Clear completed
|
|
count = await download_service.clear_completed()
|
|
|
|
assert count == 1
|
|
|
|
# Find clear broadcast
|
|
clear_broadcast = next(
|
|
(
|
|
b for b in broadcasts
|
|
if b["data"].get("action") == "completed_cleared"
|
|
),
|
|
None,
|
|
)
|
|
|
|
assert clear_broadcast is not None
|
|
assert clear_broadcast["data"]["cleared_count"] == 1
|
|
|
|
|
|
class TestWebSocketScanIntegration:
|
|
"""Test WebSocket integration with AnimeService scan operations."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_scan_progress_broadcast(
|
|
self, anime_service, progress_service, mock_series_app
|
|
):
|
|
"""Test that scan progress updates are broadcasted."""
|
|
broadcasts: List[Dict[str, Any]] = []
|
|
|
|
async def mock_broadcast(message_type: str, data: dict, room: str):
|
|
"""Capture broadcast calls."""
|
|
broadcasts.append({
|
|
"type": message_type,
|
|
"data": data,
|
|
"room": room,
|
|
})
|
|
|
|
progress_service.set_broadcast_callback(mock_broadcast)
|
|
|
|
# Mock scan callback to simulate progress
|
|
def mock_scan_callback(callback):
|
|
"""Simulate scan progress."""
|
|
if callback:
|
|
callback({"current": 5, "total": 10, "message": "Scanning..."})
|
|
callback({"current": 10, "total": 10, "message": "Complete"})
|
|
|
|
mock_series_app.ReScan = mock_scan_callback
|
|
|
|
# Run scan
|
|
await anime_service.rescan()
|
|
|
|
# Verify broadcasts were made
|
|
assert len(broadcasts) >= 2 # At least start and complete
|
|
|
|
# Check for scan progress broadcasts
|
|
scan_broadcasts = [
|
|
b for b in broadcasts if b["room"] == "scan_progress"
|
|
]
|
|
assert len(scan_broadcasts) >= 2
|
|
|
|
# Verify start broadcast
|
|
start_broadcast = scan_broadcasts[0]
|
|
assert start_broadcast["data"]["status"] == "started"
|
|
assert start_broadcast["data"]["type"] == ProgressType.SCAN.value
|
|
|
|
# Verify completion broadcast
|
|
complete_broadcast = scan_broadcasts[-1]
|
|
assert complete_broadcast["data"]["status"] == "completed"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_scan_failure_broadcast(
|
|
self, anime_service, progress_service, mock_series_app
|
|
):
|
|
"""Test that scan failures are broadcasted."""
|
|
broadcasts: List[Dict[str, Any]] = []
|
|
|
|
async def mock_broadcast(message_type: str, data: dict, room: str):
|
|
broadcasts.append({
|
|
"type": message_type,
|
|
"data": data,
|
|
"room": room,
|
|
})
|
|
|
|
progress_service.set_broadcast_callback(mock_broadcast)
|
|
|
|
# Mock scan to raise error
|
|
def mock_scan_error(callback):
|
|
raise RuntimeError("Scan failed")
|
|
|
|
mock_series_app.ReScan = mock_scan_error
|
|
|
|
# Run scan (should fail)
|
|
with pytest.raises(Exception):
|
|
await anime_service.rescan()
|
|
|
|
# Verify failure broadcast
|
|
scan_broadcasts = [
|
|
b for b in broadcasts if b["room"] == "scan_progress"
|
|
]
|
|
assert len(scan_broadcasts) >= 2 # Start and fail
|
|
|
|
# Verify failure broadcast
|
|
fail_broadcast = scan_broadcasts[-1]
|
|
assert fail_broadcast["data"]["status"] == "failed"
|
|
# Verify error message or failed status
|
|
is_error = "error" in fail_broadcast["data"]["message"].lower()
|
|
is_failed = fail_broadcast["data"]["status"] == "failed"
|
|
assert is_error or is_failed
|
|
|
|
|
|
class TestWebSocketProgressIntegration:
|
|
"""Test WebSocket integration with ProgressService."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_progress_lifecycle_broadcast(
|
|
self, progress_service
|
|
):
|
|
"""Test that progress lifecycle events are broadcasted."""
|
|
broadcasts: List[Dict[str, Any]] = []
|
|
|
|
async def mock_broadcast(message_type: str, data: dict, room: str):
|
|
broadcasts.append({
|
|
"type": message_type,
|
|
"data": data,
|
|
"room": room,
|
|
})
|
|
|
|
progress_service.set_broadcast_callback(mock_broadcast)
|
|
|
|
# Start progress
|
|
await progress_service.start_progress(
|
|
progress_id="test_progress",
|
|
progress_type=ProgressType.DOWNLOAD,
|
|
title="Test Download",
|
|
total=100,
|
|
)
|
|
|
|
# Update progress
|
|
await progress_service.update_progress(
|
|
progress_id="test_progress",
|
|
current=50,
|
|
force_broadcast=True,
|
|
)
|
|
|
|
# Complete progress
|
|
await progress_service.complete_progress(
|
|
progress_id="test_progress",
|
|
message="Download complete",
|
|
)
|
|
|
|
# Verify broadcasts
|
|
assert len(broadcasts) == 3
|
|
|
|
start_broadcast = broadcasts[0]
|
|
assert start_broadcast["data"]["status"] == "started"
|
|
assert start_broadcast["room"] == "download_progress"
|
|
|
|
update_broadcast = broadcasts[1]
|
|
assert update_broadcast["data"]["status"] == "in_progress"
|
|
assert update_broadcast["data"]["percent"] == 50.0
|
|
|
|
complete_broadcast = broadcasts[2]
|
|
assert complete_broadcast["data"]["status"] == "completed"
|
|
assert complete_broadcast["data"]["percent"] == 100.0
|
|
|
|
|
|
class TestWebSocketEndToEnd:
|
|
"""End-to-end integration tests with all services."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_complete_download_flow_with_broadcasts(
|
|
self, download_service, anime_service, progress_service
|
|
):
|
|
"""Test complete download flow with all broadcasts."""
|
|
all_broadcasts: List[Dict[str, Any]] = []
|
|
|
|
async def capture_download_broadcast(update_type: str, data: dict):
|
|
all_broadcasts.append({
|
|
"source": "download",
|
|
"type": update_type,
|
|
"data": data,
|
|
})
|
|
|
|
async def capture_progress_broadcast(
|
|
message_type: str, data: dict, room: str
|
|
):
|
|
all_broadcasts.append({
|
|
"source": "progress",
|
|
"type": message_type,
|
|
"data": data,
|
|
"room": room,
|
|
})
|
|
|
|
download_service.set_broadcast_callback(capture_download_broadcast)
|
|
progress_service.set_broadcast_callback(capture_progress_broadcast)
|
|
|
|
# Add items to queue
|
|
item_ids = await download_service.add_to_queue(
|
|
serie_id="test",
|
|
serie_name="Test Anime",
|
|
episodes=[EpisodeIdentifier(season=1, episode=1)],
|
|
priority=DownloadPriority.HIGH,
|
|
)
|
|
|
|
# Start queue
|
|
await download_service.start()
|
|
await asyncio.sleep(0.1)
|
|
|
|
# Pause queue
|
|
await download_service.pause_queue()
|
|
|
|
# Resume queue
|
|
await download_service.resume_queue()
|
|
|
|
# Stop queue
|
|
await download_service.stop()
|
|
|
|
# Verify we received broadcasts from both services
|
|
download_broadcasts = [
|
|
b for b in all_broadcasts if b["source"] == "download"
|
|
]
|
|
|
|
assert len(download_broadcasts) >= 4 # add, start, pause, resume, stop
|
|
assert len(item_ids) == 1
|
|
|
|
# Verify queue status broadcasts
|
|
queue_status_broadcasts = [
|
|
b for b in download_broadcasts if b["type"] == "queue_status"
|
|
]
|
|
assert len(queue_status_broadcasts) >= 1
|
|
|
|
|
|
if __name__ == "__main__":
|
|
pytest.main([__file__, "-v"])
|