"""File integrity verification utilities. This module provides checksum calculation and verification for downloaded files. Supports SHA256 hashing for file integrity validation. """ import hashlib import json import logging from pathlib import Path from typing import Dict, Optional logger = logging.getLogger(__name__) class FileIntegrityManager: """Manages file integrity checksums and verification.""" def __init__(self, checksum_file: Optional[Path] = None): """Initialize the file integrity manager. Args: checksum_file: Path to store checksums. Defaults to data/checksums.json """ if checksum_file is None: project_root = Path(__file__).parent.parent.parent.parent checksum_file = project_root / "data" / "checksums.json" self.checksum_file = Path(checksum_file) self.checksums: Dict[str, str] = {} self._load_checksums() def _load_checksums(self) -> None: """Load checksums from file.""" if self.checksum_file.exists(): try: with open(self.checksum_file, 'r', encoding='utf-8') as f: self.checksums = json.load(f) count = len(self.checksums) logger.info( "Loaded %d checksums from %s", count, self.checksum_file, ) except (json.JSONDecodeError, IOError) as e: logger.error("Failed to load checksums: %s", e) self.checksums = {} else: logger.info("Checksum file does not exist: %s", self.checksum_file) self.checksums = {} def _save_checksums(self) -> None: """Save checksums to file.""" try: self.checksum_file.parent.mkdir(parents=True, exist_ok=True) with open(self.checksum_file, 'w', encoding='utf-8') as f: json.dump(self.checksums, f, indent=2) count = len(self.checksums) logger.debug( "Saved %d checksums to %s", count, self.checksum_file, ) except IOError as e: logger.error("Failed to save checksums: %s", e) def calculate_checksum( self, file_path: Path, algorithm: str = "sha256" ) -> str: """Calculate checksum for a file. Args: file_path: Path to the file algorithm: Hash algorithm to use (default: sha256) Returns: Hexadecimal checksum string Raises: FileNotFoundError: If file doesn't exist ValueError: If algorithm is not supported """ if not file_path.exists(): raise FileNotFoundError(f"File not found: {file_path}") if algorithm not in hashlib.algorithms_available: raise ValueError(f"Unsupported hash algorithm: {algorithm}") hash_obj = hashlib.new(algorithm) try: with open(file_path, 'rb') as f: # Read file in chunks to handle large files for chunk in iter(lambda: f.read(8192), b''): hash_obj.update(chunk) checksum = hash_obj.hexdigest() filename = file_path.name logger.debug( "Calculated %s checksum for %s: %s", algorithm, filename, checksum, ) return checksum except IOError as e: logger.error("Failed to read file %s: %s", file_path, e) raise def store_checksum( self, file_path: Path, checksum: Optional[str] = None ) -> str: """Calculate and store checksum for a file. Args: file_path: Path to the file checksum: Pre-calculated checksum (optional, will calculate if not provided) Returns: The stored checksum Raises: FileNotFoundError: If file doesn't exist """ if checksum is None: checksum = self.calculate_checksum(file_path) # Use relative path as key for portability key = str(file_path.resolve()) self.checksums[key] = checksum self._save_checksums() logger.info("Stored checksum for %s", file_path.name) return checksum def verify_checksum( self, file_path: Path, expected_checksum: Optional[str] = None ) -> bool: """Verify file integrity by comparing checksums. Args: file_path: Path to the file expected_checksum: Expected checksum (optional, will look up stored checksum) Returns: True if checksum matches, False otherwise Raises: FileNotFoundError: If file doesn't exist """ if not file_path.exists(): raise FileNotFoundError(f"File not found: {file_path}") # Get expected checksum from storage if not provided if expected_checksum is None: key = str(file_path.resolve()) expected_checksum = self.checksums.get(key) if expected_checksum is None: filename = file_path.name logger.warning( "No stored checksum found for %s", filename ) return False # Calculate current checksum try: current_checksum = self.calculate_checksum(file_path) if current_checksum == expected_checksum: filename = file_path.name logger.info("Checksum verification passed for %s", filename) return True else: filename = file_path.name logger.warning( "Checksum mismatch for %s: " "expected %s, got %s", filename, expected_checksum, current_checksum ) return False except (IOError, OSError) as e: logger.error("Failed to verify checksum for %s: %s", file_path, e) return False def remove_checksum(self, file_path: Path) -> bool: """Remove checksum for a file. Args: file_path: Path to the file Returns: True if checksum was removed, False if not found """ key = str(file_path.resolve()) if key in self.checksums: del self.checksums[key] self._save_checksums() logger.info("Removed checksum for %s", file_path.name) return True else: logger.debug("No checksum found to remove for %s", file_path.name) return False def has_checksum(self, file_path: Path) -> bool: """Check if a checksum exists for a file. Args: file_path: Path to the file Returns: True if checksum exists, False otherwise """ key = str(file_path.resolve()) return key in self.checksums # Global instance _integrity_manager: Optional[FileIntegrityManager] = None def get_integrity_manager() -> FileIntegrityManager: """Get the global file integrity manager instance. Returns: FileIntegrityManager instance """ global _integrity_manager if _integrity_manager is None: _integrity_manager = FileIntegrityManager() return _integrity_manager