- Add FolderScanService.run_folder_scan() calling perform_nfo_repair_scan() - Remove startup-time NFO repair from fastapi_app lifespan - Update docs/NFO_GUIDE.md: repair now runs as part of daily scan - Update tests to verify integration wiring - Update ARCHITECTURE.md and scheduler_service for scan scheduling
530 lines
17 KiB
Python
530 lines
17 KiB
Python
"""Centralized initialization service for application startup and setup."""
|
|
import asyncio
|
|
from pathlib import Path
|
|
from typing import Callable, Optional
|
|
|
|
import structlog
|
|
|
|
from src.config.settings import settings
|
|
from src.server.services.anime_service import sync_series_from_data_files
|
|
|
|
logger = structlog.get_logger(__name__)
|
|
|
|
|
|
async def _check_scan_status(
|
|
check_method: Callable,
|
|
scan_type: str,
|
|
log_completed_msg: str = None,
|
|
log_not_completed_msg: str = None
|
|
) -> bool:
|
|
"""Generic function to check if a scan has been completed.
|
|
|
|
Args:
|
|
check_method: SystemSettingsService method to check scan status
|
|
scan_type: Type of scan (e.g., "initial", "NFO", "media")
|
|
log_completed_msg: Optional custom message when scan is completed
|
|
log_not_completed_msg: Optional custom message when scan not completed
|
|
|
|
Returns:
|
|
bool: True if scan was completed, False otherwise
|
|
"""
|
|
from src.server.database.connection import get_db_session
|
|
from src.server.database.system_settings_service import SystemSettingsService
|
|
|
|
try:
|
|
async with get_db_session() as db:
|
|
is_completed = await check_method(SystemSettingsService, db)
|
|
|
|
if is_completed and log_completed_msg:
|
|
logger.info(log_completed_msg)
|
|
elif not is_completed and log_not_completed_msg:
|
|
logger.info(log_not_completed_msg)
|
|
|
|
return is_completed
|
|
|
|
except Exception as e:
|
|
logger.warning(
|
|
"Failed to check %s scan status: %s, assuming not done",
|
|
scan_type,
|
|
e
|
|
)
|
|
return False
|
|
|
|
|
|
async def _mark_scan_completed(
|
|
mark_method: Callable,
|
|
scan_type: str
|
|
) -> None:
|
|
"""Generic function to mark a scan as completed.
|
|
|
|
Args:
|
|
mark_method: SystemSettingsService method to mark scan as completed
|
|
scan_type: Type of scan (e.g., "initial", "NFO", "media")
|
|
"""
|
|
from src.server.database.connection import get_db_session
|
|
from src.server.database.system_settings_service import SystemSettingsService
|
|
|
|
try:
|
|
async with get_db_session() as db:
|
|
await mark_method(SystemSettingsService, db)
|
|
logger.info("Marked %s scan as completed", scan_type)
|
|
except Exception as e:
|
|
logger.warning("Failed to mark %s scan as completed: %s", scan_type, e)
|
|
|
|
|
|
async def _check_initial_scan_status() -> bool:
|
|
"""Check if initial scan has been completed.
|
|
|
|
Returns:
|
|
bool: True if scan was completed, False otherwise
|
|
"""
|
|
is_completed = await _check_scan_status(
|
|
check_method=lambda svc, db: svc.is_initial_scan_completed(db),
|
|
scan_type="initial",
|
|
log_completed_msg=(
|
|
"Initial scan already completed, skipping data file sync"
|
|
),
|
|
log_not_completed_msg=(
|
|
"Initial scan not completed, performing first-time setup"
|
|
)
|
|
)
|
|
return is_completed
|
|
|
|
|
|
async def _mark_initial_scan_completed() -> None:
|
|
"""Mark the initial scan as completed in system settings."""
|
|
await _mark_scan_completed(
|
|
mark_method=lambda svc, db: svc.mark_initial_scan_completed(db),
|
|
scan_type="initial"
|
|
)
|
|
|
|
|
|
async def _sync_anime_folders(progress_service=None) -> int:
|
|
"""Scan anime folders and sync series to database.
|
|
|
|
Args:
|
|
progress_service: Optional ProgressService for progress updates
|
|
|
|
Returns:
|
|
int: Number of series synced
|
|
"""
|
|
logger.info("Performing initial anime folder scan...")
|
|
|
|
if progress_service:
|
|
await progress_service.update_progress(
|
|
progress_id="series_sync",
|
|
current=25,
|
|
message="Scanning anime folders...",
|
|
metadata={"step_id": "series_sync"}
|
|
)
|
|
|
|
sync_count = await sync_series_from_data_files(settings.anime_directory)
|
|
logger.info("Data file sync complete. Added %d series.", sync_count)
|
|
|
|
if progress_service:
|
|
await progress_service.update_progress(
|
|
progress_id="series_sync",
|
|
current=75,
|
|
message=f"Synced {sync_count} series from data files",
|
|
metadata={"step_id": "series_sync"}
|
|
)
|
|
|
|
return sync_count
|
|
|
|
|
|
async def _load_series_into_memory(progress_service=None) -> None:
|
|
"""Load series from database into SeriesApp's in-memory cache.
|
|
|
|
Args:
|
|
progress_service: Optional ProgressService for progress updates
|
|
"""
|
|
from src.server.utils.dependencies import get_anime_service
|
|
|
|
anime_service = get_anime_service()
|
|
await anime_service._load_series_from_db()
|
|
logger.info("Series loaded from database into memory")
|
|
|
|
if progress_service:
|
|
await progress_service.complete_progress(
|
|
progress_id="series_sync",
|
|
message="Series loaded into memory",
|
|
metadata={"step_id": "series_sync"}
|
|
)
|
|
|
|
|
|
async def _validate_anime_directory(progress_service=None) -> bool:
|
|
"""Validate that anime directory is configured.
|
|
|
|
Args:
|
|
progress_service: Optional ProgressService for progress updates
|
|
|
|
Returns:
|
|
bool: True if directory is configured, False otherwise
|
|
"""
|
|
logger.info(
|
|
"Checking anime_directory setting: '%s'",
|
|
settings.anime_directory
|
|
)
|
|
|
|
if not settings.anime_directory:
|
|
logger.info("Initialization skipped - anime directory not configured")
|
|
if progress_service:
|
|
await progress_service.complete_progress(
|
|
progress_id="series_sync",
|
|
message="No anime directory configured",
|
|
metadata={"step_id": "series_sync"}
|
|
)
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
async def perform_initial_setup(progress_service=None):
|
|
"""Perform initial setup including series sync and scan completion marking.
|
|
|
|
This function is called both during application lifespan startup
|
|
and when the setup endpoint is completed. It ensures that:
|
|
1. Series are synced from data files to database
|
|
2. Initial scan is marked as completed
|
|
3. Series are loaded into memory
|
|
4. NFO scan is performed if configured
|
|
5. Media scan is performed
|
|
|
|
Args:
|
|
progress_service: Optional ProgressService for emitting updates
|
|
|
|
Returns:
|
|
bool: True if initialization was performed, False if skipped
|
|
"""
|
|
# Send initial progress update
|
|
if progress_service:
|
|
from src.server.services.progress_service import ProgressType
|
|
await progress_service.start_progress(
|
|
progress_id="series_sync",
|
|
progress_type=ProgressType.SYSTEM,
|
|
title="Syncing Series Database",
|
|
total=100,
|
|
message="Checking initialization status...",
|
|
metadata={"step_id": "series_sync"}
|
|
)
|
|
|
|
# Check if initial setup has already been completed
|
|
is_initial_scan_done = await _check_initial_scan_status()
|
|
if is_initial_scan_done:
|
|
if progress_service:
|
|
await progress_service.complete_progress(
|
|
progress_id="series_sync",
|
|
message="Already completed",
|
|
metadata={"step_id": "series_sync"}
|
|
)
|
|
return False
|
|
|
|
# Validate that anime directory is configured
|
|
if not await _validate_anime_directory(progress_service):
|
|
return False
|
|
|
|
# Perform the actual initialization
|
|
try:
|
|
# Sync series from anime folders to database
|
|
await _sync_anime_folders(progress_service)
|
|
|
|
# Mark the initial scan as completed
|
|
await _mark_initial_scan_completed()
|
|
|
|
# Load series into memory from database
|
|
await _load_series_into_memory(progress_service)
|
|
|
|
return True
|
|
|
|
except (OSError, RuntimeError, ValueError) as e:
|
|
logger.warning("Failed to perform initial setup: %s", e)
|
|
return False
|
|
|
|
|
|
async def _check_nfo_scan_status() -> bool:
|
|
"""Check if initial NFO scan has been completed.
|
|
|
|
Returns:
|
|
bool: True if NFO scan was completed, False otherwise
|
|
"""
|
|
return await _check_scan_status(
|
|
check_method=lambda svc, db: svc.is_initial_nfo_scan_completed(db),
|
|
scan_type="NFO"
|
|
)
|
|
|
|
|
|
async def _mark_nfo_scan_completed() -> None:
|
|
"""Mark the initial NFO scan as completed in system settings."""
|
|
await _mark_scan_completed(
|
|
mark_method=lambda svc, db: svc.mark_initial_nfo_scan_completed(db),
|
|
scan_type="NFO"
|
|
)
|
|
|
|
|
|
async def _is_nfo_scan_configured() -> bool:
|
|
"""Check if NFO scan features are properly configured.
|
|
|
|
Returns:
|
|
bool: True if TMDB API key and NFO features are configured
|
|
"""
|
|
return settings.tmdb_api_key and (
|
|
settings.nfo_auto_create or settings.nfo_update_on_scan
|
|
)
|
|
|
|
|
|
async def _execute_nfo_scan(progress_service=None) -> None:
|
|
"""Execute the actual NFO scan with TMDB data.
|
|
|
|
Args:
|
|
progress_service: Optional ProgressService for progress updates
|
|
|
|
Raises:
|
|
Exception: If NFO scan fails
|
|
"""
|
|
from src.core.services.series_manager_service import SeriesManagerService
|
|
|
|
logger.info("Performing initial NFO scan...")
|
|
|
|
if progress_service:
|
|
await progress_service.update_progress(
|
|
progress_id="nfo_scan",
|
|
current=25,
|
|
message="Scanning series for NFO files...",
|
|
metadata={"step_id": "nfo_scan"}
|
|
)
|
|
|
|
manager = SeriesManagerService.from_settings()
|
|
|
|
if progress_service:
|
|
await progress_service.update_progress(
|
|
progress_id="nfo_scan",
|
|
current=50,
|
|
message="Processing NFO files with TMDB data...",
|
|
metadata={"step_id": "nfo_scan"}
|
|
)
|
|
|
|
await manager.scan_and_process_nfo()
|
|
await manager.close()
|
|
logger.info("Initial NFO scan completed")
|
|
|
|
if progress_service:
|
|
await progress_service.complete_progress(
|
|
progress_id="nfo_scan",
|
|
message="NFO scan completed successfully",
|
|
metadata={"step_id": "nfo_scan"}
|
|
)
|
|
|
|
|
|
async def perform_nfo_scan_if_needed(progress_service=None):
|
|
"""Perform initial NFO scan if not yet completed and configured.
|
|
|
|
Args:
|
|
progress_service: Optional ProgressService for emitting updates
|
|
"""
|
|
if progress_service:
|
|
from src.server.services.progress_service import ProgressType
|
|
await progress_service.start_progress(
|
|
progress_id="nfo_scan",
|
|
progress_type=ProgressType.SYSTEM,
|
|
title="Processing NFO Metadata",
|
|
total=100,
|
|
message="Checking NFO scan status...",
|
|
metadata={"step_id": "nfo_scan"}
|
|
)
|
|
|
|
# Check if NFO scan was already completed
|
|
is_nfo_scan_done = await _check_nfo_scan_status()
|
|
|
|
# Check if NFO features are configured
|
|
if not await _is_nfo_scan_configured():
|
|
message = (
|
|
"Skipped - TMDB API key not configured"
|
|
if not settings.tmdb_api_key
|
|
else "Skipped - NFO features disabled"
|
|
)
|
|
logger.info("NFO scan skipped: %s", message)
|
|
|
|
if progress_service:
|
|
await progress_service.complete_progress(
|
|
progress_id="nfo_scan",
|
|
message=message,
|
|
metadata={"step_id": "nfo_scan"}
|
|
)
|
|
return
|
|
|
|
# Skip if already completed
|
|
if is_nfo_scan_done:
|
|
logger.info("Skipping NFO scan - already completed on previous run")
|
|
if progress_service:
|
|
await progress_service.complete_progress(
|
|
progress_id="nfo_scan",
|
|
message="Already completed",
|
|
metadata={"step_id": "nfo_scan"}
|
|
)
|
|
return
|
|
|
|
# Execute the NFO scan
|
|
try:
|
|
await _execute_nfo_scan(progress_service)
|
|
await _mark_nfo_scan_completed()
|
|
except Exception as e:
|
|
logger.error("Failed to complete NFO scan: %s", e, exc_info=True)
|
|
if progress_service:
|
|
await progress_service.fail_progress(
|
|
progress_id="nfo_scan",
|
|
error_message=f"NFO scan failed: {str(e)}",
|
|
metadata={"step_id": "nfo_scan"}
|
|
)
|
|
|
|
|
|
_NFO_REPAIR_SEMAPHORE: asyncio.Semaphore = asyncio.Semaphore(3)
|
|
|
|
|
|
async def _repair_one_series(series_dir: Path, series_name: str) -> None:
|
|
"""Repair a single series NFO in isolation.
|
|
|
|
Creates a fresh :class:`NFOService` and :class:`NfoRepairService` per
|
|
invocation so that each repair owns its own ``aiohttp`` session/connector
|
|
and concurrent tasks cannot interfere with each other.
|
|
|
|
A module-level semaphore (``_NFO_REPAIR_SEMAPHORE``) limits the number of
|
|
simultaneous TMDB requests to avoid rate-limiting.
|
|
|
|
Any exception is caught and logged so the asyncio task never silently
|
|
drops an unhandled error.
|
|
|
|
Args:
|
|
series_dir: Absolute path to the series folder.
|
|
series_name: Human-readable series name for log messages.
|
|
"""
|
|
from src.core.services.nfo_factory import NFOServiceFactory
|
|
from src.core.services.nfo_repair_service import NfoRepairService
|
|
|
|
async with _NFO_REPAIR_SEMAPHORE:
|
|
try:
|
|
factory = NFOServiceFactory()
|
|
nfo_service = factory.create()
|
|
repair_service = NfoRepairService(nfo_service)
|
|
await repair_service.repair_series(series_dir, series_name)
|
|
except Exception as exc: # pylint: disable=broad-except
|
|
logger.error(
|
|
"NFO repair failed for %s: %s",
|
|
series_name,
|
|
exc,
|
|
)
|
|
|
|
|
|
async def perform_nfo_repair_scan(background_loader=None) -> None:
|
|
"""Scan all series folders and repair incomplete tvshow.nfo files.
|
|
|
|
Called from ``FolderScanService.run_folder_scan()`` during the scheduled
|
|
daily folder scan (not on every startup). Checks each subfolder of
|
|
``settings.anime_directory`` for a ``tvshow.nfo`` and calls
|
|
``_repair_one_series`` for every file with absent or empty required tags.
|
|
|
|
Each repair task creates its own isolated :class:`NFOService` /
|
|
:class:`TMDBClient` so concurrent tasks never share an ``aiohttp``
|
|
session — this prevents "Connector is closed" errors when many repairs
|
|
run in parallel. A semaphore caps TMDB concurrency at 3 to stay within
|
|
rate limits.
|
|
|
|
The ``background_loader`` parameter is accepted for backwards-compatibility
|
|
but is no longer used.
|
|
|
|
Args:
|
|
background_loader: Unused. Kept to avoid breaking call-sites.
|
|
"""
|
|
from src.core.services.nfo_repair_service import nfo_needs_repair
|
|
|
|
if not settings.tmdb_api_key:
|
|
logger.warning("NFO repair scan skipped — TMDB API key not configured")
|
|
return
|
|
if not settings.anime_directory:
|
|
logger.warning("NFO repair scan skipped — anime directory not configured")
|
|
return
|
|
anime_dir = Path(settings.anime_directory)
|
|
if not anime_dir.is_dir():
|
|
logger.warning("NFO repair scan skipped — anime directory not found: %s", anime_dir)
|
|
return
|
|
|
|
queued = 0
|
|
total = 0
|
|
for series_dir in sorted(anime_dir.iterdir()):
|
|
if not series_dir.is_dir():
|
|
continue
|
|
nfo_path = series_dir / "tvshow.nfo"
|
|
if not nfo_path.exists():
|
|
continue
|
|
total += 1
|
|
series_name = series_dir.name
|
|
if nfo_needs_repair(nfo_path):
|
|
queued += 1
|
|
# Each task creates its own NFOService so connectors are isolated.
|
|
asyncio.create_task(
|
|
_repair_one_series(series_dir, series_name),
|
|
name=f"nfo_repair:{series_name}",
|
|
)
|
|
|
|
logger.info(
|
|
"NFO repair scan complete: %d of %d series queued for repair",
|
|
queued,
|
|
total,
|
|
)
|
|
|
|
|
|
async def _check_media_scan_status() -> bool:
|
|
"""Check if initial media scan has been completed.
|
|
|
|
Returns:
|
|
bool: True if media scan was completed, False otherwise
|
|
"""
|
|
return await _check_scan_status(
|
|
check_method=lambda svc, db: svc.is_initial_media_scan_completed(db),
|
|
scan_type="media"
|
|
)
|
|
|
|
|
|
async def _mark_media_scan_completed() -> None:
|
|
"""Mark the initial media scan as completed in system settings."""
|
|
await _mark_scan_completed(
|
|
mark_method=lambda svc, db: svc.mark_initial_media_scan_completed(db),
|
|
scan_type="media"
|
|
)
|
|
|
|
|
|
async def _execute_media_scan(background_loader) -> None:
|
|
"""Execute the actual media scan and queue background loading.
|
|
|
|
Args:
|
|
background_loader: The background loader service instance
|
|
|
|
Raises:
|
|
Exception: If media scan fails
|
|
"""
|
|
from src.server.fastapi_app import _check_incomplete_series_on_startup
|
|
|
|
logger.info("Performing initial media scan...")
|
|
await _check_incomplete_series_on_startup(background_loader)
|
|
logger.info("Initial media scan completed")
|
|
|
|
|
|
async def perform_media_scan_if_needed(background_loader):
|
|
"""Perform initial media scan if not yet completed.
|
|
|
|
Args:
|
|
background_loader: The background loader service instance
|
|
"""
|
|
# Check if media scan was already completed
|
|
is_media_scan_done = await _check_media_scan_status()
|
|
|
|
if is_media_scan_done:
|
|
logger.info("Skipping media scan - already completed on previous run")
|
|
return
|
|
|
|
# Execute the media scan
|
|
try:
|
|
await _execute_media_scan(background_loader)
|
|
await _mark_media_scan_completed()
|
|
except Exception as e:
|
|
logger.error("Failed to complete media scan: %s", e, exc_info=True)
|