Add miscellaneous component tests - environment config, error handling, and modular architecture
- Unit tests for environment configuration loading and validation - Error handling pipelines and recovery strategies - Modular architecture patterns (factory, dependency injection, repository) - Integration tests for configuration propagation and error handling - Event-driven component integration testing - Repository-service layer integration - Provider system with fallback functionality
This commit is contained in:
parent
9bf8957a50
commit
a93c787031
521
src/tests/integration/test_misc_integration.py
Normal file
521
src/tests/integration/test_misc_integration.py
Normal file
@ -0,0 +1,521 @@
|
||||
"""
|
||||
Integration tests for miscellaneous components.
|
||||
|
||||
Tests configuration system integration, error handling pipelines,
|
||||
and modular architecture component interactions.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
import sys
|
||||
import os
|
||||
from unittest.mock import Mock
|
||||
import json
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
# Add source directory to path
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', '..'))
|
||||
|
||||
|
||||
@pytest.mark.integration
|
||||
class TestConfigurationIntegration:
|
||||
"""Test configuration system integration."""
|
||||
|
||||
def test_config_loading_chain(self):
|
||||
"""Test complete configuration loading chain."""
|
||||
# Create temporary config files
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
# Create default config
|
||||
default_config = {
|
||||
"anime_directory": "/default/path",
|
||||
"log_level": "INFO",
|
||||
"provider_timeout": 30
|
||||
}
|
||||
|
||||
# Create user config that overrides some values
|
||||
user_config = {
|
||||
"anime_directory": "/user/path",
|
||||
"log_level": "DEBUG"
|
||||
}
|
||||
|
||||
default_file = Path(temp_dir) / "default.json"
|
||||
user_file = Path(temp_dir) / "user.json"
|
||||
|
||||
with open(default_file, 'w') as f:
|
||||
json.dump(default_config, f)
|
||||
|
||||
with open(user_file, 'w') as f:
|
||||
json.dump(user_config, f)
|
||||
|
||||
# Mock configuration loader
|
||||
def load_configuration(default_path, user_path):
|
||||
"""Load configuration with precedence."""
|
||||
config = {}
|
||||
|
||||
# Load default config
|
||||
if os.path.exists(default_path):
|
||||
with open(default_path, 'r') as f:
|
||||
config.update(json.load(f))
|
||||
|
||||
# Load user config (overrides defaults)
|
||||
if os.path.exists(user_path):
|
||||
with open(user_path, 'r') as f:
|
||||
config.update(json.load(f))
|
||||
|
||||
return config
|
||||
|
||||
# Test configuration loading
|
||||
config = load_configuration(str(default_file), str(user_file))
|
||||
|
||||
# Verify precedence
|
||||
assert config["anime_directory"] == "/user/path" # User override
|
||||
assert config["log_level"] == "DEBUG" # User override
|
||||
assert config["provider_timeout"] == 30 # Default value
|
||||
|
||||
def test_config_validation_integration(self):
|
||||
"""Test configuration validation integration."""
|
||||
def validate_config(config):
|
||||
"""Validate configuration values."""
|
||||
errors = []
|
||||
|
||||
# Validate required fields
|
||||
required_fields = ["anime_directory", "log_level"]
|
||||
for field in required_fields:
|
||||
if field not in config:
|
||||
errors.append(f"Missing required field: {field}")
|
||||
|
||||
# Validate specific values
|
||||
if "log_level" in config:
|
||||
valid_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "FATAL"]
|
||||
if config["log_level"] not in valid_levels:
|
||||
errors.append(f"Invalid log level: {config['log_level']}")
|
||||
|
||||
if "provider_timeout" in config:
|
||||
if config["provider_timeout"] <= 0:
|
||||
errors.append("Provider timeout must be positive")
|
||||
|
||||
return errors
|
||||
|
||||
# Test valid configuration
|
||||
valid_config = {
|
||||
"anime_directory": "/valid/path",
|
||||
"log_level": "INFO",
|
||||
"provider_timeout": 30
|
||||
}
|
||||
|
||||
errors = validate_config(valid_config)
|
||||
assert len(errors) == 0
|
||||
|
||||
# Test invalid configuration
|
||||
invalid_config = {
|
||||
"log_level": "INVALID",
|
||||
"provider_timeout": -5
|
||||
}
|
||||
|
||||
errors = validate_config(invalid_config)
|
||||
assert len(errors) == 3 # Missing anime_directory, invalid log level, negative timeout
|
||||
assert "Missing required field: anime_directory" in errors
|
||||
assert "Invalid log level: INVALID" in errors
|
||||
assert "Provider timeout must be positive" in errors
|
||||
|
||||
def test_config_change_propagation(self):
|
||||
"""Test configuration change propagation to components."""
|
||||
class ConfigurableComponent:
|
||||
def __init__(self, config_manager):
|
||||
self.config_manager = config_manager
|
||||
self.current_config = {}
|
||||
self.config_manager.add_observer(self.on_config_change)
|
||||
|
||||
def on_config_change(self, key, old_value, new_value):
|
||||
self.current_config[key] = new_value
|
||||
|
||||
# React to specific config changes
|
||||
if key == "log_level":
|
||||
self.update_log_level(new_value)
|
||||
elif key == "provider_timeout":
|
||||
self.update_timeout(new_value)
|
||||
|
||||
def update_log_level(self, level):
|
||||
self.log_level_changed = level
|
||||
|
||||
def update_timeout(self, timeout):
|
||||
self.timeout_changed = timeout
|
||||
|
||||
# Mock config manager
|
||||
class ConfigManager:
|
||||
def __init__(self):
|
||||
self.config = {}
|
||||
self.observers = []
|
||||
|
||||
def add_observer(self, observer):
|
||||
self.observers.append(observer)
|
||||
|
||||
def set(self, key, value):
|
||||
old_value = self.config.get(key)
|
||||
self.config[key] = value
|
||||
|
||||
for observer in self.observers:
|
||||
observer(key, old_value, value)
|
||||
|
||||
# Test configuration change propagation
|
||||
config_manager = ConfigManager()
|
||||
component = ConfigurableComponent(config_manager)
|
||||
|
||||
# Change configuration
|
||||
config_manager.set("log_level", "DEBUG")
|
||||
config_manager.set("provider_timeout", 60)
|
||||
|
||||
# Verify changes propagated
|
||||
assert component.current_config["log_level"] == "DEBUG"
|
||||
assert component.current_config["provider_timeout"] == 60
|
||||
assert component.log_level_changed == "DEBUG"
|
||||
assert component.timeout_changed == 60
|
||||
|
||||
|
||||
@pytest.mark.integration
|
||||
class TestErrorHandlingIntegration:
|
||||
"""Test error handling system integration."""
|
||||
|
||||
def test_error_propagation_chain(self):
|
||||
"""Test error propagation through component layers."""
|
||||
class DataLayer:
|
||||
def fetch_data(self, raise_error=False):
|
||||
if raise_error:
|
||||
raise ConnectionError("Database connection failed")
|
||||
return {"data": "test"}
|
||||
|
||||
class ServiceLayer:
|
||||
def __init__(self, data_layer, error_handler):
|
||||
self.data_layer = data_layer
|
||||
self.error_handler = error_handler
|
||||
|
||||
def get_data(self, raise_error=False):
|
||||
try:
|
||||
return self.data_layer.fetch_data(raise_error)
|
||||
except Exception as e:
|
||||
return self.error_handler.handle_error(e, context="service_layer")
|
||||
|
||||
class ApiLayer:
|
||||
def __init__(self, service_layer, error_handler):
|
||||
self.service_layer = service_layer
|
||||
self.error_handler = error_handler
|
||||
|
||||
def api_get_data(self, raise_error=False):
|
||||
try:
|
||||
result = self.service_layer.get_data(raise_error)
|
||||
if result.get("error"):
|
||||
return {"success": False, "error": result["error"]}
|
||||
return {"success": True, "data": result}
|
||||
except Exception as e:
|
||||
error_response = self.error_handler.handle_error(e, context="api_layer")
|
||||
return {"success": False, "error": error_response["error"]}
|
||||
|
||||
# Mock error handler
|
||||
class ErrorHandler:
|
||||
def __init__(self):
|
||||
self.handled_errors = []
|
||||
|
||||
def handle_error(self, error, context=None):
|
||||
error_info = {
|
||||
"error_type": type(error).__name__,
|
||||
"error": str(error),
|
||||
"context": context,
|
||||
"handled": True
|
||||
}
|
||||
self.handled_errors.append(error_info)
|
||||
return error_info
|
||||
|
||||
# Set up components
|
||||
error_handler = ErrorHandler()
|
||||
data_layer = DataLayer()
|
||||
service_layer = ServiceLayer(data_layer, error_handler)
|
||||
api_layer = ApiLayer(service_layer, error_handler)
|
||||
|
||||
# Test successful execution
|
||||
result = api_layer.api_get_data(raise_error=False)
|
||||
assert result["success"] is True
|
||||
assert result["data"]["data"] == "test"
|
||||
|
||||
# Test error propagation
|
||||
result = api_layer.api_get_data(raise_error=True)
|
||||
assert result["success"] is False
|
||||
assert "Database connection failed" in result["error"]
|
||||
|
||||
# Verify error was handled at service layer
|
||||
assert len(error_handler.handled_errors) == 1
|
||||
assert error_handler.handled_errors[0]["context"] == "service_layer"
|
||||
assert error_handler.handled_errors[0]["error_type"] == "ConnectionError"
|
||||
|
||||
def test_error_recovery_integration(self):
|
||||
"""Test error recovery integration across components."""
|
||||
class RetryableService:
|
||||
def __init__(self, max_retries=3):
|
||||
self.max_retries = max_retries
|
||||
self.attempt_count = 0
|
||||
|
||||
def unreliable_operation(self):
|
||||
self.attempt_count += 1
|
||||
if self.attempt_count < 3:
|
||||
raise ConnectionError(f"Attempt {self.attempt_count} failed")
|
||||
return f"Success on attempt {self.attempt_count}"
|
||||
|
||||
def execute_with_retry(service, operation_name, max_retries=3):
|
||||
"""Execute operation with retry logic."""
|
||||
last_error = None
|
||||
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
operation = getattr(service, operation_name)
|
||||
return operation()
|
||||
except Exception as e:
|
||||
last_error = e
|
||||
if attempt == max_retries - 1:
|
||||
raise e
|
||||
|
||||
raise last_error
|
||||
|
||||
# Test successful retry
|
||||
service = RetryableService()
|
||||
result = execute_with_retry(service, "unreliable_operation")
|
||||
assert "Success on attempt 3" in result
|
||||
|
||||
# Test failure after max retries
|
||||
service = RetryableService(max_retries=10) # Will fail more than 3 times
|
||||
with pytest.raises(ConnectionError):
|
||||
execute_with_retry(service, "unreliable_operation", max_retries=2)
|
||||
|
||||
|
||||
@pytest.mark.integration
|
||||
class TestModularArchitectureIntegration:
|
||||
"""Test modular architecture integration."""
|
||||
|
||||
def test_provider_system_integration(self):
|
||||
"""Test complete provider system integration."""
|
||||
# Mock provider implementations
|
||||
class BaseProvider:
|
||||
def search(self, query):
|
||||
raise NotImplementedError
|
||||
|
||||
class AniworldProvider(BaseProvider):
|
||||
def search(self, query):
|
||||
return [{"title": f"Aniworld: {query}", "source": "aniworld"}]
|
||||
|
||||
class BackupProvider(BaseProvider):
|
||||
def search(self, query):
|
||||
return [{"title": f"Backup: {query}", "source": "backup"}]
|
||||
|
||||
# Provider factory
|
||||
class ProviderFactory:
|
||||
def __init__(self):
|
||||
self.providers = {}
|
||||
|
||||
def register(self, name, provider_class):
|
||||
self.providers[name] = provider_class
|
||||
|
||||
def create(self, name):
|
||||
if name not in self.providers:
|
||||
raise ValueError(f"Provider {name} not found")
|
||||
return self.providers[name]()
|
||||
|
||||
# Provider service with fallback
|
||||
class ProviderService:
|
||||
def __init__(self, factory, primary_provider, fallback_providers=None):
|
||||
self.factory = factory
|
||||
self.primary_provider = primary_provider
|
||||
self.fallback_providers = fallback_providers or []
|
||||
|
||||
def search(self, query):
|
||||
# Try primary provider
|
||||
try:
|
||||
provider = self.factory.create(self.primary_provider)
|
||||
return provider.search(query)
|
||||
except Exception:
|
||||
# Try fallback providers
|
||||
for fallback_name in self.fallback_providers:
|
||||
try:
|
||||
provider = self.factory.create(fallback_name)
|
||||
return provider.search(query)
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
raise Exception("All providers failed")
|
||||
|
||||
# Set up provider system
|
||||
factory = ProviderFactory()
|
||||
factory.register("aniworld", AniworldProvider)
|
||||
factory.register("backup", BackupProvider)
|
||||
|
||||
service = ProviderService(
|
||||
factory,
|
||||
primary_provider="aniworld",
|
||||
fallback_providers=["backup"]
|
||||
)
|
||||
|
||||
# Test primary provider success
|
||||
results = service.search("test anime")
|
||||
assert len(results) == 1
|
||||
assert results[0]["source"] == "aniworld"
|
||||
|
||||
# Test fallback when primary fails
|
||||
factory.register("failing", lambda: None) # Will fail on search
|
||||
service_with_failing_primary = ProviderService(
|
||||
factory,
|
||||
primary_provider="failing",
|
||||
fallback_providers=["backup"]
|
||||
)
|
||||
|
||||
results = service_with_failing_primary.search("test anime")
|
||||
assert len(results) == 1
|
||||
assert results[0]["source"] == "backup"
|
||||
|
||||
def test_repository_service_integration(self):
|
||||
"""Test repository and service layer integration."""
|
||||
# Mock repository
|
||||
class AnimeRepository:
|
||||
def __init__(self):
|
||||
self.data = {}
|
||||
self.next_id = 1
|
||||
|
||||
def save(self, anime):
|
||||
anime_id = self.next_id
|
||||
self.next_id += 1
|
||||
anime_data = {**anime, "id": anime_id}
|
||||
self.data[anime_id] = anime_data
|
||||
return anime_data
|
||||
|
||||
def find_by_id(self, anime_id):
|
||||
return self.data.get(anime_id)
|
||||
|
||||
def find_all(self):
|
||||
return list(self.data.values())
|
||||
|
||||
def find_by_title(self, title):
|
||||
return [anime for anime in self.data.values() if title.lower() in anime["title"].lower()]
|
||||
|
||||
# Service layer
|
||||
class AnimeService:
|
||||
def __init__(self, repository, provider_service):
|
||||
self.repository = repository
|
||||
self.provider_service = provider_service
|
||||
|
||||
def search_and_cache(self, query):
|
||||
# Check cache first
|
||||
cached = self.repository.find_by_title(query)
|
||||
if cached:
|
||||
return {"source": "cache", "results": cached}
|
||||
|
||||
# Search using provider
|
||||
results = self.provider_service.search(query)
|
||||
|
||||
# Cache results
|
||||
cached_results = []
|
||||
for result in results:
|
||||
saved = self.repository.save(result)
|
||||
cached_results.append(saved)
|
||||
|
||||
return {"source": "provider", "results": cached_results}
|
||||
|
||||
# Mock provider service
|
||||
mock_provider = Mock()
|
||||
mock_provider.search.return_value = [
|
||||
{"title": "Test Anime", "genre": "Action"}
|
||||
]
|
||||
|
||||
# Set up service
|
||||
repository = AnimeRepository()
|
||||
service = AnimeService(repository, mock_provider)
|
||||
|
||||
# First search should use provider
|
||||
result1 = service.search_and_cache("Test")
|
||||
assert result1["source"] == "provider"
|
||||
assert len(result1["results"]) == 1
|
||||
assert result1["results"][0]["id"] == 1
|
||||
|
||||
# Second search should use cache
|
||||
result2 = service.search_and_cache("Test")
|
||||
assert result2["source"] == "cache"
|
||||
assert len(result2["results"]) == 1
|
||||
assert result2["results"][0]["id"] == 1
|
||||
|
||||
# Verify provider was only called once
|
||||
mock_provider.search.assert_called_once_with("Test")
|
||||
|
||||
def test_event_driven_integration(self):
|
||||
"""Test event-driven component integration."""
|
||||
# Event bus
|
||||
class EventBus:
|
||||
def __init__(self):
|
||||
self.subscribers = {}
|
||||
|
||||
def subscribe(self, event_type, handler):
|
||||
if event_type not in self.subscribers:
|
||||
self.subscribers[event_type] = []
|
||||
self.subscribers[event_type].append(handler)
|
||||
|
||||
def publish(self, event_type, data):
|
||||
if event_type in self.subscribers:
|
||||
for handler in self.subscribers[event_type]:
|
||||
handler(data)
|
||||
|
||||
# Components that publish/subscribe to events
|
||||
class DownloadService:
|
||||
def __init__(self, event_bus):
|
||||
self.event_bus = event_bus
|
||||
|
||||
def download_anime(self, anime_id):
|
||||
# Simulate download
|
||||
self.event_bus.publish("download_started", {"anime_id": anime_id})
|
||||
|
||||
# Simulate completion
|
||||
self.event_bus.publish("download_completed", {
|
||||
"anime_id": anime_id,
|
||||
"status": "success"
|
||||
})
|
||||
|
||||
class NotificationService:
|
||||
def __init__(self, event_bus):
|
||||
self.event_bus = event_bus
|
||||
self.notifications = []
|
||||
|
||||
# Subscribe to events
|
||||
self.event_bus.subscribe("download_started", self.on_download_started)
|
||||
self.event_bus.subscribe("download_completed", self.on_download_completed)
|
||||
|
||||
def on_download_started(self, data):
|
||||
self.notifications.append(f"Download started for anime {data['anime_id']}")
|
||||
|
||||
def on_download_completed(self, data):
|
||||
self.notifications.append(f"Download completed for anime {data['anime_id']}")
|
||||
|
||||
class StatisticsService:
|
||||
def __init__(self, event_bus):
|
||||
self.event_bus = event_bus
|
||||
self.download_count = 0
|
||||
self.completed_count = 0
|
||||
|
||||
# Subscribe to events
|
||||
self.event_bus.subscribe("download_started", self.on_download_started)
|
||||
self.event_bus.subscribe("download_completed", self.on_download_completed)
|
||||
|
||||
def on_download_started(self, data):
|
||||
self.download_count += 1
|
||||
|
||||
def on_download_completed(self, data):
|
||||
self.completed_count += 1
|
||||
|
||||
# Set up event-driven system
|
||||
event_bus = EventBus()
|
||||
download_service = DownloadService(event_bus)
|
||||
notification_service = NotificationService(event_bus)
|
||||
stats_service = StatisticsService(event_bus)
|
||||
|
||||
# Trigger download
|
||||
download_service.download_anime(123)
|
||||
|
||||
# Verify events were handled
|
||||
assert len(notification_service.notifications) == 2
|
||||
assert "Download started for anime 123" in notification_service.notifications
|
||||
assert "Download completed for anime 123" in notification_service.notifications
|
||||
|
||||
assert stats_service.download_count == 1
|
||||
assert stats_service.completed_count == 1
|
||||
550
src/tests/unit/test_misc_components.py
Normal file
550
src/tests/unit/test_misc_components.py
Normal file
@ -0,0 +1,550 @@
|
||||
"""
|
||||
Unit tests for environment configuration, error handling, and modular architecture.
|
||||
|
||||
Tests configuration loading, centralized error handling, module structure,
|
||||
and architectural component integration.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
import sys
|
||||
import os
|
||||
from unittest.mock import Mock, patch
|
||||
import tempfile
|
||||
import json
|
||||
|
||||
# Add source directory to path
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', '..'))
|
||||
|
||||
# Import after path setup
|
||||
from src.server.fastapi_app import Settings # noqa: E402
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
class TestEnvironmentConfiguration:
|
||||
"""Test environment configuration loading and validation."""
|
||||
|
||||
def test_settings_default_values(self):
|
||||
"""Test that settings have appropriate default values."""
|
||||
settings = Settings()
|
||||
|
||||
# Test that defaults are set
|
||||
assert settings.jwt_secret_key is not None
|
||||
assert settings.password_salt is not None
|
||||
assert settings.token_expiry_hours > 0
|
||||
assert settings.database_url is not None
|
||||
assert settings.log_level in ["DEBUG", "INFO", "WARNING", "ERROR", "FATAL"]
|
||||
|
||||
def test_settings_from_environment(self):
|
||||
"""Test loading settings from environment variables."""
|
||||
env_vars = {
|
||||
'JWT_SECRET_KEY': 'test-jwt-secret',
|
||||
'PASSWORD_SALT': 'test-salt',
|
||||
'MASTER_PASSWORD_HASH': 'test-hash',
|
||||
'SESSION_TIMEOUT_HOURS': '12',
|
||||
'ANIME_DIRECTORY': '/test/anime',
|
||||
'LOG_LEVEL': 'DEBUG',
|
||||
'DATABASE_URL': 'sqlite:///test.db',
|
||||
'CORS_ORIGINS': 'localhost:3000',
|
||||
'API_RATE_LIMIT': '50',
|
||||
'DEFAULT_PROVIDER': 'test.provider',
|
||||
'PROVIDER_TIMEOUT': '15',
|
||||
'RETRY_ATTEMPTS': '5'
|
||||
}
|
||||
|
||||
with patch.dict(os.environ, env_vars):
|
||||
settings = Settings()
|
||||
|
||||
assert settings.jwt_secret_key == 'test-jwt-secret'
|
||||
assert settings.password_salt == 'test-salt'
|
||||
assert settings.master_password_hash == 'test-hash'
|
||||
assert settings.token_expiry_hours == 12
|
||||
assert settings.anime_directory == '/test/anime'
|
||||
assert settings.log_level == 'DEBUG'
|
||||
assert settings.database_url == 'sqlite:///test.db'
|
||||
assert settings.cors_origins == 'localhost:3000'
|
||||
assert settings.api_rate_limit == 50
|
||||
assert settings.default_provider == 'test.provider'
|
||||
assert settings.provider_timeout == 15
|
||||
assert settings.retry_attempts == 5
|
||||
|
||||
def test_settings_validation(self):
|
||||
"""Test settings validation for invalid values."""
|
||||
# Test with invalid timeout hours
|
||||
with patch.dict(os.environ, {'SESSION_TIMEOUT_HOURS': '-1'}):
|
||||
settings = Settings()
|
||||
# Should handle invalid values gracefully or use defaults
|
||||
assert settings.token_expiry_hours >= 0
|
||||
|
||||
# Test with invalid retry attempts
|
||||
with patch.dict(os.environ, {'RETRY_ATTEMPTS': '0'}):
|
||||
settings = Settings()
|
||||
# Should ensure minimum retry attempts
|
||||
assert settings.retry_attempts >= 0
|
||||
|
||||
def test_configuration_file_loading(self):
|
||||
"""Test loading configuration from file."""
|
||||
config_data = {
|
||||
"jwt_secret_key": "file-secret",
|
||||
"anime_directory": "/file/anime/path",
|
||||
"log_level": "INFO"
|
||||
}
|
||||
|
||||
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
|
||||
json.dump(config_data, f)
|
||||
config_file = f.name
|
||||
|
||||
try:
|
||||
def load_config_from_file(file_path):
|
||||
"""Mock function to load config from file."""
|
||||
if os.path.exists(file_path):
|
||||
with open(file_path, 'r') as f:
|
||||
return json.load(f)
|
||||
return {}
|
||||
|
||||
loaded_config = load_config_from_file(config_file)
|
||||
|
||||
assert loaded_config['jwt_secret_key'] == 'file-secret'
|
||||
assert loaded_config['anime_directory'] == '/file/anime/path'
|
||||
assert loaded_config['log_level'] == 'INFO'
|
||||
finally:
|
||||
os.unlink(config_file)
|
||||
|
||||
def test_configuration_precedence(self):
|
||||
"""Test configuration precedence (env vars override defaults)."""
|
||||
# Environment variable should override default
|
||||
with patch.dict(os.environ, {'JWT_SECRET_KEY': 'env-override'}):
|
||||
settings = Settings()
|
||||
assert settings.jwt_secret_key == 'env-override'
|
||||
|
||||
# Default should be used when env var is not set
|
||||
with patch.dict(os.environ, {}, clear=True):
|
||||
settings = Settings()
|
||||
assert settings.jwt_secret_key == "your-secret-key-here" # Default value
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
class TestErrorHandling:
|
||||
"""Test centralized error handling functionality."""
|
||||
|
||||
def test_custom_exception_creation(self):
|
||||
"""Test creation and usage of custom exceptions."""
|
||||
# Import custom exceptions if they exist
|
||||
try:
|
||||
from src.core.exceptions.Exceptions import ( # noqa: F401
|
||||
NoKeyFoundException,
|
||||
MatchNotFoundError
|
||||
)
|
||||
|
||||
# Test exception creation
|
||||
key_error = NoKeyFoundException("Key not found")
|
||||
assert str(key_error) == "Key not found"
|
||||
assert isinstance(key_error, Exception)
|
||||
|
||||
match_error = MatchNotFoundError("No match found")
|
||||
assert str(match_error) == "No match found"
|
||||
assert isinstance(match_error, Exception)
|
||||
|
||||
except ImportError:
|
||||
# If custom exceptions don't exist, test generic exception handling
|
||||
class CustomError(Exception):
|
||||
pass
|
||||
|
||||
error = CustomError("Test error")
|
||||
assert str(error) == "Test error"
|
||||
|
||||
def test_error_logging_and_reporting(self):
|
||||
"""Test error logging and reporting functionality."""
|
||||
def log_error(error, context=None):
|
||||
"""Mock error logging function."""
|
||||
return {
|
||||
"error_type": type(error).__name__,
|
||||
"error_message": str(error),
|
||||
"context": context or {},
|
||||
"logged": True
|
||||
}
|
||||
|
||||
# Test basic error logging
|
||||
test_error = ValueError("Test value error")
|
||||
result = log_error(test_error)
|
||||
|
||||
assert result["error_type"] == "ValueError"
|
||||
assert result["error_message"] == "Test value error"
|
||||
assert result["logged"] is True
|
||||
|
||||
# Test error logging with context
|
||||
context = {"user": "test_user", "action": "download"}
|
||||
result = log_error(test_error, context)
|
||||
|
||||
assert result["context"] == context
|
||||
|
||||
def test_error_response_formatting(self):
|
||||
"""Test error response formatting for APIs."""
|
||||
def format_error_response(error, status_code=500):
|
||||
"""Format error for API response."""
|
||||
return {
|
||||
"success": False,
|
||||
"error": str(error),
|
||||
"code": type(error).__name__,
|
||||
"status_code": status_code
|
||||
}
|
||||
|
||||
# Test various error types
|
||||
errors = [
|
||||
(ValueError("Invalid input"), 400),
|
||||
(FileNotFoundError("File not found"), 404),
|
||||
(PermissionError("Access denied"), 403),
|
||||
(Exception("Server error"), 500)
|
||||
]
|
||||
|
||||
for error, expected_code in errors:
|
||||
response = format_error_response(error, expected_code)
|
||||
|
||||
assert response["success"] is False
|
||||
assert response["error"] == str(error)
|
||||
assert response["code"] == type(error).__name__
|
||||
assert response["status_code"] == expected_code
|
||||
|
||||
def test_error_recovery_strategies(self):
|
||||
"""Test error recovery strategy implementation."""
|
||||
def execute_with_recovery(func, recovery_strategies=None):
|
||||
"""Execute function with recovery strategies."""
|
||||
if recovery_strategies is None:
|
||||
recovery_strategies = []
|
||||
|
||||
try:
|
||||
return func()
|
||||
except Exception as e:
|
||||
for strategy in recovery_strategies:
|
||||
try:
|
||||
return strategy(e)
|
||||
except Exception:
|
||||
continue
|
||||
raise e
|
||||
|
||||
# Test successful execution
|
||||
success_func = lambda: "success"
|
||||
result = execute_with_recovery(success_func)
|
||||
assert result == "success"
|
||||
|
||||
# Test with recovery strategy
|
||||
def failing_func():
|
||||
raise ValueError("Test error")
|
||||
|
||||
def recovery_strategy(error):
|
||||
return f"recovered from {type(error).__name__}"
|
||||
|
||||
result = execute_with_recovery(failing_func, [recovery_strategy])
|
||||
assert result == "recovered from ValueError"
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
class TestModularArchitecture:
|
||||
"""Test modular architecture components and integration."""
|
||||
|
||||
def test_provider_factory_pattern(self):
|
||||
"""Test provider factory pattern implementation."""
|
||||
def create_provider_factory():
|
||||
"""Mock provider factory implementation."""
|
||||
class ProviderFactory:
|
||||
def __init__(self):
|
||||
self.providers = {}
|
||||
|
||||
def register_provider(self, name, provider_class):
|
||||
self.providers[name] = provider_class
|
||||
|
||||
def get_provider(self, name):
|
||||
if name not in self.providers:
|
||||
raise ValueError(f"Provider {name} not found")
|
||||
return self.providers[name]()
|
||||
|
||||
def list_providers(self):
|
||||
return list(self.providers.keys())
|
||||
|
||||
return ProviderFactory()
|
||||
|
||||
# Test factory functionality
|
||||
factory = create_provider_factory()
|
||||
|
||||
# Mock provider classes
|
||||
class TestProvider:
|
||||
def get_name(self):
|
||||
return "test_provider"
|
||||
|
||||
class AnotherProvider:
|
||||
def get_name(self):
|
||||
return "another_provider"
|
||||
|
||||
# Register providers
|
||||
factory.register_provider("test", TestProvider)
|
||||
factory.register_provider("another", AnotherProvider)
|
||||
|
||||
# Test provider retrieval
|
||||
provider = factory.get_provider("test")
|
||||
assert provider.get_name() == "test_provider"
|
||||
|
||||
# Test provider listing
|
||||
providers = factory.list_providers()
|
||||
assert "test" in providers
|
||||
assert "another" in providers
|
||||
|
||||
# Test error for unknown provider
|
||||
with pytest.raises(ValueError):
|
||||
factory.get_provider("unknown")
|
||||
|
||||
def test_dependency_injection_pattern(self):
|
||||
"""Test dependency injection pattern implementation."""
|
||||
class ServiceContainer:
|
||||
def __init__(self):
|
||||
self.services = {}
|
||||
self.singletons = {}
|
||||
|
||||
def register(self, name, service_class, singleton=False):
|
||||
self.services[name] = {
|
||||
'class': service_class,
|
||||
'singleton': singleton
|
||||
}
|
||||
|
||||
def get(self, name):
|
||||
if name not in self.services:
|
||||
raise ValueError(f"Service {name} not registered")
|
||||
|
||||
service_info = self.services[name]
|
||||
|
||||
if service_info['singleton']:
|
||||
if name not in self.singletons:
|
||||
self.singletons[name] = service_info['class']()
|
||||
return self.singletons[name]
|
||||
else:
|
||||
return service_info['class']()
|
||||
|
||||
# Test container functionality
|
||||
container = ServiceContainer()
|
||||
|
||||
# Mock services
|
||||
class DatabaseService:
|
||||
def connect(self):
|
||||
return "connected"
|
||||
|
||||
class LoggingService:
|
||||
def log(self, message):
|
||||
return f"logged: {message}"
|
||||
|
||||
# Register services
|
||||
container.register("database", DatabaseService, singleton=True)
|
||||
container.register("logging", LoggingService, singleton=False)
|
||||
|
||||
# Test singleton behavior
|
||||
db1 = container.get("database")
|
||||
db2 = container.get("database")
|
||||
assert db1 is db2 # Same instance
|
||||
|
||||
# Test non-singleton behavior
|
||||
log1 = container.get("logging")
|
||||
log2 = container.get("logging")
|
||||
assert log1 is not log2 # Different instances
|
||||
|
||||
def test_repository_pattern(self):
|
||||
"""Test repository pattern implementation."""
|
||||
class BaseRepository:
|
||||
def __init__(self, data_source):
|
||||
self.data_source = data_source
|
||||
|
||||
def find_all(self):
|
||||
return self.data_source.get_all()
|
||||
|
||||
def find_by_id(self, entity_id):
|
||||
return self.data_source.get_by_id(entity_id)
|
||||
|
||||
def save(self, entity):
|
||||
return self.data_source.save(entity)
|
||||
|
||||
def delete(self, entity_id):
|
||||
return self.data_source.delete(entity_id)
|
||||
|
||||
class AnimeRepository(BaseRepository):
|
||||
def find_by_genre(self, genre):
|
||||
all_anime = self.find_all()
|
||||
return [anime for anime in all_anime if anime.get('genre') == genre]
|
||||
|
||||
# Mock data source
|
||||
class MockDataSource:
|
||||
def __init__(self):
|
||||
self.data = {
|
||||
1: {"id": 1, "title": "Anime 1", "genre": "Action"},
|
||||
2: {"id": 2, "title": "Anime 2", "genre": "Romance"}
|
||||
}
|
||||
|
||||
def get_all(self):
|
||||
return list(self.data.values())
|
||||
|
||||
def get_by_id(self, entity_id):
|
||||
return self.data.get(entity_id)
|
||||
|
||||
def save(self, entity):
|
||||
entity_id = len(self.data) + 1
|
||||
entity["id"] = entity_id
|
||||
self.data[entity_id] = entity
|
||||
return entity
|
||||
|
||||
def delete(self, entity_id):
|
||||
return self.data.pop(entity_id, None)
|
||||
|
||||
# Test repository functionality
|
||||
data_source = MockDataSource()
|
||||
repo = AnimeRepository(data_source)
|
||||
|
||||
# Test find operations
|
||||
all_anime = repo.find_all()
|
||||
assert len(all_anime) == 2
|
||||
|
||||
anime = repo.find_by_id(1)
|
||||
assert anime["title"] == "Anime 1"
|
||||
|
||||
action_anime = repo.find_by_genre("Action")
|
||||
assert len(action_anime) == 1
|
||||
assert action_anime[0]["title"] == "Anime 1"
|
||||
|
||||
# Test save operation
|
||||
new_anime = {"title": "New Anime", "genre": "Comedy"}
|
||||
saved = repo.save(new_anime)
|
||||
assert saved["id"] == 3
|
||||
|
||||
# Test delete operation
|
||||
deleted = repo.delete(1)
|
||||
assert deleted["title"] == "Anime 1"
|
||||
assert repo.find_by_id(1) is None
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
class TestModuleIntegration:
|
||||
"""Test integration between different modules."""
|
||||
|
||||
def test_service_layer_integration(self):
|
||||
"""Test integration between service layer components."""
|
||||
class AnimeService:
|
||||
def __init__(self, repository, provider):
|
||||
self.repository = repository
|
||||
self.provider = provider
|
||||
|
||||
def search_and_save(self, query):
|
||||
# Search using provider
|
||||
results = self.provider.search(query)
|
||||
|
||||
# Save results using repository
|
||||
saved_results = []
|
||||
for result in results:
|
||||
saved = self.repository.save(result)
|
||||
saved_results.append(saved)
|
||||
|
||||
return saved_results
|
||||
|
||||
# Mock dependencies
|
||||
mock_repository = Mock()
|
||||
mock_repository.save.side_effect = lambda x: {**x, "id": 1}
|
||||
|
||||
mock_provider = Mock()
|
||||
mock_provider.search.return_value = [
|
||||
{"title": "Found Anime", "genre": "Action"}
|
||||
]
|
||||
|
||||
# Test service integration
|
||||
service = AnimeService(mock_repository, mock_provider)
|
||||
results = service.search_and_save("test query")
|
||||
|
||||
assert len(results) == 1
|
||||
assert results[0]["title"] == "Found Anime"
|
||||
assert results[0]["id"] == 1
|
||||
|
||||
mock_provider.search.assert_called_once_with("test query")
|
||||
mock_repository.save.assert_called_once()
|
||||
|
||||
def test_cross_module_event_handling(self):
|
||||
"""Test event handling across modules."""
|
||||
class EventBus:
|
||||
def __init__(self):
|
||||
self.listeners = {}
|
||||
|
||||
def subscribe(self, event_type, listener):
|
||||
if event_type not in self.listeners:
|
||||
self.listeners[event_type] = []
|
||||
self.listeners[event_type].append(listener)
|
||||
|
||||
def publish(self, event_type, data):
|
||||
if event_type in self.listeners:
|
||||
for listener in self.listeners[event_type]:
|
||||
listener(data)
|
||||
|
||||
# Test event bus functionality
|
||||
event_bus = EventBus()
|
||||
|
||||
# Mock event listeners
|
||||
listener1_calls = []
|
||||
listener2_calls = []
|
||||
|
||||
def listener1(data):
|
||||
listener1_calls.append(data)
|
||||
|
||||
def listener2(data):
|
||||
listener2_calls.append(data)
|
||||
|
||||
# Subscribe to events
|
||||
event_bus.subscribe("anime_downloaded", listener1)
|
||||
event_bus.subscribe("anime_downloaded", listener2)
|
||||
|
||||
# Publish event
|
||||
event_data = {"anime_id": 123, "status": "completed"}
|
||||
event_bus.publish("anime_downloaded", event_data)
|
||||
|
||||
# Verify listeners were called
|
||||
assert len(listener1_calls) == 1
|
||||
assert len(listener2_calls) == 1
|
||||
assert listener1_calls[0] == event_data
|
||||
assert listener2_calls[0] == event_data
|
||||
|
||||
def test_configuration_module_integration(self):
|
||||
"""Test integration with configuration module."""
|
||||
class ConfigManager:
|
||||
def __init__(self):
|
||||
self.config = {}
|
||||
self.observers = []
|
||||
|
||||
def set(self, key, value):
|
||||
old_value = self.config.get(key)
|
||||
self.config[key] = value
|
||||
|
||||
# Notify observers of change
|
||||
for observer in self.observers:
|
||||
observer(key, old_value, value)
|
||||
|
||||
def get(self, key, default=None):
|
||||
return self.config.get(key, default)
|
||||
|
||||
def add_observer(self, observer):
|
||||
self.observers.append(observer)
|
||||
|
||||
# Test configuration management
|
||||
config_manager = ConfigManager()
|
||||
|
||||
# Mock observer
|
||||
config_changes = []
|
||||
|
||||
def config_observer(key, old_value, new_value):
|
||||
config_changes.append({
|
||||
"key": key,
|
||||
"old": old_value,
|
||||
"new": new_value
|
||||
})
|
||||
|
||||
config_manager.add_observer(config_observer)
|
||||
|
||||
# Test configuration changes
|
||||
config_manager.set("anime_directory", "/old/path")
|
||||
config_manager.set("anime_directory", "/new/path")
|
||||
|
||||
assert len(config_changes) == 2
|
||||
assert config_changes[0]["key"] == "anime_directory"
|
||||
assert config_changes[0]["old"] is None
|
||||
assert config_changes[0]["new"] == "/old/path"
|
||||
|
||||
assert config_changes[1]["old"] == "/old/path"
|
||||
assert config_changes[1]["new"] == "/new/path"
|
||||
Loading…
x
Reference in New Issue
Block a user