Aniworld/src/server/api/anime.py
2025-11-15 17:55:27 +01:00

585 lines
19 KiB
Python

from typing import Any, List, Optional
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel, Field
from src.core.entities.series import Serie
from src.server.services.anime_service import AnimeService, AnimeServiceError
from src.server.utils.dependencies import (
get_anime_service,
get_series_app,
require_auth,
)
router = APIRouter(prefix="/api/anime", tags=["anime"])
@router.get("/status")
async def get_anime_status(
_auth: dict = Depends(require_auth),
series_app: Any = Depends(get_series_app),
) -> dict:
"""Get anime library status information.
Args:
_auth: Ensures the caller is authenticated (value unused)
series_app: Core `SeriesApp` instance provided via dependency
Returns:
Dict[str, Any]: Status information including directory and series count
Raises:
HTTPException: If status retrieval fails
"""
try:
directory = (
getattr(series_app, "directory_to_search", "")
if series_app else ""
)
# Get series count
series_count = 0
if series_app and hasattr(series_app, "list"):
series = series_app.list.GetList()
series_count = len(series) if series else 0
return {
"directory": directory,
"series_count": series_count
}
except Exception as exc:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to get status: {str(exc)}",
) from exc
class AnimeSummary(BaseModel):
"""Summary of an anime series with missing episodes."""
key: str # Unique identifier (used as id in frontend)
name: str # Series name (can be empty)
site: str # Provider site
folder: str # Local folder name
missing_episodes: dict # Episode dictionary: {season: [episode_numbers]}
link: Optional[str] = "" # Link to the series page (for adding new series)
class Config:
"""Pydantic model configuration."""
json_schema_extra = {
"example": {
"key": "beheneko-the-elf-girls-cat",
"name": "Beheneko",
"site": "aniworld.to",
"folder": "beheneko the elf girls cat (2025)",
"missing_episodes": {"1": [1, 2, 3, 4]},
"link": "https://aniworld.to/anime/stream/beheneko"
}
}
class AnimeDetail(BaseModel):
id: str
title: str
episodes: List[str]
description: Optional[str] = None
@router.get("/", response_model=List[AnimeSummary])
@router.get("", response_model=List[AnimeSummary])
async def list_anime(
page: Optional[int] = 1,
per_page: Optional[int] = 20,
sort_by: Optional[str] = None,
filter: Optional[str] = None,
_auth: dict = Depends(require_auth),
series_app: Any = Depends(get_series_app),
) -> List[AnimeSummary]:
"""List library series that still have missing episodes.
Args:
page: Page number for pagination (must be positive)
per_page: Items per page (must be positive, max 1000)
sort_by: Optional sorting parameter (validated for security)
filter: Optional filter parameter (validated for security)
_auth: Ensures the caller is authenticated (value unused)
series_app: Core SeriesApp instance provided via dependency.
Returns:
List[AnimeSummary]: Summary entries describing missing content.
Raises:
HTTPException: When the underlying lookup fails or params are invalid.
"""
# Validate pagination parameters
if page is not None:
try:
page_num = int(page)
if page_num < 1:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Page number must be positive"
)
page = page_num
except (ValueError, TypeError):
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Page must be a valid number"
)
if per_page is not None:
try:
per_page_num = int(per_page)
if per_page_num < 1:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Per page must be positive"
)
if per_page_num > 1000:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Per page cannot exceed 1000"
)
per_page = per_page_num
except (ValueError, TypeError):
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Per page must be a valid number"
)
# Validate sort_by parameter to prevent ORM injection
if sort_by:
# Only allow safe sort fields
allowed_sort_fields = ["title", "id", "missing_episodes", "name"]
if sort_by not in allowed_sort_fields:
allowed = ", ".join(allowed_sort_fields)
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Invalid sort_by parameter. Allowed: {allowed}"
)
# Validate filter parameter
if filter:
# Check for dangerous patterns in filter
dangerous_patterns = [
";", "--", "/*", "*/",
"drop", "delete", "insert", "update"
]
lower_filter = filter.lower()
for pattern in dangerous_patterns:
if pattern in lower_filter:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Invalid filter parameter"
)
try:
# Get missing episodes from series app
if not hasattr(series_app, "list"):
return []
series = series_app.list.GetMissingEpisode()
summaries: List[AnimeSummary] = []
for serie in series:
# Get all properties from the serie object
key = getattr(serie, "key", "")
name = getattr(serie, "name", "")
site = getattr(serie, "site", "")
folder = getattr(serie, "folder", "")
episode_dict = getattr(serie, "episodeDict", {}) or {}
# Convert episode dict keys to strings for JSON serialization
missing_episodes = {str(k): v for k, v in episode_dict.items()}
summaries.append(
AnimeSummary(
key=key,
name=name,
site=site,
folder=folder,
missing_episodes=missing_episodes,
)
)
# Apply sorting if requested
if sort_by:
if sort_by in ["title", "name"]:
summaries.sort(key=lambda x: x.name or x.key)
elif sort_by == "id":
summaries.sort(key=lambda x: x.key)
elif sort_by == "missing_episodes":
# Sort by total number of missing episodes
# (count all episodes across all seasons)
summaries.sort(
key=lambda x: sum(
len(eps) for eps in x.missing_episodes.values()
),
reverse=True
)
return summaries
except HTTPException:
raise
except Exception as exc:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to retrieve anime list",
) from exc
@router.post("/rescan")
async def trigger_rescan(
_auth: dict = Depends(require_auth),
anime_service: AnimeService = Depends(get_anime_service),
) -> dict:
"""Kick off a rescan of the local library.
Args:
_auth: Ensures the caller is authenticated (value unused)
anime_service: AnimeService instance provided via dependency.
Returns:
Dict[str, Any]: Status payload confirming scan started
Raises:
HTTPException: If the rescan command fails.
"""
try:
# Use the async rescan method from AnimeService
# Progress tracking is handled automatically via event handlers
await anime_service.rescan()
return {
"success": True,
"message": "Rescan started successfully",
}
except AnimeServiceError as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Rescan failed: {str(e)}",
) from e
except Exception as exc:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to start rescan",
) from exc
class AddSeriesRequest(BaseModel):
"""Request model for adding a new series."""
link: str
name: str
def validate_search_query(query: str) -> str:
"""Validate and sanitize search query.
Args:
query: The search query string
Returns:
str: The validated query
Raises:
HTTPException: If query is invalid
"""
if not query or not query.strip():
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Search query cannot be empty"
)
# Check for null bytes
if "\x00" in query:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Null bytes not allowed in query"
)
# Limit query length to prevent abuse
if len(query) > 200:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Search query too long (max 200 characters)"
)
# Strip and normalize whitespace
normalized = " ".join(query.strip().split())
# Prevent SQL-like injection patterns
dangerous_patterns = [
"--", "/*", "*/", "xp_", "sp_", "exec", "execute",
"union", "select", "insert", "update", "delete", "drop",
"create", "alter", "truncate", "sleep", "waitfor", "benchmark",
" or ", "||", " and ", "&&"
]
lower_query = normalized.lower()
for pattern in dangerous_patterns:
if pattern in lower_query:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Invalid character sequence detected"
)
return normalized
class SearchAnimeRequest(BaseModel):
"""Request model for searching anime."""
query: str = Field(..., min_length=1, description="Search query string")
@router.get("/search", response_model=List[AnimeSummary])
async def search_anime_get(
query: str,
series_app: Optional[Any] = Depends(get_series_app),
) -> List[AnimeSummary]:
"""Search the provider for additional series matching a query (GET).
Args:
query: Search term passed as query parameter
series_app: Optional SeriesApp instance provided via dependency.
Returns:
List[AnimeSummary]: Discovered matches returned from the provider.
Raises:
HTTPException: When provider communication fails or query is invalid.
"""
return await _perform_search(query, series_app)
@router.post(
"/search",
response_model=List[AnimeSummary],
)
async def search_anime_post(
request: SearchAnimeRequest,
series_app: Optional[Any] = Depends(get_series_app),
) -> List[AnimeSummary]:
"""Search the provider for additional series matching a query (POST).
Args:
request: Request containing the search query
series_app: Optional SeriesApp instance provided via dependency.
Returns:
List[AnimeSummary]: Discovered matches returned from the provider.
Raises:
HTTPException: When provider communication fails or query is invalid.
"""
return await _perform_search(request.query, series_app)
async def _perform_search(
query: str,
series_app: Optional[Any],
) -> List[AnimeSummary]:
"""Internal function to perform the search logic.
Args:
query: Search term
series_app: Optional SeriesApp instance.
Returns:
List[AnimeSummary]: Discovered matches returned from the provider.
Raises:
HTTPException: When provider communication fails or query is invalid.
"""
try:
# Validate and sanitize the query
validated_query = validate_search_query(query)
# Check if series_app is available
if not series_app:
# Return empty list if service unavailable
# Tests can verify validation without needing a real series_app
return []
matches: List[Any] = []
if hasattr(series_app, "search"):
# SeriesApp.search is async; await the result
matches = await series_app.search(validated_query)
summaries: List[AnimeSummary] = []
for match in matches:
if isinstance(match, dict):
identifier = match.get("key") or match.get("id") or ""
title = match.get("title") or match.get("name") or ""
site = match.get("site") or ""
folder = match.get("folder") or ""
link = match.get("link") or match.get("url") or ""
missing = (
match.get("missing_episodes")
or match.get("missing")
or {}
)
else:
identifier = getattr(match, "key", getattr(match, "id", ""))
title = getattr(match, "title", getattr(match, "name", ""))
site = getattr(match, "site", "")
folder = getattr(match, "folder", "")
link = getattr(match, "link", getattr(match, "url", ""))
missing = getattr(match, "missing_episodes", {})
summaries.append(
AnimeSummary(
key=identifier,
name=title,
site=site,
folder=folder,
link=link,
missing_episodes=missing,
)
)
return summaries
except HTTPException:
raise
except Exception as exc:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Search failed",
) from exc
@router.post("/add")
async def add_series(
request: AddSeriesRequest,
_auth: dict = Depends(require_auth),
series_app: Any = Depends(get_series_app),
) -> dict:
"""Add a new series to the library.
Args:
request: Request containing the series link and name
_auth: Ensures the caller is authenticated (value unused)
series_app: Core `SeriesApp` instance provided via dependency
Returns:
Dict[str, Any]: Status payload with success message
Raises:
HTTPException: If adding the series fails
"""
try:
# Validate inputs
if not request.link or not request.link.strip():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Series link cannot be empty",
)
if not request.name or not request.name.strip():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Series name cannot be empty",
)
# Check if series_app has the list attribute
if not hasattr(series_app, "list"):
raise HTTPException(
status_code=status.HTTP_501_NOT_IMPLEMENTED,
detail="Series list functionality not available",
)
# Create a new Serie object
# Following the pattern from CLI:
# Serie(key, name, site, folder, episodeDict)
# The key and folder are both the link in this case
# episodeDict is empty {} for a new series
serie = Serie(
key=request.link.strip(),
name=request.name.strip(),
site="aniworld.to",
folder=request.name.strip(),
episodeDict={}
)
# Add the series to the list
series_app.list.add(serie)
# Refresh the series list to update the cache
if hasattr(series_app, "refresh_series_list"):
series_app.refresh_series_list()
return {
"status": "success",
"message": f"Successfully added series: {request.name}"
}
except HTTPException:
raise
except Exception as exc:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to add series: {str(exc)}",
) from exc
@router.get("/{anime_id}", response_model=AnimeDetail)
async def get_anime(
anime_id: str,
series_app: Optional[Any] = Depends(get_series_app)
) -> AnimeDetail:
"""Return detailed information about a specific series.
Args:
anime_id: Provider key or folder name of the requested series.
series_app: Optional SeriesApp instance provided via dependency.
Returns:
AnimeDetail: Detailed series metadata including episode list.
Raises:
HTTPException: If the anime cannot be located or retrieval fails.
"""
try:
# Check if series_app is available
if not series_app or not hasattr(series_app, "list"):
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Series not found",
)
series = series_app.list.GetList()
found = None
for serie in series:
matches_key = getattr(serie, "key", None) == anime_id
matches_folder = getattr(serie, "folder", None) == anime_id
if matches_key or matches_folder:
found = serie
break
if not found:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Series not found",
)
episodes: List[str] = []
episode_dict = getattr(found, "episodeDict", {}) or {}
for season, episode_numbers in episode_dict.items():
for episode in episode_numbers:
episodes.append(f"{season}-{episode}")
return AnimeDetail(
id=getattr(found, "key", getattr(found, "folder", "")),
title=getattr(found, "name", ""),
episodes=episodes,
description=getattr(found, "description", None),
)
except HTTPException:
raise
except Exception as exc:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to retrieve series details",
) from exc
# Maximum allowed input size for security
MAX_INPUT_LENGTH = 100000 # 100KB