"""Progress service for managing real-time progress updates. This module provides a centralized service for tracking and broadcasting real-time progress updates for downloads, scans, queue changes, and system events. It integrates with the WebSocket service to push updates to connected clients. """ from __future__ import annotations import asyncio import time from dataclasses import dataclass, field from datetime import datetime, timezone from enum import Enum from typing import Any, Callable, Dict, List, Optional import structlog logger = structlog.get_logger(__name__) class ProgressType(str, Enum): """Types of progress updates.""" DOWNLOAD = "download" SCAN = "scan" QUEUE = "queue" SYSTEM = "system" ERROR = "error" class ProgressStatus(str, Enum): """Status of a progress operation.""" STARTED = "started" IN_PROGRESS = "in_progress" COMPLETED = "completed" FAILED = "failed" CANCELLED = "cancelled" @dataclass class ProgressUpdate: """Represents a progress update event. Attributes: id: Unique identifier for this progress operation type: Type of progress (download, scan, etc.) status: Current status of the operation title: Human-readable title message: Detailed message percent: Completion percentage (0-100) current: Current progress value total: Total progress value key: Optional series identifier (provider key, e.g., 'attack-on-titan') Used as the primary identifier for series-related operations folder: Optional series folder name (e.g., 'Attack on Titan (2013)') Used for display and filesystem operations only metadata: Additional metadata started_at: When operation started updated_at: When last updated """ id: str type: ProgressType status: ProgressStatus title: str message: str = "" percent: float = 0.0 current: int = 0 total: int = 0 key: Optional[str] = None folder: Optional[str] = None metadata: Dict[str, Any] = field(default_factory=dict) started_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) updated_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) def to_dict(self) -> Dict[str, Any]: """Convert progress update to dictionary. Returns: Dictionary representation with all fields including optional key (series identifier) and folder (display metadata). """ result = { "id": self.id, "type": self.type.value, "status": self.status.value, "title": self.title, "message": self.message, "percent": round(self.percent, 2), "current": self.current, "total": self.total, # Make a copy to prevent mutation issues "metadata": self.metadata.copy(), "started_at": self.started_at.isoformat(), "updated_at": self.updated_at.isoformat(), } # Include optional series identifier fields if self.key is not None: result["key"] = self.key if self.folder is not None: result["folder"] = self.folder return result @dataclass class ProgressEvent: """Represents a progress event for subscribers. Attributes: event_type: Type of event (e.g., 'download_progress') progress_id: Unique identifier for the progress operation progress: The progress update data room: WebSocket room to broadcast to (default: 'progress') """ event_type: str progress_id: str progress: ProgressUpdate room: str = "progress" def to_dict(self) -> Dict[str, Any]: """Convert event to dictionary for broadcasting.""" return { "type": self.event_type, "data": self.progress.to_dict(), } class ProgressServiceError(Exception): """Service-level exception for progress operations.""" # Mapping from ProgressType to WebSocket room names # This ensures compatibility with the valid rooms defined in the WebSocket API: # "downloads", "queue", "scan", "system", "errors" _PROGRESS_TYPE_TO_ROOM: Dict[ProgressType, str] = { ProgressType.DOWNLOAD: "downloads", ProgressType.SCAN: "scan", ProgressType.QUEUE: "queue", ProgressType.SYSTEM: "system", ProgressType.ERROR: "errors", } def _get_room_for_progress_type(progress_type: ProgressType) -> str: """Get the WebSocket room name for a progress type. Args: progress_type: The type of progress update Returns: The WebSocket room name to broadcast to """ return _PROGRESS_TYPE_TO_ROOM.get(progress_type, "system") class ProgressService: """Manages real-time progress updates and broadcasting. Features: - Track multiple concurrent progress operations - Calculate progress percentages and rates - Broadcast updates via WebSocket - Manage progress lifecycle (start, update, complete, fail) - Support for different progress types (download, scan, queue) """ # Minimum interval between broadcasts in seconds (300ms) MIN_BROADCAST_INTERVAL: float = 0.3 def __init__(self): """Initialize the progress service.""" # Active progress operations: id -> ProgressUpdate self._active_progress: Dict[str, ProgressUpdate] = {} # Completed progress history (limited size) self._history: Dict[str, ProgressUpdate] = {} self._max_history_size = 50 # Event subscribers: event_name -> list of handlers self._event_handlers: Dict[ str, List[Callable[[ProgressEvent], None]] ] = {} # Track last broadcast time per progress_id for time-based throttling self._last_broadcast_time: Dict[str, float] = {} # Lock for thread-safe operations self._lock = asyncio.Lock() logger.info("ProgressService initialized") def subscribe( self, event_name: str, handler: Callable[[ProgressEvent], None] ) -> None: """Subscribe to progress events. Args: event_name: Name of event to subscribe to (e.g., 'progress_updated') handler: Async function to call when event occurs """ if event_name not in self._event_handlers: self._event_handlers[event_name] = [] self._event_handlers[event_name].append(handler) logger.debug("Event handler subscribed", event_type=event_name) def unsubscribe( self, event_name: str, handler: Callable[[ProgressEvent], None] ) -> None: """Unsubscribe from progress events. Args: event_name: Name of event to unsubscribe from handler: Handler function to remove """ if event_name in self._event_handlers: try: self._event_handlers[event_name].remove(handler) logger.debug( "Event handler unsubscribed", event_type=event_name ) except ValueError: logger.warning( "Handler not found for unsubscribe", event_type=event_name, ) async def _emit_event(self, event: ProgressEvent) -> None: """Emit event to all subscribers. Args: event: Progress event to emit Note: Errors in individual handlers are logged but do not prevent other handlers from executing. """ event_name = "progress_updated" if event_name in self._event_handlers: handlers = self._event_handlers[event_name] if handlers: # Execute all handlers, capturing exceptions tasks = [handler(event) for handler in handlers] # Ignore type error - tasks will be coroutines at runtime results = await asyncio.gather( *tasks, return_exceptions=True ) # type: ignore[arg-type] # Log any exceptions that occurred for idx, result in enumerate(results): if isinstance(result, Exception): logger.error( "Event handler raised exception", event_type=event_name, error=str(result), handler_index=idx, ) async def start_progress( self, progress_id: str, progress_type: ProgressType, title: str, total: int = 0, message: str = "", key: Optional[str] = None, folder: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None, ) -> ProgressUpdate: """Start a new progress operation. Args: progress_id: Unique identifier for this progress progress_type: Type of progress operation title: Human-readable title total: Total items/bytes to process message: Initial message key: Optional series identifier (provider key) Used as primary identifier for series-related operations folder: Optional series folder name Used for display and filesystem operations only metadata: Additional metadata Returns: Created progress update object Raises: ProgressServiceError: If progress already exists """ async with self._lock: if progress_id in self._active_progress: raise ProgressServiceError( f"Progress with id '{progress_id}' already exists" ) update = ProgressUpdate( id=progress_id, type=progress_type, status=ProgressStatus.STARTED, title=title, message=message, total=total, key=key, folder=folder, metadata=metadata or {}, ) self._active_progress[progress_id] = update logger.info( "Progress started", progress_id=progress_id, type=progress_type.value, title=title, key=key, folder=folder, ) # Emit event to subscribers room = _get_room_for_progress_type(progress_type) event = ProgressEvent( event_type=f"{progress_type.value}_progress", progress_id=progress_id, progress=update, room=room, ) await self._emit_event(event) return update async def update_progress( self, progress_id: str, current: Optional[int] = None, total: Optional[int] = None, message: Optional[str] = None, key: Optional[str] = None, folder: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None, force_broadcast: bool = False, ) -> ProgressUpdate: """Update an existing progress operation. Args: progress_id: Progress identifier current: Current progress value total: Updated total value message: Updated message key: Optional series identifier (provider key) folder: Optional series folder name metadata: Additional metadata to merge force_broadcast: Force broadcasting even for small changes Returns: Updated progress object Raises: ProgressServiceError: If progress not found """ async with self._lock: if progress_id not in self._active_progress: raise ProgressServiceError( f"Progress with id '{progress_id}' not found" ) update = self._active_progress[progress_id] old_percent = update.percent # Update fields if current is not None: update.current = current if total is not None: update.total = total if message is not None: update.message = message if key is not None: update.key = key if folder is not None: update.folder = folder if metadata: update.metadata.update(metadata) # Calculate percentage if update.total > 0: update.percent = (update.current / update.total) * 100 else: update.percent = 0.0 update.status = ProgressStatus.IN_PROGRESS update.updated_at = datetime.now(timezone.utc) # Time-based throttle: broadcast at most every 300ms, # or immediately for significant changes / forced broadcasts now = time.monotonic() last_broadcast = self._last_broadcast_time.get(progress_id, 0.0) time_since_last = now - last_broadcast percent_change = abs(update.percent - old_percent) should_broadcast = ( force_broadcast or percent_change >= 1.0 or time_since_last >= self.MIN_BROADCAST_INTERVAL ) if should_broadcast: self._last_broadcast_time[progress_id] = time.monotonic() room = _get_room_for_progress_type(update.type) event = ProgressEvent( event_type=f"{update.type.value}_progress", progress_id=progress_id, progress=update, room=room, ) await self._emit_event(event) return update async def complete_progress( self, progress_id: str, message: str = "Completed successfully", metadata: Optional[Dict[str, Any]] = None, ) -> ProgressUpdate: """Mark a progress operation as completed. Args: progress_id: Progress identifier message: Completion message metadata: Additional metadata Returns: Completed progress object Raises: ProgressServiceError: If progress not found """ async with self._lock: if progress_id not in self._active_progress: raise ProgressServiceError( f"Progress with id '{progress_id}' not found" ) update = self._active_progress[progress_id] update.status = ProgressStatus.COMPLETED update.message = message update.percent = 100.0 update.current = update.total update.updated_at = datetime.now(timezone.utc) if metadata: update.metadata.update(metadata) # Move to history del self._active_progress[progress_id] self._last_broadcast_time.pop(progress_id, None) self._add_to_history(update) logger.info( "Progress completed", progress_id=progress_id, type=update.type.value, ) # Emit completion event room = _get_room_for_progress_type(update.type) event = ProgressEvent( event_type=f"{update.type.value}_progress", progress_id=progress_id, progress=update, room=room, ) await self._emit_event(event) return update async def fail_progress( self, progress_id: str, error_message: str, metadata: Optional[Dict[str, Any]] = None, ) -> ProgressUpdate: """Mark a progress operation as failed. Args: progress_id: Progress identifier error_message: Error description metadata: Additional error metadata Returns: Failed progress object Raises: ProgressServiceError: If progress not found """ async with self._lock: if progress_id not in self._active_progress: raise ProgressServiceError( f"Progress with id '{progress_id}' not found" ) update = self._active_progress[progress_id] update.status = ProgressStatus.FAILED update.message = error_message update.updated_at = datetime.now(timezone.utc) if metadata: update.metadata.update(metadata) # Move to history del self._active_progress[progress_id] self._last_broadcast_time.pop(progress_id, None) self._add_to_history(update) logger.error( "Progress failed", progress_id=progress_id, type=update.type.value, error=error_message, ) # Emit failure event room = _get_room_for_progress_type(update.type) event = ProgressEvent( event_type=f"{update.type.value}_progress", progress_id=progress_id, progress=update, room=room, ) await self._emit_event(event) return update async def cancel_progress( self, progress_id: str, message: str = "Cancelled by user", ) -> ProgressUpdate: """Cancel a progress operation. Args: progress_id: Progress identifier message: Cancellation message Returns: Cancelled progress object Raises: ProgressServiceError: If progress not found """ async with self._lock: if progress_id not in self._active_progress: raise ProgressServiceError( f"Progress with id '{progress_id}' not found" ) update = self._active_progress[progress_id] update.status = ProgressStatus.CANCELLED update.message = message update.updated_at = datetime.now(timezone.utc) # Move to history del self._active_progress[progress_id] self._last_broadcast_time.pop(progress_id, None) self._add_to_history(update) logger.info( "Progress cancelled", progress_id=progress_id, type=update.type.value, ) # Emit cancellation event room = _get_room_for_progress_type(update.type) event = ProgressEvent( event_type=f"{update.type.value}_progress", progress_id=progress_id, progress=update, room=room, ) await self._emit_event(event) return update def _add_to_history(self, update: ProgressUpdate) -> None: """Add completed progress to history with size limit.""" self._history[update.id] = update # Maintain history size limit if len(self._history) > self._max_history_size: # Remove oldest entries oldest_keys = sorted( self._history.keys(), key=lambda k: self._history[k].updated_at, )[: len(self._history) - self._max_history_size] for key in oldest_keys: del self._history[key] async def get_progress(self, progress_id: str) -> Optional[ProgressUpdate]: """Get current progress state. Args: progress_id: Progress identifier Returns: Progress update object or None if not found """ async with self._lock: if progress_id in self._active_progress: return self._active_progress[progress_id] if progress_id in self._history: return self._history[progress_id] return None async def get_all_active_progress( self, progress_type: Optional[ProgressType] = None ) -> Dict[str, ProgressUpdate]: """Get all active progress operations. Args: progress_type: Optional filter by progress type Returns: Dictionary of progress_id -> ProgressUpdate """ async with self._lock: if progress_type: return { pid: update for pid, update in self._active_progress.items() if update.type == progress_type } return self._active_progress.copy() async def clear_history(self) -> None: """Clear progress history.""" async with self._lock: self._history.clear() logger.info("Progress history cleared") # Global singleton instance _progress_service: Optional[ProgressService] = None def get_progress_service() -> ProgressService: """Get or create the global progress service instance. Returns: Global ProgressService instance """ global _progress_service if _progress_service is None: _progress_service = ProgressService() return _progress_service