240 lines
7.4 KiB
Python
240 lines
7.4 KiB
Python
"""File integrity verification utilities.

This module provides checksum calculation and verification for
downloaded files. SHA-256 is used by default for file integrity
validation; other algorithms available in hashlib can be requested.
"""
|
|
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import Dict, Optional
|
|
|
|
# Module-level logger named after this module; used by every function and
# method below for diagnostics.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class FileIntegrityManager:
    """Manage file integrity checksums and their verification.

    Checksums are held in memory as a ``{resolved_path: hex_digest}``
    mapping (``self.checksums``) that is loaded from a JSON file on
    construction and written back to it after every modification.
    """

    # Number of bytes read per iteration while hashing, so large files are
    # never loaded into memory at once.
    _CHUNK_SIZE = 64 * 1024

    def __init__(self, checksum_file: Optional[Path] = None):
        """Initialize the file integrity manager and load stored checksums.

        Args:
            checksum_file: Path of the JSON file used to store checksums.
                Defaults to ``data/checksums.json`` below the directory
                reached by taking ``.parent`` four times from this
                module's file path.
        """
        if checksum_file is None:
            project_root = Path(__file__).parent.parent.parent.parent
            checksum_file = project_root / "data" / "checksums.json"

        self.checksum_file = Path(checksum_file)
        self.checksums: Dict[str, str] = {}
        self._load_checksums()

    def _load_checksums(self) -> None:
        """Load checksums from ``self.checksum_file`` into ``self.checksums``.

        Loading is best-effort: a missing, unreadable or malformed file is
        logged and results in an empty mapping instead of an exception.
        Only a JSON object is accepted, and only its string values are
        kept, so later lookups and comparisons never see unexpected types.
        """
        self.checksums = {}

        if not self.checksum_file.exists():
            logger.info("Checksum file does not exist: %s", self.checksum_file)
            return

        try:
            with open(self.checksum_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except (ValueError, OSError) as e:
            # ValueError covers both json.JSONDecodeError and the
            # UnicodeDecodeError raised for files that are not valid UTF-8.
            logger.error("Failed to load checksums: %s", e)
            return

        if not isinstance(loaded, dict):
            # Valid JSON, but not the mapping this class relies on
            # (e.g. a list); treating it as a dict would fail later.
            logger.error(
                "Failed to load checksums: expected a JSON object in %s, "
                "got %s",
                self.checksum_file,
                type(loaded).__name__,
            )
            return

        self.checksums = {
            key: value for key, value in loaded.items()
            if isinstance(value, str)
        }
        skipped = len(loaded) - len(self.checksums)
        if skipped:
            logger.warning(
                "Ignored %d non-string checksum entries in %s",
                skipped,
                self.checksum_file,
            )

        logger.info(
            "Loaded %d checksums from %s",
            len(self.checksums),
            self.checksum_file,
        )

    def _save_checksums(self) -> None:
        """Write ``self.checksums`` to ``self.checksum_file`` as JSON.

        The parent directory is created if needed. Saving is best-effort:
        an ``OSError`` is logged and not re-raised.
        """
        try:
            self.checksum_file.parent.mkdir(parents=True, exist_ok=True)
            with open(self.checksum_file, 'w', encoding='utf-8') as f:
                json.dump(self.checksums, f, indent=2)
            logger.debug(
                "Saved %d checksums to %s",
                len(self.checksums),
                self.checksum_file,
            )
        except OSError as e:
            logger.error("Failed to save checksums: %s", e)

    def calculate_checksum(
        self, file_path: Path, algorithm: str = "sha256"
    ) -> str:
        """Calculate the checksum of a file.

        Args:
            file_path: Path to the file (a ``str`` is accepted as well).
            algorithm: Name of the hash algorithm (default: sha256). It
                must be listed in ``hashlib.algorithms_available`` and
                produce a fixed-length digest.

        Returns:
            Hexadecimal checksum string.

        Raises:
            FileNotFoundError: If the file doesn't exist.
            ValueError: If the algorithm is not supported.
            OSError: If the file cannot be read (logged, then re-raised).
        """
        file_path = Path(file_path)
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        if algorithm not in hashlib.algorithms_available:
            raise ValueError(f"Unsupported hash algorithm: {algorithm}")

        hash_obj = hashlib.new(algorithm)
        if hash_obj.digest_size == 0:
            # Variable-length digests (shake_128 / shake_256) report a
            # digest_size of 0 and need an explicit length for hexdigest();
            # reject them up front instead of failing with a TypeError.
            raise ValueError(f"Unsupported hash algorithm: {algorithm}")

        try:
            with open(file_path, 'rb') as f:
                # Read the file in chunks to handle large files.
                for chunk in iter(lambda: f.read(self._CHUNK_SIZE), b''):
                    hash_obj.update(chunk)
        except OSError as e:
            logger.error("Failed to read file %s: %s", file_path, e)
            raise

        checksum = hash_obj.hexdigest()
        logger.debug(
            "Calculated %s checksum for %s: %s",
            algorithm,
            file_path.name,
            checksum,
        )
        return checksum

    def store_checksum(
        self, file_path: Path, checksum: Optional[str] = None
    ) -> str:
        """Store the checksum of a file, calculating it when not supplied.

        Args:
            file_path: Path to the file (a ``str`` is accepted as well).
            checksum: Pre-calculated checksum (optional, will be calculated
                with the default algorithm if not provided).

        Returns:
            The stored checksum.

        Raises:
            FileNotFoundError: If the checksum has to be calculated and the
                file doesn't exist.
        """
        file_path = Path(file_path)
        if checksum is None:
            checksum = self.calculate_checksum(file_path)

        # Key on the resolved (absolute) path so that different spellings
        # of the same location map to a single entry.
        key = str(file_path.resolve())
        self.checksums[key] = checksum
        self._save_checksums()

        logger.info("Stored checksum for %s", file_path.name)
        return checksum

    def verify_checksum(
        self, file_path: Path, expected_checksum: Optional[str] = None
    ) -> bool:
        """Verify file integrity by comparing checksums.

        The file is hashed with the default algorithm and compared with the
        expected hex digest. The comparison ignores letter case and
        surrounding whitespace in the expected value.

        Args:
            file_path: Path to the file (a ``str`` is accepted as well).
            expected_checksum: Expected checksum (optional, the stored
                checksum for the file is looked up when omitted).

        Returns:
            True if the checksum matches. False if it does not match, if no
            expected checksum is available, or if the file cannot be read.

        Raises:
            FileNotFoundError: If the file doesn't exist.
        """
        file_path = Path(file_path)
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        # Get the expected checksum from storage if not provided.
        if expected_checksum is None:
            expected_checksum = self.checksums.get(str(file_path.resolve()))

        if expected_checksum is None:
            logger.warning("No stored checksum found for %s", file_path.name)
            return False

        try:
            current_checksum = self.calculate_checksum(file_path)
        except OSError as e:
            logger.error("Failed to verify checksum for %s: %s", file_path, e)
            return False

        # hexdigest() is lower-case; normalise the expected value so that
        # an upper-case digest is not reported as a mismatch.
        if current_checksum == expected_checksum.strip().lower():
            logger.info("Checksum verification passed for %s", file_path.name)
            return True

        logger.warning(
            "Checksum mismatch for %s: "
            "expected %s, got %s",
            file_path.name,
            expected_checksum,
            current_checksum,
        )
        return False

    def remove_checksum(self, file_path: Path) -> bool:
        """Remove the stored checksum of a file and persist the change.

        Args:
            file_path: Path to the file (a ``str`` is accepted as well).

        Returns:
            True if a checksum was removed, False if none was stored.
        """
        file_path = Path(file_path)
        key = str(file_path.resolve())

        if key not in self.checksums:
            logger.debug("No checksum found to remove for %s", file_path.name)
            return False

        del self.checksums[key]
        self._save_checksums()
        logger.info("Removed checksum for %s", file_path.name)
        return True

    def has_checksum(self, file_path: Path) -> bool:
        """Check whether a checksum is stored for a file.

        Args:
            file_path: Path to the file (a ``str`` is accepted as well).

        Returns:
            True if a checksum exists, False otherwise.
        """
        key = str(Path(file_path).resolve())
        return key in self.checksums
|
|
|
|
|
|
# Module-wide shared instance; stays None until the first call to
# get_integrity_manager().
_integrity_manager: Optional[FileIntegrityManager] = None


def get_integrity_manager() -> FileIntegrityManager:
    """Return the shared FileIntegrityManager, creating it on first use.

    The instance is built with the default constructor arguments and then
    cached in the module-level ``_integrity_manager`` variable, so every
    later call returns the same object.

    Returns:
        FileIntegrityManager instance
    """
    global _integrity_manager
    if _integrity_manager is not None:
        return _integrity_manager

    _integrity_manager = FileIntegrityManager()
    return _integrity_manager
|