From 239341629cb697633896ba0f49ee258b00ff4ad1 Mon Sep 17 00:00:00 2001 From: Lukas Date: Thu, 28 May 2026 21:24:06 +0200 Subject: [PATCH] Add orphaned folder cleanup after rename - Add _cleanup_orphaned_folder() to delete/move old folder contents after rename - Empty folders: delete directly via rmdir() - Non-empty folders: move contents to new path, then delete old folder - Handle PermissionError and OSError gracefully with logging - Add dry_run parameter to preview changes without applying them - Add --dry-run support to validate_and_rename_series_folders() - Add unit tests for _cleanup_orphaned_folder and dry-run mode - All 66 related tests pass Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/server/services/folder_rename_service.py | 129 ++++++++++++++++++- tests/unit/test_folder_rename_service.py | 93 +++++++++++++ 2 files changed, 221 insertions(+), 1 deletion(-) diff --git a/src/server/services/folder_rename_service.py b/src/server/services/folder_rename_service.py index 7a96f04..9c7f4fd 100644 --- a/src/server/services/folder_rename_service.py +++ b/src/server/services/folder_rename_service.py @@ -138,6 +138,113 @@ def _cleanup_stale_files_after_rename(new_path: Path, new_name: str) -> None: ) +def _cleanup_orphaned_folder(old_path: Path, new_path: Path, dry_run: bool = False) -> bool: + """Clean up orphaned folder after successful rename. + + After a folder is successfully renamed to new_path, this function checks + if the old_path still exists (orphaned folder) and removes it. If the + old folder contains files, they are moved to new_path before deletion. + + Args: + old_path: The original folder path before rename. + new_path: The new folder path after rename. + dry_run: If True, only log actions without executing them. + + Returns: + True if old folder was cleaned up (or would be in dry-run mode), + False if old folder does not exist or cleanup failed. + """ + if not old_path.exists(): + logger.debug( + "Old folder does not exist, no cleanup needed: %s", old_path + ) + return False + + # Check if folder is empty + try: + contents = list(old_path.iterdir()) + except PermissionError as exc: + logger.warning( + "Permission denied accessing old folder %s: %s", old_path, exc + ) + return False + except OSError as exc: + logger.warning( + "OS error accessing old folder %s: %s", old_path, exc + ) + return False + + if not contents: + # Empty folder — delete it + if dry_run: + logger.info( + "[DRY-RUN] Would delete empty orphaned folder: %s", old_path + ) + return True + try: + old_path.rmdir() + logger.info("Deleted empty orphaned folder: %s", old_path) + return True + except PermissionError as exc: + logger.warning( + "Permission denied deleting folder %s: %s", old_path, exc + ) + return False + except OSError as exc: + logger.warning( + "OS error deleting folder %s: %s", old_path, exc + ) + return False + + # Folder has contents — move files to new_path then delete + if dry_run: + logger.info( + "[DRY-RUN] Would move %d files from orphaned folder %s to %s", + len(contents), old_path, new_path + ) + for item in contents: + logger.info("[DRY-RUN] Would move: %s → %s", item, new_path / item.name) + logger.info("[DRY-RUN] Would then delete orphaned folder: %s", old_path) + return True + + files_moved = 0 + errors = 0 + for item in contents: + try: + dest = new_path / item.name + item.rename(dest) + logger.debug("Moved %s → %s", item, dest) + files_moved += 1 + except PermissionError as exc: + logger.warning( + "Permission denied moving %s: %s", item, exc + ) + errors += 1 + except OSError as exc: + logger.warning( + "OS error moving %s: %s", item, exc + ) + errors += 1 + + if files_moved > 0: + logger.info( + "Moved %d files from orphaned folder to %s", + files_moved, new_path + ) + + # Delete the now-empty old folder + try: + old_path.rmdir() + logger.info("Deleted orphaned folder after moving contents: %s", old_path) + return errors == 0 + except OSError as exc: + logger.warning( + "Could not delete orphaned folder %s (may not be empty): %s", + old_path, exc + ) + return False + + async def _update_database_paths( old_folder: str, new_folder: str, @@ -234,7 +341,7 @@ async def _update_database_paths( ) -async def validate_and_rename_series_folders() -> Dict[str, int]: +async def validate_and_rename_series_folders(dry_run: bool = False) -> Dict[str, int]: """Validate and rename series folders to match NFO metadata. Iterates over every subfolder in ``settings.anime_directory`` that @@ -249,6 +356,10 @@ async def validate_and_rename_series_folders() -> Dict[str, int]: Skips folders where title or year is missing/empty. Logs every rename action. + Args: + dry_run: If True, simulate rename operations without actually + moving folders or updating the database. + Returns: Dictionary with counts: - ``"scanned"``: total folders scanned @@ -267,6 +378,9 @@ async def validate_and_rename_series_folders() -> Dict[str, int]: ) return {"scanned": 0, "renamed": 0, "skipped": 0, "errors": 0} + if dry_run: + logger.info("Running in DRY-RUN mode — no changes will be made") + stats = {"scanned": 0, "renamed": 0, "skipped": 0, "errors": 0} for series_dir in sorted(anime_dir.iterdir()): @@ -328,7 +442,17 @@ async def validate_and_rename_series_folders() -> Dict[str, int]: stats["errors"] += 1 continue + if dry_run: + logger.info( + "[DRY-RUN] Would rename folder: '%s' → '%s'", + current_name, + expected_name, + ) + stats["renamed"] += 1 + continue + try: + old_path = series_dir series_dir.rename(expected_path) logger.info( "Renamed folder: '%s' → '%s'", current_name, expected_name @@ -341,6 +465,9 @@ async def validate_and_rename_series_folders() -> Dict[str, int]: # Clean up stale/legacy files after successful rename _cleanup_stale_files_after_rename(expected_path, expected_name) + # Clean up orphaned folder if old path still exists + _cleanup_orphaned_folder(old_path, expected_path, dry_run=False) + except PermissionError as exc: logger.error( "Permission denied renaming '%s' → '%s': %s", diff --git a/tests/unit/test_folder_rename_service.py b/tests/unit/test_folder_rename_service.py index 1d8f736..8a5af8b 100644 --- a/tests/unit/test_folder_rename_service.py +++ b/tests/unit/test_folder_rename_service.py @@ -9,6 +9,7 @@ from unittest.mock import AsyncMock, MagicMock, patch import pytest from src.server.services.folder_rename_service import ( + _cleanup_orphaned_folder, _compute_expected_folder_name, _is_series_being_downloaded, _parse_nfo_title_and_year, @@ -278,6 +279,71 @@ class TestUpdateDatabasePaths: assert mock_episode.file_path == str(new_path) +class TestCleanupOrphanedFolder: + """Tests for _cleanup_orphaned_folder.""" + + def test_returns_false_when_old_folder_does_not_exist(self, tmp_path: Path) -> None: + old_path = tmp_path / "nonexistent" + new_path = tmp_path / "new" + result = _cleanup_orphaned_folder(old_path, new_path) + assert result is False + + def test_deletes_empty_folder(self, tmp_path: Path) -> None: + old_path = tmp_path / "empty_orphan" + old_path.mkdir() + new_path = tmp_path / "new" + new_path.mkdir() + result = _cleanup_orphaned_folder(old_path, new_path) + assert result is True + assert not old_path.exists() + + def test_moves_files_and_deletes_folder(self, tmp_path: Path) -> None: + old_path = tmp_path / "old_orphan" + old_path.mkdir() + new_path = tmp_path / "new" + new_path.mkdir() + file1 = old_path / "S01E01.mkv" + file1.write_text("episode 1") + file2 = old_path / "S01E02.mkv" + file2.write_text("episode 2") + result = _cleanup_orphaned_folder(old_path, new_path) + assert result is True + assert not old_path.exists() + assert (new_path / "S01E01.mkv").exists() + assert (new_path / "S01E02.mkv").exists() + + def test_dry_run_does_not_delete_empty_folder(self, tmp_path: Path) -> None: + old_path = tmp_path / "empty_orphan" + old_path.mkdir() + new_path = tmp_path / "new" + new_path.mkdir() + result = _cleanup_orphaned_folder(old_path, new_path, dry_run=True) + assert result is True + assert old_path.exists() + + def test_dry_run_does_not_move_files(self, tmp_path: Path) -> None: + old_path = tmp_path / "old_orphan" + old_path.mkdir() + new_path = tmp_path / "new" + new_path.mkdir() + file1 = old_path / "S01E01.mkv" + file1.write_text("episode 1") + result = _cleanup_orphaned_folder(old_path, new_path, dry_run=True) + assert result is True + assert old_path.exists() + assert not (new_path / "S01E01.mkv").exists() + + def test_handles_permission_error_gracefully(self, tmp_path: Path) -> None: + old_path = tmp_path / "permission_denied" + old_path.mkdir() + new_path = tmp_path / "new" + new_path.mkdir() + # Simulate permission error by patching rmdir + with patch.object(Path, "rmdir", side_effect=PermissionError("Access denied")): + result = _cleanup_orphaned_folder(old_path, new_path) + assert result is False + + class TestValidateAndRenameSeriesFolders: """Integration-style tests for validate_and_rename_series_folders.""" @@ -459,3 +525,30 @@ class TestValidateAndRenameSeriesFolders: assert (anime_dir / "Show A (2020)").is_dir() assert d2.is_dir() assert d3.is_dir() + + @pytest.mark.asyncio + async def test_dry_run_does_not_rename_folders(self, tmp_path: Path) -> None: + anime_dir = tmp_path / "anime" + anime_dir.mkdir() + series_dir = anime_dir / "Attack on Titan" + series_dir.mkdir() + (series_dir / "tvshow.nfo").write_text( + "Attack on Titan2013" + ) + + with patch( + "src.server.services.folder_rename_service.settings.anime_directory", + str(anime_dir), + ), patch( + "src.server.services.folder_rename_service._is_series_being_downloaded", + return_value=False, + ): + stats = await validate_and_rename_series_folders(dry_run=True) + + assert stats["scanned"] == 1 + assert stats["renamed"] == 1 + assert stats["skipped"] == 0 + assert stats["errors"] == 0 + # Original folder should still exist (not renamed in dry-run) + assert series_dir.is_dir() + assert not (anime_dir / "Attack on Titan (2013)").exists()