"""Authentication service for Aniworld. Responsibilities: - Setup and validate a master password (hashed with bcrypt via passlib) - Issue and validate JWT access tokens - Track failed login attempts and apply temporary lockouts - Provide simple session model creation data This service is intentionally small and synchronous; FastAPI endpoints can call it from async routes via threadpool if needed. """ from __future__ import annotations import hashlib from datetime import datetime, timedelta, timezone from typing import Dict, Optional from jose import JWTError, jwt # type: ignore from passlib.context import CryptContext pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto") from src.config.settings import settings from src.server.models.auth import LoginResponse, SessionModel class AuthError(Exception): pass class LockedOutError(AuthError): pass class AuthService: """Service to manage master password and JWT sessions. Notes: - Master password hash is stored in settings.master_password_hash when available. For persistence beyond environment variables, a proper config persistence should be used (not implemented here). - Lockout policy is kept in-memory and will reset when the process restarts. This is acceptable for single-process deployments. WARNING - SINGLE PROCESS LIMITATION: Failed login attempts are stored in memory dictionaries which RESET when the process restarts. This means: - Attackers can bypass lockouts by triggering a process restart - Lockout state is not shared across multiple workers/processes For production deployments, consider: - Storing failed attempts in database with TTL-based expiration - Using Redis for distributed lockout state - Implementing account-based (not just IP-based) lockout tracking """ def __init__(self) -> None: # Try to load master password hash from config file first # If not found, fallback to environment variable self._hash: Optional[str] = None # Try loading from config file try: from src.server.services.config_service import get_config_service config_service = get_config_service() config = config_service.load_config() hash_val = config.other.get('master_password_hash') if isinstance(hash_val, str): self._hash = hash_val except Exception: # Config doesn't exist or can't be loaded - that's OK pass # If not in config, try environment variable if not self._hash: self._hash = settings.master_password_hash # In-memory failed attempts per identifier. Values are dicts with # keys: count, last, locked_until # WARNING: In-memory storage resets on process restart. # This is acceptable for development but PRODUCTION deployments # should use Redis or a database to persist failed login attempts # and prevent bypass via process restart. self._failed: Dict[str, Dict] = {} # Policy self.max_attempts = 5 self.lockout_seconds = 300 # 5 minutes self.token_expiry_hours = settings.token_expiry_hours or 24 self.secret = settings.jwt_secret_key # --- password helpers --- def _hash_password(self, password: str) -> str: return pwd_context.hash(password) def _verify_password(self, plain: str, hashed: str) -> bool: """Verify a password against a hash. Args: plain: Plain text password hashed: Hashed password Returns: bool: True if password matches, False otherwise """ try: return pwd_context.verify(plain, hashed) except Exception: return False def is_configured(self) -> bool: return bool(self._hash) def setup_master_password(self, password: str) -> str: """Set the master password (hash and store in memory/settings). Enforces strong password requirements: - Minimum 8 characters - Mixed case (upper and lower) - At least one number - At least one special character For now we update only the in-memory value and settings.master_password_hash. Caller should persist the returned hash to a config file. Args: password: The password to set Returns: str: The hashed password Raises: ValueError: If password doesn't meet requirements """ # Length check if len(password) < 8: raise ValueError("Password must be at least 8 characters long") # Mixed case check if password.islower() or password.isupper(): raise ValueError( "Password must include both uppercase and lowercase letters" ) # Number check if not any(c.isdigit() for c in password): raise ValueError("Password must include at least one number") # Special character check if password.isalnum(): raise ValueError( "Password must include at least one special character " "(symbol or punctuation)" ) h = self._hash_password(password) self._hash = h # Mirror into settings for simple persistence via env (if used) try: settings.master_password_hash = h except Exception: # Settings may be frozen or not persisted - that's okay for now pass return h # --- failed attempts and lockout --- def _get_fail_record(self, identifier: str) -> Dict: return self._failed.setdefault( identifier, {"count": 0, "last": None, "locked_until": None}, ) def _record_failure(self, identifier: str) -> None: rec = self._get_fail_record(identifier) rec["count"] += 1 rec["last"] = datetime.now(timezone.utc) if rec["count"] >= self.max_attempts: rec["locked_until"] = ( datetime.now(timezone.utc) + timedelta(seconds=self.lockout_seconds) ) def _clear_failures(self, identifier: str) -> None: if identifier in self._failed: self._failed.pop(identifier, None) def _check_locked(self, identifier: str) -> None: rec = self._get_fail_record(identifier) lu = rec.get("locked_until") if lu and datetime.now(timezone.utc) < lu: raise LockedOutError( "Too many failed attempts - temporarily locked out" ) if lu and datetime.now(timezone.utc) >= lu: # lock expired, reset self._failed[identifier] = { "count": 0, "last": None, "locked_until": None, } # --- authentication --- def validate_master_password( self, password: str, identifier: str = "global" ) -> bool: """Validate provided password against stored master hash. identifier: string to track failed attempts (IP, user, or 'global'). """ # Check lockout self._check_locked(identifier) if not self._hash: raise AuthError("Master password not configured") ok = self._verify_password(password, self._hash) if not ok: self._record_failure(identifier) return False # success self._clear_failures(identifier) return True # --- JWT tokens --- def create_access_token( self, subject: str = "master", remember: bool = False ) -> LoginResponse: expiry = datetime.now(timezone.utc) + timedelta( hours=(168 if remember else self.token_expiry_hours) ) payload = { "sub": subject, "exp": int(expiry.timestamp()), "iat": int(datetime.now(timezone.utc).timestamp()), } token = jwt.encode(payload, self.secret, algorithm="HS256") return LoginResponse( access_token=token, token_type="bearer", expires_at=expiry ) def decode_token(self, token: str) -> Dict: try: data = jwt.decode(token, self.secret, algorithms=["HS256"]) return data except JWTError as e: raise AuthError("Invalid token") from e def create_session_model(self, token: str) -> SessionModel: data = self.decode_token(token) exp_val = data.get("exp") expires_at = ( datetime.fromtimestamp(exp_val, timezone.utc) if exp_val is not None else None ) return SessionModel( session_id=hashlib.sha256(token.encode()).hexdigest(), user=data.get("sub"), expires_at=expires_at, ) def revoke_token(self, token: str) -> None: # For JWT stateless tokens we can't revoke without a store. This # is a placeholder. A real implementation would add the token jti # to a revocation list. return None # Singleton service instance for import convenience auth_service = AuthService()