Add comprehensive unit tests for core services (93 tests)
This commit is contained in:
@@ -1,59 +1,303 @@
|
||||
"""Unit tests for AuthService.
|
||||
|
||||
Tests cover password setup and validation, JWT token operations,
|
||||
session management, lockout mechanism, and error handling.
|
||||
"""
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
import pytest
|
||||
|
||||
from src.server.services.auth_service import AuthError, AuthService, LockedOutError
|
||||
|
||||
|
||||
def test_setup_and_validate_success():
|
||||
svc = AuthService()
|
||||
password = "Str0ng!Pass"
|
||||
svc.setup_master_password(password)
|
||||
assert svc.is_configured()
|
||||
class TestPasswordSetup:
|
||||
"""Test password setup and validation."""
|
||||
|
||||
assert svc.validate_master_password(password) is True
|
||||
def test_setup_and_validate_success(self):
|
||||
"""Test successful password setup and validation."""
|
||||
svc = AuthService()
|
||||
password = "Str0ng!Pass"
|
||||
svc.setup_master_password(password)
|
||||
|
||||
assert svc.is_configured()
|
||||
assert svc.validate_master_password(password) is True
|
||||
|
||||
resp = svc.create_access_token(subject="tester", remember=False)
|
||||
assert resp.token_type == "bearer"
|
||||
assert resp.access_token
|
||||
@pytest.mark.parametrize(
|
||||
"bad",
|
||||
[
|
||||
"short",
|
||||
"lowercaseonly",
|
||||
"UPPERCASEONLY",
|
||||
"NoSpecial1",
|
||||
],
|
||||
)
|
||||
def test_setup_weak_passwords(self, bad):
|
||||
"""Test that weak passwords are rejected."""
|
||||
svc = AuthService()
|
||||
with pytest.raises(ValueError):
|
||||
svc.setup_master_password(bad)
|
||||
|
||||
sess = svc.create_session_model(resp.access_token)
|
||||
assert sess.expires_at is not None
|
||||
def test_password_length_validation(self):
|
||||
"""Test minimum password length validation."""
|
||||
svc = AuthService()
|
||||
with pytest.raises(ValueError, match="at least 8 characters"):
|
||||
svc.setup_master_password("Short1!")
|
||||
|
||||
def test_password_case_validation(self):
|
||||
"""Test mixed case requirement."""
|
||||
svc = AuthService()
|
||||
with pytest.raises(ValueError, match="mixed case"):
|
||||
svc.setup_master_password("alllowercase1!")
|
||||
|
||||
with pytest.raises(ValueError, match="mixed case"):
|
||||
svc.setup_master_password("ALLUPPERCASE1!")
|
||||
|
||||
def test_password_special_char_validation(self):
|
||||
"""Test special character requirement."""
|
||||
svc = AuthService()
|
||||
with pytest.raises(
|
||||
ValueError, match="symbol or punctuation"
|
||||
):
|
||||
svc.setup_master_password("NoSpecial123")
|
||||
|
||||
def test_validate_without_setup_raises_error(self):
|
||||
"""Test validation without password setup raises error."""
|
||||
svc = AuthService()
|
||||
# Clear any hash that might come from settings
|
||||
svc._hash = None
|
||||
|
||||
with pytest.raises(AuthError, match="not configured"):
|
||||
svc.validate_master_password("anypassword")
|
||||
|
||||
def test_validate_wrong_password(self):
|
||||
"""Test validation with wrong password."""
|
||||
svc = AuthService()
|
||||
svc.setup_master_password("Correct!Pass123")
|
||||
|
||||
assert svc.validate_master_password("Wrong!Pass123") is False
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"bad",
|
||||
[
|
||||
"short",
|
||||
"lowercaseonly",
|
||||
"UPPERCASEONLY",
|
||||
"NoSpecial1",
|
||||
],
|
||||
)
|
||||
def test_setup_weak_passwords(bad):
|
||||
svc = AuthService()
|
||||
with pytest.raises(ValueError):
|
||||
svc.setup_master_password(bad)
|
||||
class TestFailedAttemptsAndLockout:
|
||||
"""Test failed login attempts and lockout mechanism."""
|
||||
|
||||
def test_failed_attempts_and_lockout(self):
|
||||
"""Test lockout after max failed attempts."""
|
||||
svc = AuthService()
|
||||
password = "An0ther$Good1"
|
||||
svc.setup_master_password(password)
|
||||
|
||||
def test_failed_attempts_and_lockout():
|
||||
svc = AuthService()
|
||||
password = "An0ther$Good1"
|
||||
svc.setup_master_password(password)
|
||||
identifier = "test-ip"
|
||||
# fail max_attempts times
|
||||
for _ in range(svc.max_attempts):
|
||||
assert (
|
||||
svc.validate_master_password(
|
||||
"wrongpassword", identifier=identifier
|
||||
)
|
||||
is False
|
||||
)
|
||||
|
||||
identifier = "test-ip"
|
||||
# fail max_attempts times
|
||||
for _ in range(svc.max_attempts):
|
||||
# Next attempt must raise LockedOutError
|
||||
with pytest.raises(LockedOutError):
|
||||
svc.validate_master_password(password, identifier=identifier)
|
||||
|
||||
def test_lockout_different_identifiers(self):
|
||||
"""Test that lockout is per identifier."""
|
||||
svc = AuthService()
|
||||
password = "Valid!Pass123"
|
||||
svc.setup_master_password(password)
|
||||
|
||||
# Fail attempts for identifier1
|
||||
for _ in range(svc.max_attempts):
|
||||
svc.validate_master_password("wrong", identifier="id1")
|
||||
|
||||
# identifier1 should be locked
|
||||
with pytest.raises(LockedOutError):
|
||||
svc.validate_master_password(password, identifier="id1")
|
||||
|
||||
# identifier2 should still work
|
||||
assert (
|
||||
svc.validate_master_password("wrongpassword", identifier=identifier)
|
||||
is False
|
||||
svc.validate_master_password(password, identifier="id2")
|
||||
is True
|
||||
)
|
||||
|
||||
# Next attempt must raise LockedOutError
|
||||
with pytest.raises(LockedOutError):
|
||||
svc.validate_master_password(password, identifier=identifier)
|
||||
def test_successful_login_clears_failures(self):
|
||||
"""Test that successful login clears failure count."""
|
||||
svc = AuthService()
|
||||
password = "Valid!Pass123"
|
||||
svc.setup_master_password(password)
|
||||
|
||||
identifier = "test-ip"
|
||||
# Fail a few times (but not enough to lock)
|
||||
for _ in range(svc.max_attempts - 1):
|
||||
svc.validate_master_password("wrong", identifier=identifier)
|
||||
|
||||
# Successful login should clear failures
|
||||
assert (
|
||||
svc.validate_master_password(password, identifier=identifier)
|
||||
is True
|
||||
)
|
||||
|
||||
# Should be able to fail again without lockout
|
||||
for _ in range(svc.max_attempts - 1):
|
||||
svc.validate_master_password("wrong", identifier=identifier)
|
||||
|
||||
# Should still not be locked
|
||||
assert (
|
||||
svc.validate_master_password(password, identifier=identifier)
|
||||
is True
|
||||
)
|
||||
|
||||
|
||||
def test_token_decode_invalid():
|
||||
svc = AuthService()
|
||||
# invalid token should raise AuthError
|
||||
with pytest.raises(AuthError):
|
||||
svc.decode_token("not-a-jwt")
|
||||
class TestJWTTokens:
|
||||
"""Test JWT token creation and validation."""
|
||||
|
||||
def test_create_access_token(self):
|
||||
"""Test JWT token creation."""
|
||||
svc = AuthService()
|
||||
password = "Str0ng!Pass"
|
||||
svc.setup_master_password(password)
|
||||
|
||||
resp = svc.create_access_token(subject="tester", remember=False)
|
||||
|
||||
assert resp.token_type == "bearer"
|
||||
assert resp.access_token
|
||||
assert resp.expires_at is not None
|
||||
|
||||
def test_create_token_with_remember(self):
|
||||
"""Test JWT token with remember=True has longer expiry."""
|
||||
svc = AuthService()
|
||||
password = "Str0ng!Pass"
|
||||
svc.setup_master_password(password)
|
||||
|
||||
resp_normal = svc.create_access_token(
|
||||
subject="tester", remember=False
|
||||
)
|
||||
resp_remember = svc.create_access_token(
|
||||
subject="tester", remember=True
|
||||
)
|
||||
|
||||
# Remember token should expire later
|
||||
assert resp_remember.expires_at > resp_normal.expires_at
|
||||
|
||||
def test_decode_valid_token(self):
|
||||
"""Test decoding valid JWT token."""
|
||||
svc = AuthService()
|
||||
password = "Str0ng!Pass"
|
||||
svc.setup_master_password(password)
|
||||
|
||||
resp = svc.create_access_token(subject="tester", remember=False)
|
||||
decoded = svc.decode_token(resp.access_token)
|
||||
|
||||
assert decoded["sub"] == "tester"
|
||||
assert "exp" in decoded
|
||||
assert "iat" in decoded
|
||||
|
||||
def test_token_decode_invalid(self):
|
||||
"""Test that invalid token raises AuthError."""
|
||||
svc = AuthService()
|
||||
|
||||
with pytest.raises(AuthError):
|
||||
svc.decode_token("not-a-jwt")
|
||||
|
||||
def test_decode_malformed_token(self):
|
||||
"""Test decoding malformed JWT token."""
|
||||
svc = AuthService()
|
||||
|
||||
with pytest.raises(AuthError):
|
||||
svc.decode_token("header.payload.signature")
|
||||
|
||||
def test_decode_expired_token(self):
|
||||
"""Test decoding expired token."""
|
||||
svc = AuthService()
|
||||
password = "Str0ng!Pass"
|
||||
svc.setup_master_password(password)
|
||||
|
||||
# Create a token with past expiry
|
||||
from jose import jwt
|
||||
|
||||
expired_payload = {
|
||||
"sub": "tester",
|
||||
"exp": int((datetime.utcnow() - timedelta(hours=1)).timestamp()),
|
||||
"iat": int(datetime.utcnow().timestamp()),
|
||||
}
|
||||
expired_token = jwt.encode(
|
||||
expired_payload, svc.secret, algorithm="HS256"
|
||||
)
|
||||
|
||||
with pytest.raises(AuthError):
|
||||
svc.decode_token(expired_token)
|
||||
|
||||
|
||||
class TestSessionManagement:
|
||||
"""Test session model creation and management."""
|
||||
|
||||
def test_create_session_model(self):
|
||||
"""Test session model creation from token."""
|
||||
svc = AuthService()
|
||||
password = "Str0ng!Pass"
|
||||
svc.setup_master_password(password)
|
||||
|
||||
resp = svc.create_access_token(subject="tester", remember=False)
|
||||
sess = svc.create_session_model(resp.access_token)
|
||||
|
||||
assert sess.session_id
|
||||
assert sess.user == "tester"
|
||||
assert sess.expires_at is not None
|
||||
|
||||
def test_session_id_deterministic(self):
|
||||
"""Test that same token produces same session ID."""
|
||||
svc = AuthService()
|
||||
password = "Str0ng!Pass"
|
||||
svc.setup_master_password(password)
|
||||
|
||||
resp = svc.create_access_token(subject="tester", remember=False)
|
||||
sess1 = svc.create_session_model(resp.access_token)
|
||||
sess2 = svc.create_session_model(resp.access_token)
|
||||
|
||||
assert sess1.session_id == sess2.session_id
|
||||
|
||||
def test_revoke_token(self):
|
||||
"""Test token revocation (placeholder)."""
|
||||
svc = AuthService()
|
||||
password = "Str0ng!Pass"
|
||||
svc.setup_master_password(password)
|
||||
|
||||
resp = svc.create_access_token(subject="tester", remember=False)
|
||||
|
||||
# Currently a no-op, should not raise
|
||||
result = svc.revoke_token(resp.access_token)
|
||||
assert result is None
|
||||
|
||||
|
||||
class TestServiceConfiguration:
|
||||
"""Test service configuration and initialization."""
|
||||
|
||||
def test_is_configured_initial_state(self):
|
||||
"""Test initial unconfigured state."""
|
||||
svc = AuthService()
|
||||
# Clear any hash that might come from settings
|
||||
svc._hash = None
|
||||
|
||||
assert svc.is_configured() is False
|
||||
|
||||
def test_is_configured_after_setup(self):
|
||||
"""Test configured state after setup."""
|
||||
svc = AuthService()
|
||||
svc.setup_master_password("Valid!Pass123")
|
||||
assert svc.is_configured() is True
|
||||
|
||||
def test_custom_lockout_settings(self):
|
||||
"""Test custom lockout configuration."""
|
||||
svc = AuthService()
|
||||
|
||||
# Verify default values
|
||||
assert svc.max_attempts == 5
|
||||
assert svc.lockout_seconds == 300
|
||||
assert svc.token_expiry_hours == 24
|
||||
|
||||
# Custom settings should be modifiable
|
||||
svc.max_attempts = 3
|
||||
svc.lockout_seconds = 600
|
||||
|
||||
assert svc.max_attempts == 3
|
||||
assert svc.lockout_seconds == 600
|
||||
|
||||
Reference in New Issue
Block a user