fix queue error

This commit is contained in:
Lukas 2025-12-10 20:55:09 +01:00
parent 798461a1ea
commit 99f79e4c29
6 changed files with 263 additions and 683 deletions

View File

@ -17,7 +17,8 @@
"keep_days": 30
},
"other": {
"master_password_hash": "$pbkdf2-sha256$29000$lvLeO.c8xzjHOAeAcM45Zw$NwtHXYLnbZE5oQwAJtlvcxLTZav3LjQhkYOhHiPXwWc"
"master_password_hash": "$pbkdf2-sha256$29000$Nyak1Np7j1Gq9V5rLUUoxQ$9/v2NQ9x2YcJ7N8aEgMVET24CO0ND3dWiGythcUgrJs",
"anime_directory": "/home/lukas/Volume/serien/"
},
"version": "1.0.0"
}

View File

@ -222,7 +222,7 @@ class AnimeService:
loop
)
except Exception as exc:
logger.error("Error handling scan status event", error=str(exc))
logger.error("Error handling scan status event: %s", exc)
@lru_cache(maxsize=128)
def _cached_list_missing(self) -> list[dict]:

View File

@ -120,6 +120,9 @@ class DownloadService:
"""Initialize the service by loading queue state from database.
Should be called after database is initialized during app startup.
Note: With the simplified model, status/priority/progress are now
managed in-memory only. The database stores the queue items
for persistence across restarts.
"""
if self._db_initialized:
return
@ -127,44 +130,22 @@ class DownloadService:
try:
repository = self._get_repository()
# Load pending items from database
pending_items = await repository.get_pending_items()
for item in pending_items:
# Reset status if was downloading when saved
if item.status == DownloadStatus.DOWNLOADING:
item.status = DownloadStatus.PENDING
await repository.update_status(
item.id, DownloadStatus.PENDING
)
# Load all items from database - they all start as PENDING
# since status is now managed in-memory only
all_items = await repository.get_all_items()
for item in all_items:
# All items from database are treated as pending
item.status = DownloadStatus.PENDING
self._add_to_pending_queue(item)
# Load failed items from database
failed_items = await repository.get_failed_items()
for item in failed_items:
if item.retry_count < self._max_retries:
item.status = DownloadStatus.PENDING
await repository.update_status(
item.id, DownloadStatus.PENDING
)
self._add_to_pending_queue(item)
else:
self._failed_items.append(item)
# Load completed items for history
completed_items = await repository.get_completed_items(limit=100)
for item in completed_items:
self._completed_items.append(item)
self._db_initialized = True
logger.info(
"Queue restored from database",
pending_count=len(self._pending_queue),
failed_count=len(self._failed_items),
completed_count=len(self._completed_items),
"Queue restored from database: pending_count=%d",
len(self._pending_queue),
)
except Exception as e:
logger.error("Failed to load queue from database", error=str(e))
logger.error("Failed to load queue from database: %s", e, exc_info=True)
# Continue without persistence - queue will work in memory only
self._db_initialized = True
@ -181,59 +162,28 @@ class DownloadService:
repository = self._get_repository()
return await repository.save_item(item)
except Exception as e:
logger.error("Failed to save item to database", error=str(e))
logger.error("Failed to save item to database: %s", e)
return item
async def _update_status_in_database(
async def _set_error_in_database(
self,
item_id: str,
status: DownloadStatus,
error: Optional[str] = None,
error: str,
) -> bool:
"""Update item status in the database.
"""Set error message on an item in the database.
Args:
item_id: Download item ID
status: New status
error: Optional error message
error: Error message
Returns:
True if update succeeded
"""
try:
repository = self._get_repository()
return await repository.update_status(item_id, status, error)
return await repository.set_error(item_id, error)
except Exception as e:
logger.error("Failed to update status in database", error=str(e))
return False
async def _update_progress_in_database(
self,
item_id: str,
progress: float,
downloaded: int,
total: Optional[int],
speed: Optional[float],
) -> bool:
"""Update download progress in the database.
Args:
item_id: Download item ID
progress: Progress percentage
downloaded: Downloaded bytes
total: Total bytes
speed: Download speed in bytes/sec
Returns:
True if update succeeded
"""
try:
repository = self._get_repository()
return await repository.update_progress(
item_id, progress, downloaded, total, speed
)
except Exception as e:
logger.error("Failed to update progress in database", error=str(e))
logger.error("Failed to set error in database: %s", e)
return False
async def _delete_from_database(self, item_id: str) -> bool:
@ -249,7 +199,7 @@ class DownloadService:
repository = self._get_repository()
return await repository.delete_item(item_id)
except Exception as e:
logger.error("Failed to delete from database", error=str(e))
logger.error("Failed to delete from database: %s", e)
return False
async def _init_queue_progress(self) -> None:
@ -271,7 +221,7 @@ class DownloadService:
)
self._queue_progress_initialized = True
except Exception as e:
logger.error("Failed to initialize queue progress", error=str(e))
logger.error("Failed to initialize queue progress: %s", e)
def _add_to_pending_queue(
self, item: DownloadItem, front: bool = False
@ -396,7 +346,7 @@ class DownloadService:
return created_ids
except Exception as e:
logger.error("Failed to add items to queue", error=str(e))
logger.error("Failed to add items to queue: %s", e)
raise DownloadServiceError(f"Failed to add items: {str(e)}") from e
async def remove_from_queue(self, item_ids: List[str]) -> List[str]:
@ -423,12 +373,10 @@ class DownloadService:
item.completed_at = datetime.now(timezone.utc)
self._failed_items.append(item)
self._active_download = None
# Update status in database
await self._update_status_in_database(
item_id, DownloadStatus.CANCELLED
)
# Delete cancelled item from database
await self._delete_from_database(item_id)
removed_ids.append(item_id)
logger.info("Cancelled active download", item_id=item_id)
logger.info("Cancelled active download: item_id=%s", item_id)
continue
# Check pending queue - O(1) lookup using helper dict
@ -460,7 +408,7 @@ class DownloadService:
return removed_ids
except Exception as e:
logger.error("Failed to remove items", error=str(e))
logger.error("Failed to remove items: %s", e)
raise DownloadServiceError(
f"Failed to remove items: {str(e)}"
) from e
@ -514,7 +462,7 @@ class DownloadService:
logger.info("Queue reordered", reordered_count=len(item_ids))
except Exception as e:
logger.error("Failed to reorder queue", error=str(e))
logger.error("Failed to reorder queue: %s", e)
raise DownloadServiceError(
f"Failed to reorder queue: {str(e)}"
) from e
@ -558,7 +506,7 @@ class DownloadService:
return "queue_started"
except Exception as e:
logger.error("Failed to start queue processing", error=str(e))
logger.error("Failed to start queue processing: %s", e)
raise DownloadServiceError(
f"Failed to start queue processing: {str(e)}"
) from e
@ -847,15 +795,12 @@ class DownloadService:
self._add_to_pending_queue(item)
retried_ids.append(item.id)
# Update status in database
await self._update_status_in_database(
item.id, DownloadStatus.PENDING
)
# Status is now managed in-memory only
logger.info(
"Retrying failed item",
item_id=item.id,
retry_count=item.retry_count
"Retrying failed item: item_id=%s, retry_count=%d",
item.id,
item.retry_count,
)
if retried_ids:
@ -875,7 +820,7 @@ class DownloadService:
return retried_ids
except Exception as e:
logger.error("Failed to retry items", error=str(e))
logger.error("Failed to retry items: %s", e)
raise DownloadServiceError(
f"Failed to retry: {str(e)}"
) from e
@ -892,21 +837,17 @@ class DownloadService:
logger.info("Skipping download due to shutdown")
return
# Update status in memory and database
# Update status in memory (status is now in-memory only)
item.status = DownloadStatus.DOWNLOADING
item.started_at = datetime.now(timezone.utc)
self._active_download = item
await self._update_status_in_database(
item.id, DownloadStatus.DOWNLOADING
)
logger.info(
"Starting download",
item_id=item.id,
serie_key=item.serie_id,
serie_name=item.serie_name,
season=item.episode.season,
episode=item.episode.episode,
"Starting download: item_id=%s, serie_key=%s, S%02dE%02d",
item.id,
item.serie_id,
item.episode.season,
item.episode.episode,
)
# Execute download via anime service
@ -941,13 +882,11 @@ class DownloadService:
self._completed_items.append(item)
# Update database
await self._update_status_in_database(
item.id, DownloadStatus.COMPLETED
)
# Delete completed item from database (status is in-memory)
await self._delete_from_database(item.id)
logger.info(
"Download completed successfully", item_id=item.id
"Download completed successfully: item_id=%s", item.id
)
else:
raise AnimeServiceError("Download returned False")
@ -955,20 +894,18 @@ class DownloadService:
except asyncio.CancelledError:
# Handle task cancellation during shutdown
logger.info(
"Download cancelled during shutdown",
item_id=item.id,
"Download cancelled during shutdown: item_id=%s",
item.id,
)
item.status = DownloadStatus.CANCELLED
item.completed_at = datetime.now(timezone.utc)
await self._update_status_in_database(
item.id, DownloadStatus.CANCELLED
)
# Delete cancelled item from database
await self._delete_from_database(item.id)
# Return item to pending queue if not shutting down
if not self._is_shutting_down:
self._add_to_pending_queue(item, front=True)
await self._update_status_in_database(
item.id, DownloadStatus.PENDING
)
# Re-save to database as pending
await self._save_to_database(item)
raise # Re-raise to properly cancel the task
except Exception as e:
@ -978,16 +915,14 @@ class DownloadService:
item.error = str(e)
self._failed_items.append(item)
# Update database with error
await self._update_status_in_database(
item.id, DownloadStatus.FAILED, str(e)
)
# Set error in database
await self._set_error_in_database(item.id, str(e))
logger.error(
"Download failed",
item_id=item.id,
error=str(e),
retry_count=item.retry_count,
"Download failed: item_id=%s, error=%s, retry_count=%d",
item.id,
str(e),
item.retry_count,
)
# Note: Failure is already broadcast by AnimeService
# via ProgressService when SeriesApp fires failed event

View File

@ -3,9 +3,9 @@
This module provides a repository adapter that wraps the DownloadQueueService
and provides the interface needed by DownloadService for queue persistence.
The repository pattern abstracts the database operations from the business logic,
allowing the DownloadService to work with domain models (DownloadItem) while
the repository handles conversion to/from database models (DownloadQueueItem).
The repository pattern abstracts the database operations from the business
logic, allowing the DownloadService to work with domain models (DownloadItem)
while the repository handles conversion to/from database models.
"""
from __future__ import annotations
@ -15,15 +15,15 @@ from typing import Callable, List, Optional
from sqlalchemy.ext.asyncio import AsyncSession
from src.server.database.models import AnimeSeries
from src.server.database.models import DownloadPriority as DBDownloadPriority
from src.server.database.models import DownloadQueueItem as DBDownloadQueueItem
from src.server.database.models import DownloadStatus as DBDownloadStatus
from src.server.database.service import AnimeSeriesService, DownloadQueueService
from src.server.database.service import (
AnimeSeriesService,
DownloadQueueService,
EpisodeService,
)
from src.server.models.download import (
DownloadItem,
DownloadPriority,
DownloadProgress,
DownloadStatus,
EpisodeIdentifier,
)
@ -37,194 +37,110 @@ class QueueRepositoryError(Exception):
class QueueRepository:
"""Repository adapter for database-backed download queue operations.
Provides clean interface for queue operations while handling
model conversion between Pydantic (DownloadItem) and SQLAlchemy
(DownloadQueueItem) models.
Note: The database model (DownloadQueueItem) is simplified and only
stores episode_id as a foreign key. Status, priority, progress, and
retry_count are managed in-memory by the DownloadService.
Attributes:
_db_session_factory: Factory function to create database sessions
"""
def __init__(
self,
db_session_factory: Callable[[], AsyncSession],
) -> None:
"""Initialize the queue repository.
Args:
db_session_factory: Factory function that returns AsyncSession instances
db_session_factory: Factory function that returns AsyncSession
"""
self._db_session_factory = db_session_factory
logger.info("QueueRepository initialized")
# =========================================================================
# Model Conversion Methods
# =========================================================================
def _status_to_db(self, status: DownloadStatus) -> DBDownloadStatus:
"""Convert Pydantic DownloadStatus to SQLAlchemy DownloadStatus.
Args:
status: Pydantic status enum
Returns:
SQLAlchemy status enum
"""
return DBDownloadStatus(status.value)
def _status_from_db(self, status: DBDownloadStatus) -> DownloadStatus:
"""Convert SQLAlchemy DownloadStatus to Pydantic DownloadStatus.
Args:
status: SQLAlchemy status enum
Returns:
Pydantic status enum
"""
return DownloadStatus(status.value)
def _priority_to_db(self, priority: DownloadPriority) -> DBDownloadPriority:
"""Convert Pydantic DownloadPriority to SQLAlchemy DownloadPriority.
Args:
priority: Pydantic priority enum
Returns:
SQLAlchemy priority enum
"""
# Handle case differences (Pydantic uses uppercase, DB uses lowercase)
return DBDownloadPriority(priority.value.lower())
def _priority_from_db(self, priority: DBDownloadPriority) -> DownloadPriority:
"""Convert SQLAlchemy DownloadPriority to Pydantic DownloadPriority.
Args:
priority: SQLAlchemy priority enum
Returns:
Pydantic priority enum
"""
# Handle case differences (DB uses lowercase, Pydantic uses uppercase)
return DownloadPriority(priority.value.upper())
def _to_db_model(
self,
item: DownloadItem,
series_id: int,
) -> DBDownloadQueueItem:
"""Convert DownloadItem to database model.
Args:
item: Pydantic download item
series_id: Database series ID (foreign key)
Returns:
SQLAlchemy download queue item model
"""
return DBDownloadQueueItem(
series_id=series_id,
season=item.episode.season,
episode_number=item.episode.episode,
status=self._status_to_db(item.status),
priority=self._priority_to_db(item.priority),
progress_percent=item.progress.percent if item.progress else 0.0,
downloaded_bytes=int(
item.progress.downloaded_mb * 1024 * 1024
) if item.progress else 0,
total_bytes=int(
item.progress.total_mb * 1024 * 1024
) if item.progress and item.progress.total_mb else None,
download_speed=(
item.progress.speed_mbps * 1024 * 1024
) if item.progress and item.progress.speed_mbps else None,
error_message=item.error,
retry_count=item.retry_count,
download_url=str(item.source_url) if item.source_url else None,
started_at=item.started_at,
completed_at=item.completed_at,
)
def _from_db_model(
self,
db_item: DBDownloadQueueItem,
item_id: Optional[str] = None,
) -> DownloadItem:
"""Convert database model to DownloadItem.
Note: Since the database model is simplified, status, priority,
progress, and retry_count default to initial values.
Args:
db_item: SQLAlchemy download queue item
item_id: Optional override for item ID (uses db ID if not provided)
item_id: Optional override for item ID
Returns:
Pydantic download item
Pydantic download item with default status/priority
"""
# Build progress object if there's progress data
progress = None
if db_item.progress_percent > 0 or db_item.downloaded_bytes > 0:
progress = DownloadProgress(
percent=db_item.progress_percent,
downloaded_mb=db_item.downloaded_bytes / (1024 * 1024),
total_mb=(
db_item.total_bytes / (1024 * 1024)
if db_item.total_bytes else None
),
speed_mbps=(
db_item.download_speed / (1024 * 1024)
if db_item.download_speed else None
),
)
# Get episode info from the related Episode object
episode = db_item.episode
series = db_item.series
episode_identifier = EpisodeIdentifier(
season=episode.season if episode else 1,
episode=episode.episode_number if episode else 1,
title=episode.title if episode else None,
)
return DownloadItem(
id=item_id or str(db_item.id),
serie_id=db_item.series.key if db_item.series else "",
serie_folder=db_item.series.folder if db_item.series else "",
serie_name=db_item.series.name if db_item.series else "",
episode=EpisodeIdentifier(
season=db_item.season,
episode=db_item.episode_number,
),
status=self._status_from_db(db_item.status),
priority=self._priority_from_db(db_item.priority),
serie_id=series.key if series else "",
serie_folder=series.folder if series else "",
serie_name=series.name if series else "",
episode=episode_identifier,
status=DownloadStatus.PENDING, # Default - managed in-memory
priority=DownloadPriority.NORMAL, # Default - managed in-memory
added_at=db_item.created_at or datetime.now(timezone.utc),
started_at=db_item.started_at,
completed_at=db_item.completed_at,
progress=progress,
progress=None, # Managed in-memory
error=db_item.error_message,
retry_count=db_item.retry_count,
retry_count=0, # Managed in-memory
source_url=db_item.download_url,
)
# =========================================================================
# CRUD Operations
# =========================================================================
async def save_item(
self,
item: DownloadItem,
db: Optional[AsyncSession] = None,
) -> DownloadItem:
"""Save a download item to the database.
Creates a new record if the item doesn't exist in the database.
Note: Status, priority, progress, and retry_count are NOT persisted.
Args:
item: Download item to save
db: Optional existing database session
Returns:
Saved download item with database ID
Raises:
QueueRepositoryError: If save operation fails
"""
session = db or self._db_session_factory()
manage_session = db is None
try:
# Find series by key
series = await AnimeSeriesService.get_by_key(session, item.serie_id)
if not series:
# Create series if it doesn't exist
series = await AnimeSeriesService.create(
@ -235,490 +151,272 @@ class QueueRepository:
folder=item.serie_folder,
)
logger.info(
"Created new series for queue item",
key=item.serie_id,
name=item.serie_name,
"Created new series for queue item: key=%s, name=%s",
item.serie_id,
item.serie_name,
)
# Find or create episode
episode = await EpisodeService.get_by_episode(
session,
series.id,
item.episode.season,
item.episode.episode,
)
if not episode:
# Create episode if it doesn't exist
episode = await EpisodeService.create(
db=session,
series_id=series.id,
season=item.episode.season,
episode_number=item.episode.episode,
title=item.episode.title,
)
logger.info(
"Created new episode for queue item: S%02dE%02d",
item.episode.season,
item.episode.episode,
)
# Create queue item
db_item = await DownloadQueueService.create(
db=session,
series_id=series.id,
season=item.episode.season,
episode_number=item.episode.episode,
priority=self._priority_to_db(item.priority),
episode_id=episode.id,
download_url=str(item.source_url) if item.source_url else None,
)
if manage_session:
await session.commit()
# Update the item ID with the database ID
item.id = str(db_item.id)
logger.debug(
"Saved queue item to database",
item_id=item.id,
serie_key=item.serie_id,
"Saved queue item to database: item_id=%s, serie_key=%s",
item.id,
item.serie_id,
)
return item
except Exception as e:
if manage_session:
await session.rollback()
logger.error("Failed to save queue item", error=str(e))
raise QueueRepositoryError(f"Failed to save item: {str(e)}") from e
logger.error("Failed to save queue item: %s", e)
raise QueueRepositoryError(f"Failed to save item: {e}") from e
finally:
if manage_session:
await session.close()
async def get_item(
self,
item_id: str,
db: Optional[AsyncSession] = None,
) -> Optional[DownloadItem]:
"""Get a download item by ID.
Args:
item_id: Download item ID (database ID as string)
db: Optional existing database session
Returns:
Download item or None if not found
Raises:
QueueRepositoryError: If query fails
"""
session = db or self._db_session_factory()
manage_session = db is None
try:
db_item = await DownloadQueueService.get_by_id(
session, int(item_id)
)
if not db_item:
return None
return self._from_db_model(db_item, item_id)
except ValueError:
# Invalid ID format
return None
except Exception as e:
logger.error("Failed to get queue item", error=str(e))
raise QueueRepositoryError(f"Failed to get item: {str(e)}") from e
logger.error("Failed to get queue item: %s", e)
raise QueueRepositoryError(f"Failed to get item: {e}") from e
finally:
if manage_session:
await session.close()
async def get_pending_items(
async def get_all_items(
self,
limit: Optional[int] = None,
db: Optional[AsyncSession] = None,
) -> List[DownloadItem]:
"""Get pending download items ordered by priority.
"""Get all download items regardless of status.
Note: All items are returned with default status (PENDING) since
status is now managed in-memory by the DownloadService.
Args:
limit: Optional maximum number of items to return
db: Optional existing database session
Returns:
List of pending download items
List of all download items
Raises:
QueueRepositoryError: If query fails
"""
session = db or self._db_session_factory()
manage_session = db is None
try:
db_items = await DownloadQueueService.get_pending(session, limit)
return [self._from_db_model(item) for item in db_items]
except Exception as e:
logger.error("Failed to get pending items", error=str(e))
raise QueueRepositoryError(
f"Failed to get pending items: {str(e)}"
) from e
finally:
if manage_session:
await session.close()
async def get_active_item(
self,
db: Optional[AsyncSession] = None,
) -> Optional[DownloadItem]:
"""Get the currently active (downloading) item.
Args:
db: Optional existing database session
Returns:
Active download item or None if none active
Raises:
QueueRepositoryError: If query fails
"""
session = db or self._db_session_factory()
manage_session = db is None
try:
db_items = await DownloadQueueService.get_active(session)
if not db_items:
return None
# Return first active item (should only be one)
return self._from_db_model(db_items[0])
except Exception as e:
logger.error("Failed to get active item", error=str(e))
raise QueueRepositoryError(
f"Failed to get active item: {str(e)}"
) from e
finally:
if manage_session:
await session.close()
async def get_completed_items(
self,
limit: int = 100,
db: Optional[AsyncSession] = None,
) -> List[DownloadItem]:
"""Get completed download items.
Args:
limit: Maximum number of items to return
db: Optional existing database session
Returns:
List of completed download items
Raises:
QueueRepositoryError: If query fails
"""
session = db or self._db_session_factory()
manage_session = db is None
try:
db_items = await DownloadQueueService.get_by_status(
session, DBDownloadStatus.COMPLETED, limit
db_items = await DownloadQueueService.get_all(
session, with_series=True
)
return [self._from_db_model(item) for item in db_items]
except Exception as e:
logger.error("Failed to get completed items", error=str(e))
raise QueueRepositoryError(
f"Failed to get completed items: {str(e)}"
) from e
logger.error("Failed to get all items: %s", e)
raise QueueRepositoryError(f"Failed to get all items: {e}") from e
finally:
if manage_session:
await session.close()
async def get_failed_items(
self,
limit: int = 50,
db: Optional[AsyncSession] = None,
) -> List[DownloadItem]:
"""Get failed download items.
Args:
limit: Maximum number of items to return
db: Optional existing database session
Returns:
List of failed download items
Raises:
QueueRepositoryError: If query fails
"""
session = db or self._db_session_factory()
manage_session = db is None
try:
db_items = await DownloadQueueService.get_by_status(
session, DBDownloadStatus.FAILED, limit
)
return [self._from_db_model(item) for item in db_items]
except Exception as e:
logger.error("Failed to get failed items", error=str(e))
raise QueueRepositoryError(
f"Failed to get failed items: {str(e)}"
) from e
finally:
if manage_session:
await session.close()
async def update_status(
async def set_error(
self,
item_id: str,
status: DownloadStatus,
error: Optional[str] = None,
error: str,
db: Optional[AsyncSession] = None,
) -> bool:
"""Update the status of a download item.
"""Set error message on a download item.
Args:
item_id: Download item ID
status: New download status
error: Optional error message for failed status
error: Error message
db: Optional existing database session
Returns:
True if update succeeded, False if item not found
Raises:
QueueRepositoryError: If update fails
"""
session = db or self._db_session_factory()
manage_session = db is None
try:
result = await DownloadQueueService.update_status(
result = await DownloadQueueService.set_error(
session,
int(item_id),
self._status_to_db(status),
error,
)
if manage_session:
await session.commit()
success = result is not None
if success:
logger.debug(
"Updated queue item status",
item_id=item_id,
status=status.value,
"Set error on queue item: item_id=%s",
item_id,
)
return success
except ValueError:
return False
except Exception as e:
if manage_session:
await session.rollback()
logger.error("Failed to update status", error=str(e))
raise QueueRepositoryError(
f"Failed to update status: {str(e)}"
) from e
logger.error("Failed to set error: %s", e)
raise QueueRepositoryError(f"Failed to set error: {e}") from e
finally:
if manage_session:
await session.close()
async def update_progress(
self,
item_id: str,
progress: float,
downloaded: int,
total: Optional[int],
speed: Optional[float],
db: Optional[AsyncSession] = None,
) -> bool:
"""Update download progress for an item.
Args:
item_id: Download item ID
progress: Progress percentage (0-100)
downloaded: Downloaded bytes
total: Total bytes (optional)
speed: Download speed in bytes/second (optional)
db: Optional existing database session
Returns:
True if update succeeded, False if item not found
Raises:
QueueRepositoryError: If update fails
"""
session = db or self._db_session_factory()
manage_session = db is None
try:
result = await DownloadQueueService.update_progress(
session,
int(item_id),
progress,
downloaded,
total,
speed,
)
if manage_session:
await session.commit()
return result is not None
except ValueError:
return False
except Exception as e:
if manage_session:
await session.rollback()
logger.error("Failed to update progress", error=str(e))
raise QueueRepositoryError(
f"Failed to update progress: {str(e)}"
) from e
finally:
if manage_session:
await session.close()
async def delete_item(
self,
item_id: str,
db: Optional[AsyncSession] = None,
) -> bool:
"""Delete a download item from the database.
Args:
item_id: Download item ID
db: Optional existing database session
Returns:
True if item was deleted, False if not found
Raises:
QueueRepositoryError: If delete fails
"""
session = db or self._db_session_factory()
manage_session = db is None
try:
result = await DownloadQueueService.delete(session, int(item_id))
if manage_session:
await session.commit()
if result:
logger.debug("Deleted queue item", item_id=item_id)
logger.debug("Deleted queue item: item_id=%s", item_id)
return result
except ValueError:
return False
except Exception as e:
if manage_session:
await session.rollback()
logger.error("Failed to delete item", error=str(e))
raise QueueRepositoryError(
f"Failed to delete item: {str(e)}"
) from e
logger.error("Failed to delete item: %s", e)
raise QueueRepositoryError(f"Failed to delete item: {e}") from e
finally:
if manage_session:
await session.close()
async def clear_completed(
async def clear_all(
self,
db: Optional[AsyncSession] = None,
) -> int:
"""Clear all completed download items.
"""Clear all download items from the queue.
Args:
db: Optional existing database session
Returns:
Number of items cleared
Raises:
QueueRepositoryError: If operation fails
"""
session = db or self._db_session_factory()
manage_session = db is None
try:
count = await DownloadQueueService.clear_completed(session)
# Get all items first to count them
all_items = await DownloadQueueService.get_all(session)
count = len(all_items)
# Delete each item
for item in all_items:
await DownloadQueueService.delete(session, item.id)
if manage_session:
await session.commit()
logger.info("Cleared completed items from queue", count=count)
logger.info("Cleared all items from queue: count=%d", count)
return count
except Exception as e:
if manage_session:
await session.rollback()
logger.error("Failed to clear completed items", error=str(e))
raise QueueRepositoryError(
f"Failed to clear completed: {str(e)}"
) from e
finally:
if manage_session:
await session.close()
async def get_all_items(
self,
db: Optional[AsyncSession] = None,
) -> List[DownloadItem]:
"""Get all download items regardless of status.
Args:
db: Optional existing database session
Returns:
List of all download items
Raises:
QueueRepositoryError: If query fails
"""
session = db or self._db_session_factory()
manage_session = db is None
try:
db_items = await DownloadQueueService.get_all(
session, with_series=True
)
return [self._from_db_model(item) for item in db_items]
except Exception as e:
logger.error("Failed to get all items", error=str(e))
raise QueueRepositoryError(
f"Failed to get all items: {str(e)}"
) from e
finally:
if manage_session:
await session.close()
async def retry_failed_items(
self,
max_retries: int = 3,
db: Optional[AsyncSession] = None,
) -> List[DownloadItem]:
"""Retry failed downloads that haven't exceeded max retries.
Args:
max_retries: Maximum number of retry attempts
db: Optional existing database session
Returns:
List of items marked for retry
Raises:
QueueRepositoryError: If operation fails
"""
session = db or self._db_session_factory()
manage_session = db is None
try:
db_items = await DownloadQueueService.retry_failed(
session, max_retries
)
if manage_session:
await session.commit()
return [self._from_db_model(item) for item in db_items]
except Exception as e:
if manage_session:
await session.rollback()
logger.error("Failed to retry failed items", error=str(e))
raise QueueRepositoryError(
f"Failed to retry failed items: {str(e)}"
) from e
logger.error("Failed to clear queue: %s", e)
raise QueueRepositoryError(f"Failed to clear queue: {e}") from e
finally:
if manage_session:
await session.close()
@ -732,22 +430,31 @@ def get_queue_repository(
db_session_factory: Optional[Callable[[], AsyncSession]] = None,
) -> QueueRepository:
"""Get or create the QueueRepository singleton.
Args:
db_session_factory: Optional factory function for database sessions.
If not provided, uses default from connection module.
Returns:
QueueRepository singleton instance
"""
global _queue_repository_instance
if _queue_repository_instance is None:
if db_session_factory is None:
# Use default session factory
from src.server.database.connection import get_async_session_factory
db_session_factory = get_async_session_factory
_queue_repository_instance = QueueRepository(db_session_factory)
return _queue_repository_instance
def reset_queue_repository() -> None:
"""Reset the QueueRepository singleton.
Used for testing to ensure fresh state between tests.
"""
global _queue_repository_instance
_queue_repository_instance = None

View File

@ -415,7 +415,7 @@ class ScanService:
message="Initializing scan...",
)
except Exception as e:
logger.error("Failed to start progress tracking", error=str(e))
logger.error("Failed to start progress tracking: %s", e)
# Emit scan started event
await self._emit_scan_event({
@ -479,7 +479,7 @@ class ScanService:
folder=scan_progress.folder,
)
except Exception as e:
logger.debug("Progress update skipped", error=str(e))
logger.debug("Progress update skipped: %s", e)
# Emit progress event with key as primary identifier
await self._emit_scan_event({
@ -541,7 +541,7 @@ class ScanService:
error_message=completion_context.message,
)
except Exception as e:
logger.debug("Progress completion skipped", error=str(e))
logger.debug("Progress completion skipped: %s", e)
# Emit completion event
await self._emit_scan_event({
@ -598,7 +598,7 @@ class ScanService:
error_message="Scan cancelled by user",
)
except Exception as e:
logger.debug("Progress cancellation skipped", error=str(e))
logger.debug("Progress cancellation skipped: %s", e)
logger.info("Scan cancelled")
return True

View File

@ -25,8 +25,11 @@ from src.server.services.download_service import DownloadService, DownloadServic
class MockQueueRepository:
"""Mock implementation of QueueRepository for testing.
This provides an in-memory storage that mimics the database repository
behavior without requiring actual database connections.
This provides an in-memory storage that mimics the simplified database
repository behavior without requiring actual database connections.
Note: The repository is simplified - status, priority, progress are
now managed in-memory by DownloadService, not stored in database.
"""
def __init__(self):
@ -42,81 +45,19 @@ class MockQueueRepository:
"""Get item by ID from in-memory storage."""
return self._items.get(item_id)
async def get_pending_items(self) -> List[DownloadItem]:
"""Get all pending items."""
return [
item for item in self._items.values()
if item.status == DownloadStatus.PENDING
]
async def get_all_items(self) -> List[DownloadItem]:
"""Get all items in storage."""
return list(self._items.values())
async def get_active_item(self) -> Optional[DownloadItem]:
"""Get the currently active item."""
for item in self._items.values():
if item.status == DownloadStatus.DOWNLOADING:
return item
return None
async def get_completed_items(
self, limit: int = 100
) -> List[DownloadItem]:
"""Get completed items."""
completed = [
item for item in self._items.values()
if item.status == DownloadStatus.COMPLETED
]
return completed[:limit]
async def get_failed_items(self, limit: int = 50) -> List[DownloadItem]:
"""Get failed items."""
failed = [
item for item in self._items.values()
if item.status == DownloadStatus.FAILED
]
return failed[:limit]
async def update_status(
async def set_error(
self,
item_id: str,
status: DownloadStatus,
error: Optional[str] = None
error: str,
) -> bool:
"""Update item status."""
"""Set error message on an item."""
if item_id not in self._items:
return False
self._items[item_id].status = status
if error:
self._items[item_id].error = error
if status == DownloadStatus.COMPLETED:
self._items[item_id].completed_at = datetime.now(timezone.utc)
elif status == DownloadStatus.DOWNLOADING:
self._items[item_id].started_at = datetime.now(timezone.utc)
return True
async def update_progress(
self,
item_id: str,
progress: float,
downloaded: int,
total: int,
speed: float
) -> bool:
"""Update download progress."""
if item_id not in self._items:
return False
item = self._items[item_id]
if item.progress is None:
from src.server.models.download import DownloadProgress
item.progress = DownloadProgress(
percent=progress,
downloaded_bytes=downloaded,
total_bytes=total,
speed_bps=speed
)
else:
item.progress.percent = progress
item.progress.downloaded_bytes = downloaded
item.progress.total_bytes = total
item.progress.speed_bps = speed
self._items[item_id].error = error
return True
async def delete_item(self, item_id: str) -> bool:
@ -126,15 +67,11 @@ class MockQueueRepository:
return True
return False
async def clear_completed(self) -> int:
"""Clear all completed items."""
completed_ids = [
item_id for item_id, item in self._items.items()
if item.status == DownloadStatus.COMPLETED
]
for item_id in completed_ids:
del self._items[item_id]
return len(completed_ids)
async def clear_all(self) -> int:
"""Clear all items."""
count = len(self._items)
self._items.clear()
return count
@pytest.fixture
@ -505,9 +442,9 @@ class TestPersistence:
)
# Item should be saved in mock repository
pending_items = await mock_queue_repository.get_pending_items()
assert len(pending_items) == 1
assert pending_items[0].serie_id == "series-1"
all_items = await mock_queue_repository.get_all_items()
assert len(all_items) == 1
assert all_items[0].serie_id == "series-1"
@pytest.mark.asyncio
async def test_queue_recovery_after_restart(