From 5c8675e43af0a5b0d4870429d73e8107bb9adfec Mon Sep 17 00:00:00 2001
From: allegroai <>
Date: Tue, 20 Apr 2021 18:11:16 +0300
Subject: [PATCH] Add support for dynamic gpus opportunistic scheduling (with
 min/max gpus per queue)

---
 clearml_agent/commands/worker.py  | 39 +++++++++++++++++--------------
 clearml_agent/interface/worker.py |  3 ++-
 2 files changed, 24 insertions(+), 18 deletions(-)

diff --git a/clearml_agent/commands/worker.py b/clearml_agent/commands/worker.py
index a5c4806..ade177b 100644
--- a/clearml_agent/commands/worker.py
+++ b/clearml_agent/commands/worker.py
@@ -767,14 +767,13 @@ class Worker(ServiceCommandSection):
                     break
                 if not len(response.queue.entries):
                     continue
-                # check if we have enough available gpus
-                if gpu_queues[queue] > len(available_gpus):
+                # check if we do not have enough available gpus
+                if gpu_queues[queue][0] > len(available_gpus):
                     # not enough available_gpus, we should sleep and start over
                     if self._daemon_foreground or worker_params.debug:
-                        print("Not enough free GPUs {}/{}, sleeping for {:.1f} seconds".format(
-                            len(available_gpus), gpu_queues[queue], self._polling_interval))
-                        sleep(self._polling_interval)
-                    break
+                        print("Not enough free GPUs for queue={} {}/{}".format(
+                            queue, len(available_gpus), gpu_queues[queue][0]))
+                    continue
 
                 # get next task in queue
                 try:
@@ -813,8 +812,10 @@ class Worker(ServiceCommandSection):
                 # the following is only executed in dynamic gpus mode
                 if gpu_queues and gpu_queues.get(queue):
                     # pick the first available GPUs
-                    gpus = available_gpus[:gpu_queues.get(queue)]
-                    available_gpus = available_gpus[gpu_queues.get(queue):]
+                    # gpu_queues[queue] = (min_gpus, max_gpus)
+                    # get as many gpus as possible with max_gpus as limit, the min is covered before
+                    gpus = available_gpus[:gpu_queues.get(queue)[1]]
+                    available_gpus = available_gpus[gpu_queues.get(queue)[1]:]
                     self.set_runtime_properties(
                         key='available_gpus', value=','.join(str(g) for g in available_gpus))
                     os.environ['CUDA_VISIBLE_DEVICES'] = \
@@ -1114,6 +1115,10 @@ class Worker(ServiceCommandSection):
 
             columns = ("id", "name", "tags")
             print("Listening to queues:")
+            if dynamic_gpus:
+                columns = ("id", "name", "tags", "gpus")
+                for q in queues_info:
+                    q['gpus'] = str(dict(dynamic_gpus).get(q['id']) or '')
             print_table(queues_info, columns=columns, titles=columns)
 
         # register worker
@@ -1134,7 +1139,7 @@ class Worker(ServiceCommandSection):
         # make sure we have CUDA set if we have --gpus
         if kwargs.get('gpus') and self._session.config.get('agent.cuda_version', None) in (None, 0, '0'):
             message = 'Running with GPUs but no CUDA version was detected!\n' \
-                      '\tSet OS environemnt CUDA_VERSION & CUDNN_VERSION to the correct version\n' \
+                      '\tSet OS environment CUDA_VERSION & CUDNN_VERSION to the correct version\n' \
                       '\tExample: export CUDA_VERSION=10.1 or (Windows: set CUDA_VERSION=10.1)'
             if is_conda(self._session.config):
                 self._unregister(queues)
@@ -1246,7 +1251,14 @@ class Worker(ServiceCommandSection):
                     '--dynamic_gpus must be use with '
                     'specific gpus for example "0-7" or "0,1,2,3"'.format(kwargs.get('gpus')))
 
-            dynamic_gpus = [(s[:-1 - len(s.split('=')[-1])], int(s.split('=')[-1])) for s in queue_names]
+            dynamic_gpus = []
+            for s in queue_names:
+                s_p = s.split('=')
+                name = s[:-1 - len(s_p[-1])]
+                min_max_g = int(s_p[-1].split('-')[0] or 1), int(s_p[-1].split('-')[-1])
+                if min(min_max_g) <= 0:
+                    raise ValueError("Parsing min/max number of gpus <= 0 is not allowed: \"{}\"".format(s))
+                dynamic_gpus.append((name, min_max_g,))
             queue_names = [q for q, _ in dynamic_gpus]
             # resolve queue ids
             dynamic_gpus_q = self._resolve_queue_names(
@@ -1257,13 +1269,6 @@ class Worker(ServiceCommandSection):
 
             self._dynamic_gpus = True
 
-            # order queue priority based on the combination we have
-            dynamic_gpus = sorted(
-                dynamic_gpus, reverse=True, key=cmp_to_key(
-                    lambda x, y: -1 if x[1] < y[1] or x[1] == y[1] and
-                    dynamic_gpus_q.index(x[0]) > dynamic_gpus_q.index(y[0])
-                    else +1))
-
         return dynamic_gpus, gpu_indexes, queues
 
     def _register_dynamic_gpus(self, gpu_indexes):
diff --git a/clearml_agent/interface/worker.py b/clearml_agent/interface/worker.py
index 430fc59..da47ced 100644
--- a/clearml_agent/interface/worker.py
+++ b/clearml_agent/interface/worker.py
@@ -99,7 +99,8 @@ DAEMON_ARGS = dict({
     '--dynamic-gpus': {
        'help': 'Allow to dynamically allocate gpus based on queue properties, '
                'configure with \'--queues =\'.'
-               ' Example: \'--dynamic-gpus --gpus 0-3 --queue dual_gpus=2 single_gpu=1\'',
+               ' Example: \'--dynamic-gpus --gpus 0-3 --queue dual_gpus=2 single_gpu=1\''
+               ' Example Opportunistic: \'--dynamic-gpus --gpus 0-3 --queue dual_gpus=2 max_quad_gpus=1-4 \'',
        'action': 'store_true',
     },
     '--uptime': {
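
Usage note (illustrative, not part of the patch): with this change each --queue entry
accepts either "<name>=<gpus>" or "<name>=<min>-<max>"; the worker only requires min
free GPUs to pick a task from that queue, but will opportunistically hand the task up
to max of the currently free GPUs. Below is a minimal standalone sketch of the spec
parsing behaviour introduced above; the helper name parse_queue_spec is hypothetical
and simply mirrors the new loop over queue_names in the patch:

    def parse_queue_spec(spec):
        # "dual_gpus=2"        -> ("dual_gpus", (2, 2))
        # "max_quad_gpus=1-4"  -> ("max_quad_gpus", (1, 4))
        s_p = spec.split('=')
        name = spec[:-1 - len(s_p[-1])]
        min_max_g = int(s_p[-1].split('-')[0] or 1), int(s_p[-1].split('-')[-1])
        if min(min_max_g) <= 0:
            raise ValueError('min/max number of gpus must be > 0: "{}"'.format(spec))
        return name, min_max_g

    assert parse_queue_spec("dual_gpus=2") == ("dual_gpus", (2, 2))
    assert parse_queue_spec("max_quad_gpus=1-4") == ("max_quad_gpus", (1, 4))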