Move auth session signing into auth_service.login
This commit is contained in:
@@ -77,37 +77,58 @@ class TestCheckPasswordAsync:
|
||||
|
||||
|
||||
class TestLogin:
|
||||
async def test_login_returns_session_on_correct_password(
|
||||
async def test_login_returns_signed_token_on_correct_password(
|
||||
self, db: aiosqlite.Connection
|
||||
) -> None:
|
||||
"""login() returns a Session on the correct password."""
|
||||
session = await auth_service.login(db, password="correctpassword1", session_duration_minutes=60)
|
||||
assert session.token
|
||||
assert len(session.token) == 64 # 32 bytes → 64 hex chars
|
||||
assert session.expires_at > session.created_at
|
||||
"""login() returns a signed token and expiry on the correct password."""
|
||||
signed_token, expires_at = await auth_service.login(
|
||||
db,
|
||||
password="correctpassword1",
|
||||
session_duration_minutes=60,
|
||||
session_secret="test-secret",
|
||||
)
|
||||
assert signed_token
|
||||
assert "." in signed_token
|
||||
assert expires_at
|
||||
|
||||
async def test_login_raises_on_wrong_password(
|
||||
self, db: aiosqlite.Connection
|
||||
) -> None:
|
||||
"""login() raises ValueError for an incorrect password."""
|
||||
with pytest.raises(ValueError, match="Incorrect password"):
|
||||
await auth_service.login(db, password="wrongpassword", session_duration_minutes=60)
|
||||
await auth_service.login(
|
||||
db,
|
||||
password="wrongpassword",
|
||||
session_duration_minutes=60,
|
||||
session_secret="test-secret",
|
||||
)
|
||||
|
||||
async def test_login_raises_when_no_hash_configured(
|
||||
self, db_no_setup: aiosqlite.Connection
|
||||
) -> None:
|
||||
"""login() raises ValueError when setup has not been run."""
|
||||
with pytest.raises(ValueError, match="No password is configured"):
|
||||
await auth_service.login(db_no_setup, password="any", session_duration_minutes=60)
|
||||
await auth_service.login(
|
||||
db_no_setup,
|
||||
password="any",
|
||||
session_duration_minutes=60,
|
||||
session_secret="test-secret",
|
||||
)
|
||||
|
||||
async def test_login_persists_session(self, db: aiosqlite.Connection) -> None:
|
||||
"""login() stores the session in the database."""
|
||||
from app.repositories import session_repo
|
||||
|
||||
session = await auth_service.login(db, password="correctpassword1", session_duration_minutes=60)
|
||||
stored = await session_repo.get_session(db, session.token)
|
||||
signed_token, _ = await auth_service.login(
|
||||
db,
|
||||
password="correctpassword1",
|
||||
session_duration_minutes=60,
|
||||
session_secret="test-secret",
|
||||
)
|
||||
raw_token = auth_service.unwrap_session_token(signed_token, "test-secret")
|
||||
stored = await session_repo.get_session(db, raw_token)
|
||||
assert stored is not None
|
||||
assert stored.token == session.token
|
||||
assert stored.token == raw_token
|
||||
|
||||
|
||||
class TestValidateSession:
|
||||
@@ -115,27 +136,42 @@ class TestValidateSession:
|
||||
self, db: aiosqlite.Connection
|
||||
) -> None:
|
||||
"""validate_session() returns the session for a valid token."""
|
||||
session = await auth_service.login(db, password="correctpassword1", session_duration_minutes=60)
|
||||
validated = await auth_service.validate_session(db, session.token)
|
||||
assert validated.token == session.token
|
||||
signed_token, _ = await auth_service.login(
|
||||
db,
|
||||
password="correctpassword1",
|
||||
session_duration_minutes=60,
|
||||
session_secret="test-secret",
|
||||
)
|
||||
raw_token = auth_service.unwrap_session_token(signed_token, "test-secret")
|
||||
validated = await auth_service.validate_session(db, raw_token)
|
||||
assert validated.token == raw_token
|
||||
|
||||
async def test_validate_accepts_signed_token(
|
||||
self, db: aiosqlite.Connection
|
||||
) -> None:
|
||||
"""validate_session() accepts a token signed with the configured secret."""
|
||||
session = await auth_service.login(db, password="correctpassword1", session_duration_minutes=60)
|
||||
signed_token = auth_service.sign_session_token(session.token, "test-secret")
|
||||
signed_token, _ = await auth_service.login(
|
||||
db,
|
||||
password="correctpassword1",
|
||||
session_duration_minutes=60,
|
||||
session_secret="test-secret",
|
||||
)
|
||||
validated = await auth_service.validate_session(
|
||||
db, signed_token, session_secret="test-secret"
|
||||
)
|
||||
assert validated.token == session.token
|
||||
raw_token = auth_service.unwrap_session_token(signed_token, "test-secret")
|
||||
assert validated.token == raw_token
|
||||
|
||||
async def test_validate_rejects_tampered_signed_token(
|
||||
self, db: aiosqlite.Connection
|
||||
) -> None:
|
||||
"""validate_session() rejects signed tokens with an invalid signature."""
|
||||
session = await auth_service.login(db, password="correctpassword1", session_duration_minutes=60)
|
||||
signed_token = auth_service.sign_session_token(session.token, "test-secret")
|
||||
signed_token, _ = await auth_service.login(
|
||||
db,
|
||||
password="correctpassword1",
|
||||
session_duration_minutes=60,
|
||||
session_secret="test-secret",
|
||||
)
|
||||
tampered_token = signed_token[:-1] + ("0" if signed_token[-1] != "0" else "1")
|
||||
|
||||
with pytest.raises(ValueError, match="invalid"):
|
||||
@@ -177,18 +213,29 @@ class TestLogout:
|
||||
"""logout() deletes the session so it can no longer be validated."""
|
||||
from app.repositories import session_repo
|
||||
|
||||
session = await auth_service.login(db, password="correctpassword1", session_duration_minutes=60)
|
||||
await auth_service.logout(db, session.token)
|
||||
stored = await session_repo.get_session(db, session.token)
|
||||
signed_token, _ = await auth_service.login(
|
||||
db,
|
||||
password="correctpassword1",
|
||||
session_duration_minutes=60,
|
||||
session_secret="test-secret",
|
||||
)
|
||||
raw_token = auth_service.unwrap_session_token(signed_token, "test-secret")
|
||||
await auth_service.logout(db, raw_token)
|
||||
stored = await session_repo.get_session(db, raw_token)
|
||||
assert stored is None
|
||||
|
||||
async def test_logout_accepts_signed_token(self, db: aiosqlite.Connection) -> None:
|
||||
"""logout() accepts a signed token and revokes the underlying raw session."""
|
||||
from app.repositories import session_repo
|
||||
|
||||
session = await auth_service.login(db, password="correctpassword1", session_duration_minutes=60)
|
||||
signed_token = auth_service.sign_session_token(session.token, "test-secret")
|
||||
signed_token, _ = await auth_service.login(
|
||||
db,
|
||||
password="correctpassword1",
|
||||
session_duration_minutes=60,
|
||||
session_secret="test-secret",
|
||||
)
|
||||
raw_token = auth_service.unwrap_session_token(signed_token, "test-secret")
|
||||
await auth_service.logout(db, signed_token, session_secret="test-secret")
|
||||
|
||||
stored = await session_repo.get_session(db, session.token)
|
||||
stored = await session_repo.get_session(db, raw_token)
|
||||
assert stored is None
|
||||
|
||||
Reference in New Issue
Block a user