"""Tests for auth_service.""" from __future__ import annotations import asyncio import inspect from pathlib import Path import aiosqlite import pytest from app.db import init_db from app.services import auth_service, setup_service @pytest.fixture async def db(tmp_path: Path) -> aiosqlite.Connection: # type: ignore[misc] """Provide an initialised DB with setup already complete.""" conn: aiosqlite.Connection = await aiosqlite.connect(str(tmp_path / "auth.db")) conn.row_factory = aiosqlite.Row await init_db(conn) # Pre-run setup so auth operations have a password hash to check. await setup_service.run_setup( conn, master_password="correctpassword1", database_path="bangui.db", fail2ban_socket="/var/run/fail2ban/fail2ban.sock", timezone="UTC", session_duration_minutes=60, ) yield conn await conn.close() @pytest.fixture async def db_no_setup(tmp_path: Path) -> aiosqlite.Connection: # type: ignore[misc] """Provide an initialised DB with no setup performed.""" conn: aiosqlite.Connection = await aiosqlite.connect(str(tmp_path / "auth_nosetup.db")) conn.row_factory = aiosqlite.Row await init_db(conn) yield conn await conn.close() class TestCheckPasswordAsync: async def test_check_password_is_coroutine_function(self) -> None: """_check_password must be a coroutine function (runs in thread executor).""" assert inspect.iscoroutinefunction(auth_service._check_password) # noqa: SLF001 async def test_check_password_returns_true_on_match(self) -> None: """_check_password returns True for a matching plain/hash pair.""" import bcrypt hashed = bcrypt.hashpw(b"secret", bcrypt.gensalt()).decode() result = await auth_service._check_password("secret", hashed) # noqa: SLF001 assert result is True async def test_check_password_returns_false_on_mismatch(self) -> None: """_check_password returns False when the password does not match.""" import bcrypt hashed = bcrypt.hashpw(b"secret", bcrypt.gensalt()).decode() result = await auth_service._check_password("wrong", hashed) # noqa: SLF001 assert result is False async def test_check_password_does_not_block_event_loop(self) -> None: """_check_password awaits without blocking; event-loop tasks can interleave.""" import bcrypt hashed = bcrypt.hashpw(b"secret", bcrypt.gensalt()).decode() # Running two concurrent checks must complete without deadlock. results = await asyncio.gather( auth_service._check_password("secret", hashed), # noqa: SLF001 auth_service._check_password("wrong", hashed), # noqa: SLF001 ) assert results == [True, False] class TestLogin: async def test_login_returns_session_on_correct_password( self, db: aiosqlite.Connection ) -> None: """login() returns a Session on the correct password.""" session = await auth_service.login(db, password="correctpassword1", session_duration_minutes=60) assert session.token assert len(session.token) == 64 # 32 bytes → 64 hex chars assert session.expires_at > session.created_at async def test_login_raises_on_wrong_password( self, db: aiosqlite.Connection ) -> None: """login() raises ValueError for an incorrect password.""" with pytest.raises(ValueError, match="Incorrect password"): await auth_service.login(db, password="wrongpassword", session_duration_minutes=60) async def test_login_raises_when_no_hash_configured( self, db_no_setup: aiosqlite.Connection ) -> None: """login() raises ValueError when setup has not been run.""" with pytest.raises(ValueError, match="No password is configured"): await auth_service.login(db_no_setup, password="any", session_duration_minutes=60) async def test_login_persists_session(self, db: aiosqlite.Connection) -> None: """login() stores the session in the database.""" from app.repositories import session_repo session = await auth_service.login(db, password="correctpassword1", session_duration_minutes=60) stored = await session_repo.get_session(db, session.token) assert stored is not None assert stored.token == session.token class TestValidateSession: async def test_validate_returns_session_for_valid_token( self, db: aiosqlite.Connection ) -> None: """validate_session() returns the session for a valid token.""" session = await auth_service.login(db, password="correctpassword1", session_duration_minutes=60) validated = await auth_service.validate_session(db, session.token) assert validated.token == session.token async def test_validate_raises_for_unknown_token( self, db: aiosqlite.Connection ) -> None: """validate_session() raises ValueError for a non-existent token.""" with pytest.raises(ValueError, match="not found"): await auth_service.validate_session(db, "deadbeef" * 8) async def test_validate_raises_for_expired_session( self, db: aiosqlite.Connection ) -> None: """validate_session() raises ValueError and removes an expired session.""" from app.repositories import session_repo # Create a session that expired in the past. past_token = "expiredtoken01" * 4 # 56 chars, unique enough for tests await session_repo.create_session( db, token=past_token, created_at="2000-01-01T00:00:00+00:00", expires_at="2000-01-01T01:00:00+00:00", ) with pytest.raises(ValueError, match="expired"): await auth_service.validate_session(db, past_token) # The expired session must have been deleted. assert await session_repo.get_session(db, past_token) is None class TestLogout: async def test_logout_removes_session(self, db: aiosqlite.Connection) -> None: """logout() deletes the session so it can no longer be validated.""" from app.repositories import session_repo session = await auth_service.login(db, password="correctpassword1", session_duration_minutes=60) await auth_service.logout(db, session.token) stored = await session_repo.get_session(db, session.token) assert stored is None