clearml-agent/clearml_agent/helper/resource_monitor.py

from __future__ import unicode_literals, division
import logging
import os
import shlex
from collections import deque
from itertools import starmap
from threading import Thread, Event
from time import time
from typing import Text, Sequence, List, Dict, Optional, Tuple
import attr
import psutil
from pathlib2 import Path
from clearml_agent.session import Session
from clearml_agent.definitions import ENV_WORKER_TAGS
try:
from .gpu import gpustat
except ImportError:
gpustat = None
log = logging.getLogger(__name__)
class BytesSizes(object):
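    """Convert raw byte counts into binary units (KiB / MiB / GiB)."""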
@staticmethod
def kilobytes(x):
# type: (float) -> float
return x / 1024
@staticmethod
def megabytes(x):
# type: (float) -> float
return x / (1024*1024)
@staticmethod
def gigabytes(x):
# type: (float) -> float
return x / (1024*1024*1024)
class ResourceMonitor(object):
@attr.s
class StatusReport(object):
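        """Worker status payload sent to the server: the current task and polled queue(s)."""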
task = attr.ib(default=None, type=str)
queue = attr.ib(default=None, type=str)
queues = attr.ib(default=None, type=Sequence[str])
def to_dict(self):
return {
key: value
for key, value in attr.asdict(self).items()
if value is not None
}
@attr.s
class ClusterReport:
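        """Aggregate capacity info reported for the enterprise resource dashboard."""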
cluster_key = attr.ib(type=str)
max_gpus = attr.ib(type=int, default=None)
max_workers = attr.ib(type=int, default=None)
max_cpus = attr.ib(type=int, default=None)
resource_groups = attr.ib(type=Sequence[str], factory=list)
def __init__(
self,
session, # type: Session
        worker_id,  # type: Text
sample_frequency_per_sec=2.0,
report_frequency_sec=30.0,
first_report_sec=None,
worker_tags=None
):
self.session = session
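        # single-slot queue: only the most recent StatusReport is kept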
self.queue = deque(maxlen=1)
self.queue.appendleft(self.StatusReport())
self._worker_id = worker_id
self._sample_frequency = sample_frequency_per_sec
self._report_frequency = report_frequency_sec
self._first_report_sec = first_report_sec or report_frequency_sec
self._num_readouts = 0
self._readouts = {}
self._previous_readouts = {}
self._previous_readouts_ts = time()
self._thread = None
self._exit_event = Event()
self._gpustat_fail = 0
self._gpustat = gpustat
self._active_gpus = None
self._disk_use_path = str(session.config.get("agent.resource_monitoring.disk_use_path", None) or Path.home())
if not worker_tags and ENV_WORKER_TAGS.get():
worker_tags = shlex.split(ENV_WORKER_TAGS.get())
self._worker_tags = worker_tags
if Session.get_nvidia_visible_env() == 'none':
# NVIDIA_VISIBLE_DEVICES set to none, marks cpu_only flag
# active_gpus == False means no GPU reporting
self._active_gpus = False
elif not self._gpustat:
log.warning('ClearML-Agent Resource Monitor: GPU monitoring is not available')
else:
# None means no filtering, report all gpus
self._active_gpus = None
# noinspection PyBroadException
try:
active_gpus = Session.get_nvidia_visible_env()
# None means no filtering, report all gpus
if active_gpus and active_gpus != "all":
self._active_gpus = [g.strip() for g in str(active_gpus).split(',')]
except Exception:
pass
self._cluster_report_interval_sec = int(session.config.get(
"agent.resource_monitoring.cluster_report_interval_sec", 60
))
self._cluster_report = None
def set_report(self, report):
# type: (ResourceMonitor.StatusReport) -> ()
if report is not None:
self.queue.appendleft(report)
def get_report(self):
# type: () -> ResourceMonitor.StatusReport
return self.queue[0]
def start(self):
self._exit_event.clear()
self._thread = Thread(target=self._daemon)
self._thread.daemon = True
self._thread.start()
return self
def stop(self):
self._exit_event.set()
self.send_report()
def send_report(self, stats=None):
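        """Report worker status and optional machine stats via workers.status_report."""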
report = dict(
machine_stats=stats,
            timestamp=int(time() * 1000),
worker=self._worker_id,
tags=self._worker_tags,
**self.get_report().to_dict()
)
log.debug("sending report: %s", report)
# noinspection PyBroadException
try:
self.session.get(service="workers", action="status_report", **report)
except Exception:
log.warning("Failed sending report: %s", report)
return False
return True
def send_cluster_report(self) -> bool:
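        """Send the cluster capacity report (enterprise feature) via workers.cluster_report."""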
        if self.session.feature_set == "basic":
return False
# noinspection PyBroadException
try:
properties = {
"max_cpus": self._cluster_report.max_cpus,
"max_gpus": self._cluster_report.max_gpus,
"max_workers": self._cluster_report.max_workers,
}
payload = {
"key": self._cluster_report.cluster_key,
"timestamp": int(time() * 1000),
"timeout": int(self._cluster_report_interval_sec * 2),
# "resource_groups": self._cluster_report.resource_groups, # yet to be supported
"properties": {k: v for k, v in properties.items() if v is not None},
}
self.session.post(service="workers", action="cluster_report", **payload)
except Exception as ex:
log.warning("Failed sending cluster report: %s", ex)
return False
return True
def setup_cluster_report(self, available_gpus, gpu_queues, worker_id=None, cluster_key=None, resource_groups=None):
        # type: (List[int], Dict[str, Tuple[int, int]], Optional[str], Optional[str], Optional[List[str]]) -> ()
"""
Set up a cluster report for the enterprise server dashboard feature.
If a worker_id is provided, cluster_key and resource_groups are inferred from it.
"""
if self.session.feature_set == "basic":
return
if not worker_id and not cluster_key:
print("Error: cannot set up dashboard reporting - worker_id or cluster key are required")
return
# noinspection PyBroadException
try:
if not cluster_key:
worker_id_parts = worker_id.split(":")
if len(worker_id_parts) < 3:
cluster_key = self.session.config.get("agent.resource_dashboard.default_cluster_name", "onprem")
resource_group = ":".join((cluster_key, worker_id_parts[0]))
print(
'WARNING: your worker ID "{}" is not suitable for proper resource dashboard reporting, please '
                        'set agent.worker_name to at least two colon-separated parts (i.e. "<category>:<name>"). '
'Using "{}" as the resource dashboard category and "{}" as the resource group.'.format(
worker_id, cluster_key, resource_group
)
)
else:
cluster_key = worker_id_parts[0]
resource_group = ":".join((worker_id_parts[:2]))
resource_groups = [resource_group]
self._cluster_report = ResourceMonitor.ClusterReport(
cluster_key=cluster_key,
max_gpus=len(available_gpus),
max_workers=len(available_gpus) // min(x for x, _ in gpu_queues.values()),
resource_groups=resource_groups
)
self.send_cluster_report()
except Exception as ex:
print("Error: failed setting cluster report: {}".format(ex))
def _daemon(self):
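        # sampling loop: collect a machine-stats sample every 1/sample_frequency
        # seconds, and send the averaged readings every report_frequency seconds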
last_cluster_report = 0
seconds_since_started = 0
reported = 0
try:
while True:
last_report = time()
current_report_frequency = (
self._report_frequency if reported != 0 else self._first_report_sec
)
while (time() - last_report) < current_report_frequency:
                    # sleep one sample period (1 / sample_frequency seconds); quit if the exit event is set
if self._exit_event.wait(1 / self._sample_frequency):
return
# noinspection PyBroadException
try:
self._update_readouts()
except Exception as ex:
log.warning("failed getting machine stats: %s", report_error(ex))
self._failure()
seconds_since_started += int(round(time() - last_report))
                # average the samples accumulated since the last successful report
average_readouts = self._get_average_readouts()
stats = {
                # round to 3 decimal places (per-device stats are lists of floats)
key: round(value, 3) if isinstance(value, float) else [round(v, 3) for v in value]
for key, value in average_readouts.items()
}
# send actual report
if self.send_report(stats):
                    # clear the accumulated readouts once this update was sent
self._clear_readouts()
# count reported iterations
reported += 1
if (
self._cluster_report and
self._cluster_report_interval_sec
and time() - last_cluster_report > self._cluster_report_interval_sec
):
if self.send_cluster_report():
last_cluster_report = time()
except Exception as ex:
log.exception("Error reporting monitoring info: %s", str(ex))
def _update_readouts(self):
readouts = self._machine_stats()
elapsed = time() - self._previous_readouts_ts
self._previous_readouts_ts = time()
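        # each sample is added into a running sum; _get_average_readouts later divides by _num_readouts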
        def fix(k, v):
            # treat a missing sample as zero before doing any arithmetic
            if v is None:
                v = 0
            # "*_mbs" readouts are cumulative byte counters; convert to a per-second rate
            if k.endswith("_mbs"):
                v = (v - self._previous_readouts.get(k, v)) / elapsed
            return k, self._readouts.get(k, 0) + v
self._readouts.update(starmap(fix, readouts.items()))
self._num_readouts += 1
self._previous_readouts = readouts
def _get_num_readouts(self):
return self._num_readouts
def _get_average_readouts(self):
def create_general_key(old_key):
"""
Create key for backend payload
:param old_key: old stats key
:type old_key: str
:return: new key for sending stats
:rtype: str
"""
key_parts = old_key.rpartition("_")
return "{}_*".format(key_parts[0] if old_key.startswith("gpu") else old_key)
ret = {}
        # iterate keys in sorted order so per-gpu/cpu values keep a stable position in the accumulated lists
ordered_keys = sorted(self._readouts.keys())
for k in ordered_keys:
v = self._readouts[k]
stat_key = self.BACKEND_STAT_MAP.get(k)
if stat_key:
ret[stat_key] = v / self._num_readouts
else:
general_key = create_general_key(k)
general_key = self.BACKEND_STAT_MAP.get(general_key)
if general_key:
ret.setdefault(general_key, []).append(v / self._num_readouts)
else:
pass # log.debug("Cannot find key {}".format(k))
return ret
def _clear_readouts(self):
self._readouts = {}
self._num_readouts = 0
def _machine_stats(self):
"""
:return: machine stats dictionary, all values expressed in megabytes
"""
cpu_usage = psutil.cpu_percent(percpu=True)
stats = {"cpu_usage": sum(cpu_usage) / len(cpu_usage)}
virtual_memory = psutil.virtual_memory()
stats["memory_used"] = BytesSizes.megabytes(virtual_memory.used)
stats["memory_free"] = BytesSizes.megabytes(virtual_memory.available)
disk_use_percentage = psutil.disk_usage(self._disk_use_path).percent
stats["disk_free_percent"] = 100 - disk_use_percentage
sensor_stat = (
psutil.sensors_temperatures()
if hasattr(psutil, "sensors_temperatures")
else {}
)
if "coretemp" in sensor_stat and len(sensor_stat["coretemp"]):
stats["cpu_temperature"] = max([t.current for t in sensor_stat["coretemp"]])
# update cached measurements
net_stats = psutil.net_io_counters()
stats["network_tx_mbs"] = BytesSizes.megabytes(net_stats.bytes_sent)
stats["network_rx_mbs"] = BytesSizes.megabytes(net_stats.bytes_recv)
io_stats = psutil.disk_io_counters()
stats["io_read_mbs"] = BytesSizes.megabytes(io_stats.read_bytes)
stats["io_write_mbs"] = BytesSizes.megabytes(io_stats.write_bytes)
# check if we need to monitor gpus and if we can access the gpu statistics
if self._active_gpus is not False and self._gpustat:
try:
gpu_stat = self._gpustat.new_query()
for i, g in enumerate(gpu_stat.gpus):
                # only monitor the active GPUs; if none were selected, monitor everything
if self._active_gpus:
uuid = getattr(g, "uuid", None)
if str(i) not in self._active_gpus and (not uuid or uuid not in self._active_gpus):
continue
stats["gpu_temperature_{:d}".format(i)] = g["temperature.gpu"]
stats["gpu_utilization_{:d}".format(i)] = g["utilization.gpu"]
stats["gpu_mem_usage_{:d}".format(i)] = (
100.0 * g["memory.used"] / g["memory.total"]
)
# already in MBs
stats["gpu_mem_free_{:d}".format(i)] = (
g["memory.total"] - g["memory.used"]
)
stats["gpu_mem_used_%d" % i] = g["memory.used"]
except Exception as ex:
            # GPU stats could not be read this round; count the failure
log.warning("failed getting machine stats: %s", report_error(ex))
self._failure()
return stats
def _failure(self):
self._gpustat_fail += 1
if self._gpustat_fail >= 3:
log.error(
"GPU monitoring failed getting GPU reading, switching off GPU monitoring"
)
self._gpustat = None
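    # maps raw readout keys to backend stat names; "*" matches a per-device index suffix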
BACKEND_STAT_MAP = {"cpu_usage_*": "cpu_usage",
"cpu_temperature_*": "cpu_temperature",
"disk_free_percent": "disk_free_home",
"io_read_mbs": "disk_read",
"io_write_mbs": "disk_write",
"network_tx_mbs": "network_tx",
"network_rx_mbs": "network_rx",
"memory_free": "memory_free",
"memory_used": "memory_used",
"gpu_temperature_*": "gpu_temperature",
"gpu_mem_used_*": "gpu_memory_used",
"gpu_mem_free_*": "gpu_memory_free",
"gpu_utilization_*": "gpu_usage"}
def report_error(ex):
return "{}: {}".format(type(ex).__name__, ex)