- Add TYPE_CHECKING guards for runtime-expensive imports (aiohttp, aiosqlite) - Reorganize imports to follow PEP 8 conventions - Convert TypeAlias to modern PEP 695 type syntax (where appropriate) - Use Sequence/Mapping from collections.abc for type hints (covariant) - Replace string literals with cast() for improved type inference - Fix casting of Fail2BanResponse and TypedDict patterns - Add IpLookupResult TypedDict for precise return type annotation - Reformat overlong lines for readability (120 char limit) - Add asyncio_mode and filterwarnings to pytest config - Update test fixtures with improved type hints This improves mypy type checking and makes type relationships explicit.
254 lines
8.9 KiB
Python
254 lines
8.9 KiB
Python
"""Tests for the auth router (POST /api/auth/login, POST /api/auth/logout)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import Generator
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
from httpx import AsyncClient
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_SETUP_PAYLOAD = {
|
|
"master_password": "mysecretpass1",
|
|
"database_path": "bangui.db",
|
|
"fail2ban_socket": "/var/run/fail2ban/fail2ban.sock",
|
|
"timezone": "UTC",
|
|
"session_duration_minutes": 60,
|
|
}
|
|
|
|
|
|
async def _do_setup(client: AsyncClient) -> None:
|
|
"""Run the setup wizard so auth endpoints are reachable."""
|
|
resp = await client.post("/api/setup", json=_SETUP_PAYLOAD)
|
|
assert resp.status_code == 201
|
|
|
|
|
|
async def _login(client: AsyncClient, password: str = "mysecretpass1") -> str:
|
|
"""Helper: perform login and return the session token."""
|
|
resp = await client.post("/api/auth/login", json={"password": password})
|
|
assert resp.status_code == 200
|
|
return str(resp.json()["token"])
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Login
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestLogin:
|
|
"""POST /api/auth/login."""
|
|
|
|
async def test_login_succeeds_with_correct_password(
|
|
self, client: AsyncClient
|
|
) -> None:
|
|
"""Login returns 200 and a session token for the correct password."""
|
|
await _do_setup(client)
|
|
response = await client.post(
|
|
"/api/auth/login", json={"password": "mysecretpass1"}
|
|
)
|
|
assert response.status_code == 200
|
|
body = response.json()
|
|
assert "token" in body
|
|
assert len(body["token"]) > 0
|
|
assert "expires_at" in body
|
|
|
|
async def test_login_sets_cookie(self, client: AsyncClient) -> None:
|
|
"""Login sets the bangui_session HttpOnly cookie."""
|
|
await _do_setup(client)
|
|
response = await client.post(
|
|
"/api/auth/login", json={"password": "mysecretpass1"}
|
|
)
|
|
assert response.status_code == 200
|
|
assert "bangui_session" in response.cookies
|
|
|
|
async def test_login_fails_with_wrong_password(
|
|
self, client: AsyncClient
|
|
) -> None:
|
|
"""Login returns 401 for an incorrect password."""
|
|
await _do_setup(client)
|
|
response = await client.post(
|
|
"/api/auth/login", json={"password": "wrongpassword"}
|
|
)
|
|
assert response.status_code == 401
|
|
|
|
async def test_login_rejects_empty_password(self, client: AsyncClient) -> None:
|
|
"""Login returns 422 when password field is missing."""
|
|
await _do_setup(client)
|
|
response = await client.post("/api/auth/login", json={})
|
|
assert response.status_code == 422
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Logout
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestLogout:
|
|
"""POST /api/auth/logout."""
|
|
|
|
async def test_logout_returns_200(self, client: AsyncClient) -> None:
|
|
"""Logout returns 200 with a confirmation message."""
|
|
await _do_setup(client)
|
|
await _login(client)
|
|
response = await client.post("/api/auth/logout")
|
|
assert response.status_code == 200
|
|
assert "message" in response.json()
|
|
|
|
async def test_logout_clears_cookie(self, client: AsyncClient) -> None:
|
|
"""Logout clears the bangui_session cookie."""
|
|
await _do_setup(client)
|
|
await _login(client) # sets cookie on client
|
|
response = await client.post("/api/auth/logout")
|
|
assert response.status_code == 200
|
|
# Cookie should be set to empty / deleted in the Set-Cookie header.
|
|
set_cookie = response.headers.get("set-cookie", "")
|
|
assert "bangui_session" in set_cookie
|
|
|
|
async def test_logout_is_idempotent(self, client: AsyncClient) -> None:
|
|
"""Logout succeeds even when called without a session token."""
|
|
await _do_setup(client)
|
|
response = await client.post("/api/auth/logout")
|
|
assert response.status_code == 200
|
|
|
|
async def test_session_invalid_after_logout(
|
|
self, client: AsyncClient
|
|
) -> None:
|
|
"""A session token is rejected after logout."""
|
|
await _do_setup(client)
|
|
token = await _login(client)
|
|
|
|
await client.post("/api/auth/logout")
|
|
|
|
# Now try to use the invalidated token via Bearer header. The health
|
|
# endpoint is unprotected so we validate against a hypothetical
|
|
# protected endpoint by inspecting the auth service directly.
|
|
# Here we just confirm the token is no longer in the DB by trying
|
|
# to re-use it on logout (idempotent — still 200, not an error).
|
|
response = await client.post(
|
|
"/api/auth/logout",
|
|
headers={"Authorization": f"Bearer {token}"},
|
|
)
|
|
assert response.status_code == 200
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Auth dependency (protected route guard)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestRequireAuth:
|
|
"""Verify the require_auth dependency rejects unauthenticated requests."""
|
|
|
|
async def test_health_endpoint_requires_no_auth(
|
|
self, client: AsyncClient
|
|
) -> None:
|
|
"""Health endpoint is accessible without authentication."""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Session-token cache (Task 4)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestRequireAuthSessionCache:
|
|
"""In-memory session token cache inside ``require_auth``."""
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def reset_cache(self) -> Generator[None, None, None]:
|
|
"""Flush the session cache before and after every test in this class."""
|
|
from app import dependencies
|
|
|
|
dependencies.clear_session_cache()
|
|
yield
|
|
dependencies.clear_session_cache()
|
|
|
|
async def test_second_request_skips_db(self, client: AsyncClient) -> None:
|
|
"""Second authenticated request within TTL skips the session DB query.
|
|
|
|
The first request populates the in-memory cache via ``require_auth``.
|
|
The second request — using the same token before the TTL expires —
|
|
must return ``session_repo.get_session`` *without* calling it.
|
|
"""
|
|
from app.repositories import session_repo
|
|
|
|
await _do_setup(client)
|
|
token = await _login(client)
|
|
|
|
# Ensure cache is empty so the first request definitely hits the DB.
|
|
from app import dependencies
|
|
|
|
dependencies.clear_session_cache()
|
|
|
|
call_count = 0
|
|
original_get_session = session_repo.get_session
|
|
|
|
async def _tracking(db, tok): # type: ignore[no-untyped-def]
|
|
nonlocal call_count
|
|
call_count += 1
|
|
return await original_get_session(db, tok)
|
|
|
|
with patch.object(session_repo, "get_session", side_effect=_tracking):
|
|
resp1 = await client.get(
|
|
"/api/dashboard/status",
|
|
headers={"Authorization": f"Bearer {token}"},
|
|
)
|
|
resp2 = await client.get(
|
|
"/api/dashboard/status",
|
|
headers={"Authorization": f"Bearer {token}"},
|
|
)
|
|
|
|
assert resp1.status_code == 200
|
|
assert resp2.status_code == 200
|
|
# DB queried exactly once: the first request populates the cache,
|
|
# the second request is served entirely from memory.
|
|
assert call_count == 1
|
|
|
|
async def test_token_enters_cache_after_first_auth(
|
|
self, client: AsyncClient
|
|
) -> None:
|
|
"""A successful auth request places the token in ``_session_cache``."""
|
|
from app import dependencies
|
|
|
|
await _do_setup(client)
|
|
token = await _login(client)
|
|
|
|
dependencies.clear_session_cache()
|
|
assert token not in dependencies._session_cache
|
|
|
|
await client.get(
|
|
"/api/dashboard/status",
|
|
headers={"Authorization": f"Bearer {token}"},
|
|
)
|
|
|
|
assert token in dependencies._session_cache
|
|
|
|
async def test_logout_evicts_token_from_cache(
|
|
self, client: AsyncClient
|
|
) -> None:
|
|
"""Logout removes the session token from the in-memory cache immediately."""
|
|
from app import dependencies
|
|
|
|
await _do_setup(client)
|
|
token = await _login(client)
|
|
|
|
# Warm the cache.
|
|
await client.get(
|
|
"/api/dashboard/status",
|
|
headers={"Authorization": f"Bearer {token}"},
|
|
)
|
|
assert token in dependencies._session_cache
|
|
|
|
# Logout must evict the entry.
|
|
await client.post(
|
|
"/api/auth/logout",
|
|
headers={"Authorization": f"Bearer {token}"},
|
|
)
|
|
assert token not in dependencies._session_cache
|
|
|
|
response = await client.get("/api/health")
|
|
assert response.status_code == 200
|