"""Tests for auth_service."""

from __future__ import annotations

import asyncio
import inspect
from collections.abc import AsyncIterator
from pathlib import Path

import aiosqlite
import pytest

from app.db import init_db
from app.services import auth_service, setup_service

# Credentials shared by the ``db`` fixture and the login-based tests below.
MASTER_PASSWORD = "correctpassword1"
SESSION_SECRET = "test-secret"
SESSION_MINUTES = 60


def _bcrypt_hash(plain: str) -> str:
    """Return a bcrypt hash of *plain* as text, for feeding ``_check_password``."""
    import bcrypt

    return bcrypt.hashpw(plain.encode(), bcrypt.gensalt()).decode()


async def _login_token(
    db: aiosqlite.Connection, session_secret: str = SESSION_SECRET
) -> str:
    """Log in with the master password and return only the signed token.

    Args:
        db: Connection on which setup has already been run.
        session_secret: Secret passed to ``auth_service.login`` for signing.

    Returns:
        The first element of the tuple returned by ``auth_service.login``.
    """
    signed_token, _, _ = await auth_service.login(
        db,
        password=MASTER_PASSWORD,
        session_duration_minutes=SESSION_MINUTES,
        session_secret=session_secret,
    )
    return signed_token


@pytest.fixture
async def db(tmp_path: Path) -> AsyncIterator[aiosqlite.Connection]:
    """Provide an initialised DB with setup already complete."""
    db_path = str(tmp_path / "auth.db")
    # ``async with`` closes the connection even when setup below raises,
    # not only after the test body has finished.
    async with aiosqlite.connect(db_path) as conn:
        conn.row_factory = aiosqlite.Row
        await init_db(conn)
        # Pre-run setup so auth operations have a password hash to check.
        await setup_service.run_setup(
            conn,
            master_password=MASTER_PASSWORD,
            database_path=db_path,
            fail2ban_socket="/var/run/fail2ban/fail2ban.sock",
            timezone="UTC",
            session_duration_minutes=SESSION_MINUTES,
        )
        yield conn


@pytest.fixture
async def db_no_setup(tmp_path: Path) -> AsyncIterator[aiosqlite.Connection]:
    """Provide an initialised DB with no setup performed."""
    async with aiosqlite.connect(str(tmp_path / "auth_nosetup.db")) as conn:
        conn.row_factory = aiosqlite.Row
        await init_db(conn)
        yield conn


class TestCheckPasswordAsync:
    """Tests for the awaitable ``_check_password`` helper."""

    async def test_check_password_is_coroutine_function(self) -> None:
        """_check_password must be a coroutine function so callers can await it."""
        assert inspect.iscoroutinefunction(auth_service._check_password)  # noqa: SLF001

    async def test_check_password_returns_true_on_match(self) -> None:
        """_check_password returns True for a matching plain/hash pair."""
        hashed = _bcrypt_hash("secret")
        result = await auth_service._check_password("secret", hashed)  # noqa: SLF001
        assert result is True

    async def test_check_password_returns_false_on_mismatch(self) -> None:
        """_check_password returns False when the password does not match."""
        hashed = _bcrypt_hash("secret")
        result = await auth_service._check_password("wrong", hashed)  # noqa: SLF001
        assert result is False

    async def test_check_password_does_not_block_event_loop(self) -> None:
        """Two concurrent _check_password calls finish with independent results.

        NOTE(review): this only proves that concurrent checks complete without
        deadlock and do not mix up their results; it does not measure whether
        the event loop stays responsive while a check is running.
        """
        hashed = _bcrypt_hash("secret")
        results = await asyncio.gather(
            auth_service._check_password("secret", hashed),  # noqa: SLF001
            auth_service._check_password("wrong", hashed),  # noqa: SLF001
        )
        assert tuple(results) == (True, False)


class TestLogin:
    """Tests for ``auth_service.login``."""

    async def test_login_returns_signed_token_on_correct_password(
        self, db: aiosqlite.Connection
    ) -> None:
        """login() returns a signed token and expiry on the correct password."""
        signed_token, expires_at, _ = await auth_service.login(
            db,
            password=MASTER_PASSWORD,
            session_duration_minutes=SESSION_MINUTES,
            session_secret=SESSION_SECRET,
        )
        assert signed_token
        # The signed form is expected to contain a "." separator.
        assert "." in signed_token
        assert expires_at

    async def test_login_raises_on_wrong_password(
        self, db: aiosqlite.Connection
    ) -> None:
        """login() raises ValueError for an incorrect password."""
        with pytest.raises(ValueError, match="Incorrect password"):
            await auth_service.login(
                db,
                password="wrongpassword",
                session_duration_minutes=SESSION_MINUTES,
                session_secret=SESSION_SECRET,
            )

    async def test_login_raises_when_no_hash_configured(
        self, db_no_setup: aiosqlite.Connection
    ) -> None:
        """login() raises ValueError when setup has not been run."""
        with pytest.raises(ValueError, match="No password is configured"):
            await auth_service.login(
                db_no_setup,
                password="any",
                session_duration_minutes=SESSION_MINUTES,
                session_secret=SESSION_SECRET,
            )

    async def test_login_persists_session(self, db: aiosqlite.Connection) -> None:
        """login() stores the session in the database."""
        from app.repositories import session_repo

        signed_token = await _login_token(db)
        raw_token = auth_service.unwrap_session_token(signed_token, SESSION_SECRET)

        stored = await session_repo.get_session(db, raw_token)
        assert stored is not None
        assert stored.token == raw_token


class TestValidateSession:
    """Tests for ``auth_service.validate_session``."""

    async def test_validate_returns_session_for_valid_token(
        self, db: aiosqlite.Connection
    ) -> None:
        """validate_session() returns the session for a valid token."""
        signed_token = await _login_token(db)
        raw_token = auth_service.unwrap_session_token(signed_token, SESSION_SECRET)

        validated = await auth_service.validate_session(db, raw_token)
        assert validated.token == raw_token

    async def test_validate_accepts_signed_token(
        self, db: aiosqlite.Connection
    ) -> None:
        """validate_session() accepts a token signed with the configured secret."""
        signed_token = await _login_token(db)

        validated = await auth_service.validate_session(
            db, signed_token, session_secret=SESSION_SECRET
        )
        raw_token = auth_service.unwrap_session_token(signed_token, SESSION_SECRET)
        assert validated.token == raw_token

    async def test_validate_rejects_tampered_signed_token(
        self, db: aiosqlite.Connection
    ) -> None:
        """validate_session() rejects signed tokens with an invalid signature."""
        signed_token = await _login_token(db)
        # Flip the final character so the signature no longer matches.
        replacement = "0" if signed_token[-1] != "0" else "1"
        tampered_token = signed_token[:-1] + replacement

        with pytest.raises(ValueError, match="invalid"):
            await auth_service.validate_session(
                db, tampered_token, session_secret=SESSION_SECRET
            )

    async def test_validate_raises_for_unknown_token(
        self, db: aiosqlite.Connection
    ) -> None:
        """validate_session() raises ValueError for a non-existent token."""
        with pytest.raises(ValueError, match="not found"):
            await auth_service.validate_session(db, "deadbeef" * 8)

    async def test_validate_raises_for_expired_session(
        self, db: aiosqlite.Connection
    ) -> None:
        """validate_session() raises ValueError and removes an expired session."""
        from app.repositories import session_repo

        # Create a session that expired in the past.
        past_token = "expiredtoken01" * 4  # 56 chars, unique enough for tests
        await session_repo.create_session(
            db,
            token=past_token,
            created_at="2000-01-01T00:00:00+00:00",
            expires_at="2000-01-01T01:00:00+00:00",
        )

        with pytest.raises(ValueError, match="expired"):
            await auth_service.validate_session(db, past_token)

        # The expired session must have been deleted.
        assert await session_repo.get_session(db, past_token) is None


class TestLogout:
    """Tests for ``auth_service.logout``."""

    async def test_logout_removes_session(self, db: aiosqlite.Connection) -> None:
        """logout() deletes the session so it can no longer be validated."""
        from app.repositories import session_repo

        signed_token = await _login_token(db)
        raw_token = auth_service.unwrap_session_token(signed_token, SESSION_SECRET)

        await auth_service.logout(db, raw_token)

        stored = await session_repo.get_session(db, raw_token)
        assert stored is None

    async def test_logout_accepts_signed_token(self, db: aiosqlite.Connection) -> None:
        """logout() accepts a signed token and revokes the underlying raw session."""
        from app.repositories import session_repo

        signed_token = await _login_token(db)
        raw_token = auth_service.unwrap_session_token(signed_token, SESSION_SECRET)

        await auth_service.logout(db, signed_token, session_secret=SESSION_SECRET)

        stored = await session_repo.get_session(db, raw_token)
        assert stored is None


class TestSecretRotation:
    """Tests for session secret rotation support."""

    async def test_unwrap_with_rotation_accepts_current_secret(
        self, db: aiosqlite.Connection
    ) -> None:
        """Tokens signed with current secret are validated immediately."""
        signed_token = await _login_token(db, "current-secret")

        raw_token, was_re_signed = auth_service.unwrap_session_token_with_rotation(
            signed_token, "current-secret", None
        )

        assert raw_token == auth_service.unwrap_session_token(
            signed_token, "current-secret"
        )
        assert was_re_signed is False

    async def test_unwrap_with_rotation_accepts_previous_secret(
        self, db: aiosqlite.Connection
    ) -> None:
        """Tokens signed with previous secret are accepted during rotation."""
        signed_token = await _login_token(db, "old-secret")

        raw_token, was_re_signed = auth_service.unwrap_session_token_with_rotation(
            signed_token, "new-secret", "old-secret"
        )

        assert raw_token == auth_service.unwrap_session_token(
            signed_token, "old-secret"
        )
        assert was_re_signed is True

    async def test_unwrap_with_rotation_rejects_unknown_secret(
        self, db: aiosqlite.Connection
    ) -> None:
        """Tokens signed with unknown secrets are rejected."""
        signed_token = await _login_token(db, "secret-a")

        with pytest.raises(ValueError, match="Invalid session token"):
            auth_service.unwrap_session_token_with_rotation(
                signed_token, "secret-b", "secret-c"
            )

    async def test_unwrap_with_rotation_prefers_current_secret(self) -> None:
        """If both secrets are valid, current secret is preferred.

        The token is built by hand, so no database fixture is needed.
        """
        token_hex = "deadbeef" * 8
        current_sig = auth_service._session_token_signature(  # noqa: SLF001
            token_hex, "same-secret"
        )
        signed_token = f"{token_hex}.{current_sig}"

        raw_token, was_re_signed = auth_service.unwrap_session_token_with_rotation(
            signed_token, "same-secret", "same-secret"
        )

        assert raw_token == token_hex
        assert was_re_signed is False

    async def test_validate_session_re_signs_token_with_previous_secret(
        self, db: aiosqlite.Connection
    ) -> None:
        """During rotation, a token signed with the previous secret validates."""
        signed_token = await _login_token(db, "old-secret")
        raw_token = auth_service.unwrap_session_token(signed_token, "old-secret")

        session = await auth_service.validate_session(
            db,
            signed_token,
            session_secret="new-secret",
            session_secret_previous="old-secret",
        )

        assert session.token == raw_token

    async def test_validate_session_logs_rotation_event(
        self, db: aiosqlite.Connection
    ) -> None:
        """Validation succeeds for a token signed with the previous secret.

        NOTE(review): despite the test name, no log output is asserted here;
        only that a session with a non-empty token comes back.
        """
        signed_token = await _login_token(db, "old-secret")

        session = await auth_service.validate_session(
            db,
            signed_token,
            session_secret="new-secret",
            session_secret_previous="old-secret",
        )

        assert session is not None
        assert session.token

    async def test_logout_accepts_previous_secret(
        self, db: aiosqlite.Connection
    ) -> None:
        """logout() accepts tokens signed with the previous secret."""
        from app.repositories import session_repo

        signed_token = await _login_token(db, "old-secret")
        raw_token = auth_service.unwrap_session_token(signed_token, "old-secret")

        await auth_service.logout(
            db,
            signed_token,
            session_secret="new-secret",
            session_secret_previous="old-secret",
        )

        stored = await session_repo.get_session(db, raw_token)
        assert stored is None

    async def test_no_re_sign_without_previous_secret(
        self, db: aiosqlite.Connection
    ) -> None:
        """If no previous secret is configured, old tokens are rejected."""
        signed_token = await _login_token(db, "old-secret")

        with pytest.raises(ValueError, match="invalid"):
            await auth_service.validate_session(
                db,
                signed_token,
                session_secret="new-secret",
                session_secret_previous=None,
            )