feat: Enhanced anime add flow with sanitized folders and targeted scan
- Add sanitize_folder_name utility for filesystem-safe folder names
- Add sanitized_folder property to Serie entity
- Update SerieList.add() to use sanitized display names for folders
- Add scan_single_series() method for targeted episode scanning
- Enhance add_series endpoint: DB save -> folder create -> targeted scan
- Update response to include missing_episodes and total_missing
- Add comprehensive unit tests for new functionality
- Update API tests with proper mock support
This commit is contained in:
@@ -461,3 +461,188 @@ class SerieScanner:
|
||||
episodes_dict[season] = missing_episodes
|
||||
|
||||
return episodes_dict, "aniworld.to"
|
||||
|
||||
def scan_single_series(
    self,
    key: str,
    folder: str,
) -> dict[int, list[int]]:
    """
    Run a targeted missing-episode scan for exactly one series.

    Only the folder belonging to the given series is walked; no other
    series is touched. The MP4 files found under that folder (none when
    the folder does not exist yet) are handed, together with the key, to
    the scanner's missing-episode lookup. The result is stored on the
    matching entry in ``self.keyDict`` (a new ``Serie`` is created when
    the key is unknown), and progress/completion callbacks are emitted.

    Args:
        key: The unique provider key for the series.
        folder: Folder name of the series, relative to ``self.directory``.

    Returns:
        dict[int, list[int]]: Season number -> list of missing episode
        numbers. An empty dict is also returned when the scan itself
        fails; the failure is then reported through the error and
        completion callbacks instead of being raised.

    Raises:
        ValueError: If key or folder is empty or whitespace-only.

    Example:
        >>> scanner = SerieScanner("/path/to/anime", loader)
        >>> missing = scanner.scan_single_series(
        ...     "attack-on-titan",
        ...     "Attack on Titan"
        ... )
        >>> print(missing)
        {1: [5, 6, 7], 2: [1, 2]}
    """
    # Input validation happens outside the try block on purpose, so bad
    # arguments surface as ValueError rather than as a "failed scan".
    if not (key and key.strip()):
        raise ValueError("Series key cannot be empty")
    if not (folder and folder.strip()):
        raise ValueError("Series folder cannot be empty")

    logger.info(
        "Starting targeted scan for series: %s (folder: %s)",
        key,
        folder
    )

    # Every targeted scan gets its own id so callbacks can correlate events.
    scan_id = str(uuid.uuid4())

    start_context = ProgressContext(
        operation_type=OperationType.SCAN,
        operation_id=scan_id,
        phase=ProgressPhase.STARTING,
        current=0,
        total=1,
        percentage=0.0,
        message=f"Scanning series: {folder}",
        details=f"Key: {key}"
    )
    self._callback_manager.notify_progress(start_context)

    try:
        series_dir = os.path.join(self.directory, folder)

        if os.path.isdir(series_dir):
            # Collect every .mp4 below the series folder (recursively).
            local_videos: list[str] = [
                os.path.join(dirpath, filename)
                for dirpath, _, filenames in os.walk(series_dir)
                for filename in filenames
                if filename.endswith(".mp4")
            ]
        else:
            # A missing folder is not an error: the scan simply starts
            # from an empty list of local files.
            logger.info(
                "Series folder does not exist yet: %s - "
                "will scan for available episodes from provider",
                series_dir
            )
            local_videos = []

        logger.debug(
            "Found %d existing MP4 files in folder %s",
            len(local_videos),
            folder
        )

        missing_episodes, site = self.__get_missing_episodes_and_season(
            key, local_videos
        )

        # Total across all seasons; reused by the log lines and callbacks.
        missing_total = sum(len(eps) for eps in missing_episodes.values())

        done_context = ProgressContext(
            operation_type=OperationType.SCAN,
            operation_id=scan_id,
            phase=ProgressPhase.IN_PROGRESS,
            current=1,
            total=1,
            percentage=100.0,
            message=f"Scanned: {folder}",
            details=f"Found {missing_total} missing episodes"
        )
        self._callback_manager.notify_progress(done_context)

        if key not in self.keyDict:
            # Unknown key: register a fresh entry for this series.
            self.keyDict[key] = Serie(
                key=key,
                name="",  # Will be populated by caller if needed
                site=site,
                folder=folder,
                episodeDict=missing_episodes
            )
            logger.debug(
                "Created new series entry for %s with %d missing episodes",
                key,
                missing_total
            )
        else:
            # Known key: only the missing-episode mapping is refreshed.
            self.keyDict[key].episodeDict = missing_episodes
            logger.debug(
                "Updated existing series %s with %d missing episodes",
                key,
                missing_total
            )

        season_count = len(missing_episodes)
        self._callback_manager.notify_completion(
            CompletionContext(
                operation_type=OperationType.SCAN,
                operation_id=scan_id,
                success=True,
                message=f"Scan completed for {folder}",
                statistics={
                    "missing_episodes": missing_total,
                    "seasons_with_missing": season_count
                }
            )
        )

        logger.info(
            "Targeted scan completed for %s: %d missing episodes across %d seasons",
            key,
            missing_total,
            season_count
        )

        return missing_episodes

    except Exception as exc:
        error_msg = f"Failed to scan series {key}: {exc}"
        logger.error(error_msg, exc_info=True)

        # Report the failure first, then close the operation as failed.
        self._callback_manager.notify_error(
            ErrorContext(
                operation_type=OperationType.SCAN,
                operation_id=scan_id,
                error=exc,
                message=error_msg,
                recoverable=True,
                metadata={"key": key, "folder": folder}
            )
        )
        self._callback_manager.notify_completion(
            CompletionContext(
                operation_type=OperationType.SCAN,
                operation_id=scan_id,
                success=False,
                message=error_msg
            )
        )

        # A failed scan is treated as non-critical: signal "nothing found".
        return {}
|
||||
|
||||
|
||||
@@ -62,30 +62,49 @@ class SerieList:
|
||||
if not skip_load:
|
||||
self.load_series()
|
||||
|
||||
def add(self, serie: Serie, use_sanitized_folder: bool = True) -> str:
    """
    Persist a new series if it is not already present (file-based mode).

    Uses serie.key for identification. Creates the filesystem folder
    using either the sanitized display name (default) or the existing
    folder property.

    Args:
        serie: The Serie instance to add
        use_sanitized_folder: If True (default), use serie.sanitized_folder
            for the filesystem folder name based on display name.
            If False, use serie.folder as-is for backward compatibility.

    Returns:
        str: The folder path that was created/used. When the key is
        already known, this is the path of the existing entry's folder
        and nothing is created or written.

    Note:
        This method creates data files on disk. For database storage,
        use add_to_db() instead.
    """
    if self.contains(serie.key):
        # Already registered: report where the existing entry lives.
        existing = self.keyDict[serie.key]
        return os.path.join(self.directory, existing.folder)

    # Decide on the folder name BEFORE building any path, so the data
    # file and the series folder always agree.
    if use_sanitized_folder:
        folder_name = serie.sanitized_folder
        # Keep the entity in sync with the folder that is created below.
        serie.folder = folder_name
    else:
        folder_name = serie.folder

    anime_path = os.path.join(self.directory, folder_name)
    data_path = os.path.join(anime_path, "data")

    os.makedirs(anime_path, exist_ok=True)
    # Never overwrite a data file that is already on disk.
    if not os.path.isfile(data_path):
        serie.save_to_file(data_path)

    # Store by key, not folder
    self.keyDict[serie.key] = serie

    return anime_path
|
||||
|
||||
def contains(self, key: str) -> bool:
|
||||
"""
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
import json
|
||||
import warnings
|
||||
|
||||
from src.server.utils.filesystem import sanitize_folder_name
|
||||
|
||||
|
||||
class Serie:
|
||||
"""
|
||||
@@ -127,6 +129,35 @@ class Serie:
|
||||
def episodeDict(self, value: dict[int, list[int]]):
|
||||
self._episodeDict = value
|
||||
|
||||
@property
def sanitized_folder(self) -> str:
    """
    Return a folder name for this series that is safe to create on disk.

    The name is produced by passing the series' display name through
    ``sanitize_folder_name``. When no display name is set, the stored
    folder is used instead, and the key when neither is set. If
    sanitizing the chosen value raises ``ValueError``, the key is
    sanitized instead.

    Use this property when creating folders for the series on disk.
    The `folder` property stores the actual folder name used.

    Returns:
        str: Filesystem-safe folder name based on display name

    Example:
        >>> serie = Serie("attack-on-titan", "Attack on Titan: Final", ...)
        >>> serie.sanitized_folder
        'Attack on Titan Final'
    """
    # Preference order: display name, then stored folder, then key.
    candidate = self._name
    if not candidate:
        candidate = self._folder
    if not candidate:
        candidate = self._key

    try:
        safe_name = sanitize_folder_name(candidate)
    except ValueError:
        # The preferred value could not be sanitized; use the key.
        safe_name = sanitize_folder_name(self._key)
    return safe_name
|
||||
|
||||
def to_dict(self):
|
||||
"""Convert Serie object to dictionary for JSON serialization."""
|
||||
return {
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import logging
|
||||
import os
|
||||
import warnings
|
||||
from typing import Any, List, Optional
|
||||
|
||||
@@ -21,6 +22,7 @@ from src.server.utils.dependencies import (
|
||||
get_series_app,
|
||||
require_auth,
|
||||
)
|
||||
from src.server.utils.filesystem import sanitize_folder_name
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -620,16 +622,20 @@ async def add_series(
|
||||
_auth: dict = Depends(require_auth),
|
||||
series_app: Any = Depends(get_series_app),
|
||||
db: Optional[AsyncSession] = Depends(get_optional_database_session),
|
||||
anime_service: AnimeService = Depends(get_anime_service),
|
||||
) -> dict:
|
||||
"""Add a new series to the library.
|
||||
"""Add a new series to the library with full initialization.
|
||||
|
||||
Extracts the series `key` from the provided link URL.
|
||||
The `key` is the URL-safe identifier used for all lookups.
|
||||
The `name` is stored as display metadata along with a
|
||||
filesystem-friendly `folder` name derived from the name.
|
||||
This endpoint performs the complete series addition flow:
|
||||
1. Validates inputs and extracts the series key from the link URL
|
||||
2. Creates a sanitized folder name from the display name
|
||||
3. Saves the series to the database (if available)
|
||||
4. Creates the folder on disk with the sanitized name
|
||||
5. Triggers a targeted scan for missing episodes (only this series)
|
||||
|
||||
Series are saved to the database using AnimeSeriesService when
|
||||
database is available, falling back to in-memory storage otherwise.
|
||||
The `key` is the URL-safe identifier used for all lookups.
|
||||
The `name` is stored as display metadata and used to derive
|
||||
the filesystem folder name (sanitized for filesystem safety).
|
||||
|
||||
Args:
|
||||
request: Request containing the series link and name.
|
||||
@@ -638,15 +644,23 @@ async def add_series(
|
||||
_auth: Ensures the caller is authenticated (value unused)
|
||||
series_app: Core `SeriesApp` instance provided via dependency
|
||||
db: Optional database session for async operations
|
||||
anime_service: AnimeService for scanning operations
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: Status payload with success message, key, and db_id
|
||||
Dict[str, Any]: Status payload with:
|
||||
- status: "success" or "exists"
|
||||
- message: Human-readable status message
|
||||
- key: Series unique identifier
|
||||
- folder: Created folder path
|
||||
- db_id: Database ID (if saved to DB)
|
||||
- missing_episodes: Dict of missing episodes by season
|
||||
- total_missing: Total count of missing episodes
|
||||
|
||||
Raises:
|
||||
HTTPException: If adding the series fails or link is invalid
|
||||
"""
|
||||
try:
|
||||
# Validate inputs
|
||||
# Step A: Validate inputs
|
||||
if not request.link or not request.link.strip():
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
@@ -679,28 +693,40 @@ async def add_series(
|
||||
detail="Could not extract series key from link",
|
||||
)
|
||||
|
||||
# Create folder from name (filesystem-friendly)
|
||||
folder = request.name.strip()
|
||||
db_id = None
|
||||
# Step B: Create sanitized folder name from display name
|
||||
name = request.name.strip()
|
||||
try:
|
||||
folder = sanitize_folder_name(name)
|
||||
except ValueError as e:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail=f"Invalid series name for folder: {str(e)}",
|
||||
)
|
||||
|
||||
# Try to save to database if available
|
||||
db_id = None
|
||||
missing_episodes: dict = {}
|
||||
scan_error: Optional[str] = None
|
||||
|
||||
# Step C: Save to database if available
|
||||
if db is not None:
|
||||
# Check if series already exists in database
|
||||
existing = await AnimeSeriesService.get_by_key(db, key)
|
||||
if existing:
|
||||
return {
|
||||
"status": "exists",
|
||||
"message": f"Series already exists: {request.name}",
|
||||
"message": f"Series already exists: {name}",
|
||||
"key": key,
|
||||
"folder": existing.folder,
|
||||
"db_id": existing.id
|
||||
"db_id": existing.id,
|
||||
"missing_episodes": {},
|
||||
"total_missing": 0
|
||||
}
|
||||
|
||||
# Save to database using AnimeSeriesService
|
||||
anime_series = await AnimeSeriesService.create(
|
||||
db=db,
|
||||
key=key,
|
||||
name=request.name.strip(),
|
||||
name=name,
|
||||
site="aniworld.to",
|
||||
folder=folder,
|
||||
)
|
||||
@@ -708,41 +734,109 @@ async def add_series(
|
||||
|
||||
logger.info(
|
||||
"Added series to database: %s (key=%s, db_id=%d)",
|
||||
request.name,
|
||||
name,
|
||||
key,
|
||||
db_id
|
||||
)
|
||||
|
||||
# Also add to in-memory cache if series_app has the list attribute
|
||||
# Step D: Create folder on disk and add to SerieList
|
||||
folder_path = None
|
||||
if series_app and hasattr(series_app, "list"):
|
||||
serie = Serie(
|
||||
key=key,
|
||||
name=request.name.strip(),
|
||||
name=name,
|
||||
site="aniworld.to",
|
||||
folder=folder,
|
||||
episodeDict={}
|
||||
)
|
||||
# Add to in-memory cache
|
||||
if hasattr(series_app.list, 'keyDict'):
|
||||
# Direct update without file saving
|
||||
series_app.list.keyDict[key] = serie
|
||||
elif hasattr(series_app.list, 'add'):
|
||||
# Legacy: use add method (may create file with deprecation warning)
|
||||
|
||||
# Add to SerieList - this creates the folder with sanitized name
|
||||
if hasattr(series_app.list, 'add'):
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
series_app.list.add(serie)
|
||||
folder_path = series_app.list.add(serie, use_sanitized_folder=True)
|
||||
# Update folder to reflect what was actually created
|
||||
folder = serie.folder
|
||||
elif hasattr(series_app.list, 'keyDict'):
|
||||
# Manual folder creation and cache update
|
||||
if hasattr(series_app.list, 'directory'):
|
||||
folder_path = os.path.join(series_app.list.directory, folder)
|
||||
os.makedirs(folder_path, exist_ok=True)
|
||||
series_app.list.keyDict[key] = serie
|
||||
|
||||
logger.info(
|
||||
"Created folder for series: %s at %s",
|
||||
name,
|
||||
folder_path or folder
|
||||
)
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
"message": f"Successfully added series: {request.name}",
|
||||
"key": key,
|
||||
"folder": folder,
|
||||
"db_id": db_id
|
||||
# Step E: Trigger targeted scan for missing episodes
|
||||
try:
|
||||
if series_app and hasattr(series_app, "scanner"):
|
||||
missing_episodes = series_app.scanner.scan_single_series(
|
||||
key=key,
|
||||
folder=folder
|
||||
)
|
||||
logger.info(
|
||||
"Targeted scan completed for %s: found %d missing episodes",
|
||||
key,
|
||||
sum(len(eps) for eps in missing_episodes.values())
|
||||
)
|
||||
|
||||
# Update the serie in keyDict with the missing episodes
|
||||
if hasattr(series_app, "list") and hasattr(series_app.list, "keyDict"):
|
||||
if key in series_app.list.keyDict:
|
||||
series_app.list.keyDict[key].episodeDict = missing_episodes
|
||||
elif anime_service:
|
||||
# Fallback to anime_service if scanner not directly available
|
||||
# Note: This is a lightweight scan, not a full rescan
|
||||
logger.info(
|
||||
"Scanner not directly available, "
|
||||
"skipping targeted scan for %s",
|
||||
key
|
||||
)
|
||||
except Exception as e:
|
||||
# Scan failure is not critical - series was still added
|
||||
scan_error = str(e)
|
||||
logger.warning(
|
||||
"Targeted scan failed for %s: %s (series still added)",
|
||||
key,
|
||||
e
|
||||
)
|
||||
|
||||
# Convert missing episodes keys to strings for JSON serialization
|
||||
missing_episodes_serializable = {
|
||||
str(season): episodes
|
||||
for season, episodes in missing_episodes.items()
|
||||
}
|
||||
|
||||
# Calculate total missing
|
||||
total_missing = sum(len(eps) for eps in missing_episodes.values())
|
||||
|
||||
# Step F: Return response
|
||||
response = {
|
||||
"status": "success",
|
||||
"message": f"Successfully added series: {name}",
|
||||
"key": key,
|
||||
"folder": folder_path or folder,
|
||||
"db_id": db_id,
|
||||
"missing_episodes": missing_episodes_serializable,
|
||||
"total_missing": total_missing
|
||||
}
|
||||
|
||||
if scan_error:
|
||||
response["scan_warning"] = f"Scan partially failed: {scan_error}"
|
||||
|
||||
return response
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as exc:
|
||||
logger.error("Failed to add series: %s", exc, exc_info=True)
|
||||
|
||||
# Attempt to rollback database entry if folder creation failed
|
||||
# (This is a best-effort cleanup)
|
||||
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"Failed to add series: {str(exc)}",
|
||||
|
||||
180
src/server/utils/filesystem.py
Normal file
180
src/server/utils/filesystem.py
Normal file
@@ -0,0 +1,180 @@
|
||||
"""Filesystem utilities for safe file and folder operations.
|
||||
|
||||
This module provides utility functions for safely handling filesystem
|
||||
operations, including sanitizing folder names and path validation.
|
||||
|
||||
Security:
|
||||
- All functions sanitize inputs to prevent path traversal attacks
|
||||
- Invalid filesystem characters are removed or replaced
|
||||
- Unicode characters are preserved for international titles
|
||||
"""
|
||||
|
||||
import os
|
||||
import re
|
||||
import unicodedata
|
||||
from typing import Optional
|
||||
|
||||
# Characters that are invalid in filesystem paths across platforms
# Windows: < > : " / \ | ? *
# Linux/Mac: / and null byte
INVALID_PATH_CHARS = '<>:"/\\|?*\x00'

# Additional characters to remove for cleaner folder names
EXTRA_CLEANUP_CHARS = '\r\n\t'

# Maximum folder name length (conservative for cross-platform compatibility)
MAX_FOLDER_NAME_LENGTH = 200


def sanitize_folder_name(
    name: str,
    replacement: str = "",
    max_length: Optional[int] = None,
) -> str:
    """Sanitize a string for use as a filesystem folder name.

    Removes or replaces characters that are invalid for filesystems while
    preserving Unicode characters (for Japanese/Chinese titles, etc.).
    Leading and trailing dots and spaces are always stripped, also after
    truncation.

    Args:
        name: The string to sanitize (e.g., anime display name)
        replacement: Text to replace invalid chars with (default: "").
            Must not itself contain any invalid character.
        max_length: Maximum length for the result. ``None`` or ``0``
            selects MAX_FOLDER_NAME_LENGTH.

    Returns:
        str: A filesystem-safe folder name

    Raises:
        ValueError: If name is None, empty, or results in empty string;
            if replacement contains an invalid character; or if
            max_length is negative.

    Examples:
        >>> sanitize_folder_name("Attack on Titan: Final Season")
        'Attack on Titan Final Season'
        >>> sanitize_folder_name("What If...?")
        'What If'
        >>> sanitize_folder_name("Re:Zero")
        'ReZero'
        >>> sanitize_folder_name("日本語タイトル")
        '日本語タイトル'
    """
    if name is None:
        raise ValueError("Folder name cannot be None")

    # Strip leading/trailing whitespace
    name = name.strip()

    if not name:
        raise ValueError("Folder name cannot be empty")

    # A replacement such as "/" would put path separators back into the
    # result and defeat the whole purpose of sanitizing.
    removable = INVALID_PATH_CHARS + EXTRA_CLEANUP_CHARS
    if set(removable).intersection(replacement):
        raise ValueError(
            "Replacement must not contain invalid filesystem characters"
        )

    # A negative limit would silently slice from the end of the name.
    if max_length is not None and max_length < 0:
        raise ValueError("max_length cannot be negative")
    max_len = max_length or MAX_FOLDER_NAME_LENGTH

    # Normalize Unicode characters (NFC form for consistency)
    name = unicodedata.normalize('NFC', name)

    # Replace invalid and cleanup characters in a single pass
    name = name.translate(str.maketrans(dict.fromkeys(removable, replacement)))

    # Remove control characters but preserve Unicode. Spaces are in
    # category "Zs", so they are not affected by this filter.
    name = ''.join(
        char for char in name
        if not unicodedata.category(char).startswith('C')
    )

    # Collapse multiple consecutive spaces
    name = re.sub(r' +', ' ', name)

    # Remove leading/trailing dots and whitespace
    # (dots at start can make folders hidden on Unix)
    name = name.strip('. ')

    # Truncate to max length while avoiding breaking in middle of word
    if len(name) > max_len:
        truncated = name[:max_len]
        last_space = truncated.rfind(' ')
        if last_space > max_len // 2:  # Only if we don't lose too much
            truncated = truncated[:last_space]
        # The cut can expose a trailing dot or space again; strip it so
        # the result obeys the same rule as an untruncated name.
        name = truncated.rstrip().rstrip('. ')

    # Handle edge case: nothing usable is left
    if not name:
        raise ValueError(
            "Folder name contains only invalid characters"
        )

    return name
|
||||
|
||||
|
||||
def is_safe_path(base_path: str, target_path: str) -> bool:
    """Check if target_path is safely within base_path.

    Prevents path traversal attacks by ensuring the target path
    is actually within the base path after resolution. Both paths are
    made absolute and have ``..`` segments and symbolic links resolved
    before they are compared. The paths do not need to exist.

    Args:
        base_path: The base directory that should contain the target
        target_path: The path to validate

    Returns:
        bool: True if target_path is base_path itself or lies below it

    Example:
        >>> is_safe_path("/anime", "/anime/Attack on Titan")
        True
        >>> is_safe_path("/anime", "/anime/../etc/passwd")
        False
    """
    # realpath (unlike abspath) also follows symlinks, so a link inside
    # the base directory that points elsewhere is not accepted.
    base_resolved = os.path.realpath(base_path)
    target_resolved = os.path.realpath(target_path)

    try:
        # Compare whole path components instead of string prefixes; this
        # also works when the base is the filesystem root.
        common = os.path.commonpath([base_resolved, target_resolved])
    except ValueError:
        # Raised when the paths cannot share a root (e.g. different
        # drives on Windows): the target is certainly outside the base.
        return False

    return os.path.normcase(common) == os.path.normcase(base_resolved)
|
||||
|
||||
|
||||
def create_safe_folder(
    base_path: str,
    folder_name: str,
    exist_ok: bool = True,
) -> str:
    """Create a sanitized sub-folder of base_path and return its path.

    The raw folder name is first passed through ``sanitize_folder_name``;
    the resulting path is then checked with ``is_safe_path`` before the
    directory is created.

    Args:
        base_path: Base directory to create folder within
        folder_name: Unsanitized folder name
        exist_ok: If True, don't raise error if folder exists

    Returns:
        str: Full path to the created folder

    Raises:
        ValueError: If the folder name cannot be sanitized, or if the
            resulting path would be outside base_path
        OSError: If folder creation fails
    """
    destination = os.path.join(base_path, sanitize_folder_name(folder_name))

    # Only touch the disk once the destination is confirmed to be inside
    # the base directory.
    if is_safe_path(base_path, destination):
        os.makedirs(destination, exist_ok=exist_ok)
        return destination

    raise ValueError(
        f"Folder name '{folder_name}' would create path outside "
        f"base directory"
    )
|
||||
Reference in New Issue
Block a user