fix progress events
This commit is contained in:
@@ -13,14 +13,13 @@ from collections import deque
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Callable, Dict, List, Optional
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
import structlog
|
||||
|
||||
from src.server.models.download import (
|
||||
DownloadItem,
|
||||
DownloadPriority,
|
||||
DownloadProgress,
|
||||
DownloadStatus,
|
||||
EpisodeIdentifier,
|
||||
QueueStats,
|
||||
@@ -82,37 +81,33 @@ class DownloadService:
|
||||
# Executor for blocking operations
|
||||
self._executor = ThreadPoolExecutor(max_workers=1)
|
||||
|
||||
# WebSocket broadcast callback
|
||||
self._broadcast_callback: Optional[Callable] = None
|
||||
|
||||
# Statistics tracking
|
||||
self._total_downloaded_mb: float = 0.0
|
||||
self._download_speeds: deque[float] = deque(maxlen=10)
|
||||
|
||||
# Subscribe to SeriesApp download events for progress tracking
|
||||
# Note: Events library uses assignment (=), not += operator
|
||||
if hasattr(anime_service, '_app') and hasattr(
|
||||
anime_service._app, 'download_status'
|
||||
):
|
||||
# Save existing handler if any, and chain them
|
||||
existing_handler = anime_service._app.download_status
|
||||
if existing_handler:
|
||||
def chained_handler(args):
|
||||
existing_handler(args)
|
||||
self._on_seriesapp_download_status(args)
|
||||
anime_service._app.download_status = chained_handler
|
||||
else:
|
||||
anime_service._app.download_status = (
|
||||
self._on_seriesapp_download_status
|
||||
)
|
||||
|
||||
# Load persisted queue
|
||||
self._load_queue()
|
||||
|
||||
# Initialize queue progress tracking
|
||||
asyncio.create_task(self._init_queue_progress())
|
||||
|
||||
logger.info(
|
||||
"DownloadService initialized",
|
||||
max_retries=max_retries,
|
||||
)
|
||||
|
||||
async def _init_queue_progress(self) -> None:
|
||||
"""Initialize the download queue progress tracking."""
|
||||
try:
|
||||
from src.server.services.progress_service import ProgressType
|
||||
await self._progress_service.start_progress(
|
||||
progress_id="download_queue",
|
||||
progress_type=ProgressType.QUEUE,
|
||||
title="Download Queue",
|
||||
message="Queue ready",
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error("Failed to initialize queue progress", error=str(e))
|
||||
|
||||
def _add_to_pending_queue(
|
||||
self, item: DownloadItem, front: bool = False
|
||||
@@ -154,91 +149,6 @@ class DownloadService:
|
||||
except (ValueError, KeyError):
|
||||
return None
|
||||
|
||||
def set_broadcast_callback(self, callback: Callable) -> None:
|
||||
"""Set callback for broadcasting status updates via WebSocket."""
|
||||
self._broadcast_callback = callback
|
||||
logger.debug("Broadcast callback registered")
|
||||
|
||||
def _on_seriesapp_download_status(self, args) -> None:
|
||||
"""Handle download status events from SeriesApp.
|
||||
|
||||
Updates the active download item with progress information.
|
||||
|
||||
Args:
|
||||
args: DownloadStatusEventArgs from SeriesApp
|
||||
"""
|
||||
try:
|
||||
# Only process if we have an active download
|
||||
if not self._active_download:
|
||||
return
|
||||
|
||||
# Match the event to the active download item
|
||||
# SeriesApp events include serie_folder, season, episode
|
||||
if (
|
||||
self._active_download.serie_folder == args.serie_folder
|
||||
and self._active_download.episode.season == args.season
|
||||
and self._active_download.episode.episode == args.episode
|
||||
):
|
||||
if args.status == "progress":
|
||||
# Update item progress
|
||||
self._active_download.progress = DownloadProgress(
|
||||
percent=args.progress,
|
||||
downloaded_mb=(
|
||||
args.progress * args.mbper_sec / 100
|
||||
if args.mbper_sec
|
||||
else 0.0
|
||||
),
|
||||
total_mb=None, # Not provided by SeriesApp
|
||||
speed_mbps=args.mbper_sec,
|
||||
eta_seconds=args.eta,
|
||||
)
|
||||
|
||||
# Track speed
|
||||
if args.mbper_sec:
|
||||
self._download_speeds.append(args.mbper_sec)
|
||||
|
||||
# Broadcast update
|
||||
asyncio.create_task(
|
||||
self._broadcast_update(
|
||||
"download_progress",
|
||||
{
|
||||
"download_id": self._active_download.id,
|
||||
"item_id": self._active_download.id,
|
||||
"serie_name": self._active_download.serie_name,
|
||||
"season": args.season,
|
||||
"episode": args.episode,
|
||||
"progress": (
|
||||
self._active_download.progress.model_dump(
|
||||
mode="json"
|
||||
)
|
||||
),
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
except Exception as exc:
|
||||
logger.error(
|
||||
"Error handling SeriesApp download status",
|
||||
error=str(exc)
|
||||
)
|
||||
|
||||
async def _broadcast_update(self, update_type: str, data: dict) -> None:
|
||||
"""Broadcast update to connected WebSocket clients.
|
||||
|
||||
Args:
|
||||
update_type: Type of update (download_progress, queue_status, etc.)
|
||||
data: Update data to broadcast
|
||||
"""
|
||||
if self._broadcast_callback:
|
||||
try:
|
||||
await self._broadcast_callback(update_type, data)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"Failed to broadcast update",
|
||||
update_type=update_type,
|
||||
error=str(e),
|
||||
)
|
||||
|
||||
def _generate_item_id(self) -> str:
|
||||
"""Generate unique identifier for download items."""
|
||||
return str(uuid.uuid4())
|
||||
@@ -359,15 +269,17 @@ class DownloadService:
|
||||
|
||||
self._save_queue()
|
||||
|
||||
# Broadcast queue status update
|
||||
# Notify via progress service
|
||||
queue_status = await self.get_queue_status()
|
||||
await self._broadcast_update(
|
||||
"queue_status",
|
||||
{
|
||||
await self._progress_service.update_progress(
|
||||
progress_id="download_queue",
|
||||
message=f"Added {len(created_ids)} items to queue",
|
||||
metadata={
|
||||
"action": "items_added",
|
||||
"added_ids": created_ids,
|
||||
"queue_status": queue_status.model_dump(mode="json"),
|
||||
},
|
||||
force_broadcast=True,
|
||||
)
|
||||
|
||||
return created_ids
|
||||
@@ -416,15 +328,17 @@ class DownloadService:
|
||||
|
||||
if removed_ids:
|
||||
self._save_queue()
|
||||
# Broadcast queue status update
|
||||
# Notify via progress service
|
||||
queue_status = await self.get_queue_status()
|
||||
await self._broadcast_update(
|
||||
"queue_status",
|
||||
{
|
||||
await self._progress_service.update_progress(
|
||||
progress_id="download_queue",
|
||||
message=f"Removed {len(removed_ids)} items from queue",
|
||||
metadata={
|
||||
"action": "items_removed",
|
||||
"removed_ids": removed_ids,
|
||||
"queue_status": queue_status.model_dump(mode="json"),
|
||||
},
|
||||
force_broadcast=True,
|
||||
)
|
||||
|
||||
return removed_ids
|
||||
@@ -498,17 +412,24 @@ class DownloadService:
|
||||
remaining=len(self._pending_queue)
|
||||
)
|
||||
|
||||
# Broadcast queue status update
|
||||
# Notify via progress service
|
||||
queue_status = await self.get_queue_status()
|
||||
await self._broadcast_update(
|
||||
"download_started",
|
||||
{
|
||||
msg = (
|
||||
f"Started: {item.serie_name} "
|
||||
f"S{item.episode.season:02d}E{item.episode.episode:02d}"
|
||||
)
|
||||
await self._progress_service.update_progress(
|
||||
progress_id="download_queue",
|
||||
message=msg,
|
||||
metadata={
|
||||
"action": "download_started",
|
||||
"item_id": item.id,
|
||||
"serie_name": item.serie_name,
|
||||
"season": item.episode.season,
|
||||
"episode": item.episode.episode,
|
||||
"queue_status": queue_status.model_dump(mode="json"),
|
||||
},
|
||||
force_broadcast=True,
|
||||
)
|
||||
|
||||
# Process the download (this will wait until complete)
|
||||
@@ -532,11 +453,11 @@ class DownloadService:
|
||||
if len(self._pending_queue) == 0:
|
||||
logger.info("Queue processing completed - all items processed")
|
||||
queue_status = await self.get_queue_status()
|
||||
await self._broadcast_update(
|
||||
"queue_completed",
|
||||
{
|
||||
"message": "All downloads completed",
|
||||
"queue_status": queue_status.model_dump(mode="json"),
|
||||
await self._progress_service.complete_progress(
|
||||
progress_id="download_queue",
|
||||
message="All downloads completed",
|
||||
metadata={
|
||||
"queue_status": queue_status.model_dump(mode="json")
|
||||
},
|
||||
)
|
||||
else:
|
||||
@@ -561,14 +482,17 @@ class DownloadService:
|
||||
self._is_stopped = True
|
||||
logger.info("Download processing stopped")
|
||||
|
||||
# Broadcast queue status update
|
||||
# Notify via progress service
|
||||
queue_status = await self.get_queue_status()
|
||||
await self._broadcast_update(
|
||||
"queue_stopped",
|
||||
{
|
||||
await self._progress_service.update_progress(
|
||||
progress_id="download_queue",
|
||||
message="Queue processing stopped",
|
||||
metadata={
|
||||
"action": "queue_stopped",
|
||||
"is_stopped": True,
|
||||
"queue_status": queue_status.model_dump(mode="json"),
|
||||
},
|
||||
force_broadcast=True,
|
||||
)
|
||||
|
||||
async def get_queue_status(self) -> QueueStatus:
|
||||
@@ -638,16 +562,18 @@ class DownloadService:
|
||||
self._completed_items.clear()
|
||||
logger.info("Cleared completed items", count=count)
|
||||
|
||||
# Broadcast queue status update
|
||||
# Notify via progress service
|
||||
if count > 0:
|
||||
queue_status = await self.get_queue_status()
|
||||
await self._broadcast_update(
|
||||
"queue_status",
|
||||
{
|
||||
await self._progress_service.update_progress(
|
||||
progress_id="download_queue",
|
||||
message=f"Cleared {count} completed items",
|
||||
metadata={
|
||||
"action": "completed_cleared",
|
||||
"cleared_count": count,
|
||||
"queue_status": queue_status.model_dump(mode="json"),
|
||||
},
|
||||
force_broadcast=True,
|
||||
)
|
||||
|
||||
return count
|
||||
@@ -662,16 +588,18 @@ class DownloadService:
|
||||
self._failed_items.clear()
|
||||
logger.info("Cleared failed items", count=count)
|
||||
|
||||
# Broadcast queue status update
|
||||
# Notify via progress service
|
||||
if count > 0:
|
||||
queue_status = await self.get_queue_status()
|
||||
await self._broadcast_update(
|
||||
"queue_status",
|
||||
{
|
||||
await self._progress_service.update_progress(
|
||||
progress_id="download_queue",
|
||||
message=f"Cleared {count} failed items",
|
||||
metadata={
|
||||
"action": "failed_cleared",
|
||||
"cleared_count": count,
|
||||
"queue_status": queue_status.model_dump(mode="json"),
|
||||
},
|
||||
force_broadcast=True,
|
||||
)
|
||||
|
||||
return count
|
||||
@@ -690,16 +618,18 @@ class DownloadService:
|
||||
# Save queue state
|
||||
self._save_queue()
|
||||
|
||||
# Broadcast queue status update
|
||||
# Notify via progress service
|
||||
if count > 0:
|
||||
queue_status = await self.get_queue_status()
|
||||
await self._broadcast_update(
|
||||
"queue_status",
|
||||
{
|
||||
await self._progress_service.update_progress(
|
||||
progress_id="download_queue",
|
||||
message=f"Cleared {count} pending items",
|
||||
metadata={
|
||||
"action": "pending_cleared",
|
||||
"cleared_count": count,
|
||||
"queue_status": queue_status.model_dump(mode="json"),
|
||||
},
|
||||
force_broadcast=True,
|
||||
)
|
||||
|
||||
return count
|
||||
@@ -746,15 +676,17 @@ class DownloadService:
|
||||
|
||||
if retried_ids:
|
||||
self._save_queue()
|
||||
# Broadcast queue status update
|
||||
# Notify via progress service
|
||||
queue_status = await self.get_queue_status()
|
||||
await self._broadcast_update(
|
||||
"queue_status",
|
||||
{
|
||||
await self._progress_service.update_progress(
|
||||
progress_id="download_queue",
|
||||
message=f"Retried {len(retried_ids)} failed items",
|
||||
metadata={
|
||||
"action": "items_retried",
|
||||
"retried_ids": retried_ids,
|
||||
"queue_status": queue_status.model_dump(mode="json"),
|
||||
},
|
||||
force_broadcast=True,
|
||||
)
|
||||
|
||||
return retried_ids
|
||||
@@ -786,10 +718,10 @@ class DownloadService:
|
||||
)
|
||||
|
||||
# Execute download via anime service
|
||||
# Note: AnimeService handles progress via SeriesApp events
|
||||
# Progress updates received via _on_seriesapp_download_status
|
||||
# Use serie_folder if available, otherwise fall back to serie_id
|
||||
# for backwards compatibility with old queue items
|
||||
# AnimeService handles ALL progress via SeriesApp events:
|
||||
# - download started/progress/completed/failed events
|
||||
# - All updates forwarded to ProgressService
|
||||
# - ProgressService broadcasts to WebSocket clients
|
||||
folder = item.serie_folder if item.serie_folder else item.serie_id
|
||||
success = await self._anime_service.download(
|
||||
serie_folder=folder,
|
||||
@@ -812,21 +744,6 @@ class DownloadService:
|
||||
logger.info(
|
||||
"Download completed successfully", item_id=item.id
|
||||
)
|
||||
|
||||
# Broadcast completion (progress already handled by events)
|
||||
await self._broadcast_update(
|
||||
"download_complete",
|
||||
{
|
||||
"download_id": item.id,
|
||||
"item_id": item.id,
|
||||
"serie_name": item.serie_name,
|
||||
"season": item.episode.season,
|
||||
"episode": item.episode.episode,
|
||||
"downloaded_mb": item.progress.downloaded_mb
|
||||
if item.progress
|
||||
else 0,
|
||||
},
|
||||
)
|
||||
else:
|
||||
raise AnimeServiceError("Download returned False")
|
||||
|
||||
@@ -843,20 +760,8 @@ class DownloadService:
|
||||
error=str(e),
|
||||
retry_count=item.retry_count,
|
||||
)
|
||||
|
||||
# Broadcast failure (progress already handled by events)
|
||||
await self._broadcast_update(
|
||||
"download_failed",
|
||||
{
|
||||
"download_id": item.id,
|
||||
"item_id": item.id,
|
||||
"serie_name": item.serie_name,
|
||||
"season": item.episode.season,
|
||||
"episode": item.episode.episode,
|
||||
"error": item.error,
|
||||
"retry_count": item.retry_count,
|
||||
},
|
||||
)
|
||||
# Note: Failure is already broadcast by AnimeService
|
||||
# via ProgressService when SeriesApp fires failed event
|
||||
|
||||
finally:
|
||||
# Remove from active downloads
|
||||
|
||||
@@ -11,7 +11,7 @@ import asyncio
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from enum import Enum
|
||||
from typing import Any, Callable, Dict, Optional
|
||||
from typing import Any, Callable, Dict, List, Optional
|
||||
|
||||
import structlog
|
||||
|
||||
@@ -85,6 +85,30 @@ class ProgressUpdate:
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class ProgressEvent:
|
||||
"""Represents a progress event for subscribers.
|
||||
|
||||
Attributes:
|
||||
event_type: Type of event (e.g., 'download_progress')
|
||||
progress_id: Unique identifier for the progress operation
|
||||
progress: The progress update data
|
||||
room: WebSocket room to broadcast to (default: 'progress')
|
||||
"""
|
||||
|
||||
event_type: str
|
||||
progress_id: str
|
||||
progress: ProgressUpdate
|
||||
room: str = "progress"
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Convert event to dictionary for broadcasting."""
|
||||
return {
|
||||
"type": self.event_type,
|
||||
"data": self.progress.to_dict(),
|
||||
}
|
||||
|
||||
|
||||
class ProgressServiceError(Exception):
|
||||
"""Service-level exception for progress operations."""
|
||||
|
||||
@@ -109,44 +133,82 @@ class ProgressService:
|
||||
self._history: Dict[str, ProgressUpdate] = {}
|
||||
self._max_history_size = 50
|
||||
|
||||
# WebSocket broadcast callback
|
||||
self._broadcast_callback: Optional[Callable] = None
|
||||
# Event subscribers: event_name -> list of handlers
|
||||
self._event_handlers: Dict[
|
||||
str, List[Callable[[ProgressEvent], None]]
|
||||
] = {}
|
||||
|
||||
# Lock for thread-safe operations
|
||||
self._lock = asyncio.Lock()
|
||||
|
||||
logger.info("ProgressService initialized")
|
||||
|
||||
def set_broadcast_callback(self, callback: Callable) -> None:
|
||||
"""Set callback for broadcasting progress updates via WebSocket.
|
||||
def subscribe(
|
||||
self, event_name: str, handler: Callable[[ProgressEvent], None]
|
||||
) -> None:
|
||||
"""Subscribe to progress events.
|
||||
|
||||
Args:
|
||||
callback: Async function to call for broadcasting updates
|
||||
event_name: Name of event to subscribe to
|
||||
(e.g., 'progress_updated')
|
||||
handler: Async function to call when event occurs
|
||||
"""
|
||||
self._broadcast_callback = callback
|
||||
logger.debug("Progress broadcast callback registered")
|
||||
if event_name not in self._event_handlers:
|
||||
self._event_handlers[event_name] = []
|
||||
|
||||
async def _broadcast(self, update: ProgressUpdate, room: str) -> None:
|
||||
"""Broadcast progress update to WebSocket clients.
|
||||
self._event_handlers[event_name].append(handler)
|
||||
logger.debug("Event handler subscribed", event=event_name)
|
||||
|
||||
def unsubscribe(
|
||||
self, event_name: str, handler: Callable[[ProgressEvent], None]
|
||||
) -> None:
|
||||
"""Unsubscribe from progress events.
|
||||
|
||||
Args:
|
||||
update: Progress update to broadcast
|
||||
room: WebSocket room to broadcast to
|
||||
event_name: Name of event to unsubscribe from
|
||||
handler: Handler function to remove
|
||||
"""
|
||||
if self._broadcast_callback:
|
||||
if event_name in self._event_handlers:
|
||||
try:
|
||||
await self._broadcast_callback(
|
||||
message_type=f"{update.type.value}_progress",
|
||||
data=update.to_dict(),
|
||||
room=room,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"Failed to broadcast progress update",
|
||||
error=str(e),
|
||||
progress_id=update.id,
|
||||
self._event_handlers[event_name].remove(handler)
|
||||
logger.debug("Event handler unsubscribed", event=event_name)
|
||||
except ValueError:
|
||||
logger.warning(
|
||||
"Handler not found for unsubscribe", event=event_name
|
||||
)
|
||||
|
||||
async def _emit_event(self, event: ProgressEvent) -> None:
|
||||
"""Emit event to all subscribers.
|
||||
|
||||
Args:
|
||||
event: Progress event to emit
|
||||
|
||||
Note:
|
||||
Errors in individual handlers are logged but do not
|
||||
prevent other handlers from executing.
|
||||
"""
|
||||
event_name = "progress_updated"
|
||||
|
||||
if event_name in self._event_handlers:
|
||||
handlers = self._event_handlers[event_name]
|
||||
if handlers:
|
||||
# Execute all handlers, capturing exceptions
|
||||
tasks = [handler(event) for handler in handlers]
|
||||
# Ignore type error - tasks will be coroutines at runtime
|
||||
results = await asyncio.gather(
|
||||
*tasks, return_exceptions=True
|
||||
) # type: ignore[arg-type]
|
||||
|
||||
# Log any exceptions that occurred
|
||||
for idx, result in enumerate(results):
|
||||
if isinstance(result, Exception):
|
||||
logger.error(
|
||||
"Event handler raised exception",
|
||||
event=event_name,
|
||||
error=str(result),
|
||||
handler_index=idx,
|
||||
)
|
||||
|
||||
async def start_progress(
|
||||
self,
|
||||
progress_id: str,
|
||||
@@ -197,9 +259,15 @@ class ProgressService:
|
||||
title=title,
|
||||
)
|
||||
|
||||
# Broadcast to appropriate room
|
||||
# Emit event to subscribers
|
||||
room = f"{progress_type.value}_progress"
|
||||
await self._broadcast(update, room)
|
||||
event = ProgressEvent(
|
||||
event_type=f"{progress_type.value}_progress",
|
||||
progress_id=progress_id,
|
||||
progress=update,
|
||||
room=room,
|
||||
)
|
||||
await self._emit_event(event)
|
||||
|
||||
return update
|
||||
|
||||
@@ -262,7 +330,13 @@ class ProgressService:
|
||||
|
||||
if should_broadcast:
|
||||
room = f"{update.type.value}_progress"
|
||||
await self._broadcast(update, room)
|
||||
event = ProgressEvent(
|
||||
event_type=f"{update.type.value}_progress",
|
||||
progress_id=progress_id,
|
||||
progress=update,
|
||||
room=room,
|
||||
)
|
||||
await self._emit_event(event)
|
||||
|
||||
return update
|
||||
|
||||
@@ -311,9 +385,15 @@ class ProgressService:
|
||||
type=update.type.value,
|
||||
)
|
||||
|
||||
# Broadcast completion
|
||||
# Emit completion event
|
||||
room = f"{update.type.value}_progress"
|
||||
await self._broadcast(update, room)
|
||||
event = ProgressEvent(
|
||||
event_type=f"{update.type.value}_progress",
|
||||
progress_id=progress_id,
|
||||
progress=update,
|
||||
room=room,
|
||||
)
|
||||
await self._emit_event(event)
|
||||
|
||||
return update
|
||||
|
||||
@@ -361,9 +441,15 @@ class ProgressService:
|
||||
error=error_message,
|
||||
)
|
||||
|
||||
# Broadcast failure
|
||||
# Emit failure event
|
||||
room = f"{update.type.value}_progress"
|
||||
await self._broadcast(update, room)
|
||||
event = ProgressEvent(
|
||||
event_type=f"{update.type.value}_progress",
|
||||
progress_id=progress_id,
|
||||
progress=update,
|
||||
room=room,
|
||||
)
|
||||
await self._emit_event(event)
|
||||
|
||||
return update
|
||||
|
||||
@@ -405,9 +491,15 @@ class ProgressService:
|
||||
type=update.type.value,
|
||||
)
|
||||
|
||||
# Broadcast cancellation
|
||||
# Emit cancellation event
|
||||
room = f"{update.type.value}_progress"
|
||||
await self._broadcast(update, room)
|
||||
event = ProgressEvent(
|
||||
event_type=f"{update.type.value}_progress",
|
||||
progress_id=progress_id,
|
||||
progress=update,
|
||||
room=room,
|
||||
)
|
||||
await self._emit_event(event)
|
||||
|
||||
return update
|
||||
|
||||
|
||||
Reference in New Issue
Block a user