TASK-022: Hash session tokens in database for security

- Store session tokens as one-way SHA256 hashes instead of plaintext
- Hash tokens on write (create_session) and on read (get_session, delete_session)
- Add migration to drop plaintext sessions table and recreate with token_hash column
- Update Session model: token field still contains raw token for signing
- Add test to verify tokens are hashed in database, not plaintext
- Update Architekture.md to document session token hashing
- Update Backend-Development.md with implementation pattern and best practices

Prevents direct session token hijacking if database file is exposed to attacker.
If plaintext DB was readable, sessions are invalidated by the migration anyway.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
2026-04-26 14:35:32 +02:00
parent 5709785942
commit 81f009e323
7 changed files with 168 additions and 45 deletions

View File

@@ -30,15 +30,15 @@ CREATE TABLE IF NOT EXISTS settings (
_CREATE_SESSIONS: str = """
CREATE TABLE IF NOT EXISTS sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
token TEXT NOT NULL UNIQUE,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
expires_at TEXT NOT NULL
id INTEGER PRIMARY KEY AUTOINCREMENT,
token_hash TEXT NOT NULL UNIQUE,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
expires_at TEXT NOT NULL
);
"""
_CREATE_SESSIONS_TOKEN_INDEX: str = """
CREATE UNIQUE INDEX IF NOT EXISTS idx_sessions_token ON sessions (token);
CREATE UNIQUE INDEX IF NOT EXISTS idx_sessions_token_hash ON sessions (token_hash);
"""
_CREATE_BLOCKLIST_SOURCES: str = """
@@ -107,10 +107,24 @@ _SCHEMA_STATEMENTS: list[str] = [
_CREATE_HISTORY_ARCHIVE,
]
_CURRENT_SCHEMA_VERSION: int = 1
_CURRENT_SCHEMA_VERSION: int = 2
_MIGRATIONS: dict[int, str] = {
1: "\n".join(_SCHEMA_STATEMENTS),
2: """
-- Migration 2: Hash session tokens for security.
-- Drop the old sessions table and recreate with token_hash column.
-- This invalidates all existing sessions, which is acceptable as the DB
-- contents were exposed in plaintext.
DROP TABLE IF EXISTS sessions;
CREATE TABLE sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
token_hash TEXT NOT NULL UNIQUE,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
expires_at TEXT NOT NULL
);
CREATE UNIQUE INDEX idx_sessions_token_hash ON sessions (token_hash);
""",
}

View File

@@ -2,10 +2,15 @@
Provides storage, retrieval, and deletion of session records in the
``sessions`` table of the application SQLite database.
Session tokens are stored as one-way SHA256 hashes to ensure that if the
database is exposed, the session tokens themselves cannot be directly used.
The hash is computed from the raw token before all database operations.
"""
from __future__ import annotations
import hashlib
from typing import TYPE_CHECKING
if TYPE_CHECKING:
@@ -14,6 +19,18 @@ if TYPE_CHECKING:
from app.models.auth import Session
def _hash_token(token: str) -> str:
"""Return the SHA256 hash of a session token.
Args:
token: The raw session token to hash.
Returns:
The hexadecimal SHA256 digest of the token.
"""
return hashlib.sha256(token.encode()).hexdigest()
async def create_session(
db: aiosqlite.Connection,
token: str,
@@ -30,10 +47,15 @@ async def create_session(
Returns:
The newly created :class:`~app.models.auth.Session`.
Note:
The token is hashed before storage. The returned Session object
contains the original raw token for use in signing and response.
"""
token_hash = _hash_token(token)
cursor = await db.execute(
"INSERT INTO sessions (token, created_at, expires_at) VALUES (?, ?, ?)",
(token, created_at, expires_at),
"INSERT INTO sessions (token_hash, created_at, expires_at) VALUES (?, ?, ?)",
(token_hash, created_at, expires_at),
)
await db.commit()
return Session(
@@ -53,10 +75,14 @@ async def get_session(db: aiosqlite.Connection, token: str) -> Session | None:
Returns:
The :class:`~app.models.auth.Session` if found, else ``None``.
Note:
The token is hashed before the database lookup.
"""
token_hash = _hash_token(token)
async with db.execute(
"SELECT id, token, created_at, expires_at FROM sessions WHERE token = ?",
(token,),
"SELECT id, token_hash, created_at, expires_at FROM sessions WHERE token_hash = ?",
(token_hash,),
) as cursor:
row = await cursor.fetchone()
@@ -65,7 +91,7 @@ async def get_session(db: aiosqlite.Connection, token: str) -> Session | None:
return Session(
id=int(row[0]),
token=str(row[1]),
token=token,
created_at=str(row[2]),
expires_at=str(row[3]),
)
@@ -77,8 +103,12 @@ async def delete_session(db: aiosqlite.Connection, token: str) -> None:
Args:
db: Active aiosqlite connection.
token: The session token to remove.
Note:
The token is hashed before the database lookup.
"""
await db.execute("DELETE FROM sessions WHERE token = ?", (token,))
token_hash = _hash_token(token)
await db.execute("DELETE FROM sessions WHERE token_hash = ?", (token_hash,))
await db.commit()

View File

@@ -80,7 +80,7 @@ async def test_init_db_records_schema_version(tmp_path: Path) -> None:
) as cursor:
row = await cursor.fetchone()
assert row is not None
assert row[0] == 1
assert row[0] == 2
@pytest.mark.asyncio
@@ -97,4 +97,4 @@ async def test_init_db_migrates_legacy_database_without_schema_version(tmp_path:
) as cursor:
row = await cursor.fetchone()
assert row is not None
assert row[0] == 1
assert row[0] == 2

View File

@@ -116,3 +116,28 @@ class TestSessionRepo:
assert deleted == 1
assert await session_repo.get_session(db, "expired") is None
assert await session_repo.get_session(db, "valid") is not None
async def test_tokens_are_hashed_in_database(
self, db: aiosqlite.Connection
) -> None:
"""Verify that tokens are stored as hashes, not plaintext."""
import hashlib
token = "plaintext_token_12345"
await session_repo.create_session(
db,
token=token,
created_at="2025-01-01T00:00:00+00:00",
expires_at="2025-01-01T01:00:00+00:00",
)
# Query the database directly to verify the token is hashed.
async with db.execute("SELECT token_hash FROM sessions WHERE token_hash = ?", (hashlib.sha256(token.encode()).hexdigest(),)) as cursor:
row = await cursor.fetchone()
assert row is not None
assert row[0] == hashlib.sha256(token.encode()).hexdigest()
# The plaintext token should not be in the database.
async with db.execute("SELECT * FROM sessions WHERE token_hash = ?", (token,)) as cursor:
row = await cursor.fetchone()
assert row is None