Files
Aniworld/src/infrastructure/security/file_integrity.py

240 lines
7.4 KiB
Python

"""File integrity verification utilities.
This module provides checksum calculation and verification for
downloaded files. Supports SHA256 hashing for file integrity validation.
"""
import hashlib
import json
import logging
from pathlib import Path
from typing import Dict, Optional
logger = logging.getLogger(__name__)


class FileIntegrityManager:
    """Manages file integrity checksums and verification.

    Checksums are held in memory in ``self.checksums`` (a mapping of
    resolved absolute path string -> hexadecimal digest) and are written
    back to ``self.checksum_file`` as JSON after every change.
    """

    # Number of bytes read per iteration while hashing a file.
    _CHUNK_SIZE = 64 * 1024

    def __init__(self, checksum_file: Optional[Path] = None):
        """Initialize the file integrity manager.

        Args:
            checksum_file: Path to store checksums.
                Defaults to data/checksums.json, located four directory
                levels above this module.
        """
        if checksum_file is None:
            project_root = Path(__file__).parent.parent.parent.parent
            checksum_file = project_root / "data" / "checksums.json"

        self.checksum_file = Path(checksum_file)
        self.checksums: Dict[str, str] = {}
        self._load_checksums()

    @staticmethod
    def _key(file_path: Path) -> str:
        """Return the storage key for a file: its resolved absolute path."""
        return str(Path(file_path).resolve())

    def _load_checksums(self) -> None:
        """Load checksums from file.

        Any failure (missing file, unreadable file, invalid JSON, or JSON
        that is not an object) is logged and leaves ``self.checksums``
        empty; no exception is propagated.
        """
        self.checksums = {}

        if not self.checksum_file.exists():
            logger.info("Checksum file does not exist: %s", self.checksum_file)
            return

        try:
            with open(self.checksum_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except (ValueError, OSError) as e:
            # ValueError covers both json.JSONDecodeError and
            # UnicodeDecodeError (file is not valid UTF-8).
            logger.error("Failed to load checksums: %s", e)
            return

        # The rest of the class relies on dict semantics; a JSON list or
        # scalar here would break lookups and item assignment later on.
        if not isinstance(loaded, dict):
            logger.error(
                "Failed to load checksums: expected a JSON object in %s, "
                "got %s",
                self.checksum_file,
                type(loaded).__name__,
            )
            return

        # Keep only string digests so comparisons in verify_checksum are
        # always string-to-string.
        self.checksums = {
            key: value
            for key, value in loaded.items()
            if isinstance(value, str)
        }
        dropped = len(loaded) - len(self.checksums)
        if dropped:
            logger.warning(
                "Ignored %d non-string checksum entries in %s",
                dropped,
                self.checksum_file,
            )

        logger.info(
            "Loaded %d checksums from %s",
            len(self.checksums),
            self.checksum_file,
        )

    def _save_checksums(self) -> None:
        """Save checksums to file.

        The data is first written to a sibling ``<name>.tmp`` file and then
        renamed over the real file, so a failure part-way through the write
        does not leave a truncated checksum file behind. Errors are logged,
        not raised.
        """
        tmp_file = self.checksum_file.parent / (
            self.checksum_file.name + ".tmp"
        )
        try:
            self.checksum_file.parent.mkdir(parents=True, exist_ok=True)
            with open(tmp_file, 'w', encoding='utf-8') as f:
                json.dump(self.checksums, f, indent=2)
            tmp_file.replace(self.checksum_file)
        except OSError as e:
            logger.error("Failed to save checksums: %s", e)
            try:
                tmp_file.unlink()
            except OSError:
                # Best-effort cleanup: the temp file may never have been
                # created, and the original error is already logged.
                pass
            return

        logger.debug(
            "Saved %d checksums to %s",
            len(self.checksums),
            self.checksum_file,
        )

    def calculate_checksum(
        self, file_path: Path, algorithm: str = "sha256"
    ) -> str:
        """Calculate checksum for a file.

        Args:
            file_path: Path to the file (a plain string path also works)
            algorithm: Hash algorithm to use (default: sha256)

        Returns:
            Hexadecimal checksum string

        Raises:
            FileNotFoundError: If file doesn't exist
            ValueError: If algorithm is not supported, including
                variable-length digests such as shake_128 that cannot
                produce a digest without an explicit length
            OSError: If the file cannot be read
        """
        file_path = Path(file_path)
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        if algorithm not in hashlib.algorithms_available:
            raise ValueError(f"Unsupported hash algorithm: {algorithm}")

        try:
            hash_obj = hashlib.new(algorithm)
            # Probe before reading the file: variable-length digests raise
            # TypeError here because hexdigest() needs a length argument.
            hash_obj.hexdigest()
        except (TypeError, ValueError) as exc:
            raise ValueError(
                f"Unsupported hash algorithm: {algorithm}"
            ) from exc

        try:
            with open(file_path, 'rb') as f:
                # Read file in chunks to handle large files
                for chunk in iter(lambda: f.read(self._CHUNK_SIZE), b''):
                    hash_obj.update(chunk)
        except OSError as e:
            logger.error("Failed to read file %s: %s", file_path, e)
            raise

        checksum = hash_obj.hexdigest()
        logger.debug(
            "Calculated %s checksum for %s: %s",
            algorithm,
            file_path.name,
            checksum,
        )
        return checksum

    def store_checksum(
        self, file_path: Path, checksum: Optional[str] = None
    ) -> str:
        """Calculate and store checksum for a file.

        Args:
            file_path: Path to the file
            checksum: Pre-calculated checksum (optional, will calculate
                if not provided)

        Returns:
            The stored checksum

        Raises:
            FileNotFoundError: If checksum is not provided and the file
                doesn't exist
        """
        file_path = Path(file_path)
        if checksum is None:
            checksum = self.calculate_checksum(file_path)

        # The key is the resolved absolute path, so the same file is found
        # regardless of how the caller spelled the path.
        self.checksums[self._key(file_path)] = checksum
        self._save_checksums()

        logger.info("Stored checksum for %s", file_path.name)
        return checksum

    def verify_checksum(
        self, file_path: Path, expected_checksum: Optional[str] = None
    ) -> bool:
        """Verify file integrity by comparing checksums.

        The comparison ignores letter case and surrounding whitespace in
        the expected checksum.

        Args:
            file_path: Path to the file
            expected_checksum: Expected checksum (optional, will look up
                stored checksum)

        Returns:
            True if checksum matches, False otherwise (including when no
            stored checksum exists or the file cannot be read)

        Raises:
            FileNotFoundError: If file doesn't exist
        """
        file_path = Path(file_path)
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        # Get expected checksum from storage if not provided
        if expected_checksum is None:
            expected_checksum = self.checksums.get(self._key(file_path))
            if expected_checksum is None:
                logger.warning(
                    "No stored checksum found for %s", file_path.name
                )
                return False

        # Calculate current checksum
        try:
            current_checksum = self.calculate_checksum(file_path)
        except OSError as e:
            logger.error("Failed to verify checksum for %s: %s", file_path, e)
            return False

        # hexdigest() yields lower-case hex; normalize the expected value
        # so an upper-case checksum is not reported as a mismatch.
        if current_checksum == str(expected_checksum).strip().lower():
            logger.info(
                "Checksum verification passed for %s", file_path.name
            )
            return True

        logger.warning(
            "Checksum mismatch for %s: "
            "expected %s, got %s",
            file_path.name,
            expected_checksum,
            current_checksum
        )
        return False

    def remove_checksum(self, file_path: Path) -> bool:
        """Remove checksum for a file.

        Args:
            file_path: Path to the file

        Returns:
            True if checksum was removed, False if not found
        """
        file_path = Path(file_path)
        key = self._key(file_path)

        if key not in self.checksums:
            logger.debug("No checksum found to remove for %s", file_path.name)
            return False

        del self.checksums[key]
        self._save_checksums()
        logger.info("Removed checksum for %s", file_path.name)
        return True

    def has_checksum(self, file_path: Path) -> bool:
        """Check if a checksum exists for a file.

        Args:
            file_path: Path to the file

        Returns:
            True if checksum exists, False otherwise
        """
        return self._key(file_path) in self.checksums
# Process-wide manager, created on first request by get_integrity_manager().
_integrity_manager: Optional[FileIntegrityManager] = None


def get_integrity_manager() -> FileIntegrityManager:
    """Return the shared FileIntegrityManager, creating it on first use.

    The instance is built with the default constructor arguments and cached
    in the module-level ``_integrity_manager`` variable, so later calls
    return the same object.

    Returns:
        FileIntegrityManager instance
    """
    global _integrity_manager

    manager = _integrity_manager
    if manager is not None:
        return manager

    manager = FileIntegrityManager()
    _integrity_manager = manager
    return manager