"""Scheduler service for automatic library rescans. Uses APScheduler's AsyncIOScheduler with CronTrigger for precise cron-based scheduling. The legacy interval-based loop has been removed in favour of the cron approach. Jobs are held in memory (no separate scheduler database). On startup, if the last scan timestamp indicates a missed run (server was down at the scheduled cron time), a rescan is triggered immediately. """ from __future__ import annotations import logging from datetime import datetime, timedelta, timezone from typing import List, Optional from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger from src.server.models.config import SchedulerConfig from src.server.services.config_service import ConfigServiceError, get_config_service logger = logging.getLogger(__name__) _JOB_ID = "scheduled_rescan" # Grace period for missed jobs (1 hour — handles server downtime between # scheduled time and startup). _MISFIRE_GRACE_SECONDS = 3600 class SchedulerServiceError(Exception): """Service-level exception for scheduler operations.""" class SchedulerService: """Manages automatic library rescans on a cron-based schedule. Uses APScheduler's AsyncIOScheduler so scheduling integrates cleanly with the running asyncio event loop. 
Supports: - Cron-based scheduling (time of day + days of week) - Immediate manual trigger - Live config reloading without app restart - Auto-queueing downloads of missing episodes after rescan """ def __init__(self) -> None: """Initialise the scheduler service.""" self._is_running: bool = False self._scheduler: Optional[AsyncIOScheduler] = None self._config: Optional[SchedulerConfig] = None self._last_scan_time: Optional[datetime] = None self._scan_in_progress: bool = False # Cooldown tracking for auto-download to prevent rapid re-triggers self._last_auto_download_time: Optional[datetime] = None self._auto_download_cooldown_seconds: int = 300 # 5 minutes default logger.info("SchedulerService initialised") # ------------------------------------------------------------------ # Public lifecycle methods # ------------------------------------------------------------------ async def start(self) -> None: """Start the APScheduler with the configured cron trigger. Raises: SchedulerServiceError: If the scheduler is already running or config cannot be loaded. """ logger.info("SchedulerService.start() called") if self._is_running: logger.warning("Scheduler start called but already running") raise SchedulerServiceError("Scheduler is already running") try: config_service = get_config_service() config = config_service.load_config() self._config = config.scheduler logger.info("Scheduler config loaded successfully") except ConfigServiceError as exc: logger.error("Failed to load scheduler configuration: %s", exc) raise SchedulerServiceError(f"Failed to load config: {exc}") from exc # Use in-memory job store — no separate scheduler.db needed. # Jobs are reconstructed from config on every startup. 
self._scheduler = AsyncIOScheduler() if not self._config.enabled: logger.info("Scheduler is disabled in configuration — not adding jobs") self._is_running = True return logger.info( "Scheduler config loaded: enabled=%s time=%s days=%s auto_download=%s folder_scan=%s", self._config.enabled, self._config.schedule_time, self._config.schedule_days, self._config.auto_download_after_rescan, self._config.folder_scan_enabled, ) trigger = self._build_cron_trigger() if trigger is None: logger.warning( "schedule_days is empty — scheduler started but no job scheduled" ) else: self._scheduler.add_job( _run_rescan_job, trigger=trigger, id=_JOB_ID, replace_existing=True, misfire_grace_time=_MISFIRE_GRACE_SECONDS, coalesce=True, ) logger.info( "Scheduler started with cron trigger: time=%s days=%s", self._config.schedule_time, self._config.schedule_days, ) self._scheduler.start() self._is_running = True # Log next scheduled run for visibility. job = self._scheduler.get_job(_JOB_ID) if job: next_run = job.next_run_time logger.info( "Scheduler next run: %s", next_run.isoformat() if next_run else None, ) # Startup misfire recovery: check if the last scan was missed while # the server was down. If overdue by more than one interval but within # the grace period, trigger an immediate rescan. await self._check_missed_run() async def stop(self) -> None: """Stop the APScheduler gracefully.""" logger.info("SchedulerService.stop() called") if not self._is_running: logger.debug("Scheduler stop called but not running") return if self._scheduler and self._scheduler.running: self._scheduler.shutdown(wait=False) logger.info("Scheduler stopped") else: logger.info("Scheduler stop: scheduler was not running") self._is_running = False logger.info("SchedulerService stopped successfully") async def ensure_started(self) -> None: """Ensure the scheduler is running (idempotent). If already running, returns immediately. Otherwise, starts the scheduler. 
This method is safe to call multiple times and from multiple callers. Raises: SchedulerServiceError: If startup fails (except for already running). """ if self._is_running: logger.debug("Scheduler ensure_started called but already running") return logger.info("Scheduler ensure_started: starting scheduler") await self.start() async def trigger_rescan(self) -> bool: """Manually trigger a library rescan. Returns: True if rescan was started; False if a scan is already running. Raises: SchedulerServiceError: If the scheduler service is not started. """ if not self._is_running: raise SchedulerServiceError("Scheduler is not running") if self._scan_in_progress: logger.warning("Cannot trigger rescan: scan already in progress") return False logger.info("Manual rescan triggered") await self._perform_rescan() return True def reload_config(self, config: SchedulerConfig) -> None: """Apply a new SchedulerConfig immediately. If the scheduler is already running the job is rescheduled (or removed) without stopping the scheduler. Args: config: New scheduler configuration to apply. 
""" self._config = config logger.info( "Scheduler config reloaded: enabled=%s time=%s days=%s auto_download=%s folder_scan=%s", config.enabled, config.schedule_time, config.schedule_days, config.auto_download_after_rescan, config.folder_scan_enabled, ) if not self._scheduler or not self._scheduler.running: return if not config.enabled: if self._scheduler.get_job(_JOB_ID): self._scheduler.remove_job(_JOB_ID) logger.info("Scheduler job removed (disabled)") return trigger = self._build_cron_trigger() if trigger is None: if self._scheduler.get_job(_JOB_ID): self._scheduler.remove_job(_JOB_ID) logger.warning("Scheduler job removed — schedule_days is empty") else: if self._scheduler.get_job(_JOB_ID): self._scheduler.reschedule_job(_JOB_ID, trigger=trigger) logger.info( "Scheduler rescheduled with cron trigger: time=%s days=%s", config.schedule_time, config.schedule_days, ) else: self._scheduler.add_job( _run_rescan_job, trigger=trigger, id=_JOB_ID, replace_existing=True, misfire_grace_time=_MISFIRE_GRACE_SECONDS, coalesce=True, ) logger.info( "Scheduler job added with cron trigger: time=%s days=%s", config.schedule_time, config.schedule_days, ) def get_status(self) -> dict: """Return current scheduler status including cron configuration. Returns: Dict containing scheduler state and config fields. 
""" next_run: Optional[str] = None if self._scheduler and self._scheduler.running: job = self._scheduler.get_job(_JOB_ID) if job and job.next_run_time: next_run = job.next_run_time.isoformat() return { "is_running": self._is_running, "enabled": self._config.enabled if self._config else False, "interval_minutes": self._config.interval_minutes if self._config else None, "schedule_time": self._config.schedule_time if self._config else None, "schedule_days": self._config.schedule_days if self._config else [], "auto_download_after_rescan": ( self._config.auto_download_after_rescan if self._config else False ), "folder_scan_enabled": ( self._config.folder_scan_enabled if self._config else False ), "last_run": self._last_scan_time.isoformat() if self._last_scan_time else None, "next_run": next_run, "scan_in_progress": self._scan_in_progress, } # ------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------ def _build_cron_trigger(self) -> Optional[CronTrigger]: """Convert config fields into an APScheduler CronTrigger. Returns: CronTrigger instance or None if schedule_days is empty. """ if not self._config or not self._config.schedule_days: return None hour_str, minute_str = self._config.schedule_time.split(":") day_of_week = ",".join(self._config.schedule_days) trigger = CronTrigger( hour=int(hour_str), minute=int(minute_str), day_of_week=day_of_week, ) logger.debug( "CronTrigger built: hour=%s minute=%s day_of_week=%s", hour_str, minute_str, day_of_week, ) return trigger async def _check_missed_run(self) -> None: """Check if a scheduled rescan was missed while the server was down. Compares system_settings.last_scan_timestamp against the expected schedule. If the last scan is overdue (more than 24h ago for a daily schedule) but within the grace period, triggers an immediate rescan. 
""" if not self._config or not self._config.enabled: return if not self._config.schedule_days: return try: from src.server.database.connection import ( # noqa: PLC0415 get_db_session, ) from src.server.database.system_settings_service import ( # noqa: PLC0415 SystemSettingsService, ) async with get_db_session() as db: settings = await SystemSettingsService.get_or_create(db) last_scan = settings.last_scan_timestamp if last_scan is None: # Never scanned before — trigger immediately logger.info("No previous scan recorded — triggering immediate rescan") await self._perform_rescan() return # Ensure timezone-aware comparison if last_scan.tzinfo is None: last_scan = last_scan.replace(tzinfo=timezone.utc) now = datetime.now(timezone.utc) elapsed = now - last_scan # If last scan was more than 24h + grace period ago, don't trigger # (avoids surprise rescans after long downtime). max_overdue = timedelta(hours=24, seconds=_MISFIRE_GRACE_SECONDS) # If last scan was more than ~25h ago, skip (too stale) if elapsed > max_overdue: logger.info( "Last scan was %s ago (> %s) — skipping missed-run recovery", elapsed, max_overdue, ) return # Check if a run should have happened between last_scan and now. # Simple heuristic: if elapsed > 24h, we missed at least one daily run. 
if elapsed > timedelta(hours=23): logger.info( "Missed scheduled rescan detected (last scan %s ago) — triggering now", elapsed, ) await self._perform_rescan() except Exception as exc: # pylint: disable=broad-exception-caught logger.warning("Missed-run check failed (non-fatal): %s", exc) async def _broadcast(self, event_type: str, data: dict) -> None: """Broadcast a WebSocket event to all connected clients.""" try: from src.server.services.websocket_service import ( # noqa: PLC0415 get_websocket_service, ) ws_service = get_websocket_service() await ws_service.manager.broadcast({"type": event_type, "data": data}) except Exception as exc: # pylint: disable=broad-exception-caught logger.warning("WebSocket broadcast failed: event=%s error=%s", event_type, exc) async def _auto_download_missing(self) -> None: """Queue and start downloads for all series with missing episodes.""" from datetime import timedelta # noqa: PLC0415 from src.server.models.download import EpisodeIdentifier # noqa: PLC0415 from src.server.utils.dependencies import ( # noqa: PLC0415 get_anime_service, get_download_service, ) # Check cooldown to prevent rapid re-triggers now = datetime.now(timezone.utc) if self._last_auto_download_time is not None: elapsed = now - self._last_auto_download_time if elapsed < timedelta(seconds=self._auto_download_cooldown_seconds): logger.debug( "Auto-download skipped: cooldown active (elapsed=%.1fs cooldown=%ds)", elapsed.total_seconds(), self._auto_download_cooldown_seconds, ) return anime_service = get_anime_service() download_service = get_download_service() series_list = anime_service._cached_list_missing() queued_count = 0 for series in series_list: episode_dict: dict = series.get("episodeDict") or {} if not episode_dict: continue episodes: List[EpisodeIdentifier] = [] for season_str, ep_numbers in episode_dict.items(): for ep_num in ep_numbers: episodes.append( EpisodeIdentifier(season=int(season_str), episode=int(ep_num)) ) if not episodes: continue await 
download_service.add_to_queue( serie_id=series.get("key", ""), serie_folder=series.get("folder", series.get("name", "")), serie_name=series.get("name", ""), episodes=episodes, ) queued_count += len(episodes) logger.info( "Auto-download queued episodes for series=%s count=%d", series.get("key"), len(episodes), ) if queued_count: await download_service.start_queue_processing() logger.info("Auto-download queue processing started: queued=%d", queued_count) await self._broadcast("auto_download_started", {"queued_count": queued_count}) logger.info("Auto-download completed: queued_count=%d", queued_count) # Update cooldown timestamp after successful auto-download self._last_auto_download_time = datetime.now(timezone.utc) async def _perform_rescan(self) -> None: """Execute a library rescan and optionally trigger auto-download.""" logger.info("Scheduler _perform_rescan entered: scan_in_progress=%s", self._scan_in_progress) if self._scan_in_progress: logger.warning("Skipping rescan: previous scan still in progress") return self._scan_in_progress = True scan_start = datetime.now(timezone.utc) logger.info("Scheduled rescan started at %s", scan_start.isoformat()) try: logger.info("Starting scheduled library rescan") from src.server.utils.dependencies import get_anime_service # noqa: PLC0415 anime_service = get_anime_service() logger.info("Anime service obtained for rescan") await self._broadcast( "scheduled_rescan_started", {"timestamp": scan_start.isoformat()}, ) logger.info("Calling anime_service.rescan()...") await anime_service.rescan() self._last_scan_time = datetime.now(timezone.utc) duration = (self._last_scan_time - scan_start).total_seconds() logger.info("Scheduled library rescan completed: duration=%.2fs", duration) await self._broadcast( "scheduled_rescan_completed", { "timestamp": self._last_scan_time.isoformat(), "duration_seconds": duration, }, ) # Auto-download after rescan if self._config and self._config.auto_download_after_rescan: logger.info("Auto-download 
after rescan is enabled — starting") try: await self._auto_download_missing() except Exception as dl_exc: # pylint: disable=broad-exception-caught logger.error( "Auto-download after rescan failed: %s", dl_exc, exc_info=True, ) await self._broadcast( "auto_download_error", {"error": str(dl_exc)} ) else: logger.debug("Auto-download after rescan is disabled — skipping") # Folder scan (daily maintenance) if self._config and self._config.folder_scan_enabled: logger.info("Folder scan is enabled — starting") try: from src.server.services.folder_scan_service import ( # noqa: PLC0415 FolderScanService, ) folder_scan_service = FolderScanService() await folder_scan_service.run_folder_scan() logger.info("Folder scan completed successfully") except Exception as fs_exc: # pylint: disable=broad-exception-caught logger.error( "Folder scan failed: %s", fs_exc, exc_info=True, ) await self._broadcast( "folder_scan_error", {"error": str(fs_exc)} ) else: logger.debug("Folder scan is disabled — skipping") except Exception as exc: # pylint: disable=broad-exception-caught logger.error("Scheduled rescan failed: %s", exc, exc_info=True) await self._broadcast( "scheduled_rescan_error", {"error": str(exc), "timestamp": datetime.now(timezone.utc).isoformat()}, ) finally: self._scan_in_progress = False logger.info("Scheduled rescan finished: scan_in_progress reset to False") # --------------------------------------------------------------------------- # Module-level job runner # # APScheduler cannot serialize bound methods (SchedulerService instance # contains a reference to the scheduler itself, creating a circular pickle # error). Using a module-level function avoids this. 
# ---------------------------------------------------------------------------


async def _run_rescan_job() -> None:
    """APScheduler entry point: resolve the current service and run a rescan."""
    banner = "=" * 60
    logger.info(banner)
    logger.info("APScheduler triggered _run_rescan_job")
    logger.info("Getting scheduler service singleton...")
    service = get_scheduler_service()
    logger.info("Scheduler service obtained, calling _perform_rescan()")
    await service._perform_rescan()
    logger.info("_run_rescan_job completed")
    logger.info(banner)


# ---------------------------------------------------------------------------
# Module-level singleton
# ---------------------------------------------------------------------------

_scheduler_service: Optional[SchedulerService] = None


def get_scheduler_service() -> SchedulerService:
    """Return the process-wide SchedulerService, creating it on first use."""
    global _scheduler_service
    if _scheduler_service is not None:
        logger.debug("Returning existing SchedulerService singleton")
        return _scheduler_service

    logger.info("Creating new SchedulerService singleton")
    _scheduler_service = SchedulerService()
    return _scheduler_service


def reset_scheduler_service() -> None:
    """Drop the cached singleton so the next lookup builds a fresh one (tests)."""
    global _scheduler_service
    _scheduler_service = None