"""Performance tests for NFO batch operations. This module tests the performance characteristics of batch NFO creation including concurrent operations, TMDB API request optimization, and memory usage. """ import asyncio import time from pathlib import Path from typing import List from unittest.mock import AsyncMock, Mock, patch import pytest from src.core.services.nfo_service import NFOService from src.server.api.nfo import batch_create_nfo from src.server.models.nfo import NFOBatchCreateRequest class TestConcurrentNFOCreation: """Test performance of concurrent NFO creation operations.""" @pytest.mark.asyncio async def test_concurrent_nfo_creation_10_series(self): """Test concurrent NFO creation for 10 series completes quickly.""" # Target: 10 series in < 5 seconds with concurrency max_time_seconds = 5.0 num_series = 10 # Create mock series mock_series = [] for i in range(num_series): serie = Mock() serie.key = f"series_{i:02d}" serie.name = f"Test Series {i}" serie.folder = f"Series_{i:02d}" serie.ensure_folder_with_year = Mock(return_value=serie.folder) mock_series.append(serie) # Mock SeriesApp mock_app = Mock() mock_app.list.GetList.return_value = mock_series # Mock NFO service mock_nfo_service = Mock(spec=NFOService) mock_nfo_service.check_nfo_exists = AsyncMock(return_value=False) async def mock_create_nfo(*args, **kwargs): await asyncio.sleep(0.1) # Simulate API call return Path("/test/tvshow.nfo") mock_nfo_service.create_tvshow_nfo = AsyncMock(side_effect=mock_create_nfo) request = NFOBatchCreateRequest( serie_ids=[f"series_{i:02d}" for i in range(num_series)], max_concurrent=5, skip_existing=False ) start_time = time.time() with patch("src.server.api.nfo.get_series_app", return_value=mock_app), \ patch("src.server.api.nfo.get_nfo_service", return_value=mock_nfo_service): result = await batch_create_nfo( request=request, _auth={"username": "test"}, series_app=mock_app, nfo_service=mock_nfo_service ) elapsed_time = time.time() - start_time # Verify results assert result.successful == num_series assert result.failed == 0 # Verify performance assert elapsed_time < max_time_seconds, \ f"Batch creation took {elapsed_time:.2f}s, exceeds limit of {max_time_seconds}s" # With concurrency of 5, should be faster than sequential # Sequential would take num_series * 0.1 = 1.0s minimum # Concurrent should take roughly (num_series / 5) * 0.1 = 0.2s assert elapsed_time < 1.0, "Concurrency not providing speedup" print(f"\nPerformance: {num_series} series in {elapsed_time:.2f}s") print(f"Rate: {num_series / elapsed_time:.2f} series/second") @pytest.mark.asyncio async def test_concurrent_nfo_creation_50_series(self): """Test concurrent NFO creation for 50 series.""" max_time_seconds = 20.0 num_series = 50 mock_series = [] for i in range(num_series): serie = Mock() serie.key = f"series_{i:02d}" serie.name = f"Test Series {i}" serie.folder = f"Series_{i:02d}" serie.ensure_folder_with_year = Mock(return_value=serie.folder) mock_series.append(serie) mock_app = Mock() mock_app.list.GetList.return_value = mock_series mock_nfo_service = Mock(spec=NFOService) mock_nfo_service.check_nfo_exists = AsyncMock(return_value=False) async def mock_create_nfo(*args, **kwargs): await asyncio.sleep(0.05) return Path("/test/tvshow.nfo") mock_nfo_service.create_tvshow_nfo = AsyncMock(side_effect=mock_create_nfo) request = NFOBatchCreateRequest( serie_ids=[f"series_{i:02d}" for i in range(num_series)], max_concurrent=10, skip_existing=False ) start_time = time.time() with patch("src.server.api.nfo.get_series_app", return_value=mock_app), \ patch("src.server.api.nfo.get_nfo_service", return_value=mock_nfo_service): result = await batch_create_nfo( request=request, _auth={"username": "test"}, series_app=mock_app, nfo_service=mock_nfo_service ) elapsed_time = time.time() - start_time assert result.successful == num_series assert elapsed_time < max_time_seconds print(f"\n50 series: {elapsed_time:.2f}s ({num_series / elapsed_time:.2f} series/sec)") @pytest.mark.asyncio async def test_concurrent_nfo_creation_100_series(self): """Test concurrent NFO creation for 100 series.""" max_time_seconds = 30.0 num_series = 100 mock_series = [] for i in range(num_series): serie = Mock() serie.key = f"series_{i:03d}" serie.name = f"Test Series {i}" serie.folder = f"Series_{i:03d}" serie.ensure_folder_with_year = Mock(return_value=serie.folder) mock_series.append(serie) mock_app = Mock() mock_app.list.GetList.return_value = mock_series mock_nfo_service = Mock(spec=NFOService) mock_nfo_service.check_nfo_exists = AsyncMock(return_value=False) async def mock_create_nfo(*args, **kwargs): await asyncio.sleep(0.02) return Path("/test/tvshow.nfo") mock_nfo_service.create_tvshow_nfo = AsyncMock(side_effect=mock_create_nfo) request = NFOBatchCreateRequest( serie_ids=[f"series_{i:03d}" for i in range(num_series)], max_concurrent=10, skip_existing=False ) start_time = time.time() with patch("src.server.api.nfo.get_series_app", return_value=mock_app), \ patch("src.server.api.nfo.get_nfo_service", return_value=mock_nfo_service): result = await batch_create_nfo( request=request, _auth={"username": "test"}, series_app=mock_app, nfo_service=mock_nfo_service ) elapsed_time = time.time() - start_time assert result.successful == num_series assert elapsed_time < max_time_seconds print(f"\n100 series: {elapsed_time:.2f}s ({num_series / elapsed_time:.2f} series/sec)") class TestTMDBAPIBatchingOptimization: """Test TMDB API request batching and optimization.""" @pytest.mark.asyncio async def test_tmdb_api_call_count(self): """Test that TMDB API calls are optimized in batch operations.""" num_series = 20 mock_series = [] for i in range(num_series): serie = Mock() serie.key = f"series_{i:02d}" serie.name = f"Test Series {i}" serie.folder = f"Series_{i:02d}" serie.ensure_folder_with_year = Mock(return_value=serie.folder) mock_series.append(serie) mock_app = Mock() mock_app.list.GetList.return_value = mock_series # Track API calls api_calls = [] mock_nfo_service = Mock(spec=NFOService) mock_nfo_service.check_nfo_exists = AsyncMock(return_value=False) async def track_api_call(*args, **kwargs): api_calls.append(time.time()) await asyncio.sleep(0.01) return Path("/test/tvshow.nfo") mock_nfo_service.create_tvshow_nfo = AsyncMock(side_effect=track_api_call) request = NFOBatchCreateRequest( serie_ids=[f"series_{i:02d}" for i in range(num_series)], max_concurrent=5, skip_existing=False ) with patch("src.server.api.nfo.get_series_app", return_value=mock_app), \ patch("src.server.api.nfo.get_nfo_service", return_value=mock_nfo_service): result = await batch_create_nfo( request=request, _auth={"username": "test"}, series_app=mock_app, nfo_service=mock_nfo_service ) # Verify all series processed assert result.successful == num_series assert len(api_calls) == num_series # Verify concurrent execution pattern # With max_concurrent=5, at most 5 calls should overlap concurrent_windows = 0 window_size = 0.005 # 5ms window (tighter window for more accurate detection) for i in range(len(api_calls)): overlapping = sum( 1 for t in api_calls if abs(t - api_calls[i]) < window_size ) concurrent_windows = max(concurrent_windows, overlapping) # Should have concurrent execution (allow some variance due to timing) assert concurrent_windows <= 7, "Concurrency limit significantly exceeded" assert concurrent_windows >= 2, "No concurrent execution detected" print(f"\nAPI Calls: {len(api_calls)} calls") print(f"Max concurrent: {concurrent_windows}") @pytest.mark.asyncio async def test_tmdb_rate_limit_handling(self): """Test handling of TMDB rate limits during batch operations.""" num_series = 10 mock_series = [] for i in range(num_series): serie = Mock() serie.key = f"series_{i:02d}" serie.name = f"Test Series {i}" serie.folder = f"Series_{i:02d}" serie.ensure_folder_with_year = Mock(return_value=serie.folder) mock_series.append(serie) mock_app = Mock() mock_app.list.GetList.return_value = mock_series call_count = 0 mock_nfo_service = Mock(spec=NFOService) mock_nfo_service.check_nfo_exists = AsyncMock(return_value=False) async def mock_create_with_rate_limit(*args, **kwargs): nonlocal call_count call_count += 1 # Simulate rate limit on 5th call if call_count == 5: from src.core.services.tmdb_client import TMDBAPIError raise TMDBAPIError("Rate limit exceeded") await asyncio.sleep(0.01) return Path("/test/tvshow.nfo") mock_nfo_service.create_tvshow_nfo = AsyncMock(side_effect=mock_create_with_rate_limit) request = NFOBatchCreateRequest( serie_ids=[f"series_{i:02d}" for i in range(num_series)], max_concurrent=3, skip_existing=False ) with patch("src.server.api.nfo.get_series_app", return_value=mock_app), \ patch("src.server.api.nfo.get_nfo_service", return_value=mock_nfo_service): result = await batch_create_nfo( request=request, _auth={"username": "test"}, series_app=mock_app, nfo_service=mock_nfo_service ) # One should fail due to rate limit assert result.successful == num_series - 1 assert result.failed == 1 print(f"\nRate limit test: {result.successful} success, {result.failed} failed") class TestMediaDownloadConcurrency: """Test performance of concurrent media file downloads.""" @pytest.mark.asyncio async def test_concurrent_media_downloads(self): """Test concurrent downloads of poster/logo/fanart.""" num_series = 10 mock_series = [] for i in range(num_series): serie = Mock() serie.key = f"series_{i:02d}" serie.name = f"Test Series {i}" serie.folder = f"Series_{i:02d}" serie.ensure_folder_with_year = Mock(return_value=serie.folder) mock_series.append(serie) mock_app = Mock() mock_app.list.GetList.return_value = mock_series download_times = [] mock_nfo_service = Mock(spec=NFOService) mock_nfo_service.check_nfo_exists = AsyncMock(return_value=False) async def mock_create_with_media(*args, **kwargs): start = time.time() # Simulate NFO creation + 3 media downloads await asyncio.sleep(0.01) # NFO creation if kwargs.get('download_poster', True): await asyncio.sleep(0.02) # Poster download if kwargs.get('download_logo', True): await asyncio.sleep(0.02) # Logo download if kwargs.get('download_fanart', True): await asyncio.sleep(0.02) # Fanart download download_times.append(time.time() - start) return Path("/test/tvshow.nfo") mock_nfo_service.create_tvshow_nfo = AsyncMock(side_effect=mock_create_with_media) request = NFOBatchCreateRequest( serie_ids=[f"series_{i:02d}" for i in range(num_series)], max_concurrent=5, skip_existing=False, download_media=True ) start_time = time.time() with patch("src.server.api.nfo.get_series_app", return_value=mock_app), \ patch("src.server.api.nfo.get_nfo_service", return_value=mock_nfo_service): result = await batch_create_nfo( request=request, _auth={"username": "test"}, series_app=mock_app, nfo_service=mock_nfo_service ) elapsed_time = time.time() - start_time assert result.successful == num_series # With media downloads, should still be concurrent # Sequential would take num_series * 0.07 = 0.7s # Concurrent (5 at a time) should take roughly 2 * 0.07 = 0.14s assert elapsed_time < 0.5, f"Too slow: {elapsed_time:.2f}s" avg_download_time = sum(download_times) / len(download_times) print(f"\nMedia downloads: {num_series} series in {elapsed_time:.2f}s") print(f"Average per series: {avg_download_time:.3f}s") @pytest.mark.asyncio async def test_media_download_without_downloads(self): """Test performance when media downloads are disabled.""" num_series = 20 mock_series = [] for i in range(num_series): serie = Mock() serie.key = f"series_{i:02d}" serie.name = f"Test Series {i}" serie.folder = f"Series_{i:02d}" serie.ensure_folder_with_year = Mock(return_value=serie.folder) mock_series.append(serie) mock_app = Mock() mock_app.list.GetList.return_value = mock_series mock_nfo_service = Mock(spec=NFOService) mock_nfo_service.check_nfo_exists = AsyncMock(return_value=False) async def mock_create_no_media(*args, **kwargs): await asyncio.sleep(0.01) # Only NFO creation return Path("/test/tvshow.nfo") mock_nfo_service.create_tvshow_nfo = AsyncMock(side_effect=mock_create_no_media) request = NFOBatchCreateRequest( serie_ids=[f"series_{i:02d}" for i in range(num_series)], max_concurrent=10, skip_existing=False, download_media=False ) start_time = time.time() with patch("src.server.api.nfo.get_series_app", return_value=mock_app), \ patch("src.server.api.nfo.get_nfo_service", return_value=mock_nfo_service): result = await batch_create_nfo( request=request, _auth={"username": "test"}, series_app=mock_app, nfo_service=mock_nfo_service ) elapsed_time = time.time() - start_time assert result.successful == num_series # Should be faster without media downloads assert elapsed_time < 0.5, f"Too slow: {elapsed_time:.2f}s" print(f"\nNo media: {num_series} series in {elapsed_time:.2f}s") class TestBatchOperationMemoryUsage: """Test memory usage during batch NFO operations.""" @pytest.mark.asyncio async def test_memory_usage_during_batch_operations(self): """Test that memory usage stays reasonable during batch operations.""" import psutil process = psutil.Process() baseline_memory_mb = process.memory_info().rss / 1024 / 1024 num_series = 100 mock_series = [] for i in range(num_series): serie = Mock() serie.key = f"series_{i:03d}" serie.name = f"Test Series {i}" serie.folder = f"Series_{i:03d}" serie.ensure_folder_with_year = Mock(return_value=serie.folder) mock_series.append(serie) mock_app = Mock() mock_app.list.GetList.return_value = mock_series mock_nfo_service = Mock(spec=NFOService) mock_nfo_service.check_nfo_exists = AsyncMock(return_value=False) mock_nfo_service.create_tvshow_nfo = AsyncMock( return_value=Path("/test/tvshow.nfo") ) request = NFOBatchCreateRequest( serie_ids=[f"series_{i:03d}" for i in range(num_series)], max_concurrent=10, skip_existing=False ) with patch("src.server.api.nfo.get_series_app", return_value=mock_app), \ patch("src.server.api.nfo.get_nfo_service", return_value=mock_nfo_service): result = await batch_create_nfo( request=request, _auth={"username": "test"}, series_app=mock_app, nfo_service=mock_nfo_service ) current_memory_mb = process.memory_info().rss / 1024 / 1024 memory_increase_mb = current_memory_mb - baseline_memory_mb assert result.successful == num_series # Memory increase should be reasonable (< 100MB for 100 series) assert memory_increase_mb < 100, \ f"Memory increased by {memory_increase_mb:.2f}MB, too much" print(f"\nMemory: Baseline {baseline_memory_mb:.2f}MB") print(f"After batch: {current_memory_mb:.2f}MB") print(f"Increase: {memory_increase_mb:.2f}MB for {num_series} series") @pytest.mark.asyncio async def test_memory_efficient_result_storage(self): """Test that batch results are stored efficiently.""" import sys num_series = 100 # Create mock results results = [] for i in range(num_series): result = Mock() result.serie_id = f"series_{i:03d}" result.serie_folder = f"Series_{i:03d}" result.success = True result.message = "NFO created successfully" result.nfo_path = f"/test/Series_{i:03d}/tvshow.nfo" results.append(result) # Calculate size total_size = sys.getsizeof(results) size_per_result = total_size / num_series # Each result should be reasonably small assert size_per_result < 5000, \ f"Result size {size_per_result}bytes is too large" print(f"\nResult Storage: {total_size} bytes for {num_series} results") print(f"Average: {size_per_result:.2f} bytes/result") class TestBatchOperationScalability: """Test scalability of batch operations with increasing sizes.""" @pytest.mark.asyncio async def test_batch_time_scales_linearly(self): """Test that batch operation time scales linearly.""" batch_sizes = [10, 20, 40] batch_times = [] for size in batch_sizes: mock_series = [] for i in range(size): serie = Mock() serie.key = f"series_{i:02d}" serie.name = f"Test Series {i}" serie.folder = f"Series_{i:02d}" serie.ensure_folder_with_year = Mock(return_value=serie.folder) mock_series.append(serie) mock_app = Mock() mock_app.list.GetList.return_value = mock_series mock_nfo_service = Mock(spec=NFOService) mock_nfo_service.check_nfo_exists = AsyncMock(return_value=False) async def mock_create(*args, **kwargs): await asyncio.sleep(0.01) return Path("/test/tvshow.nfo") mock_nfo_service.create_tvshow_nfo = AsyncMock(side_effect=mock_create) request = NFOBatchCreateRequest( serie_ids=[f"series_{i:02d}" for i in range(size)], max_concurrent=5, skip_existing=False ) start_time = time.time() with patch("src.server.api.nfo.get_series_app", return_value=mock_app), \ patch("src.server.api.nfo.get_nfo_service", return_value=mock_nfo_service): result = await batch_create_nfo( request=request, _auth={"username": "test"}, series_app=mock_app, nfo_service=mock_nfo_service ) elapsed_time = time.time() - start_time batch_times.append(elapsed_time) assert result.successful == size # Verify linear scaling for i in range(len(batch_times) - 1): ratio = batch_times[i + 1] / batch_times[i] size_ratio = batch_sizes[i + 1] / batch_sizes[i] # Time should scale roughly with size (allow generous variance for small batches) assert ratio < size_ratio * 10, \ f"Scaling worse than linear: {ratio:.2f}x time for {size_ratio}x size" print("\nScalability:") for size, time_taken in zip(batch_sizes, batch_times): rate = size / time_taken print(f" {size} series: {time_taken:.2f}s ({rate:.2f} series/sec)") @pytest.mark.asyncio async def test_optimal_concurrency_level(self): """Test finding optimal concurrency level for batch operations.""" num_series = 20 concurrency_levels = [1, 3, 5, 10] times = {} for concurrency in concurrency_levels: mock_series = [] for i in range(num_series): serie = Mock() serie.key = f"series_{i:02d}" serie.name = f"Test Series {i}" serie.folder = f"Series_{i:02d}" serie.ensure_folder_with_year = Mock(return_value=serie.folder) mock_series.append(serie) mock_app = Mock() mock_app.list.GetList.return_value = mock_series mock_nfo_service = Mock(spec=NFOService) mock_nfo_service.check_nfo_exists = AsyncMock(return_value=False) async def mock_create(*args, **kwargs): await asyncio.sleep(0.02) return Path("/test/tvshow.nfo") mock_nfo_service.create_tvshow_nfo = AsyncMock(side_effect=mock_create) request = NFOBatchCreateRequest( serie_ids=[f"series_{i:02d}" for i in range(num_series)], max_concurrent=concurrency, skip_existing=False ) start_time = time.time() with patch("src.server.api.nfo.get_series_app", return_value=mock_app), \ patch("src.server.api.nfo.get_nfo_service", return_value=mock_nfo_service): result = await batch_create_nfo( request=request, _auth={"username": "test"}, series_app=mock_app, nfo_service=mock_nfo_service ) elapsed_time = time.time() - start_time times[concurrency] = elapsed_time assert result.successful == num_series # Higher concurrency should be faster assert times[10] < times[1], "Higher concurrency not faster" # But diminishing returns (3 vs 1 should show more improvement than 10 vs 5) improvement_3_vs_1 = (times[1] - times[3]) / times[1] improvement_10_vs_5 = (times[5] - times[10]) / times[5] assert improvement_3_vs_1 > 0.2, "Low concurrency improvement too small" print("\nConcurrency levels:") for level in concurrency_levels: print(f" {level} concurrent: {times[level]:.2f}s")