feat(auth): add AuthService with JWT, lockout and tests

This commit is contained in:
Lukas 2025-10-13 00:03:02 +02:00
parent 92217301b5
commit aec6357dcb
5 changed files with 291 additions and 14 deletions

View File

@ -163,6 +163,18 @@ conda activate AniWorld
- HTTPS enforcement in production
- Secure file path handling to prevent directory traversal
### Authentication Service
- A lightweight authentication service is provided by
`src/server/services/auth_service.py`.
- Uses bcrypt (passlib) to hash the master password and issues JWTs for
stateless sessions. Tokens are signed with the `JWT_SECRET_KEY` from
configuration and expire based on `SESSION_TIMEOUT_HOURS`.
- Failed login attempts are tracked in-memory and a temporary lockout is
applied after multiple failures. For multi-process deployments, move
this state to a shared store (Redis) and persist the master password
hash in a secure config store.
## Recent Infrastructure Changes
### Route Controller Refactoring (October 2025)

View File

@ -45,20 +45,14 @@ The tasks should be completed in the following order to ensure proper dependenci
### 2. Authentication System
#### [x] Implement authentication models
#### Create authentication service (completed)
- [x]Create `src/server/models/auth.py`
- [x]Define LoginRequest, LoginResponse models
- [x]Add SetupRequest, AuthStatus models
- [x]Include session management models
#### [] Create authentication service
- []Create `src/server/services/auth_service.py`
- []Implement master password setup/validation
- []Add session management with JWT tokens
- []Include failed attempt tracking and lockout
- []Add password strength validation
The authentication service has been implemented in
`src/server/services/auth_service.py`. It provides master password setup
and validation, JWT token issuance and decoding, in-memory failed
attempt tracking with temporary lockout, and basic password strength
checks. For persistence of the master password hash and token revocation
we recommend adding a config store or database in a follow-up task.
#### [] Implement authentication API endpoints
@ -98,7 +92,6 @@ The tasks should be completed in the following order to ensure proper dependenci
- []Add GET `/api/config` - get configuration
- []Add PUT `/api/config` - update configuration
- []Add POST `/api/config/validate` - validate config
- []Add GET/POST `/api/config/backup` - backup management
### 4. Anime Management Integration

14
server/__init__.py Normal file
View File

@ -0,0 +1,14 @@
"""Package shim: expose `server` package from `src/server`.
This file inserts the actual `src/server` directory into this package's
`__path__` so imports like `import server.models.auth` will resolve to
the code under `src/server` during tests.
"""
import os
_HERE = os.path.dirname(__file__)
_SRC_SERVER = os.path.normpath(os.path.join(_HERE, "..", "src", "server"))
# Prepend the real src/server directory to the package __path__ so
# normal imports resolve to the source tree.
__path__.insert(0, _SRC_SERVER)

View File

@ -0,0 +1,199 @@
"""Authentication service for Aniworld.
Responsibilities:
- Setup and validate a master password (hashed with bcrypt via passlib)
- Issue and validate JWT access tokens
- Track failed login attempts and apply temporary lockouts
- Provide simple session model creation data
This service is intentionally small and synchronous; FastAPI endpoints
can call it from async routes via threadpool if needed.
"""
from __future__ import annotations
import hashlib
from datetime import datetime, timedelta
from typing import Dict, Optional
from jose import JWTError, jwt # type: ignore
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
from src.config.settings import settings
from src.server.models.auth import LoginResponse, SessionModel
class AuthError(Exception):
pass
class LockedOutError(AuthError):
pass
class AuthService:
"""Service to manage master password and JWT sessions.
Notes:
- Master password hash is stored in settings.master_password_hash when
available. For persistence beyond environment variables, a proper
config persistence should be used (not implemented here).
- Lockout policy is kept in-memory and will reset when the process
restarts. This is acceptable for single-process deployments.
"""
def __init__(self) -> None:
self._hash: Optional[str] = settings.master_password_hash
# In-memory failed attempts per identifier. Values are dicts with
# keys: count, last, locked_until
self._failed: Dict[str, Dict] = {}
# Policy
self.max_attempts = 5
self.lockout_seconds = 300 # 5 minutes
self.token_expiry_hours = settings.token_expiry_hours or 24
self.secret = settings.jwt_secret_key
# --- password helpers ---
def _hash_password(self, password: str) -> str:
return pwd_context.hash(password)
def _verify_password(self, plain: str, hashed: str) -> bool:
try:
return pwd_context.verify(plain, hashed)
except Exception:
return False
def is_configured(self) -> bool:
return bool(self._hash)
def setup_master_password(self, password: str) -> None:
"""Set the master password (hash and store in memory/settings).
For now we update only the in-memory value and
settings.master_password_hash. A future task should persist this
to a config file.
"""
if len(password) < 8:
raise ValueError("Password must be at least 8 characters long")
# Basic strength checks
if password.islower() or password.isupper():
raise ValueError("Password must include mixed case")
if password.isalnum():
# encourage a special character
raise ValueError("Password should include a symbol or punctuation")
h = self._hash_password(password)
self._hash = h
# Mirror into settings for simple persistence via env (if used)
try:
settings.master_password_hash = h
except Exception:
# Settings may be frozen or not persisted - that's okay for now
pass
# --- failed attempts and lockout ---
def _get_fail_record(self, identifier: str) -> Dict:
return self._failed.setdefault(
identifier,
{"count": 0, "last": None, "locked_until": None},
)
def _record_failure(self, identifier: str) -> None:
rec = self._get_fail_record(identifier)
rec["count"] += 1
rec["last"] = datetime.utcnow()
if rec["count"] >= self.max_attempts:
rec["locked_until"] = (
datetime.utcnow() + timedelta(seconds=self.lockout_seconds)
)
def _clear_failures(self, identifier: str) -> None:
if identifier in self._failed:
self._failed.pop(identifier, None)
def _check_locked(self, identifier: str) -> None:
rec = self._get_fail_record(identifier)
lu = rec.get("locked_until")
if lu and datetime.utcnow() < lu:
raise LockedOutError(
"Too many failed attempts - temporarily locked out"
)
if lu and datetime.utcnow() >= lu:
# lock expired, reset
self._failed[identifier] = {
"count": 0,
"last": None,
"locked_until": None,
}
# --- authentication ---
def validate_master_password(
self, password: str, identifier: str = "global"
) -> bool:
"""Validate provided password against stored master hash.
identifier: string to track failed attempts (IP, user, or 'global').
"""
# Check lockout
self._check_locked(identifier)
if not self._hash:
raise AuthError("Master password not configured")
ok = self._verify_password(password, self._hash)
if not ok:
self._record_failure(identifier)
return False
# success
self._clear_failures(identifier)
return True
# --- JWT tokens ---
def create_access_token(
self, subject: str = "master", remember: bool = False
) -> LoginResponse:
expiry = datetime.utcnow() + timedelta(
hours=(168 if remember else self.token_expiry_hours)
)
payload = {
"sub": subject,
"exp": int(expiry.timestamp()),
"iat": int(datetime.utcnow().timestamp()),
}
token = jwt.encode(payload, self.secret, algorithm="HS256")
return LoginResponse(
access_token=token, token_type="bearer", expires_at=expiry
)
def decode_token(self, token: str) -> Dict:
try:
data = jwt.decode(token, self.secret, algorithms=["HS256"])
return data
except JWTError as e:
raise AuthError("Invalid token") from e
def create_session_model(self, token: str) -> SessionModel:
data = self.decode_token(token)
exp_val = data.get("exp")
expires_at = (
datetime.utcfromtimestamp(exp_val) if exp_val is not None else None
)
return SessionModel(
session_id=hashlib.sha256(token.encode()).hexdigest(),
user=data.get("sub"),
expires_at=expires_at,
)
def revoke_token(self, token: str) -> None:
# For JWT stateless tokens we can't revoke without a store. This
# is a placeholder. A real implementation would add the token jti
# to a revocation list.
return None
# Singleton service instance for import convenience
auth_service = AuthService()

View File

@ -0,0 +1,59 @@
import pytest
from src.server.services.auth_service import AuthError, AuthService, LockedOutError
def test_setup_and_validate_success():
svc = AuthService()
password = "Str0ng!Pass"
svc.setup_master_password(password)
assert svc.is_configured()
assert svc.validate_master_password(password) is True
resp = svc.create_access_token(subject="tester", remember=False)
assert resp.token_type == "bearer"
assert resp.access_token
sess = svc.create_session_model(resp.access_token)
assert sess.expires_at is not None
@pytest.mark.parametrize(
"bad",
[
"short",
"lowercaseonly",
"UPPERCASEONLY",
"NoSpecial1",
],
)
def test_setup_weak_passwords(bad):
svc = AuthService()
with pytest.raises(ValueError):
svc.setup_master_password(bad)
def test_failed_attempts_and_lockout():
svc = AuthService()
password = "An0ther$Good1"
svc.setup_master_password(password)
identifier = "test-ip"
# fail max_attempts times
for _ in range(svc.max_attempts):
assert (
svc.validate_master_password("wrongpassword", identifier=identifier)
is False
)
# Next attempt must raise LockedOutError
with pytest.raises(LockedOutError):
svc.validate_master_password(password, identifier=identifier)
def test_token_decode_invalid():
svc = AuthService()
# invalid token should raise AuthError
with pytest.raises(AuthError):
svc.decode_token("not-a-jwt")