Files
BanGUI/backend/tests/test_routers/test_auth.py
Lukas 93021500c3 TASK-033: Remove session token from JSON response body
Fixes a critical security vulnerability where the session token was
being returned in the JSON response body of POST /api/auth/login.
This exposed the token to JavaScript, allowing malicious scripts to
steal it and bypass the HttpOnly cookie protection.

Changes:
- Backend: Remove 'token' field from LoginResponse model (auth.py)
- Backend: Update login() endpoint to return only 'expires_at'
- Frontend: Update LoginResponse type to exclude 'token' field
- Backend: Update test helper _login() to extract token from cookie
- Backend: Update test cases to verify token is NOT in response body
- Documentation: Add section 'Authentication Endpoints' in Backend-Development.md
- Documentation: Update Web-Development.md to explain HttpOnly cookie benefits

Security benefit: Session tokens are now only accessible via HttpOnly
cookies, protected from JavaScript access, XSS attacks, and malicious
third-party scripts. The frontend continues to use only the cookie for
authentication.

All auth tests pass (23 tests). Type checking and linting pass with
zero errors.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-04-26 19:38:33 +02:00

482 lines
18 KiB
Python

"""Tests for the auth router (POST /api/auth/login, POST /api/auth/logout)."""
from __future__ import annotations
from collections.abc import Generator
from unittest.mock import patch
import pytest
from httpx import AsyncClient
from app.utils.constants import SESSION_COOKIE_NAME
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
# Request body posted to /api/setup by ``_do_setup`` before auth tests run.
_SETUP_PAYLOAD = {
    # Same value as the default ``password`` argument of ``_login``.
    "master_password": "Mysecretpass1!",
    "database_path": "bangui.db",
    "fail2ban_socket": "/var/run/fail2ban/fail2ban.sock",
    "timezone": "UTC",
    # NOTE(review): presumably the session lifetime in minutes -- confirm
    # against the setup endpoint's request model.
    "session_duration_minutes": 60,
}
async def _do_setup(client: AsyncClient) -> None:
    """Complete the setup wizard via POST /api/setup.

    Posts ``_SETUP_PAYLOAD`` and fails the calling test unless the endpoint
    answers with 201.
    """
    setup_response = await client.post("/api/setup", json=_SETUP_PAYLOAD)
    status = setup_response.status_code
    assert status == 201
async def _login(client: AsyncClient, password: str = "Mysecretpass1!") -> str:
    """Log in through POST /api/auth/login and return the session token.

    The token is read from the session cookie of the login response (it is
    not part of the JSON body), so tests can replay it as a Bearer token.
    Fails the calling test if the login is not a 200 or no cookie was set.
    """
    credentials = {"password": password}
    login_response = await client.post("/api/auth/login", json=credentials)
    assert login_response.status_code == 200
    session_token = login_response.cookies.get(SESSION_COOKIE_NAME)
    assert session_token is not None
    return str(session_token)
# ---------------------------------------------------------------------------
# Login
# ---------------------------------------------------------------------------
class TestLogin:
    """POST /api/auth/login.

    Covers the success path, the attributes of the session cookie, rejection
    of bad or missing credentials, and the login rate limiter.
    """

    async def test_login_succeeds_with_correct_password(
        self, client: AsyncClient
    ) -> None:
        """Login returns 200 and keeps the session token out of the JSON body."""
        await _do_setup(client)
        response = await client.post(
            "/api/auth/login", json={"password": "Mysecretpass1!"}
        )
        assert response.status_code == 200
        body = response.json()
        # Token is not returned in the JSON body; it's set as an HttpOnly cookie
        assert "token" not in body
        assert "expires_at" in body

    async def test_login_sets_cookie(self, client: AsyncClient) -> None:
        """Login sets the session cookie with HttpOnly and SameSite=lax."""
        await _do_setup(client)
        response = await client.post(
            "/api/auth/login", json={"password": "Mysecretpass1!"}
        )
        assert response.status_code == 200
        assert SESSION_COOKIE_NAME in response.cookies
        # NOTE(review): the "." check presumably reflects a dotted/signed token
        # format -- confirm against the auth service.
        assert "." in response.cookies[SESSION_COOKIE_NAME]
        set_cookie = response.headers.get("set-cookie", "")
        assert "HttpOnly" in set_cookie
        assert "SameSite=lax" in set_cookie

    async def test_login_sets_secure_cookie_when_enabled(
        self, client: AsyncClient
    ) -> None:
        """Login sets the Secure flag when session cookies are configured for HTTPS."""
        settings = client._transport.app.state.settings
        previous_secure = settings.session_cookie_secure
        settings.session_cookie_secure = True
        try:
            await _do_setup(client)
            response = await client.post(
                "/api/auth/login", json={"password": "Mysecretpass1!"}
            )
        finally:
            # Restore the flag so the override cannot leak into other tests
            # that may share the same application state.
            settings.session_cookie_secure = previous_secure
        assert response.status_code == 200
        set_cookie = response.headers.get("set-cookie", "")
        assert "Secure" in set_cookie

    async def test_login_fails_with_wrong_password(
        self, client: AsyncClient
    ) -> None:
        """Login returns 401 for an incorrect password."""
        await _do_setup(client)
        response = await client.post(
            "/api/auth/login", json={"password": "wrongpassword"}
        )
        assert response.status_code == 401

    async def test_login_rejects_empty_password(self, client: AsyncClient) -> None:
        """Login returns 422 when password field is missing."""
        await _do_setup(client)
        response = await client.post("/api/auth/login", json={})
        assert response.status_code == 422

    async def test_login_rate_limit_returns_429_after_5_attempts(
        self, client: AsyncClient
    ) -> None:
        """Login returns 429 once 5 failed attempts have been made."""
        await _do_setup(client)
        # Make 5 failed login attempts
        for i in range(5):
            response = await client.post(
                "/api/auth/login", json={"password": "wrongpassword"}
            )
            assert response.status_code == 401, f"Expected 401 on attempt {i + 1}"
        # 6th attempt should be rate-limited
        response = await client.post(
            "/api/auth/login", json={"password": "Hallo123!"}
        )
        assert response.status_code == 429
        assert (
            response.json()["detail"]
            == "Too many login attempts. Please try again later."
        )

    async def test_login_rate_limit_includes_retry_after_header(
        self, client: AsyncClient
    ) -> None:
        """Rate-limited response includes a Retry-After header of 60."""
        await _do_setup(client)
        # Exceed rate limit
        for _ in range(5):
            await client.post("/api/auth/login", json={"password": "wrong"})
        response = await client.post(
            "/api/auth/login", json={"password": "wrong"}
        )
        assert response.status_code == 429
        assert "retry-after" in response.headers
        assert response.headers["retry-after"] == "60"

    async def test_login_rate_limit_per_ip(
        self, client: AsyncClient
    ) -> None:
        """Rate limiter state is keyed by the client's IP address."""
        await _do_setup(client)
        # Make 5 failed attempts with default IP
        for _ in range(5):
            await client.post("/api/auth/login", json={"password": "wrong"})
        # 6th attempt is blocked
        response = await client.post(
            "/api/auth/login", json={"password": "correct"}
        )
        assert response.status_code == 429
        # Send one more attempt that claims a different origin through
        # X-Forwarded-For. Without a trusted proxy configured the header is
        # presumably ignored and the request is still attributed to the real
        # client IP, so its status is deliberately not asserted here.
        await client.post(
            "/api/auth/login",
            json={"password": "wrong"},
            headers={"X-Forwarded-For": "203.0.113.1"},  # Different IP
        )
        # Instead, confirm the limiter tracks attempts under the client IP.
        limiter = client._transport.app.state.login_rate_limiter
        assert "127.0.0.1" in limiter.get_state()

    async def test_login_rate_limit_reset_after_window(
        self, client: AsyncClient
    ) -> None:
        """A blocked client can log in again once the limiter has been reset."""
        await _do_setup(client)
        limiter = client._transport.app.state.login_rate_limiter
        limiter.reset()
        # Make 5 failed attempts
        for _ in range(5):
            await client.post("/api/auth/login", json={"password": "wrong"})
        response = await client.post(
            "/api/auth/login", json={"password": "wrong"}
        )
        assert response.status_code == 429
        # The test does not advance the clock: cleanup_expired() is exercised
        # and the expiry of the full window is simulated with reset().
        limiter.cleanup_expired()
        limiter.reset()
        # Now a fresh login attempt should succeed (use correct password)
        response = await client.post(
            "/api/auth/login", json={"password": "Mysecretpass1!"}
        )
        assert response.status_code == 200
# ---------------------------------------------------------------------------
# Logout
# ---------------------------------------------------------------------------
class TestLogout:
    """POST /api/auth/logout."""

    async def test_logout_returns_200(self, client: AsyncClient) -> None:
        """A logged-in client gets a 200 whose JSON body has a ``message`` key."""
        await _do_setup(client)
        await _login(client)
        logout_response = await client.post("/api/auth/logout")
        assert logout_response.status_code == 200
        assert "message" in logout_response.json()

    async def test_logout_clears_cookie(self, client: AsyncClient) -> None:
        """The logout response carries a Set-Cookie header for the session cookie."""
        await _do_setup(client)
        await _login(client)  # leaves the session cookie on the client
        logout_response = await client.post("/api/auth/logout")
        assert logout_response.status_code == 200
        # Clearing a cookie is done by re-sending it in Set-Cookie, so the
        # cookie name must show up in that header.
        cookie_header = logout_response.headers.get("set-cookie", "")
        assert SESSION_COOKIE_NAME in cookie_header

    async def test_logout_is_idempotent(self, client: AsyncClient) -> None:
        """Logging out without ever logging in still answers 200."""
        await _do_setup(client)
        logout_response = await client.post("/api/auth/logout")
        assert logout_response.status_code == 200

    async def test_session_invalid_after_logout(
        self, client: AsyncClient
    ) -> None:
        """Replaying a token on logout after it was logged out still yields 200."""
        await _do_setup(client)
        stale_token = await _login(client)
        await client.post("/api/auth/logout")
        # The token was just logged out. Presenting it again as a Bearer
        # token on the (idempotent) logout endpoint must not raise an error.
        bearer_headers = {"Authorization": f"Bearer {stale_token}"}
        repeat_logout = await client.post(
            "/api/auth/logout", headers=bearer_headers
        )
        assert repeat_logout.status_code == 200
# ---------------------------------------------------------------------------
# Auth dependency (protected route guard)
# ---------------------------------------------------------------------------
class TestRequireAuth:
    """Checks around the ``require_auth`` guard on protected routes."""

    async def test_health_endpoint_requires_no_auth(
        self, client: AsyncClient
    ) -> None:
        """GET /api/health answers 200 without any credentials."""
        health = await client.get("/api/health")
        assert health.status_code == 200

    async def test_session_cache_is_disabled_by_default(
        self, client: AsyncClient
    ) -> None:
        """With default settings every authenticated request looks up the session."""
        from app.repositories import session_repo

        await _do_setup(client)
        bearer_headers = {"Authorization": f"Bearer {await _login(client)}"}

        real_get_session = session_repo.get_session
        lookups = 0

        async def _counting(db, tok):  # type: ignore[no-untyped-def]
            """Count the call, then delegate to the real repository function."""
            nonlocal lookups
            lookups += 1
            return await real_get_session(db, tok)

        with patch.object(session_repo, "get_session", side_effect=_counting):
            first = await client.get(
                "/api/dashboard/status", headers=bearer_headers
            )
            second = await client.get(
                "/api/dashboard/status", headers=bearer_headers
            )

        assert first.status_code == 200
        assert second.status_code == 200
        # Two requests, two repository lookups: nothing was served from a cache.
        assert lookups == 2
# ---------------------------------------------------------------------------
# Session validation (Task 4)
# ---------------------------------------------------------------------------
class TestValidateSession:
    """GET /api/auth/session."""

    async def test_validate_session_returns_200_with_valid_token(
        self, client: AsyncClient
    ) -> None:
        """Validate session returns 200 for a valid authenticated request."""
        await _do_setup(client)
        token = await _login(client)
        # Use Bearer token to authenticate
        response = await client.get(
            "/api/auth/session",
            headers={"Authorization": f"Bearer {token}"},
        )
        assert response.status_code == 200
        assert response.json() == {"valid": True}

    async def test_validate_session_returns_401_without_token(
        self, client: AsyncClient
    ) -> None:
        """Validate session returns 401 when no token is present."""
        await _do_setup(client)
        response = await client.get("/api/auth/session")
        assert response.status_code == 401

    async def test_validate_session_returns_401_with_invalid_token(
        self, client: AsyncClient
    ) -> None:
        """Validate session returns 401 for a token that was never issued."""
        await _do_setup(client)
        response = await client.get(
            "/api/auth/session",
            headers={"Authorization": "Bearer invalidtoken"},
        )
        assert response.status_code == 401

    async def test_validate_session_with_cookie(
        self, client: AsyncClient
    ) -> None:
        """Validate session works with cookie-based authentication."""
        await _do_setup(client)
        token = await _login(client)
        # Authenticate with the session cookie only. No Authorization header
        # is sent, so a 200 here can only come from the cookie path. The
        # cookie is passed explicitly so the test does not depend on what the
        # client's cookie jar decided to keep.
        response = await client.get(
            "/api/auth/session",
            headers={"Cookie": f"{SESSION_COOKIE_NAME}={token}"},
        )
        assert response.status_code == 200
        assert response.json() == {"valid": True}

    async def test_validate_session_after_logout(
        self, client: AsyncClient
    ) -> None:
        """Validate session returns 401 after logout."""
        await _do_setup(client)
        token = await _login(client)
        bearer_headers = {"Authorization": f"Bearer {token}"}
        await client.post("/api/auth/logout", headers=bearer_headers)
        # The same token that was just logged out must no longer validate.
        response = await client.get("/api/auth/session", headers=bearer_headers)
        assert response.status_code == 401
# ---------------------------------------------------------------------------
# Session-token cache (Task 4)
# ---------------------------------------------------------------------------
class TestRequireAuthSessionCache:
    """In-memory session token cache inside ``require_auth``."""

    @pytest.fixture(autouse=True)
    def reset_cache(self, client: AsyncClient) -> Generator[None, None, None]:
        """Flush the session cache before and after every test in this class."""
        client._transport.app.state.session_cache.clear()
        yield
        client._transport.app.state.session_cache.clear()

    @pytest.fixture(autouse=True)
    def enable_session_cache(self, client: AsyncClient) -> Generator[None, None, None]:
        """Enable the in-memory auth cache for tests that exercise it.

        The previous values of both settings are put back on teardown so the
        override cannot leak into tests that expect the cache to be disabled.
        """
        settings = client._transport.app.state.settings
        previous_enabled = settings.session_cache_enabled
        previous_ttl = settings.session_cache_ttl_seconds
        settings.session_cache_enabled = True
        settings.session_cache_ttl_seconds = 10.0
        yield
        settings.session_cache_enabled = previous_enabled
        settings.session_cache_ttl_seconds = previous_ttl

    async def test_second_request_skips_db(self, client: AsyncClient) -> None:
        """Second authenticated request within TTL skips the session DB query.

        The first request populates the in-memory cache via ``require_auth``.
        The second request, using the same token before the TTL expires,
        must succeed *without* calling ``session_repo.get_session``.
        """
        from app.repositories import session_repo

        await _do_setup(client)
        token = await _login(client)
        # Ensure cache is empty so the first request definitely hits the DB.
        client._transport.app.state.session_cache.clear()

        call_count = 0
        original_get_session = session_repo.get_session

        async def _tracking(db, tok):  # type: ignore[no-untyped-def]
            """Count the call, then delegate to the real repository function."""
            nonlocal call_count
            call_count += 1
            return await original_get_session(db, tok)

        with patch.object(session_repo, "get_session", side_effect=_tracking):
            resp1 = await client.get(
                "/api/dashboard/status",
                headers={"Authorization": f"Bearer {token}"},
            )
            resp2 = await client.get(
                "/api/dashboard/status",
                headers={"Authorization": f"Bearer {token}"},
            )

        assert resp1.status_code == 200
        assert resp2.status_code == 200
        # DB queried exactly once: the first request populates the cache,
        # the second request is served entirely from memory.
        assert call_count == 1

    async def test_token_enters_cache_after_first_auth(
        self, client: AsyncClient
    ) -> None:
        """A successful auth request places the token in the session cache."""
        await _do_setup(client)
        token = await _login(client)
        session_cache = client._transport.app.state.session_cache
        session_cache.clear()
        assert session_cache.get(token) is None
        await client.get(
            "/api/dashboard/status",
            headers={"Authorization": f"Bearer {token}"},
        )
        assert client._transport.app.state.session_cache.get(token) is not None

    async def test_logout_evicts_token_from_cache(
        self, client: AsyncClient
    ) -> None:
        """Logout removes the session token from the session cache immediately."""
        await _do_setup(client)
        token = await _login(client)
        # Warm the cache.
        await client.get(
            "/api/dashboard/status",
            headers={"Authorization": f"Bearer {token}"},
        )
        assert client._transport.app.state.session_cache.get(token) is not None
        # Logout must evict the entry.
        await client.post(
            "/api/auth/logout",
            headers={"Authorization": f"Bearer {token}"},
        )
        assert client._transport.app.state.session_cache.get(token) is None
        # The unauthenticated health endpoint must still answer after logout.
        response = await client.get("/api/health")
        assert response.status_code == 200