Files
BanGUI/backend/tests/test_services/test_auth_service.py
Lukas 1c0bac1353 refactor: improve backend type safety and import organization
- Add TYPE_CHECKING guards for runtime-expensive imports (aiohttp, aiosqlite)
- Reorganize imports to follow PEP 8 conventions
- Convert TypeAlias to modern PEP 695 type syntax (where appropriate)
- Use Sequence/Mapping from collections.abc for type hints (covariant)
- Replace string literals with cast() for improved type inference
- Fix casting of Fail2BanResponse and TypedDict patterns
- Add IpLookupResult TypedDict for precise return type annotation
- Reformat overlong lines for readability (120 char limit)
- Add asyncio_mode and filterwarnings to pytest config
- Update test fixtures with improved type hints

This improves mypy type checking and makes type relationships explicit.
2026-03-22 14:24:24 +01:00

160 lines
6.3 KiB
Python

"""Tests for auth_service."""
from __future__ import annotations
import asyncio
import inspect
from pathlib import Path
import aiosqlite
import pytest
from app.db import init_db
from app.services import auth_service, setup_service
@pytest.fixture
async def db(tmp_path: Path) -> aiosqlite.Connection:  # type: ignore[misc]
    """Provide an initialised DB with setup already complete.

    Opens a fresh SQLite database file under ``tmp_path``, passes it through
    ``init_db`` and then runs ``setup_service.run_setup`` with the master
    password ``"correctpassword1"`` so that auth operations have a stored
    password hash to check against.

    Yields:
        The open connection, with ``row_factory`` set to ``aiosqlite.Row``.
    """
    conn: aiosqlite.Connection = await aiosqlite.connect(str(tmp_path / "auth.db"))
    try:
        conn.row_factory = aiosqlite.Row
        await init_db(conn)
        # Pre-run setup so auth operations have a password hash to check.
        await setup_service.run_setup(
            conn,
            master_password="correctpassword1",
            database_path="bangui.db",
            fail2ban_socket="/var/run/fail2ban/fail2ban.sock",
            timezone="UTC",
            session_duration_minutes=60,
        )
        yield conn
    finally:
        # Close on every exit path: if init_db() or run_setup() raises before
        # the yield, the connection would otherwise never be closed.
        await conn.close()
@pytest.fixture
async def db_no_setup(tmp_path: Path) -> aiosqlite.Connection:  # type: ignore[misc]
    """Provide an initialised DB with no setup performed.

    Opens a fresh SQLite database file under ``tmp_path`` and passes it through
    ``init_db`` only; ``setup_service.run_setup`` is deliberately not called, so
    tests can exercise the "setup has not been run" paths.

    Yields:
        The open connection, with ``row_factory`` set to ``aiosqlite.Row``.
    """
    conn: aiosqlite.Connection = await aiosqlite.connect(str(tmp_path / "auth_nosetup.db"))
    try:
        conn.row_factory = aiosqlite.Row
        await init_db(conn)
        yield conn
    finally:
        # Close on every exit path: if init_db() raises before the yield, the
        # connection would otherwise never be closed.
        await conn.close()
class TestCheckPasswordAsync:
    """Tests for the private ``auth_service._check_password`` coroutine."""

    async def test_check_password_is_coroutine_function(self) -> None:
        """_check_password must be declared as a coroutine function."""
        checker = auth_service._check_password  # noqa: SLF001
        assert inspect.iscoroutinefunction(checker)

    async def test_check_password_returns_true_on_match(self) -> None:
        """A plain password that matches the bcrypt hash yields True."""
        import bcrypt

        salt = bcrypt.gensalt()
        stored_hash = bcrypt.hashpw(b"secret", salt).decode()

        assert await auth_service._check_password("secret", stored_hash) is True  # noqa: SLF001

    async def test_check_password_returns_false_on_mismatch(self) -> None:
        """A plain password that differs from the hashed one yields False."""
        import bcrypt

        salt = bcrypt.gensalt()
        stored_hash = bcrypt.hashpw(b"secret", salt).decode()

        assert await auth_service._check_password("wrong", stored_hash) is False  # noqa: SLF001

    async def test_check_password_does_not_block_event_loop(self) -> None:
        """Two checks awaited concurrently both finish and keep their call order."""
        import bcrypt

        salt = bcrypt.gensalt()
        stored_hash = bcrypt.hashpw(b"secret", salt).decode()

        # Running two concurrent checks must complete without deadlock.
        pending = [
            auth_service._check_password(candidate, stored_hash)  # noqa: SLF001
            for candidate in ("secret", "wrong")
        ]
        outcomes = await asyncio.gather(*pending)

        assert tuple(outcomes) == (True, False)
class TestLogin:
    """Tests for ``auth_service.login``."""

    async def test_login_returns_session_on_correct_password(self, db: aiosqlite.Connection) -> None:
        """The correct password yields a session with a 64-char token and a later expiry."""
        issued = await auth_service.login(
            db,
            password="correctpassword1",
            session_duration_minutes=60,
        )

        assert issued.token
        assert len(issued.token) == 64  # 32 bytes → 64 hex chars
        assert issued.expires_at > issued.created_at

    async def test_login_raises_on_wrong_password(self, db: aiosqlite.Connection) -> None:
        """An incorrect password makes login() raise ValueError."""
        with pytest.raises(ValueError, match="Incorrect password"):
            await auth_service.login(
                db,
                password="wrongpassword",
                session_duration_minutes=60,
            )

    async def test_login_raises_when_no_hash_configured(self, db_no_setup: aiosqlite.Connection) -> None:
        """Without a prior setup run, login() raises ValueError."""
        with pytest.raises(ValueError, match="No password is configured"):
            await auth_service.login(
                db_no_setup,
                password="any",
                session_duration_minutes=60,
            )

    async def test_login_persists_session(self, db: aiosqlite.Connection) -> None:
        """The session returned by login() can be read back from the database."""
        from app.repositories import session_repo

        issued = await auth_service.login(
            db,
            password="correctpassword1",
            session_duration_minutes=60,
        )
        persisted = await session_repo.get_session(db, issued.token)

        assert persisted is not None
        assert persisted.token == issued.token
class TestValidateSession:
    """Tests for ``auth_service.validate_session``."""

    async def test_validate_returns_session_for_valid_token(self, db: aiosqlite.Connection) -> None:
        """A token obtained from login() validates to a session with the same token."""
        issued = await auth_service.login(
            db,
            password="correctpassword1",
            session_duration_minutes=60,
        )

        checked = await auth_service.validate_session(db, issued.token)

        assert checked.token == issued.token

    async def test_validate_raises_for_unknown_token(self, db: aiosqlite.Connection) -> None:
        """A token that was never stored makes validate_session() raise ValueError."""
        with pytest.raises(ValueError, match="not found"):
            await auth_service.validate_session(db, "deadbeef" * 8)

    async def test_validate_raises_for_expired_session(self, db: aiosqlite.Connection) -> None:
        """An expired session makes validate_session() raise ValueError and is removed."""
        from app.repositories import session_repo

        # Create a session that expired in the past.
        stale_token = "expiredtoken01" * 4  # 56 chars, unique enough for tests
        long_ago = {
            "created_at": "2000-01-01T00:00:00+00:00",
            "expires_at": "2000-01-01T01:00:00+00:00",
        }
        await session_repo.create_session(db, token=stale_token, **long_ago)

        with pytest.raises(ValueError, match="expired"):
            await auth_service.validate_session(db, stale_token)

        # The expired session must have been deleted.
        leftover = await session_repo.get_session(db, stale_token)
        assert leftover is None
class TestLogout:
    """Tests for ``auth_service.logout``."""

    async def test_logout_removes_session(self, db: aiosqlite.Connection) -> None:
        """After logout() the session can no longer be read back from the database."""
        from app.repositories import session_repo

        issued = await auth_service.login(
            db,
            password="correctpassword1",
            session_duration_minutes=60,
        )

        await auth_service.logout(db, issued.token)

        assert await session_repo.get_session(db, issued.token) is None