""" Performance Tests for Download Operations This module contains performance and load tests for the AniWorld application, focusing on download operations, concurrent access, and system limitations. """ import unittest import os import sys import tempfile import shutil import time import threading import concurrent.futures import statistics from unittest.mock import Mock, patch import requests import psutil # Add parent directory to path for imports sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..')) # Import performance modules from performance_optimizer import ( SpeedLimiter, ParallelDownloadManager, DownloadCache, MemoryMonitor, BandwidthMonitor ) from database_manager import DatabaseManager from error_handler import RetryMechanism, NetworkHealthChecker from app import app class TestDownloadPerformance(unittest.TestCase): """Performance tests for download operations.""" def setUp(self): """Set up performance test environment.""" self.test_dir = tempfile.mkdtemp() self.speed_limiter = SpeedLimiter(max_speed_mbps=50) # 50 Mbps limit self.download_manager = ParallelDownloadManager(max_workers=4) self.cache = DownloadCache(max_size_mb=100) # Performance tracking self.download_times = [] self.memory_usage = [] self.cpu_usage = [] def tearDown(self): """Clean up performance test environment.""" self.download_manager.shutdown() shutil.rmtree(self.test_dir, ignore_errors=True) def mock_download_operation(self, size_mb, delay_seconds=0): """Mock download operation with specified size and delay.""" start_time = time.time() # Simulate download delay if delay_seconds > 0: time.sleep(delay_seconds) # Simulate memory usage for large files if size_mb > 10: dummy_data = b'x' * (1024 * 1024) # 1MB of dummy data time.sleep(0.1) # Simulate processing time del dummy_data end_time = time.time() download_time = end_time - start_time return { 'success': True, 'size_mb': size_mb, 'duration': download_time, 'speed_mbps': (size_mb * 8) / download_time if download_time > 0 else 0 } def test_single_download_performance(self): """Test performance of single download operation.""" test_sizes = [1, 5, 10, 50, 100] # MB results = [] for size_mb in test_sizes: with self.subTest(size_mb=size_mb): # Measure memory before process = psutil.Process() memory_before = process.memory_info().rss / 1024 / 1024 # MB # Perform mock download result = self.mock_download_operation(size_mb, delay_seconds=0.1) # Measure memory after memory_after = process.memory_info().rss / 1024 / 1024 # MB memory_increase = memory_after - memory_before results.append({ 'size_mb': size_mb, 'duration': result['duration'], 'speed_mbps': result['speed_mbps'], 'memory_increase_mb': memory_increase }) # Verify reasonable performance self.assertLess(result['duration'], 5.0) # Should complete within 5 seconds self.assertLess(memory_increase, size_mb * 2) # Memory usage shouldn't exceed 2x file size # Print performance summary print("\nSingle Download Performance Results:") print("Size(MB) | Duration(s) | Speed(Mbps) | Memory++(MB)") print("-" * 50) for result in results: print(f"{result['size_mb']:8} | {result['duration']:11.2f} | {result['speed_mbps']:11.2f} | {result['memory_increase_mb']:12.2f}") def test_concurrent_download_performance(self): """Test performance with multiple concurrent downloads.""" concurrent_levels = [1, 2, 4, 8, 16] download_size = 10 # MB per download results = [] for num_concurrent in concurrent_levels: with self.subTest(num_concurrent=num_concurrent): start_time = time.time() # Track system resources process = psutil.Process() cpu_before = process.cpu_percent() memory_before = process.memory_info().rss / 1024 / 1024 # Perform concurrent downloads with concurrent.futures.ThreadPoolExecutor(max_workers=num_concurrent) as executor: futures = [] for i in range(num_concurrent): future = executor.submit(self.mock_download_operation, download_size, 0.2) futures.append(future) # Wait for all downloads to complete download_results = [future.result() for future in futures] end_time = time.time() total_duration = end_time - start_time # Measure resource usage after time.sleep(0.1) # Allow CPU measurement to stabilize cpu_after = process.cpu_percent() memory_after = process.memory_info().rss / 1024 / 1024 # Calculate metrics total_data_mb = download_size * num_concurrent overall_throughput = total_data_mb / total_duration average_speed = statistics.mean([r['speed_mbps'] for r in download_results]) results.append({ 'concurrent': num_concurrent, 'total_duration': total_duration, 'throughput_mbps': overall_throughput * 8, # Convert to Mbps 'average_speed_mbps': average_speed, 'cpu_increase': cpu_after - cpu_before, 'memory_increase_mb': memory_after - memory_before }) # Performance assertions self.assertLess(total_duration, 10.0) # Should complete within 10 seconds self.assertTrue(all(r['success'] for r in download_results)) # Print concurrent performance summary print("\nConcurrent Download Performance Results:") print("Concurrent | Duration(s) | Throughput(Mbps) | Avg Speed(Mbps) | CPU++(%) | Memory++(MB)") print("-" * 85) for result in results: print(f"{result['concurrent']:10} | {result['total_duration']:11.2f} | {result['throughput_mbps']:15.2f} | {result['average_speed_mbps']:15.2f} | {result['cpu_increase']:8.2f} | {result['memory_increase_mb']:12.2f}") def test_speed_limiting_performance(self): """Test download speed limiting effectiveness.""" speed_limits = [1, 5, 10, 25, 50] # Mbps download_size = 20 # MB results = [] for limit_mbps in speed_limits: with self.subTest(limit_mbps=limit_mbps): # Configure speed limiter limiter = SpeedLimiter(max_speed_mbps=limit_mbps) start_time = time.time() # Simulate download with speed limiting chunks_downloaded = 0 total_chunks = download_size # 1MB chunks for chunk in range(total_chunks): chunk_start = time.time() # Simulate chunk download (1MB) time.sleep(0.05) # Base download time chunk_end = time.time() chunk_time = chunk_end - chunk_start # Calculate speed and apply limiting chunk_size_mb = 1 current_speed_mbps = (chunk_size_mb * 8) / chunk_time if limiter.should_limit_speed(current_speed_mbps): # Calculate delay needed to meet speed limit target_time = (chunk_size_mb * 8) / limit_mbps actual_delay = max(0, target_time - chunk_time) time.sleep(actual_delay) chunks_downloaded += 1 end_time = time.time() total_duration = end_time - start_time actual_speed_mbps = (download_size * 8) / total_duration results.append({ 'limit_mbps': limit_mbps, 'actual_speed_mbps': actual_speed_mbps, 'duration': total_duration, 'speed_compliance': actual_speed_mbps <= (limit_mbps * 1.1) # Allow 10% tolerance }) # Verify speed limiting is working (within 10% tolerance) self.assertLessEqual(actual_speed_mbps, limit_mbps * 1.1) # Print speed limiting results print("\nSpeed Limiting Performance Results:") print("Limit(Mbps) | Actual(Mbps) | Duration(s) | Compliant") print("-" * 50) for result in results: compliance = "✓" if result['speed_compliance'] else "✗" print(f"{result['limit_mbps']:11} | {result['actual_speed_mbps']:12.2f} | {result['duration']:11.2f} | {compliance:9}") def test_cache_performance(self): """Test download cache performance impact.""" cache_sizes = [0, 10, 50, 100, 200] # MB test_urls = [f"http://example.com/video_{i}.mp4" for i in range(20)] results = [] for cache_size_mb in cache_sizes: with self.subTest(cache_size_mb=cache_size_mb): # Create cache with specific size cache = DownloadCache(max_size_mb=cache_size_mb) # First pass: populate cache start_time = time.time() for url in test_urls[:10]: # Cache first 10 items dummy_data = b'x' * (1024 * 1024) # 1MB dummy data cache.set(url, dummy_data) populate_time = time.time() - start_time # Second pass: test cache hits start_time = time.time() cache_hits = 0 for url in test_urls[:10]: cached_data = cache.get(url) if cached_data is not None: cache_hits += 1 lookup_time = time.time() - start_time # Third pass: test cache misses start_time = time.time() cache_misses = 0 for url in test_urls[10:15]: # URLs not in cache cached_data = cache.get(url) if cached_data is None: cache_misses += 1 miss_time = time.time() - start_time cache_hit_rate = cache_hits / 10.0 if cache_size_mb > 0 else 0 results.append({ 'cache_size_mb': cache_size_mb, 'populate_time': populate_time, 'lookup_time': lookup_time, 'miss_time': miss_time, 'hit_rate': cache_hit_rate, 'cache_hits': cache_hits, 'cache_misses': cache_misses }) # Print cache performance results print("\nCache Performance Results:") print("Cache(MB) | Populate(s) | Lookup(s) | Miss(s) | Hit Rate | Hits | Misses") print("-" * 75) for result in results: print(f"{result['cache_size_mb']:9} | {result['populate_time']:11.3f} | {result['lookup_time']:9.3f} | {result['miss_time']:7.3f} | {result['hit_rate']:8.2%} | {result['cache_hits']:4} | {result['cache_misses']:6}") def test_memory_usage_under_load(self): """Test memory usage under heavy load conditions.""" load_scenarios = [ {'downloads': 5, 'size_mb': 10, 'name': 'Light Load'}, {'downloads': 10, 'size_mb': 20, 'name': 'Medium Load'}, {'downloads': 20, 'size_mb': 30, 'name': 'Heavy Load'}, {'downloads': 50, 'size_mb': 50, 'name': 'Extreme Load'} ] results = [] for scenario in load_scenarios: with self.subTest(scenario=scenario['name']): memory_monitor = MemoryMonitor(threshold_mb=1000) # 1GB threshold # Measure baseline memory process = psutil.Process() baseline_memory_mb = process.memory_info().rss / 1024 / 1024 memory_samples = [] def memory_sampler(): """Sample memory usage during test.""" for _ in range(30): # Sample for 30 seconds max current_memory = process.memory_info().rss / 1024 / 1024 memory_samples.append(current_memory) time.sleep(0.1) # Start memory monitoring monitor_thread = threading.Thread(target=memory_sampler) monitor_thread.start() start_time = time.time() # Execute load scenario with concurrent.futures.ThreadPoolExecutor(max_workers=scenario['downloads']) as executor: futures = [] for i in range(scenario['downloads']): future = executor.submit( self.mock_download_operation, scenario['size_mb'], 0.1 ) futures.append(future) # Wait for completion download_results = [future.result() for future in futures] end_time = time.time() # Stop memory monitoring monitor_thread.join(timeout=1) # Calculate memory statistics if memory_samples: peak_memory_mb = max(memory_samples) avg_memory_mb = statistics.mean(memory_samples) memory_increase_mb = peak_memory_mb - baseline_memory_mb else: peak_memory_mb = avg_memory_mb = memory_increase_mb = 0 # Check if memory usage is reasonable expected_memory_mb = scenario['downloads'] * scenario['size_mb'] * 0.1 # 10% of total data memory_efficiency = memory_increase_mb <= expected_memory_mb * 2 # Allow 2x overhead results.append({ 'scenario': scenario['name'], 'downloads': scenario['downloads'], 'size_mb': scenario['size_mb'], 'duration': end_time - start_time, 'baseline_memory_mb': baseline_memory_mb, 'peak_memory_mb': peak_memory_mb, 'avg_memory_mb': avg_memory_mb, 'memory_increase_mb': memory_increase_mb, 'memory_efficient': memory_efficiency, 'all_success': all(r['success'] for r in download_results) }) # Performance assertions self.assertTrue(all(r['success'] for r in download_results)) # Memory increase should be reasonable (not more than 5x the data size) max_acceptable_memory = scenario['downloads'] * scenario['size_mb'] * 5 self.assertLess(memory_increase_mb, max_acceptable_memory) # Print memory usage results print("\nMemory Usage Under Load Results:") print("Scenario | Downloads | Size(MB) | Duration(s) | Peak(MB) | Avg(MB) | Increase(MB) | Efficient | Success") print("-" * 110) for result in results: efficient = "✓" if result['memory_efficient'] else "✗" success = "✓" if result['all_success'] else "✗" print(f"{result['scenario']:13} | {result['downloads']:9} | {result['size_mb']:8} | {result['duration']:11.2f} | {result['peak_memory_mb']:8.1f} | {result['avg_memory_mb']:7.1f} | {result['memory_increase_mb']:12.1f} | {efficient:9} | {success:7}") def test_database_performance_under_load(self): """Test database performance under concurrent access load.""" # Create temporary database test_db = os.path.join(self.test_dir, 'performance_test.db') db_manager = DatabaseManager(test_db) concurrent_operations = [1, 5, 10, 20, 50] operations_per_thread = 100 results = [] try: for num_threads in concurrent_operations: with self.subTest(num_threads=num_threads): def database_worker(worker_id): """Worker function for database operations.""" worker_results = { 'inserts': 0, 'selects': 0, 'updates': 0, 'errors': 0, 'total_time': 0 } start_time = time.time() for op in range(operations_per_thread): try: anime_id = f"perf-{worker_id}-{op}" # Insert operation insert_query = """ INSERT INTO anime_metadata (anime_id, name, folder, created_at, last_updated) VALUES (?, ?, ?, ?, ?) """ success = db_manager.execute_update( insert_query, (anime_id, f"Anime {worker_id}-{op}", f"folder_{worker_id}_{op}", time.time(), time.time()) ) if success: worker_results['inserts'] += 1 # Select operation select_query = "SELECT * FROM anime_metadata WHERE anime_id = ?" select_results = db_manager.execute_query(select_query, (anime_id,)) if select_results: worker_results['selects'] += 1 # Update operation (every 10th operation) if op % 10 == 0: update_query = "UPDATE anime_metadata SET name = ? WHERE anime_id = ?" success = db_manager.execute_update( update_query, (f"Updated {worker_id}-{op}", anime_id) ) if success: worker_results['updates'] += 1 except Exception as e: worker_results['errors'] += 1 worker_results['total_time'] = time.time() - start_time return worker_results # Execute concurrent database operations start_time = time.time() with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor: futures = [] for worker_id in range(num_threads): future = executor.submit(database_worker, worker_id) futures.append(future) worker_results = [future.result() for future in futures] total_time = time.time() - start_time # Aggregate results total_inserts = sum(r['inserts'] for r in worker_results) total_selects = sum(r['selects'] for r in worker_results) total_updates = sum(r['updates'] for r in worker_results) total_errors = sum(r['errors'] for r in worker_results) total_operations = total_inserts + total_selects + total_updates avg_ops_per_second = total_operations / total_time if total_time > 0 else 0 error_rate = total_errors / (total_operations + total_errors) if (total_operations + total_errors) > 0 else 0 results.append({ 'threads': num_threads, 'total_time': total_time, 'total_operations': total_operations, 'ops_per_second': avg_ops_per_second, 'inserts': total_inserts, 'selects': total_selects, 'updates': total_updates, 'errors': total_errors, 'error_rate': error_rate }) # Performance assertions self.assertLess(error_rate, 0.05) # Less than 5% error rate self.assertGreater(avg_ops_per_second, 10) # At least 10 ops/second finally: db_manager.close() # Print database performance results print("\nDatabase Performance Under Load Results:") print("Threads | Duration(s) | Total Ops | Ops/Sec | Inserts | Selects | Updates | Errors | Error Rate") print("-" * 95) for result in results: print(f"{result['threads']:7} | {result['total_time']:11.2f} | {result['total_operations']:9} | {result['ops_per_second']:7.1f} | {result['inserts']:7} | {result['selects']:7} | {result['updates']:7} | {result['errors']:6} | {result['error_rate']:9.2%}") def run_performance_tests(): """Run the complete performance test suite.""" print("Running AniWorld Performance Tests...") print("This may take several minutes to complete.") print("=" * 60) # Create test suite suite = unittest.TestSuite() # Add performance test cases performance_test_classes = [ TestDownloadPerformance ] for test_class in performance_test_classes: tests = unittest.TestLoader().loadTestsFromTestCase(test_class) suite.addTests(tests) # Run tests with minimal verbosity for performance focus runner = unittest.TextTestRunner(verbosity=1) start_time = time.time() result = runner.run(suite) total_time = time.time() - start_time print("\n" + "=" * 60) print(f"Performance Tests Summary:") print(f"Total execution time: {total_time:.2f} seconds") print(f"Tests run: {result.testsRun}") print(f"Failures: {len(result.failures)}") print(f"Errors: {len(result.errors)}") return result if __name__ == '__main__': result = run_performance_tests() if result.wasSuccessful(): print("\nAll performance tests passed! ✅") sys.exit(0) else: print("\nSome performance tests failed! ❌") print("\nCheck the output above for detailed performance metrics.") sys.exit(1)