refactor(scheduler): replace structlog with std logging and add extensive diagnostics

- Switch scheduler_service from structlog to standard logging for consistency
- Add detailed lifecycle logging in SchedulerService (start, stop, rescan)
- Add debug logging in fastapi_app scheduler initialization
- Fix test_add_series_episodes to mock EpisodeService.get_by_series
This commit is contained in:
2026-05-26 07:51:22 +02:00
parent a3176f5ac1
commit 3947f6d266
3 changed files with 89 additions and 48 deletions

View File

@@ -121,7 +121,7 @@ async def _run_startup_health_checks(logger) -> dict:
import asyncio
import shutil
import socket
from typing import Dict, Any
from typing import Any, Dict
checks: Dict[str, Any] = {
"ffmpeg": {"status": "unknown", "message": None},
@@ -400,13 +400,15 @@ async def lifespan(_application: FastAPI):
# Initialize and start scheduler service
try:
logger.info("Initializing scheduler service...")
from src.server.services.scheduler_service import (
get_scheduler_service,
)
scheduler_service = get_scheduler_service()
logger.info("Scheduler service instance obtained, starting...")
await scheduler_service.start()
initialized['scheduler'] = True
logger.info("Scheduler service started")
logger.info("Scheduler service started successfully")
except Exception as e:
logger.warning("Failed to start scheduler service: %s", e)
# Continue - scheduler is optional

View File

@@ -10,10 +10,10 @@ cron time), the job is triggered immediately within a grace period.
"""
from __future__ import annotations
import logging
from datetime import datetime, timezone
from typing import List, Optional
import structlog
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
@@ -21,7 +21,7 @@ from apscheduler.triggers.cron import CronTrigger
from src.server.models.config import SchedulerConfig
from src.server.services.config_service import ConfigServiceError, get_config_service
logger = structlog.get_logger(__name__)
logger = logging.getLogger(__name__)
_JOB_ID = "scheduled_rescan"
@@ -69,15 +69,18 @@ class SchedulerService:
SchedulerServiceError: If the scheduler is already running or
config cannot be loaded.
"""
logger.info("SchedulerService.start() called")
if self._is_running:
logger.warning("Scheduler start called but already running")
raise SchedulerServiceError("Scheduler is already running")
try:
config_service = get_config_service()
config = config_service.load_config()
self._config = config.scheduler
logger.info("Scheduler config loaded successfully")
except ConfigServiceError as exc:
logger.error("Failed to load scheduler configuration", error=str(exc))
logger.error("Failed to load scheduler configuration: %s", exc)
raise SchedulerServiceError(f"Failed to load config: {exc}") from exc
jobstores = {
@@ -90,6 +93,15 @@ class SchedulerService:
self._is_running = True
return
logger.info(
"Scheduler config loaded: enabled=%s time=%s days=%s auto_download=%s folder_scan=%s",
self._config.enabled,
self._config.schedule_time,
self._config.schedule_days,
self._config.auto_download_after_rescan,
self._config.folder_scan_enabled,
)
trigger = self._build_cron_trigger()
if trigger is None:
logger.warning(
@@ -105,9 +117,9 @@ class SchedulerService:
coalesce=True,
)
logger.info(
"Scheduler started with cron trigger",
schedule_time=self._config.schedule_time,
schedule_days=self._config.schedule_days,
"Scheduler started with cron trigger: time=%s days=%s",
self._config.schedule_time,
self._config.schedule_days,
)
self._scheduler.start()
@@ -121,12 +133,13 @@ class SchedulerService:
if job:
next_run = job.next_run_time
logger.info(
"Scheduler next run",
next_run=next_run.isoformat() if next_run else None,
"Scheduler next run: %s",
next_run.isoformat() if next_run else None,
)
async def stop(self) -> None:
"""Stop the APScheduler gracefully."""
logger.info("SchedulerService.stop() called")
if not self._is_running:
logger.debug("Scheduler stop called but not running")
return
@@ -134,8 +147,11 @@ class SchedulerService:
if self._scheduler and self._scheduler.running:
self._scheduler.shutdown(wait=False)
logger.info("Scheduler stopped")
else:
logger.info("Scheduler stop: scheduler was not running")
self._is_running = False
logger.info("SchedulerService stopped successfully")
async def trigger_rescan(self) -> bool:
"""Manually trigger a library rescan.
@@ -168,12 +184,12 @@ class SchedulerService:
"""
self._config = config
logger.info(
"Scheduler config reloaded",
enabled=config.enabled,
schedule_time=config.schedule_time,
schedule_days=config.schedule_days,
auto_download=config.auto_download_after_rescan,
folder_scan=config.folder_scan_enabled,
"Scheduler config reloaded: enabled=%s time=%s days=%s auto_download=%s folder_scan=%s",
config.enabled,
config.schedule_time,
config.schedule_days,
config.auto_download_after_rescan,
config.folder_scan_enabled,
)
if not self._scheduler or not self._scheduler.running:
@@ -194,9 +210,9 @@ class SchedulerService:
if self._scheduler.get_job(_JOB_ID):
self._scheduler.reschedule_job(_JOB_ID, trigger=trigger)
logger.info(
"Scheduler rescheduled with cron trigger",
schedule_time=config.schedule_time,
schedule_days=config.schedule_days,
"Scheduler rescheduled with cron trigger: time=%s days=%s",
config.schedule_time,
config.schedule_days,
)
else:
self._scheduler.add_job(
@@ -208,9 +224,9 @@ class SchedulerService:
coalesce=True,
)
logger.info(
"Scheduler job added with cron trigger",
schedule_time=config.schedule_time,
schedule_days=config.schedule_days,
"Scheduler job added with cron trigger: time=%s days=%s",
config.schedule_time,
config.schedule_days,
)
def get_status(self) -> dict:
@@ -264,10 +280,10 @@ class SchedulerService:
day_of_week=day_of_week,
)
logger.debug(
"CronTrigger built",
hour=hour_str,
minute=minute_str,
day_of_week=day_of_week,
"CronTrigger built: hour=%s minute=%s day_of_week=%s",
hour_str,
minute_str,
day_of_week,
)
return trigger
@@ -281,7 +297,7 @@ class SchedulerService:
ws_service = get_websocket_service()
await ws_service.manager.broadcast({"type": event_type, "data": data})
except Exception as exc: # pylint: disable=broad-exception-caught
logger.warning("WebSocket broadcast failed", event=event_type, error=str(exc))
logger.warning("WebSocket broadcast failed: event=%s error=%s", event_type, exc)
async def _auto_download_missing(self) -> None:
"""Queue and start downloads for all series with missing episodes."""
@@ -299,9 +315,9 @@ class SchedulerService:
elapsed = now - self._last_auto_download_time
if elapsed < timedelta(seconds=self._auto_download_cooldown_seconds):
logger.debug(
"Auto-download skipped: cooldown active",
elapsed_seconds=elapsed.total_seconds(),
cooldown_seconds=self._auto_download_cooldown_seconds,
"Auto-download skipped: cooldown active (elapsed=%.1fs cooldown=%ds)",
elapsed.total_seconds(),
self._auto_download_cooldown_seconds,
)
return
@@ -334,30 +350,31 @@ class SchedulerService:
)
queued_count += len(episodes)
logger.info(
"Auto-download queued episodes",
series=series.get("key"),
count=len(episodes),
"Auto-download queued episodes for series=%s count=%d",
series.get("key"),
len(episodes),
)
if queued_count:
await download_service.start_queue_processing()
logger.info("Auto-download queue processing started", queued=queued_count)
logger.info("Auto-download queue processing started: queued=%d", queued_count)
await self._broadcast("auto_download_started", {"queued_count": queued_count})
logger.info("Auto-download completed", queued_count=queued_count)
logger.info("Auto-download completed: queued_count=%d", queued_count)
# Update cooldown timestamp after successful auto-download
self._last_auto_download_time = datetime.now(timezone.utc)
async def _perform_rescan(self) -> None:
"""Execute a library rescan and optionally trigger auto-download."""
logger.info("Scheduler _perform_rescan entered", scan_in_progress=self._scan_in_progress)
logger.info("Scheduler _perform_rescan entered: scan_in_progress=%s", self._scan_in_progress)
if self._scan_in_progress:
logger.warning("Skipping rescan: previous scan still in progress")
return
self._scan_in_progress = True
scan_start = datetime.now(timezone.utc)
logger.info("Scheduled rescan started at %s", scan_start.isoformat())
try:
logger.info("Starting scheduled library rescan")
@@ -365,18 +382,20 @@ class SchedulerService:
from src.server.utils.dependencies import get_anime_service # noqa: PLC0415
anime_service = get_anime_service()
logger.info("Anime service obtained for rescan")
await self._broadcast(
"scheduled_rescan_started",
{"timestamp": scan_start.isoformat()},
)
logger.info("Calling anime_service.rescan()...")
await anime_service.rescan()
self._last_scan_time = datetime.now(timezone.utc)
duration = (self._last_scan_time - scan_start).total_seconds()
logger.info("Scheduled library rescan completed", duration_seconds=duration)
logger.info("Scheduled library rescan completed: duration=%.2fs", duration)
await self._broadcast(
"scheduled_rescan_completed",
@@ -393,8 +412,8 @@ class SchedulerService:
await self._auto_download_missing()
except Exception as dl_exc: # pylint: disable=broad-exception-caught
logger.error(
"Auto-download after rescan failed",
error=str(dl_exc),
"Auto-download after rescan failed: %s",
dl_exc,
exc_info=True,
)
await self._broadcast(
@@ -413,10 +432,11 @@ class SchedulerService:
folder_scan_service = FolderScanService()
await folder_scan_service.run_folder_scan()
logger.info("Folder scan completed successfully")
except Exception as fs_exc: # pylint: disable=broad-exception-caught
logger.error(
"Folder scan failed",
error=str(fs_exc),
"Folder scan failed: %s",
fs_exc,
exc_info=True,
)
await self._broadcast(
@@ -426,7 +446,7 @@ class SchedulerService:
logger.debug("Folder scan is disabled — skipping")
except Exception as exc: # pylint: disable=broad-exception-caught
logger.error("Scheduled rescan failed", error=str(exc), exc_info=True)
logger.error("Scheduled rescan failed: %s", exc, exc_info=True)
await self._broadcast(
"scheduled_rescan_error",
{"error": str(exc), "timestamp": datetime.now(timezone.utc).isoformat()},
@@ -434,6 +454,7 @@ class SchedulerService:
finally:
self._scan_in_progress = False
logger.info("Scheduled rescan finished: scan_in_progress reset to False")
# ---------------------------------------------------------------------------
@@ -446,9 +467,14 @@ class SchedulerService:
async def _run_rescan_job() -> None:
"""Module-level job entry point — delegates to the current service."""
logger.info("=" * 60)
logger.info("APScheduler triggered _run_rescan_job")
logger.info("Getting scheduler service singleton...")
svc = get_scheduler_service()
logger.info("Scheduler service obtained, calling _perform_rescan()")
await svc._perform_rescan()
logger.info("_run_rescan_job completed")
logger.info("=" * 60)
# ---------------------------------------------------------------------------
@@ -462,7 +488,10 @@ def get_scheduler_service() -> SchedulerService:
"""Return the singleton SchedulerService instance."""
global _scheduler_service
if _scheduler_service is None:
logger.info("Creating new SchedulerService singleton")
_scheduler_service = SchedulerService()
else:
logger.debug("Returning existing SchedulerService singleton")
return _scheduler_service

View File

@@ -392,6 +392,7 @@ class TestAddSeriesWithEpisodes:
nfo_created_at=datetime(2024, 1, 1, 12, 0, 0),
nfo_updated_at=datetime(2024, 1, 2, 12, 0, 0)
)
mock_db_series.id = 1
# Create service with mocked WebSocket
anime_service = AnimeService(mock_series_app)
@@ -403,9 +404,18 @@ class TestAddSeriesWithEpisodes:
mock_db_session.__aenter__ = AsyncMock(return_value=mock_db_session)
mock_db_session.__aexit__ = AsyncMock()
# Mock episodes that match the in-memory episodeDict
mock_episodes = [
MagicMock(season=1, episode_number=1),
MagicMock(season=1, episode_number=2),
MagicMock(season=1, episode_number=3),
]
with patch('src.server.database.connection.get_db_session', return_value=mock_db_session):
with patch('src.server.database.service.AnimeSeriesService') as MockAnimeSeriesService:
MockAnimeSeriesService.get_by_key = AsyncMock(return_value=mock_db_series)
with patch('src.server.database.service.EpisodeService') as MockEpisodeService:
MockEpisodeService.get_by_series = AsyncMock(return_value=mock_episodes)
# Act
await anime_service._broadcast_series_updated(key)