- Migrate settings.py to Pydantic V2 (SettingsConfigDict, validation_alias)
- Update config models to use @field_validator with @classmethod
- Replace deprecated datetime.utcnow() with datetime.now(timezone.utc)
- Migrate FastAPI app from @app.on_event to lifespan context manager
- Implement comprehensive rate limiting middleware with:
  * Endpoint-specific rate limits (login: 5/min, register: 3/min)
  * IP-based and user-based tracking
  * Authenticated user multiplier (2x limits)
  * Bypass paths for health, docs, static, websocket endpoints
  * Rate limit headers in responses
- Add 13 comprehensive tests for rate limiting (all passing)
- Update instructions.md to mark completed tasks
- Fix asyncio.create_task usage in anime_service.py

All 714 tests passing. No deprecation warnings.
304 lines
9.8 KiB
Python
304 lines
9.8 KiB
Python
"""Unit tests for AuthService.

Tests cover password setup and validation, JWT token operations,
session management, lockout mechanism, and error handling.
"""
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
import pytest
|
|
|
|
from src.server.services.auth_service import AuthError, AuthService, LockedOutError
|
|
|
|
|
|
class TestPasswordSetup:
    """Master password setup rules and validation outcomes."""

    def test_setup_and_validate_success(self):
        """A strong password can be set and afterwards validates as True."""
        master_password = "Str0ng!Pass"
        service = AuthService()
        service.setup_master_password(master_password)

        assert service.is_configured()
        accepted = service.validate_master_password(master_password)
        assert accepted is True

    @pytest.mark.parametrize(
        "bad", ["short", "lowercaseonly", "UPPERCASEONLY", "NoSpecial1"]
    )
    def test_setup_weak_passwords(self, bad):
        """Every weak candidate makes setup raise ValueError."""
        service = AuthService()
        with pytest.raises(ValueError):
            service.setup_master_password(bad)

    def test_password_length_validation(self):
        """A too-short password is rejected with a length message."""
        service = AuthService()
        with pytest.raises(ValueError, match="at least 8 characters"):
            service.setup_master_password("Short1!")

    def test_password_case_validation(self):
        """Single-case passwords are rejected with a mixed-case message."""
        service = AuthService()
        # Same service instance is reused for both candidates, lower then upper.
        for single_case in ("alllowercase1!", "ALLUPPERCASE1!"):
            with pytest.raises(ValueError, match="uppercase and lowercase"):
                service.setup_master_password(single_case)

    def test_password_special_char_validation(self):
        """A password without a symbol is rejected with a symbol message."""
        service = AuthService()
        with pytest.raises(ValueError, match="symbol or punctuation"):
            service.setup_master_password("NoSpecial123")

    def test_validate_without_setup_raises_error(self):
        """Validating before any password is set raises AuthError."""
        service = AuthService()
        # Drop any hash the service may have picked up from settings.
        service._hash = None

        with pytest.raises(AuthError, match="not configured"):
            service.validate_master_password("anypassword")

    def test_validate_wrong_password(self):
        """A non-matching password validates as False."""
        service = AuthService()
        service.setup_master_password("Correct!Pass123")

        rejected = service.validate_master_password("Wrong!Pass123")
        assert rejected is False
|
|
|
|
|
|
class TestFailedAttemptsAndLockout:
    """Failed-attempt counting and the resulting lockout behaviour."""

    def test_failed_attempts_and_lockout(self):
        """After max_attempts failures the next attempt raises LockedOutError."""
        service = AuthService()
        master_password = "An0ther$Good1"
        service.setup_master_password(master_password)

        client = "test-ip"
        # Exhaust the allowed attempts; each one must report a plain failure.
        for _ in range(service.max_attempts):
            outcome = service.validate_master_password(
                "wrongpassword", identifier=client
            )
            assert outcome is False

        # Even the correct password is refused once the identifier is locked.
        with pytest.raises(LockedOutError):
            service.validate_master_password(master_password, identifier=client)

    def test_lockout_different_identifiers(self):
        """Locking one identifier leaves a different identifier usable."""
        service = AuthService()
        master_password = "Valid!Pass123"
        service.setup_master_password(master_password)

        # Burn through every allowed attempt for the first identifier only.
        for _ in range(service.max_attempts):
            service.validate_master_password("wrong", identifier="id1")

        with pytest.raises(LockedOutError):
            service.validate_master_password(master_password, identifier="id1")

        # The second identifier has no recorded failures and must succeed.
        other_outcome = service.validate_master_password(
            master_password, identifier="id2"
        )
        assert other_outcome is True

    def test_successful_login_clears_failures(self):
        """A successful login resets the failure count for the identifier."""
        service = AuthService()
        master_password = "Valid!Pass123"
        service.setup_master_password(master_password)

        client = "test-ip"
        # Two rounds: fail just short of the limit, then log in successfully.
        # The second round only passes if the first success cleared the count.
        for _round in range(2):
            for _ in range(service.max_attempts - 1):
                service.validate_master_password("wrong", identifier=client)

            outcome = service.validate_master_password(
                master_password, identifier=client
            )
            assert outcome is True
|
|
|
|
|
|
class TestJWTTokens:
    """Issuing and decoding JWT access tokens."""

    @staticmethod
    def _configured_service():
        """Return a fresh AuthService with master password "Str0ng!Pass" set."""
        service = AuthService()
        service.setup_master_password("Str0ng!Pass")
        return service

    def test_create_access_token(self):
        """A created token is a non-empty bearer token with an expiry."""
        service = self._configured_service()

        issued = service.create_access_token(subject="tester", remember=False)

        assert issued.token_type == "bearer"
        assert issued.access_token
        assert issued.expires_at is not None

    def test_create_token_with_remember(self):
        """remember=True yields a later expiry than remember=False."""
        service = self._configured_service()

        short_lived = service.create_access_token(
            subject="tester", remember=False
        )
        long_lived = service.create_access_token(
            subject="tester", remember=True
        )

        assert long_lived.expires_at > short_lived.expires_at

    def test_decode_valid_token(self):
        """Decoding a freshly issued token exposes sub, exp and iat claims."""
        service = self._configured_service()

        issued = service.create_access_token(subject="tester", remember=False)
        claims = service.decode_token(issued.access_token)

        assert claims["sub"] == "tester"
        assert "exp" in claims
        assert "iat" in claims

    def test_token_decode_invalid(self):
        """A string that is not a JWT makes decode_token raise AuthError."""
        service = AuthService()

        with pytest.raises(AuthError):
            service.decode_token("not-a-jwt")

    def test_decode_malformed_token(self):
        """A three-part but bogus token makes decode_token raise AuthError."""
        service = AuthService()

        with pytest.raises(AuthError):
            service.decode_token("header.payload.signature")

    def test_decode_expired_token(self):
        """A token whose exp lies in the past makes decode_token raise AuthError."""
        service = self._configured_service()

        # Hand-craft the token so its expiry can be placed in the past.
        from jose import jwt

        one_hour_ago = datetime.now(timezone.utc) - timedelta(hours=1)
        claims = {
            "sub": "tester",
            "exp": int(one_hour_ago.timestamp()),
            "iat": int(datetime.now(timezone.utc).timestamp()),
        }
        stale_token = jwt.encode(claims, service.secret, algorithm="HS256")

        with pytest.raises(AuthError):
            service.decode_token(stale_token)
|
|
|
|
|
|
class TestSessionManagement:
    """Building session models from tokens and revoking tokens."""

    @staticmethod
    def _service_and_token():
        """Return a configured AuthService and a token response for "tester"."""
        service = AuthService()
        service.setup_master_password("Str0ng!Pass")
        issued = service.create_access_token(subject="tester", remember=False)
        return service, issued

    def test_create_session_model(self):
        """A session built from a token carries an id, the user and an expiry."""
        service, issued = self._service_and_token()

        session = service.create_session_model(issued.access_token)

        assert session.session_id
        assert session.user == "tester"
        assert session.expires_at is not None

    def test_session_id_deterministic(self):
        """Building a session twice from one token gives the same session id."""
        service, issued = self._service_and_token()

        first = service.create_session_model(issued.access_token)
        second = service.create_session_model(issued.access_token)

        assert first.session_id == second.session_id

    def test_revoke_token(self):
        """Revoking a token does not raise and returns None (placeholder)."""
        service, issued = self._service_and_token()

        # The original author notes revocation is currently a no-op; this test
        # only pins that the call completes and hands back None.
        outcome = service.revoke_token(issued.access_token)
        assert outcome is None
|
|
|
|
|
|
class TestServiceConfiguration:
    """Configured/unconfigured state and tunable lockout attributes."""

    def test_is_configured_initial_state(self):
        """With no stored hash the service reports it is not configured."""
        service = AuthService()
        # Drop any hash the service may have picked up from settings.
        service._hash = None

        configured = service.is_configured()
        assert configured is False

    def test_is_configured_after_setup(self):
        """After setting a master password the service reports configured."""
        service = AuthService()
        service.setup_master_password("Valid!Pass123")

        configured = service.is_configured()
        assert configured is True

    def test_custom_lockout_settings(self):
        """Defaults match expectations and lockout attributes can be reassigned."""
        service = AuthService()

        # Expected out-of-the-box values, checked in this order.
        expected_defaults = {
            "max_attempts": 5,
            "lockout_seconds": 300,
            "token_expiry_hours": 24,
        }
        for attribute, default in expected_defaults.items():
            assert getattr(service, attribute) == default

        # Overrides are plain attribute assignments and must be read back as-is.
        service.max_attempts = 3
        service.lockout_seconds = 600

        assert service.max_attempts == 3
        assert service.lockout_seconds == 600
|