Add anime and episode management tests

- Integration tests for anime search, details, and episode endpoints
- Unit tests for search algorithms, filtering, and pagination logic
- Tests cover authentication requirements, parameter validation
- Episode filtering by status, range, and missing episode detection
- Search performance optimization tests with indexing and caching
- Data integrity and consistency validation for API responses
This commit is contained in:
Lukas Pupka-Lipinski 2025-10-06 10:55:59 +02:00
parent 548eda6c94
commit 63f17b647d
2 changed files with 823 additions and 0 deletions

View File

@ -0,0 +1,401 @@
"""
Integration tests for anime and episode management API endpoints.
Tests anime search, anime details, episode retrieval with pagination,
valid/invalid IDs, and search filtering functionality.
"""
import pytest
import sys
import os
from fastapi.testclient import TestClient
from unittest.mock import patch
# Add source directory to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', '..'))
# Import after path setup
from src.server.fastapi_app import app # noqa: E402
@pytest.fixture
def client():
"""Test client for anime API tests."""
return TestClient(app)
@pytest.mark.integration
class TestAnimeSearchEndpoint:
"""Test anime search API endpoint."""
def test_anime_search_requires_auth(self, client):
"""Test anime search endpoint requires authentication."""
response = client.get("/api/anime/search?query=test")
assert response.status_code == 403 # Should require authentication
def test_anime_search_with_auth(self, client, mock_settings, valid_jwt_token):
"""Test anime search with valid authentication."""
with patch('src.server.fastapi_app.settings', mock_settings):
response = client.get(
"/api/anime/search?query=sample",
headers={"Authorization": f"Bearer {valid_jwt_token}"}
)
assert response.status_code == 200
data = response.json()
assert isinstance(data, list)
for anime in data:
assert "id" in anime
assert "title" in anime
assert "description" in anime
assert "episodes" in anime
assert "status" in anime
assert "sample" in anime["title"].lower()
def test_anime_search_pagination(self, client, mock_settings, valid_jwt_token):
"""Test anime search with pagination parameters."""
with patch('src.server.fastapi_app.settings', mock_settings):
# Test with limit and offset
response = client.get(
"/api/anime/search?query=anime&limit=5&offset=0",
headers={"Authorization": f"Bearer {valid_jwt_token}"}
)
assert response.status_code == 200
data = response.json()
assert isinstance(data, list)
assert len(data) <= 5 # Should respect limit
def test_anime_search_invalid_params(self, client, mock_settings, valid_jwt_token):
"""Test anime search with invalid parameters."""
with patch('src.server.fastapi_app.settings', mock_settings):
# Test missing query parameter
response = client.get(
"/api/anime/search",
headers={"Authorization": f"Bearer {valid_jwt_token}"}
)
assert response.status_code == 422 # Validation error
# Test invalid limit (too high)
response = client.get(
"/api/anime/search?query=test&limit=200",
headers={"Authorization": f"Bearer {valid_jwt_token}"}
)
assert response.status_code == 422
# Test negative offset
response = client.get(
"/api/anime/search?query=test&offset=-1",
headers={"Authorization": f"Bearer {valid_jwt_token}"}
)
assert response.status_code == 422
def test_anime_search_empty_query(self, client, mock_settings, valid_jwt_token):
"""Test anime search with empty query."""
with patch('src.server.fastapi_app.settings', mock_settings):
response = client.get(
"/api/anime/search?query=",
headers={"Authorization": f"Bearer {valid_jwt_token}"}
)
# Empty query should be rejected due to min_length validation
assert response.status_code == 422
def test_anime_search_no_results(self, client, mock_settings, valid_jwt_token):
"""Test anime search with query that returns no results."""
with patch('src.server.fastapi_app.settings', mock_settings):
response = client.get(
"/api/anime/search?query=nonexistent_anime_title_xyz",
headers={"Authorization": f"Bearer {valid_jwt_token}"}
)
assert response.status_code == 200
data = response.json()
assert isinstance(data, list)
assert len(data) == 0 # Should return empty list
@pytest.mark.integration
class TestAnimeDetailsEndpoint:
"""Test anime details API endpoint."""
def test_get_anime_requires_auth(self, client):
"""Test anime details endpoint requires authentication."""
response = client.get("/api/anime/test_anime_id")
assert response.status_code == 403
def test_get_anime_with_auth(self, client, mock_settings, valid_jwt_token):
"""Test anime details with valid authentication."""
anime_id = "test_anime_123"
with patch('src.server.fastapi_app.settings', mock_settings):
response = client.get(
f"/api/anime/{anime_id}",
headers={"Authorization": f"Bearer {valid_jwt_token}"}
)
assert response.status_code == 200
data = response.json()
assert data["id"] == anime_id
assert "title" in data
assert "description" in data
assert "episodes" in data
assert "status" in data
assert isinstance(data["episodes"], int)
def test_get_anime_invalid_id(self, client, mock_settings, valid_jwt_token):
"""Test anime details with various ID formats."""
with patch('src.server.fastapi_app.settings', mock_settings):
# Test with special characters in ID
response = client.get(
"/api/anime/anime@#$%",
headers={"Authorization": f"Bearer {valid_jwt_token}"}
)
# Should still return 200 since it's just an ID string
assert response.status_code == 200
def test_get_anime_empty_id(self, client, mock_settings, valid_jwt_token):
"""Test anime details with empty ID."""
with patch('src.server.fastapi_app.settings', mock_settings):
# Empty ID should result in 404 or 422
response = client.get(
"/api/anime/",
headers={"Authorization": f"Bearer {valid_jwt_token}"}
)
assert response.status_code in [404, 405] # Method not allowed or not found
@pytest.mark.integration
class TestEpisodeEndpoints:
"""Test episode-related API endpoints."""
def test_get_anime_episodes_requires_auth(self, client):
"""Test anime episodes endpoint requires authentication."""
response = client.get("/api/anime/test_anime/episodes")
assert response.status_code == 403
def test_get_anime_episodes_with_auth(self, client, mock_settings, valid_jwt_token):
"""Test anime episodes with valid authentication."""
anime_id = "test_anime_456"
with patch('src.server.fastapi_app.settings', mock_settings):
response = client.get(
f"/api/anime/{anime_id}/episodes",
headers={"Authorization": f"Bearer {valid_jwt_token}"}
)
assert response.status_code == 200
data = response.json()
assert isinstance(data, list)
for episode in data:
assert "id" in episode
assert "anime_id" in episode
assert "episode_number" in episode
assert "title" in episode
assert "description" in episode
assert "duration" in episode
assert episode["anime_id"] == anime_id
assert isinstance(episode["episode_number"], int)
assert episode["episode_number"] > 0
def test_get_episode_details_requires_auth(self, client):
"""Test episode details endpoint requires authentication."""
response = client.get("/api/episodes/test_episode_id")
assert response.status_code == 403
def test_get_episode_details_with_auth(self, client, mock_settings, valid_jwt_token):
"""Test episode details with valid authentication."""
episode_id = "test_episode_789"
with patch('src.server.fastapi_app.settings', mock_settings):
response = client.get(
f"/api/episodes/{episode_id}",
headers={"Authorization": f"Bearer {valid_jwt_token}"}
)
assert response.status_code == 200
data = response.json()
assert data["id"] == episode_id
assert "anime_id" in data
assert "episode_number" in data
assert "title" in data
assert "description" in data
assert "duration" in data
assert isinstance(data["episode_number"], int)
assert isinstance(data["duration"], int)
def test_episode_endpoints_with_invalid_auth(self, client):
"""Test episode endpoints with invalid authentication."""
invalid_token = "invalid.token.here"
endpoints = [
"/api/anime/test/episodes",
"/api/episodes/test_episode"
]
for endpoint in endpoints:
response = client.get(
endpoint,
headers={"Authorization": f"Bearer {invalid_token}"}
)
assert response.status_code == 401
@pytest.mark.integration
class TestAnimeAPIErrorHandling:
"""Test error handling in anime API endpoints."""
def test_anime_endpoints_malformed_auth(self, client):
"""Test anime endpoints with malformed authorization headers."""
malformed_headers = [
{"Authorization": "Bearer"}, # Missing token
{"Authorization": "Basic token"}, # Wrong type
{"Authorization": "token"}, # Missing Bearer
]
endpoints = [
"/api/anime/search?query=test",
"/api/anime/test_id",
"/api/anime/test_id/episodes",
"/api/episodes/test_id"
]
for headers in malformed_headers:
for endpoint in endpoints:
response = client.get(endpoint, headers=headers)
assert response.status_code in [401, 403]
def test_anime_search_parameter_validation(self, client, mock_settings, valid_jwt_token):
"""Test anime search parameter validation."""
with patch('src.server.fastapi_app.settings', mock_settings):
# Test various invalid parameter combinations
invalid_params = [
"query=test&limit=0", # limit too low
"query=test&limit=101", # limit too high
"query=test&offset=-5", # negative offset
"query=&limit=10", # empty query
]
for params in invalid_params:
response = client.get(
f"/api/anime/search?{params}",
headers={"Authorization": f"Bearer {valid_jwt_token}"}
)
assert response.status_code == 422
def test_anime_endpoints_content_type_handling(self, client, mock_settings, valid_jwt_token):
"""Test anime endpoints with different content types."""
with patch('src.server.fastapi_app.settings', mock_settings):
# Test with different Accept headers
accept_headers = [
"application/json",
"application/xml",
"text/plain",
"*/*"
]
for accept_header in accept_headers:
response = client.get(
"/api/anime/search?query=test",
headers={
"Authorization": f"Bearer {valid_jwt_token}",
"Accept": accept_header
}
)
# Should always return JSON regardless of Accept header
assert response.status_code == 200
assert response.headers.get("content-type", "").startswith("application/json")
@pytest.mark.integration
class TestAnimeAPIDataIntegrity:
"""Test data integrity and consistency in anime API responses."""
def test_anime_search_response_structure(self, client, mock_settings, valid_jwt_token):
"""Test anime search response has consistent structure."""
with patch('src.server.fastapi_app.settings', mock_settings):
response = client.get(
"/api/anime/search?query=anime",
headers={"Authorization": f"Bearer {valid_jwt_token}"}
)
assert response.status_code == 200
data = response.json()
required_fields = ["id", "title", "description", "episodes", "status"]
for anime in data:
for field in required_fields:
assert field in anime, f"Missing field {field} in anime response"
# Validate field types
assert isinstance(anime["id"], str)
assert isinstance(anime["title"], str)
assert isinstance(anime["episodes"], int)
assert isinstance(anime["status"], str)
assert anime["episodes"] >= 0
def test_episode_response_structure(self, client, mock_settings, valid_jwt_token):
"""Test episode response has consistent structure."""
with patch('src.server.fastapi_app.settings', mock_settings):
response = client.get(
"/api/anime/test_anime/episodes",
headers={"Authorization": f"Bearer {valid_jwt_token}"}
)
assert response.status_code == 200
data = response.json()
required_fields = ["id", "anime_id", "episode_number", "title", "description", "duration"]
for episode in data:
for field in required_fields:
assert field in episode, f"Missing field {field} in episode response"
# Validate field types and ranges
assert isinstance(episode["id"], str)
assert isinstance(episode["anime_id"], str)
assert isinstance(episode["episode_number"], int)
assert isinstance(episode["title"], str)
assert isinstance(episode["duration"], int)
assert episode["episode_number"] > 0
assert episode["duration"] > 0
def test_episode_numbering_consistency(self, client, mock_settings, valid_jwt_token):
"""Test episode numbering is consistent and sequential."""
with patch('src.server.fastapi_app.settings', mock_settings):
response = client.get(
"/api/anime/test_anime/episodes",
headers={"Authorization": f"Bearer {valid_jwt_token}"}
)
assert response.status_code == 200
episodes = response.json()
if len(episodes) > 1:
# Check that episode numbers are sequential
episode_numbers = [ep["episode_number"] for ep in episodes]
episode_numbers.sort()
for i in range(len(episode_numbers) - 1):
assert episode_numbers[i + 1] == episode_numbers[i] + 1, \
"Episode numbers should be sequential"

View File

@ -0,0 +1,422 @@
"""
Unit tests for anime search and filtering logic.
Tests search algorithms, filtering functions, sorting mechanisms,
and data processing for anime and episode management.
"""
import pytest
import sys
import os
# Add source directory to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', '..'))
@pytest.mark.unit
class TestAnimeSearchLogic:
"""Test anime search and filtering functionality."""
def test_basic_text_search(self):
"""Test basic text search functionality."""
def search_anime_by_title(anime_list, query):
"""Simple title search function."""
if not query:
return []
query_lower = query.lower()
return [
anime for anime in anime_list
if query_lower in anime.get("title", "").lower()
]
# Test data
anime_list = [
{"id": "1", "title": "Attack on Titan", "genre": "Action"},
{"id": "2", "title": "My Hero Academia", "genre": "Action"},
{"id": "3", "title": "Demon Slayer", "genre": "Action"},
{"id": "4", "title": "One Piece", "genre": "Adventure"}
]
# Test exact match
results = search_anime_by_title(anime_list, "Attack on Titan")
assert len(results) == 1
assert results[0]["title"] == "Attack on Titan"
# Test partial match
results = search_anime_by_title(anime_list, "attack")
assert len(results) == 1
# Test case insensitive
results = search_anime_by_title(anime_list, "ATTACK")
assert len(results) == 1
# Test multiple matches
results = search_anime_by_title(anime_list, "a")
assert len(results) >= 2 # Should match "Attack" and "Academia"
# Test no matches
results = search_anime_by_title(anime_list, "Nonexistent")
assert len(results) == 0
# Test empty query
results = search_anime_by_title(anime_list, "")
assert len(results) == 0
def test_advanced_search_with_filters(self):
"""Test advanced search with multiple filters."""
def advanced_anime_search(anime_list, query="", genre=None, year=None, status=None):
"""Advanced search with multiple filters."""
results = anime_list.copy()
# Text search
if query:
query_lower = query.lower()
results = [
anime for anime in results
if (query_lower in anime.get("title", "").lower() or
query_lower in anime.get("description", "").lower())
]
# Genre filter
if genre:
results = [
anime for anime in results
if anime.get("genre", "").lower() == genre.lower()
]
# Year filter
if year:
results = [
anime for anime in results
if anime.get("year") == year
]
# Status filter
if status:
results = [
anime for anime in results
if anime.get("status", "").lower() == status.lower()
]
return results
# Test data
anime_list = [
{
"id": "1",
"title": "Attack on Titan",
"description": "Humanity fights giants",
"genre": "Action",
"year": 2013,
"status": "Completed"
},
{
"id": "2",
"title": "My Hero Academia",
"description": "Superheroes in training",
"genre": "Action",
"year": 2016,
"status": "Ongoing"
},
{
"id": "3",
"title": "Your Name",
"description": "Body swapping romance",
"genre": "Romance",
"year": 2016,
"status": "Completed"
}
]
# Test genre filter
results = advanced_anime_search(anime_list, genre="Action")
assert len(results) == 2
# Test year filter
results = advanced_anime_search(anime_list, year=2016)
assert len(results) == 2
# Test status filter
results = advanced_anime_search(anime_list, status="Completed")
assert len(results) == 2
# Test combined filters
results = advanced_anime_search(anime_list, genre="Action", status="Ongoing")
assert len(results) == 1
assert results[0]["title"] == "My Hero Academia"
# Test text search in description
results = advanced_anime_search(anime_list, query="giants")
assert len(results) == 1
assert results[0]["title"] == "Attack on Titan"
def test_search_pagination(self):
"""Test search result pagination."""
def paginate_results(results, limit=20, offset=0):
"""Paginate search results."""
if limit <= 0:
return []
start = max(0, offset)
end = start + limit
return results[start:end]
# Test data
results = [{"id": str(i), "title": f"Anime {i}"} for i in range(100)]
# Test normal pagination
page_1 = paginate_results(results, limit=10, offset=0)
assert len(page_1) == 10
assert page_1[0]["id"] == "0"
page_2 = paginate_results(results, limit=10, offset=10)
assert len(page_2) == 10
assert page_2[0]["id"] == "10"
# Test edge cases
last_page = paginate_results(results, limit=10, offset=95)
assert len(last_page) == 5 # Only 5 items left
beyond_results = paginate_results(results, limit=10, offset=200)
assert len(beyond_results) == 0
# Test invalid parameters
invalid_limit = paginate_results(results, limit=0, offset=0)
assert len(invalid_limit) == 0
negative_offset = paginate_results(results, limit=10, offset=-5)
assert len(negative_offset) == 10 # Should start from 0
def test_search_sorting(self):
"""Test search result sorting."""
def sort_anime_results(anime_list, sort_by="title", sort_order="asc"):
"""Sort anime results by different criteria."""
if not anime_list:
return []
reverse = sort_order.lower() == "desc"
if sort_by == "title":
return sorted(anime_list, key=lambda x: x.get("title", "").lower(), reverse=reverse)
elif sort_by == "year":
return sorted(anime_list, key=lambda x: x.get("year", 0), reverse=reverse)
elif sort_by == "episodes":
return sorted(anime_list, key=lambda x: x.get("episodes", 0), reverse=reverse)
elif sort_by == "rating":
return sorted(anime_list, key=lambda x: x.get("rating", 0), reverse=reverse)
else:
return anime_list
# Test data
anime_list = [
{"title": "Zorro", "year": 2020, "episodes": 12, "rating": 8.5},
{"title": "Alpha", "year": 2018, "episodes": 24, "rating": 9.0},
{"title": "Beta", "year": 2022, "episodes": 6, "rating": 7.5}
]
# Test title sorting ascending
sorted_results = sort_anime_results(anime_list, "title", "asc")
titles = [anime["title"] for anime in sorted_results]
assert titles == ["Alpha", "Beta", "Zorro"]
# Test title sorting descending
sorted_results = sort_anime_results(anime_list, "title", "desc")
titles = [anime["title"] for anime in sorted_results]
assert titles == ["Zorro", "Beta", "Alpha"]
# Test year sorting
sorted_results = sort_anime_results(anime_list, "year", "asc")
years = [anime["year"] for anime in sorted_results]
assert years == [2018, 2020, 2022]
# Test episodes sorting
sorted_results = sort_anime_results(anime_list, "episodes", "desc")
episodes = [anime["episodes"] for anime in sorted_results]
assert episodes == [24, 12, 6]
# Test rating sorting
sorted_results = sort_anime_results(anime_list, "rating", "desc")
ratings = [anime["rating"] for anime in sorted_results]
assert ratings == [9.0, 8.5, 7.5]
@pytest.mark.unit
class TestEpisodeFilteringLogic:
"""Test episode filtering and management logic."""
def test_episode_filtering_by_status(self):
"""Test filtering episodes by watch status."""
def filter_episodes_by_status(episodes, status):
"""Filter episodes by watch status."""
if not status:
return episodes
return [ep for ep in episodes if ep.get("watch_status", "").lower() == status.lower()]
episodes = [
{"id": "1", "title": "Episode 1", "watch_status": "watched"},
{"id": "2", "title": "Episode 2", "watch_status": "unwatched"},
{"id": "3", "title": "Episode 3", "watch_status": "watching"},
{"id": "4", "title": "Episode 4", "watch_status": "watched"}
]
watched = filter_episodes_by_status(episodes, "watched")
assert len(watched) == 2
unwatched = filter_episodes_by_status(episodes, "unwatched")
assert len(unwatched) == 1
watching = filter_episodes_by_status(episodes, "watching")
assert len(watching) == 1
def test_episode_range_filtering(self):
"""Test filtering episodes by number range."""
def filter_episodes_by_range(episodes, start_ep=None, end_ep=None):
"""Filter episodes by episode number range."""
results = episodes.copy()
if start_ep is not None:
results = [ep for ep in results if ep.get("episode_number", 0) >= start_ep]
if end_ep is not None:
results = [ep for ep in results if ep.get("episode_number", 0) <= end_ep]
return results
episodes = [
{"id": "1", "episode_number": 1, "title": "Episode 1"},
{"id": "2", "episode_number": 5, "title": "Episode 5"},
{"id": "3", "episode_number": 10, "title": "Episode 10"},
{"id": "4", "episode_number": 15, "title": "Episode 15"},
{"id": "5", "episode_number": 20, "title": "Episode 20"}
]
# Test start range
results = filter_episodes_by_range(episodes, start_ep=10)
assert len(results) == 3
assert all(ep["episode_number"] >= 10 for ep in results)
# Test end range
results = filter_episodes_by_range(episodes, end_ep=10)
assert len(results) == 3
assert all(ep["episode_number"] <= 10 for ep in results)
# Test both start and end
results = filter_episodes_by_range(episodes, start_ep=5, end_ep=15)
assert len(results) == 3
assert all(5 <= ep["episode_number"] <= 15 for ep in results)
def test_missing_episodes_detection(self):
"""Test detection of missing episodes in a series."""
def find_missing_episodes(episodes, expected_total):
"""Find missing episode numbers in a series."""
episode_numbers = {ep.get("episode_number") for ep in episodes if ep.get("episode_number")}
expected_numbers = set(range(1, expected_total + 1))
missing = expected_numbers - episode_numbers
return sorted(list(missing))
# Test with some missing episodes
episodes = [
{"episode_number": 1}, {"episode_number": 3},
{"episode_number": 5}, {"episode_number": 7}
]
missing = find_missing_episodes(episodes, 10)
assert missing == [2, 4, 6, 8, 9, 10]
# Test with no missing episodes
complete_episodes = [{"episode_number": i} for i in range(1, 6)]
missing = find_missing_episodes(complete_episodes, 5)
assert missing == []
# Test with all episodes missing
missing = find_missing_episodes([], 3)
assert missing == [1, 2, 3]
@pytest.mark.unit
class TestSearchPerformance:
"""Test search performance and optimization."""
def test_search_index_creation(self):
"""Test search index creation for performance."""
def create_search_index(anime_list):
"""Create a search index for faster lookups."""
index = {
"by_title": {},
"by_genre": {},
"by_year": {}
}
for anime in anime_list:
title = anime.get("title", "").lower()
genre = anime.get("genre", "").lower()
year = anime.get("year")
# Index by title keywords
for word in title.split():
if word not in index["by_title"]:
index["by_title"][word] = []
index["by_title"][word].append(anime)
# Index by genre
if genre:
if genre not in index["by_genre"]:
index["by_genre"][genre] = []
index["by_genre"][genre].append(anime)
# Index by year
if year:
if year not in index["by_year"]:
index["by_year"][year] = []
index["by_year"][year].append(anime)
return index
anime_list = [
{"title": "Attack on Titan", "genre": "Action", "year": 2013},
{"title": "My Hero Academia", "genre": "Action", "year": 2016},
{"title": "Your Name", "genre": "Romance", "year": 2016}
]
index = create_search_index(anime_list)
# Test title index
assert "attack" in index["by_title"]
assert len(index["by_title"]["attack"]) == 1
# Test genre index
assert "action" in index["by_genre"]
assert len(index["by_genre"]["action"]) == 2
# Test year index
assert 2016 in index["by_year"]
assert len(index["by_year"][2016]) == 2
def test_search_result_caching(self):
"""Test search result caching mechanism."""
def cached_search(query, cache={}):
"""Simple search with caching."""
if query in cache:
return cache[query], True # Return cached result and cache hit flag
# Simulate expensive search operation
result = [{"id": "1", "title": f"Result for {query}"}]
cache[query] = result
return result, False # Return new result and cache miss flag
# Test cache miss
result, cache_hit = cached_search("test_query")
assert not cache_hit
assert len(result) == 1
# Test cache hit
result, cache_hit = cached_search("test_query")
assert cache_hit
assert len(result) == 1
# Test different query
result, cache_hit = cached_search("another_query")
assert not cache_hit