Migrate download queue from JSON to SQLite database

- Created QueueRepository adapter in src/server/services/queue_repository.py
- Refactored DownloadService to use repository pattern instead of JSON
- Updated application startup to initialize download service from database
- Updated all test fixtures to use MockQueueRepository
- All 1104 tests passing
This commit is contained in:
2025-12-02 16:01:25 +01:00
parent 48daeba012
commit b0f3b643c7
18 changed files with 1393 additions and 330 deletions

View File

@@ -2,18 +2,19 @@
This module provides a simplified queue management system for handling
anime episode downloads with manual start/stop controls, progress tracking,
persistence, and retry functionality.
database persistence, and retry functionality.
The service uses SQLite database for persistent storage via QueueRepository
while maintaining an in-memory cache for performance.
"""
from __future__ import annotations
import asyncio
import json
import uuid
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional
from typing import TYPE_CHECKING, Dict, List, Optional
import structlog
@@ -28,6 +29,9 @@ from src.server.models.download import (
from src.server.services.anime_service import AnimeService, AnimeServiceError
from src.server.services.progress_service import ProgressService, get_progress_service
if TYPE_CHECKING:
from src.server.services.queue_repository import QueueRepository
logger = structlog.get_logger(__name__)
@@ -42,7 +46,7 @@ class DownloadService:
- Manual download start/stop
- FIFO queue processing
- Real-time progress tracking
- Queue persistence and recovery
- Database persistence via QueueRepository
- Automatic retry logic
- WebSocket broadcast support
"""
@@ -50,24 +54,28 @@ class DownloadService:
def __init__(
self,
anime_service: AnimeService,
queue_repository: Optional["QueueRepository"] = None,
max_retries: int = 3,
persistence_path: str = "./data/download_queue.json",
progress_service: Optional[ProgressService] = None,
):
"""Initialize the download service.
Args:
anime_service: Service for anime operations
queue_repository: Optional repository for database persistence.
If not provided, will use default singleton.
max_retries: Maximum retry attempts for failed downloads
persistence_path: Path to persist queue state
progress_service: Optional progress service for tracking
"""
self._anime_service = anime_service
self._max_retries = max_retries
self._persistence_path = Path(persistence_path)
self._progress_service = progress_service or get_progress_service()
# Database repository for persistence
self._queue_repository = queue_repository
self._db_initialized = False
# Queue storage by status
# In-memory cache for performance (synced with database)
self._pending_queue: deque[DownloadItem] = deque()
# Helper dict for O(1) lookup of pending items by ID
self._pending_items_by_id: Dict[str, DownloadItem] = {}
@@ -92,14 +100,158 @@ class DownloadService:
# Track if queue progress has been initialized
self._queue_progress_initialized: bool = False
# Load persisted queue
self._load_queue()
logger.info(
"DownloadService initialized",
max_retries=max_retries,
)
def _get_repository(self) -> "QueueRepository":
"""Get the queue repository, initializing if needed.
Returns:
QueueRepository instance
"""
if self._queue_repository is None:
from src.server.services.queue_repository import get_queue_repository
self._queue_repository = get_queue_repository()
return self._queue_repository
async def initialize(self) -> None:
    """Load persisted queue state from the database into memory.

    Should be called after database is initialized during app startup.
    Safe to call more than once; only the first call does any work. On
    database errors the service degrades to in-memory-only operation.
    """
    if self._db_initialized:
        return

    try:
        repository = self._get_repository()

        # Restore pending items; anything that was mid-download when the
        # process stopped is demoted back to PENDING.
        for item in await repository.get_pending_items():
            if item.status == DownloadStatus.DOWNLOADING:
                item.status = DownloadStatus.PENDING
                await repository.update_status(
                    item.id, DownloadStatus.PENDING
                )
            self._add_to_pending_queue(item)

        # Restore failed items: those out of retries stay failed, the
        # rest are re-queued as pending.
        for item in await repository.get_failed_items():
            if item.retry_count >= self._max_retries:
                self._failed_items.append(item)
                continue
            item.status = DownloadStatus.PENDING
            await repository.update_status(
                item.id, DownloadStatus.PENDING
            )
            self._add_to_pending_queue(item)

        # Recent history only; older completed rows remain in the database.
        for item in await repository.get_completed_items(limit=100):
            self._completed_items.append(item)

        self._db_initialized = True
        logger.info(
            "Queue restored from database",
            pending_count=len(self._pending_queue),
            failed_count=len(self._failed_items),
            completed_count=len(self._completed_items),
        )
    except Exception as e:
        logger.error("Failed to load queue from database", error=str(e))
        # Continue without persistence - queue will work in memory only
        self._db_initialized = True
async def _save_to_database(self, item: DownloadItem) -> DownloadItem:
"""Save or update an item in the database.
Args:
item: Download item to save
Returns:
Saved item with database ID
"""
try:
repository = self._get_repository()
return await repository.save_item(item)
except Exception as e:
logger.error("Failed to save item to database", error=str(e))
return item
async def _update_status_in_database(
self,
item_id: str,
status: DownloadStatus,
error: Optional[str] = None,
) -> bool:
"""Update item status in the database.
Args:
item_id: Download item ID
status: New status
error: Optional error message
Returns:
True if update succeeded
"""
try:
repository = self._get_repository()
return await repository.update_status(item_id, status, error)
except Exception as e:
logger.error("Failed to update status in database", error=str(e))
return False
async def _update_progress_in_database(
self,
item_id: str,
progress: float,
downloaded: int,
total: Optional[int],
speed: Optional[float],
) -> bool:
"""Update download progress in the database.
Args:
item_id: Download item ID
progress: Progress percentage
downloaded: Downloaded bytes
total: Total bytes
speed: Download speed in bytes/sec
Returns:
True if update succeeded
"""
try:
repository = self._get_repository()
return await repository.update_progress(
item_id, progress, downloaded, total, speed
)
except Exception as e:
logger.error("Failed to update progress in database", error=str(e))
return False
async def _delete_from_database(self, item_id: str) -> bool:
"""Delete an item from the database.
Args:
item_id: Download item ID
Returns:
True if delete succeeded
"""
try:
repository = self._get_repository()
return await repository.delete_item(item_id)
except Exception as e:
logger.error("Failed to delete from database", error=str(e))
return False
async def _init_queue_progress(self) -> None:
"""Initialize the download queue progress tracking.
@@ -165,69 +317,6 @@ class DownloadService:
"""Generate unique identifier for download items."""
return str(uuid.uuid4())
def _load_queue(self) -> None:
    """Restore queue state from the JSON persistence file, if present.

    Items that were mid-download when the state was saved are demoted
    to PENDING; failed items are re-queued while they still have
    retries left. Any error is logged and the queue starts empty.
    """
    try:
        if not self._persistence_path.exists():
            return

        data = json.loads(
            self._persistence_path.read_text(encoding="utf-8")
        )

        # Pending items go straight back onto the queue.
        for item_dict in data.get("pending", []):
            item = DownloadItem(**item_dict)
            if item.status == DownloadStatus.DOWNLOADING:
                # A download in flight at save time must restart from scratch.
                item.status = DownloadStatus.PENDING
            self._add_to_pending_queue(item)

        # Failed items are retried while retries remain; otherwise they
        # stay in the failed list.
        for item_dict in data.get("failed", []):
            item = DownloadItem(**item_dict)
            if item.retry_count < self._max_retries:
                item.status = DownloadStatus.PENDING
                self._add_to_pending_queue(item)
            else:
                self._failed_items.append(item)

        logger.info(
            "Queue restored from disk",
            pending_count=len(self._pending_queue),
            failed_count=len(self._failed_items),
        )
    except Exception as e:
        logger.error("Failed to load persisted queue", error=str(e))
def _save_queue(self) -> None:
    """Write the current queue state to the JSON persistence file.

    Serializes pending, active, and failed items plus a UTC timestamp.
    Failures are logged and swallowed (persistence is best-effort).
    """
    try:
        self._persistence_path.parent.mkdir(parents=True, exist_ok=True)

        def dump_all(items):
            # mode="json" keeps datetimes/enums JSON-serializable.
            return [entry.model_dump(mode="json") for entry in items]

        active = [self._active_download] if self._active_download else []
        snapshot = {
            "pending": dump_all(self._pending_queue),
            "active": dump_all(active),
            "failed": dump_all(self._failed_items),
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }

        with open(self._persistence_path, "w", encoding="utf-8") as f:
            json.dump(snapshot, f, indent=2)

        logger.debug("Queue persisted to disk")
    except Exception as e:
        logger.error("Failed to persist queue", error=str(e))
async def add_to_queue(
self,
serie_id: str,
@@ -274,22 +363,23 @@ class DownloadService:
added_at=datetime.now(timezone.utc),
)
# Always append to end (FIFO order)
self._add_to_pending_queue(item, front=False)
# Save to database first to get persistent ID
saved_item = await self._save_to_database(item)
created_ids.append(item.id)
# Add to in-memory cache
self._add_to_pending_queue(saved_item, front=False)
created_ids.append(saved_item.id)
logger.info(
"Item added to queue",
item_id=item.id,
item_id=saved_item.id,
serie_key=serie_id,
serie_name=serie_name,
season=episode.season,
episode=episode.episode,
)
self._save_queue()
# Notify via progress service
queue_status = await self.get_queue_status()
await self._progress_service.update_progress(
@@ -333,6 +423,10 @@ class DownloadService:
item.completed_at = datetime.now(timezone.utc)
self._failed_items.append(item)
self._active_download = None
# Update status in database
await self._update_status_in_database(
item_id, DownloadStatus.CANCELLED
)
removed_ids.append(item_id)
logger.info("Cancelled active download", item_id=item_id)
continue
@@ -342,13 +436,14 @@ class DownloadService:
item = self._pending_items_by_id[item_id]
self._pending_queue.remove(item)
del self._pending_items_by_id[item_id]
# Delete from database
await self._delete_from_database(item_id)
removed_ids.append(item_id)
logger.info(
"Removed from pending queue", item_id=item_id
)
if removed_ids:
self._save_queue()
# Notify via progress service
queue_status = await self.get_queue_status()
await self._progress_service.update_progress(
@@ -379,6 +474,10 @@ class DownloadService:
Raises:
DownloadServiceError: If reordering fails
Note:
Reordering is done in-memory only. Database priority is not
updated since the in-memory queue defines the actual order.
"""
try:
# Build new queue based on specified order
@@ -399,9 +498,6 @@ class DownloadService:
# Replace queue
self._pending_queue = new_queue
# Save updated queue
self._save_queue()
# Notify via progress service
queue_status = await self.get_queue_status()
await self._progress_service.update_progress(
@@ -692,13 +788,15 @@ class DownloadService:
Number of items cleared
"""
count = len(self._pending_queue)
# Delete all pending items from database
for item_id in list(self._pending_items_by_id.keys()):
await self._delete_from_database(item_id)
self._pending_queue.clear()
self._pending_items_by_id.clear()
logger.info("Cleared pending items", count=count)
# Save queue state
self._save_queue()
# Notify via progress service
if count > 0:
queue_status = await self.get_queue_status()
@@ -749,6 +847,11 @@ class DownloadService:
self._add_to_pending_queue(item)
retried_ids.append(item.id)
# Update status in database
await self._update_status_in_database(
item.id, DownloadStatus.PENDING
)
logger.info(
"Retrying failed item",
item_id=item.id,
@@ -756,7 +859,6 @@ class DownloadService:
)
if retried_ids:
self._save_queue()
# Notify via progress service
queue_status = await self.get_queue_status()
await self._progress_service.update_progress(
@@ -790,10 +892,13 @@ class DownloadService:
logger.info("Skipping download due to shutdown")
return
# Update status
# Update status in memory and database
item.status = DownloadStatus.DOWNLOADING
item.started_at = datetime.now(timezone.utc)
self._active_download = item
await self._update_status_in_database(
item.id, DownloadStatus.DOWNLOADING
)
logger.info(
"Starting download",
@@ -809,7 +914,8 @@ class DownloadService:
# - download started/progress/completed/failed events
# - All updates forwarded to ProgressService
# - ProgressService broadcasts to WebSocket clients
# Use serie_folder for filesystem operations and serie_id (key) for identification
# Use serie_folder for filesystem operations
# and serie_id (key) for identification
if not item.serie_folder:
raise DownloadServiceError(
f"Missing serie_folder for download item {item.id}. "
@@ -835,6 +941,11 @@ class DownloadService:
self._completed_items.append(item)
# Update database
await self._update_status_in_database(
item.id, DownloadStatus.COMPLETED
)
logger.info(
"Download completed successfully", item_id=item.id
)
@@ -849,9 +960,15 @@ class DownloadService:
)
item.status = DownloadStatus.CANCELLED
item.completed_at = datetime.now(timezone.utc)
await self._update_status_in_database(
item.id, DownloadStatus.CANCELLED
)
# Return item to pending queue if not shutting down
if not self._is_shutting_down:
self._add_to_pending_queue(item, front=True)
await self._update_status_in_database(
item.id, DownloadStatus.PENDING
)
raise # Re-raise to properly cancel the task
except Exception as e:
@@ -861,6 +978,11 @@ class DownloadService:
item.error = str(e)
self._failed_items.append(item)
# Update database with error
await self._update_status_in_database(
item.id, DownloadStatus.FAILED, str(e)
)
logger.error(
"Download failed",
item_id=item.id,
@@ -874,8 +996,6 @@ class DownloadService:
# Remove from active downloads
if self._active_download and self._active_download.id == item.id:
self._active_download = None
self._save_queue()
async def start(self) -> None:
"""Initialize the download queue service (compatibility method).
@@ -896,17 +1016,15 @@ class DownloadService:
self._is_stopped = True
# Cancel active download task if running
if self._active_download_task and not self._active_download_task.done():
active_task = self._active_download_task
if active_task and not active_task.done():
logger.info("Cancelling active download task...")
self._active_download_task.cancel()
active_task.cancel()
try:
await self._active_download_task
await active_task
except asyncio.CancelledError:
logger.info("Active download task cancelled")
# Save final state
self._save_queue()
# Shutdown executor immediately, don't wait for tasks
logger.info("Shutting down thread pool executor...")
self._executor.shutdown(wait=False, cancel_futures=True)