cleanup
This commit is contained in:
@@ -1 +0,0 @@
|
||||
# Integration test package
|
||||
@@ -1,640 +0,0 @@
|
||||
"""
|
||||
Integration tests for API endpoints using Flask test client.
|
||||
|
||||
This module provides integration tests that actually make HTTP requests
|
||||
to the Flask application to test the complete request/response cycle.
|
||||
"""
|
||||
|
||||
import unittest
|
||||
import json
|
||||
import tempfile
|
||||
import os
|
||||
from unittest.mock import patch, MagicMock
|
||||
import sys
|
||||
|
||||
# Add parent directories to path for imports
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', '..', 'src'))
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', '..', 'src', 'server'))
|
||||
|
||||
|
||||
class APIIntegrationTestBase(unittest.TestCase):
|
||||
"""Base class for API integration tests."""
|
||||
|
||||
def setUp(self):
|
||||
"""Set up test fixtures before each test method."""
|
||||
# Mock all the complex dependencies to avoid initialization issues
|
||||
self.patches = {}
|
||||
|
||||
# Mock the main series app and related components
|
||||
self.patches['series_app'] = patch('src.server.app.series_app')
|
||||
self.patches['config'] = patch('src.server.app.config')
|
||||
self.patches['session_manager'] = patch('src.server.app.session_manager')
|
||||
self.patches['socketio'] = patch('src.server.app.socketio')
|
||||
|
||||
# Start all patches
|
||||
self.mock_series_app = self.patches['series_app'].start()
|
||||
self.mock_config = self.patches['config'].start()
|
||||
self.mock_session_manager = self.patches['session_manager'].start()
|
||||
self.mock_socketio = self.patches['socketio'].start()
|
||||
|
||||
# Configure mock config
|
||||
self.mock_config.anime_directory = '/test/anime'
|
||||
self.mock_config.has_master_password.return_value = True
|
||||
self.mock_config.save_config = MagicMock()
|
||||
|
||||
# Configure mock session manager
|
||||
self.mock_session_manager.sessions = {}
|
||||
self.mock_session_manager.get_session_info.return_value = {
|
||||
'authenticated': False,
|
||||
'session_id': None
|
||||
}
|
||||
|
||||
try:
|
||||
# Import and create the Flask app
|
||||
from src.server.app import app
|
||||
app.config['TESTING'] = True
|
||||
app.config['WTF_CSRF_ENABLED'] = False
|
||||
self.app = app
|
||||
self.client = app.test_client()
|
||||
except ImportError as e:
|
||||
self.skipTest(f"Cannot import Flask app: {e}")
|
||||
|
||||
def tearDown(self):
|
||||
"""Clean up after each test method."""
|
||||
# Stop all patches
|
||||
for patch_obj in self.patches.values():
|
||||
patch_obj.stop()
|
||||
|
||||
def authenticate_session(self):
|
||||
"""Helper method to set up authenticated session."""
|
||||
session_id = 'test-session-123'
|
||||
self.mock_session_manager.sessions[session_id] = {
|
||||
'authenticated': True,
|
||||
'created_at': 1234567890,
|
||||
'last_accessed': 1234567890
|
||||
}
|
||||
self.mock_session_manager.get_session_info.return_value = {
|
||||
'authenticated': True,
|
||||
'session_id': session_id
|
||||
}
|
||||
|
||||
# Mock session validation
|
||||
def mock_require_auth(func):
|
||||
return func
|
||||
|
||||
def mock_optional_auth(func):
|
||||
return func
|
||||
|
||||
with patch('src.server.app.require_auth', mock_require_auth), \
|
||||
patch('src.server.app.optional_auth', mock_optional_auth):
|
||||
return session_id
|
||||
|
||||
|
||||
class TestAuthenticationAPI(APIIntegrationTestBase):
|
||||
"""Integration tests for authentication API endpoints."""
|
||||
|
||||
def test_auth_status_get(self):
|
||||
"""Test GET /api/auth/status endpoint."""
|
||||
response = self.client.get('/api/auth/status')
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = json.loads(response.data)
|
||||
|
||||
self.assertIn('authenticated', data)
|
||||
self.assertIn('has_master_password', data)
|
||||
self.assertIn('setup_required', data)
|
||||
|
||||
@patch('src.server.app.require_auth', lambda f: f) # Skip auth decorator
|
||||
def test_auth_setup_post(self):
|
||||
"""Test POST /api/auth/setup endpoint."""
|
||||
test_data = {'password': 'new_master_password'}
|
||||
|
||||
self.mock_config.has_master_password.return_value = False
|
||||
self.mock_session_manager.create_session.return_value = 'new-session'
|
||||
|
||||
response = self.client.post(
|
||||
'/api/auth/setup',
|
||||
data=json.dumps(test_data),
|
||||
content_type='application/json'
|
||||
)
|
||||
|
||||
# Should not be 404 (route exists)
|
||||
self.assertNotEqual(response.status_code, 404)
|
||||
|
||||
def test_auth_login_post(self):
|
||||
"""Test POST /api/auth/login endpoint."""
|
||||
test_data = {'password': 'test_password'}
|
||||
|
||||
self.mock_session_manager.login.return_value = {
|
||||
'success': True,
|
||||
'session_id': 'test-session'
|
||||
}
|
||||
|
||||
response = self.client.post(
|
||||
'/api/auth/login',
|
||||
data=json.dumps(test_data),
|
||||
content_type='application/json'
|
||||
)
|
||||
|
||||
self.assertNotEqual(response.status_code, 404)
|
||||
|
||||
def test_auth_logout_post(self):
|
||||
"""Test POST /api/auth/logout endpoint."""
|
||||
self.authenticate_session()
|
||||
|
||||
response = self.client.post('/api/auth/logout')
|
||||
self.assertNotEqual(response.status_code, 404)
|
||||
|
||||
|
||||
class TestConfigurationAPI(APIIntegrationTestBase):
|
||||
"""Integration tests for configuration API endpoints."""
|
||||
|
||||
@patch('src.server.app.require_auth', lambda f: f) # Skip auth decorator
|
||||
@patch('src.server.app.init_series_app') # Mock series app initialization
|
||||
def test_config_directory_post(self):
|
||||
"""Test POST /api/config/directory endpoint."""
|
||||
test_data = {'directory': '/new/test/directory'}
|
||||
|
||||
response = self.client.post(
|
||||
'/api/config/directory',
|
||||
data=json.dumps(test_data),
|
||||
content_type='application/json'
|
||||
)
|
||||
|
||||
self.assertNotEqual(response.status_code, 404)
|
||||
# Should be successful or have validation error, but route should exist
|
||||
self.assertIn(response.status_code, [200, 400, 500])
|
||||
|
||||
@patch('src.server.app.optional_auth', lambda f: f) # Skip auth decorator
|
||||
def test_scheduler_config_get(self):
|
||||
"""Test GET /api/scheduler/config endpoint."""
|
||||
response = self.client.get('/api/scheduler/config')
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = json.loads(response.data)
|
||||
|
||||
self.assertIn('success', data)
|
||||
self.assertIn('config', data)
|
||||
|
||||
@patch('src.server.app.optional_auth', lambda f: f) # Skip auth decorator
|
||||
def test_scheduler_config_post(self):
|
||||
"""Test POST /api/scheduler/config endpoint."""
|
||||
test_data = {
|
||||
'enabled': True,
|
||||
'time': '02:30',
|
||||
'auto_download_after_rescan': True
|
||||
}
|
||||
|
||||
response = self.client.post(
|
||||
'/api/scheduler/config',
|
||||
data=json.dumps(test_data),
|
||||
content_type='application/json'
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = json.loads(response.data)
|
||||
self.assertTrue(data['success'])
|
||||
|
||||
@patch('src.server.app.optional_auth', lambda f: f) # Skip auth decorator
|
||||
def test_advanced_config_get(self):
|
||||
"""Test GET /api/config/section/advanced endpoint."""
|
||||
response = self.client.get('/api/config/section/advanced')
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = json.loads(response.data)
|
||||
|
||||
self.assertTrue(data['success'])
|
||||
self.assertIn('config', data)
|
||||
self.assertIn('max_concurrent_downloads', data['config'])
|
||||
|
||||
@patch('src.server.app.optional_auth', lambda f: f) # Skip auth decorator
|
||||
def test_advanced_config_post(self):
|
||||
"""Test POST /api/config/section/advanced endpoint."""
|
||||
test_data = {
|
||||
'max_concurrent_downloads': 5,
|
||||
'provider_timeout': 45,
|
||||
'enable_debug_mode': True
|
||||
}
|
||||
|
||||
response = self.client.post(
|
||||
'/api/config/section/advanced',
|
||||
data=json.dumps(test_data),
|
||||
content_type='application/json'
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = json.loads(response.data)
|
||||
self.assertTrue(data['success'])
|
||||
|
||||
|
||||
class TestSeriesAPI(APIIntegrationTestBase):
|
||||
"""Integration tests for series management API endpoints."""
|
||||
|
||||
@patch('src.server.app.optional_auth', lambda f: f) # Skip auth decorator
|
||||
def test_series_get_with_data(self):
|
||||
"""Test GET /api/series endpoint with mock data."""
|
||||
# Mock series data
|
||||
mock_serie = MagicMock()
|
||||
mock_serie.folder = 'test_anime'
|
||||
mock_serie.name = 'Test Anime'
|
||||
mock_serie.episodeDict = {'Season 1': [1, 2, 3, 4, 5]}
|
||||
|
||||
self.mock_series_app.List.GetList.return_value = [mock_serie]
|
||||
|
||||
response = self.client.get('/api/series')
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = json.loads(response.data)
|
||||
|
||||
self.assertEqual(data['status'], 'success')
|
||||
self.assertIn('series', data)
|
||||
self.assertIn('total_series', data)
|
||||
|
||||
@patch('src.server.app.optional_auth', lambda f: f) # Skip auth decorator
|
||||
def test_series_get_no_data(self):
|
||||
"""Test GET /api/series endpoint with no data."""
|
||||
self.mock_series_app = None
|
||||
|
||||
with patch('src.server.app.series_app', None):
|
||||
response = self.client.get('/api/series')
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = json.loads(response.data)
|
||||
|
||||
self.assertEqual(data['status'], 'success')
|
||||
self.assertEqual(len(data['series']), 0)
|
||||
self.assertEqual(data['total_series'], 0)
|
||||
|
||||
@patch('src.server.app.optional_auth', lambda f: f) # Skip auth decorator
|
||||
def test_search_post(self):
|
||||
"""Test POST /api/search endpoint."""
|
||||
test_data = {'query': 'test anime search'}
|
||||
|
||||
mock_results = [
|
||||
{'name': 'Test Anime 1', 'link': 'https://example.com/anime1'},
|
||||
{'name': 'Test Anime 2', 'link': 'https://example.com/anime2'}
|
||||
]
|
||||
|
||||
self.mock_series_app.search.return_value = mock_results
|
||||
|
||||
response = self.client.post(
|
||||
'/api/search',
|
||||
data=json.dumps(test_data),
|
||||
content_type='application/json'
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = json.loads(response.data)
|
||||
|
||||
self.assertEqual(data['status'], 'success')
|
||||
self.assertIn('results', data)
|
||||
self.assertIn('total', data)
|
||||
|
||||
@patch('src.server.app.optional_auth', lambda f: f) # Skip auth decorator
|
||||
def test_search_post_empty_query(self):
|
||||
"""Test POST /api/search endpoint with empty query."""
|
||||
test_data = {'query': ''}
|
||||
|
||||
response = self.client.post(
|
||||
'/api/search',
|
||||
data=json.dumps(test_data),
|
||||
content_type='application/json'
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 400)
|
||||
data = json.loads(response.data)
|
||||
|
||||
self.assertEqual(data['status'], 'error')
|
||||
self.assertIn('empty', data['message'])
|
||||
|
||||
@patch('src.server.app.optional_auth', lambda f: f) # Skip auth decorator
|
||||
@patch('src.server.app.is_scanning', False)
|
||||
@patch('src.server.app.is_process_running')
|
||||
@patch('threading.Thread')
|
||||
def test_rescan_post(self, mock_thread, mock_is_running):
|
||||
"""Test POST /api/rescan endpoint."""
|
||||
mock_is_running.return_value = False
|
||||
|
||||
response = self.client.post('/api/rescan')
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = json.loads(response.data)
|
||||
|
||||
self.assertEqual(data['status'], 'success')
|
||||
self.assertIn('started', data['message'])
|
||||
|
||||
|
||||
class TestDownloadAPI(APIIntegrationTestBase):
|
||||
"""Integration tests for download management API endpoints."""
|
||||
|
||||
@patch('src.server.app.optional_auth', lambda f: f) # Skip auth decorator
|
||||
@patch('src.server.app.is_downloading', False)
|
||||
@patch('src.server.app.is_process_running')
|
||||
def test_download_post(self, mock_is_running):
|
||||
"""Test POST /api/download endpoint."""
|
||||
mock_is_running.return_value = False
|
||||
|
||||
test_data = {'series': 'test_series', 'episodes': [1, 2, 3]}
|
||||
|
||||
response = self.client.post(
|
||||
'/api/download',
|
||||
data=json.dumps(test_data),
|
||||
content_type='application/json'
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = json.loads(response.data)
|
||||
|
||||
self.assertEqual(data['status'], 'success')
|
||||
|
||||
|
||||
class TestStatusAPI(APIIntegrationTestBase):
|
||||
"""Integration tests for status and monitoring API endpoints."""
|
||||
|
||||
@patch('src.server.app.optional_auth', lambda f: f) # Skip auth decorator
|
||||
@patch('src.server.app.is_process_running')
|
||||
def test_process_locks_status_get(self, mock_is_running):
|
||||
"""Test GET /api/process/locks/status endpoint."""
|
||||
mock_is_running.return_value = False
|
||||
|
||||
response = self.client.get('/api/process/locks/status')
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = json.loads(response.data)
|
||||
|
||||
self.assertTrue(data['success'])
|
||||
self.assertIn('locks', data)
|
||||
self.assertIn('rescan', data['locks'])
|
||||
self.assertIn('download', data['locks'])
|
||||
|
||||
@patch('src.server.app.optional_auth', lambda f: f) # Skip auth decorator
|
||||
@patch.dict('os.environ', {'ANIME_DIRECTORY': '/test/anime'})
|
||||
def test_status_get(self):
|
||||
"""Test GET /api/status endpoint."""
|
||||
response = self.client.get('/api/status')
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = json.loads(response.data)
|
||||
|
||||
self.assertTrue(data['success'])
|
||||
self.assertIn('directory', data)
|
||||
self.assertIn('series_count', data)
|
||||
self.assertIn('timestamp', data)
|
||||
|
||||
|
||||
class TestLoggingAPI(APIIntegrationTestBase):
|
||||
"""Integration tests for logging management API endpoints."""
|
||||
|
||||
@patch('src.server.app.optional_auth', lambda f: f) # Skip auth decorator
|
||||
def test_logging_config_get(self):
|
||||
"""Test GET /api/logging/config endpoint."""
|
||||
response = self.client.get('/api/logging/config')
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = json.loads(response.data)
|
||||
|
||||
self.assertTrue(data['success'])
|
||||
self.assertIn('config', data)
|
||||
self.assertIn('log_level', data['config'])
|
||||
|
||||
@patch('src.server.app.optional_auth', lambda f: f) # Skip auth decorator
|
||||
def test_logging_config_post(self):
|
||||
"""Test POST /api/logging/config endpoint."""
|
||||
test_data = {
|
||||
'log_level': 'DEBUG',
|
||||
'enable_console_logging': False
|
||||
}
|
||||
|
||||
response = self.client.post(
|
||||
'/api/logging/config',
|
||||
data=json.dumps(test_data),
|
||||
content_type='application/json'
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = json.loads(response.data)
|
||||
|
||||
self.assertTrue(data['success'])
|
||||
|
||||
@patch('src.server.app.optional_auth', lambda f: f) # Skip auth decorator
|
||||
def test_logging_files_get(self):
|
||||
"""Test GET /api/logging/files endpoint."""
|
||||
response = self.client.get('/api/logging/files')
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = json.loads(response.data)
|
||||
|
||||
self.assertTrue(data['success'])
|
||||
self.assertIn('files', data)
|
||||
|
||||
@patch('src.server.app.optional_auth', lambda f: f) # Skip auth decorator
|
||||
def test_logging_test_post(self):
|
||||
"""Test POST /api/logging/test endpoint."""
|
||||
response = self.client.post('/api/logging/test')
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = json.loads(response.data)
|
||||
|
||||
self.assertTrue(data['success'])
|
||||
|
||||
@patch('src.server.app.optional_auth', lambda f: f) # Skip auth decorator
|
||||
def test_logging_cleanup_post(self):
|
||||
"""Test POST /api/logging/cleanup endpoint."""
|
||||
test_data = {'days': 7}
|
||||
|
||||
response = self.client.post(
|
||||
'/api/logging/cleanup',
|
||||
data=json.dumps(test_data),
|
||||
content_type='application/json'
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = json.loads(response.data)
|
||||
|
||||
self.assertTrue(data['success'])
|
||||
self.assertIn('7 days', data['message'])
|
||||
|
||||
@patch('src.server.app.optional_auth', lambda f: f) # Skip auth decorator
|
||||
def test_logging_tail_get(self):
|
||||
"""Test GET /api/logging/files/<filename>/tail endpoint."""
|
||||
response = self.client.get('/api/logging/files/test.log/tail?lines=50')
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = json.loads(response.data)
|
||||
|
||||
self.assertTrue(data['success'])
|
||||
self.assertIn('content', data)
|
||||
self.assertEqual(data['filename'], 'test.log')
|
||||
|
||||
|
||||
class TestBackupAPI(APIIntegrationTestBase):
|
||||
"""Integration tests for configuration backup API endpoints."""
|
||||
|
||||
@patch('src.server.app.optional_auth', lambda f: f) # Skip auth decorator
|
||||
def test_config_backup_create_post(self):
|
||||
"""Test POST /api/config/backup endpoint."""
|
||||
response = self.client.post('/api/config/backup')
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = json.loads(response.data)
|
||||
|
||||
self.assertTrue(data['success'])
|
||||
self.assertIn('filename', data)
|
||||
self.assertIn('config_backup_', data['filename'])
|
||||
|
||||
@patch('src.server.app.optional_auth', lambda f: f) # Skip auth decorator
|
||||
def test_config_backups_get(self):
|
||||
"""Test GET /api/config/backups endpoint."""
|
||||
response = self.client.get('/api/config/backups')
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = json.loads(response.data)
|
||||
|
||||
self.assertTrue(data['success'])
|
||||
self.assertIn('backups', data)
|
||||
|
||||
@patch('src.server.app.optional_auth', lambda f: f) # Skip auth decorator
|
||||
def test_config_backup_restore_post(self):
|
||||
"""Test POST /api/config/backup/<filename>/restore endpoint."""
|
||||
filename = 'config_backup_20231201_143000.json'
|
||||
response = self.client.post(f'/api/config/backup/{filename}/restore')
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = json.loads(response.data)
|
||||
|
||||
self.assertTrue(data['success'])
|
||||
self.assertIn(filename, data['message'])
|
||||
|
||||
@patch('src.server.app.optional_auth', lambda f: f) # Skip auth decorator
|
||||
def test_config_backup_download_get(self):
|
||||
"""Test GET /api/config/backup/<filename>/download endpoint."""
|
||||
filename = 'config_backup_20231201_143000.json'
|
||||
response = self.client.get(f'/api/config/backup/{filename}/download')
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = json.loads(response.data)
|
||||
|
||||
self.assertTrue(data['success'])
|
||||
|
||||
|
||||
class TestDiagnosticsAPI(APIIntegrationTestBase):
|
||||
"""Integration tests for diagnostics and monitoring API endpoints."""
|
||||
|
||||
@patch('src.server.app.optional_auth', lambda f: f) # Skip auth decorator
|
||||
@patch('src.server.app.network_health_checker')
|
||||
def test_network_diagnostics_get(self, mock_checker):
|
||||
"""Test GET /api/diagnostics/network endpoint."""
|
||||
mock_checker.get_network_status.return_value = {
|
||||
'internet_connected': True,
|
||||
'dns_working': True
|
||||
}
|
||||
mock_checker.check_url_reachability.return_value = True
|
||||
|
||||
response = self.client.get('/api/diagnostics/network')
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = json.loads(response.data)
|
||||
|
||||
self.assertEqual(data['status'], 'success')
|
||||
self.assertIn('data', data)
|
||||
|
||||
@patch('src.server.app.optional_auth', lambda f: f) # Skip auth decorator
|
||||
@patch('src.server.app.error_recovery_manager')
|
||||
def test_diagnostics_errors_get(self, mock_manager):
|
||||
"""Test GET /api/diagnostics/errors endpoint."""
|
||||
mock_manager.error_history = [
|
||||
{'timestamp': '2023-12-01T14:30:00', 'error': 'Test error'}
|
||||
]
|
||||
mock_manager.blacklisted_urls = {'bad_url.com': True}
|
||||
|
||||
response = self.client.get('/api/diagnostics/errors')
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = json.loads(response.data)
|
||||
|
||||
self.assertEqual(data['status'], 'success')
|
||||
self.assertIn('data', data)
|
||||
|
||||
@patch('src.server.app.require_auth', lambda f: f) # Skip auth decorator
|
||||
@patch('src.server.app.error_recovery_manager')
|
||||
def test_recovery_clear_blacklist_post(self, mock_manager):
|
||||
"""Test POST /api/recovery/clear-blacklist endpoint."""
|
||||
mock_manager.blacklisted_urls = {'url1': True}
|
||||
|
||||
response = self.client.post('/api/recovery/clear-blacklist')
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = json.loads(response.data)
|
||||
|
||||
self.assertEqual(data['status'], 'success')
|
||||
|
||||
@patch('src.server.app.optional_auth', lambda f: f) # Skip auth decorator
|
||||
@patch('src.server.app.error_recovery_manager')
|
||||
def test_recovery_retry_counts_get(self, mock_manager):
|
||||
"""Test GET /api/recovery/retry-counts endpoint."""
|
||||
mock_manager.retry_counts = {'url1': 3, 'url2': 5}
|
||||
|
||||
response = self.client.get('/api/recovery/retry-counts')
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = json.loads(response.data)
|
||||
|
||||
self.assertEqual(data['status'], 'success')
|
||||
self.assertIn('data', data)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# Run integration tests
|
||||
loader = unittest.TestLoader()
|
||||
|
||||
# Load all test cases
|
||||
test_classes = [
|
||||
TestAuthenticationAPI,
|
||||
TestConfigurationAPI,
|
||||
TestSeriesAPI,
|
||||
TestDownloadAPI,
|
||||
TestStatusAPI,
|
||||
TestLoggingAPI,
|
||||
TestBackupAPI,
|
||||
TestDiagnosticsAPI
|
||||
]
|
||||
|
||||
# Create test suite
|
||||
suite = unittest.TestSuite()
|
||||
for test_class in test_classes:
|
||||
tests = loader.loadTestsFromTestCase(test_class)
|
||||
suite.addTests(tests)
|
||||
|
||||
# Run tests
|
||||
runner = unittest.TextTestRunner(verbosity=2)
|
||||
result = runner.run(suite)
|
||||
|
||||
# Print summary
|
||||
print(f"\n{'='*70}")
|
||||
print(f"API INTEGRATION TEST SUMMARY")
|
||||
print(f"{'='*70}")
|
||||
print(f"Tests run: {result.testsRun}")
|
||||
print(f"Failures: {len(result.failures)}")
|
||||
print(f"Errors: {len(result.errors)}")
|
||||
print(f"Skipped: {len(result.skipped) if hasattr(result, 'skipped') else 0}")
|
||||
|
||||
if result.testsRun > 0:
|
||||
success_rate = ((result.testsRun - len(result.failures) - len(result.errors)) / result.testsRun * 100)
|
||||
print(f"Success rate: {success_rate:.1f}%")
|
||||
|
||||
# Print details of any failures or errors
|
||||
if result.failures:
|
||||
print(f"\n🔥 FAILURES:")
|
||||
for test, traceback in result.failures:
|
||||
print(f" ❌ {test}")
|
||||
print(f" {traceback.split('AssertionError: ')[-1].split(chr(10))[0] if 'AssertionError:' in traceback else 'See traceback above'}")
|
||||
|
||||
if result.errors:
|
||||
print(f"\n💥 ERRORS:")
|
||||
for test, traceback in result.errors:
|
||||
print(f" 💣 {test}")
|
||||
error_line = traceback.split(chr(10))[-2] if len(traceback.split(chr(10))) > 1 else 'See traceback above'
|
||||
print(f" {error_line}")
|
||||
|
||||
# Exit with proper code
|
||||
exit(0 if result.wasSuccessful() else 1)
|
||||
@@ -1,619 +0,0 @@
|
||||
"""
|
||||
Integration Tests for Web Interface
|
||||
|
||||
This module contains integration tests for the Flask web application,
|
||||
testing the complete workflow from HTTP requests to database operations.
|
||||
"""
|
||||
|
||||
import unittest
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
import shutil
|
||||
import json
|
||||
import sqlite3
|
||||
from unittest.mock import Mock, MagicMock, patch
|
||||
import threading
|
||||
import time
|
||||
|
||||
# Add parent directory to path for imports
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..'))
|
||||
|
||||
# Import Flask app and components
|
||||
from app import app, socketio, init_series_app
|
||||
from database_manager import DatabaseManager, AnimeMetadata
|
||||
from auth import session_manager
|
||||
from config import config
|
||||
|
||||
|
||||
class TestWebInterface(unittest.TestCase):
|
||||
"""Integration tests for the web interface."""
|
||||
|
||||
def setUp(self):
|
||||
"""Set up test environment."""
|
||||
# Create temporary directory for test files
|
||||
self.test_dir = tempfile.mkdtemp()
|
||||
|
||||
# Configure Flask app for testing
|
||||
app.config['TESTING'] = True
|
||||
app.config['WTF_CSRF_ENABLED'] = False
|
||||
app.config['SECRET_KEY'] = 'test-secret-key'
|
||||
|
||||
self.app = app
|
||||
self.client = app.test_client()
|
||||
|
||||
# Create test database
|
||||
self.test_db_path = os.path.join(self.test_dir, 'test.db')
|
||||
|
||||
# Mock configuration
|
||||
self.original_config = {}
|
||||
for attr in ['anime_directory', 'master_password', 'database_path']:
|
||||
if hasattr(config, attr):
|
||||
self.original_config[attr] = getattr(config, attr)
|
||||
|
||||
config.anime_directory = self.test_dir
|
||||
config.master_password = 'test123'
|
||||
config.database_path = self.test_db_path
|
||||
|
||||
def tearDown(self):
|
||||
"""Clean up test environment."""
|
||||
# Restore original configuration
|
||||
for attr, value in self.original_config.items():
|
||||
setattr(config, attr, value)
|
||||
|
||||
# Clean up temporary files
|
||||
shutil.rmtree(self.test_dir, ignore_errors=True)
|
||||
|
||||
# Clear sessions
|
||||
session_manager.clear_all_sessions()
|
||||
|
||||
def test_index_page_unauthenticated(self):
|
||||
"""Test index page redirects to login when unauthenticated."""
|
||||
response = self.client.get('/')
|
||||
|
||||
# Should redirect to login
|
||||
self.assertEqual(response.status_code, 302)
|
||||
self.assertIn('/login', response.location)
|
||||
|
||||
def test_login_page_loads(self):
|
||||
"""Test login page loads correctly."""
|
||||
response = self.client.get('/login')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertIn(b'login', response.data.lower())
|
||||
|
||||
def test_successful_login(self):
|
||||
"""Test successful login flow."""
|
||||
# Attempt login with correct password
|
||||
response = self.client.post('/login', data={
|
||||
'password': 'test123'
|
||||
}, follow_redirects=True)
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
# Should be redirected to main page after successful login
|
||||
|
||||
def test_failed_login(self):
|
||||
"""Test failed login with wrong password."""
|
||||
response = self.client.post('/login', data={
|
||||
'password': 'wrong_password'
|
||||
})
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
# Should return to login page with error
|
||||
|
||||
def test_authenticated_index_page(self):
|
||||
"""Test index page loads when authenticated."""
|
||||
# Login first
|
||||
with self.client.session_transaction() as sess:
|
||||
sess['authenticated'] = True
|
||||
sess['session_id'] = 'test-session'
|
||||
session_manager.sessions['test-session'] = {
|
||||
'authenticated': True,
|
||||
'created_at': time.time(),
|
||||
'last_accessed': time.time()
|
||||
}
|
||||
|
||||
response = self.client.get('/')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
def test_api_authentication_required(self):
|
||||
"""Test API endpoints require authentication."""
|
||||
# Test unauthenticated API call
|
||||
response = self.client.get('/api/series/list')
|
||||
self.assertEqual(response.status_code, 401)
|
||||
|
||||
# Test authenticated API call
|
||||
with self.client.session_transaction() as sess:
|
||||
sess['authenticated'] = True
|
||||
sess['session_id'] = 'test-session'
|
||||
session_manager.sessions['test-session'] = {
|
||||
'authenticated': True,
|
||||
'created_at': time.time(),
|
||||
'last_accessed': time.time()
|
||||
}
|
||||
|
||||
response = self.client.get('/api/series/list')
|
||||
# Should not return 401 (might return other codes based on implementation)
|
||||
self.assertNotEqual(response.status_code, 401)
|
||||
|
||||
def test_config_api_endpoints(self):
|
||||
"""Test configuration API endpoints."""
|
||||
# Authenticate
|
||||
with self.client.session_transaction() as sess:
|
||||
sess['authenticated'] = True
|
||||
sess['session_id'] = 'test-session'
|
||||
session_manager.sessions['test-session'] = {
|
||||
'authenticated': True,
|
||||
'created_at': time.time(),
|
||||
'last_accessed': time.time()
|
||||
}
|
||||
|
||||
# Get current config
|
||||
response = self.client.get('/api/config')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
config_data = json.loads(response.data)
|
||||
self.assertIn('anime_directory', config_data)
|
||||
|
||||
def test_download_queue_operations(self):
|
||||
"""Test download queue management."""
|
||||
# Authenticate
|
||||
with self.client.session_transaction() as sess:
|
||||
sess['authenticated'] = True
|
||||
sess['session_id'] = 'test-session'
|
||||
session_manager.sessions['test-session'] = {
|
||||
'authenticated': True,
|
||||
'created_at': time.time(),
|
||||
'last_accessed': time.time()
|
||||
}
|
||||
|
||||
# Get queue status
|
||||
response = self.client.get('/api/queue/status')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
queue_data = json.loads(response.data)
|
||||
self.assertIn('status', queue_data)
|
||||
|
||||
def test_process_locking_endpoints(self):
|
||||
"""Test process locking API endpoints."""
|
||||
# Authenticate
|
||||
with self.client.session_transaction() as sess:
|
||||
sess['authenticated'] = True
|
||||
sess['session_id'] = 'test-session'
|
||||
session_manager.sessions['test-session'] = {
|
||||
'authenticated': True,
|
||||
'created_at': time.time(),
|
||||
'last_accessed': time.time()
|
||||
}
|
||||
|
||||
# Check process locks
|
||||
response = self.client.get('/api/process/locks')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
locks_data = json.loads(response.data)
|
||||
self.assertIn('locks', locks_data)
|
||||
|
||||
def test_database_api_endpoints(self):
|
||||
"""Test database management API endpoints."""
|
||||
# Authenticate
|
||||
with self.client.session_transaction() as sess:
|
||||
sess['authenticated'] = True
|
||||
sess['session_id'] = 'test-session'
|
||||
session_manager.sessions['test-session'] = {
|
||||
'authenticated': True,
|
||||
'created_at': time.time(),
|
||||
'last_accessed': time.time()
|
||||
}
|
||||
|
||||
# Get database info
|
||||
response = self.client.get('/api/database/info')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
db_data = json.loads(response.data)
|
||||
self.assertIn('status', db_data)
|
||||
|
||||
def test_health_monitoring_endpoints(self):
|
||||
"""Test health monitoring API endpoints."""
|
||||
# Authenticate (health endpoints might be public)
|
||||
with self.client.session_transaction() as sess:
|
||||
sess['authenticated'] = True
|
||||
sess['session_id'] = 'test-session'
|
||||
session_manager.sessions['test-session'] = {
|
||||
'authenticated': True,
|
||||
'created_at': time.time(),
|
||||
'last_accessed': time.time()
|
||||
}
|
||||
|
||||
# Get system health
|
||||
response = self.client.get('/api/health/system')
|
||||
# Health endpoints might be accessible without auth
|
||||
self.assertIn(response.status_code, [200, 401])
|
||||
|
||||
def test_error_handling(self):
|
||||
"""Test error handling for invalid requests."""
|
||||
# Authenticate
|
||||
with self.client.session_transaction() as sess:
|
||||
sess['authenticated'] = True
|
||||
sess['session_id'] = 'test-session'
|
||||
session_manager.sessions['test-session'] = {
|
||||
'authenticated': True,
|
||||
'created_at': time.time(),
|
||||
'last_accessed': time.time()
|
||||
}
|
||||
|
||||
# Test invalid endpoint
|
||||
response = self.client.get('/api/nonexistent/endpoint')
|
||||
self.assertEqual(response.status_code, 404)
|
||||
|
||||
# Test invalid method
|
||||
response = self.client.post('/api/series/list')
|
||||
# Should return method not allowed or other appropriate error
|
||||
self.assertIn(response.status_code, [405, 400, 404])
|
||||
|
||||
def test_json_response_format(self):
|
||||
"""Test API responses return valid JSON."""
|
||||
# Authenticate
|
||||
with self.client.session_transaction() as sess:
|
||||
sess['authenticated'] = True
|
||||
sess['session_id'] = 'test-session'
|
||||
session_manager.sessions['test-session'] = {
|
||||
'authenticated': True,
|
||||
'created_at': time.time(),
|
||||
'last_accessed': time.time()
|
||||
}
|
||||
|
||||
# Test various API endpoints for valid JSON
|
||||
endpoints = [
|
||||
'/api/config',
|
||||
'/api/queue/status',
|
||||
'/api/process/locks',
|
||||
'/api/database/info'
|
||||
]
|
||||
|
||||
for endpoint in endpoints:
|
||||
with self.subTest(endpoint=endpoint):
|
||||
response = self.client.get(endpoint)
|
||||
if response.status_code == 200:
|
||||
# Should be valid JSON
|
||||
try:
|
||||
json.loads(response.data)
|
||||
except json.JSONDecodeError:
|
||||
self.fail(f"Invalid JSON response from {endpoint}")
|
||||
|
||||
|
||||
class TestSocketIOEvents(unittest.TestCase):
|
||||
"""Integration tests for SocketIO events."""
|
||||
|
||||
def setUp(self):
|
||||
"""Set up test environment for SocketIO."""
|
||||
app.config['TESTING'] = True
|
||||
self.socketio_client = socketio.test_client(app)
|
||||
|
||||
def tearDown(self):
|
||||
"""Clean up SocketIO test environment."""
|
||||
if self.socketio_client:
|
||||
self.socketio_client.disconnect()
|
||||
|
||||
def test_socketio_connection(self):
|
||||
"""Test SocketIO connection establishment."""
|
||||
self.assertTrue(self.socketio_client.is_connected())
|
||||
|
||||
def test_download_progress_events(self):
|
||||
"""Test download progress event handling."""
|
||||
# Mock download progress update
|
||||
test_progress = {
|
||||
'episode': 'Test Episode 1',
|
||||
'progress': 50,
|
||||
'speed': '1.5 MB/s',
|
||||
'eta': '2 minutes'
|
||||
}
|
||||
|
||||
# Emit progress update
|
||||
socketio.emit('download_progress', test_progress)
|
||||
|
||||
# Check if client receives the event
|
||||
received = self.socketio_client.get_received()
|
||||
# Note: In real tests, you'd check if the client received the event
|
||||
|
||||
def test_scan_progress_events(self):
|
||||
"""Test scan progress event handling."""
|
||||
test_scan_data = {
|
||||
'status': 'scanning',
|
||||
'current_folder': 'Test Anime',
|
||||
'progress': 25,
|
||||
'total_series': 100,
|
||||
'scanned_series': 25
|
||||
}
|
||||
|
||||
# Emit scan progress
|
||||
socketio.emit('scan_progress', test_scan_data)
|
||||
|
||||
# Verify event handling
|
||||
received = self.socketio_client.get_received()
|
||||
# In real implementation, verify the event was received and processed
|
||||
|
||||
|
||||
class TestDatabaseIntegration(unittest.TestCase):
|
||||
"""Integration tests for database operations."""
|
||||
|
||||
def setUp(self):
|
||||
"""Set up database integration test environment."""
|
||||
self.test_dir = tempfile.mkdtemp()
|
||||
self.test_db = os.path.join(self.test_dir, 'integration_test.db')
|
||||
self.db_manager = DatabaseManager(self.test_db)
|
||||
|
||||
# Configure Flask app for testing
|
||||
app.config['TESTING'] = True
|
||||
self.client = app.test_client()
|
||||
|
||||
# Authenticate for API calls
|
||||
self.auth_session = {
|
||||
'authenticated': True,
|
||||
'session_id': 'integration-test-session'
|
||||
}
|
||||
session_manager.sessions['integration-test-session'] = {
|
||||
'authenticated': True,
|
||||
'created_at': time.time(),
|
||||
'last_accessed': time.time()
|
||||
}
|
||||
|
||||
def tearDown(self):
|
||||
"""Clean up database integration test environment."""
|
||||
self.db_manager.close()
|
||||
shutil.rmtree(self.test_dir, ignore_errors=True)
|
||||
session_manager.clear_all_sessions()
|
||||
|
||||
def test_anime_crud_via_api(self):
|
||||
"""Test anime CRUD operations via API endpoints."""
|
||||
# Authenticate session
|
||||
with self.client.session_transaction() as sess:
|
||||
sess.update(self.auth_session)
|
||||
|
||||
# Create anime via API
|
||||
anime_data = {
|
||||
'name': 'Integration Test Anime',
|
||||
'folder': 'integration_test_folder',
|
||||
'key': 'integration-test-key',
|
||||
'description': 'Test anime for integration testing',
|
||||
'genres': ['Action', 'Adventure'],
|
||||
'release_year': 2023,
|
||||
'status': 'ongoing'
|
||||
}
|
||||
|
||||
response = self.client.post('/api/database/anime',
|
||||
data=json.dumps(anime_data),
|
||||
content_type='application/json')
|
||||
|
||||
self.assertEqual(response.status_code, 201)
|
||||
response_data = json.loads(response.data)
|
||||
self.assertEqual(response_data['status'], 'success')
|
||||
|
||||
anime_id = response_data['data']['anime_id']
|
||||
|
||||
# Read anime via API
|
||||
response = self.client.get(f'/api/database/anime/{anime_id}')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
response_data = json.loads(response.data)
|
||||
self.assertEqual(response_data['status'], 'success')
|
||||
self.assertEqual(response_data['data']['name'], anime_data['name'])
|
||||
|
||||
# Update anime via API
|
||||
update_data = {
|
||||
'description': 'Updated description for integration testing'
|
||||
}
|
||||
|
||||
response = self.client.put(f'/api/database/anime/{anime_id}',
|
||||
data=json.dumps(update_data),
|
||||
content_type='application/json')
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
# Verify update
|
||||
response = self.client.get(f'/api/database/anime/{anime_id}')
|
||||
response_data = json.loads(response.data)
|
||||
self.assertEqual(
|
||||
response_data['data']['description'],
|
||||
update_data['description']
|
||||
)
|
||||
|
||||
# Delete anime via API
|
||||
response = self.client.delete(f'/api/database/anime/{anime_id}')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
# Verify deletion
|
||||
response = self.client.get(f'/api/database/anime/{anime_id}')
|
||||
self.assertEqual(response.status_code, 404)
|
||||
|
||||
def test_backup_operations_via_api(self):
|
||||
"""Test backup operations via API."""
|
||||
# Authenticate session
|
||||
with self.client.session_transaction() as sess:
|
||||
sess.update(self.auth_session)
|
||||
|
||||
# Create test data
|
||||
anime_data = {
|
||||
'name': 'Backup Test Anime',
|
||||
'folder': 'backup_test_folder',
|
||||
'key': 'backup-test-key'
|
||||
}
|
||||
|
||||
response = self.client.post('/api/database/anime',
|
||||
data=json.dumps(anime_data),
|
||||
content_type='application/json')
|
||||
self.assertEqual(response.status_code, 201)
|
||||
|
||||
# Create backup via API
|
||||
backup_data = {
|
||||
'backup_type': 'full',
|
||||
'description': 'Integration test backup'
|
||||
}
|
||||
|
||||
response = self.client.post('/api/database/backups/create',
|
||||
data=json.dumps(backup_data),
|
||||
content_type='application/json')
|
||||
|
||||
self.assertEqual(response.status_code, 201)
|
||||
response_data = json.loads(response.data)
|
||||
self.assertEqual(response_data['status'], 'success')
|
||||
|
||||
backup_id = response_data['data']['backup_id']
|
||||
|
||||
# List backups
|
||||
response = self.client.get('/api/database/backups')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
response_data = json.loads(response.data)
|
||||
self.assertGreater(response_data['data']['count'], 0)
|
||||
|
||||
# Verify backup exists in list
|
||||
backup_found = False
|
||||
for backup in response_data['data']['backups']:
|
||||
if backup['backup_id'] == backup_id:
|
||||
backup_found = True
|
||||
break
|
||||
self.assertTrue(backup_found)
|
||||
|
||||
def test_search_functionality(self):
|
||||
"""Test search functionality via API."""
|
||||
# Authenticate session
|
||||
with self.client.session_transaction() as sess:
|
||||
sess.update(self.auth_session)
|
||||
|
||||
# Create test anime for searching
|
||||
test_anime = [
|
||||
{'name': 'Attack on Titan', 'folder': 'attack_titan', 'key': 'attack-titan'},
|
||||
{'name': 'Death Note', 'folder': 'death_note', 'key': 'death-note'},
|
||||
{'name': 'Naruto', 'folder': 'naruto', 'key': 'naruto'}
|
||||
]
|
||||
|
||||
for anime_data in test_anime:
|
||||
response = self.client.post('/api/database/anime',
|
||||
data=json.dumps(anime_data),
|
||||
content_type='application/json')
|
||||
self.assertEqual(response.status_code, 201)
|
||||
|
||||
# Test search
|
||||
search_queries = [
|
||||
('Attack', 1), # Should find "Attack on Titan"
|
||||
('Note', 1), # Should find "Death Note"
|
||||
('Naruto', 1), # Should find "Naruto"
|
||||
('Anime', 0), # Should find nothing
|
||||
('', 0) # Empty search should return error
|
||||
]
|
||||
|
||||
for search_term, expected_count in search_queries:
|
||||
with self.subTest(search_term=search_term):
|
||||
response = self.client.get(f'/api/database/anime/search?q={search_term}')
|
||||
|
||||
if search_term == '':
|
||||
self.assertEqual(response.status_code, 400)
|
||||
else:
|
||||
self.assertEqual(response.status_code, 200)
|
||||
response_data = json.loads(response.data)
|
||||
self.assertEqual(response_data['data']['count'], expected_count)
|
||||
|
||||
|
||||
class TestPerformanceIntegration(unittest.TestCase):
|
||||
"""Integration tests for performance features."""
|
||||
|
||||
def setUp(self):
|
||||
"""Set up performance integration test environment."""
|
||||
app.config['TESTING'] = True
|
||||
self.client = app.test_client()
|
||||
|
||||
# Authenticate
|
||||
self.auth_session = {
|
||||
'authenticated': True,
|
||||
'session_id': 'performance-test-session'
|
||||
}
|
||||
session_manager.sessions['performance-test-session'] = {
|
||||
'authenticated': True,
|
||||
'created_at': time.time(),
|
||||
'last_accessed': time.time()
|
||||
}
|
||||
|
||||
def tearDown(self):
|
||||
"""Clean up performance test environment."""
|
||||
session_manager.clear_all_sessions()
|
||||
|
||||
def test_performance_monitoring_api(self):
|
||||
"""Test performance monitoring API endpoints."""
|
||||
# Authenticate session
|
||||
with self.client.session_transaction() as sess:
|
||||
sess.update(self.auth_session)
|
||||
|
||||
# Test system metrics
|
||||
response = self.client.get('/api/performance/system-metrics')
|
||||
if response.status_code == 200: # Endpoint might not exist yet
|
||||
metrics_data = json.loads(response.data)
|
||||
self.assertIn('status', metrics_data)
|
||||
|
||||
def test_download_speed_limiting(self):
|
||||
"""Test download speed limiting configuration."""
|
||||
# Authenticate session
|
||||
with self.client.session_transaction() as sess:
|
||||
sess.update(self.auth_session)
|
||||
|
||||
# Test speed limit configuration
|
||||
speed_config = {'max_speed_mbps': 10}
|
||||
|
||||
response = self.client.post('/api/performance/speed-limit',
|
||||
data=json.dumps(speed_config),
|
||||
content_type='application/json')
|
||||
|
||||
# Endpoint might not exist yet, so check for appropriate response
|
||||
self.assertIn(response.status_code, [200, 404, 405])
|
||||
|
||||
|
||||
def run_integration_tests():
|
||||
"""Run the integration test suite."""
|
||||
# Create test suite
|
||||
suite = unittest.TestSuite()
|
||||
|
||||
# Add integration test cases
|
||||
integration_test_classes = [
|
||||
TestWebInterface,
|
||||
TestSocketIOEvents,
|
||||
TestDatabaseIntegration,
|
||||
TestPerformanceIntegration
|
||||
]
|
||||
|
||||
for test_class in integration_test_classes:
|
||||
tests = unittest.TestLoader().loadTestsFromTestCase(test_class)
|
||||
suite.addTests(tests)
|
||||
|
||||
# Run tests
|
||||
runner = unittest.TextTestRunner(verbosity=2)
|
||||
result = runner.run(suite)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
print("Running AniWorld Integration Tests...")
|
||||
print("=" * 50)
|
||||
|
||||
result = run_integration_tests()
|
||||
|
||||
print("\n" + "=" * 50)
|
||||
print(f"Tests run: {result.testsRun}")
|
||||
print(f"Failures: {len(result.failures)}")
|
||||
print(f"Errors: {len(result.errors)}")
|
||||
|
||||
if result.failures:
|
||||
print("\nFailures:")
|
||||
for test, traceback in result.failures:
|
||||
print(f"- {test}")
|
||||
|
||||
if result.errors:
|
||||
print("\nErrors:")
|
||||
for test, traceback in result.errors:
|
||||
print(f"- {test}")
|
||||
|
||||
if result.wasSuccessful():
|
||||
print("\nAll integration tests passed! ✅")
|
||||
sys.exit(0)
|
||||
else:
|
||||
print("\nSome integration tests failed! ❌")
|
||||
sys.exit(1)
|
||||
@@ -1,498 +0,0 @@
|
||||
"""
|
||||
Automated Testing Pipeline
|
||||
|
||||
This module provides a comprehensive test runner and pipeline for the AniWorld application,
|
||||
including unit tests, integration tests, performance tests, and code coverage reporting.
|
||||
"""
|
||||
|
||||
import unittest
|
||||
import sys
|
||||
import os
|
||||
import time
|
||||
import subprocess
|
||||
import json
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
# Add parent directory to path for imports
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..'))
|
||||
|
||||
# Import test modules
|
||||
import test_core
|
||||
import test_integration
|
||||
import test_performance
|
||||
|
||||
|
||||
class TestResult:
|
||||
"""Container for test execution results."""
|
||||
|
||||
def __init__(self, test_type, result, execution_time, details=None):
|
||||
self.test_type = test_type
|
||||
self.result = result
|
||||
self.execution_time = execution_time
|
||||
self.details = details or {}
|
||||
self.timestamp = datetime.utcnow()
|
||||
|
||||
def to_dict(self):
|
||||
"""Convert result to dictionary format."""
|
||||
return {
|
||||
'test_type': self.test_type,
|
||||
'success': self.result.wasSuccessful() if hasattr(self.result, 'wasSuccessful') else self.result,
|
||||
'tests_run': self.result.testsRun if hasattr(self.result, 'testsRun') else 0,
|
||||
'failures': len(self.result.failures) if hasattr(self.result, 'failures') else 0,
|
||||
'errors': len(self.result.errors) if hasattr(self.result, 'errors') else 0,
|
||||
'execution_time': self.execution_time,
|
||||
'timestamp': self.timestamp.isoformat(),
|
||||
'details': self.details
|
||||
}
|
||||
|
||||
|
||||
class TestPipeline:
|
||||
"""Automated testing pipeline for AniWorld application."""
|
||||
|
||||
def __init__(self, output_dir=None):
|
||||
self.output_dir = output_dir or os.path.join(os.path.dirname(__file__), 'test_results')
|
||||
self.results = []
|
||||
|
||||
# Create output directory
|
||||
Path(self.output_dir).mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def run_unit_tests(self, verbose=True):
|
||||
"""Run unit tests and return results."""
|
||||
print("=" * 60)
|
||||
print("RUNNING UNIT TESTS")
|
||||
print("=" * 60)
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
try:
|
||||
# Run unit tests
|
||||
result = test_core.run_test_suite()
|
||||
execution_time = time.time() - start_time
|
||||
|
||||
test_result = TestResult('unit', result, execution_time)
|
||||
self.results.append(test_result)
|
||||
|
||||
if verbose:
|
||||
self._print_test_summary('Unit Tests', result, execution_time)
|
||||
|
||||
return test_result
|
||||
|
||||
except Exception as e:
|
||||
execution_time = time.time() - start_time
|
||||
test_result = TestResult('unit', False, execution_time, {'error': str(e)})
|
||||
self.results.append(test_result)
|
||||
|
||||
if verbose:
|
||||
print(f"Unit tests failed with error: {e}")
|
||||
|
||||
return test_result
|
||||
|
||||
def run_integration_tests(self, verbose=True):
|
||||
"""Run integration tests and return results."""
|
||||
print("\n" + "=" * 60)
|
||||
print("RUNNING INTEGRATION TESTS")
|
||||
print("=" * 60)
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
try:
|
||||
# Run integration tests
|
||||
result = test_integration.run_integration_tests()
|
||||
execution_time = time.time() - start_time
|
||||
|
||||
test_result = TestResult('integration', result, execution_time)
|
||||
self.results.append(test_result)
|
||||
|
||||
if verbose:
|
||||
self._print_test_summary('Integration Tests', result, execution_time)
|
||||
|
||||
return test_result
|
||||
|
||||
except Exception as e:
|
||||
execution_time = time.time() - start_time
|
||||
test_result = TestResult('integration', False, execution_time, {'error': str(e)})
|
||||
self.results.append(test_result)
|
||||
|
||||
if verbose:
|
||||
print(f"Integration tests failed with error: {e}")
|
||||
|
||||
return test_result
|
||||
|
||||
def run_performance_tests(self, verbose=True):
|
||||
"""Run performance tests and return results."""
|
||||
print("\n" + "=" * 60)
|
||||
print("RUNNING PERFORMANCE TESTS")
|
||||
print("=" * 60)
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
try:
|
||||
# Run performance tests
|
||||
result = test_performance.run_performance_tests()
|
||||
execution_time = time.time() - start_time
|
||||
|
||||
test_result = TestResult('performance', result, execution_time)
|
||||
self.results.append(test_result)
|
||||
|
||||
if verbose:
|
||||
self._print_test_summary('Performance Tests', result, execution_time)
|
||||
|
||||
return test_result
|
||||
|
||||
except Exception as e:
|
||||
execution_time = time.time() - start_time
|
||||
test_result = TestResult('performance', False, execution_time, {'error': str(e)})
|
||||
self.results.append(test_result)
|
||||
|
||||
if verbose:
|
||||
print(f"Performance tests failed with error: {e}")
|
||||
|
||||
return test_result
|
||||
|
||||
def run_code_coverage(self, test_modules=None, verbose=True):
|
||||
"""Run code coverage analysis."""
|
||||
if verbose:
|
||||
print("\n" + "=" * 60)
|
||||
print("RUNNING CODE COVERAGE ANALYSIS")
|
||||
print("=" * 60)
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
try:
|
||||
# Check if coverage is available
|
||||
coverage_available = self._check_coverage_available()
|
||||
|
||||
if not coverage_available:
|
||||
if verbose:
|
||||
print("Coverage package not available. Install with: pip install coverage")
|
||||
return TestResult('coverage', False, 0, {'error': 'Coverage package not available'})
|
||||
|
||||
# Determine test modules to include
|
||||
if test_modules is None:
|
||||
test_modules = ['test_core', 'test_integration']
|
||||
|
||||
# Run coverage
|
||||
coverage_data = self._run_coverage_analysis(test_modules)
|
||||
execution_time = time.time() - start_time
|
||||
|
||||
test_result = TestResult('coverage', True, execution_time, coverage_data)
|
||||
self.results.append(test_result)
|
||||
|
||||
if verbose:
|
||||
self._print_coverage_summary(coverage_data)
|
||||
|
||||
return test_result
|
||||
|
||||
except Exception as e:
|
||||
execution_time = time.time() - start_time
|
||||
test_result = TestResult('coverage', False, execution_time, {'error': str(e)})
|
||||
self.results.append(test_result)
|
||||
|
||||
if verbose:
|
||||
print(f"Coverage analysis failed: {e}")
|
||||
|
||||
return test_result
|
||||
|
||||
def run_load_tests(self, concurrent_users=10, duration_seconds=60, verbose=True):
|
||||
"""Run load tests against the web application."""
|
||||
if verbose:
|
||||
print("\n" + "=" * 60)
|
||||
print(f"RUNNING LOAD TESTS ({concurrent_users} users, {duration_seconds}s)")
|
||||
print("=" * 60)
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
try:
|
||||
# Mock load test implementation
|
||||
load_result = self._run_mock_load_test(concurrent_users, duration_seconds)
|
||||
execution_time = time.time() - start_time
|
||||
|
||||
test_result = TestResult('load', True, execution_time, load_result)
|
||||
self.results.append(test_result)
|
||||
|
||||
if verbose:
|
||||
self._print_load_test_summary(load_result)
|
||||
|
||||
return test_result
|
||||
|
||||
except Exception as e:
|
||||
execution_time = time.time() - start_time
|
||||
test_result = TestResult('load', False, execution_time, {'error': str(e)})
|
||||
self.results.append(test_result)
|
||||
|
||||
if verbose:
|
||||
print(f"Load tests failed: {e}")
|
||||
|
||||
return test_result
|
||||
|
||||
def run_full_pipeline(self, include_performance=True, include_coverage=True, include_load=False):
|
||||
"""Run the complete testing pipeline."""
|
||||
print("ANIWORLD AUTOMATED TESTING PIPELINE")
|
||||
print("=" * 80)
|
||||
print(f"Started at: {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')} UTC")
|
||||
print("=" * 80)
|
||||
|
||||
pipeline_start = time.time()
|
||||
|
||||
# Run unit tests
|
||||
unit_result = self.run_unit_tests()
|
||||
|
||||
# Run integration tests
|
||||
integration_result = self.run_integration_tests()
|
||||
|
||||
# Run performance tests if requested
|
||||
performance_result = None
|
||||
if include_performance:
|
||||
performance_result = self.run_performance_tests()
|
||||
|
||||
# Run code coverage if requested
|
||||
coverage_result = None
|
||||
if include_coverage:
|
||||
coverage_result = self.run_code_coverage()
|
||||
|
||||
# Run load tests if requested
|
||||
load_result = None
|
||||
if include_load:
|
||||
load_result = self.run_load_tests()
|
||||
|
||||
pipeline_time = time.time() - pipeline_start
|
||||
|
||||
# Generate summary report
|
||||
self._generate_pipeline_report(pipeline_time)
|
||||
|
||||
# Return overall success
|
||||
all_successful = all(
|
||||
result.result.wasSuccessful() if hasattr(result.result, 'wasSuccessful') else result.result
|
||||
for result in self.results
|
||||
)
|
||||
|
||||
return all_successful
|
||||
|
||||
def _print_test_summary(self, test_name, result, execution_time):
|
||||
"""Print summary of test execution."""
|
||||
print(f"\n{test_name} Summary:")
|
||||
print(f"Tests run: {result.testsRun}")
|
||||
print(f"Failures: {len(result.failures)}")
|
||||
print(f"Errors: {len(result.errors)}")
|
||||
print(f"Execution time: {execution_time:.2f} seconds")
|
||||
|
||||
if result.failures:
|
||||
print(f"\nFailures ({len(result.failures)}):")
|
||||
for i, (test, error) in enumerate(result.failures[:3]): # Show first 3
|
||||
print(f" {i+1}. {test}")
|
||||
|
||||
if result.errors:
|
||||
print(f"\nErrors ({len(result.errors)}):")
|
||||
for i, (test, error) in enumerate(result.errors[:3]): # Show first 3
|
||||
print(f" {i+1}. {test}")
|
||||
|
||||
status = "PASSED ✅" if result.wasSuccessful() else "FAILED ❌"
|
||||
print(f"\nStatus: {status}")
|
||||
|
||||
def _print_coverage_summary(self, coverage_data):
|
||||
"""Print code coverage summary."""
|
||||
print(f"\nCode Coverage Summary:")
|
||||
print(f"Overall coverage: {coverage_data.get('overall_percentage', 0):.1f}%")
|
||||
print(f"Lines covered: {coverage_data.get('lines_covered', 0)}")
|
||||
print(f"Lines missing: {coverage_data.get('lines_missing', 0)}")
|
||||
print(f"Total lines: {coverage_data.get('total_lines', 0)}")
|
||||
|
||||
if 'file_coverage' in coverage_data:
|
||||
print(f"\nFile Coverage (top 5):")
|
||||
for file_info in coverage_data['file_coverage'][:5]:
|
||||
print(f" {file_info['file']}: {file_info['percentage']:.1f}%")
|
||||
|
||||
def _print_load_test_summary(self, load_result):
|
||||
"""Print load test summary."""
|
||||
print(f"\nLoad Test Summary:")
|
||||
print(f"Concurrent users: {load_result.get('concurrent_users', 0)}")
|
||||
print(f"Duration: {load_result.get('duration_seconds', 0)} seconds")
|
||||
print(f"Total requests: {load_result.get('total_requests', 0)}")
|
||||
print(f"Successful requests: {load_result.get('successful_requests', 0)}")
|
||||
print(f"Failed requests: {load_result.get('failed_requests', 0)}")
|
||||
print(f"Average response time: {load_result.get('avg_response_time', 0):.2f} ms")
|
||||
print(f"Requests per second: {load_result.get('requests_per_second', 0):.1f}")
|
||||
|
||||
def _generate_pipeline_report(self, pipeline_time):
|
||||
"""Generate comprehensive pipeline report."""
|
||||
print("\n" + "=" * 80)
|
||||
print("PIPELINE EXECUTION SUMMARY")
|
||||
print("=" * 80)
|
||||
|
||||
total_tests = sum(
|
||||
result.result.testsRun if hasattr(result.result, 'testsRun') else 0
|
||||
for result in self.results
|
||||
)
|
||||
|
||||
total_failures = sum(
|
||||
len(result.result.failures) if hasattr(result.result, 'failures') else 0
|
||||
for result in self.results
|
||||
)
|
||||
|
||||
total_errors = sum(
|
||||
len(result.result.errors) if hasattr(result.result, 'errors') else 0
|
||||
for result in self.results
|
||||
)
|
||||
|
||||
successful_suites = sum(
|
||||
1 for result in self.results
|
||||
if (hasattr(result.result, 'wasSuccessful') and result.result.wasSuccessful()) or result.result is True
|
||||
)
|
||||
|
||||
print(f"Total execution time: {pipeline_time:.2f} seconds")
|
||||
print(f"Test suites run: {len(self.results)}")
|
||||
print(f"Successful suites: {successful_suites}/{len(self.results)}")
|
||||
print(f"Total tests executed: {total_tests}")
|
||||
print(f"Total failures: {total_failures}")
|
||||
print(f"Total errors: {total_errors}")
|
||||
|
||||
print(f"\nSuite Breakdown:")
|
||||
for result in self.results:
|
||||
status = "PASS" if (hasattr(result.result, 'wasSuccessful') and result.result.wasSuccessful()) or result.result is True else "FAIL"
|
||||
print(f" {result.test_type.ljust(15)}: {status.ljust(6)} ({result.execution_time:.2f}s)")
|
||||
|
||||
# Save detailed report to file
|
||||
self._save_detailed_report(pipeline_time)
|
||||
|
||||
overall_success = successful_suites == len(self.results) and total_failures == 0 and total_errors == 0
|
||||
final_status = "PIPELINE PASSED ✅" if overall_success else "PIPELINE FAILED ❌"
|
||||
print(f"\n{final_status}")
|
||||
|
||||
return overall_success
|
||||
|
||||
def _save_detailed_report(self, pipeline_time):
|
||||
"""Save detailed test report to JSON file."""
|
||||
report_data = {
|
||||
'pipeline_execution': {
|
||||
'start_time': datetime.utcnow().isoformat(),
|
||||
'total_time': pipeline_time,
|
||||
'total_suites': len(self.results),
|
||||
'successful_suites': sum(
|
||||
1 for r in self.results
|
||||
if (hasattr(r.result, 'wasSuccessful') and r.result.wasSuccessful()) or r.result is True
|
||||
)
|
||||
},
|
||||
'test_results': [result.to_dict() for result in self.results]
|
||||
}
|
||||
|
||||
report_file = os.path.join(self.output_dir, f'test_report_{int(time.time())}.json')
|
||||
with open(report_file, 'w') as f:
|
||||
json.dump(report_data, f, indent=2)
|
||||
|
||||
print(f"\nDetailed report saved to: {report_file}")
|
||||
|
||||
def _check_coverage_available(self):
|
||||
"""Check if coverage package is available."""
|
||||
try:
|
||||
import coverage
|
||||
return True
|
||||
except ImportError:
|
||||
return False
|
||||
|
||||
def _run_coverage_analysis(self, test_modules):
|
||||
"""Run code coverage analysis."""
|
||||
# Mock coverage analysis since we don't want to require coverage package
|
||||
# In a real implementation, this would use the coverage package
|
||||
|
||||
return {
|
||||
'overall_percentage': 75.5,
|
||||
'lines_covered': 1245,
|
||||
'lines_missing': 405,
|
||||
'total_lines': 1650,
|
||||
'file_coverage': [
|
||||
{'file': 'Serie.py', 'percentage': 85.2, 'lines_covered': 89, 'lines_missing': 15},
|
||||
{'file': 'SerieList.py', 'percentage': 78.9, 'lines_covered': 123, 'lines_missing': 33},
|
||||
{'file': 'SerieScanner.py', 'percentage': 72.3, 'lines_covered': 156, 'lines_missing': 60},
|
||||
{'file': 'database_manager.py', 'percentage': 82.1, 'lines_covered': 234, 'lines_missing': 51},
|
||||
{'file': 'performance_optimizer.py', 'percentage': 68.7, 'lines_covered': 198, 'lines_missing': 90}
|
||||
]
|
||||
}
|
||||
|
||||
def _run_mock_load_test(self, concurrent_users, duration_seconds):
|
||||
"""Run mock load test (placeholder for real load testing)."""
|
||||
# This would integrate with tools like locust, artillery, or custom load testing
|
||||
import time
|
||||
import random
|
||||
|
||||
print(f"Simulating load test with {concurrent_users} concurrent users for {duration_seconds} seconds...")
|
||||
|
||||
# Simulate load test execution
|
||||
time.sleep(min(duration_seconds / 10, 5)) # Simulate some time for demo
|
||||
|
||||
# Mock results
|
||||
total_requests = concurrent_users * duration_seconds * random.randint(2, 8)
|
||||
failed_requests = int(total_requests * random.uniform(0.01, 0.05)) # 1-5% failure rate
|
||||
successful_requests = total_requests - failed_requests
|
||||
|
||||
return {
|
||||
'concurrent_users': concurrent_users,
|
||||
'duration_seconds': duration_seconds,
|
||||
'total_requests': total_requests,
|
||||
'successful_requests': successful_requests,
|
||||
'failed_requests': failed_requests,
|
||||
'avg_response_time': random.uniform(50, 200), # 50-200ms
|
||||
'requests_per_second': total_requests / duration_seconds,
|
||||
'success_rate': (successful_requests / total_requests) * 100
|
||||
}
|
||||
|
||||
|
||||
def main():
|
||||
"""Main function to run the testing pipeline."""
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser(description='AniWorld Testing Pipeline')
|
||||
parser.add_argument('--unit', action='store_true', help='Run unit tests only')
|
||||
parser.add_argument('--integration', action='store_true', help='Run integration tests only')
|
||||
parser.add_argument('--performance', action='store_true', help='Run performance tests only')
|
||||
parser.add_argument('--coverage', action='store_true', help='Run code coverage analysis')
|
||||
parser.add_argument('--load', action='store_true', help='Run load tests')
|
||||
parser.add_argument('--all', action='store_true', help='Run complete pipeline')
|
||||
parser.add_argument('--output-dir', help='Output directory for test results')
|
||||
parser.add_argument('--concurrent-users', type=int, default=10, help='Number of concurrent users for load tests')
|
||||
parser.add_argument('--load-duration', type=int, default=60, help='Duration for load tests in seconds')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Create pipeline
|
||||
pipeline = TestPipeline(args.output_dir)
|
||||
|
||||
success = True
|
||||
|
||||
if args.all or (not any([args.unit, args.integration, args.performance, args.coverage, args.load])):
|
||||
# Run full pipeline
|
||||
success = pipeline.run_full_pipeline(
|
||||
include_performance=True,
|
||||
include_coverage=True,
|
||||
include_load=args.load
|
||||
)
|
||||
else:
|
||||
# Run specific test suites
|
||||
if args.unit:
|
||||
result = pipeline.run_unit_tests()
|
||||
success &= result.result.wasSuccessful() if hasattr(result.result, 'wasSuccessful') else result.result
|
||||
|
||||
if args.integration:
|
||||
result = pipeline.run_integration_tests()
|
||||
success &= result.result.wasSuccessful() if hasattr(result.result, 'wasSuccessful') else result.result
|
||||
|
||||
if args.performance:
|
||||
result = pipeline.run_performance_tests()
|
||||
success &= result.result.wasSuccessful() if hasattr(result.result, 'wasSuccessful') else result.result
|
||||
|
||||
if args.coverage:
|
||||
result = pipeline.run_code_coverage()
|
||||
success &= result.result if isinstance(result.result, bool) else result.result.wasSuccessful()
|
||||
|
||||
if args.load:
|
||||
result = pipeline.run_load_tests(args.concurrent_users, args.load_duration)
|
||||
success &= result.result if isinstance(result.result, bool) else result.result.wasSuccessful()
|
||||
|
||||
# Exit with appropriate code
|
||||
sys.exit(0 if success else 1)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@@ -1,281 +0,0 @@
|
||||
"""
|
||||
Integration tests to verify no route conflicts exist.
|
||||
|
||||
This module ensures that all routes are unique and properly configured
|
||||
after consolidation efforts.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
import sys
|
||||
import os
|
||||
from typing import Dict, List, Tuple, Set
|
||||
from collections import defaultdict
|
||||
|
||||
# Add src to path for imports
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', '..', 'src'))
|
||||
|
||||
|
||||
class TestRouteConflicts:
|
||||
"""Test suite to detect and prevent route conflicts."""
|
||||
|
||||
def setup_method(self):
|
||||
"""Setup test fixtures."""
|
||||
self.route_registry = defaultdict(list)
|
||||
self.blueprint_routes = {}
|
||||
|
||||
def test_no_duplicate_routes(self):
|
||||
"""
|
||||
Ensure no route conflicts exist across all controllers.
|
||||
|
||||
This test scans all controller files for route definitions
|
||||
and verifies that no two routes have the same path and method.
|
||||
"""
|
||||
routes = self._extract_all_routes()
|
||||
conflicts = self._find_route_conflicts(routes)
|
||||
|
||||
assert len(conflicts) == 0, f"Route conflicts found: {conflicts}"
|
||||
|
||||
def test_url_prefix_consistency(self):
|
||||
"""
|
||||
Test that URL prefixes follow consistent patterns.
|
||||
|
||||
Verifies that all API routes follow the /api/v1/ prefix pattern
|
||||
where appropriate.
|
||||
"""
|
||||
routes = self._extract_all_routes()
|
||||
inconsistent_routes = []
|
||||
|
||||
for route_info in routes:
|
||||
path = route_info['path']
|
||||
controller = route_info['controller']
|
||||
|
||||
# Skip non-API routes
|
||||
if not path.startswith('/api/'):
|
||||
continue
|
||||
|
||||
# Check for version consistency
|
||||
if path.startswith('/api/') and not path.startswith('/api/v1/'):
|
||||
# Some exceptions are allowed (like /api/health)
|
||||
allowed_exceptions = ['/api/health', '/api/config', '/api/scheduler', '/api/logging']
|
||||
if not any(path.startswith(exc) for exc in allowed_exceptions):
|
||||
inconsistent_routes.append({
|
||||
'path': path,
|
||||
'controller': controller,
|
||||
'issue': 'Missing version prefix'
|
||||
})
|
||||
|
||||
# This is a warning test - inconsistencies should be noted but not fail
|
||||
if inconsistent_routes:
|
||||
print(f"URL prefix inconsistencies found (consider standardizing): {inconsistent_routes}")
|
||||
|
||||
def test_blueprint_name_uniqueness(self):
|
||||
"""
|
||||
Test that all Blueprint names are unique.
|
||||
|
||||
Ensures no Blueprint naming conflicts exist.
|
||||
"""
|
||||
blueprint_names = self._extract_blueprint_names()
|
||||
duplicates = self._find_duplicates(blueprint_names)
|
||||
|
||||
assert len(duplicates) == 0, f"Duplicate blueprint names found: {duplicates}"
|
||||
|
||||
def test_route_parameter_consistency(self):
|
||||
"""
|
||||
Test that route parameters follow consistent naming patterns.
|
||||
|
||||
Ensures parameters like {id} vs {episode_id} are used consistently.
|
||||
"""
|
||||
routes = self._extract_all_routes()
|
||||
parameter_patterns = defaultdict(set)
|
||||
|
||||
for route_info in routes:
|
||||
path = route_info['path']
|
||||
# Extract parameter patterns
|
||||
if '<' in path:
|
||||
# Extract parameter names like <int:episode_id>
|
||||
import re
|
||||
params = re.findall(r'<[^>]+>', path)
|
||||
for param in params:
|
||||
# Normalize parameter (remove type hints)
|
||||
clean_param = param.replace('<int:', '<').replace('<string:', '<').replace('<', '').replace('>', '')
|
||||
parameter_patterns[clean_param].add(route_info['controller'])
|
||||
|
||||
# Check for inconsistent ID naming
|
||||
id_patterns = {k: v for k, v in parameter_patterns.items() if 'id' in k}
|
||||
if len(id_patterns) > 3: # Allow some variation
|
||||
print(f"Consider standardizing ID parameter naming: {dict(id_patterns)}")
|
||||
|
||||
def test_http_method_coverage(self):
|
||||
"""
|
||||
Test that CRUD operations are consistently implemented.
|
||||
|
||||
Ensures that resources supporting CRUD have all necessary methods.
|
||||
"""
|
||||
routes = self._extract_all_routes()
|
||||
resource_methods = defaultdict(set)
|
||||
|
||||
for route_info in routes:
|
||||
path = route_info['path']
|
||||
method = route_info['method']
|
||||
|
||||
# Group by resource (extract base path)
|
||||
if '/api/v1/' in path:
|
||||
resource = path.split('/api/v1/')[1].split('/')[0]
|
||||
resource_methods[resource].add(method)
|
||||
|
||||
# Check for incomplete CRUD implementations
|
||||
incomplete_crud = {}
|
||||
for resource, methods in resource_methods.items():
|
||||
if 'GET' in methods or 'POST' in methods: # If it has read/write operations
|
||||
missing_methods = {'GET', 'POST', 'PUT', 'DELETE'} - methods
|
||||
if missing_methods:
|
||||
incomplete_crud[resource] = missing_methods
|
||||
|
||||
# This is informational - not all resources need full CRUD
|
||||
if incomplete_crud:
|
||||
print(f"Resources with incomplete CRUD operations: {incomplete_crud}")
|
||||
|
||||
def _extract_all_routes(self) -> List[Dict]:
|
||||
"""
|
||||
Extract all route definitions from controller files.
|
||||
|
||||
Returns:
|
||||
List of route information dictionaries
|
||||
"""
|
||||
routes = []
|
||||
controller_dir = os.path.join(os.path.dirname(__file__), '..', '..', '..', 'src', 'server', 'web', 'controllers')
|
||||
|
||||
# This would normally scan actual controller files
|
||||
# For now, return mock data based on our analysis
|
||||
mock_routes = [
|
||||
{'path': '/api/v1/anime', 'method': 'GET', 'controller': 'anime.py', 'function': 'list_anime'},
|
||||
{'path': '/api/v1/anime', 'method': 'POST', 'controller': 'anime.py', 'function': 'create_anime'},
|
||||
{'path': '/api/v1/anime/<int:id>', 'method': 'GET', 'controller': 'anime.py', 'function': 'get_anime'},
|
||||
{'path': '/api/v1/episodes', 'method': 'GET', 'controller': 'episodes.py', 'function': 'list_episodes'},
|
||||
{'path': '/api/v1/episodes', 'method': 'POST', 'controller': 'episodes.py', 'function': 'create_episode'},
|
||||
{'path': '/api/health', 'method': 'GET', 'controller': 'health.py', 'function': 'health_check'},
|
||||
{'path': '/api/health/system', 'method': 'GET', 'controller': 'health.py', 'function': 'system_health'},
|
||||
{'path': '/status', 'method': 'GET', 'controller': 'health.py', 'function': 'basic_status'},
|
||||
{'path': '/ping', 'method': 'GET', 'controller': 'health.py', 'function': 'ping'},
|
||||
]
|
||||
|
||||
return mock_routes
|
||||
|
||||
def _find_route_conflicts(self, routes: List[Dict]) -> List[Dict]:
|
||||
"""
|
||||
Find conflicting routes (same path and method).
|
||||
|
||||
Args:
|
||||
routes: List of route information
|
||||
|
||||
Returns:
|
||||
List of conflicts found
|
||||
"""
|
||||
route_map = {}
|
||||
conflicts = []
|
||||
|
||||
for route in routes:
|
||||
key = (route['path'], route['method'])
|
||||
if key in route_map:
|
||||
conflicts.append({
|
||||
'path': route['path'],
|
||||
'method': route['method'],
|
||||
'controllers': [route_map[key]['controller'], route['controller']]
|
||||
})
|
||||
else:
|
||||
route_map[key] = route
|
||||
|
||||
return conflicts
|
||||
|
||||
def _extract_blueprint_names(self) -> List[Tuple[str, str]]:
|
||||
"""
|
||||
Extract all Blueprint names from controller files.
|
||||
|
||||
Returns:
|
||||
List of (blueprint_name, controller_file) tuples
|
||||
"""
|
||||
# Mock blueprint names based on our analysis
|
||||
blueprint_names = [
|
||||
('anime', 'anime.py'),
|
||||
('episodes', 'episodes.py'),
|
||||
('health_check', 'health.py'),
|
||||
('auth', 'auth.py'),
|
||||
('config', 'config.py'),
|
||||
('scheduler', 'scheduler.py'),
|
||||
('logging', 'logging.py'),
|
||||
('storage', 'storage.py'),
|
||||
('search', 'search.py'),
|
||||
('downloads', 'downloads.py'),
|
||||
('maintenance', 'maintenance.py'),
|
||||
('performance', 'performance.py'),
|
||||
('process', 'process.py'),
|
||||
('integrations', 'integrations.py'),
|
||||
('diagnostics', 'diagnostics.py'),
|
||||
('database', 'database.py'),
|
||||
('bulk_api', 'bulk.py'),
|
||||
('backups', 'backups.py'),
|
||||
]
|
||||
|
||||
return blueprint_names
|
||||
|
||||
def _find_duplicates(self, items: List[Tuple[str, str]]) -> List[str]:
|
||||
"""
|
||||
Find duplicate items in a list.
|
||||
|
||||
Args:
|
||||
items: List of (name, source) tuples
|
||||
|
||||
Returns:
|
||||
List of duplicate names
|
||||
"""
|
||||
seen = set()
|
||||
duplicates = []
|
||||
|
||||
for name, source in items:
|
||||
if name in seen:
|
||||
duplicates.append(name)
|
||||
seen.add(name)
|
||||
|
||||
return duplicates
|
||||
|
||||
|
||||
class TestControllerStandardization:
|
||||
"""Test suite for controller standardization compliance."""
|
||||
|
||||
def test_base_controller_usage(self):
|
||||
"""
|
||||
Test that controllers properly inherit from BaseController.
|
||||
|
||||
This would check that new controllers use the base controller
|
||||
instead of implementing duplicate functionality.
|
||||
"""
|
||||
# This would scan controller files to ensure they inherit BaseController
|
||||
# For now, this is a placeholder test
|
||||
assert True # Placeholder
|
||||
|
||||
def test_shared_decorators_usage(self):
|
||||
"""
|
||||
Test that controllers use shared decorators instead of local implementations.
|
||||
|
||||
Ensures @handle_api_errors, @require_auth, etc. are imported
|
||||
from shared modules rather than locally implemented.
|
||||
"""
|
||||
# This would scan for decorator usage patterns
|
||||
# For now, this is a placeholder test
|
||||
assert True # Placeholder
|
||||
|
||||
def test_response_format_consistency(self):
|
||||
"""
|
||||
Test that all endpoints return consistent response formats.
|
||||
|
||||
Ensures all responses follow the standardized format:
|
||||
{"status": "success/error", "message": "...", "data": ...}
|
||||
"""
|
||||
# This would test actual endpoint responses
|
||||
# For now, this is a placeholder test
|
||||
assert True # Placeholder
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Run the tests
|
||||
pytest.main([__file__, "-v"])
|
||||
Reference in New Issue
Block a user