better shutdown

This commit is contained in:
Lukas 2025-11-20 19:11:05 +01:00
parent 1ca105f330
commit 34019b7e65
2 changed files with 47 additions and 37 deletions

View File

@ -1,25 +1,5 @@
{
"pending": [
{
"id": "1c4ade94-0a9f-4b75-8ed7-ceb8995c6865",
"serie_id": "highschool-dxd",
"serie_folder": "Highschool DxD",
"serie_name": "Highschool DxD",
"episode": {
"season": 1,
"episode": 2,
"title": null
},
"status": "pending",
"priority": "NORMAL",
"added_at": "2025-11-20T17:12:34.485183Z",
"started_at": null,
"completed_at": null,
"progress": null,
"error": null,
"retry_count": 0,
"source_url": null
},
{
"id": "c4bb6196-7a58-4338-8747-100970fd3834",
"serie_id": "highschool-dxd",
@ -30,11 +10,11 @@
"episode": 3,
"title": null
},
"status": "pending",
"status": "cancelled",
"priority": "NORMAL",
"added_at": "2025-11-20T17:12:34.485225Z",
"started_at": null,
"completed_at": null,
"started_at": "2025-11-20T18:09:04.527701Z",
"completed_at": "2025-11-20T18:09:27.852314Z",
"progress": null,
"error": null,
"retry_count": 0,
@ -963,5 +943,5 @@
],
"active": [],
"failed": [],
"timestamp": "2025-11-20T17:54:12.722000+00:00"
"timestamp": "2025-11-20T18:09:27.852598+00:00"
}

View File

@ -77,10 +77,14 @@ class DownloadService:
# Control flags
self._is_stopped = True # Queue processing is stopped by default
self._is_shutting_down = False # Flag to indicate shutdown
# Executor for blocking operations
self._executor = ThreadPoolExecutor(max_workers=1)
# Track active download task for cancellation
self._active_download_task: Optional[asyncio.Task] = None
# Statistics tracking
self._total_downloaded_mb: float = 0.0
self._download_speeds: deque[float] = deque(maxlen=10)
@ -500,7 +504,11 @@ class DownloadService:
)
# Process the download (this will wait until complete)
await self._process_download(item)
self._active_download_task = asyncio.create_task(
self._process_download(item)
)
await self._active_download_task
self._active_download_task = None
# Small delay between downloads
await asyncio.sleep(1)
@ -771,6 +779,11 @@ class DownloadService:
item: Download item to process
"""
try:
# Check if shutting down
if self._is_shutting_down:
logger.info("Skipping download due to shutdown")
return
# Update status
item.status = DownloadStatus.DOWNLOADING
item.started_at = datetime.now(timezone.utc)
@ -814,6 +827,19 @@ class DownloadService:
else:
raise AnimeServiceError("Download returned False")
except asyncio.CancelledError:
# Handle task cancellation during shutdown
logger.info(
"Download cancelled during shutdown",
item_id=item.id,
)
item.status = DownloadStatus.CANCELLED
item.completed_at = datetime.now(timezone.utc)
# Return item to pending queue if not shutting down
if not self._is_shutting_down:
self._add_to_pending_queue(item, front=True)
raise # Re-raise to properly cancel the task
except Exception as e:
# Handle failure
item.status = DownloadStatus.FAILED
@ -845,27 +871,31 @@ class DownloadService:
logger.info("Download queue service initialized")
async def stop(self) -> None:
"""Stop the download queue service and wait for active download.
"""Stop the download queue service and cancel active downloads.
Note: This waits for the current download to complete.
Cancels any active download and shuts down the thread pool immediately.
"""
logger.info("Stopping download queue service...")
# Wait for active download to complete (with timeout)
timeout = 30 # seconds
start_time = asyncio.get_event_loop().time()
# Set shutdown flag
self._is_shutting_down = True
self._is_stopped = True
while (
self._active_download
and (asyncio.get_event_loop().time() - start_time) < timeout
):
await asyncio.sleep(1)
# Cancel active download task if running
if self._active_download_task and not self._active_download_task.done():
logger.info("Cancelling active download task...")
self._active_download_task.cancel()
try:
await self._active_download_task
except asyncio.CancelledError:
logger.info("Active download task cancelled")
# Save final state
self._save_queue()
# Shutdown executor
self._executor.shutdown(wait=True)
# Shutdown executor immediately, don't wait for tasks
logger.info("Shutting down thread pool executor...")
self._executor.shutdown(wait=False, cancel_futures=True)
logger.info("Download queue service stopped")