Files
Aniworld/tests/unit/test_parallel_anime_add.py
Lukas 8ff558cb07 Add concurrent anime processing support
- Modified BackgroundLoaderService to use multiple workers (default: 5)
- Anime additions now process in parallel without blocking
- Added comprehensive unit tests for concurrent behavior
- Updated integration tests for compatibility
- Updated architecture documentation
2026-01-24 17:42:59 +01:00

283 lines
9.2 KiB
Python

"""Unit tests for parallel anime addition functionality.
This module tests that multiple anime can be added concurrently without blocking
each other. The background loader should process multiple series simultaneously
rather than sequentially.
"""
import asyncio
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from src.server.services.background_loader_service import (
BackgroundLoaderService,
LoadingStatus,
)
@pytest.fixture
def mock_websocket_service():
    """Provide a stub WebSocket service whose broadcast is awaitable."""
    ws_stub = MagicMock()
    # broadcast is awaited by the loader, so it must be an AsyncMock.
    ws_stub.broadcast = AsyncMock()
    return ws_stub
@pytest.fixture
def mock_anime_service():
    """Provide a plain MagicMock standing in for AnimeService."""
    return MagicMock()
@pytest.fixture
def mock_series_app():
    """Provide a stub SeriesApp with the attributes the loader reads."""
    series_app = MagicMock()
    # The loader expects a search directory and an NFO service on the app.
    series_app.nfo_service = MagicMock()
    series_app.directory_to_search = "/fake/anime/directory"
    return series_app
@pytest.fixture
async def background_loader(mock_websocket_service, mock_anime_service, mock_series_app):
    """Yield a started BackgroundLoaderService and stop it on teardown.

    NOTE(review): an async generator fixture under plain ``@pytest.fixture``
    relies on pytest-asyncio picking it up (e.g. auto mode) — confirm the
    project's pytest-asyncio configuration.
    """
    service = BackgroundLoaderService(
        series_app=mock_series_app,
        anime_service=mock_anime_service,
        websocket_service=mock_websocket_service,
    )
    await service.start()  # spin up the worker(s) before the test runs
    yield service
    await service.stop()  # teardown: shut the worker(s) down
@pytest.mark.asyncio
async def test_parallel_anime_additions(
    background_loader,
    mock_websocket_service,
):
    """Test that multiple anime additions are processed in parallel.

    This test adds two anime series and verifies that:
    1. Both are queued successfully.
    2. Both start processing without waiting for the other to complete.
    3. Both complete within a time frame indicating parallel execution.
    """
    # Start/completion timestamps recorded per series key.
    task_events = {
        "anime-one": {"started": None, "completed": None},
        "anime-two": {"started": None, "completed": None},
    }

    async def mock_load_series_data(task):
        """Mock load that simulates work with a delay and records timing."""
        task_events[task.key]["started"] = datetime.now(timezone.utc)
        # Simulate some work with a delay.
        await asyncio.sleep(0.5)
        # Mark all loading stages as finished.
        task.progress["episodes"] = True
        task.progress["nfo"] = True
        task.progress["logo"] = True
        task.progress["images"] = True
        task.status = LoadingStatus.COMPLETED
        task.completed_at = datetime.now(timezone.utc)
        task_events[task.key]["completed"] = datetime.now(timezone.utc)
        # Remove from active tasks (normally done by _load_series_data).
        background_loader.active_tasks.pop(task.key, None)

    background_loader._load_series_data = mock_load_series_data

    # Queue two anime series back to back.
    await background_loader.add_series_loading_task(
        key="anime-one",
        folder="Anime One",
        name="Anime One",
        year=2024,
    )
    await background_loader.add_series_loading_task(
        key="anime-two",
        folder="Anime Two",
        name="Anime Two",
        year=2024,
    )

    # With sequential processing this would take ~1.0s;
    # with parallel processing it should take ~0.5s.
    start_time = datetime.now(timezone.utc)
    max_wait = 2.0  # Maximum wait time in seconds.
    check_interval = 0.1
    # Poll until both tasks complete or the timeout elapses. Elapsed time is
    # read from the wall clock rather than accumulated from sleep intervals,
    # so scheduling overhead cannot make the loop overrun max_wait.
    while (datetime.now(timezone.utc) - start_time).total_seconds() < max_wait:
        if (task_events["anime-one"]["completed"] is not None and
                task_events["anime-two"]["completed"] is not None):
            break
        await asyncio.sleep(check_interval)
    total_duration = (datetime.now(timezone.utc) - start_time).total_seconds()

    # Verify both tasks ran to completion.
    assert task_events["anime-one"]["started"] is not None, "Anime One never started"
    assert task_events["anime-one"]["completed"] is not None, "Anime One never completed"
    assert task_events["anime-two"]["started"] is not None, "Anime Two never started"
    assert task_events["anime-two"]["completed"] is not None, "Anime Two never completed"

    # If tasks run in parallel, they should start close together (< 0.2s apart).
    start_diff = abs(
        (task_events["anime-two"]["started"] - task_events["anime-one"]["started"]).total_seconds()
    )
    assert start_diff < 0.2, (
        f"Tasks should start nearly simultaneously (parallel), "
        f"but started {start_diff:.2f}s apart (sequential)"
    )
    # Total duration should be close to a single task's duration (~0.5s),
    # not the sum of both; allow some overhead for scheduling.
    assert total_duration < 1.0, (
        f"Parallel execution should take ~0.5s, but took {total_duration:.2f}s "
        f"(indicating sequential processing)"
    )
    print("✓ Parallel execution verified:")
    print(f" - Start time difference: {start_diff:.3f}s")
    print(f" - Total duration: {total_duration:.3f}s")
@pytest.mark.asyncio
async def test_multiple_anime_additions_non_blocking(
    background_loader,
):
    """Test that adding anime doesn't block the caller.

    add_series_loading_task must return right after queuing the task,
    rather than waiting for the (slow) processing to finish.
    """

    async def slow_load(task):
        # Simulate a full second of processing work.
        await asyncio.sleep(1.0)
        task.status = LoadingStatus.COMPLETED
        background_loader.active_tasks.pop(task.key, None)

    background_loader._load_series_data = slow_load

    # Time only the enqueue call itself.
    queued_at = datetime.now(timezone.utc)
    await background_loader.add_series_loading_task(
        key="test-anime",
        folder="Test Anime",
        name="Test Anime",
        year=2024,
    )
    add_duration = (datetime.now(timezone.utc) - queued_at).total_seconds()

    # Queuing must be fast (<0.1s); it must not wait out the 1s of processing.
    assert add_duration < 0.1, (
        f"add_series_loading_task should return immediately, "
        f"but took {add_duration:.2f}s (blocking on processing)"
    )
@pytest.mark.asyncio
async def test_concurrent_anime_limit(
    background_loader,
):
    """Test that multiple anime can be added and processed concurrently.

    This test adds 5 anime series and verifies that at least 2 are
    processed concurrently (proves parallel execution exists).
    """
    processing_times = []
    lock = asyncio.Lock()

    async def track_load(task):
        """Record start/end timestamps for each processed task."""
        start = datetime.now(timezone.utc)
        async with lock:
            processing_times.append(("start", task.key, start))
        # Simulate work.
        await asyncio.sleep(0.3)
        end = datetime.now(timezone.utc)
        async with lock:
            processing_times.append(("end", task.key, end))
        task.status = LoadingStatus.COMPLETED
        background_loader.active_tasks.pop(task.key, None)

    background_loader._load_series_data = track_load

    # Add 5 anime.
    for i in range(5):
        await background_loader.add_series_loading_task(
            key=f"anime-{i}",
            folder=f"Anime {i}",
            name=f"Anime {i}",
            year=2024,
        )

    # Wait for all to complete.
    await asyncio.sleep(2.0)

    # Collapse the event list into one (start, end) window per key.
    # The original scan iterated raw events, so every overlapping key was
    # counted twice (once for its "start" event and once for its "end"
    # event), inflating the concurrency figure; it also re-ran next() over
    # the full event list per comparison (O(n^3)).
    windows = {}
    for event_type, key, timestamp in processing_times:
        windows.setdefault(key, {})[event_type] = timestamp

    # For each task's start time, count how many windows contain it.
    active_at_once = []
    for key, window in windows.items():
        started = window.get("start")
        if started is None:
            continue  # Task never started; nothing to measure.
        concurrent_count = 1  # This task itself.
        for other_key, other in windows.items():
            if other_key == key:
                continue
            other_start = other.get("start")
            other_end = other.get("end")
            # other_key was active if its window spans this task's start.
            if other_start and other_end and other_start <= started <= other_end:
                concurrent_count += 1
        active_at_once.append(concurrent_count)

    max_concurrent = max(active_at_once) if active_at_once else 0
    # We should see at least 2 tasks running concurrently.
    assert max_concurrent >= 2, (
        f"Expected at least 2 concurrent tasks, but max was {max_concurrent}. "
        f"This indicates sequential processing."
    )
    print(f"✓ Concurrent processing verified: max {max_concurrent} tasks at once")