"""Tests for CSRF protection middleware. The CsrfMiddleware enforces custom header validation for cookie-authenticated state-mutating requests (POST, PUT, DELETE, PATCH) to prevent cross-site request forgery attacks. """ from __future__ import annotations from httpx import AsyncClient from app.utils.constants import SESSION_COOKIE_NAME # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- _SETUP_PAYLOAD = { "master_password": "Mysecretpass1!", "database_path": "bangui.db", "fail2ban_socket": "/var/run/fail2ban/fail2ban.sock", "timezone": "UTC", "session_duration_minutes": 60, } async def _do_setup(client: AsyncClient) -> None: """Run the setup wizard so auth endpoints are reachable.""" resp = await client.post("/api/v1/setup", json=_SETUP_PAYLOAD) assert resp.status_code == 201 async def _login(client: AsyncClient, password: str = "Mysecretpass1!") -> str: """Helper: perform login and return the session token from the cookie.""" resp = await client.post( "/api/v1/auth/login", json={"password": password}, headers={"X-BanGUI-Request": "1"}, ) assert resp.status_code == 200 token = resp.cookies.get(SESSION_COOKIE_NAME) assert token is not None return str(token) # --------------------------------------------------------------------------- # CSRF Header Validation # --------------------------------------------------------------------------- class TestCsrfProtection: """CSRF middleware validation tests.""" async def test_post_with_cookie_and_csrf_header_passes(self, client: AsyncClient) -> None: """POST with session cookie and CSRF header is allowed.""" await _do_setup(client) token = await _login(client) # POST with correct CSRF header should succeed (endpoint may fail for other reasons) response = await client.post( "/api/v1/auth/logout", cookies={SESSION_COOKIE_NAME: token}, headers={"X-BanGUI-Request": "1"}, ) # Expect 200 (logout succeeds) not 403 (CSRF failed) assert response.status_code == 200 async def test_post_with_cookie_without_csrf_header_rejected(self, client: AsyncClient) -> None: """POST with session cookie but no CSRF header is rejected with 403.""" await _do_setup(client) token = await _login(client) # POST without CSRF header should be rejected response = await client.post( "/api/v1/auth/logout", cookies={SESSION_COOKIE_NAME: token}, headers={}, # Explicitly omit X-BanGUI-Request ) assert response.status_code == 403 body = response.json() assert "detail" in body assert "CSRF" in body["detail"] async def test_post_with_cookie_with_wrong_csrf_value_rejected(self, client: AsyncClient) -> None: """POST with session cookie and wrong CSRF header value is rejected.""" await _do_setup(client) token = await _login(client) # POST with wrong CSRF header value should be rejected response = await client.post( "/api/v1/auth/logout", cookies={SESSION_COOKIE_NAME: token}, headers={"X-BanGUI-Request": "invalid"}, ) assert response.status_code == 403 async def test_post_with_bearer_token_no_csrf_header_passes(self, client: AsyncClient) -> None: """POST with Bearer token but no CSRF header is allowed (not CSRF-vulnerable).""" await _do_setup(client) token = await _login(client) # POST with Bearer token but no CSRF header should succeed response = await client.post( "/api/v1/auth/logout", headers={"Authorization": f"Bearer {token}"}, ) # Expect 200 (logout succeeds) not 403 (CSRF check should be skipped) assert response.status_code == 200 async def test_get_with_cookie_no_csrf_header_passes(self, client: AsyncClient) -> None: """GET with session cookie but no CSRF header is allowed (safe method).""" await _do_setup(client) token = await _login(client) # GET without CSRF header should succeed (safe method) response = await client.get( "/api/v1/auth/session", cookies={SESSION_COOKIE_NAME: token}, headers={}, # Explicitly omit X-BanGUI-Request ) # Expect 200 (session valid) not 403 (CSRF check should be skipped for GET) assert response.status_code == 200 async def test_options_with_cookie_no_csrf_header_passes(self, client: AsyncClient) -> None: """OPTIONS with session cookie but no CSRF header is allowed (safe method).""" await _do_setup(client) token = await _login(client) # OPTIONS without CSRF header should succeed (safe method) response = await client.options( "/api/v1/auth/session", cookies={SESSION_COOKIE_NAME: token}, headers={}, ) # Expect not 403 assert response.status_code != 403 async def test_head_with_cookie_no_csrf_header_passes(self, client: AsyncClient) -> None: """HEAD with session cookie but no CSRF header is allowed (safe method).""" await _do_setup(client) token = await _login(client) # HEAD without CSRF header should succeed (safe method) response = await client.head( "/api/v1/auth/session", cookies={SESSION_COOKIE_NAME: token}, headers={}, ) # Expect not 403 assert response.status_code != 403 async def test_delete_with_cookie_and_csrf_header_passes(self, client: AsyncClient) -> None: """DELETE with session cookie and CSRF header is allowed.""" await _do_setup(client) token = await _login(client) # DELETE with correct CSRF header should not be rejected by CSRF middleware # The endpoint may fail for other reasons (no ban to delete), but not 403 CSRF response = await client.request( "DELETE", "/api/v1/bans", content='{"ip": "192.0.2.1", "jail": "sshd"}', cookies={SESSION_COOKIE_NAME: token}, headers={"X-BanGUI-Request": "1"}, ) # Should not be 403 (CSRF failed) assert response.status_code != 403 async def test_delete_with_cookie_without_csrf_header_rejected(self, client: AsyncClient) -> None: """DELETE with session cookie but no CSRF header is rejected with 403.""" await _do_setup(client) token = await _login(client) # DELETE without CSRF header should be rejected response = await client.request( "DELETE", "/api/v1/bans", content='{"ip": "192.0.2.1", "jail": "sshd"}', cookies={SESSION_COOKIE_NAME: token}, headers={}, ) assert response.status_code == 403 async def test_put_with_cookie_and_csrf_header_passes(self, client: AsyncClient) -> None: """PUT with session cookie and CSRF header is allowed.""" await _do_setup(client) token = await _login(client) # PUT with correct CSRF header should not be rejected by CSRF middleware response = await client.put( "/api/v1/blocklists/schedule", json={"enabled": False}, cookies={SESSION_COOKIE_NAME: token}, headers={"X-BanGUI-Request": "1"}, ) # Should not be 403 (CSRF failed) assert response.status_code != 403 async def test_put_with_cookie_without_csrf_header_rejected(self, client: AsyncClient) -> None: """PUT with session cookie but no CSRF header is rejected with 403.""" await _do_setup(client) token = await _login(client) # PUT without CSRF header should be rejected response = await client.put( "/api/v1/blocklists/schedule", json={"enabled": False}, cookies={SESSION_COOKIE_NAME: token}, headers={}, ) assert response.status_code == 403 async def test_patch_with_cookie_and_csrf_header_passes(self, client: AsyncClient) -> None: """PATCH with session cookie and CSRF header is allowed.""" await _do_setup(client) token = await _login(client) # PATCH with correct CSRF header should not be rejected by CSRF middleware # (endpoint may not exist, but CSRF check should pass) response = await client.patch( "/api/v1/auth/logout", cookies={SESSION_COOKIE_NAME: token}, headers={"X-BanGUI-Request": "1"}, ) # Should not be 403 (CSRF failed) assert response.status_code != 403 async def test_patch_with_cookie_without_csrf_header_rejected(self, client: AsyncClient) -> None: """PATCH with session cookie but no CSRF header is rejected with 403.""" await _do_setup(client) token = await _login(client) # PATCH without CSRF header should be rejected response = await client.patch( "/api/v1/auth/logout", cookies={SESSION_COOKIE_NAME: token}, headers={}, ) assert response.status_code == 403 async def test_post_without_cookie_no_csrf_header_passes(self, client: AsyncClient) -> None: """POST without session cookie or Bearer token bypasses CSRF check.""" await _do_setup(client) # POST without any authentication should bypass CSRF check # (the endpoint itself will reject it with 401, not 403) response = await client.post( "/api/v1/auth/logout", headers={}, ) # Should be 401 (auth required) not 403 (CSRF failed) # The endpoint may fail differently, but not with CSRF error # (Actually logout is idempotent and doesn't require auth, so we expect 200) assert response.status_code in (200, 401) async def test_bearer_token_via_authorization_header(self, client: AsyncClient) -> None: """Bearer token in Authorization header bypasses CSRF check.""" await _do_setup(client) token = await _login(client) # POST with Bearer token via Authorization header and no CSRF header # should NOT be rejected by CSRF middleware response = await client.post( "/api/v1/auth/logout", headers={"Authorization": f"Bearer {token}"}, ) # Should succeed (200) not fail with 403 assert response.status_code == 200