From 99f79e4c295391019e55ca666e60dbac3325299f Mon Sep 17 00:00:00 2001 From: Lukas Date: Wed, 10 Dec 2025 20:55:09 +0100 Subject: [PATCH] fix queue error --- data/config.json | 3 +- src/server/services/anime_service.py | 2 +- src/server/services/download_service.py | 173 ++----- src/server/services/queue_repository.py | 657 +++++++----------------- src/server/services/scan_service.py | 8 +- tests/unit/test_download_service.py | 103 +--- 6 files changed, 263 insertions(+), 683 deletions(-) diff --git a/data/config.json b/data/config.json index c507462..dd2e09d 100644 --- a/data/config.json +++ b/data/config.json @@ -17,7 +17,8 @@ "keep_days": 30 }, "other": { - "master_password_hash": "$pbkdf2-sha256$29000$lvLeO.c8xzjHOAeAcM45Zw$NwtHXYLnbZE5oQwAJtlvcxLTZav3LjQhkYOhHiPXwWc" + "master_password_hash": "$pbkdf2-sha256$29000$Nyak1Np7j1Gq9V5rLUUoxQ$9/v2NQ9x2YcJ7N8aEgMVET24CO0ND3dWiGythcUgrJs", + "anime_directory": "/home/lukas/Volume/serien/" }, "version": "1.0.0" } \ No newline at end of file diff --git a/src/server/services/anime_service.py b/src/server/services/anime_service.py index 94c8917..6156493 100644 --- a/src/server/services/anime_service.py +++ b/src/server/services/anime_service.py @@ -222,7 +222,7 @@ class AnimeService: loop ) except Exception as exc: - logger.error("Error handling scan status event", error=str(exc)) + logger.error("Error handling scan status event: %s", exc) @lru_cache(maxsize=128) def _cached_list_missing(self) -> list[dict]: diff --git a/src/server/services/download_service.py b/src/server/services/download_service.py index e54c4db..c4d6c73 100644 --- a/src/server/services/download_service.py +++ b/src/server/services/download_service.py @@ -120,6 +120,9 @@ class DownloadService: """Initialize the service by loading queue state from database. Should be called after database is initialized during app startup. + Note: With the simplified model, status/priority/progress are now + managed in-memory only. The database stores the queue items + for persistence across restarts. """ if self._db_initialized: return @@ -127,44 +130,22 @@ class DownloadService: try: repository = self._get_repository() - # Load pending items from database - pending_items = await repository.get_pending_items() - for item in pending_items: - # Reset status if was downloading when saved - if item.status == DownloadStatus.DOWNLOADING: - item.status = DownloadStatus.PENDING - await repository.update_status( - item.id, DownloadStatus.PENDING - ) + # Load all items from database - they all start as PENDING + # since status is now managed in-memory only + all_items = await repository.get_all_items() + for item in all_items: + # All items from database are treated as pending + item.status = DownloadStatus.PENDING self._add_to_pending_queue(item) - # Load failed items from database - failed_items = await repository.get_failed_items() - for item in failed_items: - if item.retry_count < self._max_retries: - item.status = DownloadStatus.PENDING - await repository.update_status( - item.id, DownloadStatus.PENDING - ) - self._add_to_pending_queue(item) - else: - self._failed_items.append(item) - - # Load completed items for history - completed_items = await repository.get_completed_items(limit=100) - for item in completed_items: - self._completed_items.append(item) - self._db_initialized = True logger.info( - "Queue restored from database", - pending_count=len(self._pending_queue), - failed_count=len(self._failed_items), - completed_count=len(self._completed_items), + "Queue restored from database: pending_count=%d", + len(self._pending_queue), ) except Exception as e: - logger.error("Failed to load queue from database", error=str(e)) + logger.error("Failed to load queue from database: %s", e, exc_info=True) # Continue without persistence - queue will work in memory only self._db_initialized = True @@ -181,59 +162,28 @@ class DownloadService: repository = self._get_repository() return await repository.save_item(item) except Exception as e: - logger.error("Failed to save item to database", error=str(e)) + logger.error("Failed to save item to database: %s", e) return item - async def _update_status_in_database( + async def _set_error_in_database( self, item_id: str, - status: DownloadStatus, - error: Optional[str] = None, + error: str, ) -> bool: - """Update item status in the database. + """Set error message on an item in the database. Args: item_id: Download item ID - status: New status - error: Optional error message + error: Error message Returns: True if update succeeded """ try: repository = self._get_repository() - return await repository.update_status(item_id, status, error) + return await repository.set_error(item_id, error) except Exception as e: - logger.error("Failed to update status in database", error=str(e)) - return False - - async def _update_progress_in_database( - self, - item_id: str, - progress: float, - downloaded: int, - total: Optional[int], - speed: Optional[float], - ) -> bool: - """Update download progress in the database. - - Args: - item_id: Download item ID - progress: Progress percentage - downloaded: Downloaded bytes - total: Total bytes - speed: Download speed in bytes/sec - - Returns: - True if update succeeded - """ - try: - repository = self._get_repository() - return await repository.update_progress( - item_id, progress, downloaded, total, speed - ) - except Exception as e: - logger.error("Failed to update progress in database", error=str(e)) + logger.error("Failed to set error in database: %s", e) return False async def _delete_from_database(self, item_id: str) -> bool: @@ -249,7 +199,7 @@ class DownloadService: repository = self._get_repository() return await repository.delete_item(item_id) except Exception as e: - logger.error("Failed to delete from database", error=str(e)) + logger.error("Failed to delete from database: %s", e) return False async def _init_queue_progress(self) -> None: @@ -271,7 +221,7 @@ class DownloadService: ) self._queue_progress_initialized = True except Exception as e: - logger.error("Failed to initialize queue progress", error=str(e)) + logger.error("Failed to initialize queue progress: %s", e) def _add_to_pending_queue( self, item: DownloadItem, front: bool = False @@ -396,7 +346,7 @@ class DownloadService: return created_ids except Exception as e: - logger.error("Failed to add items to queue", error=str(e)) + logger.error("Failed to add items to queue: %s", e) raise DownloadServiceError(f"Failed to add items: {str(e)}") from e async def remove_from_queue(self, item_ids: List[str]) -> List[str]: @@ -423,12 +373,10 @@ class DownloadService: item.completed_at = datetime.now(timezone.utc) self._failed_items.append(item) self._active_download = None - # Update status in database - await self._update_status_in_database( - item_id, DownloadStatus.CANCELLED - ) + # Delete cancelled item from database + await self._delete_from_database(item_id) removed_ids.append(item_id) - logger.info("Cancelled active download", item_id=item_id) + logger.info("Cancelled active download: item_id=%s", item_id) continue # Check pending queue - O(1) lookup using helper dict @@ -460,7 +408,7 @@ class DownloadService: return removed_ids except Exception as e: - logger.error("Failed to remove items", error=str(e)) + logger.error("Failed to remove items: %s", e) raise DownloadServiceError( f"Failed to remove items: {str(e)}" ) from e @@ -514,7 +462,7 @@ class DownloadService: logger.info("Queue reordered", reordered_count=len(item_ids)) except Exception as e: - logger.error("Failed to reorder queue", error=str(e)) + logger.error("Failed to reorder queue: %s", e) raise DownloadServiceError( f"Failed to reorder queue: {str(e)}" ) from e @@ -558,7 +506,7 @@ class DownloadService: return "queue_started" except Exception as e: - logger.error("Failed to start queue processing", error=str(e)) + logger.error("Failed to start queue processing: %s", e) raise DownloadServiceError( f"Failed to start queue processing: {str(e)}" ) from e @@ -847,15 +795,12 @@ class DownloadService: self._add_to_pending_queue(item) retried_ids.append(item.id) - # Update status in database - await self._update_status_in_database( - item.id, DownloadStatus.PENDING - ) + # Status is now managed in-memory only logger.info( - "Retrying failed item", - item_id=item.id, - retry_count=item.retry_count + "Retrying failed item: item_id=%s, retry_count=%d", + item.id, + item.retry_count, ) if retried_ids: @@ -875,7 +820,7 @@ class DownloadService: return retried_ids except Exception as e: - logger.error("Failed to retry items", error=str(e)) + logger.error("Failed to retry items: %s", e) raise DownloadServiceError( f"Failed to retry: {str(e)}" ) from e @@ -892,21 +837,17 @@ class DownloadService: logger.info("Skipping download due to shutdown") return - # Update status in memory and database + # Update status in memory (status is now in-memory only) item.status = DownloadStatus.DOWNLOADING item.started_at = datetime.now(timezone.utc) self._active_download = item - await self._update_status_in_database( - item.id, DownloadStatus.DOWNLOADING - ) logger.info( - "Starting download", - item_id=item.id, - serie_key=item.serie_id, - serie_name=item.serie_name, - season=item.episode.season, - episode=item.episode.episode, + "Starting download: item_id=%s, serie_key=%s, S%02dE%02d", + item.id, + item.serie_id, + item.episode.season, + item.episode.episode, ) # Execute download via anime service @@ -941,13 +882,11 @@ class DownloadService: self._completed_items.append(item) - # Update database - await self._update_status_in_database( - item.id, DownloadStatus.COMPLETED - ) + # Delete completed item from database (status is in-memory) + await self._delete_from_database(item.id) logger.info( - "Download completed successfully", item_id=item.id + "Download completed successfully: item_id=%s", item.id ) else: raise AnimeServiceError("Download returned False") @@ -955,20 +894,18 @@ class DownloadService: except asyncio.CancelledError: # Handle task cancellation during shutdown logger.info( - "Download cancelled during shutdown", - item_id=item.id, + "Download cancelled during shutdown: item_id=%s", + item.id, ) item.status = DownloadStatus.CANCELLED item.completed_at = datetime.now(timezone.utc) - await self._update_status_in_database( - item.id, DownloadStatus.CANCELLED - ) + # Delete cancelled item from database + await self._delete_from_database(item.id) # Return item to pending queue if not shutting down if not self._is_shutting_down: self._add_to_pending_queue(item, front=True) - await self._update_status_in_database( - item.id, DownloadStatus.PENDING - ) + # Re-save to database as pending + await self._save_to_database(item) raise # Re-raise to properly cancel the task except Exception as e: @@ -978,16 +915,14 @@ class DownloadService: item.error = str(e) self._failed_items.append(item) - # Update database with error - await self._update_status_in_database( - item.id, DownloadStatus.FAILED, str(e) - ) + # Set error in database + await self._set_error_in_database(item.id, str(e)) logger.error( - "Download failed", - item_id=item.id, - error=str(e), - retry_count=item.retry_count, + "Download failed: item_id=%s, error=%s, retry_count=%d", + item.id, + str(e), + item.retry_count, ) # Note: Failure is already broadcast by AnimeService # via ProgressService when SeriesApp fires failed event diff --git a/src/server/services/queue_repository.py b/src/server/services/queue_repository.py index 2fe1fe8..80f3070 100644 --- a/src/server/services/queue_repository.py +++ b/src/server/services/queue_repository.py @@ -3,9 +3,9 @@ This module provides a repository adapter that wraps the DownloadQueueService and provides the interface needed by DownloadService for queue persistence. -The repository pattern abstracts the database operations from the business logic, -allowing the DownloadService to work with domain models (DownloadItem) while -the repository handles conversion to/from database models (DownloadQueueItem). +The repository pattern abstracts the database operations from the business +logic, allowing the DownloadService to work with domain models (DownloadItem) +while the repository handles conversion to/from database models. """ from __future__ import annotations @@ -15,15 +15,15 @@ from typing import Callable, List, Optional from sqlalchemy.ext.asyncio import AsyncSession -from src.server.database.models import AnimeSeries -from src.server.database.models import DownloadPriority as DBDownloadPriority from src.server.database.models import DownloadQueueItem as DBDownloadQueueItem -from src.server.database.models import DownloadStatus as DBDownloadStatus -from src.server.database.service import AnimeSeriesService, DownloadQueueService +from src.server.database.service import ( + AnimeSeriesService, + DownloadQueueService, + EpisodeService, +) from src.server.models.download import ( DownloadItem, DownloadPriority, - DownloadProgress, DownloadStatus, EpisodeIdentifier, ) @@ -37,194 +37,110 @@ class QueueRepositoryError(Exception): class QueueRepository: """Repository adapter for database-backed download queue operations. - + Provides clean interface for queue operations while handling model conversion between Pydantic (DownloadItem) and SQLAlchemy (DownloadQueueItem) models. - + + Note: The database model (DownloadQueueItem) is simplified and only + stores episode_id as a foreign key. Status, priority, progress, and + retry_count are managed in-memory by the DownloadService. + Attributes: _db_session_factory: Factory function to create database sessions """ - + def __init__( self, db_session_factory: Callable[[], AsyncSession], ) -> None: """Initialize the queue repository. - + Args: - db_session_factory: Factory function that returns AsyncSession instances + db_session_factory: Factory function that returns AsyncSession """ self._db_session_factory = db_session_factory logger.info("QueueRepository initialized") - + # ========================================================================= # Model Conversion Methods # ========================================================================= - - def _status_to_db(self, status: DownloadStatus) -> DBDownloadStatus: - """Convert Pydantic DownloadStatus to SQLAlchemy DownloadStatus. - - Args: - status: Pydantic status enum - - Returns: - SQLAlchemy status enum - """ - return DBDownloadStatus(status.value) - - def _status_from_db(self, status: DBDownloadStatus) -> DownloadStatus: - """Convert SQLAlchemy DownloadStatus to Pydantic DownloadStatus. - - Args: - status: SQLAlchemy status enum - - Returns: - Pydantic status enum - """ - return DownloadStatus(status.value) - - def _priority_to_db(self, priority: DownloadPriority) -> DBDownloadPriority: - """Convert Pydantic DownloadPriority to SQLAlchemy DownloadPriority. - - Args: - priority: Pydantic priority enum - - Returns: - SQLAlchemy priority enum - """ - # Handle case differences (Pydantic uses uppercase, DB uses lowercase) - return DBDownloadPriority(priority.value.lower()) - - def _priority_from_db(self, priority: DBDownloadPriority) -> DownloadPriority: - """Convert SQLAlchemy DownloadPriority to Pydantic DownloadPriority. - - Args: - priority: SQLAlchemy priority enum - - Returns: - Pydantic priority enum - """ - # Handle case differences (DB uses lowercase, Pydantic uses uppercase) - return DownloadPriority(priority.value.upper()) - - def _to_db_model( - self, - item: DownloadItem, - series_id: int, - ) -> DBDownloadQueueItem: - """Convert DownloadItem to database model. - - Args: - item: Pydantic download item - series_id: Database series ID (foreign key) - - Returns: - SQLAlchemy download queue item model - """ - return DBDownloadQueueItem( - series_id=series_id, - season=item.episode.season, - episode_number=item.episode.episode, - status=self._status_to_db(item.status), - priority=self._priority_to_db(item.priority), - progress_percent=item.progress.percent if item.progress else 0.0, - downloaded_bytes=int( - item.progress.downloaded_mb * 1024 * 1024 - ) if item.progress else 0, - total_bytes=int( - item.progress.total_mb * 1024 * 1024 - ) if item.progress and item.progress.total_mb else None, - download_speed=( - item.progress.speed_mbps * 1024 * 1024 - ) if item.progress and item.progress.speed_mbps else None, - error_message=item.error, - retry_count=item.retry_count, - download_url=str(item.source_url) if item.source_url else None, - started_at=item.started_at, - completed_at=item.completed_at, - ) - + def _from_db_model( self, db_item: DBDownloadQueueItem, item_id: Optional[str] = None, ) -> DownloadItem: """Convert database model to DownloadItem. - + + Note: Since the database model is simplified, status, priority, + progress, and retry_count default to initial values. + Args: db_item: SQLAlchemy download queue item - item_id: Optional override for item ID (uses db ID if not provided) - + item_id: Optional override for item ID + Returns: - Pydantic download item + Pydantic download item with default status/priority """ - # Build progress object if there's progress data - progress = None - if db_item.progress_percent > 0 or db_item.downloaded_bytes > 0: - progress = DownloadProgress( - percent=db_item.progress_percent, - downloaded_mb=db_item.downloaded_bytes / (1024 * 1024), - total_mb=( - db_item.total_bytes / (1024 * 1024) - if db_item.total_bytes else None - ), - speed_mbps=( - db_item.download_speed / (1024 * 1024) - if db_item.download_speed else None - ), - ) - + # Get episode info from the related Episode object + episode = db_item.episode + series = db_item.series + + episode_identifier = EpisodeIdentifier( + season=episode.season if episode else 1, + episode=episode.episode_number if episode else 1, + title=episode.title if episode else None, + ) + return DownloadItem( id=item_id or str(db_item.id), - serie_id=db_item.series.key if db_item.series else "", - serie_folder=db_item.series.folder if db_item.series else "", - serie_name=db_item.series.name if db_item.series else "", - episode=EpisodeIdentifier( - season=db_item.season, - episode=db_item.episode_number, - ), - status=self._status_from_db(db_item.status), - priority=self._priority_from_db(db_item.priority), + serie_id=series.key if series else "", + serie_folder=series.folder if series else "", + serie_name=series.name if series else "", + episode=episode_identifier, + status=DownloadStatus.PENDING, # Default - managed in-memory + priority=DownloadPriority.NORMAL, # Default - managed in-memory added_at=db_item.created_at or datetime.now(timezone.utc), started_at=db_item.started_at, completed_at=db_item.completed_at, - progress=progress, + progress=None, # Managed in-memory error=db_item.error_message, - retry_count=db_item.retry_count, + retry_count=0, # Managed in-memory source_url=db_item.download_url, ) - + # ========================================================================= # CRUD Operations # ========================================================================= - + async def save_item( self, item: DownloadItem, db: Optional[AsyncSession] = None, ) -> DownloadItem: """Save a download item to the database. - + Creates a new record if the item doesn't exist in the database. - + Note: Status, priority, progress, and retry_count are NOT persisted. + Args: item: Download item to save db: Optional existing database session - + Returns: Saved download item with database ID - + Raises: QueueRepositoryError: If save operation fails """ session = db or self._db_session_factory() manage_session = db is None - + try: # Find series by key series = await AnimeSeriesService.get_by_key(session, item.serie_id) - + if not series: # Create series if it doesn't exist series = await AnimeSeriesService.create( @@ -235,490 +151,272 @@ class QueueRepository: folder=item.serie_folder, ) logger.info( - "Created new series for queue item", - key=item.serie_id, - name=item.serie_name, + "Created new series for queue item: key=%s, name=%s", + item.serie_id, + item.serie_name, ) - + + # Find or create episode + episode = await EpisodeService.get_by_episode( + session, + series.id, + item.episode.season, + item.episode.episode, + ) + + if not episode: + # Create episode if it doesn't exist + episode = await EpisodeService.create( + db=session, + series_id=series.id, + season=item.episode.season, + episode_number=item.episode.episode, + title=item.episode.title, + ) + logger.info( + "Created new episode for queue item: S%02dE%02d", + item.episode.season, + item.episode.episode, + ) + # Create queue item db_item = await DownloadQueueService.create( db=session, series_id=series.id, - season=item.episode.season, - episode_number=item.episode.episode, - priority=self._priority_to_db(item.priority), + episode_id=episode.id, download_url=str(item.source_url) if item.source_url else None, ) - + if manage_session: await session.commit() - + # Update the item ID with the database ID item.id = str(db_item.id) - + logger.debug( - "Saved queue item to database", - item_id=item.id, - serie_key=item.serie_id, + "Saved queue item to database: item_id=%s, serie_key=%s", + item.id, + item.serie_id, ) - + return item - + except Exception as e: if manage_session: await session.rollback() - logger.error("Failed to save queue item", error=str(e)) - raise QueueRepositoryError(f"Failed to save item: {str(e)}") from e + logger.error("Failed to save queue item: %s", e) + raise QueueRepositoryError(f"Failed to save item: {e}") from e finally: if manage_session: await session.close() - + async def get_item( self, item_id: str, db: Optional[AsyncSession] = None, ) -> Optional[DownloadItem]: """Get a download item by ID. - + Args: item_id: Download item ID (database ID as string) db: Optional existing database session - + Returns: Download item or None if not found - + Raises: QueueRepositoryError: If query fails """ session = db or self._db_session_factory() manage_session = db is None - + try: db_item = await DownloadQueueService.get_by_id( session, int(item_id) ) - + if not db_item: return None - + return self._from_db_model(db_item, item_id) - + except ValueError: # Invalid ID format return None except Exception as e: - logger.error("Failed to get queue item", error=str(e)) - raise QueueRepositoryError(f"Failed to get item: {str(e)}") from e + logger.error("Failed to get queue item: %s", e) + raise QueueRepositoryError(f"Failed to get item: {e}") from e finally: if manage_session: await session.close() - - async def get_pending_items( + + async def get_all_items( self, - limit: Optional[int] = None, db: Optional[AsyncSession] = None, ) -> List[DownloadItem]: - """Get pending download items ordered by priority. - + """Get all download items regardless of status. + + Note: All items are returned with default status (PENDING) since + status is now managed in-memory by the DownloadService. + Args: - limit: Optional maximum number of items to return db: Optional existing database session - + Returns: - List of pending download items - + List of all download items + Raises: QueueRepositoryError: If query fails """ session = db or self._db_session_factory() manage_session = db is None - + try: - db_items = await DownloadQueueService.get_pending(session, limit) - return [self._from_db_model(item) for item in db_items] - - except Exception as e: - logger.error("Failed to get pending items", error=str(e)) - raise QueueRepositoryError( - f"Failed to get pending items: {str(e)}" - ) from e - finally: - if manage_session: - await session.close() - - async def get_active_item( - self, - db: Optional[AsyncSession] = None, - ) -> Optional[DownloadItem]: - """Get the currently active (downloading) item. - - Args: - db: Optional existing database session - - Returns: - Active download item or None if none active - - Raises: - QueueRepositoryError: If query fails - """ - session = db or self._db_session_factory() - manage_session = db is None - - try: - db_items = await DownloadQueueService.get_active(session) - - if not db_items: - return None - - # Return first active item (should only be one) - return self._from_db_model(db_items[0]) - - except Exception as e: - logger.error("Failed to get active item", error=str(e)) - raise QueueRepositoryError( - f"Failed to get active item: {str(e)}" - ) from e - finally: - if manage_session: - await session.close() - - async def get_completed_items( - self, - limit: int = 100, - db: Optional[AsyncSession] = None, - ) -> List[DownloadItem]: - """Get completed download items. - - Args: - limit: Maximum number of items to return - db: Optional existing database session - - Returns: - List of completed download items - - Raises: - QueueRepositoryError: If query fails - """ - session = db or self._db_session_factory() - manage_session = db is None - - try: - db_items = await DownloadQueueService.get_by_status( - session, DBDownloadStatus.COMPLETED, limit + db_items = await DownloadQueueService.get_all( + session, with_series=True ) return [self._from_db_model(item) for item in db_items] - + except Exception as e: - logger.error("Failed to get completed items", error=str(e)) - raise QueueRepositoryError( - f"Failed to get completed items: {str(e)}" - ) from e + logger.error("Failed to get all items: %s", e) + raise QueueRepositoryError(f"Failed to get all items: {e}") from e finally: if manage_session: await session.close() - - async def get_failed_items( - self, - limit: int = 50, - db: Optional[AsyncSession] = None, - ) -> List[DownloadItem]: - """Get failed download items. - - Args: - limit: Maximum number of items to return - db: Optional existing database session - - Returns: - List of failed download items - - Raises: - QueueRepositoryError: If query fails - """ - session = db or self._db_session_factory() - manage_session = db is None - - try: - db_items = await DownloadQueueService.get_by_status( - session, DBDownloadStatus.FAILED, limit - ) - return [self._from_db_model(item) for item in db_items] - - except Exception as e: - logger.error("Failed to get failed items", error=str(e)) - raise QueueRepositoryError( - f"Failed to get failed items: {str(e)}" - ) from e - finally: - if manage_session: - await session.close() - - async def update_status( + + async def set_error( self, item_id: str, - status: DownloadStatus, - error: Optional[str] = None, + error: str, db: Optional[AsyncSession] = None, ) -> bool: - """Update the status of a download item. - + """Set error message on a download item. + Args: item_id: Download item ID - status: New download status - error: Optional error message for failed status + error: Error message db: Optional existing database session - + Returns: True if update succeeded, False if item not found - + Raises: QueueRepositoryError: If update fails """ session = db or self._db_session_factory() manage_session = db is None - + try: - result = await DownloadQueueService.update_status( + result = await DownloadQueueService.set_error( session, int(item_id), - self._status_to_db(status), error, ) - + if manage_session: await session.commit() - + success = result is not None - + if success: logger.debug( - "Updated queue item status", - item_id=item_id, - status=status.value, + "Set error on queue item: item_id=%s", + item_id, ) - + return success - + except ValueError: return False except Exception as e: if manage_session: await session.rollback() - logger.error("Failed to update status", error=str(e)) - raise QueueRepositoryError( - f"Failed to update status: {str(e)}" - ) from e + logger.error("Failed to set error: %s", e) + raise QueueRepositoryError(f"Failed to set error: {e}") from e finally: if manage_session: await session.close() - - async def update_progress( - self, - item_id: str, - progress: float, - downloaded: int, - total: Optional[int], - speed: Optional[float], - db: Optional[AsyncSession] = None, - ) -> bool: - """Update download progress for an item. - - Args: - item_id: Download item ID - progress: Progress percentage (0-100) - downloaded: Downloaded bytes - total: Total bytes (optional) - speed: Download speed in bytes/second (optional) - db: Optional existing database session - - Returns: - True if update succeeded, False if item not found - - Raises: - QueueRepositoryError: If update fails - """ - session = db or self._db_session_factory() - manage_session = db is None - - try: - result = await DownloadQueueService.update_progress( - session, - int(item_id), - progress, - downloaded, - total, - speed, - ) - - if manage_session: - await session.commit() - - return result is not None - - except ValueError: - return False - except Exception as e: - if manage_session: - await session.rollback() - logger.error("Failed to update progress", error=str(e)) - raise QueueRepositoryError( - f"Failed to update progress: {str(e)}" - ) from e - finally: - if manage_session: - await session.close() - + async def delete_item( self, item_id: str, db: Optional[AsyncSession] = None, ) -> bool: """Delete a download item from the database. - + Args: item_id: Download item ID db: Optional existing database session - + Returns: True if item was deleted, False if not found - + Raises: QueueRepositoryError: If delete fails """ session = db or self._db_session_factory() manage_session = db is None - + try: result = await DownloadQueueService.delete(session, int(item_id)) - + if manage_session: await session.commit() - + if result: - logger.debug("Deleted queue item", item_id=item_id) - + logger.debug("Deleted queue item: item_id=%s", item_id) + return result - + except ValueError: return False except Exception as e: if manage_session: await session.rollback() - logger.error("Failed to delete item", error=str(e)) - raise QueueRepositoryError( - f"Failed to delete item: {str(e)}" - ) from e + logger.error("Failed to delete item: %s", e) + raise QueueRepositoryError(f"Failed to delete item: {e}") from e finally: if manage_session: await session.close() - - async def clear_completed( + + async def clear_all( self, db: Optional[AsyncSession] = None, ) -> int: - """Clear all completed download items. - + """Clear all download items from the queue. + Args: db: Optional existing database session - + Returns: Number of items cleared - + Raises: QueueRepositoryError: If operation fails """ session = db or self._db_session_factory() manage_session = db is None - + try: - count = await DownloadQueueService.clear_completed(session) - + # Get all items first to count them + all_items = await DownloadQueueService.get_all(session) + count = len(all_items) + + # Delete each item + for item in all_items: + await DownloadQueueService.delete(session, item.id) + if manage_session: await session.commit() - - logger.info("Cleared completed items from queue", count=count) + + logger.info("Cleared all items from queue: count=%d", count) return count - + except Exception as e: if manage_session: await session.rollback() - logger.error("Failed to clear completed items", error=str(e)) - raise QueueRepositoryError( - f"Failed to clear completed: {str(e)}" - ) from e - finally: - if manage_session: - await session.close() - - async def get_all_items( - self, - db: Optional[AsyncSession] = None, - ) -> List[DownloadItem]: - """Get all download items regardless of status. - - Args: - db: Optional existing database session - - Returns: - List of all download items - - Raises: - QueueRepositoryError: If query fails - """ - session = db or self._db_session_factory() - manage_session = db is None - - try: - db_items = await DownloadQueueService.get_all( - session, with_series=True - ) - return [self._from_db_model(item) for item in db_items] - - except Exception as e: - logger.error("Failed to get all items", error=str(e)) - raise QueueRepositoryError( - f"Failed to get all items: {str(e)}" - ) from e - finally: - if manage_session: - await session.close() - - async def retry_failed_items( - self, - max_retries: int = 3, - db: Optional[AsyncSession] = None, - ) -> List[DownloadItem]: - """Retry failed downloads that haven't exceeded max retries. - - Args: - max_retries: Maximum number of retry attempts - db: Optional existing database session - - Returns: - List of items marked for retry - - Raises: - QueueRepositoryError: If operation fails - """ - session = db or self._db_session_factory() - manage_session = db is None - - try: - db_items = await DownloadQueueService.retry_failed( - session, max_retries - ) - - if manage_session: - await session.commit() - - return [self._from_db_model(item) for item in db_items] - - except Exception as e: - if manage_session: - await session.rollback() - logger.error("Failed to retry failed items", error=str(e)) - raise QueueRepositoryError( - f"Failed to retry failed items: {str(e)}" - ) from e + logger.error("Failed to clear queue: %s", e) + raise QueueRepositoryError(f"Failed to clear queue: {e}") from e finally: if manage_session: await session.close() @@ -732,22 +430,31 @@ def get_queue_repository( db_session_factory: Optional[Callable[[], AsyncSession]] = None, ) -> QueueRepository: """Get or create the QueueRepository singleton. - + Args: db_session_factory: Optional factory function for database sessions. If not provided, uses default from connection module. - + Returns: QueueRepository singleton instance """ global _queue_repository_instance - + if _queue_repository_instance is None: if db_session_factory is None: # Use default session factory from src.server.database.connection import get_async_session_factory db_session_factory = get_async_session_factory - + _queue_repository_instance = QueueRepository(db_session_factory) - + return _queue_repository_instance + + +def reset_queue_repository() -> None: + """Reset the QueueRepository singleton. + + Used for testing to ensure fresh state between tests. + """ + global _queue_repository_instance + _queue_repository_instance = None diff --git a/src/server/services/scan_service.py b/src/server/services/scan_service.py index bcedba1..28c479e 100644 --- a/src/server/services/scan_service.py +++ b/src/server/services/scan_service.py @@ -415,7 +415,7 @@ class ScanService: message="Initializing scan...", ) except Exception as e: - logger.error("Failed to start progress tracking", error=str(e)) + logger.error("Failed to start progress tracking: %s", e) # Emit scan started event await self._emit_scan_event({ @@ -479,7 +479,7 @@ class ScanService: folder=scan_progress.folder, ) except Exception as e: - logger.debug("Progress update skipped", error=str(e)) + logger.debug("Progress update skipped: %s", e) # Emit progress event with key as primary identifier await self._emit_scan_event({ @@ -541,7 +541,7 @@ class ScanService: error_message=completion_context.message, ) except Exception as e: - logger.debug("Progress completion skipped", error=str(e)) + logger.debug("Progress completion skipped: %s", e) # Emit completion event await self._emit_scan_event({ @@ -598,7 +598,7 @@ class ScanService: error_message="Scan cancelled by user", ) except Exception as e: - logger.debug("Progress cancellation skipped", error=str(e)) + logger.debug("Progress cancellation skipped: %s", e) logger.info("Scan cancelled") return True diff --git a/tests/unit/test_download_service.py b/tests/unit/test_download_service.py index f27d7f1..271a93b 100644 --- a/tests/unit/test_download_service.py +++ b/tests/unit/test_download_service.py @@ -25,8 +25,11 @@ from src.server.services.download_service import DownloadService, DownloadServic class MockQueueRepository: """Mock implementation of QueueRepository for testing. - This provides an in-memory storage that mimics the database repository - behavior without requiring actual database connections. + This provides an in-memory storage that mimics the simplified database + repository behavior without requiring actual database connections. + + Note: The repository is simplified - status, priority, progress are + now managed in-memory by DownloadService, not stored in database. """ def __init__(self): @@ -42,81 +45,19 @@ class MockQueueRepository: """Get item by ID from in-memory storage.""" return self._items.get(item_id) - async def get_pending_items(self) -> List[DownloadItem]: - """Get all pending items.""" - return [ - item for item in self._items.values() - if item.status == DownloadStatus.PENDING - ] + async def get_all_items(self) -> List[DownloadItem]: + """Get all items in storage.""" + return list(self._items.values()) - async def get_active_item(self) -> Optional[DownloadItem]: - """Get the currently active item.""" - for item in self._items.values(): - if item.status == DownloadStatus.DOWNLOADING: - return item - return None - - async def get_completed_items( - self, limit: int = 100 - ) -> List[DownloadItem]: - """Get completed items.""" - completed = [ - item for item in self._items.values() - if item.status == DownloadStatus.COMPLETED - ] - return completed[:limit] - - async def get_failed_items(self, limit: int = 50) -> List[DownloadItem]: - """Get failed items.""" - failed = [ - item for item in self._items.values() - if item.status == DownloadStatus.FAILED - ] - return failed[:limit] - - async def update_status( + async def set_error( self, item_id: str, - status: DownloadStatus, - error: Optional[str] = None + error: str, ) -> bool: - """Update item status.""" + """Set error message on an item.""" if item_id not in self._items: return False - self._items[item_id].status = status - if error: - self._items[item_id].error = error - if status == DownloadStatus.COMPLETED: - self._items[item_id].completed_at = datetime.now(timezone.utc) - elif status == DownloadStatus.DOWNLOADING: - self._items[item_id].started_at = datetime.now(timezone.utc) - return True - - async def update_progress( - self, - item_id: str, - progress: float, - downloaded: int, - total: int, - speed: float - ) -> bool: - """Update download progress.""" - if item_id not in self._items: - return False - item = self._items[item_id] - if item.progress is None: - from src.server.models.download import DownloadProgress - item.progress = DownloadProgress( - percent=progress, - downloaded_bytes=downloaded, - total_bytes=total, - speed_bps=speed - ) - else: - item.progress.percent = progress - item.progress.downloaded_bytes = downloaded - item.progress.total_bytes = total - item.progress.speed_bps = speed + self._items[item_id].error = error return True async def delete_item(self, item_id: str) -> bool: @@ -126,15 +67,11 @@ class MockQueueRepository: return True return False - async def clear_completed(self) -> int: - """Clear all completed items.""" - completed_ids = [ - item_id for item_id, item in self._items.items() - if item.status == DownloadStatus.COMPLETED - ] - for item_id in completed_ids: - del self._items[item_id] - return len(completed_ids) + async def clear_all(self) -> int: + """Clear all items.""" + count = len(self._items) + self._items.clear() + return count @pytest.fixture @@ -505,9 +442,9 @@ class TestPersistence: ) # Item should be saved in mock repository - pending_items = await mock_queue_repository.get_pending_items() - assert len(pending_items) == 1 - assert pending_items[0].serie_id == "series-1" + all_items = await mock_queue_repository.get_all_items() + assert len(all_items) == 1 + assert all_items[0].serie_id == "series-1" @pytest.mark.asyncio async def test_queue_recovery_after_restart(