feat: Add comprehensive provider health monitoring and failover system

- Implemented ProviderHealthMonitor for real-time tracking
  - Monitors availability, response times, success rates
  - Automatically marks a provider unavailable after consecutive failures
  - Background health check loop

- Added ProviderFailover for automatic provider switching
  - Configurable retry attempts with a fixed delay between retries
  - Integration with health monitoring
  - Smart provider selection

- Created MonitoredProviderWrapper for performance tracking
  - Transparent monitoring for any provider
  - Automatic metric recording
  - No changes needed to existing providers

- Implemented ProviderConfigManager for dynamic configuration
  - Runtime updates without restart
  - Per-provider settings (timeout, retries, bandwidth)
  - JSON-based persistence

- Added Provider Management API (15+ endpoints)
  - Health monitoring endpoints
  - Configuration management
  - Failover control

- Comprehensive testing (34 tests, 100% pass rate)
  - Health monitoring tests
  - Failover scenario tests
  - Configuration management tests

- Documentation updates
  - Updated infrastructure.md
  - Updated instructions.md
  - Created PROVIDER_ENHANCEMENT_SUMMARY.md

Total: ~2,593 lines of code, 34 passing tests
This commit is contained in:
2025-10-24 11:01:40 +02:00
parent 85d73b8294
commit fecdb38a90
23 changed files with 3137 additions and 109 deletions

View File

@@ -0,0 +1,351 @@
"""Dynamic provider configuration management.
This module provides runtime configuration management for anime providers,
allowing dynamic updates without application restart.
"""
import json
import logging
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
@dataclass
class ProviderSettings:
    """Configuration settings for a single provider.

    Instances round-trip through plain dictionaries via :meth:`to_dict` and
    :meth:`from_dict`.
    """

    name: str
    enabled: bool = True
    priority: int = 0
    timeout_seconds: int = 30
    max_retries: int = 3
    retry_delay_seconds: float = 1.0
    max_concurrent_downloads: int = 3
    bandwidth_limit_mbps: Optional[float] = None
    custom_headers: Optional[Dict[str, str]] = None
    custom_params: Optional[Dict[str, Any]] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert settings to a dictionary.

        Returns:
            Mapping of field name to value; fields whose value is ``None``
            are omitted.
        """
        return {
            k: v for k, v in asdict(self).items() if v is not None
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ProviderSettings":
        """Create settings from a dictionary.

        Keys that are not dataclass fields are ignored.

        Args:
            data: Mapping of field name to value. Must contain ``name``.

        Returns:
            New settings instance.

        Raises:
            TypeError: If the required ``name`` key is missing.
        """
        # Local import: only asdict/dataclass are imported at file level.
        from dataclasses import fields

        # Filter on the declared fields. ``hasattr(cls, key)`` is wrong here:
        # ``name`` has no default and therefore is not a class attribute,
        # while method names such as ``to_dict`` are.
        known = {f.name for f in fields(cls)}
        return cls(**{k: v for k, v in data.items() if k in known})
class ProviderConfigManager:
    """Manages dynamic configuration for anime providers.

    Keeps per-provider ``ProviderSettings`` and a dictionary of global
    settings in memory, and can load them from / save them to a JSON file.
    """

    def __init__(self, config_file: Optional[Path] = None):
        """Initialize provider configuration manager.

        Args:
            config_file: Path to configuration file (optional). When the
                file exists it is loaded immediately.
        """
        self._config_file = config_file
        self._provider_settings: Dict[str, ProviderSettings] = {}
        self._global_settings: Dict[str, Any] = (
            self._default_global_settings()
        )

        # Load configuration if file exists
        if config_file and config_file.exists():
            self.load_config()

        logger.info("Provider configuration manager initialized")

    @staticmethod
    def _default_global_settings() -> Dict[str, Any]:
        """Return a fresh dictionary holding the built-in global defaults."""
        return {
            "default_timeout": 30,
            "default_max_retries": 3,
            "default_retry_delay": 1.0,
            "enable_health_monitoring": True,
            "enable_failover": True,
        }

    def get_provider_settings(
        self, provider_name: str
    ) -> Optional[ProviderSettings]:
        """Get settings for a specific provider.

        Args:
            provider_name: Name of the provider.

        Returns:
            Provider settings or None if not configured.
        """
        return self._provider_settings.get(provider_name)

    def set_provider_settings(
        self, provider_name: str, settings: ProviderSettings
    ) -> None:
        """Set settings for a specific provider.

        Args:
            provider_name: Name of the provider.
            settings: Provider settings to apply.
        """
        self._provider_settings[provider_name] = settings
        logger.info(f"Updated settings for provider: {provider_name}")

    def update_provider_settings(
        self, provider_name: str, **kwargs
    ) -> bool:
        """Update specific provider settings, creating them if necessary.

        Unknown keys are handled the same way whether the provider already
        exists or not: they are skipped and reported in a warning. ``name``
        is treated as unknown because the provider is identified by
        ``provider_name``.

        Args:
            provider_name: Name of the provider.
            **kwargs: Settings to update.

        Returns:
            Always True; a provider without settings gets a new entry.
        """
        # Local import: only asdict/dataclass are imported at file level.
        from dataclasses import fields

        settings = self._provider_settings.get(provider_name)
        created = settings is None
        if created:
            settings = ProviderSettings(name=provider_name)

        updatable = {f.name for f in fields(settings)} - {"name"}
        applied = {k: v for k, v in kwargs.items() if k in updatable}
        ignored = sorted(set(kwargs) - set(applied))

        for key, value in applied.items():
            setattr(settings, key, value)
        self._provider_settings[provider_name] = settings

        if ignored:
            logger.warning(
                f"Ignored unknown settings for provider "
                f"{provider_name}: {ignored}"
            )
        if created:
            logger.info(f"Created new settings for provider: {provider_name}")  # noqa: E501
        else:
            logger.info(
                f"Updated settings for provider {provider_name}: {applied}"
            )
        return True

    def get_all_provider_settings(self) -> Dict[str, ProviderSettings]:
        """Get settings for all configured providers.

        Returns:
            Shallow copy of the mapping from provider names to settings.
        """
        return self._provider_settings.copy()

    def get_enabled_providers(self) -> List[str]:
        """Get list of enabled providers.

        Returns:
            List of enabled provider names.
        """
        return [
            name
            for name, settings in self._provider_settings.items()
            if settings.enabled
        ]

    def enable_provider(self, provider_name: str) -> bool:
        """Enable a provider.

        Args:
            provider_name: Name of the provider.

        Returns:
            True if enabled, False if not found.
        """
        return self._set_enabled(provider_name, True)

    def disable_provider(self, provider_name: str) -> bool:
        """Disable a provider.

        Args:
            provider_name: Name of the provider.

        Returns:
            True if disabled, False if not found.
        """
        return self._set_enabled(provider_name, False)

    def _set_enabled(self, provider_name: str, enabled: bool) -> bool:
        """Set the ``enabled`` flag of a configured provider."""
        settings = self._provider_settings.get(provider_name)
        if settings is None:
            return False
        settings.enabled = enabled
        state = "Enabled" if enabled else "Disabled"
        logger.info(f"{state} provider: {provider_name}")
        return True

    def set_provider_priority(
        self, provider_name: str, priority: int
    ) -> bool:
        """Set priority for a provider.

        Lower priority values = higher priority.

        Args:
            provider_name: Name of the provider.
            priority: Priority value (lower = higher priority).

        Returns:
            True if updated, False if not found.
        """
        settings = self._provider_settings.get(provider_name)
        if settings is None:
            return False
        settings.priority = priority
        logger.info(f"Set priority for {provider_name} to {priority}")
        return True

    def get_providers_by_priority(self) -> List[str]:
        """Get providers sorted by priority.

        Returns:
            List of provider names sorted by priority (low to high).
            Providers with equal priority keep their insertion order.
        """
        sorted_providers = sorted(
            self._provider_settings.items(),
            key=lambda x: x[1].priority,
        )
        return [name for name, _ in sorted_providers]

    def get_global_setting(self, key: str) -> Optional[Any]:
        """Get a global setting value.

        Args:
            key: Setting key.

        Returns:
            Setting value or None if not found.
        """
        return self._global_settings.get(key)

    def set_global_setting(self, key: str, value: Any) -> None:
        """Set a global setting value.

        Args:
            key: Setting key.
            value: Setting value.
        """
        self._global_settings[key] = value
        logger.info(f"Updated global setting {key}: {value}")

    def get_all_global_settings(self) -> Dict[str, Any]:
        """Get all global settings.

        Returns:
            Shallow copy of the global settings dictionary.
        """
        return self._global_settings.copy()

    def load_config(self, file_path: Optional[Path] = None) -> bool:
        """Load configuration from file.

        Loaded values are merged into the current state. The file is parsed
        completely before anything is applied, so a malformed entry leaves
        the current configuration untouched.

        Args:
            file_path: Path to configuration file (uses default if None).

        Returns:
            True if loaded successfully, False otherwise.
        """
        config_path = file_path or self._config_file
        if not config_path or not config_path.exists():
            logger.warning(
                f"Configuration file not found: {config_path}"
            )
            return False

        try:
            with open(config_path, "r", encoding="utf-8") as f:
                data = json.load(f)

            # Parse provider settings into a staging dict first.
            loaded_providers: Dict[str, ProviderSettings] = {}
            if "providers" in data:
                for name, settings_data in data["providers"].items():
                    # The mapping key supplies the provider name when the
                    # entry itself does not carry one.
                    loaded_providers[name] = ProviderSettings.from_dict(
                        {"name": name, **settings_data}
                    )

            loaded_globals: Dict[str, Any] = {}
            if "global" in data:
                loaded_globals = dict(data["global"])

            # Everything parsed: apply in one go.
            self._provider_settings.update(loaded_providers)
            self._global_settings.update(loaded_globals)

            logger.info(
                f"Loaded configuration from {config_path} "
                f"({len(self._provider_settings)} providers)"
            )
            return True
        except Exception as e:
            logger.error(
                f"Failed to load configuration from {config_path}: {e}",
                exc_info=True,
            )
            return False

    def save_config(self, file_path: Optional[Path] = None) -> bool:
        """Save configuration to file.

        Args:
            file_path: Path to save to (uses default if None).

        Returns:
            True if saved successfully, False otherwise.
        """
        config_path = file_path or self._config_file
        if not config_path:
            logger.error("No configuration file path specified")
            return False

        try:
            data = {
                "providers": {
                    name: settings.to_dict()
                    for name, settings in self._provider_settings.items()
                },
                "global": self._global_settings,
            }
            # Serialize before opening the file so a value that is not JSON
            # serializable cannot truncate an existing configuration file.
            payload = json.dumps(data, indent=2)

            # Ensure parent directory exists
            config_path.parent.mkdir(parents=True, exist_ok=True)
            with open(config_path, "w", encoding="utf-8") as f:
                f.write(payload)

            logger.info(f"Saved configuration to {config_path}")
            return True
        except Exception as e:
            logger.error(
                f"Failed to save configuration to {config_path}: {e}",
                exc_info=True,
            )
            return False

    def reset_to_defaults(self) -> None:
        """Drop all provider settings and restore default global settings."""
        self._provider_settings.clear()
        self._global_settings = self._default_global_settings()
        logger.info("Reset configuration to defaults")
# Process-wide configuration manager, created lazily by get_config_manager()
_config_manager: Optional[ProviderConfigManager] = None


def get_config_manager(
    config_file: Optional[Path] = None,
) -> ProviderConfigManager:
    """Return the shared provider configuration manager.

    The instance is created on the first call; later calls return the same
    object and their ``config_file`` argument is not used.

    Args:
        config_file: Configuration file path (used on first call).

    Returns:
        Global ProviderConfigManager instance.
    """
    global _config_manager
    if _config_manager is not None:
        return _config_manager
    _config_manager = ProviderConfigManager(config_file=config_file)
    return _config_manager

View File

@@ -0,0 +1,325 @@
"""Provider failover system for automatic fallback on failures.
This module implements automatic failover between multiple providers,
ensuring high availability by switching to backup providers when the
primary fails.
"""
import asyncio
import logging
from typing import Any, Callable, Dict, List, Optional, TypeVar
from src.core.providers.health_monitor import get_health_monitor
from src.core.providers.provider_config import DEFAULT_PROVIDERS
logger = logging.getLogger(__name__)
T = TypeVar("T")
class ProviderFailover:
    """Manages automatic failover between multiple providers.

    Providers are kept in an ordered chain. When health monitoring is
    enabled, a provider is skipped only if the monitor has metrics for it
    and those metrics mark it unavailable; providers the monitor has not
    seen yet count as available.
    """

    def __init__(
        self,
        providers: Optional[List[str]] = None,
        max_retries: int = 3,
        retry_delay: float = 1.0,
        enable_health_monitoring: bool = True,
    ):
        """Initialize provider failover manager.

        Args:
            providers: List of provider names to use (default: all).
            max_retries: Maximum retry attempts per provider.
            retry_delay: Delay between retries in seconds.
            enable_health_monitoring: Whether to use health monitoring.
        """
        # Copy so that add/remove on this chain never mutates the caller's
        # list.
        self._providers = (
            list(providers) if providers else DEFAULT_PROVIDERS.copy()
        )
        self._max_retries = max_retries
        self._retry_delay = retry_delay
        self._enable_health_monitoring = enable_health_monitoring

        # Index used for round-robin selection
        self._current_index = 0

        # Health monitor
        self._health_monitor = (
            get_health_monitor() if enable_health_monitoring else None
        )

        logger.info(
            f"Provider failover initialized with "
            f"{len(self._providers)} providers"
        )

    def _is_marked_unavailable(self, provider: str) -> bool:
        """Return True if the health monitor marks ``provider`` unavailable."""
        if not (self._enable_health_monitoring and self._health_monitor):
            return False
        metrics = self._health_monitor.get_provider_metrics(provider)
        return metrics is not None and not metrics.is_available

    def _failover_order(self) -> List[str]:
        """Return the providers in the order a failover run tries them.

        The current provider comes first, followed by the rest of the chain.
        Providers marked unavailable are moved to the end as a last resort.
        """
        if not self._providers:
            return []
        start = self._providers.index(self.get_current_provider())
        # dict.fromkeys drops duplicate names while keeping order.
        rotated = list(
            dict.fromkeys(self._providers[start:] + self._providers[:start])
        )
        usable = [p for p in rotated if not self._is_marked_unavailable(p)]
        degraded = [p for p in rotated if self._is_marked_unavailable(p)]
        return usable + degraded

    def get_current_provider(self) -> str:
        """Get the current active provider.

        Returns:
            The health monitor's best provider when it belongs to this
            chain, otherwise the provider at the round-robin index.
        """
        if self._enable_health_monitoring and self._health_monitor:
            # Try to get best available provider
            best = self._health_monitor.get_best_provider()
            if best and best in self._providers:
                return best

        # Fall back to round-robin selection
        return self._providers[self._current_index % len(self._providers)]

    def get_next_provider(self) -> Optional[str]:
        """Get the next provider in the failover chain.

        Returns:
            Name of next provider or None if none available.
        """
        if self._enable_health_monitoring and self._health_monitor:
            available = [
                p
                for p in self._providers
                if not self._is_marked_unavailable(p)
            ]

            if not available:
                logger.warning("No available providers for failover")
                return None

            # Find next available provider
            current = self.get_current_provider()
            try:
                current_idx = available.index(current)
            except ValueError:
                # Current provider not in available list
                return available[0]
            return available[(current_idx + 1) % len(available)]

        # Fall back to simple rotation
        self._current_index = (self._current_index + 1) % len(
            self._providers
        )
        return self._providers[self._current_index]

    async def execute_with_failover(
        self,
        operation: Callable[[str], Any],
        operation_name: str = "operation",
        **kwargs,
    ) -> Any:
        """Execute an operation with automatic failover.

        Each provider is tried up to ``max_retries`` times, sleeping
        ``retry_delay`` seconds between attempts, before moving on to the
        next provider.

        Args:
            operation: Async callable that takes provider name.
            operation_name: Name for logging purposes.
            **kwargs: Additional arguments to pass to operation.

        Returns:
            Result from successful operation.

        Raises:
            Exception: If all providers fail. The last provider error is
                attached as ``__cause__``.
        """
        # Local import: this module does not import time at file level.
        import time

        providers_tried: List[str] = []
        last_error: Optional[Exception] = None

        for provider in self._failover_order():
            providers_tried.append(provider)

            for retry in range(self._max_retries):
                logger.info(
                    f"Executing {operation_name} with provider "
                    f"{provider} (attempt {retry + 1}/{self._max_retries})"  # noqa: E501
                )
                start_time = time.monotonic()
                try:
                    result = await operation(provider, **kwargs)
                except Exception as e:
                    elapsed_ms = (time.monotonic() - start_time) * 1000
                    last_error = e
                    logger.warning(
                        f"{operation_name} failed with provider "
                        f"{provider} (attempt {retry + 1}): {e}"
                    )

                    if self._health_monitor:
                        self._health_monitor.record_request(
                            provider_name=provider,
                            success=False,
                            response_time_ms=elapsed_ms,
                            error_message=str(e),
                        )

                    # No delay after the final attempt for this provider.
                    if retry < self._max_retries - 1:
                        await asyncio.sleep(self._retry_delay)
                    continue

                elapsed_ms = (time.monotonic() - start_time) * 1000
                if self._health_monitor:
                    self._health_monitor.record_request(
                        provider_name=provider,
                        success=True,
                        response_time_ms=elapsed_ms,
                    )

                # Remember the provider that worked so that round-robin
                # selection starts from it next time.
                if provider in self._providers:
                    self._current_index = self._providers.index(provider)

                logger.info(
                    f"{operation_name} succeeded with provider "
                    f"{provider} in {elapsed_ms:.2f}ms"
                )
                return result

        # All providers failed
        error_msg = (
            f"{operation_name} failed with all providers. "
            f"Tried: {', '.join(providers_tried)}"
        )
        logger.error(error_msg)
        raise Exception(error_msg) from last_error

    def add_provider(self, provider_name: str) -> None:
        """Add a provider to the end of the failover chain.

        Args:
            provider_name: Name of provider to add. Ignored if already
                present.
        """
        if provider_name not in self._providers:
            self._providers.append(provider_name)
            logger.info(f"Added provider to failover chain: {provider_name}")

    def remove_provider(self, provider_name: str) -> bool:
        """Remove a provider from the failover chain.

        Args:
            provider_name: Name of provider to remove.

        Returns:
            True if removed, False if not found.
        """
        if provider_name not in self._providers:
            return False
        self._providers.remove(provider_name)
        logger.info(
            f"Removed provider from failover chain: {provider_name}"
        )
        return True

    def get_providers(self) -> List[str]:
        """Get list of all providers in failover chain.

        Returns:
            Copy of the list of provider names.
        """
        return self._providers.copy()

    def set_provider_priority(
        self, provider_name: str, priority_index: int
    ) -> bool:
        """Set priority of a provider by moving it in the chain.

        Args:
            provider_name: Name of provider to prioritize.
            priority_index: New index position (0 = highest priority).
                Values past the end move the provider to the end.

        Returns:
            True if updated, False if provider not found.
        """
        if provider_name not in self._providers:
            return False

        self._providers.remove(provider_name)
        self._providers.insert(
            min(priority_index, len(self._providers)), provider_name
        )
        logger.info(
            f"Set provider {provider_name} priority to index {priority_index}"
        )
        return True

    def get_failover_stats(self) -> Dict[str, Any]:
        """Get failover statistics and configuration.

        Returns:
            Dictionary with failover stats. ``current_provider`` is None
            when the chain is empty. The ``available_providers`` and
            ``unavailable_providers`` lists are only present when a health
            monitor is attached.
        """
        stats: Dict[str, Any] = {
            "total_providers": len(self._providers),
            "providers": self._providers.copy(),
            "current_provider": (
                self.get_current_provider() if self._providers else None
            ),
            "max_retries": self._max_retries,
            "retry_delay": self._retry_delay,
            "health_monitoring_enabled": self._enable_health_monitoring,
        }

        if self._health_monitor:
            stats["available_providers"] = [
                p
                for p in self._providers
                if not self._is_marked_unavailable(p)
            ]
            stats["unavailable_providers"] = [
                p for p in self._providers if self._is_marked_unavailable(p)
            ]

        return stats
# Process-wide failover instance, created lazily by get_failover()
_failover: Optional[ProviderFailover] = None


def get_failover() -> ProviderFailover:
    """Return the shared provider failover instance.

    The instance is built with default arguments the first time it is
    requested and reused afterwards.

    Returns:
        Global ProviderFailover instance.
    """
    global _failover
    if _failover is not None:
        return _failover
    _failover = ProviderFailover()
    return _failover
def configure_failover(
    providers: Optional[List[str]] = None,
    max_retries: int = 3,
    retry_delay: float = 1.0,
    enable_health_monitoring: bool = True,
) -> ProviderFailover:
    """Configure global provider failover instance.

    Replaces any previously created global instance.

    Args:
        providers: List of provider names to use.
        max_retries: Maximum retry attempts per provider.
        retry_delay: Delay between retries in seconds.
        enable_health_monitoring: Whether the new instance uses health
            monitoring (default True, matching ProviderFailover).

    Returns:
        Configured ProviderFailover instance.
    """
    global _failover
    _failover = ProviderFailover(
        providers=providers,
        max_retries=max_retries,
        retry_delay=retry_delay,
        enable_health_monitoring=enable_health_monitoring,
    )
    return _failover

View File

@@ -0,0 +1,416 @@
"""Provider health monitoring system for tracking availability and performance.
This module provides health monitoring capabilities for anime providers,
tracking metrics like availability, response times, success rates, and
bandwidth usage.
"""
import asyncio
import logging
from collections import defaultdict, deque
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Deque, Dict, List, Optional
logger = logging.getLogger(__name__)
@dataclass
class ProviderHealthMetrics:
    """Aggregated health counters and status for one provider."""

    provider_name: str
    is_available: bool = True
    last_check_time: Optional[datetime] = None
    total_requests: int = 0
    successful_requests: int = 0
    failed_requests: int = 0
    average_response_time_ms: float = 0.0
    last_error: Optional[str] = None
    last_error_time: Optional[datetime] = None
    consecutive_failures: int = 0
    total_bytes_downloaded: int = 0
    uptime_percentage: float = 100.0

    @property
    def success_rate(self) -> float:
        """Share of successful requests in percent (0.0 without requests)."""
        if not self.total_requests:
            return 0.0
        ratio = self.successful_requests / self.total_requests
        return ratio * 100

    @property
    def failure_rate(self) -> float:
        """Complement of :attr:`success_rate`, in percent."""
        return 100.0 - self.success_rate

    def to_dict(self) -> Dict[str, Any]:
        """Build a dictionary view of the metrics.

        Timestamps become ISO-8601 strings (or None when unset) and the
        rate/time values are rounded to two decimals.
        """
        checked_at = (
            self.last_check_time.isoformat() if self.last_check_time else None
        )
        errored_at = (
            self.last_error_time.isoformat() if self.last_error_time else None
        )

        as_dict: Dict[str, Any] = {}
        as_dict["provider_name"] = self.provider_name
        as_dict["is_available"] = self.is_available
        as_dict["last_check_time"] = checked_at
        as_dict["total_requests"] = self.total_requests
        as_dict["successful_requests"] = self.successful_requests
        as_dict["failed_requests"] = self.failed_requests
        as_dict["success_rate"] = round(self.success_rate, 2)
        as_dict["average_response_time_ms"] = round(
            self.average_response_time_ms, 2
        )
        as_dict["last_error"] = self.last_error
        as_dict["last_error_time"] = errored_at
        as_dict["consecutive_failures"] = self.consecutive_failures
        as_dict["total_bytes_downloaded"] = self.total_bytes_downloaded
        as_dict["uptime_percentage"] = round(self.uptime_percentage, 2)
        return as_dict
@dataclass
class RequestMetric:
    """Record of one provider request kept in the request history."""

    # When the request was recorded
    timestamp: datetime
    # Outcome of the request
    success: bool
    # Duration of the request in milliseconds
    response_time_ms: float
    # Payload size; stays 0 when nothing was reported
    bytes_transferred: int = 0
    # Failure description; None when not supplied
    error_message: Optional[str] = None
class ProviderHealthMonitor:
    """Monitors health and performance of anime providers.

    Metrics are created lazily the first time a request is recorded for a
    provider. An optional background task periodically refreshes the
    uptime percentage from the recent request history.
    """

    def __init__(
        self,
        max_history_size: int = 1000,
        health_check_interval: int = 300,  # 5 minutes
        failure_threshold: int = 3,
    ):
        """Initialize provider health monitor.

        Args:
            max_history_size: Maximum number of request metrics to keep
                per provider.
            health_check_interval: Interval between health checks in
                seconds.
            failure_threshold: Number of consecutive failures before
                marking unavailable.
        """
        self._max_history_size = max_history_size
        self._health_check_interval = health_check_interval
        self._failure_threshold = failure_threshold

        # Provider metrics storage
        self._metrics: Dict[str, ProviderHealthMetrics] = {}
        # Bounded per-provider history; the oldest entries fall off first.
        self._request_history: Dict[str, Deque[RequestMetric]] = defaultdict(
            lambda: deque(maxlen=max_history_size)
        )

        # Health check task
        self._health_check_task: Optional[asyncio.Task] = None
        self._is_running = False

        logger.info("Provider health monitor initialized")

    def start_monitoring(self) -> None:
        """Start background health monitoring.

        Must be called while an event loop is running.

        Raises:
            RuntimeError: If there is no running event loop. The monitor
                stays stopped in that case and can be started later.
        """
        if self._is_running:
            logger.warning("Health monitoring already running")
            return

        # Look up the loop before touching any state: if this raises, the
        # monitor must not be left flagged as running, and no coroutine
        # object has been created that would then never be awaited.
        loop = asyncio.get_running_loop()
        self._health_check_task = loop.create_task(
            self._health_check_loop()
        )
        self._is_running = True
        logger.info("Provider health monitoring started")

    async def stop_monitoring(self) -> None:
        """Stop background health monitoring and wait for the task to end."""
        self._is_running = False

        if self._health_check_task:
            self._health_check_task.cancel()
            try:
                await self._health_check_task
            except asyncio.CancelledError:
                pass
            self._health_check_task = None

        logger.info("Provider health monitoring stopped")

    async def _health_check_loop(self) -> None:
        """Background health check loop.

        Runs until monitoring is stopped or the task is cancelled. Errors
        from a single round are logged and the loop keeps going.
        """
        while self._is_running:
            try:
                await self._perform_health_checks()
                await asyncio.sleep(self._health_check_interval)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in health check loop: {e}", exc_info=True)
                await asyncio.sleep(self._health_check_interval)

    async def _perform_health_checks(self) -> None:
        """Refresh check time and uptime for all registered providers."""
        # Iterate over a snapshot of the keys so providers registered while
        # this runs do not disturb the loop.
        for provider_name in list(self._metrics.keys()):
            try:
                metrics = self._metrics[provider_name]
                metrics.last_check_time = datetime.now()

                # Update uptime percentage based on recent history
                recent_metrics = self._get_recent_metrics(
                    provider_name, minutes=60
                )
                if recent_metrics:
                    successful = sum(1 for m in recent_metrics if m.success)
                    metrics.uptime_percentage = (
                        successful / len(recent_metrics)
                    ) * 100

                logger.debug(
                    f"Health check for {provider_name}: "
                    f"available={metrics.is_available}, "
                    f"success_rate={metrics.success_rate:.2f}%"
                )
            except Exception as e:
                logger.error(
                    f"Error checking health for {provider_name}: {e}",
                    exc_info=True,
                )

    def record_request(
        self,
        provider_name: str,
        success: bool,
        response_time_ms: float,
        bytes_transferred: int = 0,
        error_message: Optional[str] = None,
    ) -> None:
        """Record a provider request for health tracking.

        Args:
            provider_name: Name of the provider.
            success: Whether the request was successful.
            response_time_ms: Response time in milliseconds.
            bytes_transferred: Number of bytes transferred.
            error_message: Error message if request failed.
        """
        # One timestamp for both the error time and the history entry.
        now = datetime.now()

        # Initialize metrics if not exists
        if provider_name not in self._metrics:
            self._metrics[provider_name] = ProviderHealthMetrics(
                provider_name=provider_name
            )
        metrics = self._metrics[provider_name]

        # Update request counts
        metrics.total_requests += 1
        if success:
            metrics.successful_requests += 1
            metrics.consecutive_failures = 0
        else:
            metrics.failed_requests += 1
            metrics.consecutive_failures += 1
            metrics.last_error = error_message
            metrics.last_error_time = now

        # Update availability based on consecutive failures
        if metrics.consecutive_failures >= self._failure_threshold:
            if metrics.is_available:
                logger.warning(
                    f"Provider {provider_name} marked as unavailable after "
                    f"{metrics.consecutive_failures} consecutive failures"
                )
            metrics.is_available = False
        else:
            metrics.is_available = True

        # Running mean: rebuild the previous total from the old average.
        total_time = metrics.average_response_time_ms * (
            metrics.total_requests - 1
        )
        metrics.average_response_time_ms = (
            total_time + response_time_ms
        ) / metrics.total_requests

        # Update bytes transferred
        metrics.total_bytes_downloaded += bytes_transferred

        # Store request metric in history
        self._request_history[provider_name].append(
            RequestMetric(
                timestamp=now,
                success=success,
                response_time_ms=response_time_ms,
                bytes_transferred=bytes_transferred,
                error_message=error_message,
            )
        )

        logger.debug(
            f"Recorded request for {provider_name}: "
            f"success={success}, time={response_time_ms:.2f}ms"
        )

    def get_provider_metrics(
        self, provider_name: str
    ) -> "Optional[ProviderHealthMetrics]":
        """Get health metrics for a specific provider.

        Args:
            provider_name: Name of the provider.

        Returns:
            Provider health metrics or None if not found.
        """
        return self._metrics.get(provider_name)

    def get_all_metrics(self) -> "Dict[str, ProviderHealthMetrics]":
        """Get health metrics for all providers.

        Returns:
            Shallow copy of the mapping from provider names to metrics.
        """
        return self._metrics.copy()

    def get_available_providers(self) -> List[str]:
        """Get list of currently available providers.

        Only providers that already have metrics are considered.

        Returns:
            List of available provider names.
        """
        return [
            name
            for name, metrics in self._metrics.items()
            if metrics.is_available
        ]

    def get_best_provider(self) -> Optional[str]:
        """Get the best performing available provider.

        Best is determined by:
        1. Availability
        2. Success rate
        3. Response time

        Returns:
            Name of best provider or None if none available.
        """
        available = [
            (name, metrics)
            for name, metrics in self._metrics.items()
            if metrics.is_available
        ]

        if not available:
            return None

        # Highest success rate wins; ties go to the lower response time.
        # min() returns the first of equal candidates.
        best_provider = min(
            available,
            key=lambda x: (-x[1].success_rate, x[1].average_response_time_ms),
        )[0]
        logger.debug(f"Best provider selected: {best_provider}")
        return best_provider

    def _get_recent_metrics(
        self, provider_name: str, minutes: int = 60
    ) -> "List[RequestMetric]":
        """Get recent request metrics for a provider.

        Args:
            provider_name: Name of the provider.
            minutes: Number of minutes to look back.

        Returns:
            List of recent request metrics.
        """
        # Membership test first: indexing the defaultdict would create an
        # empty history for unknown providers.
        if provider_name not in self._request_history:
            return []

        cutoff_time = datetime.now() - timedelta(minutes=minutes)
        return [
            metric
            for metric in self._request_history[provider_name]
            if metric.timestamp >= cutoff_time
        ]

    def reset_provider_metrics(self, provider_name: str) -> bool:
        """Reset metrics and request history for a specific provider.

        Args:
            provider_name: Name of the provider.

        Returns:
            True if reset successful, False if provider not found.
        """
        if provider_name not in self._metrics:
            return False

        self._metrics[provider_name] = ProviderHealthMetrics(
            provider_name=provider_name
        )
        self._request_history[provider_name].clear()
        logger.info(f"Reset metrics for provider: {provider_name}")
        return True

    def get_health_summary(self) -> Dict[str, Any]:
        """Get summary of overall provider health.

        Returns:
            Dictionary with health summary statistics. The same keys are
            present whether or not any provider is registered.
        """
        total_providers = len(self._metrics)
        available_providers = len(self.get_available_providers())

        if total_providers == 0:
            return {
                "total_providers": 0,
                "available_providers": 0,
                "availability_percentage": 0.0,
                "average_success_rate": 0.0,
                "average_response_time_ms": 0.0,
                "providers": {},
            }

        avg_success_rate = sum(
            m.success_rate for m in self._metrics.values()
        ) / total_providers

        avg_response_time = sum(
            m.average_response_time_ms for m in self._metrics.values()
        ) / total_providers

        return {
            "total_providers": total_providers,
            "available_providers": available_providers,
            "availability_percentage": (
                available_providers / total_providers
            )
            * 100,
            "average_success_rate": round(avg_success_rate, 2),
            "average_response_time_ms": round(avg_response_time, 2),
            "providers": {
                name: metrics.to_dict()
                for name, metrics in self._metrics.items()
            },
        }
# Process-wide health monitor, created lazily by get_health_monitor()
_health_monitor: Optional[ProviderHealthMonitor] = None


def get_health_monitor() -> ProviderHealthMonitor:
    """Return the shared provider health monitor.

    The monitor is built with default arguments on first use and reused on
    every later call.

    Returns:
        Global ProviderHealthMonitor instance.
    """
    global _health_monitor
    if _health_monitor is not None:
        return _health_monitor
    _health_monitor = ProviderHealthMonitor()
    return _health_monitor

View File

@@ -0,0 +1,307 @@
"""Performance monitoring wrapper for anime providers.
This module provides a wrapper that adds automatic performance tracking
to any provider implementation.
"""
import logging
import time
from typing import Any, Callable, Dict, List, Optional
from src.core.providers.base_provider import Loader
from src.core.providers.health_monitor import get_health_monitor
logger = logging.getLogger(__name__)
class MonitoredProviderWrapper(Loader):
"""Wrapper that adds performance monitoring to any provider."""
def __init__(
    self,
    provider: Loader,
    enable_monitoring: bool = True,
):
    """Wrap ``provider`` and optionally attach the global health monitor.

    Args:
        provider: Provider instance to wrap.
        enable_monitoring: Whether to enable performance monitoring.
    """
    # NOTE(review): the Loader base initializer is not invoked here;
    # confirm the base class needs no setup.
    self._provider = provider
    self._enable_monitoring = enable_monitoring

    if enable_monitoring:
        self._health_monitor = get_health_monitor()
    else:
        self._health_monitor = None

    site_key = provider.get_site_key()
    logger.info(f"Monitoring wrapper initialized for provider: {site_key}")
def _record_operation(
    self,
    operation_name: str,
    start_time: float,
    success: bool,
    bytes_transferred: int = 0,
    error_message: Optional[str] = None,
) -> None:
    """Report one finished operation to the health monitor and log it.

    Does nothing when monitoring is disabled or no monitor is attached.

    Args:
        operation_name: Name of the operation.
        start_time: Operation start time (from time.time()).
        success: Whether operation succeeded.
        bytes_transferred: Number of bytes transferred.
        error_message: Error message if operation failed.
    """
    if not (self._enable_monitoring and self._health_monitor):
        return

    # Elapsed wall-clock time, converted from seconds to milliseconds.
    elapsed_ms = (time.time() - start_time) * 1000
    provider_name = self._provider.get_site_key()

    self._health_monitor.record_request(
        provider_name=provider_name,
        success=success,
        response_time_ms=elapsed_ms,
        bytes_transferred=bytes_transferred,
        error_message=error_message,
    )

    if not success:
        logger.warning(
            f"{operation_name} failed for {provider_name} "
            f"in {elapsed_ms:.2f}ms: {error_message}"
        )
        return

    logger.debug(
        f"{operation_name} succeeded for {provider_name} "
        f"in {elapsed_ms:.2f}ms"
    )
def search(self, word: str) -> List[Dict[str, Any]]:
    """Run the wrapped provider's search and record the outcome.

    Args:
        word: Search term to look for.

    Returns:
        List of found series as dictionaries.

    Raises:
        Exception: Whatever the wrapped provider raises; the failure is
            recorded before it is re-raised.
    """
    began = time.time()
    try:
        matches = self._provider.search(word)
        self._record_operation(
            operation_name="search", start_time=began, success=True
        )
    except Exception as exc:
        self._record_operation(
            operation_name="search",
            start_time=began,
            success=False,
            error_message=str(exc),
        )
        raise
    return matches
def is_language(
    self,
    season: int,
    episode: int,
    key: str,
    language: str = "German Dub",
) -> bool:
    """Ask the wrapped provider about a language and record the outcome.

    Args:
        season: Season number (1-indexed).
        episode: Episode number (1-indexed).
        key: Unique series identifier/key.
        language: Language to check (default: German Dub).

    Returns:
        True if episode exists in specified language.

    Raises:
        Exception: Whatever the wrapped provider raises; the failure is
            recorded before it is re-raised.
    """
    began = time.time()
    try:
        exists = self._provider.is_language(season, episode, key, language)
        self._record_operation(
            operation_name="is_language", start_time=began, success=True
        )
    except Exception as exc:
        self._record_operation(
            operation_name="is_language",
            start_time=began,
            success=False,
            error_message=str(exc),
        )
        raise
    return exists
def download(
    self,
    base_directory: str,
    serie_folder: str,
    season: int,
    episode: int,
    key: str,
    language: str = "German Dub",
    progress_callback: Optional[Callable[[str, Dict], None]] = None,
) -> bool:
    """Download episode to specified directory (with monitoring).

    While monitoring is enabled the provider always receives a progress
    callback, so transferred bytes are tracked even when the caller did
    not pass one. The caller's callback, if any, still receives every
    event unchanged.

    Args:
        base_directory: Base directory for downloads.
        serie_folder: Series folder name.
        season: Season number.
        episode: Episode number.
        key: Unique series identifier/key.
        language: Language version to download.
        progress_callback: Optional callback for progress updates.

    Returns:
        True if download successful.

    Raises:
        Exception: Whatever the wrapped provider raises; the failure is
            recorded before it is re-raised.
    """
    start_time = time.time()
    bytes_transferred = 0

    if self._enable_monitoring:

        def monitored_callback(event_type: str, data: Dict) -> None:
            nonlocal bytes_transferred
            if event_type == "progress":
                downloaded = data.get("downloaded")
                # Ignore missing or non-numeric values so that they never
                # reach the monitor's byte counter.
                if isinstance(downloaded, (int, float)):
                    # The last reported value is kept rather than summed.
                    bytes_transferred = int(downloaded)
            if progress_callback is not None:
                progress_callback(event_type, data)

        wrapped_callback = monitored_callback
    else:
        wrapped_callback = progress_callback

    try:
        result = self._provider.download(
            base_directory=base_directory,
            serie_folder=serie_folder,
            season=season,
            episode=episode,
            key=key,
            language=language,
            progress_callback=wrapped_callback,
        )
        self._record_operation(
            operation_name="download",
            start_time=start_time,
            success=bool(result),
            bytes_transferred=bytes_transferred,
        )
        return result
    except Exception as e:
        self._record_operation(
            operation_name="download",
            start_time=start_time,
            success=False,
            bytes_transferred=bytes_transferred,
            error_message=str(e),
        )
        raise
def get_site_key(self) -> str:
    """Return the site key/identifier reported by the wrapped provider.

    This is a plain delegation: unlike the other wrapper methods, the call
    is not timed or reported through ``_record_operation``.

    Returns:
        The value returned by the wrapped provider's ``get_site_key``.
    """
    site_key = self._provider.get_site_key()
    return site_key
def get_title(self, key: str) -> str:
    """Fetch a series title from the wrapped provider and record the outcome.

    Args:
        key: Unique series identifier/key.

    Returns:
        The value returned by the wrapped provider's ``get_title``.

    Raises:
        Exception: Any error raised during the call is recorded as a
            failed ``get_title`` operation and then re-raised unchanged.
    """
    record_args = {"operation_name": "get_title", "start_time": time.time()}
    try:
        title = self._provider.get_title(key)
        self._record_operation(success=True, **record_args)
        return title
    except Exception as exc:
        self._record_operation(
            success=False,
            error_message=str(exc),
            **record_args,
        )
        raise
def get_season_episode_count(self, slug: str) -> Dict[int, int]:
    """Fetch season/episode counts from the wrapped provider, with metrics.

    Args:
        slug: Series slug/key identifier.

    Returns:
        The value returned by the wrapped provider's
        ``get_season_episode_count`` (annotated as season number mapped to
        episode count).

    Raises:
        Exception: Any error raised during the call is recorded as a
            failed operation and then re-raised unchanged.
    """
    record_args = {
        "operation_name": "get_season_episode_count",
        "start_time": time.time(),
    }
    try:
        counts = self._provider.get_season_episode_count(slug)
        self._record_operation(success=True, **record_args)
        return counts
    except Exception as exc:
        self._record_operation(
            success=False,
            error_message=str(exc),
            **record_args,
        )
        raise
@property
def wrapped_provider(self) -> Loader:
    """Expose the provider instance this wrapper delegates to.

    Returns:
        The object stored in ``self._provider``.
    """
    inner = self._provider
    return inner
def wrap_provider(
    provider: Loader,
    enable_monitoring: bool = True,
) -> Loader:
    """Return ``provider`` wrapped in a ``MonitoredProviderWrapper``.

    A provider that is already a ``MonitoredProviderWrapper`` is returned
    as-is; in that case ``enable_monitoring`` is not applied to it.

    Args:
        provider: Provider to wrap.
        enable_monitoring: Passed to the new wrapper's ``enable_monitoring``
            argument.

    Returns:
        The existing wrapper, or a new wrapper around ``provider``.
    """
    if not isinstance(provider, MonitoredProviderWrapper):
        return MonitoredProviderWrapper(
            provider=provider,
            enable_monitoring=enable_monitoring,
        )
    # Already wrapped: hand the same object back rather than double-wrapping.
    return provider

531
src/server/api/providers.py Normal file
View File

@@ -0,0 +1,531 @@
"""Provider management API endpoints.
This module provides REST API endpoints for monitoring and managing
anime providers, including health checks, configuration, and failover.
"""
import logging
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel, Field
from src.core.providers.config_manager import ProviderSettings, get_config_manager
from src.core.providers.failover import get_failover
from src.core.providers.health_monitor import get_health_monitor
from src.server.utils.dependencies import require_auth
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/providers", tags=["providers"])
# Request/Response Models
class ProviderHealthResponse(BaseModel):
    """Health metrics reported for a single provider.

    ``get_provider_health`` builds this model by unpacking the dictionary
    returned by the provider metrics' ``to_dict()``.
    """

    provider_name: str
    is_available: bool
    # Check and error details are strings and default to None when absent.
    last_check_time: Optional[str] = Field(default=None)
    total_requests: int
    successful_requests: int
    failed_requests: int
    success_rate: float
    average_response_time_ms: float
    last_error: Optional[str] = Field(default=None)
    last_error_time: Optional[str] = Field(default=None)
    consecutive_failures: int
    total_bytes_downloaded: int
    uptime_percentage: float
class HealthSummaryResponse(BaseModel):
    """Response model for overall health summary.

    ``get_providers_health`` builds this model by unpacking the dictionary
    returned by the health monitor's ``get_health_summary()``. Every field
    is required (none declares a default).
    """

    # Provider counts.
    total_providers: int
    available_providers: int
    # Aggregate figures. Units and scale are set by the health monitor;
    # presumably percentages and milliseconds, per the names — TODO confirm.
    availability_percentage: float
    average_success_rate: float
    average_response_time_ms: float
    # Per-provider detail dictionaries; presumably keyed by provider name —
    # verify against the health monitor.
    providers: Dict[str, Dict[str, Any]]
class ProviderSettingsRequest(BaseModel):
    """Partial update payload for a provider's settings.

    Every field defaults to None. ``update_provider_config`` serialises the
    payload with ``exclude_unset=True``, so only fields the client actually
    sent are forwarded to the configuration manager.
    """

    enabled: Optional[bool] = Field(default=None)
    priority: Optional[int] = Field(default=None)
    # Numeric settings must be strictly positive when supplied, except
    # ``max_retries``, which also accepts zero.
    timeout_seconds: Optional[int] = Field(default=None, gt=0)
    max_retries: Optional[int] = Field(default=None, ge=0)
    retry_delay_seconds: Optional[float] = Field(default=None, gt=0)
    max_concurrent_downloads: Optional[int] = Field(default=None, gt=0)
    bandwidth_limit_mbps: Optional[float] = Field(default=None, gt=0)
class ProviderSettingsResponse(BaseModel):
    """Full settings of one provider as returned by the config endpoints.

    The configuration endpoints build this model by unpacking
    ``ProviderSettings.to_dict()``.
    """

    name: str
    enabled: bool
    priority: int
    timeout_seconds: int
    max_retries: int
    retry_delay_seconds: float
    max_concurrent_downloads: int
    # The only optional field; defaults to None when no limit is supplied.
    bandwidth_limit_mbps: Optional[float] = Field(default=None)
class FailoverStatsResponse(BaseModel):
    """Failover chain statistics and configuration.

    ``get_failover_stats`` builds this model by unpacking the dictionary
    returned by the failover object's ``get_failover_stats()``.
    """

    total_providers: int
    providers: List[str]
    current_provider: str
    max_retries: int
    retry_delay: float
    health_monitoring_enabled: bool
    # Availability lists are optional and default to None when the stats
    # dictionary does not include them.
    available_providers: Optional[List[str]] = Field(default=None)
    unavailable_providers: Optional[List[str]] = Field(default=None)
# Health Monitoring Endpoints
@router.get("/health", response_model=HealthSummaryResponse)
async def get_providers_health(
    auth: Optional[dict] = Depends(require_auth),
) -> HealthSummaryResponse:
    """Return the health summary across all providers.

    Args:
        auth: Result of the ``require_auth`` dependency; not read by the
            handler body.

    Returns:
        The health monitor's summary dictionary wrapped in a
        ``HealthSummaryResponse``.

    Raises:
        HTTPException: 500 when fetching the summary or building the
            response fails.
    """
    try:
        summary = get_health_monitor().get_health_summary()
        return HealthSummaryResponse(**summary)
    except Exception as e:
        logger.error(f"Failed to get provider health: {e}", exc_info=True)
        # NOTE(review): the raw exception text is sent back to the client.
        failure_detail = f"Failed to retrieve provider health: {str(e)}"
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=failure_detail,
        )
@router.get(
    "/health/{provider_name}",
    response_model=ProviderHealthResponse,
)
async def get_provider_health(
    provider_name: str,
    auth: Optional[dict] = Depends(require_auth),
) -> ProviderHealthResponse:
    """Return the health metrics of one provider.

    Args:
        provider_name: Name of the provider.
        auth: Result of the ``require_auth`` dependency; not read by the
            handler body.

    Returns:
        The provider's metrics (``metrics.to_dict()``) as a
        ``ProviderHealthResponse``.

    Raises:
        HTTPException: 404 when the health monitor returns no (falsy)
            metrics for ``provider_name``; 500 on any other error.
    """
    try:
        metrics = get_health_monitor().get_provider_metrics(provider_name)
        if metrics:
            return ProviderHealthResponse(**metrics.to_dict())
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Provider '{provider_name}' not found",
        )
    except HTTPException:
        # Let the 404 above through instead of converting it to a 500.
        raise
    except Exception as e:
        logger.error(
            f"Failed to get health for {provider_name}: {e}",
            exc_info=True,
        )
        failure_detail = f"Failed to retrieve provider health: {str(e)}"
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=failure_detail,
        )
@router.get("/available", response_model=List[str])
async def get_available_providers(
    auth: Optional[dict] = Depends(require_auth),
) -> List[str]:
    """List the providers the health monitor currently reports as available.

    Args:
        auth: Result of the ``require_auth`` dependency; not read by the
            handler body.

    Returns:
        The value of the health monitor's ``get_available_providers()``.

    Raises:
        HTTPException: 500 when the health monitor call fails.
    """
    try:
        available = get_health_monitor().get_available_providers()
        return available
    except Exception as e:
        logger.error(
            f"Failed to get available providers: {e}",
            exc_info=True,
        )
        failure_detail = f"Failed to retrieve available providers: {str(e)}"
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=failure_detail,
        )
@router.get("/best", response_model=Dict[str, str])
async def get_best_provider(
    auth: Optional[dict] = Depends(require_auth),
) -> Dict[str, str]:
    """Return the provider the health monitor ranks as best.

    Args:
        auth: Result of the ``require_auth`` dependency; not read by the
            handler body.

    Returns:
        ``{"provider": <name>}`` with the health monitor's choice.

    Raises:
        HTTPException: 503 when the health monitor returns no (falsy)
            provider; 500 on any other error.
    """
    try:
        best = get_health_monitor().get_best_provider()
        if best:
            return {"provider": best}
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="No available providers",
        )
    except HTTPException:
        # Keep the 503 above from being rewritten as a 500.
        raise
    except Exception as e:
        logger.error(f"Failed to get best provider: {e}", exc_info=True)
        failure_detail = f"Failed to determine best provider: {str(e)}"
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=failure_detail,
        )
@router.post("/health/{provider_name}/reset")
async def reset_provider_health(
    provider_name: str,
    auth: Optional[dict] = Depends(require_auth),
) -> Dict[str, str]:
    """Reset the health metrics recorded for one provider.

    Args:
        provider_name: Name of the provider.
        auth: Result of the ``require_auth`` dependency; not read by the
            handler body.

    Returns:
        A ``{"message": ...}`` dictionary confirming the reset.

    Raises:
        HTTPException: 404 when the health monitor reports (with a falsy
            result) that nothing was reset; 500 on any other error.
    """
    try:
        was_reset = get_health_monitor().reset_provider_metrics(provider_name)
        if was_reset:
            return {"message": f"Reset metrics for provider: {provider_name}"}
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Provider '{provider_name}' not found",
        )
    except HTTPException:
        # Let the 404 above through instead of converting it to a 500.
        raise
    except Exception as e:
        logger.error(
            f"Failed to reset health for {provider_name}: {e}",
            exc_info=True,
        )
        failure_detail = f"Failed to reset provider health: {str(e)}"
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=failure_detail,
        )
# Configuration Endpoints
@router.get("/config", response_model=List[ProviderSettingsResponse])
async def get_all_provider_configs(
    auth: Optional[dict] = Depends(require_auth),
) -> List[ProviderSettingsResponse]:
    """Return the stored configuration of every provider.

    Args:
        auth: Result of the ``require_auth`` dependency; not read by the
            handler body.

    Returns:
        One ``ProviderSettingsResponse`` per value of the configuration
        manager's ``get_all_provider_settings()`` mapping, in its
        iteration order.

    Raises:
        HTTPException: 500 when reading or converting the settings fails.
    """
    try:
        all_settings = get_config_manager().get_all_provider_settings()
        responses = []
        for provider_settings in all_settings.values():
            responses.append(
                ProviderSettingsResponse(**provider_settings.to_dict())
            )
        return responses
    except Exception as e:
        logger.error(f"Failed to get provider configs: {e}", exc_info=True)
        failure_detail = (
            f"Failed to retrieve provider configurations: {str(e)}"
        )
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=failure_detail,
        )
@router.get(
    "/config/{provider_name}",
    response_model=ProviderSettingsResponse,
)
async def get_provider_config(
    provider_name: str,
    auth: Optional[dict] = Depends(require_auth),
) -> ProviderSettingsResponse:
    """Return the configuration of one provider.

    A provider without stored settings does not produce an error: default
    ``ProviderSettings`` carrying the requested name are returned instead.

    Args:
        provider_name: Name of the provider.
        auth: Result of the ``require_auth`` dependency; not read by the
            handler body.

    Returns:
        The stored settings, or the defaults, as a
        ``ProviderSettingsResponse``.

    Raises:
        HTTPException: 500 when reading or converting the settings fails.
    """
    try:
        stored = get_config_manager().get_provider_settings(provider_name)
        # Fall back to default settings when nothing is stored.
        effective = stored or ProviderSettings(name=provider_name)
        return ProviderSettingsResponse(**effective.to_dict())
    except Exception as e:
        logger.error(
            f"Failed to get config for {provider_name}: {e}",
            exc_info=True,
        )
        failure_detail = (
            f"Failed to retrieve provider configuration: {str(e)}"
        )
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=failure_detail,
        )
@router.put(
    "/config/{provider_name}",
    response_model=ProviderSettingsResponse,
)
async def update_provider_config(
    provider_name: str,
    settings: ProviderSettingsRequest,
    auth: Optional[dict] = Depends(require_auth),
) -> ProviderSettingsResponse:
    """Apply a partial settings update to one provider.

    Args:
        provider_name: Name of the provider.
        settings: Fields to change; only fields the client actually sent
            are forwarded (``exclude_unset=True``).
        auth: Result of the ``require_auth`` dependency; not read by the
            handler body.

    Returns:
        The provider's settings as read back after the update.

    Raises:
        HTTPException: 500 when the settings cannot be read back after the
            update, or on any other error.
    """
    try:
        manager = get_config_manager()

        changes = settings.dict(exclude_unset=True)
        manager.update_provider_settings(provider_name, **changes)

        # Read the settings back so the response reflects the stored state.
        refreshed = manager.get_provider_settings(provider_name)
        if refreshed:
            return ProviderSettingsResponse(**refreshed.to_dict())
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to retrieve updated configuration",
        )
    except HTTPException:
        # Keep the specific error above instead of re-wrapping it.
        raise
    except Exception as e:
        logger.error(
            f"Failed to update config for {provider_name}: {e}",
            exc_info=True,
        )
        failure_detail = f"Failed to update provider configuration: {str(e)}"
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=failure_detail,
        )
@router.post("/config/{provider_name}/enable")
async def enable_provider(
    provider_name: str,
    auth: Optional[dict] = Depends(require_auth),
) -> Dict[str, str]:
    """Set a provider's ``enabled`` setting to True.

    Args:
        provider_name: Name of the provider.
        auth: Result of the ``require_auth`` dependency; not read by the
            handler body.

    Returns:
        A ``{"message": ...}`` dictionary confirming the change.

    Raises:
        HTTPException: 500 when the configuration update fails.
    """
    try:
        get_config_manager().update_provider_settings(
            provider_name,
            enabled=True,
        )
        confirmation = {"message": f"Enabled provider: {provider_name}"}
        return confirmation
    except Exception as e:
        logger.error(
            f"Failed to enable {provider_name}: {e}", exc_info=True
        )
        failure_detail = f"Failed to enable provider: {str(e)}"
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=failure_detail,
        )
@router.post("/config/{provider_name}/disable")
async def disable_provider(
    provider_name: str,
    auth: Optional[dict] = Depends(require_auth),
) -> Dict[str, str]:
    """Set a provider's ``enabled`` setting to False.

    Args:
        provider_name: Name of the provider.
        auth: Result of the ``require_auth`` dependency; not read by the
            handler body.

    Returns:
        A ``{"message": ...}`` dictionary confirming the change.

    Raises:
        HTTPException: 500 when the configuration update fails.
    """
    try:
        get_config_manager().update_provider_settings(
            provider_name,
            enabled=False,
        )
        confirmation = {"message": f"Disabled provider: {provider_name}"}
        return confirmation
    except Exception as e:
        logger.error(
            f"Failed to disable {provider_name}: {e}", exc_info=True
        )
        failure_detail = f"Failed to disable provider: {str(e)}"
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=failure_detail,
        )
# Failover Endpoints
@router.get("/failover", response_model=FailoverStatsResponse)
async def get_failover_stats(
    auth: Optional[dict] = Depends(require_auth),
) -> FailoverStatsResponse:
    """Return the failover chain's statistics and configuration.

    Args:
        auth: Result of the ``require_auth`` dependency; not read by the
            handler body.

    Returns:
        The failover object's ``get_failover_stats()`` dictionary wrapped
        in a ``FailoverStatsResponse``.

    Raises:
        HTTPException: 500 when fetching the statistics or building the
            response fails.
    """
    try:
        stats = get_failover().get_failover_stats()
        return FailoverStatsResponse(**stats)
    except Exception as e:
        logger.error(f"Failed to get failover stats: {e}", exc_info=True)
        failure_detail = f"Failed to retrieve failover statistics: {str(e)}"
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=failure_detail,
        )
@router.post("/failover/{provider_name}/add")
async def add_provider_to_failover(
    provider_name: str,
    auth: Optional[dict] = Depends(require_auth),
) -> Dict[str, str]:
    """Add a provider to the failover chain.

    Args:
        provider_name: Name of the provider, passed unchanged to the
            failover object's ``add_provider``.
        auth: Result of the ``require_auth`` dependency; not read by the
            handler body.

    Returns:
        A ``{"message": ...}`` dictionary confirming the addition.

    Raises:
        HTTPException: 500 when the failover call fails.
    """
    try:
        get_failover().add_provider(provider_name)
        confirmation = {
            "message": f"Added provider to failover: {provider_name}"
        }
        return confirmation
    except Exception as e:
        logger.error(
            f"Failed to add {provider_name} to failover: {e}",
            exc_info=True,
        )
        failure_detail = f"Failed to add provider to failover: {str(e)}"
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=failure_detail,
        )
@router.delete("/failover/{provider_name}")
async def remove_provider_from_failover(
    provider_name: str,
    auth: Optional[dict] = Depends(require_auth),
) -> Dict[str, str]:
    """Remove a provider from the failover chain.

    Args:
        provider_name: Name of the provider.
        auth: Result of the ``require_auth`` dependency; not read by the
            handler body.

    Returns:
        A ``{"message": ...}`` dictionary confirming the removal.

    Raises:
        HTTPException: 404 when the failover object reports (with a falsy
            result) that nothing was removed; 500 on any other error.
    """
    try:
        was_removed = get_failover().remove_provider(provider_name)
        if was_removed:
            return {
                "message": f"Removed provider from failover: {provider_name}"
            }
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Provider '{provider_name}' not in failover chain",  # noqa: E501
        )
    except HTTPException:
        # Let the 404 above through instead of converting it to a 500.
        raise
    except Exception as e:
        logger.error(
            f"Failed to remove {provider_name} from failover: {e}",
            exc_info=True,
        )
        failure_detail = f"Failed to remove provider from failover: {str(e)}"
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=failure_detail,
        )

View File

@@ -25,6 +25,7 @@ from src.server.api.config import router as config_router
from src.server.api.diagnostics import router as diagnostics_router
from src.server.api.download import router as download_router
from src.server.api.logging import router as logging_router
from src.server.api.providers import router as providers_router
from src.server.api.scheduler import router as scheduler_router
from src.server.api.websocket import router as websocket_router
from src.server.controllers.error_controller import (
@@ -139,6 +140,7 @@ app.include_router(diagnostics_router)
app.include_router(analytics_router)
app.include_router(anime_router)
app.include_router(download_router)
app.include_router(providers_router)
app.include_router(websocket_router)
# Register exception handlers