feat: remove startup NFO repair, update docs and tests
- Remove NFO repair scan step from ARCHITECTURE.md startup sequence - Update CHANGELOG.md: rephrase perform_nfo_repair_scan as scheduled scan - Add test verifying perform_nfo_repair_scan is NOT called in lifespan - Keep existing folder scan wiring tests and unit tests intact - NFO_GUIDE.md already correctly describes scheduled scan behavior
This commit is contained in:
331
src/server/services/folder_rename_service.py
Normal file
331
src/server/services/folder_rename_service.py
Normal file
@@ -0,0 +1,331 @@
|
||||
"""Folder rename service for validating and renaming series folders.
|
||||
|
||||
After NFO repair, this service iterates over every subfolder in
|
||||
``settings.anime_directory`` that contains a ``tvshow.nfo``. For each
|
||||
folder it parses the NFO to extract ``<title>`` and ``<year>``, computes
|
||||
the expected folder name ``f"{title} ({year})"``, sanitises it for
|
||||
filesystem safety, and renames the folder if the current name differs.
|
||||
|
||||
Database records (``AnimeSeries.folder``, ``Episode.file_path``,
|
||||
``DownloadQueueItem.file_destination``) are updated atomically to
|
||||
reflect the new paths.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
from lxml import etree
|
||||
|
||||
from src.config.settings import settings
|
||||
from src.server.database.connection import get_db_session
|
||||
from src.server.database.service import (
|
||||
AnimeSeriesService,
|
||||
DownloadQueueService,
|
||||
EpisodeService,
|
||||
)
|
||||
from src.server.utils.dependencies import get_download_service
|
||||
from src.server.utils.filesystem import sanitize_folder_name
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Characters that are invalid in filesystem paths across platforms
|
||||
INVALID_PATH_CHARS = '<>:"/\\|?*\x00'
|
||||
|
||||
|
||||
def _parse_nfo_title_and_year(nfo_path: Path) -> Tuple[Optional[str], Optional[str]]:
|
||||
"""Parse a tvshow.nfo and return (title, year) text values.
|
||||
|
||||
Args:
|
||||
nfo_path: Absolute path to the ``tvshow.nfo`` file.
|
||||
|
||||
Returns:
|
||||
Tuple of (title, year) where either may be ``None`` if missing
|
||||
or empty.
|
||||
"""
|
||||
try:
|
||||
tree = etree.parse(str(nfo_path))
|
||||
root = tree.getroot()
|
||||
|
||||
title_elem = root.find("./title")
|
||||
year_elem = root.find("./year")
|
||||
|
||||
title = title_elem.text.strip() if title_elem is not None and title_elem.text and title_elem.text.strip() else None
|
||||
year = year_elem.text.strip() if year_elem is not None and year_elem.text and year_elem.text.strip() else None
|
||||
|
||||
return title, year
|
||||
except etree.XMLSyntaxError as exc:
|
||||
logger.warning("Malformed XML in %s: %s", nfo_path, exc)
|
||||
return None, None
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
logger.warning("Unexpected error parsing %s: %s", nfo_path, exc)
|
||||
return None, None
|
||||
|
||||
|
||||
def _compute_expected_folder_name(title: str, year: str) -> str:
|
||||
"""Compute the expected folder name from title and year.
|
||||
|
||||
Args:
|
||||
title: Series title from NFO.
|
||||
year: Release year from NFO.
|
||||
|
||||
Returns:
|
||||
Sanitised folder name in the format ``"{title} ({year})"``.
|
||||
"""
|
||||
raw_name = f"{title} ({year})"
|
||||
return sanitize_folder_name(raw_name)
|
||||
|
||||
|
||||
def _is_series_being_downloaded(series_folder: str) -> bool:
|
||||
"""Check whether the given series has an active or pending download.
|
||||
|
||||
Args:
|
||||
series_folder: The series folder name (as stored in the DB).
|
||||
|
||||
Returns:
|
||||
``True`` if the series appears in the active download or the
|
||||
pending queue.
|
||||
"""
|
||||
try:
|
||||
download_service = get_download_service()
|
||||
active = download_service._active_download # pylint: disable=protected-access
|
||||
if active and active.serie_folder == series_folder:
|
||||
return True
|
||||
for item in download_service._pending_queue: # pylint: disable=protected-access
|
||||
if item.serie_folder == series_folder:
|
||||
return True
|
||||
return False
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
logger.warning(
|
||||
"Could not check download status for %s: %s", series_folder, exc
|
||||
)
|
||||
# Safer to skip renaming if we can't verify download status.
|
||||
return True
|
||||
|
||||
|
||||
async def _update_database_paths(
|
||||
old_folder: str,
|
||||
new_folder: str,
|
||||
anime_dir: Path,
|
||||
) -> None:
|
||||
"""Update all database records that reference the old folder path.
|
||||
|
||||
Updates:
|
||||
- ``AnimeSeries.folder`` → ``new_folder``
|
||||
- ``Episode.file_path`` → adjusted to new folder
|
||||
- ``DownloadQueueItem.file_destination`` → adjusted to new folder
|
||||
|
||||
Args:
|
||||
old_folder: Previous folder name.
|
||||
new_folder: New folder name.
|
||||
anime_dir: Root anime directory path.
|
||||
"""
|
||||
old_series_path = anime_dir / old_folder
|
||||
new_series_path = anime_dir / new_folder
|
||||
|
||||
async with get_db_session() as db:
|
||||
# 1. Update AnimeSeries.folder
|
||||
series = await AnimeSeriesService.get_by_key(db, old_folder)
|
||||
if series is None:
|
||||
# Fallback: try to find by folder name
|
||||
all_series = await AnimeSeriesService.get_all(db)
|
||||
for s in all_series:
|
||||
if s.folder == old_folder:
|
||||
series = s
|
||||
break
|
||||
|
||||
if series is None:
|
||||
logger.warning(
|
||||
"No database record found for folder '%s', skipping DB update",
|
||||
old_folder,
|
||||
)
|
||||
return
|
||||
|
||||
await AnimeSeriesService.update(db, series.id, folder=new_folder)
|
||||
logger.info(
|
||||
"Updated AnimeSeries.folder: %s → %s (id=%s)",
|
||||
old_folder,
|
||||
new_folder,
|
||||
series.id,
|
||||
)
|
||||
|
||||
# 2. Update Episode.file_path for all episodes of this series
|
||||
episodes = await EpisodeService.get_by_series(db, series.id)
|
||||
for episode in episodes:
|
||||
if episode.file_path:
|
||||
old_file_path = Path(episode.file_path)
|
||||
# Only update if the path is under the old series folder
|
||||
try:
|
||||
old_file_path.relative_to(old_series_path)
|
||||
new_file_path = new_series_path / old_file_path.relative_to(
|
||||
old_series_path
|
||||
)
|
||||
episode.file_path = str(new_file_path)
|
||||
logger.debug(
|
||||
"Updated Episode.file_path: %s → %s",
|
||||
old_file_path,
|
||||
new_file_path,
|
||||
)
|
||||
except ValueError:
|
||||
# Path is not under old_series_path, skip
|
||||
pass
|
||||
|
||||
await db.flush()
|
||||
|
||||
# 3. Update DownloadQueueItem.file_destination for pending items
|
||||
queue_items = await DownloadQueueService.get_all(db, with_series=True)
|
||||
for item in queue_items:
|
||||
if item.series_id == series.id and item.file_destination:
|
||||
old_dest = Path(item.file_destination)
|
||||
try:
|
||||
old_dest.relative_to(old_series_path)
|
||||
new_dest = new_series_path / old_dest.relative_to(
|
||||
old_series_path
|
||||
)
|
||||
item.file_destination = str(new_dest)
|
||||
logger.debug(
|
||||
"Updated DownloadQueueItem.file_destination: %s → %s",
|
||||
old_dest,
|
||||
new_dest,
|
||||
)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
await db.flush()
|
||||
logger.info(
|
||||
"Database paths updated for series '%s' → '%s'",
|
||||
old_folder,
|
||||
new_folder,
|
||||
)
|
||||
|
||||
|
||||
async def validate_and_rename_series_folders() -> Dict[str, int]:
|
||||
"""Validate and rename series folders to match NFO metadata.
|
||||
|
||||
Iterates over every subfolder in ``settings.anime_directory`` that
|
||||
contains a ``tvshow.nfo``. For each folder:
|
||||
|
||||
1. Parse the NFO to extract ``<title>`` and ``<year>``.
|
||||
2. Compute the expected folder name: ``f"{title} ({year})"``.
|
||||
3. Sanitise the expected name for filesystem safety.
|
||||
4. Compare with the current folder name.
|
||||
5. If different, rename the folder and update the database.
|
||||
|
||||
Skips folders where title or year is missing/empty. Logs every
|
||||
rename action.
|
||||
|
||||
Returns:
|
||||
Dictionary with counts:
|
||||
- ``"scanned"``: total folders scanned
|
||||
- ``"renamed"``: folders renamed
|
||||
- ``"skipped"``: folders skipped (missing title/year)
|
||||
- ``"errors"``: folders that caused an error
|
||||
"""
|
||||
if not settings.anime_directory:
|
||||
logger.warning("Folder rename skipped — anime directory not configured")
|
||||
return {"scanned": 0, "renamed": 0, "skipped": 0, "errors": 0}
|
||||
|
||||
anime_dir = Path(settings.anime_directory)
|
||||
if not anime_dir.is_dir():
|
||||
logger.warning(
|
||||
"Folder rename skipped — anime directory not found: %s", anime_dir
|
||||
)
|
||||
return {"scanned": 0, "renamed": 0, "skipped": 0, "errors": 0}
|
||||
|
||||
stats = {"scanned": 0, "renamed": 0, "skipped": 0, "errors": 0}
|
||||
|
||||
for series_dir in sorted(anime_dir.iterdir()):
|
||||
if not series_dir.is_dir():
|
||||
continue
|
||||
|
||||
nfo_path = series_dir / "tvshow.nfo"
|
||||
if not nfo_path.exists():
|
||||
continue
|
||||
|
||||
stats["scanned"] += 1
|
||||
|
||||
title, year = _parse_nfo_title_and_year(nfo_path)
|
||||
if not title or not year:
|
||||
logger.info(
|
||||
"Skipping rename for '%s' — missing title or year in NFO",
|
||||
series_dir.name,
|
||||
)
|
||||
stats["skipped"] += 1
|
||||
continue
|
||||
|
||||
expected_name = _compute_expected_folder_name(title, year)
|
||||
current_name = series_dir.name
|
||||
|
||||
if expected_name == current_name:
|
||||
logger.debug(
|
||||
"Folder name already correct: '%s'", current_name
|
||||
)
|
||||
continue
|
||||
|
||||
# Check for active downloads
|
||||
if _is_series_being_downloaded(current_name):
|
||||
logger.info(
|
||||
"Skipping rename for '%s' — series has active or pending downloads",
|
||||
current_name,
|
||||
)
|
||||
stats["skipped"] += 1
|
||||
continue
|
||||
|
||||
expected_path = anime_dir / expected_name
|
||||
|
||||
# Check for duplicate target
|
||||
if expected_path.exists():
|
||||
logger.warning(
|
||||
"Cannot rename '%s' → '%s' — target already exists",
|
||||
current_name,
|
||||
expected_name,
|
||||
)
|
||||
stats["errors"] += 1
|
||||
continue
|
||||
|
||||
# Check path length limits
|
||||
if len(str(expected_path)) > 4096:
|
||||
logger.warning(
|
||||
"Cannot rename '%s' → '%s' — path exceeds OS limit",
|
||||
current_name,
|
||||
expected_name,
|
||||
)
|
||||
stats["errors"] += 1
|
||||
continue
|
||||
|
||||
try:
|
||||
series_dir.rename(expected_path)
|
||||
logger.info(
|
||||
"Renamed folder: '%s' → '%s'", current_name, expected_name
|
||||
)
|
||||
stats["renamed"] += 1
|
||||
|
||||
# Update database records
|
||||
await _update_database_paths(current_name, expected_name, anime_dir)
|
||||
|
||||
except PermissionError as exc:
|
||||
logger.error(
|
||||
"Permission denied renaming '%s' → '%s': %s",
|
||||
current_name,
|
||||
expected_name,
|
||||
exc,
|
||||
)
|
||||
stats["errors"] += 1
|
||||
except OSError as exc:
|
||||
logger.error(
|
||||
"OS error renaming '%s' → '%s': %s",
|
||||
current_name,
|
||||
expected_name,
|
||||
exc,
|
||||
)
|
||||
stats["errors"] += 1
|
||||
|
||||
logger.info(
|
||||
"Folder rename scan complete: scanned=%d, renamed=%d, skipped=%d, errors=%d",
|
||||
stats["scanned"],
|
||||
stats["renamed"],
|
||||
stats["skipped"],
|
||||
stats["errors"],
|
||||
)
|
||||
return stats
|
||||
Reference in New Issue
Block a user