From ec9d02767865ec3d42dcb5d713be03646f4d1a07 Mon Sep 17 00:00:00 2001
From: allegroai <>
Date: Mon, 1 Aug 2022 18:58:42 +0300
Subject: [PATCH] Add support for MIG devices, use 0:1 for GPU 0 slice 1 (or
 use 0.1)

---
 clearml_agent/commands/worker.py         | 47 +++++++++++++++---------
 clearml_agent/helper/resource_monitor.py |  9 ++---
 clearml_agent/session.py                 | 21 ++++++++++-
 3 files changed, 52 insertions(+), 25 deletions(-)

diff --git a/clearml_agent/commands/worker.py b/clearml_agent/commands/worker.py
index e8e179b..e3b196b 100644
--- a/clearml_agent/commands/worker.py
+++ b/clearml_agent/commands/worker.py
@@ -555,6 +555,7 @@ class Worker(ServiceCommandSection):
         self.worker_id = self._session.config["agent.worker_id"] or "{}:{}".format(
             self._session.config["agent.worker_name"], os.getpid()
         )
+        self.parent_worker_id = None  # maybe add os env for overriding
         self._last_stats = defaultdict(lambda: 0)
         self._last_report_timestamp = psutil.time.time()
         self.temp_config_path = None
@@ -943,7 +944,7 @@ class Worker(ServiceCommandSection):
                 # update available gpus
                 if gpu_queues:
                     available_gpus = self._dynamic_gpu_get_available(gpu_indexes)
-                    # if something went wrong or we have no free gpus
+                    # if something went wrong, or we have no free gpus
                     # start over from the highest priority queue
                     if not available_gpus:
                         if self._daemon_foreground or worker_params.debug:
@@ -1029,7 +1030,7 @@ class Worker(ServiceCommandSection):
 
                 self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id))
 
-                org_gpus = os.environ.get('NVIDIA_VISIBLE_DEVICES')
+                org_gpus = Session.get_nvidia_visible_env()
                 dynamic_gpus_worker_id = self.worker_id
                 # the following is only executed in dynamic gpus mode
                 if gpu_queues and gpu_queues.get(queue):
@@ -1040,10 +1041,10 @@ class Worker(ServiceCommandSection):
                     available_gpus = available_gpus[gpu_queues.get(queue)[1]:]
                     self.set_runtime_properties(
                         key='available_gpus', value=','.join(str(g) for g in available_gpus))
-                    os.environ['CUDA_VISIBLE_DEVICES'] = \
-                        os.environ['NVIDIA_VISIBLE_DEVICES'] = ','.join(str(g) for g in gpus)
+                    Session.set_nvidia_visible_env(gpus)
                     list_task_gpus_ids.update({str(g): task_id for g in gpus})
-                    self.worker_id = ':'.join(self.worker_id.split(':')[:-1] + ['gpu'+','.join(str(g) for g in gpus)])
+                    self.worker_id = ':'.join(
+                        self.worker_id.split(':')[:-1] + ['gpu'+','.join(str(g) for g in gpus)])
 
                 self.send_logs(
                     task_id=task_id,
@@ -1056,8 +1057,7 @@ class Worker(ServiceCommandSection):
 
                 if gpu_queues:
                     self.worker_id = dynamic_gpus_worker_id
-                    os.environ['CUDA_VISIBLE_DEVICES'] = \
-                        os.environ['NVIDIA_VISIBLE_DEVICES'] = org_gpus
+                    Session.set_nvidia_visible_env(org_gpus)
 
                 self.report_monitor(ResourceMonitor.StatusReport(queues=self.queues))
 
@@ -1065,7 +1065,7 @@ class Worker(ServiceCommandSection):
                 runtime_props = None
 
                 # if we are using priority start pulling from the first always,
-                # if we are doing round robin, pull from the next one
+                # if we are doing round-robin, pull from the next one
                 if priority_order:
                     break
             else:
@@ -1097,6 +1097,8 @@ class Worker(ServiceCommandSection):
             self._unregister()
 
     def _dynamic_gpu_get_available(self, gpu_indexes):
+        # cast to string
+        gpu_indexes = [str(g) for g in gpu_indexes]
         # noinspection PyBroadException
         try:
             response = self._session.send_api(workers_api.GetAllRequest(last_seen=600))
@@ -1111,7 +1113,8 @@ class Worker(ServiceCommandSection):
         for w in our_workers:
             for g in w.split(':')[-1].lower().replace('gpu', '').split(','):
                 try:
-                    gpus += [int(g.strip())]
+                    # verify "int.int"
+                    gpus += [str(g).strip()] if float(g.strip()) >= 0 else []
                 except (ValueError, TypeError):
                     print("INFO: failed parsing GPU int('{}') - skipping".format(g))
         available_gpus = list(set(gpu_indexes) - set(gpus))
@@ -1127,10 +1130,12 @@ class Worker(ServiceCommandSection):
             gpus = []
             for g in available_gpus[-1].split(','):
                 try:
-                    gpus += [int(g.strip())]
+                    # verify "int.int"
+                    gpus += [str(g).strip()] if float(g.strip()) >= 0 else []
                 except (ValueError, TypeError):
                     print("INFO: failed parsing GPU int('{}') - skipping".format(g))
             available_gpus = gpus
+
         if not isinstance(gpu_queues, dict):
             gpu_queues = dict(gpu_queues)
 
@@ -1487,12 +1492,14 @@ class Worker(ServiceCommandSection):
             if '-' in gpu_indexes:
                 gpu_indexes = list(range(int(gpu_indexes.split('-')[0]), 1 + int(gpu_indexes.split('-')[1])))
             else:
-                gpu_indexes = [int(g) for g in gpu_indexes.split(',')]
+                gpu_indexes = [str(g).replace(":", ".").strip() for g in gpu_indexes.split(',')]
+                # verify (basically numbers with single "." dot)
+                gpu_indexes = [str(g) for g in gpu_indexes if float(g) >= 0]
         except Exception:
             raise ValueError(
                 'Failed parsing --gpus "{}". '
                 '--dynamic_gpus must be use with '
-                'specific gpus for example "0-7" or "0,1,2,3"'.format(kwargs.get('gpus')))
+                'specific gpus for example "0-7" or "0,1,2,3" or "0:0,0:1,1:0,1:1"'.format(kwargs.get('gpus')))
 
         dynamic_gpus = []
         for s in queue_names:
@@ -3433,7 +3440,7 @@ class Worker(ServiceCommandSection):
 
         docker_cmd = dict(
             worker_id=self.worker_id,
-            parent_worker_id=self.worker_id,
+            parent_worker_id=self.parent_worker_id or self.worker_id,
             # docker_image=docker_image,
             # docker_arguments=docker_arguments,
             extra_docker_arguments=self._extra_docker_arguments,
@@ -3516,10 +3523,10 @@ class Worker(ServiceCommandSection):
         base_cmd = [docker, 'run', '-t']
         update_scheme = ""
         dockers_nvidia_visible_devices = 'all'
-        gpu_devices = os.environ.get('NVIDIA_VISIBLE_DEVICES', None)
+        gpu_devices = Session.get_nvidia_visible_env()
         if gpu_devices is None or gpu_devices.lower().strip() == 'all':
             if ENV_DOCKER_SKIP_GPUS_FLAG.get():
-                dockers_nvidia_visible_devices = os.environ.get('NVIDIA_VISIBLE_DEVICES') or \
+                dockers_nvidia_visible_devices = Session.get_nvidia_visible_env() or \
                                                  dockers_nvidia_visible_devices
             else:
                 base_cmd += ['--gpus', 'all', ]
@@ -3527,7 +3534,8 @@ class Worker(ServiceCommandSection):
             if ENV_DOCKER_SKIP_GPUS_FLAG.get():
                 dockers_nvidia_visible_devices = gpu_devices
             else:
-                base_cmd += ['--gpus', '\"device={}\"'.format(gpu_devices), ]
+                # replace back "." to ":" MIG support
+                base_cmd += ['--gpus', '\"device={}\"'.format(gpu_devices.replace(".", ":")), ]
                 # We are using --gpu, so we should not pass NVIDIA_VISIBLE_DEVICES, I think.
                 # base_cmd += ['-e', 'NVIDIA_VISIBLE_DEVICES=' + gpu_devices, ]
         elif gpu_devices.strip() == 'none':
@@ -3844,6 +3852,9 @@ class Worker(ServiceCommandSection):
             unique_worker_id=worker_id, worker_name=worker_name, api_client=self._session.api_client,
             allow_double=bool(ENV_DOCKER_HOST_MOUNT.get())  # and bool(self._services_mode),
         )
+        # set the parent ID the first time we have a worker ID (it might change for services-mode / dgpus)
+        if not self.parent_worker_id:
+            self.parent_worker_id = self.worker_id
 
         if self.worker_id is None:
             error('Instance with the same WORKER_ID [{}] is already running'.format(worker_id))
@@ -3854,8 +3865,8 @@ class Worker(ServiceCommandSection):
     def _generate_worker_id_name(self, dynamic_gpus=False):
         worker_id = self._session.config["agent.worker_id"]
         worker_name = self._session.config["agent.worker_name"]
-        if not worker_id and os.environ.get('NVIDIA_VISIBLE_DEVICES') is not None:
-            nvidia_visible_devices = os.environ.get('NVIDIA_VISIBLE_DEVICES')
+        if not worker_id and Session.get_nvidia_visible_env() is not None:
+            nvidia_visible_devices = Session.get_nvidia_visible_env()
             if nvidia_visible_devices and nvidia_visible_devices.lower() != 'none':
                 worker_id = '{}:{}gpu{}'.format(
                     worker_name, 'd' if dynamic_gpus else '', nvidia_visible_devices)
diff --git a/clearml_agent/helper/resource_monitor.py b/clearml_agent/helper/resource_monitor.py
index 3d8f34f..00ae111 100644
--- a/clearml_agent/helper/resource_monitor.py
+++ b/clearml_agent/helper/resource_monitor.py
@@ -82,7 +82,7 @@ class ResourceMonitor(object):
         if not worker_tags and ENV_WORKER_TAGS.get():
             worker_tags = shlex.split(ENV_WORKER_TAGS.get())
         self._worker_tags = worker_tags
-        if os.environ.get('NVIDIA_VISIBLE_DEVICES') == 'none':
+        if Session.get_nvidia_visible_env() == 'none':
             # NVIDIA_VISIBLE_DEVICES set to none, marks cpu_only flag
             # active_gpus == False means no GPU reporting
             self._active_gpus = False
@@ -92,10 +92,9 @@ class ResourceMonitor(object):
             # None means no filtering, report all gpus
             self._active_gpus = None
             try:
-                active_gpus = os.environ.get('NVIDIA_VISIBLE_DEVICES', '') or \
-                              os.environ.get('CUDA_VISIBLE_DEVICES', '')
+                active_gpus = Session.get_nvidia_visible_env() or ""
                 if active_gpus:
-                    self._active_gpus = [int(g.strip()) for g in active_gpus.split(',')]
+                    self._active_gpus = [g.strip() for g in active_gpus.split(',')]
             except Exception:
                 pass
 
@@ -263,7 +262,7 @@ class ResourceMonitor(object):
         gpu_stat = self._gpustat.new_query()
         for i, g in enumerate(gpu_stat.gpus):
             # only monitor the active gpu's, if none were selected, monitor everything
-            if self._active_gpus and i not in self._active_gpus:
+            if self._active_gpus and str(i) not in self._active_gpus:
                 continue
             stats["gpu_temperature_{:d}".format(i)] = g["temperature.gpu"]
             stats["gpu_utilization_{:d}".format(i)] = g["utilization.gpu"]
diff --git a/clearml_agent/session.py b/clearml_agent/session.py
index 03c5746..f813cca 100644
--- a/clearml_agent/session.py
+++ b/clearml_agent/session.py
@@ -76,7 +76,7 @@ class Session(_Session):
 
         cpu_only = kwargs.get('cpu_only')
         if cpu_only:
-            os.environ['CUDA_VISIBLE_DEVICES'] = os.environ['NVIDIA_VISIBLE_DEVICES'] = 'none'
+            Session.set_nvidia_visible_env('none')
 
         if kwargs.get('gpus') and not os.environ.get('KUBERNETES_SERVICE_HOST') \
                 and not os.environ.get('KUBERNETES_PORT'):
@@ -85,7 +85,7 @@ class Session(_Session):
                 os.environ.pop('CUDA_VISIBLE_DEVICES', None)
                 os.environ['NVIDIA_VISIBLE_DEVICES'] = kwargs.get('gpus')
             else:
-                os.environ['CUDA_VISIBLE_DEVICES'] = os.environ['NVIDIA_VISIBLE_DEVICES'] = kwargs.get('gpus')
+                Session.set_nvidia_visible_env(kwargs.get('gpus'))
 
         if kwargs.get('only_load_config'):
             from clearml_agent.backend_api.config import load
@@ -327,6 +327,23 @@ class Session(_Session):
     def command(self, *args):
         return Argv(*args, log=self.get_logger(Argv.__module__))
 
+    @staticmethod
+    def set_nvidia_visible_env(gpus):
+        if not gpus:
+            gpus = ""
+        visible_env = gpus.replace(".", ":") if isinstance(gpus, str) else \
+            ','.join(str(g).replace(".", ":") for g in gpus)
+
+        os.environ['CUDA_VISIBLE_DEVICES'] = os.environ['NVIDIA_VISIBLE_DEVICES'] = visible_env
+
+    @staticmethod
+    def get_nvidia_visible_env():
+        visible_env = os.environ.get('NVIDIA_VISIBLE_DEVICES') or os.environ.get('CUDA_VISIBLE_DEVICES')
+        if visible_env is None:
+            return None
+        visible_env = str(visible_env).replace(":", ".")
+        return visible_env
+
 
 @attr.s
 class TrainsAgentLogger(object):
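
Note on the MIG index notation used throughout the patch: the new
Session.set_nvidia_visible_env() / get_nvidia_visible_env() helpers keep MIG
slices in a dotted form internally ("0.1" for GPU 0 slice 1), presumably so the
":"-separated worker-id parsing (w.split(':')[-1]) keeps working, and convert
back to the ":" form NVIDIA expects ("0:1") whenever the value is exported to
NVIDIA_VISIBLE_DEVICES / CUDA_VISIBLE_DEVICES or to docker's --gpus "device=..."
flag. A minimal, self-contained sketch of that round-trip (standalone functions
mirroring the Session static methods above, for illustration only):

    import os

    def set_nvidia_visible_env(gpus):
        # accept a string ("0,1", "0.1" or "0:1") or a list of GPU/MIG ids and
        # export them with the ":" separator the NVIDIA runtime expects
        if not gpus:
            gpus = ""
        visible = gpus.replace(".", ":") if isinstance(gpus, str) else \
            ",".join(str(g).replace(".", ":") for g in gpus)
        os.environ['CUDA_VISIBLE_DEVICES'] = os.environ['NVIDIA_VISIBLE_DEVICES'] = visible

    def get_nvidia_visible_env():
        # read the visible devices back, normalized to the internal dotted form
        visible = os.environ.get('NVIDIA_VISIBLE_DEVICES') or os.environ.get('CUDA_VISIBLE_DEVICES')
        return None if visible is None else str(visible).replace(":", ".")

    set_nvidia_visible_env(["0.0", "0.1"])   # MIG slices 0 and 1 of GPU 0
    assert os.environ['NVIDIA_VISIBLE_DEVICES'] == "0:0,0:1"
    assert get_nvidia_visible_env() == "0.0,0.1"

On the command line both spellings are accepted, e.g. --gpus "0:0,0:1,1:0,1:1"
or the dotted equivalent "0.0,0.1,1.0,1.1", as reflected in the updated
--dynamic_gpus error message above.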