Aniworld/infrastructure.md
Lukas 94de91ffa0 feat: implement WebSocket real-time progress updates
2025-10-17 11:12:06 +02:00

Aniworld Web Application Infrastructure

conda activate AniWorld

Project Structure

/home/lukas/Volume/repo/Aniworld/
├── src/
│   ├── server/                 # FastAPI web application
│   │   ├── fastapi_app.py     # Main FastAPI application (simplified)
│   │   ├── main.py            # FastAPI application entry point
│   │   ├── controllers/       # Route controllers
│   │   │   ├── __init__.py    # Controllers package
│   │   │   ├── health_controller.py  # Health check endpoints
│   │   │   ├── page_controller.py    # HTML page routes
│   │   │   └── error_controller.py   # Error handling controllers
│   │   ├── api/               # API route handlers
│   │   │   ├── __init__.py
│   │   │   ├── auth.py        # Authentication endpoints
│   │   │   ├── config.py      # Configuration endpoints
│   │   │   ├── anime.py       # Anime management endpoints
│   │   │   ├── download.py    # Download queue endpoints
│   │   │   ├── websocket.py   # WebSocket real-time endpoints
│   │   │   └── search.py      # Search endpoints
│   │   ├── models/            # Pydantic models
│   │   │   ├── __init__.py
│   │   │   ├── auth.py
│   │   │   ├── config.py
│   │   │   ├── anime.py
│   │   │   ├── download.py
│   │   │   └── websocket.py   # WebSocket message models
│   │   ├── services/          # Business logic services
│   │   │   ├── __init__.py
│   │   │   ├── auth_service.py
│   │   │   ├── config_service.py
│   │   │   ├── anime_service.py
│   │   │   ├── download_service.py
│   │   │   └── websocket_service.py  # WebSocket connection management
│   │   ├── utils/             # Utility functions
│   │   │   ├── __init__.py
│   │   │   ├── security.py
│   │   │   ├── dependencies.py  # Dependency injection
│   │   │   └── templates.py     # Shared Jinja2 template config
│   │   └── web/               # Frontend assets
│   │       ├── templates/     # Jinja2 HTML templates
│   │       │   ├── base.html
│   │       │   ├── login.html
│   │       │   ├── setup.html
│   │       │   ├── config.html
│   │       │   ├── anime.html
│   │       │   ├── download.html
│   │       │   └── search.html
│   │       └── static/        # Static web assets
│   │           ├── css/
│   │           ├── js/
│   │           └── images/
│   ├── core/                  # Existing core functionality
│   └── cli/                   # Existing CLI application
├── data/                      # Application data storage
│   ├── config.json           # Application configuration
│   ├── anime_library.db      # SQLite database for anime library
│   ├── download_queue.json   # Download queue state
│   └── cache/                # Temporary cache files
├── logs/                     # Application logs
│   ├── app.log              # Main application log
│   ├── download.log         # Download-specific logs
│   └── error.log            # Error logs
├── requirements.txt         # Python dependencies
├── docker-compose.yml       # Docker deployment configuration
└── README.md

Technology Stack

Backend

  • FastAPI: Modern Python web framework for building APIs
  • Uvicorn: ASGI server for running FastAPI applications
  • SQLite: Lightweight database for storing anime library and configuration
  • Pydantic: Data validation and serialization
  • Jinja2: Template engine for server-side rendering

Frontend

  • HTML5/CSS3: Core web technologies
  • JavaScript (Vanilla): Client-side interactivity
  • Bootstrap 5: CSS framework for responsive design
  • HTMX: HTML-attribute-driven requests for partial page updates without a full JavaScript framework

Security

  • Passlib: Password hashing and verification
  • python-jose: JWT token handling
  • bcrypt: Secure password hashing

Authentication Models & Sessions

  • Authentication request/response Pydantic models live in src/server/models/auth.py.
  • Sessions are represented by SessionModel and can be backed by an in-memory store or a persistent table depending on deployment needs. JWTs are used for stateless authentication by default; a persistent session store may be configured in production to enable revocation and long-lived sessions.
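
To illustrate why a session store enables revocation, here is a minimal sketch of an in-memory store. It uses a dataclass as a stand-in for the Pydantic SessionModel to stay dependency-free; the field names, the `InMemorySessionStore` class, and the 24-hour default lifetime are assumptions, not the project's actual code.

```python
from __future__ import annotations

import secrets
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class SessionModel:
    """Stand-in for the real Pydantic model; field names are assumptions."""
    session_id: str
    created_at: datetime
    expires_at: datetime

    def is_expired(self, now: datetime | None = None) -> bool:
        return (now or datetime.now(timezone.utc)) >= self.expires_at


class InMemorySessionStore:
    """Hypothetical store: keeps sessions in a dict so they can be revoked early."""

    def __init__(self, ttl: timedelta = timedelta(hours=24)) -> None:
        self._ttl = ttl
        self._sessions: dict[str, SessionModel] = {}

    def create(self, now: datetime | None = None) -> SessionModel:
        now = now or datetime.now(timezone.utc)
        session = SessionModel(secrets.token_urlsafe(32), now, now + self._ttl)
        self._sessions[session.session_id] = session
        return session

    def get(self, session_id: str, now: datetime | None = None) -> SessionModel | None:
        session = self._sessions.get(session_id)
        if session is None or session.is_expired(now):
            # Unknown or expired sessions are dropped and treated as absent.
            self._sessions.pop(session_id, None)
            return None
        return session

    def revoke(self, session_id: str) -> bool:
        """Return True if a session was actually removed."""
        return self._sessions.pop(session_id, None) is not None
```

A purely stateless JWT cannot be revoked before it expires; a lookup such as `get()` on every request is what makes logout take effect immediately.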

Configuration

Data Storage

  • Configuration: JSON files in data/ directory
  • Anime Library: SQLite database with series information
  • Download Queue: JSON file with current download status
  • Logs: Structured logging to files in logs/ directory

API Endpoints

Authentication

  • POST /api/auth/login - Master password authentication
  • POST /api/auth/logout - Logout and invalidate session
  • GET /api/auth/status - Check authentication status

Configuration

  • GET /api/config - Get current configuration
  • PUT /api/config - Update configuration
  • POST /api/setup - Initial setup

Configuration API Notes

  • The configuration endpoints are exposed under /api/config and operate primarily on a JSON-serializable AppConfig model. They are designed to be lightweight and avoid performing IO during validation (the /api/config/validate endpoint runs in-memory checks only).
  • Persistence of configuration changes is intentionally "best-effort" for now and mirrors fields into the runtime settings object. A follow-up task should add durable storage (file or DB) for configs.
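
The in-memory validation described above could look roughly like the following sketch. The `AppConfig` fields shown here and the `validate_config` helper are hypothetical; the real model lives in src/server/models/config.py and may differ.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class AppConfig:
    """Hypothetical subset of configuration fields, for illustration only."""
    anime_directory: str = ""
    max_concurrent_downloads: int = 2
    log_level: str = "INFO"


def validate_config(config: AppConfig) -> list[str]:
    """Return a list of problems. Performs no file or network IO."""
    errors: list[str] = []
    if not config.anime_directory.strip():
        errors.append("anime_directory must not be empty")
    if config.max_concurrent_downloads < 1:
        errors.append("max_concurrent_downloads must be at least 1")
    if config.log_level not in {"DEBUG", "INFO", "WARNING", "ERROR"}:
        errors.append(f"unknown log_level: {config.log_level}")
    return errors
```

Note that the sketch deliberately does not check whether `anime_directory` exists on disk, since that would be an IO operation.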

Anime Management

  • GET /api/anime - List anime with missing episodes
  • POST /api/anime/{id}/download - Add episodes to download queue
  • GET /api/anime/{id} - Get anime details

Note: The anime management API has been implemented under /api/v1/anime with endpoints for listing series with missing episodes, searching providers, triggering a local rescan, and fetching series details. The implementation delegates to the existing core SeriesApp and uses dependency injection for initialization.

Download Management

  • GET /api/queue/status - Get download queue status and statistics
  • POST /api/queue/add - Add episodes to download queue
  • DELETE /api/queue/{id} - Remove item from queue
  • DELETE /api/queue/ - Remove multiple items from queue
  • POST /api/queue/start - Start download queue processing
  • POST /api/queue/stop - Stop download queue processing
  • POST /api/queue/pause - Pause queue processing
  • POST /api/queue/resume - Resume queue processing
  • POST /api/queue/reorder - Reorder pending queue items
  • DELETE /api/queue/completed - Clear completed downloads
  • POST /api/queue/retry - Retry failed downloads

Search

  • GET /api/search?q={query} - Search for anime
  • POST /api/search/add - Add anime to library

Logging

Log Levels

  • INFO: General application information
  • WARNING: Potential issues that don't stop execution
  • ERROR: Errors that affect functionality
  • DEBUG: Detailed debugging information (development only)

Log Files

  • app.log: General application logs
  • download.log: Download-specific operations
  • error.log: Error and exception logs

Security Considerations

  • Master password protection for application access
  • Secure session management with JWT tokens
  • Input validation and sanitization
  • Rate limiting on API endpoints
  • HTTPS enforcement in production
  • Secure file path handling to prevent directory traversal

Authentication Service

  • A lightweight authentication service is provided by src/server/services/auth_service.py.
  • Uses bcrypt (passlib) to hash the master password and issues JWTs for stateless sessions. Tokens are signed with the JWT_SECRET_KEY from configuration and expire based on SESSION_TIMEOUT_HOURS.
  • Failed login attempts are tracked in-memory and a temporary lockout is applied after multiple failures. For multi-process deployments, move this state to a shared store (Redis) and persist the master password hash in a secure config store.
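
A minimal sketch of in-memory failure tracking with a temporary lockout follows. The class name, the threshold of 5 attempts, and the 300-second lockout are assumptions chosen for illustration; the actual values in auth_service.py are not stated here.

```python
from __future__ import annotations

import time
from typing import Callable


class LoginAttemptTracker:
    """Hypothetical helper: counts failures per key and locks out temporarily."""

    def __init__(
        self,
        max_attempts: int = 5,          # assumed threshold
        lockout_seconds: float = 300.0,  # assumed lockout duration
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._max_attempts = max_attempts
        self._lockout_seconds = lockout_seconds
        self._clock = clock
        self._failures: dict[str, int] = {}
        self._locked_until: dict[str, float] = {}

    def is_locked(self, key: str) -> bool:
        until = self._locked_until.get(key)
        if until is None:
            return False
        if self._clock() >= until:
            # Lockout expired: forget the lock and the failure count.
            del self._locked_until[key]
            self._failures.pop(key, None)
            return False
        return True

    def record_failure(self, key: str) -> None:
        count = self._failures.get(key, 0) + 1
        self._failures[key] = count
        if count >= self._max_attempts:
            self._locked_until[key] = self._clock() + self._lockout_seconds

    def record_success(self, key: str) -> None:
        self._failures.pop(key, None)
        self._locked_until.pop(key, None)
```

Because both dictionaries live in process memory, each worker process would keep its own counts, which is why a shared store is recommended for multi-process deployments.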

Recent Infrastructure Changes

Route Controller Refactoring (October 2025)

Restructured the FastAPI application to use a controller-based architecture for better code organization and maintainability.

Changes Made

  1. Created Controller Structure:

    • src/server/controllers/ - New directory for route controllers
    • src/server/controllers/__init__.py - Controllers package initialization
    • src/server/controllers/health_controller.py - Health check endpoints
    • src/server/controllers/page_controller.py - HTML page routes
    • src/server/controllers/error_controller.py - Error handling controllers
  2. Shared Template Configuration:

    • src/server/utils/templates.py - Centralized Jinja2 template configuration
    • Fixed template path resolution for proper template loading
  3. Main Application Updates:

    • src/server/fastapi_app.py - Refactored to use controller routers
    • Removed direct route definitions from main file
    • Added router inclusion using app.include_router()
    • Simplified error handlers to delegate to controller functions
  4. Fixed Import Issues:

    • Resolved circular import in src/core/__init__.py
    • Removed non-existent application module import

Controller Architecture

Anime Service Notes

  • The new anime_service runs the existing blocking SeriesApp inside a threadpool (via ThreadPoolExecutor). This keeps the FastAPI event loop responsive while leveraging the existing core logic.
  • A small in-process LRU cache is used for the frequently-read "missing episodes" list to reduce IO; cache invalidation happens after a rescan.
  • For multi-worker or multi-host deployments, move cache/state to a shared store (Redis) and ensure the threadpool sizing matches the worker's CPU and IO profile.
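
The threadpool-plus-cache approach in the notes above can be sketched as follows. `BlockingSeriesApp` is a hypothetical stand-in for the core SeriesApp, and the method names and pool size are assumptions; only the general pattern (blocking call offloaded through an executor, cached result cleared on rescan) is taken from the notes.

```python
from __future__ import annotations

import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache


class BlockingSeriesApp:
    """Hypothetical stand-in for the blocking core SeriesApp."""

    def __init__(self) -> None:
        self.scan_count = 0

    def list_missing(self) -> list[str]:
        self.scan_count += 1  # pretend this walks the library on disk
        return ["Example Series S01E03"]


class AnimeServiceSketch:
    def __init__(self, app: BlockingSeriesApp, max_workers: int = 4) -> None:
        self._app = app
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        # Cache the single "missing episodes" result until the next rescan.
        self._cached_missing = lru_cache(maxsize=1)(app.list_missing)

    async def list_missing(self) -> list[str]:
        loop = asyncio.get_running_loop()
        # The blocking call runs in a worker thread, not on the event loop.
        return await loop.run_in_executor(self._executor, self._cached_missing)

    async def rescan(self) -> None:
        # Invalidate the cache so the next read hits the core logic again.
        self._cached_missing.cache_clear()

    def close(self) -> None:
        self._executor.shutdown(wait=True)
```

In this sketch `rescan()` only invalidates the cache; a real rescan would also trigger the core scan inside the executor.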

Health Controller (health_controller.py):

router = APIRouter(prefix="/health", tags=["health"])
@router.get("")  # Health check endpoint

Page Controller (page_controller.py):

router = APIRouter(tags=["pages"])
@router.get("/")       # Main application page
@router.get("/setup")  # Setup page
@router.get("/login")  # Login page
@router.get("/queue")  # Download queue page

Error Controller (error_controller.py):

async def not_found_handler()     # Custom 404 error handling
async def server_error_handler()  # Custom 500 error handling

Benefits of the New Structure

  • Separation of Concerns: Each controller handles specific functionality
  • Modularity: Easy to add new controllers and routes
  • Testability: Controllers can be tested independently
  • Maintainability: Cleaner code organization and easier debugging
  • Scalability: Simple to extend with new features

Verified Working Endpoints

All endpoints tested and confirmed working:

  • Health: /health → Returns {"status": "healthy", ...}
  • Root: / → Serves main application page
  • Setup: /setup → Serves setup page
  • Auth API: /api/auth/* → Endpoints for setup, login, logout and status (JWT-based)
  • Login: /login → Serves login page
  • Queue: /queue → Serves download queue page

File Structure After Refactoring

src/server/
├── fastapi_app.py                 # Main FastAPI application (simplified)
├── controllers/                   # NEW: Route controllers
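│   ├── __init__.py               # Controllers package
│   ├── health_controller.py      # Health check endpoints
│   ├── page_controller.py        # HTML page routes
│   └── error_controller.py       # Error handling controllers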
├── utils/
│   ├── dependencies.py          # Existing dependency injection
│   └── templates.py              # NEW: Shared Jinja2 template config
└── web/                          # Existing frontend assets
    ├── templates/                # HTML templates
    └── static/                   # CSS, JS, images

Authentication Middleware (October 2025)

An authentication middleware component was added to the FastAPI application to centralize token parsing and provide lightweight protection of authentication endpoints:

  • src/server/middleware/auth.py implements:
    • Bearer JWT parsing and session attachment to request.state.session
    • A simple per-IP in-memory rate limiter applied to /api/auth/login and /api/auth/setup (default 5 requests/minute)

Notes:

  • This is intentionally simple and designed for single-process deployments. For production use across multiple workers or hosts, replace the in-memory limiter with a distributed store (e.g. Redis) and add a persistent token revocation list if needed.
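
A per-IP in-memory limiter of the kind described above can be sketched with a sliding window. The class and method names are hypothetical; the default of 5 requests per 60 seconds mirrors the documented default, and the injectable clock exists only to make the sketch testable.

```python
from __future__ import annotations

import time
from collections import defaultdict, deque
from typing import Callable


class InMemoryRateLimiter:
    """Hypothetical sliding-window limiter keyed by client IP."""

    def __init__(
        self,
        limit: int = 5,
        window_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._limit = limit
        self._window = window_seconds
        self._clock = clock
        self._hits: defaultdict[str, deque[float]] = defaultdict(deque)

    def allow(self, client_ip: str) -> bool:
        now = self._clock()
        hits = self._hits[client_ip]
        # Drop timestamps that have fallen out of the window.
        while hits and now - hits[0] >= self._window:
            hits.popleft()
        if len(hits) >= self._limit:
            return False
        hits.append(now)
        return True
```

A middleware would call `allow(request_ip)` for /api/auth/login and /api/auth/setup and respond with 429 Too Many Requests when it returns False. The state is per process, so several workers would each grant their own quota.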

API Models and Contracts

  • Pydantic models living in src/server/models/ define the canonical API contracts used by FastAPI endpoints. These models are intentionally lightweight and focused on serialization, validation, and OpenAPI documentation generation.
  • Keep models stable: changes to model shapes are breaking changes for clients. Bump API versioning or provide migration layers when altering public response fields.
  • Infrastructure considerations: ensure the deployment environment has required libraries (e.g., pydantic) installed and that schema validation errors are logged to the centralized logging system. For high-throughput routes, consider response model caching at the application or reverse-proxy layer.

WebSocket Real-time Communication (October 2025)

A comprehensive WebSocket infrastructure was implemented to provide real-time updates for downloads, queue status, and system events:

  • File: src/server/services/websocket_service.py
  • Models: src/server/models/websocket.py
  • Endpoint: ws://host:port/ws/connect

WebSocket Service Architecture

  • ConnectionManager: Low-level connection lifecycle management

    • Connection registry with unique connection IDs
    • Room-based messaging for topic subscriptions
    • Automatic connection cleanup and health monitoring
    • Coroutine-safe operations with asyncio locks (asyncio.Lock serializes tasks on one event loop; it is not thread-safe)
  • WebSocketService: High-level application messaging

    • Convenient interface for broadcasting application events
    • Pre-defined message types for downloads, queue, and system events
    • Singleton pattern via get_websocket_service() factory
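
The connection registry and room-based messaging can be sketched as below. This is not the project's ConnectionManager: the class name and method signatures are assumptions, and each connection is represented by an async `send` callable instead of a real WebSocket so the example runs without a server.

```python
from __future__ import annotations

import asyncio
import uuid
from collections import defaultdict
from typing import Awaitable, Callable

SendFn = Callable[[dict], Awaitable[None]]


class ConnectionManagerSketch:
    def __init__(self) -> None:
        self._connections: dict[str, SendFn] = {}
        self._rooms: defaultdict[str, set[str]] = defaultdict(set)
        self._lock = asyncio.Lock()  # guards the registry between coroutines

    async def connect(self, send: SendFn) -> str:
        connection_id = uuid.uuid4().hex
        async with self._lock:
            self._connections[connection_id] = send
        return connection_id

    async def disconnect(self, connection_id: str) -> None:
        async with self._lock:
            self._connections.pop(connection_id, None)
            for members in self._rooms.values():
                members.discard(connection_id)

    async def join_room(self, connection_id: str, room: str) -> None:
        async with self._lock:
            if connection_id in self._connections:
                self._rooms[room].add(connection_id)

    async def broadcast_to_room(self, room: str, message: dict) -> int:
        """Send to every member of the room; return the number delivered."""
        async with self._lock:
            targets = [
                (cid, self._connections[cid])
                for cid in self._rooms.get(room, ())
                if cid in self._connections
            ]
        delivered = 0
        for cid, send in targets:
            try:
                await send(message)
                delivered += 1
            except Exception:
                # A failed send is treated as a dead connection and cleaned up.
                await self.disconnect(cid)
        return delivered
```

The lock is released before sending so that one slow client does not block registry updates for everyone else.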

Supported Message Types

  • Download Events: download_progress, download_complete, download_failed
  • Queue Events: queue_status, queue_started, queue_stopped, queue_paused, queue_resumed
  • System Events: system_info, system_warning, system_error
  • Connection: connected, ping, pong, error

Room-Based Messaging

Clients can subscribe to specific topics (rooms) to receive targeted updates:

  • downloads room: All download-related events
  • Custom rooms: Can be added for specific features

Integration with Download Service

  • Download service automatically broadcasts progress updates via WebSocket
  • Broadcast callback registered during service initialization
  • Updates sent to all clients subscribed to the downloads room
  • No blocking of download operations (async broadcast)

Client Connection Flow

  1. Client connects to /ws/connect endpoint
  2. Server assigns unique connection ID and sends confirmation
  3. Client joins rooms (e.g., {"action": "join", "room": "downloads"})
  4. Server broadcasts updates to subscribed rooms
  5. Client disconnects (automatic cleanup)
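
Step 3 of the flow above implies a small server-side dispatcher for client messages. The following sketch handles the documented `join` action; the `ping` action name, the reply payloads, and the `handle_client_message` function itself are assumptions for illustration.

```python
from __future__ import annotations

import json


def handle_client_message(raw: str, rooms: set[str]) -> dict:
    """Parse one client message, update the connection's rooms, return a reply."""
    try:
        message = json.loads(raw)
    except json.JSONDecodeError:
        return {"type": "error", "message": "invalid JSON"}
    if not isinstance(message, dict):
        return {"type": "error", "message": "expected a JSON object"}

    action = message.get("action")
    if action == "join":
        room = message.get("room")
        if not isinstance(room, str) or not room:
            return {"type": "error", "message": "room is required"}
        rooms.add(room)
        return {"type": "system_info", "message": f"joined {room}"}
    if action == "ping":  # assumed action name; only the message types are documented
        return {"type": "pong"}
    return {"type": "error", "message": f"unknown action: {action!r}"}
```

Returning an `error` message instead of raising keeps a malformed client message from tearing down the connection.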

Infrastructure Notes

  • Single-process: Current implementation uses in-memory connection storage
  • Production: For multi-worker/multi-host deployments:
    • Move connection registry to Redis or similar shared store
    • Implement pub/sub for cross-process message broadcasting
    • Add connection persistence for recovery after restarts
  • Monitoring: WebSocket status available at /ws/status endpoint
  • Security: Optional authentication via JWT (user_id tracking)
  • Testing: Comprehensive unit tests in tests/unit/test_websocket_service.py

Download Queue Models

  • Download queue models in src/server/models/download.py define the data structures for the download queue system.
  • Key models include:
    • DownloadItem: Represents a single queued download with metadata, progress tracking, and error information
    • QueueStatus: Overall queue state with active, pending, completed, and failed downloads
    • QueueStats: Aggregated statistics for monitoring queue performance
    • DownloadProgress: Real-time progress information (percent, speed, ETA)
    • DownloadRequest/DownloadResponse: API request/response contracts
  • Models enforce validation constraints (e.g., positive episode numbers, progress percentage 0-100, non-negative retry counts) and provide clean JSON serialization for API endpoints and WebSocket updates.
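
The validation constraints listed above can be illustrated with plain dataclasses. The real models are Pydantic classes in src/server/models/download.py; the class names with the `Sketch` suffix and the exact field names below are assumptions.

```python
from __future__ import annotations

import json
from dataclasses import asdict, dataclass, field


@dataclass
class DownloadProgressSketch:
    percent: float = 0.0
    speed_mbps: float | None = None
    eta_seconds: int | None = None

    def __post_init__(self) -> None:
        if not 0.0 <= self.percent <= 100.0:
            raise ValueError("percent must be between 0 and 100")


@dataclass
class DownloadItemSketch:
    item_id: str
    season: int
    episode: int
    retry_count: int = 0
    progress: DownloadProgressSketch = field(default_factory=DownloadProgressSketch)

    def __post_init__(self) -> None:
        if self.episode < 1:
            raise ValueError("episode must be a positive number")
        if self.retry_count < 0:
            raise ValueError("retry_count must not be negative")

    def to_json(self) -> str:
        # asdict() recurses into the nested progress dataclass.
        return json.dumps(asdict(self))
```

With Pydantic, the same constraints would typically be declared on the fields rather than checked by hand, and serialization comes with the model.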

Download Queue Service

  • The download service (src/server/services/download_service.py) manages the complete lifecycle of anime episode downloads.
  • Core features:
    • Priority-based Queue: Items added with HIGH priority are processed first; NORMAL and LOW items follow in FIFO order
    • Concurrent Processing: Configurable max concurrent downloads (default 2) to optimize bandwidth usage
    • Persistence: Queue state is automatically saved to data/download_queue.json and recovered on service restart
    • Retry Logic: Failed downloads are automatically retried up to a configurable limit (default 3 attempts) with exponential backoff
    • Progress Tracking: Real-time download progress with speed, percentage, and ETA calculations
    • WebSocket Integration: Broadcasts queue updates, progress, and completion/failure events to connected clients
  • Operations:
    • add_to_queue(): Add episodes to download queue with priority
    • remove_from_queue(): Cancel pending or active downloads
    • reorder_queue(): Manually adjust queue order for pending items
    • pause_queue()/resume_queue(): Control download processing
    • retry_failed(): Retry failed downloads with retry count checks
    • get_queue_status(): Get complete queue state (active, pending, completed, failed)
    • get_queue_stats(): Get aggregated statistics (counts, download size, speed)
  • Infrastructure notes:
    • Service uses ThreadPoolExecutor for concurrent download processing
    • Queue processor runs as async background task with configurable sleep intervals
    • Progress callbacks are executed in threadpool and broadcast via async WebSocket
    • For multi-process deployments, move queue state to shared store (Redis/DB) and implement distributed locking for concurrent access control
    • Singleton instance pattern used via get_download_service() factory
  • Testing: Comprehensive unit tests in tests/unit/test_download_service.py cover queue operations, persistence, retry logic, and error handling
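
Two of the core features, priority ordering and exponential backoff, can be sketched in a few lines. This is one plausible reading of the behavior described above, not the service's code: HIGH items are assumed to keep FIFO order among themselves, and the 1-second base delay and 60-second cap are invented for illustration.

```python
from __future__ import annotations

from collections import deque
from enum import Enum


class Priority(Enum):
    HIGH = "high"
    NORMAL = "normal"
    LOW = "low"


class PendingQueueSketch:
    """HIGH items are served first; NORMAL and LOW share one FIFO queue."""

    def __init__(self) -> None:
        self._high: deque[str] = deque()
        self._rest: deque[str] = deque()

    def add(self, item_id: str, priority: Priority = Priority.NORMAL) -> None:
        target = self._high if priority is Priority.HIGH else self._rest
        target.append(item_id)

    def pop_next(self) -> str | None:
        if self._high:
            return self._high.popleft()
        if self._rest:
            return self._rest.popleft()
        return None


def retry_delay(attempt: int, base_seconds: float = 1.0, max_seconds: float = 60.0) -> float:
    """Delay before retry number `attempt` (1-based): base * 2^(attempt-1), capped."""
    if attempt < 1:
        raise ValueError("attempt must be at least 1")
    return min(max_seconds, base_seconds * 2 ** (attempt - 1))
```

With the documented default of 3 attempts and the assumed 1-second base, the waits would be 1, 2, and 4 seconds.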

Download Queue API Endpoints (October 2025)

Implemented comprehensive REST API endpoints for download queue management:

  • File: src/server/api/download.py
  • Router Prefix: /api/queue
  • Authentication: All endpoints require JWT authentication via require_auth dependency

Implemented Endpoints

  1. GET /api/queue/status - Retrieve complete queue status

    • Returns: QueueStatusResponse with status and statistics
    • Includes: active downloads, pending items, completed/failed items, queue stats
  2. POST /api/queue/add - Add episodes to download queue

    • Request: DownloadRequest with series info, episodes, and priority
    • Returns: DownloadResponse with added item IDs
    • Validates episode list is non-empty
    • Supports HIGH, NORMAL, and LOW priority levels
  3. DELETE /api/queue/{item_id} - Remove single item from queue

    • Returns: 204 No Content on success, 404 if item not found
    • Cancels active downloads if necessary
  4. DELETE /api/queue/ - Remove multiple items (batch operation)

    • Request: QueueOperationRequest with list of item IDs
    • Returns: 204 No Content, even when only some of the listed items could be removed
  5. POST /api/queue/start - Start queue processor

    • Idempotent operation (safe to call multiple times)
  6. POST /api/queue/stop - Stop queue processor

    • Waits for active downloads to complete (with timeout)
  7. POST /api/queue/pause - Pause queue processing

    • Active downloads continue, no new downloads start
  8. POST /api/queue/resume - Resume queue processing

  9. POST /api/queue/reorder - Reorder pending queue item

    • Request: QueueReorderRequest with item_id and new_position
    • Returns: 404 if item not in pending queue
  10. DELETE /api/queue/completed - Clear completed items from history

    • Returns count of cleared items
  11. POST /api/queue/retry - Retry failed downloads

    • Request: QueueOperationRequest with item IDs (empty for all)
    • Only retries items under max retry limit
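
The semantics of the reorder endpoint (item 9) can be sketched as a pure function over the pending list. The function name is hypothetical, and clamping an out-of-range `new_position` to the end of the list is an assumption; the endpoint may instead reject such values.

```python
from __future__ import annotations


def reorder_pending(pending: list[str], item_id: str, new_position: int) -> list[str]:
    """Return a new pending list with item_id moved to new_position (0-based)."""
    if item_id not in pending:
        # The API layer would translate this into a 404 response.
        raise LookupError(f"item {item_id!r} is not in the pending queue")
    remaining = [i for i in pending if i != item_id]
    position = max(0, min(new_position, len(remaining)))  # assumed clamping
    remaining.insert(position, item_id)
    return remaining
```

Returning a new list instead of mutating in place makes it easier to swap the queue atomically under a lock.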

Dependencies

  • get_download_service: Factory function providing singleton DownloadService instance
    • Automatically initializes AnimeService as dependency
    • Raises 503 if anime directory not configured
  • get_anime_service: Factory function providing singleton AnimeService instance
    • Required by DownloadService for anime operations
  • Both dependencies added to src/server/utils/dependencies.py

Error Handling

  • All endpoints return structured JSON error responses
  • HTTP status codes follow REST conventions (200, 201, 204, 400, 401, 404, 500, 503)
  • Service-level exceptions (DownloadServiceError) mapped to 400 Bad Request
  • Generic exceptions mapped to 500 Internal Server Error
  • Authentication errors return 401 Unauthorized
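
The exception-to-status mapping above can be summarized in one function. `DownloadServiceError` is named in this document; `AuthenticationError`, the `to_error_response` helper, and the `{"detail": ...}` body shape are assumptions made for this sketch.

```python
from __future__ import annotations


class DownloadServiceError(Exception):
    """Service-level failure, e.g. an invalid queue operation."""


class AuthenticationError(Exception):
    """Hypothetical exception for a missing or invalid token."""


def to_error_response(exc: Exception) -> tuple[int, dict]:
    """Map an exception to (HTTP status code, JSON body)."""
    if isinstance(exc, AuthenticationError):
        return 401, {"detail": "Not authenticated"}
    if isinstance(exc, DownloadServiceError):
        return 400, {"detail": str(exc)}
    # Anything unexpected becomes a generic 500 without leaking internals.
    return 500, {"detail": "Internal server error"}
```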

Testing

  • Comprehensive test suite in tests/api/test_download_endpoints.py
  • Tests cover:
    • Successful operations for all endpoints
    • Authentication requirements
    • Error conditions (empty lists, not found, service errors)
    • Priority handling
    • Batch operations
  • Uses pytest fixtures for authenticated client and mocked download service

Integration

  • Router registered in src/server/fastapi_app.py via app.include_router(download_router)
  • Follows same patterns as other API routers (auth, anime, config)
  • Full OpenAPI documentation available at /api/docs

WebSocket Real-time Updates (October 2025)

Implemented real-time progress tracking and WebSocket broadcasting for downloads, scans, and system events.

ProgressService

File: src/server/services/progress_service.py

A centralized service for tracking and broadcasting real-time progress updates across the application.

Key Features:

  • Track multiple concurrent progress operations (downloads, scans, queue changes)
  • Automatic progress percentage calculation
  • Progress lifecycle management (start, update, complete, fail, cancel)
  • WebSocket integration for real-time client updates
  • Progress history with configurable size limit (default: 50 items)
  • Coroutine-safe operations using asyncio locks (asyncio.Lock serializes tasks on one event loop; it is not thread-safe)
  • Support for progress metadata and custom messages

Progress Types:

  • DOWNLOAD - File download progress
  • SCAN - Library scan progress
  • QUEUE - Queue operation progress
  • SYSTEM - System-level operations
  • ERROR - Error notifications

Progress Statuses:

  • STARTED - Operation initiated
  • IN_PROGRESS - Operation in progress
  • COMPLETED - Successfully completed
  • FAILED - Operation failed
  • CANCELLED - Cancelled by user

Core Methods:

  • start_progress() - Initialize new progress operation
  • update_progress() - Update progress with current/total values
  • complete_progress() - Mark operation as completed
  • fail_progress() - Mark operation as failed
  • cancel_progress() - Cancel ongoing operation
  • get_progress() - Retrieve progress by ID
  • get_all_active_progress() - Get all active operations (optionally filtered by type)
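
A condensed sketch of the progress lifecycle, percentage calculation, and bounded history follows. It mirrors the method names listed above, but the parameter lists, the `ProgressState` fields, and the class names are assumptions; `fail_progress()` and `cancel_progress()` would follow the same pattern as `complete_progress()`.

```python
from __future__ import annotations

import asyncio
from collections import deque
from dataclasses import dataclass
from enum import Enum


class ProgressStatus(Enum):
    STARTED = "started"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


@dataclass
class ProgressState:
    progress_id: str
    progress_type: str
    total: int = 0
    current: int = 0
    status: ProgressStatus = ProgressStatus.STARTED
    message: str = ""

    @property
    def percent(self) -> float:
        if self.total <= 0:
            return 0.0  # unknown total: no meaningful percentage
        return round(min(100.0, self.current / self.total * 100.0), 2)


class ProgressServiceSketch:
    def __init__(self, history_size: int = 50) -> None:
        self._active: dict[str, ProgressState] = {}
        # A deque with maxlen drops the oldest entry once the limit is reached.
        self._history: deque[ProgressState] = deque(maxlen=history_size)
        self._lock = asyncio.Lock()

    async def start_progress(self, progress_id: str, progress_type: str, total: int = 0) -> ProgressState:
        async with self._lock:
            if progress_id in self._active:
                raise ValueError(f"progress {progress_id!r} already exists")
            state = ProgressState(progress_id, progress_type, total=total)
            self._active[progress_id] = state
            return state

    async def update_progress(self, progress_id: str, current: int) -> ProgressState:
        async with self._lock:
            state = self._active[progress_id]
            state.current = current
            state.status = ProgressStatus.IN_PROGRESS
            return state

    async def complete_progress(self, progress_id: str, message: str = "") -> ProgressState:
        async with self._lock:
            state = self._active.pop(progress_id)
            if state.total > 0:
                state.current = state.total
            state.status = ProgressStatus.COMPLETED
            state.message = message
            self._history.append(state)
            return state

    def history(self) -> list[ProgressState]:
        return list(self._history)
```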

Broadcasting:

  • Integrates with WebSocketService via callback
  • Broadcasts to room-specific channels (e.g., download_progress, scan_progress)
  • Configurable broadcast throttling: an update is broadcast only when progress has changed by more than 1% since the last broadcast, or when the broadcast is forced
  • Automatic progress state serialization to JSON
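
The throttling rule can be expressed as a small predicate. The function name and parameters are hypothetical; the 1% threshold is the documented default, and treating the very first update as always broadcast is an assumption.

```python
from __future__ import annotations


def should_broadcast(
    last_sent_percent: float | None,
    current_percent: float,
    *,
    force: bool = False,
    min_delta: float = 1.0,
) -> bool:
    """Decide whether a progress update is worth sending to clients."""
    if force or last_sent_percent is None:
        return True  # forced updates and the first update always go out
    return abs(current_percent - last_sent_percent) > min_delta
```

For a download reporting progress many times per second, this caps the number of WebSocket messages at roughly one per percentage point, plus forced messages such as completion.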

Singleton Pattern:

  • Global instance via get_progress_service() factory
  • Initialized during application startup with WebSocket callback

Integration with Services

DownloadService Integration:

  • Progress tracking for each download item
  • Real-time progress updates during file download
  • Automatic completion/failure notifications
  • Progress metadata includes speed, ETA, downloaded bytes

AnimeService Integration:

  • Progress tracking for library scans
  • Scan progress with current/total file counts
  • Scan completion with statistics
  • Error notifications on scan failures

WebSocket Message Models

File: src/server/models/websocket.py

Added progress-specific message models:

  • ScanProgressMessage - Scan progress updates
  • ScanCompleteMessage - Scan completion notification
  • ScanFailedMessage - Scan failure notification
  • ErrorNotificationMessage - Critical error notifications
  • ProgressUpdateMessage - Generic progress updates

WebSocket Message Types:

  • SCAN_PROGRESS - Scan progress updates
  • SCAN_COMPLETE - Scan completion
  • SCAN_FAILED - Scan failure
  • Extended existing types for downloads and queue updates

WebSocket Rooms

Clients can subscribe to specific progress channels:

  • download_progress - Download progress updates
  • scan_progress - Library scan updates
  • queue_progress - Queue operation updates
  • system_progress - System-level updates

Room subscription via client messages:

{
    "action": "join",
    "room": "download_progress"
}

Application Startup

File: src/server/fastapi_app.py

Progress service initialized on application startup:

  1. Get ProgressService singleton instance
  2. Get WebSocketService singleton instance
  3. Register broadcast callback to link progress updates with WebSocket
  4. Callback broadcasts progress messages to appropriate rooms
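
The four startup steps can be sketched as follows. Both service classes here are simplified stubs, and `set_broadcast_callback`, `publish`, `broadcast_to_room`, and the callback signature are assumed names; only the factory names `get_progress_service()` and `get_websocket_service()` and the `<type>_progress` room naming come from this document.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable, Optional

# Assumed callback shape: (message_type, data, room)
BroadcastCallback = Callable[[str, dict, str], Awaitable[None]]


class WebSocketServiceStub:
    def __init__(self) -> None:
        self.sent: list[tuple[str, dict]] = []

    async def broadcast_to_room(self, message: dict, room: str) -> None:
        self.sent.append((room, message))  # a real service would send to sockets


class ProgressServiceStub:
    def __init__(self) -> None:
        self._callback: Optional[BroadcastCallback] = None

    def set_broadcast_callback(self, callback: BroadcastCallback) -> None:
        self._callback = callback

    async def publish(self, progress_type: str, data: dict) -> None:
        if self._callback is not None:
            room = f"{progress_type}_progress"
            await self._callback(room, data, room)


_progress_service: Optional[ProgressServiceStub] = None
_websocket_service: Optional[WebSocketServiceStub] = None


def get_progress_service() -> ProgressServiceStub:
    global _progress_service
    if _progress_service is None:
        _progress_service = ProgressServiceStub()
    return _progress_service


def get_websocket_service() -> WebSocketServiceStub:
    global _websocket_service
    if _websocket_service is None:
        _websocket_service = WebSocketServiceStub()
    return _websocket_service


async def on_startup() -> None:
    progress = get_progress_service()   # step 1
    websocket = get_websocket_service()  # step 2

    async def broadcast(message_type: str, data: dict, room: str) -> None:
        # step 4: forward each progress message to its room
        await websocket.broadcast_to_room({"type": message_type, "data": data}, room)

    progress.set_broadcast_callback(broadcast)  # step 3
```

Because the progress service only knows the callback, it can be unit-tested with a fake callback and has no import dependency on the WebSocket layer.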

Testing

File: tests/unit/test_progress_service.py

Comprehensive test coverage including:

  • Progress lifecycle operations (start, update, complete, fail, cancel)
  • Percentage calculation accuracy
  • History management and size limits
  • Broadcast callback invocation
  • Concurrent progress operations
  • Metadata handling
  • Error conditions and edge cases

Architecture Benefits

  • Decoupling: ProgressService decouples progress tracking from WebSocket broadcasting
  • Reusability: Single service used across all application components
  • Scalability: Supports multiple concurrent operations efficiently
  • Observability: Centralized progress tracking simplifies monitoring
  • Real-time UX: Instant feedback to users on all long-running operations

Future Enhancements

  • Persistent progress history (database storage)
  • Progress rate calculation and trend analysis
  • Multi-process progress synchronization (Redis/shared store)
  • Progress event hooks for custom actions
  • Client-side progress resumption after reconnection