"""Authentication service for Aniworld. Responsibilities: - Setup and validate a master password (hashed with bcrypt via passlib) - Issue and validate JWT access tokens - Track failed login attempts and apply temporary lockouts - Provide simple session model creation data This service is intentionally small and synchronous; FastAPI endpoints can call it from async routes via threadpool if needed. """ from __future__ import annotations import hashlib from datetime import datetime, timedelta from typing import Dict, Optional from jose import JWTError, jwt # type: ignore from passlib.context import CryptContext pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto") from src.config.settings import settings from src.server.models.auth import LoginResponse, SessionModel class AuthError(Exception): pass class LockedOutError(AuthError): pass class AuthService: """Service to manage master password and JWT sessions. Notes: - Master password hash is stored in settings.master_password_hash when available. For persistence beyond environment variables, a proper config persistence should be used (not implemented here). - Lockout policy is kept in-memory and will reset when the process restarts. This is acceptable for single-process deployments. """ def __init__(self) -> None: self._hash: Optional[str] = settings.master_password_hash # In-memory failed attempts per identifier. Values are dicts with # keys: count, last, locked_until self._failed: Dict[str, Dict] = {} # Policy self.max_attempts = 5 self.lockout_seconds = 300 # 5 minutes self.token_expiry_hours = settings.token_expiry_hours or 24 self.secret = settings.jwt_secret_key # --- password helpers --- def _hash_password(self, password: str) -> str: return pwd_context.hash(password) def _verify_password(self, plain: str, hashed: str) -> bool: try: return pwd_context.verify(plain, hashed) except Exception: return False def is_configured(self) -> bool: return bool(self._hash) def setup_master_password(self, password: str) -> None: """Set the master password (hash and store in memory/settings). For now we update only the in-memory value and settings.master_password_hash. A future task should persist this to a config file. """ if len(password) < 8: raise ValueError("Password must be at least 8 characters long") # Basic strength checks if password.islower() or password.isupper(): raise ValueError("Password must include mixed case") if password.isalnum(): # encourage a special character raise ValueError("Password should include a symbol or punctuation") h = self._hash_password(password) self._hash = h # Mirror into settings for simple persistence via env (if used) try: settings.master_password_hash = h except Exception: # Settings may be frozen or not persisted - that's okay for now pass # --- failed attempts and lockout --- def _get_fail_record(self, identifier: str) -> Dict: return self._failed.setdefault( identifier, {"count": 0, "last": None, "locked_until": None}, ) def _record_failure(self, identifier: str) -> None: rec = self._get_fail_record(identifier) rec["count"] += 1 rec["last"] = datetime.utcnow() if rec["count"] >= self.max_attempts: rec["locked_until"] = ( datetime.utcnow() + timedelta(seconds=self.lockout_seconds) ) def _clear_failures(self, identifier: str) -> None: if identifier in self._failed: self._failed.pop(identifier, None) def _check_locked(self, identifier: str) -> None: rec = self._get_fail_record(identifier) lu = rec.get("locked_until") if lu and datetime.utcnow() < lu: raise LockedOutError( "Too many failed attempts - temporarily locked out" ) if lu and datetime.utcnow() >= lu: # lock expired, reset self._failed[identifier] = { "count": 0, "last": None, "locked_until": None, } # --- authentication --- def validate_master_password( self, password: str, identifier: str = "global" ) -> bool: """Validate provided password against stored master hash. identifier: string to track failed attempts (IP, user, or 'global'). """ # Check lockout self._check_locked(identifier) if not self._hash: raise AuthError("Master password not configured") ok = self._verify_password(password, self._hash) if not ok: self._record_failure(identifier) return False # success self._clear_failures(identifier) return True # --- JWT tokens --- def create_access_token( self, subject: str = "master", remember: bool = False ) -> LoginResponse: expiry = datetime.utcnow() + timedelta( hours=(168 if remember else self.token_expiry_hours) ) payload = { "sub": subject, "exp": int(expiry.timestamp()), "iat": int(datetime.utcnow().timestamp()), } token = jwt.encode(payload, self.secret, algorithm="HS256") return LoginResponse( access_token=token, token_type="bearer", expires_at=expiry ) def decode_token(self, token: str) -> Dict: try: data = jwt.decode(token, self.secret, algorithms=["HS256"]) return data except JWTError as e: raise AuthError("Invalid token") from e def create_session_model(self, token: str) -> SessionModel: data = self.decode_token(token) exp_val = data.get("exp") expires_at = ( datetime.utcfromtimestamp(exp_val) if exp_val is not None else None ) return SessionModel( session_id=hashlib.sha256(token.encode()).hexdigest(), user=data.get("sub"), expires_at=expires_at, ) def revoke_token(self, token: str) -> None: # For JWT stateless tokens we can't revoke without a store. This # is a placeholder. A real implementation would add the token jti # to a revocation list. return None # Singleton service instance for import convenience auth_service = AuthService()