From 09d454d4c04d64eb2b5e8ed2fff56b1b3f8347b5 Mon Sep 17 00:00:00 2001 From: Lukas Date: Thu, 4 Jun 2026 19:32:46 +0200 Subject: [PATCH] Refactor: move RescanService logic inline into SchedulerService RescanService was thin wrapper. Its logic (rescan, auto-download, folder scan, WebSocket broadcasts) moved into SchedulerService as private methods. RescanService and its module deleted. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/server/services/rescan_service.py | 261 ------------------ src/server/services/scheduler/__init__.py | 12 +- .../services/scheduler/scheduler_service.py | 162 +++++++++-- 3 files changed, 147 insertions(+), 288 deletions(-) delete mode 100644 src/server/services/rescan_service.py diff --git a/src/server/services/rescan_service.py b/src/server/services/rescan_service.py deleted file mode 100644 index 25a6450..0000000 --- a/src/server/services/rescan_service.py +++ /dev/null @@ -1,261 +0,0 @@ -"""Rescan service — orchestrates library rescans. - -This service handles the actual scan/rescan logic: - -- Library rescan via anime_service -- Auto-download of missing episodes (if enabled) -- Folder maintenance scan (if enabled) -- Orphaned folder key resolution - -SchedulerService only calls RescanService.execute() — it does not -know about the internal steps. -""" -from __future__ import annotations - -import logging -from datetime import datetime, timedelta, timezone -from typing import List, Optional - -from src.server.models.config import SchedulerConfig - -logger = logging.getLogger(__name__) - -_AUTO_DOWNLOAD_COOLDOWN_SECONDS = 300 # 5 minutes - - -class RescanService: - """Orchestrates all rescan-related operations. - - Encapsulates the full post-rescan workflow so SchedulerService - only needs to call a single execute() method. - """ - - def __init__(self, config: Optional[SchedulerConfig] = None) -> None: - """Initialize the rescan service. - - Args: - config: Optional scheduler config. If None, operations that depend - on config flags (auto_download, folder_scan) will be skipped. - """ - self._config = config - self._last_scan_time: Optional[datetime] = None - self._last_auto_download_time: Optional[datetime] = None - - @property - def last_scan_time(self) -> Optional[datetime]: - return self._last_scan_time - - # ------------------------------------------------------------------ - # Public entry point - # ------------------------------------------------------------------ - - async def execute(self) -> dict: - """Execute the full rescan workflow. - - Runs in order: - 1. anime_service.rescan() - 2. auto-download (if enabled) - 3. folder scan (if enabled) - 4. key resolution scan (always, if anime_directory configured) - - Returns: - Dict with duration and counts for each step. - """ - from src.server.services.websocket_service import get_websocket_service - - scan_start = datetime.now(timezone.utc) - results = { - "started_at": scan_start.isoformat(), - "duration_seconds": 0.0, - "rescan_completed": False, - "auto_download_queued": 0, - "folder_scan_completed": False, - "key_resolution": {"resolved": 0, "skipped": 0, "errors": 0}, - } - - await self._broadcast("scheduled_rescan_started", {"timestamp": scan_start.isoformat()}) - - try: - # 1. Main library rescan - await self._run_rescan() - results["rescan_completed"] = True - - # 2. Auto-download - if self._config and self._config.auto_download_after_rescan: - try: - queued = await self._run_auto_download() - results["auto_download_queued"] = queued - await self._broadcast("auto_download_started", {"queued_count": queued}) - except Exception as exc: - logger.error("Auto-download failed: %s", exc, exc_info=True) - await self._broadcast("auto_download_error", {"error": str(exc)}) - - # 3. Folder scan - if self._config and self._config.folder_scan_enabled: - try: - await self._run_folder_scan() - results["folder_scan_completed"] = True - except Exception as exc: - logger.error("Folder scan failed: %s", exc, exc_info=True) - await self._broadcast("folder_scan_error", {"error": str(exc)}) - - self._last_scan_time = datetime.now(timezone.utc) - results["duration_seconds"] = (self._last_scan_time - scan_start).total_seconds() - - await self._broadcast( - "scheduled_rescan_completed", - { - "timestamp": self._last_scan_time.isoformat(), - "duration_seconds": results["duration_seconds"], - }, - ) - - logger.info( - "Scheduled library rescan completed: duration=%.2fs", - results["duration_seconds"], - ) - - except Exception as exc: - logger.error("Scheduled rescan failed: %s", exc, exc_info=True) - await self._broadcast( - "scheduled_rescan_error", - {"error": str(exc), "timestamp": datetime.now(timezone.utc).isoformat()}, - ) - raise - - return results - - # ------------------------------------------------------------------ - # Step 1: Library rescan - # ------------------------------------------------------------------ - - async def _run_rescan(self) -> None: - """Run the anime service rescan.""" - from src.server.utils.dependencies import get_anime_service - - anime_service = get_anime_service() - logger.info("Anime service obtained, calling anime_service.rescan()...") - await anime_service.rescan() - logger.info("anime_service.rescan() completed") - - # ------------------------------------------------------------------ - # Step 2: Auto-download - # ------------------------------------------------------------------ - - async def _run_auto_download(self) -> int: - """Queue and start downloads for all series with missing episodes. - - Returns: - Number of episodes queued. - """ - from src.server.models.download import EpisodeIdentifier - from src.server.utils.dependencies import ( - get_anime_service, - get_download_service, - ) - - # Cooldown check to prevent rapid re-triggers - now = datetime.now(timezone.utc) - if self._last_auto_download_time is not None: - elapsed = now - self._last_auto_download_time - if elapsed < timedelta(seconds=_AUTO_DOWNLOAD_COOLDOWN_SECONDS): - logger.debug( - "Auto-download skipped: cooldown active (elapsed=%.1fs cooldown=%ds)", - elapsed.total_seconds(), - _AUTO_DOWNLOAD_COOLDOWN_SECONDS, - ) - return 0 - - anime_service = get_anime_service() - download_service = get_download_service() - - series_list = anime_service._cached_list_missing() - queued_count = 0 - - for series in series_list: - episode_dict: dict = series.get("episodeDict") or {} - if not episode_dict: - continue - - episodes: List[EpisodeIdentifier] = [] - for season_str, ep_numbers in episode_dict.items(): - for ep_num in ep_numbers: - episodes.append( - EpisodeIdentifier(season=int(season_str), episode=int(ep_num)) - ) - - if not episodes: - continue - - await download_service.add_to_queue( - serie_id=series.get("key", ""), - serie_folder=series.get("folder", series.get("name", "")), - serie_name=series.get("name", ""), - episodes=episodes, - ) - queued_count += len(episodes) - logger.info( - "Auto-download queued episodes for series=%s count=%d", - series.get("key"), - len(episodes), - ) - - if queued_count: - await download_service.start_queue_processing() - logger.info("Auto-download queue processing started: queued=%d", queued_count) - - self._last_auto_download_time = datetime.now(timezone.utc) - logger.info("Auto-download completed: queued_count=%d", queued_count) - return queued_count - - # ------------------------------------------------------------------ - # Step 3: Folder scan - # ------------------------------------------------------------------ - - async def _run_folder_scan(self) -> None: - """Run the folder scan maintenance task.""" - from src.server.services.scheduler.folder_scan_service import FolderScanService - - folder_scan_service = FolderScanService() - await folder_scan_service.run_folder_scan() - logger.info("Folder scan completed successfully") - - # ------------------------------------------------------------------ - # Private helpers - # ------------------------------------------------------------------ - - async def _broadcast(self, event_type: str, data: dict) -> None: - """Broadcast a WebSocket event to all connected clients.""" - try: - from src.server.services.websocket_service import get_websocket_service - - ws_service = get_websocket_service() - await ws_service.manager.broadcast({"type": event_type, "data": data}) - except Exception as exc: - logger.warning( - "WebSocket broadcast failed: event=%s error=%s", event_type, exc - ) - - -# --------------------------------------------------------------------------- -# Module-level singleton -# --------------------------------------------------------------------------- - -_rescan_service: Optional[RescanService] = None - - -def get_rescan_service(config: Optional[SchedulerConfig] = None) -> RescanService: - """Return a RescanService singleton (or create with optional config).""" - global _rescan_service - if _rescan_service is None or config is not None: - _rescan_service = RescanService(config=config) - logger.debug("Created new RescanService singleton") - else: - logger.debug("Returning existing RescanService singleton") - return _rescan_service - - -def reset_rescan_service() -> None: - """Reset the singleton (used in tests).""" - global _rescan_service - _rescan_service = None \ No newline at end of file diff --git a/src/server/services/scheduler/__init__.py b/src/server/services/scheduler/__init__.py index a85487c..09f3184 100644 --- a/src/server/services/scheduler/__init__.py +++ b/src/server/services/scheduler/__init__.py @@ -1,17 +1,11 @@ """Scheduler services package. -Contains scheduler orchestration and rescan coordination: +Contains scheduler orchestration: - scheduler_service: Cron-based scheduler using APScheduler """ from __future__ import annotations -from src.server.services.rescan_service import ( - RescanService, - get_rescan_service, - reset_rescan_service, -) - from src.server.services.scheduler.scheduler_service import ( SchedulerService, SchedulerServiceError, @@ -20,10 +14,6 @@ from src.server.services.scheduler.scheduler_service import ( ) __all__ = [ - # RescanService - "RescanService", - "get_rescan_service", - "reset_rescan_service", # Scheduler "SchedulerService", "SchedulerServiceError", diff --git a/src/server/services/scheduler/scheduler_service.py b/src/server/services/scheduler/scheduler_service.py index 02b9646..302cca7 100644 --- a/src/server/services/scheduler/scheduler_service.py +++ b/src/server/services/scheduler/scheduler_service.py @@ -6,14 +6,12 @@ cron-based scheduling. Jobs are held in memory (no separate scheduler database). On startup, if the last scan timestamp indicates a missed run (server was down at the scheduled cron time), a rescan is triggered immediately. - -Actual rescan logic is delegated to RescanService. """ from __future__ import annotations import logging from datetime import datetime, timedelta, timezone -from typing import Optional +from typing import List, Optional from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger @@ -29,6 +27,8 @@ _JOB_ID = "scheduled_rescan" # scheduled time and startup). _MISFIRE_GRACE_SECONDS = 3600 +_AUTO_DOWNLOAD_COOLDOWN_SECONDS = 300 # 5 minutes + class SchedulerServiceError(Exception): """Service-level exception for scheduler operations.""" @@ -44,8 +44,7 @@ class SchedulerService: - Immediate manual trigger - Live config reloading without app restart - Actual rescan/folder-scan/auto-download work is delegated to - RescanService. + Actual rescan/folder-scan/auto-download work is handled inline. """ def __init__(self) -> None: @@ -54,6 +53,8 @@ class SchedulerService: self._scheduler: Optional[AsyncIOScheduler] = None self._config: Optional[SchedulerConfig] = None self._scan_in_progress: bool = False + self._last_scan_time: Optional[datetime] = None + self._last_auto_download_time: Optional[datetime] = None logger.info("SchedulerService initialised") # ------------------------------------------------------------------ @@ -247,10 +248,6 @@ class SchedulerService: Returns: Dict containing scheduler state and config fields. """ - from src.server.services.rescan_service import get_rescan_service - - rescan_service = get_rescan_service() - next_run: Optional[str] = None if self._scheduler and self._scheduler.running: job = self._scheduler.get_job(_JOB_ID) @@ -270,8 +267,8 @@ class SchedulerService: self._config.folder_scan_enabled if self._config else False ), "last_run": ( - rescan_service.last_scan_time.isoformat() - if rescan_service.last_scan_time + self._last_scan_time.isoformat() + if self._last_scan_time else None ), "next_run": next_run, @@ -365,9 +362,7 @@ class SchedulerService: logger.warning("Missed-run check failed (non-fatal): %s", exc) async def _perform_rescan(self) -> None: - """Execute a library rescan via RescanService.""" - from src.server.services.rescan_service import get_rescan_service - + """Execute a library rescan with auto-download and folder scan.""" logger.info( "Scheduler _perform_rescan entered: scan_in_progress=%s", self._scan_in_progress, @@ -377,13 +372,148 @@ class SchedulerService: return self._scan_in_progress = True + scan_start = datetime.now(timezone.utc) + try: - rescan_service = get_rescan_service(config=self._config) - await rescan_service.execute() + await self._broadcast("scheduled_rescan_started", {"timestamp": scan_start.isoformat()}) + + # 1. Main library rescan + await self._run_rescan() + + # 2. Auto-download (if enabled) + if self._config and self._config.auto_download_after_rescan: + try: + queued = await self._run_auto_download() + await self._broadcast("auto_download_started", {"queued_count": queued}) + except Exception as exc: + logger.error("Auto-download failed: %s", exc, exc_info=True) + await self._broadcast("auto_download_error", {"error": str(exc)}) + + # 3. Folder scan (if enabled) + if self._config and self._config.folder_scan_enabled: + try: + await self._run_folder_scan() + except Exception as exc: + logger.error("Folder scan failed: %s", exc, exc_info=True) + await self._broadcast("folder_scan_error", {"error": str(exc)}) + + self._last_scan_time = datetime.now(timezone.utc) + duration = (self._last_scan_time - scan_start).total_seconds() + + await self._broadcast( + "scheduled_rescan_completed", + { + "timestamp": self._last_scan_time.isoformat(), + "duration_seconds": duration, + }, + ) + logger.info( + "Scheduled library rescan completed: duration=%.2fs", + duration, + ) + + except Exception as exc: + logger.error("Scheduled rescan failed: %s", exc, exc_info=True) + await self._broadcast( + "scheduled_rescan_error", + {"error": str(exc), "timestamp": datetime.now(timezone.utc).isoformat()}, + ) + raise finally: self._scan_in_progress = False logger.info("Scheduled rescan finished: scan_in_progress reset to False") + async def _run_rescan(self) -> None: + """Run the anime service rescan.""" + from src.server.utils.dependencies import get_anime_service + + anime_service = get_anime_service() + logger.info("Anime service obtained, calling anime_service.rescan()...") + await anime_service.rescan() + logger.info("anime_service.rescan() completed") + + async def _run_auto_download(self) -> int: + """Queue and start downloads for all series with missing episodes.""" + from src.server.models.download import EpisodeIdentifier + from src.server.utils.dependencies import ( + get_anime_service, + get_download_service, + ) + + # Cooldown check + now = datetime.now(timezone.utc) + if self._last_auto_download_time is not None: + elapsed = now - self._last_auto_download_time + if elapsed < timedelta(seconds=_AUTO_DOWNLOAD_COOLDOWN_SECONDS): + logger.debug( + "Auto-download skipped: cooldown active (elapsed=%.1fs cooldown=%ds)", + elapsed.total_seconds(), + _AUTO_DOWNLOAD_COOLDOWN_SECONDS, + ) + return 0 + + anime_service = get_anime_service() + download_service = get_download_service() + + series_list = anime_service._cached_list_missing() + queued_count = 0 + + for series in series_list: + episode_dict: dict = series.get("episodeDict") or {} + if not episode_dict: + continue + + episodes: List[EpisodeIdentifier] = [] + for season_str, ep_numbers in episode_dict.items(): + for ep_num in ep_numbers: + episodes.append( + EpisodeIdentifier(season=int(season_str), episode=int(ep_num)) + ) + + if not episodes: + continue + + await download_service.add_to_queue( + serie_id=series.get("key", ""), + serie_folder=series.get("folder", series.get("name", "")), + serie_name=series.get("name", ""), + episodes=episodes, + ) + queued_count += len(episodes) + logger.info( + "Auto-download queued episodes for series=%s count=%d", + series.get("key"), + len(episodes), + ) + + if queued_count: + await download_service.start_queue_processing() + logger.info("Auto-download queue processing started: queued=%d", queued_count) + + self._last_auto_download_time = datetime.now(timezone.utc) + logger.info("Auto-download completed: queued_count=%d", queued_count) + return queued_count + + async def _run_folder_scan(self) -> None: + """Run the folder scan maintenance task.""" + from src.server.services.scheduler.folder_scan_service import FolderScanService + + folder_scan_service = FolderScanService() + await folder_scan_service.run_folder_scan() + logger.info("Folder scan completed successfully") + + async def _broadcast(self, event_type: str, data: dict) -> None: + """Broadcast a WebSocket event to all connected clients.""" + try: + from src.server.services.websocket_service import get_websocket_service + + ws_service = get_websocket_service() + await ws_service.manager.broadcast({"type": event_type, "data": data}) + except Exception as exc: + logger.warning( + "WebSocket broadcast failed: event=%s error=%s", event_type, exc + ) + # --------------------------------------------------------------------------- # Module-level job runner