"""
Progress callback interfaces for core operations.

This module defines clean interfaces for progress reporting, error handling,
and completion notifications across all core operations (scanning,
downloading).
"""

import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional


class OperationType(str, Enum):
    """Types of operations that can report progress."""

    SCAN = "scan"
    DOWNLOAD = "download"
    SEARCH = "search"
    INITIALIZATION = "initialization"


class ProgressPhase(str, Enum):
    """Phases of an operation's lifecycle."""

    STARTING = "starting"
    IN_PROGRESS = "in_progress"
    COMPLETING = "completing"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


@dataclass
class ProgressContext:
    """
    Complete context information for a progress update.

    Attributes:
        operation_type: Type of operation being performed
        operation_id: Unique identifier for this operation
        phase: Current phase of the operation
        current: Current progress value (e.g., files processed)
        total: Total progress value (e.g., total files)
        percentage: Completion percentage (0.0 to 100.0)
        message: Human-readable progress message
        details: Additional context-specific details
        metadata: Extra metadata for specialized use cases
    """

    operation_type: OperationType
    operation_id: str
    phase: ProgressPhase
    current: int
    total: int
    percentage: float
    message: str
    details: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert to dictionary for serialization.

        Enum fields are emitted as their string values and ``percentage``
        is rounded to two decimal places.

        Returns:
            Dictionary representation of this progress update
        """
        return {
            "operation_type": self.operation_type.value,
            "operation_id": self.operation_id,
            "phase": self.phase.value,
            "current": self.current,
            "total": self.total,
            "percentage": round(self.percentage, 2),
            "message": self.message,
            "details": self.details,
            "metadata": self.metadata,
        }


@dataclass
class ErrorContext:
    """
    Context information for error callbacks.

    Attributes:
        operation_type: Type of operation that failed
        operation_id: Unique identifier for the operation
        error: The exception that occurred
        message: Human-readable error message
        recoverable: Whether the error is recoverable
        retry_count: Number of retry attempts made
        metadata: Additional error context
    """

    operation_type: OperationType
    operation_id: str
    error: Exception
    message: str
    recoverable: bool = False
    retry_count: int = 0
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert to dictionary for serialization.

        The exception object itself is not included; it is represented by
        its class name (``error_type``) and ``str()`` form
        (``error_message``).

        Returns:
            Dictionary representation of this error
        """
        return {
            "operation_type": self.operation_type.value,
            "operation_id": self.operation_id,
            "error_type": type(self.error).__name__,
            "error_message": str(self.error),
            "message": self.message,
            "recoverable": self.recoverable,
            "retry_count": self.retry_count,
            "metadata": self.metadata,
        }


@dataclass
class CompletionContext:
    """
    Context information for completion callbacks.

    Attributes:
        operation_type: Type of operation that completed
        operation_id: Unique identifier for the operation
        success: Whether the operation completed successfully
        message: Human-readable completion message
        result_data: Result data from the operation
        statistics: Operation statistics (duration, items processed, etc.)
        metadata: Additional completion context
    """

    operation_type: OperationType
    operation_id: str
    success: bool
    message: str
    result_data: Optional[Any] = None
    statistics: Dict[str, Any] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert to dictionary for serialization.

        Note that ``result_data`` is not part of the returned dictionary;
        read it from the context object directly when it is needed.

        Returns:
            Dictionary representation of this completion event
        """
        return {
            "operation_type": self.operation_type.value,
            "operation_id": self.operation_id,
            "success": self.success,
            "message": self.message,
            "statistics": self.statistics,
            "metadata": self.metadata,
        }


class ProgressCallback(ABC):
    """
    Abstract base class for progress callbacks.

    Implement this interface to receive progress updates from core
    operations.
    """

    @abstractmethod
    def on_progress(self, context: ProgressContext) -> None:
        """
        Called when progress is made in an operation.

        Args:
            context: Complete progress context information
        """
        pass


class ErrorCallback(ABC):
    """
    Abstract base class for error callbacks.

    Implement this interface to receive error notifications from core
    operations.
    """

    @abstractmethod
    def on_error(self, context: ErrorContext) -> None:
        """
        Called when an error occurs during an operation.

        Args:
            context: Complete error context information
        """
        pass


class CompletionCallback(ABC):
    """
    Abstract base class for completion callbacks.

    Implement this interface to receive completion notifications from core
    operations.
    """

    @abstractmethod
    def on_completion(self, context: CompletionContext) -> None:
        """
        Called when an operation completes (successfully or not).

        Args:
            context: Complete completion context information
        """
        pass


class CallbackManager:
    """
    Manages multiple callbacks for an operation.

    This class allows registering multiple progress, error, and completion
    callbacks and dispatching events to all registered callbacks.

    Callbacks are invoked in registration order. Each ``notify_*`` call
    dispatches to a snapshot of the callbacks registered at the moment the
    call starts, so a callback may register or unregister callbacks
    (including itself) while being notified without affecting delivery of
    the event currently being dispatched.
    """

    def __init__(self) -> None:
        """Initialize the callback manager with no registered callbacks."""
        self._progress_callbacks: list[ProgressCallback] = []
        self._error_callbacks: list[ErrorCallback] = []
        self._completion_callbacks: list[CompletionCallback] = []

    def register_progress_callback(self, callback: ProgressCallback) -> None:
        """
        Register a progress callback.

        Registering a callback that is already registered is a no-op.

        Args:
            callback: Progress callback to register
        """
        if callback not in self._progress_callbacks:
            self._progress_callbacks.append(callback)

    def register_error_callback(self, callback: ErrorCallback) -> None:
        """
        Register an error callback.

        Registering a callback that is already registered is a no-op.

        Args:
            callback: Error callback to register
        """
        if callback not in self._error_callbacks:
            self._error_callbacks.append(callback)

    def register_completion_callback(
        self, callback: CompletionCallback
    ) -> None:
        """
        Register a completion callback.

        Registering a callback that is already registered is a no-op.

        Args:
            callback: Completion callback to register
        """
        if callback not in self._completion_callbacks:
            self._completion_callbacks.append(callback)

    def unregister_progress_callback(self, callback: ProgressCallback) -> None:
        """
        Unregister a progress callback.

        Unregistering a callback that is not registered is a no-op.

        Args:
            callback: Progress callback to unregister
        """
        if callback in self._progress_callbacks:
            self._progress_callbacks.remove(callback)

    def unregister_error_callback(self, callback: ErrorCallback) -> None:
        """
        Unregister an error callback.

        Unregistering a callback that is not registered is a no-op.

        Args:
            callback: Error callback to unregister
        """
        if callback in self._error_callbacks:
            self._error_callbacks.remove(callback)

    def unregister_completion_callback(
        self, callback: CompletionCallback
    ) -> None:
        """
        Unregister a completion callback.

        Unregistering a callback that is not registered is a no-op.

        Args:
            callback: Completion callback to unregister
        """
        if callback in self._completion_callbacks:
            self._completion_callbacks.remove(callback)

    def notify_progress(self, context: ProgressContext) -> None:
        """
        Notify all registered progress callbacks.

        Exceptions raised by a callback are logged and suppressed so that
        the remaining callbacks still run.

        Args:
            context: Progress context to send
        """
        # Iterate over a snapshot: a callback may (un)register callbacks
        # while being notified, and mutating the live list mid-iteration
        # would silently skip the next callback.
        for callback in list(self._progress_callbacks):
            try:
                callback.on_progress(context)
            except Exception as e:
                # Log but don't let callback errors break the operation
                logging.error(
                    "Error in progress callback %s: %s",
                    callback,
                    e,
                    exc_info=True,
                )

    def notify_error(self, context: ErrorContext) -> None:
        """
        Notify all registered error callbacks.

        Exceptions raised by a callback are logged and suppressed so that
        the remaining callbacks still run.

        Args:
            context: Error context to send
        """
        # Snapshot so callbacks can safely (un)register during dispatch.
        for callback in list(self._error_callbacks):
            try:
                callback.on_error(context)
            except Exception as e:
                # Log but don't let callback errors break the operation
                logging.error(
                    "Error in error callback %s: %s",
                    callback,
                    e,
                    exc_info=True,
                )

    def notify_completion(self, context: CompletionContext) -> None:
        """
        Notify all registered completion callbacks.

        Exceptions raised by a callback are logged and suppressed so that
        the remaining callbacks still run.

        Args:
            context: Completion context to send
        """
        # Snapshot so callbacks can safely (un)register during dispatch.
        for callback in list(self._completion_callbacks):
            try:
                callback.on_completion(context)
            except Exception as e:
                # Log but don't let callback errors break the operation
                logging.error(
                    "Error in completion callback %s: %s",
                    callback,
                    e,
                    exc_info=True,
                )

    def clear_all_callbacks(self) -> None:
        """Clear all registered callbacks."""
        self._progress_callbacks.clear()
        self._error_callbacks.clear()
        self._completion_callbacks.clear()