"""Anime API routes: library status, listing, search, add and detail."""

import logging
import os
import warnings
from typing import Any, List, Optional

from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel, Field
from sqlalchemy.ext.asyncio import AsyncSession

from src.core.entities.series import Serie
from src.server.database.service import AnimeSeriesService
from src.server.exceptions import (
    BadRequestError,
    NotFoundError,
    ServerError,
    ValidationError,
)
from src.server.services.anime_service import AnimeService, AnimeServiceError
from src.server.utils.dependencies import (
    get_anime_service,
    get_optional_database_session,
    get_series_app,
    require_auth,
)
from src.server.utils.filesystem import sanitize_folder_name

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/api/anime", tags=["anime"])


@router.get("/status")
async def get_anime_status(
    _auth: dict = Depends(require_auth),
    series_app: Any = Depends(get_series_app),
) -> dict:
    """Get anime library status information.

    Args:
        _auth: Ensures the caller is authenticated (value unused)
        series_app: Core `SeriesApp` instance provided via dependency

    Returns:
        Dict[str, Any]: Status information with the keys `directory`
            and `series_count`.

    Raises:
        ServerError: If status retrieval fails
    """
    try:
        directory = (
            getattr(series_app, "directory_to_search", "")
            if series_app
            else ""
        )

        # Get series count (a missing list or an empty/None result is 0)
        series_count = 0
        if series_app and hasattr(series_app, "list"):
            series = series_app.list.GetList()
            series_count = len(series) if series else 0

        return {
            "directory": directory,
            "series_count": series_count
        }
    except Exception as exc:
        raise ServerError(
            message=f"Failed to get status: {str(exc)}"
        ) from exc


class AnimeSummary(BaseModel):
    """Summary of an anime series with missing episodes.

    The `key` field is the unique provider-assigned identifier used for
    all lookups and operations (URL-safe, e.g., "attack-on-titan").

    The `folder` field is metadata only for filesystem operations and
    display (e.g., "Attack on Titan (2013)") - not used for identification.

    Attributes:
        key: Unique series identifier (primary key for all operations)
        name: Display name of the series
        site: Provider site URL
        folder: Filesystem folder name (metadata only)
        missing_episodes: Episode dictionary mapping seasons to episode
            numbers
        has_missing: Boolean flag indicating if series has missing episodes
        link: Optional link to the series page (used when adding new series)
    """

    key: str = Field(
        ...,
        description="Unique series identifier (primary key for all operations)"
    )
    name: str = Field(
        ...,
        description="Display name of the series"
    )
    site: str = Field(
        ...,
        description="Provider site URL"
    )
    folder: str = Field(
        ...,
        description="Filesystem folder name (metadata, not for lookups)"
    )
    missing_episodes: dict = Field(
        ...,
        description="Episode dictionary: {season: [episode_numbers]}"
    )
    has_missing: bool = Field(
        default=False,
        description="Whether the series has any missing episodes"
    )
    link: Optional[str] = Field(
        default="",
        description="Link to the series page (for adding new series)"
    )

    class Config:
        """Pydantic model configuration."""

        json_schema_extra = {
            "example": {
                "key": "beheneko-the-elf-girls-cat",
                "name": "Beheneko",
                "site": "aniworld.to",
                "folder": "beheneko the elf girls cat (2025)",
                "missing_episodes": {"1": [1, 2, 3, 4]},
                "has_missing": True,
                "link": "https://aniworld.to/anime/stream/beheneko"
            }
        }


class AnimeDetail(BaseModel):
    """Detailed information about a specific anime series.

    The `key` field is the unique provider-assigned identifier used for
    all lookups and operations (URL-safe, e.g., "attack-on-titan").

    The `folder` field is metadata only for filesystem operations and
    display.

    Attributes:
        key: Unique series identifier (primary key for all operations)
        title: Display name of the series
        folder: Filesystem folder name (metadata only)
        episodes: List of episode identifiers in "season-episode" format
        description: Optional description of the series
    """

    key: str = Field(
        ...,
        description="Unique series identifier (primary key for all operations)"
    )
    title: str = Field(
        ...,
        description="Display name of the series"
    )
    folder: str = Field(
        default="",
        description="Filesystem folder name (metadata, not for lookups)"
    )
    episodes: List[str] = Field(
        ...,
        description="List of episode identifiers in 'season-episode' format"
    )
    description: Optional[str] = Field(
        default=None,
        description="Optional description of the series"
    )

    class Config:
        """Pydantic model configuration."""

        json_schema_extra = {
            "example": {
                "key": "attack-on-titan",
                "title": "Attack on Titan",
                "folder": "Attack on Titan (2013)",
                "episodes": ["1-1", "1-2", "1-3"],
                "description": "Humans fight against giant humanoid Titans."
            }
        }


@router.get("/", response_model=List[AnimeSummary])
@router.get("", response_model=List[AnimeSummary])
async def list_anime(
    page: Optional[int] = 1,
    per_page: Optional[int] = 20,
    sort_by: Optional[str] = None,
    filter: Optional[str] = None,
    _auth: dict = Depends(require_auth),
    series_app: Any = Depends(get_series_app),
) -> List[AnimeSummary]:
    """List all library series with their missing episodes status.

    Returns AnimeSummary objects where `key` is the primary identifier
    used for all operations. The `folder` field is metadata only and
    should not be used for lookups.

    All series are returned, with `has_missing` flag indicating whether
    a series has any missing episodes. Note that `page`, `per_page` and
    `filter` are only validated here; this function does not slice or
    filter the result with them.

    Args:
        page: Page number for pagination (must be positive)
        per_page: Items per page (must be positive, max 1000)
        sort_by: Optional sorting parameter.
            Allowed: title, id, name, missing_episodes
        filter: Optional filter parameter (validated for security)
        _auth: Ensures the caller is authenticated (value unused)
        series_app: Core SeriesApp instance provided via dependency.

    Returns:
        List[AnimeSummary]: Summary entries with `key` as primary
            identifier. Each entry includes:
            - key: Unique series identifier (use for all operations)
            - name: Display name
            - site: Provider site
            - folder: Filesystem folder name (metadata only)
            - missing_episodes: Dict mapping seasons to episode numbers
            - has_missing: Whether the series has any missing episodes

    Raises:
        ValidationError: When a query parameter is invalid.
        ServerError: When the underlying lookup fails.
    """
    # Validate pagination parameters. Only the int() conversion sits inside
    # the try so that the range errors below can never be caught and
    # re-labelled by the conversion handler.
    if page is not None:
        try:
            page_num = int(page)
        except (ValueError, TypeError) as exc:
            raise ValidationError(
                message="Page must be a valid number"
            ) from exc
        if page_num < 1:
            raise ValidationError(
                message="Page number must be positive"
            )
        page = page_num

    if per_page is not None:
        try:
            per_page_num = int(per_page)
        except (ValueError, TypeError) as exc:
            raise ValidationError(
                message="Per page must be a valid number"
            ) from exc
        if per_page_num < 1:
            raise ValidationError(
                message="Per page must be positive"
            )
        if per_page_num > 1000:
            raise ValidationError(
                message="Per page cannot exceed 1000"
            )
        per_page = per_page_num

    # Validate sort_by parameter to prevent ORM injection
    if sort_by:
        # Only allow safe sort fields
        allowed_sort_fields = ["title", "id", "missing_episodes", "name"]
        if sort_by not in allowed_sort_fields:
            allowed = ", ".join(allowed_sort_fields)
            raise ValidationError(
                message=f"Invalid sort_by parameter. Allowed: {allowed}"
            )

    # Validate filter parameter
    if filter:
        # Check for dangerous patterns in filter
        dangerous_patterns = [
            ";", "--", "/*", "*/",
            "drop", "delete", "insert", "update"
        ]
        lower_filter = filter.lower()
        if any(pattern in lower_filter for pattern in dangerous_patterns):
            raise ValidationError(
                message="Invalid filter parameter"
            )

    try:
        # Get all series from series app
        if not hasattr(series_app, "list"):
            return []

        # Treat a None result like an empty library instead of failing
        series = series_app.list.GetList() or []
        summaries: List[AnimeSummary] = []

        for serie in series:
            # Get all properties from the serie object
            key = getattr(serie, "key", "")
            name = getattr(serie, "name", "")
            site = getattr(serie, "site", "")
            folder = getattr(serie, "folder", "")
            episode_dict = getattr(serie, "episodeDict", {}) or {}

            # Convert episode dict keys to strings for JSON serialization
            missing_episodes = {str(k): v for k, v in episode_dict.items()}

            # Determine if series has missing episodes
            has_missing = bool(episode_dict)

            summaries.append(
                AnimeSummary(
                    key=key,
                    name=name,
                    site=site,
                    folder=folder,
                    missing_episodes=missing_episodes,
                    has_missing=has_missing,
                )
            )

        # Apply sorting if requested
        if sort_by:
            if sort_by in ["title", "name"]:
                summaries.sort(key=lambda x: x.name or x.key)
            elif sort_by == "id":
                summaries.sort(key=lambda x: x.key)
            elif sort_by == "missing_episodes":
                # Sort by total number of missing episodes
                # (count all episodes across all seasons)
                summaries.sort(
                    key=lambda x: sum(
                        len(eps) for eps in x.missing_episodes.values()
                    ),
                    reverse=True
                )

        return summaries
    except (ValidationError, BadRequestError, NotFoundError, ServerError):
        raise
    except Exception as exc:
        raise ServerError(
            message="Failed to retrieve anime list"
        ) from exc


@router.post("/rescan")
async def trigger_rescan(
    _auth: dict = Depends(require_auth),
    anime_service: AnimeService = Depends(get_anime_service),
) -> dict:
    """Kick off a rescan of the local library.

    Args:
        _auth: Ensures the caller is authenticated (value unused)
        anime_service: AnimeService instance provided via dependency.

    Returns:
        Dict[str, Any]: Status payload confirming scan started

    Raises:
        ServerError: If the rescan command fails.
    """
    try:
        # Use the async rescan method from AnimeService
        # Progress tracking is handled automatically via event handlers
        await anime_service.rescan()

        return {
            "success": True,
            "message": "Rescan started successfully",
        }
    except AnimeServiceError as e:
        raise ServerError(
            message=f"Rescan failed: {str(e)}"
        ) from e
    except Exception as exc:
        raise ServerError(
            message="Failed to start rescan"
        ) from exc


@router.get("/scan/status")
async def get_scan_status(
    _auth: dict = Depends(require_auth),
    anime_service: AnimeService = Depends(get_anime_service),
) -> dict:
    """Get the current scan status.

    Returns the current state of any ongoing library scan,
    useful for restoring UI state after page reload.

    Args:
        _auth: Ensures the caller is authenticated (value unused)
        anime_service: AnimeService instance provided via dependency.

    Returns:
        Dict[str, Any]: Current scan status including:
            - is_scanning: Whether a scan is in progress
            - total_items: Total items to scan
            - directories_scanned: Items scanned so far
            - current_directory: Current item being scanned
            - directory: Root scan directory
    """
    return anime_service.get_scan_status()


class AddSeriesRequest(BaseModel):
    """Request model for adding a new series."""

    link: str
    name: str


# Substrings rejected by validate_search_query (matched case-insensitively).
# NOTE(review): these are plain substring matches, so legitimate titles that
# contain e.g. "union", "sleep" or "create" are rejected as well. Kept as-is
# because callers/tests may rely on the current behaviour.
_SEARCH_DANGEROUS_PATTERNS = (
    "--", "/*", "*/", "xp_", "sp_", "exec", "execute",
    "union", "select", "insert", "update", "delete", "drop",
    "create", "alter", "truncate", "sleep", "waitfor", "benchmark",
    " or ", "||", " and ", "&&"
)


def validate_search_query(query: str) -> str:
    """Validate and sanitize search query.

    Args:
        query: The search query string

    Returns:
        str: The query with surrounding whitespace stripped and inner
            whitespace runs collapsed to single spaces.

    Raises:
        HTTPException: 422 if the query is empty, longer than 200
            characters or contains a rejected pattern; 400 if it
            contains a null byte.
    """
    if not query or not query.strip():
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="Search query cannot be empty"
        )

    # Check for null bytes
    if "\x00" in query:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Null bytes not allowed in query"
        )

    # Limit query length to prevent abuse (measured before normalization)
    if len(query) > 200:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="Search query too long (max 200 characters)"
        )

    # Strip and normalize whitespace
    normalized = " ".join(query.strip().split())

    # Prevent SQL-like injection patterns
    lower_query = normalized.lower()
    if any(pattern in lower_query for pattern in _SEARCH_DANGEROUS_PATTERNS):
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="Invalid character sequence detected"
        )

    return normalized


class SearchAnimeRequest(BaseModel):
    """Request model for searching anime."""

    query: str = Field(..., min_length=1, description="Search query string")


@router.get("/search", response_model=List[AnimeSummary])
async def search_anime_get(
    query: str,
    series_app: Optional[Any] = Depends(get_series_app),
) -> List[AnimeSummary]:
    """Search the provider for additional series matching a query (GET).

    Returns AnimeSummary objects where `key` is the primary identifier.
    Use the `key` field for subsequent operations (add, download, etc.).

    Args:
        query: Search term passed as query parameter
        series_app: Optional SeriesApp instance provided via dependency.

    Returns:
        List[AnimeSummary]: Discovered matches with `key` as identifier.

    Raises:
        HTTPException: When provider communication fails or query is
            invalid.
    """
    return await _perform_search(query, series_app)


@router.post(
    "/search",
    response_model=List[AnimeSummary],
)
async def search_anime_post(
    request: SearchAnimeRequest,
    series_app: Optional[Any] = Depends(get_series_app),
) -> List[AnimeSummary]:
    """Search the provider for additional series matching a query (POST).

    Returns AnimeSummary objects where `key` is the primary identifier.
    Use the `key` field for subsequent operations (add, download, etc.).

    Args:
        request: Request containing the search query
        series_app: Optional SeriesApp instance provided via dependency.

    Returns:
        List[AnimeSummary]: Discovered matches with `key` as identifier.

    Raises:
        HTTPException: When provider communication fails or query is
            invalid.
    """
    return await _perform_search(request.query, series_app)


def _extract_key_from_link(link: str) -> str:
    """Derive a series key from a search-result link, or "" if impossible."""
    if "/anime/stream/" in link:
        return link.split("/anime/stream/")[-1].split("/")[0]
    if link and "/" not in link:
        # Link is just a slug (e.g., "attack-on-titan")
        return link
    return ""


def _summary_from_match(match: Any) -> AnimeSummary:
    """Convert one search result (dict or object) into an AnimeSummary."""
    if isinstance(match, dict):
        def read(field_name: str) -> Any:
            return match.get(field_name)

        missing_names = ("missing_episodes", "missing")
    else:
        def read(field_name: str) -> Any:
            return getattr(match, field_name, None)

        # Objects are only probed for `missing_episodes`, as before
        missing_names = ("missing_episodes",)

    def first_truthy(field_names: tuple, default: Any) -> Any:
        # Falsy values (None, "", {}) fall through to the next name and
        # finally to the default, so the model never receives None for a
        # required str/dict field.
        for field_name in field_names:
            value = read(field_name)
            if value:
                return value
        return default

    # Extract key (primary identifier)
    key = first_truthy(("key", "id"), "")
    link = first_truthy(("link", "url"), "")

    # If key is empty, try to extract from link
    if not key and link:
        key = _extract_key_from_link(link)

    return AnimeSummary(
        key=key,
        name=first_truthy(("title", "name"), ""),
        site=first_truthy(("site",), ""),
        folder=first_truthy(("folder",), ""),
        link=link,
        missing_episodes=first_truthy(missing_names, {}),
    )


async def _perform_search(
    query: str,
    series_app: Optional[Any],
) -> List[AnimeSummary]:
    """Search for anime series matching the given query.

    This internal function performs the actual search logic, extracting
    results from the provider and converting them to AnimeSummary objects.

    The returned summaries use `key` as the primary identifier. The `key`
    is extracted from the result's key field (preferred) or derived from
    the link URL if not available. The `folder` field is metadata only.

    Args:
        query: Search term (will be validated and sanitized)
        series_app: Optional SeriesApp instance for search.

    Returns:
        List[AnimeSummary]: Discovered matches with `key` as identifier
            and `folder` as metadata. Each summary includes:
            - key: Unique series identifier (primary)
            - name: Display name
            - site: Provider site
            - folder: Filesystem folder name (metadata)
            - link: URL to series page
            - missing_episodes: Episode dictionary

    Raises:
        HTTPException: When provider communication fails or query is
            invalid.
    """
    try:
        # Validate and sanitize the query
        validated_query = validate_search_query(query)

        # Check if series_app is available
        if not series_app:
            # Return empty list if service unavailable
            # Tests can verify validation without needing a real series_app
            return []

        matches: List[Any] = []
        if hasattr(series_app, "search"):
            # SeriesApp.search is async; await the result
            matches = await series_app.search(validated_query)

        # A None result is treated as "no matches"
        return [_summary_from_match(match) for match in matches or []]
    except HTTPException:
        raise
    except Exception as exc:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Search failed",
        ) from exc


@router.post("/add")
async def add_series(
    request: AddSeriesRequest,
    _auth: dict = Depends(require_auth),
    series_app: Any = Depends(get_series_app),
    db: Optional[AsyncSession] = Depends(get_optional_database_session),
    anime_service: AnimeService = Depends(get_anime_service),
) -> dict:
    """Add a new series to the library with full initialization.

    This endpoint performs the series addition flow:
    1. Validates inputs and extracts the series key from the link URL
    2. Creates a sanitized folder name from the display name
    3. Saves the series to the database (if available)
    4. Registers the series in the in-memory series list (this function
       itself does not create the folder on disk)
    5. Triggers a targeted scan for missing episodes (only this series)

    The `key` is the URL-safe identifier used for all lookups.
    The `name` is stored as display metadata and used to derive
    the filesystem folder name (sanitized for filesystem safety).

    Args:
        request: Request containing the series link and name.
            - link: URL to the series (e.g., aniworld.to/anime/stream/key)
            - name: Display name for the series
        _auth: Ensures the caller is authenticated (value unused)
        series_app: Core `SeriesApp` instance provided via dependency
        db: Optional database session for async operations
        anime_service: AnimeService dependency (not used in the body)

    Returns:
        Dict[str, Any]: Status payload with:
            - status: "success" or "exists"
            - message: Human-readable status message
            - key: Series unique identifier
            - folder: Sanitized folder name (or the stored folder when
              the series already exists)
            - db_id: Database ID (if saved to DB)
            - missing_episodes: Dict of missing episodes by season
            - total_missing: Total count of missing episodes
            - scan_warning: Only present when the targeted scan failed

    Raises:
        HTTPException: If adding the series fails or link is invalid
    """
    try:
        # Step A: Validate inputs
        if not request.link or not request.link.strip():
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Series link cannot be empty",
            )

        if not request.name or not request.name.strip():
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Series name cannot be empty",
            )

        # Extract key from link URL
        # Expected format: https://aniworld.to/anime/stream/{key}
        link = request.link.strip()
        key = link

        # Try to extract key from URL path
        if "/anime/stream/" in link:
            # Extract everything after /anime/stream/
            key = link.split("/anime/stream/")[-1].split("/")[0].strip()
        elif "/" in link:
            # Fallback: use last path segment
            key = link.rstrip("/").split("/")[-1].strip()

        # Validate extracted key
        if not key:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Could not extract series key from link",
            )

        # Step B: Create sanitized folder name from display name
        name = request.name.strip()
        try:
            folder = sanitize_folder_name(name)
        except ValueError as e:
            # Chain the cause so the original reason stays in tracebacks
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Invalid series name for folder: {str(e)}",
            ) from e

        db_id = None
        missing_episodes: dict = {}
        scan_error: Optional[str] = None

        # Step C: Save to database if available
        if db is not None:
            # Check if series already exists in database
            existing = await AnimeSeriesService.get_by_key(db, key)
            if existing:
                return {
                    "status": "exists",
                    "message": f"Series already exists: {name}",
                    "key": key,
                    "folder": existing.folder,
                    "db_id": existing.id,
                    "missing_episodes": {},
                    "total_missing": 0
                }

            # Save to database using AnimeSeriesService
            anime_series = await AnimeSeriesService.create(
                db=db,
                key=key,
                name=name,
                site="aniworld.to",
                folder=folder,
            )
            db_id = anime_series.id

            logger.info(
                "Added series to database: %s (key=%s, db_id=%d)",
                name,
                key,
                db_id
            )

        # Step D: Add to SerieList (in-memory only, no folder creation)
        if series_app and hasattr(series_app, "list"):
            serie = Serie(
                key=key,
                name=name,
                site="aniworld.to",
                folder=folder,
                episodeDict={}
            )

            # Add to in-memory cache without creating folder on disk
            if hasattr(series_app.list, 'keyDict'):
                series_app.list.keyDict[key] = serie
                logger.info(
                    "Added series to in-memory cache: %s (key=%s, folder=%s)",
                    name,
                    key,
                    folder
                )

        # Step E: Trigger targeted scan for missing episodes
        try:
            if series_app and hasattr(series_app, "serie_scanner"):
                # NOTE(review): this is called without await; if
                # scan_single_series does blocking I/O it will stall the
                # event loop - confirm against the scanner implementation.
                missing_episodes = series_app.serie_scanner.scan_single_series(
                    key=key,
                    folder=folder
                )
                logger.info(
                    "Targeted scan completed for %s: found %d missing episodes",
                    key,
                    sum(len(eps) for eps in missing_episodes.values())
                )

                # Update the serie in keyDict with the missing episodes
                if hasattr(series_app, "list") and hasattr(
                    series_app.list, "keyDict"
                ):
                    if key in series_app.list.keyDict:
                        series_app.list.keyDict[key].episodeDict = (
                            missing_episodes
                        )
            else:
                # Scanner not available - this shouldn't happen in normal
                # operation
                logger.warning(
                    "Scanner not available for targeted scan of %s",
                    key
                )
        except Exception as e:
            # Scan failure is not critical - series was still added
            scan_error = str(e)
            logger.warning(
                "Targeted scan failed for %s: %s (series still added)",
                key,
                e
            )

        # Convert missing episodes keys to strings for JSON serialization
        missing_episodes_serializable = {
            str(season): episodes
            for season, episodes in missing_episodes.items()
        }

        # Calculate total missing
        total_missing = sum(len(eps) for eps in missing_episodes.values())

        # Step F: Return response
        response = {
            "status": "success",
            "message": f"Successfully added series: {name}",
            "key": key,
            "folder": folder,
            "db_id": db_id,
            "missing_episodes": missing_episodes_serializable,
            "total_missing": total_missing
        }

        if scan_error:
            response["scan_warning"] = f"Scan partially failed: {scan_error}"

        return response
    except HTTPException:
        raise
    except Exception as exc:
        logger.error("Failed to add series: %s", exc, exc_info=True)

        # NOTE: no database rollback is performed here; any cleanup of a
        # row created above is left to the session dependency.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to add series: {str(exc)}",
        ) from exc


@router.get("/{anime_id}", response_model=AnimeDetail)
async def get_anime(
    anime_id: str,
    series_app: Optional[Any] = Depends(get_series_app)
) -> AnimeDetail:
    """Return detailed information about a specific series.

    The `anime_id` parameter should be the series `key` (primary
    identifier). For backward compatibility, lookups by `folder` are also
    supported but deprecated. The `key` is checked first, then `folder`
    as fallback.

    Args:
        anime_id: Series `key` (primary) or `folder` (deprecated fallback).
        series_app: Optional SeriesApp instance provided via dependency.

    Returns:
        AnimeDetail: Detailed series metadata including episode list.
            Response includes `key` as the primary identifier and
            `folder` as metadata.

    Raises:
        HTTPException: 404 if the anime cannot be located, 500 if
            retrieval fails.
    """
    try:
        # Check if series_app is available
        if not series_app or not hasattr(series_app, "list"):
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Series not found",
            )

        # A None result means "no series", which must end in 404, not 500
        series = series_app.list.GetList() or []

        # Primary lookup: search by key first (preferred). This is a full
        # pass so a key match always wins over any folder match.
        found = next(
            (
                serie for serie in series
                if getattr(serie, "key", None) == anime_id
            ),
            None,
        )

        # Fallback lookup: search by folder (backward compatibility)
        if found is None:
            found = next(
                (
                    serie for serie in series
                    if getattr(serie, "folder", None) == anime_id
                ),
                None,
            )
            if found is not None:
                # Log deprecation warning for folder-based lookup
                key = getattr(found, "key", "unknown")
                logger.warning(
                    "Folder-based lookup for '%s' is deprecated. "
                    "Use series key '%s' instead. Folder-based lookups "
                    "will be removed in v3.0.0.",
                    anime_id,
                    key
                )
                warnings.warn(
                    f"Folder-based lookup for '{anime_id}' is deprecated. "
                    f"Use series key '{key}' instead.",
                    DeprecationWarning,
                    stacklevel=2
                )

        if found is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Series not found",
            )

        # Flatten {season: [episodes]} into "season-episode" identifiers
        episode_dict = getattr(found, "episodeDict", {}) or {}
        episodes: List[str] = [
            f"{season}-{episode}"
            for season, episode_numbers in episode_dict.items()
            for episode in episode_numbers
        ]

        # Return AnimeDetail with key as the primary identifier
        return AnimeDetail(
            key=getattr(found, "key", ""),
            title=getattr(found, "name", ""),
            folder=getattr(found, "folder", ""),
            episodes=episodes,
            description=getattr(found, "description", None),
        )
    except HTTPException:
        raise
    except Exception as exc:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to retrieve series details",
        ) from exc


# Maximum allowed input size for security
MAX_INPUT_LENGTH = 100000  # 100KB