293 lines
11 KiB
Python
293 lines
11 KiB
Python
"""Rescan orchestrator — coordinates all scan/cleanup operations during a rescan.
|
|
|
|
Extracts the rescan workflow from SchedulerService so scheduling and scan
|
|
logic are cleanly separated.
|
|
|
|
Called by SchedulerService.trigger_rescan() and by _run_rescan_job().
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
from typing import List, Optional
|
|
|
|
from src.server.models.config import SchedulerConfig
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class RescanOrchestrator:
|
|
"""Coordinates rescan, auto-download, folder scan, and key resolution.
|
|
|
|
This class encapsulates the entire post-rescan workflow so SchedulerService
|
|
only needs to call a single method.
|
|
"""
|
|
|
|
def __init__(self, config: Optional[SchedulerConfig] = None) -> None:
|
|
"""Initialize the orchestrator.
|
|
|
|
Args:
|
|
config: Optional scheduler config. If None, operations that depend
|
|
on config flags (auto_download, folder_scan) will be skipped.
|
|
"""
|
|
self._config = config
|
|
self._last_scan_time: Optional[datetime] = None
|
|
# Cooldown tracking for auto-download to prevent rapid re-triggers
|
|
self._last_auto_download_time: Optional[datetime] = None
|
|
self._auto_download_cooldown_seconds: int = 300 # 5 minutes default
|
|
|
|
@property
|
|
def last_scan_time(self) -> Optional[datetime]:
|
|
return self._last_scan_time
|
|
|
|
# ------------------------------------------------------------------
|
|
# Auto-download
|
|
# ------------------------------------------------------------------
|
|
|
|
async def run_auto_download(self) -> int:
|
|
"""Queue and start downloads for all series with missing episodes.
|
|
|
|
Returns:
|
|
Number of episodes queued.
|
|
"""
|
|
from datetime import timedelta
|
|
|
|
from src.server.models.download import EpisodeIdentifier
|
|
from src.server.utils.dependencies import (
|
|
get_anime_service,
|
|
get_download_service,
|
|
)
|
|
|
|
# Check cooldown to prevent rapid re-triggers
|
|
now = datetime.now(timezone.utc)
|
|
if self._last_auto_download_time is not None:
|
|
elapsed = now - self._last_auto_download_time
|
|
if elapsed < timedelta(seconds=self._auto_download_cooldown_seconds):
|
|
logger.debug(
|
|
"Auto-download skipped: cooldown active (elapsed=%.1fs cooldown=%ds)",
|
|
elapsed.total_seconds(),
|
|
self._auto_download_cooldown_seconds,
|
|
)
|
|
return 0
|
|
|
|
anime_service = get_anime_service()
|
|
download_service = get_download_service()
|
|
|
|
series_list = anime_service._cached_list_missing()
|
|
queued_count = 0
|
|
|
|
for series in series_list:
|
|
episode_dict: dict = series.get("episodeDict") or {}
|
|
if not episode_dict:
|
|
continue
|
|
|
|
episodes: List[EpisodeIdentifier] = []
|
|
for season_str, ep_numbers in episode_dict.items():
|
|
for ep_num in ep_numbers:
|
|
episodes.append(
|
|
EpisodeIdentifier(season=int(season_str), episode=int(ep_num))
|
|
)
|
|
|
|
if not episodes:
|
|
continue
|
|
|
|
await download_service.add_to_queue(
|
|
serie_id=series.get("key", ""),
|
|
serie_folder=series.get("folder", series.get("name", "")),
|
|
serie_name=series.get("name", ""),
|
|
episodes=episodes,
|
|
)
|
|
queued_count += len(episodes)
|
|
logger.info(
|
|
"Auto-download queued episodes for series=%s count=%d",
|
|
series.get("key"),
|
|
len(episodes),
|
|
)
|
|
|
|
if queued_count:
|
|
await download_service.start_queue_processing()
|
|
logger.info("Auto-download queue processing started: queued=%d", queued_count)
|
|
|
|
self._last_auto_download_time = datetime.now(timezone.utc)
|
|
logger.info("Auto-download completed: queued_count=%d", queued_count)
|
|
return queued_count
|
|
|
|
# ------------------------------------------------------------------
|
|
# Folder scan
|
|
# ------------------------------------------------------------------
|
|
|
|
async def run_folder_scan(self) -> None:
|
|
"""Run the folder scan maintenance task."""
|
|
from src.server.services.scheduler.folder_scan_service import FolderScanService
|
|
|
|
folder_scan_service = FolderScanService()
|
|
await folder_scan_service.run_folder_scan()
|
|
logger.info("Folder scan completed successfully")
|
|
|
|
# ------------------------------------------------------------------
|
|
# Key resolution
|
|
# ------------------------------------------------------------------
|
|
|
|
async def run_key_resolution(self) -> dict:
|
|
"""Run the orphaned folder key resolution scan.
|
|
|
|
Returns:
|
|
Dict with resolved/skipped/errors counts.
|
|
"""
|
|
from src.server.services.scheduler.key_resolution_service import (
|
|
perform_key_resolution_scan,
|
|
)
|
|
|
|
key_stats = await perform_key_resolution_scan()
|
|
logger.info(
|
|
"Key resolution scan completed: resolved=%d, skipped=%d, errors=%d",
|
|
key_stats["resolved"],
|
|
key_stats["skipped"],
|
|
key_stats["errors"],
|
|
)
|
|
return key_stats
|
|
|
|
# ------------------------------------------------------------------
|
|
# Main orchestrator entry point
|
|
# ------------------------------------------------------------------
|
|
|
|
async def execute(self) -> dict:
|
|
"""Execute the full rescan workflow.
|
|
|
|
Runs in order:
|
|
1. anime_service.rescan()
|
|
2. auto-download (if enabled)
|
|
3. folder scan (if enabled)
|
|
4. key resolution scan (always, if anime_directory configured)
|
|
|
|
Returns:
|
|
Dict with duration and counts for each step.
|
|
"""
|
|
scan_start = datetime.now(timezone.utc)
|
|
results = {
|
|
"started_at": scan_start.isoformat(),
|
|
"duration_seconds": 0.0,
|
|
"rescan_completed": False,
|
|
"auto_download_queued": 0,
|
|
"folder_scan_completed": False,
|
|
"key_resolution": {"resolved": 0, "skipped": 0, "errors": 0},
|
|
}
|
|
|
|
await self._broadcast(
|
|
"scheduled_rescan_started",
|
|
{"timestamp": scan_start.isoformat()},
|
|
)
|
|
|
|
try:
|
|
# 1. Main library rescan
|
|
await self._run_rescan()
|
|
results["rescan_completed"] = True
|
|
|
|
# 2. Auto-download
|
|
if self._config and self._config.auto_download_after_rescan:
|
|
try:
|
|
queued = await self.run_auto_download()
|
|
results["auto_download_queued"] = queued
|
|
await self._broadcast(
|
|
"auto_download_started", {"queued_count": queued}
|
|
)
|
|
except Exception as exc:
|
|
logger.error("Auto-download failed: %s", exc, exc_info=True)
|
|
await self._broadcast(
|
|
"auto_download_error", {"error": str(exc)}
|
|
)
|
|
|
|
# 3. Folder scan
|
|
if self._config and self._config.folder_scan_enabled:
|
|
try:
|
|
await self.run_folder_scan()
|
|
results["folder_scan_completed"] = True
|
|
except Exception as exc:
|
|
logger.error("Folder scan failed: %s", exc, exc_info=True)
|
|
await self._broadcast("folder_scan_error", {"error": str(exc)})
|
|
|
|
# 4. Key resolution scan (always runs if anime_directory configured)
|
|
try:
|
|
key_stats = await self.run_key_resolution()
|
|
results["key_resolution"] = key_stats
|
|
except Exception as exc:
|
|
logger.error("Key resolution scan failed: %s", exc, exc_info=True)
|
|
|
|
self._last_scan_time = datetime.now(timezone.utc)
|
|
results["duration_seconds"] = (
|
|
self._last_scan_time - scan_start
|
|
).total_seconds()
|
|
|
|
await self._broadcast(
|
|
"scheduled_rescan_completed",
|
|
{
|
|
"timestamp": self._last_scan_time.isoformat(),
|
|
"duration_seconds": results["duration_seconds"],
|
|
},
|
|
)
|
|
|
|
logger.info(
|
|
"Scheduled library rescan completed: duration=%.2fs",
|
|
results["duration_seconds"],
|
|
)
|
|
|
|
except Exception as exc:
|
|
logger.error("Scheduled rescan failed: %s", exc, exc_info=True)
|
|
await self._broadcast(
|
|
"scheduled_rescan_error",
|
|
{"error": str(exc), "timestamp": datetime.now(timezone.utc).isoformat()},
|
|
)
|
|
raise
|
|
|
|
return results
|
|
|
|
# ------------------------------------------------------------------
|
|
# Private helpers
|
|
# ------------------------------------------------------------------
|
|
|
|
async def _run_rescan(self) -> None:
|
|
"""Run the anime service rescan."""
|
|
from src.server.utils.dependencies import get_anime_service
|
|
|
|
anime_service = get_anime_service()
|
|
logger.info("Anime service obtained, calling anime_service.rescan()...")
|
|
await anime_service.rescan()
|
|
logger.info("anime_service.rescan() completed")
|
|
|
|
async def _broadcast(self, event_type: str, data: dict) -> None:
|
|
"""Broadcast a WebSocket event to all connected clients."""
|
|
try:
|
|
from src.server.services.websocket_service import get_websocket_service
|
|
|
|
ws_service = get_websocket_service()
|
|
await ws_service.manager.broadcast({"type": event_type, "data": data})
|
|
except Exception as exc:
|
|
logger.warning(
|
|
"WebSocket broadcast failed: event=%s error=%s", event_type, exc
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Module-level orchestrator
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_orchestrator: Optional[RescanOrchestrator] = None
|
|
|
|
|
|
def get_rescan_orchestrator(
|
|
config: Optional[SchedulerConfig] = None,
|
|
) -> RescanOrchestrator:
|
|
"""Return a RescanOrchestrator singleton (or create with optional config)."""
|
|
global _orchestrator
|
|
if _orchestrator is None or config is not None:
|
|
_orchestrator = RescanOrchestrator(config=config)
|
|
logger.debug("Created new RescanOrchestrator singleton")
|
|
else:
|
|
logger.debug("Returning existing RescanOrchestrator singleton")
|
|
return _orchestrator
|
|
|
|
|
|
def reset_rescan_orchestrator() -> None:
|
|
"""Reset the orchestrator singleton (used in tests)."""
|
|
global _orchestrator
|
|
_orchestrator = None |