Aniworld/src/server/services/auth_service.py
Lukas a3651e0e47 fix: load configuration from config.json and fix authentication
- Load anime_directory and master_password_hash from config.json on startup
- Sync configuration from config.json to settings object in fastapi_app.py
- Update dependencies.py to load config from JSON if not in environment
- Fix app.js to use makeAuthenticatedRequest() for all authenticated API calls
- Fix API endpoint paths from /api/v1/anime to /api/anime
- Update auth_service.py to load master_password_hash from config.json
- Update auth.py setup endpoint to save master_password_hash to config
- Fix rate limiting code to satisfy type checker
- Update config.json with test master password hash

Fixes:
- 401 Unauthorized errors on /api/anime endpoint
- 503 Service Unavailable errors on /api/anime/process/locks
- Configuration not being loaded from config.json file
- Authentication flow now works end-to-end with JWT tokens
2025-10-24 20:55:10 +02:00

261 lines
8.6 KiB
Python

"""Authentication service for Aniworld.
Responsibilities:
- Setup and validate a master password (hashed with bcrypt via passlib)
- Issue and validate JWT access tokens
- Track failed login attempts and apply temporary lockouts
- Provide simple session model creation data
This service is intentionally small and synchronous; FastAPI endpoints
can call it from async routes via threadpool if needed.
"""
from __future__ import annotations
import hashlib
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional
from jose import JWTError, jwt # type: ignore
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
from src.config.settings import settings
from src.server.models.auth import LoginResponse, SessionModel
class AuthError(Exception):
pass
class LockedOutError(AuthError):
pass
class AuthService:
"""Service to manage master password and JWT sessions.
Notes:
- Master password hash is stored in settings.master_password_hash when
available. For persistence beyond environment variables, a proper
config persistence should be used (not implemented here).
- Lockout policy is kept in-memory and will reset when the process
restarts. This is acceptable for single-process deployments.
"""
def __init__(self) -> None:
# Try to load master password hash from config file first
# If not found, fallback to environment variable
self._hash: Optional[str] = None
# Try loading from config file
try:
from src.server.services.config_service import get_config_service
config_service = get_config_service()
config = config_service.load_config()
hash_val = config.other.get('master_password_hash')
if isinstance(hash_val, str):
self._hash = hash_val
except Exception:
# Config doesn't exist or can't be loaded - that's OK
pass
# If not in config, try environment variable
if not self._hash:
self._hash = settings.master_password_hash
# In-memory failed attempts per identifier. Values are dicts with
# keys: count, last, locked_until
# WARNING: In-memory storage resets on process restart.
# This is acceptable for development but PRODUCTION deployments
# should use Redis or a database to persist failed login attempts
# and prevent bypass via process restart.
self._failed: Dict[str, Dict] = {}
# Policy
self.max_attempts = 5
self.lockout_seconds = 300 # 5 minutes
self.token_expiry_hours = settings.token_expiry_hours or 24
self.secret = settings.jwt_secret_key
# --- password helpers ---
def _hash_password(self, password: str) -> str:
return pwd_context.hash(password)
def _verify_password(self, plain: str, hashed: str) -> bool:
"""Verify a password against a hash.
Args:
plain: Plain text password
hashed: Hashed password
Returns:
bool: True if password matches, False otherwise
"""
try:
return pwd_context.verify(plain, hashed)
except Exception:
return False
def is_configured(self) -> bool:
return bool(self._hash)
def setup_master_password(self, password: str) -> str:
"""Set the master password (hash and store in memory/settings).
Enforces strong password requirements:
- Minimum 8 characters
- Mixed case (upper and lower)
- At least one number
- At least one special character
For now we update only the in-memory value and
settings.master_password_hash. Caller should persist the returned
hash to a config file.
Args:
password: The password to set
Returns:
str: The hashed password
Raises:
ValueError: If password doesn't meet requirements
"""
# Length check
if len(password) < 8:
raise ValueError("Password must be at least 8 characters long")
# Mixed case check
if password.islower() or password.isupper():
raise ValueError(
"Password must include both uppercase and lowercase letters"
)
# Number check
if not any(c.isdigit() for c in password):
raise ValueError("Password must include at least one number")
# Special character check
if password.isalnum():
raise ValueError(
"Password must include at least one special character "
"(symbol or punctuation)"
)
h = self._hash_password(password)
self._hash = h
# Mirror into settings for simple persistence via env (if used)
try:
settings.master_password_hash = h
except Exception:
# Settings may be frozen or not persisted - that's okay for now
pass
return h
# --- failed attempts and lockout ---
def _get_fail_record(self, identifier: str) -> Dict:
return self._failed.setdefault(
identifier,
{"count": 0, "last": None, "locked_until": None},
)
def _record_failure(self, identifier: str) -> None:
rec = self._get_fail_record(identifier)
rec["count"] += 1
rec["last"] = datetime.now(timezone.utc)
if rec["count"] >= self.max_attempts:
rec["locked_until"] = (
datetime.now(timezone.utc) + timedelta(seconds=self.lockout_seconds)
)
def _clear_failures(self, identifier: str) -> None:
if identifier in self._failed:
self._failed.pop(identifier, None)
def _check_locked(self, identifier: str) -> None:
rec = self._get_fail_record(identifier)
lu = rec.get("locked_until")
if lu and datetime.now(timezone.utc) < lu:
raise LockedOutError(
"Too many failed attempts - temporarily locked out"
)
if lu and datetime.now(timezone.utc) >= lu:
# lock expired, reset
self._failed[identifier] = {
"count": 0,
"last": None,
"locked_until": None,
}
# --- authentication ---
def validate_master_password(
self, password: str, identifier: str = "global"
) -> bool:
"""Validate provided password against stored master hash.
identifier: string to track failed attempts (IP, user, or 'global').
"""
# Check lockout
self._check_locked(identifier)
if not self._hash:
raise AuthError("Master password not configured")
ok = self._verify_password(password, self._hash)
if not ok:
self._record_failure(identifier)
return False
# success
self._clear_failures(identifier)
return True
# --- JWT tokens ---
def create_access_token(
self, subject: str = "master", remember: bool = False
) -> LoginResponse:
expiry = datetime.now(timezone.utc) + timedelta(
hours=(168 if remember else self.token_expiry_hours)
)
payload = {
"sub": subject,
"exp": int(expiry.timestamp()),
"iat": int(datetime.now(timezone.utc).timestamp()),
}
token = jwt.encode(payload, self.secret, algorithm="HS256")
return LoginResponse(
access_token=token, token_type="bearer", expires_at=expiry
)
def decode_token(self, token: str) -> Dict:
try:
data = jwt.decode(token, self.secret, algorithms=["HS256"])
return data
except JWTError as e:
raise AuthError("Invalid token") from e
def create_session_model(self, token: str) -> SessionModel:
data = self.decode_token(token)
exp_val = data.get("exp")
expires_at = (
datetime.fromtimestamp(exp_val, timezone.utc) if exp_val is not None else None
)
return SessionModel(
session_id=hashlib.sha256(token.encode()).hexdigest(),
user=data.get("sub"),
expires_at=expires_at,
)
def revoke_token(self, token: str) -> None:
# For JWT stateless tokens we can't revoke without a store. This
# is a placeholder. A real implementation would add the token jti
# to a revocation list.
return None
# Singleton service instance for import convenience
auth_service = AuthService()