Implement application setup and flow middleware
- Add SetupService for detecting application setup completion - Create ApplicationFlowMiddleware to enforce setup auth main flow - Add setup processing endpoints (/api/auth/setup, /api/auth/setup/status) - Add Pydantic models for setup requests and responses - Integrate middleware into FastAPI application - Fix logging paths to use ./logs consistently - All existing templates (setup.html, login.html) already working
This commit is contained in:
268
src/server/services/setup_service.py
Normal file
268
src/server/services/setup_service.py
Normal file
@@ -0,0 +1,268 @@
|
||||
"""
|
||||
Setup service for detecting and managing application setup state.
|
||||
|
||||
This service determines if the application is properly configured and set up,
|
||||
following the application flow pattern: setup → auth → main application.
|
||||
"""
|
||||
|
||||
import json
|
||||
import sqlite3
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Dict, Any, Optional, List
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SetupService:
|
||||
"""Service for managing application setup detection and configuration."""
|
||||
|
||||
def __init__(self, config_path: str = "data/config.json", db_path: str = "data/aniworld.db"):
|
||||
"""Initialize the setup service with configuration and database paths."""
|
||||
self.config_path = Path(config_path)
|
||||
self.db_path = Path(db_path)
|
||||
self._config_cache: Optional[Dict[str, Any]] = None
|
||||
|
||||
def is_setup_complete(self) -> bool:
|
||||
"""
|
||||
Check if the application setup is complete.
|
||||
|
||||
Setup is considered complete if:
|
||||
1. Configuration file exists and is valid
|
||||
2. Database exists and is accessible
|
||||
3. Master password is configured
|
||||
4. Setup completion flag is set (if present)
|
||||
|
||||
Returns:
|
||||
bool: True if setup is complete, False otherwise
|
||||
"""
|
||||
try:
|
||||
# Check if configuration file exists and is valid
|
||||
if not self._is_config_valid():
|
||||
logger.info("Setup incomplete: Configuration file is missing or invalid")
|
||||
return False
|
||||
|
||||
# Check if database exists and is accessible
|
||||
if not self._is_database_accessible():
|
||||
logger.info("Setup incomplete: Database is not accessible")
|
||||
return False
|
||||
|
||||
# Check if master password is configured
|
||||
if not self._is_master_password_configured():
|
||||
logger.info("Setup incomplete: Master password is not configured")
|
||||
return False
|
||||
|
||||
# Check for explicit setup completion flag
|
||||
config = self.get_config()
|
||||
if config and config.get("setup", {}).get("completed") is False:
|
||||
logger.info("Setup incomplete: Setup completion flag is False")
|
||||
return False
|
||||
|
||||
logger.debug("Setup validation complete: All checks passed")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error checking setup completion: {e}")
|
||||
return False
|
||||
|
||||
def _is_config_valid(self) -> bool:
|
||||
"""Check if the configuration file exists and contains valid JSON."""
|
||||
try:
|
||||
if not self.config_path.exists():
|
||||
return False
|
||||
|
||||
config = self.get_config()
|
||||
return config is not None and isinstance(config, dict)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Configuration validation error: {e}")
|
||||
return False
|
||||
|
||||
def _is_database_accessible(self) -> bool:
|
||||
"""Check if the database exists and is accessible."""
|
||||
try:
|
||||
if not self.db_path.exists():
|
||||
return False
|
||||
|
||||
# Try to connect and perform a simple query
|
||||
with sqlite3.connect(str(self.db_path)) as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' LIMIT 1")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Database accessibility check failed: {e}")
|
||||
return False
|
||||
|
||||
def _is_master_password_configured(self) -> bool:
|
||||
"""Check if master password is properly configured."""
|
||||
try:
|
||||
config = self.get_config()
|
||||
if not config:
|
||||
return False
|
||||
|
||||
security_config = config.get("security", {})
|
||||
|
||||
# Check if password hash exists
|
||||
password_hash = security_config.get("master_password_hash")
|
||||
salt = security_config.get("salt")
|
||||
|
||||
return bool(password_hash and salt and len(password_hash) > 0 and len(salt) > 0)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Master password configuration check failed: {e}")
|
||||
return False
|
||||
|
||||
def get_config(self, force_reload: bool = False) -> Optional[Dict[str, Any]]:
|
||||
"""
|
||||
Get the configuration data from the config file.
|
||||
|
||||
Args:
|
||||
force_reload: If True, reload config from file even if cached
|
||||
|
||||
Returns:
|
||||
dict: Configuration data or None if not accessible
|
||||
"""
|
||||
try:
|
||||
if self._config_cache is None or force_reload:
|
||||
if not self.config_path.exists():
|
||||
return None
|
||||
|
||||
with open(self.config_path, 'r', encoding='utf-8') as f:
|
||||
self._config_cache = json.load(f)
|
||||
|
||||
return self._config_cache
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error loading configuration: {e}")
|
||||
return None
|
||||
|
||||
def mark_setup_complete(self, config_updates: Optional[Dict[str, Any]] = None) -> bool:
|
||||
"""
|
||||
Mark the setup as completed and optionally update configuration.
|
||||
|
||||
Args:
|
||||
config_updates: Additional configuration updates to apply
|
||||
|
||||
Returns:
|
||||
bool: True if successful, False otherwise
|
||||
"""
|
||||
try:
|
||||
config = self.get_config() or {}
|
||||
|
||||
# Update configuration with any provided updates
|
||||
if config_updates:
|
||||
config.update(config_updates)
|
||||
|
||||
# Set setup completion flag
|
||||
if "setup" not in config:
|
||||
config["setup"] = {}
|
||||
config["setup"]["completed"] = True
|
||||
config["setup"]["completed_at"] = str(datetime.utcnow())
|
||||
|
||||
# Save updated configuration
|
||||
return self._save_config(config)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error marking setup as complete: {e}")
|
||||
return False
|
||||
|
||||
def reset_setup(self) -> bool:
|
||||
"""
|
||||
Reset the setup completion status (for development/testing).
|
||||
|
||||
Returns:
|
||||
bool: True if successful, False otherwise
|
||||
"""
|
||||
try:
|
||||
config = self.get_config()
|
||||
if not config:
|
||||
return False
|
||||
|
||||
# Remove or set setup completion flag to false
|
||||
if "setup" in config:
|
||||
config["setup"]["completed"] = False
|
||||
|
||||
return self._save_config(config)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error resetting setup: {e}")
|
||||
return False
|
||||
|
||||
def _save_config(self, config: Dict[str, Any]) -> bool:
|
||||
"""Save configuration to file."""
|
||||
try:
|
||||
# Ensure directory exists
|
||||
self.config_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Save configuration
|
||||
with open(self.config_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(config, f, indent=4, ensure_ascii=False)
|
||||
|
||||
# Clear cache to force reload on next access
|
||||
self._config_cache = None
|
||||
|
||||
logger.info(f"Configuration saved to {self.config_path}")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error saving configuration: {e}")
|
||||
return False
|
||||
|
||||
def get_setup_requirements(self) -> Dict[str, bool]:
|
||||
"""
|
||||
Get detailed breakdown of setup requirements and their status.
|
||||
|
||||
Returns:
|
||||
dict: Dictionary with requirement names and their completion status
|
||||
"""
|
||||
config = self.get_config()
|
||||
return {
|
||||
"config_file_exists": self.config_path.exists(),
|
||||
"config_file_valid": self._is_config_valid(),
|
||||
"database_exists": self.db_path.exists(),
|
||||
"database_accessible": self._is_database_accessible(),
|
||||
"master_password_configured": self._is_master_password_configured(),
|
||||
"setup_marked_complete": bool(config and config.get("setup", {}).get("completed", True))
|
||||
}
|
||||
|
||||
def get_missing_requirements(self) -> List[str]:
|
||||
"""
|
||||
Get list of missing setup requirements.
|
||||
|
||||
Returns:
|
||||
list: List of missing requirement descriptions
|
||||
"""
|
||||
requirements = self.get_setup_requirements()
|
||||
missing = []
|
||||
|
||||
if not requirements["config_file_exists"]:
|
||||
missing.append("Configuration file is missing")
|
||||
elif not requirements["config_file_valid"]:
|
||||
missing.append("Configuration file is invalid or corrupted")
|
||||
|
||||
if not requirements["database_exists"]:
|
||||
missing.append("Database file is missing")
|
||||
elif not requirements["database_accessible"]:
|
||||
missing.append("Database is not accessible or corrupted")
|
||||
|
||||
if not requirements["master_password_configured"]:
|
||||
missing.append("Master password is not configured")
|
||||
|
||||
if not requirements["setup_marked_complete"]:
|
||||
missing.append("Setup process was not completed")
|
||||
|
||||
return missing
|
||||
|
||||
|
||||
# Convenience functions for easy import
|
||||
def is_setup_complete() -> bool:
|
||||
"""Convenience function to check if setup is complete."""
|
||||
service = SetupService()
|
||||
return service.is_setup_complete()
|
||||
|
||||
|
||||
def get_setup_service() -> SetupService:
|
||||
"""Get a configured setup service instance."""
|
||||
return SetupService()
|
||||
Reference in New Issue
Block a user