cleanup contollers

This commit is contained in:
2025-10-05 11:39:33 +02:00
parent 94e6b77456
commit 64434ccd44
43 changed files with 14453 additions and 2332 deletions

View File

@@ -0,0 +1,557 @@
"""
Test cases for anime API endpoints.
"""
import pytest
from unittest.mock import Mock, patch, MagicMock
from flask import Flask
import json
# Mock the database managers first
mock_anime_manager = Mock()
mock_download_manager = Mock()
# Import the modules to test
try:
with patch.dict('sys.modules', {
'src.server.data.anime_manager': Mock(AnimeManager=Mock(return_value=mock_anime_manager)),
'src.server.data.download_manager': Mock(DownloadManager=Mock(return_value=mock_download_manager))
}):
from src.server.web.controllers.api.v1.anime import anime_bp
except ImportError:
anime_bp = None
class TestAnimeEndpoints:
"""Test cases for anime API endpoints."""
@pytest.fixture
def app(self):
"""Create a test Flask application."""
if not anime_bp:
pytest.skip("Module not available")
app = Flask(__name__)
app.config['TESTING'] = True
app.register_blueprint(anime_bp, url_prefix='/api/v1')
return app
@pytest.fixture
def client(self, app):
"""Create a test client."""
return app.test_client()
@pytest.fixture
def mock_session(self):
"""Mock session for authentication."""
with patch('src.server.web.controllers.shared.auth_decorators.session') as mock_session:
mock_session.get.return_value = {'user_id': 1, 'username': 'testuser'}
yield mock_session
def setup_method(self):
"""Reset mocks before each test."""
mock_anime_manager.reset_mock()
mock_download_manager.reset_mock()
def test_list_anime_success(self, client, mock_session):
"""Test GET /anime - list anime with pagination."""
if not anime_bp:
pytest.skip("Module not available")
# Mock anime data
mock_anime_list = [
{'id': 1, 'name': 'Anime 1', 'url': 'https://example.com/1'},
{'id': 2, 'name': 'Anime 2', 'url': 'https://example.com/2'}
]
mock_anime_manager.get_all_anime.return_value = mock_anime_list
mock_anime_manager.get_anime_count.return_value = 2
response = client.get('/api/v1/anime?page=1&per_page=10')
assert response.status_code == 200
data = json.loads(response.data)
assert 'data' in data
assert 'pagination' in data
assert len(data['data']) == 2
# Verify manager was called with correct parameters
mock_anime_manager.get_all_anime.assert_called_once_with(
offset=0, limit=10, search=None, status=None, sort_by='name', sort_order='asc'
)
def test_list_anime_with_search(self, client, mock_session):
"""Test GET /anime with search parameter."""
if not anime_bp:
pytest.skip("Module not available")
mock_anime_manager.get_all_anime.return_value = []
mock_anime_manager.get_anime_count.return_value = 0
response = client.get('/api/v1/anime?search=naruto&status=completed')
assert response.status_code == 200
mock_anime_manager.get_all_anime.assert_called_once_with(
offset=0, limit=20, search='naruto', status='completed', sort_by='name', sort_order='asc'
)
def test_get_anime_by_id_success(self, client, mock_session):
"""Test GET /anime/<id> - get specific anime."""
if not anime_bp:
pytest.skip("Module not available")
mock_anime = {
'id': 1,
'name': 'Test Anime',
'url': 'https://example.com/1',
'description': 'A test anime'
}
mock_anime_manager.get_anime_by_id.return_value = mock_anime
response = client.get('/api/v1/anime/1')
assert response.status_code == 200
data = json.loads(response.data)
assert data['data']['id'] == 1
assert data['data']['name'] == 'Test Anime'
mock_anime_manager.get_anime_by_id.assert_called_once_with(1)
def test_get_anime_by_id_not_found(self, client, mock_session):
"""Test GET /anime/<id> - anime not found."""
if not anime_bp:
pytest.skip("Module not available")
mock_anime_manager.get_anime_by_id.return_value = None
response = client.get('/api/v1/anime/999')
assert response.status_code == 404
data = json.loads(response.data)
assert 'error' in data
assert 'not found' in data['error'].lower()
def test_create_anime_success(self, client, mock_session):
"""Test POST /anime - create new anime."""
if not anime_bp:
pytest.skip("Module not available")
anime_data = {
'name': 'New Anime',
'url': 'https://example.com/new-anime',
'description': 'A new anime',
'episodes': 12,
'status': 'ongoing'
}
mock_anime_manager.create_anime.return_value = 1
mock_anime_manager.get_anime_by_id.return_value = {
'id': 1,
**anime_data
}
response = client.post('/api/v1/anime',
json=anime_data,
content_type='application/json')
assert response.status_code == 201
data = json.loads(response.data)
assert data['success'] is True
assert data['data']['id'] == 1
assert data['data']['name'] == 'New Anime'
mock_anime_manager.create_anime.assert_called_once()
def test_create_anime_validation_error(self, client, mock_session):
"""Test POST /anime - validation error."""
if not anime_bp:
pytest.skip("Module not available")
# Missing required fields
anime_data = {
'description': 'A new anime'
}
response = client.post('/api/v1/anime',
json=anime_data,
content_type='application/json')
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
def test_create_anime_duplicate(self, client, mock_session):
"""Test POST /anime - duplicate anime."""
if not anime_bp:
pytest.skip("Module not available")
anime_data = {
'name': 'Existing Anime',
'url': 'https://example.com/existing'
}
# Simulate duplicate error
mock_anime_manager.create_anime.side_effect = Exception("Duplicate entry")
response = client.post('/api/v1/anime',
json=anime_data,
content_type='application/json')
assert response.status_code == 500
data = json.loads(response.data)
assert 'error' in data
def test_update_anime_success(self, client, mock_session):
"""Test PUT /anime/<id> - update anime."""
if not anime_bp:
pytest.skip("Module not available")
update_data = {
'name': 'Updated Anime',
'description': 'Updated description',
'status': 'completed'
}
mock_anime_manager.get_anime_by_id.return_value = {
'id': 1,
'name': 'Original Anime'
}
mock_anime_manager.update_anime.return_value = True
response = client.put('/api/v1/anime/1',
json=update_data,
content_type='application/json')
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
mock_anime_manager.update_anime.assert_called_once_with(1, update_data)
def test_update_anime_not_found(self, client, mock_session):
"""Test PUT /anime/<id> - anime not found."""
if not anime_bp:
pytest.skip("Module not available")
mock_anime_manager.get_anime_by_id.return_value = None
response = client.put('/api/v1/anime/999',
json={'name': 'Updated'},
content_type='application/json')
assert response.status_code == 404
data = json.loads(response.data)
assert 'error' in data
def test_delete_anime_success(self, client, mock_session):
"""Test DELETE /anime/<id> - delete anime."""
if not anime_bp:
pytest.skip("Module not available")
mock_anime_manager.get_anime_by_id.return_value = {
'id': 1,
'name': 'Test Anime'
}
mock_anime_manager.delete_anime.return_value = True
response = client.delete('/api/v1/anime/1')
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
mock_anime_manager.delete_anime.assert_called_once_with(1)
def test_delete_anime_not_found(self, client, mock_session):
"""Test DELETE /anime/<id> - anime not found."""
if not anime_bp:
pytest.skip("Module not available")
mock_anime_manager.get_anime_by_id.return_value = None
response = client.delete('/api/v1/anime/999')
assert response.status_code == 404
data = json.loads(response.data)
assert 'error' in data
def test_bulk_create_anime_success(self, client, mock_session):
"""Test POST /anime/bulk - bulk create anime."""
if not anime_bp:
pytest.skip("Module not available")
bulk_data = {
'anime_list': [
{'name': 'Anime 1', 'url': 'https://example.com/1'},
{'name': 'Anime 2', 'url': 'https://example.com/2'}
]
}
mock_anime_manager.bulk_create_anime.return_value = {
'created': 2,
'failed': 0,
'created_ids': [1, 2]
}
response = client.post('/api/v1/anime/bulk',
json=bulk_data,
content_type='application/json')
assert response.status_code == 201
data = json.loads(response.data)
assert data['success'] is True
assert data['data']['created'] == 2
assert data['data']['failed'] == 0
def test_bulk_update_anime_success(self, client, mock_session):
"""Test PUT /anime/bulk - bulk update anime."""
if not anime_bp:
pytest.skip("Module not available")
bulk_data = {
'updates': [
{'id': 1, 'status': 'completed'},
{'id': 2, 'status': 'completed'}
]
}
mock_anime_manager.bulk_update_anime.return_value = {
'updated': 2,
'failed': 0
}
response = client.put('/api/v1/anime/bulk',
json=bulk_data,
content_type='application/json')
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
assert data['data']['updated'] == 2
def test_bulk_delete_anime_success(self, client, mock_session):
"""Test DELETE /anime/bulk - bulk delete anime."""
if not anime_bp:
pytest.skip("Module not available")
bulk_data = {
'anime_ids': [1, 2, 3]
}
mock_anime_manager.bulk_delete_anime.return_value = {
'deleted': 3,
'failed': 0
}
response = client.delete('/api/v1/anime/bulk',
json=bulk_data,
content_type='application/json')
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
assert data['data']['deleted'] == 3
def test_get_anime_episodes_success(self, client, mock_session):
"""Test GET /anime/<id>/episodes - get anime episodes."""
if not anime_bp:
pytest.skip("Module not available")
mock_anime_manager.get_anime_by_id.return_value = {
'id': 1,
'name': 'Test Anime'
}
mock_episodes = [
{'id': 1, 'number': 1, 'title': 'Episode 1'},
{'id': 2, 'number': 2, 'title': 'Episode 2'}
]
mock_anime_manager.get_anime_episodes.return_value = mock_episodes
response = client.get('/api/v1/anime/1/episodes')
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
assert len(data['data']) == 2
assert data['data'][0]['number'] == 1
def test_get_anime_stats_success(self, client, mock_session):
"""Test GET /anime/<id>/stats - get anime statistics."""
if not anime_bp:
pytest.skip("Module not available")
mock_anime_manager.get_anime_by_id.return_value = {
'id': 1,
'name': 'Test Anime'
}
mock_stats = {
'total_episodes': 12,
'downloaded_episodes': 8,
'download_progress': 66.7,
'total_size': 1073741824, # 1GB
'downloaded_size': 715827882 # ~680MB
}
mock_anime_manager.get_anime_stats.return_value = mock_stats
response = client.get('/api/v1/anime/1/stats')
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
assert data['data']['total_episodes'] == 12
assert data['data']['downloaded_episodes'] == 8
def test_search_anime_success(self, client, mock_session):
"""Test GET /anime/search - search anime."""
if not anime_bp:
pytest.skip("Module not available")
mock_results = [
{'id': 1, 'name': 'Naruto', 'url': 'https://example.com/naruto'},
{'id': 2, 'name': 'Naruto Shippuden', 'url': 'https://example.com/naruto-shippuden'}
]
mock_anime_manager.search_anime.return_value = mock_results
response = client.get('/api/v1/anime/search?q=naruto')
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
assert len(data['data']) == 2
assert 'naruto' in data['data'][0]['name'].lower()
mock_anime_manager.search_anime.assert_called_once_with('naruto', limit=20)
def test_search_anime_no_query(self, client, mock_session):
"""Test GET /anime/search - missing search query."""
if not anime_bp:
pytest.skip("Module not available")
response = client.get('/api/v1/anime/search')
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
assert 'query parameter' in data['error'].lower()
class TestAnimeAuthentication:
"""Test cases for anime endpoints authentication."""
@pytest.fixture
def app(self):
"""Create a test Flask application."""
if not anime_bp:
pytest.skip("Module not available")
app = Flask(__name__)
app.config['TESTING'] = True
app.register_blueprint(anime_bp, url_prefix='/api/v1')
return app
@pytest.fixture
def client(self, app):
"""Create a test client."""
return app.test_client()
def test_unauthenticated_read_access(self, client):
"""Test that read operations work without authentication."""
if not anime_bp:
pytest.skip("Module not available")
with patch('src.server.web.controllers.shared.auth_decorators.session') as mock_session:
mock_session.get.return_value = None # No authentication
mock_anime_manager.get_all_anime.return_value = []
mock_anime_manager.get_anime_count.return_value = 0
response = client.get('/api/v1/anime')
# Should still work for read operations
assert response.status_code == 200
def test_authenticated_write_access(self, client):
"""Test that write operations require authentication."""
if not anime_bp:
pytest.skip("Module not available")
with patch('src.server.web.controllers.shared.auth_decorators.session') as mock_session:
mock_session.get.return_value = None # No authentication
response = client.post('/api/v1/anime',
json={'name': 'Test'},
content_type='application/json')
# Should require authentication for write operations
assert response.status_code == 401
class TestAnimeErrorHandling:
"""Test cases for anime endpoints error handling."""
@pytest.fixture
def app(self):
"""Create a test Flask application."""
if not anime_bp:
pytest.skip("Module not available")
app = Flask(__name__)
app.config['TESTING'] = True
app.register_blueprint(anime_bp, url_prefix='/api/v1')
return app
@pytest.fixture
def client(self, app):
"""Create a test client."""
return app.test_client()
@pytest.fixture
def mock_session(self):
"""Mock session for authentication."""
with patch('src.server.web.controllers.shared.auth_decorators.session') as mock_session:
mock_session.get.return_value = {'user_id': 1, 'username': 'testuser'}
yield mock_session
def test_database_error_handling(self, client, mock_session):
"""Test handling of database errors."""
if not anime_bp:
pytest.skip("Module not available")
# Simulate database error
mock_anime_manager.get_all_anime.side_effect = Exception("Database connection failed")
response = client.get('/api/v1/anime')
assert response.status_code == 500
data = json.loads(response.data)
assert 'error' in data
def test_invalid_json_handling(self, client, mock_session):
"""Test handling of invalid JSON."""
if not anime_bp:
pytest.skip("Module not available")
response = client.post('/api/v1/anime',
data='invalid json',
content_type='application/json')
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
def test_method_not_allowed(self, client):
"""Test method not allowed responses."""
if not anime_bp:
pytest.skip("Module not available")
response = client.patch('/api/v1/anime/1')
assert response.status_code == 405
if __name__ == '__main__':
pytest.main([__file__])

View File

@@ -0,0 +1,717 @@
"""
Test cases for downloads API endpoints.
"""
import pytest
from unittest.mock import Mock, patch, MagicMock
from flask import Flask
import json
# Mock the database managers first
mock_download_manager = Mock()
mock_episode_manager = Mock()
mock_anime_manager = Mock()
# Import the modules to test
try:
with patch.dict('sys.modules', {
'src.server.data.download_manager': Mock(DownloadManager=Mock(return_value=mock_download_manager)),
'src.server.data.episode_manager': Mock(EpisodeManager=Mock(return_value=mock_episode_manager)),
'src.server.data.anime_manager': Mock(AnimeManager=Mock(return_value=mock_anime_manager))
}):
from src.server.web.controllers.api.v1.downloads import downloads_bp
except ImportError:
downloads_bp = None
class TestDownloadEndpoints:
"""Test cases for download API endpoints."""
@pytest.fixture
def app(self):
"""Create a test Flask application."""
if not downloads_bp:
pytest.skip("Module not available")
app = Flask(__name__)
app.config['TESTING'] = True
app.register_blueprint(downloads_bp, url_prefix='/api/v1')
return app
@pytest.fixture
def client(self, app):
"""Create a test client."""
return app.test_client()
@pytest.fixture
def mock_session(self):
"""Mock session for authentication."""
with patch('src.server.web.controllers.shared.auth_decorators.session') as mock_session:
mock_session.get.return_value = {'user_id': 1, 'username': 'testuser'}
yield mock_session
def setup_method(self):
"""Reset mocks before each test."""
mock_download_manager.reset_mock()
mock_episode_manager.reset_mock()
mock_anime_manager.reset_mock()
def test_list_downloads_success(self, client, mock_session):
"""Test GET /downloads - list downloads with pagination."""
if not downloads_bp:
pytest.skip("Module not available")
mock_downloads = [
{
'id': 1,
'anime_id': 1,
'episode_id': 1,
'status': 'downloading',
'progress': 45.5,
'size': 1073741824, # 1GB
'downloaded_size': 488447385, # ~465MB
'speed': 1048576, # 1MB/s
'eta': 600, # 10 minutes
'created_at': '2023-01-01 12:00:00'
},
{
'id': 2,
'anime_id': 1,
'episode_id': 2,
'status': 'completed',
'progress': 100.0,
'size': 1073741824,
'downloaded_size': 1073741824,
'created_at': '2023-01-01 11:00:00',
'completed_at': '2023-01-01 11:30:00'
}
]
mock_download_manager.get_all_downloads.return_value = mock_downloads
mock_download_manager.get_downloads_count.return_value = 2
response = client.get('/api/v1/downloads?page=1&per_page=10')
assert response.status_code == 200
data = json.loads(response.data)
assert 'data' in data
assert 'pagination' in data
assert len(data['data']) == 2
assert data['data'][0]['status'] == 'downloading'
mock_download_manager.get_all_downloads.assert_called_once_with(
offset=0, limit=10, status=None, anime_id=None, sort_by='created_at', sort_order='desc'
)
def test_list_downloads_with_filters(self, client, mock_session):
"""Test GET /downloads with filters."""
if not downloads_bp:
pytest.skip("Module not available")
mock_download_manager.get_all_downloads.return_value = []
mock_download_manager.get_downloads_count.return_value = 0
response = client.get('/api/v1/downloads?status=completed&anime_id=5&sort_by=progress')
assert response.status_code == 200
mock_download_manager.get_all_downloads.assert_called_once_with(
offset=0, limit=20, status='completed', anime_id=5, sort_by='progress', sort_order='desc'
)
def test_get_download_by_id_success(self, client, mock_session):
"""Test GET /downloads/<id> - get specific download."""
if not downloads_bp:
pytest.skip("Module not available")
mock_download = {
'id': 1,
'anime_id': 1,
'episode_id': 1,
'status': 'downloading',
'progress': 75.0,
'size': 1073741824,
'downloaded_size': 805306368,
'speed': 2097152, # 2MB/s
'eta': 150, # 2.5 minutes
'file_path': '/downloads/anime1/episode1.mp4',
'created_at': '2023-01-01 12:00:00',
'started_at': '2023-01-01 12:05:00'
}
mock_download_manager.get_download_by_id.return_value = mock_download
response = client.get('/api/v1/downloads/1')
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
assert data['data']['id'] == 1
assert data['data']['progress'] == 75.0
assert data['data']['status'] == 'downloading'
mock_download_manager.get_download_by_id.assert_called_once_with(1)
def test_get_download_by_id_not_found(self, client, mock_session):
"""Test GET /downloads/<id> - download not found."""
if not downloads_bp:
pytest.skip("Module not available")
mock_download_manager.get_download_by_id.return_value = None
response = client.get('/api/v1/downloads/999')
assert response.status_code == 404
data = json.loads(response.data)
assert 'error' in data
assert 'not found' in data['error'].lower()
def test_create_download_success(self, client, mock_session):
"""Test POST /downloads - create new download."""
if not downloads_bp:
pytest.skip("Module not available")
download_data = {
'episode_id': 1,
'quality': '1080p',
'priority': 'normal'
}
# Mock episode exists
mock_episode_manager.get_episode_by_id.return_value = {
'id': 1,
'anime_id': 1,
'title': 'Episode 1',
'url': 'https://example.com/episode/1'
}
mock_download_manager.create_download.return_value = 1
mock_download_manager.get_download_by_id.return_value = {
'id': 1,
'episode_id': 1,
'status': 'queued',
'progress': 0.0
}
response = client.post('/api/v1/downloads',
json=download_data,
content_type='application/json')
assert response.status_code == 201
data = json.loads(response.data)
assert data['success'] is True
assert data['data']['id'] == 1
assert data['data']['status'] == 'queued'
mock_download_manager.create_download.assert_called_once()
def test_create_download_invalid_episode(self, client, mock_session):
"""Test POST /downloads - invalid episode_id."""
if not downloads_bp:
pytest.skip("Module not available")
download_data = {
'episode_id': 999,
'quality': '1080p'
}
# Mock episode doesn't exist
mock_episode_manager.get_episode_by_id.return_value = None
response = client.post('/api/v1/downloads',
json=download_data,
content_type='application/json')
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
assert 'episode' in data['error'].lower()
def test_create_download_validation_error(self, client, mock_session):
"""Test POST /downloads - validation error."""
if not downloads_bp:
pytest.skip("Module not available")
# Missing required fields
download_data = {
'quality': '1080p'
}
response = client.post('/api/v1/downloads',
json=download_data,
content_type='application/json')
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
def test_pause_download_success(self, client, mock_session):
"""Test PUT /downloads/<id>/pause - pause download."""
if not downloads_bp:
pytest.skip("Module not available")
mock_download_manager.get_download_by_id.return_value = {
'id': 1,
'status': 'downloading'
}
mock_download_manager.pause_download.return_value = True
response = client.put('/api/v1/downloads/1/pause')
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
assert 'paused' in data['message'].lower()
mock_download_manager.pause_download.assert_called_once_with(1)
def test_pause_download_not_found(self, client, mock_session):
"""Test PUT /downloads/<id>/pause - download not found."""
if not downloads_bp:
pytest.skip("Module not available")
mock_download_manager.get_download_by_id.return_value = None
response = client.put('/api/v1/downloads/999/pause')
assert response.status_code == 404
data = json.loads(response.data)
assert 'error' in data
def test_resume_download_success(self, client, mock_session):
"""Test PUT /downloads/<id>/resume - resume download."""
if not downloads_bp:
pytest.skip("Module not available")
mock_download_manager.get_download_by_id.return_value = {
'id': 1,
'status': 'paused'
}
mock_download_manager.resume_download.return_value = True
response = client.put('/api/v1/downloads/1/resume')
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
assert 'resumed' in data['message'].lower()
mock_download_manager.resume_download.assert_called_once_with(1)
def test_cancel_download_success(self, client, mock_session):
"""Test DELETE /downloads/<id> - cancel download."""
if not downloads_bp:
pytest.skip("Module not available")
mock_download_manager.get_download_by_id.return_value = {
'id': 1,
'status': 'downloading'
}
mock_download_manager.cancel_download.return_value = True
response = client.delete('/api/v1/downloads/1')
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
assert 'cancelled' in data['message'].lower()
mock_download_manager.cancel_download.assert_called_once_with(1)
def test_get_download_queue_success(self, client, mock_session):
"""Test GET /downloads/queue - get download queue."""
if not downloads_bp:
pytest.skip("Module not available")
mock_queue = [
{
'id': 1,
'episode_id': 1,
'status': 'downloading',
'progress': 25.0,
'position': 1
},
{
'id': 2,
'episode_id': 2,
'status': 'queued',
'progress': 0.0,
'position': 2
}
]
mock_download_manager.get_download_queue.return_value = mock_queue
response = client.get('/api/v1/downloads/queue')
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
assert len(data['data']) == 2
assert data['data'][0]['status'] == 'downloading'
assert data['data'][1]['status'] == 'queued'
def test_reorder_download_queue_success(self, client, mock_session):
"""Test PUT /downloads/queue/reorder - reorder download queue."""
if not downloads_bp:
pytest.skip("Module not available")
reorder_data = {
'download_ids': [3, 1, 2] # New order
}
mock_download_manager.reorder_download_queue.return_value = True
response = client.put('/api/v1/downloads/queue/reorder',
json=reorder_data,
content_type='application/json')
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
mock_download_manager.reorder_download_queue.assert_called_once_with([3, 1, 2])
def test_clear_download_queue_success(self, client, mock_session):
"""Test DELETE /downloads/queue - clear download queue."""
if not downloads_bp:
pytest.skip("Module not available")
mock_download_manager.clear_download_queue.return_value = {
'cleared': 5,
'failed': 0
}
response = client.delete('/api/v1/downloads/queue')
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
assert data['data']['cleared'] == 5
mock_download_manager.clear_download_queue.assert_called_once()
def test_get_download_history_success(self, client, mock_session):
"""Test GET /downloads/history - get download history."""
if not downloads_bp:
pytest.skip("Module not available")
mock_history = [
{
'id': 1,
'episode_id': 1,
'status': 'completed',
'completed_at': '2023-01-01 12:30:00',
'file_size': 1073741824
},
{
'id': 2,
'episode_id': 2,
'status': 'failed',
'failed_at': '2023-01-01 11:45:00',
'error_message': 'Network timeout'
}
]
mock_download_manager.get_download_history.return_value = mock_history
mock_download_manager.get_history_count.return_value = 2
response = client.get('/api/v1/downloads/history?page=1&per_page=10')
assert response.status_code == 200
data = json.loads(response.data)
assert 'data' in data
assert 'pagination' in data
assert len(data['data']) == 2
assert data['data'][0]['status'] == 'completed'
def test_bulk_create_downloads_success(self, client, mock_session):
"""Test POST /downloads/bulk - bulk create downloads."""
if not downloads_bp:
pytest.skip("Module not available")
bulk_data = {
'downloads': [
{'episode_id': 1, 'quality': '1080p'},
{'episode_id': 2, 'quality': '720p'},
{'episode_id': 3, 'quality': '1080p'}
]
}
mock_download_manager.bulk_create_downloads.return_value = {
'created': 3,
'failed': 0,
'created_ids': [1, 2, 3]
}
response = client.post('/api/v1/downloads/bulk',
json=bulk_data,
content_type='application/json')
assert response.status_code == 201
data = json.loads(response.data)
assert data['success'] is True
assert data['data']['created'] == 3
assert data['data']['failed'] == 0
def test_bulk_pause_downloads_success(self, client, mock_session):
"""Test PUT /downloads/bulk/pause - bulk pause downloads."""
if not downloads_bp:
pytest.skip("Module not available")
bulk_data = {
'download_ids': [1, 2, 3]
}
mock_download_manager.bulk_pause_downloads.return_value = {
'paused': 3,
'failed': 0
}
response = client.put('/api/v1/downloads/bulk/pause',
json=bulk_data,
content_type='application/json')
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
assert data['data']['paused'] == 3
def test_bulk_resume_downloads_success(self, client, mock_session):
"""Test PUT /downloads/bulk/resume - bulk resume downloads."""
if not downloads_bp:
pytest.skip("Module not available")
bulk_data = {
'download_ids': [1, 2, 3]
}
mock_download_manager.bulk_resume_downloads.return_value = {
'resumed': 3,
'failed': 0
}
response = client.put('/api/v1/downloads/bulk/resume',
json=bulk_data,
content_type='application/json')
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
assert data['data']['resumed'] == 3
def test_bulk_cancel_downloads_success(self, client, mock_session):
"""Test DELETE /downloads/bulk - bulk cancel downloads."""
if not downloads_bp:
pytest.skip("Module not available")
bulk_data = {
'download_ids': [1, 2, 3]
}
mock_download_manager.bulk_cancel_downloads.return_value = {
'cancelled': 3,
'failed': 0
}
response = client.delete('/api/v1/downloads/bulk',
json=bulk_data,
content_type='application/json')
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
assert data['data']['cancelled'] == 3
def test_get_download_stats_success(self, client, mock_session):
"""Test GET /downloads/stats - get download statistics."""
if not downloads_bp:
pytest.skip("Module not available")
mock_stats = {
'total_downloads': 150,
'completed_downloads': 125,
'active_downloads': 3,
'failed_downloads': 22,
'total_size_downloaded': 107374182400, # 100GB
'average_speed': 2097152, # 2MB/s
'queue_size': 5
}
mock_download_manager.get_download_stats.return_value = mock_stats
response = client.get('/api/v1/downloads/stats')
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
assert data['data']['total_downloads'] == 150
assert data['data']['completed_downloads'] == 125
assert data['data']['active_downloads'] == 3
def test_retry_failed_download_success(self, client, mock_session):
"""Test PUT /downloads/<id>/retry - retry failed download."""
if not downloads_bp:
pytest.skip("Module not available")
mock_download_manager.get_download_by_id.return_value = {
'id': 1,
'status': 'failed'
}
mock_download_manager.retry_download.return_value = True
response = client.put('/api/v1/downloads/1/retry')
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
assert 'retrying' in data['message'].lower()
mock_download_manager.retry_download.assert_called_once_with(1)
def test_retry_download_invalid_status(self, client, mock_session):
"""Test PUT /downloads/<id>/retry - retry download with invalid status."""
if not downloads_bp:
pytest.skip("Module not available")
mock_download_manager.get_download_by_id.return_value = {
'id': 1,
'status': 'completed' # Can't retry completed downloads
}
response = client.put('/api/v1/downloads/1/retry')
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
assert 'cannot be retried' in data['error'].lower()
class TestDownloadAuthentication:
"""Test cases for download endpoints authentication."""
@pytest.fixture
def app(self):
"""Create a test Flask application."""
if not downloads_bp:
pytest.skip("Module not available")
app = Flask(__name__)
app.config['TESTING'] = True
app.register_blueprint(downloads_bp, url_prefix='/api/v1')
return app
@pytest.fixture
def client(self, app):
"""Create a test client."""
return app.test_client()
def test_unauthenticated_read_access(self, client):
"""Test that read operations work without authentication."""
if not downloads_bp:
pytest.skip("Module not available")
with patch('src.server.web.controllers.shared.auth_decorators.session') as mock_session:
mock_session.get.return_value = None # No authentication
mock_download_manager.get_all_downloads.return_value = []
mock_download_manager.get_downloads_count.return_value = 0
response = client.get('/api/v1/downloads')
# Should work for read operations
assert response.status_code == 200
def test_authenticated_write_access(self, client):
"""Test that write operations require authentication."""
if not downloads_bp:
pytest.skip("Module not available")
with patch('src.server.web.controllers.shared.auth_decorators.session') as mock_session:
mock_session.get.return_value = None # No authentication
response = client.post('/api/v1/downloads',
json={'episode_id': 1},
content_type='application/json')
# Should require authentication for write operations
assert response.status_code == 401
class TestDownloadErrorHandling:
"""Test cases for download endpoints error handling."""
@pytest.fixture
def app(self):
"""Create a test Flask application."""
if not downloads_bp:
pytest.skip("Module not available")
app = Flask(__name__)
app.config['TESTING'] = True
app.register_blueprint(downloads_bp, url_prefix='/api/v1')
return app
@pytest.fixture
def client(self, app):
"""Create a test client."""
return app.test_client()
@pytest.fixture
def mock_session(self):
"""Mock session for authentication."""
with patch('src.server.web.controllers.shared.auth_decorators.session') as mock_session:
mock_session.get.return_value = {'user_id': 1, 'username': 'testuser'}
yield mock_session
def test_database_error_handling(self, client, mock_session):
"""Test handling of database errors."""
if not downloads_bp:
pytest.skip("Module not available")
# Simulate database error
mock_download_manager.get_all_downloads.side_effect = Exception("Database connection failed")
response = client.get('/api/v1/downloads')
assert response.status_code == 500
data = json.loads(response.data)
assert 'error' in data
def test_download_system_error(self, client, mock_session):
"""Test handling of download system errors."""
if not downloads_bp:
pytest.skip("Module not available")
mock_download_manager.get_download_by_id.return_value = {
'id': 1,
'status': 'downloading'
}
# Simulate download system error
mock_download_manager.pause_download.side_effect = Exception("Download system unavailable")
response = client.put('/api/v1/downloads/1/pause')
assert response.status_code == 500
data = json.loads(response.data)
assert 'error' in data
def test_invalid_download_status_transition(self, client, mock_session):
"""Test handling of invalid status transitions."""
if not downloads_bp:
pytest.skip("Module not available")
mock_download_manager.get_download_by_id.return_value = {
'id': 1,
'status': 'completed'
}
# Try to pause a completed download
response = client.put('/api/v1/downloads/1/pause')
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
assert 'cannot be paused' in data['error'].lower()
if __name__ == '__main__':
pytest.main([__file__])

View File

@@ -0,0 +1,679 @@
"""
Test cases for episodes API endpoints.
"""
import pytest
from unittest.mock import Mock, patch, MagicMock
from flask import Flask
import json
# Mock the database managers first
mock_episode_manager = Mock()
mock_anime_manager = Mock()
mock_download_manager = Mock()
# Import the modules to test
try:
with patch.dict('sys.modules', {
'src.server.data.episode_manager': Mock(EpisodeManager=Mock(return_value=mock_episode_manager)),
'src.server.data.anime_manager': Mock(AnimeManager=Mock(return_value=mock_anime_manager)),
'src.server.data.download_manager': Mock(DownloadManager=Mock(return_value=mock_download_manager))
}):
from src.server.web.controllers.api.v1.episodes import episodes_bp
except ImportError:
episodes_bp = None
class TestEpisodeEndpoints:
"""Test cases for episode API endpoints."""
@pytest.fixture
def app(self):
"""Create a test Flask application."""
if not episodes_bp:
pytest.skip("Module not available")
app = Flask(__name__)
app.config['TESTING'] = True
app.register_blueprint(episodes_bp, url_prefix='/api/v1')
return app
@pytest.fixture
def client(self, app):
"""Create a test client."""
return app.test_client()
@pytest.fixture
def mock_session(self):
"""Mock session for authentication."""
with patch('src.server.web.controllers.shared.auth_decorators.session') as mock_session:
mock_session.get.return_value = {'user_id': 1, 'username': 'testuser'}
yield mock_session
def setup_method(self):
"""Reset mocks before each test."""
mock_episode_manager.reset_mock()
mock_anime_manager.reset_mock()
mock_download_manager.reset_mock()
def test_list_episodes_success(self, client, mock_session):
"""Test GET /episodes - list episodes with pagination."""
if not episodes_bp:
pytest.skip("Module not available")
mock_episodes = [
{
'id': 1,
'anime_id': 1,
'number': 1,
'title': 'Episode 1',
'url': 'https://example.com/episode/1',
'status': 'available'
},
{
'id': 2,
'anime_id': 1,
'number': 2,
'title': 'Episode 2',
'url': 'https://example.com/episode/2',
'status': 'available'
}
]
mock_episode_manager.get_all_episodes.return_value = mock_episodes
mock_episode_manager.get_episodes_count.return_value = 2
response = client.get('/api/v1/episodes?page=1&per_page=10')
assert response.status_code == 200
data = json.loads(response.data)
assert 'data' in data
assert 'pagination' in data
assert len(data['data']) == 2
assert data['data'][0]['number'] == 1
mock_episode_manager.get_all_episodes.assert_called_once_with(
offset=0, limit=10, anime_id=None, status=None, sort_by='number', sort_order='asc'
)
def test_list_episodes_with_filters(self, client, mock_session):
"""Test GET /episodes with filters."""
if not episodes_bp:
pytest.skip("Module not available")
mock_episode_manager.get_all_episodes.return_value = []
mock_episode_manager.get_episodes_count.return_value = 0
response = client.get('/api/v1/episodes?anime_id=1&status=downloaded&sort_by=title&sort_order=desc')
assert response.status_code == 200
mock_episode_manager.get_all_episodes.assert_called_once_with(
offset=0, limit=20, anime_id=1, status='downloaded', sort_by='title', sort_order='desc'
)
def test_get_episode_by_id_success(self, client, mock_session):
"""Test GET /episodes/<id> - get specific episode."""
if not episodes_bp:
pytest.skip("Module not available")
mock_episode = {
'id': 1,
'anime_id': 1,
'number': 1,
'title': 'First Episode',
'url': 'https://example.com/episode/1',
'status': 'available',
'duration': 1440,
'description': 'The first episode'
}
mock_episode_manager.get_episode_by_id.return_value = mock_episode
response = client.get('/api/v1/episodes/1')
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
assert data['data']['id'] == 1
assert data['data']['title'] == 'First Episode'
mock_episode_manager.get_episode_by_id.assert_called_once_with(1)
def test_get_episode_by_id_not_found(self, client, mock_session):
"""Test GET /episodes/<id> - episode not found."""
if not episodes_bp:
pytest.skip("Module not available")
mock_episode_manager.get_episode_by_id.return_value = None
response = client.get('/api/v1/episodes/999')
assert response.status_code == 404
data = json.loads(response.data)
assert 'error' in data
assert 'not found' in data['error'].lower()
def test_create_episode_success(self, client, mock_session):
"""Test POST /episodes - create new episode."""
if not episodes_bp:
pytest.skip("Module not available")
episode_data = {
'anime_id': 1,
'number': 1,
'title': 'New Episode',
'url': 'https://example.com/new-episode',
'duration': 1440,
'description': 'A new episode'
}
# Mock anime exists
mock_anime_manager.get_anime_by_id.return_value = {'id': 1, 'name': 'Test Anime'}
mock_episode_manager.create_episode.return_value = 1
mock_episode_manager.get_episode_by_id.return_value = {
'id': 1,
**episode_data
}
response = client.post('/api/v1/episodes',
json=episode_data,
content_type='application/json')
assert response.status_code == 201
data = json.loads(response.data)
assert data['success'] is True
assert data['data']['id'] == 1
assert data['data']['title'] == 'New Episode'
mock_episode_manager.create_episode.assert_called_once()
def test_create_episode_invalid_anime(self, client, mock_session):
"""Test POST /episodes - invalid anime_id."""
if not episodes_bp:
pytest.skip("Module not available")
episode_data = {
'anime_id': 999,
'number': 1,
'title': 'New Episode',
'url': 'https://example.com/new-episode'
}
# Mock anime doesn't exist
mock_anime_manager.get_anime_by_id.return_value = None
response = client.post('/api/v1/episodes',
json=episode_data,
content_type='application/json')
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
assert 'anime' in data['error'].lower()
def test_create_episode_validation_error(self, client, mock_session):
"""Test POST /episodes - validation error."""
if not episodes_bp:
pytest.skip("Module not available")
# Missing required fields
episode_data = {
'title': 'New Episode'
}
response = client.post('/api/v1/episodes',
json=episode_data,
content_type='application/json')
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
def test_update_episode_success(self, client, mock_session):
"""Test PUT /episodes/<id> - update episode."""
if not episodes_bp:
pytest.skip("Module not available")
update_data = {
'title': 'Updated Episode',
'description': 'Updated description',
'status': 'downloaded'
}
mock_episode_manager.get_episode_by_id.return_value = {
'id': 1,
'title': 'Original Episode'
}
mock_episode_manager.update_episode.return_value = True
response = client.put('/api/v1/episodes/1',
json=update_data,
content_type='application/json')
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
mock_episode_manager.update_episode.assert_called_once_with(1, update_data)
def test_update_episode_not_found(self, client, mock_session):
"""Test PUT /episodes/<id> - episode not found."""
if not episodes_bp:
pytest.skip("Module not available")
mock_episode_manager.get_episode_by_id.return_value = None
response = client.put('/api/v1/episodes/999',
json={'title': 'Updated'},
content_type='application/json')
assert response.status_code == 404
data = json.loads(response.data)
assert 'error' in data
def test_delete_episode_success(self, client, mock_session):
"""Test DELETE /episodes/<id> - delete episode."""
if not episodes_bp:
pytest.skip("Module not available")
mock_episode_manager.get_episode_by_id.return_value = {
'id': 1,
'title': 'Test Episode'
}
mock_episode_manager.delete_episode.return_value = True
response = client.delete('/api/v1/episodes/1')
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
mock_episode_manager.delete_episode.assert_called_once_with(1)
def test_delete_episode_not_found(self, client, mock_session):
"""Test DELETE /episodes/<id> - episode not found."""
if not episodes_bp:
pytest.skip("Module not available")
mock_episode_manager.get_episode_by_id.return_value = None
response = client.delete('/api/v1/episodes/999')
assert response.status_code == 404
data = json.loads(response.data)
assert 'error' in data
def test_bulk_create_episodes_success(self, client, mock_session):
"""Test POST /episodes/bulk - bulk create episodes."""
if not episodes_bp:
pytest.skip("Module not available")
bulk_data = {
'episodes': [
{'anime_id': 1, 'number': 1, 'title': 'Episode 1', 'url': 'https://example.com/1'},
{'anime_id': 1, 'number': 2, 'title': 'Episode 2', 'url': 'https://example.com/2'}
]
}
mock_episode_manager.bulk_create_episodes.return_value = {
'created': 2,
'failed': 0,
'created_ids': [1, 2]
}
response = client.post('/api/v1/episodes/bulk',
json=bulk_data,
content_type='application/json')
assert response.status_code == 201
data = json.loads(response.data)
assert data['success'] is True
assert data['data']['created'] == 2
assert data['data']['failed'] == 0
def test_bulk_update_status_success(self, client, mock_session):
"""Test PUT /episodes/bulk/status - bulk update episode status."""
if not episodes_bp:
pytest.skip("Module not available")
bulk_data = {
'episode_ids': [1, 2, 3],
'status': 'downloaded'
}
mock_episode_manager.bulk_update_status.return_value = {
'updated': 3,
'failed': 0
}
response = client.put('/api/v1/episodes/bulk/status',
json=bulk_data,
content_type='application/json')
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
assert data['data']['updated'] == 3
def test_bulk_delete_episodes_success(self, client, mock_session):
"""Test DELETE /episodes/bulk - bulk delete episodes."""
if not episodes_bp:
pytest.skip("Module not available")
bulk_data = {
'episode_ids': [1, 2, 3]
}
mock_episode_manager.bulk_delete_episodes.return_value = {
'deleted': 3,
'failed': 0
}
response = client.delete('/api/v1/episodes/bulk',
json=bulk_data,
content_type='application/json')
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
assert data['data']['deleted'] == 3
def test_sync_episodes_success(self, client, mock_session):
"""Test POST /episodes/sync - sync episodes for anime."""
if not episodes_bp:
pytest.skip("Module not available")
sync_data = {
'anime_id': 1
}
# Mock anime exists
mock_anime_manager.get_anime_by_id.return_value = {'id': 1, 'name': 'Test Anime'}
mock_episode_manager.sync_episodes.return_value = {
'anime_id': 1,
'episodes_found': 12,
'episodes_added': 5,
'episodes_updated': 2
}
response = client.post('/api/v1/episodes/sync',
json=sync_data,
content_type='application/json')
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
assert data['data']['episodes_found'] == 12
assert data['data']['episodes_added'] == 5
mock_episode_manager.sync_episodes.assert_called_once_with(1)
def test_sync_episodes_invalid_anime(self, client, mock_session):
"""Test POST /episodes/sync - invalid anime_id."""
if not episodes_bp:
pytest.skip("Module not available")
sync_data = {
'anime_id': 999
}
# Mock anime doesn't exist
mock_anime_manager.get_anime_by_id.return_value = None
response = client.post('/api/v1/episodes/sync',
json=sync_data,
content_type='application/json')
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
assert 'anime' in data['error'].lower()
def test_get_episode_download_info_success(self, client, mock_session):
"""Test GET /episodes/<id>/download - get episode download info."""
if not episodes_bp:
pytest.skip("Module not available")
mock_episode_manager.get_episode_by_id.return_value = {
'id': 1,
'title': 'Test Episode'
}
mock_download_info = {
'episode_id': 1,
'download_id': 5,
'status': 'downloading',
'progress': 45.5,
'speed': 1048576, # 1MB/s
'eta': 300, # 5 minutes
'file_path': '/downloads/episode1.mp4'
}
mock_download_manager.get_episode_download_info.return_value = mock_download_info
response = client.get('/api/v1/episodes/1/download')
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
assert data['data']['status'] == 'downloading'
assert data['data']['progress'] == 45.5
def test_get_episode_download_info_not_downloading(self, client, mock_session):
"""Test GET /episodes/<id>/download - episode not downloading."""
if not episodes_bp:
pytest.skip("Module not available")
mock_episode_manager.get_episode_by_id.return_value = {
'id': 1,
'title': 'Test Episode'
}
mock_download_manager.get_episode_download_info.return_value = None
response = client.get('/api/v1/episodes/1/download')
assert response.status_code == 404
data = json.loads(response.data)
assert 'error' in data
assert 'download' in data['error'].lower()
def test_start_episode_download_success(self, client, mock_session):
"""Test POST /episodes/<id>/download - start episode download."""
if not episodes_bp:
pytest.skip("Module not available")
mock_episode_manager.get_episode_by_id.return_value = {
'id': 1,
'title': 'Test Episode',
'url': 'https://example.com/episode/1'
}
mock_download_manager.start_episode_download.return_value = {
'download_id': 5,
'status': 'queued',
'message': 'Download queued successfully'
}
response = client.post('/api/v1/episodes/1/download')
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
assert data['data']['download_id'] == 5
assert data['data']['status'] == 'queued'
mock_download_manager.start_episode_download.assert_called_once_with(1)
def test_cancel_episode_download_success(self, client, mock_session):
"""Test DELETE /episodes/<id>/download - cancel episode download."""
if not episodes_bp:
pytest.skip("Module not available")
mock_episode_manager.get_episode_by_id.return_value = {
'id': 1,
'title': 'Test Episode'
}
mock_download_manager.cancel_episode_download.return_value = True
response = client.delete('/api/v1/episodes/1/download')
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
mock_download_manager.cancel_episode_download.assert_called_once_with(1)
def test_get_episodes_by_anime_success(self, client, mock_session):
"""Test GET /anime/<anime_id>/episodes - get episodes for anime."""
if not episodes_bp:
pytest.skip("Module not available")
mock_anime_manager.get_anime_by_id.return_value = {
'id': 1,
'name': 'Test Anime'
}
mock_episodes = [
{'id': 1, 'number': 1, 'title': 'Episode 1'},
{'id': 2, 'number': 2, 'title': 'Episode 2'}
]
mock_episode_manager.get_episodes_by_anime.return_value = mock_episodes
response = client.get('/api/v1/anime/1/episodes')
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
assert len(data['data']) == 2
assert data['data'][0]['number'] == 1
mock_episode_manager.get_episodes_by_anime.assert_called_once_with(1)
class TestEpisodeAuthentication:
"""Test cases for episode endpoints authentication."""
@pytest.fixture
def app(self):
"""Create a test Flask application."""
if not episodes_bp:
pytest.skip("Module not available")
app = Flask(__name__)
app.config['TESTING'] = True
app.register_blueprint(episodes_bp, url_prefix='/api/v1')
return app
@pytest.fixture
def client(self, app):
"""Create a test client."""
return app.test_client()
def test_unauthenticated_read_access(self, client):
"""Test that read operations work without authentication."""
if not episodes_bp:
pytest.skip("Module not available")
with patch('src.server.web.controllers.shared.auth_decorators.session') as mock_session:
mock_session.get.return_value = None # No authentication
mock_episode_manager.get_all_episodes.return_value = []
mock_episode_manager.get_episodes_count.return_value = 0
response = client.get('/api/v1/episodes')
# Should work for read operations
assert response.status_code == 200
def test_authenticated_write_access(self, client):
"""Test that write operations require authentication."""
if not episodes_bp:
pytest.skip("Module not available")
with patch('src.server.web.controllers.shared.auth_decorators.session') as mock_session:
mock_session.get.return_value = None # No authentication
response = client.post('/api/v1/episodes',
json={'title': 'Test'},
content_type='application/json')
# Should require authentication for write operations
assert response.status_code == 401
class TestEpisodeErrorHandling:
"""Test cases for episode endpoints error handling."""
@pytest.fixture
def app(self):
"""Create a test Flask application."""
if not episodes_bp:
pytest.skip("Module not available")
app = Flask(__name__)
app.config['TESTING'] = True
app.register_blueprint(episodes_bp, url_prefix='/api/v1')
return app
@pytest.fixture
def client(self, app):
"""Create a test client."""
return app.test_client()
@pytest.fixture
def mock_session(self):
"""Mock session for authentication."""
with patch('src.server.web.controllers.shared.auth_decorators.session') as mock_session:
mock_session.get.return_value = {'user_id': 1, 'username': 'testuser'}
yield mock_session
def test_database_error_handling(self, client, mock_session):
"""Test handling of database errors."""
if not episodes_bp:
pytest.skip("Module not available")
# Simulate database error
mock_episode_manager.get_all_episodes.side_effect = Exception("Database connection failed")
response = client.get('/api/v1/episodes')
assert response.status_code == 500
data = json.loads(response.data)
assert 'error' in data
def test_invalid_episode_id_parameter(self, client, mock_session):
"""Test handling of invalid episode ID parameter."""
if not episodes_bp:
pytest.skip("Module not available")
response = client.get('/api/v1/episodes/invalid-id')
assert response.status_code == 404 # Flask will handle this as route not found
def test_concurrent_modification_error(self, client, mock_session):
"""Test handling of concurrent modification errors."""
if not episodes_bp:
pytest.skip("Module not available")
mock_episode_manager.get_episode_by_id.return_value = {
'id': 1,
'title': 'Test Episode'
}
# Simulate concurrent modification
mock_episode_manager.update_episode.side_effect = Exception("Episode was modified by another process")
response = client.put('/api/v1/episodes/1',
json={'title': 'Updated'},
content_type='application/json')
assert response.status_code == 500
data = json.loads(response.data)
assert 'error' in data
if __name__ == '__main__':
pytest.main([__file__])

View File

@@ -0,0 +1,330 @@
"""
Test cases for authentication decorators and utilities.
"""
import pytest
from unittest.mock import Mock, patch, MagicMock
from flask import Flask, request, session, jsonify
import json
# Import the modules to test
try:
from src.server.web.controllers.shared.auth_decorators import (
require_auth, optional_auth, get_current_user, get_client_ip,
is_authenticated, logout_current_user
)
except ImportError:
# Fallback for testing
require_auth = None
optional_auth = None
class TestAuthDecorators:
"""Test cases for authentication decorators."""
@pytest.fixture
def app(self):
"""Create a test Flask application."""
app = Flask(__name__)
app.config['TESTING'] = True
app.secret_key = 'test-secret-key'
return app
@pytest.fixture
def client(self, app):
"""Create a test client."""
return app.test_client()
@pytest.fixture
def mock_session_manager(self):
"""Create a mock session manager."""
with patch('src.server.web.controllers.shared.auth_decorators.session_manager') as mock:
yield mock
def test_require_auth_authenticated_user(self, app, client, mock_session_manager):
"""Test require_auth decorator with authenticated user."""
if not require_auth:
pytest.skip("Module not available")
mock_session_manager.is_authenticated.return_value = True
@app.route('/test')
@require_auth
def test_endpoint():
return jsonify({'message': 'success'})
response = client.get('/test')
assert response.status_code == 200
data = json.loads(response.data)
assert data['message'] == 'success'
def test_require_auth_unauthenticated_api_request(self, app, client, mock_session_manager):
"""Test require_auth decorator with unauthenticated API request."""
if not require_auth:
pytest.skip("Module not available")
mock_session_manager.is_authenticated.return_value = False
@app.route('/api/test')
@require_auth
def test_api_endpoint():
return jsonify({'message': 'success'})
response = client.get('/api/test')
assert response.status_code == 401
data = json.loads(response.data)
assert data['status'] == 'error'
assert data['code'] == 'AUTH_REQUIRED'
def test_require_auth_unauthenticated_json_request(self, app, client, mock_session_manager):
"""Test require_auth decorator with unauthenticated JSON request."""
if not require_auth:
pytest.skip("Module not available")
mock_session_manager.is_authenticated.return_value = False
@app.route('/test')
@require_auth
def test_endpoint():
return jsonify({'message': 'success'})
response = client.get('/test', headers={'Accept': 'application/json'})
assert response.status_code == 401
data = json.loads(response.data)
assert data['status'] == 'error'
assert data['code'] == 'AUTH_REQUIRED'
def test_optional_auth_no_master_password(self, app, client, mock_session_manager):
"""Test optional_auth decorator when no master password is configured."""
if not optional_auth:
pytest.skip("Module not available")
mock_session_manager.is_authenticated.return_value = False
with patch('config.config') as mock_config:
mock_config.has_master_password.return_value = False
@app.route('/test')
@optional_auth
def test_endpoint():
return jsonify({'message': 'success'})
response = client.get('/test')
assert response.status_code == 200
data = json.loads(response.data)
assert data['message'] == 'success'
def test_optional_auth_with_master_password_authenticated(self, app, client, mock_session_manager):
"""Test optional_auth decorator with master password and authenticated user."""
if not optional_auth:
pytest.skip("Module not available")
mock_session_manager.is_authenticated.return_value = True
with patch('config.config') as mock_config:
mock_config.has_master_password.return_value = True
@app.route('/test')
@optional_auth
def test_endpoint():
return jsonify({'message': 'success'})
response = client.get('/test')
assert response.status_code == 200
data = json.loads(response.data)
assert data['message'] == 'success'
def test_optional_auth_with_master_password_unauthenticated(self, app, client, mock_session_manager):
"""Test optional_auth decorator with master password and unauthenticated user."""
if not optional_auth:
pytest.skip("Module not available")
mock_session_manager.is_authenticated.return_value = False
with patch('config.config') as mock_config:
mock_config.has_master_password.return_value = True
@app.route('/api/test')
@optional_auth
def test_endpoint():
return jsonify({'message': 'success'})
response = client.get('/api/test')
assert response.status_code == 401
data = json.loads(response.data)
assert data['status'] == 'error'
assert data['code'] == 'AUTH_REQUIRED'
class TestAuthUtilities:
"""Test cases for authentication utility functions."""
@pytest.fixture
def app(self):
"""Create a test Flask application."""
app = Flask(__name__)
app.config['TESTING'] = True
return app
@pytest.fixture
def mock_session_manager(self):
"""Create a mock session manager."""
with patch('src.server.web.controllers.shared.auth_decorators.session_manager') as mock:
yield mock
def test_get_client_ip_direct(self, app):
"""Test get_client_ip with direct IP."""
if not get_client_ip:
pytest.skip("Module not available")
with app.test_request_context('/', environ_base={'REMOTE_ADDR': '192.168.1.100'}):
ip = get_client_ip()
assert ip == '192.168.1.100'
def test_get_client_ip_forwarded(self, app):
"""Test get_client_ip with X-Forwarded-For header."""
if not get_client_ip:
pytest.skip("Module not available")
with app.test_request_context('/', headers={'X-Forwarded-For': '203.0.113.1, 192.168.1.100'}):
ip = get_client_ip()
assert ip == '203.0.113.1'
def test_get_client_ip_real_ip(self, app):
"""Test get_client_ip with X-Real-IP header."""
if not get_client_ip:
pytest.skip("Module not available")
with app.test_request_context('/', headers={'X-Real-IP': '203.0.113.2'}):
ip = get_client_ip()
assert ip == '203.0.113.2'
def test_get_client_ip_unknown(self, app):
"""Test get_client_ip with no IP information."""
if not get_client_ip:
pytest.skip("Module not available")
with app.test_request_context('/'):
ip = get_client_ip()
assert ip == 'unknown'
def test_get_current_user_authenticated(self, app, mock_session_manager):
"""Test get_current_user with authenticated user."""
if not get_current_user:
pytest.skip("Module not available")
mock_session_info = {
'user': 'admin',
'login_time': '2023-01-01T00:00:00',
'ip_address': '192.168.1.100'
}
mock_session_manager.is_authenticated.return_value = True
mock_session_manager.get_session_info.return_value = mock_session_info
with app.test_request_context('/'):
user = get_current_user()
assert user == mock_session_info
def test_get_current_user_unauthenticated(self, app, mock_session_manager):
"""Test get_current_user with unauthenticated user."""
if not get_current_user:
pytest.skip("Module not available")
mock_session_manager.is_authenticated.return_value = False
with app.test_request_context('/'):
user = get_current_user()
assert user is None
def test_is_authenticated_true(self, app, mock_session_manager):
"""Test is_authenticated returns True."""
if not is_authenticated:
pytest.skip("Module not available")
mock_session_manager.is_authenticated.return_value = True
with app.test_request_context('/'):
result = is_authenticated()
assert result is True
def test_is_authenticated_false(self, app, mock_session_manager):
"""Test is_authenticated returns False."""
if not is_authenticated:
pytest.skip("Module not available")
mock_session_manager.is_authenticated.return_value = False
with app.test_request_context('/'):
result = is_authenticated()
assert result is False
def test_logout_current_user(self, app, mock_session_manager):
"""Test logout_current_user function."""
if not logout_current_user:
pytest.skip("Module not available")
mock_session_manager.logout.return_value = True
with app.test_request_context('/'):
result = logout_current_user()
assert result is True
mock_session_manager.logout.assert_called_once_with(None)
class TestAuthDecoratorIntegration:
"""Integration tests for authentication decorators."""
@pytest.fixture
def app(self):
"""Create a test Flask application."""
app = Flask(__name__)
app.config['TESTING'] = True
app.secret_key = 'test-secret-key'
return app
@pytest.fixture
def client(self, app):
"""Create a test client."""
return app.test_client()
def test_decorator_preserves_function_metadata(self, app):
"""Test that decorators preserve function metadata."""
if not require_auth:
pytest.skip("Module not available")
@require_auth
def test_function():
"""Test function docstring."""
return 'test'
assert test_function.__name__ == 'test_function'
assert test_function.__doc__ == 'Test function docstring.'
def test_multiple_decorators(self, app, client):
"""Test using multiple decorators together."""
if not require_auth or not optional_auth:
pytest.skip("Module not available")
with patch('src.server.web.controllers.shared.auth_decorators.session_manager') as mock_sm:
mock_sm.is_authenticated.return_value = True
@app.route('/test1')
@require_auth
def test_endpoint1():
return jsonify({'endpoint': 'test1'})
@app.route('/test2')
@optional_auth
def test_endpoint2():
return jsonify({'endpoint': 'test2'})
# Test both endpoints
response1 = client.get('/test1')
assert response1.status_code == 200
response2 = client.get('/test2')
assert response2.status_code == 200
if __name__ == '__main__':
pytest.main([__file__])

View File

@@ -0,0 +1,455 @@
"""
Test cases for error handling decorators and utilities.
"""
import pytest
from unittest.mock import Mock, patch, MagicMock
from flask import Flask, request, jsonify
import json
# Import the modules to test
try:
from src.server.web.controllers.shared.error_handlers import (
handle_api_errors, handle_database_errors, handle_file_operations,
create_error_response, create_success_response, APIException,
ValidationError, NotFoundError, PermissionError
)
except ImportError:
# Fallback for testing
handle_api_errors = None
handle_database_errors = None
handle_file_operations = None
create_error_response = None
create_success_response = None
APIException = None
ValidationError = None
NotFoundError = None
PermissionError = None
class TestErrorHandlingDecorators:
"""Test cases for error handling decorators."""
@pytest.fixture
def app(self):
"""Create a test Flask application."""
app = Flask(__name__)
app.config['TESTING'] = True
return app
@pytest.fixture
def client(self, app):
"""Create a test client."""
return app.test_client()
def test_handle_api_errors_success(self, app, client):
"""Test handle_api_errors decorator with successful function."""
if not handle_api_errors:
pytest.skip("Module not available")
@app.route('/test')
@handle_api_errors
def test_endpoint():
return {'message': 'success'}
response = client.get('/test')
assert response.status_code == 200
data = json.loads(response.data)
assert data['status'] == 'success'
assert data['message'] == 'success'
def test_handle_api_errors_with_status_code(self, app, client):
"""Test handle_api_errors decorator with tuple return."""
if not handle_api_errors:
pytest.skip("Module not available")
@app.route('/test')
@handle_api_errors
def test_endpoint():
return {'message': 'created'}, 201
response = client.get('/test')
assert response.status_code == 201
data = json.loads(response.data)
assert data['status'] == 'success'
assert data['message'] == 'created'
def test_handle_api_errors_value_error(self, app, client):
"""Test handle_api_errors decorator with ValueError."""
if not handle_api_errors:
pytest.skip("Module not available")
@app.route('/test')
@handle_api_errors
def test_endpoint():
raise ValueError("Invalid input")
response = client.get('/test')
assert response.status_code == 400
data = json.loads(response.data)
assert data['status'] == 'error'
assert data['error_code'] == 'VALIDATION_ERROR'
assert 'Invalid input' in data['message']
def test_handle_api_errors_permission_error(self, app, client):
"""Test handle_api_errors decorator with PermissionError."""
if not handle_api_errors:
pytest.skip("Module not available")
@app.route('/test')
@handle_api_errors
def test_endpoint():
raise PermissionError("Access denied")
response = client.get('/test')
assert response.status_code == 403
data = json.loads(response.data)
assert data['status'] == 'error'
assert data['error_code'] == 'ACCESS_DENIED'
assert data['message'] == 'Access denied'
def test_handle_api_errors_file_not_found(self, app, client):
"""Test handle_api_errors decorator with FileNotFoundError."""
if not handle_api_errors:
pytest.skip("Module not available")
@app.route('/test')
@handle_api_errors
def test_endpoint():
raise FileNotFoundError("File not found")
response = client.get('/test')
assert response.status_code == 404
data = json.loads(response.data)
assert data['status'] == 'error'
assert data['error_code'] == 'NOT_FOUND'
assert data['message'] == 'Resource not found'
def test_handle_api_errors_generic_exception(self, app, client):
"""Test handle_api_errors decorator with generic Exception."""
if not handle_api_errors:
pytest.skip("Module not available")
@app.route('/test')
@handle_api_errors
def test_endpoint():
raise Exception("Something went wrong")
response = client.get('/test')
assert response.status_code == 500
data = json.loads(response.data)
assert data['status'] == 'error'
assert data['error_code'] == 'INTERNAL_ERROR'
assert data['message'] == 'Internal server error'
def test_handle_database_errors(self, app, client):
"""Test handle_database_errors decorator."""
if not handle_database_errors:
pytest.skip("Module not available")
@app.route('/test')
@handle_database_errors
def test_endpoint():
raise Exception("Database connection failed")
response = client.get('/test')
assert response.status_code == 500
data = json.loads(response.data)
assert data['status'] == 'error'
assert data['error_code'] == 'DATABASE_ERROR'
def test_handle_file_operations_file_not_found(self, app, client):
"""Test handle_file_operations decorator with FileNotFoundError."""
if not handle_file_operations:
pytest.skip("Module not available")
@app.route('/test')
@handle_file_operations
def test_endpoint():
raise FileNotFoundError("File not found")
response = client.get('/test')
assert response.status_code == 404
data = json.loads(response.data)
assert data['status'] == 'error'
assert data['error_code'] == 'FILE_NOT_FOUND'
def test_handle_file_operations_permission_error(self, app, client):
"""Test handle_file_operations decorator with PermissionError."""
if not handle_file_operations:
pytest.skip("Module not available")
@app.route('/test')
@handle_file_operations
def test_endpoint():
raise PermissionError("Permission denied")
response = client.get('/test')
assert response.status_code == 403
data = json.loads(response.data)
assert data['status'] == 'error'
assert data['error_code'] == 'PERMISSION_DENIED'
def test_handle_file_operations_os_error(self, app, client):
"""Test handle_file_operations decorator with OSError."""
if not handle_file_operations:
pytest.skip("Module not available")
@app.route('/test')
@handle_file_operations
def test_endpoint():
raise OSError("File system error")
response = client.get('/test')
assert response.status_code == 500
data = json.loads(response.data)
assert data['status'] == 'error'
assert data['error_code'] == 'FILE_SYSTEM_ERROR'
class TestResponseHelpers:
"""Test cases for response helper functions."""
def test_create_error_response_basic(self):
"""Test create_error_response with basic parameters."""
if not create_error_response:
pytest.skip("Module not available")
response, status_code = create_error_response("Test error")
assert status_code == 400
assert response['status'] == 'error'
assert response['message'] == 'Test error'
def test_create_error_response_with_code(self):
"""Test create_error_response with error code."""
if not create_error_response:
pytest.skip("Module not available")
response, status_code = create_error_response(
"Test error",
status_code=422,
error_code="VALIDATION_ERROR"
)
assert status_code == 422
assert response['status'] == 'error'
assert response['message'] == 'Test error'
assert response['error_code'] == 'VALIDATION_ERROR'
def test_create_error_response_with_errors_list(self):
"""Test create_error_response with errors list."""
if not create_error_response:
pytest.skip("Module not available")
errors = ["Field 1 is required", "Field 2 is invalid"]
response, status_code = create_error_response(
"Validation failed",
errors=errors
)
assert response['errors'] == errors
def test_create_error_response_with_data(self):
"""Test create_error_response with additional data."""
if not create_error_response:
pytest.skip("Module not available")
data = {"field": "value"}
response, status_code = create_error_response(
"Test error",
data=data
)
assert response['data'] == data
def test_create_success_response_basic(self):
"""Test create_success_response with basic parameters."""
if not create_success_response:
pytest.skip("Module not available")
response, status_code = create_success_response()
assert status_code == 200
assert response['status'] == 'success'
assert response['message'] == 'Operation successful'
def test_create_success_response_with_data(self):
"""Test create_success_response with data."""
if not create_success_response:
pytest.skip("Module not available")
data = {"id": 1, "name": "Test"}
response, status_code = create_success_response(
data=data,
message="Created successfully",
status_code=201
)
assert status_code == 201
assert response['status'] == 'success'
assert response['message'] == 'Created successfully'
assert response['data'] == data
class TestCustomExceptions:
"""Test cases for custom exception classes."""
def test_api_exception_basic(self):
"""Test APIException with basic parameters."""
if not APIException:
pytest.skip("Module not available")
exception = APIException("Test error")
assert str(exception) == "Test error"
assert exception.message == "Test error"
assert exception.status_code == 400
assert exception.error_code is None
assert exception.errors is None
def test_api_exception_full(self):
"""Test APIException with all parameters."""
if not APIException:
pytest.skip("Module not available")
errors = ["Error 1", "Error 2"]
exception = APIException(
"Test error",
status_code=422,
error_code="CUSTOM_ERROR",
errors=errors
)
assert exception.message == "Test error"
assert exception.status_code == 422
assert exception.error_code == "CUSTOM_ERROR"
assert exception.errors == errors
def test_validation_error(self):
"""Test ValidationError exception."""
if not ValidationError:
pytest.skip("Module not available")
exception = ValidationError("Invalid input")
assert exception.message == "Invalid input"
assert exception.status_code == 400
assert exception.error_code == "VALIDATION_ERROR"
def test_validation_error_with_errors(self):
"""Test ValidationError with errors list."""
if not ValidationError:
pytest.skip("Module not available")
errors = ["Field 1 is required", "Field 2 is invalid"]
exception = ValidationError("Validation failed", errors=errors)
assert exception.message == "Validation failed"
assert exception.errors == errors
def test_not_found_error(self):
"""Test NotFoundError exception."""
if not NotFoundError:
pytest.skip("Module not available")
exception = NotFoundError("Resource not found")
assert exception.message == "Resource not found"
assert exception.status_code == 404
assert exception.error_code == "NOT_FOUND"
def test_not_found_error_default(self):
"""Test NotFoundError with default message."""
if not NotFoundError:
pytest.skip("Module not available")
exception = NotFoundError()
assert exception.message == "Resource not found"
assert exception.status_code == 404
def test_permission_error_custom(self):
"""Test custom PermissionError exception."""
if not PermissionError:
pytest.skip("Module not available")
exception = PermissionError("Custom access denied")
assert exception.message == "Custom access denied"
assert exception.status_code == 403
assert exception.error_code == "ACCESS_DENIED"
def test_permission_error_default(self):
"""Test PermissionError with default message."""
if not PermissionError:
pytest.skip("Module not available")
exception = PermissionError()
assert exception.message == "Access denied"
assert exception.status_code == 403
class TestErrorHandlerIntegration:
"""Integration tests for error handling."""
@pytest.fixture
def app(self):
"""Create a test Flask application."""
app = Flask(__name__)
app.config['TESTING'] = True
return app
@pytest.fixture
def client(self, app):
"""Create a test client."""
return app.test_client()
def test_nested_decorators(self, app, client):
"""Test nested error handling decorators."""
if not handle_api_errors or not handle_database_errors:
pytest.skip("Module not available")
@app.route('/test')
@handle_api_errors
@handle_database_errors
def test_endpoint():
raise ValueError("Test error")
response = client.get('/test')
assert response.status_code == 400
data = json.loads(response.data)
assert data['status'] == 'error'
def test_decorator_preserves_metadata(self):
"""Test that decorators preserve function metadata."""
if not handle_api_errors:
pytest.skip("Module not available")
@handle_api_errors
def test_function():
"""Test function docstring."""
return "test"
assert test_function.__name__ == "test_function"
assert test_function.__doc__ == "Test function docstring."
def test_custom_exception_handling(self, app, client):
"""Test handling of custom exceptions."""
if not handle_api_errors or not APIException:
pytest.skip("Module not available")
@app.route('/test')
@handle_api_errors
def test_endpoint():
raise APIException("Custom error", status_code=422, error_code="CUSTOM")
# Note: The current implementation doesn't handle APIException specifically
# This test documents the current behavior
response = client.get('/test')
assert response.status_code == 500 # Falls through to generic Exception handling
if __name__ == '__main__':
pytest.main([__file__])

View File

@@ -0,0 +1,560 @@
"""
Test cases for response helper utilities.
"""
import pytest
from unittest.mock import Mock, patch, MagicMock
import json
from datetime import datetime
# Import the modules to test
try:
from src.server.web.controllers.shared.response_helpers import (
create_response, create_error_response, create_success_response,
create_paginated_response, format_anime_data, format_episode_data,
format_download_data, format_user_data, format_datetime, format_file_size,
add_cors_headers, create_api_response
)
except ImportError:
# Fallback for testing
create_response = None
create_error_response = None
create_success_response = None
create_paginated_response = None
format_anime_data = None
format_episode_data = None
format_download_data = None
format_user_data = None
format_datetime = None
format_file_size = None
add_cors_headers = None
create_api_response = None
class TestResponseCreation:
"""Test cases for response creation functions."""
def test_create_response_success(self):
"""Test create_response with success data."""
if not create_response:
pytest.skip("Module not available")
data = {'test': 'data'}
response, status_code = create_response(data, 200)
assert status_code == 200
response_data = json.loads(response.data)
assert response_data['test'] == 'data'
assert response.status_code == 200
def test_create_response_with_headers(self):
"""Test create_response with custom headers."""
if not create_response:
pytest.skip("Module not available")
data = {'test': 'data'}
headers = {'X-Custom-Header': 'test-value'}
response, status_code = create_response(data, 200, headers)
assert response.headers.get('X-Custom-Header') == 'test-value'
def test_create_error_response_basic(self):
"""Test create_error_response with basic error."""
if not create_error_response:
pytest.skip("Module not available")
response, status_code = create_error_response("Test error", 400)
assert status_code == 400
response_data = json.loads(response.data)
assert response_data['error'] == 'Test error'
assert response_data['status'] == 'error'
def test_create_error_response_with_details(self):
"""Test create_error_response with error details."""
if not create_error_response:
pytest.skip("Module not available")
details = {'field': 'name', 'issue': 'required'}
response, status_code = create_error_response("Validation error", 422, details)
assert status_code == 422
response_data = json.loads(response.data)
assert response_data['error'] == 'Validation error'
assert response_data['details'] == details
def test_create_success_response_basic(self):
"""Test create_success_response with basic success."""
if not create_success_response:
pytest.skip("Module not available")
response, status_code = create_success_response("Operation successful")
assert status_code == 200
response_data = json.loads(response.data)
assert response_data['message'] == 'Operation successful'
assert response_data['status'] == 'success'
def test_create_success_response_with_data(self):
"""Test create_success_response with data."""
if not create_success_response:
pytest.skip("Module not available")
data = {'created_id': 123}
response, status_code = create_success_response("Created successfully", 201, data)
assert status_code == 201
response_data = json.loads(response.data)
assert response_data['message'] == 'Created successfully'
assert response_data['data'] == data
def test_create_api_response_success(self):
"""Test create_api_response for success case."""
if not create_api_response:
pytest.skip("Module not available")
data = {'test': 'data'}
response, status_code = create_api_response(data, success=True)
assert status_code == 200
response_data = json.loads(response.data)
assert response_data['success'] is True
assert response_data['data'] == data
def test_create_api_response_error(self):
"""Test create_api_response for error case."""
if not create_api_response:
pytest.skip("Module not available")
error_msg = "Something went wrong"
response, status_code = create_api_response(error_msg, success=False, status_code=500)
assert status_code == 500
response_data = json.loads(response.data)
assert response_data['success'] is False
assert response_data['error'] == error_msg
class TestPaginatedResponse:
"""Test cases for paginated response creation."""
def test_create_paginated_response_basic(self):
"""Test create_paginated_response with basic pagination."""
if not create_paginated_response:
pytest.skip("Module not available")
items = [{'id': 1}, {'id': 2}, {'id': 3}]
page = 1
per_page = 10
total = 25
response, status_code = create_paginated_response(items, page, per_page, total)
assert status_code == 200
response_data = json.loads(response.data)
assert response_data['data'] == items
assert response_data['pagination']['page'] == 1
assert response_data['pagination']['per_page'] == 10
assert response_data['pagination']['total'] == 25
assert response_data['pagination']['pages'] == 3 # ceil(25/10)
def test_create_paginated_response_with_endpoint(self):
"""Test create_paginated_response with endpoint for links."""
if not create_paginated_response:
pytest.skip("Module not available")
items = [{'id': 1}]
page = 2
per_page = 5
total = 20
endpoint = '/api/items'
response, status_code = create_paginated_response(
items, page, per_page, total, endpoint=endpoint
)
response_data = json.loads(response.data)
links = response_data['pagination']['links']
assert '/api/items?page=1' in links['first']
assert '/api/items?page=3' in links['next']
assert '/api/items?page=1' in links['prev']
assert '/api/items?page=4' in links['last']
def test_create_paginated_response_first_page(self):
"""Test create_paginated_response on first page."""
if not create_paginated_response:
pytest.skip("Module not available")
items = [{'id': 1}]
response, status_code = create_paginated_response(items, 1, 10, 20)
response_data = json.loads(response.data)
pagination = response_data['pagination']
assert pagination['has_prev'] is False
assert pagination['has_next'] is True
def test_create_paginated_response_last_page(self):
"""Test create_paginated_response on last page."""
if not create_paginated_response:
pytest.skip("Module not available")
items = [{'id': 1}]
response, status_code = create_paginated_response(items, 3, 10, 25)
response_data = json.loads(response.data)
pagination = response_data['pagination']
assert pagination['has_prev'] is True
assert pagination['has_next'] is False
def test_create_paginated_response_empty(self):
"""Test create_paginated_response with empty results."""
if not create_paginated_response:
pytest.skip("Module not available")
response, status_code = create_paginated_response([], 1, 10, 0)
response_data = json.loads(response.data)
assert response_data['data'] == []
assert response_data['pagination']['total'] == 0
assert response_data['pagination']['pages'] == 0
class TestDataFormatting:
"""Test cases for data formatting functions."""
def test_format_anime_data(self):
"""Test format_anime_data function."""
if not format_anime_data:
pytest.skip("Module not available")
anime = {
'id': 1,
'name': 'Test Anime',
'url': 'https://example.com/anime/1',
'description': 'A test anime',
'episodes': 12,
'status': 'completed',
'created_at': '2023-01-01 12:00:00',
'updated_at': '2023-01-02 12:00:00'
}
formatted = format_anime_data(anime)
assert formatted['id'] == 1
assert formatted['name'] == 'Test Anime'
assert formatted['url'] == 'https://example.com/anime/1'
assert formatted['description'] == 'A test anime'
assert formatted['episodes'] == 12
assert formatted['status'] == 'completed'
assert 'created_at' in formatted
assert 'updated_at' in formatted
def test_format_anime_data_with_episodes(self):
"""Test format_anime_data with episode information."""
if not format_anime_data:
pytest.skip("Module not available")
anime = {
'id': 1,
'name': 'Test Anime',
'url': 'https://example.com/anime/1'
}
episodes = [
{'id': 1, 'number': 1, 'title': 'Episode 1'},
{'id': 2, 'number': 2, 'title': 'Episode 2'}
]
formatted = format_anime_data(anime, include_episodes=True, episodes=episodes)
assert 'episodes' in formatted
assert len(formatted['episodes']) == 2
assert formatted['episodes'][0]['number'] == 1
def test_format_episode_data(self):
"""Test format_episode_data function."""
if not format_episode_data:
pytest.skip("Module not available")
episode = {
'id': 1,
'anime_id': 5,
'number': 1,
'title': 'First Episode',
'url': 'https://example.com/episode/1',
'duration': 1440, # 24 minutes in seconds
'status': 'available',
'created_at': '2023-01-01 12:00:00'
}
formatted = format_episode_data(episode)
assert formatted['id'] == 1
assert formatted['anime_id'] == 5
assert formatted['number'] == 1
assert formatted['title'] == 'First Episode'
assert formatted['url'] == 'https://example.com/episode/1'
assert formatted['duration'] == 1440
assert formatted['status'] == 'available'
assert 'created_at' in formatted
def test_format_download_data(self):
"""Test format_download_data function."""
if not format_download_data:
pytest.skip("Module not available")
download = {
'id': 1,
'anime_id': 5,
'episode_id': 10,
'status': 'downloading',
'progress': 75.5,
'size': 1073741824, # 1GB in bytes
'downloaded_size': 805306368, # 768MB
'speed': 1048576, # 1MB/s
'eta': 300, # 5 minutes
'created_at': '2023-01-01 12:00:00',
'started_at': '2023-01-01 12:05:00'
}
formatted = format_download_data(download)
assert formatted['id'] == 1
assert formatted['anime_id'] == 5
assert formatted['episode_id'] == 10
assert formatted['status'] == 'downloading'
assert formatted['progress'] == 75.5
assert formatted['size'] == 1073741824
assert formatted['downloaded_size'] == 805306368
assert formatted['speed'] == 1048576
assert formatted['eta'] == 300
assert 'created_at' in formatted
assert 'started_at' in formatted
def test_format_user_data(self):
"""Test format_user_data function."""
if not format_user_data:
pytest.skip("Module not available")
user = {
'id': 1,
'username': 'testuser',
'email': 'test@example.com',
'password_hash': 'secret_hash',
'role': 'user',
'last_login': '2023-01-01 12:00:00',
'created_at': '2023-01-01 10:00:00'
}
formatted = format_user_data(user)
assert formatted['id'] == 1
assert formatted['username'] == 'testuser'
assert formatted['email'] == 'test@example.com'
assert formatted['role'] == 'user'
assert 'last_login' in formatted
assert 'created_at' in formatted
# Should not include sensitive data
assert 'password_hash' not in formatted
def test_format_user_data_include_sensitive(self):
"""Test format_user_data with sensitive data included."""
if not format_user_data:
pytest.skip("Module not available")
user = {
'id': 1,
'username': 'testuser',
'password_hash': 'secret_hash'
}
formatted = format_user_data(user, include_sensitive=True)
assert 'password_hash' in formatted
assert formatted['password_hash'] == 'secret_hash'
class TestUtilityFormatting:
"""Test cases for utility formatting functions."""
def test_format_datetime_string(self):
"""Test format_datetime with string input."""
if not format_datetime:
pytest.skip("Module not available")
dt_string = "2023-01-01 12:30:45"
formatted = format_datetime(dt_string)
assert isinstance(formatted, str)
assert "2023" in formatted
assert "01" in formatted
def test_format_datetime_object(self):
"""Test format_datetime with datetime object."""
if not format_datetime:
pytest.skip("Module not available")
dt_object = datetime(2023, 1, 1, 12, 30, 45)
formatted = format_datetime(dt_object)
assert isinstance(formatted, str)
assert "2023" in formatted
def test_format_datetime_none(self):
"""Test format_datetime with None input."""
if not format_datetime:
pytest.skip("Module not available")
formatted = format_datetime(None)
assert formatted is None
def test_format_datetime_custom_format(self):
"""Test format_datetime with custom format."""
if not format_datetime:
pytest.skip("Module not available")
dt_string = "2023-01-01 12:30:45"
formatted = format_datetime(dt_string, fmt="%Y/%m/%d")
assert formatted == "2023/01/01"
def test_format_file_size_bytes(self):
"""Test format_file_size with bytes."""
if not format_file_size:
pytest.skip("Module not available")
assert format_file_size(512) == "512 B"
assert format_file_size(0) == "0 B"
def test_format_file_size_kilobytes(self):
"""Test format_file_size with kilobytes."""
if not format_file_size:
pytest.skip("Module not available")
assert format_file_size(1024) == "1.0 KB"
assert format_file_size(1536) == "1.5 KB"
def test_format_file_size_megabytes(self):
"""Test format_file_size with megabytes."""
if not format_file_size:
pytest.skip("Module not available")
assert format_file_size(1048576) == "1.0 MB"
assert format_file_size(1572864) == "1.5 MB"
def test_format_file_size_gigabytes(self):
"""Test format_file_size with gigabytes."""
if not format_file_size:
pytest.skip("Module not available")
assert format_file_size(1073741824) == "1.0 GB"
assert format_file_size(2147483648) == "2.0 GB"
def test_format_file_size_terabytes(self):
"""Test format_file_size with terabytes."""
if not format_file_size:
pytest.skip("Module not available")
assert format_file_size(1099511627776) == "1.0 TB"
def test_format_file_size_precision(self):
"""Test format_file_size with custom precision."""
if not format_file_size:
pytest.skip("Module not available")
size = 1536 # 1.5 KB
assert format_file_size(size, precision=2) == "1.50 KB"
assert format_file_size(size, precision=0) == "2 KB" # Rounded up
class TestCORSHeaders:
"""Test cases for CORS header utilities."""
def test_add_cors_headers_basic(self):
"""Test add_cors_headers with basic response."""
if not add_cors_headers:
pytest.skip("Module not available")
# Mock response object
response = Mock()
response.headers = {}
result = add_cors_headers(response)
assert result.headers['Access-Control-Allow-Origin'] == '*'
assert 'GET, POST, PUT, DELETE, OPTIONS' in result.headers['Access-Control-Allow-Methods']
assert 'Content-Type, Authorization' in result.headers['Access-Control-Allow-Headers']
def test_add_cors_headers_custom_origin(self):
"""Test add_cors_headers with custom origin."""
if not add_cors_headers:
pytest.skip("Module not available")
response = Mock()
response.headers = {}
result = add_cors_headers(response, origin='https://example.com')
assert result.headers['Access-Control-Allow-Origin'] == 'https://example.com'
def test_add_cors_headers_custom_methods(self):
"""Test add_cors_headers with custom methods."""
if not add_cors_headers:
pytest.skip("Module not available")
response = Mock()
response.headers = {}
result = add_cors_headers(response, methods=['GET', 'POST'])
assert result.headers['Access-Control-Allow-Methods'] == 'GET, POST'
def test_add_cors_headers_existing_headers(self):
"""Test add_cors_headers preserves existing headers."""
if not add_cors_headers:
pytest.skip("Module not available")
response = Mock()
response.headers = {'X-Custom-Header': 'custom-value'}
result = add_cors_headers(response)
assert result.headers['X-Custom-Header'] == 'custom-value'
assert 'Access-Control-Allow-Origin' in result.headers
class TestResponseIntegration:
"""Integration tests for response helpers."""
def test_formatted_paginated_response(self):
"""Test creating paginated response with formatted data."""
if not create_paginated_response or not format_anime_data:
pytest.skip("Module not available")
anime_list = [
{'id': 1, 'name': 'Anime 1', 'url': 'https://example.com/1'},
{'id': 2, 'name': 'Anime 2', 'url': 'https://example.com/2'}
]
formatted_items = [format_anime_data(anime) for anime in anime_list]
response, status_code = create_paginated_response(formatted_items, 1, 10, 2)
assert status_code == 200
response_data = json.loads(response.data)
assert len(response_data['data']) == 2
assert response_data['data'][0]['name'] == 'Anime 1'
def test_error_response_with_cors(self):
"""Test error response with CORS headers."""
if not create_error_response or not add_cors_headers:
pytest.skip("Module not available")
response, status_code = create_error_response("Test error", 400)
response_with_cors = add_cors_headers(response)
assert 'Access-Control-Allow-Origin' in response_with_cors.headers
assert status_code == 400
if __name__ == '__main__':
pytest.main([__file__])

View File

@@ -0,0 +1,546 @@
"""
Test cases for input validation utilities.
"""
import pytest
from unittest.mock import Mock, patch, MagicMock
from flask import Flask, request
import json
import tempfile
import os
# Import the modules to test
try:
from src.server.web.controllers.shared.validators import (
validate_json_input, validate_query_params, validate_pagination_params,
validate_anime_data, validate_file_upload, is_valid_url, is_valid_email,
sanitize_string, validate_id_parameter
)
except ImportError:
# Fallback for testing
validate_json_input = None
validate_query_params = None
validate_pagination_params = None
validate_anime_data = None
validate_file_upload = None
is_valid_url = None
is_valid_email = None
sanitize_string = None
validate_id_parameter = None
class TestValidationDecorators:
"""Test cases for validation decorators."""
@pytest.fixture
def app(self):
"""Create a test Flask application."""
app = Flask(__name__)
app.config['TESTING'] = True
return app
@pytest.fixture
def client(self, app):
"""Create a test client."""
return app.test_client()
def test_validate_json_input_success(self, app, client):
"""Test validate_json_input decorator with valid JSON."""
if not validate_json_input:
pytest.skip("Module not available")
@app.route('/test', methods=['POST'])
@validate_json_input(
required_fields=['name'],
optional_fields=['description'],
field_types={'name': str, 'description': str}
)
def test_endpoint():
return {'status': 'success'}
response = client.post('/test',
json={'name': 'Test Name', 'description': 'Test Description'},
content_type='application/json')
assert response.status_code == 200
def test_validate_json_input_missing_required(self, app, client):
"""Test validate_json_input with missing required field."""
if not validate_json_input:
pytest.skip("Module not available")
@app.route('/test', methods=['POST'])
@validate_json_input(required_fields=['name'])
def test_endpoint():
return {'status': 'success'}
response = client.post('/test',
json={'description': 'Test Description'},
content_type='application/json')
assert response.status_code == 400
data = json.loads(response.data)
assert 'Missing required fields' in data[0]['message']
def test_validate_json_input_wrong_type(self, app, client):
"""Test validate_json_input with wrong field type."""
if not validate_json_input:
pytest.skip("Module not available")
@app.route('/test', methods=['POST'])
@validate_json_input(
required_fields=['age'],
field_types={'age': int}
)
def test_endpoint():
return {'status': 'success'}
response = client.post('/test',
json={'age': 'twenty'},
content_type='application/json')
assert response.status_code == 400
data = json.loads(response.data)
assert 'Type validation failed' in data[0]['message']
def test_validate_json_input_unexpected_fields(self, app, client):
"""Test validate_json_input with unexpected fields."""
if not validate_json_input:
pytest.skip("Module not available")
@app.route('/test', methods=['POST'])
@validate_json_input(
required_fields=['name'],
optional_fields=['description']
)
def test_endpoint():
return {'status': 'success'}
response = client.post('/test',
json={'name': 'Test', 'description': 'Test', 'extra_field': 'unexpected'},
content_type='application/json')
assert response.status_code == 400
data = json.loads(response.data)
assert 'Unexpected fields' in data[0]['message']
def test_validate_json_input_not_json(self, app, client):
"""Test validate_json_input with non-JSON content."""
if not validate_json_input:
pytest.skip("Module not available")
@app.route('/test', methods=['POST'])
@validate_json_input(required_fields=['name'])
def test_endpoint():
return {'status': 'success'}
response = client.post('/test', data='not json')
assert response.status_code == 400
data = json.loads(response.data)
assert 'Request must be JSON' in data[0]['message']
def test_validate_query_params_success(self, app, client):
"""Test validate_query_params decorator with valid parameters."""
if not validate_query_params:
pytest.skip("Module not available")
@app.route('/test')
@validate_query_params(
allowed_params=['page', 'limit'],
required_params=['page'],
param_types={'page': int, 'limit': int}
)
def test_endpoint():
return {'status': 'success'}
response = client.get('/test?page=1&limit=10')
assert response.status_code == 200
def test_validate_query_params_missing_required(self, app, client):
"""Test validate_query_params with missing required parameter."""
if not validate_query_params:
pytest.skip("Module not available")
@app.route('/test')
@validate_query_params(required_params=['page'])
def test_endpoint():
return {'status': 'success'}
response = client.get('/test')
assert response.status_code == 400
data = json.loads(response.data)
assert 'Missing required parameters' in data[0]['message']
def test_validate_query_params_unexpected(self, app, client):
"""Test validate_query_params with unexpected parameters."""
if not validate_query_params:
pytest.skip("Module not available")
@app.route('/test')
@validate_query_params(allowed_params=['page'])
def test_endpoint():
return {'status': 'success'}
response = client.get('/test?page=1&unexpected=value')
assert response.status_code == 400
data = json.loads(response.data)
assert 'Unexpected parameters' in data[0]['message']
def test_validate_query_params_wrong_type(self, app, client):
"""Test validate_query_params with wrong parameter type."""
if not validate_query_params:
pytest.skip("Module not available")
@app.route('/test')
@validate_query_params(
allowed_params=['page'],
param_types={'page': int}
)
def test_endpoint():
return {'status': 'success'}
response = client.get('/test?page=abc')
assert response.status_code == 400
data = json.loads(response.data)
assert 'Parameter type validation failed' in data[0]['message']
def test_validate_pagination_params_success(self, app, client):
"""Test validate_pagination_params decorator with valid parameters."""
if not validate_pagination_params:
pytest.skip("Module not available")
@app.route('/test')
@validate_pagination_params
def test_endpoint():
return {'status': 'success'}
response = client.get('/test?page=1&per_page=10&limit=20&offset=5')
assert response.status_code == 200
def test_validate_pagination_params_invalid_page(self, app, client):
"""Test validate_pagination_params with invalid page."""
if not validate_pagination_params:
pytest.skip("Module not available")
@app.route('/test')
@validate_pagination_params
def test_endpoint():
return {'status': 'success'}
response = client.get('/test?page=0')
assert response.status_code == 400
data = json.loads(response.data)
assert 'page must be greater than 0' in data[0]['errors']
def test_validate_pagination_params_invalid_per_page(self, app, client):
"""Test validate_pagination_params with invalid per_page."""
if not validate_pagination_params:
pytest.skip("Module not available")
@app.route('/test')
@validate_pagination_params
def test_endpoint():
return {'status': 'success'}
response = client.get('/test?per_page=2000')
assert response.status_code == 400
data = json.loads(response.data)
assert 'per_page cannot exceed 1000' in data[0]['errors']
def test_validate_id_parameter_success(self, app, client):
"""Test validate_id_parameter decorator with valid ID."""
if not validate_id_parameter:
pytest.skip("Module not available")
@app.route('/test/<int:id>')
@validate_id_parameter('id')
def test_endpoint(id):
return {'status': 'success', 'id': id}
response = client.get('/test/123')
assert response.status_code == 200
data = json.loads(response.data)
assert data['id'] == 123
def test_validate_id_parameter_invalid(self, app):
"""Test validate_id_parameter decorator with invalid ID."""
if not validate_id_parameter:
pytest.skip("Module not available")
@validate_id_parameter('id')
def test_function(id='abc'):
return {'status': 'success'}
# Since this is a decorator that modifies kwargs,
# we test it directly
result = test_function(id='abc')
# Should return error response
assert result[1] == 400
class TestValidationUtilities:
"""Test cases for validation utility functions."""
def test_validate_anime_data_valid(self):
"""Test validate_anime_data with valid data."""
if not validate_anime_data:
pytest.skip("Module not available")
data = {
'name': 'Test Anime',
'url': 'https://example.com/anime/test',
'description': 'A test anime',
'episodes': 12,
'status': 'completed'
}
errors = validate_anime_data(data)
assert len(errors) == 0
def test_validate_anime_data_missing_required(self):
"""Test validate_anime_data with missing required fields."""
if not validate_anime_data:
pytest.skip("Module not available")
data = {
'description': 'A test anime'
}
errors = validate_anime_data(data)
assert len(errors) > 0
assert any('Missing required field: name' in error for error in errors)
assert any('Missing required field: url' in error for error in errors)
def test_validate_anime_data_invalid_types(self):
"""Test validate_anime_data with invalid field types."""
if not validate_anime_data:
pytest.skip("Module not available")
data = {
'name': 123, # Should be string
'url': 'invalid-url', # Should be valid URL
'episodes': 'twelve' # Should be integer
}
errors = validate_anime_data(data)
assert len(errors) > 0
assert any('name must be a string' in error for error in errors)
assert any('url must be a valid URL' in error for error in errors)
assert any('episodes must be an integer' in error for error in errors)
def test_validate_anime_data_invalid_status(self):
"""Test validate_anime_data with invalid status."""
if not validate_anime_data:
pytest.skip("Module not available")
data = {
'name': 'Test Anime',
'url': 'https://example.com/anime/test',
'status': 'invalid_status'
}
errors = validate_anime_data(data)
assert len(errors) > 0
assert any('status must be one of' in error for error in errors)
def test_validate_file_upload_valid(self):
"""Test validate_file_upload with valid file."""
if not validate_file_upload:
pytest.skip("Module not available")
# Create a mock file object
mock_file = Mock()
mock_file.filename = 'test.txt'
mock_file.content_length = 1024 # 1KB
errors = validate_file_upload(mock_file, allowed_extensions=['txt'], max_size_mb=1)
assert len(errors) == 0
def test_validate_file_upload_no_file(self):
"""Test validate_file_upload with no file."""
if not validate_file_upload:
pytest.skip("Module not available")
errors = validate_file_upload(None)
assert len(errors) > 0
assert 'No file provided' in errors
def test_validate_file_upload_empty_filename(self):
"""Test validate_file_upload with empty filename."""
if not validate_file_upload:
pytest.skip("Module not available")
mock_file = Mock()
mock_file.filename = ''
errors = validate_file_upload(mock_file)
assert len(errors) > 0
assert 'No file selected' in errors
def test_validate_file_upload_invalid_extension(self):
"""Test validate_file_upload with invalid extension."""
if not validate_file_upload:
pytest.skip("Module not available")
mock_file = Mock()
mock_file.filename = 'test.exe'
errors = validate_file_upload(mock_file, allowed_extensions=['txt', 'pdf'])
assert len(errors) > 0
assert 'File type not allowed' in errors[0]
def test_validate_file_upload_too_large(self):
"""Test validate_file_upload with file too large."""
if not validate_file_upload:
pytest.skip("Module not available")
mock_file = Mock()
mock_file.filename = 'test.txt'
mock_file.content_length = 5 * 1024 * 1024 # 5MB
errors = validate_file_upload(mock_file, max_size_mb=1)
assert len(errors) > 0
assert 'File size exceeds maximum' in errors[0]
def test_is_valid_url_valid(self):
"""Test is_valid_url with valid URLs."""
if not is_valid_url:
pytest.skip("Module not available")
valid_urls = [
'https://example.com',
'http://test.co.uk',
'https://subdomain.example.com/path',
'http://localhost:8080',
'https://192.168.1.1:3000/api'
]
for url in valid_urls:
assert is_valid_url(url), f"URL should be valid: {url}"
def test_is_valid_url_invalid(self):
"""Test is_valid_url with invalid URLs."""
if not is_valid_url:
pytest.skip("Module not available")
invalid_urls = [
'not-a-url',
'ftp://example.com', # Only http/https supported
'https://',
'http://.',
'just-text'
]
for url in invalid_urls:
assert not is_valid_url(url), f"URL should be invalid: {url}"
def test_is_valid_email_valid(self):
"""Test is_valid_email with valid emails."""
if not is_valid_email:
pytest.skip("Module not available")
valid_emails = [
'test@example.com',
'user.name@domain.co.uk',
'admin+tag@site.org',
'user123@test-domain.com'
]
for email in valid_emails:
assert is_valid_email(email), f"Email should be valid: {email}"
def test_is_valid_email_invalid(self):
"""Test is_valid_email with invalid emails."""
if not is_valid_email:
pytest.skip("Module not available")
invalid_emails = [
'not-an-email',
'@domain.com',
'user@',
'user@domain',
'user space@domain.com'
]
for email in invalid_emails:
assert not is_valid_email(email), f"Email should be invalid: {email}"
def test_sanitize_string_basic(self):
"""Test sanitize_string with basic input."""
if not sanitize_string:
pytest.skip("Module not available")
result = sanitize_string(" Hello World ")
assert result == "Hello World"
def test_sanitize_string_max_length(self):
"""Test sanitize_string with max length."""
if not sanitize_string:
pytest.skip("Module not available")
long_string = "A" * 100
result = sanitize_string(long_string, max_length=50)
assert len(result) == 50
assert result == "A" * 50
def test_sanitize_string_control_characters(self):
"""Test sanitize_string removes control characters."""
if not sanitize_string:
pytest.skip("Module not available")
input_string = "Hello\x00World\x01Test"
result = sanitize_string(input_string)
assert result == "HelloWorldTest"
def test_sanitize_string_non_string(self):
"""Test sanitize_string with non-string input."""
if not sanitize_string:
pytest.skip("Module not available")
result = sanitize_string(123)
assert result == "123"
class TestValidatorIntegration:
"""Integration tests for validators."""
@pytest.fixture
def app(self):
"""Create a test Flask application."""
app = Flask(__name__)
app.config['TESTING'] = True
return app
@pytest.fixture
def client(self, app):
"""Create a test client."""
return app.test_client()
def test_multiple_validators(self, app, client):
"""Test using multiple validators on the same endpoint."""
if not validate_json_input or not validate_pagination_params:
pytest.skip("Module not available")
@app.route('/test', methods=['POST'])
@validate_json_input(required_fields=['name'])
@validate_pagination_params
def test_endpoint():
return {'status': 'success'}
response = client.post('/test?page=1&per_page=10',
json={'name': 'Test'},
content_type='application/json')
assert response.status_code == 200
def test_validator_preserves_metadata(self):
"""Test that validators preserve function metadata."""
if not validate_json_input:
pytest.skip("Module not available")
@validate_json_input(required_fields=['name'])
def test_function():
"""Test function docstring."""
return "test"
assert test_function.__name__ == "test_function"
assert test_function.__doc__ == "Test function docstring."
if __name__ == '__main__':
pytest.main([__file__])