diff --git a/clearml_agent/commands/worker.py b/clearml_agent/commands/worker.py
index 396ff1e..40e3c63 100644
--- a/clearml_agent/commands/worker.py
+++ b/clearml_agent/commands/worker.py
@@ -1396,7 +1396,7 @@ class Worker(ServiceCommandSection):
     def _setup_dynamic_gpus(self, gpu_queues):
         available_gpus = self.get_runtime_properties()
         if available_gpus is None:
-            raise ValueError("Dynamic GPU allocation is not supported by the ClearML-server")
+            raise ValueError("Dynamic GPU allocation is not supported by your ClearML-server")
         available_gpus = [prop["value"] for prop in available_gpus if prop["key"] == 'available_gpus']
         if available_gpus:
             gpus = []
@@ -1413,7 +1413,9 @@ class Worker(ServiceCommandSection):
 
         if not self.set_runtime_properties(
                 key='available_gpus', value=','.join(str(g) for g in available_gpus)):
-            raise ValueError("Dynamic GPU allocation is not supported by the ClearML-server")
+            raise ValueError("Dynamic GPU allocation is not supported by your ClearML-server")
+
+        self.cluster_report_monitor(available_gpus=available_gpus, gpu_queues=gpu_queues)
 
         return available_gpus, gpu_queues
 
@@ -1809,7 +1811,7 @@ class Worker(ServiceCommandSection):
         available_gpus = self._dynamic_gpu_get_available(gpu_indexes)
         if not self.set_runtime_properties(
                 key='available_gpus', value=','.join(str(g) for g in available_gpus)):
-            raise ValueError("Dynamic GPU allocation is not supported by the ClearML-server")
+            raise ValueError("Dynamic GPU allocation is not supported by your ClearML-server")
 
     def report_monitor(self, report):
         if not self.monitor:
@@ -1818,6 +1820,13 @@ class Worker(ServiceCommandSection):
         self.monitor.set_report(report)
         self.monitor.send_report()
 
+    def cluster_report_monitor(self, available_gpus, gpu_queues):
+        if not self.monitor:
+            self.new_monitor()
+        self.monitor.setup_cluster_report(
+            worker_id=self.worker_id, available_gpus=available_gpus, gpu_queues=gpu_queues
+        )
+
     def stop_monitor(self):
         if self.monitor:
             self.monitor.stop()
diff --git a/clearml_agent/helper/resource_monitor.py b/clearml_agent/helper/resource_monitor.py
index c96a0a1..4936a13 100644
--- a/clearml_agent/helper/resource_monitor.py
+++ b/clearml_agent/helper/resource_monitor.py
@@ -7,7 +7,7 @@ from collections import deque
 from itertools import starmap
 from threading import Thread, Event
 from time import time
-from typing import Text, Sequence
+from typing import Text, Sequence, List, Dict, Optional
 
 import attr
 import psutil
@@ -54,6 +54,14 @@ class ResourceMonitor(object):
             if value is not None
         }
 
+    @attr.s
+    class ClusterReport:
+        cluster_key = attr.ib(type=str)
+        max_gpus = attr.ib(type=int, default=None)
+        max_workers = attr.ib(type=int, default=None)
+        max_cpus = attr.ib(type=int, default=None)
+        resource_groups = attr.ib(type=Sequence[str], factory=list)
+
     def __init__(
         self,
         session,  # type: Session
@@ -61,7 +69,7 @@ class ResourceMonitor(object):
         sample_frequency_per_sec=2.0,
         report_frequency_sec=30.0,
         first_report_sec=None,
-        worker_tags=None,
+        worker_tags=None
     ):
         self.session = session
         self.queue = deque(maxlen=1)
@@ -92,6 +100,7 @@ class ResourceMonitor(object):
         else:
             # None means no filtering, report all gpus
             self._active_gpus = None
+            # noinspection PyBroadException
             try:
                 active_gpus = Session.get_nvidia_visible_env()
                 # None means no filtering, report all gpus
@@ -99,6 +108,10 @@ class ResourceMonitor(object):
                     self._active_gpus = [g.strip() for g in str(active_gpus).split(',')]
             except Exception:
                 pass
+        self._cluster_report_interval_sec = int(session.config.get(
"agent.resource_monitoring.cluster_report_interval_sec", 60 + )) + self._cluster_report = None def set_report(self, report): # type: (ResourceMonitor.StatusReport) -> () @@ -130,6 +143,7 @@ class ResourceMonitor(object): ) log.debug("sending report: %s", report) + # noinspection PyBroadException try: self.session.get(service="workers", action="status_report", **report) except Exception: @@ -137,7 +151,76 @@ class ResourceMonitor(object): return False return True + def send_cluster_report(self) -> bool: + if not self.session.feature_set == "basic": + return False + + # noinspection PyBroadException + try: + properties = { + "max_cpus": self._cluster_report.max_cpus, + "max_gpus": self._cluster_report.max_gpus, + "max_workers": self._cluster_report.max_workers, + } + payload = { + "key": self._cluster_report.cluster_key, + "timestamp": int(time() * 1000), + "timeout": int(self._cluster_report_interval_sec * 2), + # "resource_groups": self._cluster_report.resource_groups, # yet to be supported + "properties": {k: v for k, v in properties.items() if v is not None}, + } + self.session.post(service="workers", action="cluster_report", **payload) + except Exception as ex: + log.warning("Failed sending cluster report: %s", ex) + return False + return True + + def setup_cluster_report(self, available_gpus, gpu_queues, worker_id=None, cluster_key=None, resource_groups=None): + # type: (List[int], Dict[str, int], Optional[str], Optional[str], Optional[List[str]]) -> () + """ + Set up a cluster report for the enterprise server dashboard feature. + If a worker_id is provided, cluster_key and resource_groups are inferred from it. + """ + if self.session.feature_set == "basic": + return + + if not worker_id and not cluster_key: + print("Error: cannot set up dashboard reporting - worker_id or cluster key are required") + return + + # noinspection PyBroadException + try: + if not cluster_key: + worker_id_parts = worker_id.split(":") + if len(worker_id_parts) < 3: + cluster_key = self.session.config.get("agent.resource_dashboard.default_cluster_name", "onprem") + resource_group = ":".join((cluster_key, worker_id_parts[0])) + print( + 'WARNING: your worker ID "{}" is not suitable for proper resource dashboard reporting, please ' + 'set up agent.worker_name to be at least two colon-separated parts (i.e. ":"). ' + 'Using "{}" as the resource dashboard category and "{}" as the resource group.'.format( + worker_id, cluster_key, resource_group + ) + ) + else: + cluster_key = worker_id_parts[0] + resource_group = ":".join((worker_id_parts[:2])) + + resource_groups = [resource_group] + + self._cluster_report = ResourceMonitor.ClusterReport( + cluster_key=cluster_key, + max_gpus=len(available_gpus), + max_workers=len(available_gpus) // min(x for x, _ in gpu_queues.values()), + resource_groups=resource_groups + ) + + self.send_cluster_report() + except Exception as ex: + print("Error: failed setting cluster report: {}".format(ex)) + def _daemon(self): + last_cluster_report = 0 seconds_since_started = 0 reported = 0 try: @@ -177,6 +260,15 @@ class ResourceMonitor(object): # count reported iterations reported += 1 + + if ( + self._cluster_report and + self._cluster_report_interval_sec + and time() - last_cluster_report > self._cluster_report_interval_sec + ): + if self.send_cluster_report(): + last_cluster_report = time() + except Exception as ex: log.exception("Error reporting monitoring info: %s", str(ex))