Remove unused Flask imports and dependencies - Cleaned up old Flask middleware files and updated requirements.txt with FastAPI dependencies

This commit is contained in:
Lukas Pupka-Lipinski 2025-10-06 09:08:49 +02:00
parent 082d725d91
commit 888acfd33d
5 changed files with 1 additions and 508 deletions

Binary file not shown.

Binary file not shown.

View File

@ -1,178 +0,0 @@
"""
Authentication middleware for consistent auth handling across controllers.
This module provides middleware for handling authentication logic
that was previously duplicated across multiple controller files.
"""
from flask import Request, session, request, jsonify, g
from typing import Callable, Optional, Dict, Any
import logging
import functools
async def auth_middleware(request: Request, call_next: Callable):
"""
Authentication middleware to avoid duplicate auth logic.
This middleware handles authentication for protected routes,
setting user context and handling auth failures consistently.
Args:
request: Flask request object
call_next: Next function in the middleware chain
Returns:
Response from next middleware or auth error
"""
try:
# Check for authentication token in various locations
auth_token = None
# Check Authorization header
auth_header = request.headers.get('Authorization')
if auth_header and auth_header.startswith('Bearer '):
auth_token = auth_header[7:] # Remove 'Bearer ' prefix
# Check session for web-based auth
elif 'user_id' in session:
auth_token = session.get('auth_token')
# Check API key in query params or headers
elif request.args.get('api_key'):
auth_token = request.args.get('api_key')
elif request.headers.get('X-API-Key'):
auth_token = request.headers.get('X-API-Key')
if auth_token:
# Validate the token and set user context
user_info = await validate_auth_token(auth_token)
if user_info:
g.current_user = user_info
g.is_authenticated = True
else:
g.current_user = None
g.is_authenticated = False
else:
g.current_user = None
g.is_authenticated = False
# Continue to next middleware/handler
response = await call_next(request)
return response
except Exception as e:
logging.getLogger(__name__).error(f"Auth middleware error: {str(e)}")
return jsonify({
'status': 'error',
'message': 'Authentication error',
'error_code': 500
}), 500
async def validate_auth_token(token: str) -> Optional[Dict[str, Any]]:
"""
Validate authentication token and return user information.
Args:
token: Authentication token to validate
Returns:
User information dictionary if valid, None otherwise
"""
try:
# This would integrate with your actual authentication system
# For now, this is a placeholder implementation
# Example implementation:
# 1. Decode JWT token or lookup API key in database
# 2. Verify token is not expired
# 3. Get user information
# 4. Return user context
# Placeholder - replace with actual implementation
if token and len(token) > 10: # Basic validation
return {
'user_id': 'placeholder_user',
'username': 'placeholder',
'roles': ['user'],
'permissions': ['read']
}
return None
except Exception as e:
logging.getLogger(__name__).error(f"Token validation error: {str(e)}")
return None
def require_auth_middleware(f: Callable) -> Callable:
"""
Decorator to require authentication, using middleware context.
This decorator checks if the user was authenticated by the auth middleware.
"""
@functools.wraps(f)
def decorated_function(*args, **kwargs):
if not hasattr(g, 'is_authenticated') or not g.is_authenticated:
return jsonify({
'status': 'error',
'message': 'Authentication required',
'error_code': 401
}), 401
return f(*args, **kwargs)
return decorated_function
def require_role_middleware(required_role: str) -> Callable:
"""
Decorator to require specific role, using middleware context.
Args:
required_role: Role required to access the endpoint
Returns:
Decorator function
"""
def decorator(f: Callable) -> Callable:
@functools.wraps(f)
def decorated_function(*args, **kwargs):
if not hasattr(g, 'is_authenticated') or not g.is_authenticated:
return jsonify({
'status': 'error',
'message': 'Authentication required',
'error_code': 401
}), 401
user = getattr(g, 'current_user', {})
user_roles = user.get('roles', [])
if required_role not in user_roles:
return jsonify({
'status': 'error',
'message': f'Role {required_role} required',
'error_code': 403
}), 403
return f(*args, **kwargs)
return decorated_function
return decorator
def optional_auth_middleware(f: Callable) -> Callable:
"""
Decorator for optional authentication using middleware context.
This allows endpoints to work with or without authentication,
providing additional functionality when authenticated.
"""
@functools.wraps(f)
def decorated_function(*args, **kwargs):
# User context is already set by auth middleware
# No validation required, just proceed
return f(*args, **kwargs)
return decorated_function

View File

@ -1,329 +0,0 @@
"""
Request validation middleware for consistent validation across controllers.
This module provides middleware for handling request validation logic
that was previously duplicated across multiple controller files.
"""
from flask import Request, request, jsonify, g
from typing import Callable, Dict, Any, List, Optional, Union
import json
import logging
import functools
async def validation_middleware(request: Request, call_next: Callable):
"""
Request validation middleware.
This middleware handles common request validation tasks:
- Content-Type validation
- JSON parsing and validation
- Basic input sanitization
- Request size limits
Args:
request: Flask request object
call_next: Next function in the middleware chain
Returns:
Response from next middleware or validation error
"""
try:
# Store original request data for controllers to use
g.request_data = None
g.query_params = dict(request.args)
g.request_headers = dict(request.headers)
# Validate request size
if request.content_length and request.content_length > (10 * 1024 * 1024): # 10MB limit
return jsonify({
'status': 'error',
'message': 'Request too large',
'error_code': 413
}), 413
# Handle JSON requests
if request.is_json:
try:
data = request.get_json()
if data is not None:
# Basic sanitization
g.request_data = sanitize_json_data(data)
else:
g.request_data = {}
except json.JSONDecodeError as e:
return jsonify({
'status': 'error',
'message': 'Invalid JSON format',
'details': str(e),
'error_code': 400
}), 400
# Handle form data
elif request.form:
g.request_data = dict(request.form)
# Sanitize form data
for key, value in g.request_data.items():
if isinstance(value, str):
g.request_data[key] = sanitize_string(value)
# Sanitize query parameters
for key, value in g.query_params.items():
if isinstance(value, str):
g.query_params[key] = sanitize_string(value)
# Continue to next middleware/handler
response = await call_next(request)
return response
except Exception as e:
logging.getLogger(__name__).error(f"Validation middleware error: {str(e)}")
return jsonify({
'status': 'error',
'message': 'Validation error',
'error_code': 500
}), 500
def sanitize_string(value: str, max_length: int = 1000) -> str:
"""
Sanitize string input by removing/escaping dangerous characters.
Args:
value: String to sanitize
max_length: Maximum allowed length
Returns:
Sanitized string
"""
if not isinstance(value, str):
return str(value)
# Trim whitespace
value = value.strip()
# Limit length
if len(value) > max_length:
value = value[:max_length]
# Remove/escape potentially dangerous characters
# This is a basic implementation - enhance based on your security requirements
dangerous_chars = ['<', '>', '"', "'", '&', '\x00', '\x0a', '\x0d']
for char in dangerous_chars:
value = value.replace(char, '')
return value
def sanitize_json_data(data: Union[Dict, List, Any], max_depth: int = 10, current_depth: int = 0) -> Any:
"""
Recursively sanitize JSON data.
Args:
data: Data to sanitize
max_depth: Maximum recursion depth
current_depth: Current recursion depth
Returns:
Sanitized data
"""
if current_depth > max_depth:
return "Data too deeply nested"
if isinstance(data, dict):
sanitized = {}
for key, value in data.items():
sanitized_key = sanitize_string(str(key), 100) # Limit key length
sanitized[sanitized_key] = sanitize_json_data(value, max_depth, current_depth + 1)
return sanitized
elif isinstance(data, list):
return [sanitize_json_data(item, max_depth, current_depth + 1) for item in data[:100]] # Limit list size
elif isinstance(data, str):
return sanitize_string(data)
elif isinstance(data, (int, float, bool)) or data is None:
return data
else:
# Convert unknown types to string and sanitize
return sanitize_string(str(data))
def validate_json_required_fields(required_fields: List[str]) -> Callable:
"""
Decorator to validate required JSON fields using middleware data.
Args:
required_fields: List of required field names
Returns:
Decorator function
"""
def decorator(f: Callable) -> Callable:
@functools.wraps(f)
def decorated_function(*args, **kwargs):
data = getattr(g, 'request_data', {})
if not data:
return jsonify({
'status': 'error',
'message': 'JSON data required',
'error_code': 400
}), 400
missing_fields = [field for field in required_fields if field not in data]
if missing_fields:
return jsonify({
'status': 'error',
'message': f'Missing required fields: {", ".join(missing_fields)}',
'error_code': 400
}), 400
return f(*args, **kwargs)
return decorated_function
return decorator
def validate_query_params(required_params: Optional[List[str]] = None,
optional_params: Optional[List[str]] = None) -> Callable:
"""
Decorator to validate query parameters using middleware data.
Args:
required_params: List of required parameter names
optional_params: List of allowed optional parameter names
Returns:
Decorator function
"""
def decorator(f: Callable) -> Callable:
@functools.wraps(f)
def decorated_function(*args, **kwargs):
params = getattr(g, 'query_params', {})
# Check required parameters
if required_params:
missing_params = [param for param in required_params if param not in params]
if missing_params:
return jsonify({
'status': 'error',
'message': f'Missing required parameters: {", ".join(missing_params)}',
'error_code': 400
}), 400
# Check for unexpected parameters
if optional_params is not None:
allowed_params = set((required_params or []) + optional_params)
unexpected_params = [param for param in params.keys() if param not in allowed_params]
if unexpected_params:
return jsonify({
'status': 'error',
'message': f'Unexpected parameters: {", ".join(unexpected_params)}',
'error_code': 400
}), 400
return f(*args, **kwargs)
return decorated_function
return decorator
def validate_pagination_params(max_per_page: int = 1000, default_per_page: int = 50) -> Callable:
"""
Decorator to validate pagination parameters.
Args:
max_per_page: Maximum items per page
default_per_page: Default items per page
Returns:
Decorator function
"""
def decorator(f: Callable) -> Callable:
@functools.wraps(f)
def decorated_function(*args, **kwargs):
params = getattr(g, 'query_params', {})
# Validate page parameter
try:
page = int(params.get('page', 1))
if page < 1:
page = 1
except (ValueError, TypeError):
return jsonify({
'status': 'error',
'message': 'Invalid page parameter',
'error_code': 400
}), 400
# Validate per_page parameter
try:
per_page = int(params.get('per_page', default_per_page))
if per_page < 1:
per_page = default_per_page
elif per_page > max_per_page:
per_page = max_per_page
except (ValueError, TypeError):
return jsonify({
'status': 'error',
'message': 'Invalid per_page parameter',
'error_code': 400
}), 400
# Store validated pagination params
g.pagination = {
'page': page,
'per_page': per_page,
'offset': (page - 1) * per_page
}
return f(*args, **kwargs)
return decorated_function
return decorator
def validate_id_parameter(param_name: str = 'id') -> Callable:
"""
Decorator to validate ID parameters.
Args:
param_name: Name of the ID parameter to validate
Returns:
Decorator function
"""
def decorator(f: Callable) -> Callable:
@functools.wraps(f)
def decorated_function(*args, **kwargs):
# ID is usually in the URL parameters, not query parameters
id_value = kwargs.get(param_name)
if id_value is None:
return jsonify({
'status': 'error',
'message': f'Missing {param_name} parameter',
'error_code': 400
}), 400
try:
# Validate as integer
id_int = int(id_value)
if id_int < 1:
raise ValueError("ID must be positive")
kwargs[param_name] = id_int
except (ValueError, TypeError):
return jsonify({
'status': 'error',
'message': f'Invalid {param_name} parameter',
'error_code': 400
}), 400
return f(*args, **kwargs)
return decorated_function
return decorator

View File

@ -144,7 +144,7 @@ This document contains tasks for migrating the web application from Flask to Fas
### Code Cleanup
- [ ] Remove unused Flask imports and dependencies
- [x] Remove unused Flask imports and dependencies
- [ ] Clean up any Flask-specific code patterns
- [ ] Update imports to use FastAPI equivalents
- [ ] Remove deprecated or unused template files