Add concurrent anime processing support

- Modified BackgroundLoaderService to use multiple workers (default: 5)
- Anime additions now process in parallel without blocking
- Added comprehensive unit tests for concurrent behavior
- Updated integration tests for compatibility
- Updated architecture documentation
This commit is contained in:
2026-01-24 17:42:59 +01:00
parent 04f26d5cfc
commit 8ff558cb07
6 changed files with 530 additions and 843 deletions

View File

@@ -79,13 +79,18 @@ class BackgroundLoaderService:
than reimplementing logic. It provides task queuing, status tracking,
and WebSocket notifications.
Supports concurrent processing of multiple series simultaneously for
improved performance when adding multiple anime.
Attributes:
websocket_service: Service for broadcasting status updates
anime_service: Service for episode scanning (reused)
series_app: Core SeriesApp instance for NFO service access
task_queue: Queue of pending loading tasks
active_tasks: Dict of currently processing tasks
worker_task: Background worker task
processing_tasks: Dict of asyncio tasks being processed
worker_tasks: List of background worker tasks
max_concurrent_loads: Maximum number of series to load concurrently
"""
def __init__(
@@ -93,6 +98,7 @@ class BackgroundLoaderService:
websocket_service: WebSocketService,
anime_service: Any, # AnimeService - avoiding circular import
series_app: Any, # SeriesApp - avoiding circular import
max_concurrent_loads: int = 5,
):
"""Initialize the background loader service.
@@ -100,46 +106,70 @@ class BackgroundLoaderService:
websocket_service: WebSocket service for status broadcasts
anime_service: AnimeService instance for episode operations
series_app: SeriesApp instance for NFO operations
max_concurrent_loads: Maximum number of series to load concurrently (default: 5)
"""
self.websocket_service = websocket_service
self.anime_service = anime_service
self.series_app = series_app
self.max_concurrent_loads = max_concurrent_loads
# Task management
self.task_queue: asyncio.Queue[SeriesLoadingTask] = asyncio.Queue()
self.active_tasks: Dict[str, SeriesLoadingTask] = {}
self.worker_task: Optional[asyncio.Task] = None
self.processing_tasks: Dict[str, asyncio.Task] = {}
self.worker_tasks: List[asyncio.Task] = []
self._shutdown = False
logger.info("BackgroundLoaderService initialized")
logger.info(
"BackgroundLoaderService initialized",
extra={"max_concurrent_loads": max_concurrent_loads}
)
async def start(self) -> None:
"""Start the background worker task."""
if self.worker_task is not None and not self.worker_task.done():
logger.warning("Background worker already running")
"""Start the background worker tasks for concurrent processing."""
if self.worker_tasks and any(not task.done() for task in self.worker_tasks):
logger.warning("Background workers already running")
return
self._shutdown = False
self.worker_task = asyncio.create_task(self._worker())
logger.info("Background worker started")
# Start multiple workers for concurrent processing
self.worker_tasks = []
for i in range(self.max_concurrent_loads):
worker = asyncio.create_task(self._worker(worker_id=i))
self.worker_tasks.append(worker)
logger.info(
"Background workers started",
extra={"num_workers": len(self.worker_tasks)}
)
async def stop(self) -> None:
"""Stop the background worker gracefully."""
if self.worker_task is None:
"""Stop all background workers gracefully."""
if not self.worker_tasks:
return
logger.info("Stopping background worker...")
logger.info("Stopping background workers...")
self._shutdown = True
# Cancel the worker task
if not self.worker_task.done():
self.worker_task.cancel()
try:
await self.worker_task
except asyncio.CancelledError:
pass
# Cancel all worker tasks
for worker_task in self.worker_tasks:
if not worker_task.done():
worker_task.cancel()
logger.info("Background worker stopped")
# Wait for all workers to finish
results = await asyncio.gather(*self.worker_tasks, return_exceptions=True)
# Log any unexpected exceptions (ignore CancelledError)
for i, result in enumerate(results):
if isinstance(result, Exception) and not isinstance(result, asyncio.CancelledError):
logger.error(
f"Worker {i} stopped with exception",
extra={"exception": str(result)}
)
self.worker_tasks = []
logger.info("All background workers stopped")
async def add_series_loading_task(
self,
@@ -232,9 +262,15 @@ class BackgroundLoaderService:
return missing
async def _worker(self) -> None:
"""Background worker that processes loading tasks from the queue."""
logger.info("Background worker started processing tasks")
async def _worker(self, worker_id: int = 0) -> None:
"""Background worker that processes loading tasks from the queue.
Multiple workers can run concurrently to process tasks in parallel.
Args:
worker_id: Unique identifier for this worker instance
"""
logger.info(f"Background worker {worker_id} started processing tasks")
while not self._shutdown:
try:
@@ -244,7 +280,9 @@ class BackgroundLoaderService:
timeout=1.0
)
logger.info(f"Processing loading task for series: {task.key}")
logger.info(
f"Worker {worker_id} processing loading task for series: {task.key}"
)
# Process the task
await self._load_series_data(task)
@@ -256,14 +294,14 @@ class BackgroundLoaderService:
# No task available, continue loop
continue
except asyncio.CancelledError:
logger.info("Worker task cancelled")
logger.info(f"Worker {worker_id} task cancelled")
break
except Exception as e:
logger.exception(f"Error in background worker: {e}")
logger.exception(f"Error in background worker {worker_id}: {e}")
# Continue processing other tasks
continue
logger.info("Background worker stopped")
logger.info(f"Background worker {worker_id} stopped")
async def _load_series_data(self, task: SeriesLoadingTask) -> None:
"""Load all missing data for a series.
@@ -653,7 +691,8 @@ _background_loader_service: Optional[BackgroundLoaderService] = None
def init_background_loader_service(
websocket_service: WebSocketService,
anime_service: Any,
series_app: Any
series_app: Any,
max_concurrent_loads: int = 5,
) -> BackgroundLoaderService:
"""Initialize the background loader service singleton.
@@ -661,6 +700,7 @@ def init_background_loader_service(
websocket_service: WebSocket service for broadcasts
anime_service: AnimeService instance
series_app: SeriesApp instance
max_concurrent_loads: Maximum number of series to load concurrently (default: 5)
Returns:
BackgroundLoaderService instance
@@ -671,7 +711,8 @@ def init_background_loader_service(
_background_loader_service = BackgroundLoaderService(
websocket_service=websocket_service,
anime_service=anime_service,
series_app=series_app
series_app=series_app,
max_concurrent_loads=max_concurrent_loads,
)
return _background_loader_service