Files
Aniworld/src/infrastructure/security/config_encryption.py

275 lines
8.0 KiB
Python

"""Configuration encryption utilities.
This module provides encryption/decryption for sensitive configuration
values such as passwords, API keys, and tokens.
"""
import base64
import logging
import os
from pathlib import Path
from typing import Any, Dict, Optional
from cryptography.fernet import Fernet
logger = logging.getLogger(__name__)
class ConfigEncryption:
    """Encrypt and decrypt sensitive configuration values with Fernet.

    The symmetric key is stored in ``key_file``. It is generated on
    construction when the file does not exist, and the cipher built from
    it is created lazily and cached on the instance.
    """

    # Substrings that mark a configuration field name as sensitive.
    # Matching is a case-insensitive substring test, so "key" also covers
    # names such as "api_key" or "ssh_key_path".
    _SENSITIVE_FIELDS = frozenset({
        'password',
        'passwd',
        'secret',
        'key',
        'token',
        'api_key',
        'apikey',
        'auth_token',
        'jwt_secret',
        'master_password',
    })

    def __init__(self, key_file: Optional[Path] = None):
        """Initialize the configuration encryption.

        Args:
            key_file: Path to store encryption key. Defaults to
                ``data/encryption.key`` under the directory four levels
                above this module.
        """
        if key_file is None:
            project_root = Path(__file__).parent.parent.parent.parent
            key_file = project_root / "data" / "encryption.key"

        self.key_file = Path(key_file)
        self._cipher: Optional[Fernet] = None
        self._ensure_key_exists()

    def _ensure_key_exists(self) -> None:
        """Create the key file if it is missing; otherwise leave it as is."""
        if not self.key_file.exists():
            logger.info("Creating new encryption key at %s", self.key_file)
            self._generate_new_key()
        else:
            logger.info("Using existing encryption key from %s", self.key_file)

    def _generate_new_key(self) -> None:
        """Generate a new Fernet key and store it in ``self.key_file``.

        Any existing file at that path is overwritten.

        Raises:
            IOError: If the directory or key file cannot be written. The
                error is logged and re-raised.
        """
        try:
            self.key_file.parent.mkdir(parents=True, exist_ok=True)

            # Generate a secure random key
            key = Fernet.generate_key()

            # Create the file with owner read/write only from the very
            # start, so the key is never written to a file that others
            # can read (write-then-chmod leaves such a window).
            flags = (
                os.O_WRONLY | os.O_CREAT | os.O_TRUNC
                | getattr(os, "O_BINARY", 0)
            )
            fd = os.open(self.key_file, flags, 0o600)
            with os.fdopen(fd, "wb") as key_handle:
                # A file that already existed keeps its old mode, so
                # tighten it before any key material is written.
                os.chmod(self.key_file, 0o600)
                key_handle.write(key)

            logger.info("Generated new encryption key")
        except IOError as e:
            logger.error("Failed to generate encryption key: %s", e)
            raise

    def _load_key(self) -> bytes:
        """Load encryption key from file.

        Returns:
            Encryption key bytes

        Raises:
            FileNotFoundError: If key file doesn't exist
            IOError: If the file exists but cannot be read (logged and
                re-raised)
        """
        if not self.key_file.exists():
            raise FileNotFoundError(
                f"Encryption key not found: {self.key_file}"
            )

        try:
            return self.key_file.read_bytes()
        except IOError as e:
            logger.error("Failed to load encryption key: %s", e)
            raise

    def _get_cipher(self) -> "Fernet":
        """Get or create Fernet cipher instance.

        The cipher is built from the key file on first use and cached
        until ``rotate_key`` resets it.

        Returns:
            Fernet cipher instance
        """
        if self._cipher is None:
            key = self._load_key()
            self._cipher = Fernet(key)
        return self._cipher

    def encrypt_value(self, value: str) -> str:
        """Encrypt a configuration value.

        Args:
            value: Plain text value to encrypt

        Returns:
            Base64-encoded encrypted value (the Fernet token, base64
            encoded once more for storage)

        Raises:
            ValueError: If value is empty
            Exception: Any error from loading the key or encrypting is
                logged and re-raised unchanged
        """
        if not value:
            raise ValueError("Cannot encrypt empty value")

        try:
            cipher = self._get_cipher()
            encrypted_bytes = cipher.encrypt(value.encode('utf-8'))

            # Return as base64 string for easy storage. This extra layer
            # must stay: decrypt_value() and already-stored values
            # depend on it.
            encrypted_str = base64.b64encode(encrypted_bytes).decode('utf-8')

            logger.debug("Encrypted configuration value")
            return encrypted_str
        except Exception as e:
            logger.error("Failed to encrypt value: %s", e)
            raise

    def decrypt_value(self, encrypted_value: str) -> str:
        """Decrypt a configuration value.

        Args:
            encrypted_value: Base64-encoded encrypted value as produced
                by ``encrypt_value``

        Returns:
            Decrypted plain text value

        Raises:
            ValueError: If encrypted value is empty
            Exception: Errors from loading the key, base64 decoding or
                Fernet decryption (e.g. ``cryptography.fernet.InvalidToken``
                for a wrong key or tampered token) are logged and
                re-raised unchanged
        """
        if not encrypted_value:
            raise ValueError("Cannot decrypt empty value")

        try:
            cipher = self._get_cipher()

            # Undo the outer base64 layer added by encrypt_value()
            encrypted_bytes = base64.b64decode(encrypted_value.encode('utf-8'))

            # Decrypt
            decrypted_bytes = cipher.decrypt(encrypted_bytes)
            decrypted_str = decrypted_bytes.decode('utf-8')

            logger.debug("Decrypted configuration value")
            return decrypted_str
        except Exception as e:
            logger.error("Failed to decrypt value: %s", e)
            raise

    @classmethod
    def _is_sensitive_field(cls, name: Any) -> bool:
        """Return True if a field name contains a sensitive marker."""
        # Non-string keys (e.g. ints from a YAML mapping) have no name to
        # inspect; treat them as not sensitive instead of crashing.
        if not isinstance(name, str):
            return False
        lowered = name.lower()
        return any(marker in lowered for marker in cls._SENSITIVE_FIELDS)

    def encrypt_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """Encrypt sensitive fields in configuration dictionary.

        Only top-level entries are examined. An entry is encrypted when
        its key contains one of the sensitive markers and its value is a
        non-empty string; it is then replaced by
        ``{'encrypted': True, 'value': <ciphertext>}``. Everything else
        is copied through unchanged.

        Args:
            config: Configuration dictionary

        Returns:
            New dictionary with encrypted sensitive fields; ``config``
            itself is not modified
        """
        encrypted_config: Dict[str, Any] = {}

        for key, value in config.items():
            needs_encryption = (
                self._is_sensitive_field(key)
                and isinstance(value, str)
                and bool(value)
            )
            if not needs_encryption:
                encrypted_config[key] = value
                continue

            try:
                encrypted_config[key] = {
                    'encrypted': True,
                    'value': self.encrypt_value(value)
                }
                logger.debug("Encrypted config field: %s", key)
            except Exception as e:
                # Best-effort by design: a failure keeps the config
                # usable. NOTE(review): the value is then stored in
                # plain text; only the warning below records that.
                logger.warning("Failed to encrypt %s: %s", key, e)
                encrypted_config[key] = value

        return encrypted_config

    def decrypt_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """Decrypt sensitive fields in configuration dictionary.

        A top-level value is treated as encrypted when it is a dict with
        ``'encrypted'`` set to ``True`` and a ``'value'`` entry. Other
        values are copied through unchanged.

        Args:
            config: Configuration dictionary with encrypted fields

        Returns:
            New dictionary with decrypted values. A field that fails to
            decrypt is set to ``None`` and the error is logged.
        """
        decrypted_config: Dict[str, Any] = {}

        for key, value in config.items():
            # Check if this is an encrypted field
            if (
                isinstance(value, dict) and
                value.get('encrypted') is True and
                'value' in value
            ):
                try:
                    decrypted_config[key] = self.decrypt_value(
                        value['value']
                    )
                    logger.debug("Decrypted config field: %s", key)
                except Exception as e:
                    logger.error("Failed to decrypt %s: %s", key, e)
                    decrypted_config[key] = None
            else:
                decrypted_config[key] = value

        return decrypted_config

    def rotate_key(self, new_key_file: Optional[Path] = None) -> None:
        """Rotate encryption key.

        **Warning**: Values encrypted with the old key can no longer be
        decrypted by this instance afterwards. The old key file is moved
        to a ``.key.bak`` backup next to it so data can be re-encrypted.

        Args:
            new_key_file: Path for new key file (optional). When omitted
                the new key is written to the current key file path.
        """
        logger.warning(
            "Rotating encryption key - all encrypted data will "
            "need re-encryption"
        )

        # Backup old key if it exists
        if self.key_file.exists():
            backup_path = self.key_file.with_suffix('.key.bak')
            # replace() overwrites a backup left by an earlier rotation
            # on every platform; rename() fails on Windows in that case.
            self.key_file.replace(backup_path)
            logger.info("Backed up old key to %s", backup_path)

        # Generate new key
        if new_key_file:
            # Coerce like __init__ does so a plain string path works too.
            self.key_file = Path(new_key_file)

        self._generate_new_key()
        self._cipher = None  # Reset cipher to use new key
# Process-wide singleton, created lazily by get_config_encryption()
_config_encryption: Optional[ConfigEncryption] = None


def get_config_encryption() -> ConfigEncryption:
    """Return the shared ConfigEncryption, creating it on first call.

    The instance is built with the default key file location and is
    reused by every later call.

    Returns:
        ConfigEncryption instance
    """
    global _config_encryption

    existing = _config_encryption
    if existing is not None:
        return existing

    created = ConfigEncryption()
    _config_encryption = created
    return created