"""Authentication service. Handles password verification, session creation, session validation, and session expiry. Sessions are stored in the SQLite database so they survive server restarts. """ from __future__ import annotations import secrets from typing import TYPE_CHECKING import bcrypt import structlog if TYPE_CHECKING: import aiosqlite from app.models.auth import Session from app.repositories import session_repo from app.services import setup_service from app.utils.time_utils import add_minutes, utc_now log: structlog.stdlib.BoundLogger = structlog.get_logger() def _check_password(plain: str, hashed: str) -> bool: """Return ``True`` if *plain* matches the bcrypt *hashed* password. Args: plain: The plain-text password to verify. hashed: The stored bcrypt hash string. Returns: ``True`` on a successful match, ``False`` otherwise. """ return bool(bcrypt.checkpw(plain.encode(), hashed.encode())) async def login( db: aiosqlite.Connection, password: str, session_duration_minutes: int, ) -> Session: """Verify *password* and create a new session on success. Args: db: Active aiosqlite connection. password: Plain-text password supplied by the user. session_duration_minutes: How long the new session is valid for. Returns: A :class:`~app.models.auth.Session` domain model for the new session. Raises: ValueError: If the password is incorrect or no password hash is stored. """ stored_hash = await setup_service.get_password_hash(db) if stored_hash is None: log.warning("bangui_login_no_hash") raise ValueError("No password is configured — run setup first.") if not _check_password(password, stored_hash): log.warning("bangui_login_wrong_password") raise ValueError("Incorrect password.") token = secrets.token_hex(32) now = utc_now() created_iso = now.isoformat() expires_iso = add_minutes(now, session_duration_minutes).isoformat() session = await session_repo.create_session( db, token=token, created_at=created_iso, expires_at=expires_iso ) log.info("bangui_login_success", token_prefix=token[:8]) return session async def validate_session(db: aiosqlite.Connection, token: str) -> Session: """Return the session for *token* if it is valid and not expired. Args: db: Active aiosqlite connection. token: The opaque session token from the client. Returns: The :class:`~app.models.auth.Session` if it is valid. Raises: ValueError: If the token is not found or has expired. """ session = await session_repo.get_session(db, token) if session is None: raise ValueError("Session not found.") now_iso = utc_now().isoformat() if session.expires_at <= now_iso: await session_repo.delete_session(db, token) raise ValueError("Session has expired.") return session async def logout(db: aiosqlite.Connection, token: str) -> None: """Invalidate the session identified by *token*. Args: db: Active aiosqlite connection. token: The session token to revoke. """ await session_repo.delete_session(db, token) log.info("bangui_logout", token_prefix=token[:8])