Files
Aniworld/src/server/services/initialization_service.py
Lukas 21af502184 refactor: simplify NFO handling, remove legacy services
- Drop nfo_factory, nfo_repair_service, nfo_service, series_manager_service
- Delete key_resolution_service, consolidate into folder_rename_service
- Remove bulk of NFO-related tests (coverage via integration tests)
- Streamline SeriesApp, background_loader, initialization services
- Add folder_rename_service to scheduler

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-06-04 18:54:31 +02:00

565 lines
18 KiB
Python

"""Centralized initialization service for application startup and setup."""
import asyncio
import os
from pathlib import Path
from typing import Callable, Optional
import structlog
from src.config.settings import settings
from src.server.services.anime_service import sync_legacy_series_to_db
from src.server.services.legacy_file_migration import migrate_series_from_files_to_db
logger = structlog.get_logger(__name__)
async def _check_scan_status(
check_method: Callable,
scan_type: str,
log_completed_msg: str = None,
log_not_completed_msg: str = None
) -> bool:
"""Generic function to check if a scan has been completed.
Args:
check_method: SystemSettingsService method to check scan status
scan_type: Type of scan (e.g., "initial", "NFO", "media")
log_completed_msg: Optional custom message when scan is completed
log_not_completed_msg: Optional custom message when scan not completed
Returns:
bool: True if scan was completed, False otherwise
"""
from src.server.database.connection import get_db_session
from src.server.database.system_settings_service import SystemSettingsService
try:
async with get_db_session() as db:
is_completed = await check_method(SystemSettingsService, db)
if is_completed and log_completed_msg:
logger.info(log_completed_msg)
elif not is_completed and log_not_completed_msg:
logger.info(log_not_completed_msg)
return is_completed
except Exception as e:
logger.warning(
"Failed to check %s scan status: %s, assuming not done",
scan_type,
e
)
return False
async def _mark_scan_completed(
mark_method: Callable,
scan_type: str
) -> None:
"""Generic function to mark a scan as completed.
Args:
mark_method: SystemSettingsService method to mark scan as completed
scan_type: Type of scan (e.g., "initial", "NFO", "media")
"""
from src.server.database.connection import get_db_session
from src.server.database.system_settings_service import SystemSettingsService
try:
async with get_db_session() as db:
await mark_method(SystemSettingsService, db)
logger.info("Marked %s scan as completed", scan_type)
except Exception as e:
logger.warning("Failed to mark %s scan as completed: %s", scan_type, e)
async def _check_initial_scan_status() -> bool:
"""Check if initial scan has been completed.
Returns:
bool: True if scan was completed, False otherwise
"""
is_completed = await _check_scan_status(
check_method=lambda svc, db: svc.is_initial_scan_completed(db),
scan_type="initial",
log_completed_msg=(
"Initial scan already completed, skipping data file sync"
),
log_not_completed_msg=(
"Initial scan not completed, performing first-time setup"
)
)
return is_completed
async def _mark_initial_scan_completed() -> None:
"""Mark the initial scan as completed in system settings."""
await _mark_scan_completed(
mark_method=lambda svc, db: svc.mark_initial_scan_completed(db),
scan_type="initial"
)
async def _check_legacy_migration_status() -> bool:
"""Check if legacy key/data file migration has been completed.
Returns:
bool: True if migration was completed, False otherwise
"""
return await _check_scan_status(
check_method=lambda svc, db: svc.is_migration_legacy_files_completed(db),
scan_type="legacy_migration",
log_completed_msg="Legacy file migration already completed, skipping",
log_not_completed_msg="Legacy file migration not yet run, will check for files"
)
async def _mark_legacy_migration_completed() -> None:
"""Mark the legacy file migration as completed in system settings."""
await _mark_scan_completed(
mark_method=lambda svc, db: svc.mark_migration_legacy_files_completed(db),
scan_type="legacy_migration"
)
async def _check_legacy_key_cleanup_status() -> bool:
"""Check if legacy key file cleanup has been completed.
Returns:
bool: True if cleanup was completed, False otherwise
"""
return await _check_scan_status(
check_method=lambda svc, db: svc.is_legacy_key_cleanup_completed(db),
scan_type="legacy_key_cleanup",
log_completed_msg="Legacy key file cleanup already completed, skipping",
log_not_completed_msg="Legacy key file cleanup not yet run, will clean up key files"
)
async def _mark_legacy_key_cleanup_completed() -> None:
"""Mark the legacy key file cleanup as completed in system settings."""
await _mark_scan_completed(
mark_method=lambda svc, db: svc.mark_legacy_key_cleanup_completed(db),
scan_type="legacy_key_cleanup"
)
async def _migrate_legacy_files() -> int:
"""Migrate series from legacy key/data files to database.
Returns:
int: Number of series migrated
"""
from src.server.database.connection import get_db_session
logger.info("Checking for legacy key/data files to migrate...")
try:
async with get_db_session() as db:
migrated_count = await migrate_series_from_files_to_db(
settings.anime_directory,
db
)
if migrated_count > 0:
logger.info("Migrated %d series from legacy files", migrated_count)
else:
logger.info("No series found in legacy files to migrate")
return migrated_count
except Exception as e:
logger.warning("Failed to migrate legacy files: %s", e)
return 0
async def _cleanup_legacy_key_files() -> int:
"""Remove legacy key files from folders that already have DB entries.
This is a one-time cleanup task that runs at startup after legacy migration.
It removes deprecated 'key' files that cause duplicate key errors when
folders are renamed, since the DB is now the source of truth.
Returns:
int: Number of key files deleted
"""
from src.server.database.connection import get_db_session
from src.server.database.service import AnimeSeriesService
logger.info("Checking for legacy key files to clean up...")
if not settings.anime_directory or not os.path.isdir(settings.anime_directory):
logger.warning(
"Anime directory not configured or does not exist, skipping legacy key cleanup"
)
return 0
deleted_count = 0
scanned_count = 0
try:
async with get_db_session() as db:
# Get all series from DB to know which folders should have key files removed
all_series = await AnimeSeriesService.get_all(db)
# Build a set of known folder names from DB
db_folders: set[str] = {series.folder for series in all_series if series.folder}
for folder_name in db_folders:
folder_path = settings.anime_directory / folder_name
key_file = folder_path / "key"
if not key_file.exists():
continue
scanned_count += 1
try:
key_file.unlink()
deleted_count += 1
logger.info(
"Removed legacy key file",
folder=folder_name,
key_file=str(key_file)
)
except OSError as exc:
logger.warning(
"Could not remove legacy key file",
folder=folder_name,
key_file=str(key_file),
error=str(exc)
)
except Exception as e:
logger.error(
"Legacy key file cleanup failed",
error=str(e),
exc_info=True
)
return deleted_count
logger.info(
"Legacy key file cleanup complete",
scanned=scanned_count,
deleted=deleted_count
)
return deleted_count
async def _sync_anime_folders(progress_service=None) -> int:
"""Scan anime folders and sync series to database.
Args:
progress_service: Optional ProgressService for progress updates
Returns:
int: Number of series synced
"""
logger.info("Performing initial anime folder scan...")
if progress_service:
await progress_service.update_progress(
progress_id="series_sync",
current=25,
message="Scanning anime folders...",
metadata={"step_id": "series_sync"}
)
sync_count = await sync_legacy_series_to_db(settings.anime_directory)
logger.info("Data file sync complete. Added %d series.", sync_count)
if progress_service:
await progress_service.update_progress(
progress_id="series_sync",
current=75,
message=f"Synced {sync_count} series from data files",
metadata={"step_id": "series_sync"}
)
return sync_count
async def _load_series_into_memory(progress_service=None) -> None:
"""Load series from database into SeriesApp's in-memory cache.
Args:
progress_service: Optional ProgressService for progress updates
"""
from src.server.utils.dependencies import get_anime_service
anime_service = get_anime_service()
await anime_service._load_series_from_db()
logger.info("Series loaded from database into memory")
if progress_service:
await progress_service.complete_progress(
progress_id="series_sync",
message="Series loaded into memory",
metadata={"step_id": "series_sync"}
)
async def _validate_anime_directory(progress_service=None) -> bool:
"""Validate that anime directory is configured.
Args:
progress_service: Optional ProgressService for progress updates
Returns:
bool: True if directory is configured, False otherwise
"""
logger.info(
"Checking anime_directory setting: '%s'",
settings.anime_directory
)
if not settings.anime_directory:
logger.info("Initialization skipped - anime directory not configured")
if progress_service:
await progress_service.complete_progress(
progress_id="series_sync",
message="No anime directory configured",
metadata={"step_id": "series_sync"}
)
return False
return True
async def perform_initial_setup(progress_service=None):
"""Perform initial setup including series sync and scan completion marking.
This function is called both during application lifespan startup
and when the setup endpoint is completed. It ensures that:
1. Legacy key/data files are migrated to database (one-time)
2. Series are synced from data files to database
3. Initial scan is marked as completed
4. Series are loaded into memory
5. NFO scan is performed if configured
6. Media scan is performed
Args:
progress_service: Optional ProgressService for emitting updates
Returns:
bool: True if initialization was performed, False if skipped
"""
# Send initial progress update
if progress_service:
from src.server.services.progress_service import ProgressType
await progress_service.start_progress(
progress_id="series_sync",
progress_type=ProgressType.SYSTEM,
title="Syncing Series Database",
total=100,
message="Checking initialization status...",
metadata={"step_id": "series_sync"}
)
# Check if initial setup has already been completed
is_initial_scan_done = await _check_initial_scan_status()
if is_initial_scan_done:
if progress_service:
await progress_service.complete_progress(
progress_id="series_sync",
message="Already completed",
metadata={"step_id": "series_sync"}
)
return False
# Validate that anime directory is configured
if not await _validate_anime_directory(progress_service):
return False
# Perform the actual initialization
try:
# First, run legacy file migration if needed (independent of initial scan)
is_legacy_migration_done = await _check_legacy_migration_status()
if not is_legacy_migration_done:
await _migrate_legacy_files()
await _mark_legacy_migration_completed()
# Sync series from anime folders to database
await _sync_anime_folders(progress_service)
# Clean up legacy key files from folders that now have DB entries
# This runs after migration/sync to ensure DB entries exist before deletion
is_key_cleanup_done = await _check_legacy_key_cleanup_status()
if not is_key_cleanup_done:
await _cleanup_legacy_key_files()
await _mark_legacy_key_cleanup_completed()
# Mark the initial scan as completed
await _mark_initial_scan_completed()
# Load series into memory from database
await _load_series_into_memory(progress_service)
return True
except (OSError, RuntimeError, ValueError) as e:
logger.warning("Failed to perform initial setup: %s", e)
return False
async def _check_nfo_scan_status() -> bool:
"""Check if initial NFO scan has been completed.
Returns:
bool: True if NFO scan was completed, False otherwise
"""
return await _check_scan_status(
check_method=lambda svc, db: svc.is_initial_nfo_scan_completed(db),
scan_type="NFO"
)
async def _mark_nfo_scan_completed() -> None:
"""Mark the initial NFO scan as completed in system settings."""
await _mark_scan_completed(
mark_method=lambda svc, db: svc.mark_initial_nfo_scan_completed(db),
scan_type="NFO"
)
async def _is_nfo_scan_configured() -> bool:
"""Check if NFO scan features are properly configured.
Returns:
bool: True if TMDB API key and NFO features are configured
"""
return settings.tmdb_api_key and (
settings.nfo_auto_create or settings.nfo_update_on_scan
)
async def _execute_nfo_scan(progress_service=None) -> None:
"""Execute the actual NFO scan with TMDB data.
Note: NFO service removed. This function is now a no-op stub.
Args:
progress_service: Unused. Kept to avoid breaking call-sites.
"""
logger.info("NFO scan skipped — NFO service removed")
return
async def perform_nfo_scan_if_needed(progress_service=None):
"""Perform initial NFO scan if not yet completed and configured.
Args:
progress_service: Optional ProgressService for emitting updates
"""
if progress_service:
from src.server.services.progress_service import ProgressType
await progress_service.start_progress(
progress_id="nfo_scan",
progress_type=ProgressType.SYSTEM,
title="Processing NFO Metadata",
total=100,
message="Checking NFO scan status...",
metadata={"step_id": "nfo_scan"}
)
# Check if NFO scan was already completed
is_nfo_scan_done = await _check_nfo_scan_status()
# Check if NFO features are configured
if not await _is_nfo_scan_configured():
message = (
"Skipped - TMDB API key not configured"
if not settings.tmdb_api_key
else "Skipped - NFO features disabled"
)
logger.info("NFO scan skipped: %s", message)
if progress_service:
await progress_service.complete_progress(
progress_id="nfo_scan",
message=message,
metadata={"step_id": "nfo_scan"}
)
return
# Skip if already completed
if is_nfo_scan_done:
logger.info("Skipping NFO scan - already completed on previous run")
if progress_service:
await progress_service.complete_progress(
progress_id="nfo_scan",
message="Already completed",
metadata={"step_id": "nfo_scan"}
)
return
# Execute the NFO scan
try:
await _execute_nfo_scan(progress_service)
await _mark_nfo_scan_completed()
except Exception as e:
logger.error("Failed to complete NFO scan: %s", e, exc_info=True)
if progress_service:
await progress_service.fail_progress(
progress_id="nfo_scan",
error_message=f"NFO scan failed: {str(e)}",
metadata={"step_id": "nfo_scan"}
)
async def _check_media_scan_status() -> bool:
"""Check if initial media scan has been completed.
Returns:
bool: True if media scan was completed, False otherwise
"""
return await _check_scan_status(
check_method=lambda svc, db: svc.is_initial_media_scan_completed(db),
scan_type="media"
)
async def _mark_media_scan_completed() -> None:
"""Mark the initial media scan as completed in system settings."""
await _mark_scan_completed(
mark_method=lambda svc, db: svc.mark_initial_media_scan_completed(db),
scan_type="media"
)
async def _execute_media_scan(background_loader) -> None:
"""Execute the actual media scan and queue background loading.
Args:
background_loader: The background loader service instance
Raises:
Exception: If media scan fails
"""
from src.server.fastapi_app import _check_incomplete_series_on_startup
logger.info("Performing initial media scan...")
await _check_incomplete_series_on_startup(background_loader)
logger.info("Initial media scan completed")
async def perform_media_scan_if_needed(background_loader):
"""Perform initial media scan if not yet completed.
Args:
background_loader: The background loader service instance
"""
# Check if media scan was already completed
is_media_scan_done = await _check_media_scan_status()
if is_media_scan_done:
logger.info("Skipping media scan - already completed on previous run")
return
# Execute the media scan
try:
await _execute_media_scan(background_loader)
await _mark_media_scan_completed()
except Exception as e:
logger.error("Failed to complete media scan: %s", e, exc_info=True)