When SetupService cannot auto-resolve a provider key for an anime folder,
the folder is now tracked in the new 'unresolved_folders' table instead of
being silently skipped. Users can then resolve these via the new API:
- GET /api/setup/unresolved - list unresolved folders with search suggestions
- POST /api/setup/unresolved/{folder}/resolve - provide key to resolve folder
The SetupService.run() now:
- Tracks unresolved folders instead of skipping them
- Re-creates AnimeSeries for previously unresolved folders that are now resolved
- Includes unresolved count in logs
New files:
- src/server/api/setup_endpoints.py - API endpoints for unresolved management
- tests/unit/test_unresolved_folder_service.py - service and model tests
Modified:
- src/server/database/models.py - add UnresolvedFolder model
- src/server/database/service.py - add UnresolvedFolderService
- src/server/services/setup_service.py - track unresolved folders
- src/server/fastapi_app.py - include setup router
1541 lines
46 KiB
Python
1541 lines
46 KiB
Python
"""Database service layer for CRUD operations.
|
|
|
|
This module provides a comprehensive service layer for database operations,
|
|
implementing the Repository pattern for clean separation of concerns.
|
|
|
|
Services:
|
|
- AnimeSeriesService: CRUD operations for anime series
|
|
- EpisodeService: CRUD operations for episodes
|
|
- DownloadQueueService: CRUD operations for download queue
|
|
- UserSessionService: CRUD operations for user sessions
|
|
|
|
Transaction Support:
|
|
All services are designed to work within transaction boundaries.
|
|
Individual operations use flush() instead of commit() to allow
|
|
the caller to control transaction boundaries.
|
|
|
|
For compound operations spanning multiple services, use the
|
|
@transactional decorator or atomic() context manager from
|
|
src.server.database.transaction.
|
|
|
|
All services support both async and sync operations for flexibility.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import List, Optional
|
|
|
|
from sqlalchemy import Integer, delete, select, update
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.orm import Session, selectinload
|
|
|
|
from src.server.database.models import (
|
|
AnimeSeries,
|
|
DownloadQueueItem,
|
|
Episode,
|
|
UnresolvedFolder,
|
|
UserSession,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ============================================================================
|
|
# Anime Series Service
|
|
# ============================================================================
|
|
|
|
|
|
class AnimeSeriesService:
|
|
"""Service for anime series CRUD operations.
|
|
|
|
Provides methods for creating, reading, updating, and deleting anime series
|
|
with support for both async and sync database sessions.
|
|
|
|
Series Identifier Convention:
|
|
- Use `get_by_key()` for lookups by provider key (primary identifier)
|
|
- Use `get_by_id()` for lookups by database primary key (internal)
|
|
- Never use `folder` for identification - it's metadata only
|
|
"""
|
|
|
|
@staticmethod
|
|
async def create(
|
|
db: AsyncSession,
|
|
key: str,
|
|
name: str,
|
|
site: str,
|
|
folder: str,
|
|
year: int | None = None,
|
|
loading_status: str = "completed",
|
|
episodes_loaded: bool = True,
|
|
logo_loaded: bool = False,
|
|
images_loaded: bool = False,
|
|
loading_started_at: datetime | None = None,
|
|
has_nfo: bool = False,
|
|
nfo_path: str | None = None,
|
|
nfo_created_at: datetime | None = None,
|
|
nfo_updated_at: datetime | None = None,
|
|
) -> AnimeSeries:
|
|
"""Create a new anime series.
|
|
|
|
Args:
|
|
db: Database session
|
|
key: Unique provider key
|
|
name: Series name
|
|
site: Provider site URL
|
|
folder: Local filesystem path
|
|
year: Release year (optional)
|
|
loading_status: Initial loading status (default: "completed")
|
|
episodes_loaded: Whether episodes are loaded (default: True for backward compat)
|
|
logo_loaded: Whether logo is loaded (default: False)
|
|
images_loaded: Whether images are loaded (default: False)
|
|
loading_started_at: When loading started (optional)
|
|
has_nfo: Whether tvshow.nfo exists (default: False)
|
|
nfo_path: Path to tvshow.nfo file (optional)
|
|
nfo_created_at: When NFO file was created (optional)
|
|
nfo_updated_at: When NFO file was last updated (optional)
|
|
|
|
Returns:
|
|
Created AnimeSeries instance
|
|
|
|
Raises:
|
|
IntegrityError: If series with key already exists
|
|
"""
|
|
series = AnimeSeries(
|
|
key=key,
|
|
name=name,
|
|
site=site,
|
|
folder=folder,
|
|
year=year,
|
|
loading_status=loading_status,
|
|
episodes_loaded=episodes_loaded,
|
|
logo_loaded=logo_loaded,
|
|
images_loaded=images_loaded,
|
|
loading_started_at=loading_started_at,
|
|
has_nfo=has_nfo,
|
|
nfo_path=nfo_path,
|
|
nfo_created_at=nfo_created_at,
|
|
nfo_updated_at=nfo_updated_at,
|
|
)
|
|
db.add(series)
|
|
await db.flush()
|
|
await db.refresh(series)
|
|
logger.info("Created anime series: %s (key=%s, year=%s)", series.name, series.key, year)
|
|
return series
|
|
|
|
@staticmethod
|
|
async def get_by_id(db: AsyncSession, series_id: int) -> Optional[AnimeSeries]:
|
|
"""Get anime series by ID.
|
|
|
|
Args:
|
|
db: Database session
|
|
series_id: Series primary key
|
|
|
|
Returns:
|
|
AnimeSeries instance or None if not found
|
|
"""
|
|
result = await db.execute(
|
|
select(AnimeSeries).where(AnimeSeries.id == series_id)
|
|
)
|
|
return result.scalar_one_or_none()
|
|
|
|
@staticmethod
|
|
async def get_by_key(db: AsyncSession, key: str) -> Optional[AnimeSeries]:
|
|
"""Get anime series by provider key.
|
|
|
|
This is the PRIMARY lookup method for series identification.
|
|
Use this method instead of get_by_id() when looking up by
|
|
the provider-assigned unique key.
|
|
|
|
Args:
|
|
db: Database session
|
|
key: Unique provider key (e.g., "attack-on-titan")
|
|
|
|
Returns:
|
|
AnimeSeries instance or None if not found
|
|
|
|
Note:
|
|
Do NOT use folder for lookups - it's metadata only.
|
|
"""
|
|
result = await db.execute(
|
|
select(AnimeSeries).where(AnimeSeries.key == key)
|
|
)
|
|
return result.scalar_one_or_none()
|
|
|
|
@staticmethod
|
|
def get_by_folder_sync(db: Session, folder: str) -> Optional[AnimeSeries]:
|
|
"""Look up an anime series by its filesystem folder name (sync).
|
|
|
|
Intended as a fallback for ``SerieScanner`` when neither a ``key``
|
|
file nor a ``data`` file exists on disk for a given folder.
|
|
|
|
Args:
|
|
db: Synchronous database session (from ``get_sync_session``).
|
|
folder: Filesystem folder name to match (e.g.
|
|
``"Rooster Fighter (2026)"``).
|
|
|
|
Returns:
|
|
``AnimeSeries`` instance or ``None`` if not found.
|
|
"""
|
|
result = db.execute(
|
|
select(AnimeSeries).where(AnimeSeries.folder == folder)
|
|
)
|
|
return result.scalar_one_or_none()
|
|
|
|
@staticmethod
|
|
async def get_by_folder(db: AsyncSession, folder: str) -> Optional[AnimeSeries]:
|
|
"""Look up an anime series by its filesystem folder name (async).
|
|
|
|
Intended as primary lookup for ``SerieScanner`` when scanning
|
|
directories, replacing the legacy file-based lookups (key/data files).
|
|
|
|
Args:
|
|
db: Async database session.
|
|
folder: Filesystem folder name to match (e.g.
|
|
``"Rooster Fighter (2026)"``).
|
|
|
|
Returns:
|
|
``AnimeSeries`` instance or ``None`` if not found.
|
|
"""
|
|
result = await db.execute(
|
|
select(AnimeSeries).where(AnimeSeries.folder == folder)
|
|
)
|
|
return result.scalar_one_or_none()
|
|
|
|
@staticmethod
|
|
async def get_all(
|
|
db: AsyncSession,
|
|
limit: Optional[int] = None,
|
|
offset: int = 0,
|
|
with_episodes: bool = False,
|
|
) -> List[AnimeSeries]:
|
|
"""Get all anime series.
|
|
|
|
Args:
|
|
db: Database session
|
|
limit: Optional limit for results
|
|
offset: Offset for pagination
|
|
with_episodes: Whether to eagerly load episodes
|
|
|
|
Returns:
|
|
List of AnimeSeries instances
|
|
"""
|
|
query = select(AnimeSeries)
|
|
|
|
if with_episodes:
|
|
query = query.options(selectinload(AnimeSeries.episodes))
|
|
|
|
query = query.offset(offset)
|
|
if limit:
|
|
query = query.limit(limit)
|
|
|
|
result = await db.execute(query)
|
|
return list(result.scalars().all())
|
|
|
|
@staticmethod
|
|
async def update(
|
|
db: AsyncSession,
|
|
series_id: int,
|
|
**kwargs,
|
|
) -> Optional[AnimeSeries]:
|
|
"""Update anime series.
|
|
|
|
Args:
|
|
db: Database session
|
|
series_id: Series primary key
|
|
**kwargs: Fields to update
|
|
|
|
Returns:
|
|
Updated AnimeSeries instance or None if not found
|
|
"""
|
|
series = await AnimeSeriesService.get_by_id(db, series_id)
|
|
if not series:
|
|
return None
|
|
|
|
for key, value in kwargs.items():
|
|
if hasattr(series, key):
|
|
setattr(series, key, value)
|
|
|
|
await db.flush()
|
|
await db.refresh(series)
|
|
logger.info("Updated anime series: %s (id=%s)", series.name, series_id)
|
|
return series
|
|
|
|
@staticmethod
|
|
async def delete(db: AsyncSession, series_id: int) -> bool:
|
|
"""Delete anime series.
|
|
|
|
Cascades to delete all episodes and download items.
|
|
|
|
Args:
|
|
db: Database session
|
|
series_id: Series primary key
|
|
|
|
Returns:
|
|
True if deleted, False if not found
|
|
"""
|
|
result = await db.execute(
|
|
delete(AnimeSeries).where(AnimeSeries.id == series_id)
|
|
)
|
|
deleted = result.rowcount > 0
|
|
if deleted:
|
|
logger.info("Deleted anime series with id=%s", series_id)
|
|
return deleted
|
|
|
|
@staticmethod
|
|
async def search(
|
|
db: AsyncSession,
|
|
query: str,
|
|
limit: int = 50,
|
|
) -> List[AnimeSeries]:
|
|
"""Search anime series by name.
|
|
|
|
Args:
|
|
db: Database session
|
|
query: Search query
|
|
limit: Maximum results
|
|
|
|
Returns:
|
|
List of matching AnimeSeries instances
|
|
"""
|
|
result = await db.execute(
|
|
select(AnimeSeries)
|
|
.where(AnimeSeries.name.ilike(f"%{query}%"))
|
|
.limit(limit)
|
|
)
|
|
return list(result.scalars().all())
|
|
|
|
@staticmethod
|
|
async def get_series_with_missing_episodes(
|
|
db: AsyncSession,
|
|
limit: Optional[int] = None,
|
|
offset: int = 0,
|
|
) -> List[AnimeSeries]:
|
|
"""Get anime series that currently have missing episodes.
|
|
|
|
Episodes in the database represent missing episodes (from episodeDict).
|
|
This returns series that have at least one missing episode recorded in
|
|
the database (is_downloaded=False).
|
|
|
|
Args:
|
|
db: Database session
|
|
limit: Optional limit for results
|
|
offset: Offset for pagination
|
|
|
|
Returns:
|
|
List of AnimeSeries that have missing episodes.
|
|
"""
|
|
# Subquery to find series IDs with at least one missing episode
|
|
missing_series_ids = (
|
|
select(Episode.series_id)
|
|
.where(Episode.is_downloaded == False)
|
|
.distinct()
|
|
.subquery()
|
|
)
|
|
|
|
query = (
|
|
select(AnimeSeries)
|
|
.where(AnimeSeries.id.in_(select(missing_series_ids.c.series_id)))
|
|
.order_by(AnimeSeries.name)
|
|
.offset(offset)
|
|
)
|
|
|
|
if limit:
|
|
query = query.limit(limit)
|
|
|
|
result = await db.execute(query)
|
|
return list(result.scalars().all())
|
|
|
|
@staticmethod
|
|
async def get_series_with_no_episodes(
|
|
db: AsyncSession,
|
|
limit: Optional[int] = None,
|
|
offset: int = 0,
|
|
) -> List[AnimeSeries]:
|
|
"""Get anime series that have no downloaded episodes.
|
|
|
|
A series has "no episodes" if it has at least one missing episode
|
|
(is_downloaded=False) and no downloaded episodes (is_downloaded=True).
|
|
|
|
Args:
|
|
db: Database session
|
|
limit: Optional limit for results
|
|
offset: Offset for pagination
|
|
|
|
Returns:
|
|
List of AnimeSeries where no episodes are downloaded.
|
|
"""
|
|
# Series with missing episodes
|
|
missing_series_ids = (
|
|
select(Episode.series_id)
|
|
.where(Episode.is_downloaded == False)
|
|
.distinct()
|
|
.subquery()
|
|
)
|
|
|
|
# Series with any downloaded episodes
|
|
downloaded_series_ids = (
|
|
select(Episode.series_id)
|
|
.where(Episode.is_downloaded == True)
|
|
.distinct()
|
|
.subquery()
|
|
)
|
|
|
|
query = (
|
|
select(AnimeSeries)
|
|
.where(AnimeSeries.id.in_(select(missing_series_ids.c.series_id)))
|
|
.where(~AnimeSeries.id.in_(select(downloaded_series_ids.c.series_id)))
|
|
.order_by(AnimeSeries.name)
|
|
.offset(offset)
|
|
)
|
|
|
|
if limit:
|
|
query = query.limit(limit)
|
|
|
|
result = await db.execute(query)
|
|
return list(result.scalars().all())
|
|
|
|
@staticmethod
|
|
async def get_series_without_nfo(
|
|
db: AsyncSession,
|
|
limit: Optional[int] = None,
|
|
offset: int = 0,
|
|
) -> List[AnimeSeries]:
|
|
"""Get anime series without NFO files.
|
|
|
|
Returns series where has_nfo is False.
|
|
|
|
Args:
|
|
db: Database session
|
|
limit: Optional limit for results
|
|
offset: Offset for pagination
|
|
|
|
Returns:
|
|
List of AnimeSeries without NFO files
|
|
"""
|
|
query = (
|
|
select(AnimeSeries)
|
|
.where(AnimeSeries.has_nfo == False) # noqa: E712
|
|
.order_by(AnimeSeries.name)
|
|
.offset(offset)
|
|
)
|
|
|
|
if limit:
|
|
query = query.limit(limit)
|
|
|
|
result = await db.execute(query)
|
|
return list(result.scalars().all())
|
|
|
|
@staticmethod
|
|
async def count_all(db: AsyncSession) -> int:
|
|
"""Count total number of anime series.
|
|
|
|
Args:
|
|
db: Database session
|
|
|
|
Returns:
|
|
Total count of series
|
|
"""
|
|
from sqlalchemy import func
|
|
|
|
result = await db.execute(
|
|
select(func.count()).select_from(AnimeSeries)
|
|
)
|
|
return result.scalar() or 0
|
|
|
|
@staticmethod
|
|
async def count_with_nfo(db: AsyncSession) -> int:
|
|
"""Count anime series with NFO files.
|
|
|
|
Args:
|
|
db: Database session
|
|
|
|
Returns:
|
|
Count of series with has_nfo=True
|
|
"""
|
|
from sqlalchemy import func
|
|
|
|
result = await db.execute(
|
|
select(func.count())
|
|
.select_from(AnimeSeries)
|
|
.where(AnimeSeries.has_nfo == True) # noqa: E712
|
|
)
|
|
return result.scalar() or 0
|
|
|
|
@staticmethod
|
|
async def count_with_tmdb_id(db: AsyncSession) -> int:
|
|
"""Count anime series with TMDB ID.
|
|
|
|
Args:
|
|
db: Database session
|
|
|
|
Returns:
|
|
Count of series with tmdb_id set
|
|
"""
|
|
from sqlalchemy import func
|
|
|
|
result = await db.execute(
|
|
select(func.count())
|
|
.select_from(AnimeSeries)
|
|
.where(AnimeSeries.tmdb_id.isnot(None))
|
|
)
|
|
return result.scalar() or 0
|
|
|
|
@staticmethod
|
|
async def count_with_tvdb_id(db: AsyncSession) -> int:
|
|
"""Count anime series with TVDB ID.
|
|
|
|
Args:
|
|
db: Database session
|
|
|
|
Returns:
|
|
Count of series with tvdb_id set
|
|
"""
|
|
from sqlalchemy import func
|
|
|
|
result = await db.execute(
|
|
select(func.count())
|
|
.select_from(AnimeSeries)
|
|
.where(AnimeSeries.tvdb_id.isnot(None))
|
|
)
|
|
return result.scalar() or 0
|
|
|
|
|
|
# ============================================================================
|
|
# Episode Service
|
|
# ============================================================================
|
|
|
|
|
|
class EpisodeService:
|
|
"""Service for episode CRUD operations.
|
|
|
|
Provides methods for managing episodes within anime series.
|
|
"""
|
|
|
|
@staticmethod
|
|
async def create(
|
|
db: AsyncSession,
|
|
series_id: int,
|
|
season: int,
|
|
episode_number: int,
|
|
title: Optional[str] = None,
|
|
file_path: Optional[str] = None,
|
|
is_downloaded: bool = False,
|
|
) -> Episode:
|
|
"""Create a new episode.
|
|
|
|
Args:
|
|
db: Database session
|
|
series_id: Foreign key to AnimeSeries
|
|
season: Season number
|
|
episode_number: Episode number within season
|
|
title: Optional episode title
|
|
file_path: Optional local file path
|
|
is_downloaded: Whether episode is downloaded
|
|
|
|
Returns:
|
|
Created Episode instance
|
|
"""
|
|
episode = Episode(
|
|
series_id=series_id,
|
|
season=season,
|
|
episode_number=episode_number,
|
|
title=title,
|
|
file_path=file_path,
|
|
is_downloaded=is_downloaded,
|
|
)
|
|
db.add(episode)
|
|
await db.flush()
|
|
await db.refresh(episode)
|
|
logger.debug(
|
|
f"Created episode: S{season:02d}E{episode_number:02d} "
|
|
f"for series_id={series_id}"
|
|
)
|
|
return episode
|
|
|
|
@staticmethod
|
|
async def get_by_id(db: AsyncSession, episode_id: int) -> Optional[Episode]:
|
|
"""Get episode by ID.
|
|
|
|
Args:
|
|
db: Database session
|
|
episode_id: Episode primary key
|
|
|
|
Returns:
|
|
Episode instance or None if not found
|
|
"""
|
|
result = await db.execute(
|
|
select(Episode).where(Episode.id == episode_id)
|
|
)
|
|
return result.scalar_one_or_none()
|
|
|
|
@staticmethod
|
|
async def get_by_series(
|
|
db: AsyncSession,
|
|
series_id: int,
|
|
season: Optional[int] = None,
|
|
only_missing: bool = False,
|
|
) -> List[Episode]:
|
|
"""Get episodes for a series.
|
|
|
|
Args:
|
|
db: Database session
|
|
series_id: Foreign key to AnimeSeries
|
|
season: Optional season filter
|
|
only_missing: If True, only return episodes where
|
|
is_downloaded is False (i.e., missing episodes).
|
|
Default False returns all episodes.
|
|
|
|
Returns:
|
|
List of Episode instances
|
|
"""
|
|
query = select(Episode).where(Episode.series_id == series_id)
|
|
|
|
if season is not None:
|
|
query = query.where(Episode.season == season)
|
|
|
|
if only_missing:
|
|
query = query.where(Episode.is_downloaded == False)
|
|
|
|
query = query.order_by(Episode.season, Episode.episode_number)
|
|
result = await db.execute(query)
|
|
return list(result.scalars().all())
|
|
|
|
@staticmethod
|
|
async def get_by_episode(
|
|
db: AsyncSession,
|
|
series_id: int,
|
|
season: int,
|
|
episode_number: int,
|
|
) -> Optional[Episode]:
|
|
"""Get specific episode.
|
|
|
|
Args:
|
|
db: Database session
|
|
series_id: Foreign key to AnimeSeries
|
|
season: Season number
|
|
episode_number: Episode number
|
|
|
|
Returns:
|
|
Episode instance or None if not found
|
|
"""
|
|
result = await db.execute(
|
|
select(Episode).where(
|
|
Episode.series_id == series_id,
|
|
Episode.season == season,
|
|
Episode.episode_number == episode_number,
|
|
)
|
|
)
|
|
return result.scalar_one_or_none()
|
|
|
|
@staticmethod
|
|
async def mark_downloaded(
|
|
db: AsyncSession,
|
|
episode_id: int,
|
|
file_path: str,
|
|
) -> Optional[Episode]:
|
|
"""Mark episode as downloaded.
|
|
|
|
Args:
|
|
db: Database session
|
|
episode_id: Episode primary key
|
|
file_path: Local file path
|
|
|
|
Returns:
|
|
Updated Episode instance or None if not found
|
|
"""
|
|
episode = await EpisodeService.get_by_id(db, episode_id)
|
|
if not episode:
|
|
return None
|
|
|
|
episode.is_downloaded = True
|
|
episode.file_path = file_path
|
|
|
|
await db.flush()
|
|
await db.refresh(episode)
|
|
logger.info(
|
|
f"Marked episode as downloaded: "
|
|
f"S{episode.season:02d}E{episode.episode_number:02d}"
|
|
)
|
|
return episode
|
|
|
|
@staticmethod
|
|
async def delete(db: AsyncSession, episode_id: int) -> bool:
|
|
"""Delete episode.
|
|
|
|
Args:
|
|
db: Database session
|
|
episode_id: Episode primary key
|
|
|
|
Returns:
|
|
True if deleted, False if not found
|
|
"""
|
|
result = await db.execute(
|
|
delete(Episode).where(Episode.id == episode_id)
|
|
)
|
|
return result.rowcount > 0
|
|
|
|
@staticmethod
|
|
async def delete_by_series(
|
|
db: AsyncSession,
|
|
series_id: int,
|
|
season: int,
|
|
episode_number: int,
|
|
) -> bool:
|
|
"""Delete episode by series ID, season, and episode number.
|
|
|
|
Args:
|
|
db: Database session
|
|
series_id: Foreign key to AnimeSeries
|
|
season: Season number
|
|
episode_number: Episode number within season
|
|
|
|
Returns:
|
|
True if deleted, False if not found
|
|
"""
|
|
result = await db.execute(
|
|
delete(Episode).where(
|
|
Episode.series_id == series_id,
|
|
Episode.season == season,
|
|
Episode.episode_number == episode_number,
|
|
)
|
|
)
|
|
return result.rowcount > 0
|
|
|
|
@staticmethod
|
|
async def delete_by_series_and_episode(
|
|
db: AsyncSession,
|
|
series_key: str,
|
|
season: int,
|
|
episode_number: int,
|
|
) -> bool:
|
|
"""Delete episode by series key, season, and episode number.
|
|
|
|
Used to remove episodes from the missing list when they are
|
|
downloaded successfully.
|
|
|
|
Args:
|
|
db: Database session
|
|
series_key: Unique provider key for the series
|
|
season: Season number
|
|
episode_number: Episode number within season
|
|
|
|
Returns:
|
|
True if deleted, False if not found
|
|
"""
|
|
# First get the series by key
|
|
series = await AnimeSeriesService.get_by_key(db, series_key)
|
|
if not series:
|
|
logger.warning(
|
|
f"Series not found for key: {series_key}"
|
|
)
|
|
return False
|
|
|
|
# Then delete the episode
|
|
result = await db.execute(
|
|
delete(Episode).where(
|
|
Episode.series_id == series.id,
|
|
Episode.season == season,
|
|
Episode.episode_number == episode_number,
|
|
)
|
|
)
|
|
deleted = result.rowcount > 0
|
|
if deleted:
|
|
logger.info(
|
|
f"Removed episode from missing list: "
|
|
f"{series_key} S{season:02d}E{episode_number:02d}"
|
|
)
|
|
return deleted
|
|
|
|
@staticmethod
|
|
async def bulk_mark_downloaded(
|
|
db: AsyncSession,
|
|
episode_ids: List[int],
|
|
file_paths: Optional[List[str]] = None,
|
|
) -> int:
|
|
"""Mark multiple episodes as downloaded atomically.
|
|
|
|
This operation should be wrapped in a transaction for atomicity.
|
|
All episodes will be updated or none if an error occurs.
|
|
|
|
Args:
|
|
db: Database session
|
|
episode_ids: List of episode primary keys to update
|
|
file_paths: Optional list of file paths (parallel to episode_ids)
|
|
|
|
Returns:
|
|
Number of episodes updated
|
|
|
|
Note:
|
|
Use within @transactional or atomic() for guaranteed atomicity:
|
|
|
|
async with atomic(db) as tx:
|
|
count = await EpisodeService.bulk_mark_downloaded(
|
|
db, episode_ids, file_paths
|
|
)
|
|
"""
|
|
if not episode_ids:
|
|
return 0
|
|
|
|
updated_count = 0
|
|
|
|
for i, episode_id in enumerate(episode_ids):
|
|
episode = await EpisodeService.get_by_id(db, episode_id)
|
|
if episode:
|
|
episode.is_downloaded = True
|
|
if file_paths and i < len(file_paths):
|
|
episode.file_path = file_paths[i]
|
|
updated_count += 1
|
|
|
|
await db.flush()
|
|
logger.info("Bulk marked %s episodes as downloaded", updated_count)
|
|
|
|
return updated_count
|
|
|
|
|
|
# ============================================================================
|
|
# Download Queue Service
|
|
# ============================================================================
|
|
|
|
|
|
class DownloadQueueService:
|
|
"""Service for download queue CRUD operations.
|
|
|
|
Provides methods for managing the download queue.
|
|
|
|
Transaction Support:
|
|
All operations use flush() for transaction-safe operation.
|
|
For bulk operations, use @transactional or atomic() context.
|
|
"""
|
|
|
|
@staticmethod
|
|
async def create(
|
|
db: AsyncSession,
|
|
series_id: int,
|
|
episode_id: int,
|
|
download_url: Optional[str] = None,
|
|
file_destination: Optional[str] = None,
|
|
status: str = "pending",
|
|
retry_count: int = 0,
|
|
) -> DownloadQueueItem:
|
|
"""Add item to download queue.
|
|
|
|
Args:
|
|
db: Database session
|
|
series_id: Foreign key to AnimeSeries
|
|
episode_id: Foreign key to Episode
|
|
download_url: Optional provider download URL
|
|
file_destination: Optional target file path
|
|
status: Queue item status (default: "pending")
|
|
retry_count: Number of retry attempts (default: 0)
|
|
|
|
Returns:
|
|
Created DownloadQueueItem instance
|
|
"""
|
|
item = DownloadQueueItem(
|
|
series_id=series_id,
|
|
episode_id=episode_id,
|
|
download_url=download_url,
|
|
file_destination=file_destination,
|
|
status=status,
|
|
retry_count=retry_count,
|
|
)
|
|
db.add(item)
|
|
await db.flush()
|
|
await db.refresh(item)
|
|
logger.info(
|
|
f"Added to download queue: episode_id={episode_id} "
|
|
f"for series_id={series_id}, status={status}"
|
|
)
|
|
return item
|
|
|
|
@staticmethod
|
|
async def get_by_id(
|
|
db: AsyncSession,
|
|
item_id: int,
|
|
) -> Optional[DownloadQueueItem]:
|
|
"""Get download queue item by ID.
|
|
|
|
Args:
|
|
db: Database session
|
|
item_id: Item primary key
|
|
|
|
Returns:
|
|
DownloadQueueItem instance or None if not found
|
|
"""
|
|
result = await db.execute(
|
|
select(DownloadQueueItem).where(DownloadQueueItem.id == item_id)
|
|
)
|
|
return result.scalar_one_or_none()
|
|
|
|
@staticmethod
|
|
async def get_by_episode(
|
|
db: AsyncSession,
|
|
episode_id: int,
|
|
status_filter: Optional[str] = None,
|
|
) -> Optional[DownloadQueueItem]:
|
|
"""Get download queue item by episode ID.
|
|
|
|
Args:
|
|
db: Database session
|
|
episode_id: Foreign key to Episode
|
|
status_filter: Optional status to filter by (e.g., "pending")
|
|
|
|
Returns:
|
|
DownloadQueueItem instance or None if not found
|
|
"""
|
|
query = select(DownloadQueueItem).where(
|
|
DownloadQueueItem.episode_id == episode_id
|
|
)
|
|
if status_filter:
|
|
query = query.where(DownloadQueueItem.status == status_filter)
|
|
result = await db.execute(query)
|
|
return result.scalar_one_or_none()
|
|
|
|
@staticmethod
|
|
async def get_all(
|
|
db: AsyncSession,
|
|
with_series: bool = False,
|
|
) -> List[DownloadQueueItem]:
|
|
"""Get all download queue items.
|
|
|
|
Args:
|
|
db: Database session
|
|
with_series: Whether to eagerly load series and episode data
|
|
|
|
Returns:
|
|
List of all DownloadQueueItem instances
|
|
"""
|
|
query = select(DownloadQueueItem)
|
|
|
|
if with_series:
|
|
# Eagerly load both series and episode relationships
|
|
query = query.options(
|
|
selectinload(DownloadQueueItem.series),
|
|
selectinload(DownloadQueueItem.episode)
|
|
)
|
|
|
|
query = query.order_by(
|
|
DownloadQueueItem.created_at.asc(),
|
|
)
|
|
|
|
result = await db.execute(query)
|
|
return list(result.scalars().all())
|
|
|
|
@staticmethod
|
|
async def set_error(
|
|
db: AsyncSession,
|
|
item_id: int,
|
|
error_message: str,
|
|
) -> Optional[DownloadQueueItem]:
|
|
"""Set error message on download queue item.
|
|
|
|
Args:
|
|
db: Database session
|
|
item_id: Item primary key
|
|
error_message: Error description
|
|
|
|
Returns:
|
|
Updated DownloadQueueItem instance or None if not found
|
|
"""
|
|
item = await DownloadQueueService.get_by_id(db, item_id)
|
|
if not item:
|
|
return None
|
|
|
|
item.error_message = error_message
|
|
|
|
await db.flush()
|
|
await db.refresh(item)
|
|
logger.debug("Set error on download queue item %s", item_id)
|
|
return item
|
|
|
|
@staticmethod
|
|
async def set_status(
|
|
db: AsyncSession,
|
|
item_id: int,
|
|
status: str,
|
|
) -> Optional[DownloadQueueItem]:
|
|
"""Set status on download queue item.
|
|
|
|
Args:
|
|
db: Database session
|
|
item_id: Item primary key
|
|
status: New status value
|
|
|
|
Returns:
|
|
Updated DownloadQueueItem instance or None if not found
|
|
"""
|
|
item = await DownloadQueueService.get_by_id(db, item_id)
|
|
if not item:
|
|
return None
|
|
|
|
item.status = status
|
|
|
|
await db.flush()
|
|
await db.refresh(item)
|
|
logger.debug("Set status on download queue item %s to %s", item_id, status)
|
|
return item
|
|
|
|
@staticmethod
|
|
async def increment_retry_count(
|
|
db: AsyncSession,
|
|
item_id: int,
|
|
) -> Optional[DownloadQueueItem]:
|
|
"""Increment retry count on download queue item.
|
|
|
|
Args:
|
|
db: Database session
|
|
item_id: Item primary key
|
|
|
|
Returns:
|
|
Updated DownloadQueueItem instance or None if not found
|
|
"""
|
|
item = await DownloadQueueService.get_by_id(db, item_id)
|
|
if not item:
|
|
return None
|
|
|
|
item.retry_count += 1
|
|
|
|
await db.flush()
|
|
await db.refresh(item)
|
|
logger.debug(
|
|
"Incremented retry count on download queue item %s to %s",
|
|
item_id, item.retry_count
|
|
)
|
|
return item
|
|
|
|
@staticmethod
|
|
async def set_status_and_error(
|
|
db: AsyncSession,
|
|
item_id: int,
|
|
status: str,
|
|
error_message: Optional[str] = None,
|
|
) -> Optional[DownloadQueueItem]:
|
|
"""Set status and error message on download queue item atomically.
|
|
|
|
Args:
|
|
db: Database session
|
|
item_id: Item primary key
|
|
status: New status value
|
|
error_message: Optional error description
|
|
|
|
Returns:
|
|
Updated DownloadQueueItem instance or None if not found
|
|
"""
|
|
item = await DownloadQueueService.get_by_id(db, item_id)
|
|
if not item:
|
|
return None
|
|
|
|
item.status = status
|
|
if error_message is not None:
|
|
item.error_message = error_message
|
|
|
|
await db.flush()
|
|
await db.refresh(item)
|
|
logger.debug(
|
|
"Set status=%s on download queue item %s, error=%s",
|
|
status, item_id, error_message
|
|
)
|
|
return item
|
|
|
|
@staticmethod
|
|
async def delete(db: AsyncSession, item_id: int) -> bool:
|
|
"""Delete download queue item.
|
|
|
|
Args:
|
|
db: Database session
|
|
item_id: Item primary key
|
|
|
|
Returns:
|
|
True if deleted, False if not found
|
|
"""
|
|
result = await db.execute(
|
|
delete(DownloadQueueItem).where(DownloadQueueItem.id == item_id)
|
|
)
|
|
deleted = result.rowcount > 0
|
|
if deleted:
|
|
logger.info("Deleted download queue item with id=%s", item_id)
|
|
return deleted
|
|
|
|
@staticmethod
|
|
async def delete_by_episode(
|
|
db: AsyncSession,
|
|
episode_id: int,
|
|
) -> bool:
|
|
"""Delete download queue item by episode ID.
|
|
|
|
Args:
|
|
db: Database session
|
|
episode_id: Foreign key to Episode
|
|
|
|
Returns:
|
|
True if deleted, False if not found
|
|
"""
|
|
result = await db.execute(
|
|
delete(DownloadQueueItem).where(
|
|
DownloadQueueItem.episode_id == episode_id
|
|
)
|
|
)
|
|
deleted = result.rowcount > 0
|
|
if deleted:
|
|
logger.info(
|
|
f"Deleted download queue item with episode_id={episode_id}"
|
|
)
|
|
return deleted
|
|
|
|
@staticmethod
|
|
async def bulk_delete(
|
|
db: AsyncSession,
|
|
item_ids: List[int],
|
|
) -> int:
|
|
"""Delete multiple download queue items atomically.
|
|
|
|
This operation should be wrapped in a transaction for atomicity.
|
|
All items will be deleted or none if an error occurs.
|
|
|
|
Args:
|
|
db: Database session
|
|
item_ids: List of item primary keys to delete
|
|
|
|
Returns:
|
|
Number of items deleted
|
|
|
|
Note:
|
|
Use within @transactional or atomic() for guaranteed atomicity:
|
|
|
|
async with atomic(db) as tx:
|
|
count = await DownloadQueueService.bulk_delete(db, item_ids)
|
|
"""
|
|
if not item_ids:
|
|
return 0
|
|
|
|
result = await db.execute(
|
|
delete(DownloadQueueItem).where(
|
|
DownloadQueueItem.id.in_(item_ids)
|
|
)
|
|
)
|
|
|
|
count = result.rowcount
|
|
logger.info("Bulk deleted %s download queue items", count)
|
|
|
|
return count
|
|
|
|
@staticmethod
|
|
async def clear_all(
|
|
db: AsyncSession,
|
|
) -> int:
|
|
"""Clear all download queue items.
|
|
|
|
Deletes all items from the download queue. This operation
|
|
should be wrapped in a transaction.
|
|
|
|
Args:
|
|
db: Database session
|
|
|
|
Returns:
|
|
Number of items deleted
|
|
"""
|
|
result = await db.execute(delete(DownloadQueueItem))
|
|
count = result.rowcount
|
|
logger.info("Cleared all %s download queue items", count)
|
|
return count
|
|
|
|
|
|
# ============================================================================
|
|
# User Session Service
|
|
# ============================================================================
|
|
|
|
|
|
class UserSessionService:
|
|
"""Service for user session CRUD operations.
|
|
|
|
Provides methods for managing user authentication sessions with JWT tokens.
|
|
|
|
Transaction Support:
|
|
Session rotation and cleanup operations should use transactions
|
|
for atomicity when multiple sessions are involved.
|
|
"""
|
|
|
|
@staticmethod
|
|
async def create(
|
|
db: AsyncSession,
|
|
session_id: str,
|
|
token_hash: str,
|
|
expires_at: datetime,
|
|
user_id: Optional[str] = None,
|
|
ip_address: Optional[str] = None,
|
|
user_agent: Optional[str] = None,
|
|
) -> UserSession:
|
|
"""Create a new user session.
|
|
|
|
Args:
|
|
db: Database session
|
|
session_id: Unique session identifier
|
|
token_hash: Hashed JWT token
|
|
expires_at: Session expiration timestamp
|
|
user_id: Optional user identifier
|
|
ip_address: Optional client IP address
|
|
user_agent: Optional client user agent
|
|
|
|
Returns:
|
|
Created UserSession instance
|
|
"""
|
|
session = UserSession(
|
|
session_id=session_id,
|
|
token_hash=token_hash,
|
|
expires_at=expires_at,
|
|
user_id=user_id,
|
|
ip_address=ip_address,
|
|
user_agent=user_agent,
|
|
)
|
|
db.add(session)
|
|
await db.flush()
|
|
await db.refresh(session)
|
|
logger.info("Created user session: %s", session_id)
|
|
return session
|
|
|
|
@staticmethod
|
|
async def get_by_session_id(
|
|
db: AsyncSession,
|
|
session_id: str,
|
|
) -> Optional[UserSession]:
|
|
"""Get session by session ID.
|
|
|
|
Args:
|
|
db: Database session
|
|
session_id: Unique session identifier
|
|
|
|
Returns:
|
|
UserSession instance or None if not found
|
|
"""
|
|
result = await db.execute(
|
|
select(UserSession).where(UserSession.session_id == session_id)
|
|
)
|
|
return result.scalar_one_or_none()
|
|
|
|
@staticmethod
|
|
async def get_active_sessions(
|
|
db: AsyncSession,
|
|
user_id: Optional[str] = None,
|
|
) -> List[UserSession]:
|
|
"""Get active sessions.
|
|
|
|
Args:
|
|
db: Database session
|
|
user_id: Optional user ID filter
|
|
|
|
Returns:
|
|
List of active UserSession instances
|
|
"""
|
|
query = select(UserSession).where(
|
|
UserSession.is_active == True,
|
|
UserSession.expires_at > datetime.now(timezone.utc),
|
|
)
|
|
|
|
if user_id:
|
|
query = query.where(UserSession.user_id == user_id)
|
|
|
|
result = await db.execute(query)
|
|
return list(result.scalars().all())
|
|
|
|
@staticmethod
|
|
async def update_activity(
|
|
db: AsyncSession,
|
|
session_id: str,
|
|
) -> Optional[UserSession]:
|
|
"""Update session last activity timestamp.
|
|
|
|
Args:
|
|
db: Database session
|
|
session_id: Unique session identifier
|
|
|
|
Returns:
|
|
Updated UserSession instance or None if not found
|
|
"""
|
|
session = await UserSessionService.get_by_session_id(db, session_id)
|
|
if not session:
|
|
return None
|
|
|
|
session.last_activity = datetime.now(timezone.utc)
|
|
await db.flush()
|
|
await db.refresh(session)
|
|
return session
|
|
|
|
@staticmethod
|
|
async def revoke(db: AsyncSession, session_id: str) -> bool:
|
|
"""Revoke a session.
|
|
|
|
Args:
|
|
db: Database session
|
|
session_id: Unique session identifier
|
|
|
|
Returns:
|
|
True if revoked, False if not found
|
|
"""
|
|
session = await UserSessionService.get_by_session_id(db, session_id)
|
|
if not session:
|
|
return False
|
|
|
|
session.revoke()
|
|
await db.flush()
|
|
logger.info("Revoked user session: %s", session_id)
|
|
return True
|
|
|
|
@staticmethod
|
|
async def cleanup_expired(db: AsyncSession) -> int:
|
|
"""Clean up expired sessions.
|
|
|
|
This is a bulk delete operation that should be wrapped in
|
|
a transaction for atomicity when multiple sessions are deleted.
|
|
|
|
Args:
|
|
db: Database session
|
|
|
|
Returns:
|
|
Number of sessions deleted
|
|
"""
|
|
result = await db.execute(
|
|
delete(UserSession).where(
|
|
UserSession.expires_at < datetime.now(timezone.utc)
|
|
)
|
|
)
|
|
count = result.rowcount
|
|
logger.info("Cleaned up %s expired sessions", count)
|
|
return count
|
|
|
|
@staticmethod
|
|
async def rotate_session(
|
|
db: AsyncSession,
|
|
old_session_id: str,
|
|
new_session_id: str,
|
|
new_token_hash: str,
|
|
new_expires_at: datetime,
|
|
user_id: Optional[str] = None,
|
|
ip_address: Optional[str] = None,
|
|
user_agent: Optional[str] = None,
|
|
) -> Optional[UserSession]:
|
|
"""Rotate a session by revoking old and creating new atomically.
|
|
|
|
This compound operation revokes the old session and creates a new
|
|
one. Should be wrapped in a transaction for atomicity.
|
|
|
|
Args:
|
|
db: Database session
|
|
old_session_id: Session ID to revoke
|
|
new_session_id: New session ID
|
|
new_token_hash: New token hash
|
|
new_expires_at: New expiration time
|
|
user_id: Optional user identifier
|
|
ip_address: Optional client IP
|
|
user_agent: Optional user agent
|
|
|
|
Returns:
|
|
New UserSession instance, or None if old session not found
|
|
|
|
Note:
|
|
Use within @transactional or atomic() for atomicity:
|
|
|
|
async with atomic(db) as tx:
|
|
new_session = await UserSessionService.rotate_session(
|
|
db, old_id, new_id, hash, expires
|
|
)
|
|
"""
|
|
# Revoke old session
|
|
old_revoked = await UserSessionService.revoke(db, old_session_id)
|
|
if not old_revoked:
|
|
logger.warning(
|
|
f"Could not rotate: old session {old_session_id} not found"
|
|
)
|
|
return None
|
|
|
|
# Create new session
|
|
new_session = await UserSessionService.create(
|
|
db=db,
|
|
session_id=new_session_id,
|
|
token_hash=new_token_hash,
|
|
expires_at=new_expires_at,
|
|
user_id=user_id,
|
|
ip_address=ip_address,
|
|
user_agent=user_agent,
|
|
)
|
|
|
|
logger.info(
|
|
f"Rotated session: {old_session_id} -> {new_session_id}"
|
|
)
|
|
|
|
return new_session
|
|
|
|
|
|
# ============================================================================
|
|
# Unresolved Folder Service
|
|
# ============================================================================
|
|
|
|
|
|
class UnresolvedFolderService:
|
|
"""Service for tracking and resolving folders that couldn't be auto-resolved.
|
|
|
|
During initial setup, some folders may not resolve to a provider key
|
|
(no search match or multiple ambiguous matches). These are tracked as
|
|
UnresolvedFolder records and can later be resolved by the user providing
|
|
the correct provider key.
|
|
"""
|
|
|
|
@staticmethod
|
|
async def create(
|
|
db: AsyncSession,
|
|
folder_name: str,
|
|
title: str,
|
|
year: int | None = None,
|
|
search_attempts: int = 1,
|
|
last_search_result: str | None = None,
|
|
) -> UnresolvedFolder:
|
|
"""Create a new unresolved folder tracking record.
|
|
|
|
Args:
|
|
db: Database session
|
|
folder_name: Original filesystem folder name
|
|
title: Extracted title from folder name
|
|
year: Extracted release year (optional)
|
|
search_attempts: Number of search attempts made (default: 1)
|
|
last_search_result: JSON string of search results for UI (optional)
|
|
|
|
Returns:
|
|
Created UnresolvedFolder instance
|
|
"""
|
|
folder = UnresolvedFolder(
|
|
folder_name=folder_name,
|
|
title=title,
|
|
year=year,
|
|
search_attempts=search_attempts,
|
|
last_search_result=last_search_result,
|
|
)
|
|
db.add(folder)
|
|
await db.flush()
|
|
await db.refresh(folder)
|
|
logger.info(
|
|
"Created unresolved folder tracking: %s (title=%s, year=%s)",
|
|
folder_name, title, year
|
|
)
|
|
return folder
|
|
|
|
@staticmethod
|
|
async def get_by_folder_name(
|
|
db: AsyncSession,
|
|
folder_name: str,
|
|
) -> Optional[UnresolvedFolder]:
|
|
"""Get unresolved folder by folder name.
|
|
|
|
Args:
|
|
db: Database session
|
|
folder_name: Filesystem folder name to look up
|
|
|
|
Returns:
|
|
UnresolvedFolder instance or None if not found
|
|
"""
|
|
result = await db.execute(
|
|
select(UnresolvedFolder).where(
|
|
UnresolvedFolder.folder_name == folder_name
|
|
)
|
|
)
|
|
return result.scalar_one_or_none()
|
|
|
|
@staticmethod
|
|
async def get_all_unresolved(
|
|
db: AsyncSession,
|
|
) -> list[UnresolvedFolder]:
|
|
"""Get all unresolved folders that haven't been resolved yet.
|
|
|
|
Args:
|
|
db: Database session
|
|
|
|
Returns:
|
|
List of unresolved UnresolvedFolder instances
|
|
"""
|
|
result = await db.execute(
|
|
select(UnresolvedFolder)
|
|
.where(UnresolvedFolder.provider_key.is_(None))
|
|
.order_by(UnresolvedFolder.created_at)
|
|
)
|
|
return list(result.scalars().all())
|
|
|
|
@staticmethod
|
|
async def resolve(
|
|
db: AsyncSession,
|
|
folder_name: str,
|
|
provider_key: str,
|
|
) -> Optional[UnresolvedFolder]:
|
|
"""Mark an unresolved folder as resolved with the given provider key.
|
|
|
|
Args:
|
|
db: Database session
|
|
folder_name: Filesystem folder name to resolve
|
|
provider_key: Provider key to associate with this folder
|
|
|
|
Returns:
|
|
Updated UnresolvedFolder instance or None if not found
|
|
"""
|
|
from datetime import datetime, timezone
|
|
|
|
folder = await UnresolvedFolderService.get_by_folder_name(db, folder_name)
|
|
if not folder:
|
|
return None
|
|
|
|
folder.provider_key = provider_key
|
|
folder.resolved_at = datetime.now(timezone.utc)
|
|
await db.flush()
|
|
await db.refresh(folder)
|
|
logger.info(
|
|
"Resolved unresolved folder: %s -> key=%s",
|
|
folder_name, provider_key
|
|
)
|
|
return folder
|
|
|
|
@staticmethod
|
|
async def delete(
|
|
db: AsyncSession,
|
|
folder_name: str,
|
|
) -> bool:
|
|
"""Delete an unresolved folder record (e.g., after manual add).
|
|
|
|
Args:
|
|
db: Database session
|
|
folder_name: Filesystem folder name to delete
|
|
|
|
Returns:
|
|
True if deleted, False if not found
|
|
"""
|
|
folder = await UnresolvedFolderService.get_by_folder_name(db, folder_name)
|
|
if not folder:
|
|
return False
|
|
|
|
await db.delete(folder)
|
|
await db.flush()
|
|
return True
|
|
|
|
@staticmethod
|
|
async def update_search_result(
|
|
db: AsyncSession,
|
|
folder_name: str,
|
|
search_result: str,
|
|
) -> Optional[UnresolvedFolder]:
|
|
"""Update the cached search result for an unresolved folder.
|
|
|
|
Args:
|
|
db: Database session
|
|
folder_name: Filesystem folder name to update
|
|
search_result: JSON string of search results
|
|
|
|
Returns:
|
|
Updated UnresolvedFolder instance or None if not found
|
|
"""
|
|
folder = await UnresolvedFolderService.get_by_folder_name(db, folder_name)
|
|
if not folder:
|
|
return None
|
|
|
|
folder.search_attempts += 1
|
|
folder.last_search_result = search_result
|
|
await db.flush()
|
|
await db.refresh(folder)
|
|
return folder
|
|
|