Implement async series data loading with background processing
- Add loading status fields to AnimeSeries model
- Create BackgroundLoaderService for async task processing
- Update POST /api/anime/add to return 202 Accepted immediately
- Add GET /api/anime/{key}/loading-status endpoint
- Integrate background loader with startup/shutdown lifecycle
- Create database migration script for loading status fields
- Add unit tests for BackgroundLoaderService (10 tests, all passing)
- Update AnimeSeriesService.create() to accept loading status fields
Architecture follows clean separation with no code duplication:
- BackgroundLoader orchestrates, doesn't reimplement
- Reuses existing AnimeService, NFOService, WebSocket patterns
- Database-backed status survives restarts
This commit is contained in:
520
src/server/services/background_loader_service.py
Normal file
520
src/server/services/background_loader_service.py
Normal file
@@ -0,0 +1,520 @@
|
||||
"""Background loader service for asynchronous series data loading.
|
||||
|
||||
This service orchestrates background loading of series metadata (episodes, NFO files,
|
||||
logos, images) without blocking the user. It provides a task queue system for managing
|
||||
loading operations and real-time status updates via WebSocket.
|
||||
|
||||
Key Features:
|
||||
- Asynchronous task queue for series data loading
|
||||
- Reuses existing services (AnimeService, NFOService) to avoid code duplication
|
||||
- Real-time progress updates via WebSocket
|
||||
- Graceful startup and shutdown handling
|
||||
- Per-task error handling: failures are recorded and broadcast (no automatic retry)
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
import structlog
|
||||
|
||||
from src.server.services.websocket_service import WebSocketService
|
||||
|
||||
# Module-level structlog logger, named after this module.
logger = structlog.get_logger(__name__)
|
||||
|
||||
|
||||
class LoadingStatus(str, Enum):
    """Lifecycle states a series loading task can be in.

    Members are also ``str`` instances, so each one compares equal to its
    plain string value (for example ``LoadingStatus.PENDING == "pending"``).
    """

    # Queued, not yet picked up by the worker.
    PENDING = "pending"

    # Intermediate stages reported while data is being loaded.
    LOADING_EPISODES = "loading_episodes"
    LOADING_NFO = "loading_nfo"
    LOADING_LOGO = "loading_logo"
    LOADING_IMAGES = "loading_images"

    # Terminal states.
    COMPLETED = "completed"
    FAILED = "failed"
|
||||
|
||||
|
||||
@dataclass
class SeriesLoadingTask:
    """State of one queued or running series loading job.

    Attributes:
        key: Series unique identifier (primary key)
        folder: Series folder name (metadata only)
        name: Series display name
        year: Series release year, if known
        status: Current loading status
        progress: Per-category flags (episodes, nfo, logo, images) that are
            set to True once that category has been loaded
        started_at: When loading started
        completed_at: When loading finished (successfully or not)
        error: Error message if loading failed
    """

    # Identity of the series; these three are required.
    key: str
    folder: str
    name: str

    # Optional metadata and mutable tracking state.
    year: Optional[int] = None
    status: LoadingStatus = LoadingStatus.PENDING
    # A factory is used so every task gets its own dict, all flags False.
    progress: Dict[str, bool] = field(
        default_factory=lambda: dict.fromkeys(
            ("episodes", "nfo", "logo", "images"), False
        )
    )
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    error: Optional[str] = None
|
||||
|
||||
|
||||
class BackgroundLoaderService:
    """Service for managing background loading of series metadata.

    The service owns an ``asyncio.Queue`` of :class:`SeriesLoadingTask`
    objects and a single worker task that drains it. The actual loading is
    delegated to the injected collaborators (``anime_service.rescan()`` for
    episodes, ``series_app.nfo_service.create_tvshow_nfo()`` for NFO and
    images); this class only sequences those calls, records status on the
    database row and broadcasts progress over the WebSocket service.

    Attributes:
        websocket_service: Service used to broadcast status updates
        anime_service: Service whose ``rescan()`` is awaited for episodes
        series_app: Object exposing ``directory_to_search`` and ``nfo_service``
        task_queue: Queue of pending loading tasks
        active_tasks: Tasks that are queued or being processed, by series key
        worker_task: The background worker ``asyncio.Task`` (None until started)
    """

    def __init__(
        self,
        websocket_service: WebSocketService,
        anime_service: Any,  # AnimeService - avoiding circular import
        series_app: Any,  # SeriesApp - avoiding circular import
    ) -> None:
        """Initialize the background loader service.

        Args:
            websocket_service: WebSocket service for status broadcasts
            anime_service: AnimeService instance for episode operations
            series_app: SeriesApp instance for NFO operations
        """
        self.websocket_service = websocket_service
        self.anime_service = anime_service
        self.series_app = series_app

        # Task management
        self.task_queue: asyncio.Queue[SeriesLoadingTask] = asyncio.Queue()
        self.active_tasks: Dict[str, SeriesLoadingTask] = {}
        self.worker_task: Optional[asyncio.Task] = None
        self._shutdown = False

        logger.info("BackgroundLoaderService initialized")

    async def start(self) -> None:
        """Start the background worker task.

        Does nothing (apart from logging a warning) when a worker task
        already exists and has not finished.
        """
        if self.worker_task is not None and not self.worker_task.done():
            logger.warning("Background worker already running")
            return

        self._shutdown = False
        self.worker_task = asyncio.create_task(self._worker())
        logger.info("Background worker started")

    async def stop(self) -> None:
        """Stop the background worker.

        Sets the shutdown flag, cancels the worker task if it is still
        running and waits for it to finish. Returns immediately when the
        worker was never started.
        """
        if self.worker_task is None:
            return

        logger.info("Stopping background worker...")
        self._shutdown = True

        # Cancel the worker task
        if not self.worker_task.done():
            self.worker_task.cancel()
            try:
                await self.worker_task
            except asyncio.CancelledError:
                pass

        logger.info("Background worker stopped")

    async def add_series_loading_task(
        self,
        key: str,
        folder: str,
        name: str,
        year: Optional[int] = None
    ) -> None:
        """Add a series to the loading queue.

        A series whose key is already present in ``active_tasks`` is not
        queued a second time. After queuing, the initial (pending) status
        is broadcast.

        Args:
            key: Series unique identifier (primary key)
            folder: Series folder name (metadata only)
            name: Series display name
            year: Series release year
        """
        # Check if task already exists
        if key in self.active_tasks:
            logger.debug(f"Task for series {key} already exists, skipping")
            return

        task = SeriesLoadingTask(
            key=key,
            folder=folder,
            name=name,
            year=year,
            # Stamped at enqueue time, not when the worker picks the task up.
            started_at=datetime.now(timezone.utc)
        )

        self.active_tasks[key] = task
        await self.task_queue.put(task)

        logger.info(f"Added loading task for series: {key}")

        # Broadcast initial status
        await self._broadcast_status(task)

    async def check_missing_data(
        self,
        key: str,
        folder: str,
        anime_directory: str,
        db: Any
    ) -> Dict[str, bool]:
        """Check what data is missing for a series.

        A category counts as missing when either the database flag is falsy
        or the expected file(s) are absent under ``anime_directory/folder``
        (``tvshow.nfo``, ``logo.png``, ``poster.jpg`` and ``fanart.jpg``).
        Episodes are judged by the database flag alone.

        Args:
            key: Series unique identifier
            folder: Series folder name
            anime_directory: Base anime directory path
            db: Database session

        Returns:
            Dict with keys ``episodes``, ``nfo``, ``logo`` and ``images``
            (True = missing, False = exists). Everything is reported missing
            when no database row exists for ``key``.
        """
        missing = {
            "episodes": False,
            "nfo": False,
            "logo": False,
            "images": False
        }

        # Imported locally, like the other database imports in this class.
        from src.server.database.service import AnimeSeriesService

        series_db = await AnimeSeriesService.get_by_key(db, key)
        if not series_db:
            # Series doesn't exist in DB, need everything
            return dict.fromkeys(missing, True)

        series_dir = Path(anime_directory) / folder

        # Check episodes
        missing["episodes"] = not series_db.episodes_loaded

        # Check NFO file
        nfo_path = series_dir / "tvshow.nfo"
        missing["nfo"] = not nfo_path.exists() or not series_db.has_nfo

        # Check logo
        logo_path = series_dir / "logo.png"
        missing["logo"] = not logo_path.exists() or not series_db.logo_loaded

        # Check images (poster and fanart); both files must be present.
        poster_path = series_dir / "poster.jpg"
        fanart_path = series_dir / "fanart.jpg"
        missing["images"] = (
            not (poster_path.exists() and fanart_path.exists())
            or not series_db.images_loaded
        )

        return missing

    async def _worker(self) -> None:
        """Background worker that processes loading tasks from the queue.

        Loops until the shutdown flag is set or the task is cancelled. Each
        item taken from the queue is always acknowledged with
        ``task_done()``, even when processing raises or is cancelled, so the
        queue's unfinished-task count stays balanced and ``join()`` on the
        queue cannot hang on a failed item.
        """
        logger.info("Background worker started processing tasks")

        while not self._shutdown:
            # Stays None until an item was actually dequeued this iteration.
            task: Optional[SeriesLoadingTask] = None
            try:
                # Wait for a task with timeout to allow shutdown checks
                task = await asyncio.wait_for(
                    self.task_queue.get(),
                    timeout=1.0
                )

                logger.info(f"Processing loading task for series: {task.key}")

                # Process the task
                await self._load_series_data(task)

            except asyncio.TimeoutError:
                # No task available, continue loop
                continue
            except asyncio.CancelledError:
                logger.info("Worker task cancelled")
                break
            except Exception as e:
                logger.exception(f"Error in background worker: {e}")
                # Continue processing other tasks
                continue
            finally:
                # Runs before the continue/break above take effect.
                if task is not None:
                    self.task_queue.task_done()

        logger.info("Background worker stopped")

    async def _load_series_data(self, task: SeriesLoadingTask) -> None:
        """Load all missing data for a series.

        Determines what is missing via ``check_missing_data``, loads
        episodes and NFO/images as needed, then records the final status on
        the database row and broadcasts it. On failure the task is marked
        FAILED, the error is persisted on a best-effort basis and the
        failure is broadcast. The task is always removed from
        ``active_tasks`` when this method exits.

        Args:
            task: The loading task to process
        """
        try:
            # Get database session
            from src.server.database.connection import get_db_session
            from src.server.database.service import AnimeSeriesService

            # get_db_session() is consumed with ``async for``; only the first
            # yielded session is used (see the ``break`` at the bottom).
            async for db in get_db_session():
                try:
                    # Check what data is missing
                    missing = await self.check_missing_data(
                        task.key,
                        task.folder,
                        self.series_app.directory_to_search,
                        db
                    )

                    # Load episodes if missing
                    if missing["episodes"]:
                        await self._load_episodes(task, db)
                    else:
                        task.progress["episodes"] = True

                    # NFO, logo and images are produced by a single call, so
                    # any one of them missing triggers the whole step.
                    if missing["nfo"] or missing["logo"] or missing["images"]:
                        await self._load_nfo_and_images(task, db)
                    else:
                        task.progress["nfo"] = True
                        task.progress["logo"] = True
                        task.progress["images"] = True

                    # Mark as completed
                    task.status = LoadingStatus.COMPLETED
                    task.completed_at = datetime.now(timezone.utc)

                    # Update database
                    series_db = await AnimeSeriesService.get_by_key(db, task.key)
                    if series_db:
                        series_db.loading_status = "completed"
                        series_db.loading_completed_at = task.completed_at
                        series_db.loading_error = None
                        await db.commit()

                    # Broadcast completion
                    await self._broadcast_status(task)

                    logger.info(f"Successfully loaded all data for series: {task.key}")

                except Exception as e:
                    logger.exception(f"Error loading series data: {e}")
                    task.status = LoadingStatus.FAILED
                    task.error = str(e)
                    task.completed_at = datetime.now(timezone.utc)

                    # Persisting the failure is best effort: the session may
                    # be the very thing that failed, and a second error here
                    # must not prevent the failure broadcast below.
                    try:
                        series_db = await AnimeSeriesService.get_by_key(db, task.key)
                        if series_db:
                            series_db.loading_status = "failed"
                            series_db.loading_error = str(e)
                            series_db.loading_completed_at = task.completed_at
                            await db.commit()
                    except Exception as persist_error:
                        logger.exception(
                            f"Failed to persist failure status for {task.key}: "
                            f"{persist_error}"
                        )

                    # Broadcast error
                    await self._broadcast_status(task)

                break  # Exit async for loop after first iteration

        finally:
            # Remove from active tasks
            self.active_tasks.pop(task.key, None)

    async def _load_episodes(self, task: SeriesLoadingTask, db: Any) -> None:
        """Load episodes for a series by reusing AnimeService.

        Broadcasts the LOADING_EPISODES status, awaits
        ``anime_service.rescan()`` and then flags episodes as loaded on the
        task and on the database row.

        Args:
            task: The loading task
            db: Database session

        Raises:
            Exception: Any error from the rescan or the database update is
                logged and re-raised to the caller.
        """
        task.status = LoadingStatus.LOADING_EPISODES
        await self._broadcast_status(task, "Loading episodes...")

        try:
            # Use existing AnimeService to rescan episodes
            # NOTE(review): rescan() is called without a series argument;
            # presumably it rescans the whole library - confirm.
            await self.anime_service.rescan()

            # Update task progress
            task.progress["episodes"] = True

            # Update database
            from src.server.database.service import AnimeSeriesService
            series_db = await AnimeSeriesService.get_by_key(db, task.key)
            if series_db:
                series_db.episodes_loaded = True
                # Stage marker only; the final status is written by
                # _load_series_data once every step has run.
                series_db.loading_status = "loading_episodes"
                await db.commit()

            logger.info(f"Episodes loaded for series: {task.key}")

        except Exception as e:
            logger.exception(f"Failed to load episodes for {task.key}: {e}")
            raise

    async def _load_nfo_and_images(self, task: SeriesLoadingTask, db: Any) -> None:
        """Load NFO file and images for a series by reusing NFOService.

        Broadcasts the LOADING_NFO status and awaits
        ``series_app.nfo_service.create_tvshow_nfo()`` with poster, logo and
        fanart downloads enabled. Errors are logged and swallowed: the
        nfo/logo/images progress flags are left False, and the caller
        carries on with the rest of the task.

        Args:
            task: The loading task
            db: Database session
        """
        task.status = LoadingStatus.LOADING_NFO
        await self._broadcast_status(task, "Generating NFO file...")

        try:
            # Check if NFOService is available
            if not self.series_app.nfo_service:
                logger.warning(
                    f"NFOService not available, skipping NFO/images for {task.key}"
                )
                task.progress["nfo"] = False
                task.progress["logo"] = False
                task.progress["images"] = False
                return

            # Use existing NFOService to create NFO with all images.
            # The return value is not needed here.
            await self.series_app.nfo_service.create_tvshow_nfo(
                serie_name=task.name,
                serie_folder=task.folder,
                year=task.year,
                download_poster=True,
                download_logo=True,
                download_fanart=True
            )

            # Update task progress
            task.progress["nfo"] = True
            task.progress["logo"] = True
            task.progress["images"] = True

            # Update database
            from src.server.database.service import AnimeSeriesService
            series_db = await AnimeSeriesService.get_by_key(db, task.key)
            if series_db:
                series_db.has_nfo = True
                series_db.nfo_created_at = datetime.now(timezone.utc)
                series_db.logo_loaded = True
                series_db.images_loaded = True
                # Stage marker only; the final status is written by
                # _load_series_data.
                series_db.loading_status = "loading_nfo"
                await db.commit()

            logger.info(f"NFO and images loaded for series: {task.key}")

        except Exception as e:
            logger.exception(f"Failed to load NFO/images for {task.key}: {e}")
            # Don't fail the entire task if NFO fails
            task.progress["nfo"] = False
            task.progress["logo"] = False
            task.progress["images"] = False

    async def _broadcast_status(
        self,
        task: SeriesLoadingTask,
        message: Optional[str] = None
    ) -> None:
        """Broadcast loading status update via WebSocket.

        Args:
            task: The loading task
            message: Optional status message; when omitted or empty, a
                default message is derived from ``task.status``
        """
        if not message:
            if task.status == LoadingStatus.PENDING:
                message = "Queued for loading..."
            elif task.status == LoadingStatus.LOADING_EPISODES:
                message = "Loading episodes..."
            elif task.status == LoadingStatus.LOADING_NFO:
                message = "Generating NFO file..."
            elif task.status == LoadingStatus.COMPLETED:
                message = "All data loaded successfully"
            elif task.status == LoadingStatus.FAILED:
                message = f"Loading failed: {task.error}"
            else:
                # Remaining statuses have no dedicated message.
                message = "Loading..."

        payload = {
            "type": "series_loading_update",
            "key": task.key,
            "folder": task.folder,
            "loading_status": task.status.value,
            # Snapshot: the task keeps mutating its progress dict, and an
            # already-built payload must not change underneath a receiver.
            "progress": dict(task.progress),
            "message": message,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "error": task.error
        }

        await self.websocket_service.broadcast(payload)
|
||||
|
||||
|
||||
# Process-wide service instance; stays None until the initializer below runs.
_background_loader_service: Optional[BackgroundLoaderService] = None


def init_background_loader_service(
    websocket_service: WebSocketService,
    anime_service: Any,
    series_app: Any
) -> BackgroundLoaderService:
    """Create the background loader singleton on first use.

    Later calls return the instance that already exists; their arguments
    are not used.

    Args:
        websocket_service: WebSocket service for broadcasts
        anime_service: AnimeService instance
        series_app: SeriesApp instance

    Returns:
        The shared BackgroundLoaderService instance
    """
    global _background_loader_service

    # Already initialized: hand back the existing instance untouched.
    if _background_loader_service is not None:
        return _background_loader_service

    _background_loader_service = BackgroundLoaderService(
        websocket_service=websocket_service,
        anime_service=anime_service,
        series_app=series_app
    )
    return _background_loader_service
|
||||
|
||||
|
||||
def get_background_loader_service() -> BackgroundLoaderService:
    """Return the shared background loader service.

    Returns:
        The BackgroundLoaderService singleton

    Raises:
        RuntimeError: If the singleton has not been initialized yet
    """
    service = _background_loader_service
    if service is not None:
        return service

    raise RuntimeError(
        "BackgroundLoaderService not initialized. "
        "Call init_background_loader_service() first."
    )
|
||||
Reference in New Issue
Block a user