Progress Service Architecture
Overview
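The ProgressService is the single source of truth for all real-time progress tracking in the Aniworld application. Progress updates flow through one pipeline: SeriesApp fires events, AnimeService translates them into ProgressService calls, ProgressService records them and notifies its subscribers, and WebSocketService broadcasts them to connected clients.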
Architecture Diagram
┌─────────────┐
│  SeriesApp  │  ← Core download/scan logic
└──────┬──────┘
       │ Events (download_status, scan_status)
       ▼
┌─────────────────┐
│  AnimeService   │  ← Subscribes to SeriesApp events
└────────┬────────┘
         │ Forwards events
         ▼
┌──────────────────┐
│ ProgressService  │  ← Single source of truth for progress
└────────┬─────────┘
         │ Emits events to subscribers
         ▼
┌──────────────────┐
│ WebSocketService │  ← Subscribes to progress events
└────────┬─────────┘
         │
         ▼
Connected clients receive real-time updates
Components
1. SeriesApp (Core Layer)
Location: src/core/SeriesApp.py
Responsibilities:
- Execute actual downloads and scans
- Fire events with detailed progress information
- Manage download state and error handling
Events:
- download_status: Fired during downloads
  - started: Download begins
  - progress: Progress updates (percent, speed, ETA)
  - completed: Download finished successfully
  - failed: Download encountered an error
- scan_status: Fired during library scans
  - started: Scan begins
  - progress: Scan progress updates
  - completed: Scan finished
  - failed: Scan encountered an error
  - cancelled: Scan was cancelled
2. AnimeService (Service Layer)
Location: src/server/services/anime_service.py
Responsibilities:
- Subscribe to SeriesApp events
- Translate SeriesApp events into ProgressService updates
- Provide async interface for web layer
Event Handlers:
async def _on_download_status(self, args):
    """Translate download events into ProgressService calls."""
    # Declared async because the body awaits; call arguments are elided.
    if args.status == "started":
        await progress_service.start_progress(...)
    elif args.status == "progress":
        await progress_service.update_progress(...)
    elif args.status == "completed":
        await progress_service.complete_progress(...)
    elif args.status == "failed":
        await progress_service.fail_progress(...)

async def _on_scan_status(self, args):
    """Translate scan events into ProgressService calls."""
    # Same pattern as _on_download_status
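The handlers above await ProgressService calls. If SeriesApp invokes its callbacks synchronously from a worker thread (an assumption; this document does not say how events are dispatched), the awaitable work has to be handed over to the event loop. A minimal sketch of one way to do that with `asyncio.run_coroutine_threadsafe` follows. `StatusArgs`, `RecordingProgressService`, and `EventBridge` are hypothetical names used only for illustration, and the single `message` argument stands in for the elided call arguments.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class StatusArgs:
    """Hypothetical stand-in for the payload SeriesApp passes to handlers."""
    status: str
    message: str = ""


class RecordingProgressService:
    """Hypothetical stub that records calls instead of tracking real progress."""

    def __init__(self) -> None:
        self.calls = []

    async def start_progress(self, message: str) -> None:
        self.calls.append(("start", message))

    async def update_progress(self, message: str) -> None:
        self.calls.append(("update", message))

    async def complete_progress(self, message: str) -> None:
        self.calls.append(("complete", message))

    async def fail_progress(self, message: str) -> None:
        self.calls.append(("fail", message))


class EventBridge:
    """Forwards synchronous status callbacks onto an asyncio event loop."""

    def __init__(self, service, loop) -> None:
        self._loop = loop
        # Dispatch table: event status -> coroutine function on the service.
        self._dispatch = {
            "started": service.start_progress,
            "progress": service.update_progress,
            "completed": service.complete_progress,
            "failed": service.fail_progress,
        }

    def on_download_status(self, args):
        """Callable from any thread; returns a concurrent.futures.Future or None."""
        method = self._dispatch.get(args.status)
        if method is None:
            return None  # Unknown statuses are ignored in this sketch.
        return asyncio.run_coroutine_threadsafe(method(args.message), self._loop)


async def demo():
    service = RecordingProgressService()
    bridge = EventBridge(service, asyncio.get_running_loop())

    def worker() -> None:
        # Simulates SeriesApp firing events from a download thread.
        for status in ("started", "progress", "completed"):
            bridge.on_download_status(StatusArgs(status, "S01E01")).result(timeout=5)

    # Run the blocking worker in a thread so the loop stays free for the coroutines.
    await asyncio.to_thread(worker)
    return service.calls
```

The dispatch table replaces the `if`/`elif` chain, so supporting an additional status means adding one entry. `asyncio.to_thread` requires Python 3.9 or later.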
3. ProgressService (Service Layer)
Location: src/server/services/progress_service.py
Responsibilities:
- Central progress tracking for all operations
- Maintain active and historical progress records
- Calculate percentages and rates
- Emit events to subscribers (event-based architecture)
Progress Types:
- DOWNLOAD: Individual episode downloads
- SCAN: Library scans for missing episodes
- QUEUE: Download queue operations
- SYSTEM: System-level operations
- ERROR: Error notifications
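The progress types listed above map naturally onto an enum. The sketch below is an illustration, not the project's definition: the member names come from this document, but the string values are assumed (only "download" appears in the WebSocket payload shown in this document), and `parse_progress_type` is a hypothetical helper.

```python
from enum import Enum


class ProgressType(str, Enum):
    """Sketch of the progress types; string values are assumptions."""
    DOWNLOAD = "download"
    SCAN = "scan"
    QUEUE = "queue"
    SYSTEM = "system"
    ERROR = "error"


def parse_progress_type(raw: str) -> ProgressType:
    """Convert a wire-format string into a ProgressType, rejecting unknown values."""
    try:
        return ProgressType(raw.lower())
    except ValueError:
        raise ValueError(f"unknown progress type: {raw!r}") from None
```

Deriving from `str` lets the members serialise to JSON as plain strings, which suits payloads sent to WebSocket clients.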
Event System:
# Subscribe to progress events
def subscribe(event_name: str, handler: Callable[[ProgressEvent], None])
def unsubscribe(event_name: str, handler: Callable[[ProgressEvent], None])
# Internal event emission
async def _emit_event(event: ProgressEvent)
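To make the subscribe/emit contract concrete, here is a minimal, self-contained sketch of an emitter with the same method names. It is not the actual ProgressService code: the `ProgressEvent` fields are simplified, the class name `EventEmitter` is hypothetical, handlers are assumed to be coroutine functions, and the `event_name` default on `_emit_event` is an assumption based on the `"progress_updated"` name used in this document.

```python
import asyncio
import logging
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Dict, List

logger = logging.getLogger(__name__)


@dataclass
class ProgressEvent:
    """Simplified event; the real dataclass may carry more fields."""
    event_type: str
    room: str
    progress: Dict[str, Any] = field(default_factory=dict)


Handler = Callable[[ProgressEvent], Awaitable[None]]


class EventEmitter:
    """Hypothetical emitter mirroring subscribe/unsubscribe/_emit_event."""

    def __init__(self) -> None:
        self._event_handlers: Dict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, event_name: str, handler: Handler) -> None:
        # Ignore duplicate registrations so a handler is called once per event.
        if handler not in self._event_handlers[event_name]:
            self._event_handlers[event_name].append(handler)

    def unsubscribe(self, event_name: str, handler: Handler) -> None:
        handlers = self._event_handlers.get(event_name, [])
        if handler in handlers:
            handlers.remove(handler)

    async def _emit_event(
        self, event: ProgressEvent, event_name: str = "progress_updated"
    ) -> None:
        # Iterate over a copy so handlers may unsubscribe while being notified.
        for handler in list(self._event_handlers.get(event_name, [])):
            try:
                await handler(event)
            except Exception:
                # One failing subscriber must not stop delivery to the others.
                logger.exception("progress handler %r failed", handler)
```

Catching exceptions per handler is what gives each subscriber isolated error handling: a failure is logged and the loop moves on to the next handler.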
Key Methods:
async def start_progress(progress_id, progress_type, title, ...):
    """Start tracking a new operation."""

async def update_progress(progress_id, current, total, message, ...):
    """Update progress for an ongoing operation."""

async def complete_progress(progress_id, message, ...):
    """Mark operation as completed."""

async def fail_progress(progress_id, error_message, ...):
    """Mark operation as failed."""
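ProgressService is described as calculating percentages and rates from the `current` and `total` values passed to `update_progress`. The following sketch shows one way such a calculation could look. `ProgressRecord` is a hypothetical class, its field names follow the WebSocket payload in this document, and the rounding and clamping choices are assumptions.

```python
import time
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ProgressRecord:
    """Hypothetical progress record with derived percent and rate."""
    id: str
    current: int = 0
    total: int = 0
    started_at: float = field(default_factory=time.monotonic)

    @property
    def percent(self) -> float:
        # An unknown or zero total yields 0.0 instead of dividing by zero.
        if self.total <= 0:
            return 0.0
        # Clamp at 100 in case current overshoots total.
        return round(min(self.current / self.total, 1.0) * 100, 1)

    def rate(self, now: Optional[float] = None) -> float:
        """Average units per second since the record was created."""
        elapsed = (time.monotonic() if now is None else now) - self.started_at
        return self.current / elapsed if elapsed > 0 else 0.0
```

Passing `now` explicitly keeps the rate calculation deterministic in tests.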
4. DownloadService (Service Layer)
Location: src/server/services/download_service.py
Responsibilities:
- Manage download queue (FIFO processing)
- Track queue state (pending, active, completed, failed)
- Persist queue to disk
- Use ProgressService for queue-related updates
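The FIFO processing and disk persistence listed above can be sketched as follows. This is an illustration under assumptions, not DownloadService itself: `PersistentQueue` is a hypothetical class, the JSON file format is assumed, and only the pending part of the queue state is modelled.

```python
import json
from collections import deque
from pathlib import Path
from typing import Optional


class PersistentQueue:
    """FIFO queue of JSON-serialisable items, saved to disk after every change."""

    def __init__(self, path: Path) -> None:
        self._path = path
        self._pending = deque()
        # Restore items left over from a previous run, oldest first.
        if path.exists():
            self._pending.extend(json.loads(path.read_text(encoding="utf-8")))

    def add(self, item: dict) -> None:
        self._pending.append(item)
        self._save()

    def pop_next(self) -> Optional[dict]:
        """Remove and return the oldest item, or None if the queue is empty."""
        if not self._pending:
            return None
        item = self._pending.popleft()
        self._save()
        return item

    def __len__(self) -> int:
        return len(self._pending)

    def _save(self) -> None:
        # Write to a temporary file, then rename it over the target, so an
        # interrupted write does not leave a half-written queue file.
        tmp = self._path.with_suffix(".tmp")
        tmp.write_text(json.dumps(list(self._pending)), encoding="utf-8")
        tmp.replace(self._path)
```

`deque.popleft()` gives constant-time removal from the front, which is why a deque suits FIFO processing better than a list.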
Progress Integration:
# Queue operations notify via ProgressService
await progress_service.update_progress(
    progress_id="download_queue",
    message="Added 3 items to queue",
    metadata={
        "action": "items_added",
        "queue_status": {...}
    },
    force_broadcast=True,
)
Note: DownloadService does NOT directly broadcast. Individual download progress flows through:
SeriesApp → AnimeService → ProgressService → WebSocket
5. WebSocketService (Service Layer)
Location: src/server/services/websocket_service.py
Responsibilities:
- Manage WebSocket connections
- Support room-based messaging
- Broadcast progress updates to clients
- Handle connection lifecycle
Integration: WebSocketService subscribes to ProgressService events:
async def lifespan(app: FastAPI):
    # Get services
    progress_service = get_progress_service()
    ws_service = get_websocket_service()

    # Define event handler
    async def progress_event_handler(event) -> None:
        """Handle progress events and broadcast via WebSocket."""
        message = {
            "type": event.event_type,
            "data": event.progress.to_dict(),
        }
        await ws_service.manager.broadcast_to_room(message, event.room)

    # Subscribe to progress events
    progress_service.subscribe("progress_updated", progress_event_handler)

    yield  # Application serves requests here
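The handler above relies on `broadcast_to_room(message, room)`. As a rough sketch of what room-based messaging involves, the class below keeps a set of clients per room and sends a message only to the members of the named room. `RoomManager`, `join`, and `leave` are hypothetical names, and the only requirement on a client is an async `send_json` method, which Starlette/FastAPI `WebSocket` objects provide. How the real manager handles failed sends is not described in this document; dropping the client is an assumption.

```python
import asyncio
from collections import defaultdict
from typing import Any, Dict, Protocol, Set


class JsonSender(Protocol):
    """Anything with an async send_json method, e.g. a FastAPI WebSocket."""

    async def send_json(self, data: Any) -> None: ...


class RoomManager:
    """Hypothetical connection manager with room-based broadcasting."""

    def __init__(self) -> None:
        self._rooms: Dict[str, Set[JsonSender]] = defaultdict(set)

    def join(self, room: str, client: JsonSender) -> None:
        self._rooms[room].add(client)

    def leave(self, room: str, client: JsonSender) -> None:
        self._rooms[room].discard(client)

    async def broadcast_to_room(self, message: dict, room: str) -> int:
        """Send message to every client in room; return the number delivered."""
        delivered = 0
        # Iterate over a copy because failed clients are removed during the loop.
        for client in list(self._rooms.get(room, ())):
            try:
                await client.send_json(message)
                delivered += 1
            except Exception:
                # Assume the connection is gone and drop the client.
                self._rooms[room].discard(client)
        return delivered
```

Clients in other rooms never see the message, which lets a client follow only the progress it cares about.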
Data Flow Examples
Example 1: Episode Download
- User triggers download via API endpoint
- DownloadService queues the download
- DownloadService starts processing → calls anime_service.download()
- AnimeService calls series_app.download()
- SeriesApp fires download_status events:
  - started → AnimeService → ProgressService → WebSocket → Client
  - progress (multiple) → AnimeService → ProgressService → WebSocket → Client
  - completed → AnimeService → ProgressService → WebSocket → Client
Example 2: Library Scan
- User triggers scan via API endpoint
- AnimeService calls series_app.rescan()
- SeriesApp fires scan_status events:
  - started → AnimeService → ProgressService → WebSocket → Client
  - progress (multiple) → AnimeService → ProgressService → WebSocket → Client
  - completed → AnimeService → ProgressService → WebSocket → Client
Example 3: Queue Management
- User adds items to queue via API endpoint
- DownloadService adds items to internal queue
- DownloadService notifies via ProgressService:
  await progress_service.update_progress(
      progress_id="download_queue",
      message="Added 5 items to queue",
      metadata={"queue_status": {...}},
      force_broadcast=True,
  )
- ProgressService → WebSocket → Client receives queue update
Benefits of This Architecture
1. Single Source of Truth
- All progress tracking goes through ProgressService
- Consistent progress reporting across the application
- Easy to monitor and debug
2. Decoupling
- Core logic (SeriesApp) doesn't know about web layer
- Services can be tested independently
- Easy to add new progress consumers (e.g., CLI, GUI)
3. Type Safety
- Strongly typed progress updates
- Enum-based progress types and statuses
- Clear data contracts
4. Flexibility
- Multiple subscribers can listen to progress events
- Room-based WebSocket messaging
- Metadata support for custom data
- Multiple concurrent progress operations
5. Maintainability
- Clear separation of concerns
- Single place to modify progress logic
- Easy to extend with new progress types or subscribers
6. Scalability
- Event-based architecture supports multiple consumers
- Isolated error handling per subscriber
- No single point of failure
Progress IDs
Progress operations are identified by unique IDs:
- Downloads: download_{serie_folder}_{season}_{episode}
- Scans: library_scan
- Queue: download_queue
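The ID scheme above is simple enough to capture in a small helper. The function and constant names below are hypothetical; only the ID formats themselves come from this document.

```python
# Fixed IDs for the singleton operations.
LIBRARY_SCAN_ID = "library_scan"
DOWNLOAD_QUEUE_ID = "download_queue"


def download_progress_id(serie_folder: str, season: int, episode: int) -> str:
    """Build the progress ID for one episode download."""
    return f"download_{serie_folder}_{season}_{episode}"
```

For example, `download_progress_id("naruto", 1, 1)` returns `"download_naruto_1_1"`. Building IDs in one place keeps the code that starts a progress entry and the code that later updates it in agreement.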
WebSocket Messages
Clients receive progress updates in this format:
{
  "type": "download_progress",
  "data": {
    "id": "download_naruto_1_1",
    "type": "download",
    "status": "in_progress",
    "title": "Downloading Naruto",
    "message": "S01E01",
    "percent": 45.5,
    "current": 45,
    "total": 100,
    "metadata": {},
    "started_at": "2025-11-07T10:00:00Z",
    "updated_at": "2025-11-07T10:05:00Z"
  }
}
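On the receiving side, a client only needs to decode the JSON and read the fields under `data`. The sketch below shows a hypothetical `format_progress` helper that turns one such message into a status line. The fallback to `current / total` when `percent` is absent is an assumption, since this document only shows messages that include it.

```python
import json


def format_progress(raw: str) -> str:
    """Render one progress message as a single human-readable line."""
    data = json.loads(raw)["data"]
    percent = data.get("percent")
    if percent is None:
        # Fall back to current/total when the server sent no percent.
        total = data.get("total") or 0
        percent = data.get("current", 0) / total * 100 if total else 0.0
    return f"[{data['status']}] {data['title']} {data['message']}: {percent:.1f}%"
```

Applied to the message shown above, this yields `[in_progress] Downloading Naruto S01E01: 45.5%`.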
Configuration
Startup (fastapi_app.py)
from contextlib import asynccontextmanager

from fastapi import FastAPI


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Initialize services
    progress_service = get_progress_service()
    ws_service = get_websocket_service()

    # Define event handler
    async def progress_event_handler(event) -> None:
        """Handle progress events and broadcast via WebSocket."""
        message = {
            "type": event.event_type,
            "data": event.progress.to_dict(),
        }
        await ws_service.manager.broadcast_to_room(message, event.room)

    # Subscribe to progress events
    progress_service.subscribe("progress_updated", progress_event_handler)

    yield  # Application serves requests here
Service Initialization
# AnimeService automatically subscribes to SeriesApp events
anime_service = AnimeService(series_app)
# DownloadService uses ProgressService for queue updates
download_service = DownloadService(anime_service)
Migration Notes
What Changed
Before (Callback-based):
- ProgressService had a single set_broadcast_callback() method
- Only one consumer could receive updates
- Direct coupling between ProgressService and WebSocketService
After (Event-based):
- ProgressService uses subscribe() and unsubscribe() methods
- Multiple consumers can subscribe to progress events
- Loose coupling - ProgressService doesn't know about subscribers
- Clean event flow: SeriesApp → AnimeService → ProgressService → Subscribers
Removed
- ProgressService:
  - set_broadcast_callback() method
  - _broadcast_callback attribute
  - _broadcast() method
Added
- ProgressService:
  - ProgressEvent dataclass to encapsulate event data
  - subscribe() method for event subscription
  - unsubscribe() method to remove handlers
  - _emit_event() method for broadcasting to all subscribers
  - _event_handlers dictionary to track subscribers
- fastapi_app.py:
  - Event handler function progress_event_handler
  - Uses subscribe() instead of set_broadcast_callback()
Benefits of Event-Based Design
- Multiple Subscribers: Several services can now listen to progress events
  # WebSocket for real-time updates
  progress_service.subscribe("progress_updated", websocket_handler)
  # Metrics for analytics
  progress_service.subscribe("progress_updated", metrics_handler)
  # Logging for debugging
  progress_service.subscribe("progress_updated", logging_handler)
- Isolated Error Handling: If one subscriber fails, others continue working
- Dynamic Subscription: Handlers can subscribe/unsubscribe at runtime
- Extensibility: Easy to add new features without modifying ProgressService
Testing
Unit Tests
- Test each service independently
- Mock ProgressService for services that use it
- Verify event handler logic
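For services that depend on ProgressService, the standard library's `unittest.mock.AsyncMock` is one way to stand in for it, because its methods can be awaited and their calls inspected afterwards. The sketch below tests a hypothetical `notify_items_added` helper that mirrors the queue notification shown earlier; it is not part of the code base.

```python
import asyncio
from unittest.mock import AsyncMock


async def notify_items_added(progress_service, count: int) -> None:
    """Hypothetical helper mirroring the DownloadService queue notification."""
    await progress_service.update_progress(
        progress_id="download_queue",
        message=f"Added {count} items to queue",
        force_broadcast=True,
    )


def test_notify_items_added() -> None:
    # Attributes of an AsyncMock are themselves awaitable mocks.
    progress_service = AsyncMock()

    asyncio.run(notify_items_added(progress_service, 3))

    # Verify the service was awaited exactly once with the expected arguments.
    progress_service.update_progress.assert_awaited_once_with(
        progress_id="download_queue",
        message="Added 3 items to queue",
        force_broadcast=True,
    )
```

Because the mock replaces the whole service, the test exercises only the caller's logic and needs neither an event loop fixture nor a WebSocket connection.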
Integration Tests
- Test full flow: SeriesApp → AnimeService → ProgressService → WebSocket
- Verify progress updates reach clients
- Test error handling
Example Test
import pytest

# Import path assumed from the module location given above.
from src.server.services.progress_service import ProgressService, ProgressType


@pytest.mark.asyncio  # Requires the pytest-asyncio plugin
async def test_download_progress_flow():
    # Setup
    progress_service = ProgressService()
    events_received = []

    async def mock_event_handler(event):
        events_received.append(event)

    progress_service.subscribe("progress_updated", mock_event_handler)

    # Execute
    await progress_service.start_progress(
        progress_id="test_download",
        progress_type=ProgressType.DOWNLOAD,
        title="Test",
    )

    # Verify
    assert len(events_received) == 1
    assert events_received[0].event_type == "download_progress"
    assert events_received[0].progress.id == "test_download"
Future Enhancements
- Progress Persistence: Save progress to database for recovery
- Progress History: Keep detailed history for analytics
- Rate Limiting: Throttle progress updates to prevent spam
- Progress Aggregation: Combine multiple progress operations
- Custom Rooms: Allow clients to subscribe to specific progress types