From 2441730862c3f7c6e3eea5e735e65f8c3c99bbd1 Mon Sep 17 00:00:00 2001 From: Lukas Date: Fri, 7 Nov 2025 18:40:36 +0100 Subject: [PATCH] fix progress events --- docs/progress_service_architecture.md | 450 ++++++++++++++++++++++++ src/server/fastapi_app.py | 24 +- src/server/services/download_service.py | 259 +++++--------- src/server/services/progress_service.py | 156 ++++++-- src/server/utils/dependencies.py | 33 +- 5 files changed, 673 insertions(+), 249 deletions(-) create mode 100644 docs/progress_service_architecture.md diff --git a/docs/progress_service_architecture.md b/docs/progress_service_architecture.md new file mode 100644 index 0000000..675e418 --- /dev/null +++ b/docs/progress_service_architecture.md @@ -0,0 +1,450 @@ +# Progress Service Architecture + +## Overview + +The ProgressService serves as the **single source of truth** for all real-time progress tracking in the Aniworld application. This architecture follows a clean, decoupled design where progress updates flow through a well-defined pipeline. + +## Architecture Diagram + +``` +┌─────────────┐ +│ SeriesApp │ ← Core download/scan logic +└──────┬──────┘ + │ Events (download_status, scan_status) + ▼ +┌─────────────────┐ +│ AnimeService │ ← Subscribes to SeriesApp events +└────────┬────────┘ + │ Forwards events + ▼ +┌──────────────────┐ +│ ProgressService │ ← Single source of truth for progress +└────────┬─────────┘ + │ Emits events to subscribers + ▼ +┌──────────────────┐ +│ WebSocketService │ ← Subscribes to progress events +└──────────────────┘ + │ + ▼ + Connected clients receive real-time updates +``` + +## Components + +### 1. SeriesApp (Core Layer) + +**Location**: `src/core/SeriesApp.py` + +**Responsibilities**: + +- Execute actual downloads and scans +- Fire events with detailed progress information +- Manage download state and error handling + +**Events**: + +- `download_status`: Fired during downloads + + - `started`: Download begins + - `progress`: Progress updates (percent, speed, ETA) + - `completed`: Download finished successfully + - `failed`: Download encountered an error + +- `scan_status`: Fired during library scans + - `started`: Scan begins + - `progress`: Scan progress updates + - `completed`: Scan finished + - `failed`: Scan encountered an error + - `cancelled`: Scan was cancelled + +### 2. AnimeService (Service Layer) + +**Location**: `src/server/services/anime_service.py` + +**Responsibilities**: + +- Subscribe to SeriesApp events +- Translate SeriesApp events into ProgressService updates +- Provide async interface for web layer + +**Event Handlers**: + +```python +def _on_download_status(self, args): + """Translates download events to progress service.""" + if args.status == "started": + await progress_service.start_progress(...) + elif args.status == "progress": + await progress_service.update_progress(...) + elif args.status == "completed": + await progress_service.complete_progress(...) + elif args.status == "failed": + await progress_service.fail_progress(...) + +def _on_scan_status(self, args): + """Translates scan events to progress service.""" + # Similar pattern as download_status +``` + +### 3. ProgressService (Service Layer) + +**Location**: `src/server/services/progress_service.py` + +**Responsibilities**: + +- Central progress tracking for all operations +- Maintain active and historical progress records +- Calculate percentages and rates +- Emit events to subscribers (event-based architecture) + +**Progress Types**: + +- `DOWNLOAD`: Individual episode downloads +- `SCAN`: Library scans for missing episodes +- `QUEUE`: Download queue operations +- `SYSTEM`: System-level operations +- `ERROR`: Error notifications + +**Event System**: + +```python +# Subscribe to progress events +def subscribe(event_name: str, handler: Callable[[ProgressEvent], None]) +def unsubscribe(event_name: str, handler: Callable[[ProgressEvent], None]) + +# Internal event emission +async def _emit_event(event: ProgressEvent) +``` + +**Key Methods**: + +```python +async def start_progress(progress_id, progress_type, title, ...): + """Start tracking a new operation.""" + +async def update_progress(progress_id, current, total, message, ...): + """Update progress for an ongoing operation.""" + +async def complete_progress(progress_id, message, ...): + """Mark operation as completed.""" + +async def fail_progress(progress_id, error_message, ...): + """Mark operation as failed.""" +``` + +### 4. DownloadService (Service Layer) + +**Location**: `src/server/services/download_service.py` + +**Responsibilities**: + +- Manage download queue (FIFO processing) +- Track queue state (pending, active, completed, failed) +- Persist queue to disk +- Use ProgressService for queue-related updates + +**Progress Integration**: + +```python +# Queue operations notify via ProgressService +await progress_service.update_progress( + progress_id="download_queue", + message="Added 3 items to queue", + metadata={ + "action": "items_added", + "queue_status": {...} + }, + force_broadcast=True, +) +``` + +**Note**: DownloadService does NOT directly broadcast. Individual download progress flows through: +`SeriesApp → AnimeService → ProgressService → WebSocket` + +### 5. WebSocketService (Service Layer) + +**Location**: `src/server/services/websocket_service.py` + +**Responsibilities**: + +- Manage WebSocket connections +- Support room-based messaging +- Broadcast progress updates to clients +- Handle connection lifecycle + +**Integration**: +WebSocketService subscribes to ProgressService events: + +```python +async def lifespan(app: FastAPI): + # Get services + progress_service = get_progress_service() + ws_service = get_websocket_service() + + # Define event handler + async def progress_event_handler(event) -> None: + """Handle progress events and broadcast via WebSocket.""" + message = { + "type": event.event_type, + "data": event.progress.to_dict(), + } + await ws_service.manager.broadcast_to_room(message, event.room) + + # Subscribe to progress events + progress_service.subscribe("progress_updated", progress_event_handler) +``` + +## Data Flow Examples + +### Example 1: Episode Download + +1. **User triggers download** via API endpoint +2. **DownloadService** queues the download +3. **DownloadService** starts processing → calls `anime_service.download()` +4. **AnimeService** calls `series_app.download()` +5. **SeriesApp** fires `download_status` events: + - `started` → AnimeService → ProgressService → WebSocket → Client + - `progress` (multiple) → AnimeService → ProgressService → WebSocket → Client + - `completed` → AnimeService → ProgressService → WebSocket → Client + +### Example 2: Library Scan + +1. **User triggers scan** via API endpoint +2. **AnimeService** calls `series_app.rescan()` +3. **SeriesApp** fires `scan_status` events: + - `started` → AnimeService → ProgressService → WebSocket → Client + - `progress` (multiple) → AnimeService → ProgressService → WebSocket → Client + - `completed` → AnimeService → ProgressService → WebSocket → Client + +### Example 3: Queue Management + +1. **User adds items to queue** via API endpoint +2. **DownloadService** adds items to internal queue +3. **DownloadService** notifies via ProgressService: + ```python + await progress_service.update_progress( + progress_id="download_queue", + message="Added 5 items to queue", + metadata={"queue_status": {...}}, + force_broadcast=True, + ) + ``` +4. **ProgressService** → WebSocket → Client receives queue update + +## Benefits of This Architecture + +### 1. **Single Source of Truth** + +- All progress tracking goes through ProgressService +- Consistent progress reporting across the application +- Easy to monitor and debug + +### 2. **Decoupling** + +- Core logic (SeriesApp) doesn't know about web layer +- Services can be tested independently +- Easy to add new progress consumers (e.g., CLI, GUI) + +### 3. **Type Safety** + +- Strongly typed progress updates +- Enum-based progress types and statuses +- Clear data contracts + +### 4. **Flexibility** + +- Multiple subscribers can listen to progress events +- Room-based WebSocket messaging +- Metadata support for custom data +- Multiple concurrent progress operations + +### 5. **Maintainability** + +- Clear separation of concerns +- Single place to modify progress logic +- Easy to extend with new progress types or subscribers + +### 6. **Scalability** + +- Event-based architecture supports multiple consumers +- Isolated error handling per subscriber +- No single point of failure + +## Progress IDs + +Progress operations are identified by unique IDs: + +- **Downloads**: `download_{serie_folder}_{season}_{episode}` +- **Scans**: `library_scan` +- **Queue**: `download_queue` + +## WebSocket Messages + +Clients receive progress updates in this format: + +```json +{ + "type": "download_progress", + "data": { + "id": "download_naruto_1_1", + "type": "download", + "status": "in_progress", + "title": "Downloading Naruto", + "message": "S01E01", + "percent": 45.5, + "current": 45, + "total": 100, + "metadata": {}, + "started_at": "2025-11-07T10:00:00Z", + "updated_at": "2025-11-07T10:05:00Z" + } +} +``` + +## Configuration + +### Startup (fastapi_app.py) + +```python +@asynccontextmanager +async def lifespan(app: FastAPI): + # Initialize services + progress_service = get_progress_service() + ws_service = get_websocket_service() + + # Define event handler + async def progress_event_handler(event) -> None: + """Handle progress events and broadcast via WebSocket.""" + message = { + "type": event.event_type, + "data": event.progress.to_dict(), + } + await ws_service.manager.broadcast_to_room(message, event.room) + + # Subscribe to progress events + progress_service.subscribe("progress_updated", progress_event_handler) +``` + +### Service Initialization + +```python +# AnimeService automatically subscribes to SeriesApp events +anime_service = AnimeService(series_app) + +# DownloadService uses ProgressService for queue updates +download_service = DownloadService(anime_service) +``` + +## Migration Notes + +### What Changed + +**Before (Callback-based)**: + +- ProgressService had a single `set_broadcast_callback()` method +- Only one consumer could receive updates +- Direct coupling between ProgressService and WebSocketService + +**After (Event-based)**: + +- ProgressService uses `subscribe()` and `unsubscribe()` methods +- Multiple consumers can subscribe to progress events +- Loose coupling - ProgressService doesn't know about subscribers +- Clean event flow: SeriesApp → AnimeService → ProgressService → Subscribers + +### Removed + +1. **ProgressService**: + + - `set_broadcast_callback()` method + - `_broadcast_callback` attribute + - `_broadcast()` method + +### Added + +1. **ProgressService**: + + - `ProgressEvent` dataclass to encapsulate event data + - `subscribe()` method for event subscription + - `unsubscribe()` method to remove handlers + - `_emit_event()` method for broadcasting to all subscribers + - `_event_handlers` dictionary to track subscribers + +2. **fastapi_app.py**: + - Event handler function `progress_event_handler` + - Uses `subscribe()` instead of `set_broadcast_callback()` + +### Benefits of Event-Based Design + +1. **Multiple Subscribers**: Can now have multiple services listening to progress + + ```python + # WebSocket for real-time updates + progress_service.subscribe("progress_updated", websocket_handler) + # Metrics for analytics + progress_service.subscribe("progress_updated", metrics_handler) + # Logging for debugging + progress_service.subscribe("progress_updated", logging_handler) + ``` + +2. **Isolated Error Handling**: If one subscriber fails, others continue working + +3. **Dynamic Subscription**: Handlers can subscribe/unsubscribe at runtime + +4. **Extensibility**: Easy to add new features without modifying ProgressService + +## Testing + +### Unit Tests + +- Test each service independently +- Mock ProgressService for services that use it +- Verify event handler logic + +### Integration Tests + +- Test full flow: SeriesApp → AnimeService → ProgressService → WebSocket +- Verify progress updates reach clients +- Test error handling + +### Example Test + +```python +async def test_download_progress_flow(): + # Setup + progress_service = ProgressService() + events_received = [] + + async def mock_event_handler(event): + events_received.append(event) + + progress_service.subscribe("progress_updated", mock_event_handler) + + # Execute + await progress_service.start_progress( + progress_id="test_download", + progress_type=ProgressType.DOWNLOAD, + title="Test" + ) + + # Verify + assert len(events_received) == 1 + assert events_received[0].event_type == "download_progress" + assert events_received[0].progress.id == "test_download" +``` + +## Future Enhancements + +1. **Progress Persistence**: Save progress to database for recovery +2. **Progress History**: Keep detailed history for analytics +3. **Rate Limiting**: Throttle progress updates to prevent spam +4. **Progress Aggregation**: Combine multiple progress operations +5. **Custom Rooms**: Allow clients to subscribe to specific progress types + +## Related Documentation + +- [WebSocket API](./websocket_api.md) +- [Download Service](./download_service.md) +- [Error Handling](./error_handling_validation.md) +- [API Implementation](./api_implementation_summary.md) diff --git a/src/server/fastapi_app.py b/src/server/fastapi_app.py index 1f71363..0988629 100644 --- a/src/server/fastapi_app.py +++ b/src/server/fastapi_app.py @@ -66,23 +66,25 @@ async def lifespan(app: FastAPI): ) except Exception as e: logger.warning("Failed to load config from config.json: %s", e) - - - # Initialize progress service with websocket callback + + # Initialize progress service with event subscription progress_service = get_progress_service() ws_service = get_websocket_service() - async def broadcast_callback( - message_type: str, data: dict, room: str - ) -> None: - """Broadcast progress updates via WebSocket.""" + async def progress_event_handler(event) -> None: + """Handle progress events and broadcast via WebSocket. + + Args: + event: ProgressEvent containing progress update data + """ message = { - "type": message_type, - "data": data, + "type": event.event_type, + "data": event.progress.to_dict(), } - await ws_service.manager.broadcast_to_room(message, room) + await ws_service.manager.broadcast_to_room(message, event.room) - progress_service.set_broadcast_callback(broadcast_callback) + # Subscribe to progress events + progress_service.subscribe("progress_updated", progress_event_handler) logger.info("FastAPI application started successfully") logger.info("Server running on http://127.0.0.1:8000") diff --git a/src/server/services/download_service.py b/src/server/services/download_service.py index 9a2b815..3ba466c 100644 --- a/src/server/services/download_service.py +++ b/src/server/services/download_service.py @@ -13,14 +13,13 @@ from collections import deque from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timezone from pathlib import Path -from typing import Callable, Dict, List, Optional +from typing import Dict, List, Optional import structlog from src.server.models.download import ( DownloadItem, DownloadPriority, - DownloadProgress, DownloadStatus, EpisodeIdentifier, QueueStats, @@ -82,37 +81,33 @@ class DownloadService: # Executor for blocking operations self._executor = ThreadPoolExecutor(max_workers=1) - # WebSocket broadcast callback - self._broadcast_callback: Optional[Callable] = None - # Statistics tracking self._total_downloaded_mb: float = 0.0 self._download_speeds: deque[float] = deque(maxlen=10) - # Subscribe to SeriesApp download events for progress tracking - # Note: Events library uses assignment (=), not += operator - if hasattr(anime_service, '_app') and hasattr( - anime_service._app, 'download_status' - ): - # Save existing handler if any, and chain them - existing_handler = anime_service._app.download_status - if existing_handler: - def chained_handler(args): - existing_handler(args) - self._on_seriesapp_download_status(args) - anime_service._app.download_status = chained_handler - else: - anime_service._app.download_status = ( - self._on_seriesapp_download_status - ) - # Load persisted queue self._load_queue() + + # Initialize queue progress tracking + asyncio.create_task(self._init_queue_progress()) logger.info( "DownloadService initialized", max_retries=max_retries, ) + + async def _init_queue_progress(self) -> None: + """Initialize the download queue progress tracking.""" + try: + from src.server.services.progress_service import ProgressType + await self._progress_service.start_progress( + progress_id="download_queue", + progress_type=ProgressType.QUEUE, + title="Download Queue", + message="Queue ready", + ) + except Exception as e: + logger.error("Failed to initialize queue progress", error=str(e)) def _add_to_pending_queue( self, item: DownloadItem, front: bool = False @@ -154,91 +149,6 @@ class DownloadService: except (ValueError, KeyError): return None - def set_broadcast_callback(self, callback: Callable) -> None: - """Set callback for broadcasting status updates via WebSocket.""" - self._broadcast_callback = callback - logger.debug("Broadcast callback registered") - - def _on_seriesapp_download_status(self, args) -> None: - """Handle download status events from SeriesApp. - - Updates the active download item with progress information. - - Args: - args: DownloadStatusEventArgs from SeriesApp - """ - try: - # Only process if we have an active download - if not self._active_download: - return - - # Match the event to the active download item - # SeriesApp events include serie_folder, season, episode - if ( - self._active_download.serie_folder == args.serie_folder - and self._active_download.episode.season == args.season - and self._active_download.episode.episode == args.episode - ): - if args.status == "progress": - # Update item progress - self._active_download.progress = DownloadProgress( - percent=args.progress, - downloaded_mb=( - args.progress * args.mbper_sec / 100 - if args.mbper_sec - else 0.0 - ), - total_mb=None, # Not provided by SeriesApp - speed_mbps=args.mbper_sec, - eta_seconds=args.eta, - ) - - # Track speed - if args.mbper_sec: - self._download_speeds.append(args.mbper_sec) - - # Broadcast update - asyncio.create_task( - self._broadcast_update( - "download_progress", - { - "download_id": self._active_download.id, - "item_id": self._active_download.id, - "serie_name": self._active_download.serie_name, - "season": args.season, - "episode": args.episode, - "progress": ( - self._active_download.progress.model_dump( - mode="json" - ) - ), - }, - ) - ) - - except Exception as exc: - logger.error( - "Error handling SeriesApp download status", - error=str(exc) - ) - - async def _broadcast_update(self, update_type: str, data: dict) -> None: - """Broadcast update to connected WebSocket clients. - - Args: - update_type: Type of update (download_progress, queue_status, etc.) - data: Update data to broadcast - """ - if self._broadcast_callback: - try: - await self._broadcast_callback(update_type, data) - except Exception as e: - logger.error( - "Failed to broadcast update", - update_type=update_type, - error=str(e), - ) - def _generate_item_id(self) -> str: """Generate unique identifier for download items.""" return str(uuid.uuid4()) @@ -359,15 +269,17 @@ class DownloadService: self._save_queue() - # Broadcast queue status update + # Notify via progress service queue_status = await self.get_queue_status() - await self._broadcast_update( - "queue_status", - { + await self._progress_service.update_progress( + progress_id="download_queue", + message=f"Added {len(created_ids)} items to queue", + metadata={ "action": "items_added", "added_ids": created_ids, "queue_status": queue_status.model_dump(mode="json"), }, + force_broadcast=True, ) return created_ids @@ -416,15 +328,17 @@ class DownloadService: if removed_ids: self._save_queue() - # Broadcast queue status update + # Notify via progress service queue_status = await self.get_queue_status() - await self._broadcast_update( - "queue_status", - { + await self._progress_service.update_progress( + progress_id="download_queue", + message=f"Removed {len(removed_ids)} items from queue", + metadata={ "action": "items_removed", "removed_ids": removed_ids, "queue_status": queue_status.model_dump(mode="json"), }, + force_broadcast=True, ) return removed_ids @@ -498,17 +412,24 @@ class DownloadService: remaining=len(self._pending_queue) ) - # Broadcast queue status update + # Notify via progress service queue_status = await self.get_queue_status() - await self._broadcast_update( - "download_started", - { + msg = ( + f"Started: {item.serie_name} " + f"S{item.episode.season:02d}E{item.episode.episode:02d}" + ) + await self._progress_service.update_progress( + progress_id="download_queue", + message=msg, + metadata={ + "action": "download_started", "item_id": item.id, "serie_name": item.serie_name, "season": item.episode.season, "episode": item.episode.episode, "queue_status": queue_status.model_dump(mode="json"), }, + force_broadcast=True, ) # Process the download (this will wait until complete) @@ -532,11 +453,11 @@ class DownloadService: if len(self._pending_queue) == 0: logger.info("Queue processing completed - all items processed") queue_status = await self.get_queue_status() - await self._broadcast_update( - "queue_completed", - { - "message": "All downloads completed", - "queue_status": queue_status.model_dump(mode="json"), + await self._progress_service.complete_progress( + progress_id="download_queue", + message="All downloads completed", + metadata={ + "queue_status": queue_status.model_dump(mode="json") }, ) else: @@ -561,14 +482,17 @@ class DownloadService: self._is_stopped = True logger.info("Download processing stopped") - # Broadcast queue status update + # Notify via progress service queue_status = await self.get_queue_status() - await self._broadcast_update( - "queue_stopped", - { + await self._progress_service.update_progress( + progress_id="download_queue", + message="Queue processing stopped", + metadata={ + "action": "queue_stopped", "is_stopped": True, "queue_status": queue_status.model_dump(mode="json"), }, + force_broadcast=True, ) async def get_queue_status(self) -> QueueStatus: @@ -638,16 +562,18 @@ class DownloadService: self._completed_items.clear() logger.info("Cleared completed items", count=count) - # Broadcast queue status update + # Notify via progress service if count > 0: queue_status = await self.get_queue_status() - await self._broadcast_update( - "queue_status", - { + await self._progress_service.update_progress( + progress_id="download_queue", + message=f"Cleared {count} completed items", + metadata={ "action": "completed_cleared", "cleared_count": count, "queue_status": queue_status.model_dump(mode="json"), }, + force_broadcast=True, ) return count @@ -662,16 +588,18 @@ class DownloadService: self._failed_items.clear() logger.info("Cleared failed items", count=count) - # Broadcast queue status update + # Notify via progress service if count > 0: queue_status = await self.get_queue_status() - await self._broadcast_update( - "queue_status", - { + await self._progress_service.update_progress( + progress_id="download_queue", + message=f"Cleared {count} failed items", + metadata={ "action": "failed_cleared", "cleared_count": count, "queue_status": queue_status.model_dump(mode="json"), }, + force_broadcast=True, ) return count @@ -690,16 +618,18 @@ class DownloadService: # Save queue state self._save_queue() - # Broadcast queue status update + # Notify via progress service if count > 0: queue_status = await self.get_queue_status() - await self._broadcast_update( - "queue_status", - { + await self._progress_service.update_progress( + progress_id="download_queue", + message=f"Cleared {count} pending items", + metadata={ "action": "pending_cleared", "cleared_count": count, "queue_status": queue_status.model_dump(mode="json"), }, + force_broadcast=True, ) return count @@ -746,15 +676,17 @@ class DownloadService: if retried_ids: self._save_queue() - # Broadcast queue status update + # Notify via progress service queue_status = await self.get_queue_status() - await self._broadcast_update( - "queue_status", - { + await self._progress_service.update_progress( + progress_id="download_queue", + message=f"Retried {len(retried_ids)} failed items", + metadata={ "action": "items_retried", "retried_ids": retried_ids, "queue_status": queue_status.model_dump(mode="json"), }, + force_broadcast=True, ) return retried_ids @@ -786,10 +718,10 @@ class DownloadService: ) # Execute download via anime service - # Note: AnimeService handles progress via SeriesApp events - # Progress updates received via _on_seriesapp_download_status - # Use serie_folder if available, otherwise fall back to serie_id - # for backwards compatibility with old queue items + # AnimeService handles ALL progress via SeriesApp events: + # - download started/progress/completed/failed events + # - All updates forwarded to ProgressService + # - ProgressService broadcasts to WebSocket clients folder = item.serie_folder if item.serie_folder else item.serie_id success = await self._anime_service.download( serie_folder=folder, @@ -812,21 +744,6 @@ class DownloadService: logger.info( "Download completed successfully", item_id=item.id ) - - # Broadcast completion (progress already handled by events) - await self._broadcast_update( - "download_complete", - { - "download_id": item.id, - "item_id": item.id, - "serie_name": item.serie_name, - "season": item.episode.season, - "episode": item.episode.episode, - "downloaded_mb": item.progress.downloaded_mb - if item.progress - else 0, - }, - ) else: raise AnimeServiceError("Download returned False") @@ -843,20 +760,8 @@ class DownloadService: error=str(e), retry_count=item.retry_count, ) - - # Broadcast failure (progress already handled by events) - await self._broadcast_update( - "download_failed", - { - "download_id": item.id, - "item_id": item.id, - "serie_name": item.serie_name, - "season": item.episode.season, - "episode": item.episode.episode, - "error": item.error, - "retry_count": item.retry_count, - }, - ) + # Note: Failure is already broadcast by AnimeService + # via ProgressService when SeriesApp fires failed event finally: # Remove from active downloads diff --git a/src/server/services/progress_service.py b/src/server/services/progress_service.py index 03cc7f3..9e80bd3 100644 --- a/src/server/services/progress_service.py +++ b/src/server/services/progress_service.py @@ -11,7 +11,7 @@ import asyncio from dataclasses import dataclass, field from datetime import datetime, timezone from enum import Enum -from typing import Any, Callable, Dict, Optional +from typing import Any, Callable, Dict, List, Optional import structlog @@ -85,6 +85,30 @@ class ProgressUpdate: } +@dataclass +class ProgressEvent: + """Represents a progress event for subscribers. + + Attributes: + event_type: Type of event (e.g., 'download_progress') + progress_id: Unique identifier for the progress operation + progress: The progress update data + room: WebSocket room to broadcast to (default: 'progress') + """ + + event_type: str + progress_id: str + progress: ProgressUpdate + room: str = "progress" + + def to_dict(self) -> Dict[str, Any]: + """Convert event to dictionary for broadcasting.""" + return { + "type": self.event_type, + "data": self.progress.to_dict(), + } + + class ProgressServiceError(Exception): """Service-level exception for progress operations.""" @@ -109,44 +133,82 @@ class ProgressService: self._history: Dict[str, ProgressUpdate] = {} self._max_history_size = 50 - # WebSocket broadcast callback - self._broadcast_callback: Optional[Callable] = None + # Event subscribers: event_name -> list of handlers + self._event_handlers: Dict[ + str, List[Callable[[ProgressEvent], None]] + ] = {} # Lock for thread-safe operations self._lock = asyncio.Lock() logger.info("ProgressService initialized") - def set_broadcast_callback(self, callback: Callable) -> None: - """Set callback for broadcasting progress updates via WebSocket. + def subscribe( + self, event_name: str, handler: Callable[[ProgressEvent], None] + ) -> None: + """Subscribe to progress events. Args: - callback: Async function to call for broadcasting updates + event_name: Name of event to subscribe to + (e.g., 'progress_updated') + handler: Async function to call when event occurs """ - self._broadcast_callback = callback - logger.debug("Progress broadcast callback registered") + if event_name not in self._event_handlers: + self._event_handlers[event_name] = [] - async def _broadcast(self, update: ProgressUpdate, room: str) -> None: - """Broadcast progress update to WebSocket clients. + self._event_handlers[event_name].append(handler) + logger.debug("Event handler subscribed", event=event_name) + + def unsubscribe( + self, event_name: str, handler: Callable[[ProgressEvent], None] + ) -> None: + """Unsubscribe from progress events. Args: - update: Progress update to broadcast - room: WebSocket room to broadcast to + event_name: Name of event to unsubscribe from + handler: Handler function to remove """ - if self._broadcast_callback: + if event_name in self._event_handlers: try: - await self._broadcast_callback( - message_type=f"{update.type.value}_progress", - data=update.to_dict(), - room=room, - ) - except Exception as e: - logger.error( - "Failed to broadcast progress update", - error=str(e), - progress_id=update.id, + self._event_handlers[event_name].remove(handler) + logger.debug("Event handler unsubscribed", event=event_name) + except ValueError: + logger.warning( + "Handler not found for unsubscribe", event=event_name ) + async def _emit_event(self, event: ProgressEvent) -> None: + """Emit event to all subscribers. + + Args: + event: Progress event to emit + + Note: + Errors in individual handlers are logged but do not + prevent other handlers from executing. + """ + event_name = "progress_updated" + + if event_name in self._event_handlers: + handlers = self._event_handlers[event_name] + if handlers: + # Execute all handlers, capturing exceptions + tasks = [handler(event) for handler in handlers] + # Ignore type error - tasks will be coroutines at runtime + results = await asyncio.gather( + *tasks, return_exceptions=True + ) # type: ignore[arg-type] + + # Log any exceptions that occurred + for idx, result in enumerate(results): + if isinstance(result, Exception): + logger.error( + "Event handler raised exception", + event=event_name, + error=str(result), + handler_index=idx, + ) + async def start_progress( self, progress_id: str, @@ -197,9 +259,15 @@ class ProgressService: title=title, ) - # Broadcast to appropriate room + # Emit event to subscribers room = f"{progress_type.value}_progress" - await self._broadcast(update, room) + event = ProgressEvent( + event_type=f"{progress_type.value}_progress", + progress_id=progress_id, + progress=update, + room=room, + ) + await self._emit_event(event) return update @@ -262,7 +330,13 @@ class ProgressService: if should_broadcast: room = f"{update.type.value}_progress" - await self._broadcast(update, room) + event = ProgressEvent( + event_type=f"{update.type.value}_progress", + progress_id=progress_id, + progress=update, + room=room, + ) + await self._emit_event(event) return update @@ -311,9 +385,15 @@ class ProgressService: type=update.type.value, ) - # Broadcast completion + # Emit completion event room = f"{update.type.value}_progress" - await self._broadcast(update, room) + event = ProgressEvent( + event_type=f"{update.type.value}_progress", + progress_id=progress_id, + progress=update, + room=room, + ) + await self._emit_event(event) return update @@ -361,9 +441,15 @@ class ProgressService: error=error_message, ) - # Broadcast failure + # Emit failure event room = f"{update.type.value}_progress" - await self._broadcast(update, room) + event = ProgressEvent( + event_type=f"{update.type.value}_progress", + progress_id=progress_id, + progress=update, + room=room, + ) + await self._emit_event(event) return update @@ -405,9 +491,15 @@ class ProgressService: type=update.type.value, ) - # Broadcast cancellation + # Emit cancellation event room = f"{update.type.value}_progress" - await self._broadcast(update, room) + event = ProgressEvent( + event_type=f"{update.type.value}_progress", + progress_id=progress_id, + progress=update, + room=room, + ) + await self._emit_event(event) return update diff --git a/src/server/utils/dependencies.py b/src/server/utils/dependencies.py index b417767..086e774 100644 --- a/src/server/utils/dependencies.py +++ b/src/server/utils/dependencies.py @@ -384,39 +384,14 @@ def get_download_service() -> "DownloadService": if _download_service is None: try: - from src.server.services import ( - websocket_service as websocket_service_module, - ) from src.server.services.download_service import DownloadService anime_service = get_anime_service() _download_service = DownloadService(anime_service) - - ws_service = websocket_service_module.get_websocket_service() - - async def broadcast_callback(update_type: str, data: dict) -> None: - """Broadcast download updates via WebSocket.""" - if update_type == "download_progress": - await ws_service.broadcast_download_progress( - data.get("download_id", ""), - data, - ) - elif update_type == "download_complete": - await ws_service.broadcast_download_complete( - data.get("download_id", ""), - data, - ) - elif update_type == "download_failed": - await ws_service.broadcast_download_failed( - data.get("download_id", ""), - data, - ) - elif update_type == "queue_status": - await ws_service.broadcast_queue_status(data) - else: - await ws_service.broadcast_queue_status(data) - - _download_service.set_broadcast_callback(broadcast_callback) + + # Note: DownloadService no longer needs broadcast callbacks. + # Progress updates flow through: + # SeriesApp → AnimeService → ProgressService → WebSocketService except HTTPException: raise