Aniworld Web Application Development Instructions
This document provides detailed tasks for AI agents to implement a modern web application for the Aniworld anime download manager. All tasks should follow the coding guidelines specified in the project's copilot instructions.
Project Overview
The goal is to create a FastAPI-based web application that provides a modern interface for the existing Aniworld anime download functionality. The core anime logic should remain in SeriesApp.py while the web layer provides REST API endpoints and a responsive UI.
Architecture Principles
- Single Responsibility: Each file/class has one clear purpose
- Dependency Injection: Use FastAPI's dependency system
- Clean Separation: Web layer calls core logic, never the reverse
- File Size Limit: Maximum 500 lines per file
- Type Hints: Use comprehensive type annotations
- Error Handling: Proper exception handling and logging
Additional Implementation Guidelines
Code Style and Standards
- Type Hints: Use comprehensive type annotations throughout all modules
- Docstrings: Follow PEP 257 for function and class documentation
- Error Handling: Implement custom exception classes with meaningful messages
- Logging: Use structured logging with appropriate log levels
- Security: Validate all inputs and sanitize outputs
- Performance: Use async/await patterns for I/O operations
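A minimal sketch illustrating several of these guidelines together (type hints, a custom exception with a meaningful message, logging, input validation, and async I/O). The names `SeriesNotFoundError` and `fetch_series` are illustrative, not part of the existing codebase:

```python
import asyncio
import logging
from typing import Any, Dict

logger = logging.getLogger(__name__)


class SeriesNotFoundError(Exception):
    """Custom exception carrying a meaningful message and context."""

    def __init__(self, series_id: str) -> None:
        super().__init__(f"Series not found: {series_id}")
        self.series_id = series_id


async def fetch_series(series_id: str, db: Dict[str, Any]) -> Dict[str, Any]:
    """Fetch a series record asynchronously, validating the input first."""
    if not series_id:
        raise ValueError("series_id must be non-empty")
    await asyncio.sleep(0)  # stand-in for a real awaitable DB/HTTP call
    record = db.get(series_id)
    if record is None:
        logger.warning("Series lookup failed for %s", series_id)
        raise SeriesNotFoundError(series_id)
    return record
```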
📞 Escalation
If you encounter:
- Architecture issues requiring design decisions
- Tests that conflict with documented requirements
- Breaking changes needed
- Unclear requirements or expectations
Document the issue and escalate rather than guessing.
🔑 Credentials
Admin Login:
- Username: admin
- Password: Hallo123!
📚 Helpful Commands
# Run all tests
conda run -n AniWorld python -m pytest tests/ -v --tb=short
# Run specific test file
conda run -n AniWorld python -m pytest tests/unit/test_websocket_service.py -v
# Run specific test class
conda run -n AniWorld python -m pytest tests/unit/test_websocket_service.py::TestWebSocketService -v
# Run specific test
conda run -n AniWorld python -m pytest tests/unit/test_websocket_service.py::TestWebSocketService::test_broadcast_download_progress -v
# Run with extra verbosity
conda run -n AniWorld python -m pytest tests/ -vv
# Run with full traceback
conda run -n AniWorld python -m pytest tests/ -v --tb=long
# Run and stop at first failure
conda run -n AniWorld python -m pytest tests/ -v -x
# Run tests matching pattern
conda run -n AniWorld python -m pytest tests/ -v -k "auth"
# Show all print statements
conda run -n AniWorld python -m pytest tests/ -v -s
# Run app
conda run -n AniWorld python -m uvicorn src.server.fastapi_app:app --host 127.0.0.1 --port 8000 --reload
Implementation Notes
- Incremental Development: Implement features incrementally, testing each component thoroughly before moving to the next
- Code Review: Review all generated code for adherence to project standards
- Documentation: Document all public APIs and complex logic
- Testing: Maintain test coverage above 80% for all new code
- Performance: Profile and optimize critical paths, especially download and streaming operations
- Security: Regular security audits and dependency updates
- Monitoring: Implement comprehensive monitoring and alerting
- Maintenance: Plan for regular maintenance and updates
Task Completion Checklist
For each task completed:
- Implementation follows coding standards
- Unit tests written and passing
- Integration tests passing
- Documentation updated
- Error handling implemented
- Logging added
- Security considerations addressed
- Performance validated
- Code reviewed
- Task marked as complete in instructions.md
- Infrastructure.md and other docs updated
- Changes committed to git; keep commit messages short and clear
- Take the next task
TODO List:
Task: Implement Asynchronous Series Data Loading with Background Processing
Priority: High
Status: ✅ Completed
Implementation Summary
Successfully implemented asynchronous series data loading with background processing. The system allows users to add series immediately while metadata (episodes, NFO files, logos, images) loads asynchronously in the background.
Completed Items:
- ✅ Architecture document created with detailed component diagrams
- ✅ Database schema updated with loading status fields
- ✅ BackgroundLoaderService created with task queue and worker
- ✅ API endpoints updated (POST returns 202 Accepted, GET loading-status added)
- ✅ Startup check for incomplete series implemented
- ✅ Graceful shutdown handling for background tasks
- ✅ Database migration script created and tested
- ✅ Unit tests written and passing (10 tests, 100% pass rate)
- ✅ Frontend UI updates for loading indicators and WebSocket integration
- ✅ Git commit with clear message
Key Features Implemented:
- Immediate Series Addition: POST /api/anime/add returns 202 Accepted immediately
- Background Processing: Tasks queued and processed asynchronously
- Status Tracking: GET /api/anime/{key}/loading-status endpoint for real-time status
- Startup Validation: Checks for incomplete series on app startup
- WebSocket Integration: Real-time status updates via existing WebSocket service
- Clean Architecture: Reuses existing services, no code duplication
- Frontend UI: Loading indicators with progress tracking on series cards
- Real-time Updates: WebSocket handlers update UI as loading progresses
Remaining Work:
- Integration tests for complete flow (Task 9 in instructions)
- Manual end-to-end testing
Files Created:
- docs/architecture/async_loading_architecture.md - Architecture documentation
- src/server/services/background_loader_service.py - Main service (481 lines)
- scripts/migrate_loading_status.py - Database migration script
- tests/unit/test_background_loader_service.py - Unit tests (10 tests)
Files Modified:
- src/server/database/models.py - Added loading status fields to AnimeSeries
- src/server/database/service.py - Updated AnimeSeriesService.create()
- src/server/api/anime.py - Updated POST /add, added GET loading-status
- src/server/fastapi_app.py - Added startup/shutdown integration
- src/server/utils/dependencies.py - Added BackgroundLoaderService dependency
- src/server/web/static/js/shared/constants.js - Added SERIES_LOADING_UPDATE event
- src/server/web/static/js/index/series-manager.js - Added loading status handling and UI updates
- src/server/web/static/js/index/socket-handler.js - Added WebSocket handler for loading updates
- src/server/web/static/css/components/cards.css - Added loading indicator styles
Overview
Implement a background loading system for series metadata (episodes, NFO files, logos, images) that allows users to add series immediately while data loads asynchronously. This improves UX by not blocking the user during time-consuming metadata operations.
Requirements
- Immediate Series Addition
- When a user adds a series, return success immediately and show the series in the UI
- Mark the series with a loading status indicator
- Background task should start loading missing data automatically
- Loading Status Indicators
- Display visual feedback showing which data is still loading (episodes, NFO, logo, images)
- Show progress information (e.g., "Loading episodes...", "Generating NFO...", "Downloading images...")
- Update UI in real-time as data becomes available
- Use WebSocket for real-time status updates
- Startup Data Validation
- On application startup, check all existing series for missing/incomplete data
- Create a queue of series that need data loading
- Start background loading process for incomplete series
- Log which series require data loading
- Background Processing Architecture
- Use async task queue (asyncio.Queue or similar) for background operations
- Implement worker pool to process loading tasks concurrently (with rate limiting)
- Ensure graceful shutdown of background tasks
- Handle errors without blocking other series from loading
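The queue/worker/graceful-shutdown pattern described above can be sketched as follows (simplified: the real service would carry richer task objects and do actual metadata loading inside the worker):

```python
import asyncio
from typing import List


async def worker(queue: "asyncio.Queue[str]", processed: List[str]) -> None:
    """Consume series IDs until cancelled; one failure must not block other series."""
    while True:
        series_id = await queue.get()
        try:
            processed.append(series_id)  # stand-in for the real metadata loading
            await asyncio.sleep(0)       # yield; a real worker would rate-limit here
        except Exception:
            pass  # log and continue so other queued series still get processed
        finally:
            queue.task_done()


async def run(series_ids: List[str]) -> List[str]:
    queue: "asyncio.Queue[str]" = asyncio.Queue()
    processed: List[str] = []
    worker_task = asyncio.create_task(worker(queue, processed))
    for sid in series_ids:
        await queue.put(sid)
    await queue.join()        # wait for all queued work to finish
    worker_task.cancel()      # graceful shutdown
    try:
        await worker_task
    except asyncio.CancelledError:
        pass
    return processed
```

With a single worker, tasks are processed in FIFO order; adding more workers trades ordering for throughput.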
Implementation Steps
Step 0: Architecture Planning and Code Review
Before implementation, conduct thorough architecture planning to avoid code duplication and ensure clean integration:
- Review Existing Codebase
- Examine current SeriesService implementation
- Check existing episode loading logic in SeriesApp.py
- Review NFO generation service
- Identify image/logo downloading mechanisms
- Document existing WebSocket patterns
- Identify Reusable Components
- Map existing functions that can be reused vs. need refactoring
- Check for duplicate loading logic across services
- Identify common patterns for metadata operations
- Document interfaces that need to be preserved
- Architecture Design
- Design BackgroundLoaderService interface
- Plan dependency injection strategy
- Define clear service boundaries
- Design data flow: API → Service → BackgroundLoader → Core Logic
- Plan error propagation strategy
- Design status update mechanism
- Code Duplication Prevention
- Ensure BackgroundLoaderService calls existing methods rather than reimplementing
- Create adapter layer if needed to wrap existing synchronous code
- Extract common patterns into shared utilities
- Define clear contracts between services
- Database Schema Review
- Check existing Series model fields
- Plan migration strategy for new fields
- Ensure backward compatibility
- Document field naming conventions
- Integration Points
- Map all touchpoints with existing services
- Plan WebSocket message format (check existing patterns)
- Ensure API endpoint consistency
- Review authentication/authorization flow
- Create Architecture Document
Create: docs/architecture/async_loading_architecture.md
Include:
- Component diagram
- Sequence diagram for add series flow
- Service interaction map
- Data model changes
- API contract definitions
- Error handling strategy
- Code reuse strategy
- Refactoring Plan
- Identify code that needs to be extracted before implementing new features
- Plan refactoring of existing services if needed
- Document any breaking changes
- Create refactoring tasks
- Validation Checklist
- No duplicate loading logic between BackgroundLoaderService and existing services
- Clear separation of concerns
- Existing functionality not broken
- New services follow project patterns
- API design consistent with existing endpoints
- Database changes are backward compatible
- All integration points documented
- Error handling consistent across services
Key Questions to Answer:
- Does SeriesService already have episode loading? If yes, reuse it.
- How is NFO currently generated? Wrap existing logic, don't duplicate.
- Where are images downloaded? Use existing service.
- What WebSocket message format is already in use? Follow the pattern.
- Are there any existing background task patterns? Align with them.
Deliverables:
- Architecture document with diagrams
- List of existing methods to reuse
- List of new methods to create
- Refactoring tasks (if any)
- Database migration plan
- API specification
Estimated Time: 2-3 hours
Note: Do not proceed to implementation until this planning phase is complete and reviewed. This prevents code duplication and ensures clean architecture.
Step 1: Create Background Task Service
Create src/server/services/background_loader_service.py:
from typing import Optional, Dict
from asyncio import Queue, Task
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum

class LoadingStatus(Enum):
    PENDING = "pending"
    LOADING_EPISODES = "loading_episodes"
    LOADING_NFO = "loading_nfo"
    LOADING_IMAGES = "loading_images"
    LOADING_LOGO = "loading_logo"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class SeriesLoadingTask:
    """Tracks loading state for a single series."""
    series_id: str = ""
    status: LoadingStatus = LoadingStatus.PENDING
    progress: Dict[str, bool] = field(default_factory=dict)  # {episodes, nfo, logo, images}
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    error: Optional[str] = None
class BackgroundLoaderService:
"""
Service for managing background loading of series metadata.
Handles queuing, processing, and status tracking.
"""
def __init__(self, websocket_service, series_service):
self.task_queue: Queue[SeriesLoadingTask] = Queue()
self.active_tasks: Dict[str, SeriesLoadingTask] = {}
self.worker_task: Optional[Task] = None
self.websocket_service = websocket_service
self.series_service = series_service
async def start(self):
"""Start background worker."""
async def stop(self):
"""Stop background worker gracefully."""
async def add_series_loading_task(self, series_id: str):
"""Add a series to the loading queue."""
async def check_missing_data(self, series_id: str) -> Dict[str, bool]:
"""Check what data is missing for a series."""
async def _worker(self):
"""Background worker that processes loading tasks."""
async def _load_series_data(self, task: SeriesLoadingTask):
"""Load all missing data for a series."""
Key Features:
- Task queue for managing loading operations
- Status tracking for each series
- WebSocket integration for real-time updates
- Error handling and retry logic
- Concurrent loading with rate limiting
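One possible shape for `_load_series_data`, shown here as a free function with injected step callables instead of the real services (an assumption for illustration; the actual method would call SeriesService/NFO/image loaders and the WebSocket service):

```python
import asyncio
from typing import Awaitable, Callable, Dict


async def load_series_data(
    series_id: str,
    progress: Dict[str, bool],
    steps: Dict[str, Callable[[str], Awaitable[None]]],
    broadcast: Callable[[str, str, Dict[str, bool]], Awaitable[None]],
) -> str:
    """Run each loading step in order, broadcasting status before/after each one."""
    try:
        for name, step in steps.items():
            await broadcast(series_id, f"loading_{name}", dict(progress))
            await step(series_id)          # e.g. load episodes, generate NFO, ...
            progress[name] = True
        await broadcast(series_id, "completed", dict(progress))
        return "completed"
    except Exception:
        # One failed step marks the whole task failed; other queued series continue
        await broadcast(series_id, "failed", dict(progress))
        return "failed"
```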
Step 2: Update Series Service
Modify src/server/services/series_service.py:
async def add_series_async(
self,
series_name: str,
background_loader: BackgroundLoaderService
) -> Dict[str, Any]:
"""
Add series immediately and queue background data loading.
Returns:
dict: Series info with loading_status field
"""
# 1. Create series entry with minimal data
# 2. Save to database with status="loading"
# 3. Queue background loading task
# 4. Return series info immediately
async def get_series_loading_status(self, series_id: str) -> Dict[str, Any]:
"""
Get current loading status for a series.
Returns:
dict: Status info including what's loaded and what's pending
"""
Step 3: Create API Endpoints
Create/update src/server/api/routes/series.py:
@router.post("/series", status_code=202)
async def add_series(
series_request: SeriesAddRequest,
background_loader: BackgroundLoaderService = Depends(get_background_loader),
series_service: SeriesService = Depends(get_series_service)
) -> SeriesResponse:
"""
Add a new series. Returns immediately with loading status.
Data will be loaded in background.
Returns 202 Accepted to indicate async processing.
"""
@router.get("/series/{series_id}/loading-status")
async def get_loading_status(
series_id: str,
background_loader: BackgroundLoaderService = Depends(get_background_loader)
) -> LoadingStatusResponse:
"""
Get current loading status for a series.
"""
Step 4: Add WebSocket Support
Update src/server/services/websocket_service.py:
async def broadcast_loading_status(
self,
series_id: str,
status: LoadingStatus,
progress: Dict[str, bool],
message: str
):
"""
Broadcast loading status updates to all connected clients.
Message format:
{
"type": "series_loading_update",
"series_id": "...",
"status": "loading_episodes",
"progress": {
"episodes": false,
"nfo": false,
"logo": false,
"images": false
},
"message": "Loading episodes...",
"timestamp": "2026-01-18T10:30:00Z"
}
"""
Step 5: Add Startup Data Check
Update src/server/fastapi_app.py:
@app.on_event("startup")
async def startup_event():
"""
Initialize application and check for incomplete series data.
"""
# 1. Initialize background loader service
# 2. Scan all series in database
# 3. Check for missing data (episodes, NFO, logo, images)
# 4. Queue loading tasks for incomplete series
# 5. Log summary of queued tasks
logger.info("Starting application...")
    # Initialize services
    background_loader = BackgroundLoaderService(websocket_service, series_service)
    await background_loader.start()
    app.state.background_loader = background_loader  # keep a handle for shutdown
    # Check existing series
    series_list = await series_service.get_all_series()
    incomplete_series = []
    for series in series_list:
        missing_data = await background_loader.check_missing_data(series["id"])
        if any(missing_data.values()):
            incomplete_series.append(series["id"])
            await background_loader.add_series_loading_task(series["id"])
    if incomplete_series:
        logger.info(
            f"Found {len(incomplete_series)} series with missing data. "
            f"Starting background loading..."
        )
    else:
        logger.info("All series data is complete.")
@app.on_event("shutdown")
async def shutdown_event():
    """
    Gracefully shut down background tasks.
    """
    await app.state.background_loader.stop()
    logger.info("Application shutdown complete.")
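Note that `@app.on_event` is deprecated in recent FastAPI versions in favor of a lifespan context manager. A hedged sketch of that pattern, using a stub loader so it runs standalone (`StubLoader` is illustrative, not the real service):

```python
import asyncio
from contextlib import asynccontextmanager


class StubLoader:
    """Stand-in for BackgroundLoaderService with start/stop hooks."""

    def __init__(self) -> None:
        self.running = False

    async def start(self) -> None:
        self.running = True

    async def stop(self) -> None:
        self.running = False


@asynccontextmanager
async def lifespan(app, loader):
    """Start background work on startup; guarantee a stop on shutdown."""
    await loader.start()
    try:
        yield
    finally:
        await loader.stop()

# With FastAPI this would be wired roughly as:
#   app = FastAPI(lifespan=lambda app: lifespan(app, loader))
```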
Step 6: Update Database Schema
Add loading status fields to series table:
# In src/server/database/models.py
class Series(Base):
__tablename__ = "series"
# ... existing fields ...
# New fields for loading status
loading_status = Column(String, default="completed") # pending, loading, completed, failed
episodes_loaded = Column(Boolean, default=False)
nfo_loaded = Column(Boolean, default=False)
logo_loaded = Column(Boolean, default=False)
images_loaded = Column(Boolean, default=False)
loading_started_at = Column(DateTime, nullable=True)
loading_completed_at = Column(DateTime, nullable=True)
loading_error = Column(String, nullable=True)
Create migration script:
# In migrations/add_loading_status.py
# Add migration to add new columns to series table
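The migration stub could be fleshed out along these lines for a SQLite-backed setup (assumed; adjust if the project uses Alembic or another engine). The check against `PRAGMA table_info` makes the script idempotent:

```python
import sqlite3
from typing import List

# Columns from Step 6, with SQLite defaults matching the model definitions
NEW_COLUMNS = {
    "loading_status": "TEXT DEFAULT 'completed'",
    "episodes_loaded": "BOOLEAN DEFAULT 0",
    "nfo_loaded": "BOOLEAN DEFAULT 0",
    "logo_loaded": "BOOLEAN DEFAULT 0",
    "images_loaded": "BOOLEAN DEFAULT 0",
    "loading_started_at": "TIMESTAMP",
    "loading_completed_at": "TIMESTAMP",
    "loading_error": "TEXT",
}


def migrate(conn: sqlite3.Connection) -> List[str]:
    """Add any missing loading-status columns to the series table (idempotent)."""
    existing = {row[1] for row in conn.execute("PRAGMA table_info(series)")}
    added = []
    for name, ddl in NEW_COLUMNS.items():
        if name not in existing:
            conn.execute(f"ALTER TABLE series ADD COLUMN {name} {ddl}")
            added.append(name)
    conn.commit()
    return added
```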
Step 7: Update Frontend UI
Update frontend to show loading status:
// In src/server/web/static/js/series.js
function updateSeriesLoadingStatus(data) {
const seriesCard = document.querySelector(
`[data-series-id="${data.series_id}"]`,
);
if (!seriesCard) return;
// Update loading indicator
const loadingIndicator = seriesCard.querySelector(".loading-indicator");
if (data.status === "completed") {
loadingIndicator.style.display = "none";
} else {
loadingIndicator.style.display = "block";
loadingIndicator.innerHTML = `
<div class="loading-status">
<span class="status-text">${data.message}</span>
<div class="progress-items">
${getProgressHTML(data.progress)}
</div>
</div>
`;
}
}
function getProgressHTML(progress) {
const items = [
{ key: "episodes", label: "Episodes" },
{ key: "nfo", label: "NFO" },
{ key: "logo", label: "Logo" },
{ key: "images", label: "Images" },
];
return items
.map(
(item) => `
<div class="progress-item ${progress[item.key] ? "completed" : "pending"}">
<span class="icon">${progress[item.key] ? "✓" : "⋯"}</span>
<span class="label">${item.label}</span>
</div>
`,
)
.join("");
}
// WebSocket handler
websocket.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.type === "series_loading_update") {
updateSeriesLoadingStatus(data);
}
};
Add CSS for loading indicators:
/* In src/server/web/static/css/styles.css */
.loading-indicator {
background: var(--surface-secondary);
border-radius: 8px;
padding: 12px;
margin-top: 8px;
}
.loading-status {
display: flex;
flex-direction: column;
gap: 8px;
}
.status-text {
font-size: 0.9em;
color: var(--text-secondary);
}
.progress-items {
display: flex;
gap: 12px;
flex-wrap: wrap;
}
.progress-item {
display: flex;
align-items: center;
gap: 4px;
font-size: 0.85em;
}
.progress-item.completed {
color: var(--success-color);
}
.progress-item.pending {
color: var(--text-tertiary);
}
.progress-item .icon {
font-size: 1.2em;
}
Testing Requirements
Step 8: Create Unit Tests
Create tests/unit/test_background_loader_service.py:
"""
Unit tests for BackgroundLoaderService.
Tests task queuing, status tracking, and worker logic in isolation.
"""
import pytest
import asyncio
from unittest.mock import Mock, AsyncMock, patch
from datetime import datetime
from src.server.services.background_loader_service import (
BackgroundLoaderService,
LoadingStatus,
SeriesLoadingTask
)
@pytest.fixture
def mock_websocket_service():
"""Mock WebSocket service."""
service = Mock()
service.broadcast_loading_status = AsyncMock()
return service
@pytest.fixture
def mock_series_service():
"""Mock series service."""
service = Mock()
service.load_episodes = AsyncMock()
service.load_nfo = AsyncMock()
service.load_logo = AsyncMock()
service.load_images = AsyncMock()
service.get_series = AsyncMock(return_value={"id": "test-series", "name": "Test Series"})
service.update_series_loading_status = AsyncMock()
return service
@pytest.fixture
async def background_loader(mock_websocket_service, mock_series_service):
"""Create BackgroundLoaderService instance."""
service = BackgroundLoaderService(
websocket_service=mock_websocket_service,
series_service=mock_series_service
)
yield service
await service.stop()
class TestBackgroundLoaderService:
"""Test suite for BackgroundLoaderService."""
@pytest.mark.asyncio
async def test_service_initialization(self, background_loader):
"""Test service initializes correctly."""
assert background_loader.task_queue is not None
assert isinstance(background_loader.active_tasks, dict)
assert len(background_loader.active_tasks) == 0
@pytest.mark.asyncio
async def test_start_worker(self, background_loader):
"""Test worker starts successfully."""
await background_loader.start()
assert background_loader.worker_task is not None
assert not background_loader.worker_task.done()
@pytest.mark.asyncio
async def test_stop_worker_gracefully(self, background_loader):
"""Test worker stops gracefully."""
await background_loader.start()
await background_loader.stop()
assert background_loader.worker_task.done()
@pytest.mark.asyncio
async def test_add_series_loading_task(self, background_loader):
"""Test adding a series to the loading queue."""
series_id = "test-series-123"
await background_loader.add_series_loading_task(series_id)
# Verify task was added to queue
assert not background_loader.task_queue.empty()
# Verify task in active tasks
assert series_id in background_loader.active_tasks
task = background_loader.active_tasks[series_id]
assert task.series_id == series_id
assert task.status == LoadingStatus.PENDING
@pytest.mark.asyncio
async def test_check_missing_data_all_missing(self, mock_series_service):
"""Test checking for missing data when all data is missing."""
mock_series_service.get_series.return_value = {
"id": "test-series",
"episodes_loaded": False,
"nfo_loaded": False,
"logo_loaded": False,
"images_loaded": False
}
loader = BackgroundLoaderService(Mock(), mock_series_service)
missing_data = await loader.check_missing_data("test-series")
assert missing_data["episodes"] is True
assert missing_data["nfo"] is True
assert missing_data["logo"] is True
assert missing_data["images"] is True
@pytest.mark.asyncio
async def test_check_missing_data_partial(self, mock_series_service):
"""Test checking for missing data when some data exists."""
mock_series_service.get_series.return_value = {
"id": "test-series",
"episodes_loaded": True,
"nfo_loaded": False,
"logo_loaded": True,
"images_loaded": False
}
loader = BackgroundLoaderService(Mock(), mock_series_service)
missing_data = await loader.check_missing_data("test-series")
assert missing_data["episodes"] is False
assert missing_data["nfo"] is True
assert missing_data["logo"] is False
assert missing_data["images"] is True
@pytest.mark.asyncio
async def test_check_missing_data_all_complete(self, mock_series_service):
"""Test checking for missing data when all data is complete."""
mock_series_service.get_series.return_value = {
"id": "test-series",
"episodes_loaded": True,
"nfo_loaded": True,
"logo_loaded": True,
"images_loaded": True
}
loader = BackgroundLoaderService(Mock(), mock_series_service)
missing_data = await loader.check_missing_data("test-series")
assert all(not value for value in missing_data.values())
@pytest.mark.asyncio
async def test_load_series_data_success(
self,
background_loader,
mock_websocket_service,
mock_series_service
):
"""Test successful loading of series data."""
task = SeriesLoadingTask()
task.series_id = "test-series"
task.status = LoadingStatus.PENDING
task.progress = {"episodes": False, "nfo": False, "logo": False, "images": False}
await background_loader._load_series_data(task)
# Verify all loading methods were called
mock_series_service.load_episodes.assert_called_once()
mock_series_service.load_nfo.assert_called_once()
mock_series_service.load_logo.assert_called_once()
mock_series_service.load_images.assert_called_once()
# Verify WebSocket broadcasts were sent
assert mock_websocket_service.broadcast_loading_status.call_count >= 4
# Verify task status is completed
assert task.status == LoadingStatus.COMPLETED
assert all(task.progress.values())
@pytest.mark.asyncio
async def test_load_series_data_with_errors(
self,
background_loader,
mock_websocket_service,
mock_series_service
):
"""Test loading series data when some operations fail."""
# Make NFO loading fail
mock_series_service.load_nfo.side_effect = Exception("NFO service error")
task = SeriesLoadingTask()
task.series_id = "test-series"
task.status = LoadingStatus.PENDING
task.progress = {"episodes": False, "nfo": False, "logo": False, "images": False}
await background_loader._load_series_data(task)
# Verify task status is failed
assert task.status == LoadingStatus.FAILED
assert task.error is not None
assert "NFO" in task.error
@pytest.mark.asyncio
async def test_concurrent_task_processing(self, background_loader):
"""Test processing multiple tasks concurrently."""
series_ids = ["series-1", "series-2", "series-3"]
await background_loader.start()
# Add multiple tasks
for series_id in series_ids:
await background_loader.add_series_loading_task(series_id)
# Wait for processing
await asyncio.sleep(0.5)
# Verify all tasks were processed
for series_id in series_ids:
assert series_id in background_loader.active_tasks
@pytest.mark.asyncio
async def test_task_queue_order(self, background_loader):
"""Test that tasks are processed in FIFO order."""
processed_order = []
async def mock_load(task):
processed_order.append(task.series_id)
background_loader._load_series_data = mock_load
await background_loader.start()
# Add tasks in specific order
await background_loader.add_series_loading_task("series-1")
await background_loader.add_series_loading_task("series-2")
await background_loader.add_series_loading_task("series-3")
# Wait for processing
await asyncio.sleep(0.5)
# Verify FIFO order
assert processed_order == ["series-1", "series-2", "series-3"]
@pytest.mark.asyncio
async def test_duplicate_task_handling(self, background_loader):
"""Test that duplicate tasks for same series are handled correctly."""
series_id = "test-series"
await background_loader.add_series_loading_task(series_id)
await background_loader.add_series_loading_task(series_id)
# Verify only one task exists
assert len([k for k in background_loader.active_tasks if k == series_id]) == 1
@pytest.mark.asyncio
async def test_rate_limiting(self, background_loader, mock_series_service):
"""Test rate limiting to avoid overwhelming external APIs."""
# Add multiple tasks quickly
for i in range(10):
await background_loader.add_series_loading_task(f"series-{i}")
await background_loader.start()
# Wait and verify rate limiting is applied
start_time = datetime.now()
await asyncio.sleep(1)
# Verify not all tasks completed instantly (rate limiting applied)
# This is a simple check; real implementation should be more sophisticated
assert len([t for t in background_loader.active_tasks.values()
if t.status != LoadingStatus.COMPLETED]) > 0
class TestSeriesLoadingTask:
"""Test SeriesLoadingTask model."""
def test_task_initialization(self):
"""Test task initializes with correct defaults."""
task = SeriesLoadingTask()
task.series_id = "test"
task.status = LoadingStatus.PENDING
task.progress = {"episodes": False, "nfo": False, "logo": False, "images": False}
assert task.series_id == "test"
assert task.status == LoadingStatus.PENDING
assert not any(task.progress.values())
def test_task_progress_tracking(self):
"""Test progress tracking updates correctly."""
task = SeriesLoadingTask()
task.progress = {"episodes": False, "nfo": False, "logo": False, "images": False}
task.progress["episodes"] = True
assert task.progress["episodes"] is True
assert not task.progress["nfo"]
Create tests/unit/test_series_service_async.py:
"""
Unit tests for async series operations in SeriesService.
"""
import pytest
from unittest.mock import Mock, AsyncMock, patch
from src.server.services.series_service import SeriesService
@pytest.fixture
def mock_background_loader():
"""Mock background loader service."""
loader = Mock()
loader.add_series_loading_task = AsyncMock()
loader.check_missing_data = AsyncMock(return_value={
"episodes": False,
"nfo": False,
"logo": False,
"images": False
})
return loader
class TestSeriesServiceAsync:
"""Test async series operations."""
@pytest.mark.asyncio
async def test_add_series_async_immediate_return(
self,
mock_background_loader,
mock_db_session
):
"""Test that add_series_async returns immediately."""
service = SeriesService(db=mock_db_session)
result = await service.add_series_async(
series_name="Test Series",
background_loader=mock_background_loader
)
# Verify series was created with loading status
assert result["name"] == "Test Series"
assert result["loading_status"] == "loading"
assert "id" in result
# Verify background task was queued
mock_background_loader.add_series_loading_task.assert_called_once()
@pytest.mark.asyncio
async def test_add_series_async_minimal_data(
self,
mock_background_loader,
mock_db_session
):
"""Test that minimal data is saved initially."""
service = SeriesService(db=mock_db_session)
result = await service.add_series_async(
series_name="Test Series",
background_loader=mock_background_loader
)
# Verify only basic info is present
assert result["name"] == "Test Series"
assert result["episodes_loaded"] is False
assert result["nfo_loaded"] is False
assert result["logo_loaded"] is False
assert result["images_loaded"] is False
@pytest.mark.asyncio
async def test_get_series_loading_status(self, mock_db_session):
"""Test retrieving loading status for a series."""
service = SeriesService(db=mock_db_session)
# Mock series data
mock_series = {
"id": "test-series",
"name": "Test Series",
"loading_status": "loading_episodes",
"episodes_loaded": False,
"nfo_loaded": True,
"logo_loaded": True,
"images_loaded": False
}
with patch.object(service, 'get_series', return_value=mock_series):
status = await service.get_series_loading_status("test-series")
assert status["loading_status"] == "loading_episodes"
assert status["progress"]["nfo"] is True
assert status["progress"]["episodes"] is False
Step 9: Create Integration Tests
Create tests/integration/test_async_series_loading.py:
"""
Integration tests for asynchronous series data loading.
Tests the complete flow from API to database with WebSocket notifications.
"""
import pytest
import asyncio
from httpx import AsyncClient
from unittest.mock import patch, AsyncMock
from src.server.fastapi_app import app
from src.server.database.models import Series
@pytest.fixture
async def async_client():
"""Create async test client."""
async with AsyncClient(app=app, base_url="http://test") as client:
yield client
class TestAsyncSeriesLoading:
"""Integration tests for async series loading."""
@pytest.mark.asyncio
async def test_add_series_returns_202(self, async_client, auth_headers):
"""Test POST /series returns 202 Accepted."""
response = await async_client.post(
"/api/series",
json={"name": "Test Series"},
headers=auth_headers
)
assert response.status_code == 202
data = response.json()
assert data["name"] == "Test Series"
assert data["loading_status"] == "loading"
assert "id" in data
@pytest.mark.asyncio
async def test_series_immediately_visible(
self,
async_client,
auth_headers,
test_db
):
"""Test series is immediately visible in list."""
# Add series
add_response = await async_client.post(
"/api/series",
json={"name": "Test Series"},
headers=auth_headers
)
series_id = add_response.json()["id"]
# Immediately get series list
list_response = await async_client.get(
"/api/series",
headers=auth_headers
)
assert list_response.status_code == 200
series_list = list_response.json()
assert any(s["id"] == series_id for s in series_list)
@pytest.mark.asyncio
async def test_loading_status_endpoint(
self,
async_client,
auth_headers
):
"""Test GET /series/{id}/loading-status endpoint."""
# Add series
add_response = await async_client.post(
"/api/series",
json={"name": "Test Series"},
headers=auth_headers
)
series_id = add_response.json()["id"]
# Get loading status
status_response = await async_client.get(
f"/api/series/{series_id}/loading-status",
headers=auth_headers
)
assert status_response.status_code == 200
status = status_response.json()
assert "loading_status" in status
assert "progress" in status
assert all(key in status["progress"] for key in ["episodes", "nfo", "logo", "images"])
@pytest.mark.asyncio
async def test_background_loading_completes(
self,
async_client,
auth_headers,
test_db
):
"""Test that background loading completes successfully."""
# Add series
add_response = await async_client.post(
"/api/series",
json={"name": "Test Series"},
headers=auth_headers
)
series_id = add_response.json()["id"]
# Wait for background loading
max_wait = 30 # seconds
for _ in range(max_wait):
status_response = await async_client.get(
f"/api/series/{series_id}/loading-status",
headers=auth_headers
)
status = status_response.json()
if status["loading_status"] == "completed":
break
await asyncio.sleep(1)
# Verify all data loaded
assert status["loading_status"] == "completed"
assert all(status["progress"].values())
@pytest.mark.asyncio
async def test_websocket_status_updates(
self,
async_client,
auth_headers,
websocket_client
):
"""Test WebSocket broadcasts loading status updates."""
received_updates = []
# Connect WebSocket
async with websocket_client.connect("/ws") as websocket:
# Add series
add_response = await async_client.post(
"/api/series",
json={"name": "Test Series"},
headers=auth_headers
)
series_id = add_response.json()["id"]
# Collect WebSocket messages for 5 seconds
try:
async with asyncio.timeout(5):
while True:
message = await websocket.receive_json()
if message.get("type") == "series_loading_update":
if message.get("series_id") == series_id:
received_updates.append(message)
except asyncio.TimeoutError:
pass
# Verify updates were received
assert len(received_updates) > 0
assert any(u["status"] == "loading_episodes" for u in received_updates)
@pytest.mark.asyncio
async def test_database_status_persistence(
self,
async_client,
auth_headers,
test_db
):
"""Test loading status is persisted to database."""
# Add series
add_response = await async_client.post(
"/api/series",
json={"name": "Test Series"},
headers=auth_headers
)
series_id = add_response.json()["id"]
await asyncio.sleep(1)
# Query database directly
series = test_db.query(Series).filter(Series.id == series_id).first()
assert series is not None
assert series.loading_status in ["loading", "completed", "loading_episodes",
"loading_nfo", "loading_logo", "loading_images"]
assert series.loading_started_at is not None
@pytest.mark.asyncio
async def test_startup_incomplete_series_check(
self,
test_db,
mock_app_startup
):
"""Test startup checks for incomplete series."""
# Create series with missing data
series = Series(
id="incomplete-series",
name="Incomplete Series",
episodes_loaded=True,
nfo_loaded=False,
logo_loaded=True,
images_loaded=False,
loading_status="loading"
)
test_db.add(series)
test_db.commit()
# Trigger startup event
with patch('src.server.fastapi_app.logger') as mock_logger:
await mock_app_startup()
# Verify incomplete series were logged
assert mock_logger.info.called
log_messages = [call[0][0] for call in mock_logger.info.call_args_list]
assert any("missing data" in msg.lower() for msg in log_messages)
@pytest.mark.asyncio
async def test_error_handling_during_loading(
self,
async_client,
auth_headers
):
"""Test error handling when loading fails."""
# Mock series service to raise error
with patch('src.server.services.series_service.SeriesService.load_episodes',
side_effect=Exception("API Error")):
add_response = await async_client.post(
"/api/series",
json={"name": "Test Series"},
headers=auth_headers
)
series_id = add_response.json()["id"]
# Wait for error
await asyncio.sleep(2)
# Check status
status_response = await async_client.get(
f"/api/series/{series_id}/loading-status",
headers=auth_headers
)
status = status_response.json()
# Verify error was recorded
assert status["loading_status"] in ["failed", "loading"]
if status["loading_status"] == "failed":
assert "error" in status
@pytest.mark.asyncio
async def test_graceful_shutdown_with_pending_tasks(
self,
async_client,
auth_headers,
background_loader_service
):
"""Test graceful shutdown with pending loading tasks."""
# Add multiple series
series_ids = []
for i in range(5):
response = await async_client.post(
"/api/series",
json={"name": f"Series {i}"},
headers=auth_headers
)
series_ids.append(response.json()["id"])
# Trigger shutdown
await background_loader_service.stop()
# Verify no exceptions and tasks are cleaned up
assert background_loader_service.worker_task.done()
assert background_loader_service.task_queue.qsize() == 0
class TestConcurrentLoading:
"""Tests for concurrent series loading."""
@pytest.mark.asyncio
async def test_multiple_series_load_concurrently(
self,
async_client,
auth_headers
):
"""Test loading multiple series simultaneously."""
# Add 10 series rapidly
tasks = []
for i in range(10):
task = async_client.post(
"/api/series",
json={"name": f"Series {i}"},
headers=auth_headers
)
tasks.append(task)
responses = await asyncio.gather(*tasks)
# Verify all succeeded
assert all(r.status_code == 202 for r in responses)
# Verify all have unique IDs
series_ids = [r.json()["id"] for r in responses]
assert len(series_ids) == len(set(series_ids))
@pytest.mark.asyncio
async def test_no_ui_blocking_during_load(
self,
async_client,
auth_headers
):
"""Test that UI remains responsive during background loading."""
# Add series with background loading
await async_client.post(
"/api/series",
json={"name": "Loading Series"},
headers=auth_headers
)
# Immediately perform other operations
start_time = asyncio.get_event_loop().time()
response = await async_client.get(
"/api/series",
headers=auth_headers
)
elapsed = asyncio.get_event_loop().time() - start_time
# Verify response was fast (< 1 second)
assert response.status_code == 200
assert elapsed < 1.0
class TestRateLimiting:
"""Tests for API rate limiting."""
@pytest.mark.asyncio
async def test_rate_limiting_prevents_api_overload(
self,
async_client,
auth_headers,
background_loader_service
):
"""Test rate limiting prevents overwhelming external APIs."""
# Add many series
for i in range(20):
await async_client.post(
"/api/series",
json={"name": f"Series {i}"},
headers=auth_headers
)
# Monitor loading rate
# Should not process all immediately
await asyncio.sleep(1)
# Verify rate limiting is working
# (This would need actual implementation details)
# For now, just verify system is still responsive
response = await async_client.get("/api/health")
assert response.status_code == 200
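The rate-limiting test above only checks that the system stays responsive; the limiter itself is left to the implementation. A minimal sketch of what the background loader could use to space out external API calls is shown below. The `RateLimiter` class, its `interval` parameter, and `acquire()` method are illustrative assumptions, not part of the existing codebase.

```python
# Sketch of a minimal interval-based rate limiter (hypothetical class;
# not part of the existing services) to space out external API calls.
import asyncio
import time


class RateLimiter:
    """Allow at most one acquire() per `interval` seconds."""

    def __init__(self, interval: float) -> None:
        self._interval = interval
        self._lock = asyncio.Lock()
        self._last_call = 0.0

    async def acquire(self) -> None:
        # Serialize callers, then sleep until the interval has elapsed.
        async with self._lock:
            now = time.monotonic()
            wait = self._interval - (now - self._last_call)
            if wait > 0:
                await asyncio.sleep(wait)
            self._last_call = time.monotonic()


async def main() -> list[float]:
    limiter = RateLimiter(interval=0.05)
    timestamps: list[float] = []
    for _ in range(3):
        await limiter.acquire()
        timestamps.append(time.monotonic())
    return timestamps


stamps = asyncio.run(main())
# Consecutive acquisitions are spaced by at least the configured interval.
gaps = [b - a for a, b in zip(stamps, stamps[1:])]
print(all(gap >= 0.04 for gap in gaps))
```

The worker loop would call `await limiter.acquire()` before each TMDB request, so bursts of queued series drain at a bounded rate instead of all at once.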
Step 10: Create API Tests
Create `tests/api/test_series_loading_endpoints.py`:
"""
API tests for series loading endpoints.
"""
import pytest
from httpx import AsyncClient
class TestSeriesLoadingEndpoints:
"""Test series loading API endpoints."""
@pytest.mark.asyncio
async def test_post_series_endpoint_structure(self, async_client, auth_headers):
"""Test POST /api/series response structure."""
response = await async_client.post(
"/api/series",
json={"name": "Test Series"},
headers=auth_headers
)
assert response.status_code == 202
data = response.json()
# Verify required fields
required_fields = ["id", "name", "loading_status", "episodes_loaded",
"nfo_loaded", "logo_loaded", "images_loaded"]
for field in required_fields:
assert field in data
@pytest.mark.asyncio
async def test_get_loading_status_endpoint_structure(
self,
async_client,
auth_headers,
test_series
):
"""Test GET /api/series/{id}/loading-status response structure."""
response = await async_client.get(
f"/api/series/{test_series['id']}/loading-status",
headers=auth_headers
)
assert response.status_code == 200
data = response.json()
# Verify structure
assert "loading_status" in data
assert "progress" in data
assert "started_at" in data
assert "message" in data
# Verify progress structure
progress = data["progress"]
assert all(key in progress for key in ["episodes", "nfo", "logo", "images"])
@pytest.mark.asyncio
async def test_unauthorized_access(self, async_client):
"""Test endpoints require authentication."""
# Without auth headers
response = await async_client.post(
"/api/series",
json={"name": "Test Series"}
)
assert response.status_code == 401
@pytest.mark.asyncio
async def test_invalid_series_id(self, async_client, auth_headers):
"""Test loading status with invalid series ID."""
response = await async_client.get(
"/api/series/invalid-id/loading-status",
headers=auth_headers
)
assert response.status_code == 404
Testing Summary
Coverage Requirements:
- Minimum 80% code coverage for all new modules
- 100% coverage for critical paths (task queuing, status updates)
- All edge cases and error conditions tested
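The 80% floor can be enforced automatically so the test run fails when coverage drops. A configuration sketch for coverage.py in `pyproject.toml` follows; the `source` path is an assumption and should be adjusted to the repository layout.

```toml
# Coverage enforcement sketch (pyproject.toml); adjust source paths as needed.
[tool.coverage.run]
source = ["src/server/services"]

[tool.coverage.report]
fail_under = 80
show_missing = true
```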
Test Execution:
```bash
# Run all async loading tests
conda run -n AniWorld python -m pytest tests/unit/test_background_loader_service.py -v

# Run integration tests
conda run -n AniWorld python -m pytest tests/integration/test_async_series_loading.py -v

# Run API tests
conda run -n AniWorld python -m pytest tests/api/test_series_loading_endpoints.py -v

# Run all tests with coverage
conda run -n AniWorld python -m pytest tests/ --cov=src/server/services/background_loader_service --cov-report=html -v
```
Performance Benchmarks:
- Series addition: < 200ms response time
- Background loading: Complete within 30 seconds per series
- WebSocket updates: < 100ms latency
- Concurrent loading: Handle 10+ series simultaneously
- Memory usage: < 100MB increase during heavy loading
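These budgets can be asserted directly in tests by timing the awaited call. The sketch below shows the pattern with `time.perf_counter`; `fake_add_series` is a stand-in for the real HTTP request (an assumption, so the example is self-contained).

```python
# Pattern for asserting a latency budget around an awaited call.
# `fake_add_series` is a placeholder for the real POST /api/series request.
import asyncio
import time


async def fake_add_series() -> dict:
    # Simulate a fast "202 Accepted" style response.
    await asyncio.sleep(0.01)
    return {"status_code": 202}


async def measure() -> tuple[dict, float]:
    start = time.perf_counter()
    result = await fake_add_series()
    elapsed = time.perf_counter() - start
    return result, elapsed


response, elapsed = asyncio.run(measure())
assert response["status_code"] == 202
assert elapsed < 0.2  # series-addition budget: < 200 ms
print(f"elapsed: {elapsed:.3f}s")
```

In the real suite the same `start`/`elapsed` bracketing would wrap the `async_client.post(...)` call, with the threshold taken from the table above.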
Success Criteria
- Users can add series and see them immediately in UI
- Loading status is clearly visible with progress indicators
- Real-time updates via WebSocket work correctly
- Application startup checks and loads missing data automatically
- Background loading doesn't impact UI responsiveness
- Errors are handled gracefully without stopping other loads
- All unit and integration tests pass
- Documentation is complete and accurate
- Code follows project standards and best practices
Files to Create/Modify
Create:
- `src/server/services/background_loader_service.py`
- `migrations/add_loading_status.py`
- `tests/unit/test_background_loader_service.py`
- `tests/integration/test_async_series_loading.py`

Modify:
- `src/server/services/series_service.py`
- `src/server/services/websocket_service.py`
- `src/server/api/routes/series.py`
- `src/server/fastapi_app.py`
- `src/server/database/models.py`
- `src/server/web/static/js/series.js`
- `src/server/web/static/css/styles.css`
Notes
- Use `asyncio.Queue` for task management
- Implement exponential backoff for retry logic
- Consider rate limiting to avoid overwhelming the TMDB API
- Log all background operations for debugging
- Ensure thread-safety for shared data structures
- Handle network errors and timeouts gracefully
- Consider a priority queue for user-initiated loads vs. startup loads
- Add monitoring/metrics for background task performance
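The exponential-backoff note can be sketched as a small retry helper. The function name, attempt count, and delays below are illustrative assumptions, not existing code; the loader would wrap its TMDB calls with something like this.

```python
# Sketch of an exponential-backoff retry helper (hypothetical; names and
# delay values are illustrative, not part of the existing codebase).
import asyncio


async def retry_with_backoff(func, max_attempts: int = 4, base_delay: float = 0.01):
    """Retry an async callable, doubling the delay after each failure."""
    delay = base_delay
    for attempt in range(1, max_attempts + 1):
        try:
            return await func()
        except Exception:
            if attempt == max_attempts:
                raise  # give up after the final attempt
            await asyncio.sleep(delay)
            delay *= 2  # exponential backoff


attempts = 0


async def flaky() -> str:
    # Fails twice, then succeeds - mimics a transient network error.
    global attempts
    attempts += 1
    if attempts < 3:
        raise ConnectionError("transient failure")
    return "ok"


result = asyncio.run(retry_with_backoff(flaky))
print(result, attempts)  # ok 3
```

In production the bare `except Exception` should be narrowed to the network/timeout errors worth retrying, so programming errors still fail fast.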
Dependencies
- Requires WebSocket service to be fully functional
- Requires series service with episode/NFO/image loading capabilities
- May need database schema migration
Estimated Effort
- Backend implementation: 6-8 hours
- Frontend implementation: 3-4 hours
- Testing: 4-5 hours
- Documentation: 1-2 hours
- Total: 14-19 hours