"""Integration test for concurrent anime additions via API endpoint. This test verifies that the /api/anime/add endpoint can handle multiple concurrent requests without blocking. """ import asyncio import time from unittest.mock import AsyncMock, MagicMock, patch import pytest from httpx import ASGITransport, AsyncClient from src.server.fastapi_app import app from src.server.services.auth_service import auth_service from src.server.services.background_loader_service import get_background_loader_service from src.server.utils.dependencies import get_optional_database_session, get_series_app def _make_mock_series_app(): """Create a mock SeriesApp with the attributes the endpoint needs.""" mock_app = MagicMock() mock_app.loader.get_year.return_value = 2024 mock_app.list.keyDict = {} return mock_app def _make_mock_loader(): """Create a mock BackgroundLoaderService.""" loader = MagicMock() loader.add_series_loading_task = AsyncMock() return loader @pytest.fixture async def authenticated_client(): """Create authenticated async client with mocked dependencies.""" if not auth_service.is_configured(): auth_service.setup_master_password("TestPass123!") mock_app = _make_mock_series_app() mock_loader = _make_mock_loader() # Override dependencies so the endpoint doesn't need real services app.dependency_overrides[get_series_app] = lambda: mock_app app.dependency_overrides[get_background_loader_service] = lambda: mock_loader app.dependency_overrides[get_optional_database_session] = lambda: None transport = ASGITransport(app=app) async with AsyncClient(transport=transport, base_url="http://test") as client: # Login to get token r = await client.post( "/api/auth/login", json={"password": "TestPass123!"} ) if r.status_code == 200: token = r.json()["access_token"] client.headers["Authorization"] = f"Bearer {token}" yield client # Clean up overrides app.dependency_overrides.clear() @pytest.mark.asyncio async def test_concurrent_anime_add_requests(authenticated_client): """Test that multiple anime add requests can be processed concurrently. This test sends multiple anime add requests simultaneously and verifies: 1. All requests return 202 Accepted 2. All requests complete within a reasonable time (indicating no blocking) 3. Each anime is added successfully with correct response structure """ anime_list = [ {"link": "https://aniworld.to/anime/stream/test-anime-1", "name": "Test Anime 1"}, {"link": "https://aniworld.to/anime/stream/test-anime-2", "name": "Test Anime 2"}, {"link": "https://aniworld.to/anime/stream/test-anime-3", "name": "Test Anime 3"}, ] start_time = time.time() tasks = [ authenticated_client.post("/api/anime/add", json=anime) for anime in anime_list ] responses = await asyncio.gather(*tasks) total_time = time.time() - start_time for i, response in enumerate(responses): assert response.status_code in (202, 200), ( f"Request {i} failed with status {response.status_code}" ) data = response.json() assert "status" in data assert data["status"] in ("success", "exists") assert "key" in data assert "folder" in data assert "loading_status" in data assert "loading_progress" in data assert total_time < 5.0, ( f"Concurrent requests took {total_time:.2f}s, " f"indicating possible blocking issues" ) print(f"3 concurrent anime add requests completed in {total_time:.2f}s") @pytest.mark.asyncio async def test_same_anime_concurrent_add(authenticated_client): """Test that adding the same anime twice concurrently is handled correctly. Without a database, both requests succeed with 'success' status since the in-memory cache is the only dedup mechanism and might not catch concurrent writes from the same key. """ anime = {"link": "https://aniworld.to/anime/stream/concurrent-test", "name": "Concurrent Test"} task1 = authenticated_client.post("/api/anime/add", json=anime) task2 = authenticated_client.post("/api/anime/add", json=anime) responses = await asyncio.gather(task1, task2) statuses = [r.json().get("status") for r in responses] # Without DB, both succeed; with DB the second may see "exists" assert all(s in ("success", "exists") for s in statuses), ( f"Unexpected statuses: {statuses}" ) keys = [r.json().get("key") for r in responses] assert keys[0] == keys[1], "Both responses should have the same key" print(f"Concurrent same-anime requests handled correctly: {statuses}")