248 lines
8.7 KiB
Python
248 lines
8.7 KiB
Python
"""
Application Flow Middleware for FastAPI.

This middleware enforces the application flow priorities:
1. Setup page (if setup is not complete)
2. Authentication page (if user is not authenticated)
3. Main application (for authenticated users with completed setup)

The middleware redirects users to the appropriate page based on their current state
and the state of the application setup.
"""
|
|
|
|
import logging
|
|
from typing import Optional
|
|
|
|
from fastapi import Request
|
|
from fastapi.responses import RedirectResponse
|
|
from starlette.middleware.base import BaseHTTPMiddleware
|
|
|
|
# Import the setup service, falling back to a permissive stub when the
# package that provides it cannot be imported.
try:
    from ...core.application.services.setup_service import SetupService
except ImportError:
    # The stub below reports setup as complete, which turns the setup gate
    # off entirely. Log it so the degraded mode is visible instead of silent.
    logging.getLogger(__name__).warning(
        "SetupService could not be imported; using a fallback that always "
        "reports setup as complete"
    )

    class SetupService:
        """Fallback used when the real setup service is unavailable."""

        def is_setup_complete(self):
            """Always report setup as complete."""
            return True


logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ApplicationFlowMiddleware(BaseHTTPMiddleware):
    """
    Middleware to enforce application flow: setup → auth → main application.

    This middleware:
    1. Checks if setup is complete
    2. Validates authentication status
    3. Redirects to appropriate page based on state
    4. Allows API endpoints and static files to pass through
    """

    def __init__(self, app, setup_service: Optional[SetupService] = None):
        """
        Initialize the application flow middleware.

        Args:
            app: Application instance handed to ``BaseHTTPMiddleware``
            setup_service: Setup service instance (optional, will create if not provided)
        """
        super().__init__(app)
        # Compare against None so that an injected service which happens to be
        # falsy is still honoured instead of being silently replaced.
        self.setup_service = setup_service if setup_service is not None else SetupService()

        # Paths that bypass flow enforcement
        self.bypass_paths = {
            "/static",        # Static files
            "/favicon.ico",   # Browser favicon requests
            "/robots.txt",    # Robots.txt
            "/health",        # Health check endpoints
            "/docs",          # OpenAPI documentation
            "/redoc",         # ReDoc documentation
            "/openapi.json",  # OpenAPI spec
        }

        # API paths that bypass flow enforcement but may require auth
        self.api_paths = {
            "/api",
            "/auth",
        }

        # Pages that are part of the flow and should be accessible
        self.flow_pages = {
            "/setup",
            "/login",
            "/app",
        }

    async def dispatch(self, request: Request, call_next):
        """
        Process the request and enforce application flow.

        Args:
            request: Incoming HTTP request
            call_next: Next middleware/handler in chain

        Returns:
            Response: Either a redirect response or the result of call_next
        """
        # Only the flow decision is guarded. ``call_next`` is deliberately kept
        # outside the ``try`` so that a failure in the downstream handler
        # propagates normally and the handler is never invoked a second time.
        try:
            redirect_response = await self._resolve_redirect(request)
        except Exception as exc:
            logger.exception("Error in ApplicationFlowMiddleware: %s", exc)
            # In case of error, allow the request to continue
            redirect_response = None

        if redirect_response is not None:
            return redirect_response

        return await call_next(request)

    async def _resolve_redirect(self, request: Request) -> Optional[RedirectResponse]:
        """Return the redirect required for ``request``, or None to let it through."""
        path = request.url.path

        # Skip flow enforcement for certain paths
        if self._should_bypass_flow(path):
            return None

        setup_complete = self.setup_service.is_setup_complete()
        is_authenticated = await self._is_user_authenticated(request)

        redirect_response = self._determine_redirect(path, setup_complete, is_authenticated)
        if redirect_response is not None:
            logger.info(
                "Redirecting %s to %s",
                path,
                redirect_response.headers.get("location"),
            )
        return redirect_response

    @staticmethod
    def _path_matches(path: str, prefix: str) -> bool:
        """Return True if ``path`` equals ``prefix`` or lies beneath it as a sub-path."""
        base = prefix.rstrip("/")
        # Match on a path-segment boundary so that "/api" covers "/api" and
        # "/api/users" but not an unrelated path such as "/apiary".
        return path == base or path.startswith(base + "/")

    def _should_bypass_flow(self, path: str) -> bool:
        """
        Check if the given path should bypass flow enforcement.

        Args:
            path: Request path

        Returns:
            bool: True if the path equals, or is nested under, one of the
            entries in ``bypass_paths`` or ``api_paths``
        """
        if any(self._path_matches(path, prefix) for prefix in self.bypass_paths):
            return True

        # API paths bypass flow enforcement (but may have their own auth)
        return any(self._path_matches(path, prefix) for prefix in self.api_paths)

    def _check_authentication(self, request: Request) -> bool:
        """
        Synchronously validate the bearer token carried by ``request``.

        Args:
            request: HTTP request object

        Returns:
            bool: True if ``verify_jwt_token`` returns a non-None payload for
            the token, False in every other case (including any error)
        """
        try:
            auth_header = request.headers.get("authorization")
            if not auth_header:
                return False

            # The auth scheme is compared case-insensitively, and everything
            # after the first space is the credential.
            scheme, _, token = auth_header.partition(" ")
            token = token.strip()
            if scheme.lower() != "bearer" or not token:
                return False

            # Imported here rather than at module level to avoid circular imports
            try:
                from ..fastapi_app import verify_jwt_token
            except ImportError:
                logger.warning("Could not import JWT verification function")
                return False

            return verify_jwt_token(token) is not None
        except Exception as exc:
            logger.error("Error checking authentication: %s", exc)
            return False

    async def _is_user_authenticated(self, request: Request) -> bool:
        """
        Check if the user is authenticated by validating JWT token.

        Args:
            request: HTTP request object

        Returns:
            bool: True if user is authenticated, False otherwise
        """
        return self._check_authentication(request)

    def _determine_redirect(self, path: str, setup_complete: bool, is_authenticated: bool) -> Optional[RedirectResponse]:
        """
        Determine if a redirect is needed based on current state.

        Args:
            path: Current request path
            setup_complete: Whether application setup is complete
            is_authenticated: Whether user is authenticated

        Returns:
            Optional[RedirectResponse]: Redirect response if needed, None otherwise
        """
        # Setup not complete: only the setup page itself is reachable
        if not setup_complete:
            if path == "/setup":
                return None
            return RedirectResponse(url="/setup", status_code=302)

        # Authenticated with setup complete: move off setup/login/root to the app
        if is_authenticated:
            if path in ("/setup", "/login", "/"):
                return RedirectResponse(url="/app", status_code=302)
            return None

        # Unauthenticated: the login page is reachable; other flow pages and
        # the root are sent to login; any remaining path is left alone
        if path == "/login":
            return None
        if path in self.flow_pages or path == "/":
            return RedirectResponse(url="/login", status_code=302)

        return None

    def get_flow_status(self, request: Request) -> dict:
        """
        Get current flow status for debugging/monitoring.

        Args:
            request: HTTP request object

        Returns:
            dict: ``setup_complete``, ``authenticated``, ``path`` and
            ``should_bypass`` on success; ``error`` and ``path`` on failure
        """
        try:
            setup_complete = self.setup_service.is_setup_complete()
            # Use the synchronous check: this method is not a coroutine, so
            # calling the async variant here would yield an un-awaited
            # coroutine object rather than a bool.
            is_authenticated = self._check_authentication(request)

            return {
                "setup_complete": setup_complete,
                "authenticated": is_authenticated,
                "path": request.url.path,
                "should_bypass": self._should_bypass_flow(request.url.path),
            }
        except Exception as exc:
            return {
                "error": str(exc),
                "path": request.url.path,
            }
|
|
|
|
|
|
def create_application_flow_middleware(setup_service: Optional[SetupService] = None) -> ApplicationFlowMiddleware:
    """
    Build an ``ApplicationFlowMiddleware`` around an optional setup service.

    Args:
        setup_service: Setup service instance to hand to the middleware (optional)

    Returns:
        ApplicationFlowMiddleware: Middleware instance constructed with ``app=None``
    """
    # NOTE(review): the instance is created without an application (app=None);
    # confirm how callers attach it to a real app.
    middleware = ApplicationFlowMiddleware(app=None, setup_service=setup_service)
    return middleware