refactor(scheduler): separate scheduler logic from scan/rescan logic

- Extract rescan logic into new RescanService (src/server/services/rescan_service.py)
- SchedulerService now only handles APScheduler cron scheduling
- Move scheduler sub-services (folder_rename, folder_scan, key_resolution) to scheduler/ folder
- Keep RescanOrchestrator as backward-compatible alias
- Update all imports across api/, server/, and test files
This commit is contained in:
2026-06-03 20:58:30 +02:00
parent 9d64241230
commit 9c3f03d610
25 changed files with 1080 additions and 578 deletions

View File

@@ -0,0 +1,291 @@
"""Rescan service — orchestrates library rescans.
This service handles the actual scan/rescan logic:
- Library rescan via anime_service
- Auto-download of missing episodes (if enabled)
- Folder maintenance scan (if enabled)
- Orphaned folder key resolution
SchedulerService only calls RescanService.execute() — it does not
know about the internal steps.
"""
from __future__ import annotations
import logging
from datetime import datetime, timedelta, timezone
from typing import List, Optional
from src.server.models.config import SchedulerConfig
logger = logging.getLogger(__name__)
_AUTO_DOWNLOAD_COOLDOWN_SECONDS = 300 # 5 minutes
class RescanService:
"""Orchestrates all rescan-related operations.
Encapsulates the full post-rescan workflow so SchedulerService
only needs to call a single execute() method.
"""
def __init__(self, config: Optional[SchedulerConfig] = None) -> None:
"""Initialize the rescan service.
Args:
config: Optional scheduler config. If None, operations that depend
on config flags (auto_download, folder_scan) will be skipped.
"""
self._config = config
self._last_scan_time: Optional[datetime] = None
self._last_auto_download_time: Optional[datetime] = None
@property
def last_scan_time(self) -> Optional[datetime]:
return self._last_scan_time
# ------------------------------------------------------------------
# Public entry point
# ------------------------------------------------------------------
async def execute(self) -> dict:
"""Execute the full rescan workflow.
Runs in order:
1. anime_service.rescan()
2. auto-download (if enabled)
3. folder scan (if enabled)
4. key resolution scan (always, if anime_directory configured)
Returns:
Dict with duration and counts for each step.
"""
from src.server.services.websocket_service import get_websocket_service
scan_start = datetime.now(timezone.utc)
results = {
"started_at": scan_start.isoformat(),
"duration_seconds": 0.0,
"rescan_completed": False,
"auto_download_queued": 0,
"folder_scan_completed": False,
"key_resolution": {"resolved": 0, "skipped": 0, "errors": 0},
}
await self._broadcast("scheduled_rescan_started", {"timestamp": scan_start.isoformat()})
try:
# 1. Main library rescan
await self._run_rescan()
results["rescan_completed"] = True
# 2. Auto-download
if self._config and self._config.auto_download_after_rescan:
try:
queued = await self._run_auto_download()
results["auto_download_queued"] = queued
await self._broadcast("auto_download_started", {"queued_count": queued})
except Exception as exc:
logger.error("Auto-download failed: %s", exc, exc_info=True)
await self._broadcast("auto_download_error", {"error": str(exc)})
# 3. Folder scan
if self._config and self._config.folder_scan_enabled:
try:
await self._run_folder_scan()
results["folder_scan_completed"] = True
except Exception as exc:
logger.error("Folder scan failed: %s", exc, exc_info=True)
await self._broadcast("folder_scan_error", {"error": str(exc)})
# 4. Key resolution scan
try:
key_stats = await self._run_key_resolution()
results["key_resolution"] = key_stats
except Exception as exc:
logger.error("Key resolution scan failed: %s", exc, exc_info=True)
self._last_scan_time = datetime.now(timezone.utc)
results["duration_seconds"] = (self._last_scan_time - scan_start).total_seconds()
await self._broadcast(
"scheduled_rescan_completed",
{
"timestamp": self._last_scan_time.isoformat(),
"duration_seconds": results["duration_seconds"],
},
)
logger.info(
"Scheduled library rescan completed: duration=%.2fs",
results["duration_seconds"],
)
except Exception as exc:
logger.error("Scheduled rescan failed: %s", exc, exc_info=True)
await self._broadcast(
"scheduled_rescan_error",
{"error": str(exc), "timestamp": datetime.now(timezone.utc).isoformat()},
)
raise
return results
# ------------------------------------------------------------------
# Step 1: Library rescan
# ------------------------------------------------------------------
async def _run_rescan(self) -> None:
"""Run the anime service rescan."""
from src.server.utils.dependencies import get_anime_service
anime_service = get_anime_service()
logger.info("Anime service obtained, calling anime_service.rescan()...")
await anime_service.rescan()
logger.info("anime_service.rescan() completed")
# ------------------------------------------------------------------
# Step 2: Auto-download
# ------------------------------------------------------------------
async def _run_auto_download(self) -> int:
"""Queue and start downloads for all series with missing episodes.
Returns:
Number of episodes queued.
"""
from src.server.models.download import EpisodeIdentifier
from src.server.utils.dependencies import (
get_anime_service,
get_download_service,
)
# Cooldown check to prevent rapid re-triggers
now = datetime.now(timezone.utc)
if self._last_auto_download_time is not None:
elapsed = now - self._last_auto_download_time
if elapsed < timedelta(seconds=_AUTO_DOWNLOAD_COOLDOWN_SECONDS):
logger.debug(
"Auto-download skipped: cooldown active (elapsed=%.1fs cooldown=%ds)",
elapsed.total_seconds(),
_AUTO_DOWNLOAD_COOLDOWN_SECONDS,
)
return 0
anime_service = get_anime_service()
download_service = get_download_service()
series_list = anime_service._cached_list_missing()
queued_count = 0
for series in series_list:
episode_dict: dict = series.get("episodeDict") or {}
if not episode_dict:
continue
episodes: List[EpisodeIdentifier] = []
for season_str, ep_numbers in episode_dict.items():
for ep_num in ep_numbers:
episodes.append(
EpisodeIdentifier(season=int(season_str), episode=int(ep_num))
)
if not episodes:
continue
await download_service.add_to_queue(
serie_id=series.get("key", ""),
serie_folder=series.get("folder", series.get("name", "")),
serie_name=series.get("name", ""),
episodes=episodes,
)
queued_count += len(episodes)
logger.info(
"Auto-download queued episodes for series=%s count=%d",
series.get("key"),
len(episodes),
)
if queued_count:
await download_service.start_queue_processing()
logger.info("Auto-download queue processing started: queued=%d", queued_count)
self._last_auto_download_time = datetime.now(timezone.utc)
logger.info("Auto-download completed: queued_count=%d", queued_count)
return queued_count
# ------------------------------------------------------------------
# Step 3: Folder scan
# ------------------------------------------------------------------
async def _run_folder_scan(self) -> None:
"""Run the folder scan maintenance task."""
from src.server.services.scheduler.folder_scan_service import FolderScanService
folder_scan_service = FolderScanService()
await folder_scan_service.run_folder_scan()
logger.info("Folder scan completed successfully")
# ------------------------------------------------------------------
# Step 4: Key resolution
# ------------------------------------------------------------------
async def _run_key_resolution(self) -> dict:
"""Run the orphaned folder key resolution scan.
Returns:
Dict with resolved/skipped/errors counts.
"""
from src.server.services.scheduler.key_resolution_service import (
perform_key_resolution_scan,
)
key_stats = await perform_key_resolution_scan()
logger.info(
"Key resolution scan completed: resolved=%d, skipped=%d, errors=%d",
key_stats["resolved"],
key_stats["skipped"],
key_stats["errors"],
)
return key_stats
# ------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------
async def _broadcast(self, event_type: str, data: dict) -> None:
"""Broadcast a WebSocket event to all connected clients."""
try:
from src.server.services.websocket_service import get_websocket_service
ws_service = get_websocket_service()
await ws_service.manager.broadcast({"type": event_type, "data": data})
except Exception as exc:
logger.warning(
"WebSocket broadcast failed: event=%s error=%s", event_type, exc
)
# ---------------------------------------------------------------------------
# Module-level singleton
# ---------------------------------------------------------------------------
_rescan_service: Optional[RescanService] = None
def get_rescan_service(config: Optional[SchedulerConfig] = None) -> RescanService:
"""Return a RescanService singleton (or create with optional config)."""
global _rescan_service
if _rescan_service is None or config is not None:
_rescan_service = RescanService(config=config)
logger.debug("Created new RescanService singleton")
else:
logger.debug("Returning existing RescanService singleton")
return _rescan_service
def reset_rescan_service() -> None:
"""Reset the singleton (used in tests)."""
global _rescan_service
_rescan_service = None

View File

@@ -0,0 +1,45 @@
"""Scheduler services package.
Contains scheduler orchestration and rescan coordination:
- scheduler_service: Cron-based scheduler using APScheduler
- rescan_orchestrator: Legacy alias for RescanService (for backward compatibility)
"""
from __future__ import annotations
from src.server.services.rescan_service import (
RescanService,
get_rescan_service,
reset_rescan_service,
)
# Backward compatibility alias
from src.server.services.scheduler.rescan_orchestrator import (
RescanOrchestrator,
get_rescan_orchestrator,
reset_rescan_orchestrator,
)
from src.server.services.scheduler.scheduler_service import (
SchedulerService,
SchedulerServiceError,
get_scheduler_service,
reset_scheduler_service,
)
__all__ = [
# RescanService (new location)
"RescanService",
"get_rescan_service",
"reset_rescan_service",
# Scheduler
"SchedulerService",
"SchedulerServiceError",
"get_scheduler_service",
"reset_scheduler_service",
# Backward compatibility
"RescanOrchestrator",
"get_rescan_orchestrator",
"reset_rescan_orchestrator",
# Sub-services (still in scheduler folder)
"folder_rename_service",
]

View File

@@ -13,9 +13,12 @@ reflect the new paths.
from __future__ import annotations
import logging
import re
import shutil
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple
from typing import Optional
from lxml import etree
@@ -31,10 +34,11 @@ from src.server.utils.filesystem import sanitize_folder_name
logger = logging.getLogger(__name__)
# Characters that are invalid in filesystem paths across platforms
INVALID_PATH_CHARS = '<>:"/\\|?*\x00'
# Pre-compiled pattern for stripping existing year suffixes
_YEAR_SUFFIX_PATTERN = re.compile(r'(\s*\(\d{4}\))+\s*$')
@dataclass
class DuplicateGroup:
"""Represents a group of duplicate folders for the same series.
@@ -44,10 +48,9 @@ class DuplicateGroup:
nfo_paths: List of corresponding NFO file paths.
"""
def __init__(self, key: str, folders: List[str], nfo_paths: List[Path]):
self.key = key
self.folders = folders
self.nfo_paths = nfo_paths
key: str
folders: list[str]
nfo_paths: list[Path]
@property
def count(self) -> int:
@@ -57,7 +60,20 @@ class DuplicateGroup:
return f"DuplicateGroup(key={self.key!r}, folders={self.folders})"
def _scan_for_pre_existing_duplicates(anime_dir: Path) -> List[DuplicateGroup]:
@dataclass
class RenameStats:
"""Statistics from a folder rename operation."""
scanned: int = 0
renamed: int = 0
skipped: int = 0
errors: int = 0
def to_dict(self) -> dict[str, int]:
return {"scanned": self.scanned, "renamed": self.renamed, "skipped": self.skipped, "errors": self.errors}
def _scan_for_pre_existing_duplicates(anime_dir: Path) -> list[DuplicateGroup]:
"""Scan anime directory for pre-existing duplicate folders.
Groups folders by the series key extracted from their NFO files.
@@ -69,8 +85,7 @@ def _scan_for_pre_existing_duplicates(anime_dir: Path) -> List[DuplicateGroup]:
Returns:
List of DuplicateGroup objects, one per series with duplicate folders.
"""
# Group folders by their expected name (title+year from NFO)
groups: Dict[str, List[Tuple[str, Path]]] = defaultdict(list)
groups: dict[str, list[tuple[str, Path]]] = defaultdict(list)
for series_dir in anime_dir.iterdir():
if not series_dir.is_dir():
@@ -84,7 +99,6 @@ def _scan_for_pre_existing_duplicates(anime_dir: Path) -> List[DuplicateGroup]:
expected_name = _compute_expected_folder_name(title, year)
groups[expected_name].append((series_dir.name, nfo_path))
# Filter to only groups with more than one folder
duplicates = []
for key, items in groups.items():
if len(items) > 1:
@@ -111,16 +125,14 @@ def _try_merge_duplicate_group(group: DuplicateGroup, dry_run: bool = False) ->
if len(group.folders) < 2:
return True
# Keep first folder as canonical, mark others for removal
canonical = group.folders[0]
to_remove = group.folders[1:]
for folder in to_remove:
folder_path = group.nfo_paths[0].parent.parent / folder # same parent dir
folder_path = group.nfo_paths[0].parent.parent / folder
if not folder_path.exists():
continue
# Check if folder is empty or only has symlinks
try:
contents = list(folder_path.iterdir())
except PermissionError:
@@ -130,7 +142,6 @@ def _try_merge_duplicate_group(group: DuplicateGroup, dry_run: bool = False) ->
return False
if not contents:
# Empty folder - safe to remove
if dry_run:
logger.info("[DRY-RUN] Would delete empty duplicate folder: %s", folder_path)
else:
@@ -141,9 +152,9 @@ def _try_merge_duplicate_group(group: DuplicateGroup, dry_run: bool = False) ->
return False
continue
# Check if all contents are symlinks pointing to canonical
canonical_path = folder_path.parent / canonical
all_symlinks = all(
item.is_symlink() and item.resolve() == (folder_path.parent / canonical).resolve()
item.is_symlink() and item.resolve() == canonical_path.resolve()
for item in contents
)
if all_symlinks:
@@ -159,7 +170,6 @@ def _try_merge_duplicate_group(group: DuplicateGroup, dry_run: bool = False) ->
return False
continue
# Cannot auto-merge - requires manual intervention
logger.warning(
"Cannot auto-merge duplicate folders for '%s': %s (manual merge required)",
group.key,
@@ -170,7 +180,7 @@ def _try_merge_duplicate_group(group: DuplicateGroup, dry_run: bool = False) ->
return True
def _parse_nfo_title_and_year(nfo_path: Path) -> Tuple[Optional[str], Optional[str]]:
def _parse_nfo_title_and_year(nfo_path: Path) -> tuple[Optional[str], Optional[str]]:
"""Parse a tvshow.nfo and return (title, year) text values.
Args:
@@ -194,7 +204,7 @@ def _parse_nfo_title_and_year(nfo_path: Path) -> Tuple[Optional[str], Optional[s
except etree.XMLSyntaxError as exc:
logger.warning("Malformed XML in %s: %s", nfo_path, exc)
return None, None
except Exception as exc: # pylint: disable=broad-except
except Exception as exc:
logger.warning("Unexpected error parsing %s: %s", nfo_path, exc)
return None, None
@@ -212,13 +222,7 @@ def _compute_expected_folder_name(title: str, year: str) -> str:
Returns:
Sanitised folder name in the format ``"{title} ({year})"``.
"""
import re
# Remove all trailing year suffixes to prevent duplication.
# This handles cases where the title already contains one or more years.
# Regex pattern: matches one or more " (YYYY)" at the end of the string
clean_title = re.sub(r'(\s*\(\d{4}\))+\s*$', '', title).strip()
clean_title = _YEAR_SUFFIX_PATTERN.sub('', title).strip()
year_suffix = f" ({year})"
raw_name = f"{clean_title}{year_suffix}"
return sanitize_folder_name(raw_name)
@@ -236,42 +240,55 @@ def _is_series_being_downloaded(series_folder: str) -> bool:
"""
try:
download_service = get_download_service()
active = download_service._active_download # pylint: disable=protected-access
active = download_service._active_download
if active and active.serie_folder == series_folder:
return True
for item in download_service._pending_queue: # pylint: disable=protected-access
for item in download_service._pending_queue:
if item.serie_folder == series_folder:
return True
return False
except Exception as exc: # pylint: disable=broad-except
except Exception as exc:
logger.warning(
"Could not check download status for %s: %s", series_folder, exc
)
# Safer to skip renaming if we can't verify download status.
return True
def _cleanup_stale_files_after_rename(new_path: Path, new_name: str) -> None:
"""Remove legacy 'key' file after successful folder rename.
Also checks for orphaned folders with the same key that may have been
left behind from previous rename operations.
def _remove_key_file(path: Path) -> None:
"""Remove legacy 'key' file from a series folder.
Args:
new_path: The new folder path after rename.
new_name: The new folder name.
path: Path to the series folder.
"""
key_file = new_path / "key"
key_file = path / "key"
if key_file.exists():
try:
key_file.unlink()
logger.info(
"Removed legacy 'key' file after rename: %s", key_file
)
logger.info("Removed legacy 'key' file after rename: %s", key_file)
except OSError as exc:
logger.warning(
"Could not remove legacy 'key' file %s: %s", key_file, exc
)
logger.warning("Could not remove legacy 'key' file %s: %s", key_file, exc)
def _move_file(item: Path, dest: Path) -> bool:
"""Move a single file or directory to destination.
Args:
item: Source path to move.
dest: Destination path.
Returns:
True if move succeeded, False otherwise.
"""
try:
item.rename(dest)
logger.debug("Moved %s%s", item, dest)
return True
except PermissionError as exc:
logger.warning("Permission denied moving %s: %s", item, exc)
return False
except OSError as exc:
logger.warning("OS error moving %s: %s", item, exc)
return False
def _cleanup_orphaned_folder(old_path: Path, new_path: Path, dry_run: bool = False) -> bool:
@@ -291,53 +308,36 @@ def _cleanup_orphaned_folder(old_path: Path, new_path: Path, dry_run: bool = Fal
False if old folder does not exist or cleanup failed.
"""
if not old_path.exists():
logger.debug(
"Old folder does not exist, no cleanup needed: %s", old_path
)
logger.debug("Old folder does not exist, no cleanup needed: %s", old_path)
return False
# Check if folder is empty
try:
contents = list(old_path.iterdir())
except PermissionError as exc:
logger.warning(
"Permission denied accessing old folder %s: %s", old_path, exc
)
logger.warning("Permission denied accessing old folder %s: %s", old_path, exc)
return False
except OSError as exc:
logger.warning(
"OS error accessing old folder %s: %s", old_path, exc
)
logger.warning("OS error accessing old folder %s: %s", old_path, exc)
return False
if not contents:
# Empty folder — delete it
if dry_run:
logger.info(
"[DRY-RUN] Would delete empty orphaned folder: %s", old_path
)
logger.info("[DRY-RUN] Would delete empty orphaned folder: %s", old_path)
return True
try:
old_path.rmdir()
logger.info("Deleted empty orphaned folder: %s", old_path)
return True
except PermissionError as exc:
logger.warning(
"Permission denied deleting folder %s: %s", old_path, exc
)
logger.warning("Permission denied deleting folder %s: %s", old_path, exc)
return False
except OSError as exc:
logger.warning(
"OS error deleting folder %s: %s", old_path, exc
)
logger.warning("OS error deleting folder %s: %s", old_path, exc)
return False
# Folder has contents — move files to new_path then delete
if dry_run:
logger.info(
"[DRY-RUN] Would move %d files from orphaned folder %s to %s",
len(contents), old_path, new_path
)
logger.info("[DRY-RUN] Would move %d files from orphaned folder %s to %s",
len(contents), old_path, new_path)
for item in contents:
logger.info("[DRY-RUN] Would move: %s%s", item, new_path / item.name)
logger.info("[DRY-RUN] Would then delete orphaned folder: %s", old_path)
@@ -346,41 +346,86 @@ def _cleanup_orphaned_folder(old_path: Path, new_path: Path, dry_run: bool = Fal
files_moved = 0
errors = 0
for item in contents:
try:
dest = new_path / item.name
item.rename(dest)
logger.debug("Moved %s%s", item, dest)
if not _move_file(item, new_path / item.name):
errors += 1
else:
files_moved += 1
except PermissionError as exc:
logger.warning(
"Permission denied moving %s: %s", item, exc
)
errors += 1
except OSError as exc:
logger.warning(
"OS error moving %s: %s", item, exc
)
errors += 1
if files_moved > 0:
logger.info(
"Moved %d files from orphaned folder to %s",
files_moved, new_path
)
logger.info("Moved %d files from orphaned folder to %s", files_moved, new_path)
# Delete the now-empty old folder
try:
old_path.rmdir()
logger.info("Deleted orphaned folder after moving contents: %s", old_path)
return errors == 0
except OSError as exc:
logger.warning(
"Could not delete orphaned folder %s (may not be empty): %s",
old_path, exc
)
logger.warning("Could not delete orphaned folder %s (may not be empty): %s", old_path, exc)
return False
def _update_series_folder(db, series, new_folder: str) -> None:
"""Update AnimeSeries.folder in the database.
Args:
db: Database session.
series: The AnimeSeries instance to update.
new_folder: New folder name.
"""
if series is None:
return
AnimeSeriesService.update(db, series.id, folder=new_folder)
logger.info("Updated AnimeSeries.folder: %s (id=%s)", new_folder, series.id)
def _update_episode_paths(episodes, old_series_path: Path, new_series_path: Path) -> None:
"""Update Episode.file_path for all episodes of a series.
Args:
episodes: List of Episode instances.
old_series_path: Path to the old series folder.
new_series_path: Path to the new series folder.
"""
for episode in episodes:
if not episode.file_path:
continue
old_file_path = Path(episode.file_path)
try:
old_file_path.relative_to(old_series_path)
new_file_path = new_series_path / old_file_path.relative_to(old_series_path)
episode.file_path = str(new_file_path)
logger.debug("Updated Episode.file_path: %s%s", old_file_path, new_file_path)
except ValueError:
pass
def _update_queue_destinations(
queue_items,
series_id,
old_series_path: Path,
new_series_path: Path,
) -> None:
"""Update DownloadQueueItem.file_destination for pending items.
Args:
queue_items: List of DownloadQueueItem instances.
series_id: ID of the series to filter by.
old_series_path: Path to the old series folder.
new_series_path: Path to the new series folder.
"""
for item in queue_items:
if item.series_id != series_id or not item.file_destination:
continue
old_dest = Path(item.file_destination)
try:
old_dest.relative_to(old_series_path)
new_dest = new_series_path / old_dest.relative_to(old_series_path)
item.file_destination = str(new_dest)
logger.debug("Updated DownloadQueueItem.file_destination: %s%s", old_dest, new_dest)
except ValueError:
pass
async def _update_database_paths(
old_folder: str,
new_folder: str,
@@ -402,82 +447,138 @@ async def _update_database_paths(
new_series_path = anime_dir / new_folder
async with get_db_session() as db:
# 1. Update AnimeSeries.folder
series = await AnimeSeriesService.get_by_folder(db, old_folder)
if series is None:
# Fallback: try to find by folder name
all_series = await AnimeSeriesService.get_all(db)
for s in all_series:
if s.folder == old_folder:
series = s
break
_update_series_folder(db, series, new_folder)
if series is None:
logger.warning(
"No database record found for folder '%s', skipping DB update",
old_folder,
)
return
await AnimeSeriesService.update(db, series.id, folder=new_folder)
logger.info(
"Updated AnimeSeries.folder: %s%s (id=%s)",
old_folder,
new_folder,
series.id,
)
# 2. Update Episode.file_path for all episodes of this series
episodes = await EpisodeService.get_by_series(db, series.id)
for episode in episodes:
if episode.file_path:
old_file_path = Path(episode.file_path)
# Only update if the path is under the old series folder
try:
old_file_path.relative_to(old_series_path)
new_file_path = new_series_path / old_file_path.relative_to(
old_series_path
)
episode.file_path = str(new_file_path)
logger.debug(
"Updated Episode.file_path: %s%s",
old_file_path,
new_file_path,
)
except ValueError:
# Path is not under old_series_path, skip
pass
_update_episode_paths(episodes, old_series_path, new_series_path)
await db.flush()
# 3. Update DownloadQueueItem.file_destination for pending items
queue_items = await DownloadQueueService.get_all(db, with_series=True)
for item in queue_items:
if item.series_id == series.id and item.file_destination:
old_dest = Path(item.file_destination)
try:
old_dest.relative_to(old_series_path)
new_dest = new_series_path / old_dest.relative_to(
old_series_path
)
item.file_destination = str(new_dest)
logger.debug(
"Updated DownloadQueueItem.file_destination: %s%s",
old_dest,
new_dest,
)
except ValueError:
pass
_update_queue_destinations(queue_items, series.id, old_series_path, new_series_path)
await db.flush()
logger.info("Database paths updated for series '%s''%s'", old_folder, new_folder)
def _remove_duplicate_target_folder(
series_dir: Path,
current_name: str,
expected_name: str,
expected_path: Path,
) -> bool:
"""Handle the case where the target folder already exists.
Removes the source folder and its DB record to avoid orphaning
episodes/downloads.
Args:
series_dir: Path to the series directory being processed.
current_name: Current folder name.
expected_name: Expected folder name.
expected_path: Path to the expected (target) folder.
Returns:
True if folder was removed successfully, False otherwise.
"""
logger.warning(
"Cannot rename '%s''%s' — target already exists",
current_name,
expected_name,
)
try:
try:
contents = list(series_dir.iterdir())
logger.warning(
"REMOVING folder '%s' with %d items — target '%s' already exists",
current_name,
len(contents),
expected_name,
)
for item in contents:
logger.warning(" Would remove: %s", item)
except OSError as exc:
logger.warning(
"Could not list contents of folder '%s' before removal: %s",
current_name,
exc,
)
shutil.rmtree(series_dir)
logger.info(
"Database paths updated for series '%s''%s'",
old_folder,
new_folder,
"Removed source folder '%s' — series already exists at target",
current_name,
)
# Delete source DB record using synchronous helper
_delete_series_db_record(current_name, expected_name)
return True
except OSError as exc:
logger.error("Failed to remove source folder '%s': %s", current_name, exc)
return False
def _delete_series_db_record(current_name: str, expected_name: str) -> None:
"""Delete the series DB record for a folder that was removed.
Args:
current_name: The folder name to look up in the DB.
expected_name: The target folder name (for logging).
"""
try:
import asyncio
asyncio.run(_delete_series_db_record_async(current_name, expected_name))
except Exception as exc:
logger.warning(
"Could not delete DB record for '%s': %s",
current_name,
exc,
)
async def validate_and_rename_series_folders(dry_run: bool = False) -> Dict[str, int]:
async def _delete_series_db_record_async(current_name: str, expected_name: str) -> None:
"""Async helper to delete series DB record.
Args:
current_name: The folder name to look up.
expected_name: The target folder name (for logging).
"""
async with get_db_session() as db:
source_series = await AnimeSeriesService.get_by_folder(db, current_name)
if source_series is None:
all_series = await AnimeSeriesService.get_all(db)
for s in all_series:
if s.folder == current_name:
source_series = s
break
if source_series is not None:
await AnimeSeriesService.delete(db, source_series.id)
logger.info(
"Deleted source DB record for '%s' (id=%s) — target folder '%s' retains DB record",
current_name,
source_series.id,
expected_name,
)
else:
logger.info(
"No DB record found for source folder '%s' — folder removed only",
current_name,
)
async def validate_and_rename_series_folders(dry_run: bool = False) -> dict[str, int]:
"""Validate and rename series folders to match NFO metadata.
Iterates over every subfolder in ``settings.anime_directory`` that
@@ -505,25 +606,21 @@ async def validate_and_rename_series_folders(dry_run: bool = False) -> Dict[str,
"""
if not settings.anime_directory:
logger.warning("Folder rename skipped — anime directory not configured")
return {"scanned": 0, "renamed": 0, "skipped": 0, "errors": 0}
return RenameStats().to_dict()
anime_dir = Path(settings.anime_directory)
if not anime_dir.is_dir():
logger.warning(
"Folder rename skipped — anime directory not found: %s", anime_dir
)
return {"scanned": 0, "renamed": 0, "skipped": 0, "errors": 0}
logger.warning("Folder rename skipped — anime directory not found: %s", anime_dir)
return RenameStats().to_dict()
if dry_run:
logger.info("Running in DRY-RUN mode — no changes will be made")
stats = {"scanned": 0, "renamed": 0, "skipped": 0, "errors": 0}
# Detect pre-existing duplicates before rename loop
pre_existing_duplicates: Set[str] = set()
stats = RenameStats()
pre_existing_duplicates: set[str] = set()
duplicates = _scan_for_pre_existing_duplicates(anime_dir)
for dup_group in duplicates:
# Try automatic merge first
if _try_merge_duplicate_group(dup_group, dry_run=dry_run):
logger.info(
"Auto-merged duplicate group for '%s' (%d folders)",
@@ -531,7 +628,6 @@ async def validate_and_rename_series_folders(dry_run: bool = False) -> Dict[str,
dup_group.count,
)
else:
# Flag all folders in this group as pre-existing duplicates
for folder in dup_group.folders:
pre_existing_duplicates.add(folder)
logger.warning(
@@ -549,7 +645,7 @@ async def validate_and_rename_series_folders(dry_run: bool = False) -> Dict[str,
if not nfo_path.exists():
continue
stats["scanned"] += 1
stats.scanned += 1
title, year = _parse_nfo_title_and_year(nfo_path)
if not title or not year:
@@ -557,130 +653,63 @@ async def validate_and_rename_series_folders(dry_run: bool = False) -> Dict[str,
"Skipping rename for '%s' — missing title or year in NFO",
series_dir.name,
)
stats["skipped"] += 1
stats.skipped += 1
continue
expected_name = _compute_expected_folder_name(title, year)
current_name = series_dir.name
if expected_name == current_name:
logger.debug(
"Folder name already correct: '%s'", current_name
)
logger.debug("Folder name already correct: '%s'", current_name)
continue
# Check for active downloads
if _is_series_being_downloaded(current_name):
logger.info(
"Skipping rename for '%s' — series has active or pending downloads",
current_name,
)
stats["skipped"] += 1
stats.skipped += 1
continue
expected_path = anime_dir / expected_name
# Check for pre-existing duplicate
if current_name in pre_existing_duplicates:
logger.warning(
"Skipping rename for '%s' — pre-existing duplicate folder detected",
current_name,
)
stats["errors"] += 1
stats.errors += 1
continue
# Check for duplicate target
if expected_path.exists():
logger.warning(
"Cannot rename '%s''%s' — target already exists",
current_name,
expected_name,
)
# Target folder exists — remove source folder and delete its DB record
# (target folder's DB record survives, source folder's record must be removed
# to avoid orphaning episodes/downloads)
try:
import shutil
logger.warning(
"Removing source duplicate folder '%s' — target '%s' already exists",
current_name,
expected_name,
)
shutil.rmtree(series_dir)
logger.info(
"Removed source folder '%s' — series already exists at target",
current_name,
)
# Delete source DB record (cascades to episodes and download items)
async with get_db_session() as db:
source_series = await AnimeSeriesService.get_by_folder(db, current_name)
if source_series is None:
# Fallback: find by folder name
all_series = await AnimeSeriesService.get_all(db)
for s in all_series:
if s.folder == current_name:
source_series = s
break
if source_series is not None:
await AnimeSeriesService.delete(db, source_series.id)
logger.info(
"Deleted source DB record for '%s' (id=%s) — target folder '%s' retains DB record",
current_name,
source_series.id,
expected_name,
)
else:
logger.info(
"No DB record found for source folder '%s' — folder removed only",
current_name,
)
stats["renamed"] += 1
except OSError as exc:
logger.error(
"Failed to remove source folder '%s': %s",
current_name,
exc,
)
stats["errors"] += 1
if _remove_duplicate_target_folder(series_dir, current_name, expected_name, expected_path):
stats.renamed += 1
else:
stats.errors += 1
continue
# Check path length limits
if len(str(expected_path)) > 4096:
logger.warning(
"Cannot rename '%s''%s' — path exceeds OS limit",
current_name,
expected_name,
)
stats["errors"] += 1
stats.errors += 1
continue
if dry_run:
logger.info(
"[DRY-RUN] Would rename folder: '%s''%s'",
current_name,
expected_name,
)
stats["renamed"] += 1
logger.info("[DRY-RUN] Would rename folder: '%s''%s'", current_name, expected_name)
stats.renamed += 1
continue
try:
old_path = series_dir
series_dir.rename(expected_path)
logger.info(
"Renamed folder: '%s''%s'", current_name, expected_name
)
stats["renamed"] += 1
logger.info("Renamed folder: '%s''%s'", current_name, expected_name)
stats.renamed += 1
# Update database records
await _update_database_paths(current_name, expected_name, anime_dir)
# Clean up stale/legacy files after successful rename
_cleanup_stale_files_after_rename(expected_path, expected_name)
# Clean up orphaned folder if old path still exists
_remove_key_file(expected_path)
_cleanup_orphaned_folder(old_path, expected_path, dry_run=False)
except PermissionError as exc:
@@ -690,7 +719,7 @@ async def validate_and_rename_series_folders(dry_run: bool = False) -> Dict[str,
expected_name,
exc,
)
stats["errors"] += 1
stats.errors += 1
except OSError as exc:
logger.error(
"OS error renaming '%s''%s': %s",
@@ -698,13 +727,13 @@ async def validate_and_rename_series_folders(dry_run: bool = False) -> Dict[str,
expected_name,
exc,
)
stats["errors"] += 1
stats.errors += 1
logger.info(
"Folder rename scan complete: scanned=%d, renamed=%d, skipped=%d, errors=%d",
stats["scanned"],
stats["renamed"],
stats["skipped"],
stats["errors"],
stats.scanned,
stats.renamed,
stats.skipped,
stats.errors,
)
return stats
return stats.to_dict()

View File

@@ -202,7 +202,7 @@ class FolderScanService:
# 1.4 — Validate and rename series folders after NFO repair.
logger.info("Starting folder rename validation")
from src.server.services.folder_rename_service import (
from src.server.services.scheduler.folder_rename_service import (
validate_and_rename_series_folders,
)

View File

@@ -0,0 +1,293 @@
"""Rescan orchestrator — coordinates all scan/cleanup operations during a rescan.
Extracts the rescan workflow from SchedulerService so scheduling and scan
logic are cleanly separated.
Called by SchedulerService.trigger_rescan() and by _run_rescan_job().
"""
from __future__ import annotations
import logging
from datetime import datetime, timezone
from typing import List, Optional
from src.server.models.config import SchedulerConfig
logger = logging.getLogger(__name__)
class RescanOrchestrator:
"""Coordinates rescan, auto-download, folder scan, and key resolution.
This class encapsulates the entire post-rescan workflow so SchedulerService
only needs to call a single method.
"""
def __init__(self, config: Optional[SchedulerConfig] = None) -> None:
"""Initialize the orchestrator.
Args:
config: Optional scheduler config. If None, operations that depend
on config flags (auto_download, folder_scan) will be skipped.
"""
self._config = config
self._last_scan_time: Optional[datetime] = None
# Cooldown tracking for auto-download to prevent rapid re-triggers
self._last_auto_download_time: Optional[datetime] = None
self._auto_download_cooldown_seconds: int = 300 # 5 minutes default
@property
def last_scan_time(self) -> Optional[datetime]:
return self._last_scan_time
# ------------------------------------------------------------------
# Auto-download
# ------------------------------------------------------------------
async def run_auto_download(self) -> int:
"""Queue and start downloads for all series with missing episodes.
Returns:
Number of episodes queued.
"""
from datetime import timedelta
from src.server.models.download import EpisodeIdentifier
from src.server.utils.dependencies import (
get_anime_service,
get_download_service,
)
# Check cooldown to prevent rapid re-triggers
now = datetime.now(timezone.utc)
if self._last_auto_download_time is not None:
elapsed = now - self._last_auto_download_time
if elapsed < timedelta(seconds=self._auto_download_cooldown_seconds):
logger.debug(
"Auto-download skipped: cooldown active (elapsed=%.1fs cooldown=%ds)",
elapsed.total_seconds(),
self._auto_download_cooldown_seconds,
)
return 0
anime_service = get_anime_service()
download_service = get_download_service()
series_list = anime_service._cached_list_missing()
queued_count = 0
for series in series_list:
episode_dict: dict = series.get("episodeDict") or {}
if not episode_dict:
continue
episodes: List[EpisodeIdentifier] = []
for season_str, ep_numbers in episode_dict.items():
for ep_num in ep_numbers:
episodes.append(
EpisodeIdentifier(season=int(season_str), episode=int(ep_num))
)
if not episodes:
continue
await download_service.add_to_queue(
serie_id=series.get("key", ""),
serie_folder=series.get("folder", series.get("name", "")),
serie_name=series.get("name", ""),
episodes=episodes,
)
queued_count += len(episodes)
logger.info(
"Auto-download queued episodes for series=%s count=%d",
series.get("key"),
len(episodes),
)
if queued_count:
await download_service.start_queue_processing()
logger.info("Auto-download queue processing started: queued=%d", queued_count)
self._last_auto_download_time = datetime.now(timezone.utc)
logger.info("Auto-download completed: queued_count=%d", queued_count)
return queued_count
# ------------------------------------------------------------------
# Folder scan
# ------------------------------------------------------------------
async def run_folder_scan(self) -> None:
"""Run the folder scan maintenance task."""
from src.server.services.scheduler.folder_scan_service import FolderScanService
folder_scan_service = FolderScanService()
await folder_scan_service.run_folder_scan()
logger.info("Folder scan completed successfully")
# ------------------------------------------------------------------
# Key resolution
# ------------------------------------------------------------------
async def run_key_resolution(self) -> dict:
"""Run the orphaned folder key resolution scan.
Returns:
Dict with resolved/skipped/errors counts.
"""
from src.server.services.key_resolution_service import (
perform_key_resolution_scan,
)
key_stats = await perform_key_resolution_scan()
logger.info(
"Key resolution scan completed: resolved=%d, skipped=%d, errors=%d",
key_stats["resolved"],
key_stats["skipped"],
key_stats["errors"],
)
return key_stats
# ------------------------------------------------------------------
# Main orchestrator entry point
# ------------------------------------------------------------------
async def execute(self) -> dict:
"""Execute the full rescan workflow.
Runs in order:
1. anime_service.rescan()
2. auto-download (if enabled)
3. folder scan (if enabled)
4. key resolution scan (always, if anime_directory configured)
Returns:
Dict with duration and counts for each step.
"""
scan_start = datetime.now(timezone.utc)
results = {
"started_at": scan_start.isoformat(),
"duration_seconds": 0.0,
"rescan_completed": False,
"auto_download_queued": 0,
"folder_scan_completed": False,
"key_resolution": {"resolved": 0, "skipped": 0, "errors": 0},
}
await self._broadcast(
"scheduled_rescan_started",
{"timestamp": scan_start.isoformat()},
)
try:
# 1. Main library rescan
await self._run_rescan()
results["rescan_completed"] = True
# 2. Auto-download
if self._config and self._config.auto_download_after_rescan:
try:
queued = await self.run_auto_download()
results["auto_download_queued"] = queued
await self._broadcast(
"auto_download_started", {"queued_count": queued}
)
except Exception as exc:
logger.error("Auto-download failed: %s", exc, exc_info=True)
await self._broadcast(
"auto_download_error", {"error": str(exc)}
)
# 3. Folder scan
if self._config and self._config.folder_scan_enabled:
try:
await self.run_folder_scan()
results["folder_scan_completed"] = True
except Exception as exc:
logger.error("Folder scan failed: %s", exc, exc_info=True)
await self._broadcast("folder_scan_error", {"error": str(exc)})
# 4. Key resolution scan (always runs if anime_directory configured)
try:
key_stats = await self.run_key_resolution()
results["key_resolution"] = key_stats
except Exception as exc:
logger.error("Key resolution scan failed: %s", exc, exc_info=True)
self._last_scan_time = datetime.now(timezone.utc)
results["duration_seconds"] = (
self._last_scan_time - scan_start
).total_seconds()
await self._broadcast(
"scheduled_rescan_completed",
{
"timestamp": self._last_scan_time.isoformat(),
"duration_seconds": results["duration_seconds"],
},
)
logger.info(
"Scheduled library rescan completed: duration=%.2fs",
results["duration_seconds"],
)
except Exception as exc:
logger.error("Scheduled rescan failed: %s", exc, exc_info=True)
await self._broadcast(
"scheduled_rescan_error",
{"error": str(exc), "timestamp": datetime.now(timezone.utc).isoformat()},
)
raise
return results
# ------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------
async def _run_rescan(self) -> None:
"""Run the anime service rescan."""
from src.server.utils.dependencies import get_anime_service
anime_service = get_anime_service()
logger.info("Anime service obtained, calling anime_service.rescan()...")
await anime_service.rescan()
logger.info("anime_service.rescan() completed")
async def _broadcast(self, event_type: str, data: dict) -> None:
"""Broadcast a WebSocket event to all connected clients."""
try:
from src.server.services.websocket_service import get_websocket_service
ws_service = get_websocket_service()
await ws_service.manager.broadcast({"type": event_type, "data": data})
except Exception as exc:
logger.warning(
"WebSocket broadcast failed: event=%s error=%s", event_type, exc
)
# ---------------------------------------------------------------------------
# Module-level orchestrator
# ---------------------------------------------------------------------------
_orchestrator: Optional[RescanOrchestrator] = None
def get_rescan_orchestrator(
config: Optional[SchedulerConfig] = None,
) -> RescanOrchestrator:
"""Return a RescanOrchestrator singleton (or create with optional config)."""
global _orchestrator
if _orchestrator is None or config is not None:
_orchestrator = RescanOrchestrator(config=config)
logger.debug("Created new RescanOrchestrator singleton")
else:
logger.debug("Returning existing RescanOrchestrator singleton")
return _orchestrator
def reset_rescan_orchestrator() -> None:
"""Reset the orchestrator singleton (used in tests)."""
global _orchestrator
_orchestrator = None

View File

@@ -1,18 +1,19 @@
"""Scheduler service for automatic library rescans.
Uses APScheduler's AsyncIOScheduler with CronTrigger for precise
cron-based scheduling. The legacy interval-based loop has been removed
in favour of the cron approach.
cron-based scheduling.
Jobs are held in memory (no separate scheduler database). On startup,
if the last scan timestamp indicates a missed run (server was down at the
scheduled cron time), a rescan is triggered immediately.
Actual rescan logic is delegated to RescanService.
"""
from __future__ import annotations
import logging
from datetime import datetime, timedelta, timezone
from typing import List, Optional
from typing import Optional
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
@@ -42,7 +43,9 @@ class SchedulerService:
- Cron-based scheduling (time of day + days of week)
- Immediate manual trigger
- Live config reloading without app restart
- Auto-queueing downloads of missing episodes after rescan
Actual rescan/folder-scan/auto-download work is delegated to
RescanService.
"""
def __init__(self) -> None:
@@ -50,11 +53,7 @@ class SchedulerService:
self._is_running: bool = False
self._scheduler: Optional[AsyncIOScheduler] = None
self._config: Optional[SchedulerConfig] = None
self._last_scan_time: Optional[datetime] = None
self._scan_in_progress: bool = False
# Cooldown tracking for auto-download to prevent rapid re-triggers
self._last_auto_download_time: Optional[datetime] = None
self._auto_download_cooldown_seconds: int = 300 # 5 minutes default
logger.info("SchedulerService initialised")
# ------------------------------------------------------------------
@@ -82,8 +81,6 @@ class SchedulerService:
logger.error("Failed to load scheduler configuration: %s", exc)
raise SchedulerServiceError(f"Failed to load config: {exc}") from exc
# Use in-memory job store — no separate scheduler.db needed.
# Jobs are reconstructed from config on every startup.
self._scheduler = AsyncIOScheduler()
if not self._config.enabled:
@@ -133,8 +130,7 @@ class SchedulerService:
)
# Startup misfire recovery: check if the last scan was missed while
# the server was down. If overdue by more than one interval but within
# the grace period, trigger an immediate rescan.
# the server was down.
await self._check_missed_run()
async def stop(self) -> None:
@@ -251,6 +247,10 @@ class SchedulerService:
Returns:
Dict containing scheduler state and config fields.
"""
from src.server.services.rescan_service import get_rescan_service
rescan_service = get_rescan_service()
next_run: Optional[str] = None
if self._scheduler and self._scheduler.running:
job = self._scheduler.get_job(_JOB_ID)
@@ -269,7 +269,11 @@ class SchedulerService:
"folder_scan_enabled": (
self._config.folder_scan_enabled if self._config else False
),
"last_run": self._last_scan_time.isoformat() if self._last_scan_time else None,
"last_run": (
rescan_service.last_scan_time.isoformat()
if rescan_service.last_scan_time
else None
),
"next_run": next_run,
"scan_in_progress": self._scan_in_progress,
}
@@ -316,9 +320,9 @@ class SchedulerService:
return
try:
from src.server.database.connection import get_db_session # noqa: PLC0415
from src.server.database.connection import get_db_session
from src.server.database.system_settings_service import (
SystemSettingsService, # noqa: PLC0415
SystemSettingsService,
)
async with get_db_session() as db:
@@ -341,7 +345,6 @@ class SchedulerService:
# If last scan was more than 24h + grace period ago, don't trigger
# (avoids surprise rescans after long downtime).
max_overdue = timedelta(hours=24, seconds=_MISFIRE_GRACE_SECONDS)
# If last scan was more than ~25h ago, skip (too stale)
if elapsed > max_overdue:
logger.info(
"Last scan was %s ago (> %s) — skipping missed-run recovery",
@@ -351,7 +354,6 @@ class SchedulerService:
return
# Check if a run should have happened between last_scan and now.
# Simple heuristic: if elapsed > 24h, we missed at least one daily run.
if elapsed > timedelta(hours=23):
logger.info(
"Missed scheduled rescan detected (last scan %s ago) — triggering now",
@@ -362,191 +364,22 @@ class SchedulerService:
except Exception as exc: # pylint: disable=broad-exception-caught
logger.warning("Missed-run check failed (non-fatal): %s", exc)
async def _broadcast(self, event_type: str, data: dict) -> None:
"""Broadcast a WebSocket event to all connected clients."""
try:
from src.server.services.websocket_service import (
get_websocket_service, # noqa: PLC0415
)
ws_service = get_websocket_service()
await ws_service.manager.broadcast({"type": event_type, "data": data})
except Exception as exc: # pylint: disable=broad-exception-caught
logger.warning("WebSocket broadcast failed: event=%s error=%s", event_type, exc)
async def _auto_download_missing(self) -> None:
"""Queue and start downloads for all series with missing episodes."""
from datetime import timedelta # noqa: PLC0415
from src.server.models.download import EpisodeIdentifier # noqa: PLC0415
from src.server.utils.dependencies import ( # noqa: PLC0415
get_anime_service,
get_download_service,
)
# Check cooldown to prevent rapid re-triggers
now = datetime.now(timezone.utc)
if self._last_auto_download_time is not None:
elapsed = now - self._last_auto_download_time
if elapsed < timedelta(seconds=self._auto_download_cooldown_seconds):
logger.debug(
"Auto-download skipped: cooldown active (elapsed=%.1fs cooldown=%ds)",
elapsed.total_seconds(),
self._auto_download_cooldown_seconds,
)
return
anime_service = get_anime_service()
download_service = get_download_service()
series_list = anime_service._cached_list_missing()
queued_count = 0
for series in series_list:
episode_dict: dict = series.get("episodeDict") or {}
if not episode_dict:
continue
episodes: List[EpisodeIdentifier] = []
for season_str, ep_numbers in episode_dict.items():
for ep_num in ep_numbers:
episodes.append(
EpisodeIdentifier(season=int(season_str), episode=int(ep_num))
)
if not episodes:
continue
await download_service.add_to_queue(
serie_id=series.get("key", ""),
serie_folder=series.get("folder", series.get("name", "")),
serie_name=series.get("name", ""),
episodes=episodes,
)
queued_count += len(episodes)
logger.info(
"Auto-download queued episodes for series=%s count=%d",
series.get("key"),
len(episodes),
)
if queued_count:
await download_service.start_queue_processing()
logger.info("Auto-download queue processing started: queued=%d", queued_count)
await self._broadcast("auto_download_started", {"queued_count": queued_count})
logger.info("Auto-download completed: queued_count=%d", queued_count)
# Update cooldown timestamp after successful auto-download
self._last_auto_download_time = datetime.now(timezone.utc)
async def _perform_rescan(self) -> None:
"""Execute a library rescan and optionally trigger auto-download."""
logger.info("Scheduler _perform_rescan entered: scan_in_progress=%s", self._scan_in_progress)
"""Execute a library rescan via RescanService."""
from src.server.services.rescan_service import get_rescan_service
logger.info(
"Scheduler _perform_rescan entered: scan_in_progress=%s",
self._scan_in_progress,
)
if self._scan_in_progress:
logger.warning("Skipping rescan: previous scan still in progress")
return
self._scan_in_progress = True
scan_start = datetime.now(timezone.utc)
logger.info("Scheduled rescan started at %s", scan_start.isoformat())
try:
logger.info("Starting scheduled library rescan")
from src.server.utils.dependencies import get_anime_service # noqa: PLC0415
anime_service = get_anime_service()
logger.info("Anime service obtained for rescan")
await self._broadcast(
"scheduled_rescan_started",
{"timestamp": scan_start.isoformat()},
)
logger.info("Calling anime_service.rescan()...")
await anime_service.rescan()
self._last_scan_time = datetime.now(timezone.utc)
duration = (self._last_scan_time - scan_start).total_seconds()
logger.info("Scheduled library rescan completed: duration=%.2fs", duration)
await self._broadcast(
"scheduled_rescan_completed",
{
"timestamp": self._last_scan_time.isoformat(),
"duration_seconds": duration,
},
)
# Auto-download after rescan
if self._config and self._config.auto_download_after_rescan:
logger.info("Auto-download after rescan is enabled — starting")
try:
await self._auto_download_missing()
except Exception as dl_exc: # pylint: disable=broad-exception-caught
logger.error(
"Auto-download after rescan failed: %s",
dl_exc,
exc_info=True,
)
await self._broadcast(
"auto_download_error", {"error": str(dl_exc)}
)
else:
logger.debug("Auto-download after rescan is disabled — skipping")
# Folder scan (daily maintenance)
if self._config and self._config.folder_scan_enabled:
logger.info("Folder scan is enabled — starting")
try:
from src.server.services.folder_scan_service import (
FolderScanService, # noqa: PLC0415
)
folder_scan_service = FolderScanService()
await folder_scan_service.run_folder_scan()
logger.info("Folder scan completed successfully")
except Exception as fs_exc: # pylint: disable=broad-exception-caught
logger.error(
"Folder scan failed: %s",
fs_exc,
exc_info=True,
)
await self._broadcast(
"folder_scan_error", {"error": str(fs_exc)}
)
# Key resolution scan (resolve orphaned folders)
try:
from src.server.services.key_resolution_service import (
perform_key_resolution_scan, # noqa: PLC0415
)
key_stats = await perform_key_resolution_scan()
logger.info(
"Key resolution scan completed: resolved=%d, skipped=%d, errors=%d",
key_stats["resolved"],
key_stats["skipped"],
key_stats["errors"],
)
except Exception as kr_exc: # pylint: disable=broad-exception-caught
logger.error(
"Key resolution scan failed: %s",
kr_exc,
exc_info=True,
)
else:
logger.debug("Folder scan is disabled — skipping")
except Exception as exc: # pylint: disable=broad-exception-caught
logger.error("Scheduled rescan failed: %s", exc, exc_info=True)
await self._broadcast(
"scheduled_rescan_error",
{"error": str(exc), "timestamp": datetime.now(timezone.utc).isoformat()},
)
rescan_service = get_rescan_service(config=self._config)
await rescan_service.execute()
finally:
self._scan_in_progress = False
logger.info("Scheduled rescan finished: scan_in_progress reset to False")