Aniworld/src/server/services/queue_repository.py
Lukas 1ba67357dc Add database transaction support with atomic operations
- Create transaction.py with @transactional decorator, atomic() context manager
- Add TransactionPropagation modes: REQUIRED, REQUIRES_NEW, NESTED
- Add savepoint support for nested transactions with partial rollback
- Update connection.py with TransactionManager, get_transactional_session
- Update service.py with bulk operations (bulk_mark_downloaded, bulk_delete)
- Wrap QueueRepository.save_item() and clear_all() in atomic transactions
- Add comprehensive tests (66 transaction tests, 90% coverage)
- All 1090 tests passing
2025-12-25 18:05:33 +01:00

472 lines
15 KiB
Python

"""Queue repository adapter for database-backed download queue operations.
This module provides a repository adapter that wraps the DownloadQueueService
and provides the interface needed by DownloadService for queue persistence.
The repository pattern abstracts the database operations from the business
logic, allowing the DownloadService to work with domain models (DownloadItem)
while the repository handles conversion to/from database models.
Transaction Support:
Compound operations (save_item, clear_all) are wrapped in atomic()
context managers to ensure all-or-nothing behavior. If any part of
a compound operation fails, all changes are rolled back.
"""
from __future__ import annotations
import logging
from datetime import datetime, timezone
from typing import Callable, List, Optional
from sqlalchemy.ext.asyncio import AsyncSession
from src.server.database.models import DownloadQueueItem as DBDownloadQueueItem
from src.server.database.service import (
AnimeSeriesService,
DownloadQueueService,
EpisodeService,
)
from src.server.database.transaction import atomic
from src.server.models.download import (
DownloadItem,
DownloadPriority,
DownloadStatus,
EpisodeIdentifier,
)
logger = logging.getLogger(__name__)
class QueueRepositoryError(Exception):
"""Repository-level exception for queue operations."""
class QueueRepository:
"""Repository adapter for database-backed download queue operations.
Provides clean interface for queue operations while handling
model conversion between Pydantic (DownloadItem) and SQLAlchemy
(DownloadQueueItem) models.
Note: The database model (DownloadQueueItem) is simplified and only
stores episode_id as a foreign key. Status, priority, progress, and
retry_count are managed in-memory by the DownloadService.
Transaction Support:
All compound operations are wrapped in atomic() transactions.
This ensures data consistency even if operations fail mid-way.
Attributes:
_db_session_factory: Factory function to create database sessions
"""
def __init__(
self,
db_session_factory: Callable[[], AsyncSession],
) -> None:
"""Initialize the queue repository.
Args:
db_session_factory: Factory function that returns AsyncSession
"""
self._db_session_factory = db_session_factory
logger.info("QueueRepository initialized")
# =========================================================================
# Model Conversion Methods
# =========================================================================
def _from_db_model(
self,
db_item: DBDownloadQueueItem,
item_id: Optional[str] = None,
) -> DownloadItem:
"""Convert database model to DownloadItem.
Note: Since the database model is simplified, status, priority,
progress, and retry_count default to initial values.
Args:
db_item: SQLAlchemy download queue item
item_id: Optional override for item ID
Returns:
Pydantic download item with default status/priority
"""
# Get episode info from the related Episode object
episode = db_item.episode
series = db_item.series
episode_identifier = EpisodeIdentifier(
season=episode.season if episode else 1,
episode=episode.episode_number if episode else 1,
title=episode.title if episode else None,
)
return DownloadItem(
id=item_id or str(db_item.id),
serie_id=series.key if series else "",
serie_folder=series.folder if series else "",
serie_name=series.name if series else "",
episode=episode_identifier,
status=DownloadStatus.PENDING, # Default - managed in-memory
priority=DownloadPriority.NORMAL, # Default - managed in-memory
added_at=db_item.created_at or datetime.now(timezone.utc),
started_at=db_item.started_at,
completed_at=db_item.completed_at,
progress=None, # Managed in-memory
error=db_item.error_message,
retry_count=0, # Managed in-memory
source_url=db_item.download_url,
)
# =========================================================================
# CRUD Operations
# =========================================================================
async def save_item(
self,
item: DownloadItem,
db: Optional[AsyncSession] = None,
) -> DownloadItem:
"""Save a download item to the database atomically.
Creates a new record if the item doesn't exist in the database.
This compound operation (series lookup/create, episode lookup/create,
queue item create) is wrapped in a transaction for atomicity.
Note: Status, priority, progress, and retry_count are NOT persisted.
Args:
item: Download item to save
db: Optional existing database session
Returns:
Saved download item with database ID
Raises:
QueueRepositoryError: If save operation fails
"""
session = db or self._db_session_factory()
manage_session = db is None
try:
async with atomic(session):
# Find series by key
series = await AnimeSeriesService.get_by_key(session, item.serie_id)
if not series:
# Create series if it doesn't exist
# Use a placeholder site URL - will be updated later when actual URL is known
site_url = getattr(item, 'serie_site', None) or f"https://aniworld.to/anime/{item.serie_id}"
series = await AnimeSeriesService.create(
db=session,
key=item.serie_id,
name=item.serie_name,
site=site_url,
folder=item.serie_folder,
)
logger.info(
"Created new series for queue item: key=%s, name=%s",
item.serie_id,
item.serie_name,
)
# Find or create episode
episode = await EpisodeService.get_by_episode(
session,
series.id,
item.episode.season,
item.episode.episode,
)
if not episode:
# Create episode if it doesn't exist
episode = await EpisodeService.create(
db=session,
series_id=series.id,
season=item.episode.season,
episode_number=item.episode.episode,
title=item.episode.title,
)
logger.info(
"Created new episode for queue item: S%02dE%02d",
item.episode.season,
item.episode.episode,
)
# Create queue item
db_item = await DownloadQueueService.create(
db=session,
series_id=series.id,
episode_id=episode.id,
download_url=str(item.source_url) if item.source_url else None,
)
# Update the item ID with the database ID
item.id = str(db_item.id)
# Transaction committed by atomic() context manager
logger.debug(
"Saved queue item to database: item_id=%s, serie_key=%s",
item.id,
item.serie_id,
)
return item
except Exception as e:
# Rollback handled by atomic() context manager
logger.error("Failed to save queue item: %s", e)
raise QueueRepositoryError(f"Failed to save item: {e}") from e
finally:
if manage_session:
await session.close()
async def get_item(
self,
item_id: str,
db: Optional[AsyncSession] = None,
) -> Optional[DownloadItem]:
"""Get a download item by ID.
Args:
item_id: Download item ID (database ID as string)
db: Optional existing database session
Returns:
Download item or None if not found
Raises:
QueueRepositoryError: If query fails
"""
session = db or self._db_session_factory()
manage_session = db is None
try:
db_item = await DownloadQueueService.get_by_id(
session, int(item_id)
)
if not db_item:
return None
return self._from_db_model(db_item, item_id)
except ValueError:
# Invalid ID format
return None
except Exception as e:
logger.error("Failed to get queue item: %s", e)
raise QueueRepositoryError(f"Failed to get item: {e}") from e
finally:
if manage_session:
await session.close()
async def get_all_items(
self,
db: Optional[AsyncSession] = None,
) -> List[DownloadItem]:
"""Get all download items regardless of status.
Note: All items are returned with default status (PENDING) since
status is now managed in-memory by the DownloadService.
Args:
db: Optional existing database session
Returns:
List of all download items
Raises:
QueueRepositoryError: If query fails
"""
session = db or self._db_session_factory()
manage_session = db is None
try:
db_items = await DownloadQueueService.get_all(
session, with_series=True
)
return [self._from_db_model(item) for item in db_items]
except Exception as e:
logger.error("Failed to get all items: %s", e)
raise QueueRepositoryError(f"Failed to get all items: {e}") from e
finally:
if manage_session:
await session.close()
async def set_error(
self,
item_id: str,
error: str,
db: Optional[AsyncSession] = None,
) -> bool:
"""Set error message on a download item.
Args:
item_id: Download item ID
error: Error message
db: Optional existing database session
Returns:
True if update succeeded, False if item not found
Raises:
QueueRepositoryError: If update fails
"""
session = db or self._db_session_factory()
manage_session = db is None
try:
result = await DownloadQueueService.set_error(
session,
int(item_id),
error,
)
if manage_session:
await session.commit()
success = result is not None
if success:
logger.debug(
"Set error on queue item: item_id=%s",
item_id,
)
return success
except ValueError:
return False
except Exception as e:
if manage_session:
await session.rollback()
logger.error("Failed to set error: %s", e)
raise QueueRepositoryError(f"Failed to set error: {e}") from e
finally:
if manage_session:
await session.close()
async def delete_item(
self,
item_id: str,
db: Optional[AsyncSession] = None,
) -> bool:
"""Delete a download item from the database.
Args:
item_id: Download item ID
db: Optional existing database session
Returns:
True if item was deleted, False if not found
Raises:
QueueRepositoryError: If delete fails
"""
session = db or self._db_session_factory()
manage_session = db is None
try:
result = await DownloadQueueService.delete(session, int(item_id))
if manage_session:
await session.commit()
if result:
logger.debug("Deleted queue item: item_id=%s", item_id)
return result
except ValueError:
return False
except Exception as e:
if manage_session:
await session.rollback()
logger.error("Failed to delete item: %s", e)
raise QueueRepositoryError(f"Failed to delete item: {e}") from e
finally:
if manage_session:
await session.close()
async def clear_all(
self,
db: Optional[AsyncSession] = None,
) -> int:
"""Clear all download items from the queue atomically.
This bulk delete operation is wrapped in a transaction.
Either all items are deleted or none are.
Args:
db: Optional existing database session
Returns:
Number of items cleared
Raises:
QueueRepositoryError: If operation fails
"""
session = db or self._db_session_factory()
manage_session = db is None
try:
async with atomic(session):
# Use the bulk clear operation for efficiency and atomicity
count = await DownloadQueueService.clear_all(session)
# Transaction committed by atomic() context manager
logger.info("Cleared all items from queue: count=%d", count)
return count
except Exception as e:
# Rollback handled by atomic() context manager
logger.error("Failed to clear queue: %s", e)
raise QueueRepositoryError(f"Failed to clear queue: {e}") from e
finally:
if manage_session:
await session.close()
# Singleton instance
_queue_repository_instance: Optional[QueueRepository] = None
def get_queue_repository(
db_session_factory: Optional[Callable[[], AsyncSession]] = None,
) -> QueueRepository:
"""Get or create the QueueRepository singleton.
Args:
db_session_factory: Optional factory function for database sessions.
If not provided, uses default from connection module.
Returns:
QueueRepository singleton instance
"""
global _queue_repository_instance
if _queue_repository_instance is None:
if db_session_factory is None:
# Use default session factory
from src.server.database.connection import get_async_session_factory
db_session_factory = get_async_session_factory
_queue_repository_instance = QueueRepository(db_session_factory)
return _queue_repository_instance
def reset_queue_repository() -> None:
"""Reset the QueueRepository singleton.
Used for testing to ensure fresh state between tests.
"""
global _queue_repository_instance
_queue_repository_instance = None