"""Database service layer for CRUD operations. This module provides a comprehensive service layer for database operations, implementing the Repository pattern for clean separation of concerns. Services: - AnimeSeriesService: CRUD operations for anime series - EpisodeService: CRUD operations for episodes - DownloadQueueService: CRUD operations for download queue - UserSessionService: CRUD operations for user sessions Transaction Support: All services are designed to work within transaction boundaries. Individual operations use flush() instead of commit() to allow the caller to control transaction boundaries. For compound operations spanning multiple services, use the @transactional decorator or atomic() context manager from src.server.database.transaction. All services support both async and sync operations for flexibility. """ from __future__ import annotations import logging from datetime import datetime, timedelta, timezone from typing import List, Optional from sqlalchemy import delete, select, update from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import Session, selectinload from src.server.database.models import ( AnimeSeries, DownloadQueueItem, Episode, UserSession, ) logger = logging.getLogger(__name__) # ============================================================================ # Anime Series Service # ============================================================================ class AnimeSeriesService: """Service for anime series CRUD operations. Provides methods for creating, reading, updating, and deleting anime series with support for both async and sync database sessions. Series Identifier Convention: - Use `get_by_key()` for lookups by provider key (primary identifier) - Use `get_by_id()` for lookups by database primary key (internal) - Never use `folder` for identification - it's metadata only """ @staticmethod async def create( db: AsyncSession, key: str, name: str, site: str, folder: str, year: int | None = None, ) -> AnimeSeries: """Create a new anime series. Args: db: Database session key: Unique provider key name: Series name site: Provider site URL folder: Local filesystem path year: Release year (optional) Returns: Created AnimeSeries instance Raises: IntegrityError: If series with key already exists """ series = AnimeSeries( key=key, name=name, site=site, folder=folder, year=year, ) db.add(series) await db.flush() await db.refresh(series) logger.info(f"Created anime series: {series.name} (key={series.key}, year={year})") return series @staticmethod async def get_by_id(db: AsyncSession, series_id: int) -> Optional[AnimeSeries]: """Get anime series by ID. Args: db: Database session series_id: Series primary key Returns: AnimeSeries instance or None if not found """ result = await db.execute( select(AnimeSeries).where(AnimeSeries.id == series_id) ) return result.scalar_one_or_none() @staticmethod async def get_by_key(db: AsyncSession, key: str) -> Optional[AnimeSeries]: """Get anime series by provider key. This is the PRIMARY lookup method for series identification. Use this method instead of get_by_id() when looking up by the provider-assigned unique key. Args: db: Database session key: Unique provider key (e.g., "attack-on-titan") Returns: AnimeSeries instance or None if not found Note: Do NOT use folder for lookups - it's metadata only. """ result = await db.execute( select(AnimeSeries).where(AnimeSeries.key == key) ) return result.scalar_one_or_none() @staticmethod async def get_all( db: AsyncSession, limit: Optional[int] = None, offset: int = 0, with_episodes: bool = False, ) -> List[AnimeSeries]: """Get all anime series. Args: db: Database session limit: Optional limit for results offset: Offset for pagination with_episodes: Whether to eagerly load episodes Returns: List of AnimeSeries instances """ query = select(AnimeSeries) if with_episodes: query = query.options(selectinload(AnimeSeries.episodes)) query = query.offset(offset) if limit: query = query.limit(limit) result = await db.execute(query) return list(result.scalars().all()) @staticmethod async def update( db: AsyncSession, series_id: int, **kwargs, ) -> Optional[AnimeSeries]: """Update anime series. Args: db: Database session series_id: Series primary key **kwargs: Fields to update Returns: Updated AnimeSeries instance or None if not found """ series = await AnimeSeriesService.get_by_id(db, series_id) if not series: return None for key, value in kwargs.items(): if hasattr(series, key): setattr(series, key, value) await db.flush() await db.refresh(series) logger.info(f"Updated anime series: {series.name} (id={series_id})") return series @staticmethod async def delete(db: AsyncSession, series_id: int) -> bool: """Delete anime series. Cascades to delete all episodes and download items. Args: db: Database session series_id: Series primary key Returns: True if deleted, False if not found """ result = await db.execute( delete(AnimeSeries).where(AnimeSeries.id == series_id) ) deleted = result.rowcount > 0 if deleted: logger.info(f"Deleted anime series with id={series_id}") return deleted @staticmethod async def search( db: AsyncSession, query: str, limit: int = 50, ) -> List[AnimeSeries]: """Search anime series by name. Args: db: Database session query: Search query limit: Maximum results Returns: List of matching AnimeSeries instances """ result = await db.execute( select(AnimeSeries) .where(AnimeSeries.name.ilike(f"%{query}%")) .limit(limit) ) return list(result.scalars().all()) # ============================================================================ # Episode Service # ============================================================================ class EpisodeService: """Service for episode CRUD operations. Provides methods for managing episodes within anime series. """ @staticmethod async def create( db: AsyncSession, series_id: int, season: int, episode_number: int, title: Optional[str] = None, file_path: Optional[str] = None, is_downloaded: bool = False, ) -> Episode: """Create a new episode. Args: db: Database session series_id: Foreign key to AnimeSeries season: Season number episode_number: Episode number within season title: Optional episode title file_path: Optional local file path is_downloaded: Whether episode is downloaded Returns: Created Episode instance """ episode = Episode( series_id=series_id, season=season, episode_number=episode_number, title=title, file_path=file_path, is_downloaded=is_downloaded, ) db.add(episode) await db.flush() await db.refresh(episode) logger.debug( f"Created episode: S{season:02d}E{episode_number:02d} " f"for series_id={series_id}" ) return episode @staticmethod async def get_by_id(db: AsyncSession, episode_id: int) -> Optional[Episode]: """Get episode by ID. Args: db: Database session episode_id: Episode primary key Returns: Episode instance or None if not found """ result = await db.execute( select(Episode).where(Episode.id == episode_id) ) return result.scalar_one_or_none() @staticmethod async def get_by_series( db: AsyncSession, series_id: int, season: Optional[int] = None, ) -> List[Episode]: """Get episodes for a series. Args: db: Database session series_id: Foreign key to AnimeSeries season: Optional season filter Returns: List of Episode instances """ query = select(Episode).where(Episode.series_id == series_id) if season is not None: query = query.where(Episode.season == season) query = query.order_by(Episode.season, Episode.episode_number) result = await db.execute(query) return list(result.scalars().all()) @staticmethod async def get_by_episode( db: AsyncSession, series_id: int, season: int, episode_number: int, ) -> Optional[Episode]: """Get specific episode. Args: db: Database session series_id: Foreign key to AnimeSeries season: Season number episode_number: Episode number Returns: Episode instance or None if not found """ result = await db.execute( select(Episode).where( Episode.series_id == series_id, Episode.season == season, Episode.episode_number == episode_number, ) ) return result.scalar_one_or_none() @staticmethod async def mark_downloaded( db: AsyncSession, episode_id: int, file_path: str, ) -> Optional[Episode]: """Mark episode as downloaded. Args: db: Database session episode_id: Episode primary key file_path: Local file path Returns: Updated Episode instance or None if not found """ episode = await EpisodeService.get_by_id(db, episode_id) if not episode: return None episode.is_downloaded = True episode.file_path = file_path await db.flush() await db.refresh(episode) logger.info( f"Marked episode as downloaded: " f"S{episode.season:02d}E{episode.episode_number:02d}" ) return episode @staticmethod async def delete(db: AsyncSession, episode_id: int) -> bool: """Delete episode. Args: db: Database session episode_id: Episode primary key Returns: True if deleted, False if not found """ result = await db.execute( delete(Episode).where(Episode.id == episode_id) ) return result.rowcount > 0 @staticmethod async def delete_by_series_and_episode( db: AsyncSession, series_key: str, season: int, episode_number: int, ) -> bool: """Delete episode by series key, season, and episode number. Used to remove episodes from the missing list when they are downloaded successfully. Args: db: Database session series_key: Unique provider key for the series season: Season number episode_number: Episode number within season Returns: True if deleted, False if not found """ # First get the series by key series = await AnimeSeriesService.get_by_key(db, series_key) if not series: logger.warning( f"Series not found for key: {series_key}" ) return False # Then delete the episode result = await db.execute( delete(Episode).where( Episode.series_id == series.id, Episode.season == season, Episode.episode_number == episode_number, ) ) deleted = result.rowcount > 0 if deleted: logger.info( f"Removed episode from missing list: " f"{series_key} S{season:02d}E{episode_number:02d}" ) return deleted @staticmethod async def bulk_mark_downloaded( db: AsyncSession, episode_ids: List[int], file_paths: Optional[List[str]] = None, ) -> int: """Mark multiple episodes as downloaded atomically. This operation should be wrapped in a transaction for atomicity. All episodes will be updated or none if an error occurs. Args: db: Database session episode_ids: List of episode primary keys to update file_paths: Optional list of file paths (parallel to episode_ids) Returns: Number of episodes updated Note: Use within @transactional or atomic() for guaranteed atomicity: async with atomic(db) as tx: count = await EpisodeService.bulk_mark_downloaded( db, episode_ids, file_paths ) """ if not episode_ids: return 0 updated_count = 0 for i, episode_id in enumerate(episode_ids): episode = await EpisodeService.get_by_id(db, episode_id) if episode: episode.is_downloaded = True if file_paths and i < len(file_paths): episode.file_path = file_paths[i] updated_count += 1 await db.flush() logger.info(f"Bulk marked {updated_count} episodes as downloaded") return updated_count # ============================================================================ # Download Queue Service # ============================================================================ class DownloadQueueService: """Service for download queue CRUD operations. Provides methods for managing the download queue. Transaction Support: All operations use flush() for transaction-safe operation. For bulk operations, use @transactional or atomic() context. """ @staticmethod async def create( db: AsyncSession, series_id: int, episode_id: int, download_url: Optional[str] = None, file_destination: Optional[str] = None, ) -> DownloadQueueItem: """Add item to download queue. Args: db: Database session series_id: Foreign key to AnimeSeries episode_id: Foreign key to Episode download_url: Optional provider download URL file_destination: Optional target file path Returns: Created DownloadQueueItem instance """ item = DownloadQueueItem( series_id=series_id, episode_id=episode_id, download_url=download_url, file_destination=file_destination, ) db.add(item) await db.flush() await db.refresh(item) logger.info( f"Added to download queue: episode_id={episode_id} " f"for series_id={series_id}" ) return item @staticmethod async def get_by_id( db: AsyncSession, item_id: int, ) -> Optional[DownloadQueueItem]: """Get download queue item by ID. Args: db: Database session item_id: Item primary key Returns: DownloadQueueItem instance or None if not found """ result = await db.execute( select(DownloadQueueItem).where(DownloadQueueItem.id == item_id) ) return result.scalar_one_or_none() @staticmethod async def get_by_episode( db: AsyncSession, episode_id: int, ) -> Optional[DownloadQueueItem]: """Get download queue item by episode ID. Args: db: Database session episode_id: Foreign key to Episode Returns: DownloadQueueItem instance or None if not found """ result = await db.execute( select(DownloadQueueItem).where( DownloadQueueItem.episode_id == episode_id ) ) return result.scalar_one_or_none() @staticmethod async def get_all( db: AsyncSession, with_series: bool = False, ) -> List[DownloadQueueItem]: """Get all download queue items. Args: db: Database session with_series: Whether to eagerly load series data Returns: List of all DownloadQueueItem instances """ query = select(DownloadQueueItem) if with_series: query = query.options(selectinload(DownloadQueueItem.series)) query = query.order_by( DownloadQueueItem.created_at.asc(), ) result = await db.execute(query) return list(result.scalars().all()) @staticmethod async def set_error( db: AsyncSession, item_id: int, error_message: str, ) -> Optional[DownloadQueueItem]: """Set error message on download queue item. Args: db: Database session item_id: Item primary key error_message: Error description Returns: Updated DownloadQueueItem instance or None if not found """ item = await DownloadQueueService.get_by_id(db, item_id) if not item: return None item.error_message = error_message await db.flush() await db.refresh(item) logger.debug(f"Set error on download queue item {item_id}") return item @staticmethod async def delete(db: AsyncSession, item_id: int) -> bool: """Delete download queue item. Args: db: Database session item_id: Item primary key Returns: True if deleted, False if not found """ result = await db.execute( delete(DownloadQueueItem).where(DownloadQueueItem.id == item_id) ) deleted = result.rowcount > 0 if deleted: logger.info(f"Deleted download queue item with id={item_id}") return deleted @staticmethod async def delete_by_episode( db: AsyncSession, episode_id: int, ) -> bool: """Delete download queue item by episode ID. Args: db: Database session episode_id: Foreign key to Episode Returns: True if deleted, False if not found """ result = await db.execute( delete(DownloadQueueItem).where( DownloadQueueItem.episode_id == episode_id ) ) deleted = result.rowcount > 0 if deleted: logger.info( f"Deleted download queue item with episode_id={episode_id}" ) return deleted @staticmethod async def bulk_delete( db: AsyncSession, item_ids: List[int], ) -> int: """Delete multiple download queue items atomically. This operation should be wrapped in a transaction for atomicity. All items will be deleted or none if an error occurs. Args: db: Database session item_ids: List of item primary keys to delete Returns: Number of items deleted Note: Use within @transactional or atomic() for guaranteed atomicity: async with atomic(db) as tx: count = await DownloadQueueService.bulk_delete(db, item_ids) """ if not item_ids: return 0 result = await db.execute( delete(DownloadQueueItem).where( DownloadQueueItem.id.in_(item_ids) ) ) count = result.rowcount logger.info(f"Bulk deleted {count} download queue items") return count @staticmethod async def clear_all( db: AsyncSession, ) -> int: """Clear all download queue items. Deletes all items from the download queue. This operation should be wrapped in a transaction. Args: db: Database session Returns: Number of items deleted """ result = await db.execute(delete(DownloadQueueItem)) count = result.rowcount logger.info(f"Cleared all {count} download queue items") return count # ============================================================================ # User Session Service # ============================================================================ class UserSessionService: """Service for user session CRUD operations. Provides methods for managing user authentication sessions with JWT tokens. Transaction Support: Session rotation and cleanup operations should use transactions for atomicity when multiple sessions are involved. """ @staticmethod async def create( db: AsyncSession, session_id: str, token_hash: str, expires_at: datetime, user_id: Optional[str] = None, ip_address: Optional[str] = None, user_agent: Optional[str] = None, ) -> UserSession: """Create a new user session. Args: db: Database session session_id: Unique session identifier token_hash: Hashed JWT token expires_at: Session expiration timestamp user_id: Optional user identifier ip_address: Optional client IP address user_agent: Optional client user agent Returns: Created UserSession instance """ session = UserSession( session_id=session_id, token_hash=token_hash, expires_at=expires_at, user_id=user_id, ip_address=ip_address, user_agent=user_agent, ) db.add(session) await db.flush() await db.refresh(session) logger.info(f"Created user session: {session_id}") return session @staticmethod async def get_by_session_id( db: AsyncSession, session_id: str, ) -> Optional[UserSession]: """Get session by session ID. Args: db: Database session session_id: Unique session identifier Returns: UserSession instance or None if not found """ result = await db.execute( select(UserSession).where(UserSession.session_id == session_id) ) return result.scalar_one_or_none() @staticmethod async def get_active_sessions( db: AsyncSession, user_id: Optional[str] = None, ) -> List[UserSession]: """Get active sessions. Args: db: Database session user_id: Optional user ID filter Returns: List of active UserSession instances """ query = select(UserSession).where( UserSession.is_active == True, UserSession.expires_at > datetime.now(timezone.utc), ) if user_id: query = query.where(UserSession.user_id == user_id) result = await db.execute(query) return list(result.scalars().all()) @staticmethod async def update_activity( db: AsyncSession, session_id: str, ) -> Optional[UserSession]: """Update session last activity timestamp. Args: db: Database session session_id: Unique session identifier Returns: Updated UserSession instance or None if not found """ session = await UserSessionService.get_by_session_id(db, session_id) if not session: return None session.last_activity = datetime.now(timezone.utc) await db.flush() await db.refresh(session) return session @staticmethod async def revoke(db: AsyncSession, session_id: str) -> bool: """Revoke a session. Args: db: Database session session_id: Unique session identifier Returns: True if revoked, False if not found """ session = await UserSessionService.get_by_session_id(db, session_id) if not session: return False session.revoke() await db.flush() logger.info(f"Revoked user session: {session_id}") return True @staticmethod async def cleanup_expired(db: AsyncSession) -> int: """Clean up expired sessions. This is a bulk delete operation that should be wrapped in a transaction for atomicity when multiple sessions are deleted. Args: db: Database session Returns: Number of sessions deleted """ result = await db.execute( delete(UserSession).where( UserSession.expires_at < datetime.now(timezone.utc) ) ) count = result.rowcount logger.info(f"Cleaned up {count} expired sessions") return count @staticmethod async def rotate_session( db: AsyncSession, old_session_id: str, new_session_id: str, new_token_hash: str, new_expires_at: datetime, user_id: Optional[str] = None, ip_address: Optional[str] = None, user_agent: Optional[str] = None, ) -> Optional[UserSession]: """Rotate a session by revoking old and creating new atomically. This compound operation revokes the old session and creates a new one. Should be wrapped in a transaction for atomicity. Args: db: Database session old_session_id: Session ID to revoke new_session_id: New session ID new_token_hash: New token hash new_expires_at: New expiration time user_id: Optional user identifier ip_address: Optional client IP user_agent: Optional user agent Returns: New UserSession instance, or None if old session not found Note: Use within @transactional or atomic() for atomicity: async with atomic(db) as tx: new_session = await UserSessionService.rotate_session( db, old_id, new_id, hash, expires ) """ # Revoke old session old_revoked = await UserSessionService.revoke(db, old_session_id) if not old_revoked: logger.warning( f"Could not rotate: old session {old_session_id} not found" ) return None # Create new session new_session = await UserSessionService.create( db=db, session_id=new_session_id, token_hash=new_token_hash, expires_at=new_expires_at, user_id=user_id, ip_address=ip_address, user_agent=user_agent, ) logger.info( f"Rotated session: {old_session_id} -> {new_session_id}" ) return new_session