| """ |
| GPU Manager for SPARKNET |
| Handles GPU allocation, monitoring, and resource management |
| """ |
|
|
import os
from contextlib import contextmanager
from typing import Any, Dict, List, Optional

import pynvml
import torch
from loguru import logger
|
|
|
|
class GPUManager:
    """Manages GPU resources for model deployment and monitoring.

    Uses NVML (pynvml) for telemetry (memory, utilization, temperature) and
    torch.cuda for device selection and cache management. NVML setup is
    best-effort: if it fails, telemetry methods return error dictionaries
    rather than raising.
    """

    def __init__(self, primary_gpu: int = 0, fallback_gpus: Optional[List[int]] = None):
        """
        Initialize GPU Manager.

        Args:
            primary_gpu: Primary GPU device ID (default: 0)
            fallback_gpus: List of fallback GPU IDs (default: [1, 2, 3])
        """
        self.primary_gpu = primary_gpu
        self.fallback_gpus = fallback_gpus or [1, 2, 3]
        self.initialized = False

        # Best-effort NVML init; telemetry degrades gracefully when absent.
        try:
            pynvml.nvmlInit()
            self.initialized = True
            logger.info("GPU Manager initialized with NVML")
        except Exception as e:
            logger.warning(f"Failed to initialize NVML: {e}")

        self.available_gpus = self._detect_gpus()
        logger.info(f"Detected {len(self.available_gpus)} GPUs: {self.available_gpus}")

    def _detect_gpus(self) -> List[int]:
        """Detect available CUDA GPUs; returns [] when CUDA is not available."""
        if not torch.cuda.is_available():
            logger.warning("CUDA not available!")
            return []

        gpu_count = torch.cuda.device_count()
        return list(range(gpu_count))

    def get_gpu_info(self, gpu_id: int) -> Dict[str, Any]:
        """
        Get detailed information about a GPU.

        Args:
            gpu_id: GPU device ID

        Returns:
            Dictionary with GPU information; on failure, a dictionary with
            "gpu_id" and "error" keys.
        """
        if not self.initialized:
            # Include gpu_id so callers (e.g. monitor()) can still tell
            # which device the error refers to.
            return {"gpu_id": gpu_id, "error": "NVML not initialized"}

        try:
            handle = pynvml.nvmlDeviceGetHandleByIndex(gpu_id)
            mem_info = pynvml.nvmlDeviceGetMemoryInfo(handle)
            utilization = pynvml.nvmlDeviceGetUtilizationRates(handle)
            temperature = pynvml.nvmlDeviceGetTemperature(handle, pynvml.NVML_TEMPERATURE_GPU)
            name = pynvml.nvmlDeviceGetName(handle)
            # Older pynvml releases return bytes for the device name;
            # normalize to str so formatting downstream is consistent.
            if isinstance(name, bytes):
                name = name.decode("utf-8", errors="replace")

            return {
                "gpu_id": gpu_id,
                "name": name,
                "memory_total": mem_info.total,
                "memory_used": mem_info.used,
                "memory_free": mem_info.free,
                "memory_percent": (mem_info.used / mem_info.total) * 100,
                "gpu_utilization": utilization.gpu,
                "memory_utilization": utilization.memory,
                "temperature": temperature,
            }
        except Exception as e:
            logger.error(f"Error getting GPU {gpu_id} info: {e}")
            return {"gpu_id": gpu_id, "error": str(e)}

    def get_all_gpu_info(self) -> List[Dict[str, Any]]:
        """Get information for all available GPUs."""
        return [self.get_gpu_info(gpu_id) for gpu_id in self.available_gpus]

    def get_free_memory(self, gpu_id: int) -> int:
        """
        Get free memory on a GPU in bytes.

        Args:
            gpu_id: GPU device ID

        Returns:
            Free memory in bytes (0 if the query failed).
        """
        info = self.get_gpu_info(gpu_id)
        return info.get("memory_free", 0)

    def select_best_gpu(self, min_memory_gb: float = 8.0) -> Optional[int]:
        """
        Select the best available GPU based on free memory.

        The primary GPU is preferred; fallback GPUs are tried in their
        configured order.

        Args:
            min_memory_gb: Minimum required free memory in GB

        Returns:
            GPU ID or None if no suitable GPU found
        """
        min_memory_bytes = min_memory_gb * 1024 ** 3

        # Prefer the primary GPU when it has enough headroom.
        if self.primary_gpu in self.available_gpus:
            free_mem = self.get_free_memory(self.primary_gpu)
            if free_mem >= min_memory_bytes:
                logger.info(f"Selected primary GPU {self.primary_gpu} ({free_mem / 1024**3:.2f} GB free)")
                return self.primary_gpu

        # Otherwise walk the fallbacks in order.
        for gpu_id in self.fallback_gpus:
            if gpu_id in self.available_gpus:
                free_mem = self.get_free_memory(gpu_id)
                if free_mem >= min_memory_bytes:
                    logger.info(f"Selected fallback GPU {gpu_id} ({free_mem / 1024**3:.2f} GB free)")
                    return gpu_id

        logger.warning(f"No GPU found with {min_memory_gb} GB free memory")
        return None

    def set_device(self, gpu_id: int):
        """
        Set the active CUDA device.

        Args:
            gpu_id: GPU device ID

        Raises:
            ValueError: If gpu_id is not among the detected GPUs.
        """
        if gpu_id not in self.available_gpus:
            raise ValueError(f"GPU {gpu_id} not available")

        # Select in torch BEFORE touching CUDA_VISIBLE_DEVICES. If CUDA had
        # not been initialized yet, exporting the variable first would
        # renumber the visible devices and make set_device(gpu_id) target
        # the wrong (or a nonexistent) ordinal.
        torch.cuda.set_device(gpu_id)
        # CUDA reads this variable only at initialization time, so from here
        # on it affects only subsequently spawned child processes.
        os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)
        logger.info(f"Set CUDA device to GPU {gpu_id}")

    @contextmanager
    def gpu_context(self, gpu_id: Optional[int] = None, min_memory_gb: float = 8.0):
        """
        Context manager for GPU allocation.

        Args:
            gpu_id: Specific GPU ID or None for auto-selection
            min_memory_gb: Minimum required memory in GB

        Yields:
            GPU device ID

        Raises:
            RuntimeError: If auto-selection finds no suitable GPU.
        """
        if gpu_id is None:
            gpu_id = self.select_best_gpu(min_memory_gb)
            if gpu_id is None:
                raise RuntimeError("No suitable GPU available")

        # Remember the exact prior state (None means "was unset").
        original_device = os.environ.get("CUDA_VISIBLE_DEVICES")

        try:
            self.set_device(gpu_id)
            yield gpu_id
        finally:
            # Restore the environment faithfully: re-set the old value, or
            # remove the variable entirely if it was not set before.
            if original_device is None:
                os.environ.pop("CUDA_VISIBLE_DEVICES", None)
            else:
                os.environ["CUDA_VISIBLE_DEVICES"] = original_device
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
                logger.debug("Cleared CUDA cache")

    def clear_cache(self, gpu_id: Optional[int] = None):
        """
        Clear CUDA cache for a specific GPU or all GPUs.

        Args:
            gpu_id: GPU device ID or None for all GPUs
        """
        # Guard: torch.cuda.device(...) triggers CUDA initialization and
        # would raise on a machine without CUDA; make this a no-op instead.
        if not torch.cuda.is_available():
            logger.warning("CUDA not available; nothing to clear")
            return

        if gpu_id is not None:
            with torch.cuda.device(gpu_id):
                torch.cuda.empty_cache()
            logger.info(f"Cleared cache for GPU {gpu_id}")
        else:
            torch.cuda.empty_cache()
            logger.info("Cleared cache for all GPUs")

    def monitor(self) -> str:
        """
        Get a formatted monitoring string for all GPUs.

        Returns:
            Formatted string with GPU status
        """
        info_list = self.get_all_gpu_info()

        lines = ["GPU Status:"]
        for info in info_list:
            if "error" in info:
                lines.append(f"  GPU {info.get('gpu_id', '?')}: Error - {info['error']}")
            else:
                lines.append(
                    f"  GPU {info['gpu_id']}: {info['name']} | "
                    f"Memory: {info['memory_used'] / 1024**3:.2f}/{info['memory_total'] / 1024**3:.2f} GB "
                    f"({info['memory_percent']:.1f}%) | "
                    f"Utilization: {info['gpu_utilization']}% | "
                    f"Temp: {info['temperature']}°C"
                )

        return "\n".join(lines)

    def __del__(self):
        """Cleanup NVML on deletion (best-effort; may run at interpreter shutdown)."""
        if self.initialized:
            try:
                pynvml.nvmlShutdown()
            except Exception:
                pass
|
|
|
|
| |
# Process-wide singleton, created lazily by get_gpu_manager().
_gpu_manager = None


def get_gpu_manager() -> GPUManager:
    """Return the shared GPUManager, constructing it on first use."""
    global _gpu_manager
    if _gpu_manager is not None:
        return _gpu_manager
    _gpu_manager = GPUManager()
    return _gpu_manager
|
|