- Add sanitize_folder_name utility for filesystem-safe folder names - Add sanitized_folder property to Serie entity - Update SerieList.add() to use sanitized display names for folders - Add scan_single_series() method for targeted episode scanning - Enhance add_series endpoint: DB save -> folder create -> targeted scan - Update response to include missing_episodes and total_missing - Add comprehensive unit tests for new functionality - Update API tests with proper mock support
419 lines
13 KiB
Python
419 lines
13 KiB
Python
"""Tests for anime API endpoints."""
|
|
import asyncio
|
|
|
|
import pytest
|
|
from httpx import ASGITransport, AsyncClient
|
|
|
|
from src.server.api import anime as anime_module
|
|
from src.server.fastapi_app import app
|
|
from src.server.services.auth_service import auth_service
|
|
|
|
|
|
class FakeSerie:
|
|
"""Mock Serie object for testing.
|
|
|
|
Note on identifiers:
|
|
- key: Provider-assigned URL-safe identifier (e.g., 'attack-on-titan')
|
|
- folder: Filesystem folder name for metadata only (e.g., 'Attack on Titan (2013)')
|
|
|
|
The 'key' is the primary identifier used for all lookups and operations.
|
|
The 'folder' is metadata only, not used for identification.
|
|
"""
|
|
|
|
def __init__(self, key, name, folder, episodeDict=None):
|
|
"""Initialize fake serie.
|
|
|
|
Args:
|
|
key: Provider-assigned URL-safe key (primary identifier)
|
|
name: Display name for the series
|
|
folder: Filesystem folder name (metadata only)
|
|
episodeDict: Dictionary of missing episodes
|
|
"""
|
|
self.key = key
|
|
self.name = name
|
|
self.folder = folder
|
|
self.episodeDict = episodeDict or {}
|
|
self.site = "aniworld.to" # Add site attribute
|
|
|
|
|
|
class FakeSeriesApp:
|
|
"""Mock SeriesApp for testing."""
|
|
|
|
def __init__(self):
|
|
"""Initialize fake series app."""
|
|
self.list = self # Changed from self.List to self.list
|
|
self.scanner = FakeScanner() # Add fake scanner
|
|
self.directory = "/tmp/fake_anime"
|
|
self.keyDict = {} # Add keyDict for direct access
|
|
self._items = [
|
|
# Using realistic key values (URL-safe, lowercase, hyphenated)
|
|
FakeSerie("test-show-key", "Test Show", "Test Show (2023)", {1: [1, 2]}),
|
|
FakeSerie("complete-show-key", "Complete Show", "Complete Show (2022)", {}),
|
|
]
|
|
# Populate keyDict
|
|
for item in self._items:
|
|
self.keyDict[item.key] = item
|
|
|
|
def GetMissingEpisode(self):
|
|
"""Return series with missing episodes."""
|
|
return [s for s in self._items if s.episodeDict]
|
|
|
|
def GetList(self):
|
|
"""Return all series."""
|
|
return self._items
|
|
|
|
def ReScan(self, callback):
|
|
"""Trigger rescan with callback."""
|
|
callback()
|
|
|
|
def add(self, serie, use_sanitized_folder=True):
|
|
"""Add a serie to the list.
|
|
|
|
Args:
|
|
serie: The Serie instance to add
|
|
use_sanitized_folder: Whether to use sanitized folder name
|
|
|
|
Returns:
|
|
str: The folder path (fake path for testing)
|
|
"""
|
|
# Check if already exists
|
|
if not any(s.key == serie.key for s in self._items):
|
|
self._items.append(serie)
|
|
self.keyDict[serie.key] = serie
|
|
return f"/tmp/fake_anime/{serie.folder}"
|
|
|
|
async def search(self, query):
|
|
"""Search for series (async)."""
|
|
# Return mock search results
|
|
return [
|
|
{
|
|
"key": "test-result",
|
|
"name": "Test Search Result",
|
|
"site": "aniworld.to",
|
|
"folder": "test-result",
|
|
"link": "https://aniworld.to/anime/test",
|
|
"missing_episodes": {},
|
|
}
|
|
]
|
|
|
|
def refresh_series_list(self):
|
|
"""Refresh series list."""
|
|
pass
|
|
|
|
|
|
class FakeScanner:
|
|
"""Mock SerieScanner for testing."""
|
|
|
|
def scan_single_series(self, key, folder):
|
|
"""Mock scan that returns some fake missing episodes."""
|
|
return {1: [1, 2, 3], 2: [1, 2]}
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def reset_auth_state():
|
|
"""Reset auth service state before each test."""
|
|
if hasattr(auth_service, '_failed'):
|
|
auth_service._failed.clear()
|
|
yield
|
|
if hasattr(auth_service, '_failed'):
|
|
auth_service._failed.clear()
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def mock_series_app_dependency():
|
|
"""Override the series_app dependency with FakeSeriesApp."""
|
|
from src.server.utils.dependencies import get_series_app
|
|
|
|
fake_app = FakeSeriesApp()
|
|
app.dependency_overrides[get_series_app] = lambda: fake_app
|
|
|
|
yield fake_app
|
|
|
|
# Clean up
|
|
app.dependency_overrides.clear()
|
|
|
|
|
|
@pytest.fixture
|
|
async def authenticated_client():
|
|
"""Create authenticated async client."""
|
|
if not auth_service.is_configured():
|
|
auth_service.setup_master_password("TestPass123!")
|
|
|
|
transport = ASGITransport(app=app)
|
|
async with AsyncClient(transport=transport, base_url="http://test") as client:
|
|
# Login to get token
|
|
r = await client.post(
|
|
"/api/auth/login", json={"password": "TestPass123!"}
|
|
)
|
|
if r.status_code == 200:
|
|
token = r.json()["access_token"]
|
|
client.headers["Authorization"] = f"Bearer {token}"
|
|
yield client
|
|
|
|
|
|
def test_list_anime_direct_call():
|
|
"""Test list_anime function directly."""
|
|
fake = FakeSeriesApp()
|
|
result = asyncio.run(anime_module.list_anime(series_app=fake))
|
|
assert isinstance(result, list)
|
|
assert any(item.name == "Test Show" for item in result)
|
|
|
|
|
|
def test_get_anime_detail_direct_call():
|
|
"""Test get_anime function directly.
|
|
|
|
Uses the series key (test-show-key) for lookup, not the folder name.
|
|
"""
|
|
fake = FakeSeriesApp()
|
|
# Use the series key (primary identifier) for lookup
|
|
result = asyncio.run(
|
|
anime_module.get_anime("test-show-key", series_app=fake)
|
|
)
|
|
assert result.title == "Test Show"
|
|
assert "1-1" in result.episodes
|
|
|
|
|
|
def test_rescan_direct_call():
|
|
"""Test trigger_rescan function directly."""
|
|
from unittest.mock import AsyncMock
|
|
|
|
from src.server.services.anime_service import AnimeService
|
|
|
|
# Create a mock anime service
|
|
mock_anime_service = AsyncMock(spec=AnimeService)
|
|
mock_anime_service.rescan = AsyncMock()
|
|
|
|
result = asyncio.run(
|
|
anime_module.trigger_rescan(anime_service=mock_anime_service)
|
|
)
|
|
assert result["success"] is True
|
|
mock_anime_service.rescan.assert_called_once()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_list_anime_endpoint_unauthorized():
|
|
"""Test GET /api/anime without authentication.
|
|
|
|
Should return 401 since authentication is required.
|
|
"""
|
|
transport = ASGITransport(app=app)
|
|
async with AsyncClient(transport=transport, base_url="http://test") as client:
|
|
response = await client.get("/api/anime/")
|
|
# Should return 401 since this endpoint requires authentication
|
|
assert response.status_code == 401
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_rescan_endpoint_unauthorized():
|
|
"""Test POST /api/anime/rescan without authentication."""
|
|
transport = ASGITransport(app=app)
|
|
async with AsyncClient(transport=transport, base_url="http://test") as client:
|
|
response = await client.post("/api/anime/rescan")
|
|
# Should require auth
|
|
assert response.status_code == 401
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_search_anime_endpoint_unauthorized():
|
|
"""Test GET /api/anime/search without authentication.
|
|
|
|
This endpoint is intentionally public for read-only access.
|
|
"""
|
|
transport = ASGITransport(app=app)
|
|
async with AsyncClient(
|
|
transport=transport, base_url="http://test"
|
|
) as client:
|
|
response = await client.get(
|
|
"/api/anime/search", params={"query": "test"}
|
|
)
|
|
# Should return 200 since this is a public endpoint
|
|
assert response.status_code == 200
|
|
assert isinstance(response.json(), list)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_anime_detail_endpoint_unauthorized():
|
|
"""Test GET /api/v1/anime/{id} without authentication."""
|
|
transport = ASGITransport(app=app)
|
|
async with AsyncClient(transport=transport, base_url="http://test") as client:
|
|
response = await client.get("/api/v1/anime/1")
|
|
# Should work or require auth
|
|
assert response.status_code in (200, 401, 404, 503)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_add_series_endpoint_unauthorized():
|
|
"""Test POST /api/anime/add without authentication."""
|
|
transport = ASGITransport(app=app)
|
|
async with AsyncClient(transport=transport, base_url="http://test") as client:
|
|
response = await client.post(
|
|
"/api/anime/add",
|
|
json={"link": "test-link", "name": "Test Anime"}
|
|
)
|
|
# Should require auth
|
|
assert response.status_code == 401
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_add_series_endpoint_authenticated(authenticated_client):
|
|
"""Test POST /api/anime/add with authentication."""
|
|
response = await authenticated_client.post(
|
|
"/api/anime/add",
|
|
json={"link": "test-anime-link", "name": "Test New Anime"}
|
|
)
|
|
|
|
# The endpoint should succeed (returns 200 or may fail if series exists)
|
|
assert response.status_code in (200, 400)
|
|
data = response.json()
|
|
|
|
if response.status_code == 200:
|
|
assert data["status"] == "success"
|
|
assert "Test New Anime" in data["message"]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_add_series_endpoint_empty_name(authenticated_client):
|
|
"""Test POST /api/anime/add with empty name."""
|
|
response = await authenticated_client.post(
|
|
"/api/anime/add",
|
|
json={"link": "test-link", "name": ""}
|
|
)
|
|
|
|
# Should return 400 for empty name
|
|
assert response.status_code == 400
|
|
data = response.json()
|
|
assert "name" in data["detail"].lower()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_add_series_endpoint_empty_link(authenticated_client):
|
|
"""Test POST /api/anime/add with empty link."""
|
|
response = await authenticated_client.post(
|
|
"/api/anime/add",
|
|
json={"link": "", "name": "Test Anime"}
|
|
)
|
|
|
|
# Should return 400 for empty link
|
|
assert response.status_code == 400
|
|
data = response.json()
|
|
assert "link" in data["detail"].lower()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_add_series_extracts_key_from_full_url(authenticated_client):
|
|
"""Test that add_series extracts key from full URL."""
|
|
response = await authenticated_client.post(
|
|
"/api/anime/add",
|
|
json={
|
|
"link": "https://aniworld.to/anime/stream/attack-on-titan",
|
|
"name": "Attack on Titan"
|
|
}
|
|
)
|
|
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["key"] == "attack-on-titan"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_add_series_sanitizes_folder_name(authenticated_client):
|
|
"""Test that add_series creates sanitized folder name."""
|
|
response = await authenticated_client.post(
|
|
"/api/anime/add",
|
|
json={
|
|
"link": "https://aniworld.to/anime/stream/rezero",
|
|
"name": "Re:Zero - Starting Life in Another World?"
|
|
}
|
|
)
|
|
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
|
|
# Folder should not contain invalid characters
|
|
folder = data["folder"]
|
|
assert ":" not in folder
|
|
assert "?" not in folder
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_add_series_returns_missing_episodes(authenticated_client):
|
|
"""Test that add_series returns missing episodes info."""
|
|
response = await authenticated_client.post(
|
|
"/api/anime/add",
|
|
json={
|
|
"link": "https://aniworld.to/anime/stream/test-anime",
|
|
"name": "Test Anime"
|
|
}
|
|
)
|
|
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
|
|
# Response should contain missing episodes fields
|
|
assert "missing_episodes" in data
|
|
assert "total_missing" in data
|
|
assert isinstance(data["missing_episodes"], dict)
|
|
assert isinstance(data["total_missing"], int)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_add_series_response_structure(authenticated_client):
|
|
"""Test the full response structure of add_series."""
|
|
response = await authenticated_client.post(
|
|
"/api/anime/add",
|
|
json={
|
|
"link": "https://aniworld.to/anime/stream/new-anime",
|
|
"name": "New Anime Series"
|
|
}
|
|
)
|
|
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
|
|
# Verify all expected fields are present
|
|
assert "status" in data
|
|
assert "message" in data
|
|
assert "key" in data
|
|
assert "folder" in data
|
|
assert "missing_episodes" in data
|
|
assert "total_missing" in data
|
|
|
|
# Status should be success or exists
|
|
assert data["status"] in ("success", "exists")
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_add_series_special_characters_in_name(authenticated_client):
|
|
"""Test adding series with various special characters in name."""
|
|
test_cases = [
|
|
("86: Eighty-Six", "86-eighty-six"),
|
|
("Fate/Stay Night", "fate-stay-night"),
|
|
("What If...?", "what-if"),
|
|
("Steins;Gate", "steins-gate"),
|
|
]
|
|
|
|
for name, key in test_cases:
|
|
response = await authenticated_client.post(
|
|
"/api/anime/add",
|
|
json={
|
|
"link": f"https://aniworld.to/anime/stream/{key}",
|
|
"name": name
|
|
}
|
|
)
|
|
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
|
|
# Get just the folder name (last part of path)
|
|
folder_path = data["folder"]
|
|
# Handle both full paths and just folder names
|
|
if "/" in folder_path:
|
|
folder_name = folder_path.rstrip("/").split("/")[-1]
|
|
else:
|
|
folder_name = folder_path
|
|
|
|
# Folder name should not contain invalid filesystem characters
|
|
invalid_chars = [':', '\\', '?', '*', '<', '>', '|', '"']
|
|
for char in invalid_chars:
|
|
assert char not in folder_name, f"Found '{char}' in folder name for {name}"
|