feat: Implement download queue API endpoints
- Add comprehensive REST API for download queue management
- Implement GET /api/queue/status endpoint with queue status and statistics
- Implement POST /api/queue/add for adding episodes to queue with priority support
- Implement DELETE /api/queue/{id} and DELETE /api/queue/ for removing items
- Implement POST /api/queue/start and /api/queue/stop for queue control
- Implement POST /api/queue/pause and /api/queue/resume for pause/resume
- Implement POST /api/queue/reorder for queue item reordering
- Implement DELETE /api/queue/completed for clearing completed items
- Implement POST /api/queue/retry for retrying failed downloads
- Add get_download_service and get_anime_service dependencies
- Register download router in FastAPI application
- Add comprehensive test suite for all endpoints
- All endpoints require JWT authentication
- Update infrastructure documentation
- Remove completed task from instructions.md
Follows REST conventions with proper error handling and status codes.
Tests cover success cases, error conditions, and authentication requirements.
This commit is contained in:
474
src/server/api/download.py
Normal file
474
src/server/api/download.py
Normal file
@@ -0,0 +1,474 @@
|
||||
"""Download queue API endpoints for Aniworld web application.
|
||||
|
||||
This module provides REST API endpoints for managing the anime download queue,
|
||||
including adding episodes, removing items, controlling queue processing, and
|
||||
retrieving queue status and statistics.
|
||||
"""
|
||||
from fastapi import APIRouter, Depends, HTTPException, Path, status
|
||||
|
||||
from src.server.models.download import (
|
||||
DownloadRequest,
|
||||
DownloadResponse,
|
||||
QueueOperationRequest,
|
||||
QueueReorderRequest,
|
||||
QueueStatusResponse,
|
||||
)
|
||||
from src.server.services.download_service import DownloadService, DownloadServiceError
|
||||
from src.server.utils.dependencies import get_download_service, require_auth
|
||||
|
||||
router = APIRouter(prefix="/api/queue", tags=["download"])
|
||||
|
||||
|
||||
@router.get("/status", response_model=QueueStatusResponse)
|
||||
async def get_queue_status(
|
||||
download_service: DownloadService = Depends(get_download_service),
|
||||
_: dict = Depends(require_auth),
|
||||
):
|
||||
"""Get current download queue status and statistics.
|
||||
|
||||
Returns comprehensive information about all queue items including:
|
||||
- Active downloads with progress
|
||||
- Pending items waiting to be processed
|
||||
- Recently completed downloads
|
||||
- Failed downloads
|
||||
|
||||
Requires authentication.
|
||||
|
||||
Returns:
|
||||
QueueStatusResponse: Complete queue status and statistics
|
||||
|
||||
Raises:
|
||||
HTTPException: 401 if not authenticated, 500 on service error
|
||||
"""
|
||||
try:
|
||||
queue_status = await download_service.get_queue_status()
|
||||
queue_stats = await download_service.get_queue_stats()
|
||||
|
||||
return QueueStatusResponse(status=queue_status, statistics=queue_stats)
|
||||
|
||||
except Exception as e:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"Failed to retrieve queue status: {str(e)}",
|
||||
)
|
||||
|
||||
|
||||
@router.post(
|
||||
"/add",
|
||||
response_model=DownloadResponse,
|
||||
status_code=status.HTTP_201_CREATED,
|
||||
)
|
||||
async def add_to_queue(
|
||||
request: DownloadRequest,
|
||||
download_service: DownloadService = Depends(get_download_service),
|
||||
_: dict = Depends(require_auth),
|
||||
):
|
||||
"""Add episodes to the download queue.
|
||||
|
||||
Adds one or more episodes to the download queue with specified priority.
|
||||
Episodes are validated and queued for processing based on priority level:
|
||||
- HIGH priority items are processed first
|
||||
- NORMAL and LOW priority items follow FIFO order
|
||||
|
||||
Requires authentication.
|
||||
|
||||
Args:
|
||||
request: Download request with serie info, episodes, and priority
|
||||
|
||||
Returns:
|
||||
DownloadResponse: Status and list of created download item IDs
|
||||
|
||||
Raises:
|
||||
HTTPException: 401 if not authenticated, 400 for invalid request,
|
||||
500 on service error
|
||||
"""
|
||||
try:
|
||||
# Validate request
|
||||
if not request.episodes:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail="At least one episode must be specified",
|
||||
)
|
||||
|
||||
# Add to queue
|
||||
added_ids = await download_service.add_to_queue(
|
||||
serie_id=request.serie_id,
|
||||
serie_name=request.serie_name,
|
||||
episodes=request.episodes,
|
||||
priority=request.priority,
|
||||
)
|
||||
|
||||
return DownloadResponse(
|
||||
status="success",
|
||||
message=f"Added {len(added_ids)} episode(s) to download queue",
|
||||
added_items=added_ids,
|
||||
failed_items=[],
|
||||
)
|
||||
|
||||
except DownloadServiceError as e:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail=str(e),
|
||||
)
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"Failed to add episodes to queue: {str(e)}",
|
||||
)
|
||||
|
||||
|
||||
@router.delete("/{item_id}", status_code=status.HTTP_204_NO_CONTENT)
|
||||
async def remove_from_queue(
|
||||
item_id: str = Path(..., description="Download item ID to remove"),
|
||||
download_service: DownloadService = Depends(get_download_service),
|
||||
_: dict = Depends(require_auth),
|
||||
):
|
||||
"""Remove a specific item from the download queue.
|
||||
|
||||
Removes a download item from the queue. If the item is currently
|
||||
downloading, it will be cancelled and marked as cancelled. If it's
|
||||
pending, it will simply be removed from the queue.
|
||||
|
||||
Requires authentication.
|
||||
|
||||
Args:
|
||||
item_id: Unique identifier of the download item to remove
|
||||
|
||||
Raises:
|
||||
HTTPException: 401 if not authenticated, 404 if item not found,
|
||||
500 on service error
|
||||
"""
|
||||
try:
|
||||
removed_ids = await download_service.remove_from_queue([item_id])
|
||||
|
||||
if not removed_ids:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail=f"Download item {item_id} not found in queue",
|
||||
)
|
||||
|
||||
except DownloadServiceError as e:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail=str(e),
|
||||
)
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"Failed to remove item from queue: {str(e)}",
|
||||
)
|
||||
|
||||
|
||||
@router.delete("/", status_code=status.HTTP_204_NO_CONTENT)
|
||||
async def remove_multiple_from_queue(
|
||||
request: QueueOperationRequest,
|
||||
download_service: DownloadService = Depends(get_download_service),
|
||||
_: dict = Depends(require_auth),
|
||||
):
|
||||
"""Remove multiple items from the download queue.
|
||||
|
||||
Batch removal of multiple download items. Each item is processed
|
||||
individually, and the operation continues even if some items are not
|
||||
found.
|
||||
|
||||
Requires authentication.
|
||||
|
||||
Args:
|
||||
request: List of download item IDs to remove
|
||||
|
||||
Raises:
|
||||
HTTPException: 401 if not authenticated, 400 for invalid request,
|
||||
500 on service error
|
||||
"""
|
||||
try:
|
||||
if not request.item_ids:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail="At least one item ID must be specified",
|
||||
)
|
||||
|
||||
await download_service.remove_from_queue(request.item_ids)
|
||||
|
||||
# Note: We don't raise 404 if some items weren't found, as this is
|
||||
# a batch operation and partial success is acceptable
|
||||
|
||||
except DownloadServiceError as e:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail=str(e),
|
||||
)
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"Failed to remove items from queue: {str(e)}",
|
||||
)
|
||||
|
||||
|
||||
@router.post("/start", status_code=status.HTTP_200_OK)
|
||||
async def start_queue(
|
||||
download_service: DownloadService = Depends(get_download_service),
|
||||
_: dict = Depends(require_auth),
|
||||
):
|
||||
"""Start the download queue processor.
|
||||
|
||||
Starts processing the download queue. Downloads will be processed according
|
||||
to priority and concurrency limits. If the queue is already running, this
|
||||
operation is idempotent.
|
||||
|
||||
Requires authentication.
|
||||
|
||||
Returns:
|
||||
dict: Status message indicating queue has been started
|
||||
|
||||
Raises:
|
||||
HTTPException: 401 if not authenticated, 500 on service error
|
||||
"""
|
||||
try:
|
||||
await download_service.start()
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
"message": "Download queue processing started",
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"Failed to start download queue: {str(e)}",
|
||||
)
|
||||
|
||||
|
||||
@router.post("/stop", status_code=status.HTTP_200_OK)
|
||||
async def stop_queue(
|
||||
download_service: DownloadService = Depends(get_download_service),
|
||||
_: dict = Depends(require_auth),
|
||||
):
|
||||
"""Stop the download queue processor.
|
||||
|
||||
Stops processing the download queue. Active downloads will be allowed to
|
||||
complete (with a timeout), then the queue processor will shut down.
|
||||
Queue state is persisted before shutdown.
|
||||
|
||||
Requires authentication.
|
||||
|
||||
Returns:
|
||||
dict: Status message indicating queue has been stopped
|
||||
|
||||
Raises:
|
||||
HTTPException: 401 if not authenticated, 500 on service error
|
||||
"""
|
||||
try:
|
||||
await download_service.stop()
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
"message": "Download queue processing stopped",
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"Failed to stop download queue: {str(e)}",
|
||||
)
|
||||
|
||||
|
||||
@router.post("/pause", status_code=status.HTTP_200_OK)
|
||||
async def pause_queue(
|
||||
download_service: DownloadService = Depends(get_download_service),
|
||||
_: dict = Depends(require_auth),
|
||||
):
|
||||
"""Pause the download queue processor.
|
||||
|
||||
Pauses download processing. Active downloads will continue, but no new
|
||||
downloads will be started until the queue is resumed.
|
||||
|
||||
Requires authentication.
|
||||
|
||||
Returns:
|
||||
dict: Status message indicating queue has been paused
|
||||
|
||||
Raises:
|
||||
HTTPException: 401 if not authenticated, 500 on service error
|
||||
"""
|
||||
try:
|
||||
await download_service.pause_queue()
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
"message": "Download queue paused",
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"Failed to pause download queue: {str(e)}",
|
||||
)
|
||||
|
||||
|
||||
@router.post("/resume", status_code=status.HTTP_200_OK)
|
||||
async def resume_queue(
|
||||
download_service: DownloadService = Depends(get_download_service),
|
||||
_: dict = Depends(require_auth),
|
||||
):
|
||||
"""Resume the download queue processor.
|
||||
|
||||
Resumes download processing after being paused. The queue will continue
|
||||
processing pending items according to priority.
|
||||
|
||||
Requires authentication.
|
||||
|
||||
Returns:
|
||||
dict: Status message indicating queue has been resumed
|
||||
|
||||
Raises:
|
||||
HTTPException: 401 if not authenticated, 500 on service error
|
||||
"""
|
||||
try:
|
||||
await download_service.resume_queue()
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
"message": "Download queue resumed",
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"Failed to resume download queue: {str(e)}",
|
||||
)
|
||||
|
||||
|
||||
@router.post("/reorder", status_code=status.HTTP_200_OK)
|
||||
async def reorder_queue(
|
||||
request: QueueReorderRequest,
|
||||
download_service: DownloadService = Depends(get_download_service),
|
||||
_: dict = Depends(require_auth),
|
||||
):
|
||||
"""Reorder an item in the pending queue.
|
||||
|
||||
Changes the position of a pending download item in the queue. This only
|
||||
affects items that haven't started downloading yet. The position is
|
||||
0-based.
|
||||
|
||||
Requires authentication.
|
||||
|
||||
Args:
|
||||
request: Item ID and new position in queue
|
||||
|
||||
Returns:
|
||||
dict: Status message indicating item has been reordered
|
||||
|
||||
Raises:
|
||||
HTTPException: 401 if not authenticated, 404 if item not found,
|
||||
400 for invalid request, 500 on service error
|
||||
"""
|
||||
try:
|
||||
success = await download_service.reorder_queue(
|
||||
item_id=request.item_id,
|
||||
new_position=request.new_position,
|
||||
)
|
||||
|
||||
if not success:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail=f"Item {request.item_id} not found in pending queue",
|
||||
)
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
"message": "Queue item reordered successfully",
|
||||
}
|
||||
|
||||
except DownloadServiceError as e:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail=str(e),
|
||||
)
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"Failed to reorder queue item: {str(e)}",
|
||||
)
|
||||
|
||||
|
||||
@router.delete("/completed", status_code=status.HTTP_200_OK)
|
||||
async def clear_completed(
|
||||
download_service: DownloadService = Depends(get_download_service),
|
||||
_: dict = Depends(require_auth),
|
||||
):
|
||||
"""Clear completed downloads from history.
|
||||
|
||||
Removes all completed download items from the queue history. This helps
|
||||
keep the queue display clean and manageable.
|
||||
|
||||
Requires authentication.
|
||||
|
||||
Returns:
|
||||
dict: Status message with count of cleared items
|
||||
|
||||
Raises:
|
||||
HTTPException: 401 if not authenticated, 500 on service error
|
||||
"""
|
||||
try:
|
||||
cleared_count = await download_service.clear_completed()
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
"message": f"Cleared {cleared_count} completed item(s)",
|
||||
"count": cleared_count,
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"Failed to clear completed items: {str(e)}",
|
||||
)
|
||||
|
||||
|
||||
@router.post("/retry", status_code=status.HTTP_200_OK)
|
||||
async def retry_failed(
|
||||
request: QueueOperationRequest,
|
||||
download_service: DownloadService = Depends(get_download_service),
|
||||
_: dict = Depends(require_auth),
|
||||
):
|
||||
"""Retry failed downloads.
|
||||
|
||||
Moves failed download items back to the pending queue for retry. Only items
|
||||
that haven't exceeded the maximum retry count will be retried.
|
||||
|
||||
Requires authentication.
|
||||
|
||||
Args:
|
||||
request: List of download item IDs to retry (empty list retries all)
|
||||
|
||||
Returns:
|
||||
dict: Status message with count of retried items
|
||||
|
||||
Raises:
|
||||
HTTPException: 401 if not authenticated, 500 on service error
|
||||
"""
|
||||
try:
|
||||
# If no specific IDs provided, retry all failed items
|
||||
item_ids = request.item_ids if request.item_ids else None
|
||||
|
||||
retried_ids = await download_service.retry_failed(item_ids)
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
"message": f"Retrying {len(retried_ids)} failed item(s)",
|
||||
"retried_ids": retried_ids,
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"Failed to retry downloads: {str(e)}",
|
||||
)
|
||||
@@ -18,6 +18,7 @@ from src.config.settings import settings
|
||||
# Import core functionality
|
||||
from src.core.SeriesApp import SeriesApp
|
||||
from src.server.api.auth import router as auth_router
|
||||
from src.server.api.download import router as download_router
|
||||
from src.server.controllers.error_controller import (
|
||||
not_found_handler,
|
||||
server_error_handler,
|
||||
@@ -57,6 +58,7 @@ app.add_middleware(AuthMiddleware, rate_limit_per_minute=5)
|
||||
app.include_router(health_router)
|
||||
app.include_router(page_router)
|
||||
app.include_router(auth_router)
|
||||
app.include_router(download_router)
|
||||
|
||||
# Global variables for application state
|
||||
series_app: Optional[SeriesApp] = None
|
||||
|
||||
@@ -2,8 +2,8 @@
|
||||
Dependency injection utilities for FastAPI.
|
||||
|
||||
This module provides dependency injection functions for the FastAPI
|
||||
application, including SeriesApp instances, database sessions, and
|
||||
authentication dependencies.
|
||||
application, including SeriesApp instances, AnimeService, DownloadService,
|
||||
database sessions, and authentication dependencies.
|
||||
"""
|
||||
from typing import AsyncGenerator, Optional
|
||||
|
||||
@@ -26,6 +26,10 @@ security = HTTPBearer()
|
||||
# Global SeriesApp instance
|
||||
_series_app: Optional[SeriesApp] = None
|
||||
|
||||
# Global service instances
|
||||
_anime_service: Optional[object] = None
|
||||
_download_service: Optional[object] = None
|
||||
|
||||
|
||||
def get_series_app() -> SeriesApp:
|
||||
"""
|
||||
@@ -193,3 +197,79 @@ async def log_request_dependency():
|
||||
TODO: Implement request logging logic
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
def get_anime_service() -> object:
|
||||
"""
|
||||
Dependency to get AnimeService instance.
|
||||
|
||||
Returns:
|
||||
AnimeService: The anime service for async operations
|
||||
|
||||
Raises:
|
||||
HTTPException: If anime directory is not configured or
|
||||
AnimeService initialization fails
|
||||
"""
|
||||
global _anime_service
|
||||
|
||||
if not settings.anime_directory:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
|
||||
detail="Anime directory not configured. Please complete setup.",
|
||||
)
|
||||
|
||||
if _anime_service is None:
|
||||
try:
|
||||
from src.server.services.anime_service import AnimeService
|
||||
_anime_service = AnimeService(settings.anime_directory)
|
||||
except Exception as e:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"Failed to initialize AnimeService: {str(e)}",
|
||||
) from e
|
||||
|
||||
return _anime_service
|
||||
|
||||
|
||||
def get_download_service() -> object:
|
||||
"""
|
||||
Dependency to get DownloadService instance.
|
||||
|
||||
Returns:
|
||||
DownloadService: The download queue service
|
||||
|
||||
Raises:
|
||||
HTTPException: If DownloadService initialization fails
|
||||
"""
|
||||
global _download_service
|
||||
|
||||
if _download_service is None:
|
||||
try:
|
||||
from src.server.services.download_service import DownloadService
|
||||
|
||||
# Get anime service first (required dependency)
|
||||
anime_service = get_anime_service()
|
||||
|
||||
# Initialize download service with anime service
|
||||
_download_service = DownloadService(anime_service)
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"Failed to initialize DownloadService: {str(e)}",
|
||||
) from e
|
||||
|
||||
return _download_service
|
||||
|
||||
|
||||
def reset_anime_service() -> None:
|
||||
"""Reset global AnimeService instance (for testing/config changes)."""
|
||||
global _anime_service
|
||||
_anime_service = None
|
||||
|
||||
|
||||
def reset_download_service() -> None:
|
||||
"""Reset global DownloadService instance (for testing/config changes)."""
|
||||
global _download_service
|
||||
_download_service = None
|
||||
|
||||
Reference in New Issue
Block a user