diff --git a/clearml_agent/commands/worker.py b/clearml_agent/commands/worker.py
index 752d4cb..89310ca 100644
--- a/clearml_agent/commands/worker.py
+++ b/clearml_agent/commands/worker.py
@@ -727,6 +727,7 @@ class Worker(ServiceCommandSection):
         self.is_venv_update = self._session.config.agent.venv_update.enabled
         self.poetry = PoetryConfig(self._session)
         self.docker_image_func = None
+        self._patch_docker_cmd_func = None
         self._docker_image = None
         self._docker_arguments = None
         PackageManager.set_pip_version(self._session.config.get("agent.package_manager.pip_version", None))
@@ -740,7 +741,7 @@ class Worker(ServiceCommandSection):
         self._services_mode = None
         self._impersonate_as_task_owner = None
         self._worker_tags = None
-        self._dynamic_gpus = None
+        self._dynamic_gpus = None  # valid options: True / False / "fractional"
         self._force_current_version = None
         self._redirected_stdout_file_no = None
         self._uptime_config = self._session.config.get("agent.uptime", None)
@@ -1050,6 +1051,10 @@ class Worker(ServiceCommandSection):
             session=task_session,
         )
 
+        # patch the full docker cmd if needed, notice this is done post reporting
+        if self._patch_docker_cmd_func:
+            full_docker_cmd = self._patch_docker_cmd_func(full_docker_cmd)
+
         cmd = Argv(*full_docker_cmd, display_argv=display_docker_command)
 
         print('Running Docker:\n{}\n'.format(str(cmd)))
@@ -1195,14 +1200,15 @@ class Worker(ServiceCommandSection):
 
         # get current running instances
         available_gpus = None
+        allocated_gpus = {}
        dynamic_gpus_worker_id = None
         if gpu_indexes and gpu_queues:
-            available_gpus, gpu_queues = self._setup_dynamic_gpus(gpu_queues)
+            available_gpus, gpu_queues = self._setup_dynamic_gpus(gpu_queues, gpu_indexes)
             # multi instance support
             self._services_mode = True
 
         # last 64 tasks
-        list_task_gpus_ids = {}
+        dict_task_gpus_ids = {}  # {str(gpu_indexes): task_id}
         try:
             while True:
                 queue_tags = None
@@ -1226,7 +1232,7 @@ class Worker(ServiceCommandSection):
                 # update available gpus
                 if gpu_queues:
-                    available_gpus = self._dynamic_gpu_get_available(gpu_indexes)
+                    available_gpus, allocated_gpus = self._dynamic_gpu_get_available(gpu_indexes)
                     # if something went wrong, or we have no free gpus
                     # start over from the highest priority queue
                     if not available_gpus:
@@ -1255,11 +1261,21 @@ class Worker(ServiceCommandSection):
                     if not len(response.queue.entries):
                         continue
                     # check if we do not have enough available gpus
-                    if gpu_queues[queue][0] > len(available_gpus):
+                    # Notice that gpu_queues[queue] is the (min_gpu, max_gpu) range we should allocate
+                    # for a Task pulled from that queue, which means
+                    # gpu_queues[queue][0] is the minimum number of GPUs we need,
+                    # min_available_fract_gpu is the largest free fraction on any single GPU,
+                    # and max_available_gpus is the number of fully free GPUs we have
+                    min_available_fract_gpu = max([v for v in available_gpus.values()])
+                    max_available_gpus = sum([v for v in available_gpus.values() if v >= 1])
+                    if gpu_queues[queue][0] > max(float(max_available_gpus), min_available_fract_gpu):
                         # not enough available_gpus, we should sleep and start over
                         if self._daemon_foreground or worker_params.debug:
                             print("Not enough free GPUs {}/{}, sleeping for {:.1f} seconds".format(
-                                len(available_gpus), gpu_queues[queue][0], self._polling_interval))
+                                max(float(max_available_gpus), min_available_fract_gpu),
+                                gpu_queues[queue][0],
+                                self._polling_interval)
+                            )
                             sleep(self._polling_interval)
                         break
@@ -1317,17 +1333,85 @@ class Worker(ServiceCommandSection):
                     dynamic_gpus_worker_id = self.worker_id
                     # the following is only executed in dynamic gpus mode
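+                    # illustrative (hypothetical) example: a queue defined as "quarter=0.25-0.5" can be served
+                    # by any GPU with at least 0.25 free, while a queue defined as "dual=2" needs two fully free GPUs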
                     if gpu_queues and gpu_queues.get(queue):
+                        gpus = []
+                        fractions = []
                         # pick the first available GPUs
                         # gpu_queues[queue] = (min_gpus, max_gpus)
-                        # get as many gpus as possible with max_gpus as limit, the min is covered before
-                        gpus = available_gpus[:gpu_queues.get(queue)[1]]
-                        available_gpus = available_gpus[gpu_queues.get(queue)[1]:]
+                        # first check if max_gpus is greater than or equal to 1 (if so, allocate full GPUs)
+                        if float(gpu_queues.get(queue)[1]) >= 1:
+                            gpus = [g for g, v in available_gpus.items() if v >= 1]
+                            if gpus:
+                                # get as many gpus as possible with max_gpus as limit, the min is covered before
+                                gpus = gpus[:int(gpu_queues.get(queue)[1])]
+                                fractions = [1] * len(gpus)
+                                # update available gpus
+                                available_gpus = {g: v for g, v in available_gpus.items() if g not in gpus}
+                            else:
+                                # we assume the minimum was < 1 GPU, otherwise why are we here
+                                pass
+
+                        # if this is under 1 GPU
+                        if not gpus:
+                            # find the GPU with availability that covers the minimum
+                            _max_req_gpu = min(float(gpu_queues.get(queue)[1]), 1.)
+                            _min_req_gpu = float(gpu_queues.get(queue)[0])
+                            _potential_gpus = {
+                                g: (v - float(_max_req_gpu)) for g, v in available_gpus.items()
+                                if v >= float(_min_req_gpu)}
+                            # sort based on the least available that can fit the maximum
+                            # find the first instance that is positive or 0
+                            _potential_gpus = sorted(_potential_gpus.items(), key=lambda a: a[1])
+                            gpus = [(g, v) for g, v in _potential_gpus if v >= 0]
+                            if gpus:
+                                available_gpus[gpus[0][0]] -= _max_req_gpu
+                                gpus = [gpus[0][0]]
+                                fractions = [_max_req_gpu]
+                            else:
+                                gpus = [_potential_gpus[-1][0]]
+                                # this is where we need to decide on the actual granularity
+                                # for now it is hardcoded to 1/8th
+                                _base_fract = 8
+                                avail_fract = int(float(available_gpus[_potential_gpus[-1][0]]) * _base_fract)
+                                fractions = [avail_fract / float(_base_fract)]
+                                available_gpus[_potential_gpus[-1][0]] -= fractions[0]
+
+                            try:
+                                from clearml_agent_fractional_gpu import patch_docker_cmd_gpu_fraction  # noqa
+                                # new docker image func
+                                self._patch_docker_cmd_func = lambda docker_cmd: (
+                                    patch_docker_cmd_gpu_fraction(docker_cmd, gpu_fraction=fractions[0]))
+                            except Exception:
+                                print("Error! could not load clearml_agent_fractional_gpu module! "
+                                      "failed configuring fractional GPU support")
+                                raise
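+                        # illustrative example (assumed numbers): if GPU "0" has 0.55 free and this queue was
+                        # defined as min-max 0.25-0.5, the block above books 0.5 of GPU "0", leaving 0.05 free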
" + "failed configuring fractional GPU support") + raise + self.set_runtime_properties( - key='available_gpus', value=','.join(str(g) for g in available_gpus)) + key='available_gpus', + value=','.join("{}_{}".format(g, str(f)[2:]) for g, f in available_gpus.items())) + + # this is where we set the fractions as well as gpus Session.set_nvidia_visible_env(gpus) - list_task_gpus_ids.update({str(g): task_id for g in gpus}) - self.worker_id = ':'.join( - self.worker_id.split(':')[:-1] + ['gpu'+','.join(str(g) for g in gpus)]) + + if fractions and min(fractions) < 1: + # we assume a single gpu in the list + gpu_idx_fract = ["{}.{}".format(g, str(f)[2:]) for g, f in zip(gpus, fractions)] + # check a new available unique name (id) for us + from string import ascii_lowercase + for x in ascii_lowercase: + if gpu_idx_fract[0]+x not in allocated_gpus: + gpu_idx_fract[0] = gpu_idx_fract[0]+x + break + + # add the new task + allocated_gpus[gpu_idx_fract[0]] = fractions[0] + dict_task_gpus_ids.update({str(g): task_id for g in gpu_idx_fract}) + self.worker_id = ':'.join( + self.worker_id.split(':')[:-1] + ['gpu'+','.join(str(g) for g in gpu_idx_fract)]) + else: + # update the task list + dict_task_gpus_ids.update({str(g): task_id for g in gpus}) + self.worker_id = ':'.join( + self.worker_id.split(':')[:-1] + ['gpu'+','.join(str(g) for g in gpus)]) self.send_logs( task_id=task_id, @@ -1338,10 +1422,14 @@ class Worker(ServiceCommandSection): self.run_one_task(queue, task_id, worker_params, task_session=task_session) + # restore back worker_id / GPUs if gpu_queues: self.worker_id = dynamic_gpus_worker_id Session.set_nvidia_visible_env(org_gpus) + # clear docker patching function (if exists + self._patch_docker_cmd_func = None + self.report_monitor(ResourceMonitor.StatusReport(queues=self.queues)) queue_tags = None @@ -1362,7 +1450,7 @@ class Worker(ServiceCommandSection): finally: # if we are in dynamic gpus mode, shutdown all active runs if self.docker_image_func: - for t_id in set(list_task_gpus_ids.values()): + for t_id in set(dict_task_gpus_ids.values()): if shutdown_docker_process(docker_cmd_contains='--id {}\'\"'.format(t_id)): self.handle_task_termination(task_id=t_id, exit_code=0, stop_reason=TaskStopReason.stopped) else: @@ -1380,8 +1468,8 @@ class Worker(ServiceCommandSection): self._unregister() def _dynamic_gpu_get_available(self, gpu_indexes): - # cast to string - gpu_indexes = [str(g) for g in gpu_indexes] + # key: cast to string, value: 1 (i.e. 
         # noinspection PyBroadException
         try:
             response = self._session.send_api(workers_api.GetAllRequest(last_seen=600))
@@ -1392,29 +1480,50 @@ class Worker(ServiceCommandSection):
         our_workers = [
             w.id for w in response.workers
             if w.id.startswith(worker_name) and w.id != self.worker_id]
-        gpus = []
+        gpus = {}
+        allocated_gpus = {}
+        gpu_pattern = re.compile(r"\d+[.]?\d*[a-z]?")
+        fract_gpu_pattern = re.compile(r"\d+[.]?\d*[a-z]+")
         for w in our_workers:
             for g in w.split(':')[-1].lower().replace('gpu', '').split(','):
                 try:
-                    # verify "int.int"
-                    gpus += [str(g).strip()] if float(g.strip()) >= 0 else []
+                    # verify pattern "int.int" or "int.int[a-z]"
+                    gpu_idx_name = g.strip()
+                    if gpu_pattern.fullmatch(gpu_idx_name):
+                        # check if this is a fraction
+                        if fract_gpu_pattern.fullmatch(gpu_idx_name):
+                            gpu_idx = gpu_idx_name.split(".")[0]
+                            gpu_fract = float("0.{}".format(gpu_idx_name.split(".")[-1][:-1]))
+                            # the fraction used on the entire gpu
+                            gpus[gpu_idx] = gpus.get(gpu_idx, 0) + gpu_fract
+                            # the gpu fraction uid, e.g. 0.25a
+                            allocated_gpus[gpu_idx_name] = gpu_fract
+                        else:
+                            # a full GPU (or a static MIG slice)
+                            gpus[gpu_idx_name] = 1
+                            allocated_gpus[gpu_idx_name] = 1
+                    else:
+                        print("INFO: failed parsing fractional GPU '{}' - skipping".format(g))
                 except (ValueError, TypeError):
                     print("INFO: failed parsing GPU int('{}') - skipping".format(g))
 
-        available_gpus = list(set(gpu_indexes) - set(gpus))
-        return available_gpus
+        # remove the GPUs we have workers running on
+        available_gpus = {g: (v - gpus.get(g, 0)) for g, v in gpu_indexes.items() if (v - gpus.get(g, 0)) > 0}
+        return available_gpus, allocated_gpus
 
-    def _setup_dynamic_gpus(self, gpu_queues):
+    def _setup_dynamic_gpus(self, gpu_queues, gpu_indexes):
         available_gpus = self.get_runtime_properties()
         if available_gpus is None:
             raise ValueError("Dynamic GPU allocation is not supported by your ClearML-server")
         available_gpus = [prop["value"] for prop in available_gpus if prop["key"] == 'available_gpus']
         if available_gpus:
-            gpus = []
-            for g in available_gpus[-1].split(','):
+            gpus = {}
+            for g_v in available_gpus[-1].split(','):
+                g, v = g_v.split("_")
                 try:
-                    # verify "int.int"
-                    gpus += [str(g).strip()] if float(g.strip()) >= 0 else []
+                    # verify "int.int_float"
+                    if float(g.strip()) >= 0:
+                        gpus[g.strip()] = float("0." + v)
                 except (ValueError, TypeError):
                     print("INFO: failed parsing GPU int('{}') - skipping".format(g))
             available_gpus = gpus
@@ -1423,10 +1532,11 @@ class Worker(ServiceCommandSection):
         gpu_queues = dict(gpu_queues)
 
         if not self.set_runtime_properties(
-                key='available_gpus', value=','.join(str(g) for g in available_gpus)):
+                key='available_gpus', value=','.join("{}_{}".format(g, str(v)[2:]) for g, v in available_gpus.items())):
             raise ValueError("Dynamic GPU allocation is not supported by your ClearML-server")
 
-        self.cluster_report_monitor(available_gpus=available_gpus, gpu_queues=gpu_queues)
+        # report gpu_indexes (not available_gpus) because this sets the MAX, not the currently available (i.e. free) GPUs
+        self.cluster_report_monitor(available_gpus=gpu_indexes, gpu_queues=gpu_queues)
 
         return available_gpus, gpu_queues
@@ -1606,6 +1716,10 @@ class Worker(ServiceCommandSection):
         if self._services_mode and dynamic_gpus:
             raise ValueError("Combining --dynamic-gpus and --services-mode is not supported")
 
+        if self._dynamic_gpus == "fractional" and not docker:
+            raise ValueError("Fractional GPUs are only supported in docker-mode, "
+                             "add --docker to allow docker-mode operation")
+
         # We are not running a daemon we are killing one.
         # find the pid send termination signal and leave
         if kwargs.get('stop', False) is not False:
@@ -1774,6 +1888,7 @@ class Worker(ServiceCommandSection):
         if not dynamic_gpus:
             return None, None, queues
 
+        has_fractional = False
         queue_names = [q.name for q in queues]
         if not all('=' in q for q in queue_names):
             raise ValueError("using --dynamic-gpus, --queue [{}], "
@@ -1802,9 +1917,12 @@ class Worker(ServiceCommandSection):
         for s in queue_names:
             s_p = s.split('=')
             name = s[:-1 - len(s_p[-1])]
-            min_max_g = int(s_p[-1].split('-')[0] or 1), int(s_p[-1].split('-')[-1])
+            min_max_g = float(s_p[-1].split('-')[0] or 1), float(s_p[-1].split('-')[-1])
             if min(min_max_g) <= 0:
                 raise ValueError("Parsing min/max number of gpus <= 0 is not allowed: \"{}\"".format(s))
+            if any(g for g in min_max_g if 1 < g != int(g)):
+                raise ValueError("Parsing min/max number of gpus, fractional gpu cannot be > 1: \"{}\"".format(s))
+            has_fractional = min(min_max_g) < 1
             dynamic_gpus.append((name, min_max_g,))
         queue_names = [q for q, _ in dynamic_gpus]
         # resolve queue ids
@@ -1814,15 +1932,16 @@ class Worker(ServiceCommandSection):
         # maintain original priority order
         queues = [q for q, _ in dynamic_gpus]
 
-        self._dynamic_gpus = True
+        self._dynamic_gpus = "fractional" if has_fractional else True
 
         return dynamic_gpus, gpu_indexes, queues
 
     def _register_dynamic_gpus(self, gpu_indexes):
         # test server support
-        available_gpus = self._dynamic_gpu_get_available(gpu_indexes)
+        available_gpus, allocated_gpus = self._dynamic_gpu_get_available(gpu_indexes)
         if not self.set_runtime_properties(
-                key='available_gpus', value=','.join(str(g) for g in available_gpus)):
+                key='available_gpus',
+                value=','.join("{}_{}".format(g, str(v)[2:]) for g, v in available_gpus.items())):
             raise ValueError("Dynamic GPU allocation is not supported by your ClearML-server")
 
     def report_monitor(self, report):
@@ -2484,13 +2603,15 @@ class Worker(ServiceCommandSection):
                 raise ValueError(
                     "Execution required enqueued task, but task id={} is not queued.".format(current_task.id)
                 )
-            # Set task status to started to prevent any external monitoring from killing it
-            self._session.api_client.tasks.started(
-                task=current_task.id,
-                status_reason="starting execution soon",
-                status_message="",
-                force=True,
-            )
+            # only force set started if we actually dequeued it (which would have changed the state)
+            if res.ok and res.json().get("data", {}).get("updated", 0):
+                # Set task status to started to prevent any external monitoring from killing it
+                self._session.api_client.tasks.started(
+                    task=current_task.id,
+                    status_reason="starting execution soon",
+                    status_message="",
+                    force=True,
+                )
         except Exception:
             if require_queue:
                 raise