diff --git a/src/tests/integration/test_anime_endpoints.py b/src/tests/integration/test_anime_endpoints.py new file mode 100644 index 0000000..3681cc2 --- /dev/null +++ b/src/tests/integration/test_anime_endpoints.py @@ -0,0 +1,401 @@ +""" +Integration tests for anime and episode management API endpoints. + +Tests anime search, anime details, episode retrieval with pagination, +valid/invalid IDs, and search filtering functionality. +""" + +import pytest +import sys +import os +from fastapi.testclient import TestClient +from unittest.mock import patch + +# Add source directory to path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', '..')) + +# Import after path setup +from src.server.fastapi_app import app # noqa: E402 + + +@pytest.fixture +def client(): + """Test client for anime API tests.""" + return TestClient(app) + + +@pytest.mark.integration +class TestAnimeSearchEndpoint: + """Test anime search API endpoint.""" + + def test_anime_search_requires_auth(self, client): + """Test anime search endpoint requires authentication.""" + response = client.get("/api/anime/search?query=test") + + assert response.status_code == 403 # Should require authentication + + def test_anime_search_with_auth(self, client, mock_settings, valid_jwt_token): + """Test anime search with valid authentication.""" + with patch('src.server.fastapi_app.settings', mock_settings): + response = client.get( + "/api/anime/search?query=sample", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + assert response.status_code == 200 + data = response.json() + + assert isinstance(data, list) + for anime in data: + assert "id" in anime + assert "title" in anime + assert "description" in anime + assert "episodes" in anime + assert "status" in anime + assert "sample" in anime["title"].lower() + + def test_anime_search_pagination(self, client, mock_settings, valid_jwt_token): + """Test anime search with pagination parameters.""" + with patch('src.server.fastapi_app.settings', mock_settings): + # Test with limit and offset + response = client.get( + "/api/anime/search?query=anime&limit=5&offset=0", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + assert response.status_code == 200 + data = response.json() + + assert isinstance(data, list) + assert len(data) <= 5 # Should respect limit + + def test_anime_search_invalid_params(self, client, mock_settings, valid_jwt_token): + """Test anime search with invalid parameters.""" + with patch('src.server.fastapi_app.settings', mock_settings): + # Test missing query parameter + response = client.get( + "/api/anime/search", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + assert response.status_code == 422 # Validation error + + # Test invalid limit (too high) + response = client.get( + "/api/anime/search?query=test&limit=200", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + assert response.status_code == 422 + + # Test negative offset + response = client.get( + "/api/anime/search?query=test&offset=-1", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + assert response.status_code == 422 + + def test_anime_search_empty_query(self, client, mock_settings, valid_jwt_token): + """Test anime search with empty query.""" + with patch('src.server.fastapi_app.settings', mock_settings): + response = client.get( + "/api/anime/search?query=", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + # Empty query should be rejected due to min_length validation + assert response.status_code == 422 + + def test_anime_search_no_results(self, client, mock_settings, valid_jwt_token): + """Test anime search with query that returns no results.""" + with patch('src.server.fastapi_app.settings', mock_settings): + response = client.get( + "/api/anime/search?query=nonexistent_anime_title_xyz", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + assert response.status_code == 200 + data = response.json() + + assert isinstance(data, list) + assert len(data) == 0 # Should return empty list + + +@pytest.mark.integration +class TestAnimeDetailsEndpoint: + """Test anime details API endpoint.""" + + def test_get_anime_requires_auth(self, client): + """Test anime details endpoint requires authentication.""" + response = client.get("/api/anime/test_anime_id") + + assert response.status_code == 403 + + def test_get_anime_with_auth(self, client, mock_settings, valid_jwt_token): + """Test anime details with valid authentication.""" + anime_id = "test_anime_123" + + with patch('src.server.fastapi_app.settings', mock_settings): + response = client.get( + f"/api/anime/{anime_id}", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + assert response.status_code == 200 + data = response.json() + + assert data["id"] == anime_id + assert "title" in data + assert "description" in data + assert "episodes" in data + assert "status" in data + assert isinstance(data["episodes"], int) + + def test_get_anime_invalid_id(self, client, mock_settings, valid_jwt_token): + """Test anime details with various ID formats.""" + with patch('src.server.fastapi_app.settings', mock_settings): + # Test with special characters in ID + response = client.get( + "/api/anime/anime@#$%", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + # Should still return 200 since it's just an ID string + assert response.status_code == 200 + + def test_get_anime_empty_id(self, client, mock_settings, valid_jwt_token): + """Test anime details with empty ID.""" + with patch('src.server.fastapi_app.settings', mock_settings): + # Empty ID should result in 404 or 422 + response = client.get( + "/api/anime/", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + assert response.status_code in [404, 405] # Method not allowed or not found + + +@pytest.mark.integration +class TestEpisodeEndpoints: + """Test episode-related API endpoints.""" + + def test_get_anime_episodes_requires_auth(self, client): + """Test anime episodes endpoint requires authentication.""" + response = client.get("/api/anime/test_anime/episodes") + + assert response.status_code == 403 + + def test_get_anime_episodes_with_auth(self, client, mock_settings, valid_jwt_token): + """Test anime episodes with valid authentication.""" + anime_id = "test_anime_456" + + with patch('src.server.fastapi_app.settings', mock_settings): + response = client.get( + f"/api/anime/{anime_id}/episodes", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + assert response.status_code == 200 + data = response.json() + + assert isinstance(data, list) + + for episode in data: + assert "id" in episode + assert "anime_id" in episode + assert "episode_number" in episode + assert "title" in episode + assert "description" in episode + assert "duration" in episode + assert episode["anime_id"] == anime_id + assert isinstance(episode["episode_number"], int) + assert episode["episode_number"] > 0 + + def test_get_episode_details_requires_auth(self, client): + """Test episode details endpoint requires authentication.""" + response = client.get("/api/episodes/test_episode_id") + + assert response.status_code == 403 + + def test_get_episode_details_with_auth(self, client, mock_settings, valid_jwt_token): + """Test episode details with valid authentication.""" + episode_id = "test_episode_789" + + with patch('src.server.fastapi_app.settings', mock_settings): + response = client.get( + f"/api/episodes/{episode_id}", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + assert response.status_code == 200 + data = response.json() + + assert data["id"] == episode_id + assert "anime_id" in data + assert "episode_number" in data + assert "title" in data + assert "description" in data + assert "duration" in data + assert isinstance(data["episode_number"], int) + assert isinstance(data["duration"], int) + + def test_episode_endpoints_with_invalid_auth(self, client): + """Test episode endpoints with invalid authentication.""" + invalid_token = "invalid.token.here" + + endpoints = [ + "/api/anime/test/episodes", + "/api/episodes/test_episode" + ] + + for endpoint in endpoints: + response = client.get( + endpoint, + headers={"Authorization": f"Bearer {invalid_token}"} + ) + + assert response.status_code == 401 + + +@pytest.mark.integration +class TestAnimeAPIErrorHandling: + """Test error handling in anime API endpoints.""" + + def test_anime_endpoints_malformed_auth(self, client): + """Test anime endpoints with malformed authorization headers.""" + malformed_headers = [ + {"Authorization": "Bearer"}, # Missing token + {"Authorization": "Basic token"}, # Wrong type + {"Authorization": "token"}, # Missing Bearer + ] + + endpoints = [ + "/api/anime/search?query=test", + "/api/anime/test_id", + "/api/anime/test_id/episodes", + "/api/episodes/test_id" + ] + + for headers in malformed_headers: + for endpoint in endpoints: + response = client.get(endpoint, headers=headers) + assert response.status_code in [401, 403] + + def test_anime_search_parameter_validation(self, client, mock_settings, valid_jwt_token): + """Test anime search parameter validation.""" + with patch('src.server.fastapi_app.settings', mock_settings): + # Test various invalid parameter combinations + invalid_params = [ + "query=test&limit=0", # limit too low + "query=test&limit=101", # limit too high + "query=test&offset=-5", # negative offset + "query=&limit=10", # empty query + ] + + for params in invalid_params: + response = client.get( + f"/api/anime/search?{params}", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + assert response.status_code == 422 + + def test_anime_endpoints_content_type_handling(self, client, mock_settings, valid_jwt_token): + """Test anime endpoints with different content types.""" + with patch('src.server.fastapi_app.settings', mock_settings): + # Test with different Accept headers + accept_headers = [ + "application/json", + "application/xml", + "text/plain", + "*/*" + ] + + for accept_header in accept_headers: + response = client.get( + "/api/anime/search?query=test", + headers={ + "Authorization": f"Bearer {valid_jwt_token}", + "Accept": accept_header + } + ) + + # Should always return JSON regardless of Accept header + assert response.status_code == 200 + assert response.headers.get("content-type", "").startswith("application/json") + + +@pytest.mark.integration +class TestAnimeAPIDataIntegrity: + """Test data integrity and consistency in anime API responses.""" + + def test_anime_search_response_structure(self, client, mock_settings, valid_jwt_token): + """Test anime search response has consistent structure.""" + with patch('src.server.fastapi_app.settings', mock_settings): + response = client.get( + "/api/anime/search?query=anime", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + assert response.status_code == 200 + data = response.json() + + required_fields = ["id", "title", "description", "episodes", "status"] + + for anime in data: + for field in required_fields: + assert field in anime, f"Missing field {field} in anime response" + + # Validate field types + assert isinstance(anime["id"], str) + assert isinstance(anime["title"], str) + assert isinstance(anime["episodes"], int) + assert isinstance(anime["status"], str) + assert anime["episodes"] >= 0 + + def test_episode_response_structure(self, client, mock_settings, valid_jwt_token): + """Test episode response has consistent structure.""" + with patch('src.server.fastapi_app.settings', mock_settings): + response = client.get( + "/api/anime/test_anime/episodes", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + assert response.status_code == 200 + data = response.json() + + required_fields = ["id", "anime_id", "episode_number", "title", "description", "duration"] + + for episode in data: + for field in required_fields: + assert field in episode, f"Missing field {field} in episode response" + + # Validate field types and ranges + assert isinstance(episode["id"], str) + assert isinstance(episode["anime_id"], str) + assert isinstance(episode["episode_number"], int) + assert isinstance(episode["title"], str) + assert isinstance(episode["duration"], int) + assert episode["episode_number"] > 0 + assert episode["duration"] > 0 + + def test_episode_numbering_consistency(self, client, mock_settings, valid_jwt_token): + """Test episode numbering is consistent and sequential.""" + with patch('src.server.fastapi_app.settings', mock_settings): + response = client.get( + "/api/anime/test_anime/episodes", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + assert response.status_code == 200 + episodes = response.json() + + if len(episodes) > 1: + # Check that episode numbers are sequential + episode_numbers = [ep["episode_number"] for ep in episodes] + episode_numbers.sort() + + for i in range(len(episode_numbers) - 1): + assert episode_numbers[i + 1] == episode_numbers[i] + 1, \ + "Episode numbers should be sequential" \ No newline at end of file diff --git a/src/tests/unit/test_anime_search.py b/src/tests/unit/test_anime_search.py new file mode 100644 index 0000000..dfb9841 --- /dev/null +++ b/src/tests/unit/test_anime_search.py @@ -0,0 +1,422 @@ +""" +Unit tests for anime search and filtering logic. + +Tests search algorithms, filtering functions, sorting mechanisms, +and data processing for anime and episode management. +""" + +import pytest +import sys +import os + +# Add source directory to path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', '..')) + + +@pytest.mark.unit +class TestAnimeSearchLogic: + """Test anime search and filtering functionality.""" + + def test_basic_text_search(self): + """Test basic text search functionality.""" + def search_anime_by_title(anime_list, query): + """Simple title search function.""" + if not query: + return [] + + query_lower = query.lower() + return [ + anime for anime in anime_list + if query_lower in anime.get("title", "").lower() + ] + + # Test data + anime_list = [ + {"id": "1", "title": "Attack on Titan", "genre": "Action"}, + {"id": "2", "title": "My Hero Academia", "genre": "Action"}, + {"id": "3", "title": "Demon Slayer", "genre": "Action"}, + {"id": "4", "title": "One Piece", "genre": "Adventure"} + ] + + # Test exact match + results = search_anime_by_title(anime_list, "Attack on Titan") + assert len(results) == 1 + assert results[0]["title"] == "Attack on Titan" + + # Test partial match + results = search_anime_by_title(anime_list, "attack") + assert len(results) == 1 + + # Test case insensitive + results = search_anime_by_title(anime_list, "ATTACK") + assert len(results) == 1 + + # Test multiple matches + results = search_anime_by_title(anime_list, "a") + assert len(results) >= 2 # Should match "Attack" and "Academia" + + # Test no matches + results = search_anime_by_title(anime_list, "Nonexistent") + assert len(results) == 0 + + # Test empty query + results = search_anime_by_title(anime_list, "") + assert len(results) == 0 + + def test_advanced_search_with_filters(self): + """Test advanced search with multiple filters.""" + def advanced_anime_search(anime_list, query="", genre=None, year=None, status=None): + """Advanced search with multiple filters.""" + results = anime_list.copy() + + # Text search + if query: + query_lower = query.lower() + results = [ + anime for anime in results + if (query_lower in anime.get("title", "").lower() or + query_lower in anime.get("description", "").lower()) + ] + + # Genre filter + if genre: + results = [ + anime for anime in results + if anime.get("genre", "").lower() == genre.lower() + ] + + # Year filter + if year: + results = [ + anime for anime in results + if anime.get("year") == year + ] + + # Status filter + if status: + results = [ + anime for anime in results + if anime.get("status", "").lower() == status.lower() + ] + + return results + + # Test data + anime_list = [ + { + "id": "1", + "title": "Attack on Titan", + "description": "Humanity fights giants", + "genre": "Action", + "year": 2013, + "status": "Completed" + }, + { + "id": "2", + "title": "My Hero Academia", + "description": "Superheroes in training", + "genre": "Action", + "year": 2016, + "status": "Ongoing" + }, + { + "id": "3", + "title": "Your Name", + "description": "Body swapping romance", + "genre": "Romance", + "year": 2016, + "status": "Completed" + } + ] + + # Test genre filter + results = advanced_anime_search(anime_list, genre="Action") + assert len(results) == 2 + + # Test year filter + results = advanced_anime_search(anime_list, year=2016) + assert len(results) == 2 + + # Test status filter + results = advanced_anime_search(anime_list, status="Completed") + assert len(results) == 2 + + # Test combined filters + results = advanced_anime_search(anime_list, genre="Action", status="Ongoing") + assert len(results) == 1 + assert results[0]["title"] == "My Hero Academia" + + # Test text search in description + results = advanced_anime_search(anime_list, query="giants") + assert len(results) == 1 + assert results[0]["title"] == "Attack on Titan" + + def test_search_pagination(self): + """Test search result pagination.""" + def paginate_results(results, limit=20, offset=0): + """Paginate search results.""" + if limit <= 0: + return [] + + start = max(0, offset) + end = start + limit + + return results[start:end] + + # Test data + results = [{"id": str(i), "title": f"Anime {i}"} for i in range(100)] + + # Test normal pagination + page_1 = paginate_results(results, limit=10, offset=0) + assert len(page_1) == 10 + assert page_1[0]["id"] == "0" + + page_2 = paginate_results(results, limit=10, offset=10) + assert len(page_2) == 10 + assert page_2[0]["id"] == "10" + + # Test edge cases + last_page = paginate_results(results, limit=10, offset=95) + assert len(last_page) == 5 # Only 5 items left + + beyond_results = paginate_results(results, limit=10, offset=200) + assert len(beyond_results) == 0 + + # Test invalid parameters + invalid_limit = paginate_results(results, limit=0, offset=0) + assert len(invalid_limit) == 0 + + negative_offset = paginate_results(results, limit=10, offset=-5) + assert len(negative_offset) == 10 # Should start from 0 + + def test_search_sorting(self): + """Test search result sorting.""" + def sort_anime_results(anime_list, sort_by="title", sort_order="asc"): + """Sort anime results by different criteria.""" + if not anime_list: + return [] + + reverse = sort_order.lower() == "desc" + + if sort_by == "title": + return sorted(anime_list, key=lambda x: x.get("title", "").lower(), reverse=reverse) + elif sort_by == "year": + return sorted(anime_list, key=lambda x: x.get("year", 0), reverse=reverse) + elif sort_by == "episodes": + return sorted(anime_list, key=lambda x: x.get("episodes", 0), reverse=reverse) + elif sort_by == "rating": + return sorted(anime_list, key=lambda x: x.get("rating", 0), reverse=reverse) + else: + return anime_list + + # Test data + anime_list = [ + {"title": "Zorro", "year": 2020, "episodes": 12, "rating": 8.5}, + {"title": "Alpha", "year": 2018, "episodes": 24, "rating": 9.0}, + {"title": "Beta", "year": 2022, "episodes": 6, "rating": 7.5} + ] + + # Test title sorting ascending + sorted_results = sort_anime_results(anime_list, "title", "asc") + titles = [anime["title"] for anime in sorted_results] + assert titles == ["Alpha", "Beta", "Zorro"] + + # Test title sorting descending + sorted_results = sort_anime_results(anime_list, "title", "desc") + titles = [anime["title"] for anime in sorted_results] + assert titles == ["Zorro", "Beta", "Alpha"] + + # Test year sorting + sorted_results = sort_anime_results(anime_list, "year", "asc") + years = [anime["year"] for anime in sorted_results] + assert years == [2018, 2020, 2022] + + # Test episodes sorting + sorted_results = sort_anime_results(anime_list, "episodes", "desc") + episodes = [anime["episodes"] for anime in sorted_results] + assert episodes == [24, 12, 6] + + # Test rating sorting + sorted_results = sort_anime_results(anime_list, "rating", "desc") + ratings = [anime["rating"] for anime in sorted_results] + assert ratings == [9.0, 8.5, 7.5] + + +@pytest.mark.unit +class TestEpisodeFilteringLogic: + """Test episode filtering and management logic.""" + + def test_episode_filtering_by_status(self): + """Test filtering episodes by watch status.""" + def filter_episodes_by_status(episodes, status): + """Filter episodes by watch status.""" + if not status: + return episodes + + return [ep for ep in episodes if ep.get("watch_status", "").lower() == status.lower()] + + episodes = [ + {"id": "1", "title": "Episode 1", "watch_status": "watched"}, + {"id": "2", "title": "Episode 2", "watch_status": "unwatched"}, + {"id": "3", "title": "Episode 3", "watch_status": "watching"}, + {"id": "4", "title": "Episode 4", "watch_status": "watched"} + ] + + watched = filter_episodes_by_status(episodes, "watched") + assert len(watched) == 2 + + unwatched = filter_episodes_by_status(episodes, "unwatched") + assert len(unwatched) == 1 + + watching = filter_episodes_by_status(episodes, "watching") + assert len(watching) == 1 + + def test_episode_range_filtering(self): + """Test filtering episodes by number range.""" + def filter_episodes_by_range(episodes, start_ep=None, end_ep=None): + """Filter episodes by episode number range.""" + results = episodes.copy() + + if start_ep is not None: + results = [ep for ep in results if ep.get("episode_number", 0) >= start_ep] + + if end_ep is not None: + results = [ep for ep in results if ep.get("episode_number", 0) <= end_ep] + + return results + + episodes = [ + {"id": "1", "episode_number": 1, "title": "Episode 1"}, + {"id": "2", "episode_number": 5, "title": "Episode 5"}, + {"id": "3", "episode_number": 10, "title": "Episode 10"}, + {"id": "4", "episode_number": 15, "title": "Episode 15"}, + {"id": "5", "episode_number": 20, "title": "Episode 20"} + ] + + # Test start range + results = filter_episodes_by_range(episodes, start_ep=10) + assert len(results) == 3 + assert all(ep["episode_number"] >= 10 for ep in results) + + # Test end range + results = filter_episodes_by_range(episodes, end_ep=10) + assert len(results) == 3 + assert all(ep["episode_number"] <= 10 for ep in results) + + # Test both start and end + results = filter_episodes_by_range(episodes, start_ep=5, end_ep=15) + assert len(results) == 3 + assert all(5 <= ep["episode_number"] <= 15 for ep in results) + + def test_missing_episodes_detection(self): + """Test detection of missing episodes in a series.""" + def find_missing_episodes(episodes, expected_total): + """Find missing episode numbers in a series.""" + episode_numbers = {ep.get("episode_number") for ep in episodes if ep.get("episode_number")} + expected_numbers = set(range(1, expected_total + 1)) + missing = expected_numbers - episode_numbers + return sorted(list(missing)) + + # Test with some missing episodes + episodes = [ + {"episode_number": 1}, {"episode_number": 3}, + {"episode_number": 5}, {"episode_number": 7} + ] + + missing = find_missing_episodes(episodes, 10) + assert missing == [2, 4, 6, 8, 9, 10] + + # Test with no missing episodes + complete_episodes = [{"episode_number": i} for i in range(1, 6)] + missing = find_missing_episodes(complete_episodes, 5) + assert missing == [] + + # Test with all episodes missing + missing = find_missing_episodes([], 3) + assert missing == [1, 2, 3] + + +@pytest.mark.unit +class TestSearchPerformance: + """Test search performance and optimization.""" + + def test_search_index_creation(self): + """Test search index creation for performance.""" + def create_search_index(anime_list): + """Create a search index for faster lookups.""" + index = { + "by_title": {}, + "by_genre": {}, + "by_year": {} + } + + for anime in anime_list: + title = anime.get("title", "").lower() + genre = anime.get("genre", "").lower() + year = anime.get("year") + + # Index by title keywords + for word in title.split(): + if word not in index["by_title"]: + index["by_title"][word] = [] + index["by_title"][word].append(anime) + + # Index by genre + if genre: + if genre not in index["by_genre"]: + index["by_genre"][genre] = [] + index["by_genre"][genre].append(anime) + + # Index by year + if year: + if year not in index["by_year"]: + index["by_year"][year] = [] + index["by_year"][year].append(anime) + + return index + + anime_list = [ + {"title": "Attack on Titan", "genre": "Action", "year": 2013}, + {"title": "My Hero Academia", "genre": "Action", "year": 2016}, + {"title": "Your Name", "genre": "Romance", "year": 2016} + ] + + index = create_search_index(anime_list) + + # Test title index + assert "attack" in index["by_title"] + assert len(index["by_title"]["attack"]) == 1 + + # Test genre index + assert "action" in index["by_genre"] + assert len(index["by_genre"]["action"]) == 2 + + # Test year index + assert 2016 in index["by_year"] + assert len(index["by_year"][2016]) == 2 + + def test_search_result_caching(self): + """Test search result caching mechanism.""" + def cached_search(query, cache={}): + """Simple search with caching.""" + if query in cache: + return cache[query], True # Return cached result and cache hit flag + + # Simulate expensive search operation + result = [{"id": "1", "title": f"Result for {query}"}] + cache[query] = result + return result, False # Return new result and cache miss flag + + # Test cache miss + result, cache_hit = cached_search("test_query") + assert not cache_hit + assert len(result) == 1 + + # Test cache hit + result, cache_hit = cached_search("test_query") + assert cache_hit + assert len(result) == 1 + + # Test different query + result, cache_hit = cached_search("another_query") + assert not cache_hit \ No newline at end of file