# Change note: handle_network_failure and handle_download_failure were converted
# from instance methods to static methods. Their retry parameters (max_retries,
# delays) are hardcoded instead of being read from instance state, which
# improves testability and removes implicit dependencies.
# Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
# File info: 213 lines, 6.4 KiB, Python
"""
|
|
Error handling and recovery strategies for core providers.
|
|
|
|
This module provides custom exceptions and decorators for handling
|
|
errors in provider operations with automatic retry mechanisms.
|
|
"""
|
|
|
|
import functools
import logging
import os
import time
from typing import Any, Callable, Optional, TypeVar
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
# Type variable bound to any callable; the with_error_recovery decorator uses
# it to declare that the decorated function keeps its own type.
F = TypeVar("F", bound=Callable[..., Any])
|
|
|
|
|
|
class RetryableError(Exception):
    """Exception that indicates an operation can be safely retried.

    Adds no behavior of its own; it is a plain ``Exception`` subclass used
    as a marker type.
    """
|
|
|
|
|
|
class NonRetryableError(Exception):
    """Exception that indicates an operation should not be retried.

    The ``with_error_recovery`` decorator re-raises this exception
    immediately instead of making another attempt.
    """
|
|
|
|
|
|
class NetworkError(Exception):
    """Exception for network-related errors.

    ``RecoveryStrategies.handle_network_failure`` retries on this type,
    alongside the built-in ``ConnectionError`` and ``TimeoutError``.
    It derives directly from ``Exception``.
    """
|
|
|
|
|
|
class DownloadError(Exception):
    """Exception for download-related errors.

    ``RecoveryStrategies.handle_download_failure`` retries on this type.
    It derives directly from ``Exception``.
    """
|
|
|
|
|
|
class RecoveryStrategies:
    """Strategies for handling errors and recovering from failures.

    An instance stores a configurable exponential-backoff policy that
    ``_calculate_delay`` reads.  ``handle_network_failure`` and
    ``handle_download_failure`` are static methods: they use the fixed
    policy declared in the class constants below and never touch instance
    state, so they can be called directly on the class.
    """

    # Fixed policy used by the static handlers (deliberately not configurable
    # per instance).
    _NETWORK_MAX_RETRIES = 3
    _DOWNLOAD_MAX_RETRIES = 2
    _HANDLER_BASE_DELAY = 1.0
    _HANDLER_MAX_DELAY = 60.0
    _HANDLER_EXPONENTIAL_BASE = 2.0

    def __init__(
        self,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
    ) -> None:
        """Initialize recovery strategies.

        Args:
            max_retries: Maximum number of retry attempts.
            base_delay: Initial delay between retries in seconds.
            max_delay: Maximum delay between retries in seconds.
            exponential_base: Base for exponential backoff multiplier.
        """
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base

    @staticmethod
    def _backoff_delay(
        attempt: int,
        base_delay: float = _HANDLER_BASE_DELAY,
        max_delay: float = _HANDLER_MAX_DELAY,
        exponential_base: float = _HANDLER_EXPONENTIAL_BASE,
    ) -> float:
        """Return ``base_delay * exponential_base ** attempt`` capped at ``max_delay``.

        Single home for the backoff formula so the instance method and both
        static handlers cannot drift apart.
        """
        return min(base_delay * (exponential_base ** attempt), max_delay)

    def _calculate_delay(self, attempt: int) -> float:
        """Calculate delay for given retry attempt using exponential backoff.

        Args:
            attempt: Zero-based retry attempt number.

        Returns:
            Delay in seconds before next retry, computed from this
            instance's ``base_delay``, ``exponential_base`` and ``max_delay``.
        """
        return self._backoff_delay(
            attempt, self.base_delay, self.max_delay, self.exponential_base
        )

    @staticmethod
    def handle_network_failure(
        func: Callable, *args: Any, **kwargs: Any
    ) -> Any:
        """Handle network failures with exponential backoff retry logic.

        Calls ``func(*args, **kwargs)`` up to three times.  After a failed
        first and second call it sleeps 1s and 2s respectively before
        trying again.

        Args:
            func: Callable to invoke.
            *args: Positional arguments forwarded to ``func``.
            **kwargs: Keyword arguments forwarded to ``func``.

        Returns:
            Whatever ``func`` returns on its first successful call.

        Raises:
            NetworkError, ConnectionError, TimeoutError: The error raised by
                the final attempt.  Any other exception type propagates
                immediately without a retry.
        """
        max_retries = RecoveryStrategies._NETWORK_MAX_RETRIES

        for attempt in range(max_retries):
            try:
                return func(*args, **kwargs)
            except (NetworkError, ConnectionError, TimeoutError) as exc:
                if attempt == max_retries - 1:
                    # Out of attempts: surface the most recent failure as-is.
                    raise
                delay = RecoveryStrategies._backoff_delay(attempt)
                logger.warning(
                    "Network error on attempt %d/%d, retrying in %.1fs: %s",
                    attempt + 1, max_retries, delay, exc
                )
                time.sleep(delay)

        # Defensive: only reachable if the retry count is ever set below 1.
        raise NetworkError("Network failure after retries")

    @staticmethod
    def handle_download_failure(
        func: Callable, *args: Any, **kwargs: Any
    ) -> Any:
        """Handle download failures with exponential backoff retry logic.

        Calls ``func(*args, **kwargs)`` up to two times, sleeping 1s after a
        failed first call.

        Args:
            func: Callable to invoke.
            *args: Positional arguments forwarded to ``func``.
            **kwargs: Keyword arguments forwarded to ``func``.

        Returns:
            Whatever ``func`` returns on its first successful call.

        Raises:
            DownloadError: The error raised by the final attempt.  Any other
                exception type propagates immediately without a retry.
        """
        max_retries = RecoveryStrategies._DOWNLOAD_MAX_RETRIES

        for attempt in range(max_retries):
            try:
                return func(*args, **kwargs)
            except DownloadError as exc:
                if attempt == max_retries - 1:
                    # Out of attempts: surface the most recent failure as-is.
                    raise
                delay = RecoveryStrategies._backoff_delay(attempt)
                logger.warning(
                    "Download error on attempt %d/%d, retrying in %.1fs: %s",
                    attempt + 1, max_retries, delay, exc
                )
                time.sleep(delay)

        # Defensive: only reachable if the retry count is ever set below 1.
        raise DownloadError("Download failed after retries")
|
|
|
|
|
|
class FileCorruptionDetector:
    """Detector for corrupted files."""

    # A file must be strictly larger than this many bytes (1 MiB) to pass.
    _MIN_VIDEO_SIZE_BYTES = 1024 * 1024

    @staticmethod
    def is_valid_video_file(filepath: str) -> bool:
        """Check if a video file is valid and not corrupted.

        The check is a size heuristic only; file contents are never read.

        Args:
            filepath: Path of the file to inspect.

        Returns:
            True when ``filepath`` is an existing regular file strictly
            larger than 1 MiB.  False when it is missing, is not a regular
            file (for example a directory), is too small, or cannot be
            inspected.
        """
        try:
            # isfile() rather than exists(): a directory is never a video.
            if not os.path.isfile(filepath):
                return False
            size = os.path.getsize(filepath)
        except (OSError, TypeError, ValueError) as exc:
            # OSError: file vanished or is unreadable between the two calls.
            # TypeError / ValueError: filepath is not a usable path value.
            logger.error("Error checking file validity: %s", exc)
            return False
        return size > FileCorruptionDetector._MIN_VIDEO_SIZE_BYTES
|
|
|
|
|
|
def with_error_recovery(
    max_retries: int = 3, context: str = ""
) -> Callable[[F], F]:
    """
    Decorator for adding error recovery to functions.

    The wrapped function is called up to ``max_retries`` times with no delay
    between attempts.  ``NonRetryableError`` is re-raised immediately; any
    other ``Exception`` triggers another attempt until the attempts run out,
    at which point the last exception is re-raised.

    Args:
        max_retries: Maximum number of retry attempts
        context: Context string for logging; when empty, the decorated
            function's qualified name is used so log lines are never blank

    Returns:
        Decorated function with retry logic

    Raises:
        RuntimeError: From the wrapped call when ``max_retries`` is less
            than 1, because the function is then never invoked.
    """

    def decorator(func: F) -> F:
        # An empty context would log "Error in  (attempt ...)"; fall back to
        # the function's own name instead.
        label = context or getattr(func, "__qualname__", repr(func))

        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            for attempt in range(1, max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except NonRetryableError:
                    raise
                except Exception as exc:
                    # Deliberately broad: this is the retry boundary, and
                    # every failure is either retried or re-raised below.
                    if attempt >= max_retries:
                        logger.error(
                            "Error in %s failed after %d attempts: %s",
                            label,
                            max_retries,
                            exc,
                        )
                        raise
                    logger.warning(
                        "Error in %s (attempt %d/%d): %s, retrying...",
                        label,
                        attempt,
                        max_retries,
                        exc,
                    )

            # Reached only when the loop body never ran (max_retries < 1).
            raise RuntimeError(
                f"Unexpected error in {label} after {max_retries} attempts"
            )

        return wrapper  # type: ignore

    return decorator
|
|
|
|
|
|
# Create module-level instances for use in provider code
recovery_strategies = RecoveryStrategies()
file_corruption_detector = FileCorruptionDetector()
|