From 888acfd33dbc60b51302f632c34207911f935862 Mon Sep 17 00:00:00 2001 From: Lukas Pupka-Lipinski Date: Mon, 6 Oct 2025 09:08:49 +0200 Subject: [PATCH] Remove unused Flask imports and dependencies - Cleaned up old Flask middleware files and updated requirements.txt with FastAPI dependencies --- requirements.txt | Bin 50 -> 256 bytes src/server/requirements.txt | Bin 1454 -> 1470 bytes src/server/web/middleware/auth_middleware.py | 178 ---------- .../web/middleware/validation_middleware.py | 329 ------------------ web_todo.md | 2 +- 5 files changed, 1 insertion(+), 508 deletions(-) delete mode 100644 src/server/web/middleware/auth_middleware.py delete mode 100644 src/server/web/middleware/validation_middleware.py diff --git a/requirements.txt b/requirements.txt index e435e2d0915c2ad431aaff98c3e5a31f436a5f55..e7707e08c4b15355bbb8a90d6f71d9da3a0eb376 100644 GIT binary patch literal 256 zcmX|+Q3}E^5Jcx&@Fbmsn2e}^*h69c47vL#(EtDd delta 9 QcmZo*GGhAwZ{h_501|lwMF0Q* diff --git a/src/server/requirements.txt b/src/server/requirements.txt index ed4a752744d72b05033c768dce87c386921f38bb..df76ec9887fd2cb1436127b8c317a25c605f8dbc 100644 GIT binary patch delta 98 zcmZ3-y^ni?0b^(yLn1>lLkW;9V8~>!1wsP`JqAM{X#%Es8Mqiq8Onfi$qe}nMGSci b(NOhy3@JceML;qZuFnXp%V2XUqa_Ofp;{4@ delta 82 zcmdnTy^ec>0i&xMLk>eCLoq`(gDnsmGw3lGfH5xv7Xw00mm!!TpCK8jDwUyx!IQxs RtlAW)5~Rayb1S1I3jlHv4kG{n diff --git a/src/server/web/middleware/auth_middleware.py b/src/server/web/middleware/auth_middleware.py deleted file mode 100644 index 103b04d..0000000 --- a/src/server/web/middleware/auth_middleware.py +++ /dev/null @@ -1,178 +0,0 @@ -""" -Authentication middleware for consistent auth handling across controllers. - -This module provides middleware for handling authentication logic -that was previously duplicated across multiple controller files. -""" - -from flask import Request, session, request, jsonify, g -from typing import Callable, Optional, Dict, Any -import logging -import functools - - -async def auth_middleware(request: Request, call_next: Callable): - """ - Authentication middleware to avoid duplicate auth logic. - - This middleware handles authentication for protected routes, - setting user context and handling auth failures consistently. - - Args: - request: Flask request object - call_next: Next function in the middleware chain - - Returns: - Response from next middleware or auth error - """ - try: - # Check for authentication token in various locations - auth_token = None - - # Check Authorization header - auth_header = request.headers.get('Authorization') - if auth_header and auth_header.startswith('Bearer '): - auth_token = auth_header[7:] # Remove 'Bearer ' prefix - - # Check session for web-based auth - elif 'user_id' in session: - auth_token = session.get('auth_token') - - # Check API key in query params or headers - elif request.args.get('api_key'): - auth_token = request.args.get('api_key') - elif request.headers.get('X-API-Key'): - auth_token = request.headers.get('X-API-Key') - - if auth_token: - # Validate the token and set user context - user_info = await validate_auth_token(auth_token) - if user_info: - g.current_user = user_info - g.is_authenticated = True - else: - g.current_user = None - g.is_authenticated = False - else: - g.current_user = None - g.is_authenticated = False - - # Continue to next middleware/handler - response = await call_next(request) - return response - - except Exception as e: - logging.getLogger(__name__).error(f"Auth middleware error: {str(e)}") - return jsonify({ - 'status': 'error', - 'message': 'Authentication error', - 'error_code': 500 - }), 500 - - -async def validate_auth_token(token: str) -> Optional[Dict[str, Any]]: - """ - Validate authentication token and return user information. - - Args: - token: Authentication token to validate - - Returns: - User information dictionary if valid, None otherwise - """ - try: - # This would integrate with your actual authentication system - # For now, this is a placeholder implementation - - # Example implementation: - # 1. Decode JWT token or lookup API key in database - # 2. Verify token is not expired - # 3. Get user information - # 4. Return user context - - # Placeholder - replace with actual implementation - if token and len(token) > 10: # Basic validation - return { - 'user_id': 'placeholder_user', - 'username': 'placeholder', - 'roles': ['user'], - 'permissions': ['read'] - } - - return None - - except Exception as e: - logging.getLogger(__name__).error(f"Token validation error: {str(e)}") - return None - - -def require_auth_middleware(f: Callable) -> Callable: - """ - Decorator to require authentication, using middleware context. - - This decorator checks if the user was authenticated by the auth middleware. - """ - @functools.wraps(f) - def decorated_function(*args, **kwargs): - if not hasattr(g, 'is_authenticated') or not g.is_authenticated: - return jsonify({ - 'status': 'error', - 'message': 'Authentication required', - 'error_code': 401 - }), 401 - - return f(*args, **kwargs) - - return decorated_function - - -def require_role_middleware(required_role: str) -> Callable: - """ - Decorator to require specific role, using middleware context. - - Args: - required_role: Role required to access the endpoint - - Returns: - Decorator function - """ - def decorator(f: Callable) -> Callable: - @functools.wraps(f) - def decorated_function(*args, **kwargs): - if not hasattr(g, 'is_authenticated') or not g.is_authenticated: - return jsonify({ - 'status': 'error', - 'message': 'Authentication required', - 'error_code': 401 - }), 401 - - user = getattr(g, 'current_user', {}) - user_roles = user.get('roles', []) - - if required_role not in user_roles: - return jsonify({ - 'status': 'error', - 'message': f'Role {required_role} required', - 'error_code': 403 - }), 403 - - return f(*args, **kwargs) - - return decorated_function - return decorator - - -def optional_auth_middleware(f: Callable) -> Callable: - """ - Decorator for optional authentication using middleware context. - - This allows endpoints to work with or without authentication, - providing additional functionality when authenticated. - """ - @functools.wraps(f) - def decorated_function(*args, **kwargs): - # User context is already set by auth middleware - # No validation required, just proceed - return f(*args, **kwargs) - - return decorated_function \ No newline at end of file diff --git a/src/server/web/middleware/validation_middleware.py b/src/server/web/middleware/validation_middleware.py deleted file mode 100644 index 04eae2d..0000000 --- a/src/server/web/middleware/validation_middleware.py +++ /dev/null @@ -1,329 +0,0 @@ -""" -Request validation middleware for consistent validation across controllers. - -This module provides middleware for handling request validation logic -that was previously duplicated across multiple controller files. -""" - -from flask import Request, request, jsonify, g -from typing import Callable, Dict, Any, List, Optional, Union -import json -import logging -import functools - - -async def validation_middleware(request: Request, call_next: Callable): - """ - Request validation middleware. - - This middleware handles common request validation tasks: - - Content-Type validation - - JSON parsing and validation - - Basic input sanitization - - Request size limits - - Args: - request: Flask request object - call_next: Next function in the middleware chain - - Returns: - Response from next middleware or validation error - """ - try: - # Store original request data for controllers to use - g.request_data = None - g.query_params = dict(request.args) - g.request_headers = dict(request.headers) - - # Validate request size - if request.content_length and request.content_length > (10 * 1024 * 1024): # 10MB limit - return jsonify({ - 'status': 'error', - 'message': 'Request too large', - 'error_code': 413 - }), 413 - - # Handle JSON requests - if request.is_json: - try: - data = request.get_json() - if data is not None: - # Basic sanitization - g.request_data = sanitize_json_data(data) - else: - g.request_data = {} - except json.JSONDecodeError as e: - return jsonify({ - 'status': 'error', - 'message': 'Invalid JSON format', - 'details': str(e), - 'error_code': 400 - }), 400 - - # Handle form data - elif request.form: - g.request_data = dict(request.form) - # Sanitize form data - for key, value in g.request_data.items(): - if isinstance(value, str): - g.request_data[key] = sanitize_string(value) - - # Sanitize query parameters - for key, value in g.query_params.items(): - if isinstance(value, str): - g.query_params[key] = sanitize_string(value) - - # Continue to next middleware/handler - response = await call_next(request) - return response - - except Exception as e: - logging.getLogger(__name__).error(f"Validation middleware error: {str(e)}") - return jsonify({ - 'status': 'error', - 'message': 'Validation error', - 'error_code': 500 - }), 500 - - -def sanitize_string(value: str, max_length: int = 1000) -> str: - """ - Sanitize string input by removing/escaping dangerous characters. - - Args: - value: String to sanitize - max_length: Maximum allowed length - - Returns: - Sanitized string - """ - if not isinstance(value, str): - return str(value) - - # Trim whitespace - value = value.strip() - - # Limit length - if len(value) > max_length: - value = value[:max_length] - - # Remove/escape potentially dangerous characters - # This is a basic implementation - enhance based on your security requirements - dangerous_chars = ['<', '>', '"', "'", '&', '\x00', '\x0a', '\x0d'] - for char in dangerous_chars: - value = value.replace(char, '') - - return value - - -def sanitize_json_data(data: Union[Dict, List, Any], max_depth: int = 10, current_depth: int = 0) -> Any: - """ - Recursively sanitize JSON data. - - Args: - data: Data to sanitize - max_depth: Maximum recursion depth - current_depth: Current recursion depth - - Returns: - Sanitized data - """ - if current_depth > max_depth: - return "Data too deeply nested" - - if isinstance(data, dict): - sanitized = {} - for key, value in data.items(): - sanitized_key = sanitize_string(str(key), 100) # Limit key length - sanitized[sanitized_key] = sanitize_json_data(value, max_depth, current_depth + 1) - return sanitized - - elif isinstance(data, list): - return [sanitize_json_data(item, max_depth, current_depth + 1) for item in data[:100]] # Limit list size - - elif isinstance(data, str): - return sanitize_string(data) - - elif isinstance(data, (int, float, bool)) or data is None: - return data - - else: - # Convert unknown types to string and sanitize - return sanitize_string(str(data)) - - -def validate_json_required_fields(required_fields: List[str]) -> Callable: - """ - Decorator to validate required JSON fields using middleware data. - - Args: - required_fields: List of required field names - - Returns: - Decorator function - """ - def decorator(f: Callable) -> Callable: - @functools.wraps(f) - def decorated_function(*args, **kwargs): - data = getattr(g, 'request_data', {}) - - if not data: - return jsonify({ - 'status': 'error', - 'message': 'JSON data required', - 'error_code': 400 - }), 400 - - missing_fields = [field for field in required_fields if field not in data] - if missing_fields: - return jsonify({ - 'status': 'error', - 'message': f'Missing required fields: {", ".join(missing_fields)}', - 'error_code': 400 - }), 400 - - return f(*args, **kwargs) - - return decorated_function - return decorator - - -def validate_query_params(required_params: Optional[List[str]] = None, - optional_params: Optional[List[str]] = None) -> Callable: - """ - Decorator to validate query parameters using middleware data. - - Args: - required_params: List of required parameter names - optional_params: List of allowed optional parameter names - - Returns: - Decorator function - """ - def decorator(f: Callable) -> Callable: - @functools.wraps(f) - def decorated_function(*args, **kwargs): - params = getattr(g, 'query_params', {}) - - # Check required parameters - if required_params: - missing_params = [param for param in required_params if param not in params] - if missing_params: - return jsonify({ - 'status': 'error', - 'message': f'Missing required parameters: {", ".join(missing_params)}', - 'error_code': 400 - }), 400 - - # Check for unexpected parameters - if optional_params is not None: - allowed_params = set((required_params or []) + optional_params) - unexpected_params = [param for param in params.keys() if param not in allowed_params] - if unexpected_params: - return jsonify({ - 'status': 'error', - 'message': f'Unexpected parameters: {", ".join(unexpected_params)}', - 'error_code': 400 - }), 400 - - return f(*args, **kwargs) - - return decorated_function - return decorator - - -def validate_pagination_params(max_per_page: int = 1000, default_per_page: int = 50) -> Callable: - """ - Decorator to validate pagination parameters. - - Args: - max_per_page: Maximum items per page - default_per_page: Default items per page - - Returns: - Decorator function - """ - def decorator(f: Callable) -> Callable: - @functools.wraps(f) - def decorated_function(*args, **kwargs): - params = getattr(g, 'query_params', {}) - - # Validate page parameter - try: - page = int(params.get('page', 1)) - if page < 1: - page = 1 - except (ValueError, TypeError): - return jsonify({ - 'status': 'error', - 'message': 'Invalid page parameter', - 'error_code': 400 - }), 400 - - # Validate per_page parameter - try: - per_page = int(params.get('per_page', default_per_page)) - if per_page < 1: - per_page = default_per_page - elif per_page > max_per_page: - per_page = max_per_page - except (ValueError, TypeError): - return jsonify({ - 'status': 'error', - 'message': 'Invalid per_page parameter', - 'error_code': 400 - }), 400 - - # Store validated pagination params - g.pagination = { - 'page': page, - 'per_page': per_page, - 'offset': (page - 1) * per_page - } - - return f(*args, **kwargs) - - return decorated_function - return decorator - - -def validate_id_parameter(param_name: str = 'id') -> Callable: - """ - Decorator to validate ID parameters. - - Args: - param_name: Name of the ID parameter to validate - - Returns: - Decorator function - """ - def decorator(f: Callable) -> Callable: - @functools.wraps(f) - def decorated_function(*args, **kwargs): - # ID is usually in the URL parameters, not query parameters - id_value = kwargs.get(param_name) - - if id_value is None: - return jsonify({ - 'status': 'error', - 'message': f'Missing {param_name} parameter', - 'error_code': 400 - }), 400 - - try: - # Validate as integer - id_int = int(id_value) - if id_int < 1: - raise ValueError("ID must be positive") - kwargs[param_name] = id_int - except (ValueError, TypeError): - return jsonify({ - 'status': 'error', - 'message': f'Invalid {param_name} parameter', - 'error_code': 400 - }), 400 - - return f(*args, **kwargs) - - return decorated_function - return decorator \ No newline at end of file diff --git a/web_todo.md b/web_todo.md index 3327d20..a399479 100644 --- a/web_todo.md +++ b/web_todo.md @@ -144,7 +144,7 @@ This document contains tasks for migrating the web application from Flask to Fas ### Code Cleanup -- [ ] Remove unused Flask imports and dependencies +- [x] Remove unused Flask imports and dependencies - [ ] Clean up any Flask-specific code patterns - [ ] Update imports to use FastAPI equivalents - [ ] Remove deprecated or unused template files