"""Tests for the auth router (POST /api/auth/login, POST /api/auth/logout).""" from __future__ import annotations import asyncio from collections.abc import Generator from unittest.mock import patch import pytest from httpx import AsyncClient from app.utils.constants import SESSION_COOKIE_NAME # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- _SETUP_PAYLOAD = { "master_password": "Mysecretpass1!", "database_path": "bangui.db", "fail2ban_socket": "/var/run/fail2ban/fail2ban.sock", "timezone": "UTC", "session_duration_minutes": 60, } async def _do_setup(client: AsyncClient) -> None: """Run the setup wizard so auth endpoints are reachable.""" resp = await client.post("/api/v1/setup", json=_SETUP_PAYLOAD) assert resp.status_code == 201 async def _login(client: AsyncClient, password: str = "Mysecretpass1!") -> str: """Helper: perform login and return the session token from the cookie. Note: The token is returned in the HttpOnly cookie, not in the JSON body. For testing Bearer token auth, we extract it from the cookie. """ resp = await client.post("/api/v1/auth/login", json={"password": password}) assert resp.status_code == 200 token = resp.cookies.get(SESSION_COOKIE_NAME) assert token is not None return str(token) # --------------------------------------------------------------------------- # Login # --------------------------------------------------------------------------- class TestLogin: """POST /api/auth/login.""" async def test_login_succeeds_with_correct_password( self, client: AsyncClient ) -> None: """Login returns 200 and sets a session cookie for the correct password.""" await _do_setup(client) response = await client.post( "/api/v1/auth/login", json={"password": "Mysecretpass1!"} ) assert response.status_code == 200 body = response.json() # Token is not returned in the JSON body; it's set as an HttpOnly cookie assert "token" not in body assert "expires_at" in body async def test_login_sets_cookie(self, client: AsyncClient) -> None: """Login sets the bangui_session HttpOnly cookie.""" await _do_setup(client) response = await client.post( "/api/v1/auth/login", json={"password": "Mysecretpass1!"} ) assert response.status_code == 200 assert SESSION_COOKIE_NAME in response.cookies assert "." in response.cookies[SESSION_COOKIE_NAME] set_cookie = response.headers.get("set-cookie", "") assert "HttpOnly" in set_cookie assert "SameSite=lax" in set_cookie async def test_login_sets_secure_cookie_when_enabled( self, client: AsyncClient ) -> None: """Login sets the Secure flag when session cookies are configured for HTTPS.""" client._transport.app.state.settings.session_cookie_secure = True await _do_setup(client) response = await client.post( "/api/v1/auth/login", json={"password": "Mysecretpass1!"} ) assert response.status_code == 200 set_cookie = response.headers.get("set-cookie", "") assert "Secure" in set_cookie async def test_login_fails_with_wrong_password( self, client: AsyncClient ) -> None: """Login returns 401 for an incorrect password.""" await _do_setup(client) response = await client.post( "/api/v1/auth/login", json={"password": "wrongpassword"} ) assert response.status_code == 401 async def test_login_rejects_empty_password(self, client: AsyncClient) -> None: """Login returns 422 when password field is missing.""" await _do_setup(client) response = await client.post("/api/v1/auth/login", json={}) assert response.status_code == 422 async def test_login_rate_limit_returns_429_after_5_attempts( self, client: AsyncClient ) -> None: """Login is blocked immediately after first failed attempt due to exponential backoff.""" await _do_setup(client) limiter = client._transport.app.state.login_rate_limiter limiter.reset() # First failed attempt is allowed response = await client.post( "/api/v1/auth/login", json={"password": "wrongpassword"} ) assert response.status_code == 401 # Second attempt immediately after is blocked by 1s penalty response = await client.post( "/api/v1/auth/login", json={"password": "wrongpassword"} ) assert response.status_code == 429 assert response.json()["detail"] == "Too many login attempts. Please try again later." # Verify the failure count is correct state = limiter.get_state() assert "127.0.0.1" in state assert state["127.0.0.1"] >= 1 async def test_login_rate_limit_includes_retry_after_header( self, client: AsyncClient ) -> None: """Rate-limited response includes Retry-After header.""" await _do_setup(client) limiter = client._transport.app.state.login_rate_limiter limiter.reset() # First attempt fails response = await client.post("/api/v1/auth/login", json={"password": "wrong"}) assert response.status_code == 401 # Second immediate attempt is rate-limited response = await client.post("/api/v1/auth/login", json={"password": "wrong"}) assert response.status_code == 429 assert "retry-after" in response.headers assert response.headers["retry-after"] == "60" async def test_login_rate_limit_per_ip( self, client: AsyncClient ) -> None: """Rate limit is tracked separately per IP address.""" await _do_setup(client) limiter = client._transport.app.state.login_rate_limiter limiter.reset() # Make 1 failed attempt with default IP response = await client.post("/api/v1/auth/login", json={"password": "wrong"}) assert response.status_code == 401 # 2nd attempt is blocked response = await client.post( "/api/v1/auth/login", json={"password": "correct"} ) assert response.status_code == 429 # Verify the failure count is correct state = limiter.get_state() assert "127.0.0.1" in state assert state["127.0.0.1"] >= 1 async def test_login_rate_limit_reset_after_window( self, client: AsyncClient ) -> None: """Rate limit counter resets after the window expires.""" await _do_setup(client) limiter = client._transport.app.state.login_rate_limiter limiter.reset() # Make 1 failed attempt (enough to trigger exponential backoff) response = await client.post("/api/v1/auth/login", json={"password": "wrong"}) assert response.status_code == 401 # 2nd attempt is blocked response = await client.post( "/api/v1/auth/login", json={"password": "wrong"} ) assert response.status_code == 429 # Reset the limiter (simulate window expiry) limiter.reset() # Now a fresh login attempt should succeed (use correct password) response = await client.post( "/api/v1/auth/login", json={"password": "Mysecretpass1!"} ) assert response.status_code == 200 async def test_login_exponential_backoff(self, client: AsyncClient) -> None: """Exponential backoff accumulates with each consecutive failure.""" await _do_setup(client) limiter = client._transport.app.state.login_rate_limiter limiter.reset() # 1st failure: 1 * 2^1 = 2s penalty response = await client.post("/api/v1/auth/login", json={"password": "wrong"}) assert response.status_code == 401 state = limiter.get_state() assert state["127.0.0.1"] == 1 # 2nd attempt blocked immediately by 2s penalty response = await client.post("/api/v1/auth/login", json={"password": "wrong"}) assert response.status_code == 429 # After 2.1s, the penalty expires and we can try again # (this will record a 2nd failure, creating a 1 * 2^2 = 4s penalty) await asyncio.sleep(2.1) response = await client.post("/api/v1/auth/login", json={"password": "wrong"}) assert response.status_code == 401 state = limiter.get_state() assert state["127.0.0.1"] == 2 # Now blocked by 4s penalty response = await client.post("/api/v1/auth/login", json={"password": "wrong"}) assert response.status_code == 429 # --------------------------------------------------------------------------- # Logout # --------------------------------------------------------------------------- class TestLogout: """POST /api/auth/logout.""" async def test_logout_returns_200(self, client: AsyncClient) -> None: """Logout returns 200 with a confirmation message.""" await _do_setup(client) await _login(client) response = await client.post("/api/v1/auth/logout") assert response.status_code == 200 assert "message" in response.json() async def test_logout_clears_cookie(self, client: AsyncClient) -> None: """Logout clears the bangui_session cookie.""" await _do_setup(client) await _login(client) # sets cookie on client response = await client.post("/api/v1/auth/logout") assert response.status_code == 200 # Cookie should be set to empty / deleted in the Set-Cookie header. set_cookie = response.headers.get("set-cookie", "") assert SESSION_COOKIE_NAME in set_cookie async def test_logout_is_idempotent(self, client: AsyncClient) -> None: """Logout succeeds even when called without a session token.""" await _do_setup(client) response = await client.post("/api/v1/auth/logout") assert response.status_code == 200 async def test_session_invalid_after_logout( self, client: AsyncClient ) -> None: """A session token is rejected after logout.""" await _do_setup(client) token = await _login(client) await client.post("/api/v1/auth/logout") # Now try to use the invalidated token via Bearer header. The health # endpoint is unprotected so we validate against a hypothetical # protected endpoint by inspecting the auth service directly. # Here we just confirm the token is no longer in the DB by trying # to re-use it on logout (idempotent — still 200, not an error). response = await client.post( "/api/v1/auth/logout", headers={"Authorization": f"Bearer {token}"}, ) assert response.status_code == 200 # --------------------------------------------------------------------------- # Auth dependency (protected route guard) # --------------------------------------------------------------------------- class TestRequireAuth: """Verify the require_auth dependency rejects unauthenticated requests.""" async def test_health_endpoint_requires_no_auth( self, client: AsyncClient ) -> None: """Health endpoint is accessible without authentication.""" response = await client.get("/api/v1/health") assert response.status_code == 200 async def test_session_cache_is_disabled_by_default( self, client: AsyncClient ) -> None: """Session validation does not use the in-memory cache unless enabled.""" from app.repositories import session_repo await _do_setup(client) token = await _login(client) call_count = 0 original_get_session = session_repo.get_session async def _tracking(db, tok): # type: ignore[no-untyped-def] nonlocal call_count call_count += 1 return await original_get_session(db, tok) with patch.object(session_repo, "get_session", side_effect=_tracking): resp1 = await client.get( "/api/v1/dashboard/status", headers={"Authorization": f"Bearer {token}"}, ) resp2 = await client.get( "/api/v1/dashboard/status", headers={"Authorization": f"Bearer {token}"}, ) assert resp1.status_code == 200 assert resp2.status_code == 200 assert call_count == 2 # --------------------------------------------------------------------------- # Session validation (Task 4) # --------------------------------------------------------------------------- class TestValidateSession: """GET /api/auth/session.""" async def test_validate_session_returns_200_with_valid_token( self, client: AsyncClient ) -> None: """Validate session returns 200 for a valid authenticated request.""" await _do_setup(client) token = await _login(client) # Use Bearer token to authenticate response = await client.get( "/api/v1/auth/session", headers={"Authorization": f"Bearer {token}"}, ) assert response.status_code == 200 assert response.json() == {"valid": True} async def test_validate_session_returns_401_without_token( self, client: AsyncClient ) -> None: """Validate session returns 401 when no token is present.""" await _do_setup(client) response = await client.get("/api/v1/auth/session") assert response.status_code == 401 async def test_validate_session_returns_401_with_invalid_token( self, client: AsyncClient ) -> None: """Validate session returns 401 for an invalid or expired token.""" await _do_setup(client) response = await client.get( "/api/v1/auth/session", headers={"Authorization": "Bearer invalidtoken"}, ) assert response.status_code == 401 async def test_validate_session_with_cookie( self, client: AsyncClient ) -> None: """Validate session works with cookie-based authentication.""" await _do_setup(client) token = await _login(client) # httpx should automatically send the cookie, but use Bearer token as fallback response = await client.get( "/api/v1/auth/session", headers={"Authorization": f"Bearer {token}"}, ) assert response.status_code == 200 assert response.json() == {"valid": True} async def test_validate_session_after_logout( self, client: AsyncClient ) -> None: """Validate session returns 401 after logout.""" await _do_setup(client) token = await _login(client) await client.post( "/api/v1/auth/logout", headers={"Authorization": f"Bearer {token}"}, ) response = await client.get( "/api/v1/auth/session", headers={"Authorization": f"Bearer {token}"}, ) assert response.status_code == 401 # --------------------------------------------------------------------------- # Session-token cache (Task 4) # --------------------------------------------------------------------------- class TestRequireAuthSessionCache: """In-memory session token cache inside ``require_auth``.""" @pytest.fixture(autouse=True) def reset_cache(self, client: AsyncClient) -> Generator[None, None, None]: """Flush the session cache before and after every test in this class.""" client._transport.app.state.session_cache.clear() yield client._transport.app.state.session_cache.clear() @pytest.fixture(autouse=True) def enable_session_cache(self, client: AsyncClient) -> Generator[None, None, None]: """Enable the in-memory auth cache for tests that exercise it.""" client._transport.app.state.settings.session_cache_enabled = True client._transport.app.state.settings.session_cache_ttl_seconds = 10.0 yield async def test_second_request_skips_db(self, client: AsyncClient) -> None: """Second authenticated request within TTL skips the session DB query. The first request populates the in-memory cache via ``require_auth``. The second request — using the same token before the TTL expires — must return ``session_repo.get_session`` *without* calling it. """ from app.repositories import session_repo await _do_setup(client) token = await _login(client) # Ensure cache is empty so the first request definitely hits the DB. client._transport.app.state.session_cache.clear() call_count = 0 original_get_session = session_repo.get_session async def _tracking(db, tok): # type: ignore[no-untyped-def] nonlocal call_count call_count += 1 return await original_get_session(db, tok) with patch.object(session_repo, "get_session", side_effect=_tracking): resp1 = await client.get( "/api/v1/dashboard/status", headers={"Authorization": f"Bearer {token}"}, ) resp2 = await client.get( "/api/v1/dashboard/status", headers={"Authorization": f"Bearer {token}"}, ) assert resp1.status_code == 200 assert resp2.status_code == 200 # DB queried exactly once: the first request populates the cache, # the second request is served entirely from memory. assert call_count == 1 async def test_token_enters_cache_after_first_auth( self, client: AsyncClient ) -> None: """A successful auth request places the token in the session cache.""" await _do_setup(client) token = await _login(client) client._transport.app.state.session_cache.clear() assert client._transport.app.state.session_cache.get(token) is None await client.get( "/api/v1/dashboard/status", headers={"Authorization": f"Bearer {token}"}, ) assert client._transport.app.state.session_cache.get(token) is not None async def test_logout_evicts_token_from_cache( self, client: AsyncClient ) -> None: """Logout removes the session token from the session cache immediately.""" await _do_setup(client) token = await _login(client) # Warm the cache. await client.get( "/api/v1/dashboard/status", headers={"Authorization": f"Bearer {token}"}, ) assert client._transport.app.state.session_cache.get(token) is not None # Logout must evict the entry. await client.post( "/api/v1/auth/logout", headers={"Authorization": f"Bearer {token}"}, ) assert client._transport.app.state.session_cache.get(token) is None response = await client.get("/api/v1/health") assert response.status_code == 200