feat: cron-based scheduler with auto-download after rescan
- Replace asyncio sleep loop with APScheduler AsyncIOScheduler + CronTrigger
- Add schedule_time (HH:MM), schedule_days (days of week), auto_download_after_rescan fields to SchedulerConfig
- Add _auto_download_missing() to queue missing episodes after rescan
- Reload config live via reload_config(SchedulerConfig) without restart
- Update GET/POST /api/scheduler/config to return {success, config, status} envelope
- Add day-of-week pill toggles to Settings -> Scheduler section in UI
- Update JS loadSchedulerConfig / saveSchedulerConfig for new API shape
- Add 29 unit tests for SchedulerConfig model, 18 unit tests for SchedulerService
- Rewrite 23 endpoint tests and 36 integration tests for APScheduler behaviour
- Coverage: 96% api/scheduler, 95% scheduler_service, 90% total (>= 80% threshold)
- Update docs: API.md, CONFIGURATION.md, features.md, CHANGELOG.md
This commit is contained in:
@@ -1,305 +1,377 @@
|
||||
"""Scheduler service for automatic library rescans.
|
||||
|
||||
This module provides a background scheduler that performs periodic library rescans
|
||||
according to the configured interval. It handles conflict resolution with manual
|
||||
scans and persists scheduler state.
|
||||
Uses APScheduler's AsyncIOScheduler with CronTrigger for precise
|
||||
cron-based scheduling. The legacy interval-based loop has been removed
|
||||
in favour of the cron approach.
|
||||
"""
|
||||
import asyncio
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timezone
|
||||
from typing import Optional
|
||||
from typing import List, Optional
|
||||
|
||||
import structlog
|
||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||
from apscheduler.triggers.cron import CronTrigger
|
||||
|
||||
from src.server.models.config import SchedulerConfig
|
||||
from src.server.services.config_service import ConfigServiceError, get_config_service
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
_JOB_ID = "scheduled_rescan"
|
||||
|
||||
|
||||
class SchedulerServiceError(Exception):
|
||||
"""Service-level exception for scheduler operations."""
|
||||
|
||||
|
||||
class SchedulerService:
|
||||
"""Manages automatic library rescans on a configurable schedule.
|
||||
|
||||
Features:
|
||||
- Periodic library rescans based on configured interval
|
||||
- Conflict resolution (prevents concurrent scans)
|
||||
- State persistence across restarts
|
||||
- Manual trigger capability
|
||||
- Enable/disable functionality
|
||||
|
||||
The scheduler uses a simple interval-based approach where rescans
|
||||
are triggered every N minutes as configured.
|
||||
"""Manages automatic library rescans on a cron-based schedule.
|
||||
|
||||
Uses APScheduler's AsyncIOScheduler so scheduling integrates cleanly
|
||||
with the running asyncio event loop. Supports:
|
||||
|
||||
- Cron-based scheduling (time of day + days of week)
|
||||
- Immediate manual trigger
|
||||
- Live config reloading without app restart
|
||||
- Auto-queueing downloads of missing episodes after rescan
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
    """Initialise the scheduler service.

    Only in-memory state is set up here; the APScheduler instance is
    created lazily in :meth:`start`, so construction never touches the
    event loop.
    """
    # True once start() has completed; gates stop()/trigger_rescan().
    self._is_running: bool = False
    # Created in start(); None until then. (The legacy asyncio task
    # handle was removed together with the interval loop — APScheduler
    # owns all scheduling now.)
    self._scheduler: Optional[AsyncIOScheduler] = None
    self._config: Optional[SchedulerConfig] = None
    self._last_scan_time: Optional[datetime] = None
    self._next_scan_time: Optional[datetime] = None
    # Guards against overlapping scans (manual trigger vs cron job).
    self._scan_in_progress: bool = False

    logger.info("SchedulerService initialised")
||||
# ------------------------------------------------------------------
# Public lifecycle methods
# ------------------------------------------------------------------

async def start(self) -> None:
    """Start the APScheduler with the configured cron trigger.

    Raises:
        SchedulerServiceError: If the scheduler is already running or
            config cannot be loaded.
    """
    if self._is_running:
        raise SchedulerServiceError("Scheduler is already running")

    try:
        config_service = get_config_service()
        config = config_service.load_config()
        self._config = config.scheduler
    except ConfigServiceError as exc:
        logger.error("Failed to load scheduler configuration", error=str(exc))
        raise SchedulerServiceError(f"Failed to load config: {exc}") from exc

    self._scheduler = AsyncIOScheduler()
    # Start the scheduler even when disabled or when no job is added:
    # reload_config() only (re)schedules jobs on a *running* scheduler,
    # so enabling via a later config reload would otherwise be a
    # silent no-op.
    self._scheduler.start()
    self._is_running = True

    if not self._config.enabled:
        logger.info("Scheduler is disabled in configuration — not adding jobs")
        return

    trigger = self._build_cron_trigger()
    if trigger is None:
        logger.warning(
            "schedule_days is empty — scheduler started but no job scheduled"
        )
        return

    self._scheduler.add_job(
        self._perform_rescan,
        trigger=trigger,
        id=_JOB_ID,
        replace_existing=True,
        # Still run a fire-time missed by up to 5 minutes (e.g. the
        # event loop was blocked); later than that, skip the run.
        misfire_grace_time=300,
    )
    logger.info(
        "Scheduler started with cron trigger",
        schedule_time=self._config.schedule_time,
        schedule_days=self._config.schedule_days,
    )
||||
async def stop(self) -> None:
    """Stop the APScheduler gracefully.

    Safe to call when the service is not running (logs and returns).
    """
    if not self._is_running:
        logger.debug("Scheduler stop called but not running")
        return

    if self._scheduler and self._scheduler.running:
        # wait=False: do not block the event loop waiting for any
        # in-flight job to finish.
        self._scheduler.shutdown(wait=False)
        logger.info("Scheduler stopped")

    self._is_running = False
||||
async def trigger_rescan(self) -> bool:
    """Manually trigger a library rescan.

    Returns:
        True if rescan was started; False if a scan is already running.

    Raises:
        SchedulerServiceError: If the scheduler service is not started.
    """
    if not self._is_running:
        raise SchedulerServiceError("Scheduler is not running")

    if self._scan_in_progress:
        logger.warning("Cannot trigger rescan: scan already in progress")
        return False

    logger.info("Manual rescan triggered")
    # Runs inline (awaited) rather than through APScheduler, so the
    # caller observes completion/failure directly.
    await self._perform_rescan()
    return True
||||
def reload_config(self, config: SchedulerConfig) -> None:
    """Apply a new SchedulerConfig immediately.

    If the scheduler is already running the rescan job is rescheduled,
    added, or removed in place — the scheduler itself keeps running, so
    no app restart is needed.

    Args:
        config: New scheduler configuration to apply.
    """
    self._config = config
    logger.info(
        "Scheduler config reloaded",
        enabled=config.enabled,
        schedule_time=config.schedule_time,
        schedule_days=config.schedule_days,
        auto_download=config.auto_download_after_rescan,
    )

    # Nothing to (re)schedule until start() has been called.
    if not self._scheduler or not self._scheduler.running:
        return

    if not config.enabled:
        if self._scheduler.get_job(_JOB_ID):
            self._scheduler.remove_job(_JOB_ID)
            logger.info("Scheduler job removed (disabled)")
        return

    trigger = self._build_cron_trigger()
    if trigger is None:
        # Enabled but no days selected: drop any existing job.
        if self._scheduler.get_job(_JOB_ID):
            self._scheduler.remove_job(_JOB_ID)
            logger.warning("Scheduler job removed — schedule_days is empty")
    elif self._scheduler.get_job(_JOB_ID):
        self._scheduler.reschedule_job(_JOB_ID, trigger=trigger)
        logger.info(
            "Scheduler rescheduled with cron trigger",
            schedule_time=config.schedule_time,
            schedule_days=config.schedule_days,
        )
    else:
        self._scheduler.add_job(
            self._perform_rescan,
            trigger=trigger,
            id=_JOB_ID,
            replace_existing=True,
            misfire_grace_time=300,
        )
        logger.info(
            "Scheduler job added with cron trigger",
            schedule_time=config.schedule_time,
            schedule_days=config.schedule_days,
        )
|
||||
def get_status(self) -> dict:
    """Return current scheduler status including cron configuration.

    Returns:
        Dict containing scheduler state and config fields. ``next_run``
        is derived from the live APScheduler job when one is scheduled.
    """
    next_run: Optional[str] = None
    if self._scheduler and self._scheduler.running:
        job = self._scheduler.get_job(_JOB_ID)
        if job and job.next_run_time:
            next_run = job.next_run_time.isoformat()

    # NOTE(review): legacy "interval_minutes" key dropped — the cron
    # config no longer carries it; confirm no API client still reads it.
    return {
        "is_running": self._is_running,
        "enabled": self._config.enabled if self._config else False,
        # Legacy key names kept alongside last_run/next_run for
        # backward compatibility with older clients.
        "last_scan_time": self._last_scan_time.isoformat() if self._last_scan_time else None,
        "next_scan_time": self._next_scan_time.isoformat() if self._next_scan_time else None,
        "schedule_time": self._config.schedule_time if self._config else None,
        "schedule_days": self._config.schedule_days if self._config else [],
        "auto_download_after_rescan": (
            self._config.auto_download_after_rescan if self._config else False
        ),
        "last_run": self._last_scan_time.isoformat() if self._last_scan_time else None,
        "next_run": next_run,
        "scan_in_progress": self._scan_in_progress,
    }
||||
# ------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------

def _build_cron_trigger(self) -> Optional[CronTrigger]:
    """Convert config fields into an APScheduler CronTrigger.

    Returns:
        CronTrigger instance, or None if schedule_days is empty (no
        job should be scheduled in that case).
    """
    if not self._config or not self._config.schedule_days:
        return None

    # schedule_time is validated as "HH:MM" by SchedulerConfig —
    # TODO confirm that guarantee holds for all config sources.
    hour_str, minute_str = self._config.schedule_time.split(":")
    # CronTrigger accepts a comma-separated day-of-week expression.
    day_of_week = ",".join(self._config.schedule_days)

    trigger = CronTrigger(
        hour=int(hour_str),
        minute=int(minute_str),
        day_of_week=day_of_week,
    )
    logger.debug(
        "CronTrigger built",
        hour=hour_str,
        minute=minute_str,
        day_of_week=day_of_week,
    )
    return trigger
||||
async def _broadcast(self, event_type: str, data: dict) -> None:
    """Broadcast a WebSocket event to all connected clients.

    Best-effort: broadcast failures are logged and swallowed so a
    WebSocket problem never aborts a rescan.
    """
    try:
        # Imported lazily to avoid a circular dependency at module load.
        from src.server.services.websocket_service import (  # noqa: PLC0415
            get_websocket_service,
        )

        ws_service = get_websocket_service()
        await ws_service.manager.broadcast({"type": event_type, "data": data})
    except Exception as exc:  # pylint: disable=broad-exception-caught
        logger.warning("WebSocket broadcast failed", event=event_type, error=str(exc))
||||
async def _auto_download_missing(self) -> None:
    """Queue and start downloads for all series with missing episodes.

    Walks the cached missing-episode list, queues every missing episode
    per series, then kicks off queue processing once if anything was
    queued. Broadcasts an "auto_download_started" event either way.
    """
    # Imported lazily to avoid circular dependencies at module load.
    from src.server.models.download import EpisodeIdentifier  # noqa: PLC0415
    from src.server.utils.dependencies import (  # noqa: PLC0415
        get_anime_service,
        get_download_service,
    )

    anime_service = get_anime_service()
    download_service = get_download_service()

    # NOTE(review): reaches into a private member of AnimeService —
    # consider exposing a public accessor for the missing-episode cache.
    series_list = anime_service._cached_list_missing()
    queued_count = 0

    for series in series_list:
        # Expected shape: {"<season>": [episode_number, ...], ...} —
        # TODO confirm against AnimeService's cache format.
        episode_dict: dict = series.get("episodeDict") or {}
        if not episode_dict:
            continue

        episodes: List[EpisodeIdentifier] = []
        for season_str, ep_numbers in episode_dict.items():
            for ep_num in ep_numbers:
                episodes.append(
                    EpisodeIdentifier(season=int(season_str), episode=int(ep_num))
                )

        if not episodes:
            continue

        await download_service.add_to_queue(
            serie_id=series.get("key", ""),
            serie_folder=series.get("folder", series.get("name", "")),
            serie_name=series.get("name", ""),
            episodes=episodes,
        )
        queued_count += len(episodes)
        logger.info(
            "Auto-download queued episodes",
            series=series.get("key"),
            count=len(episodes),
        )

    if queued_count:
        await download_service.start_queue_processing()
        logger.info("Auto-download queue processing started", queued=queued_count)

    await self._broadcast("auto_download_started", {"queued_count": queued_count})
    logger.info("Auto-download completed", queued_count=queued_count)
||||
async def _perform_rescan(self) -> None:
    """Execute a library rescan and optionally trigger auto-download.

    Skips (rather than queues) when a scan is already in progress.
    Broadcasts started/completed/error events; failures are logged and
    broadcast but never re-raised, so the APScheduler job stays alive.
    """
    if self._scan_in_progress:
        logger.warning("Skipping rescan: previous scan still in progress")
        return

    self._scan_in_progress = True
    scan_start = datetime.now(timezone.utc)

    try:
        logger.info("Starting scheduled library rescan")

        # Imported lazily to avoid a circular dependency at module load.
        from src.server.utils.dependencies import get_anime_service  # noqa: PLC0415

        anime_service = get_anime_service()

        await self._broadcast(
            "scheduled_rescan_started",
            {"timestamp": scan_start.isoformat()},
        )

        await anime_service.rescan()

        self._last_scan_time = datetime.now(timezone.utc)
        duration = (self._last_scan_time - scan_start).total_seconds()

        logger.info("Scheduled library rescan completed", duration_seconds=duration)

        await self._broadcast(
            "scheduled_rescan_completed",
            {
                "timestamp": self._last_scan_time.isoformat(),
                "duration_seconds": duration,
            },
        )

        # Auto-download after rescan (opt-in via config). Download
        # failures are isolated so they never mark the rescan failed.
        if self._config and self._config.auto_download_after_rescan:
            logger.info("Auto-download after rescan is enabled — starting")
            try:
                await self._auto_download_missing()
            except Exception as dl_exc:  # pylint: disable=broad-exception-caught
                logger.error(
                    "Auto-download after rescan failed",
                    error=str(dl_exc),
                    exc_info=True,
                )
                await self._broadcast(
                    "auto_download_error", {"error": str(dl_exc)}
                )
        else:
            logger.debug("Auto-download after rescan is disabled — skipping")

    except Exception as exc:  # pylint: disable=broad-exception-caught
        logger.error("Scheduled rescan failed", error=str(exc), exc_info=True)
        await self._broadcast(
            "scheduled_rescan_error",
            {"error": str(exc), "timestamp": datetime.now(timezone.utc).isoformat()},
        )

    finally:
        self._scan_in_progress = False
||||
|
||||
# Module-level singleton instance
|
||||
# ---------------------------------------------------------------------------
|
||||
# Module-level singleton
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_scheduler_service: Optional[SchedulerService] = None
|
||||
|
||||
|
||||
def get_scheduler_service() -> SchedulerService:
|
||||
"""Get the singleton scheduler service instance.
|
||||
|
||||
Returns:
|
||||
SchedulerService singleton
|
||||
"""
|
||||
"""Return the singleton SchedulerService instance."""
|
||||
global _scheduler_service
|
||||
if _scheduler_service is None:
|
||||
_scheduler_service = SchedulerService()
|
||||
@@ -307,6 +379,6 @@ def get_scheduler_service() -> SchedulerService:
|
||||
|
||||
|
||||
def reset_scheduler_service() -> None:
    """Reset the singleton (used in tests).

    The next call to get_scheduler_service() will construct a fresh
    SchedulerService instance.
    """
    global _scheduler_service
    _scheduler_service = None
||||
|
||||
Reference in New Issue
Block a user