download the queue

This commit is contained in:
2025-11-01 16:13:28 +01:00
parent eaf6bb9957
commit 33aeac0141
4 changed files with 176 additions and 108 deletions

View File

@@ -356,20 +356,24 @@ class DownloadService:
f"Failed to remove items: {str(e)}"
) from e
async def start_next_download(self) -> Optional[str]:
"""Manually start the next download from pending queue.
async def start_queue_processing(self) -> Optional[str]:
"""Start automatic queue processing of all pending downloads.
This will process all pending downloads one by one until the queue
is empty or stopped. The processing continues even if the browser
is closed.
Returns:
Item ID of started download, or None if queue is empty
Item ID of first started download, or None if queue is empty
Raises:
DownloadServiceError: If a download is already active
DownloadServiceError: If queue processing is already active
"""
try:
# Check if download already active
if self._active_download:
raise DownloadServiceError(
"A download is already in progress"
"Queue processing is already active"
)
# Check if queue is empty
@@ -377,42 +381,98 @@ class DownloadService:
logger.info("No pending downloads to start")
return None
# Get first item from queue
item = self._pending_queue.popleft()
del self._pending_items_by_id[item.id]
# Mark queue as running
self._is_stopped = False
# Start download in background
asyncio.create_task(self._process_download(item))
# Start queue processing in background
asyncio.create_task(self._process_queue())
logger.info(
"Started download manually",
item_id=item.id,
serie=item.serie_name
)
logger.info("Queue processing started")
# Broadcast queue status update
return "queue_started"
except Exception as e:
logger.error("Failed to start queue processing", error=str(e))
raise DownloadServiceError(
f"Failed to start queue processing: {str(e)}"
) from e
async def _process_queue(self) -> None:
"""Process all items in the queue sequentially.
This runs continuously until the queue is empty or stopped.
Each download is processed one at a time, and the next one starts
automatically after the previous one completes.
"""
logger.info("Queue processor started")
while not self._is_stopped and len(self._pending_queue) > 0:
try:
# Get next item from queue
item = self._pending_queue.popleft()
del self._pending_items_by_id[item.id]
logger.info(
"Processing next item from queue",
item_id=item.id,
serie=item.serie_name,
remaining=len(self._pending_queue)
)
# Broadcast queue status update
queue_status = await self.get_queue_status()
await self._broadcast_update(
"download_started",
{
"item_id": item.id,
"serie_name": item.serie_name,
"season": item.episode.season,
"episode": item.episode.episode,
"queue_status": queue_status.model_dump(mode="json"),
},
)
# Process the download (this will wait until complete)
await self._process_download(item)
# Small delay between downloads
await asyncio.sleep(1)
except Exception as e:
logger.error(
"Error in queue processing loop",
error=str(e),
exc_info=True
)
# Continue with next item even if one fails
await asyncio.sleep(2)
# Queue processing completed
self._is_stopped = True
if len(self._pending_queue) == 0:
logger.info("Queue processing completed - all items processed")
queue_status = await self.get_queue_status()
await self._broadcast_update(
"download_started",
"queue_completed",
{
"item_id": item.id,
"serie_name": item.serie_name,
"season": item.episode.season,
"episode": item.episode.episode,
"message": "All downloads completed",
"queue_status": queue_status.model_dump(mode="json"),
},
)
return item.id
except Exception as e:
logger.error("Failed to start download", error=str(e))
raise DownloadServiceError(
f"Failed to start download: {str(e)}"
) from e
else:
logger.info("Queue processing stopped by user")
async def start_next_download(self) -> Optional[str]:
"""Legacy method - redirects to start_queue_processing.
Returns:
Item ID of started download, or None if queue is empty
Raises:
DownloadServiceError: If a download is already active
"""
return await self.start_queue_processing()
async def stop_downloads(self) -> None:
"""Stop processing new downloads from queue.