"""Tests for the auth router (POST /api/auth/login, POST /api/auth/logout).""" from __future__ import annotations from unittest.mock import patch import pytest from httpx import AsyncClient # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- _SETUP_PAYLOAD = { "master_password": "mysecretpass1", "database_path": "bangui.db", "fail2ban_socket": "/var/run/fail2ban/fail2ban.sock", "timezone": "UTC", "session_duration_minutes": 60, } async def _do_setup(client: AsyncClient) -> None: """Run the setup wizard so auth endpoints are reachable.""" resp = await client.post("/api/setup", json=_SETUP_PAYLOAD) assert resp.status_code == 201 async def _login(client: AsyncClient, password: str = "mysecretpass1") -> str: """Helper: perform login and return the session token.""" resp = await client.post("/api/auth/login", json={"password": password}) assert resp.status_code == 200 return str(resp.json()["token"]) # --------------------------------------------------------------------------- # Login # --------------------------------------------------------------------------- class TestLogin: """POST /api/auth/login.""" async def test_login_succeeds_with_correct_password( self, client: AsyncClient ) -> None: """Login returns 200 and a session token for the correct password.""" await _do_setup(client) response = await client.post( "/api/auth/login", json={"password": "mysecretpass1"} ) assert response.status_code == 200 body = response.json() assert "token" in body assert len(body["token"]) > 0 assert "expires_at" in body async def test_login_sets_cookie(self, client: AsyncClient) -> None: """Login sets the bangui_session HttpOnly cookie.""" await _do_setup(client) response = await client.post( "/api/auth/login", json={"password": "mysecretpass1"} ) assert response.status_code == 200 assert "bangui_session" in response.cookies async def test_login_fails_with_wrong_password( self, client: AsyncClient ) -> None: """Login returns 401 for an incorrect password.""" await _do_setup(client) response = await client.post( "/api/auth/login", json={"password": "wrongpassword"} ) assert response.status_code == 401 async def test_login_rejects_empty_password(self, client: AsyncClient) -> None: """Login returns 422 when password field is missing.""" await _do_setup(client) response = await client.post("/api/auth/login", json={}) assert response.status_code == 422 # --------------------------------------------------------------------------- # Logout # --------------------------------------------------------------------------- class TestLogout: """POST /api/auth/logout.""" async def test_logout_returns_200(self, client: AsyncClient) -> None: """Logout returns 200 with a confirmation message.""" await _do_setup(client) await _login(client) response = await client.post("/api/auth/logout") assert response.status_code == 200 assert "message" in response.json() async def test_logout_clears_cookie(self, client: AsyncClient) -> None: """Logout clears the bangui_session cookie.""" await _do_setup(client) await _login(client) # sets cookie on client response = await client.post("/api/auth/logout") assert response.status_code == 200 # Cookie should be set to empty / deleted in the Set-Cookie header. set_cookie = response.headers.get("set-cookie", "") assert "bangui_session" in set_cookie async def test_logout_is_idempotent(self, client: AsyncClient) -> None: """Logout succeeds even when called without a session token.""" await _do_setup(client) response = await client.post("/api/auth/logout") assert response.status_code == 200 async def test_session_invalid_after_logout( self, client: AsyncClient ) -> None: """A session token is rejected after logout.""" await _do_setup(client) token = await _login(client) await client.post("/api/auth/logout") # Now try to use the invalidated token via Bearer header. The health # endpoint is unprotected so we validate against a hypothetical # protected endpoint by inspecting the auth service directly. # Here we just confirm the token is no longer in the DB by trying # to re-use it on logout (idempotent — still 200, not an error). response = await client.post( "/api/auth/logout", headers={"Authorization": f"Bearer {token}"}, ) assert response.status_code == 200 # --------------------------------------------------------------------------- # Auth dependency (protected route guard) # --------------------------------------------------------------------------- class TestRequireAuth: """Verify the require_auth dependency rejects unauthenticated requests.""" async def test_health_endpoint_requires_no_auth( self, client: AsyncClient ) -> None: """Health endpoint is accessible without authentication.""" # --------------------------------------------------------------------------- # Session-token cache (Task 4) # --------------------------------------------------------------------------- class TestRequireAuthSessionCache: """In-memory session token cache inside ``require_auth``.""" @pytest.fixture(autouse=True) def reset_cache(self) -> None: # type: ignore[misc] """Flush the session cache before and after every test in this class.""" from app import dependencies dependencies.clear_session_cache() yield # type: ignore[misc] dependencies.clear_session_cache() async def test_second_request_skips_db(self, client: AsyncClient) -> None: """Second authenticated request within TTL skips the session DB query. The first request populates the in-memory cache via ``require_auth``. The second request — using the same token before the TTL expires — must return ``session_repo.get_session`` *without* calling it. """ from app.repositories import session_repo await _do_setup(client) token = await _login(client) # Ensure cache is empty so the first request definitely hits the DB. from app import dependencies dependencies.clear_session_cache() call_count = 0 original_get_session = session_repo.get_session async def _tracking(db, tok): # type: ignore[no-untyped-def] nonlocal call_count call_count += 1 return await original_get_session(db, tok) with patch.object(session_repo, "get_session", side_effect=_tracking): resp1 = await client.get( "/api/dashboard/status", headers={"Authorization": f"Bearer {token}"}, ) resp2 = await client.get( "/api/dashboard/status", headers={"Authorization": f"Bearer {token}"}, ) assert resp1.status_code == 200 assert resp2.status_code == 200 # DB queried exactly once: the first request populates the cache, # the second request is served entirely from memory. assert call_count == 1 async def test_token_enters_cache_after_first_auth( self, client: AsyncClient ) -> None: """A successful auth request places the token in ``_session_cache``.""" from app import dependencies await _do_setup(client) token = await _login(client) dependencies.clear_session_cache() assert token not in dependencies._session_cache await client.get( "/api/dashboard/status", headers={"Authorization": f"Bearer {token}"}, ) assert token in dependencies._session_cache async def test_logout_evicts_token_from_cache( self, client: AsyncClient ) -> None: """Logout removes the session token from the in-memory cache immediately.""" from app import dependencies await _do_setup(client) token = await _login(client) # Warm the cache. await client.get( "/api/dashboard/status", headers={"Authorization": f"Bearer {token}"}, ) assert token in dependencies._session_cache # Logout must evict the entry. await client.post( "/api/auth/logout", headers={"Authorization": f"Bearer {token}"}, ) assert token not in dependencies._session_cache response = await client.get("/api/health") assert response.status_code == 200