Aniworld/docs/progress_service_architecture.md
2025-11-07 18:40:36 +01:00

Progress Service Architecture

Overview

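The ProgressService is the single source of truth for all real-time progress tracking in the Aniworld application. Progress updates flow through a fixed pipeline: SeriesApp fires events, AnimeService translates them into ProgressService calls, ProgressService records the state and emits events, and WebSocketService broadcasts those events to connected clients.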

Architecture Diagram

┌─────────────┐
│  SeriesApp  │ ← Core download/scan logic
└──────┬──────┘
       │ Events (download_status, scan_status)
       ▼
┌─────────────────┐
│  AnimeService   │ ← Subscribes to SeriesApp events
└────────┬────────┘
         │ Forwards events
         ▼
┌──────────────────┐
│ ProgressService  │ ← Single source of truth for progress
└────────┬─────────┘
         │ Emits events to subscribers
         ▼
┌──────────────────┐
│ WebSocketService │ ← Subscribes to progress events
└──────────────────┘
         │
         ▼
    Connected clients receive real-time updates

Components

1. SeriesApp (Core Layer)

Location: src/core/SeriesApp.py

Responsibilities:

  • Execute actual downloads and scans
  • Fire events with detailed progress information
  • Manage download state and error handling

Events:

  • download_status: Fired during downloads

    • started: Download begins
    • progress: Progress updates (percent, speed, ETA)
    • completed: Download finished successfully
    • failed: Download encountered an error
  • scan_status: Fired during library scans

    • started: Scan begins
    • progress: Scan progress updates
    • completed: Scan finished
    • failed: Scan encountered an error
    • cancelled: Scan was cancelled

2. AnimeService (Service Layer)

Location: src/server/services/anime_service.py

Responsibilities:

  • Subscribe to SeriesApp events
  • Translate SeriesApp events into ProgressService updates
  • Provide async interface for web layer

Event Handlers:

async def _on_download_status(self, args):
    """Translate download events into ProgressService calls."""
    # Declared async because the body awaits ProgressService coroutines
    if args.status == "started":
        await progress_service.start_progress(...)
    elif args.status == "progress":
        await progress_service.update_progress(...)
    elif args.status == "completed":
        await progress_service.complete_progress(...)
    elif args.status == "failed":
        await progress_service.fail_progress(...)

async def _on_scan_status(self, args):
    """Translate scan events into ProgressService calls."""
    # Same pattern as _on_download_status; scan events also include "cancelled"
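If SeriesApp fires its events synchronously from a worker thread (an assumption; this document does not say which thread the events arrive on), the handlers cannot simply be awaited. One common way to bridge the two worlds is `asyncio.run_coroutine_threadsafe`. The sketch below uses a hypothetical `EventBridge` class and a stand-in `_handle` coroutine in place of the real ProgressService calls:

```python
import asyncio
from typing import List


class EventBridge:
    """Hypothetical adapter: receives sync callbacks, runs async handlers on the loop."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self.seen: List[str] = []

    async def _handle(self, status: str) -> None:
        # Stand-in for the awaits on progress_service.* in the real handler
        self.seen.append(status)

    def on_status(self, status: str) -> None:
        """Safe to call from a worker thread; waits until the coroutine has run."""
        future = asyncio.run_coroutine_threadsafe(self._handle(status), self._loop)
        future.result(timeout=5)


async def demo() -> List[str]:
    loop = asyncio.get_running_loop()
    bridge = EventBridge(loop)

    def worker() -> None:
        # Simulates the core layer firing events from its own thread
        for status in ("started", "progress", "completed"):
            bridge.on_status(status)

    # Run the blocking worker in a thread so the loop stays free for the handlers
    await loop.run_in_executor(None, worker)
    return bridge.seen
```

Waiting on `future.result()` keeps events in order at the cost of blocking the worker briefly; dropping that call would make the bridge fire-and-forget.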

3. ProgressService (Service Layer)

Location: src/server/services/progress_service.py

Responsibilities:

  • Central progress tracking for all operations
  • Maintain active and historical progress records
  • Calculate percentages and rates
  • Emit events to subscribers (event-based architecture)

Progress Types:

  • DOWNLOAD: Individual episode downloads
  • SCAN: Library scans for missing episodes
  • QUEUE: Download queue operations
  • SYSTEM: System-level operations
  • ERROR: Error notifications
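As a rough illustration, the progress types above could be modelled as a string-valued enum. The member names come from the list; the string values are assumptions (only `"download"` and `"in_progress"` appear in the sample WebSocket payload in this document), and `ProgressStatus` is a hypothetical companion enum:

```python
from enum import Enum


class ProgressType(str, Enum):
    """Kinds of tracked operations; string values are assumed."""
    DOWNLOAD = "download"
    SCAN = "scan"
    QUEUE = "queue"
    SYSTEM = "system"
    ERROR = "error"


class ProgressStatus(str, Enum):
    """Hypothetical lifecycle states; only "in_progress" is confirmed by this document."""
    STARTED = "started"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
```

Deriving from `str` keeps the members directly comparable to the plain strings that arrive in JSON payloads.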

Event System:

# Subscribe to progress events
def subscribe(event_name: str, handler: Callable[[ProgressEvent], None])
def unsubscribe(event_name: str, handler: Callable[[ProgressEvent], None])

# Internal event emission
async def _emit_event(event: ProgressEvent)
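A minimal sketch of how such an event system might work internally is shown below. It is not the actual ProgressService code: `EventHub` is a hypothetical class, `ProgressEvent` is reduced to three fields, handlers are assumed to be coroutines, and events are always emitted under the `"progress_updated"` name used elsewhere in this document.

```python
import asyncio
import logging
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Dict, List

logger = logging.getLogger(__name__)


@dataclass
class ProgressEvent:
    """Simplified stand-in for the real dataclass."""
    event_type: str
    room: str
    progress: Dict[str, Any] = field(default_factory=dict)


Handler = Callable[[ProgressEvent], Awaitable[None]]


class EventHub:
    """Hypothetical hub mirroring subscribe / unsubscribe / _emit_event."""

    def __init__(self) -> None:
        self._event_handlers: Dict[str, List[Handler]] = {}

    def subscribe(self, event_name: str, handler: Handler) -> None:
        self._event_handlers.setdefault(event_name, []).append(handler)

    def unsubscribe(self, event_name: str, handler: Handler) -> None:
        handlers = self._event_handlers.get(event_name, [])
        if handler in handlers:
            handlers.remove(handler)

    async def _emit_event(self, event: ProgressEvent,
                          event_name: str = "progress_updated") -> None:
        # Copy the list so a handler may unsubscribe while we iterate
        handlers = list(self._event_handlers.get(event_name, []))
        # return_exceptions=True keeps one failing handler from cancelling the rest
        results = await asyncio.gather(
            *(handler(event) for handler in handlers), return_exceptions=True
        )
        for handler, result in zip(handlers, results):
            if isinstance(result, Exception):
                logger.error("Progress handler %r failed: %s", handler, result)
```

Collecting exceptions instead of letting them propagate is what gives each subscriber isolated error handling in this sketch.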

Key Methods:

async def start_progress(progress_id, progress_type, title, ...):
    """Start tracking a new operation."""

async def update_progress(progress_id, current, total, message, ...):
    """Update progress for an ongoing operation."""

async def complete_progress(progress_id, message, ...):
    """Mark operation as completed."""

async def fail_progress(progress_id, error_message, ...):
    """Mark operation as failed."""
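The percentage and rate calculation that these methods rely on might look roughly like the following. `ProgressSnapshot` is a hypothetical record; the `current` and `total` names follow `update_progress`, while the clamping and rounding rules are assumptions.

```python
import time
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ProgressSnapshot:
    """Hypothetical record holding the counters of one operation."""
    current: int = 0
    total: int = 0
    started_at: float = field(default_factory=time.monotonic)

    @property
    def percent(self) -> float:
        # An unknown or zero total yields 0 % instead of a division error
        if self.total <= 0:
            return 0.0
        # Clamp to 100 in case current overshoots total
        return round(min(self.current * 100 / self.total, 100.0), 1)

    def rate(self, now: Optional[float] = None) -> float:
        """Average units per second since the operation started."""
        if now is None:
            now = time.monotonic()
        elapsed = now - self.started_at
        return self.current / elapsed if elapsed > 0 else 0.0
```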

4. DownloadService (Service Layer)

Location: src/server/services/download_service.py

Responsibilities:

  • Manage download queue (FIFO processing)
  • Track queue state (pending, active, completed, failed)
  • Persist queue to disk
  • Use ProgressService for queue-related updates

Progress Integration:

# Queue operations notify via ProgressService
await progress_service.update_progress(
    progress_id="download_queue",
    message="Added 3 items to queue",
    metadata={
        "action": "items_added",
        "queue_status": {...}
    },
    force_broadcast=True,
)

Note: DownloadService does not broadcast directly. Progress for individual downloads flows through SeriesApp → AnimeService → ProgressService → WebSocketService.
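The FIFO processing and disk persistence listed under Responsibilities could be sketched as follows. `PersistentQueue`, its JSON file format, and the write-then-rename strategy are assumptions for illustration, not the actual DownloadService implementation.

```python
import json
from collections import deque
from pathlib import Path
from typing import Deque, Dict, Optional


class PersistentQueue:
    """Hypothetical FIFO queue that is saved as JSON after every change."""

    def __init__(self, path: Path) -> None:
        self._path = path
        self._pending: Deque[Dict] = deque()
        if path.exists():
            # Restore items left over from a previous run, oldest first
            self._pending.extend(json.loads(path.read_text(encoding="utf-8")))

    def _save(self) -> None:
        # Write to a temporary file and rename it, so a crash mid-write
        # cannot leave a truncated queue file behind
        tmp = self._path.with_suffix(".tmp")
        tmp.write_text(json.dumps(list(self._pending)), encoding="utf-8")
        tmp.replace(self._path)

    def add(self, item: Dict) -> None:
        self._pending.append(item)
        self._save()

    def next(self) -> Optional[Dict]:
        """Return the oldest pending item, or None when the queue is empty."""
        if not self._pending:
            return None
        item = self._pending.popleft()
        self._save()
        return item

    def __len__(self) -> int:
        return len(self._pending)
```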

5. WebSocketService (Service Layer)

Location: src/server/services/websocket_service.py

Responsibilities:

  • Manage WebSocket connections
  • Support room-based messaging
  • Broadcast progress updates to clients
  • Handle connection lifecycle

Integration: WebSocketService subscribes to ProgressService events:

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Get services
    progress_service = get_progress_service()
    ws_service = get_websocket_service()

    # Define event handler
    async def progress_event_handler(event) -> None:
        """Handle progress events and broadcast via WebSocket."""
        message = {
            "type": event.event_type,
            "data": event.progress.to_dict(),
        }
        await ws_service.manager.broadcast_to_room(message, event.room)

    # Subscribe to progress events
    progress_service.subscribe("progress_updated", progress_event_handler)

    # Hand control to the application; code after the yield runs at shutdown
    yield

    progress_service.unsubscribe("progress_updated", progress_event_handler)
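Room-based messaging, as used by `broadcast_to_room` above, can be pictured with the sketch below. `RoomManager`, `join`, and `leave` are hypothetical names; the only assumption about a connection is that it offers an async `send_json` method, as Starlette's `WebSocket` does.

```python
from collections import defaultdict
from typing import Any, Dict, Protocol, Set


class Connection(Protocol):
    async def send_json(self, data: Any) -> None: ...


class RoomManager:
    """Hypothetical stand-in for ws_service.manager."""

    def __init__(self) -> None:
        self._rooms: Dict[str, Set[Connection]] = defaultdict(set)

    def join(self, room: str, connection: Connection) -> None:
        self._rooms[room].add(connection)

    def leave(self, room: str, connection: Connection) -> None:
        self._rooms[room].discard(connection)

    async def broadcast_to_room(self, message: Dict[str, Any], room: str) -> None:
        # Iterate over a copy because broken connections are removed on the fly
        for connection in list(self._rooms.get(room, ())):
            try:
                await connection.send_json(message)
            except Exception:
                # A client that cannot be reached is dropped from the room
                self._rooms[room].discard(connection)
```

Only clients that joined the event's room receive the message, so a client interested in scans does not have to filter out download traffic.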

Data Flow Examples

Example 1: Episode Download

  1. User triggers download via API endpoint
  2. DownloadService queues the download
  3. DownloadService starts processing → calls anime_service.download()
  4. AnimeService calls series_app.download()
  5. SeriesApp fires download_status events:
    • started → AnimeService → ProgressService → WebSocket → Client
    • progress (multiple) → AnimeService → ProgressService → WebSocket → Client
    • completed → AnimeService → ProgressService → WebSocket → Client

Example 2: Library Scan

  1. User triggers scan via API endpoint
  2. AnimeService calls series_app.rescan()
  3. SeriesApp fires scan_status events:
    • started → AnimeService → ProgressService → WebSocket → Client
    • progress (multiple) → AnimeService → ProgressService → WebSocket → Client
    • completed → AnimeService → ProgressService → WebSocket → Client

Example 3: Queue Management

  1. User adds items to queue via API endpoint
  2. DownloadService adds items to internal queue
  3. DownloadService notifies via ProgressService:
    await progress_service.update_progress(
        progress_id="download_queue",
        message="Added 5 items to queue",
        metadata={"queue_status": {...}},
        force_broadcast=True,
    )
    
  4. ProgressService → WebSocket → Client receives queue update

Benefits of This Architecture

1. Single Source of Truth

  • All progress tracking goes through ProgressService
  • Consistent progress reporting across the application
  • Easy to monitor and debug

2. Decoupling

  • Core logic (SeriesApp) doesn't know about web layer
  • Services can be tested independently
  • Easy to add new progress consumers (e.g., CLI, GUI)

3. Type Safety

  • Strongly typed progress updates
  • Enum-based progress types and statuses
  • Clear data contracts

4. Flexibility

  • Multiple subscribers can listen to progress events
  • Room-based WebSocket messaging
  • Metadata support for custom data
  • Multiple concurrent progress operations

5. Maintainability

  • Clear separation of concerns
  • Single place to modify progress logic
  • Easy to extend with new progress types or subscribers

6. Scalability

  • Event-based architecture supports multiple consumers
  • Isolated error handling per subscriber
  • No individual consumer is a single point of failure; ProgressService itself remains the central hub

Progress IDs

Progress operations are identified by unique IDs:

  • Downloads: download_{serie_folder}_{season}_{episode}
  • Scans: library_scan
  • Queue: download_queue
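The naming scheme above can be captured in a few helpers so that every caller builds IDs the same way. The function and constant names here are hypothetical; only the ID formats come from this document.

```python
# Fixed IDs for the singleton operations
LIBRARY_SCAN_ID = "library_scan"
DOWNLOAD_QUEUE_ID = "download_queue"


def download_progress_id(serie_folder: str, season: int, episode: int) -> str:
    """Build the progress ID for one episode download."""
    return f"download_{serie_folder}_{season}_{episode}"


# Example: download_progress_id("naruto", 1, 1) gives "download_naruto_1_1"
```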

WebSocket Messages

Clients receive progress updates in this format:

{
    "type": "download_progress",
    "data": {
        "id": "download_naruto_1_1",
        "type": "download",
        "status": "in_progress",
        "title": "Downloading Naruto",
        "message": "S01E01",
        "percent": 45.0,
        "current": 45,
        "total": 100,
        "metadata": {},
        "started_at": "2025-11-07T10:00:00Z",
        "updated_at": "2025-11-07T10:05:00Z"
    }
}
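A consumer written in Python (for example a CLI client) might decode such a message along these lines. `ProgressMessage` and `parse_progress_message` are hypothetical names, and the sketch keeps only a subset of the fields shown above.

```python
import json
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict


@dataclass
class ProgressMessage:
    """Hypothetical client-side view of one progress update."""
    event_type: str
    progress_id: str
    status: str
    percent: float
    updated_at: datetime
    metadata: Dict[str, Any]


def parse_progress_message(raw: str) -> ProgressMessage:
    envelope = json.loads(raw)
    data = envelope["data"]
    return ProgressMessage(
        event_type=envelope["type"],
        progress_id=data["id"],
        status=data["status"],
        percent=float(data.get("percent", 0.0)),
        # datetime.fromisoformat() accepts a trailing "Z" only from Python 3.11 on,
        # so it is rewritten as an explicit UTC offset first
        updated_at=datetime.fromisoformat(data["updated_at"].replace("Z", "+00:00")),
        metadata=data.get("metadata") or {},
    )
```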

Configuration

Startup (fastapi_app.py)

from contextlib import asynccontextmanager

from fastapi import FastAPI


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Initialize services
    progress_service = get_progress_service()
    ws_service = get_websocket_service()

    # Define event handler
    async def progress_event_handler(event) -> None:
        """Handle progress events and broadcast via WebSocket."""
        message = {
            "type": event.event_type,
            "data": event.progress.to_dict(),
        }
        await ws_service.manager.broadcast_to_room(message, event.room)

    # Subscribe to progress events
    progress_service.subscribe("progress_updated", progress_event_handler)

    # Hand control to the application; code after the yield runs at shutdown
    yield

    progress_service.unsubscribe("progress_updated", progress_event_handler)

Service Initialization

# AnimeService automatically subscribes to SeriesApp events
anime_service = AnimeService(series_app)

# DownloadService uses ProgressService for queue updates
download_service = DownloadService(anime_service)

Migration Notes

What Changed

Before (Callback-based):

  • ProgressService had a single set_broadcast_callback() method
  • Only one consumer could receive updates
  • Direct coupling between ProgressService and WebSocketService

After (Event-based):

  • ProgressService uses subscribe() and unsubscribe() methods
  • Multiple consumers can subscribe to progress events
  • Loose coupling - ProgressService doesn't know about subscribers
  • Clean event flow: SeriesApp → AnimeService → ProgressService → Subscribers

Removed

  1. ProgressService:

    • set_broadcast_callback() method
    • _broadcast_callback attribute
    • _broadcast() method

Added

  1. ProgressService:

    • ProgressEvent dataclass to encapsulate event data
    • subscribe() method for event subscription
    • unsubscribe() method to remove handlers
    • _emit_event() method for broadcasting to all subscribers
    • _event_handlers dictionary to track subscribers
  2. fastapi_app.py:

    • Event handler function progress_event_handler
    • Uses subscribe() instead of set_broadcast_callback()

Benefits of Event-Based Design

  1. Multiple Subscribers: Several services can listen to progress events at the same time

    # WebSocket for real-time updates
    progress_service.subscribe("progress_updated", websocket_handler)
    # Metrics for analytics
    progress_service.subscribe("progress_updated", metrics_handler)
    # Logging for debugging
    progress_service.subscribe("progress_updated", logging_handler)
    
  2. Isolated Error Handling: If one subscriber fails, others continue working

  3. Dynamic Subscription: Handlers can subscribe/unsubscribe at runtime

  4. Extensibility: Easy to add new features without modifying ProgressService

Testing

Unit Tests

  • Test each service independently
  • Mock ProgressService for services that use it
  • Verify event handler logic

Integration Tests

  • Test full flow: SeriesApp → AnimeService → ProgressService → WebSocket
  • Verify progress updates reach clients
  • Test error handling

Example Test

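import pytest

# Import path taken from the Location given for ProgressService;
# ProgressType is assumed to live in the same module
from src.server.services.progress_service import ProgressService, ProgressType


@pytest.mark.asyncio  # requires the pytest-asyncio plugin
async def test_download_progress_flow():
    # Setup
    progress_service = ProgressService()
    events_received = []

    async def mock_event_handler(event):
        events_received.append(event)

    progress_service.subscribe("progress_updated", mock_event_handler)

    # Execute
    await progress_service.start_progress(
        progress_id="test_download",
        progress_type=ProgressType.DOWNLOAD,
        title="Test"
    )

    # Verify
    assert len(events_received) == 1
    assert events_received[0].event_type == "download_progress"
    assert events_received[0].progress.id == "test_download"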

Future Enhancements

  1. Progress Persistence: Save progress to database for recovery
  2. Progress History: Keep detailed history for analytics
  3. Rate Limiting: Throttle progress updates to prevent spam
  4. Progress Aggregation: Combine multiple progress operations
  5. Custom Rooms: Allow clients to subscribe to specific progress types
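As one possible shape for the rate-limiting idea, the sketch below allows at most one update per progress ID within a minimum interval. `UpdateThrottle` is a hypothetical helper that does not exist in the code base; its `force` flag is modelled on the `force_broadcast` argument shown earlier, and the 0.5 second default is arbitrary.

```python
import time
from typing import Callable, Dict


class UpdateThrottle:
    """Hypothetical helper: at most one update per ID per interval."""

    def __init__(self, min_interval: float = 0.5,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._min_interval = min_interval
        self._clock = clock  # injectable so tests can control time
        self._last_sent: Dict[str, float] = {}

    def should_send(self, progress_id: str, force: bool = False) -> bool:
        """Return True if an update for this ID may be broadcast now."""
        now = self._clock()
        last = self._last_sent.get(progress_id)
        if force or last is None or now - last >= self._min_interval:
            self._last_sent[progress_id] = now
            return True
        return False
```

Tracking the timestamp per ID means a busy download cannot starve updates from the scan or the queue.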