Add support for opportunistic dynamic GPU scheduling (with min/max GPUs per queue)

allegroai 2021-04-20 18:11:16 +03:00
parent 60a58f6fad
commit 5c8675e43a
2 changed files with 24 additions and 18 deletions

File 1 of 2:

@@ -767,14 +767,13 @@ class Worker(ServiceCommandSection):
                     break
                 if not len(response.queue.entries):
                     continue
-                # check if we have enough available gpus
-                if gpu_queues[queue] > len(available_gpus):
+                # check if we do not have enough available gpus
+                if gpu_queues[queue][0] > len(available_gpus):
                     # not enough available_gpus, we should sleep and start over
                     if self._daemon_foreground or worker_params.debug:
-                        print("Not enough free GPUs {}/{}, sleeping for {:.1f} seconds".format(
-                            len(available_gpus), gpu_queues[queue], self._polling_interval))
-                    sleep(self._polling_interval)
-                    break
+                        print("Not enough free GPUs for queue={} {}/{}".format(
+                            queue, len(available_gpus), gpu_queues[queue][0]))
+                    continue

                 # get next task in queue
                 try:
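The behavioral change in this hunk: the old code slept and restarted the polling loop whenever a queue wanted more GPUs than were free, while the new code just moves on to the next queue, so a queue with a smaller minimum can still be served opportunistically. A minimal sketch of that check, assuming `gpu_queues` maps queue name to a `(min_gpus, max_gpus)` tuple and `pull_task` is a hypothetical helper (neither is the agent's real API):

    def next_schedulable_task(gpu_queues, available_gpus, pull_task):
        # gpu_queues: {queue_name: (min_gpus, max_gpus)}; available_gpus: free GPU ids
        for queue, (min_gpus, _max_gpus) in gpu_queues.items():
            if min_gpus > len(available_gpus):
                # not enough free GPUs for this queue: skip it rather than
                # sleeping, so queues with smaller minimums still get a chance
                continue
            task = pull_task(queue)  # returns a task or None
            if task is not None:
                return queue, task
        return None, None  # nothing schedulable; the caller sleeps and retries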
@@ -813,8 +812,10 @@ class Worker(ServiceCommandSection):
             # the following is only executed in dynamic gpus mode
             if gpu_queues and gpu_queues.get(queue):
                 # pick the first available GPUs
-                gpus = available_gpus[:gpu_queues.get(queue)]
-                available_gpus = available_gpus[gpu_queues.get(queue):]
+                # gpu_queues[queue] = (min_gpus, max_gpus)
+                # get as many gpus as possible with max_gpus as limit, the min is covered before
+                gpus = available_gpus[:gpu_queues.get(queue)[1]]
+                available_gpus = available_gpus[gpu_queues.get(queue)[1]:]
                 self.set_runtime_properties(
                     key='available_gpus', value=','.join(str(g) for g in available_gpus))
                 os.environ['CUDA_VISIBLE_DEVICES'] = \
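Since the minimum was already verified before the task was pulled, allocation can simply be greedy up to `max_gpus`. The same slicing in isolation (a sketch, assuming a `(min_gpus, max_gpus)` tuple and a list of free GPU ids):

    import os

    def allocate_gpus(available_gpus, min_max):
        # hand the task up to max_gpus from the free pool; the caller has
        # already ensured at least min_gpus are free
        _min_gpus, max_gpus = min_max
        gpus, remaining = available_gpus[:max_gpus], available_gpus[max_gpus:]
        os.environ['CUDA_VISIBLE_DEVICES'] = ','.join(str(g) for g in gpus)
        return gpus, remaining

For example, `allocate_gpus([0, 1, 2, 3], (1, 4))` gives the task all four GPUs, while `allocate_gpus([3], (1, 4))` still schedules it with the single remaining GPU.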
@@ -1114,6 +1115,10 @@ class Worker(ServiceCommandSection):
         columns = ("id", "name", "tags")
         print("Listening to queues:")
+        if dynamic_gpus:
+            columns = ("id", "name", "tags", "gpus")
+            for q in queues_info:
+                q['gpus'] = str(dict(dynamic_gpus).get(q['id']) or '')
         print_table(queues_info, columns=columns, titles=columns)

         # register worker
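# register worker">
The new `gpus` column is simply `str(dict(dynamic_gpus).get(q['id']) or '')`: the `(min, max)` tuple for queues under dynamic allocation, an empty string for everything else. For instance (queue ids hypothetical):

    dynamic_gpus = [('queue-id-1', (2, 2)), ('queue-id-2', (1, 4))]
    str(dict(dynamic_gpus).get('queue-id-1') or '')  # -> '(2, 2)'
    str(dict(dynamic_gpus).get('other-id') or '')    # -> ''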
@@ -1134,7 +1139,7 @@ class Worker(ServiceCommandSection):
         # make sure we have CUDA set if we have --gpus
         if kwargs.get('gpus') and self._session.config.get('agent.cuda_version', None) in (None, 0, '0'):
             message = 'Running with GPUs but no CUDA version was detected!\n' \
-                      '\tSet OS environemnt CUDA_VERSION & CUDNN_VERSION to the correct version\n' \
+                      '\tSet OS environment CUDA_VERSION & CUDNN_VERSION to the correct version\n' \
                       '\tExample: export CUDA_VERSION=10.1 or (Windows: set CUDA_VERSION=10.1)'
             if is_conda(self._session.config):
                 self._unregister(queues)
@@ -1246,7 +1251,14 @@ class Worker(ServiceCommandSection):
                     '--dynamic_gpus must be use with '
                     'specific gpus for example "0-7" or "0,1,2,3"'.format(kwargs.get('gpus')))
-            dynamic_gpus = [(s[:-1 - len(s.split('=')[-1])], int(s.split('=')[-1])) for s in queue_names]
+            dynamic_gpus = []
+            for s in queue_names:
+                s_p = s.split('=')
+                name = s[:-1 - len(s_p[-1])]
+                min_max_g = int(s_p[-1].split('-')[0] or 1), int(s_p[-1].split('-')[-1])
+                if min(min_max_g) <= 0:
+                    raise ValueError("Parsing min/max number of gpus <= 0 is not allowed: \"{}\"".format(s))
+                dynamic_gpus.append((name, min_max_g,))
             queue_names = [q for q, _ in dynamic_gpus]
             # resolve queue ids
             dynamic_gpus_q = self._resolve_queue_names(
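Both spellings of the queue spec are now accepted: `name=N` keeps the old fixed behavior (min = max = N), and `name=MIN-MAX` enables the opportunistic range, with an omitted minimum defaulting to 1. The same parsing as a standalone sketch (a hypothetical helper, not part of the agent's API):

    def parse_dynamic_gpus_spec(spec):
        # 'dual_gpus=2'       -> ('dual_gpus', (2, 2))
        # 'max_quad_gpus=1-4' -> ('max_quad_gpus', (1, 4))
        # 'max_quad_gpus=-4'  -> ('max_quad_gpus', (1, 4)); min defaults to 1
        name, _, gpus = spec.rpartition('=')
        parts = gpus.split('-')
        min_max_g = int(parts[0] or 1), int(parts[-1])
        if min(min_max_g) <= 0:
            raise ValueError('min/max number of gpus must be positive: "{}"'.format(spec))
        return name, min_max_g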
@@ -1257,13 +1269,6 @@ class Worker(ServiceCommandSection):
             self._dynamic_gpus = True
-            # order queue priority based on the combination we have
-            dynamic_gpus = sorted(
-                dynamic_gpus, reverse=True, key=cmp_to_key(
-                    lambda x, y: -1 if x[1] < y[1] or x[1] == y[1] and
-                    dynamic_gpus_q.index(x[0]) > dynamic_gpus_q.index(y[0])
-                    else +1))
             return dynamic_gpus, gpu_indexes, queues

     def _register_dynamic_gpus(self, gpu_indexes):

File 2 of 2:

@@ -99,7 +99,8 @@ DAEMON_ARGS = dict({
     '--dynamic-gpus': {
         'help': 'Allow to dynamically allocate gpus based on queue properties, '
                 'configure with \'--queues <queue_name>=<num_gpus>\'.'
-                ' Example: \'--dynamic-gpus --gpus 0-3 --queue dual_gpus=2 single_gpu=1\'',
+                ' Example: \'--dynamic-gpus --gpus 0-3 --queue dual_gpus=2 single_gpu=1\''
+                ' Example Opportunistic: \'--dynamic-gpus --gpus 0-3 --queue dual_gpus=2 max_quad_gpus=1-4 \'',
         'action': 'store_true',
     },
     '--uptime': {
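Putting the two help-text examples together, one daemon can serve a fixed-size queue and an opportunistic one at the same time (command taken from the help string above; queue names are illustrative):

    clearml-agent daemon --dynamic-gpus --gpus 0-3 --queue dual_gpus=2 max_quad_gpus=1-4

Tasks pulled from `dual_gpus` always get exactly 2 of GPUs 0-3, while tasks from `max_quad_gpus` get at least 1 and up to 4, depending on how many are free at the moment the task is pulled.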