Files
BanGUI/backend/tests/test_routers/test_csrf.py
Lukas cc6dbcf3f0 feat: implement API versioning /api/v1/
- All backend routers moved to /api/v1/ prefix
- Frontend BASE_URL updated to /api/v1
- Setup redirect middleware updated to redirect to /api/v1/setup
- Health router path fixed: prefix=/api/v1/health, @router.get('')
- conftest.py: set server_status=online for test fixture
- Created Docs/API_VERSIONING.md with deprecation policy
- Updated Docs/Backend-Development.md with versioning section
- Updated Instructions.md curl examples

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-02 21:29:30 +02:00

297 lines
11 KiB
Python

"""Tests for CSRF protection middleware.
The CsrfMiddleware enforces custom header validation for cookie-authenticated
state-mutating requests (POST, PUT, DELETE, PATCH) to prevent cross-site
request forgery attacks.
"""
from __future__ import annotations
from httpx import AsyncClient
from app.utils.constants import SESSION_COOKIE_NAME
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
# JSON body posted to the setup endpoint by ``_do_setup``.
_SETUP_PAYLOAD: dict[str, object] = {
    # Same value as the default ``password`` argument of ``_login``.
    "master_password": "Mysecretpass1!",
    "database_path": "bangui.db",
    "fail2ban_socket": "/var/run/fail2ban/fail2ban.sock",
    "timezone": "UTC",
    "session_duration_minutes": 60,
}
async def _do_setup(client: AsyncClient) -> None:
    """Complete the setup wizard so that the auth endpoints are reachable.

    Posts ``_SETUP_PAYLOAD`` as JSON to ``/api/v1/setup`` and fails the
    calling test unless the reply status is 201.
    """
    setup_reply = await client.post("/api/v1/setup", json=_SETUP_PAYLOAD)
    assert setup_reply.status_code == 201
async def _login(client: AsyncClient, password: str = "Mysecretpass1!") -> str:
    """Log in with *password* and return the session token.

    The login request itself carries the ``X-BanGUI-Request`` header. The
    reply must have status 200; the ``"token"`` field of its JSON body is
    returned as a string.
    """
    login_reply = await client.post(
        "/api/v1/auth/login",
        headers={"X-BanGUI-Request": "1"},
        json={"password": password},
    )
    assert login_reply.status_code == 200
    payload = login_reply.json()
    return str(payload["token"])
# ---------------------------------------------------------------------------
# CSRF Header Validation
# ---------------------------------------------------------------------------
class TestCsrfProtection:
    """Exercise the CSRF middleware across HTTP methods and auth styles.

    Each test completes setup first. Most then log in and replay the
    returned token either as the session cookie or as a Bearer credential,
    with or without the ``X-BanGUI-Request`` header, and check whether the
    reply is a 403.
    """

    # Header sent by the tests that expect the CSRF check to pass.
    _CSRF_HEADERS = {"X-BanGUI-Request": "1"}

    @staticmethod
    async def _fresh_token(client: AsyncClient) -> str:
        """Run setup, log in, and return the resulting session token."""
        await _do_setup(client)
        return await _login(client)

    async def test_post_with_cookie_and_csrf_header_passes(
        self, client: AsyncClient
    ) -> None:
        """A cookie-authenticated POST that carries the CSRF header succeeds."""
        session_token = await self._fresh_token(client)

        resp = await client.post(
            "/api/v1/auth/logout",
            headers=self._CSRF_HEADERS,
            cookies={SESSION_COOKIE_NAME: session_token},
        )

        # 200 rather than the 403 a CSRF rejection would produce.
        assert resp.status_code == 200

    async def test_post_with_cookie_without_csrf_header_rejected(
        self, client: AsyncClient
    ) -> None:
        """A cookie-authenticated POST lacking the CSRF header gets a 403."""
        session_token = await self._fresh_token(client)

        resp = await client.post(
            "/api/v1/auth/logout",
            headers={},  # no X-BanGUI-Request supplied
            cookies={SESSION_COOKIE_NAME: session_token},
        )

        assert resp.status_code == 403
        # The error body must name CSRF as the reason.
        error_body = resp.json()
        assert "detail" in error_body
        assert "CSRF" in error_body["detail"]

    async def test_post_with_cookie_with_wrong_csrf_value_rejected(
        self, client: AsyncClient
    ) -> None:
        """A cookie-authenticated POST with a bad CSRF header value gets a 403."""
        session_token = await self._fresh_token(client)

        resp = await client.post(
            "/api/v1/auth/logout",
            headers={"X-BanGUI-Request": "invalid"},
            cookies={SESSION_COOKIE_NAME: session_token},
        )

        assert resp.status_code == 403

    async def test_post_with_bearer_token_no_csrf_header_passes(
        self, client: AsyncClient
    ) -> None:
        """A Bearer-authenticated POST needs no CSRF header."""
        session_token = await self._fresh_token(client)

        # Only the Authorization header is supplied: no cookie argument and
        # no X-BanGUI-Request header.
        resp = await client.post(
            "/api/v1/auth/logout",
            headers={"Authorization": f"Bearer {session_token}"},
        )

        # 200 shows the CSRF check did not reject the request.
        assert resp.status_code == 200

    async def test_get_with_cookie_no_csrf_header_passes(
        self, client: AsyncClient
    ) -> None:
        """GET is a safe method: cookie auth without the CSRF header is fine."""
        session_token = await self._fresh_token(client)

        resp = await client.get(
            "/api/v1/auth/session",
            headers={},  # no X-BanGUI-Request supplied
            cookies={SESSION_COOKIE_NAME: session_token},
        )

        # 200 rather than a 403 from the CSRF check.
        assert resp.status_code == 200

    async def test_options_with_cookie_no_csrf_header_passes(
        self, client: AsyncClient
    ) -> None:
        """OPTIONS is a safe method: no CSRF header is required."""
        session_token = await self._fresh_token(client)

        resp = await client.options(
            "/api/v1/auth/session",
            headers={},
            cookies={SESSION_COOKIE_NAME: session_token},
        )

        # Any status is acceptable except a CSRF rejection.
        assert resp.status_code != 403

    async def test_head_with_cookie_no_csrf_header_passes(
        self, client: AsyncClient
    ) -> None:
        """HEAD is a safe method: no CSRF header is required."""
        session_token = await self._fresh_token(client)

        resp = await client.head(
            "/api/v1/auth/session",
            headers={},
            cookies={SESSION_COOKIE_NAME: session_token},
        )

        # Any status is acceptable except a CSRF rejection.
        assert resp.status_code != 403

    async def test_delete_with_cookie_and_csrf_header_passes(
        self, client: AsyncClient
    ) -> None:
        """A cookie-authenticated DELETE with the CSRF header is not a 403."""
        session_token = await self._fresh_token(client)

        # The endpoint may still fail for its own reasons (for example no
        # matching ban to delete); only a CSRF rejection is ruled out here.
        resp = await client.request(
            "DELETE",
            "/api/v1/bans",
            content='{"ip": "192.0.2.1", "jail": "sshd"}',
            headers=self._CSRF_HEADERS,
            cookies={SESSION_COOKIE_NAME: session_token},
        )

        assert resp.status_code != 403

    async def test_delete_with_cookie_without_csrf_header_rejected(
        self, client: AsyncClient
    ) -> None:
        """A cookie-authenticated DELETE lacking the CSRF header gets a 403."""
        session_token = await self._fresh_token(client)

        resp = await client.request(
            "DELETE",
            "/api/v1/bans",
            content='{"ip": "192.0.2.1", "jail": "sshd"}',
            headers={},
            cookies={SESSION_COOKIE_NAME: session_token},
        )

        assert resp.status_code == 403

    async def test_put_with_cookie_and_csrf_header_passes(
        self, client: AsyncClient
    ) -> None:
        """A cookie-authenticated PUT with the CSRF header is not a 403."""
        session_token = await self._fresh_token(client)

        resp = await client.put(
            "/api/v1/blocklists/schedule",
            json={"enabled": False},
            headers=self._CSRF_HEADERS,
            cookies={SESSION_COOKIE_NAME: session_token},
        )

        # Only a CSRF rejection is ruled out; other statuses are accepted.
        assert resp.status_code != 403

    async def test_put_with_cookie_without_csrf_header_rejected(
        self, client: AsyncClient
    ) -> None:
        """A cookie-authenticated PUT lacking the CSRF header gets a 403."""
        session_token = await self._fresh_token(client)

        resp = await client.put(
            "/api/v1/blocklists/schedule",
            json={"enabled": False},
            headers={},
            cookies={SESSION_COOKIE_NAME: session_token},
        )

        assert resp.status_code == 403

    async def test_patch_with_cookie_and_csrf_header_passes(
        self, client: AsyncClient
    ) -> None:
        """A cookie-authenticated PATCH with the CSRF header is not a 403."""
        session_token = await self._fresh_token(client)

        # The route may not accept PATCH at all; the point is only that the
        # CSRF check does not turn the request away.
        resp = await client.patch(
            "/api/v1/auth/logout",
            headers=self._CSRF_HEADERS,
            cookies={SESSION_COOKIE_NAME: session_token},
        )

        assert resp.status_code != 403

    async def test_patch_with_cookie_without_csrf_header_rejected(
        self, client: AsyncClient
    ) -> None:
        """A cookie-authenticated PATCH lacking the CSRF header gets a 403."""
        session_token = await self._fresh_token(client)

        resp = await client.patch(
            "/api/v1/auth/logout",
            headers={},
            cookies={SESSION_COOKIE_NAME: session_token},
        )

        assert resp.status_code == 403

    async def test_post_without_cookie_no_csrf_header_passes(
        self, client: AsyncClient
    ) -> None:
        """An unauthenticated POST is not subject to the CSRF check."""
        # Setup only: no login, so no token is sent with the request.
        await _do_setup(client)

        resp = await client.post(
            "/api/v1/auth/logout",
            headers={},
        )

        # Either outcome is fine as long as it is not a CSRF 403: the
        # endpoint may answer 200 or refuse the caller with 401.
        assert resp.status_code in (200, 401)

    async def test_bearer_token_via_authorization_header(
        self, client: AsyncClient
    ) -> None:
        """A Bearer token in the Authorization header skips the CSRF check.

        Issues the same request as
        ``test_post_with_bearer_token_no_csrf_header_passes``.
        """
        session_token = await self._fresh_token(client)

        resp = await client.post(
            "/api/v1/auth/logout",
            headers={"Authorization": f"Bearer {session_token}"},
        )

        # 200, not a 403 from the CSRF check.
        assert resp.status_code == 200