mirror of
synced 2025-03-16 08:17:20 +00:00
561 lines
22 KiB
561 lines
22 KiB
from __future__ import unicode_literals, division
import logging
import re
import shlex
from collections import deque
from itertools import starmap
from threading import Thread, Event
from time import time
from typing import Sequence, List, Union, Dict, Optional
import attr
import psutil
from pathlib2 import Path
from clearml_agent.definitions import ENV_WORKER_TAGS, ENV_GPU_FRACTIONS
from clearml_agent.session import Session
from .gpu import gpustat
except ImportError:
gpustat = None
log = logging.getLogger(__name__)
class BytesSizes(object):
    """Helpers converting a byte count into larger 1024-based units."""

    # The helpers take no ``self``; they are declared static so they can be called both on the
    # class (``BytesSizes.megabytes(n)``) and on an instance.

    @staticmethod
    def kilobytes(x):
        # type: (float) -> float
        """Return ``x`` bytes expressed in kilobytes (x / 1024)."""
        return x / 1024

    @staticmethod
    def megabytes(x):
        # type: (float) -> float
        """Return ``x`` bytes expressed in megabytes (x / 1024**2)."""
        return x / (1024*1024)

    @staticmethod
    def gigabytes(x):
        # type: (float) -> float
        """Return ``x`` bytes expressed in gigabytes (x / 1024**3)."""
        return x / (1024*1024*1024)
class ResourceMonitor(object):
@attr.s
class StatusReport(object):
    """
    Status fields attached to a worker report.

    Declared as an attrs class: ``to_dict`` relies on ``attr.asdict``, which only accepts
    attrs-decorated classes.
    """
    task = attr.ib(default=None, type=str)
    queue = attr.ib(default=None, type=str)
    queues = attr.ib(default=None, type=Sequence[str])

    def to_dict(self):
        """Return the attributes as a dict, leaving out every attribute whose value is None."""
        return {
            key: value
            for key, value in attr.asdict(self).items()
            if value is not None
        }
@attr.s
class ClusterReport:
    """
    Values sent by ``send_cluster_report``.

    Declared as an attrs class so the keyword-argument constructor used by
    ``setup_cluster_report`` is generated from the attributes below.
    """
    # the only mandatory attribute; sent as the report "key"
    cluster_key = attr.ib(type=str)
    # optional limits; a None value is left out of the posted "properties"
    max_gpus = attr.ib(type=int, default=None)
    max_workers = attr.ib(type=int, default=None)
    max_cpus = attr.ib(type=int, default=None)
    # a fresh list per instance (factory), never a shared default
    resource_groups = attr.ib(type=Sequence[str], factory=list)
# Initialise the monitor: keep the session, set up the sampling/reporting settings and the
# readout accumulators, and decide which GPUs (if any) are reported.
# NOTE(review): the parameter list looks truncated in this view -- `self`,
# `sample_frequency_per_sec`, `report_frequency_sec`, `first_report_sec` and `worker_tags` are
# used below but not declared, and the closing `):` is absent. Confirm against the full file.
# NOTE(review): the type comment on `worker_id` names ResourceMonitor.StatusReport, which
# presumably belongs to a following (missing) parameter -- verify.
def __init__(
session, # type: Session
worker_id, # type: ResourceMonitor.StatusReport,
self.session = session
# holds at most one item (maxlen=1); get_report() reads index 0
self.queue = deque(maxlen=1)
self._worker_id = worker_id
self._sample_frequency = sample_frequency_per_sec
self._report_frequency = report_frequency_sec
# the first report interval falls back to the regular report frequency when not given
self._first_report_sec = first_report_sec or report_frequency_sec
# running sums per stat key and the number of samples accumulated into them
self._num_readouts = 0
self._readouts = {}
# last raw sample and its timestamp, used to turn "*_mbs" counters into rates
self._previous_readouts = {}
self._previous_readouts_ts = time()
self._thread = None
self._exit_event = Event()
# number of failed GPU readings counted by _failure()
self._gpustat_fail = 0
# module-level gpustat (None when its import failed)
self._gpustat = gpustat
self._active_gpus = None
self._default_gpu_utilization = session.config.get("agent.resource_monitoring.default_gpu_utilization", 100)
# allow default_gpu_utilization as null in the config, in which case we don't log anything
if self._default_gpu_utilization is not None:
self._default_gpu_utilization = int(self._default_gpu_utilization)
self._gpu_utilization_warning_sent = False
# path passed to psutil.disk_usage(); defaults to the home directory
self._disk_use_path = str(session.config.get("agent.resource_monitoring.disk_use_path", None) or Path.home())
# GPU fractions are only handled when the session feature set is not "basic"
self._fractions_handler = GpuFractionsHandler() if session.feature_set != "basic" else None
# fall back to the ENV_WORKER_TAGS value (shell-style split) when no tags were passed
if not worker_tags and ENV_WORKER_TAGS.get():
worker_tags = shlex.split(ENV_WORKER_TAGS.get())
self._worker_tags = worker_tags
if Session.get_nvidia_visible_env() == 'none':
# NVIDIA_VISIBLE_DEVICES set to none, marks cpu_only flag
# active_gpus == False means no GPU reporting
self._active_gpus = False
elif not self._gpustat:
log.warning('ClearML-Agent Resource Monitor: GPU monitoring is not available')
# NOTE(review): an `else:` and a `try:` appear to be missing before the lines below (the
# `except Exception:` further down has no visible `try:`) -- confirm against the full file.
# None means no filtering, report all gpus
self._active_gpus = None
# noinspection PyBroadException
active_gpus = Session.get_nvidia_visible_env()
# None means no filtering, report all gpus
if active_gpus and active_gpus != "all":
# comma-separated list of GPU identifiers, compared as strings in _machine_stats()
self._active_gpus = [g.strip() for g in str(active_gpus).split(',')]
except Exception:
# NOTE(review): the body of this `except` and the closing parentheses of the `int(...)`
# call below are not visible in this view.
self._cluster_report_interval_sec = int(session.config.get(
"agent.resource_monitoring.cluster_report_interval_sec", 60
# set later by setup_cluster_report()
self._cluster_report = None
def set_report(self, report):
    # type: (ResourceMonitor.StatusReport) -> ()
    """
    Store ``report`` as the current status report; a None report is ignored.

    The queue is a ``deque(maxlen=1)``, so pushing a new report discards the previous one and
    ``get_report`` (which reads index 0) always sees the latest value.
    """
    if report is not None:
        self.queue.appendleft(report)
def get_report(self):
    # type: () -> ResourceMonitor.StatusReport
    """Return the item at the head of the report queue (IndexError when the queue is empty)."""
    current = self.queue[0]
    return current
def start(self):
    """
    Create the monitoring thread running ``_daemon`` and start it.

    The thread is marked as a daemon thread so it does not keep the process alive.

    :return: self, to allow chaining
    """
    self._thread = Thread(target=self._daemon)
    self._thread.daemon = True
    # without this call the thread object is created but never runs
    self._thread.start()
    return self
def stop(self):
    """
    Signal the monitoring loop to exit and wait for a running monitor thread to finish.

    ``_daemon`` waits on ``_exit_event`` between samples, so setting the event is what
    requests the loop to quit. The join is skipped when no thread was created or it is not
    running (joining a never-started thread raises RuntimeError).
    """
    self._exit_event.set()
    thread = self._thread
    if thread is not None and thread.is_alive():
        thread.join()
# Send a worker status report through the session ("workers" service, "status_report" action).
# Returns True when the request went through, False when it raised.
# NOTE(review): the report payload looks truncated in this view -- only `timestamp` is visible,
# the closing `)` of `dict(` is absent and `stats` is not referenced. The `try:` matching the
# `except Exception:` below is also not visible. Confirm against the full file.
def send_report(self, stats=None):
report = dict(
# whole seconds scaled to milliseconds: int() is applied before the multiplication, unlike
# send_cluster_report which uses int(time() * 1000)
timestamp=(int(time()) * 1000),
log.debug("sending report: %s", report)
# noinspection PyBroadException
self.session.get(service="workers", action="status_report", **report)
except Exception:
# best effort: a failed report is logged and signalled through the return value
log.warning("Failed sending report: %s", report)
return False
return True
def send_cluster_report(self) -> bool:
    """
    Post the cluster report prepared by ``setup_cluster_report`` to the server.

    :return: True when the report was posted; False when the session feature set is "basic"
        (cluster reports are not set up for it) or when building/posting the report failed
    """
    # Cluster reporting is not available on the "basic" feature set, the same condition
    # setup_cluster_report checks. The previous check was negated, which skipped the report
    # on exactly the feature sets that support it.
    if self.session.feature_set == "basic":
        return False

    # noinspection PyBroadException
    try:
        properties = {
            "max_cpus": self._cluster_report.max_cpus,
            "max_gpus": self._cluster_report.max_gpus,
            "max_workers": self._cluster_report.max_workers,
        }
        payload = {
            "key": self._cluster_report.cluster_key,
            "timestamp": int(time() * 1000),
            # twice the reporting interval
            "timeout": int(self._cluster_report_interval_sec * 2),
            # "resource_groups": self._cluster_report.resource_groups,  # yet to be supported
            # only send the limits that were actually set
            "properties": {k: v for k, v in properties.items() if v is not None},
        }
        self.session.post(service="workers", action="cluster_report", **payload)
    except Exception as ex:
        log.warning("Failed sending cluster report: %s", ex)
        return False
    return True
# Build the ClusterReport later posted by send_cluster_report().
# NOTE(review): this view is missing lines -- the docstring quotes, the statements under the
# `feature_set == "basic"` and "worker_id or cluster key are required" checks (presumably
# `return`), a `try:` for the final `except`, an `else:` between the two cluster_key
# assignments, the `print(`/closing brackets around the warning text and most ClusterReport
# arguments. Confirm against the full file before changing the code.
def setup_cluster_report(self, available_gpus, gpu_queues, worker_id=None, cluster_key=None, resource_groups=None):
# type: (List[int], Dict[str, int], Optional[str], Optional[str], Optional[List[str]]) -> ()
Set up a cluster report for the enterprise server dashboard feature.
If a worker_id is provided, cluster_key and resource_groups are inferred from it.
if self.session.feature_set == "basic":
if not worker_id and not cluster_key:
print("Error: cannot set up dashboard reporting - worker_id or cluster key are required")
# noinspection PyBroadException
if not cluster_key:
# derive the key and resource group from the colon-separated worker id
worker_id_parts = worker_id.split(":")
if len(worker_id_parts) < 3:
# too few parts: use the configured default cluster name and the first id part
cluster_key = self.session.config.get("agent.resource_dashboard.default_cluster_name", "onprem")
resource_group = ":".join((cluster_key, worker_id_parts[0]))
'WARNING: your worker ID "{}" is not suitable for proper resource dashboard reporting, please '
'set up agent.worker_name to be at least two colon-separated parts (i.e. "<category>:<name>"). '
'Using "{}" as the resource dashboard category and "{}" as the resource group.'.format(
worker_id, cluster_key, resource_group
# enough parts: first part is the cluster key, first two parts form the resource group
cluster_key = worker_id_parts[0]
resource_group = ":".join((worker_id_parts[:2]))
resource_groups = [resource_group]
self._cluster_report = ResourceMonitor.ClusterReport(
# number of available GPUs divided (floor) by the smallest first element among the
# gpu_queues values.
# NOTE(review): the values are unpacked as 2-item sequences here although the type comment
# says Dict[str, int]; an empty gpu_queues or a zero minimum would raise -- presumably
# handled by the `except Exception` below, verify.
max_workers=len(available_gpus) // min(x for x, _ in gpu_queues.values()),
except Exception as ex:
print("Error: failed setting cluster report: {}".format(ex))
# Monitoring loop (thread target set in start()): sample machine stats at the configured
# frequency, then send the averaged stats and, at most once per cluster-report interval,
# the cluster report.
# NOTE(review): several lines appear to be missing from this view -- closing parentheses, the
# `try:` lines matching both `except` clauses, the statement executed when the exit event is
# set, the sampling call inside the inner loop and the statement after a successful
# send_report(). Confirm against the full file before editing.
def _daemon(self):
last_cluster_report = 0
seconds_since_started = 0
reported = 0
while True:
last_report = time()
# the first report uses its own interval (self._first_report_sec)
current_report_frequency = (
self._report_frequency if reported != 0 else self._first_report_sec
while (time() - last_report) < current_report_frequency:
# wait for self._sample_frequency seconds, if event set quit
if self._exit_event.wait(1 / self._sample_frequency):
# noinspection PyBroadException
except Exception as ex:
log.error("failed getting machine stats: %s", report_error(ex))
seconds_since_started += int(round(time() - last_report))
# check if we do not report any metric (so it means the last iteration will not be changed)
# if we do not have last_iteration, we just use seconds as iteration
# start reporting only when we figured out, if this is seconds based, or iterations based
average_readouts = self._get_average_readouts()
# averaged values are either floats or lists of floats (see _get_average_readouts)
stats = {
# 3 points after the dot
key: round(value, 3) if isinstance(value, float) else [round(v, 3) for v in value]
for key, value in average_readouts.items()
# send actual report
if self.send_report(stats):
# clear readouts if this is update was sent
# count reported iterations
reported += 1
# NOTE(review): the two consecutive `and`s below suggest a condition line was dropped here
if (
self._cluster_report and
and time() - last_cluster_report > self._cluster_report_interval_sec
# the timestamp only advances after a successful cluster report
if self.send_cluster_report():
last_cluster_report = time()
except Exception as ex:
log.exception("Error reporting monitoring info: %s", str(ex))
def _update_readouts(self):
readouts = self._machine_stats()
elapsed = time() - self._previous_readouts_ts
self._previous_readouts_ts = time()
def fix(k, v):
if k.endswith("_mbs"):
v = (v - self._previous_readouts.get(k, v)) / elapsed
if v is None:
v = 0
return k, self._readouts.get(k, 0) + v
self._readouts.update(starmap(fix, readouts.items()))
self._num_readouts += 1
self._previous_readouts = readouts
def _get_num_readouts(self):
return self._num_readouts
def _get_average_readouts(self):
def create_general_key(old_key):
Create key for backend payload
:param old_key: old stats key
:type old_key: str
:return: new key for sending stats
:rtype: str
key_parts = old_key.rpartition("_")
return "{}_*".format(key_parts[0] if old_key.startswith("gpu") else old_key)
ret = {}
# make sure the gpu/cpu stats are always ordered in the accumulated values list (general_key)
ordered_keys = sorted(self._readouts.keys())
for k in ordered_keys:
v = self._readouts[k]
stat_key = self.BACKEND_STAT_MAP.get(k)
if stat_key:
ret[stat_key] = v / self._num_readouts
general_key = create_general_key(k)
general_key = self.BACKEND_STAT_MAP.get(general_key)
if general_key:
ret.setdefault(general_key, []).append(v / self._num_readouts)
pass # log.debug("Cannot find key {}".format(k))
return ret
def _clear_readouts(self):
self._readouts = {}
self._num_readouts = 0
# Collect one sample of CPU, memory, disk, temperature, network, disk I/O and (when enabled)
# GPU readings into a flat dict keyed by stat name.
# NOTE(review): lines are missing from this view -- the docstring quotes, the expression
# assigned to `sensor_stat` before its `if`, the `try:` matching the final `except`, the
# statement skipping a filtered-out GPU, closing parentheses and whatever follows the log call
# in the `except` body. Confirm against the full file before changing the code.
def _machine_stats(self):
:return: machine stats dictionary, all values expressed in megabytes
# average over the per-CPU percentages
cpu_usage = psutil.cpu_percent(percpu=True)
stats = {"cpu_usage": sum(cpu_usage) / len(cpu_usage)}
virtual_memory = psutil.virtual_memory()
stats["memory_used"] = BytesSizes.megabytes(virtual_memory.used)
# "free" is reported from psutil's `available` field
stats["memory_free"] = BytesSizes.megabytes(virtual_memory.available)
disk_use_percentage = psutil.disk_usage(self._disk_use_path).percent
stats["disk_free_percent"] = 100 - disk_use_percentage
# temperatures are only read when this psutil build exposes sensors_temperatures
sensor_stat = (
if hasattr(psutil, "sensors_temperatures")
else {}
if "coretemp" in sensor_stat and len(sensor_stat["coretemp"]):
# hottest "coretemp" sensor
stats["cpu_temperature"] = max([t.current for t in sensor_stat["coretemp"]])
# update cached measurements
# the "*_mbs" values are cumulative counters in megabytes; _update_readouts() turns
# them into per-second rates
net_stats = psutil.net_io_counters()
stats["network_tx_mbs"] = BytesSizes.megabytes(net_stats.bytes_sent)
stats["network_rx_mbs"] = BytesSizes.megabytes(net_stats.bytes_recv)
io_stats = psutil.disk_io_counters()
stats["io_read_mbs"] = BytesSizes.megabytes(io_stats.read_bytes)
stats["io_write_mbs"] = BytesSizes.megabytes(io_stats.write_bytes)
# check if we need to monitor gpus and if we can access the gpu statistics
if self._active_gpus is not False and self._gpustat:
gpu_stat = self._gpustat.new_query()
# index used in the stat keys; counts only the GPUs that are actually reported
report_index = 0
for i, g in enumerate(gpu_stat.gpus):
# only monitor the active gpu's, if none were selected, monitor everything
if self._active_gpus:
uuid = getattr(g, "uuid", None)
mig_uuid = getattr(g, "mig_uuid", None)
# the GPU is filtered out when neither its index, uuid nor mig uuid is listed
if (
str(g.index) not in self._active_gpus
and (not uuid or uuid not in self._active_gpus)
and (not mig_uuid or mig_uuid not in self._active_gpus)
stats["gpu_temperature_{}".format(report_index)] = g["temperature.gpu"]
if g["utilization.gpu"] is not None:
stats["gpu_utilization_{}".format(report_index)] = g["utilization.gpu"]
elif self._default_gpu_utilization is not None:
# utilization not available: report the configured default instead
stats["gpu_utilization_{}".format(report_index)] = self._default_gpu_utilization
if getattr(g, "mig_index", None) is None and not self._gpu_utilization_warning_sent:
# this shouldn't happen for non-MIGs, warn the user about it
log.error("Failed fetching GPU utilization")
# log this only once
self._gpu_utilization_warning_sent = True
# used memory as a percentage of total memory
stats["gpu_mem_usage_{}".format(report_index)] = (
100.0 * g["memory.used"] / g["memory.total"]
# already in MBs
stats["gpu_mem_free_{}".format(report_index)] = (
g["memory.total"] - g["memory.used"]
stats["gpu_mem_used_{}".format(report_index)] = g["memory.used"] or 0
if self._fractions_handler:
fractions = self._fractions_handler.fractions
# fraction for this GPU: the last fraction is reused when there are fewer fractions than
# GPUs, 1.0 when there are none.
# NOTE(review): the lookup uses the enumeration index `i`, not `report_index` -- confirm
# this is intended when some GPUs are filtered out.
stats["gpu_fraction_{}".format(report_index)] = \
(fractions[i] if i < len(fractions) else fractions[-1]) if fractions else 1.0
report_index += 1
except Exception as ex:
# something happened and we can't use gpu stats,
log.error("failed getting machine stats: %s", report_error(ex))
return stats
def _failure(self):
self._gpustat_fail += 1
if self._gpustat_fail >= 3:
"GPU monitoring failed getting GPU reading, switching off GPU monitoring"
self._gpustat = None
# Maps local readout keys to the stat names sent to the backend (read through
# ``self.BACKEND_STAT_MAP`` in ``_get_average_readouts``). Keys ending with "_*" are wildcard
# entries: every matching readout is averaged and collected into a list under the mapped name;
# the other keys map a single readout to a single value. Readout keys with no entry here are
# not sent.
BACKEND_STAT_MAP = {
    "cpu_usage_*": "cpu_usage",
    "cpu_temperature_*": "cpu_temperature",
    "disk_free_percent": "disk_free_home",
    "io_read_mbs": "disk_read",
    "io_write_mbs": "disk_write",
    "network_tx_mbs": "network_tx",
    "network_rx_mbs": "network_rx",
    "memory_free": "memory_free",
    "memory_used": "memory_used",
    "gpu_temperature_*": "gpu_temperature",
    "gpu_mem_used_*": "gpu_memory_used",
    "gpu_mem_free_*": "gpu_memory_free",
    "gpu_utilization_*": "gpu_usage",
    "gpu_fraction_*": "gpu_fraction"
}
class GpuFractionsHandler:
    """
    Decode the GPU fractions specification (``ENV_GPU_FRACTIONS``) into per-GPU fractions and
    convert resource limits / annotations to and from that specification.

    Two textual encodings are handled:

    * comma-separated floats, e.g. ``"0.5,0.25"``
    * comma-separated ``<mig slice>:<count>`` items, e.g. ``"nvidia.com/mig-1g.5gb:2"``
    """
    # limit keys holding a plain fraction ("clear.ml/fraction", "clear.ml/fraction-<n>")
    _number_re = re.compile(r"^clear\.ml/fraction(-\d+)?$")
    # limit keys naming a MIG slice; captures its compute units and memory size in GB
    _mig_re = re.compile(r"^nvidia\.com/mig-(?P<compute>[0-9]+)g\.(?P<memory>[0-9]+)gb$")
    # annotation key holding a fraction
    _frac_gpu_injector_re = re.compile(r"^clearml-injector/fraction$")

    # total memory (GB) used for a GPU, keyed by the GPU name returned by gpustat
    _gpu_name_to_memory_gb = {
        "A30": 24,
        "NVIDIA A30": 24,
        "A100-SXM4-40GB": 40,
        "NVIDIA-A100-40GB-PCIe": 40,
        "NVIDIA A100-40GB-PCIe": 40,
        "NVIDIA-A100-SXM4-40GB": 40,
        "NVIDIA A100-SXM4-40GB": 40,
        "NVIDIA-A100-SXM4-80GB": 79,
        "NVIDIA A100-SXM4-80GB": 79,
        "NVIDIA-A100-80GB-PCIe": 79,
        "NVIDIA A100-80GB-PCIe": 79,
    }

    def __init__(self):
        """Look up the memory size of every detected GPU and decode the configured fractions."""
        # unknown GPU names get 0, which _get_fractions treats as "cannot compute"
        self._total_memory_gb = [
            self._gpu_name_to_memory_gb.get(name, 0)
            for name in (self._get_gpu_names() or [])
        ]
        self._fractions = self._get_fractions()

    @property
    def fractions(self) -> List[float]:
        """The decoded per-GPU fractions (read as an attribute by the resource monitor)."""
        return self._fractions

    def _get_fractions(self) -> List[float]:
        """
        Decode ``ENV_GPU_FRACTIONS`` into a list of fractions.

        :return: the decoded float fractions, or for MIG items ``memory * count / total GPU
            memory`` per item; ``[1.0]`` when nothing can be computed
        """
        if not self._total_memory_gb:
            # Can't compute
            return [1.0]
        fractions = (ENV_GPU_FRACTIONS.get() or "").strip()
        if not fractions:
            # No fractions
            return [1.0]
        decoded_fractions = self.decode_fractions(fractions)

        if isinstance(decoded_fractions, list):
            return decoded_fractions

        totals = []
        for i, (fraction, count) in enumerate(decoded_fractions.items()):
            m = self._mig_re.match(fraction)
            if not m:
                # not a MIG slice name, nothing to compute from
                continue
            # reuse the last known GPU size when there are more items than GPUs
            total_gb = self._total_memory_gb[i] if i < len(self._total_memory_gb) else self._total_memory_gb[-1]
            if not total_gb:
                # unknown GPU memory size, skip instead of dividing by zero
                continue
            try:
                totals.append((int(m.group("memory")) * count) / total_gb)
            except ValueError:
                pass

        if not totals:
            log.warning("Fractions count is empty for {}".format(fractions))
            return [1.0]
        return totals

    @classmethod
    def extract_custom_limits(cls, limits: dict):
        """Remove, in place, every fraction key (``clear.ml/fraction*``) from ``limits``."""
        # iterate over a snapshot of the keys since the dict is modified while looping
        for key in list(limits or {}):
            if cls._number_re.match(key):
                limits.pop(key, None)

    @classmethod
    def get_simple_fractions_total(cls, limits: dict) -> float:
        """
        Sum the values of all fraction keys in ``limits``.

        :return: the sum, or 0 when there are no fraction keys, ``limits`` is empty/None, or a
            value cannot be converted to float (the failure is logged)
        """
        try:
            # an empty sum is 0, which is also the "no fraction keys" result
            return sum(float(v) for k, v in (limits or {}).items() if cls._number_re.match(k))
        except Exception as ex:
            log.error("Failed summing up fractions from {}: {}".format(limits, ex))
        return 0

    @classmethod
    def encode_fractions(cls, limits: dict, annotations: dict) -> Optional[str]:
        """
        Encode limits (or, when there are no limits, annotations) as a fractions string.

        :return: comma-separated fraction values ordered by key when fraction keys are present;
            otherwise, for limits, comma-separated ``<mig slice>:<count>`` items (possibly an
            empty string); None when neither limits nor annotations provide anything
        """
        if limits:
            if any(cls._number_re.match(x) for x in limits):
                return ",".join(str(v) for k, v in sorted(limits.items()) if cls._number_re.match(k))
            return ",".join("{}:{}".format(k, v) for k, v in limits.items() if cls._mig_re.match(k))
        if annotations:
            if any(cls._frac_gpu_injector_re.match(x) for x in annotations):
                return ",".join(
                    str(v) for k, v in sorted(annotations.items()) if cls._frac_gpu_injector_re.match(k)
                )
        return None

    @staticmethod
    def decode_fractions(fractions: str) -> Union[List[float], Dict[str, int]]:
        """
        Decode a fractions string.

        :return: a list of floats when no item carries a ``:<count>`` part, otherwise a dict of
            item name to integer count; an empty dict when decoding fails (the failure is logged)
        """
        try:
            items = [f.strip() for f in fractions.strip().split(",")]
            # split every item into (name, count-text); count-text is "" when there is no ":"
            tuples = [(k.strip(), v.strip()) for k, v in (f.partition(":")[::2] for f in items)]
            if all(not v for _, v in tuples):
                # comma-separated float fractions
                return [float(k) for k, _ in tuples]
            # comma-separated slice:count items
            return {
                k.strip(): int(v.strip())
                for k, v in tuples
            }
        except Exception as ex:
            log.error("Failed decoding GPU fractions '{}': {}".format(fractions, ex))
            return {}

    @staticmethod
    def _get_gpu_names():
        """Return the GPU names reported by gpustat, or None when the query fails (logged)."""
        # noinspection PyBroadException
        try:
            gpus = gpustat.new_query().gpus
            names = [g["name"] for g in gpus]
            print("GPU names: {}".format(names))
            return names
        except Exception as ex:
            log.error("Failed getting GPU names: {}".format(ex))
def report_error(ex):
    """Format an exception as ``"<ExceptionClassName>: <message>"`` for log output."""
    kind = type(ex).__name__
    return "{}: {}".format(kind, ex)