Improve GPU monitoring

This commit is contained in:
allegroai 2024-03-17 19:13:57 +02:00
parent 6a4fcda1bf
commit 22672d2444
4 changed files with 1349 additions and 1227 deletions

View File

@ -248,6 +248,8 @@ ENV_TEMP_STDOUT_FILE_DIR = EnvironmentConfig("CLEARML_AGENT_TEMP_STDOUT_FILE_DIR
ENV_GIT_CLONE_VERBOSE = EnvironmentConfig("CLEARML_AGENT_GIT_CLONE_VERBOSE", type=bool)
ENV_GPU_FRACTIONS = EnvironmentConfig("CLEARML_AGENT_GPU_FRACTIONS")
class FileBuffering(IntEnum):
"""

View File

@ -57,6 +57,21 @@ class GPUStat(object):
"""
return self.entry['uuid']
@property
def mig_index(self):
"""
Returns the index of the MIG partition (as in nvidia-smi).
"""
return self.entry.get("mig_index")
@property
def mig_uuid(self):
"""
Returns the uuid of the MIG partition returned by nvidia-smi when running in MIG mode,
e.g. MIG-12345678-abcd-abcd-uuid-123456abcdef
"""
return self.entry.get("mig_uuid")
@property
def name(self):
"""
@ -161,6 +176,7 @@ class GPUStatCollection(object):
_initialized = False
_device_count = None
_gpu_device_info = {}
_mig_device_info = {}
def __init__(self, gpu_list, driver_version=None, driver_cuda_version=None):
self.gpus = gpu_list
@ -191,7 +207,7 @@ class GPUStatCollection(object):
return b.decode() # for python3, to unicode
return b
def get_gpu_info(index, handle):
def get_gpu_info(index, handle, is_mig=False):
"""Get one GPU information specified by nvml handle"""
def get_process_info(nv_process):
@ -227,12 +243,14 @@ class GPUStatCollection(object):
pass
return process
if not GPUStatCollection._gpu_device_info.get(index):
device_info = GPUStatCollection._mig_device_info if is_mig else GPUStatCollection._gpu_device_info
if not device_info.get(index):
name = _decode(N.nvmlDeviceGetName(handle))
uuid = _decode(N.nvmlDeviceGetUUID(handle))
GPUStatCollection._gpu_device_info[index] = (name, uuid)
device_info[index] = (name, uuid)
name, uuid = GPUStatCollection._gpu_device_info[index]
name, uuid = device_info[index]
try:
temperature = N.nvmlDeviceGetTemperature(
@ -328,8 +346,36 @@ class GPUStatCollection(object):
for index in range(GPUStatCollection._device_count):
handle = N.nvmlDeviceGetHandleByIndex(index)
gpu_info = get_gpu_info(index, handle)
gpu_stat = GPUStat(gpu_info)
gpu_list.append(gpu_stat)
mig_cnt = 0
# noinspection PyBroadException
try:
mig_cnt = N.nvmlDeviceGetMaxMigDeviceCount(handle)
except Exception:
pass
if mig_cnt <= 0:
gpu_list.append(GPUStat(gpu_info))
continue
got_mig_info = False
for mig_index in range(mig_cnt):
try:
mig_handle = N.nvmlDeviceGetMigDeviceHandleByIndex(handle, mig_index)
mig_info = get_gpu_info(mig_index, mig_handle, is_mig=True)
mig_info["mig_name"] = mig_info["name"]
mig_info["name"] = gpu_info["name"]
mig_info["mig_index"] = mig_info["index"]
mig_info["mig_uuid"] = mig_info["uuid"]
mig_info["index"] = gpu_info["index"]
mig_info["uuid"] = gpu_info["uuid"]
mig_info["temperature.gpu"] = gpu_info["temperature.gpu"]
mig_info["fan.speed"] = gpu_info["fan.speed"]
gpu_list.append(GPUStat(mig_info))
got_mig_info = True
except Exception as e:
pass
if not got_mig_info:
gpu_list.append(GPUStat(gpu_info))
# 2. additional info (driver version, etc).
if get_driver_info:

File diff suppressed because it is too large Load Diff

View File

@ -1,19 +1,20 @@
from __future__ import unicode_literals, division
import logging
import os
import re
import shlex
from collections import deque
from itertools import starmap
from threading import Thread, Event
from time import time
from typing import Text, Sequence, List, Dict, Optional
from typing import Sequence, List, Union, Dict, Optional
import attr
import psutil
from pathlib2 import Path
from clearml_agent.definitions import ENV_WORKER_TAGS, ENV_GPU_FRACTIONS
from clearml_agent.session import Session
from clearml_agent.definitions import ENV_WORKER_TAGS
try:
from .gpu import gpustat
@ -87,7 +88,13 @@ class ResourceMonitor(object):
self._gpustat_fail = 0
self._gpustat = gpustat
self._active_gpus = None
self._default_gpu_utilization = session.config.get("agent.resource_monitoring.default_gpu_utilization", 100)
# allow default_gpu_utilization as null in the config, in which case we don't log anything
if self._default_gpu_utilization is not None:
self._default_gpu_utilization = int(self._default_gpu_utilization)
self._gpu_utilization_warning_sent = False
self._disk_use_path = str(session.config.get("agent.resource_monitoring.disk_use_path", None) or Path.home())
self._fractions_handler = GpuFractionsHandler() if session.feature_set != "basic" else None
if not worker_tags and ENV_WORKER_TAGS.get():
worker_tags = shlex.split(ENV_WORKER_TAGS.get())
self._worker_tags = worker_tags
@ -237,7 +244,7 @@ class ResourceMonitor(object):
try:
self._update_readouts()
except Exception as ex:
log.warning("failed getting machine stats: %s", report_error(ex))
log.error("failed getting machine stats: %s", report_error(ex))
self._failure()
seconds_since_started += int(round(time() - last_report))
@ -357,25 +364,47 @@ class ResourceMonitor(object):
if self._active_gpus is not False and self._gpustat:
try:
gpu_stat = self._gpustat.new_query()
report_index = 0
for i, g in enumerate(gpu_stat.gpus):
# only monitor the active gpu's, if none were selected, monitor everything
if self._active_gpus:
uuid = getattr(g, "uuid", None)
if str(i) not in self._active_gpus and (not uuid or uuid not in self._active_gpus):
mig_uuid = getattr(g, "mig_uuid", None)
if (
str(g.index) not in self._active_gpus
and (not uuid or uuid not in self._active_gpus)
and (not mig_uuid or mig_uuid not in self._active_gpus)
):
continue
stats["gpu_temperature_{:d}".format(i)] = g["temperature.gpu"]
stats["gpu_utilization_{:d}".format(i)] = g["utilization.gpu"]
stats["gpu_mem_usage_{:d}".format(i)] = (
stats["gpu_temperature_{}".format(report_index)] = g["temperature.gpu"]
if g["utilization.gpu"] is not None:
stats["gpu_utilization_{}".format(report_index)] = g["utilization.gpu"]
elif self._default_gpu_utilization is not None:
stats["gpu_utilization_{}".format(report_index)] = self._default_gpu_utilization
if getattr(g, "mig_index", None) is None and not self._gpu_utilization_warning_sent:
# this shouldn't happen for non-MIGs, warn the user about it
log.error("Failed fetching GPU utilization")
self._gpu_utilization_warning_sent = True
stats["gpu_mem_usage_{}".format(report_index)] = (
100.0 * g["memory.used"] / g["memory.total"]
)
# already in MBs
stats["gpu_mem_free_{:d}".format(i)] = (
stats["gpu_mem_free_{}".format(report_index)] = (
g["memory.total"] - g["memory.used"]
)
stats["gpu_mem_used_%d" % i] = g["memory.used"]
stats["gpu_mem_used_{}".format(report_index)] = g["memory.used"] or 0
if self._fractions_handler:
fractions = self._fractions_handler.fractions
stats["gpu_fraction_{}".format(report_index)] = \
(fractions[i] if i < len(fractions) else fractions[-1]) if fractions else 1.0
except Exception as ex:
# something happened and we can't use gpu stats,
log.warning("failed getting machine stats: %s", report_error(ex))
log.error("failed getting machine stats: %s", report_error(ex))
self._failure()
return stats
@ -388,7 +417,8 @@ class ResourceMonitor(object):
)
self._gpustat = None
BACKEND_STAT_MAP = {"cpu_usage_*": "cpu_usage",
BACKEND_STAT_MAP = {
"cpu_usage_*": "cpu_usage",
"cpu_temperature_*": "cpu_temperature",
"disk_free_percent": "disk_free_home",
"io_read_mbs": "disk_read",
@ -400,7 +430,124 @@ class ResourceMonitor(object):
"gpu_temperature_*": "gpu_temperature",
"gpu_mem_used_*": "gpu_memory_used",
"gpu_mem_free_*": "gpu_memory_free",
"gpu_utilization_*": "gpu_usage"}
"gpu_utilization_*": "gpu_usage",
"gpu_fraction_*": "gpu_fraction"
}
class GpuFractionsHandler:
_number_re = re.compile(r"^clear\.ml/fraction(-\d+)?$")
_mig_re = re.compile(r"^nvidia\.com/mig-(?P<compute>[0-9]+)g\.(?P<memory>[0-9]+)gb$")
_gpu_name_to_memory_gb = {
"A30": 24,
"NVIDIA A30": 24,
"A100-SXM4-40GB": 40,
"NVIDIA-A100-40GB-PCIe": 40,
"NVIDIA A100-40GB-PCIe": 40,
"NVIDIA-A100-SXM4-40GB": 40,
"NVIDIA A100-SXM4-40GB": 40,
"NVIDIA-A100-SXM4-80GB": 79,
"NVIDIA A100-SXM4-80GB": 79,
"NVIDIA-A100-80GB-PCIe": 79,
"NVIDIA A100-80GB-PCIe": 79,
}
def __init__(self):
self._total_memory_gb = [
self._gpu_name_to_memory_gb.get(name, 0)
for name in (self._get_gpu_names() or [])
]
self._fractions = self._get_fractions()
@property
def fractions(self) -> List[float]:
return self._fractions
def _get_fractions(self) -> List[float]:
if not self._total_memory_gb:
# Can't compute
return [1.0]
fractions = (ENV_GPU_FRACTIONS.get() or "").strip()
if not fractions:
# No fractions
return [1.0]
decoded_fractions = self.decode_fractions(fractions)
if isinstance(decoded_fractions, list):
return decoded_fractions
totals = []
for i, (fraction, count) in enumerate(decoded_fractions.items()):
m = self._mig_re.match(fraction)
if not m:
continue
try:
total_gb = self._total_memory_gb[i] if i < len(self._total_memory_gb) else self._total_memory_gb[-1]
if not total_gb:
continue
totals.append((int(m.group("memory")) * count) / total_gb)
except ValueError:
pass
if not totals:
log.warning("Fractions count is empty for {}".format(fractions))
return [1.0]
return totals
@classmethod
def extract_custom_limits(cls, limits: dict):
for k, v in list(limits.items() or []):
if cls._number_re.match(k):
limits.pop(k, None)
@classmethod
def get_simple_fractions_total(cls, limits: dict) -> float:
try:
if any(cls._number_re.match(x) for x in limits):
return sum(float(v) for k, v in limits.items() if cls._number_re.match(k))
except Exception as ex:
log.error("Failed summing up fractions from {}: {}".format(limits, ex))
return 0
@classmethod
def encode_fractions(cls, limits: dict) -> str:
if any(cls._number_re.match(x) for x in (limits or {})):
return ",".join(str(v) for k, v in sorted(limits.items()) if cls._number_re.match(k))
return ",".join(("{}:{}".format(k, v) for k, v in (limits or {}).items() if cls._mig_re.match(k)))
@staticmethod
def decode_fractions(fractions: str) -> Union[List[float], Dict[str, int]]:
try:
items = [f.strip() for f in fractions.strip().split(",")]
tuples = [(k.strip(), v.strip()) for k, v in (f.partition(":")[::2] for f in items)]
if all(not v for _, v in tuples):
# comma-separated float fractions
return [float(k) for k, _ in tuples]
# comma-separated slice:count items
return {
k.strip(): int(v.strip())
for k, v in tuples
}
except Exception as ex:
log.error("Failed decoding GPU fractions '{}': {}".format(fractions, ex))
return {}
@staticmethod
def _get_gpu_names():
# noinspection PyBroadException
try:
gpus = gpustat.new_query().gpus
names = [g["name"] for g in gpus]
print("GPU names: {}".format(names))
return names
except Exception as ex:
log.error("Failed getting GPU names: {}".format(ex))
def report_error(ex):