Remove key_resolution and folder_rename services
Both services deleted. Orchestrator and rescan service no longer call them. Folder scan step numbering adjusted (1.5 → 1.4). Tests updated.
This commit is contained in:
@@ -1,739 +0,0 @@
|
||||
"""Folder rename service for validating and renaming series folders.
|
||||
|
||||
After NFO repair, this service iterates over every subfolder in
|
||||
``settings.anime_directory`` that contains a ``tvshow.nfo``. For each
|
||||
folder it parses the NFO to extract ``<title>`` and ``<year>``, computes
|
||||
the expected folder name ``f"{title} ({year})"``, sanitises it for
|
||||
filesystem safety, and renames the folder if the current name differs.
|
||||
|
||||
Database records (``AnimeSeries.folder``, ``Episode.file_path``,
|
||||
``DownloadQueueItem.file_destination``) are updated atomically to
|
||||
reflect the new paths.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import re
|
||||
import shutil
|
||||
from collections import defaultdict
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from lxml import etree
|
||||
|
||||
from src.config.settings import settings
|
||||
from src.server.database.connection import get_db_session
|
||||
from src.server.database.service import (
|
||||
AnimeSeriesService,
|
||||
DownloadQueueService,
|
||||
EpisodeService,
|
||||
)
|
||||
from src.server.utils.dependencies import get_download_service
|
||||
from src.server.utils.filesystem import sanitize_folder_name
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Pre-compiled pattern for stripping existing year suffixes
|
||||
_YEAR_SUFFIX_PATTERN = re.compile(r'(\s*\(\d{4}\))+\s*$')
|
||||
|
||||
|
||||
@dataclass
|
||||
class DuplicateGroup:
|
||||
"""Represents a group of duplicate folders for the same series.
|
||||
|
||||
Attributes:
|
||||
key: The series key (folder name before rename).
|
||||
folders: List of folder paths that map to this series.
|
||||
nfo_paths: List of corresponding NFO file paths.
|
||||
"""
|
||||
|
||||
key: str
|
||||
folders: list[str]
|
||||
nfo_paths: list[Path]
|
||||
|
||||
@property
|
||||
def count(self) -> int:
|
||||
return len(self.folders)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"DuplicateGroup(key={self.key!r}, folders={self.folders})"
|
||||
|
||||
|
||||
@dataclass
|
||||
class RenameStats:
|
||||
"""Statistics from a folder rename operation."""
|
||||
|
||||
scanned: int = 0
|
||||
renamed: int = 0
|
||||
skipped: int = 0
|
||||
errors: int = 0
|
||||
|
||||
def to_dict(self) -> dict[str, int]:
|
||||
return {"scanned": self.scanned, "renamed": self.renamed, "skipped": self.skipped, "errors": self.errors}
|
||||
|
||||
|
||||
def _scan_for_pre_existing_duplicates(anime_dir: Path) -> list[DuplicateGroup]:
|
||||
"""Scan anime directory for pre-existing duplicate folders.
|
||||
|
||||
Groups folders by the series key extracted from their NFO files.
|
||||
Folders with the same title+year (same expected name) are flagged as duplicates.
|
||||
|
||||
Args:
|
||||
anime_dir: Path to the anime directory to scan.
|
||||
|
||||
Returns:
|
||||
List of DuplicateGroup objects, one per series with duplicate folders.
|
||||
"""
|
||||
groups: dict[str, list[tuple[str, Path]]] = defaultdict(list)
|
||||
|
||||
for series_dir in anime_dir.iterdir():
|
||||
if not series_dir.is_dir():
|
||||
continue
|
||||
nfo_path = series_dir / "tvshow.nfo"
|
||||
if not nfo_path.exists():
|
||||
continue
|
||||
title, year = _parse_nfo_title_and_year(nfo_path)
|
||||
if not title or not year:
|
||||
continue
|
||||
expected_name = _compute_expected_folder_name(title, year)
|
||||
groups[expected_name].append((series_dir.name, nfo_path))
|
||||
|
||||
duplicates = []
|
||||
for key, items in groups.items():
|
||||
if len(items) > 1:
|
||||
folders = [item[0] for item in items]
|
||||
nfo_paths = [item[1] for item in items]
|
||||
duplicates.append(DuplicateGroup(key=key, folders=folders, nfo_paths=nfo_paths))
|
||||
|
||||
return duplicates
|
||||
|
||||
|
||||
def _try_merge_duplicate_group(group: DuplicateGroup, dry_run: bool = False) -> bool:
|
||||
"""Attempt to merge a duplicate group automatically.
|
||||
|
||||
Uses the first folder as the canonical one and removes others if they are
|
||||
empty or contain only symlinks.
|
||||
|
||||
Args:
|
||||
group: The DuplicateGroup to merge.
|
||||
dry_run: If True, only log actions without executing them.
|
||||
|
||||
Returns:
|
||||
True if merge was successful, False otherwise.
|
||||
"""
|
||||
if len(group.folders) < 2:
|
||||
return True
|
||||
|
||||
canonical = group.folders[0]
|
||||
to_remove = group.folders[1:]
|
||||
|
||||
for folder in to_remove:
|
||||
folder_path = group.nfo_paths[0].parent.parent / folder
|
||||
if not folder_path.exists():
|
||||
continue
|
||||
|
||||
try:
|
||||
contents = list(folder_path.iterdir())
|
||||
except PermissionError:
|
||||
logger.warning("Permission denied accessing %s, skip merge", folder_path)
|
||||
return False
|
||||
except OSError:
|
||||
return False
|
||||
|
||||
if not contents:
|
||||
if dry_run:
|
||||
logger.info("[DRY-RUN] Would delete empty duplicate folder: %s", folder_path)
|
||||
else:
|
||||
try:
|
||||
folder_path.rmdir()
|
||||
logger.info("Deleted empty duplicate folder: %s", folder_path)
|
||||
except OSError:
|
||||
return False
|
||||
continue
|
||||
|
||||
canonical_path = folder_path.parent / canonical
|
||||
all_symlinks = all(
|
||||
item.is_symlink() and item.resolve() == canonical_path.resolve()
|
||||
for item in contents
|
||||
)
|
||||
if all_symlinks:
|
||||
if dry_run:
|
||||
logger.info("[DRY-RUN] Would remove symlinks in duplicate folder: %s", folder_path)
|
||||
else:
|
||||
for item in contents:
|
||||
item.unlink()
|
||||
try:
|
||||
folder_path.rmdir()
|
||||
logger.info("Removed symlink-only duplicate folder: %s", folder_path)
|
||||
except OSError:
|
||||
return False
|
||||
continue
|
||||
|
||||
logger.warning(
|
||||
"Cannot auto-merge duplicate folders for '%s': %s (manual merge required)",
|
||||
group.key,
|
||||
[canonical] + to_remove,
|
||||
)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def _parse_nfo_title_and_year(nfo_path: Path) -> tuple[Optional[str], Optional[str]]:
|
||||
"""Parse a tvshow.nfo and return (title, year) text values.
|
||||
|
||||
Args:
|
||||
nfo_path: Absolute path to the ``tvshow.nfo`` file.
|
||||
|
||||
Returns:
|
||||
Tuple of (title, year) where either may be ``None`` if missing
|
||||
or empty.
|
||||
"""
|
||||
try:
|
||||
tree = etree.parse(str(nfo_path))
|
||||
root = tree.getroot()
|
||||
|
||||
title_elem = root.find("./title")
|
||||
year_elem = root.find("./year")
|
||||
|
||||
title = title_elem.text.strip() if title_elem is not None and title_elem.text and title_elem.text.strip() else None
|
||||
year = year_elem.text.strip() if year_elem is not None and year_elem.text and year_elem.text.strip() else None
|
||||
|
||||
return title, year
|
||||
except etree.XMLSyntaxError as exc:
|
||||
logger.warning("Malformed XML in %s: %s", nfo_path, exc)
|
||||
return None, None
|
||||
except Exception as exc:
|
||||
logger.warning("Unexpected error parsing %s: %s", nfo_path, exc)
|
||||
return None, None
|
||||
|
||||
|
||||
def _compute_expected_folder_name(title: str, year: str) -> str:
|
||||
"""Compute the expected folder name from title and year.
|
||||
|
||||
Removes any existing year suffixes (e.g., "(2021)") before adding the
|
||||
canonical one to prevent duplication across multiple folder rename runs.
|
||||
|
||||
Args:
|
||||
title: Series title from NFO.
|
||||
year: Release year from NFO.
|
||||
|
||||
Returns:
|
||||
Sanitised folder name in the format ``"{title} ({year})"``.
|
||||
"""
|
||||
clean_title = _YEAR_SUFFIX_PATTERN.sub('', title).strip()
|
||||
year_suffix = f" ({year})"
|
||||
raw_name = f"{clean_title}{year_suffix}"
|
||||
return sanitize_folder_name(raw_name)
|
||||
|
||||
|
||||
def _is_series_being_downloaded(series_folder: str) -> bool:
|
||||
"""Check whether the given series has an active or pending download.
|
||||
|
||||
Args:
|
||||
series_folder: The series folder name (as stored in the DB).
|
||||
|
||||
Returns:
|
||||
``True`` if the series appears in the active download or the
|
||||
pending queue.
|
||||
"""
|
||||
try:
|
||||
download_service = get_download_service()
|
||||
active = download_service._active_download
|
||||
if active and active.serie_folder == series_folder:
|
||||
return True
|
||||
for item in download_service._pending_queue:
|
||||
if item.serie_folder == series_folder:
|
||||
return True
|
||||
return False
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"Could not check download status for %s: %s", series_folder, exc
|
||||
)
|
||||
return True
|
||||
|
||||
|
||||
def _remove_key_file(path: Path) -> None:
|
||||
"""Remove legacy 'key' file from a series folder.
|
||||
|
||||
Args:
|
||||
path: Path to the series folder.
|
||||
"""
|
||||
key_file = path / "key"
|
||||
if key_file.exists():
|
||||
try:
|
||||
key_file.unlink()
|
||||
logger.info("Removed legacy 'key' file after rename: %s", key_file)
|
||||
except OSError as exc:
|
||||
logger.warning("Could not remove legacy 'key' file %s: %s", key_file, exc)
|
||||
|
||||
|
||||
def _move_file(item: Path, dest: Path) -> bool:
|
||||
"""Move a single file or directory to destination.
|
||||
|
||||
Args:
|
||||
item: Source path to move.
|
||||
dest: Destination path.
|
||||
|
||||
Returns:
|
||||
True if move succeeded, False otherwise.
|
||||
"""
|
||||
try:
|
||||
item.rename(dest)
|
||||
logger.debug("Moved %s → %s", item, dest)
|
||||
return True
|
||||
except PermissionError as exc:
|
||||
logger.warning("Permission denied moving %s: %s", item, exc)
|
||||
return False
|
||||
except OSError as exc:
|
||||
logger.warning("OS error moving %s: %s", item, exc)
|
||||
return False
|
||||
|
||||
|
||||
def _cleanup_orphaned_folder(old_path: Path, new_path: Path, dry_run: bool = False) -> bool:
|
||||
"""Clean up orphaned folder after successful rename.
|
||||
|
||||
After a folder is successfully renamed to new_path, this function checks
|
||||
if the old_path still exists (orphaned folder) and removes it. If the
|
||||
old folder contains files, they are moved to new_path before deletion.
|
||||
|
||||
Args:
|
||||
old_path: The original folder path before rename.
|
||||
new_path: The new folder path after rename.
|
||||
dry_run: If True, only log actions without executing them.
|
||||
|
||||
Returns:
|
||||
True if old folder was cleaned up (or would be in dry-run mode),
|
||||
False if old folder does not exist or cleanup failed.
|
||||
"""
|
||||
if not old_path.exists():
|
||||
logger.debug("Old folder does not exist, no cleanup needed: %s", old_path)
|
||||
return False
|
||||
|
||||
try:
|
||||
contents = list(old_path.iterdir())
|
||||
except PermissionError as exc:
|
||||
logger.warning("Permission denied accessing old folder %s: %s", old_path, exc)
|
||||
return False
|
||||
except OSError as exc:
|
||||
logger.warning("OS error accessing old folder %s: %s", old_path, exc)
|
||||
return False
|
||||
|
||||
if not contents:
|
||||
if dry_run:
|
||||
logger.info("[DRY-RUN] Would delete empty orphaned folder: %s", old_path)
|
||||
return True
|
||||
try:
|
||||
old_path.rmdir()
|
||||
logger.info("Deleted empty orphaned folder: %s", old_path)
|
||||
return True
|
||||
except PermissionError as exc:
|
||||
logger.warning("Permission denied deleting folder %s: %s", old_path, exc)
|
||||
return False
|
||||
except OSError as exc:
|
||||
logger.warning("OS error deleting folder %s: %s", old_path, exc)
|
||||
return False
|
||||
|
||||
if dry_run:
|
||||
logger.info("[DRY-RUN] Would move %d files from orphaned folder %s to %s",
|
||||
len(contents), old_path, new_path)
|
||||
for item in contents:
|
||||
logger.info("[DRY-RUN] Would move: %s → %s", item, new_path / item.name)
|
||||
logger.info("[DRY-RUN] Would then delete orphaned folder: %s", old_path)
|
||||
return True
|
||||
|
||||
files_moved = 0
|
||||
errors = 0
|
||||
for item in contents:
|
||||
if not _move_file(item, new_path / item.name):
|
||||
errors += 1
|
||||
else:
|
||||
files_moved += 1
|
||||
|
||||
if files_moved > 0:
|
||||
logger.info("Moved %d files from orphaned folder to %s", files_moved, new_path)
|
||||
|
||||
try:
|
||||
old_path.rmdir()
|
||||
logger.info("Deleted orphaned folder after moving contents: %s", old_path)
|
||||
return errors == 0
|
||||
except OSError as exc:
|
||||
logger.warning("Could not delete orphaned folder %s (may not be empty): %s", old_path, exc)
|
||||
return False
|
||||
|
||||
|
||||
async def _update_series_folder(db, series, new_folder: str) -> None:
|
||||
"""Update AnimeSeries.folder in the database.
|
||||
|
||||
Args:
|
||||
db: Database session.
|
||||
series: The AnimeSeries instance to update.
|
||||
new_folder: New folder name.
|
||||
"""
|
||||
if series is None:
|
||||
return
|
||||
|
||||
await AnimeSeriesService.update(db, series.id, folder=new_folder)
|
||||
logger.info("Updated AnimeSeries.folder: %s (id=%s)", new_folder, series.id)
|
||||
|
||||
|
||||
def _update_episode_paths(episodes, old_series_path: Path, new_series_path: Path) -> None:
|
||||
"""Update Episode.file_path for all episodes of a series.
|
||||
|
||||
Args:
|
||||
episodes: List of Episode instances.
|
||||
old_series_path: Path to the old series folder.
|
||||
new_series_path: Path to the new series folder.
|
||||
"""
|
||||
for episode in episodes:
|
||||
if not episode.file_path:
|
||||
continue
|
||||
old_file_path = Path(episode.file_path)
|
||||
try:
|
||||
old_file_path.relative_to(old_series_path)
|
||||
new_file_path = new_series_path / old_file_path.relative_to(old_series_path)
|
||||
episode.file_path = str(new_file_path)
|
||||
logger.debug("Updated Episode.file_path: %s → %s", old_file_path, new_file_path)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
|
||||
def _update_queue_destinations(
|
||||
queue_items,
|
||||
series_id,
|
||||
old_series_path: Path,
|
||||
new_series_path: Path,
|
||||
) -> None:
|
||||
"""Update DownloadQueueItem.file_destination for pending items.
|
||||
|
||||
Args:
|
||||
queue_items: List of DownloadQueueItem instances.
|
||||
series_id: ID of the series to filter by.
|
||||
old_series_path: Path to the old series folder.
|
||||
new_series_path: Path to the new series folder.
|
||||
"""
|
||||
for item in queue_items:
|
||||
if item.series_id != series_id or not item.file_destination:
|
||||
continue
|
||||
old_dest = Path(item.file_destination)
|
||||
try:
|
||||
old_dest.relative_to(old_series_path)
|
||||
new_dest = new_series_path / old_dest.relative_to(old_series_path)
|
||||
item.file_destination = str(new_dest)
|
||||
logger.debug("Updated DownloadQueueItem.file_destination: %s → %s", old_dest, new_dest)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
|
||||
async def _update_database_paths(
|
||||
old_folder: str,
|
||||
new_folder: str,
|
||||
anime_dir: Path,
|
||||
) -> None:
|
||||
"""Update all database records that reference the old folder path.
|
||||
|
||||
Updates:
|
||||
- ``AnimeSeries.folder`` → ``new_folder``
|
||||
- ``Episode.file_path`` → adjusted to new folder
|
||||
- ``DownloadQueueItem.file_destination`` → adjusted to new folder
|
||||
|
||||
Args:
|
||||
old_folder: Previous folder name.
|
||||
new_folder: New folder name.
|
||||
anime_dir: Root anime directory path.
|
||||
"""
|
||||
old_series_path = anime_dir / old_folder
|
||||
new_series_path = anime_dir / new_folder
|
||||
|
||||
async with get_db_session() as db:
|
||||
series = await AnimeSeriesService.get_by_folder(db, old_folder)
|
||||
if series is None:
|
||||
all_series = await AnimeSeriesService.get_all(db)
|
||||
for s in all_series:
|
||||
if s.folder == old_folder:
|
||||
series = s
|
||||
break
|
||||
|
||||
await _update_series_folder(db, series, new_folder)
|
||||
|
||||
if series is None:
|
||||
return
|
||||
|
||||
episodes = await EpisodeService.get_by_series(db, series.id)
|
||||
_update_episode_paths(episodes, old_series_path, new_series_path)
|
||||
|
||||
await db.flush()
|
||||
|
||||
queue_items = await DownloadQueueService.get_all(db, with_series=True)
|
||||
_update_queue_destinations(queue_items, series.id, old_series_path, new_series_path)
|
||||
|
||||
await db.flush()
|
||||
logger.info("Database paths updated for series '%s' → '%s'", old_folder, new_folder)
|
||||
|
||||
|
||||
def _remove_duplicate_target_folder(
|
||||
series_dir: Path,
|
||||
current_name: str,
|
||||
expected_name: str,
|
||||
expected_path: Path,
|
||||
) -> bool:
|
||||
"""Handle the case where the target folder already exists.
|
||||
|
||||
Removes the source folder and its DB record to avoid orphaning
|
||||
episodes/downloads.
|
||||
|
||||
Args:
|
||||
series_dir: Path to the series directory being processed.
|
||||
current_name: Current folder name.
|
||||
expected_name: Expected folder name.
|
||||
expected_path: Path to the expected (target) folder.
|
||||
|
||||
Returns:
|
||||
True if folder was removed successfully, False otherwise.
|
||||
"""
|
||||
logger.warning(
|
||||
"Cannot rename '%s' → '%s' — target already exists",
|
||||
current_name,
|
||||
expected_name,
|
||||
)
|
||||
try:
|
||||
try:
|
||||
contents = list(series_dir.iterdir())
|
||||
logger.warning(
|
||||
"REMOVING folder '%s' with %d items — target '%s' already exists",
|
||||
current_name,
|
||||
len(contents),
|
||||
expected_name,
|
||||
)
|
||||
for item in contents:
|
||||
logger.warning(" Would remove: %s", item)
|
||||
except OSError as exc:
|
||||
logger.warning(
|
||||
"Could not list contents of folder '%s' before removal: %s",
|
||||
current_name,
|
||||
exc,
|
||||
)
|
||||
|
||||
shutil.rmtree(series_dir)
|
||||
logger.info(
|
||||
"Removed source folder '%s' — series already exists at target",
|
||||
current_name,
|
||||
)
|
||||
|
||||
# Delete source DB record using synchronous helper
|
||||
_delete_series_db_record(current_name, expected_name)
|
||||
|
||||
return True
|
||||
except OSError as exc:
|
||||
logger.error("Failed to remove source folder '%s': %s", current_name, exc)
|
||||
return False
|
||||
|
||||
|
||||
def _delete_series_db_record(current_name: str, expected_name: str) -> None:
|
||||
"""Delete the series DB record for a folder that was removed.
|
||||
|
||||
Args:
|
||||
current_name: The folder name to look up in the DB.
|
||||
expected_name: The target folder name (for logging).
|
||||
"""
|
||||
try:
|
||||
import asyncio
|
||||
asyncio.run(_delete_series_db_record_async(current_name, expected_name))
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"Could not delete DB record for '%s': %s",
|
||||
current_name,
|
||||
exc,
|
||||
)
|
||||
|
||||
|
||||
async def _delete_series_db_record_async(current_name: str, expected_name: str) -> None:
|
||||
"""Async helper to delete series DB record.
|
||||
|
||||
Args:
|
||||
current_name: The folder name to look up.
|
||||
expected_name: The target folder name (for logging).
|
||||
"""
|
||||
async with get_db_session() as db:
|
||||
source_series = await AnimeSeriesService.get_by_folder(db, current_name)
|
||||
if source_series is None:
|
||||
all_series = await AnimeSeriesService.get_all(db)
|
||||
for s in all_series:
|
||||
if s.folder == current_name:
|
||||
source_series = s
|
||||
break
|
||||
if source_series is not None:
|
||||
await AnimeSeriesService.delete(db, source_series.id)
|
||||
logger.info(
|
||||
"Deleted source DB record for '%s' (id=%s) — target folder '%s' retains DB record",
|
||||
current_name,
|
||||
source_series.id,
|
||||
expected_name,
|
||||
)
|
||||
else:
|
||||
logger.info(
|
||||
"No DB record found for source folder '%s' — folder removed only",
|
||||
current_name,
|
||||
)
|
||||
|
||||
|
||||
async def validate_and_rename_series_folders(dry_run: bool = False) -> dict[str, int]:
|
||||
"""Validate and rename series folders to match NFO metadata.
|
||||
|
||||
Iterates over every subfolder in ``settings.anime_directory`` that
|
||||
contains a ``tvshow.nfo``. For each folder:
|
||||
|
||||
1. Parse the NFO to extract ``<title>`` and ``<year>``.
|
||||
2. Compute the expected folder name: ``f"{title} ({year})"``.
|
||||
3. Sanitise the expected name for filesystem safety.
|
||||
4. Compare with the current folder name.
|
||||
5. If different, rename the folder and update the database.
|
||||
|
||||
Skips folders where title or year is missing/empty. Logs every
|
||||
rename action.
|
||||
|
||||
Args:
|
||||
dry_run: If True, simulate rename operations without actually
|
||||
moving folders or updating the database.
|
||||
|
||||
Returns:
|
||||
Dictionary with counts:
|
||||
- ``"scanned"``: total folders scanned
|
||||
- ``"renamed"``: folders renamed
|
||||
- ``"skipped"``: folders skipped (missing title/year)
|
||||
- ``"errors"``: folders that caused an error
|
||||
"""
|
||||
if not settings.anime_directory:
|
||||
logger.warning("Folder rename skipped — anime directory not configured")
|
||||
return RenameStats().to_dict()
|
||||
|
||||
anime_dir = Path(settings.anime_directory)
|
||||
if not anime_dir.is_dir():
|
||||
logger.warning("Folder rename skipped — anime directory not found: %s", anime_dir)
|
||||
return RenameStats().to_dict()
|
||||
|
||||
if dry_run:
|
||||
logger.info("Running in DRY-RUN mode — no changes will be made")
|
||||
|
||||
stats = RenameStats()
|
||||
pre_existing_duplicates: set[str] = set()
|
||||
duplicates = _scan_for_pre_existing_duplicates(anime_dir)
|
||||
|
||||
for dup_group in duplicates:
|
||||
if _try_merge_duplicate_group(dup_group, dry_run=dry_run):
|
||||
logger.info(
|
||||
"Auto-merged duplicate group for '%s' (%d folders)",
|
||||
dup_group.key,
|
||||
dup_group.count,
|
||||
)
|
||||
else:
|
||||
for folder in dup_group.folders:
|
||||
pre_existing_duplicates.add(folder)
|
||||
logger.warning(
|
||||
"Duplicate folders detected for series '%s': %s — "
|
||||
"manual cleanup required (different releases or non-empty duplicates)",
|
||||
dup_group.key,
|
||||
dup_group.folders,
|
||||
)
|
||||
|
||||
for series_dir in sorted(anime_dir.iterdir()):
|
||||
if not series_dir.is_dir():
|
||||
continue
|
||||
|
||||
nfo_path = series_dir / "tvshow.nfo"
|
||||
if not nfo_path.exists():
|
||||
continue
|
||||
|
||||
stats.scanned += 1
|
||||
|
||||
title, year = _parse_nfo_title_and_year(nfo_path)
|
||||
if not title or not year:
|
||||
logger.info(
|
||||
"Skipping rename for '%s' — missing title or year in NFO",
|
||||
series_dir.name,
|
||||
)
|
||||
stats.skipped += 1
|
||||
continue
|
||||
|
||||
expected_name = _compute_expected_folder_name(title, year)
|
||||
current_name = series_dir.name
|
||||
|
||||
if expected_name == current_name:
|
||||
logger.debug("Folder name already correct: '%s'", current_name)
|
||||
continue
|
||||
|
||||
if _is_series_being_downloaded(current_name):
|
||||
logger.info(
|
||||
"Skipping rename for '%s' — series has active or pending downloads",
|
||||
current_name,
|
||||
)
|
||||
stats.skipped += 1
|
||||
continue
|
||||
|
||||
expected_path = anime_dir / expected_name
|
||||
|
||||
if current_name in pre_existing_duplicates:
|
||||
logger.warning(
|
||||
"Skipping rename for '%s' — pre-existing duplicate folder detected",
|
||||
current_name,
|
||||
)
|
||||
stats.errors += 1
|
||||
continue
|
||||
|
||||
if expected_path.exists():
|
||||
if _remove_duplicate_target_folder(series_dir, current_name, expected_name, expected_path):
|
||||
stats.renamed += 1
|
||||
else:
|
||||
stats.errors += 1
|
||||
continue
|
||||
|
||||
if len(str(expected_path)) > 4096:
|
||||
logger.warning(
|
||||
"Cannot rename '%s' → '%s' — path exceeds OS limit",
|
||||
current_name,
|
||||
expected_name,
|
||||
)
|
||||
stats.errors += 1
|
||||
continue
|
||||
|
||||
if dry_run:
|
||||
logger.info("[DRY-RUN] Would rename folder: '%s' → '%s'", current_name, expected_name)
|
||||
stats.renamed += 1
|
||||
continue
|
||||
|
||||
try:
|
||||
old_path = series_dir
|
||||
series_dir.rename(expected_path)
|
||||
logger.info("Renamed folder: '%s' → '%s'", current_name, expected_name)
|
||||
stats.renamed += 1
|
||||
|
||||
await _update_database_paths(current_name, expected_name, anime_dir)
|
||||
_remove_key_file(expected_path)
|
||||
_cleanup_orphaned_folder(old_path, expected_path, dry_run=False)
|
||||
|
||||
except PermissionError as exc:
|
||||
logger.error(
|
||||
"Permission denied renaming '%s' → '%s': %s",
|
||||
current_name,
|
||||
expected_name,
|
||||
exc,
|
||||
)
|
||||
stats.errors += 1
|
||||
except OSError as exc:
|
||||
logger.error(
|
||||
"OS error renaming '%s' → '%s': %s",
|
||||
current_name,
|
||||
expected_name,
|
||||
exc,
|
||||
)
|
||||
stats.errors += 1
|
||||
|
||||
logger.info(
|
||||
"Folder rename scan complete: scanned=%d, renamed=%d, skipped=%d, errors=%d",
|
||||
stats.scanned,
|
||||
stats.renamed,
|
||||
stats.skipped,
|
||||
stats.errors,
|
||||
)
|
||||
return stats.to_dict()
|
||||
@@ -200,22 +200,7 @@ class FolderScanService:
|
||||
await perform_nfo_repair_scan(background_loader=None)
|
||||
logger.info("NFO repair scan complete")
|
||||
|
||||
# 1.4 — Validate and rename series folders after NFO repair.
|
||||
logger.info("Starting folder rename validation")
|
||||
from src.server.services.scheduler.folder_rename_service import (
|
||||
validate_and_rename_series_folders,
|
||||
)
|
||||
|
||||
rename_stats = await validate_and_rename_series_folders()
|
||||
logger.info(
|
||||
"Folder rename validation complete",
|
||||
scanned=rename_stats["scanned"],
|
||||
renamed=rename_stats["renamed"],
|
||||
skipped=rename_stats["skipped"],
|
||||
errors=rename_stats["errors"],
|
||||
)
|
||||
|
||||
# 1.5 — Check and download missing poster.jpg files.
|
||||
# 1.4 — Check and download missing poster.jpg files.
|
||||
logger.info("Starting poster check")
|
||||
poster_stats = await self.check_and_download_missing_posters()
|
||||
logger.info(
|
||||
|
||||
@@ -1,109 +0,0 @@
|
||||
"""Integration tests for folder rename service wiring.
|
||||
|
||||
These tests verify that:
|
||||
1. FolderScanService.run_folder_scan calls validate_and_rename_series_folders.
|
||||
2. The rename logic is properly integrated into the scheduled folder scan.
|
||||
"""
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
class TestFolderRenameScanCalledInFolderScan:
|
||||
"""Verify validate_and_rename_series_folders is invoked from FolderScanService."""
|
||||
|
||||
def test_validate_and_rename_imported_in_folder_scan_service(self):
|
||||
"""folder_scan_service.py imports validate_and_rename_series_folders."""
|
||||
import importlib
|
||||
|
||||
source = importlib.util.find_spec(
|
||||
"src.server.services.scheduler.folder_scan_service"
|
||||
).origin
|
||||
with open(source, "r", encoding="utf-8") as fh:
|
||||
content = fh.read()
|
||||
|
||||
assert "validate_and_rename_series_folders" in content, (
|
||||
"validate_and_rename_series_folders must be imported in folder_scan_service.py"
|
||||
)
|
||||
|
||||
def test_validate_and_rename_called_in_run_folder_scan(self):
|
||||
"""validate_and_rename_series_folders must be called inside run_folder_scan."""
|
||||
import importlib
|
||||
|
||||
source = importlib.util.find_spec(
|
||||
"src.server.services.scheduler.folder_scan_service"
|
||||
).origin
|
||||
with open(source, "r", encoding="utf-8") as fh:
|
||||
content = fh.read()
|
||||
|
||||
run_folder_scan_pos = content.find("def run_folder_scan")
|
||||
rename_call_pos = content.find("validate_and_rename_series_folders()")
|
||||
|
||||
assert run_folder_scan_pos != -1, "run_folder_scan method not found"
|
||||
assert rename_call_pos != -1, "validate_and_rename_series_folders call not found"
|
||||
assert rename_call_pos > run_folder_scan_pos, (
|
||||
"validate_and_rename_series_folders must be called INSIDE run_folder_scan"
|
||||
)
|
||||
|
||||
|
||||
class TestFolderRenameIntegration:
|
||||
"""Integration test: folder rename is triggered during folder scan."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_folder_rename_runs_during_scan(self, tmp_path):
|
||||
"""When folder_scan_enabled is true, the scan renames mismatched folders."""
|
||||
from src.server.services.scheduler.folder_scan_service import FolderScanService
|
||||
|
||||
anime_dir = tmp_path / "anime"
|
||||
anime_dir.mkdir()
|
||||
series_dir = anime_dir / "Attack on Titan"
|
||||
series_dir.mkdir()
|
||||
(series_dir / "tvshow.nfo").write_text(
|
||||
"<tvshow><title>Attack on Titan</title><year>2013</year></tvshow>"
|
||||
)
|
||||
|
||||
mock_settings = MagicMock()
|
||||
mock_settings.tmdb_api_key = "test-key"
|
||||
mock_settings.anime_directory = str(anime_dir)
|
||||
|
||||
with patch(
|
||||
"src.config.settings.settings", mock_settings
|
||||
), patch(
|
||||
"src.server.services.scheduler.folder_rename_service.settings", mock_settings
|
||||
), patch(
|
||||
"src.server.services.scheduler.folder_scan_service.perform_nfo_repair_scan",
|
||||
new_callable=AsyncMock,
|
||||
), patch(
|
||||
"src.server.services.scheduler.folder_rename_service._is_series_being_downloaded",
|
||||
return_value=False,
|
||||
), patch(
|
||||
"src.server.services.scheduler.folder_rename_service._update_database_paths",
|
||||
new_callable=AsyncMock,
|
||||
):
|
||||
service = FolderScanService()
|
||||
await service.run_folder_scan()
|
||||
|
||||
assert not series_dir.exists()
|
||||
assert (anime_dir / "Attack on Titan (2013)").is_dir()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_folder_rename_skipped_when_prerequisites_not_met(self, tmp_path):
|
||||
"""If anime directory is missing, rename logic is skipped gracefully."""
|
||||
from src.server.services.scheduler.folder_scan_service import FolderScanService
|
||||
|
||||
mock_settings = MagicMock()
|
||||
mock_settings.tmdb_api_key = "test-key"
|
||||
mock_settings.anime_directory = str(tmp_path / "nonexistent")
|
||||
|
||||
with patch(
|
||||
"src.config.settings.settings", mock_settings
|
||||
), patch(
|
||||
"src.server.services.scheduler.folder_scan_service.perform_nfo_repair_scan",
|
||||
new_callable=AsyncMock,
|
||||
), patch(
|
||||
"src.server.services.scheduler.folder_rename_service.validate_and_rename_series_folders"
|
||||
) as mock_rename:
|
||||
service = FolderScanService()
|
||||
await service.run_folder_scan()
|
||||
|
||||
mock_rename.assert_not_called()
|
||||
@@ -93,10 +93,6 @@ class TestPosterCheckIntegration:
|
||||
), patch(
|
||||
"src.server.services.scheduler.folder_scan_service.perform_nfo_repair_scan",
|
||||
new_callable=AsyncMock,
|
||||
), patch(
|
||||
"src.server.services.scheduler.folder_rename_service.validate_and_rename_series_folders",
|
||||
new_callable=AsyncMock,
|
||||
return_value={"scanned": 0, "renamed": 0, "skipped": 0, "errors": 0},
|
||||
), patch(
|
||||
"src.server.services.scheduler.folder_scan_service.ImageDownloader",
|
||||
new=MockDownloader,
|
||||
@@ -138,10 +134,6 @@ class TestPosterCheckIntegration:
|
||||
), patch(
|
||||
"src.server.services.scheduler.folder_scan_service.perform_nfo_repair_scan",
|
||||
new_callable=AsyncMock,
|
||||
), patch(
|
||||
"src.server.services.scheduler.folder_rename_service.validate_and_rename_series_folders",
|
||||
new_callable=AsyncMock,
|
||||
return_value={"scanned": 0, "renamed": 0, "skipped": 0, "errors": 0},
|
||||
), patch(
|
||||
"src.server.services.scheduler.folder_scan_service.ImageDownloader"
|
||||
) as mock_downloader_cls:
|
||||
@@ -175,10 +167,6 @@ class TestPosterCheckIntegration:
|
||||
), patch(
|
||||
"src.server.services.scheduler.folder_scan_service.perform_nfo_repair_scan",
|
||||
new_callable=AsyncMock,
|
||||
), patch(
|
||||
"src.server.services.scheduler.folder_rename_service.validate_and_rename_series_folders",
|
||||
new_callable=AsyncMock,
|
||||
return_value={"scanned": 0, "renamed": 0, "skipped": 0, "errors": 0},
|
||||
), patch(
|
||||
"src.server.services.scheduler.folder_scan_service.ImageDownloader"
|
||||
) as mock_downloader_cls:
|
||||
@@ -202,8 +190,6 @@ class TestPosterCheckIntegration:
|
||||
"src.server.services.scheduler.folder_scan_service.perform_nfo_repair_scan",
|
||||
new_callable=AsyncMock,
|
||||
), patch(
|
||||
"src.server.services.scheduler.folder_rename_service.validate_and_rename_series_folders"
|
||||
) as mock_rename, patch(
|
||||
"src.server.services.scheduler.folder_scan_service.ImageDownloader"
|
||||
) as mock_downloader_cls:
|
||||
service = FolderScanService()
|
||||
@@ -272,10 +258,6 @@ class TestPosterCheckSemaphore:
|
||||
), patch(
|
||||
"src.server.services.scheduler.folder_scan_service.perform_nfo_repair_scan",
|
||||
new_callable=AsyncMock,
|
||||
), patch(
|
||||
"src.server.services.scheduler.folder_rename_service.validate_and_rename_series_folders",
|
||||
new_callable=AsyncMock,
|
||||
return_value={"scanned": 0, "renamed": 0, "skipped": 0, "errors": 0},
|
||||
), patch(
|
||||
"src.server.services.scheduler.folder_scan_service.ImageDownloader"
|
||||
) as mock_downloader_cls:
|
||||
|
||||
Reference in New Issue
Block a user