""" FastAPI-based AniWorld Server Application. This module implements a comprehensive FastAPI application following the instructions: - Simple master password authentication using JWT - Repository pattern with dependency injection - Proper error handling and validation - OpenAPI documentation - Security best practices """ import hashlib import logging import os import sys from contextlib import asynccontextmanager from datetime import datetime, timedelta from typing import Any, Dict, List, Optional import jwt # Add parent directory to path for imports current_dir = os.path.dirname(__file__) parent_dir = os.path.join(current_dir, '..') sys.path.insert(0, os.path.abspath(parent_dir)) import uvicorn from fastapi import Depends, FastAPI, HTTPException, Request, Security, status from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import HTMLResponse, JSONResponse from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from pydantic import BaseModel, Field from pydantic_settings import BaseSettings # Import our custom middleware - temporarily disabled due to file corruption # from src.server.web.middleware.fastapi_auth_middleware import AuthMiddleware # from src.server.web.middleware.fastapi_logging_middleware import ( # EnhancedLoggingMiddleware, # ) # from src.server.web.middleware.fastapi_validation_middleware import ValidationMiddleware # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('./logs/aniworld.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # Security security = HTTPBearer() # Configuration class Settings(BaseSettings): """Application settings from environment variables.""" jwt_secret_key: str = Field(default="your-secret-key-here", env="JWT_SECRET_KEY") password_salt: str = Field(default="default-salt", 
env="PASSWORD_SALT") master_password_hash: Optional[str] = Field(default=None, env="MASTER_PASSWORD_HASH") master_password: Optional[str] = Field(default=None, env="MASTER_PASSWORD") # For development token_expiry_hours: int = Field(default=24, env="SESSION_TIMEOUT_HOURS") anime_directory: str = Field(default="", env="ANIME_DIRECTORY") log_level: str = Field(default="INFO", env="LOG_LEVEL") # Additional settings from .env database_url: str = Field(default="sqlite:///./data/aniworld.db", env="DATABASE_URL") cors_origins: str = Field(default="*", env="CORS_ORIGINS") api_rate_limit: int = Field(default=100, env="API_RATE_LIMIT") default_provider: str = Field(default="aniworld.to", env="DEFAULT_PROVIDER") provider_timeout: int = Field(default=30, env="PROVIDER_TIMEOUT") retry_attempts: int = Field(default=3, env="RETRY_ATTEMPTS") class Config: env_file = ".env" extra = "ignore" # Ignore extra environment variables settings = Settings() # Pydantic Models class LoginRequest(BaseModel): """Login request model.""" password: str = Field(..., min_length=1, description="Master password") class LoginResponse(BaseModel): """Login response model.""" success: bool message: str token: Optional[str] = None expires_at: Optional[datetime] = None class TokenVerifyResponse(BaseModel): """Token verification response model.""" valid: bool message: str user: Optional[str] = None expires_at: Optional[datetime] = None class HealthResponse(BaseModel): """Health check response model.""" status: str timestamp: datetime version: str = "1.0.0" services: Dict[str, str] class AnimeSearchRequest(BaseModel): """Anime search request model.""" query: str = Field(..., min_length=1, max_length=100) limit: int = Field(default=20, ge=1, le=100) offset: int = Field(default=0, ge=0) class AnimeResponse(BaseModel): """Anime response model.""" id: str title: str description: Optional[str] = None episodes: int = 0 status: str = "Unknown" poster_url: Optional[str] = None class EpisodeResponse(BaseModel): 
"""Episode response model.""" id: str anime_id: str episode_number: int title: Optional[str] = None description: Optional[str] = None duration: Optional[int] = None stream_url: Optional[str] = None class ErrorResponse(BaseModel): """Error response model.""" success: bool = False error: str code: Optional[str] = None details: Optional[Dict[str, Any]] = None # Authentication utilities def hash_password(password: str) -> str: """Hash password with salt using SHA-256.""" salted_password = password + settings.password_salt return hashlib.sha256(salted_password.encode()).hexdigest() def verify_master_password(password: str) -> bool: """Verify password against master password hash.""" if not settings.master_password_hash: # If no hash is set, check against plain password (development only) if settings.master_password: return password == settings.master_password return False password_hash = hash_password(password) return password_hash == settings.master_password_hash def generate_jwt_token() -> Dict[str, Any]: """Generate JWT token for authentication.""" expires_at = datetime.utcnow() + timedelta(hours=settings.token_expiry_hours) payload = { 'user': 'master', 'exp': expires_at, 'iat': datetime.utcnow(), 'iss': 'aniworld-fastapi-server' } token = jwt.encode(payload, settings.jwt_secret_key, algorithm='HS256') return { 'token': token, 'expires_at': expires_at } def verify_jwt_token(token: str) -> Optional[Dict[str, Any]]: """Verify and decode JWT token.""" try: payload = jwt.decode(token, settings.jwt_secret_key, algorithms=['HS256']) return payload except jwt.ExpiredSignatureError: logger.warning("Token has expired") return None except jwt.InvalidTokenError as e: logger.warning(f"Invalid token: {str(e)}") return None async def get_current_user(credentials: HTTPAuthorizationCredentials = Security(security)): """Dependency to get current authenticated user.""" token = credentials.credentials payload = verify_jwt_token(token) if not payload: raise HTTPException( 
status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired token", headers={"WWW-Authenticate": "Bearer"}, ) return payload # Global exception handler async def global_exception_handler(request, exc): """Global exception handler for unhandled errors.""" logger.error(f"Unhandled exception: {exc}", exc_info=True) return JSONResponse( status_code=500, content={ "success": False, "error": "Internal Server Error", "code": "INTERNAL_ERROR" } ) # Application lifespan @asynccontextmanager async def lifespan(app: FastAPI): """Manage application lifespan events.""" # Startup logger.info("Starting AniWorld FastAPI server...") logger.info(f"Anime directory: {settings.anime_directory}") logger.info(f"Log level: {settings.log_level}") # Verify configuration if not settings.master_password_hash and not settings.master_password: logger.warning("No master password configured! Set MASTER_PASSWORD_HASH or MASTER_PASSWORD environment variable.") yield # Shutdown logger.info("Shutting down AniWorld FastAPI server...") # Create FastAPI application app = FastAPI( title="AniWorld API", description=""" ## AniWorld Management System A comprehensive FastAPI-based application for managing anime series and episodes. ### Features * **Series Management**: Search, track, and manage anime series * **Episode Tracking**: Monitor missing episodes and download progress * **Authentication**: Secure master password authentication with JWT tokens * **Real-time Updates**: WebSocket support for live progress tracking * **File Management**: Automatic file scanning and organization * **Download Queue**: Queue-based download management system ### Authentication Most endpoints require authentication using a master password. Use the `/auth/login` endpoint to obtain a JWT token, then include it in the `Authorization` header as `Bearer `. ### API Versioning This API follows semantic versioning. 
Current version: **1.0.0** """, version="1.0.0", docs_url="/docs", redoc_url="/redoc", lifespan=lifespan, contact={ "name": "AniWorld API Support", "url": "https://github.com/your-repo/aniworld", "email": "support@aniworld.com", }, license_info={ "name": "MIT", "url": "https://opensource.org/licenses/MIT", }, tags_metadata=[ { "name": "Authentication", "description": "Operations related to user authentication and session management", }, { "name": "Anime", "description": "Operations for searching and managing anime series", }, { "name": "Episodes", "description": "Operations for managing individual episodes", }, { "name": "Downloads", "description": "Operations for managing the download queue and progress", }, { "name": "System", "description": "System health, configuration, and maintenance operations", }, { "name": "Files", "description": "File system operations and scanning functionality", }, ] ) # Configure templates templates = Jinja2Templates(directory="src/server/web/templates") # Mount static files app.mount("/static", StaticFiles(directory="src/server/web/static"), name="static") # Add CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], # Configure appropriately for production allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Add custom middleware - temporarily disabled # app.add_middleware(EnhancedLoggingMiddleware) # app.add_middleware(AuthMiddleware) # app.add_middleware(ValidationMiddleware) # Add global exception handler app.add_exception_handler(Exception, global_exception_handler) # Include API routers # from src.server.web.controllers.api.v1.anime import router as anime_router # app.include_router(anime_router) # Legacy API compatibility endpoints (TODO: migrate JavaScript to use v1 endpoints) @app.post("/api/add_series") async def legacy_add_series( request_data: Dict[str, Any], current_user: Dict = Depends(get_current_user) ): """Legacy endpoint for adding series - basic implementation.""" try: link = 
request_data.get('link', '') name = request_data.get('name', '') if not link or not name: return {"status": "error", "message": "Link and name are required"} return {"status": "success", "message": f"Series '{name}' added successfully"} except Exception as e: return {"status": "error", "message": f"Failed to add series: {str(e)}"} @app.post("/api/download") async def legacy_download( request_data: Dict[str, Any], current_user: Dict = Depends(get_current_user) ): """Legacy endpoint for downloading series - basic implementation.""" try: folders = request_data.get('folders', []) if not folders: return {"status": "error", "message": "No folders specified"} folder_count = len(folders) return {"status": "success", "message": f"Download started for {folder_count} series"} except Exception as e: return {"status": "error", "message": f"Failed to start download: {str(e)}"} # Authentication endpoints @app.post("/auth/login", response_model=LoginResponse, tags=["Authentication"]) async def login(request_data: LoginRequest, request: Request) -> LoginResponse: """ Authenticate with master password and receive JWT token. 
- **password**: The master password for the application """ try: if not verify_master_password(request_data.password): client_ip = getattr(request.client, 'host', 'unknown') if request.client else 'unknown' logger.warning(f"Failed login attempt from IP: {client_ip}") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid master password" ) token_data = generate_jwt_token() logger.info("Successful authentication") return LoginResponse( success=True, message="Authentication successful", token=token_data['token'], expires_at=token_data['expires_at'] ) except HTTPException: raise except Exception as e: logger.error(f"Login error: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Authentication service error" ) @app.get("/auth/verify", response_model=TokenVerifyResponse, tags=["Authentication"]) async def verify_token(current_user: Dict = Depends(get_current_user)) -> TokenVerifyResponse: """ Verify the validity of the current JWT token. Requires: Bearer token in Authorization header """ return TokenVerifyResponse( valid=True, message="Token is valid", user=current_user.get('user'), expires_at=datetime.fromtimestamp(current_user.get('exp', 0)) ) @app.post("/auth/logout", response_model=Dict[str, Any], tags=["Authentication"]) async def logout(current_user: Dict = Depends(get_current_user)) -> Dict[str, Any]: """ Logout endpoint (stateless - client should remove token). Requires: Bearer token in Authorization header """ return { "success": True, "message": "Logged out successfully. Please remove the token from client storage." } @app.get("/api/auth/status", response_model=Dict[str, Any], tags=["Authentication"]) async def auth_status(request: Request) -> Dict[str, Any]: """ Check authentication status and configuration. This endpoint checks if master password is configured and if user is authenticated. 
""" has_master_password = bool(settings.master_password_hash or settings.master_password) # Check if user has valid token authenticated = False try: auth_header = request.headers.get("authorization") if auth_header and auth_header.startswith("Bearer "): token = auth_header.split(" ")[1] payload = verify_jwt_token(token) authenticated = payload is not None except Exception: authenticated = False return { "has_master_password": has_master_password, "authenticated": authenticated } # Health check endpoint @app.get("/health", response_model=HealthResponse, tags=["System"]) async def health_check() -> HealthResponse: """ Application health check endpoint. """ return HealthResponse( status="healthy", timestamp=datetime.utcnow(), services={ "authentication": "online", "anime_service": "online", "episode_service": "online" } ) # Common browser requests that might cause "Invalid HTTP request received" warnings @app.get("/favicon.ico") async def favicon(): """Handle favicon requests from browsers.""" return JSONResponse(status_code=404, content={"detail": "Favicon not found"}) @app.get("/robots.txt") async def robots(): """Handle robots.txt requests.""" return JSONResponse(status_code=404, content={"detail": "Robots.txt not found"}) @app.get("/") async def root(): """Root endpoint redirect to docs.""" return {"message": "AniWorld API", "documentation": "/docs", "health": "/health"} # Web interface routes @app.get("/app", response_class=HTMLResponse) async def web_app(request: Request): """Serve the main web application.""" return templates.TemplateResponse("base/index.html", {"request": request}) @app.get("/login", response_class=HTMLResponse) async def login_page(request: Request): """Serve the login page.""" return templates.TemplateResponse("base/login.html", {"request": request}) @app.get("/setup", response_class=HTMLResponse) async def setup_page(request: Request): """Serve the setup page.""" return templates.TemplateResponse("base/setup.html", {"request": request}) 
@app.get("/queue", response_class=HTMLResponse)
async def queue_page(request: Request):
    """Serve the queue page."""
    return templates.TemplateResponse("base/queue.html", {"request": request})


# Anime endpoints (protected)
@app.get("/api/anime/search", response_model=List[AnimeResponse], tags=["Anime"])
async def search_anime(
    query: str,
    limit: int = 20,
    offset: int = 0,
    current_user: Dict = Depends(get_current_user)
) -> List[AnimeResponse]:
    """
    Search for anime by title.

    Requires: Bearer token in Authorization header

    - **query**: Search query string
    - **limit**: Maximum number of results (1-100)
    - **offset**: Number of results to skip for pagination
    """
    # Enforce the documented bounds (the same ones AnimeSearchRequest
    # declares); previously out-of-range values were silently accepted and a
    # negative offset produced ids such as "anime_-3".
    if not 1 <= limit <= 100:
        raise HTTPException(status_code=422, detail="limit must be between 1 and 100")
    if offset < 0:
        raise HTTPException(status_code=422, detail="offset must be greater than or equal to 0")

    # TODO: Implement actual anime search logic
    # This is a placeholder implementation
    logger.info("Searching anime with query: %s", query)

    # Mock data for now: a fixed catalogue "Sample Anime 1" .. "Sample Anime 99",
    # windowed by offset/limit and filtered by a case-insensitive substring match.
    needle = query.lower()
    first = offset + 1
    last_exclusive = min(offset + limit + 1, 100)
    return [
        AnimeResponse(
            id=f"anime_{i}",
            title=f"Sample Anime {i}",
            description=f"Description for anime {i}",
            episodes=24,
            status="Completed"
        )
        for i in range(first, last_exclusive)
        if needle in f"sample anime {i}"
    ]


@app.get("/api/anime/{anime_id}", response_model=AnimeResponse, tags=["Anime"])
async def get_anime(
    anime_id: str,
    current_user: Dict = Depends(get_current_user)
) -> AnimeResponse:
    """
    Get detailed information about a specific anime.

    Requires: Bearer token in Authorization header

    - **anime_id**: Unique identifier for the anime
    """
    # TODO: Implement actual anime retrieval logic
    logger.info("Fetching anime details for ID: %s", anime_id)

    # Mock data for now
    return AnimeResponse(
        id=anime_id,
        title=f"Anime {anime_id}",
        description=f"Detailed description for anime {anime_id}",
        episodes=24,
        status="Completed"
    )


@app.get("/api/anime/{anime_id}/episodes", response_model=List[EpisodeResponse], tags=["Episodes"])
async def get_anime_episodes(
    anime_id: str,
    current_user: Dict = Depends(get_current_user)
) -> List[EpisodeResponse]:
    """
    Get all episodes for a specific anime.

    Requires: Bearer token in Authorization header

    - **anime_id**: Unique identifier for the anime
    """
    # TODO: Implement actual episode retrieval logic
    logger.info("Fetching episodes for anime ID: %s", anime_id)

    # Mock data for now
    return [
        EpisodeResponse(
            id=f"{anime_id}_ep_{i}",
            anime_id=anime_id,
            episode_number=i,
            title=f"Episode {i}",
            description=f"Description for episode {i}",
            duration=1440  # 24 minutes in seconds
        )
        for i in range(1, 25)  # 24 episodes
    ]


@app.get("/api/episodes/{episode_id}", response_model=EpisodeResponse, tags=["Episodes"])
async def get_episode(
    episode_id: str,
    current_user: Dict = Depends(get_current_user)
) -> EpisodeResponse:
    """
    Get detailed information about a specific episode.

    Requires: Bearer token in Authorization header

    - **episode_id**: Unique identifier for the episode
    """
    # TODO: Implement actual episode retrieval logic
    logger.info("Fetching episode details for ID: %s", episode_id)

    # Mock data for now
    return EpisodeResponse(
        id=episode_id,
        anime_id="sample_anime",
        episode_number=1,
        title=f"Episode {episode_id}",
        description=f"Detailed description for episode {episode_id}",
        duration=1440
    )


# Database health check endpoint
@app.get("/api/system/database/health", response_model=Dict[str, Any], tags=["System"])
async def database_health(current_user: Dict = Depends(get_current_user)) -> Dict[str, Any]:
    """
    Check database connectivity and health.

    Requires: Bearer token in Authorization header
    """
    # Local import keeps this block independent of the file-level import list.
    from datetime import timezone

    # TODO: Implement actual database health check
    return {
        "status": "healthy",
        "connection_pool": "active",
        "response_time_ms": 15,
        # Timezone-aware UTC; datetime.utcnow() is deprecated and returns a
        # naive value that consumers can misread as local time.
        "last_check": datetime.now(timezone.utc).isoformat()
    }


# Configuration endpoint
@app.get("/api/system/config", response_model=Dict[str, Any], tags=["System"])
async def get_system_config(current_user: Dict = Depends(get_current_user)) -> Dict[str, Any]:
    """
    Get system configuration information.

    Requires: Bearer token in Authorization header
    """
    return {
        "anime_directory": settings.anime_directory,
        "log_level": settings.log_level,
        "token_expiry_hours": settings.token_expiry_hours,
        "version": "1.0.0"
    }


if __name__ == "__main__":
    import socket

    # Configure enhanced logging
    log_level = getattr(logging, settings.log_level.upper(), logging.INFO)
    logging.getLogger().setLevel(log_level)

    # Check if port is available
    def is_port_available(host: str, port: int) -> bool:
        """Check if a port is available on the given host."""
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.bind((host, port))
        except OSError:
            return False
        return True

    host = "127.0.0.1"
    port = 8000

    if not is_port_available(host, port):
        logger.error(
            "Port %s is already in use on %s. Please stop other services or choose a different port.",
            port, host,
        )
        # Use the configured port in the hint instead of a hard-coded 8000.
        logger.info("You can check which process is using the port with: netstat -ano | findstr :%s", port)
        sys.exit(1)

    logger.info("Starting AniWorld FastAPI server with uvicorn...")
    logger.info("Anime directory: %s", settings.anime_directory)
    logger.info("Log level: %s", settings.log_level)
    logger.info("Server will be available at http://%s:%s", host, port)
    logger.info("API documentation at http://%s:%s/docs", host, port)

    try:
        # Run the application. The app object is passed directly: an import
        # string is only needed for reload/workers, and it would re-import
        # this module (repeating its module-level setup) under an assumed
        # file name.
        uvicorn.run(
            app,
            host=host,
            port=port,
            reload=False,  # Disable reload to prevent constant restarting
            log_level=settings.log_level.lower()
        )
    except Exception as e:
        logger.error("Failed to start server: %s", e)
        sys.exit(1)