Files
Aniworld/src/server/services/scheduler/scheduler_service.py
Lukas e74b04c1ee feat: add NFO scan after rescan and year caching
- Add nfo_scan_after_rescan config option (default: true)
- Implement year caching in AniworldLoader and EnhancedAniWorldLoader
- Make get_year abstract method in base provider
- Run NFO validation/creation after scheduled rescan completes
- Add _YearDict cache to avoid re-extracting year from HTML
2026-06-05 18:15:41 +02:00

572 lines
21 KiB
Python

"""Scheduler service for automatic library rescans.
Uses APScheduler's AsyncIOScheduler with CronTrigger for precise
cron-based scheduling.
Jobs are held in memory (no separate scheduler database). On startup,
if the last scan timestamp indicates a missed run (server was down at the
scheduled cron time), a rescan is triggered immediately.
"""
from __future__ import annotations

import logging
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger

from src.server.models.config import SchedulerConfig
from src.server.services.config_service import ConfigServiceError, get_config_service
# Module-level logger shared by the service class and the job entry point.
logger = logging.getLogger(__name__)
# Identifier under which the single rescan job is registered with APScheduler;
# used to look up, reschedule and remove that job.
_JOB_ID = "scheduled_rescan"
# Grace period for missed jobs (1 hour — handles server downtime between
# scheduled time and startup). Passed as ``misfire_grace_time`` when the job
# is added and reused as the slack in the startup missed-run check.
_MISFIRE_GRACE_SECONDS = 3600
# Minimum time between two auto-download passes; a pass requested sooner
# than this after the previous one is skipped.
_AUTO_DOWNLOAD_COOLDOWN_SECONDS = 300 # 5 minutes
class SchedulerServiceError(Exception):
    """Raised when a scheduler operation fails at the service level.

    Used for lifecycle problems such as starting an already running
    scheduler, triggering a rescan while the service is stopped, or a
    configuration that cannot be loaded.
    """
class SchedulerService:
    """Manages automatic library rescans on a cron-based schedule.

    Uses APScheduler's AsyncIOScheduler so scheduling integrates cleanly
    with the running asyncio event loop. Supports:

    - Cron-based scheduling (time of day + days of week)
    - Immediate manual trigger
    - Live config reloading without app restart

    Actual rescan/folder-scan/auto-download work is handled inline.
    """

    def __init__(self) -> None:
        """Initialise the service in a stopped state with no config loaded."""
        self._is_running: bool = False
        self._scheduler: Optional[AsyncIOScheduler] = None
        self._config: Optional[SchedulerConfig] = None
        # Re-entrancy guard: True while _perform_rescan() is executing.
        self._scan_in_progress: bool = False
        # In-memory timestamps (UTC) of the last completed scan / download pass.
        self._last_scan_time: Optional[datetime] = None
        self._last_auto_download_time: Optional[datetime] = None
        logger.info("SchedulerService initialised")

    # ------------------------------------------------------------------
    # Public lifecycle methods
    # ------------------------------------------------------------------

    async def start(self) -> None:
        """Start the APScheduler with the configured cron trigger.

        The underlying scheduler is started even when scheduling is disabled
        in the configuration (with no job registered), so that a later
        ``reload_config()`` enabling the schedule can add the job without a
        restart.

        When scheduling is enabled, the missed-run check is awaited before
        this coroutine returns; that check may run a full rescan inline.

        Raises:
            SchedulerServiceError: If the scheduler is already running or
                config cannot be loaded.
        """
        logger.info("SchedulerService.start() called")
        if self._is_running:
            logger.warning("Scheduler start called but already running")
            raise SchedulerServiceError("Scheduler is already running")

        try:
            config_service = get_config_service()
            config = config_service.load_config()
            self._config = config.scheduler
            logger.info("Scheduler config loaded successfully")
        except ConfigServiceError as exc:
            logger.error("Failed to load scheduler configuration: %s", exc)
            raise SchedulerServiceError(f"Failed to load config: {exc}") from exc

        self._scheduler = AsyncIOScheduler()

        if not self._config.enabled:
            logger.info("Scheduler is disabled in configuration — not adding jobs")
            # reload_config() ignores a scheduler that is not running, so the
            # (empty) scheduler must be started here; otherwise enabling the
            # schedule at runtime would silently never register the job.
            self._scheduler.start()
            self._is_running = True
            return

        logger.info(
            "Scheduler config loaded: enabled=%s time=%s days=%s auto_download=%s",
            self._config.enabled,
            self._config.schedule_time,
            self._config.schedule_days,
            self._config.auto_download_after_rescan,
        )

        trigger = self._build_cron_trigger()
        if trigger is None:
            logger.warning(
                "schedule_days is empty — scheduler started but no job scheduled"
            )
        else:
            self._add_rescan_job(trigger)
            logger.info(
                "Scheduler started with cron trigger: time=%s days=%s",
                self._config.schedule_time,
                self._config.schedule_days,
            )

        self._scheduler.start()
        self._is_running = True

        # Log next scheduled run for visibility.
        job = self._scheduler.get_job(_JOB_ID)
        if job:
            next_run = job.next_run_time
            logger.info(
                "Scheduler next run: %s",
                next_run.isoformat() if next_run else None,
            )

        # Startup misfire recovery: check if the last scan was missed while
        # the server was down.
        await self._check_missed_run()

    async def stop(self) -> None:
        """Stop the APScheduler without waiting for running jobs.

        Calling this while the service is not running is a no-op.
        """
        logger.info("SchedulerService.stop() called")
        if not self._is_running:
            logger.debug("Scheduler stop called but not running")
            return

        if self._scheduler and self._scheduler.running:
            self._scheduler.shutdown(wait=False)
            logger.info("Scheduler stopped")
        else:
            logger.info("Scheduler stop: scheduler was not running")

        self._is_running = False
        logger.info("SchedulerService stopped successfully")

    async def ensure_started(self) -> None:
        """Ensure the scheduler is running (idempotent).

        If already running, returns immediately. Otherwise, starts the
        scheduler via ``start()``.

        Raises:
            SchedulerServiceError: If startup fails (except for already running).
        """
        if self._is_running:
            logger.debug("Scheduler ensure_started called but already running")
            return
        logger.info("Scheduler ensure_started: starting scheduler")
        await self.start()

    async def trigger_rescan(self) -> bool:
        """Manually trigger a library rescan and wait for it to finish.

        The rescan is awaited inline, so this coroutine only returns once the
        scan (and any follow-up steps) has completed. Exceptions raised by
        the rescan propagate to the caller.

        Returns:
            True if rescan was started; False if a scan is already running.

        Raises:
            SchedulerServiceError: If the scheduler service is not started.
        """
        if not self._is_running:
            raise SchedulerServiceError("Scheduler is not running")
        if self._scan_in_progress:
            logger.warning("Cannot trigger rescan: scan already in progress")
            return False
        logger.info("Manual rescan triggered")
        await self._perform_rescan()
        return True

    def reload_config(self, config: SchedulerConfig) -> None:
        """Apply a new SchedulerConfig immediately.

        The config is always stored. If the underlying scheduler is running,
        the job is rescheduled, added, or removed to match the new config
        without stopping the scheduler; if it is not running, nothing else
        happens.

        Args:
            config: New scheduler configuration to apply.
        """
        self._config = config
        logger.info(
            "Scheduler config reloaded: enabled=%s time=%s days=%s auto_download=%s",
            config.enabled,
            config.schedule_time,
            config.schedule_days,
            config.auto_download_after_rescan,
        )

        scheduler = self._scheduler
        if not scheduler or not scheduler.running:
            return

        if not config.enabled:
            if scheduler.get_job(_JOB_ID):
                scheduler.remove_job(_JOB_ID)
            logger.info("Scheduler job removed (disabled)")
            return

        trigger = self._build_cron_trigger()
        if trigger is None:
            if scheduler.get_job(_JOB_ID):
                scheduler.remove_job(_JOB_ID)
            logger.warning("Scheduler job removed — schedule_days is empty")
            return

        if scheduler.get_job(_JOB_ID):
            scheduler.reschedule_job(_JOB_ID, trigger=trigger)
            logger.info(
                "Scheduler rescheduled with cron trigger: time=%s days=%s",
                config.schedule_time,
                config.schedule_days,
            )
        else:
            self._add_rescan_job(trigger)
            logger.info(
                "Scheduler job added with cron trigger: time=%s days=%s",
                config.schedule_time,
                config.schedule_days,
            )

    def get_status(self) -> dict:
        """Return current scheduler status including cron configuration.

        Config-derived fields fall back to fixed defaults when no config has
        been loaded yet. ``next_run`` is only populated while the underlying
        scheduler is running and the rescan job has a next run time.

        Returns:
            Dict containing scheduler state and config fields.
        """
        next_run: Optional[str] = None
        if self._scheduler and self._scheduler.running:
            job = self._scheduler.get_job(_JOB_ID)
            if job and job.next_run_time:
                next_run = job.next_run_time.isoformat()

        cfg = self._config
        last_run = self._last_scan_time.isoformat() if self._last_scan_time else None
        return {
            "is_running": self._is_running,
            "enabled": cfg.enabled if cfg else False,
            "interval_minutes": cfg.interval_minutes if cfg else None,
            "schedule_time": cfg.schedule_time if cfg else None,
            "schedule_days": cfg.schedule_days if cfg else [],
            "auto_download_after_rescan": (
                cfg.auto_download_after_rescan if cfg else False
            ),
            "nfo_scan_after_rescan": cfg.nfo_scan_after_rescan if cfg else True,
            "last_run": last_run,
            "next_run": next_run,
            "scan_in_progress": self._scan_in_progress,
        }

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _add_rescan_job(self, trigger: CronTrigger) -> None:
        """Register the rescan job on the underlying scheduler.

        Shared by ``start()`` and ``reload_config()`` so both use identical
        job options.

        Args:
            trigger: Cron trigger that decides when the job fires.
        """
        self._scheduler.add_job(
            _run_rescan_job,
            trigger=trigger,
            id=_JOB_ID,
            replace_existing=True,
            misfire_grace_time=_MISFIRE_GRACE_SECONDS,
            coalesce=True,
        )

    def _build_cron_trigger(self) -> Optional[CronTrigger]:
        """Convert config fields into an APScheduler CronTrigger.

        ``schedule_time`` is split on ``":"`` into exactly two integer parts
        (hour, minute); ``schedule_days`` is joined with commas for the
        ``day_of_week`` field.

        Returns:
            CronTrigger instance, or None if no config is loaded or
            schedule_days is empty.
        """
        if not self._config or not self._config.schedule_days:
            return None

        hour_str, minute_str = self._config.schedule_time.split(":")
        day_of_week = ",".join(self._config.schedule_days)
        trigger = CronTrigger(
            hour=int(hour_str),
            minute=int(minute_str),
            day_of_week=day_of_week,
        )
        logger.debug(
            "CronTrigger built: hour=%s minute=%s day_of_week=%s",
            hour_str,
            minute_str,
            day_of_week,
        )
        return trigger

    async def _check_missed_run(self) -> None:
        """Check if a scheduled rescan was missed while the server was down.

        Reads ``last_scan_timestamp`` from the system settings and:

        - triggers a rescan immediately when no scan was ever recorded;
        - does nothing when the last scan is older than 24h plus the misfire
          grace period (avoids surprise rescans after long downtime);
        - triggers a rescan when the last scan is more than 23h old;
        - otherwise does nothing.

        Any exception, including one raised by the triggered rescan, is
        logged as a warning and swallowed so that startup is not aborted.
        """
        if not self._config or not self._config.enabled:
            return
        if not self._config.schedule_days:
            return

        try:
            # Imported lazily, as in the rest of this service.
            from src.server.database.connection import get_db_session
            from src.server.database.system_settings_service import (
                SystemSettingsService,
            )

            async with get_db_session() as db:
                settings = await SystemSettingsService.get_or_create(db)
                last_scan = settings.last_scan_timestamp

            if last_scan is None:
                # Never scanned before — trigger immediately
                logger.info("No previous scan recorded — triggering immediate rescan")
                await self._perform_rescan()
                return

            # Ensure timezone-aware comparison (naive values are taken as UTC).
            if last_scan.tzinfo is None:
                last_scan = last_scan.replace(tzinfo=timezone.utc)

            now = datetime.now(timezone.utc)
            elapsed = now - last_scan

            # If last scan was more than 24h + grace period ago, don't trigger
            # (avoids surprise rescans after long downtime).
            max_overdue = timedelta(hours=24, seconds=_MISFIRE_GRACE_SECONDS)
            if elapsed > max_overdue:
                logger.info(
                    "Last scan was %s ago (> %s) — skipping missed-run recovery",
                    elapsed,
                    max_overdue,
                )
                return

            # Check if a run should have happened between last_scan and now.
            if elapsed > timedelta(hours=23):
                logger.info(
                    "Missed scheduled rescan detected (last scan %s ago) — triggering now",
                    elapsed,
                )
                await self._perform_rescan()
        except Exception as exc:  # pylint: disable=broad-exception-caught
            logger.warning("Missed-run check failed (non-fatal): %s", exc)

    async def _perform_rescan(self) -> None:
        """Execute a library rescan followed by the optional follow-up steps.

        Steps, in order: library rescan, NFO scan (if enabled), auto-download
        (if enabled). Failures in the two optional steps are logged and
        broadcast but do not abort the run; a failure of the main rescan is
        broadcast and re-raised. Returns without doing anything if a scan is
        already in progress.

        Note that the ``nfo_scan_started`` and ``auto_download_started``
        events are emitted after the respective step has returned and carry
        its result.
        """
        logger.info(
            "Scheduler _perform_rescan entered: scan_in_progress=%s",
            self._scan_in_progress,
        )
        if self._scan_in_progress:
            logger.warning("Skipping rescan: previous scan still in progress")
            return

        self._scan_in_progress = True
        scan_start = datetime.now(timezone.utc)
        try:
            await self._broadcast(
                "scheduled_rescan_started", {"timestamp": scan_start.isoformat()}
            )

            # 1. Main library rescan
            await self._run_rescan()

            # 2. NFO scan (if enabled)
            if self._config and self._config.nfo_scan_after_rescan:
                try:
                    nfo_result = await self._run_nfo_scan()
                    await self._broadcast(
                        "nfo_scan_started",
                        {
                            "created": nfo_result.get("created", 0),
                            "updated": nfo_result.get("updated", 0),
                        },
                    )
                except Exception as exc:
                    logger.error("NFO scan failed: %s", exc, exc_info=True)
                    await self._broadcast("nfo_scan_error", {"error": str(exc)})

            # 3. Auto-download (if enabled)
            if self._config and self._config.auto_download_after_rescan:
                try:
                    queued = await self._run_auto_download()
                    await self._broadcast(
                        "auto_download_started", {"queued_count": queued}
                    )
                except Exception as exc:
                    logger.error("Auto-download failed: %s", exc, exc_info=True)
                    await self._broadcast("auto_download_error", {"error": str(exc)})

            self._last_scan_time = datetime.now(timezone.utc)
            duration = (self._last_scan_time - scan_start).total_seconds()
            await self._broadcast(
                "scheduled_rescan_completed",
                {
                    "timestamp": self._last_scan_time.isoformat(),
                    "duration_seconds": duration,
                },
            )
            logger.info(
                "Scheduled library rescan completed: duration=%.2fs",
                duration,
            )
        except Exception as exc:
            logger.error("Scheduled rescan failed: %s", exc, exc_info=True)
            await self._broadcast(
                "scheduled_rescan_error",
                {"error": str(exc), "timestamp": datetime.now(timezone.utc).isoformat()},
            )
            raise
        finally:
            # Always release the guard so later runs are not blocked.
            self._scan_in_progress = False
            logger.info("Scheduled rescan finished: scan_in_progress reset to False")

    async def _run_rescan(self) -> None:
        """Run the anime service rescan."""
        from src.server.utils.dependencies import get_anime_service

        anime_service = get_anime_service()
        logger.info("Anime service obtained, calling anime_service.rescan()...")
        await anime_service.rescan()
        logger.info("anime_service.rescan() completed")

    async def _run_nfo_scan(self) -> Dict[str, Any]:
        """Run NFO validation and creation across all series.

        Returns:
            The result mapping returned by the NFO scan service; this method
            reads its ``created``, ``updated`` and ``errors_count`` entries
            for logging.
        """
        from src.server.services.nfo_scan_service import get_nfo_scan_service
        from src.server.utils.dependencies import get_anime_service

        anime_service = get_anime_service()
        nfo_scan_service = get_nfo_scan_service()
        logger.info("Starting NFO scan...")
        result = await nfo_scan_service.scan_all(anime_service)
        logger.info(
            "NFO scan completed: created=%d updated=%d errors=%d",
            result.get("created", 0),
            result.get("updated", 0),
            result.get("errors_count", 0),
        )
        return result

    async def _run_auto_download(self) -> int:
        """Queue and start downloads for all series with missing episodes.

        Skipped (returning 0) when the previous pass finished less than
        ``_AUTO_DOWNLOAD_COOLDOWN_SECONDS`` ago.

        Returns:
            Number of episodes added to the download queue.
        """
        from src.server.models.download import EpisodeIdentifier
        from src.server.utils.dependencies import (
            get_anime_service,
            get_download_service,
        )

        # Cooldown check
        now = datetime.now(timezone.utc)
        if self._last_auto_download_time is not None:
            elapsed = now - self._last_auto_download_time
            if elapsed < timedelta(seconds=_AUTO_DOWNLOAD_COOLDOWN_SECONDS):
                logger.debug(
                    "Auto-download skipped: cooldown active (elapsed=%.1fs cooldown=%ds)",
                    elapsed.total_seconds(),
                    _AUTO_DOWNLOAD_COOLDOWN_SECONDS,
                )
                return 0

        anime_service = get_anime_service()
        download_service = get_download_service()
        # NOTE(review): relies on a private method of the anime service.
        series_list = anime_service._cached_list_missing()

        queued_count = 0
        for series in series_list:
            episode_dict: dict = series.get("episodeDict") or {}
            if not episode_dict:
                continue

            # Flatten the season -> episode-numbers mapping.
            episodes: List[EpisodeIdentifier] = [
                EpisodeIdentifier(season=int(season_str), episode=int(ep_num))
                for season_str, ep_numbers in episode_dict.items()
                for ep_num in ep_numbers
            ]
            if not episodes:
                continue

            await download_service.add_to_queue(
                serie_id=series.get("key", ""),
                serie_folder=series.get("folder", series.get("name", "")),
                serie_name=series.get("name", ""),
                episodes=episodes,
            )
            queued_count += len(episodes)
            logger.info(
                "Auto-download queued episodes for series=%s count=%d",
                series.get("key"),
                len(episodes),
            )

        if queued_count:
            await download_service.start_queue_processing()
            logger.info(
                "Auto-download queue processing started: queued=%d", queued_count
            )

        # The cooldown restarts after every pass, even one that queued nothing.
        self._last_auto_download_time = datetime.now(timezone.utc)
        logger.info("Auto-download completed: queued_count=%d", queued_count)
        return queued_count

    async def _broadcast(self, event_type: str, data: dict) -> None:
        """Broadcast a WebSocket event to all connected clients.

        Best-effort: any failure is logged as a warning and not propagated.

        Args:
            event_type: Value sent under the ``type`` key of the message.
            data: Payload sent under the ``data`` key of the message.
        """
        try:
            from src.server.services.websocket_service import get_websocket_service

            ws_service = get_websocket_service()
            await ws_service.manager.broadcast({"type": event_type, "data": data})
        except Exception as exc:
            logger.warning(
                "WebSocket broadcast failed: event=%s error=%s", event_type, exc
            )
# ---------------------------------------------------------------------------
# Module-level job runner
#
# APScheduler cannot serialize bound methods (SchedulerService instance
# contains a reference to the scheduler itself, creating a circular pickle
# error). Using a module-level function avoids this.
# ---------------------------------------------------------------------------
async def _run_rescan_job() -> None:
    """Job entry point registered with APScheduler.

    Looks up the current SchedulerService singleton and awaits its
    ``_perform_rescan()``; the run is bracketed by separator lines in the
    log. Exceptions raised by the rescan are not caught here.
    """
    separator = "=" * 60
    logger.info(separator)
    logger.info("APScheduler triggered _run_rescan_job")
    logger.info("Getting scheduler service singleton...")
    service = get_scheduler_service()
    logger.info("Scheduler service obtained, calling _perform_rescan()")
    await service._perform_rescan()
    logger.info("_run_rescan_job completed")
    logger.info(separator)
# ---------------------------------------------------------------------------
# Module-level singleton
# ---------------------------------------------------------------------------
# Process-wide SchedulerService instance; created on first call to
# get_scheduler_service() and cleared by reset_scheduler_service().
_scheduler_service: Optional[SchedulerService] = None
def get_scheduler_service() -> SchedulerService:
    """Return the shared SchedulerService, creating it on first use."""
    global _scheduler_service
    if _scheduler_service is not None:
        logger.debug("Returning existing SchedulerService singleton")
        return _scheduler_service
    logger.info("Creating new SchedulerService singleton")
    _scheduler_service = SchedulerService()
    return _scheduler_service
def reset_scheduler_service() -> None:
    """Drop the cached singleton so the next lookup builds a fresh one.

    Intended for tests. The previous instance is only dereferenced here;
    it is not stopped.
    """
    global _scheduler_service
    _scheduler_service = None