Use session_secret for signed auth session tokens

This commit is contained in:
2026-04-09 21:30:08 +02:00
parent 6eab47f7ba
commit 208f98dc97
8 changed files with 136 additions and 12 deletions

View File

@@ -8,6 +8,8 @@ survive server restarts.
from __future__ import annotations
import asyncio
import hashlib
import hmac
import secrets
from typing import TYPE_CHECKING
@@ -20,12 +22,39 @@ if TYPE_CHECKING:
from app.models.auth import Session
from app.repositories import session_repo
from app.utils.constants import SESSION_TOKEN_BYTES, SESSION_TOKEN_SIGNATURE_SEPARATOR
from app.utils.setup_utils import get_password_hash
from app.utils.time_utils import add_minutes, utc_now
log: structlog.stdlib.BoundLogger = structlog.get_logger()
def _session_token_signature(token: str, secret: str) -> str:
"""Return the HMAC-SHA256 signature for a session token."""
return hmac.new(secret.encode(), token.encode(), hashlib.sha256).hexdigest()
def sign_session_token(token: str, secret: str) -> str:
"""Return a signed session token string for the client."""
return f"{token}{SESSION_TOKEN_SIGNATURE_SEPARATOR}{_session_token_signature(token, secret)}"
def unwrap_session_token(token: str, secret: str) -> str:
"""Verify and return the raw token from a signed session token.
If the token has no signature component, it is returned unchanged. This
preserves compatibility with existing raw session tokens stored in the DB.
"""
if SESSION_TOKEN_SIGNATURE_SEPARATOR not in token:
return token
raw_token, signature = token.rsplit(SESSION_TOKEN_SIGNATURE_SEPARATOR, 1)
expected_signature = _session_token_signature(raw_token, secret)
if not hmac.compare_digest(expected_signature, signature):
raise ValueError("Invalid session token.")
return raw_token
async def _check_password(plain: str, hashed: str) -> bool:
"""Return ``True`` if *plain* matches the bcrypt *hashed* password.
@@ -74,7 +103,7 @@ async def login(
log.warning("bangui_login_wrong_password")
raise ValueError("Incorrect password.")
token = secrets.token_hex(32)
token = secrets.token_hex(SESSION_TOKEN_BYTES)
now = utc_now()
created_iso = now.isoformat()
expires_iso = add_minutes(now, session_duration_minutes).isoformat()
@@ -86,19 +115,30 @@ async def login(
return session
async def validate_session(db: aiosqlite.Connection, token: str) -> Session:
async def validate_session(
db: aiosqlite.Connection,
token: str,
session_secret: str | None = None,
) -> Session:
"""Return the session for *token* if it is valid and not expired.
Args:
db: Active aiosqlite connection.
token: The opaque session token from the client.
session_secret: Secret used to verify signed tokens.
Returns:
The :class:`~app.models.auth.Session` if it is valid.
Raises:
ValueError: If the token is not found or has expired.
ValueError: If the token is not found, invalid, or has expired.
"""
if session_secret is not None:
try:
token = unwrap_session_token(token, session_secret)
except ValueError as exc:
raise ValueError("Session token is invalid.") from exc
session = await session_repo.get_session(db, token)
if session is None:
raise ValueError("Session not found.")
@@ -111,12 +151,28 @@ async def validate_session(db: aiosqlite.Connection, token: str) -> Session:
return session
async def logout(db: aiosqlite.Connection, token: str) -> None:
async def logout(
db: aiosqlite.Connection,
token: str,
session_secret: str | None = None,
) -> str | None:
"""Invalidate the session identified by *token*.
Args:
db: Active aiosqlite connection.
token: The session token to revoke.
session_secret: Secret used to verify signed tokens.
Returns:
The raw session token that was revoked, or ``None`` if the token was invalid.
"""
if session_secret is not None:
try:
token = unwrap_session_token(token, session_secret)
except ValueError:
log.warning("bangui_logout_invalid_token", token_prefix=token[:8])
return None
await session_repo.delete_session(db, token)
log.info("bangui_logout", token_prefix=token[:8])
return token