Fix NFO batch endpoint route priority and test fixture

This commit is contained in:
2026-01-27 18:10:16 +01:00
parent f409b81aa2
commit c693c6572b
5 changed files with 597 additions and 924 deletions

View File

@@ -59,6 +59,203 @@ async def get_nfo_service() -> NFOService:
) from e
# =============================================================================
# IMPORTANT: Literal path routes must be defined BEFORE path parameter routes
# to avoid route matching conflicts. For example, /batch/create must come
# before /{serie_id}/create, otherwise "batch" is treated as a serie_id.
# =============================================================================
@router.post("/batch/create", response_model=NFOBatchCreateResponse)
async def batch_create_nfo(
request: NFOBatchCreateRequest,
_auth: dict = Depends(require_auth),
series_app: SeriesApp = Depends(get_series_app),
nfo_service: NFOService = Depends(get_nfo_service)
) -> NFOBatchCreateResponse:
"""Batch create NFO files for multiple series.
Args:
request: Batch creation options
_auth: Authentication dependency
series_app: Series app dependency
nfo_service: NFO service dependency
Returns:
NFOBatchCreateResponse with results
"""
results: List[NFOBatchResult] = []
successful = 0
failed = 0
skipped = 0
# Get all series
series_list = series_app.list.GetList()
series_map = {
getattr(s, 'key', None): s
for s in series_list
if getattr(s, 'key', None)
}
# Process each series
semaphore = asyncio.Semaphore(request.max_concurrent)
async def process_serie(serie_id: str) -> NFOBatchResult:
"""Process a single series."""
async with semaphore:
try:
serie = series_map.get(serie_id)
if not serie:
return NFOBatchResult(
serie_id=serie_id,
serie_folder="",
success=False,
message="Series not found"
)
# Ensure folder name includes year if available
serie_folder = serie.ensure_folder_with_year()
# Check if NFO exists
if request.skip_existing:
has_nfo = await nfo_service.check_nfo_exists(serie_folder)
if has_nfo:
return NFOBatchResult(
serie_id=serie_id,
serie_folder=serie_folder,
success=False,
message="Skipped - NFO already exists"
)
# Create NFO
nfo_path = await nfo_service.create_tvshow_nfo(
serie_name=serie.name or serie_folder,
serie_folder=serie_folder,
download_poster=request.download_media,
download_logo=request.download_media,
download_fanart=request.download_media
)
return NFOBatchResult(
serie_id=serie_id,
serie_folder=serie_folder,
success=True,
message="NFO created successfully",
nfo_path=str(nfo_path)
)
except Exception as e:
logger.error(
f"Error creating NFO for {serie_id}: {e}",
exc_info=True
)
return NFOBatchResult(
serie_id=serie_id,
serie_folder=serie.folder if serie else "",
success=False,
message=f"Error: {str(e)}"
)
# Process all series concurrently
tasks = [process_serie(sid) for sid in request.serie_ids]
results = await asyncio.gather(*tasks)
# Count results
for result in results:
if result.success:
successful += 1
elif "Skipped" in result.message:
skipped += 1
else:
failed += 1
return NFOBatchCreateResponse(
total=len(request.serie_ids),
successful=successful,
failed=failed,
skipped=skipped,
results=list(results)
)
@router.get("/missing", response_model=NFOMissingResponse)
async def get_missing_nfo(
_auth: dict = Depends(require_auth),
series_app: SeriesApp = Depends(get_series_app),
nfo_service: NFOService = Depends(get_nfo_service)
) -> NFOMissingResponse:
"""Get list of series without NFO files.
Args:
_auth: Authentication dependency
series_app: Series app dependency
nfo_service: NFO service dependency
Returns:
NFOMissingResponse with series list
"""
try:
series_list = series_app.list.GetList()
missing_series: List[NFOMissingSeries] = []
for serie in series_list:
serie_id = getattr(serie, 'key', None)
if not serie_id:
continue
# Ensure folder name includes year if available
serie_folder = serie.ensure_folder_with_year()
has_nfo = await nfo_service.check_nfo_exists(serie_folder)
if not has_nfo:
# Build full path and check media files
folder_path = Path(settings.anime_directory) / serie_folder
media_status = check_media_files(folder_path)
file_paths = get_media_file_paths(folder_path)
media_files = MediaFilesStatus(
has_poster=media_status.get("poster", False),
has_logo=media_status.get("logo", False),
has_fanart=media_status.get("fanart", False),
poster_path=str(file_paths["poster"]) if file_paths.get("poster") else None,
logo_path=str(file_paths["logo"]) if file_paths.get("logo") else None,
fanart_path=str(file_paths["fanart"]) if file_paths.get("fanart") else None
)
has_media = (
media_files.has_poster
or media_files.has_logo
or media_files.has_fanart
)
missing_series.append(NFOMissingSeries(
serie_id=serie_id,
serie_folder=serie_folder,
serie_name=serie.name or serie_folder,
has_media=has_media,
media_files=media_files
))
return NFOMissingResponse(
total_series=len(series_list),
missing_nfo_count=len(missing_series),
series=missing_series
)
except Exception as e:
logger.error(f"Error getting missing NFOs: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to get missing NFOs: {str(e)}"
) from e
# =============================================================================
# Series-specific endpoints (with {serie_id} path parameter)
# These must come AFTER literal path routes like /batch/create and /missing
# =============================================================================
@router.get("/{serie_id}/check", response_model=NFOCheckResponse)
async def check_nfo(
serie_id: str,
@@ -559,187 +756,3 @@ async def download_media(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to download media: {str(e)}"
) from e
@router.post("/batch/create", response_model=NFOBatchCreateResponse)
async def batch_create_nfo(
request: NFOBatchCreateRequest,
_auth: dict = Depends(require_auth),
series_app: SeriesApp = Depends(get_series_app),
nfo_service: NFOService = Depends(get_nfo_service)
) -> NFOBatchCreateResponse:
"""Batch create NFO files for multiple series.
Args:
request: Batch creation options
_auth: Authentication dependency
series_app: Series app dependency
nfo_service: NFO service dependency
Returns:
NFOBatchCreateResponse with results
"""
results: List[NFOBatchResult] = []
successful = 0
failed = 0
skipped = 0
# Get all series
series_list = series_app.list.GetList()
series_map = {
getattr(s, 'key', None): s
for s in series_list
if getattr(s, 'key', None)
}
# Process each series
semaphore = asyncio.Semaphore(request.max_concurrent)
async def process_serie(serie_id: str) -> NFOBatchResult:
"""Process a single series."""
async with semaphore:
try:
serie = series_map.get(serie_id)
if not serie:
return NFOBatchResult(
serie_id=serie_id,
serie_folder="",
success=False,
message="Series not found"
)
# Ensure folder name includes year if available
serie_folder = serie.ensure_folder_with_year()
# Check if NFO exists
if request.skip_existing:
has_nfo = await nfo_service.check_nfo_exists(serie_folder)
if has_nfo:
return NFOBatchResult(
serie_id=serie_id,
serie_folder=serie_folder,
success=False,
message="Skipped - NFO already exists"
)
# Create NFO
nfo_path = await nfo_service.create_tvshow_nfo(
serie_name=serie.name or serie_folder,
serie_folder=serie_folder,
download_poster=request.download_media,
download_logo=request.download_media,
download_fanart=request.download_media
)
return NFOBatchResult(
serie_id=serie_id,
serie_folder=serie_folder,
success=True,
message="NFO created successfully",
nfo_path=str(nfo_path)
)
except Exception as e:
logger.error(
f"Error creating NFO for {serie_id}: {e}",
exc_info=True
)
return NFOBatchResult(
serie_id=serie_id,
serie_folder=serie.folder if serie else "",
success=False,
message=f"Error: {str(e)}"
)
# Process all series concurrently
tasks = [process_serie(sid) for sid in request.serie_ids]
results = await asyncio.gather(*tasks)
# Count results
for result in results:
if result.success:
successful += 1
elif "Skipped" in result.message:
skipped += 1
else:
failed += 1
return NFOBatchCreateResponse(
total=len(request.serie_ids),
successful=successful,
failed=failed,
skipped=skipped,
results=results
)
@router.get("/missing", response_model=NFOMissingResponse)
async def get_missing_nfo(
_auth: dict = Depends(require_auth),
series_app: SeriesApp = Depends(get_series_app),
nfo_service: NFOService = Depends(get_nfo_service)
) -> NFOMissingResponse:
"""Get list of series without NFO files.
Args:
_auth: Authentication dependency
series_app: Series app dependency
nfo_service: NFO service dependency
Returns:
NFOMissingResponse with series list
"""
try:
series_list = series_app.list.GetList()
missing_series: List[NFOMissingSeries] = []
for serie in series_list:
serie_id = getattr(serie, 'key', None)
if not serie_id:
continue
# Ensure folder name includes year if available
serie_folder = serie.ensure_folder_with_year()
has_nfo = await nfo_service.check_nfo_exists(serie_folder)
if not has_nfo:
# Build full path and check media files
folder_path = Path(settings.anime_directory) / serie_folder
media_status = check_media_files(folder_path)
file_paths = get_media_file_paths(folder_path)
media_files = MediaFilesStatus(
has_poster=media_status.get("poster", False),
has_logo=media_status.get("logo", False),
has_fanart=media_status.get("fanart", False),
poster_path=str(file_paths["poster"]) if file_paths.get("poster") else None,
logo_path=str(file_paths["logo"]) if file_paths.get("logo") else None,
fanart_path=str(file_paths["fanart"]) if file_paths.get("fanart") else None
)
has_media = (
media_files.has_poster
or media_files.has_logo
or media_files.has_fanart
)
missing_series.append(NFOMissingSeries(
serie_id=serie_id,
serie_folder=serie_folder,
serie_name=serie.name or serie_folder,
has_media=has_media,
media_files=media_files
))
return NFOMissingResponse(
total_series=len(series_list),
missing_nfo_count=len(missing_series),
series=missing_series
)
except Exception as e:
logger.error(f"Error getting missing NFOs: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to get missing NFOs: {str(e)}"
) from e