Files
BanGUI/backend/app/services/auth_service.py

184 lines
5.9 KiB
Python

"""Authentication service.
Handles password verification, session creation, session validation, and
session expiry. Sessions are stored in the SQLite database so they
survive server restarts.
"""
from __future__ import annotations
import hashlib
import hmac
import secrets
from typing import TYPE_CHECKING
import bcrypt
import structlog
from app.utils.async_utils import run_blocking
if TYPE_CHECKING:
import aiosqlite
from app.models.auth import Session
from app.repositories.protocols import SessionRepository
from app.repositories import session_repo as default_session_repo
from app.services.setup_service import get_password_hash
from app.utils.constants import SESSION_TOKEN_BYTES, SESSION_TOKEN_SIGNATURE_SEPARATOR
from app.utils.time_utils import add_minutes, utc_now
log: structlog.stdlib.BoundLogger = structlog.get_logger()
def _session_token_signature(token: str, secret: str) -> str:
"""Return the HMAC-SHA256 signature for a session token."""
return hmac.new(secret.encode(), token.encode(), hashlib.sha256).hexdigest()
def sign_session_token(token: str, secret: str) -> str:
"""Return a signed session token string for the client."""
return f"{token}{SESSION_TOKEN_SIGNATURE_SEPARATOR}{_session_token_signature(token, secret)}"
def unwrap_session_token(token: str, secret: str) -> str:
"""Verify and return the raw token from a signed session token.
If the token has no signature component, it is returned unchanged. This
preserves compatibility with existing raw session tokens stored in the DB.
"""
if SESSION_TOKEN_SIGNATURE_SEPARATOR not in token:
return token
raw_token, signature = token.rsplit(SESSION_TOKEN_SIGNATURE_SEPARATOR, 1)
expected_signature = _session_token_signature(raw_token, secret)
if not hmac.compare_digest(expected_signature, signature):
raise ValueError("Invalid session token.")
return raw_token
async def _check_password(plain: str, hashed: str) -> bool:
"""Return ``True`` if *plain* matches the bcrypt *hashed* password.
Runs in a thread executor so the blocking bcrypt operation does not stall
the asyncio event loop.
Args:
plain: The plain-text password to verify.
hashed: The stored bcrypt hash string.
Returns:
``True`` on a successful match, ``False`` otherwise.
"""
plain_bytes = plain.encode()
hashed_bytes = hashed.encode()
return await run_blocking(lambda: bool(bcrypt.checkpw(plain_bytes, hashed_bytes)))
async def login(
db: aiosqlite.Connection,
password: str,
session_duration_minutes: int,
session_secret: str,
session_repo: SessionRepository = default_session_repo,
) -> tuple[str, str]:
"""Verify *password*, create a new session, and sign the token.
Args:
db: Active aiosqlite connection.
password: Plain-text password supplied by the user.
session_duration_minutes: How long the new session is valid for.
session_secret: Secret used to sign the session token.
Returns:
A tuple of the signed session token and its expiry timestamp.
Raises:
ValueError: If the password is incorrect or no password hash is stored.
"""
stored_hash = await get_password_hash(db)
if stored_hash is None:
log.warning("bangui_login_no_hash")
raise ValueError("No password is configured — run setup first.")
if not await _check_password(password, stored_hash):
log.warning("bangui_login_wrong_password")
raise ValueError("Incorrect password.")
token = secrets.token_hex(SESSION_TOKEN_BYTES)
now = utc_now()
created_iso = now.isoformat()
expires_iso = add_minutes(now, session_duration_minutes).isoformat()
session = await session_repo.create_session(
db, token=token, created_at=created_iso, expires_at=expires_iso
)
signed_token = sign_session_token(session.token, session_secret)
log.info("bangui_login_success", token_prefix=session.token[:8])
return signed_token, session.expires_at
async def validate_session(
db: aiosqlite.Connection,
token: str,
session_secret: str | None = None,
session_repo: SessionRepository = default_session_repo,
) -> Session:
"""Return the session for *token* if it is valid and not expired.
Args:
db: Active aiosqlite connection.
token: The opaque session token from the client.
session_secret: Secret used to verify signed tokens.
Returns:
The :class:`~app.models.auth.Session` if it is valid.
Raises:
ValueError: If the token is not found, invalid, or has expired.
"""
if session_secret is not None:
try:
token = unwrap_session_token(token, session_secret)
except ValueError as exc:
raise ValueError("Session token is invalid.") from exc
session = await session_repo.get_session(db, token)
if session is None:
raise ValueError("Session not found.")
now_iso = utc_now().isoformat()
if session.expires_at <= now_iso:
await session_repo.delete_session(db, token)
raise ValueError("Session has expired.")
return session
async def logout(
db: aiosqlite.Connection,
token: str,
session_secret: str | None = None,
session_repo: SessionRepository = default_session_repo,
) -> str | None:
"""Invalidate the session identified by *token*.
Args:
db: Active aiosqlite connection.
token: The session token to revoke.
session_secret: Secret used to verify signed tokens.
Returns:
The raw session token that was revoked, or ``None`` if the token was invalid.
"""
if session_secret is not None:
try:
token = unwrap_session_token(token, session_secret)
except ValueError:
log.warning("bangui_logout_invalid_token", token_prefix=token[:8])
return None
await session_repo.delete_session(db, token)
log.info("bangui_logout", token_prefix=token[:8])
return token