Add orphaned folder cleanup after rename
- Add _cleanup_orphaned_folder() to delete/move old folder contents after rename - Empty folders: delete directly via rmdir() - Non-empty folders: move contents to new path, then delete old folder - Handle PermissionError and OSError gracefully with logging - Add dry_run parameter to preview changes without applying them - Add --dry-run support to validate_and_rename_series_folders() - Add unit tests for _cleanup_orphaned_folder and dry-run mode - All 66 related tests pass Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -138,6 +138,113 @@ def _cleanup_stale_files_after_rename(new_path: Path, new_name: str) -> None:
|
||||
)
|
||||
|
||||
|
||||
def _cleanup_orphaned_folder(old_path: Path, new_path: Path, dry_run: bool = False) -> bool:
|
||||
"""Clean up orphaned folder after successful rename.
|
||||
|
||||
After a folder is successfully renamed to new_path, this function checks
|
||||
if the old_path still exists (orphaned folder) and removes it. If the
|
||||
old folder contains files, they are moved to new_path before deletion.
|
||||
|
||||
Args:
|
||||
old_path: The original folder path before rename.
|
||||
new_path: The new folder path after rename.
|
||||
dry_run: If True, only log actions without executing them.
|
||||
|
||||
Returns:
|
||||
True if old folder was cleaned up (or would be in dry-run mode),
|
||||
False if old folder does not exist or cleanup failed.
|
||||
"""
|
||||
if not old_path.exists():
|
||||
logger.debug(
|
||||
"Old folder does not exist, no cleanup needed: %s", old_path
|
||||
)
|
||||
return False
|
||||
|
||||
# Check if folder is empty
|
||||
try:
|
||||
contents = list(old_path.iterdir())
|
||||
except PermissionError as exc:
|
||||
logger.warning(
|
||||
"Permission denied accessing old folder %s: %s", old_path, exc
|
||||
)
|
||||
return False
|
||||
except OSError as exc:
|
||||
logger.warning(
|
||||
"OS error accessing old folder %s: %s", old_path, exc
|
||||
)
|
||||
return False
|
||||
|
||||
if not contents:
|
||||
# Empty folder — delete it
|
||||
if dry_run:
|
||||
logger.info(
|
||||
"[DRY-RUN] Would delete empty orphaned folder: %s", old_path
|
||||
)
|
||||
return True
|
||||
try:
|
||||
old_path.rmdir()
|
||||
logger.info("Deleted empty orphaned folder: %s", old_path)
|
||||
return True
|
||||
except PermissionError as exc:
|
||||
logger.warning(
|
||||
"Permission denied deleting folder %s: %s", old_path, exc
|
||||
)
|
||||
return False
|
||||
except OSError as exc:
|
||||
logger.warning(
|
||||
"OS error deleting folder %s: %s", old_path, exc
|
||||
)
|
||||
return False
|
||||
|
||||
# Folder has contents — move files to new_path then delete
|
||||
if dry_run:
|
||||
logger.info(
|
||||
"[DRY-RUN] Would move %d files from orphaned folder %s to %s",
|
||||
len(contents), old_path, new_path
|
||||
)
|
||||
for item in contents:
|
||||
logger.info("[DRY-RUN] Would move: %s → %s", item, new_path / item.name)
|
||||
logger.info("[DRY-RUN] Would then delete orphaned folder: %s", old_path)
|
||||
return True
|
||||
|
||||
files_moved = 0
|
||||
errors = 0
|
||||
for item in contents:
|
||||
try:
|
||||
dest = new_path / item.name
|
||||
item.rename(dest)
|
||||
logger.debug("Moved %s → %s", item, dest)
|
||||
files_moved += 1
|
||||
except PermissionError as exc:
|
||||
logger.warning(
|
||||
"Permission denied moving %s: %s", item, exc
|
||||
)
|
||||
errors += 1
|
||||
except OSError as exc:
|
||||
logger.warning(
|
||||
"OS error moving %s: %s", item, exc
|
||||
)
|
||||
errors += 1
|
||||
|
||||
if files_moved > 0:
|
||||
logger.info(
|
||||
"Moved %d files from orphaned folder to %s",
|
||||
files_moved, new_path
|
||||
)
|
||||
|
||||
# Delete the now-empty old folder
|
||||
try:
|
||||
old_path.rmdir()
|
||||
logger.info("Deleted orphaned folder after moving contents: %s", old_path)
|
||||
return errors == 0
|
||||
except OSError as exc:
|
||||
logger.warning(
|
||||
"Could not delete orphaned folder %s (may not be empty): %s",
|
||||
old_path, exc
|
||||
)
|
||||
return False
|
||||
|
||||
|
||||
async def _update_database_paths(
|
||||
old_folder: str,
|
||||
new_folder: str,
|
||||
@@ -234,7 +341,7 @@ async def _update_database_paths(
|
||||
)
|
||||
|
||||
|
||||
async def validate_and_rename_series_folders() -> Dict[str, int]:
|
||||
async def validate_and_rename_series_folders(dry_run: bool = False) -> Dict[str, int]:
|
||||
"""Validate and rename series folders to match NFO metadata.
|
||||
|
||||
Iterates over every subfolder in ``settings.anime_directory`` that
|
||||
@@ -249,6 +356,10 @@ async def validate_and_rename_series_folders() -> Dict[str, int]:
|
||||
Skips folders where title or year is missing/empty. Logs every
|
||||
rename action.
|
||||
|
||||
Args:
|
||||
dry_run: If True, simulate rename operations without actually
|
||||
moving folders or updating the database.
|
||||
|
||||
Returns:
|
||||
Dictionary with counts:
|
||||
- ``"scanned"``: total folders scanned
|
||||
@@ -267,6 +378,9 @@ async def validate_and_rename_series_folders() -> Dict[str, int]:
|
||||
)
|
||||
return {"scanned": 0, "renamed": 0, "skipped": 0, "errors": 0}
|
||||
|
||||
if dry_run:
|
||||
logger.info("Running in DRY-RUN mode — no changes will be made")
|
||||
|
||||
stats = {"scanned": 0, "renamed": 0, "skipped": 0, "errors": 0}
|
||||
|
||||
for series_dir in sorted(anime_dir.iterdir()):
|
||||
@@ -328,7 +442,17 @@ async def validate_and_rename_series_folders() -> Dict[str, int]:
|
||||
stats["errors"] += 1
|
||||
continue
|
||||
|
||||
if dry_run:
|
||||
logger.info(
|
||||
"[DRY-RUN] Would rename folder: '%s' → '%s'",
|
||||
current_name,
|
||||
expected_name,
|
||||
)
|
||||
stats["renamed"] += 1
|
||||
continue
|
||||
|
||||
try:
|
||||
old_path = series_dir
|
||||
series_dir.rename(expected_path)
|
||||
logger.info(
|
||||
"Renamed folder: '%s' → '%s'", current_name, expected_name
|
||||
@@ -341,6 +465,9 @@ async def validate_and_rename_series_folders() -> Dict[str, int]:
|
||||
# Clean up stale/legacy files after successful rename
|
||||
_cleanup_stale_files_after_rename(expected_path, expected_name)
|
||||
|
||||
# Clean up orphaned folder if old path still exists
|
||||
_cleanup_orphaned_folder(old_path, expected_path, dry_run=False)
|
||||
|
||||
except PermissionError as exc:
|
||||
logger.error(
|
||||
"Permission denied renaming '%s' → '%s': %s",
|
||||
|
||||
@@ -9,6 +9,7 @@ from unittest.mock import AsyncMock, MagicMock, patch
|
||||
import pytest
|
||||
|
||||
from src.server.services.folder_rename_service import (
|
||||
_cleanup_orphaned_folder,
|
||||
_compute_expected_folder_name,
|
||||
_is_series_being_downloaded,
|
||||
_parse_nfo_title_and_year,
|
||||
@@ -278,6 +279,71 @@ class TestUpdateDatabasePaths:
|
||||
assert mock_episode.file_path == str(new_path)
|
||||
|
||||
|
||||
class TestCleanupOrphanedFolder:
|
||||
"""Tests for _cleanup_orphaned_folder."""
|
||||
|
||||
def test_returns_false_when_old_folder_does_not_exist(self, tmp_path: Path) -> None:
|
||||
old_path = tmp_path / "nonexistent"
|
||||
new_path = tmp_path / "new"
|
||||
result = _cleanup_orphaned_folder(old_path, new_path)
|
||||
assert result is False
|
||||
|
||||
def test_deletes_empty_folder(self, tmp_path: Path) -> None:
|
||||
old_path = tmp_path / "empty_orphan"
|
||||
old_path.mkdir()
|
||||
new_path = tmp_path / "new"
|
||||
new_path.mkdir()
|
||||
result = _cleanup_orphaned_folder(old_path, new_path)
|
||||
assert result is True
|
||||
assert not old_path.exists()
|
||||
|
||||
def test_moves_files_and_deletes_folder(self, tmp_path: Path) -> None:
|
||||
old_path = tmp_path / "old_orphan"
|
||||
old_path.mkdir()
|
||||
new_path = tmp_path / "new"
|
||||
new_path.mkdir()
|
||||
file1 = old_path / "S01E01.mkv"
|
||||
file1.write_text("episode 1")
|
||||
file2 = old_path / "S01E02.mkv"
|
||||
file2.write_text("episode 2")
|
||||
result = _cleanup_orphaned_folder(old_path, new_path)
|
||||
assert result is True
|
||||
assert not old_path.exists()
|
||||
assert (new_path / "S01E01.mkv").exists()
|
||||
assert (new_path / "S01E02.mkv").exists()
|
||||
|
||||
def test_dry_run_does_not_delete_empty_folder(self, tmp_path: Path) -> None:
|
||||
old_path = tmp_path / "empty_orphan"
|
||||
old_path.mkdir()
|
||||
new_path = tmp_path / "new"
|
||||
new_path.mkdir()
|
||||
result = _cleanup_orphaned_folder(old_path, new_path, dry_run=True)
|
||||
assert result is True
|
||||
assert old_path.exists()
|
||||
|
||||
def test_dry_run_does_not_move_files(self, tmp_path: Path) -> None:
|
||||
old_path = tmp_path / "old_orphan"
|
||||
old_path.mkdir()
|
||||
new_path = tmp_path / "new"
|
||||
new_path.mkdir()
|
||||
file1 = old_path / "S01E01.mkv"
|
||||
file1.write_text("episode 1")
|
||||
result = _cleanup_orphaned_folder(old_path, new_path, dry_run=True)
|
||||
assert result is True
|
||||
assert old_path.exists()
|
||||
assert not (new_path / "S01E01.mkv").exists()
|
||||
|
||||
def test_handles_permission_error_gracefully(self, tmp_path: Path) -> None:
|
||||
old_path = tmp_path / "permission_denied"
|
||||
old_path.mkdir()
|
||||
new_path = tmp_path / "new"
|
||||
new_path.mkdir()
|
||||
# Simulate permission error by patching rmdir
|
||||
with patch.object(Path, "rmdir", side_effect=PermissionError("Access denied")):
|
||||
result = _cleanup_orphaned_folder(old_path, new_path)
|
||||
assert result is False
|
||||
|
||||
|
||||
class TestValidateAndRenameSeriesFolders:
|
||||
"""Integration-style tests for validate_and_rename_series_folders."""
|
||||
|
||||
@@ -459,3 +525,30 @@ class TestValidateAndRenameSeriesFolders:
|
||||
assert (anime_dir / "Show A (2020)").is_dir()
|
||||
assert d2.is_dir()
|
||||
assert d3.is_dir()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_dry_run_does_not_rename_folders(self, tmp_path: Path) -> None:
|
||||
anime_dir = tmp_path / "anime"
|
||||
anime_dir.mkdir()
|
||||
series_dir = anime_dir / "Attack on Titan"
|
||||
series_dir.mkdir()
|
||||
(series_dir / "tvshow.nfo").write_text(
|
||||
"<tvshow><title>Attack on Titan</title><year>2013</year></tvshow>"
|
||||
)
|
||||
|
||||
with patch(
|
||||
"src.server.services.folder_rename_service.settings.anime_directory",
|
||||
str(anime_dir),
|
||||
), patch(
|
||||
"src.server.services.folder_rename_service._is_series_being_downloaded",
|
||||
return_value=False,
|
||||
):
|
||||
stats = await validate_and_rename_series_folders(dry_run=True)
|
||||
|
||||
assert stats["scanned"] == 1
|
||||
assert stats["renamed"] == 1
|
||||
assert stats["skipped"] == 0
|
||||
assert stats["errors"] == 0
|
||||
# Original folder should still exist (not renamed in dry-run)
|
||||
assert series_dir.is_dir()
|
||||
assert not (anime_dir / "Attack on Titan (2013)").exists()
|
||||
|
||||
Reference in New Issue
Block a user