"""Authentication service. Handles password verification, session creation, session validation, and session expiry. Sessions are stored in the SQLite database so they survive server restarts. """ from __future__ import annotations import hashlib import hmac import secrets from typing import TYPE_CHECKING import bcrypt import structlog from app.utils.async_utils import run_blocking if TYPE_CHECKING: import aiosqlite from app.models.auth import Session from app.repositories.protocols import SessionRepository from app.repositories import session_repo as default_session_repo from app.services.setup_service import get_password_hash from app.utils.constants import SESSION_TOKEN_BYTES, SESSION_TOKEN_SIGNATURE_SEPARATOR from app.utils.time_utils import add_minutes, utc_now log: structlog.stdlib.BoundLogger = structlog.get_logger() def _session_token_signature(token: str, secret: str) -> str: """Return the HMAC-SHA256 signature for a session token.""" return hmac.new(secret.encode(), token.encode(), hashlib.sha256).hexdigest() def sign_session_token(token: str, secret: str) -> str: """Return a signed session token string for the client.""" return f"{token}{SESSION_TOKEN_SIGNATURE_SEPARATOR}{_session_token_signature(token, secret)}" def unwrap_session_token(token: str, secret: str) -> str: """Verify and return the raw token from a signed session token. All tokens must carry a valid HMAC-SHA256 signature. Tokens without a signature are rejected. Args: token: The signed session token in format "raw_token.signature". secret: The HMAC secret used to verify the signature. Returns: The raw token component. Raises: ValueError: If the token lacks a signature or the signature is invalid. """ if SESSION_TOKEN_SIGNATURE_SEPARATOR not in token: raise ValueError("Invalid session token.") raw_token, signature = token.rsplit(SESSION_TOKEN_SIGNATURE_SEPARATOR, 1) expected_signature = _session_token_signature(raw_token, secret) if not hmac.compare_digest(expected_signature, signature): raise ValueError("Invalid session token.") return raw_token async def _check_password(plain: str, hashed: str) -> bool: """Return ``True`` if *plain* matches the bcrypt *hashed* password. Runs in a thread executor so the blocking bcrypt operation does not stall the asyncio event loop. Args: plain: The plain-text password to verify. hashed: The stored bcrypt hash string. Returns: ``True`` on a successful match, ``False`` otherwise. """ plain_bytes = plain.encode() hashed_bytes = hashed.encode() return await run_blocking(lambda: bool(bcrypt.checkpw(plain_bytes, hashed_bytes))) async def login( db: aiosqlite.Connection, password: str, session_duration_minutes: int, session_secret: str, session_repo: SessionRepository = default_session_repo, ) -> tuple[str, str]: """Verify *password*, create a new session, and sign the token. Args: db: Active aiosqlite connection. password: Plain-text password supplied by the user. session_duration_minutes: How long the new session is valid for. session_secret: Secret used to sign the session token. Returns: A tuple of the signed session token and its expiry timestamp. Raises: ValueError: If the password is incorrect or no password hash is stored. """ stored_hash = await get_password_hash(db) if stored_hash is None: log.warning("bangui_login_no_hash") raise ValueError("No password is configured — run setup first.") if not await _check_password(password, stored_hash): log.warning("bangui_login_wrong_password") raise ValueError("Incorrect password.") token = secrets.token_hex(SESSION_TOKEN_BYTES) now = utc_now() created_iso = now.isoformat() expires_iso = add_minutes(now, session_duration_minutes).isoformat() session = await session_repo.create_session( db, token=token, created_at=created_iso, expires_at=expires_iso ) signed_token = sign_session_token(session.token, session_secret) log.info("bangui_login_success", session_id=session.id) return signed_token, session.expires_at async def validate_session( db: aiosqlite.Connection, token: str, session_secret: str | None = None, session_repo: SessionRepository = default_session_repo, ) -> Session: """Return the session for *token* if it is valid and not expired. Args: db: Active aiosqlite connection. token: The opaque session token from the client. session_secret: Secret used to verify signed tokens. Returns: The :class:`~app.models.auth.Session` if it is valid. Raises: ValueError: If the token is not found, invalid, or has expired. """ if session_secret is not None: try: token = unwrap_session_token(token, session_secret) except ValueError as exc: raise ValueError("Session token is invalid.") from exc session = await session_repo.get_session(db, token) if session is None: raise ValueError("Session not found.") now_iso = utc_now().isoformat() if session.expires_at <= now_iso: await session_repo.delete_session(db, token) raise ValueError("Session has expired.") return session async def logout( db: aiosqlite.Connection, token: str, session_secret: str | None = None, session_repo: SessionRepository = default_session_repo, ) -> str | None: """Invalidate the session identified by *token*. Args: db: Active aiosqlite connection. token: The session token to revoke. session_secret: Secret used to verify signed tokens. Returns: The raw session token that was revoked, or ``None`` if the token was invalid. """ if session_secret is not None: try: token = unwrap_session_token(token, session_secret) except ValueError: token_hash = hashlib.sha256(token.encode()).hexdigest()[:12] log.warning("bangui_logout_invalid_token", token_hash=token_hash) return None token_hash = hashlib.sha256(token.encode()).hexdigest()[:12] await session_repo.delete_session(db, token) log.info("bangui_logout", token_hash=token_hash) return token