download re implemented

This commit is contained in:
2025-10-30 22:06:41 +01:00
parent 6ebc2ed2ea
commit 3be175522f
16 changed files with 359 additions and 1335 deletions

View File

@@ -1,8 +1,8 @@
"""Download queue service for managing anime episode downloads.
This module provides a comprehensive queue management system for handling
concurrent anime episode downloads with priority-based scheduling, progress
tracking, persistence, and automatic retry functionality.
This module provides a simplified queue management system for handling
anime episode downloads with manual start/stop controls, progress tracking,
persistence, and retry functionality.
"""
from __future__ import annotations
@@ -41,11 +41,11 @@ class DownloadServiceError(Exception):
class DownloadService:
"""Manages the download queue with concurrent processing and persistence.
"""Manages the download queue with manual start/stop controls.
Features:
- Priority-based queue management
- Concurrent download processing
- Manual download start/stop
- FIFO queue processing
- Real-time progress tracking
- Queue persistence and recovery
- Automatic retry logic
@@ -55,7 +55,6 @@ class DownloadService:
def __init__(
self,
anime_service: AnimeService,
max_concurrent_downloads: int = 2,
max_retries: int = 3,
persistence_path: str = "./data/download_queue.json",
progress_service: Optional[ProgressService] = None,
@@ -64,13 +63,11 @@ class DownloadService:
Args:
anime_service: Service for anime operations
max_concurrent_downloads: Maximum simultaneous downloads
max_retries: Maximum retry attempts for failed downloads
persistence_path: Path to persist queue state
progress_service: Optional progress service for tracking
"""
self._anime_service = anime_service
self._max_concurrent = max_concurrent_downloads
self._max_retries = max_retries
self._persistence_path = Path(persistence_path)
self._progress_service = progress_service or get_progress_service()
@@ -79,19 +76,15 @@ class DownloadService:
self._pending_queue: deque[DownloadItem] = deque()
# Helper dict for O(1) lookup of pending items by ID
self._pending_items_by_id: Dict[str, DownloadItem] = {}
self._active_downloads: Dict[str, DownloadItem] = {}
self._active_download: Optional[DownloadItem] = None
self._completed_items: deque[DownloadItem] = deque(maxlen=100)
self._failed_items: deque[DownloadItem] = deque(maxlen=50)
# Control flags
self._is_running = False
self._is_paused = False
self._shutdown_event = asyncio.Event()
self._is_stopped = True # Queue processing is stopped by default
# Executor for blocking operations
self._executor = ThreadPoolExecutor(
max_workers=max_concurrent_downloads
)
self._executor = ThreadPoolExecutor(max_workers=1)
# WebSocket broadcast callback
self._broadcast_callback: Optional[Callable] = None
@@ -105,7 +98,6 @@ class DownloadService:
logger.info(
"DownloadService initialized",
max_concurrent=max_concurrent_downloads,
max_retries=max_retries,
)
@@ -212,14 +204,17 @@ class DownloadService:
try:
self._persistence_path.parent.mkdir(parents=True, exist_ok=True)
active_items = (
[self._active_download] if self._active_download else []
)
data = {
"pending": [
item.model_dump(mode="json")
for item in self._pending_queue
],
"active": [
item.model_dump(mode="json")
for item in self._active_downloads.values()
item.model_dump(mode="json") for item in active_items
],
"failed": [
item.model_dump(mode="json")
@@ -242,13 +237,13 @@ class DownloadService:
episodes: List[EpisodeIdentifier],
priority: DownloadPriority = DownloadPriority.NORMAL,
) -> List[str]:
"""Add episodes to the download queue.
"""Add episodes to the download queue (FIFO order).
Args:
serie_id: Series identifier
serie_name: Series display name
episodes: List of episodes to download
priority: Queue priority level
priority: Queue priority level (ignored, kept for compatibility)
Returns:
List of created download item IDs
@@ -270,12 +265,8 @@ class DownloadService:
added_at=datetime.now(timezone.utc),
)
# Insert based on priority. High-priority downloads jump the
# line via appendleft so they execute before existing work;
# everything else is appended to preserve FIFO order.
self._add_to_pending_queue(
item, front=(priority == DownloadPriority.HIGH)
)
# Always append to end (FIFO order)
self._add_to_pending_queue(item, front=False)
created_ids.append(item.id)
@@ -285,7 +276,6 @@ class DownloadService:
serie=serie_name,
season=episode.season,
episode=episode.episode,
priority=priority.value,
)
self._save_queue()
@@ -324,12 +314,13 @@ class DownloadService:
try:
for item_id in item_ids:
# Check if item is currently downloading
if item_id in self._active_downloads:
item = self._active_downloads[item_id]
active = self._active_download
if active and active.id == item_id:
item = active
item.status = DownloadStatus.CANCELLED
item.completed_at = datetime.now(timezone.utc)
self._failed_items.append(item)
del self._active_downloads[item_id]
self._active_download = None
removed_ids.append(item_id)
logger.info("Cancelled active download", item_id=item_id)
continue
@@ -365,118 +356,81 @@ class DownloadService:
f"Failed to remove items: {str(e)}"
) from e
async def reorder_queue(self, item_id: str, new_position: int) -> bool:
"""Reorder an item in the pending queue.
Args:
item_id: Download item ID to reorder
new_position: New position in queue (0-based)
async def start_next_download(self) -> Optional[str]:
"""Manually start the next download from pending queue.
Returns:
True if reordering was successful
Item ID of started download, or None if queue is empty
Raises:
DownloadServiceError: If reordering fails
DownloadServiceError: If a download is already active
"""
try:
# Find and remove item - O(1) lookup using helper dict
item_to_move = self._pending_items_by_id.get(item_id)
if not item_to_move:
# Check if download already active
if self._active_download:
raise DownloadServiceError(
f"Item {item_id} not found in pending queue"
"A download is already in progress"
)
# Remove from current position
self._pending_queue.remove(item_to_move)
del self._pending_items_by_id[item_id]
# Check if queue is empty
if not self._pending_queue:
logger.info("No pending downloads to start")
return None
# Insert at new position
queue_list = list(self._pending_queue)
new_position = max(0, min(new_position, len(queue_list)))
queue_list.insert(new_position, item_to_move)
self._pending_queue = deque(queue_list)
# Re-add to helper dict
self._pending_items_by_id[item_id] = item_to_move
# Get first item from queue
item = self._pending_queue.popleft()
del self._pending_items_by_id[item.id]
self._save_queue()
# Mark queue as running
self._is_stopped = False
# Broadcast queue status update
queue_status = await self.get_queue_status()
await self._broadcast_update(
"queue_status",
{
"action": "queue_reordered",
"item_id": item_id,
"new_position": new_position,
"queue_status": queue_status.model_dump(mode="json"),
},
)
# Start download in background
asyncio.create_task(self._process_download(item))
logger.info(
"Queue item reordered",
item_id=item_id,
new_position=new_position
"Started download manually",
item_id=item.id,
serie=item.serie_name
)
return True
except Exception as e:
logger.error("Failed to reorder queue", error=str(e))
raise DownloadServiceError(
f"Failed to reorder: {str(e)}"
) from e
async def reorder_queue_bulk(self, item_order: List[str]) -> bool:
"""Reorder pending queue to match provided item order for the specified
item IDs. Any pending items not mentioned will be appended after the
ordered items preserving their relative order.
Args:
item_order: Desired ordering of item IDs for pending queue
Returns:
True if operation completed
"""
try:
# Map existing pending items by id
existing = {item.id: item for item in list(self._pending_queue)}
new_queue: List[DownloadItem] = []
# Add items in the requested order if present
for item_id in item_order:
item = existing.pop(item_id, None)
if item:
new_queue.append(item)
# Append any remaining items preserving original order
for item in list(self._pending_queue):
if item.id in existing:
new_queue.append(item)
existing.pop(item.id, None)
# Replace pending queue
self._pending_queue = deque(new_queue)
self._save_queue()
# Broadcast queue status update
queue_status = await self.get_queue_status()
await self._broadcast_update(
"queue_status",
"download_started",
{
"action": "queue_bulk_reordered",
"item_order": item_order,
"item_id": item.id,
"serie_name": item.serie_name,
"season": item.episode.season,
"episode": item.episode.episode,
"queue_status": queue_status.model_dump(mode="json"),
},
)
logger.info("Bulk queue reorder applied", ordered_count=len(item_order))
return True
return item.id
except Exception as e:
logger.error("Failed to apply bulk reorder", error=str(e))
raise DownloadServiceError(f"Failed to reorder: {str(e)}") from e
logger.error("Failed to start download", error=str(e))
raise DownloadServiceError(
f"Failed to start download: {str(e)}"
) from e
async def stop_downloads(self) -> None:
"""Stop processing new downloads from queue.
Current download will continue, but no new downloads will start.
"""
self._is_stopped = True
logger.info("Download processing stopped")
# Broadcast queue status update
queue_status = await self.get_queue_status()
await self._broadcast_update(
"queue_stopped",
{
"is_stopped": True,
"queue_status": queue_status.model_dump(mode="json"),
},
)
async def get_queue_status(self) -> QueueStatus:
"""Get current status of all queues.
@@ -484,10 +438,13 @@ class DownloadService:
Returns:
Complete queue status with all items
"""
active_downloads = (
[self._active_download] if self._active_download else []
)
return QueueStatus(
is_running=self._is_running,
is_paused=self._is_paused,
active_downloads=list(self._active_downloads.values()),
is_running=not self._is_stopped,
is_paused=False, # Kept for compatibility
active_downloads=active_downloads,
pending_queue=list(self._pending_queue),
completed_downloads=list(self._completed_items),
failed_downloads=list(self._failed_items),
@@ -499,7 +456,7 @@ class DownloadService:
Returns:
Statistics about the download queue
"""
active_count = len(self._active_downloads)
active_count = 1 if self._active_download else 0
pending_count = len(self._pending_queue)
completed_count = len(self._completed_items)
failed_count = len(self._failed_items)
@@ -532,36 +489,6 @@ class DownloadService:
estimated_time_remaining=eta_seconds,
)
async def pause_queue(self) -> None:
"""Pause download processing."""
self._is_paused = True
logger.info("Download queue paused")
# Broadcast queue status update
queue_status = await self.get_queue_status()
await self._broadcast_update(
"queue_paused",
{
"is_paused": True,
"queue_status": queue_status.model_dump(mode="json"),
},
)
async def resume_queue(self) -> None:
"""Resume download processing."""
self._is_paused = False
logger.info("Download queue resumed")
# Broadcast queue status update
queue_status = await self.get_queue_status()
await self._broadcast_update(
"queue_resumed",
{
"is_paused": False,
"queue_status": queue_status.model_dump(mode="json"),
},
)
async def clear_completed(self) -> int:
"""Clear completed downloads from history.
@@ -742,7 +669,7 @@ class DownloadService:
# Update status
item.status = DownloadStatus.DOWNLOADING
item.started_at = datetime.now(timezone.utc)
self._active_downloads[item.id] = item
self._active_download = item
logger.info(
"Starting download",
@@ -858,83 +785,31 @@ class DownloadService:
finally:
# Remove from active downloads
if item.id in self._active_downloads:
del self._active_downloads[item.id]
if self._active_download and self._active_download.id == item.id:
self._active_download = None
self._save_queue()
async def _queue_processor(self) -> None:
"""Main queue processing loop."""
logger.info("Queue processor started")
while not self._shutdown_event.is_set():
try:
# Wait if paused
if self._is_paused:
await asyncio.sleep(1)
continue
# Check if we can start more downloads
if len(self._active_downloads) >= self._max_concurrent:
await asyncio.sleep(1)
continue
# Get next item from queue
if not self._pending_queue:
await asyncio.sleep(1)
continue
item = self._pending_queue.popleft()
# Process download in background
asyncio.create_task(self._process_download(item))
except Exception as e:
logger.error("Queue processor error", error=str(e))
await asyncio.sleep(5)
logger.info("Queue processor stopped")
async def start(self) -> None:
"""Start the download queue processor."""
if self._is_running:
logger.warning("Queue processor already running")
return
"""Initialize the download queue service (compatibility method).
self._is_running = True
self._shutdown_event.clear()
# Start processor task
asyncio.create_task(self._queue_processor())
logger.info("Download queue service started")
# Broadcast queue started event
queue_status = await self.get_queue_status()
await self._broadcast_update(
"queue_started",
{
"is_running": True,
"queue_status": queue_status.model_dump(mode="json"),
},
)
Note: Downloads are started manually via start_next_download().
"""
logger.info("Download queue service initialized")
async def stop(self) -> None:
"""Stop the download queue processor."""
if not self._is_running:
return
"""Stop the download queue service and wait for active download.
Note: This waits for the current download to complete.
"""
logger.info("Stopping download queue service...")
self._is_running = False
self._shutdown_event.set()
# Wait for active downloads to complete (with timeout)
# Wait for active download to complete (with timeout)
timeout = 30 # seconds
start_time = asyncio.get_event_loop().time()
while (
self._active_downloads
self._active_download
and (asyncio.get_event_loop().time() - start_time) < timeout
):
await asyncio.sleep(1)
@@ -946,16 +821,6 @@ class DownloadService:
self._executor.shutdown(wait=True)
logger.info("Download queue service stopped")
# Broadcast queue stopped event
queue_status = await self.get_queue_status()
await self._broadcast_update(
"queue_stopped",
{
"is_running": False,
"queue_status": queue_status.model_dump(mode="json"),
},
)
# Singleton instance