Files
Aniworld/src/server/fastapi_app.py
Lukas fed6162452 fix: load series from database on every startup
- Add _load_series_from_db call in lifespan startup
- Series now loaded into memory on every app start
- Fixes empty anime list issue (GET /api/anime)
2026-01-23 17:26:42 +01:00

469 lines
18 KiB
Python

"""
FastAPI application for Aniworld anime download manager.
This module provides the main FastAPI application with proper CORS
configuration, middleware setup, static file serving, and Jinja2 template
integration.
"""
import asyncio
from contextlib import asynccontextmanager
from pathlib import Path
import uvicorn
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from src.config.settings import settings
# Import core functionality
from src.infrastructure.logging import setup_logging
from src.server.api.anime import router as anime_router
from src.server.api.auth import router as auth_router
from src.server.api.config import router as config_router
from src.server.api.download import router as download_router
from src.server.api.health import router as health_router
from src.server.api.nfo import router as nfo_router
from src.server.api.scheduler import router as scheduler_router
from src.server.api.websocket import router as websocket_router
from src.server.controllers.error_controller import (
not_found_handler,
server_error_handler,
)
# Import controllers
from src.server.controllers.page_controller import router as page_router
from src.server.middleware.auth import AuthMiddleware
from src.server.middleware.error_handler import register_exception_handlers
from src.server.middleware.setup_redirect import SetupRedirectMiddleware
from src.server.services.anime_service import sync_series_from_data_files
from src.server.services.progress_service import get_progress_service
from src.server.services.websocket_service import get_websocket_service
# Prefer storing application-wide singletons on FastAPI.state instead of
# module-level globals. This makes testing and multi-instance hosting safer.
async def _check_incomplete_series_on_startup(background_loader) -> None:
    """Queue a background loading task for every series with missing data.

    A series counts as incomplete when its ``loading_status`` is not
    ``"completed"`` or when any of ``episodes_loaded``, ``has_nfo``,
    ``logo_loaded`` or ``images_loaded`` is falsy. Failures are logged and
    never propagated to the caller.

    Args:
        background_loader: Object exposing an awaitable
            ``add_series_loading_task(key=, folder=, name=, year=)``.
    """
    logger = setup_logging(log_level="INFO")

    try:
        from src.server.database.connection import get_db_session
        from src.server.database.service import AnimeSeriesService

        async with get_db_session() as session:
            # Errors raised while working with the session are handled inside
            # the context so they do not propagate through the session's exit.
            try:
                all_series = await AnimeSeriesService.get_all(session)

                # Status is checked first; the per-asset flags are only
                # consulted for series already marked "completed".
                incomplete_series = [
                    candidate
                    for candidate in all_series
                    if candidate.loading_status != "completed"
                    or not (
                        candidate.episodes_loaded
                        and candidate.has_nfo
                        and candidate.logo_loaded
                        and candidate.images_loaded
                    )
                ]

                if not incomplete_series:
                    logger.info("All series data is complete. No background loading needed.")
                    return

                logger.info(
                    f"Found {len(incomplete_series)} series with missing data. "
                    f"Queuing for background loading..."
                )
                for series in incomplete_series:
                    await background_loader.add_series_loading_task(
                        key=series.key,
                        folder=series.folder,
                        name=series.name,
                        year=series.year
                    )
                    logger.debug(
                        f"Queued background loading for series: {series.key}"
                    )
                logger.info("All incomplete series queued for background loading")
            except Exception as e:
                logger.error(f"Error checking incomplete series: {e}", exc_info=True)
    except Exception as e:
        # Covers import failures and errors entering/leaving the DB session.
        logger.error(f"Failed to check incomplete series on startup: {e}", exc_info=True)
def _sync_settings_from_config(logger) -> None:
    """Copy values stored in config.json onto the global ``settings`` object.

    Syncs ``anime_directory`` (from ``config.other``) and the NFO options
    (from ``config.nfo``). ``OSError``, ``ValueError`` and ``KeyError`` are
    logged as warnings and swallowed; any other exception propagates.

    Args:
        logger: Logger used for progress and warning messages.
    """
    try:
        from src.server.services.config_service import get_config_service

        config_service = get_config_service()
        config = config_service.load_config()
        logger.debug(
            "Config loaded: other=%s", config.other
        )

        # Sync anime_directory from config.json to settings
        # config.other is Dict[str, object] - pylint doesn't infer this
        other_settings = dict(config.other) if config.other else {}
        if other_settings.get("anime_directory"):
            anime_dir = other_settings["anime_directory"]
            settings.anime_directory = str(anime_dir)
            logger.info(
                "Loaded anime_directory from config: %s",
                settings.anime_directory
            )
        else:
            logger.debug(
                "anime_directory not found in config.other"
            )

        # Sync NFO settings from config.json to settings
        if config.nfo:
            # The API key is only overwritten when config provides one.
            if config.nfo.tmdb_api_key:
                settings.tmdb_api_key = config.nfo.tmdb_api_key
                logger.info("Loaded TMDB API key from config")
            settings.nfo_auto_create = config.nfo.auto_create
            settings.nfo_update_on_scan = config.nfo.update_on_scan
            settings.nfo_download_poster = config.nfo.download_poster
            settings.nfo_download_logo = config.nfo.download_logo
            settings.nfo_download_fanart = config.nfo.download_fanart
            settings.nfo_image_size = config.nfo.image_size
            logger.debug("Synced NFO settings from config")
    except (OSError, ValueError, KeyError) as e:
        logger.warning("Failed to load config from config.json: %s", e)


async def _initialize_services(logger, initialized, ws_service) -> None:
    """Initialize the anime, download and background loader services.

    Nothing is initialized when ``settings.anime_directory`` is empty.
    ``OSError``, ``RuntimeError`` and ``ValueError`` are logged as warnings
    and swallowed so startup can continue; other exceptions propagate.

    Args:
        logger: Logger used for progress and warning messages.
        initialized: Mutable dict of initialization flags; ``'services'``
            and ``'background_loader'`` are set to ``True`` on success.
        ws_service: WebSocket service handed to the background loader.
    """
    # Perform initial setup (series sync and marking as completed)
    # This is centralized in initialization_service and also called
    # from the setup endpoint
    from src.server.services.initialization_service import (
        perform_initial_setup,
        perform_media_scan_if_needed,
        perform_nfo_scan_if_needed,
    )

    try:
        logger.info(
            "Checking anime_directory setting: '%s'",
            settings.anime_directory
        )
        if not settings.anime_directory:
            logger.info(
                "Download service initialization skipped - "
                "anime directory not configured"
            )
            return

        # Perform initial setup if needed
        await perform_initial_setup()

        # Get anime service for later use
        from src.server.utils.dependencies import get_anime_service
        anime_service = get_anime_service()

        # Always load series from database into memory on startup
        logger.info("Loading series from database into memory...")
        await anime_service._load_series_from_db()
        logger.info("Series loaded from database into memory")

        # Run NFO scan only on first run (if configured)
        await perform_nfo_scan_if_needed()

        # Now initialize download service (will use data from database)
        from src.server.utils.dependencies import get_download_service
        download_service = get_download_service()
        await download_service.initialize()
        initialized['services'] = True
        logger.info("Download service initialized and queue restored")

        # Initialize background loader service
        from src.server.services.background_loader_service import (
            init_background_loader_service,
        )
        from src.server.utils.dependencies import get_series_app
        series_app_instance = get_series_app()
        background_loader = init_background_loader_service(
            websocket_service=ws_service,
            anime_service=anime_service,
            series_app=series_app_instance
        )
        await background_loader.start()
        initialized['background_loader'] = True
        logger.info("Background loader service started")

        # Run media scan only on first run
        await perform_media_scan_if_needed(background_loader)
    except (OSError, RuntimeError, ValueError) as e:
        logger.warning("Failed to initialize services: %s", e)
        # Continue startup - services can be initialized later


async def _shutdown_services(logger, initialized) -> None:
    """Run the ordered shutdown sequence within a shared 30 second budget.

    Every step is wrapped in its own try/except so that one failing step
    does not prevent the following steps from running. If the database was
    never initialized, all cleanup is skipped.

    Args:
        logger: Logger used for progress and error messages.
        initialized: Dict of initialization flags filled in during startup.
    """
    # Function-scope import: the module does not import ``time`` at file level.
    import time

    logger.info("FastAPI application shutting down (graceful shutdown initiated)")

    # Only cleanup what was successfully initialized
    if not initialized['database']:
        logger.info("Database was not initialized, skipping all cleanup")
        return

    # Total time allowed for all shutdown operations together
    shutdown_timeout = 30.0
    shutdown_start = time.monotonic()

    def remaining_time() -> float:
        """Return the seconds left in the shutdown budget (never negative)."""
        elapsed = time.monotonic() - shutdown_start
        return max(0.0, shutdown_timeout - elapsed)

    # 1. Stop background loader service (only if initialized)
    if initialized['background_loader']:
        try:
            from src.server.utils.dependencies import _background_loader_service
            if _background_loader_service is not None:
                logger.info("Stopping background loader service...")
                await asyncio.wait_for(
                    _background_loader_service.stop(),
                    timeout=min(10.0, remaining_time())
                )
                logger.info("Background loader service stopped")
        except asyncio.TimeoutError:
            logger.warning("Background loader service shutdown timed out")
        except Exception as e:  # pylint: disable=broad-exception-caught
            logger.error("Error stopping background loader service: %s", e, exc_info=True)

    # 2. Broadcast shutdown notification via WebSocket
    try:
        ws_service = get_websocket_service()
        logger.info("Broadcasting shutdown notification to WebSocket clients...")
        await asyncio.wait_for(
            ws_service.shutdown(timeout=min(5.0, remaining_time())),
            timeout=min(5.0, remaining_time())
        )
        logger.info("WebSocket shutdown complete")
    except asyncio.TimeoutError:
        logger.warning("WebSocket shutdown timed out")
    except Exception as e:  # pylint: disable=broad-exception-caught
        logger.error("Error during WebSocket shutdown: %s", e, exc_info=True)

    # 3. Shutdown download service and persist active downloads
    # NOTE(review): no stop/persist call is awaited here - only the two log
    # lines run, so "stopped successfully" is logged without the service
    # being asked to stop. Confirm the download service's shutdown API and
    # restore the awaited call (the TimeoutError handler suggests one
    # existed).
    try:
        from src.server.services.download_service import (  # noqa: E501
            _download_service_instance,
        )
        if _download_service_instance is not None:
            logger.info("Stopping download service...")
            logger.info("Download service stopped successfully")
    except asyncio.TimeoutError:
        logger.warning("Download service shutdown timed out")
    except Exception as e:  # pylint: disable=broad-exception-caught
        logger.error("Error stopping download service: %s", e, exc_info=True)

    # 4. Shutdown SeriesApp and cleanup thread pool
    try:
        from src.server.utils.dependencies import _series_app
        if _series_app is not None:
            logger.info("Shutting down SeriesApp thread pool...")
            _series_app.shutdown()
            logger.info("SeriesApp shutdown complete")
    except Exception as e:  # pylint: disable=broad-exception-caught
        logger.error("Error during SeriesApp shutdown: %s", e, exc_info=True)

    # 5. Cleanup progress service
    try:
        progress_service = get_progress_service()
        logger.info("Cleaning up progress service...")
        # Clear active progress tracking (event subscribers are not
        # removed here).
        progress_service._active_progress.clear()
        logger.info("Progress service cleanup complete")
    except Exception as e:  # pylint: disable=broad-exception-caught
        logger.error(
            "Error cleaning up progress service: %s", e, exc_info=True
        )

    # 6. Close database connections with WAL checkpoint
    try:
        from src.server.database.connection import close_db
        logger.info("Closing database connections...")
        await asyncio.wait_for(
            close_db(),
            timeout=min(10.0, remaining_time())
        )
        logger.info("Database connections closed")
    except asyncio.TimeoutError:
        logger.warning("Database shutdown timed out")
    except Exception as e:  # pylint: disable=broad-exception-caught
        logger.error("Error closing database: %s", e, exc_info=True)

    elapsed_total = time.monotonic() - shutdown_start
    logger.info(
        "FastAPI application shutdown complete (took %.2fs)",
        elapsed_total
    )


@asynccontextmanager
async def lifespan(_application: FastAPI):
    """Manage application lifespan (startup and shutdown).

    Startup initializes the database, syncs settings from config.json,
    wires progress events to WebSocket broadcasts and initializes the
    services. If startup fails, whatever was initialized is cleaned up and
    the error is raised *without* yielding, so the failure surfaces while
    the context manager is being entered instead of after the application
    has already been serving requests.

    Args:
        _application: The FastAPI application instance (unused but required
            by the lifespan protocol).

    Raises:
        Exception: The error that aborted startup, re-raised after cleanup.
    """
    # Setup logging first with INFO level
    logger = setup_logging(log_level="INFO")

    # Track successful initialization steps so shutdown only touches what
    # was actually started.
    initialized = {
        'database': False,
        'services': False,
        'background_loader': False
    }

    # Startup
    startup_error = None
    try:
        logger.info("Starting FastAPI application...")

        # Initialize database first (required for other services)
        try:
            from src.server.database.connection import init_db
            await init_db()
            initialized['database'] = True
            logger.info("Database initialized successfully")
        except Exception as e:
            logger.error("Failed to initialize database: %s", e, exc_info=True)
            raise  # Database is required; recorded as startup_error below

        # Load configuration from config.json and sync with settings
        _sync_settings_from_config(logger)

        # Initialize progress service with event subscription
        progress_service = get_progress_service()
        ws_service = get_websocket_service()

        async def progress_event_handler(event) -> None:
            """Handle progress events and broadcast via WebSocket.

            Args:
                event: ProgressEvent containing progress update data
            """
            message = {
                "type": event.event_type,
                "data": event.progress.to_dict(),
            }
            await ws_service.manager.broadcast_to_room(message, event.room)

        # Subscribe to progress events
        progress_service.subscribe("progress_updated", progress_event_handler)

        await _initialize_services(logger, initialized, ws_service)

        logger.info("FastAPI application started successfully")
        logger.info("Server running on http://127.0.0.1:8000")
        logger.info(
            "API documentation available at http://127.0.0.1:8000/api/docs"
        )
    except Exception as e:
        logger.error("Error during startup: %s", e, exc_info=True)
        startup_error = e
        # Not re-raised here: cleanup below runs first, then the error is
        # raised.

    # Hand control to the application only when startup succeeded. Yielding
    # after a failed startup would signal a successful startup to the
    # server, which would then serve requests with an uninitialized backend.
    if startup_error is None:
        try:
            yield
        except Exception as e:
            logger.error("Error during application runtime: %s", e, exc_info=True)

    # Shutdown - execute in proper order with timeout protection
    await _shutdown_services(logger, initialized)

    # Re-raise startup error if it occurred
    if startup_error is not None:
        raise startup_error
# Application object; startup and shutdown are driven by ``lifespan``.
app = FastAPI(
    lifespan=lifespan,
    title="Aniworld Download Manager",
    description="Modern web interface for Aniworld anime download management",
    version="1.0.0",
    docs_url="/api/docs",
    redoc_url="/api/redoc",
)

# CORS: use the configured origins, falling back to local development
# hosts when none are configured.
allowed_origins = settings.allowed_origins
if not allowed_origins:
    allowed_origins = [
        "http://localhost:3000",
        "http://localhost:8000",
    ]

app.add_middleware(
    CORSMiddleware,
    allow_origins=allowed_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Static assets are served from the "web/static" directory next to this file.
STATIC_DIR = Path(__file__).parent.joinpath("web", "static")
app.mount(
    "/static",
    StaticFiles(directory=str(STATIC_DIR)),
    name="static",
)

# Setup redirect middleware, then authentication middleware (token parsing
# + simple rate limiter).
# NOTE(review): Starlette wraps middleware in reverse registration order, so
# AuthMiddleware (added last) appears to be the outermost layer - confirm
# that the setup redirect really runs before auth checks as intended.
app.add_middleware(SetupRedirectMiddleware)
app.add_middleware(AuthMiddleware, rate_limit_per_minute=5)

# Routers are registered in this exact order.
for _router in (
    health_router,
    page_router,
    auth_router,
    config_router,
    scheduler_router,
    anime_router,
    download_router,
    nfo_router,
    websocket_router,
):
    app.include_router(_router)

# Project-wide exception handlers
register_exception_handlers(app)
@app.exception_handler(404)
async def handle_not_found(request: Request, exc: HTTPException):
    """Delegate 404 errors to the error controller's ``not_found_handler``.

    Args:
        request: The incoming request that triggered the error.
        exc: The HTTP exception being handled.

    Returns:
        Whatever ``not_found_handler`` returns for this request.
    """
    response = await not_found_handler(request, exc)
    return response
@app.exception_handler(500)
async def handle_server_error(request: Request, exc: Exception):
    """Delegate 500 errors to the error controller's ``server_error_handler``.

    Args:
        request: The incoming request that triggered the error.
        exc: The exception being handled.

    Returns:
        Whatever ``server_error_handler`` returns for this request.
    """
    response = await server_error_handler(request, exc)
    return response
if __name__ == "__main__":
    # Direct-execution entry point: serve the app on localhost:8000 with
    # auto-reload enabled. The app is passed as an import string.
    _server_options = {
        "host": "127.0.0.1",
        "port": 8000,
        "reload": True,
        "log_level": "info",
    }
    uvicorn.run("fastapi_app:app", **_server_options)