cleanup
This commit is contained in:
274
src/infrastructure/security/config_encryption.py
Normal file
274
src/infrastructure/security/config_encryption.py
Normal file
@@ -0,0 +1,274 @@
|
||||
"""Configuration encryption utilities.
|
||||
|
||||
This module provides encryption/decryption for sensitive configuration
|
||||
values such as passwords, API keys, and tokens.
|
||||
"""
|
||||
|
||||
import base64
|
||||
import logging
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
from cryptography.fernet import Fernet
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ConfigEncryption:
|
||||
"""Handles encryption/decryption of sensitive configuration values."""
|
||||
|
||||
def __init__(self, key_file: Optional[Path] = None):
|
||||
"""Initialize the configuration encryption.
|
||||
|
||||
Args:
|
||||
key_file: Path to store encryption key.
|
||||
Defaults to data/encryption.key
|
||||
"""
|
||||
if key_file is None:
|
||||
project_root = Path(__file__).parent.parent.parent.parent
|
||||
key_file = project_root / "data" / "encryption.key"
|
||||
|
||||
self.key_file = Path(key_file)
|
||||
self._cipher: Optional[Fernet] = None
|
||||
self._ensure_key_exists()
|
||||
|
||||
def _ensure_key_exists(self) -> None:
|
||||
"""Ensure encryption key exists or create one."""
|
||||
if not self.key_file.exists():
|
||||
logger.info(f"Creating new encryption key at {self.key_file}")
|
||||
self._generate_new_key()
|
||||
else:
|
||||
logger.info(f"Using existing encryption key from {self.key_file}")
|
||||
|
||||
def _generate_new_key(self) -> None:
|
||||
"""Generate and store a new encryption key."""
|
||||
try:
|
||||
self.key_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Generate a secure random key
|
||||
key = Fernet.generate_key()
|
||||
|
||||
# Write key with restrictive permissions (owner read/write only)
|
||||
self.key_file.write_bytes(key)
|
||||
os.chmod(self.key_file, 0o600)
|
||||
|
||||
logger.info("Generated new encryption key")
|
||||
|
||||
except IOError as e:
|
||||
logger.error(f"Failed to generate encryption key: {e}")
|
||||
raise
|
||||
|
||||
def _load_key(self) -> bytes:
|
||||
"""Load encryption key from file.
|
||||
|
||||
Returns:
|
||||
Encryption key bytes
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If key file doesn't exist
|
||||
"""
|
||||
if not self.key_file.exists():
|
||||
raise FileNotFoundError(
|
||||
f"Encryption key not found: {self.key_file}"
|
||||
)
|
||||
|
||||
try:
|
||||
key = self.key_file.read_bytes()
|
||||
return key
|
||||
except IOError as e:
|
||||
logger.error(f"Failed to load encryption key: {e}")
|
||||
raise
|
||||
|
||||
def _get_cipher(self) -> Fernet:
|
||||
"""Get or create Fernet cipher instance.
|
||||
|
||||
Returns:
|
||||
Fernet cipher instance
|
||||
"""
|
||||
if self._cipher is None:
|
||||
key = self._load_key()
|
||||
self._cipher = Fernet(key)
|
||||
return self._cipher
|
||||
|
||||
def encrypt_value(self, value: str) -> str:
|
||||
"""Encrypt a configuration value.
|
||||
|
||||
Args:
|
||||
value: Plain text value to encrypt
|
||||
|
||||
Returns:
|
||||
Base64-encoded encrypted value
|
||||
|
||||
Raises:
|
||||
ValueError: If value is empty
|
||||
"""
|
||||
if not value:
|
||||
raise ValueError("Cannot encrypt empty value")
|
||||
|
||||
try:
|
||||
cipher = self._get_cipher()
|
||||
encrypted_bytes = cipher.encrypt(value.encode('utf-8'))
|
||||
|
||||
# Return as base64 string for easy storage
|
||||
encrypted_str = base64.b64encode(encrypted_bytes).decode('utf-8')
|
||||
|
||||
logger.debug("Encrypted configuration value")
|
||||
return encrypted_str
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to encrypt value: {e}")
|
||||
raise
|
||||
|
||||
def decrypt_value(self, encrypted_value: str) -> str:
|
||||
"""Decrypt a configuration value.
|
||||
|
||||
Args:
|
||||
encrypted_value: Base64-encoded encrypted value
|
||||
|
||||
Returns:
|
||||
Decrypted plain text value
|
||||
|
||||
Raises:
|
||||
ValueError: If encrypted value is invalid
|
||||
"""
|
||||
if not encrypted_value:
|
||||
raise ValueError("Cannot decrypt empty value")
|
||||
|
||||
try:
|
||||
cipher = self._get_cipher()
|
||||
|
||||
# Decode from base64
|
||||
encrypted_bytes = base64.b64decode(encrypted_value.encode('utf-8'))
|
||||
|
||||
# Decrypt
|
||||
decrypted_bytes = cipher.decrypt(encrypted_bytes)
|
||||
decrypted_str = decrypted_bytes.decode('utf-8')
|
||||
|
||||
logger.debug("Decrypted configuration value")
|
||||
return decrypted_str
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to decrypt value: {e}")
|
||||
raise
|
||||
|
||||
def encrypt_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Encrypt sensitive fields in configuration dictionary.
|
||||
|
||||
Args:
|
||||
config: Configuration dictionary
|
||||
|
||||
Returns:
|
||||
Dictionary with encrypted sensitive fields
|
||||
"""
|
||||
# List of sensitive field names to encrypt
|
||||
sensitive_fields = {
|
||||
'password',
|
||||
'passwd',
|
||||
'secret',
|
||||
'key',
|
||||
'token',
|
||||
'api_key',
|
||||
'apikey',
|
||||
'auth_token',
|
||||
'jwt_secret',
|
||||
'master_password',
|
||||
}
|
||||
|
||||
encrypted_config = {}
|
||||
|
||||
for key, value in config.items():
|
||||
key_lower = key.lower()
|
||||
|
||||
# Check if field name suggests sensitive data
|
||||
is_sensitive = any(
|
||||
field in key_lower for field in sensitive_fields
|
||||
)
|
||||
|
||||
if is_sensitive and isinstance(value, str) and value:
|
||||
try:
|
||||
encrypted_config[key] = {
|
||||
'encrypted': True,
|
||||
'value': self.encrypt_value(value)
|
||||
}
|
||||
logger.debug(f"Encrypted config field: {key}")
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to encrypt {key}: {e}")
|
||||
encrypted_config[key] = value
|
||||
else:
|
||||
encrypted_config[key] = value
|
||||
|
||||
return encrypted_config
|
||||
|
||||
def decrypt_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Decrypt sensitive fields in configuration dictionary.
|
||||
|
||||
Args:
|
||||
config: Configuration dictionary with encrypted fields
|
||||
|
||||
Returns:
|
||||
Dictionary with decrypted values
|
||||
"""
|
||||
decrypted_config = {}
|
||||
|
||||
for key, value in config.items():
|
||||
# Check if this is an encrypted field
|
||||
if (
|
||||
isinstance(value, dict) and
|
||||
value.get('encrypted') is True and
|
||||
'value' in value
|
||||
):
|
||||
try:
|
||||
decrypted_config[key] = self.decrypt_value(
|
||||
value['value']
|
||||
)
|
||||
logger.debug(f"Decrypted config field: {key}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to decrypt {key}: {e}")
|
||||
decrypted_config[key] = None
|
||||
else:
|
||||
decrypted_config[key] = value
|
||||
|
||||
return decrypted_config
|
||||
|
||||
def rotate_key(self, new_key_file: Optional[Path] = None) -> None:
|
||||
"""Rotate encryption key.
|
||||
|
||||
**Warning**: This will invalidate all previously encrypted data.
|
||||
|
||||
Args:
|
||||
new_key_file: Path for new key file (optional)
|
||||
"""
|
||||
logger.warning(
|
||||
"Rotating encryption key - all encrypted data will "
|
||||
"need re-encryption"
|
||||
)
|
||||
|
||||
# Backup old key if it exists
|
||||
if self.key_file.exists():
|
||||
backup_path = self.key_file.with_suffix('.key.bak')
|
||||
self.key_file.rename(backup_path)
|
||||
logger.info(f"Backed up old key to {backup_path}")
|
||||
|
||||
# Generate new key
|
||||
if new_key_file:
|
||||
self.key_file = new_key_file
|
||||
|
||||
self._generate_new_key()
|
||||
self._cipher = None # Reset cipher to use new key
|
||||
|
||||
|
||||
# Global instance
|
||||
_config_encryption: Optional[ConfigEncryption] = None
|
||||
|
||||
|
||||
def get_config_encryption() -> ConfigEncryption:
|
||||
"""Get the global configuration encryption instance.
|
||||
|
||||
Returns:
|
||||
ConfigEncryption instance
|
||||
"""
|
||||
global _config_encryption
|
||||
if _config_encryption is None:
|
||||
_config_encryption = ConfigEncryption()
|
||||
return _config_encryption
|
||||
Reference in New Issue
Block a user