Compare commits

...

4 Commits

Author SHA1 Message Date
50a77976d5 chore: bump version 2026-05-26 13:28:12 +02:00
dfc28b8e66 fix(scheduler): ensure scheduler starts after setup/config update
Add ensure_started() to SchedulerService as idempotent entry point.
Start scheduler in auth setup run_initialization() after NFO scan.
Sync anime_directory and start scheduler in config update endpoint.
Add unit and endpoint tests for ensure_started() behavior.
2026-05-26 13:23:48 +02:00
6c9605e896 chore: bump version 2026-05-26 08:58:37 +02:00
3947f6d266 refactor(scheduler): replace structlog with std logging and add extensive diagnostics
- Switch scheduler_service from structlog to standard logging for consistency
- Add detailed lifecycle logging in SchedulerService (start, stop, rescan)
- Add debug logging in fastapi_app scheduler initialization
- Fix test_add_series_episodes to mock EpisodeService.get_by_series
2026-05-26 07:51:22 +02:00
9 changed files with 245 additions and 54 deletions

View File

@@ -1 +1 @@
v1.1.14 v1.1.16

View File

@@ -1,6 +1,6 @@
{ {
"name": "aniworld-web", "name": "aniworld-web",
"version": "1.1.14", "version": "1.1.16",
"description": "Aniworld Anime Download Manager - Web Frontend", "description": "Aniworld Anime Download Manager - Web Frontend",
"type": "module", "type": "module",
"scripts": { "scripts": {

View File

@@ -163,6 +163,22 @@ async def setup_auth(req: SetupRequest):
# Perform NFO scan if configured # Perform NFO scan if configured
await perform_nfo_scan_if_needed(progress_service) await perform_nfo_scan_if_needed(progress_service)
# Start scheduler if anime_directory is now set
try:
from src.server.services.scheduler_service import (
get_scheduler_service,
)
scheduler_svc = get_scheduler_service()
logger.info("Starting scheduler after initialization")
await scheduler_svc.ensure_started()
logger.info("Scheduler started successfully during setup")
except Exception as sched_exc:
logger.warning(
"Failed to start scheduler during setup: %s", sched_exc
)
# Continue — scheduler failure should not break initialization
# Send completion event # Send completion event
from src.server.services.progress_service import ProgressType from src.server.services.progress_service import ProgressType
await progress_service.start_progress( await progress_service.start_progress(

View File

@@ -1,7 +1,10 @@
import logging
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
from fastapi import APIRouter, Depends, HTTPException, status from fastapi import APIRouter, Depends, HTTPException, status
logger = logging.getLogger(__name__)
from src.server.models.config import AppConfig, ConfigUpdate, ValidationResult from src.server.models.config import AppConfig, ConfigUpdate, ValidationResult
from src.server.services.config_service import ( from src.server.services.config_service import (
ConfigBackupError, ConfigBackupError,
@@ -28,16 +31,53 @@ def get_config(auth: Optional[dict] = Depends(require_auth)) -> AppConfig:
@router.put("", response_model=AppConfig) @router.put("", response_model=AppConfig)
def update_config( async def update_config(
update: ConfigUpdate, auth: dict = Depends(require_auth) update: ConfigUpdate, auth: dict = Depends(require_auth)
) -> AppConfig: ) -> AppConfig:
"""Apply an update to the configuration and persist it. """Apply an update to the configuration and persist it.
Creates automatic backup before applying changes. Creates automatic backup before applying changes. If anime_directory
is configured, starts the scheduler service.
""" """
try: try:
config_service = get_config_service() config_service = get_config_service()
return config_service.update_config(update) updated_config = config_service.update_config(update)
# Sync anime_directory to settings if it was updated
from src.config.settings import settings as app_settings
anime_dir_changed = False
if update.other and update.other.get("anime_directory"):
anime_dir = update.other.get("anime_directory")
if anime_dir and not app_settings.anime_directory:
app_settings.anime_directory = str(anime_dir)
anime_dir_changed = True
logger.info("Synced anime_directory from config: %s", anime_dir)
# Start scheduler if anime_directory was just configured
if anime_dir_changed:
try:
from src.server.services.scheduler_service import (
get_scheduler_service,
)
scheduler_svc = get_scheduler_service()
logger.info(
"Starting scheduler after anime_directory configuration"
)
await scheduler_svc.ensure_started()
logger.info(
"Scheduler started successfully after config update"
)
except Exception as sched_exc:
logger.warning(
"Failed to start scheduler after config update: %s",
sched_exc,
)
# Config was already saved, don't fail the request
return updated_config
except ConfigValidationError as e: except ConfigValidationError as e:
raise HTTPException( raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, status_code=status.HTTP_400_BAD_REQUEST,

View File

@@ -121,7 +121,7 @@ async def _run_startup_health_checks(logger) -> dict:
import asyncio import asyncio
import shutil import shutil
import socket import socket
from typing import Dict, Any from typing import Any, Dict
checks: Dict[str, Any] = { checks: Dict[str, Any] = {
"ffmpeg": {"status": "unknown", "message": None}, "ffmpeg": {"status": "unknown", "message": None},
@@ -400,13 +400,15 @@ async def lifespan(_application: FastAPI):
# Initialize and start scheduler service # Initialize and start scheduler service
try: try:
logger.info("Initializing scheduler service...")
from src.server.services.scheduler_service import ( from src.server.services.scheduler_service import (
get_scheduler_service, get_scheduler_service,
) )
scheduler_service = get_scheduler_service() scheduler_service = get_scheduler_service()
logger.info("Scheduler service instance obtained, starting...")
await scheduler_service.start() await scheduler_service.start()
initialized['scheduler'] = True initialized['scheduler'] = True
logger.info("Scheduler service started") logger.info("Scheduler service started successfully")
except Exception as e: except Exception as e:
logger.warning("Failed to start scheduler service: %s", e) logger.warning("Failed to start scheduler service: %s", e)
# Continue - scheduler is optional # Continue - scheduler is optional

View File

@@ -10,10 +10,10 @@ cron time), the job is triggered immediately within a grace period.
""" """
from __future__ import annotations from __future__ import annotations
import logging
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import List, Optional from typing import List, Optional
import structlog
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger from apscheduler.triggers.cron import CronTrigger
@@ -21,7 +21,7 @@ from apscheduler.triggers.cron import CronTrigger
from src.server.models.config import SchedulerConfig from src.server.models.config import SchedulerConfig
from src.server.services.config_service import ConfigServiceError, get_config_service from src.server.services.config_service import ConfigServiceError, get_config_service
logger = structlog.get_logger(__name__) logger = logging.getLogger(__name__)
_JOB_ID = "scheduled_rescan" _JOB_ID = "scheduled_rescan"
@@ -69,15 +69,18 @@ class SchedulerService:
SchedulerServiceError: If the scheduler is already running or SchedulerServiceError: If the scheduler is already running or
config cannot be loaded. config cannot be loaded.
""" """
logger.info("SchedulerService.start() called")
if self._is_running: if self._is_running:
logger.warning("Scheduler start called but already running")
raise SchedulerServiceError("Scheduler is already running") raise SchedulerServiceError("Scheduler is already running")
try: try:
config_service = get_config_service() config_service = get_config_service()
config = config_service.load_config() config = config_service.load_config()
self._config = config.scheduler self._config = config.scheduler
logger.info("Scheduler config loaded successfully")
except ConfigServiceError as exc: except ConfigServiceError as exc:
logger.error("Failed to load scheduler configuration", error=str(exc)) logger.error("Failed to load scheduler configuration: %s", exc)
raise SchedulerServiceError(f"Failed to load config: {exc}") from exc raise SchedulerServiceError(f"Failed to load config: {exc}") from exc
jobstores = { jobstores = {
@@ -90,6 +93,15 @@ class SchedulerService:
self._is_running = True self._is_running = True
return return
logger.info(
"Scheduler config loaded: enabled=%s time=%s days=%s auto_download=%s folder_scan=%s",
self._config.enabled,
self._config.schedule_time,
self._config.schedule_days,
self._config.auto_download_after_rescan,
self._config.folder_scan_enabled,
)
trigger = self._build_cron_trigger() trigger = self._build_cron_trigger()
if trigger is None: if trigger is None:
logger.warning( logger.warning(
@@ -105,9 +117,9 @@ class SchedulerService:
coalesce=True, coalesce=True,
) )
logger.info( logger.info(
"Scheduler started with cron trigger", "Scheduler started with cron trigger: time=%s days=%s",
schedule_time=self._config.schedule_time, self._config.schedule_time,
schedule_days=self._config.schedule_days, self._config.schedule_days,
) )
self._scheduler.start() self._scheduler.start()
@@ -121,12 +133,13 @@ class SchedulerService:
if job: if job:
next_run = job.next_run_time next_run = job.next_run_time
logger.info( logger.info(
"Scheduler next run", "Scheduler next run: %s",
next_run=next_run.isoformat() if next_run else None, next_run.isoformat() if next_run else None,
) )
async def stop(self) -> None: async def stop(self) -> None:
"""Stop the APScheduler gracefully.""" """Stop the APScheduler gracefully."""
logger.info("SchedulerService.stop() called")
if not self._is_running: if not self._is_running:
logger.debug("Scheduler stop called but not running") logger.debug("Scheduler stop called but not running")
return return
@@ -134,8 +147,27 @@ class SchedulerService:
if self._scheduler and self._scheduler.running: if self._scheduler and self._scheduler.running:
self._scheduler.shutdown(wait=False) self._scheduler.shutdown(wait=False)
logger.info("Scheduler stopped") logger.info("Scheduler stopped")
else:
logger.info("Scheduler stop: scheduler was not running")
self._is_running = False self._is_running = False
logger.info("SchedulerService stopped successfully")
async def ensure_started(self) -> None:
"""Ensure the scheduler is running (idempotent).
If already running, returns immediately. Otherwise, starts the scheduler.
This method is safe to call multiple times and from multiple callers.
Raises:
SchedulerServiceError: If startup fails (except for already running).
"""
if self._is_running:
logger.debug("Scheduler ensure_started called but already running")
return
logger.info("Scheduler ensure_started: starting scheduler")
await self.start()
async def trigger_rescan(self) -> bool: async def trigger_rescan(self) -> bool:
"""Manually trigger a library rescan. """Manually trigger a library rescan.
@@ -168,12 +200,12 @@ class SchedulerService:
""" """
self._config = config self._config = config
logger.info( logger.info(
"Scheduler config reloaded", "Scheduler config reloaded: enabled=%s time=%s days=%s auto_download=%s folder_scan=%s",
enabled=config.enabled, config.enabled,
schedule_time=config.schedule_time, config.schedule_time,
schedule_days=config.schedule_days, config.schedule_days,
auto_download=config.auto_download_after_rescan, config.auto_download_after_rescan,
folder_scan=config.folder_scan_enabled, config.folder_scan_enabled,
) )
if not self._scheduler or not self._scheduler.running: if not self._scheduler or not self._scheduler.running:
@@ -194,9 +226,9 @@ class SchedulerService:
if self._scheduler.get_job(_JOB_ID): if self._scheduler.get_job(_JOB_ID):
self._scheduler.reschedule_job(_JOB_ID, trigger=trigger) self._scheduler.reschedule_job(_JOB_ID, trigger=trigger)
logger.info( logger.info(
"Scheduler rescheduled with cron trigger", "Scheduler rescheduled with cron trigger: time=%s days=%s",
schedule_time=config.schedule_time, config.schedule_time,
schedule_days=config.schedule_days, config.schedule_days,
) )
else: else:
self._scheduler.add_job( self._scheduler.add_job(
@@ -208,9 +240,9 @@ class SchedulerService:
coalesce=True, coalesce=True,
) )
logger.info( logger.info(
"Scheduler job added with cron trigger", "Scheduler job added with cron trigger: time=%s days=%s",
schedule_time=config.schedule_time, config.schedule_time,
schedule_days=config.schedule_days, config.schedule_days,
) )
def get_status(self) -> dict: def get_status(self) -> dict:
@@ -264,10 +296,10 @@ class SchedulerService:
day_of_week=day_of_week, day_of_week=day_of_week,
) )
logger.debug( logger.debug(
"CronTrigger built", "CronTrigger built: hour=%s minute=%s day_of_week=%s",
hour=hour_str, hour_str,
minute=minute_str, minute_str,
day_of_week=day_of_week, day_of_week,
) )
return trigger return trigger
@@ -281,7 +313,7 @@ class SchedulerService:
ws_service = get_websocket_service() ws_service = get_websocket_service()
await ws_service.manager.broadcast({"type": event_type, "data": data}) await ws_service.manager.broadcast({"type": event_type, "data": data})
except Exception as exc: # pylint: disable=broad-exception-caught except Exception as exc: # pylint: disable=broad-exception-caught
logger.warning("WebSocket broadcast failed", event=event_type, error=str(exc)) logger.warning("WebSocket broadcast failed: event=%s error=%s", event_type, exc)
async def _auto_download_missing(self) -> None: async def _auto_download_missing(self) -> None:
"""Queue and start downloads for all series with missing episodes.""" """Queue and start downloads for all series with missing episodes."""
@@ -299,9 +331,9 @@ class SchedulerService:
elapsed = now - self._last_auto_download_time elapsed = now - self._last_auto_download_time
if elapsed < timedelta(seconds=self._auto_download_cooldown_seconds): if elapsed < timedelta(seconds=self._auto_download_cooldown_seconds):
logger.debug( logger.debug(
"Auto-download skipped: cooldown active", "Auto-download skipped: cooldown active (elapsed=%.1fs cooldown=%ds)",
elapsed_seconds=elapsed.total_seconds(), elapsed.total_seconds(),
cooldown_seconds=self._auto_download_cooldown_seconds, self._auto_download_cooldown_seconds,
) )
return return
@@ -334,30 +366,31 @@ class SchedulerService:
) )
queued_count += len(episodes) queued_count += len(episodes)
logger.info( logger.info(
"Auto-download queued episodes", "Auto-download queued episodes for series=%s count=%d",
series=series.get("key"), series.get("key"),
count=len(episodes), len(episodes),
) )
if queued_count: if queued_count:
await download_service.start_queue_processing() await download_service.start_queue_processing()
logger.info("Auto-download queue processing started", queued=queued_count) logger.info("Auto-download queue processing started: queued=%d", queued_count)
await self._broadcast("auto_download_started", {"queued_count": queued_count}) await self._broadcast("auto_download_started", {"queued_count": queued_count})
logger.info("Auto-download completed", queued_count=queued_count) logger.info("Auto-download completed: queued_count=%d", queued_count)
# Update cooldown timestamp after successful auto-download # Update cooldown timestamp after successful auto-download
self._last_auto_download_time = datetime.now(timezone.utc) self._last_auto_download_time = datetime.now(timezone.utc)
async def _perform_rescan(self) -> None: async def _perform_rescan(self) -> None:
"""Execute a library rescan and optionally trigger auto-download.""" """Execute a library rescan and optionally trigger auto-download."""
logger.info("Scheduler _perform_rescan entered", scan_in_progress=self._scan_in_progress) logger.info("Scheduler _perform_rescan entered: scan_in_progress=%s", self._scan_in_progress)
if self._scan_in_progress: if self._scan_in_progress:
logger.warning("Skipping rescan: previous scan still in progress") logger.warning("Skipping rescan: previous scan still in progress")
return return
self._scan_in_progress = True self._scan_in_progress = True
scan_start = datetime.now(timezone.utc) scan_start = datetime.now(timezone.utc)
logger.info("Scheduled rescan started at %s", scan_start.isoformat())
try: try:
logger.info("Starting scheduled library rescan") logger.info("Starting scheduled library rescan")
@@ -365,18 +398,20 @@ class SchedulerService:
from src.server.utils.dependencies import get_anime_service # noqa: PLC0415 from src.server.utils.dependencies import get_anime_service # noqa: PLC0415
anime_service = get_anime_service() anime_service = get_anime_service()
logger.info("Anime service obtained for rescan")
await self._broadcast( await self._broadcast(
"scheduled_rescan_started", "scheduled_rescan_started",
{"timestamp": scan_start.isoformat()}, {"timestamp": scan_start.isoformat()},
) )
logger.info("Calling anime_service.rescan()...")
await anime_service.rescan() await anime_service.rescan()
self._last_scan_time = datetime.now(timezone.utc) self._last_scan_time = datetime.now(timezone.utc)
duration = (self._last_scan_time - scan_start).total_seconds() duration = (self._last_scan_time - scan_start).total_seconds()
logger.info("Scheduled library rescan completed", duration_seconds=duration) logger.info("Scheduled library rescan completed: duration=%.2fs", duration)
await self._broadcast( await self._broadcast(
"scheduled_rescan_completed", "scheduled_rescan_completed",
@@ -393,8 +428,8 @@ class SchedulerService:
await self._auto_download_missing() await self._auto_download_missing()
except Exception as dl_exc: # pylint: disable=broad-exception-caught except Exception as dl_exc: # pylint: disable=broad-exception-caught
logger.error( logger.error(
"Auto-download after rescan failed", "Auto-download after rescan failed: %s",
error=str(dl_exc), dl_exc,
exc_info=True, exc_info=True,
) )
await self._broadcast( await self._broadcast(
@@ -413,10 +448,11 @@ class SchedulerService:
folder_scan_service = FolderScanService() folder_scan_service = FolderScanService()
await folder_scan_service.run_folder_scan() await folder_scan_service.run_folder_scan()
logger.info("Folder scan completed successfully")
except Exception as fs_exc: # pylint: disable=broad-exception-caught except Exception as fs_exc: # pylint: disable=broad-exception-caught
logger.error( logger.error(
"Folder scan failed", "Folder scan failed: %s",
error=str(fs_exc), fs_exc,
exc_info=True, exc_info=True,
) )
await self._broadcast( await self._broadcast(
@@ -426,7 +462,7 @@ class SchedulerService:
logger.debug("Folder scan is disabled — skipping") logger.debug("Folder scan is disabled — skipping")
except Exception as exc: # pylint: disable=broad-exception-caught except Exception as exc: # pylint: disable=broad-exception-caught
logger.error("Scheduled rescan failed", error=str(exc), exc_info=True) logger.error("Scheduled rescan failed: %s", exc, exc_info=True)
await self._broadcast( await self._broadcast(
"scheduled_rescan_error", "scheduled_rescan_error",
{"error": str(exc), "timestamp": datetime.now(timezone.utc).isoformat()}, {"error": str(exc), "timestamp": datetime.now(timezone.utc).isoformat()},
@@ -434,6 +470,7 @@ class SchedulerService:
finally: finally:
self._scan_in_progress = False self._scan_in_progress = False
logger.info("Scheduled rescan finished: scan_in_progress reset to False")
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -446,9 +483,14 @@ class SchedulerService:
async def _run_rescan_job() -> None: async def _run_rescan_job() -> None:
"""Module-level job entry point — delegates to the current service.""" """Module-level job entry point — delegates to the current service."""
logger.info("=" * 60)
logger.info("APScheduler triggered _run_rescan_job") logger.info("APScheduler triggered _run_rescan_job")
logger.info("Getting scheduler service singleton...")
svc = get_scheduler_service() svc = get_scheduler_service()
logger.info("Scheduler service obtained, calling _perform_rescan()")
await svc._perform_rescan() await svc._perform_rescan()
logger.info("_run_rescan_job completed")
logger.info("=" * 60)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -462,7 +504,10 @@ def get_scheduler_service() -> SchedulerService:
"""Return the singleton SchedulerService instance.""" """Return the singleton SchedulerService instance."""
global _scheduler_service global _scheduler_service
if _scheduler_service is None: if _scheduler_service is None:
logger.info("Creating new SchedulerService singleton")
_scheduler_service = SchedulerService() _scheduler_service = SchedulerService()
else:
logger.debug("Returning existing SchedulerService singleton")
return _scheduler_service return _scheduler_service

View File

@@ -2,7 +2,7 @@
import tempfile import tempfile
from pathlib import Path from pathlib import Path
from unittest.mock import patch from unittest.mock import AsyncMock, patch
import pytest import pytest
from httpx import ASGITransport, AsyncClient from httpx import ASGITransport, AsyncClient
@@ -207,3 +207,46 @@ async def test_tmdb_validation_endpoint_exists(authenticated_client):
assert "message" in data assert "message" in data
assert data["valid"] is False # Empty key should be invalid assert data["valid"] is False # Empty key should be invalid
assert "required" in data["message"].lower() assert "required" in data["message"].lower()
@pytest.mark.asyncio
async def test_update_config_with_anime_directory_starts_scheduler(
authenticated_client, mock_config_service
):
"""PUT /api/config with anime_directory syncs and starts scheduler."""
mock_scheduler = AsyncMock()
with patch("src.server.services.scheduler_service.get_scheduler_service") as mock_sched_fn:
mock_sched_fn.return_value = mock_scheduler
with patch("src.config.settings.settings") as mock_settings:
mock_settings.anime_directory = None
resp = await authenticated_client.put(
"/api/config",
json={"other": {"anime_directory": "/data/anime"}},
)
assert resp.status_code == 200
mock_scheduler.ensure_started.assert_called_once()
@pytest.mark.asyncio
async def test_update_config_without_anime_directory_does_not_start_scheduler(
authenticated_client, mock_config_service
):
"""PUT /api/config without new anime_directory does not call scheduler.ensure_started()."""
mock_scheduler = AsyncMock()
with patch("src.server.services.scheduler_service.get_scheduler_service") as mock_sched_fn:
mock_sched_fn.return_value = mock_scheduler
with patch("src.config.settings.settings") as mock_settings:
mock_settings.anime_directory = "/already/set"
resp = await authenticated_client.put(
"/api/config", json={"other": {}}
)
assert resp.status_code == 200
mock_scheduler.ensure_started.assert_not_called()

View File

@@ -392,6 +392,7 @@ class TestAddSeriesWithEpisodes:
nfo_created_at=datetime(2024, 1, 1, 12, 0, 0), nfo_created_at=datetime(2024, 1, 1, 12, 0, 0),
nfo_updated_at=datetime(2024, 1, 2, 12, 0, 0) nfo_updated_at=datetime(2024, 1, 2, 12, 0, 0)
) )
mock_db_series.id = 1
# Create service with mocked WebSocket # Create service with mocked WebSocket
anime_service = AnimeService(mock_series_app) anime_service = AnimeService(mock_series_app)
@@ -403,12 +404,21 @@ class TestAddSeriesWithEpisodes:
mock_db_session.__aenter__ = AsyncMock(return_value=mock_db_session) mock_db_session.__aenter__ = AsyncMock(return_value=mock_db_session)
mock_db_session.__aexit__ = AsyncMock() mock_db_session.__aexit__ = AsyncMock()
# Mock episodes that match the in-memory episodeDict
mock_episodes = [
MagicMock(season=1, episode_number=1),
MagicMock(season=1, episode_number=2),
MagicMock(season=1, episode_number=3),
]
with patch('src.server.database.connection.get_db_session', return_value=mock_db_session): with patch('src.server.database.connection.get_db_session', return_value=mock_db_session):
with patch('src.server.database.service.AnimeSeriesService') as MockAnimeSeriesService: with patch('src.server.database.service.AnimeSeriesService') as MockAnimeSeriesService:
MockAnimeSeriesService.get_by_key = AsyncMock(return_value=mock_db_series) MockAnimeSeriesService.get_by_key = AsyncMock(return_value=mock_db_series)
with patch('src.server.database.service.EpisodeService') as MockEpisodeService:
MockEpisodeService.get_by_series = AsyncMock(return_value=mock_episodes)
# Act # Act
await anime_service._broadcast_series_updated(key) await anime_service._broadcast_series_updated(key)
# Assert # Assert
mock_websocket.broadcast.assert_called_once() mock_websocket.broadcast.assert_called_once()

View File

@@ -559,3 +559,38 @@ class TestStartupRecovery:
info_calls = [str(c) for c in mock_logger.info.call_args_list] info_calls = [str(c) for c in mock_logger.info.call_args_list]
assert any("next_run" in c for c in info_calls) assert any("next_run" in c for c in info_calls)
# ---------------------------------------------------------------------------
# 12.8 ensure_started() idempotent startup
# ---------------------------------------------------------------------------
class TestEnsureStarted:
@pytest.mark.asyncio
async def test_ensure_started_when_not_running(
self, scheduler_service, mock_config_service
):
"""ensure_started() calls start() when scheduler is not running."""
# Mock start method
scheduler_service.start = AsyncMock()
# Call ensure_started
await scheduler_service.ensure_started()
# Verify start() was called exactly once
scheduler_service.start.assert_called_once()
@pytest.mark.asyncio
async def test_ensure_started_when_already_running(self, scheduler_service):
"""ensure_started() returns immediately when already running (idempotent)."""
# Set up as already running
scheduler_service._is_running = True
# Mock start method
scheduler_service.start = AsyncMock()
# Call ensure_started
await scheduler_service.ensure_started()
# Verify start() was NOT called
scheduler_service.start.assert_not_called()