Fix --dynamic-gpus should keep original queue priority order

This commit is contained in:
allegroai 2021-03-31 23:55:12 +03:00
parent 14ac584577
commit 81edd2860f
2 changed files with 31 additions and 23 deletions

View File

@ -672,12 +672,6 @@ class Worker(ServiceCommandSection):
# update available gpus # update available gpus
if gpu_queues: if gpu_queues:
try:
response = self._session.send_api(workers_api.GetAllRequest(last_seen=60))
except Exception:
# if something went wrong start over from the highest priority queue
sleep(self._polling_interval)
continue
available_gpus = self._dynamic_gpu_get_available(gpu_indexes) available_gpus = self._dynamic_gpu_get_available(gpu_indexes)
# if something went wrong or we have no free gpus # if something went wrong or we have no free gpus
# start over from the highest priority queue # start over from the highest priority queue
@ -791,12 +785,13 @@ class Worker(ServiceCommandSection):
self.reload_config() self.reload_config()
def _dynamic_gpu_get_available(self, gpu_indexes): def _dynamic_gpu_get_available(self, gpu_indexes):
# noinspection PyBroadException
try: try:
response = self._session.send_api(workers_api.GetAllRequest(last_seen=60)) response = self._session.send_api(workers_api.GetAllRequest(last_seen=60))
except Exception: except Exception:
return None return None
worker_name = self._session.config["agent.worker_name"] + ':' worker_name = self._session.config["agent.worker_name"] + ':gpu'
our_workers = [ our_workers = [
w.id for w in response.workers w.id for w in response.workers
if w.id.startswith(worker_name) and w.id != self.worker_id] if w.id.startswith(worker_name) and w.id != self.worker_id]
@ -865,7 +860,10 @@ class Worker(ServiceCommandSection):
# noinspection PyBroadException # noinspection PyBroadException
try: try:
self.post("set_runtime_properties", worker=self.worker_id, json={key: value}) self.post("set_runtime_properties",
json={
'runtime_properties': [{'key': key, 'value': value}],
'worker': self.worker_id})
# definitely supported # definitely supported
self._runtime_props_support = True self._runtime_props_support = True
return True return True
@ -1010,7 +1008,10 @@ class Worker(ServiceCommandSection):
# make sure we only have a single instance, # make sure we only have a single instance,
# also make sure we set worker_id properly and cache folders # also make sure we set worker_id properly and cache folders
self._singleton(dynamic=bool(dynamic_gpus)) self._singleton(dynamic_gpus=bool(dynamic_gpus))
if dynamic_gpus:
self._register_dynamic_gpus(gpu_indexes)
# check if we have the latest version # check if we have the latest version
start_check_update_daemon() start_check_update_daemon()
@ -1148,7 +1149,10 @@ class Worker(ServiceCommandSection):
else: else:
gpu_indexes = [int(g) for g in gpu_indexes.split(',')] gpu_indexes = [int(g) for g in gpu_indexes.split(',')]
except Exception: except Exception:
raise ValueError('Failed parsing --gpus {}'.format(kwargs.get('gpus'))) raise ValueError(
'Failed parsing --gpus "{}". '
'--dynamic_gpus must be use with '
'specific gpus for example "0-7" or "0,1,2,3"'.format(kwargs.get('gpus')))
dynamic_gpus = [(s[:-1 - len(s.split('=')[-1])], int(s.split('=')[-1])) for s in queue_names] dynamic_gpus = [(s[:-1 - len(s.split('=')[-1])], int(s.split('=')[-1])) for s in queue_names]
queue_names = [q for q, _ in dynamic_gpus] queue_names = [q for q, _ in dynamic_gpus]
@ -1156,24 +1160,27 @@ class Worker(ServiceCommandSection):
dynamic_gpus_q = self._resolve_queue_names( dynamic_gpus_q = self._resolve_queue_names(
queue_names, create_if_missing=kwargs.get('create_queue', False)) queue_names, create_if_missing=kwargs.get('create_queue', False))
dynamic_gpus = list(zip(dynamic_gpus_q, [i for _, i in dynamic_gpus])) dynamic_gpus = list(zip(dynamic_gpus_q, [i for _, i in dynamic_gpus]))
# maintain original priority order
queues = [q for q, _ in dynamic_gpus]
self._dynamic_gpus = True
# order queue priority based on the combination we have
dynamic_gpus = sorted( dynamic_gpus = sorted(
dynamic_gpus, reverse=True, key=cmp_to_key( dynamic_gpus, reverse=True, key=cmp_to_key(
lambda x, y: -1 if x[1] < y[1] or x[1] == y[1] and lambda x, y: -1 if x[1] < y[1] or x[1] == y[1] and
dynamic_gpus_q.index(x[0]) > dynamic_gpus_q.index(y[0]) dynamic_gpus_q.index(x[0]) > dynamic_gpus_q.index(y[0])
else +1)) else +1))
# order queue priority based on the combination we have
queues = [q for q, _ in dynamic_gpus]
return dynamic_gpus, gpu_indexes, queues
def _register_dynamic_gpus(self, gpu_indexes):
# test server support # test server support
available_gpus = self._dynamic_gpu_get_available(gpu_indexes) available_gpus = self._dynamic_gpu_get_available(gpu_indexes)
if not self.set_runtime_properties( if not self.set_runtime_properties(
key='available_gpus', value=','.join(str(g) for g in available_gpus)): key='available_gpus', value=','.join(str(g) for g in available_gpus)):
raise ValueError("Dynamic GPU allocation is not supported by the ClearML-server") raise ValueError("Dynamic GPU allocation is not supported by the ClearML-server")
self._dynamic_gpus = True
return dynamic_gpus, gpu_indexes, queues
def report_monitor(self, report): def report_monitor(self, report):
if not self.monitor: if not self.monitor:
self.new_monitor(report=report) self.new_monitor(report=report)
@ -2924,8 +2931,9 @@ class Worker(ServiceCommandSection):
return command, script_dir return command, script_dir
def _kill_daemon(self): def _kill_daemon(self, dynamic_gpus=False):
worker_id, worker_name = self._generate_worker_id_name() ## TODO kill dynamic agents with children
worker_id, worker_name = self._generate_worker_id_name(dynamic_gpus=dynamic_gpus)
# Iterate over all running process # Iterate over all running process
for pid, uid, slot, file in sorted(Singleton.get_running_pids(), key=lambda x: x[1] or ''): for pid, uid, slot, file in sorted(Singleton.get_running_pids(), key=lambda x: x[1] or ''):
# wither we have a match for the worker_id or we just pick the first one # wither we have a match for the worker_id or we just pick the first one
@ -2941,9 +2949,9 @@ class Worker(ServiceCommandSection):
worker_name, worker_id)) worker_name, worker_id))
return False return False
def _singleton(self, dynamic=False): def _singleton(self, dynamic_gpus=False):
# ensure singleton # ensure singleton
worker_id, worker_name = self._generate_worker_id_name(dynamic=dynamic) worker_id, worker_name = self._generate_worker_id_name(dynamic_gpus=dynamic_gpus)
# if we are running in services mode, we allow double register since # if we are running in services mode, we allow double register since
# docker-compose will kill instances before they cleanup # docker-compose will kill instances before they cleanup
@ -2958,14 +2966,14 @@ class Worker(ServiceCommandSection):
# update folders based on free slot # update folders based on free slot
self._session.create_cache_folders(slot_index=worker_slot) self._session.create_cache_folders(slot_index=worker_slot)
def _generate_worker_id_name(self, dynamic=False): def _generate_worker_id_name(self, dynamic_gpus=False):
worker_id = self._session.config["agent.worker_id"] worker_id = self._session.config["agent.worker_id"]
worker_name = self._session.config["agent.worker_name"] worker_name = self._session.config["agent.worker_name"]
if not worker_id and os.environ.get('NVIDIA_VISIBLE_DEVICES') is not None: if not worker_id and os.environ.get('NVIDIA_VISIBLE_DEVICES') is not None:
nvidia_visible_devices = os.environ.get('NVIDIA_VISIBLE_DEVICES') nvidia_visible_devices = os.environ.get('NVIDIA_VISIBLE_DEVICES')
if nvidia_visible_devices and nvidia_visible_devices.lower() != 'none': if nvidia_visible_devices and nvidia_visible_devices.lower() != 'none':
worker_id = '{}:{}gpu{}'.format( worker_id = '{}:{}gpu{}'.format(
worker_name, 'd' if dynamic else '', nvidia_visible_devices) worker_name, 'd' if dynamic_gpus else '', nvidia_visible_devices)
elif nvidia_visible_devices == '': elif nvidia_visible_devices == '':
pass pass
else: else:

View File

@ -99,7 +99,7 @@ DAEMON_ARGS = dict({
'--dynamic-gpus': { '--dynamic-gpus': {
'help': 'Allow to dynamically allocate gpus based on queue properties, ' 'help': 'Allow to dynamically allocate gpus based on queue properties, '
'configure with \'--queues <queue_name>=<num_gpus>\'.' 'configure with \'--queues <queue_name>=<num_gpus>\'.'
' Example: \'--dynamic-gpus --queue dual_gpus=2 single_gpu=1\'', ' Example: \'--dynamic-gpus --gpus 0-3 --queue dual_gpus=2 single_gpu=1\'',
'action': 'store_true', 'action': 'store_true',
}, },
'--uptime': { '--uptime': {