# Aniworld/src/server/web/middleware/auth_middleware.py
#
# 178 lines
# 5.6 KiB
# Python

"""
Authentication middleware for consistent auth handling across controllers.
This module provides middleware for handling authentication logic
that was previously duplicated across multiple controller files.
"""
from flask import Request, session, request, jsonify, g
from typing import Callable, Optional, Dict, Any
import logging
import functools
async def auth_middleware(request: Request, call_next: Callable):
    """
    Resolve the caller's credentials and publish the result on ``flask.g``.

    Credential sources are tried in this order, and the first one that
    yields a non-empty token wins:

    1. ``Authorization: Bearer <token>`` header
    2. ``auth_token`` stored in the session (only when the session has a
       ``user_id`` key)
    3. ``api_key`` query parameter
    4. ``X-API-Key`` header

    The token is passed to ``validate_auth_token``. Afterwards
    ``g.current_user`` holds the returned user info (or ``None``) and
    ``g.is_authenticated`` is the matching boolean. A missing or invalid
    token does not reject the request here; rejecting is left to the
    ``require_*`` decorators that read ``g``.

    Args:
        request: Flask request object
        call_next: Next function in the middleware chain; it is awaited
            with ``request`` as its only argument

    Returns:
        Whatever ``call_next`` returns, or a ``(json_response, 500)`` tuple
        if any exception is raised while resolving credentials or while
        running ``call_next``.
    """
    # NOTE(review): the (request, call_next) signature looks like ASGI-style
    # middleware; how this is wired into the Flask app is not visible here.
    logger = logging.getLogger(__name__)
    bearer_prefix = 'Bearer '

    try:
        auth_token = None

        # 1. Authorization header
        auth_header = request.headers.get('Authorization')
        if auth_header and auth_header.startswith(bearer_prefix):
            auth_token = auth_header[len(bearer_prefix):]

        # 2. Session for web-based auth. Sources are checked lazily so the
        #    session is only touched when no bearer token was supplied.
        if not auth_token and 'user_id' in session:
            auth_token = session.get('auth_token')

        # 3./4. API key. This is deliberately NOT an ``elif`` of the session
        #    check: a session that has ``user_id`` but no ``auth_token`` must
        #    not prevent a supplied API key from being considered.
        if not auth_token:
            auth_token = (
                request.args.get('api_key')
                or request.headers.get('X-API-Key')
            )

        # Validate the token (if any) and set the user context.
        user_info = await validate_auth_token(auth_token) if auth_token else None
        g.current_user = user_info if user_info else None
        g.is_authenticated = bool(user_info)

        # Continue to next middleware/handler
        return await call_next(request)
    except Exception as e:
        # The catch-all is kept from the original, so failures raised by
        # ``call_next`` are also reported as an authentication error.
        # ``exception`` logs the same message plus the traceback.
        logger.exception("Auth middleware error: %s", e)
        return jsonify({
            'status': 'error',
            'message': 'Authentication error',
            'error_code': 500
        }), 500
async def validate_auth_token(token: str) -> Optional[Dict[str, Any]]:
    """
    Check an authentication token and return the matching user context.

    WARNING: this is still a placeholder. Any truthy token longer than 10
    characters is accepted and mapped to one fixed placeholder user. A real
    implementation is expected to:

    1. Decode the JWT token or look the API key up in a database
    2. Verify that the token has not expired
    3. Load the user's information
    4. Return the user context

    Args:
        token: Authentication token to validate

    Returns:
        User information dictionary (``user_id``, ``username``, ``roles``,
        ``permissions``) if the token is accepted, None otherwise. Errors
        raised during the check are logged and also produce None.
    """
    try:
        # Basic validation only; see the warning in the docstring.
        accepted = bool(token) and len(token) > 10
    except Exception as e:
        logging.getLogger(__name__).error(f"Token validation error: {str(e)}")
        return None

    if not accepted:
        return None

    placeholder_user = {
        'user_id': 'placeholder_user',
        'username': 'placeholder',
        'roles': ['user'],
        'permissions': ['read']
    }
    return placeholder_user
def require_auth_middleware(f: Callable) -> Callable:
    """
    Decorator that rejects the call unless the request is authenticated.

    The decision is based solely on ``g.is_authenticated``. If that
    attribute is missing or falsy, the wrapped function is not called and a
    JSON error body is returned with HTTP status 401.

    Args:
        f: Function to protect

    Returns:
        Wrapped function with the same metadata as ``f``
    """
    @functools.wraps(f)
    def guarded(*args, **kwargs):
        # A missing attribute is treated the same as an explicit False.
        if getattr(g, 'is_authenticated', False):
            return f(*args, **kwargs)

        unauthorized = {
            'status': 'error',
            'message': 'Authentication required',
            'error_code': 401
        }
        return jsonify(unauthorized), 401

    return guarded
def require_role_middleware(required_role: str) -> Callable:
    """
    Decorator factory that requires a specific role, using middleware context.

    The wrapped function runs only when ``g.is_authenticated`` is truthy and
    ``required_role`` is among the roles of ``g.current_user``. Otherwise a
    JSON error body is returned: HTTP 401 when the request is not
    authenticated, HTTP 403 when the role is missing.

    Args:
        required_role: Role required to access the endpoint

    Returns:
        Decorator function
    """
    def decorator(f: Callable) -> Callable:
        @functools.wraps(f)
        def decorated_function(*args, **kwargs):
            if not getattr(g, 'is_authenticated', False):
                return jsonify({
                    'status': 'error',
                    'message': 'Authentication required',
                    'error_code': 401
                }), 401

            # ``current_user`` can exist and still be None (auth_middleware
            # assigns None explicitly), so a getattr default alone is not
            # enough; fall back to an empty mapping so this is a 403, not a
            # crash.
            user = getattr(g, 'current_user', None) or {}
            user_roles = user.get('roles') or []
            # A bare string would turn ``in`` into a substring test
            # ('user' in 'superuser'); treat it as a single role instead.
            if isinstance(user_roles, str):
                user_roles = [user_roles]

            if required_role not in user_roles:
                return jsonify({
                    'status': 'error',
                    'message': f'Role {required_role} required',
                    'error_code': 403
                }), 403

            return f(*args, **kwargs)
        return decorated_function
    return decorator
def optional_auth_middleware(f: Callable) -> Callable:
    """
    Decorator marking an endpoint as usable with or without authentication.

    The wrapper performs no checks of its own: it forwards every argument to
    ``f`` and returns its result unchanged. It relies on the user context
    having been set elsewhere (by the auth middleware) if the endpoint wants
    to offer extra functionality to authenticated callers.

    Args:
        f: Function to wrap

    Returns:
        Wrapped function with the same metadata as ``f``
    """
    @functools.wraps(f)
    def passthrough(*args, **kwargs):
        # Nothing to validate here; just delegate.
        result = f(*args, **kwargs)
        return result

    return passthrough