Refactor: move RescanService logic inline into SchedulerService

RescanService was thin wrapper. Its logic (rescan, auto-download, folder
scan, WebSocket broadcasts) moved into SchedulerService as private methods.
RescanService and its module deleted.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
2026-06-04 19:32:46 +02:00
parent 13504c3172
commit 09d454d4c0
3 changed files with 147 additions and 288 deletions

View File

@@ -1,261 +0,0 @@
"""Rescan service — orchestrates library rescans.
This service handles the actual scan/rescan logic:
- Library rescan via anime_service
- Auto-download of missing episodes (if enabled)
- Folder maintenance scan (if enabled)
- Orphaned folder key resolution
SchedulerService only calls RescanService.execute() — it does not
know about the internal steps.
"""
from __future__ import annotations
import logging
from datetime import datetime, timedelta, timezone
from typing import List, Optional
from src.server.models.config import SchedulerConfig
logger = logging.getLogger(__name__)
_AUTO_DOWNLOAD_COOLDOWN_SECONDS = 300 # 5 minutes
class RescanService:
"""Orchestrates all rescan-related operations.
Encapsulates the full post-rescan workflow so SchedulerService
only needs to call a single execute() method.
"""
def __init__(self, config: Optional[SchedulerConfig] = None) -> None:
"""Initialize the rescan service.
Args:
config: Optional scheduler config. If None, operations that depend
on config flags (auto_download, folder_scan) will be skipped.
"""
self._config = config
self._last_scan_time: Optional[datetime] = None
self._last_auto_download_time: Optional[datetime] = None
@property
def last_scan_time(self) -> Optional[datetime]:
return self._last_scan_time
# ------------------------------------------------------------------
# Public entry point
# ------------------------------------------------------------------
async def execute(self) -> dict:
"""Execute the full rescan workflow.
Runs in order:
1. anime_service.rescan()
2. auto-download (if enabled)
3. folder scan (if enabled)
4. key resolution scan (always, if anime_directory configured)
Returns:
Dict with duration and counts for each step.
"""
from src.server.services.websocket_service import get_websocket_service
scan_start = datetime.now(timezone.utc)
results = {
"started_at": scan_start.isoformat(),
"duration_seconds": 0.0,
"rescan_completed": False,
"auto_download_queued": 0,
"folder_scan_completed": False,
"key_resolution": {"resolved": 0, "skipped": 0, "errors": 0},
}
await self._broadcast("scheduled_rescan_started", {"timestamp": scan_start.isoformat()})
try:
# 1. Main library rescan
await self._run_rescan()
results["rescan_completed"] = True
# 2. Auto-download
if self._config and self._config.auto_download_after_rescan:
try:
queued = await self._run_auto_download()
results["auto_download_queued"] = queued
await self._broadcast("auto_download_started", {"queued_count": queued})
except Exception as exc:
logger.error("Auto-download failed: %s", exc, exc_info=True)
await self._broadcast("auto_download_error", {"error": str(exc)})
# 3. Folder scan
if self._config and self._config.folder_scan_enabled:
try:
await self._run_folder_scan()
results["folder_scan_completed"] = True
except Exception as exc:
logger.error("Folder scan failed: %s", exc, exc_info=True)
await self._broadcast("folder_scan_error", {"error": str(exc)})
self._last_scan_time = datetime.now(timezone.utc)
results["duration_seconds"] = (self._last_scan_time - scan_start).total_seconds()
await self._broadcast(
"scheduled_rescan_completed",
{
"timestamp": self._last_scan_time.isoformat(),
"duration_seconds": results["duration_seconds"],
},
)
logger.info(
"Scheduled library rescan completed: duration=%.2fs",
results["duration_seconds"],
)
except Exception as exc:
logger.error("Scheduled rescan failed: %s", exc, exc_info=True)
await self._broadcast(
"scheduled_rescan_error",
{"error": str(exc), "timestamp": datetime.now(timezone.utc).isoformat()},
)
raise
return results
# ------------------------------------------------------------------
# Step 1: Library rescan
# ------------------------------------------------------------------
async def _run_rescan(self) -> None:
"""Run the anime service rescan."""
from src.server.utils.dependencies import get_anime_service
anime_service = get_anime_service()
logger.info("Anime service obtained, calling anime_service.rescan()...")
await anime_service.rescan()
logger.info("anime_service.rescan() completed")
# ------------------------------------------------------------------
# Step 2: Auto-download
# ------------------------------------------------------------------
async def _run_auto_download(self) -> int:
"""Queue and start downloads for all series with missing episodes.
Returns:
Number of episodes queued.
"""
from src.server.models.download import EpisodeIdentifier
from src.server.utils.dependencies import (
get_anime_service,
get_download_service,
)
# Cooldown check to prevent rapid re-triggers
now = datetime.now(timezone.utc)
if self._last_auto_download_time is not None:
elapsed = now - self._last_auto_download_time
if elapsed < timedelta(seconds=_AUTO_DOWNLOAD_COOLDOWN_SECONDS):
logger.debug(
"Auto-download skipped: cooldown active (elapsed=%.1fs cooldown=%ds)",
elapsed.total_seconds(),
_AUTO_DOWNLOAD_COOLDOWN_SECONDS,
)
return 0
anime_service = get_anime_service()
download_service = get_download_service()
series_list = anime_service._cached_list_missing()
queued_count = 0
for series in series_list:
episode_dict: dict = series.get("episodeDict") or {}
if not episode_dict:
continue
episodes: List[EpisodeIdentifier] = []
for season_str, ep_numbers in episode_dict.items():
for ep_num in ep_numbers:
episodes.append(
EpisodeIdentifier(season=int(season_str), episode=int(ep_num))
)
if not episodes:
continue
await download_service.add_to_queue(
serie_id=series.get("key", ""),
serie_folder=series.get("folder", series.get("name", "")),
serie_name=series.get("name", ""),
episodes=episodes,
)
queued_count += len(episodes)
logger.info(
"Auto-download queued episodes for series=%s count=%d",
series.get("key"),
len(episodes),
)
if queued_count:
await download_service.start_queue_processing()
logger.info("Auto-download queue processing started: queued=%d", queued_count)
self._last_auto_download_time = datetime.now(timezone.utc)
logger.info("Auto-download completed: queued_count=%d", queued_count)
return queued_count
# ------------------------------------------------------------------
# Step 3: Folder scan
# ------------------------------------------------------------------
async def _run_folder_scan(self) -> None:
"""Run the folder scan maintenance task."""
from src.server.services.scheduler.folder_scan_service import FolderScanService
folder_scan_service = FolderScanService()
await folder_scan_service.run_folder_scan()
logger.info("Folder scan completed successfully")
# ------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------
async def _broadcast(self, event_type: str, data: dict) -> None:
"""Broadcast a WebSocket event to all connected clients."""
try:
from src.server.services.websocket_service import get_websocket_service
ws_service = get_websocket_service()
await ws_service.manager.broadcast({"type": event_type, "data": data})
except Exception as exc:
logger.warning(
"WebSocket broadcast failed: event=%s error=%s", event_type, exc
)
# ---------------------------------------------------------------------------
# Module-level singleton
# ---------------------------------------------------------------------------
_rescan_service: Optional[RescanService] = None
def get_rescan_service(config: Optional[SchedulerConfig] = None) -> RescanService:
"""Return a RescanService singleton (or create with optional config)."""
global _rescan_service
if _rescan_service is None or config is not None:
_rescan_service = RescanService(config=config)
logger.debug("Created new RescanService singleton")
else:
logger.debug("Returning existing RescanService singleton")
return _rescan_service
def reset_rescan_service() -> None:
"""Reset the singleton (used in tests)."""
global _rescan_service
_rescan_service = None

View File

@@ -1,17 +1,11 @@
"""Scheduler services package. """Scheduler services package.
Contains scheduler orchestration and rescan coordination: Contains scheduler orchestration:
- scheduler_service: Cron-based scheduler using APScheduler - scheduler_service: Cron-based scheduler using APScheduler
""" """
from __future__ import annotations from __future__ import annotations
from src.server.services.rescan_service import (
RescanService,
get_rescan_service,
reset_rescan_service,
)
from src.server.services.scheduler.scheduler_service import ( from src.server.services.scheduler.scheduler_service import (
SchedulerService, SchedulerService,
SchedulerServiceError, SchedulerServiceError,
@@ -20,10 +14,6 @@ from src.server.services.scheduler.scheduler_service import (
) )
__all__ = [ __all__ = [
# RescanService
"RescanService",
"get_rescan_service",
"reset_rescan_service",
# Scheduler # Scheduler
"SchedulerService", "SchedulerService",
"SchedulerServiceError", "SchedulerServiceError",

View File

@@ -6,14 +6,12 @@ cron-based scheduling.
Jobs are held in memory (no separate scheduler database). On startup, Jobs are held in memory (no separate scheduler database). On startup,
if the last scan timestamp indicates a missed run (server was down at the if the last scan timestamp indicates a missed run (server was down at the
scheduled cron time), a rescan is triggered immediately. scheduled cron time), a rescan is triggered immediately.
Actual rescan logic is delegated to RescanService.
""" """
from __future__ import annotations from __future__ import annotations
import logging import logging
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from typing import Optional from typing import List, Optional
from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger from apscheduler.triggers.cron import CronTrigger
@@ -29,6 +27,8 @@ _JOB_ID = "scheduled_rescan"
# scheduled time and startup). # scheduled time and startup).
_MISFIRE_GRACE_SECONDS = 3600 _MISFIRE_GRACE_SECONDS = 3600
_AUTO_DOWNLOAD_COOLDOWN_SECONDS = 300 # 5 minutes
class SchedulerServiceError(Exception): class SchedulerServiceError(Exception):
"""Service-level exception for scheduler operations.""" """Service-level exception for scheduler operations."""
@@ -44,8 +44,7 @@ class SchedulerService:
- Immediate manual trigger - Immediate manual trigger
- Live config reloading without app restart - Live config reloading without app restart
Actual rescan/folder-scan/auto-download work is delegated to Actual rescan/folder-scan/auto-download work is handled inline.
RescanService.
""" """
def __init__(self) -> None: def __init__(self) -> None:
@@ -54,6 +53,8 @@ class SchedulerService:
self._scheduler: Optional[AsyncIOScheduler] = None self._scheduler: Optional[AsyncIOScheduler] = None
self._config: Optional[SchedulerConfig] = None self._config: Optional[SchedulerConfig] = None
self._scan_in_progress: bool = False self._scan_in_progress: bool = False
self._last_scan_time: Optional[datetime] = None
self._last_auto_download_time: Optional[datetime] = None
logger.info("SchedulerService initialised") logger.info("SchedulerService initialised")
# ------------------------------------------------------------------ # ------------------------------------------------------------------
@@ -247,10 +248,6 @@ class SchedulerService:
Returns: Returns:
Dict containing scheduler state and config fields. Dict containing scheduler state and config fields.
""" """
from src.server.services.rescan_service import get_rescan_service
rescan_service = get_rescan_service()
next_run: Optional[str] = None next_run: Optional[str] = None
if self._scheduler and self._scheduler.running: if self._scheduler and self._scheduler.running:
job = self._scheduler.get_job(_JOB_ID) job = self._scheduler.get_job(_JOB_ID)
@@ -270,8 +267,8 @@ class SchedulerService:
self._config.folder_scan_enabled if self._config else False self._config.folder_scan_enabled if self._config else False
), ),
"last_run": ( "last_run": (
rescan_service.last_scan_time.isoformat() self._last_scan_time.isoformat()
if rescan_service.last_scan_time if self._last_scan_time
else None else None
), ),
"next_run": next_run, "next_run": next_run,
@@ -365,9 +362,7 @@ class SchedulerService:
logger.warning("Missed-run check failed (non-fatal): %s", exc) logger.warning("Missed-run check failed (non-fatal): %s", exc)
async def _perform_rescan(self) -> None: async def _perform_rescan(self) -> None:
"""Execute a library rescan via RescanService.""" """Execute a library rescan with auto-download and folder scan."""
from src.server.services.rescan_service import get_rescan_service
logger.info( logger.info(
"Scheduler _perform_rescan entered: scan_in_progress=%s", "Scheduler _perform_rescan entered: scan_in_progress=%s",
self._scan_in_progress, self._scan_in_progress,
@@ -377,13 +372,148 @@ class SchedulerService:
return return
self._scan_in_progress = True self._scan_in_progress = True
scan_start = datetime.now(timezone.utc)
try: try:
rescan_service = get_rescan_service(config=self._config) await self._broadcast("scheduled_rescan_started", {"timestamp": scan_start.isoformat()})
await rescan_service.execute()
# 1. Main library rescan
await self._run_rescan()
# 2. Auto-download (if enabled)
if self._config and self._config.auto_download_after_rescan:
try:
queued = await self._run_auto_download()
await self._broadcast("auto_download_started", {"queued_count": queued})
except Exception as exc:
logger.error("Auto-download failed: %s", exc, exc_info=True)
await self._broadcast("auto_download_error", {"error": str(exc)})
# 3. Folder scan (if enabled)
if self._config and self._config.folder_scan_enabled:
try:
await self._run_folder_scan()
except Exception as exc:
logger.error("Folder scan failed: %s", exc, exc_info=True)
await self._broadcast("folder_scan_error", {"error": str(exc)})
self._last_scan_time = datetime.now(timezone.utc)
duration = (self._last_scan_time - scan_start).total_seconds()
await self._broadcast(
"scheduled_rescan_completed",
{
"timestamp": self._last_scan_time.isoformat(),
"duration_seconds": duration,
},
)
logger.info(
"Scheduled library rescan completed: duration=%.2fs",
duration,
)
except Exception as exc:
logger.error("Scheduled rescan failed: %s", exc, exc_info=True)
await self._broadcast(
"scheduled_rescan_error",
{"error": str(exc), "timestamp": datetime.now(timezone.utc).isoformat()},
)
raise
finally: finally:
self._scan_in_progress = False self._scan_in_progress = False
logger.info("Scheduled rescan finished: scan_in_progress reset to False") logger.info("Scheduled rescan finished: scan_in_progress reset to False")
async def _run_rescan(self) -> None:
"""Run the anime service rescan."""
from src.server.utils.dependencies import get_anime_service
anime_service = get_anime_service()
logger.info("Anime service obtained, calling anime_service.rescan()...")
await anime_service.rescan()
logger.info("anime_service.rescan() completed")
async def _run_auto_download(self) -> int:
"""Queue and start downloads for all series with missing episodes."""
from src.server.models.download import EpisodeIdentifier
from src.server.utils.dependencies import (
get_anime_service,
get_download_service,
)
# Cooldown check
now = datetime.now(timezone.utc)
if self._last_auto_download_time is not None:
elapsed = now - self._last_auto_download_time
if elapsed < timedelta(seconds=_AUTO_DOWNLOAD_COOLDOWN_SECONDS):
logger.debug(
"Auto-download skipped: cooldown active (elapsed=%.1fs cooldown=%ds)",
elapsed.total_seconds(),
_AUTO_DOWNLOAD_COOLDOWN_SECONDS,
)
return 0
anime_service = get_anime_service()
download_service = get_download_service()
series_list = anime_service._cached_list_missing()
queued_count = 0
for series in series_list:
episode_dict: dict = series.get("episodeDict") or {}
if not episode_dict:
continue
episodes: List[EpisodeIdentifier] = []
for season_str, ep_numbers in episode_dict.items():
for ep_num in ep_numbers:
episodes.append(
EpisodeIdentifier(season=int(season_str), episode=int(ep_num))
)
if not episodes:
continue
await download_service.add_to_queue(
serie_id=series.get("key", ""),
serie_folder=series.get("folder", series.get("name", "")),
serie_name=series.get("name", ""),
episodes=episodes,
)
queued_count += len(episodes)
logger.info(
"Auto-download queued episodes for series=%s count=%d",
series.get("key"),
len(episodes),
)
if queued_count:
await download_service.start_queue_processing()
logger.info("Auto-download queue processing started: queued=%d", queued_count)
self._last_auto_download_time = datetime.now(timezone.utc)
logger.info("Auto-download completed: queued_count=%d", queued_count)
return queued_count
async def _run_folder_scan(self) -> None:
"""Run the folder scan maintenance task."""
from src.server.services.scheduler.folder_scan_service import FolderScanService
folder_scan_service = FolderScanService()
await folder_scan_service.run_folder_scan()
logger.info("Folder scan completed successfully")
async def _broadcast(self, event_type: str, data: dict) -> None:
"""Broadcast a WebSocket event to all connected clients."""
try:
from src.server.services.websocket_service import get_websocket_service
ws_service = get_websocket_service()
await ws_service.manager.broadcast({"type": event_type, "data": data})
except Exception as exc:
logger.warning(
"WebSocket broadcast failed: event=%s error=%s", event_type, exc
)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Module-level job runner # Module-level job runner