- Created tests/unit/test_nfo_batch_operations.py * 19 comprehensive unit tests all passing * Test concurrent operations with max_concurrent limits * Test partial failure handling (continues processing) * Test skip_existing and overwrite functionality * Test media download options * Test result accuracy and error messages * Test edge cases (empty, single, large, duplicates) - Updated docs/instructions.md * Marked NFO batch operations tests as completed * Documented 19/19 passing tests
705 lines
24 KiB
Python
705 lines
24 KiB
Python
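"""Unit tests for NFO batch operations.

This module tests NFO batch operation logic including:
- Concurrent NFO creation with max_concurrent limits
- Batch operation error handling (partial failures, unknown series)
- Skipping or overwriting existing NFO files (skip_existing)
- Media download options (download_media)
- Result structure and count accuracy
- Edge cases (empty, single, large and duplicate requests)

Batch operation progress tracking and cancellation are not exercised by the
tests below.
"""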
|
|
import asyncio
|
|
from pathlib import Path
|
|
from unittest.mock import AsyncMock, Mock, patch
|
|
|
|
import pytest
|
|
|
|
from src.core.entities.series import Serie
|
|
from src.core.services.nfo_service import NFOService
|
|
from src.server.api.nfo import batch_create_nfo
|
|
from src.server.models.nfo import NFOBatchCreateRequest
|
|
|
|
|
|
@pytest.fixture
def mock_series_app():
    """Provide a mocked SeriesApp whose list returns five fake series.

    Every fake is a ``Mock(spec=Serie)`` with ``key`` ``serie<i>``, ``folder``
    and ``name`` ``Serie <i>``, ``year`` ``2020 + i`` and an
    ``ensure_folder_with_year`` stub returning ``Serie <i> (202<i>)``.
    """

    def build_serie(index):
        # One fake series entry; attribute values mirror what the tests
        # look up by key and folder name.
        fake = Mock(spec=Serie)
        fake.key = f"serie{index}"
        fake.folder = f"Serie {index}"
        fake.name = f"Serie {index}"
        fake.year = 2020 + index
        fake.ensure_folder_with_year = Mock(
            return_value=f"Serie {index} (202{index})"
        )
        return fake

    series_app = Mock()
    series_app.list = Mock()
    series_app.list.GetList = Mock(
        return_value=[build_serie(index) for index in range(5)]
    )
    return series_app
|
|
|
|
|
|
@pytest.fixture
def mock_nfo_service():
    """Provide an ``NFOService`` mock with async stubs.

    ``check_nfo_exists`` resolves to ``False`` and ``create_tvshow_nfo``
    resolves to a fixed fake ``tvshow.nfo`` path unless a test overrides them.
    """
    fake_nfo_path = Path("/fake/path/tvshow.nfo")
    nfo_service = Mock(spec=NFOService)
    nfo_service.create_tvshow_nfo = AsyncMock(return_value=fake_nfo_path)
    nfo_service.check_nfo_exists = AsyncMock(return_value=False)
    return nfo_service
|
|
|
|
|
|
@pytest.fixture
def mock_settings():
    """Patch ``src.server.api.nfo.settings`` for the duration of a test.

    Yields the patched mock with ``anime_directory`` set to a fake path; the
    patch is undone when the fixture is torn down.
    """
    settings_patcher = patch("src.server.api.nfo.settings")
    patched_settings = settings_patcher.start()
    try:
        patched_settings.anime_directory = "/fake/anime/dir"
        yield patched_settings
    finally:
        settings_patcher.stop()
|
|
|
|
|
|
class TestBatchOperationConcurrency:
    """Concurrency behaviour of batch NFO creation (``max_concurrent``)."""

    @staticmethod
    async def _run_batch(request, series_app, nfo_service):
        """Call ``batch_create_nfo`` with the API dependency getters patched.

        The same mocks are also passed directly as ``series_app`` and
        ``nfo_service``; the endpoint's result is returned to the caller.
        """
        with patch(
            "src.server.api.nfo.get_series_app", return_value=series_app
        ), patch(
            "src.server.api.nfo.get_nfo_service", return_value=nfo_service
        ):
            return await batch_create_nfo(
                request=request,
                _auth={"username": "test"},
                series_app=series_app,
                nfo_service=nfo_service,
            )

    @pytest.mark.asyncio
    async def test_respects_max_concurrent_limit(
        self,
        mock_series_app,
        mock_nfo_service,
        mock_settings
    ):
        """No more than ``max_concurrent`` creations are in flight at once."""
        in_flight = 0
        peak = 0

        async def fake_create(*args, **kwargs):
            # Count overlapping invocations and remember the highest overlap.
            nonlocal in_flight, peak
            in_flight += 1
            if in_flight > peak:
                peak = in_flight
            await asyncio.sleep(0.1)  # Simulate work
            in_flight -= 1
            return Path("/fake/path/tvshow.nfo")

        mock_nfo_service.create_tvshow_nfo.side_effect = fake_create

        # Five series, but at most two may be processed simultaneously.
        request = NFOBatchCreateRequest(
            serie_ids=[f"serie{i}" for i in range(5)],
            max_concurrent=2,
            download_media=False,
            skip_existing=False
        )

        result = await self._run_batch(
            request, mock_series_app, mock_nfo_service
        )

        assert peak <= 2
        assert result.total == 5
        assert result.successful == 5

    @pytest.mark.asyncio
    async def test_max_concurrent_default_value(
        self,
        mock_series_app,
        mock_nfo_service,
        mock_settings
    ):
        """``max_concurrent`` falls back to 3 when it is not supplied."""
        # max_concurrent intentionally omitted.
        request = NFOBatchCreateRequest(serie_ids=["serie0", "serie1"])

        assert request.max_concurrent == 3

    @pytest.mark.asyncio
    async def test_max_concurrent_validation(self):
        """``max_concurrent`` rejects 0 and 11 and accepts values in 1..10."""
        # Below the minimum, then above the maximum.
        for out_of_range in (0, 11):
            with pytest.raises(ValueError):
                NFOBatchCreateRequest(
                    serie_ids=["serie0"],
                    max_concurrent=out_of_range
                )

        for allowed in (1, 3, 5, 10):
            accepted = NFOBatchCreateRequest(
                serie_ids=["serie0"],
                max_concurrent=allowed
            )
            assert accepted.max_concurrent == allowed

    @pytest.mark.asyncio
    async def test_concurrent_operations_complete_correctly(
        self,
        mock_series_app,
        mock_nfo_service,
        mock_settings
    ):
        """Every series in a concurrent batch is created exactly once."""
        started = []

        async def record_start(serie_name, serie_folder, **kwargs):
            started.append(serie_name)
            await asyncio.sleep(0.05)  # Simulate work
            return Path(f"/fake/{serie_folder}/tvshow.nfo")

        mock_nfo_service.create_tvshow_nfo.side_effect = record_start

        request = NFOBatchCreateRequest(
            serie_ids=["serie0", "serie1", "serie2", "serie3"],
            max_concurrent=2
        )

        result = await self._run_batch(
            request, mock_series_app, mock_nfo_service
        )

        # All four operations ran and all of them succeeded.
        assert len(started) == 4
        assert result.successful == 4
        assert result.failed == 0
|
|
|
|
|
|
class TestBatchOperationErrorHandling:
    """Error handling of batch NFO creation."""

    @staticmethod
    async def _run_batch(request, series_app, nfo_service):
        """Call ``batch_create_nfo`` with the API dependency getters patched.

        The same mocks are also passed directly as ``series_app`` and
        ``nfo_service``; the endpoint's result is returned to the caller.
        """
        with patch(
            "src.server.api.nfo.get_series_app", return_value=series_app
        ), patch(
            "src.server.api.nfo.get_nfo_service", return_value=nfo_service
        ):
            return await batch_create_nfo(
                request=request,
                _auth={"username": "test"},
                series_app=series_app,
                nfo_service=nfo_service,
            )

    @pytest.mark.asyncio
    async def test_partial_failure_continues_processing(
        self,
        mock_series_app,
        mock_nfo_service,
        mock_settings
    ):
        """A failing series does not stop the remaining ones from running."""
        failing_names = ("Serie 1", "Serie 3")

        async def selective_failure(serie_name, **kwargs):
            if serie_name in failing_names:
                raise Exception("TMDB API error")
            return Path(f"/fake/{serie_name}/tvshow.nfo")

        mock_nfo_service.create_tvshow_nfo.side_effect = selective_failure

        request = NFOBatchCreateRequest(
            serie_ids=["serie0", "serie1", "serie2", "serie3", "serie4"],
            skip_existing=False
        )

        result = await self._run_batch(
            request, mock_series_app, mock_nfo_service
        )

        assert result.total == 5
        assert result.successful == 3  # serie0, serie2, serie4
        assert result.failed == 2  # serie1, serie3

        # Each failed entry must carry an error message.
        failed_results = [r for r in result.results if not r.success]
        assert len(failed_results) == 2
        for failed in failed_results:
            assert "Error:" in failed.message

    @pytest.mark.asyncio
    async def test_series_not_found_error(
        self,
        mock_series_app,
        mock_nfo_service,
        mock_settings
    ):
        """An unknown serie id is reported as failed with a 'not found' text."""
        request = NFOBatchCreateRequest(
            serie_ids=["serie0", "nonexistent", "serie1"],
            skip_existing=False
        )

        result = await self._run_batch(
            request, mock_series_app, mock_nfo_service
        )

        assert result.total == 3
        assert result.successful == 2
        assert result.failed == 1

        failed = next(r for r in result.results if r.serie_id == "nonexistent")
        assert not failed.success
        assert "not found" in failed.message.lower()

    @pytest.mark.asyncio
    async def test_all_operations_fail(
        self,
        mock_series_app,
        mock_nfo_service,
        mock_settings
    ):
        """When every creation raises, all entries are counted as failed."""
        mock_nfo_service.create_tvshow_nfo.side_effect = Exception(
            "Network error"
        )

        request = NFOBatchCreateRequest(
            serie_ids=["serie0", "serie1", "serie2"],
            skip_existing=False
        )

        result = await self._run_batch(
            request, mock_series_app, mock_nfo_service
        )

        assert result.total == 3
        assert result.successful == 0
        assert result.failed == 3

    @pytest.mark.asyncio
    async def test_error_messages_are_informative(
        self,
        mock_series_app,
        mock_nfo_service,
        mock_settings
    ):
        """Each failure message preserves the text of the raised error."""
        # Error raised per series name, and the fragment expected in the
        # message reported for the matching serie id.
        error_by_name = {
            "Serie 0": "TMDB API rate limit exceeded",
            "Serie 1": "File permission denied",
            "Serie 2": "Network timeout",
        }
        expected_fragment = {
            "serie0": "rate limit",
            "serie1": "permission",
            "serie2": "timeout",
        }

        async def specific_errors(serie_name, **kwargs):
            if serie_name in error_by_name:
                raise Exception(error_by_name[serie_name])
            return Path("/fake/path/tvshow.nfo")

        mock_nfo_service.create_tvshow_nfo.side_effect = specific_errors

        request = NFOBatchCreateRequest(
            serie_ids=["serie0", "serie1", "serie2"],
            skip_existing=False
        )

        result = await self._run_batch(
            request, mock_series_app, mock_nfo_service
        )

        # Pin the number of results so the loop below cannot pass vacuously
        # on an empty result list.
        assert len(result.results) == len(expected_fragment)
        for res in result.results:
            assert not res.success
            assert "Error:" in res.message
            assert expected_fragment[res.serie_id] in res.message.lower()
|
|
|
|
|
|
class TestBatchOperationSkipping:
    """Behaviour of the ``skip_existing`` flag in batch NFO creation."""

    @staticmethod
    async def _run_batch(request, series_app, nfo_service):
        """Call ``batch_create_nfo`` with the API dependency getters patched.

        The same mocks are also passed directly as ``series_app`` and
        ``nfo_service``; the endpoint's result is returned to the caller.
        """
        with patch(
            "src.server.api.nfo.get_series_app", return_value=series_app
        ), patch(
            "src.server.api.nfo.get_nfo_service", return_value=nfo_service
        ):
            return await batch_create_nfo(
                request=request,
                _auth={"username": "test"},
                series_app=series_app,
                nfo_service=nfo_service,
            )

    @pytest.mark.asyncio
    async def test_skip_existing_nfo_files(
        self,
        mock_series_app,
        mock_nfo_service,
        mock_settings
    ):
        """Series that already have an NFO are skipped when requested."""
        # Folders that are reported as already containing an NFO file.
        folders_with_nfo = ("Serie 1 (2021)", "Serie 3 (2023)")

        async def check_exists(serie_folder):
            return serie_folder in folders_with_nfo

        mock_nfo_service.check_nfo_exists.side_effect = check_exists

        request = NFOBatchCreateRequest(
            serie_ids=["serie0", "serie1", "serie2", "serie3", "serie4"],
            skip_existing=True
        )

        result = await self._run_batch(
            request, mock_series_app, mock_nfo_service
        )

        assert result.total == 5
        assert result.successful == 3  # serie0, serie2, serie4
        assert result.skipped == 2  # serie1, serie3

        # Creation is attempted only for the series without an NFO.
        assert mock_nfo_service.create_tvshow_nfo.call_count == 3

    @pytest.mark.asyncio
    async def test_skip_existing_false_overwrites(
        self,
        mock_series_app,
        mock_nfo_service,
        mock_settings
    ):
        """With ``skip_existing=False`` existing NFO files are recreated."""
        mock_nfo_service.check_nfo_exists.return_value = True

        request = NFOBatchCreateRequest(
            serie_ids=["serie0", "serie1"],
            skip_existing=False
        )

        result = await self._run_batch(
            request, mock_series_app, mock_nfo_service
        )

        # Both series are created even though an NFO is reported to exist.
        assert result.successful == 2
        assert result.skipped == 0
        assert mock_nfo_service.create_tvshow_nfo.call_count == 2
|
|
|
|
|
|
class TestBatchOperationMediaDownloads:
    """Propagation of ``download_media`` to the NFO service in batch runs."""

    @staticmethod
    async def _run_batch(request, series_app, nfo_service):
        """Call ``batch_create_nfo`` with the API dependency getters patched.

        The same mocks are also passed directly as ``series_app`` and
        ``nfo_service``; the endpoint's result is returned to the caller.
        """
        with patch(
            "src.server.api.nfo.get_series_app", return_value=series_app
        ), patch(
            "src.server.api.nfo.get_nfo_service", return_value=nfo_service
        ):
            return await batch_create_nfo(
                request=request,
                _auth={"username": "test"},
                series_app=series_app,
                nfo_service=nfo_service,
            )

    @staticmethod
    def _assert_media_flags(nfo_service, expected, expected_calls):
        """Check the media flags of every recorded ``create_tvshow_nfo`` call.

        Args:
            nfo_service: The mocked NFO service used by the batch run.
            expected: Value each ``download_*`` keyword argument must have.
            expected_calls: Number of ``create_tvshow_nfo`` calls required.
        """
        recorded_calls = nfo_service.create_tvshow_nfo.call_args_list
        # Without this check the loop below would pass vacuously when the
        # service was never invoked.
        assert len(recorded_calls) == expected_calls
        for recorded in recorded_calls:
            for flag in ("download_poster", "download_logo", "download_fanart"):
                assert recorded.kwargs[flag] is expected

    @pytest.mark.asyncio
    async def test_download_media_enabled(
        self,
        mock_series_app,
        mock_nfo_service,
        mock_settings
    ):
        """All media download flags are ``True`` when media is enabled."""
        request = NFOBatchCreateRequest(
            serie_ids=["serie0", "serie1"],
            download_media=True,
            skip_existing=False
        )

        await self._run_batch(request, mock_series_app, mock_nfo_service)

        self._assert_media_flags(
            mock_nfo_service, True, len(request.serie_ids)
        )

    @pytest.mark.asyncio
    async def test_download_media_disabled(
        self,
        mock_series_app,
        mock_nfo_service,
        mock_settings
    ):
        """All media download flags are ``False`` when media is disabled."""
        request = NFOBatchCreateRequest(
            serie_ids=["serie0", "serie1"],
            download_media=False,
            skip_existing=False
        )

        await self._run_batch(request, mock_series_app, mock_nfo_service)

        self._assert_media_flags(
            mock_nfo_service, False, len(request.serie_ids)
        )
|
|
|
|
|
|
class TestBatchOperationResults:
    """Structure and bookkeeping of the batch operation result."""

    @staticmethod
    async def _run_batch(request, series_app, nfo_service):
        """Call ``batch_create_nfo`` with the API dependency getters patched.

        The same mocks are also passed directly as ``series_app`` and
        ``nfo_service``; the endpoint's result is returned to the caller.
        """
        with patch(
            "src.server.api.nfo.get_series_app", return_value=series_app
        ), patch(
            "src.server.api.nfo.get_nfo_service", return_value=nfo_service
        ):
            return await batch_create_nfo(
                request=request,
                _auth={"username": "test"},
                series_app=series_app,
                nfo_service=nfo_service,
            )

    @pytest.mark.asyncio
    async def test_result_includes_all_series(
        self,
        mock_series_app,
        mock_nfo_service,
        mock_settings
    ):
        """The result holds exactly one entry per requested series."""
        request = NFOBatchCreateRequest(
            serie_ids=["serie0", "serie1", "serie2"],
            skip_existing=False
        )

        result = await self._run_batch(
            request, mock_series_app, mock_nfo_service
        )

        assert len(result.results) == 3
        result_ids = {r.serie_id for r in result.results}
        assert result_ids == {"serie0", "serie1", "serie2"}

    @pytest.mark.asyncio
    async def test_result_includes_nfo_paths(
        self,
        mock_series_app,
        mock_nfo_service,
        mock_settings
    ):
        """Successful entries expose the path of the created NFO file."""
        request = NFOBatchCreateRequest(
            serie_ids=["serie0", "serie1"],
            skip_existing=False
        )

        result = await self._run_batch(
            request, mock_series_app, mock_nfo_service
        )

        # Both creations use the default succeeding mock, so every entry
        # must be a success; asserting it keeps the path checks from being
        # skipped silently.
        assert result.successful == 2
        for res in result.results:
            assert res.success
            assert res.nfo_path is not None
            assert "tvshow.nfo" in res.nfo_path

    @pytest.mark.asyncio
    async def test_result_counts_are_accurate(
        self,
        mock_series_app,
        mock_nfo_service,
        mock_settings
    ):
        """Successful, skipped and failed counts add up to the total."""
        # Setup: 1 success, 1 skip, 1 fail, 1 not found
        async def mixed_results(serie_name, **kwargs):
            if serie_name == "Serie 2":
                raise Exception("TMDB error")
            return Path(f"/fake/{serie_name}/tvshow.nfo")

        mock_nfo_service.create_tvshow_nfo.side_effect = mixed_results
        mock_nfo_service.check_nfo_exists.side_effect = (
            lambda f: f == "Serie 1 (2021)"
        )

        request = NFOBatchCreateRequest(
            serie_ids=["serie0", "serie1", "serie2", "nonexistent"],
            skip_existing=True
        )

        result = await self._run_batch(
            request, mock_series_app, mock_nfo_service
        )

        assert result.total == 4
        assert result.successful == 1  # serie0
        assert result.skipped == 1  # serie1
        assert result.failed == 2  # serie2 (error), nonexistent (not found)

        # The three categories must partition the whole batch.
        assert (
            result.successful + result.skipped + result.failed == result.total
        )
|
|
|
|
|
|
class TestBatchOperationEdgeCases:
    """Edge cases of batch NFO creation (empty, single, large, duplicates)."""

    @staticmethod
    async def _run_batch(request, series_app, nfo_service):
        """Call ``batch_create_nfo`` with the API dependency getters patched.

        The same mocks are also passed directly as ``series_app`` and
        ``nfo_service``; the endpoint's result is returned to the caller.
        """
        with patch(
            "src.server.api.nfo.get_series_app", return_value=series_app
        ), patch(
            "src.server.api.nfo.get_nfo_service", return_value=nfo_service
        ):
            return await batch_create_nfo(
                request=request,
                _auth={"username": "test"},
                series_app=series_app,
                nfo_service=nfo_service,
            )

    @pytest.mark.asyncio
    async def test_empty_series_list(
        self,
        mock_series_app,
        mock_nfo_service,
        mock_settings
    ):
        """An empty request yields an empty result with all counts at zero."""
        request = NFOBatchCreateRequest(
            serie_ids=[],
            skip_existing=False
        )

        result = await self._run_batch(
            request, mock_series_app, mock_nfo_service
        )

        assert result.total == 0
        assert result.successful == 0
        assert result.failed == 0
        assert len(result.results) == 0

    @pytest.mark.asyncio
    async def test_single_series(
        self,
        mock_series_app,
        mock_nfo_service,
        mock_settings
    ):
        """A one-element request produces a single successful entry."""
        request = NFOBatchCreateRequest(
            serie_ids=["serie0"],
            skip_existing=False
        )

        result = await self._run_batch(
            request, mock_series_app, mock_nfo_service
        )

        assert result.total == 1
        assert result.successful == 1
        assert len(result.results) == 1

    @pytest.mark.asyncio
    async def test_large_batch_operation(
        self,
        mock_nfo_service,
        mock_settings
    ):
        """A batch of twenty series completes with every entry successful."""

        def build_serie(index):
            # Fake series entry; unlike the shared fixture no ``year`` is set
            # and the folder stub always reports 2020.
            fake = Mock(spec=Serie)
            fake.key = f"serie{index}"
            fake.folder = f"Serie {index}"
            fake.name = f"Serie {index}"
            fake.ensure_folder_with_year = Mock(
                return_value=f"Serie {index} (2020)"
            )
            return fake

        large_app = Mock()
        large_app.list = Mock()
        large_app.list.GetList = Mock(
            return_value=[build_serie(index) for index in range(20)]
        )

        request = NFOBatchCreateRequest(
            serie_ids=[f"serie{i}" for i in range(20)],
            max_concurrent=5,
            skip_existing=False
        )

        result = await self._run_batch(request, large_app, mock_nfo_service)

        assert result.total == 20
        assert result.successful == 20

    @pytest.mark.asyncio
    async def test_duplicate_serie_ids(
        self,
        mock_series_app,
        mock_nfo_service,
        mock_settings
    ):
        """Duplicate serie ids are each processed as separate entries."""
        request = NFOBatchCreateRequest(
            serie_ids=["serie0", "serie0", "serie1", "serie1"],
            skip_existing=False
        )

        result = await self._run_batch(
            request, mock_series_app, mock_nfo_service
        )

        # Duplicates are not collapsed: four ids in, four results out.
        assert result.total == 4
        assert result.successful == 4
|