"""System utility functions for monitoring and management.""" import logging import os import shutil from dataclasses import dataclass from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional import psutil logger = logging.getLogger(__name__) @dataclass class DiskInfo: """Information about disk usage.""" total_bytes: int used_bytes: int free_bytes: int percent_used: float path: str @dataclass class ProcessInfo: """Information about a process.""" pid: int name: str status: str cpu_percent: float memory_percent: float memory_mb: float create_time: datetime class SystemUtilities: """Utilities for system monitoring and management.""" @staticmethod def get_disk_usage(path: str = "/") -> Optional[DiskInfo]: """Get disk usage information. Args: path: Path to check disk usage for. Returns: DiskInfo: Disk usage information. """ try: usage = psutil.disk_usage(path) return DiskInfo( total_bytes=usage.total, used_bytes=usage.used, free_bytes=usage.free, percent_used=usage.percent, path=path, ) except Exception as e: logger.error(f"Failed to get disk usage for {path}: {e}") return None @staticmethod def get_all_disk_usage() -> List[DiskInfo]: """Get disk usage for all mounted partitions. Returns: list: List of DiskInfo for each partition. """ try: partitions = psutil.disk_partitions() disk_infos = [] for partition in partitions: try: usage = psutil.disk_usage(partition.mountpoint) disk_infos.append( DiskInfo( total_bytes=usage.total, used_bytes=usage.used, free_bytes=usage.free, percent_used=usage.percent, path=partition.mountpoint, ) ) except Exception as e: logger.warning( f"Failed to get usage for {partition.mountpoint}: {e}" ) return disk_infos except Exception as e: logger.error(f"Failed to get all disk usage: {e}") return [] @staticmethod def cleanup_directory( directory: str, pattern: str = "*", max_age_days: int = 30 ) -> int: """Clean up files in a directory matching a pattern. Args: directory: Directory to clean. pattern: File pattern to match (glob). max_age_days: Only delete files older than this. Returns: int: Number of files deleted. """ try: from datetime import timedelta path = Path(directory) if not path.exists(): logger.warning(f"Directory not found: {directory}") return 0 deleted_count = 0 cutoff_time = datetime.now() - timedelta(days=max_age_days) for file_path in path.glob(pattern): if file_path.is_file(): file_time = datetime.fromtimestamp( file_path.stat().st_mtime ) if file_time < cutoff_time: try: file_path.unlink() deleted_count += 1 logger.debug(f"Deleted file: {file_path}") except Exception as e: logger.warning( f"Failed to delete {file_path}: {e}" ) logger.info(f"Cleaned up {deleted_count} files from {directory}") return deleted_count except Exception as e: logger.error(f"Failed to cleanup directory {directory}: {e}") return 0 @staticmethod def cleanup_empty_directories(directory: str) -> int: """Remove empty directories. Args: directory: Root directory to clean. Returns: int: Number of directories deleted. """ try: path = Path(directory) if not path.exists(): return 0 deleted_count = 0 # Walk from bottom to top to delete empty dirs for root, dirs, files in os.walk(directory, topdown=False): for dir_name in dirs: dir_path = Path(root) / dir_name try: if not os.listdir(dir_path): os.rmdir(dir_path) deleted_count += 1 logger.debug( f"Deleted empty directory: {dir_path}" ) except Exception as e: logger.debug(f"Cannot delete {dir_path}: {e}") logger.info(f"Cleaned up {deleted_count} empty directories") return deleted_count except Exception as e: logger.error(f"Failed to cleanup empty directories: {e}") return 0 @staticmethod def get_directory_size(directory: str) -> int: """Get total size of a directory. Args: directory: Directory path. Returns: int: Total size in bytes. """ try: path = Path(directory) if not path.exists(): return 0 total_size = 0 for entry in path.rglob("*"): if entry.is_file(): total_size += entry.stat().st_size return total_size except Exception as e: logger.error(f"Failed to get directory size for {directory}: {e}") return 0 @staticmethod def get_process_info(pid: Optional[int] = None) -> Optional[ProcessInfo]: """Get information about a process. Args: pid: Process ID. If None, uses current process. Returns: ProcessInfo: Process information. """ try: if pid is None: pid = os.getpid() process = psutil.Process(pid) with process.oneshot(): return ProcessInfo( pid=process.pid, name=process.name(), status=process.status(), cpu_percent=process.cpu_percent(), memory_percent=process.memory_percent(), memory_mb=process.memory_info().rss / (1024 * 1024), create_time=datetime.fromtimestamp( process.create_time() ), ) except Exception as e: logger.error(f"Failed to get process info for {pid}: {e}") return None @staticmethod def get_all_processes() -> List[ProcessInfo]: """Get information about all running processes. Returns: list: List of ProcessInfo for each process. """ try: processes = [] for proc in psutil.process_iter( ["pid", "name", "status", "cpu_num", "memory_percent"] ): try: info = SystemUtilities.get_process_info(proc.pid) if info: processes.append(info) except Exception as process_error: logger.debug( "Skipping process %s: %s", proc.pid, process_error, ) return processes except Exception as e: logger.error(f"Failed to get all processes: {e}") return [] @staticmethod def get_system_info() -> Dict[str, Any]: """Get comprehensive system information. Returns: dict: System information. """ try: import platform return { "platform": platform.platform(), "processor": platform.processor(), "cpu_count": psutil.cpu_count(logical=False), "cpu_count_logical": psutil.cpu_count(logical=True), "boot_time": datetime.fromtimestamp( psutil.boot_time() ).isoformat(), "hostname": platform.node(), "python_version": platform.python_version(), } except Exception as e: logger.error(f"Failed to get system info: {e}") return {} @staticmethod def get_network_info() -> Dict[str, Any]: """Get network information. Returns: dict: Network statistics. """ try: net_io = psutil.net_io_counters() return { "bytes_sent": net_io.bytes_sent, "bytes_recv": net_io.bytes_recv, "packets_sent": net_io.packets_sent, "packets_recv": net_io.packets_recv, "errors_in": net_io.errin, "errors_out": net_io.errout, "dropped_in": net_io.dropin, "dropped_out": net_io.dropout, } except Exception as e: logger.error(f"Failed to get network info: {e}") return {} @staticmethod def copy_file_atomic( src: str, dest: str, chunk_size: int = 1024 * 1024 ) -> bool: """Copy a file atomically using temporary file. Args: src: Source file path. dest: Destination file path. chunk_size: Size of chunks for copying. Returns: bool: True if successful. """ try: src_path = Path(src) dest_path = Path(dest) if not src_path.exists(): logger.error(f"Source file not found: {src}") return False # Create temporary file temp_path = dest_path.parent / f"{dest_path.name}.tmp" # Copy to temporary file shutil.copyfile(src, temp_path) # Atomic rename temp_path.replace(dest_path) logger.debug(f"Atomically copied {src} to {dest}") return True except Exception as e: logger.error(f"Failed to copy file {src} to {dest}: {e}") return False # Global system utilities instance _system_utilities: Optional[SystemUtilities] = None def get_system_utilities() -> SystemUtilities: """Get or create the global system utilities instance. Returns: SystemUtilities: The system utilities instance. """ global _system_utilities if _system_utilities is None: _system_utilities = SystemUtilities() return _system_utilities