451 lines
13 KiB
Markdown
451 lines
13 KiB
Markdown
# Progress Service Architecture
|
|
|
|
## Overview
|
|
|
|
The ProgressService serves as the **single source of truth** for all real-time progress tracking in the Aniworld application. This architecture follows a clean, decoupled design where progress updates flow through a well-defined pipeline.
|
|
|
|
## Architecture Diagram
|
|
|
|
```
|
|
┌─────────────┐
|
|
│ SeriesApp │ ← Core download/scan logic
|
|
└──────┬──────┘
|
|
│ Events (download_status, scan_status)
|
|
▼
|
|
┌─────────────────┐
|
|
│ AnimeService │ ← Subscribes to SeriesApp events
|
|
└────────┬────────┘
|
|
│ Forwards events
|
|
▼
|
|
┌──────────────────┐
|
|
│ ProgressService │ ← Single source of truth for progress
|
|
└────────┬─────────┘
|
|
│ Emits events to subscribers
|
|
▼
|
|
┌──────────────────┐
|
|
│ WebSocketService │ ← Subscribes to progress events
|
|
└──────────────────┘
|
|
│
|
|
▼
|
|
Connected clients receive real-time updates
|
|
```
|
|
|
|
## Components
|
|
|
|
### 1. SeriesApp (Core Layer)
|
|
|
|
**Location**: `src/core/SeriesApp.py`
|
|
|
|
**Responsibilities**:
|
|
|
|
- Execute actual downloads and scans
|
|
- Fire events with detailed progress information
|
|
- Manage download state and error handling
|
|
|
|
**Events**:
|
|
|
|
- `download_status`: Fired during downloads
|
|
|
|
- `started`: Download begins
|
|
- `progress`: Progress updates (percent, speed, ETA)
|
|
- `completed`: Download finished successfully
|
|
- `failed`: Download encountered an error
|
|
|
|
- `scan_status`: Fired during library scans
|
|
- `started`: Scan begins
|
|
- `progress`: Scan progress updates
|
|
- `completed`: Scan finished
|
|
- `failed`: Scan encountered an error
|
|
- `cancelled`: Scan was cancelled
|
|
|
|
### 2. AnimeService (Service Layer)
|
|
|
|
**Location**: `src/server/services/anime_service.py`
|
|
|
|
**Responsibilities**:
|
|
|
|
- Subscribe to SeriesApp events
|
|
- Translate SeriesApp events into ProgressService updates
|
|
- Provide async interface for web layer
|
|
|
|
**Event Handlers**:
|
|
|
|
```python
|
|
def _on_download_status(self, args):
|
|
"""Translates download events to progress service."""
|
|
if args.status == "started":
|
|
await progress_service.start_progress(...)
|
|
elif args.status == "progress":
|
|
await progress_service.update_progress(...)
|
|
elif args.status == "completed":
|
|
await progress_service.complete_progress(...)
|
|
elif args.status == "failed":
|
|
await progress_service.fail_progress(...)
|
|
|
|
def _on_scan_status(self, args):
|
|
"""Translates scan events to progress service."""
|
|
# Similar pattern as download_status
|
|
```
|
|
|
|
### 3. ProgressService (Service Layer)
|
|
|
|
**Location**: `src/server/services/progress_service.py`
|
|
|
|
**Responsibilities**:
|
|
|
|
- Central progress tracking for all operations
|
|
- Maintain active and historical progress records
|
|
- Calculate percentages and rates
|
|
- Emit events to subscribers (event-based architecture)
|
|
|
|
**Progress Types**:
|
|
|
|
- `DOWNLOAD`: Individual episode downloads
|
|
- `SCAN`: Library scans for missing episodes
|
|
- `QUEUE`: Download queue operations
|
|
- `SYSTEM`: System-level operations
|
|
- `ERROR`: Error notifications
|
|
|
|
**Event System**:
|
|
|
|
```python
|
|
# Subscribe to progress events
|
|
def subscribe(event_name: str, handler: Callable[[ProgressEvent], None])
|
|
def unsubscribe(event_name: str, handler: Callable[[ProgressEvent], None])
|
|
|
|
# Internal event emission
|
|
async def _emit_event(event: ProgressEvent)
|
|
```
|
|
|
|
**Key Methods**:
|
|
|
|
```python
|
|
async def start_progress(progress_id, progress_type, title, ...):
|
|
"""Start tracking a new operation."""
|
|
|
|
async def update_progress(progress_id, current, total, message, ...):
|
|
"""Update progress for an ongoing operation."""
|
|
|
|
async def complete_progress(progress_id, message, ...):
|
|
"""Mark operation as completed."""
|
|
|
|
async def fail_progress(progress_id, error_message, ...):
|
|
"""Mark operation as failed."""
|
|
```
|
|
|
|
### 4. DownloadService (Service Layer)
|
|
|
|
**Location**: `src/server/services/download_service.py`
|
|
|
|
**Responsibilities**:
|
|
|
|
- Manage download queue (FIFO processing)
|
|
- Track queue state (pending, active, completed, failed)
|
|
- Persist queue to disk
|
|
- Use ProgressService for queue-related updates
|
|
|
|
**Progress Integration**:
|
|
|
|
```python
|
|
# Queue operations notify via ProgressService
|
|
await progress_service.update_progress(
|
|
progress_id="download_queue",
|
|
message="Added 3 items to queue",
|
|
metadata={
|
|
"action": "items_added",
|
|
"queue_status": {...}
|
|
},
|
|
force_broadcast=True,
|
|
)
|
|
```
|
|
|
|
**Note**: DownloadService does NOT directly broadcast. Individual download progress flows through:
|
|
`SeriesApp → AnimeService → ProgressService → WebSocket`
|
|
|
|
### 5. WebSocketService (Service Layer)
|
|
|
|
**Location**: `src/server/services/websocket_service.py`
|
|
|
|
**Responsibilities**:
|
|
|
|
- Manage WebSocket connections
|
|
- Support room-based messaging
|
|
- Broadcast progress updates to clients
|
|
- Handle connection lifecycle
|
|
|
|
**Integration**:
|
|
WebSocketService subscribes to ProgressService events:
|
|
|
|
```python
|
|
async def lifespan(app: FastAPI):
|
|
# Get services
|
|
progress_service = get_progress_service()
|
|
ws_service = get_websocket_service()
|
|
|
|
# Define event handler
|
|
async def progress_event_handler(event) -> None:
|
|
"""Handle progress events and broadcast via WebSocket."""
|
|
message = {
|
|
"type": event.event_type,
|
|
"data": event.progress.to_dict(),
|
|
}
|
|
await ws_service.manager.broadcast_to_room(message, event.room)
|
|
|
|
# Subscribe to progress events
|
|
progress_service.subscribe("progress_updated", progress_event_handler)
|
|
```
|
|
|
|
## Data Flow Examples
|
|
|
|
### Example 1: Episode Download
|
|
|
|
1. **User triggers download** via API endpoint
|
|
2. **DownloadService** queues the download
|
|
3. **DownloadService** starts processing → calls `anime_service.download()`
|
|
4. **AnimeService** calls `series_app.download()`
|
|
5. **SeriesApp** fires `download_status` events:
|
|
- `started` → AnimeService → ProgressService → WebSocket → Client
|
|
- `progress` (multiple) → AnimeService → ProgressService → WebSocket → Client
|
|
- `completed` → AnimeService → ProgressService → WebSocket → Client
|
|
|
|
### Example 2: Library Scan
|
|
|
|
1. **User triggers scan** via API endpoint
|
|
2. **AnimeService** calls `series_app.rescan()`
|
|
3. **SeriesApp** fires `scan_status` events:
|
|
- `started` → AnimeService → ProgressService → WebSocket → Client
|
|
- `progress` (multiple) → AnimeService → ProgressService → WebSocket → Client
|
|
- `completed` → AnimeService → ProgressService → WebSocket → Client
|
|
|
|
### Example 3: Queue Management
|
|
|
|
1. **User adds items to queue** via API endpoint
|
|
2. **DownloadService** adds items to internal queue
|
|
3. **DownloadService** notifies via ProgressService:
|
|
```python
|
|
await progress_service.update_progress(
|
|
progress_id="download_queue",
|
|
message="Added 5 items to queue",
|
|
metadata={"queue_status": {...}},
|
|
force_broadcast=True,
|
|
)
|
|
```
|
|
4. **ProgressService** → WebSocket → Client receives queue update
|
|
|
|
## Benefits of This Architecture
|
|
|
|
### 1. **Single Source of Truth**
|
|
|
|
- All progress tracking goes through ProgressService
|
|
- Consistent progress reporting across the application
|
|
- Easy to monitor and debug
|
|
|
|
### 2. **Decoupling**
|
|
|
|
- Core logic (SeriesApp) doesn't know about web layer
|
|
- Services can be tested independently
|
|
- Easy to add new progress consumers (e.g., CLI, GUI)
|
|
|
|
### 3. **Type Safety**
|
|
|
|
- Strongly typed progress updates
|
|
- Enum-based progress types and statuses
|
|
- Clear data contracts
|
|
|
|
### 4. **Flexibility**
|
|
|
|
- Multiple subscribers can listen to progress events
|
|
- Room-based WebSocket messaging
|
|
- Metadata support for custom data
|
|
- Multiple concurrent progress operations
|
|
|
|
### 5. **Maintainability**
|
|
|
|
- Clear separation of concerns
|
|
- Single place to modify progress logic
|
|
- Easy to extend with new progress types or subscribers
|
|
|
|
### 6. **Scalability**
|
|
|
|
- Event-based architecture supports multiple consumers
|
|
- Isolated error handling per subscriber
|
|
- No single point of failure
|
|
|
|
## Progress IDs
|
|
|
|
Progress operations are identified by unique IDs:
|
|
|
|
- **Downloads**: `download_{serie_folder}_{season}_{episode}`
|
|
- **Scans**: `library_scan`
|
|
- **Queue**: `download_queue`
|
|
|
|
## WebSocket Messages
|
|
|
|
Clients receive progress updates in this format:
|
|
|
|
```json
|
|
{
|
|
"type": "download_progress",
|
|
"data": {
|
|
"id": "download_naruto_1_1",
|
|
"type": "download",
|
|
"status": "in_progress",
|
|
"title": "Downloading Naruto",
|
|
"message": "S01E01",
|
|
"percent": 45.5,
|
|
"current": 45,
|
|
"total": 100,
|
|
"metadata": {},
|
|
"started_at": "2025-11-07T10:00:00Z",
|
|
"updated_at": "2025-11-07T10:05:00Z"
|
|
}
|
|
}
|
|
```
|
|
|
|
## Configuration
|
|
|
|
### Startup (fastapi_app.py)
|
|
|
|
```python
|
|
@asynccontextmanager
|
|
async def lifespan(app: FastAPI):
|
|
# Initialize services
|
|
progress_service = get_progress_service()
|
|
ws_service = get_websocket_service()
|
|
|
|
# Define event handler
|
|
async def progress_event_handler(event) -> None:
|
|
"""Handle progress events and broadcast via WebSocket."""
|
|
message = {
|
|
"type": event.event_type,
|
|
"data": event.progress.to_dict(),
|
|
}
|
|
await ws_service.manager.broadcast_to_room(message, event.room)
|
|
|
|
# Subscribe to progress events
|
|
progress_service.subscribe("progress_updated", progress_event_handler)
|
|
```
|
|
|
|
### Service Initialization
|
|
|
|
```python
|
|
# AnimeService automatically subscribes to SeriesApp events
|
|
anime_service = AnimeService(series_app)
|
|
|
|
# DownloadService uses ProgressService for queue updates
|
|
download_service = DownloadService(anime_service)
|
|
```
|
|
|
|
## Migration Notes
|
|
|
|
### What Changed
|
|
|
|
**Before (Callback-based)**:
|
|
|
|
- ProgressService had a single `set_broadcast_callback()` method
|
|
- Only one consumer could receive updates
|
|
- Direct coupling between ProgressService and WebSocketService
|
|
|
|
**After (Event-based)**:
|
|
|
|
- ProgressService uses `subscribe()` and `unsubscribe()` methods
|
|
- Multiple consumers can subscribe to progress events
|
|
- Loose coupling - ProgressService doesn't know about subscribers
|
|
- Clean event flow: SeriesApp → AnimeService → ProgressService → Subscribers
|
|
|
|
### Removed
|
|
|
|
1. **ProgressService**:
|
|
|
|
- `set_broadcast_callback()` method
|
|
- `_broadcast_callback` attribute
|
|
- `_broadcast()` method
|
|
|
|
### Added
|
|
|
|
1. **ProgressService**:
|
|
|
|
- `ProgressEvent` dataclass to encapsulate event data
|
|
- `subscribe()` method for event subscription
|
|
- `unsubscribe()` method to remove handlers
|
|
- `_emit_event()` method for broadcasting to all subscribers
|
|
- `_event_handlers` dictionary to track subscribers
|
|
|
|
2. **fastapi_app.py**:
|
|
- Event handler function `progress_event_handler`
|
|
- Uses `subscribe()` instead of `set_broadcast_callback()`
|
|
|
|
### Benefits of Event-Based Design
|
|
|
|
1. **Multiple Subscribers**: Can now have multiple services listening to progress
|
|
|
|
```python
|
|
# WebSocket for real-time updates
|
|
progress_service.subscribe("progress_updated", websocket_handler)
|
|
# Metrics for analytics
|
|
progress_service.subscribe("progress_updated", metrics_handler)
|
|
# Logging for debugging
|
|
progress_service.subscribe("progress_updated", logging_handler)
|
|
```
|
|
|
|
2. **Isolated Error Handling**: If one subscriber fails, others continue working
|
|
|
|
3. **Dynamic Subscription**: Handlers can subscribe/unsubscribe at runtime
|
|
|
|
4. **Extensibility**: Easy to add new features without modifying ProgressService
|
|
|
|
## Testing
|
|
|
|
### Unit Tests
|
|
|
|
- Test each service independently
|
|
- Mock ProgressService for services that use it
|
|
- Verify event handler logic
|
|
|
|
### Integration Tests
|
|
|
|
- Test full flow: SeriesApp → AnimeService → ProgressService → WebSocket
|
|
- Verify progress updates reach clients
|
|
- Test error handling
|
|
|
|
### Example Test
|
|
|
|
```python
|
|
async def test_download_progress_flow():
|
|
# Setup
|
|
progress_service = ProgressService()
|
|
events_received = []
|
|
|
|
async def mock_event_handler(event):
|
|
events_received.append(event)
|
|
|
|
progress_service.subscribe("progress_updated", mock_event_handler)
|
|
|
|
# Execute
|
|
await progress_service.start_progress(
|
|
progress_id="test_download",
|
|
progress_type=ProgressType.DOWNLOAD,
|
|
title="Test"
|
|
)
|
|
|
|
# Verify
|
|
assert len(events_received) == 1
|
|
assert events_received[0].event_type == "download_progress"
|
|
assert events_received[0].progress.id == "test_download"
|
|
```
|
|
|
|
## Future Enhancements
|
|
|
|
1. **Progress Persistence**: Save progress to database for recovery
|
|
2. **Progress History**: Keep detailed history for analytics
|
|
3. **Rate Limiting**: Throttle progress updates to prevent spam
|
|
4. **Progress Aggregation**: Combine multiple progress operations
|
|
5. **Custom Rooms**: Allow clients to subscribe to specific progress types
|
|
|
|
## Related Documentation
|
|
|
|
- [WebSocket API](./websocket_api.md)
|
|
- [Download Service](./download_service.md)
|
|
- [Error Handling](./error_handling_validation.md)
|
|
- [API Implementation](./api_implementation_summary.md)
|