"""Provider health monitoring system for tracking availability and performance.

This module provides health monitoring capabilities for anime providers,
tracking metrics like availability, response times, success rates, and
bandwidth usage.
"""

import asyncio
import logging
from collections import defaultdict, deque
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Deque, Dict, List, Optional

logger = logging.getLogger(__name__)


@dataclass
class ProviderHealthMetrics:
    """Health metrics for a single provider.

    The counters are updated by ``ProviderHealthMonitor.record_request``;
    ``last_check_time`` and ``uptime_percentage`` are refreshed by the
    monitor's periodic health check.
    """

    provider_name: str
    is_available: bool = True
    last_check_time: Optional[datetime] = None
    total_requests: int = 0
    successful_requests: int = 0
    failed_requests: int = 0
    average_response_time_ms: float = 0.0
    last_error: Optional[str] = None
    last_error_time: Optional[datetime] = None
    consecutive_failures: int = 0
    total_bytes_downloaded: int = 0
    uptime_percentage: float = 100.0

    @property
    def success_rate(self) -> float:
        """Return the success rate as a percentage (0.0 with no requests)."""
        if self.total_requests == 0:
            return 0.0
        return (self.successful_requests / self.total_requests) * 100

    @property
    def failure_rate(self) -> float:
        """Return the failure rate as a percentage (0.0 with no requests).

        With no recorded requests there is no evidence of failure, so this
        reports 0.0 rather than deriving 100.0 from the empty success rate.
        """
        if self.total_requests == 0:
            return 0.0
        return 100.0 - self.success_rate

    def to_dict(self) -> Dict[str, Any]:
        """Convert metrics to a dictionary.

        Returns:
            Dictionary of the metrics; datetimes are rendered with
            ``isoformat()`` (or ``None``) and rates are rounded to two
            decimal places.
        """
        return {
            "provider_name": self.provider_name,
            "is_available": self.is_available,
            "last_check_time": (
                self.last_check_time.isoformat()
                if self.last_check_time
                else None
            ),
            "total_requests": self.total_requests,
            "successful_requests": self.successful_requests,
            "failed_requests": self.failed_requests,
            "success_rate": round(self.success_rate, 2),
            "average_response_time_ms": round(
                self.average_response_time_ms, 2
            ),
            "last_error": self.last_error,
            "last_error_time": (
                self.last_error_time.isoformat()
                if self.last_error_time
                else None
            ),
            "consecutive_failures": self.consecutive_failures,
            "total_bytes_downloaded": self.total_bytes_downloaded,
            "uptime_percentage": round(self.uptime_percentage, 2),
        }


@dataclass
class RequestMetric:
    """Individual request metric."""

    timestamp: datetime
    success: bool
    response_time_ms: float
    bytes_transferred: int = 0
    error_message: Optional[str] = None


class ProviderHealthMonitor:
    """Monitors health and performance of anime providers."""

    def __init__(
        self,
        max_history_size: int = 1000,
        health_check_interval: int = 300,  # 5 minutes
        failure_threshold: int = 3,
    ):
        """Initialize provider health monitor.

        Args:
            max_history_size: Maximum number of request metrics to keep
                per provider.
            health_check_interval: Interval between health checks in
                seconds.
            failure_threshold: Number of consecutive failures before
                marking unavailable.
        """
        self._max_history_size = max_history_size
        self._health_check_interval = health_check_interval
        self._failure_threshold = failure_threshold

        # Provider metrics storage
        self._metrics: Dict[str, ProviderHealthMetrics] = {}
        # Per-provider bounded history; the oldest entries are dropped once
        # max_history_size is reached.
        self._request_history: Dict[str, Deque[RequestMetric]] = defaultdict(
            lambda: deque(maxlen=max_history_size)
        )

        # Health check task
        self._health_check_task: Optional[asyncio.Task] = None
        self._is_running = False

        logger.info("Provider health monitor initialized")

    def start_monitoring(self) -> None:
        """Start background health monitoring.

        Must be called while an event loop is running.

        Raises:
            RuntimeError: If there is no running event loop. The monitor is
                left stopped in that case, so a later call can succeed.
        """
        if self._is_running:
            logger.warning("Health monitoring already running")
            return

        # Resolve the loop before creating the coroutine or touching state:
        # a failure here must not leave the monitor flagged as running, nor
        # leave a never-awaited coroutine object behind.
        loop = asyncio.get_running_loop()
        self._health_check_task = loop.create_task(
            self._health_check_loop()
        )
        # The task body only starts once control returns to the loop, so
        # setting the flag after scheduling is still observed by the loop.
        self._is_running = True
        logger.info("Provider health monitoring started")

    async def stop_monitoring(self) -> None:
        """Stop background health monitoring and wait for the task to end."""
        self._is_running = False
        if self._health_check_task:
            self._health_check_task.cancel()
            try:
                await self._health_check_task
            except asyncio.CancelledError:
                pass
            self._health_check_task = None
        logger.info("Provider health monitoring stopped")

    async def _health_check_loop(self) -> None:
        """Run health checks every ``health_check_interval`` seconds.

        The loop exits when the monitor is stopped or the task is
        cancelled; any other error is logged and the loop continues after
        the usual interval.
        """
        while self._is_running:
            try:
                await self._perform_health_checks()
                await asyncio.sleep(self._health_check_interval)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.exception("Error in health check loop: %s", e)
                await asyncio.sleep(self._health_check_interval)

    async def _perform_health_checks(self) -> None:
        """Perform health checks on all registered providers.

        Stamps ``last_check_time`` and recomputes ``uptime_percentage``
        from the last 60 minutes of request history. With no recent
        requests the previous uptime value is kept.
        """
        # Iterate over a snapshot of the keys so that providers registered
        # while checks are in progress do not disturb the iteration.
        for provider_name in list(self._metrics.keys()):
            try:
                metrics = self._metrics[provider_name]
                metrics.last_check_time = datetime.now()

                # Update uptime percentage based on recent history
                recent_metrics = self._get_recent_metrics(
                    provider_name, minutes=60
                )
                if recent_metrics:
                    successful = sum(1 for m in recent_metrics if m.success)
                    metrics.uptime_percentage = (
                        successful / len(recent_metrics)
                    ) * 100

                logger.debug(
                    "Health check for %s: available=%s, "
                    "success_rate=%.2f%%",
                    provider_name,
                    metrics.is_available,
                    metrics.success_rate,
                )
            except Exception as e:
                # One failing provider must not abort the remaining checks.
                logger.exception(
                    "Error checking health for %s: %s", provider_name, e
                )

    def record_request(
        self,
        provider_name: str,
        success: bool,
        response_time_ms: float,
        bytes_transferred: int = 0,
        error_message: Optional[str] = None,
    ) -> None:
        """Record a provider request for health tracking.

        Args:
            provider_name: Name of the provider.
            success: Whether the request was successful.
            response_time_ms: Response time in milliseconds.
            bytes_transferred: Number of bytes transferred.
            error_message: Error message if request failed.
        """
        # One timestamp per recorded request keeps last_error_time and the
        # history entry consistent with each other.
        now = datetime.now()

        # Initialize metrics if not exists
        if provider_name not in self._metrics:
            self._metrics[provider_name] = ProviderHealthMetrics(
                provider_name=provider_name
            )
        metrics = self._metrics[provider_name]

        # Update request counts
        metrics.total_requests += 1
        if success:
            metrics.successful_requests += 1
            metrics.consecutive_failures = 0
        else:
            metrics.failed_requests += 1
            metrics.consecutive_failures += 1
            metrics.last_error = error_message
            metrics.last_error_time = now

        # Update availability based on consecutive failures
        if metrics.consecutive_failures >= self._failure_threshold:
            if metrics.is_available:
                # Warn only on the available -> unavailable transition.
                logger.warning(
                    "Provider %s marked as unavailable after "
                    "%s consecutive failures",
                    provider_name,
                    metrics.consecutive_failures,
                )
            metrics.is_available = False
        else:
            metrics.is_available = True

        # Update the running mean: rebuild the previous total from the old
        # average, then fold in the new sample.
        total_time = metrics.average_response_time_ms * (
            metrics.total_requests - 1
        )
        metrics.average_response_time_ms = (
            total_time + response_time_ms
        ) / metrics.total_requests

        # Update bytes transferred
        metrics.total_bytes_downloaded += bytes_transferred

        # Store request metric in history
        self._request_history[provider_name].append(
            RequestMetric(
                timestamp=now,
                success=success,
                response_time_ms=response_time_ms,
                bytes_transferred=bytes_transferred,
                error_message=error_message,
            )
        )

        logger.debug(
            "Recorded request for %s: success=%s, time=%.2fms",
            provider_name,
            success,
            response_time_ms,
        )

    def get_provider_metrics(
        self, provider_name: str
    ) -> Optional[ProviderHealthMetrics]:
        """Get health metrics for a specific provider.

        Args:
            provider_name: Name of the provider.

        Returns:
            Provider health metrics or None if not found.
        """
        return self._metrics.get(provider_name)

    def get_all_metrics(self) -> Dict[str, ProviderHealthMetrics]:
        """Get health metrics for all providers.

        Returns:
            Dictionary mapping provider names to their metrics. The
            dictionary is a shallow copy; the metric objects are shared
            with the monitor.
        """
        return self._metrics.copy()

    def get_available_providers(self) -> List[str]:
        """Get list of currently available providers.

        Returns:
            List of available provider names.
        """
        return [
            name
            for name, metrics in self._metrics.items()
            if metrics.is_available
        ]

    def get_best_provider(self) -> Optional[str]:
        """Get the best performing available provider.

        Best is determined by:
        1. Availability
        2. Success rate
        3. Response time

        Returns:
            Name of best provider or None if none available.
        """
        available = [
            (name, metrics)
            for name, metrics in self._metrics.items()
            if metrics.is_available
        ]
        if not available:
            return None

        # Highest success rate wins; ties go to the lowest response time.
        # min() returns the first minimal element, which is the same entry a
        # stable ascending sort on this key would place first.
        best_provider, _ = min(
            available,
            key=lambda x: (-x[1].success_rate, x[1].average_response_time_ms),
        )
        logger.debug("Best provider selected: %s", best_provider)
        return best_provider

    def _get_recent_metrics(
        self, provider_name: str, minutes: int = 60
    ) -> List[RequestMetric]:
        """Get recent request metrics for a provider.

        Args:
            provider_name: Name of the provider.
            minutes: Number of minutes to look back.

        Returns:
            List of recent request metrics.
        """
        # Membership test first: indexing the defaultdict would create an
        # empty history entry for an unknown provider.
        if provider_name not in self._request_history:
            return []

        cutoff_time = datetime.now() - timedelta(minutes=minutes)
        return [
            metric
            for metric in self._request_history[provider_name]
            if metric.timestamp >= cutoff_time
        ]

    def reset_provider_metrics(self, provider_name: str) -> bool:
        """Reset metrics for a specific provider.

        Args:
            provider_name: Name of the provider.

        Returns:
            True if reset successful, False if provider not found.
        """
        if provider_name not in self._metrics:
            return False

        self._metrics[provider_name] = ProviderHealthMetrics(
            provider_name=provider_name
        )
        self._request_history[provider_name].clear()
        logger.info("Reset metrics for provider: %s", provider_name)
        return True

    def get_health_summary(self) -> Dict[str, Any]:
        """Get summary of overall provider health.

        Returns:
            Dictionary with health summary statistics. The same keys are
            present whether or not any providers are registered.
        """
        total_providers = len(self._metrics)
        available_providers = len(self.get_available_providers())

        if total_providers == 0:
            return {
                "total_providers": 0,
                "available_providers": 0,
                "availability_percentage": 0.0,
                "average_success_rate": 0.0,
                "average_response_time_ms": 0.0,
                # Present even when empty so callers can index "providers"
                # unconditionally.
                "providers": {},
            }

        avg_success_rate = sum(
            m.success_rate for m in self._metrics.values()
        ) / total_providers
        avg_response_time = sum(
            m.average_response_time_ms for m in self._metrics.values()
        ) / total_providers

        return {
            "total_providers": total_providers,
            "available_providers": available_providers,
            "availability_percentage": (
                available_providers / total_providers
            ) * 100,
            "average_success_rate": round(avg_success_rate, 2),
            "average_response_time_ms": round(avg_response_time, 2),
            "providers": {
                name: metrics.to_dict()
                for name, metrics in self._metrics.items()
            },
        }


# Global health monitor instance
_health_monitor: Optional[ProviderHealthMonitor] = None


def get_health_monitor() -> ProviderHealthMonitor:
    """Get or create global provider health monitor instance.

    Returns:
        Global ProviderHealthMonitor instance.
    """
    global _health_monitor
    if _health_monitor is None:
        _health_monitor = ProviderHealthMonitor()
    return _health_monitor