""" Simple Master Password Authentication Controller for AniWorld. This module implements a simple authentication system using: - Single master password (no user registration) - JWT tokens for session management - Environment-based configuration - No email system required """ import os import hashlib import jwt from datetime import datetime, timedelta from flask import Blueprint, request, jsonify from functools import wraps import logging from typing import Dict, Any, Optional, Tuple # Configure logging logger = logging.getLogger(__name__) # Create blueprint simple_auth_bp = Blueprint('simple_auth', __name__) # Configuration from environment JWT_SECRET_KEY = os.getenv('JWT_SECRET_KEY', 'default_jwt_secret') PASSWORD_SALT = os.getenv('PASSWORD_SALT', 'default_salt') MASTER_PASSWORD_HASH = os.getenv('MASTER_PASSWORD_HASH') TOKEN_EXPIRY_HOURS = int(os.getenv('SESSION_TIMEOUT_HOURS', '24')) def hash_password(password: str) -> str: """Hash password with salt using SHA-256.""" salted_password = password + PASSWORD_SALT return hashlib.sha256(salted_password.encode()).hexdigest() def verify_master_password(password: str) -> bool: """Verify password against master password hash.""" if not MASTER_PASSWORD_HASH: # If no hash is set, check against environment variable (development only) dev_password = os.getenv('MASTER_PASSWORD') if dev_password: return password == dev_password return False password_hash = hash_password(password) return password_hash == MASTER_PASSWORD_HASH def generate_jwt_token() -> str: """Generate JWT token for authentication.""" payload = { 'user': 'master', 'exp': datetime.utcnow() + timedelta(hours=TOKEN_EXPIRY_HOURS), 'iat': datetime.utcnow(), 'iss': 'aniworld-server' } return jwt.encode(payload, JWT_SECRET_KEY, algorithm='HS256') def verify_jwt_token(token: str) -> Optional[Dict[str, Any]]: """Verify and decode JWT token.""" try: payload = jwt.decode(token, JWT_SECRET_KEY, algorithms=['HS256']) return payload except jwt.ExpiredSignatureError: logger.warning("Token has expired") return None except jwt.InvalidTokenError as e: logger.warning(f"Invalid token: {str(e)}") return None def require_auth(f): """Decorator to require authentication for API endpoints.""" @wraps(f) def decorated_function(*args, **kwargs): auth_header = request.headers.get('Authorization') if not auth_header: return jsonify({ 'success': False, 'error': 'Authorization header required', 'code': 'AUTH_REQUIRED' }), 401 try: # Expected format: "Bearer " token = auth_header.split(' ')[1] except IndexError: return jsonify({ 'success': False, 'error': 'Invalid authorization header format', 'code': 'INVALID_AUTH_FORMAT' }), 401 payload = verify_jwt_token(token) if not payload: return jsonify({ 'success': False, 'error': 'Invalid or expired token', 'code': 'INVALID_TOKEN' }), 401 # Add user info to request context request.current_user = payload return f(*args, **kwargs) return decorated_function # Auth endpoints @simple_auth_bp.route('/auth/login', methods=['POST']) def login() -> Tuple[Any, int]: """ Authenticate with master password and receive JWT token. Request Body: { "password": "master_password" } Response: { "success": true, "message": "Login successful", "data": { "token": "jwt_token_here", "expires_at": "2025-01-01T00:00:00Z", "user": "master" } } """ try: data = request.get_json() if not data: return jsonify({ 'success': False, 'error': 'JSON body required', 'code': 'MISSING_JSON' }), 400 password = data.get('password') if not password: return jsonify({ 'success': False, 'error': 'Password required', 'code': 'MISSING_PASSWORD' }), 400 # Verify master password if not verify_master_password(password): logger.warning(f"Failed login attempt from IP: {request.remote_addr}") return jsonify({ 'success': False, 'error': 'Invalid master password', 'code': 'INVALID_CREDENTIALS' }), 401 # Generate JWT token token = generate_jwt_token() expires_at = datetime.utcnow() + timedelta(hours=TOKEN_EXPIRY_HOURS) logger.info(f"Successful login from IP: {request.remote_addr}") return jsonify({ 'success': True, 'message': 'Login successful', 'data': { 'token': token, 'expires_at': expires_at.isoformat() + 'Z', 'user': 'master', 'token_type': 'Bearer' } }), 200 except Exception as e: logger.error(f"Login error: {str(e)}") return jsonify({ 'success': False, 'error': 'Internal server error', 'code': 'SERVER_ERROR' }), 500 @simple_auth_bp.route('/auth/verify', methods=['GET']) @require_auth def verify_token() -> Tuple[Any, int]: """ Verify if the current JWT token is valid. Headers: Authorization: Bearer Response: { "success": true, "message": "Token is valid", "data": { "user": "master", "expires_at": "2025-01-01T00:00:00Z", "issued_at": "2025-01-01T00:00:00Z" } } """ try: payload = request.current_user return jsonify({ 'success': True, 'message': 'Token is valid', 'data': { 'user': payload.get('user'), 'expires_at': datetime.utcfromtimestamp(payload.get('exp')).isoformat() + 'Z', 'issued_at': datetime.utcfromtimestamp(payload.get('iat')).isoformat() + 'Z', 'issuer': payload.get('iss') } }), 200 except Exception as e: logger.error(f"Token verification error: {str(e)}") return jsonify({ 'success': False, 'error': 'Internal server error', 'code': 'SERVER_ERROR' }), 500 @simple_auth_bp.route('/auth/logout', methods=['POST']) @require_auth def logout() -> Tuple[Any, int]: """ Logout (client-side token clearing). Since JWT tokens are stateless, logout is handled client-side by removing the token. This endpoint confirms logout action. Headers: Authorization: Bearer Response: { "success": true, "message": "Logout successful" } """ try: logger.info(f"User logged out from IP: {request.remote_addr}") return jsonify({ 'success': True, 'message': 'Logout successful. Please remove the token on client side.', 'data': { 'action': 'clear_token' } }), 200 except Exception as e: logger.error(f"Logout error: {str(e)}") return jsonify({ 'success': False, 'error': 'Internal server error', 'code': 'SERVER_ERROR' }), 500 @simple_auth_bp.route('/auth/status', methods=['GET']) def auth_status() -> Tuple[Any, int]: """ Check authentication system status. Response: { "success": true, "message": "Authentication system status", "data": { "auth_type": "master_password", "jwt_enabled": true, "password_configured": true } } """ try: password_configured = bool(MASTER_PASSWORD_HASH or os.getenv('MASTER_PASSWORD')) return jsonify({ 'success': True, 'message': 'Authentication system status', 'data': { 'auth_type': 'master_password', 'jwt_enabled': True, 'password_configured': password_configured, 'token_expiry_hours': TOKEN_EXPIRY_HOURS } }), 200 except Exception as e: logger.error(f"Auth status error: {str(e)}") return jsonify({ 'success': False, 'error': 'Internal server error', 'code': 'SERVER_ERROR' }), 500 # Utility function to set master password hash def set_master_password(password: str) -> str: """ Generate hash for master password. This should be used to set MASTER_PASSWORD_HASH in environment. Args: password: The master password to hash Returns: The hashed password that should be stored in environment """ return hash_password(password) # Health check endpoint @simple_auth_bp.route('/auth/health', methods=['GET']) def health_check() -> Tuple[Any, int]: """Health check for auth system.""" return jsonify({ 'success': True, 'message': 'Auth system is healthy', 'timestamp': datetime.utcnow().isoformat() + 'Z' }), 200