# File-viewer metadata (not part of the module): 782 lines, 26 KiB, Python

"""
Anime Management API Endpoints
This module provides REST API endpoints for anime CRUD operations,
including creation, reading, updating, deletion, and search functionality.
"""
import uuid
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query, status
from pydantic import BaseModel, Field
# Import SeriesApp for business logic
from src.core.SeriesApp import SeriesApp
# FastAPI dependencies and models
from src.server.fastapi_app import get_current_user, settings
# Pydantic models for requests
class AnimeSearchRequest(BaseModel):
    """Request body describing an anime search.

    Only ``query`` is required; the remaining fields are optional filters.
    """

    # Free-text search term, between 1 and 100 characters long.
    query: str = Field(..., min_length=1, max_length=100)
    # Optional status filter; must be one of the five listed status values.
    status: Optional[str] = Field(
        default=None,
        pattern="^(ongoing|completed|planned|dropped|paused)$",
    )
    # Optional genre filter (no constraints applied).
    genre: Optional[str] = None
    # Optional year filter, limited to the inclusive range 1900-2100.
    year: Optional[int] = Field(default=None, ge=1900, le=2100)
class AnimeResponse(BaseModel):
    """Serialized representation of a single anime entry returned by the API."""

    # Identifier of the entry (string form).
    id: str
    # Display title.
    title: str
    # Optional free-text description.
    description: Optional[str] = None
    # Status label; falls back to "Unknown" when not supplied.
    status: str = "Unknown"
    # Optional folder associated with the entry.
    folder: Optional[str] = None
    # Episode count; 0 when not supplied.
    episodes: int = 0
class AnimeCreateRequest(BaseModel):
    """Request body for creating an anime entry.

    ``name`` and ``folder`` are required; everything else is optional.
    """

    # Anime name, between 1 and 255 characters long.
    name: str = Field(..., min_length=1, max_length=255)
    # Folder for the anime; must be a non-empty string.
    folder: str = Field(..., min_length=1)
    # Optional free-text description.
    description: Optional[str] = None
    # Status; defaults to "planned" and must be one of the five listed values.
    status: str = Field(
        default="planned",
        pattern="^(ongoing|completed|planned|dropped|paused)$",
    )
    # Optional genre (no constraints applied).
    genre: Optional[str] = None
    # Optional year, limited to the inclusive range 1900-2100.
    year: Optional[int] = Field(default=None, ge=1900, le=2100)
class AnimeUpdateRequest(BaseModel):
    """Request body for updating an anime entry.

    Every field is optional and defaults to ``None``.
    """

    # New name, between 1 and 255 characters long when given.
    name: Optional[str] = Field(default=None, min_length=1, max_length=255)
    # New folder (no constraints applied).
    folder: Optional[str] = None
    # New description.
    description: Optional[str] = None
    # New status; must be one of the five listed values when given.
    status: Optional[str] = Field(
        default=None,
        pattern="^(ongoing|completed|planned|dropped|paused)$",
    )
    # New genre (no constraints applied).
    genre: Optional[str] = None
    # New year, limited to the inclusive range 1900-2100 when given.
    year: Optional[int] = Field(default=None, ge=1900, le=2100)
class PaginatedAnimeResponse(BaseModel):
    """Envelope for one page of anime entries plus paging metadata."""

    # Success flag; True unless explicitly overridden.
    success: bool = True
    # The anime entries that belong to the requested page.
    data: List[AnimeResponse]
    # Free-form paging metadata (list_anime fills in page, per_page, total,
    # pages, has_next and has_prev).
    pagination: Dict[str, Any]
class AnimeSearchResponse(BaseModel):
    """Envelope for one page of search results plus paging and search metadata."""

    # Success flag; True unless explicitly overridden.
    success: bool = True
    # The search hits that belong to the requested page.
    data: List[AnimeResponse]
    # Free-form paging metadata (search_anime fills in page, per_page, total,
    # pages, has_next and has_prev).
    pagination: Dict[str, Any]
    # Free-form search metadata (search_anime fills in query and total_results).
    search: Dict[str, Any]
class RescanResponse(BaseModel):
    """Result summary returned after a rescan request."""

    # Whether the rescan completed.
    success: bool
    # Human-readable outcome message.
    message: str
    # Number of series reported after the rescan.
    total_series: int
# Dependency to get SeriesApp instance
def get_series_app() -> SeriesApp:
    """Build the SeriesApp instance the endpoints use for business logic.

    Returns:
        A SeriesApp constructed from ``settings.anime_directory``.

    Raises:
        HTTPException: 503 when ``settings.anime_directory`` is falsy
            (not configured).
    """
    anime_directory = settings.anime_directory
    if anime_directory:
        return SeriesApp(anime_directory)

    # Without a configured directory there is nothing to serve.
    raise HTTPException(
        status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
        detail="Anime directory not configured",
    )
# FastAPI router for the anime management endpoints; every route registered
# on it is served under the '/api/v1/anime' prefix and carries the 'anime' tag.
router = APIRouter(prefix='/api/v1/anime', tags=['anime'])
@router.get('', response_model=PaginatedAnimeResponse)
async def list_anime(
    status: Optional[str] = Query(None, pattern="^(ongoing|completed|planned|dropped|paused)$"),
    genre: Optional[str] = Query(None),
    year: Optional[int] = Query(None, ge=1900, le=2100),
    search: Optional[str] = Query(None),
    page: int = Query(1, ge=1),
    per_page: int = Query(50, ge=1, le=1000),
    current_user: Optional[Dict] = Depends(get_current_user),
    series_app: SeriesApp = Depends(get_series_app)
) -> PaginatedAnimeResponse:
    """
    Get all anime with optional title search and pagination.

    Query Parameters:
        - status: Validated against the known statuses (ongoing, completed,
          planned, dropped, paused) but NOT applied as a filter yet
        - genre: Accepted but NOT applied as a filter yet
        - year: Accepted (1900-2100) but NOT applied as a filter yet
        - search: Case-insensitive substring matched against the title only
        - page: Page number (default: 1)
        - per_page: Items per page (default: 50, max: 1000)

    Returns:
        Paginated list of anime with metadata. Every entry is reported with
        status 'ongoing' because no real status is read from the series.

    Raises:
        HTTPException: 500 when building the list fails for any reason
    """
    # The ``status`` query parameter shadows the module-level ``fastapi.status``
    # import inside this function, so ``status.HTTP_500_...`` would be an
    # attribute lookup on a str/None. Import the constants under another name.
    from fastapi import status as http_status

    try:
        # Lower-case the search term once instead of once per series.
        needle = search.lower() if search else None

        anime_responses = []
        for series_item in series_app.series_list:
            anime_response = AnimeResponse(
                # Series without an 'id' attribute get a fresh random UUID,
                # so such ids are not stable across requests.
                id=getattr(series_item, 'id', str(uuid.uuid4())),
                title=getattr(series_item, 'name', 'Unknown'),
                folder=getattr(series_item, 'folder', ''),
                description=getattr(series_item, 'description', ''),
                status='ongoing',  # Default status
                episodes=getattr(series_item, 'total_episodes', 0)
            )
            # Apply search filter if provided
            if needle is not None and needle not in anime_response.title.lower():
                continue
            anime_responses.append(anime_response)

        # Apply pagination on the in-memory list
        total = len(anime_responses)
        start_idx = (page - 1) * per_page
        end_idx = start_idx + per_page
        paginated_anime = anime_responses[start_idx:end_idx]

        return PaginatedAnimeResponse(
            data=paginated_anime,
            pagination={
                "page": page,
                "per_page": per_page,
                "total": total,
                # Ceiling division: number of pages needed for `total` items
                "pages": (total + per_page - 1) // per_page,
                "has_next": end_idx < total,
                "has_prev": page > 1
            }
        )
    except Exception as e:
        raise HTTPException(
            status_code=http_status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error retrieving anime list: {str(e)}"
        ) from e
# NOTE(review): anime_bp, the decorators and the helpers used below are not
# among the imports visible in this module - confirm where they come from.
@anime_bp.route('/<int:anime_id>', methods=['GET'])
@handle_api_errors
@validate_id_parameter('anime_id')
@optional_auth
def get_anime(anime_id: int) -> Dict[str, Any]:
    """
    Look up a single anime by ID and return it with an episodes summary.

    Args:
        anime_id: Unique identifier for the anime

    Returns:
        Success response wrapping the formatted anime data, extended with an
        'episodes_summary' entry

    Raises:
        APIException: 503 when no anime repository is available
        NotFoundError: when the repository has no anime with this ID
    """
    if not anime_repository:
        raise APIException("Anime repository not available", 503)

    record = anime_repository.get_anime_by_id(anime_id)
    if not record:
        raise NotFoundError("Anime not found")

    # Format the stored attributes, then attach the episodes summary
    payload = format_anime_response(record.__dict__)
    payload['episodes_summary'] = anime_repository.get_episodes_summary(anime_id)
    return create_success_response(payload)
@anime_bp.route('', methods=['POST'])
@handle_api_errors
@validate_json_input(
    required_fields=['name', 'folder'],
    optional_fields=['key', 'description', 'genres', 'release_year', 'status', 'total_episodes', 'poster_url', 'custom_metadata'],
    field_types={
        'name': str,
        'folder': str,
        'key': str,
        'description': str,
        'genres': list,
        'release_year': int,
        'status': str,
        'total_episodes': int,
        'poster_url': str,
        'custom_metadata': dict
    }
)
@require_auth
def create_anime() -> Dict[str, Any]:
    """
    Create a new anime record from the JSON request body.

    Required Fields:
        - name: Anime name
        - folder: Folder path where anime files are stored

    Optional Fields:
        - key: Unique key identifier
        - description: Anime description
        - genres: List of genres
        - release_year: Year of release
        - status: Status (ongoing, completed, planned, dropped, paused)
        - total_episodes: Total number of episodes
        - poster_url: URL to poster image
        - custom_metadata: Additional metadata as key-value pairs

    Returns:
        Success response (status code 201) with the created anime, including
        its generated ID

    Raises:
        APIException: 503 when no repository is available, 500 when the
            repository reports that the insert failed
        ValidationError: on an unknown status, a folder that is already in
            use, or when the metadata object cannot be constructed
    """
    if not anime_repository:
        raise APIException("Anime repository not available", 503)

    payload = request.get_json()

    # Reject unknown status values (a missing status is fine)
    allowed_statuses = ['ongoing', 'completed', 'planned', 'dropped', 'paused']
    if 'status' in payload and payload['status'] not in allowed_statuses:
        raise ValidationError("Status must be one of: ongoing, completed, planned, dropped, paused")

    # A folder may belong to at most one anime
    if anime_repository.get_anime_by_folder(payload['folder']):
        raise ValidationError("Anime with this folder already exists")

    # Build the metadata object; a random UUID serves as the new ID
    try:
        anime = AnimeMetadata(
            anime_id=str(uuid.uuid4()),
            name=payload['name'],
            folder=payload['folder'],
            key=payload.get('key'),
            description=payload.get('description'),
            genres=payload.get('genres', []),
            release_year=payload.get('release_year'),
            status=payload.get('status', 'planned'),
            total_episodes=payload.get('total_episodes'),
            poster_url=payload.get('poster_url'),
            custom_metadata=payload.get('custom_metadata', {})
        )
    except Exception as e:
        raise ValidationError(f"Invalid anime data: {str(e)}")

    # Persist the record
    if not anime_repository.create_anime(anime):
        raise APIException("Failed to create anime", 500)

    # Echo the created record back to the caller
    return create_success_response(
        data=format_anime_response(anime.__dict__),
        message="Anime created successfully",
        status_code=201
    )
@anime_bp.route('/<int:anime_id>', methods=['PUT'])
@handle_api_errors
@validate_id_parameter('anime_id')
@validate_json_input(
    optional_fields=['name', 'folder', 'key', 'description', 'genres', 'release_year', 'status', 'total_episodes', 'poster_url', 'custom_metadata'],
    field_types={
        'name': str,
        'folder': str,
        'key': str,
        'description': str,
        'genres': list,
        'release_year': int,
        'status': str,
        'total_episodes': int,
        'poster_url': str,
        'custom_metadata': dict
    }
)
@require_auth
def update_anime(anime_id: int) -> Dict[str, Any]:
    """
    Update an existing anime record.

    Args:
        anime_id: Unique identifier for the anime

    Optional Fields:
        - name: Anime name
        - folder: Folder path where anime files are stored
        - key: Unique key identifier
        - description: Anime description
        - genres: List of genres
        - release_year: Year of release
        - status: Status (ongoing, completed, planned, dropped, paused)
        - total_episodes: Total number of episodes
        - poster_url: URL to poster image
        - custom_metadata: Additional metadata as key-value pairs; merged
          into the stored metadata instead of replacing it

    Returns:
        Success response with the updated anime details

    Raises:
        APIException: 503 when no repository is available, 500 when the
            repository reports that the update failed
        NotFoundError: when no anime with this ID exists
        ValidationError: on an unknown status or when the new folder is
            already used by another anime
    """
    if not anime_repository:
        raise APIException("Anime repository not available", 503)

    data = request.get_json()

    # Get existing anime
    existing_anime = anime_repository.get_anime_by_id(anime_id)
    if not existing_anime:
        raise NotFoundError("Anime not found")

    # Validate status if provided
    if 'status' in data and data['status'] not in ['ongoing', 'completed', 'planned', 'dropped', 'paused']:
        raise ValidationError("Status must be one of: ongoing, completed, planned, dropped, paused")

    # Check if folder is being changed and if it conflicts
    if 'folder' in data and data['folder'] != existing_anime.folder:
        conflicting_anime = anime_repository.get_anime_by_folder(data['folder'])
        if conflicting_anime and conflicting_anime.anime_id != anime_id:
            raise ValidationError("Another anime with this folder already exists")

    # Copy over every plain field the caller supplied
    plain_fields = ['name', 'folder', 'key', 'description', 'genres', 'release_year', 'status', 'total_episodes', 'poster_url']
    update_fields = {field: data[field] for field in plain_fields if field in data}

    # Handle custom metadata update (merge instead of replace). Merge into a
    # copy so the object returned by the repository is not modified before
    # the update is known to have succeeded.
    if 'custom_metadata' in data:
        merged_metadata = dict(existing_anime.custom_metadata or {})
        merged_metadata.update(data['custom_metadata'])
        update_fields['custom_metadata'] = merged_metadata

    # Perform update
    success = anime_repository.update_anime(anime_id, update_fields)
    if not success:
        raise APIException("Failed to update anime", 500)

    # Re-read the record so the response reflects what was stored
    updated_anime = anime_repository.get_anime_by_id(anime_id)
    anime_data = format_anime_response(updated_anime.__dict__)
    return create_success_response(
        data=anime_data,
        message="Anime updated successfully"
    )
@anime_bp.route('/<int:anime_id>', methods=['DELETE'])
@handle_api_errors
@validate_id_parameter('anime_id')
@require_auth
def delete_anime(anime_id: int) -> Dict[str, Any]:
    """
    Delete an anime record.

    Args:
        anime_id: Unique identifier for the anime

    Query Parameters:
        - force: Set to 'true' (case-insensitive) to delete even if episodes
          exist

    Returns:
        Success response confirming the deletion

    Raises:
        APIException: 503 when no repository is available, 500 when the
            repository reports that the deletion failed
        NotFoundError: when no anime with this ID exists
        ValidationError: when episodes exist and force was not requested
    """
    if not anime_repository:
        raise APIException("Anime repository not available", 503)

    # The record is needed both as an existence check and for its name
    existing_anime = anime_repository.get_anime_by_id(anime_id)
    if not existing_anime:
        raise NotFoundError("Anime not found")

    # Only count episodes when the caller did not ask for a forced delete
    force_flag = request.args.get('force', 'false')
    if force_flag.lower() != 'true':
        episode_count = anime_repository.get_episode_count(anime_id)
        if episode_count > 0:
            raise ValidationError(
                f"Cannot delete anime with {episode_count} episodes. "
                "Use ?force=true to force deletion or delete episodes first."
            )

    # Perform deletion (this should cascade to episodes, downloads, etc.)
    if not anime_repository.delete_anime(anime_id):
        raise APIException("Failed to delete anime", 500)

    return create_success_response(
        message=f"Anime '{existing_anime.name}' deleted successfully"
    )
@router.get('/search', response_model=AnimeSearchResponse)
async def search_anime(
    q: str = Query(..., min_length=2, description="Search query"),
    page: int = Query(1, ge=1),
    per_page: int = Query(20, ge=1, le=100),
    current_user: Optional[Dict] = Depends(get_current_user),
    series_app: SeriesApp = Depends(get_series_app)
) -> AnimeSearchResponse:
    """
    Search anime by name using SeriesApp.

    Query Parameters:
        - q: Search query (required, min 2 characters)
        - page: Page number (default: 1)
        - per_page: Items per page (default: 20, max: 100)

    Returns:
        Paginated search results plus the query and total hit count. Every
        hit is reported with status 'available', and its 'folder' field is
        filled from the result's 'key' attribute.

    Raises:
        HTTPException: 500 when the search or the conversion of its results
            fails for any reason
    """
    try:
        # Use SeriesApp to perform search
        search_results = series_app.search(q)

        # Convert search results to AnimeResponse objects.
        # NOTE(review): values are read with getattr, so results that are
        # plain dicts would yield only the defaults - confirm the type
        # returned by SeriesApp.search.
        anime_responses = [
            AnimeResponse(
                id=getattr(result, 'id', str(uuid.uuid4())),
                title=getattr(result, 'name', getattr(result, 'title', 'Unknown')),
                description=getattr(result, 'description', ''),
                status='available',
                episodes=getattr(result, 'episodes', 0),
                folder=getattr(result, 'key', '')
            )
            for result in search_results
        ]

        # Apply pagination on the in-memory list
        total = len(anime_responses)
        start_idx = (page - 1) * per_page
        end_idx = start_idx + per_page
        paginated_results = anime_responses[start_idx:end_idx]

        return AnimeSearchResponse(
            data=paginated_results,
            pagination={
                "page": page,
                "per_page": per_page,
                "total": total,
                # Ceiling division: number of pages needed for `total` items
                "pages": (total + per_page - 1) // per_page,
                "has_next": end_idx < total,
                "has_prev": page > 1
            },
            search={
                "query": q,
                "total_results": total
            }
        )
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Search failed: {str(e)}"
        ) from e
@anime_bp.route('/<int:anime_id>/episodes', methods=['GET'])
@handle_api_errors
@validate_id_parameter('anime_id')
@validate_pagination_params
@optional_auth
def get_anime_episodes(anime_id: int) -> Dict[str, Any]:
    """
    List the episodes of one anime, optionally filtered, one page at a time.

    Args:
        anime_id: Unique identifier for the anime

    Query Parameters:
        - status: Filter by episode status
        - downloaded: Filter by download status (true/false)
        - page / per_page: Paging values as returned by
          extract_pagination_params()

    Returns:
        Paginated list of episodes for the anime

    Raises:
        APIException: 503 when no repository is available
        NotFoundError: when no anime with this ID exists
        ValidationError: when 'downloaded' is neither 'true' nor 'false'
    """
    if not anime_repository:
        raise APIException("Anime repository not available", 503)

    # The anime must exist before its episodes are looked up
    if not anime_repository.get_anime_by_id(anime_id):
        raise NotFoundError("Anime not found")

    # Read and validate the filters
    status_filter = request.args.get('status')
    downloaded_filter = request.args.get('downloaded')
    if downloaded_filter and downloaded_filter.lower() not in ['true', 'false']:
        raise ValidationError("Downloaded filter must be 'true' or 'false'")

    page, per_page = extract_pagination_params()

    # Translate the textual flag into True/False; None means "no filter"
    if downloaded_filter:
        downloaded_flag = downloaded_filter.lower() == 'true'
    else:
        downloaded_flag = None

    episodes = anime_repository.get_episodes_for_anime(
        anime_id=anime_id,
        status_filter=status_filter,
        downloaded_filter=downloaded_flag
    )

    # Format episodes (this would use episode formatting from episodes.py)
    formatted_episodes = [
        {
            'id': episode.id,
            'episode_number': episode.episode_number,
            'title': episode.title,
            'url': episode.url,
            'status': episode.status,
            'is_downloaded': episode.is_downloaded,
            'file_path': episode.file_path,
            'file_size': episode.file_size,
            # Timestamps are emitted in ISO format, or None when unset
            'created_at': episode.created_at.isoformat() if episode.created_at else None,
            'updated_at': episode.updated_at.isoformat() if episode.updated_at else None
        }
        for episode in episodes
    ]

    # Slice out the requested page
    total = len(formatted_episodes)
    start_idx = (page - 1) * per_page
    paginated_episodes = formatted_episodes[start_idx:start_idx + per_page]

    return create_paginated_response(
        data=paginated_episodes,
        page=page,
        per_page=per_page,
        total=total,
        endpoint='anime.get_anime_episodes',
        anime_id=anime_id
    )
@anime_bp.route('/bulk', methods=['POST'])
@handle_api_errors
@validate_json_input(
    required_fields=['action', 'anime_ids'],
    optional_fields=['data'],
    field_types={
        'action': str,
        'anime_ids': list,
        'data': dict
    }
)
@require_auth
def bulk_anime_operation() -> Dict[str, Any]:
    """
    Perform bulk operations on multiple anime.

    Required Fields:
        - action: Operation to perform (update_status, delete,
          update_metadata). 'update_genres' is accepted as a valid action
          name but has no implementation; every ID is then reported as failed.
        - anime_ids: Non-empty list of at most 100 anime IDs (values that
          int() can convert)

    Optional Fields:
        - data: Additional data for the operation ('status' is required for
          update_status; the whole dict is passed to the repository for
          update_metadata)

    Returns:
        Batch response listing the successful and the failed items. Each
        requested ID appears in exactly one of the two lists.

    Raises:
        APIException: 503 when no repository is available
        ValidationError: on an unknown action, an empty or oversized ID list,
            or an ID that cannot be converted to an integer
    """
    if not anime_repository:
        raise APIException("Anime repository not available", 503)

    data = request.get_json()
    action = data['action']
    anime_ids = data['anime_ids']
    operation_data = data.get('data', {})

    # Validate action
    valid_actions = ['update_status', 'delete', 'update_metadata', 'update_genres']
    if action not in valid_actions:
        raise ValidationError(f"Invalid action. Must be one of: {', '.join(valid_actions)}")

    # Validate anime_ids
    if not isinstance(anime_ids, list) or not anime_ids:
        raise ValidationError("anime_ids must be a non-empty list")
    if len(anime_ids) > 100:
        raise ValidationError("Cannot operate on more than 100 anime at once")

    # Validate anime IDs are integers. int() raises TypeError (not
    # ValueError) for values such as None or nested lists, so catch both.
    try:
        anime_ids = [int(aid) for aid in anime_ids]
    except (TypeError, ValueError):
        raise ValidationError("All anime_ids must be valid integers")

    # Perform bulk operation; a failure on one ID never aborts the others
    successful_items = []
    failed_items = []
    for anime_id in anime_ids:
        try:
            if action == 'update_status':
                if 'status' not in operation_data:
                    raise ValueError("Status is required for update_status action")
                success = anime_repository.update_anime(anime_id, {'status': operation_data['status']})
                if success:
                    successful_items.append({'anime_id': anime_id, 'action': 'status_updated'})
                else:
                    failed_items.append({'anime_id': anime_id, 'error': 'Update failed'})
            elif action == 'delete':
                success = anime_repository.delete_anime(anime_id)
                if success:
                    successful_items.append({'anime_id': anime_id, 'action': 'deleted'})
                else:
                    failed_items.append({'anime_id': anime_id, 'error': 'Deletion failed'})
            elif action == 'update_metadata':
                success = anime_repository.update_anime(anime_id, operation_data)
                if success:
                    successful_items.append({'anime_id': anime_id, 'action': 'metadata_updated'})
                else:
                    failed_items.append({'anime_id': anime_id, 'error': 'Metadata update failed'})
            else:
                # A valid action without a handler (currently 'update_genres'):
                # report it instead of silently dropping the ID.
                failed_items.append({
                    'anime_id': anime_id,
                    'error': f"Action '{action}' is not implemented"
                })
        except Exception as e:
            failed_items.append({'anime_id': anime_id, 'error': str(e)})

    # Create batch response
    from ...shared.response_helpers import create_batch_response
    return create_batch_response(
        successful_items=successful_items,
        failed_items=failed_items,
        message=f"Bulk {action} operation completed"
    )
@router.post('/rescan', response_model=RescanResponse)
async def rescan_anime_directory(
    current_user: Dict = Depends(get_current_user),
    series_app: SeriesApp = Depends(get_series_app)
) -> RescanResponse:
    """
    Trigger a rescan through SeriesApp and report how many series it holds.

    Returns:
        Status of the rescan operation, including the series count
        (0 when the SeriesApp has no 'series_list' attribute)

    Raises:
        HTTPException: 500 when the rescan fails for any reason
    """
    def _discard_progress(progress_info):
        # Progress updates are ignored for now - in a real implementation,
        # they could be sent via WebSocket or stored for polling
        return None

    try:
        series_app.ReScan(_discard_progress)

        series_count = 0
        if hasattr(series_app, 'series_list'):
            series_count = len(series_app.series_list)

        return RescanResponse(
            success=True,
            message="Anime directory rescanned successfully",
            total_series=series_count
        )
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Rescan failed: {str(e)}"
        )
# Additional endpoints for legacy API compatibility
class AddSeriesRequest(BaseModel):
    """Request body for adding a series to the collection."""

    # Link of the series to add; must be a non-empty string.
    link: str = Field(..., min_length=1)
    # Name of the series, between 1 and 255 characters long.
    name: str = Field(..., min_length=1, max_length=255)
class AddSeriesResponse(BaseModel):
    """Outcome of an add-series request."""

    # Outcome label (add_series emits "success" or "error").
    status: str
    # Human-readable outcome message.
    message: str
class DownloadRequest(BaseModel):
    """Request body naming the series folders to download."""

    # Folder names to download; at least one entry is required.
    # NOTE(review): `min_items` is the pydantic v1 spelling; pydantic v2
    # deprecates it in favour of `min_length` - confirm the pinned version
    # before switching.
    folders: List[str] = Field(..., min_items=1)
class DownloadResponse(BaseModel):
    """Outcome of a download request."""

    # Outcome label (download_series emits "success" or "error").
    status: str
    # Human-readable outcome message.
    message: str
@router.post('/add_series', response_model=AddSeriesResponse)
async def add_series(
    request_data: AddSeriesRequest,
    current_user: Dict = Depends(get_current_user),
    series_app: SeriesApp = Depends(get_series_app)
) -> AddSeriesResponse:
    """
    Acknowledge a request to add a series to the collection.

    This is currently a placeholder: nothing is added and ``series_app`` is
    not used; the endpoint only reports success for the given name.

    Args:
        request_data: Contains link and name of the series to add

    Returns:
        Status of the add operation ("success", or "error" with the
        exception text if building the response fails)
    """
    try:
        # For now, just return success - actual implementation would use
        # SeriesApp to add the series to the collection
        success_message = f"Series '{request_data.name}' added successfully"
        return AddSeriesResponse(status="success", message=success_message)
    except Exception as e:
        failure_message = f"Failed to add series: {str(e)}"
        return AddSeriesResponse(status="error", message=failure_message)
@router.post('/download', response_model=DownloadResponse)
async def download_series(
    request_data: DownloadRequest,
    current_user: Dict = Depends(get_current_user),
    series_app: SeriesApp = Depends(get_series_app)
) -> DownloadResponse:
    """
    Acknowledge a request to download the selected series folders.

    This is currently a placeholder: no download is started and
    ``series_app`` is not used; the endpoint only reports how many folders
    were requested.

    Args:
        request_data: Contains list of folder names to download

    Returns:
        Status of the download operation ("success", or "error" with the
        exception text if building the response fails)
    """
    try:
        # For now, just return success - actual implementation would use
        # SeriesApp to start downloads
        folder_count = len(request_data.folders)
        success_message = f"Download started for {folder_count} series"
        return DownloadResponse(status="success", message=success_message)
    except Exception as e:
        failure_message = f"Failed to start download: {str(e)}"
        return DownloadResponse(status="error", message=failure_message)