- Created SetupRedirectMiddleware to redirect unconfigured apps to /setup - Enhanced /api/auth/setup endpoint to save anime_directory to config - Updated SetupRequest model to accept optional anime_directory parameter - Modified setup.html to send anime_directory in setup API call - Added @pytest.mark.requires_clean_auth marker for tests needing unconfigured state - Modified conftest.py to conditionally setup auth based on test marker - Fixed all test failures (846/846 tests now passing) - Updated instructions.md to mark setup tasks as complete This implementation ensures users are guided through initial setup before accessing the application, while maintaining test isolation and preventing auth state leakage between tests.
296 lines
9.4 KiB
Python
296 lines
9.4 KiB
Python
"""
|
|
SQL Injection Protection Tests.
|
|
|
|
This module tests SQL injection vulnerabilities across the application.
|
|
"""
|
|
|
|
import pytest
|
|
from httpx import AsyncClient
|
|
|
|
from src.server.fastapi_app import app
|
|
|
|
|
|
@pytest.mark.security
|
|
class TestSQLInjection:
|
|
"""Security tests for SQL injection protection."""
|
|
|
|
@pytest.fixture
|
|
async def client(self):
|
|
"""Create async HTTP client for testing."""
|
|
from httpx import ASGITransport
|
|
|
|
async with AsyncClient(
|
|
transport=ASGITransport(app=app), base_url="http://test"
|
|
) as ac:
|
|
yield ac
|
|
|
|
# Classic SQL Injection payloads
|
|
SQL_INJECTION_PAYLOADS = [
|
|
"' OR '1'='1",
|
|
"' OR 1=1--",
|
|
"admin'--",
|
|
"' OR 'a'='a",
|
|
"1' OR '1' = '1",
|
|
"' UNION SELECT NULL--",
|
|
"'; DROP TABLE users--",
|
|
"1'; DELETE FROM anime--",
|
|
"' AND 1=0 UNION ALL SELECT 'admin', '81dc9bdb52d04dc20036dbd8313ed055'",
|
|
"admin' /*",
|
|
"' or 1=1 limit 1 -- -+",
|
|
"') OR ('1'='1",
|
|
]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_sql_injection_in_search(self, client):
|
|
"""Test SQL injection protection in search functionality."""
|
|
for payload in self.SQL_INJECTION_PAYLOADS:
|
|
response = await client.get(
|
|
"/api/anime/search", params={"query": payload}
|
|
)
|
|
|
|
# Should not cause SQL error or return unauthorized data
|
|
assert response.status_code in [200, 400, 422]
|
|
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
# Should not return all records
|
|
assert "success" in data or "error" in data
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_sql_injection_in_login(self, client):
|
|
"""Test SQL injection protection in login."""
|
|
for payload in self.SQL_INJECTION_PAYLOADS:
|
|
response = await client.post(
|
|
"/api/auth/login",
|
|
json={"username": payload, "password": "anything"},
|
|
)
|
|
|
|
# Should not authenticate (401), reject invalid input (422),
|
|
# or rate limit (429)
|
|
assert response.status_code in [401, 422, 429]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_sql_injection_in_anime_id(self, client):
|
|
"""Test SQL injection protection in ID parameters."""
|
|
malicious_ids = [
|
|
"1 OR 1=1",
|
|
"1'; DROP TABLE anime--",
|
|
"1 UNION SELECT * FROM users--",
|
|
]
|
|
|
|
for malicious_id in malicious_ids:
|
|
response = await client.get(f"/api/anime/{malicious_id}")
|
|
|
|
# Should reject malicious ID
|
|
assert response.status_code in [400, 404, 422]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_blind_sql_injection(self, client):
|
|
"""Test protection against blind SQL injection."""
|
|
# Time-based blind SQL injection
|
|
time_payloads = [
|
|
"1' AND SLEEP(5)--",
|
|
"1' WAITFOR DELAY '0:0:5'--",
|
|
]
|
|
|
|
for payload in time_payloads:
|
|
response = await client.get(
|
|
"/api/anime/search", params={"query": payload}
|
|
)
|
|
|
|
# Should not cause delays or errors
|
|
assert response.status_code in [200, 400, 422]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_second_order_sql_injection(self, client):
|
|
"""Test protection against second-order SQL injection."""
|
|
# Register user with malicious username
|
|
malicious_username = "admin'--"
|
|
|
|
response = await client.post(
|
|
"/api/auth/register",
|
|
json={
|
|
"username": malicious_username,
|
|
"password": "SecureP@ss123!",
|
|
"email": "test@example.com",
|
|
},
|
|
)
|
|
|
|
# Should either reject or safely store
|
|
if response.status_code == 200:
|
|
# Try to use that username elsewhere
|
|
response2 = await client.post(
|
|
"/api/auth/login",
|
|
json={
|
|
"username": malicious_username,
|
|
"password": "SecureP@ss123!",
|
|
},
|
|
)
|
|
|
|
# Should handle safely
|
|
assert response2.status_code in [200, 401, 422]
|
|
|
|
|
|
@pytest.mark.security
|
|
class TestNoSQLInjection:
|
|
"""Security tests for NoSQL injection protection."""
|
|
|
|
@pytest.fixture
|
|
async def client(self):
|
|
"""Create async HTTP client for testing."""
|
|
from httpx import ASGITransport
|
|
|
|
async with AsyncClient(
|
|
transport=ASGITransport(app=app), base_url="http://test"
|
|
) as ac:
|
|
yield ac
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_nosql_injection_in_query(self, client):
|
|
"""Test NoSQL injection protection."""
|
|
nosql_payloads = [
|
|
'{"$gt": ""}',
|
|
'{"$ne": null}',
|
|
'{"$regex": ".*"}',
|
|
'{"$where": "1==1"}',
|
|
]
|
|
|
|
for payload in nosql_payloads:
|
|
response = await client.get(
|
|
"/api/anime/search", params={"query": payload}
|
|
)
|
|
|
|
# Should not cause unauthorized access
|
|
assert response.status_code in [200, 400, 422]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_nosql_operator_injection(self, client):
|
|
"""Test NoSQL operator injection protection."""
|
|
response = await client.post(
|
|
"/api/auth/login",
|
|
json={
|
|
"username": {"$ne": None},
|
|
"password": {"$ne": None},
|
|
},
|
|
)
|
|
|
|
# Should not authenticate
|
|
assert response.status_code in [401, 422]
|
|
|
|
|
|
@pytest.mark.security
|
|
@pytest.mark.requires_clean_auth
|
|
class TestORMInjection:
|
|
"""Security tests for ORM injection protection."""
|
|
|
|
@pytest.fixture
|
|
async def client(self):
|
|
"""Create async HTTP client for testing."""
|
|
from httpx import ASGITransport
|
|
|
|
async with AsyncClient(
|
|
transport=ASGITransport(app=app), base_url="http://test"
|
|
) as ac:
|
|
yield ac
|
|
|
|
async def get_auth_token(self, client):
|
|
"""Helper to get authentication token."""
|
|
password = "SecurePass123!"
|
|
await client.post(
|
|
"/api/auth/setup",
|
|
json={"master_password": password}
|
|
)
|
|
login_response = await client.post(
|
|
"/api/auth/login",
|
|
json={"password": password}
|
|
)
|
|
return login_response.json()["access_token"]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_orm_attribute_injection(self, client):
|
|
"""Test protection against ORM attribute injection."""
|
|
token = await self.get_auth_token(client)
|
|
headers = {"Authorization": f"Bearer {token}"}
|
|
|
|
# Try to access internal attributes
|
|
response = await client.get(
|
|
"/api/anime",
|
|
params={"sort_by": "__class__.__init__.__globals__"},
|
|
headers=headers,
|
|
)
|
|
|
|
# Should reject malicious sort parameter, or 503 if service unavailable
|
|
assert response.status_code in [200, 400, 422, 503]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_orm_method_injection(self, client):
|
|
"""Test protection against ORM method injection."""
|
|
token = await self.get_auth_token(client)
|
|
headers = {"Authorization": f"Bearer {token}"}
|
|
|
|
response = await client.get(
|
|
"/api/anime",
|
|
params={"filter": "password;drop table users;"},
|
|
headers=headers,
|
|
)
|
|
|
|
# Should handle safely, or 503 if service unavailable
|
|
assert response.status_code in [200, 400, 422, 503]
|
|
|
|
|
|
@pytest.mark.security
|
|
class TestDatabaseSecurity:
|
|
"""General database security tests."""
|
|
|
|
@pytest.fixture
|
|
async def client(self):
|
|
"""Create async HTTP client for testing."""
|
|
from httpx import ASGITransport
|
|
|
|
async with AsyncClient(
|
|
transport=ASGITransport(app=app), base_url="http://test"
|
|
) as ac:
|
|
yield ac
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_error_messages_no_leak_info(self, client):
|
|
"""Test that database errors don't leak information."""
|
|
response = await client.get("/api/anime/99999999")
|
|
|
|
# Should not expose database structure in errors
|
|
if response.status_code in [400, 404, 500]:
|
|
error_text = response.text.lower()
|
|
assert "sqlite" not in error_text
|
|
assert "table" not in error_text
|
|
assert "column" not in error_text
|
|
assert "constraint" not in error_text
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_prepared_statements_used(self, client):
|
|
"""Test that prepared statements are used (indirect test)."""
|
|
# This is tested indirectly by SQL injection tests
|
|
# If SQL injection is prevented, prepared statements are likely used
|
|
response = await client.get(
|
|
"/api/anime/search", params={"query": "' OR '1'='1"}
|
|
)
|
|
|
|
# Should not return all records
|
|
assert response.status_code in [200, 400, 422]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_no_sensitive_data_in_logs(self, client):
|
|
"""Test that sensitive data is not logged."""
|
|
# This would require checking logs
|
|
# Placeholder for the test principle
|
|
response = await client.post(
|
|
"/api/auth/login",
|
|
json={
|
|
"username": "testuser",
|
|
"password": "SecureP@ssw0rd!",
|
|
},
|
|
)
|
|
|
|
# Password should not appear in logs
|
|
# (Would need log inspection)
|
|
assert response.status_code in [200, 401, 422]
|