- Modified BackgroundLoaderService to use multiple workers (default: 5)
- Anime additions now process in parallel without blocking
- Added comprehensive unit tests for concurrent behavior
- Updated integration tests for compatibility
- Updated architecture documentation
283 lines
9.2 KiB
Python
283 lines
9.2 KiB
Python
"""Unit tests for parallel anime addition functionality.
|
|
|
|
This module tests that multiple anime can be added concurrently without blocking
|
|
each other. The background loader should process multiple series simultaneously
|
|
rather than sequentially.
|
|
"""
|
|
import asyncio
|
|
from datetime import datetime, timezone
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from src.server.services.background_loader_service import (
|
|
BackgroundLoaderService,
|
|
LoadingStatus,
|
|
)
|
|
|
|
|
|
@pytest.fixture
def mock_websocket_service():
    """Provide a stand-in WebSocket service whose ``broadcast`` is awaitable."""
    # Keyword arguments passed to MagicMock are set as attributes on the
    # new mock, so ``broadcast`` ends up being the AsyncMock created here.
    return MagicMock(broadcast=AsyncMock())
|
|
|
|
|
|
@pytest.fixture
def mock_anime_service():
    """Provide a plain ``MagicMock`` standing in for the AnimeService."""
    return MagicMock()
|
|
|
|
|
|
@pytest.fixture
def mock_series_app():
    """Provide a mock SeriesApp with a fake search directory and NFO service."""
    # Both attributes are configured through the constructor; MagicMock
    # applies keyword arguments as attributes on the created mock.
    return MagicMock(
        directory_to_search="/fake/anime/directory",
        nfo_service=MagicMock(),
    )
|
|
|
|
|
|
@pytest.fixture
async def background_loader(mock_websocket_service, mock_anime_service, mock_series_app):
    """Yield a started ``BackgroundLoaderService`` built from the mock fixtures.

    The service is started before the test body runs and stopped once the
    test has finished with it.

    NOTE(review): this is an async generator fixture declared with plain
    ``pytest.fixture``; that presumably relies on pytest-asyncio running in
    auto mode -- confirm against the project's pytest configuration.
    """
    collaborators = {
        "websocket_service": mock_websocket_service,
        "anime_service": mock_anime_service,
        "series_app": mock_series_app,
    }
    service = BackgroundLoaderService(**collaborators)

    # Setup: bring the loader up before handing it to the test.
    await service.start()

    yield service

    # Teardown: shut the loader down again.
    await service.stop()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_parallel_anime_additions(
    background_loader,
    mock_websocket_service,
):
    """Test that multiple anime additions are processed in parallel.

    This test adds two anime series and verifies that:
    1. Both are queued successfully
    2. Both start processing without waiting for the other to complete
    3. Both complete within a reasonable time frame (indicating parallel
       execution)

    The loader's ``_load_series_data`` is replaced with a stub that sleeps
    for 0.5s, so two sequential runs would need ~1.0s while two parallel
    runs need ~0.5s.
    """
    # Durations are measured on the event loop's monotonic clock; wall-clock
    # time can jump (e.g. NTP adjustments) and distort short intervals.
    clock = asyncio.get_running_loop().time

    # Per-series timestamps (loop-clock seconds) of when the stub ran.
    task_events = {
        "anime-one": {"started": None, "completed": None},
        "anime-two": {"started": None, "completed": None},
    }

    async def mock_load_series_data(task):
        """Stand-in loader: record timing, simulate work, mark task done."""
        task_events[task.key]["started"] = clock()

        # Simulate some work with a delay
        await asyncio.sleep(0.5)

        # Mark every loading stage as finished
        for stage in ("episodes", "nfo", "logo", "images"):
            task.progress[stage] = True
        task.status = LoadingStatus.COMPLETED
        # completed_at is task data rather than a measurement, so it stays
        # a timezone-aware datetime.
        task.completed_at = datetime.now(timezone.utc)

        task_events[task.key]["completed"] = clock()

        # Remove from active tasks (normally done by _load_series_data)
        background_loader.active_tasks.pop(task.key, None)

    background_loader._load_series_data = mock_load_series_data

    # Add two anime series
    await background_loader.add_series_loading_task(
        key="anime-one",
        folder="Anime One",
        name="Anime One",
        year=2024,
    )
    await background_loader.add_series_loading_task(
        key="anime-two",
        folder="Anime Two",
        name="Anime Two",
        year=2024,
    )

    def both_completed():
        return all(
            events["completed"] is not None for events in task_events.values()
        )

    # Wait for both tasks to complete, giving up after max_wait seconds.
    # The deadline is checked against the clock itself so scheduling delays
    # cannot stretch the timeout beyond what was asked for.
    max_wait = 2.0
    check_interval = 0.1
    start_time = clock()
    deadline = start_time + max_wait

    while not both_completed() and clock() < deadline:
        await asyncio.sleep(check_interval)

    total_duration = clock() - start_time

    # Verify both tasks completed
    assert task_events["anime-one"]["started"] is not None, "Anime One never started"
    assert task_events["anime-one"]["completed"] is not None, "Anime One never completed"
    assert task_events["anime-two"]["started"] is not None, "Anime Two never started"
    assert task_events["anime-two"]["completed"] is not None, "Anime Two never completed"

    # Calculate time between starts
    start_diff = abs(
        task_events["anime-two"]["started"] - task_events["anime-one"]["started"]
    )

    # Verify parallel execution:
    # If tasks run in parallel, they should start close together (< 0.2s apart)
    # and complete in roughly the same total time as a single task.
    assert start_diff < 0.2, (
        f"Tasks should start nearly simultaneously (parallel), "
        f"but started {start_diff:.2f}s apart (sequential)"
    )

    # Total duration should be close to single task duration, not sum of both
    # Allow some overhead for scheduling
    assert total_duration < 1.0, (
        f"Parallel execution should take ~0.5s, but took {total_duration:.2f}s "
        f"(indicating sequential processing)"
    )

    print("✓ Parallel execution verified:")
    print(f" - Start time difference: {start_diff:.3f}s")
    print(f" - Total duration: {total_duration:.3f}s")
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_multiple_anime_additions_non_blocking(
    background_loader,
):
    """Test that adding anime doesn't block the caller.

    The add_series_loading_task method should return immediately
    after queuing, not wait for processing to complete. The loader's
    ``_load_series_data`` is replaced with a stub that takes 1s, so a
    blocking implementation would make the call take about that long.
    """

    async def slow_load(task):
        """Stand-in loader that simulates slow work before finishing."""
        await asyncio.sleep(1.0)
        task.status = LoadingStatus.COMPLETED
        background_loader.active_tasks.pop(task.key, None)

    background_loader._load_series_data = slow_load

    # Measure on the event loop's monotonic clock: the threshold below is
    # only 0.1s, so a wall-clock adjustment could easily distort the result.
    clock = asyncio.get_running_loop().time

    # Add should return quickly, not wait for processing
    start = clock()
    await background_loader.add_series_loading_task(
        key="test-anime",
        folder="Test Anime",
        name="Test Anime",
        year=2024,
    )
    add_duration = clock() - start

    # Adding to queue should be fast (<0.1s), not wait for processing (1s)
    assert add_duration < 0.1, (
        f"add_series_loading_task should return immediately, "
        f"but took {add_duration:.2f}s (blocking on processing)"
    )
|
|
|
|
|
|
def _peak_concurrency(started, finished):
    """Return the highest number of tasks seen running at any task's start.

    Args:
        started: Mapping of task key to the time its processing began.
        finished: Mapping of task key to the time its processing ended;
            tasks that never finished are simply absent.

    Returns:
        For the busiest start instant, one (the starting task itself) plus
        the number of *other* tasks whose recorded start..end window
        contains that instant. Returns 0 when nothing started.
    """
    peak = 0
    for key, instant in started.items():
        # Each other task is considered exactly once; only tasks with a
        # recorded end have a usable window.
        overlapping = sum(
            1
            for other, begin in started.items()
            if other != key
            and other in finished
            and begin <= instant <= finished[other]
        )
        peak = max(peak, 1 + overlapping)
    return peak


@pytest.mark.asyncio
async def test_concurrent_anime_limit(
    background_loader,
):
    """Test that multiple anime can be added and processed concurrently.

    This test adds 5 anime series and verifies that at least 2 are
    processed concurrently (proves parallel execution exists). Despite the
    test's name, no upper limit on concurrency is asserted here.
    """
    task_count = 5
    clock = asyncio.get_running_loop().time

    # Loop-clock timestamps per task key. Plain dicts are enough: the stub
    # below never awaits between reading the clock and storing the value,
    # so no other coroutine can interleave with the update.
    started = {}
    finished = {}

    async def track_load(task):
        """Stand-in loader that records when each task is processing."""
        started[task.key] = clock()

        # Simulate work
        await asyncio.sleep(0.3)

        finished[task.key] = clock()
        task.status = LoadingStatus.COMPLETED
        background_loader.active_tasks.pop(task.key, None)

    background_loader._load_series_data = track_load

    # Add 5 anime
    for i in range(task_count):
        await background_loader.add_series_loading_task(
            key=f"anime-{i}",
            folder=f"Anime {i}",
            name=f"Anime {i}",
            year=2024,
        )

    # Wait for all to complete, but no longer than 2 seconds in total.
    deadline = clock() + 2.0
    while len(finished) < task_count and clock() < deadline:
        await asyncio.sleep(0.05)

    # If tasks run in parallel, their processing windows overlap.
    max_concurrent = _peak_concurrency(started, finished)

    # We should see at least 2 tasks running concurrently
    assert max_concurrent >= 2, (
        f"Expected at least 2 concurrent tasks, but max was {max_concurrent}. "
        f"This indicates sequential processing."
    )

    print(f"✓ Concurrent processing verified: max {max_concurrent} tasks at once")
|