""" SQL Injection Protection Tests. This module tests SQL injection vulnerabilities across the application. """ import pytest from httpx import AsyncClient from src.server.fastapi_app import app @pytest.mark.security class TestSQLInjection: """Security tests for SQL injection protection.""" @pytest.fixture async def client(self): """Create async HTTP client for testing.""" from httpx import ASGITransport async with AsyncClient( transport=ASGITransport(app=app), base_url="http://test" ) as ac: yield ac # Classic SQL Injection payloads SQL_INJECTION_PAYLOADS = [ "' OR '1'='1", "' OR 1=1--", "admin'--", "' OR 'a'='a", "1' OR '1' = '1", "' UNION SELECT NULL--", "'; DROP TABLE users--", "1'; DELETE FROM anime--", "' AND 1=0 UNION ALL SELECT 'admin', '81dc9bdb52d04dc20036dbd8313ed055'", "admin' /*", "' or 1=1 limit 1 -- -+", "') OR ('1'='1", ] @pytest.mark.asyncio async def test_sql_injection_in_search(self, client): """Test SQL injection protection in search functionality.""" for payload in self.SQL_INJECTION_PAYLOADS: response = await client.get( "/api/anime/search", params={"query": payload} ) # Should not cause SQL error or return unauthorized data assert response.status_code in [200, 400, 422] if response.status_code == 200: data = response.json() # Should not return all records assert "success" in data or "error" in data @pytest.mark.asyncio async def test_sql_injection_in_login(self, client): """Test SQL injection protection in login.""" for payload in self.SQL_INJECTION_PAYLOADS: response = await client.post( "/api/auth/login", json={"username": payload, "password": "anything"}, ) # Should not authenticate (401), reject invalid input (422), # or rate limit (429) assert response.status_code in [401, 422, 429] @pytest.mark.asyncio async def test_sql_injection_in_anime_id(self, client): """Test SQL injection protection in ID parameters.""" malicious_ids = [ "1 OR 1=1", "1'; DROP TABLE anime--", "1 UNION SELECT * FROM users--", ] for malicious_id in malicious_ids: response = await client.get(f"/api/anime/{malicious_id}") # Should reject malicious ID assert response.status_code in [400, 404, 422] @pytest.mark.asyncio async def test_blind_sql_injection(self, client): """Test protection against blind SQL injection.""" # Time-based blind SQL injection time_payloads = [ "1' AND SLEEP(5)--", "1' WAITFOR DELAY '0:0:5'--", ] for payload in time_payloads: response = await client.get( "/api/anime/search", params={"query": payload} ) # Should not cause delays or errors assert response.status_code in [200, 400, 422] @pytest.mark.asyncio async def test_second_order_sql_injection(self, client): """Test protection against second-order SQL injection.""" # Register user with malicious username malicious_username = "admin'--" response = await client.post( "/api/auth/register", json={ "username": malicious_username, "password": "SecureP@ss123!", "email": "test@example.com", }, ) # Should either reject or safely store if response.status_code == 200: # Try to use that username elsewhere response2 = await client.post( "/api/auth/login", json={ "username": malicious_username, "password": "SecureP@ss123!", }, ) # Should handle safely assert response2.status_code in [200, 401, 422] @pytest.mark.security class TestNoSQLInjection: """Security tests for NoSQL injection protection.""" @pytest.fixture async def client(self): """Create async HTTP client for testing.""" from httpx import ASGITransport async with AsyncClient( transport=ASGITransport(app=app), base_url="http://test" ) as ac: yield ac @pytest.mark.asyncio async def test_nosql_injection_in_query(self, client): """Test NoSQL injection protection.""" nosql_payloads = [ '{"$gt": ""}', '{"$ne": null}', '{"$regex": ".*"}', '{"$where": "1==1"}', ] for payload in nosql_payloads: response = await client.get( "/api/anime/search", params={"query": payload} ) # Should not cause unauthorized access assert response.status_code in [200, 400, 422] @pytest.mark.asyncio async def test_nosql_operator_injection(self, client): """Test NoSQL operator injection protection.""" response = await client.post( "/api/auth/login", json={ "username": {"$ne": None}, "password": {"$ne": None}, }, ) # Should not authenticate assert response.status_code in [401, 422] @pytest.mark.security class TestORMInjection: """Security tests for ORM injection protection.""" @pytest.fixture async def client(self): """Create async HTTP client for testing.""" from httpx import ASGITransport async with AsyncClient( transport=ASGITransport(app=app), base_url="http://test" ) as ac: yield ac async def get_auth_token(self, client): """Helper to get authentication token.""" password = "SecurePass123!" await client.post( "/api/auth/setup", json={"master_password": password} ) login_response = await client.post( "/api/auth/login", json={"password": password} ) return login_response.json()["access_token"] @pytest.mark.asyncio async def test_orm_attribute_injection(self, client): """Test protection against ORM attribute injection.""" token = await self.get_auth_token(client) headers = {"Authorization": f"Bearer {token}"} # Try to access internal attributes response = await client.get( "/api/anime", params={"sort_by": "__class__.__init__.__globals__"}, headers=headers, ) # Should reject malicious sort parameter, or 503 if service unavailable assert response.status_code in [200, 400, 422, 503] @pytest.mark.asyncio async def test_orm_method_injection(self, client): """Test protection against ORM method injection.""" token = await self.get_auth_token(client) headers = {"Authorization": f"Bearer {token}"} response = await client.get( "/api/anime", params={"filter": "password;drop table users;"}, headers=headers, ) # Should handle safely, or 503 if service unavailable assert response.status_code in [200, 400, 422, 503] @pytest.mark.security class TestDatabaseSecurity: """General database security tests.""" @pytest.fixture async def client(self): """Create async HTTP client for testing.""" from httpx import ASGITransport async with AsyncClient( transport=ASGITransport(app=app), base_url="http://test" ) as ac: yield ac @pytest.mark.asyncio async def test_error_messages_no_leak_info(self, client): """Test that database errors don't leak information.""" response = await client.get("/api/anime/99999999") # Should not expose database structure in errors if response.status_code in [400, 404, 500]: error_text = response.text.lower() assert "sqlite" not in error_text assert "table" not in error_text assert "column" not in error_text assert "constraint" not in error_text @pytest.mark.asyncio async def test_prepared_statements_used(self, client): """Test that prepared statements are used (indirect test).""" # This is tested indirectly by SQL injection tests # If SQL injection is prevented, prepared statements are likely used response = await client.get( "/api/anime/search", params={"query": "' OR '1'='1"} ) # Should not return all records assert response.status_code in [200, 400, 422] @pytest.mark.asyncio async def test_no_sensitive_data_in_logs(self, client): """Test that sensitive data is not logged.""" # This would require checking logs # Placeholder for the test principle response = await client.post( "/api/auth/login", json={ "username": "testuser", "password": "SecureP@ssw0rd!", }, ) # Password should not appear in logs # (Would need log inspection) assert response.status_code in [200, 401, 422]