mirror of
synced 2025-03-03 02:32:17 +00:00
Add --dynamic-gpus and limit in --services-mode
This commit is contained in:
@ -15,7 +15,7 @@ from collections import defaultdict
from copy import deepcopy
from datetime import datetime
from distutils.spawn import find_executable
from functools import partial
from functools import partial, cmp_to_key
from itertools import chain
from tempfile import mkdtemp, NamedTemporaryFile
from time import sleep, time
@ -26,6 +26,7 @@ import psutil
import six
from clearml_agent.backend_api.services import queues as queues_api
from clearml_agent.backend_api.services import tasks as tasks_api
from clearml_agent.backend_api.services import workers as workers_api
from pathlib2 import Path
from pyhocon import ConfigTree, ConfigFactory
from six.moves.urllib.parse import quote
@ -433,7 +434,7 @@ class Worker(ServiceCommandSection):
if kwargs.get('services_mode'):
kwargs['cpu_only'] = True
kwargs['docker'] = kwargs.get('docker') or []
# kwargs['docker'] = kwargs.get('docker') or []
kwargs['gpus'] = None
return kwargs
@ -551,7 +552,7 @@ class Worker(ServiceCommandSection):
print('Running Docker:\n{}\n'.format(str(cmd)))
cmd = worker_args.get_argv_for_command("execute") + (
'--full-monitoring' if self._services_mode else '--disable-monitoring',
@ -592,7 +593,11 @@ class Worker(ServiceCommandSection):
errors = temp_stderr_name and Path(temp_stderr_name).read_text()
if errors:
print("\nEncountered errors:\n\n{}\n".format(errors))
if status is None:
if status is None and self._services_mode:
"Service bootstrap completed: task '{}'".format(task_id)
elif status is None:
"DONE: Running task '{}' (user aborted)".format(task_id)
@ -612,29 +617,71 @@ class Worker(ServiceCommandSection):
def run_tasks_loop(self, queues, worker_params, priority_order=True):
def run_tasks_loop(self, queues, worker_params, priority_order=True, gpu_indexes=None, gpu_queues=None):
:summary: Pull and run tasks from queues.
:description: 1. Go through ``queues`` by order.
2. Try getting the next task for each and run the first one that returns.
3. Go to step 1
:param queues: IDs of queues to pull tasks from
:type queues: list of ``Text``
:param worker_params: Worker command line arguments
:type worker_params: ``clearml_agent.helper.process.WorkerParams``
:param priority_order: If True pull order in priority manner. always from the first
:param list(str) queues: IDs of queues to pull tasks from
:param worker_params worker_params: Worker command line arguments
:param bool priority_order: If True pull order in priority manner. always from the first
If False, pull from each queue once in a round robin manner
:type priority_order: bool
:param list gpu_indexes: list of gpu_indexes. Needs special backend support
:param list gpu_queues: list of pairs (queue_id, num_gpus). Needs special backend support
if not self._daemon_foreground:
print('Starting infinite task polling loop...')
_last_machine_update_ts = 0
if self._services_mode:
max_num_instances = int(self._services_mode)
max_num_instances = None
# store in runtime configuration,
if max_num_instances and not self.set_runtime_properties(key='max_num_instances', value=max_num_instances):
warning('Maximum number of service instance not supported, removing limit.')
max_num_instances = -1
# get current running instances
available_gpus = None
if gpu_indexes and gpu_queues:
available_gpus, gpu_queues = self._setup_dynamic_gpus(gpu_queues)
# multi instance support
self._services_mode = True
while True:
queue_tags = None
runtime_props = None
if max_num_instances and max_num_instances > 0:
# make sure we do not have too many instances to run
if len(Singleton.get_running_pids()) >= max_num_instances:
if self._daemon_foreground or worker_params.debug:
print("Reached max number of services, sleeping for {:.1f} seconds".format(
# update available gpus
if gpu_queues:
response = self._session.send_api(workers_api.GetAllRequest(last_seen=60))
except Exception:
# if something went wrong start over from the highest priority queue
available_gpus = self._dynamic_gpu_get_available(gpu_indexes)
# if something went wrong or we have no free gpus
# start over from the highest priority queue
if not available_gpus:
if self._daemon_foreground or worker_params.debug:
print("All GPUs allocated, sleeping for {:.1f} seconds".format(self._polling_interval))
# iterate over queues (priority style, queues[0] is highest)
for queue in queues:
@ -644,6 +691,25 @@ class Worker(ServiceCommandSection):
if not self.should_be_currently_active(queue_tags[queue], runtime_props):
if gpu_queues:
# peek into queue
# get next task in queue
response = self._session.send_api(queues_api.GetByIdRequest(queue=queue))
except Exception:
# if something went wrong start over from the highest priority queue
if not len(response.queue.entries):
# check if we have enough available gpus
if gpu_queues[queue] > len(available_gpus):
# not enough available_gpus, we should sleep and start over
if self._daemon_foreground or worker_params.debug:
print("Not enough free GPUs {}/{}, sleeping for {:.1f} seconds".format(
len(available_gpus), gpu_queues[queue], self._polling_interval))
# get next task in queue
response = self._session.send_api(
@ -674,13 +740,33 @@ class Worker(ServiceCommandSection):
self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id))
org_gpus = os.environ.get('NVIDIA_VISIBLE_DEVICES')
worker_id = self.worker_id
if gpu_queues and gpu_queues.get(queue):
# pick the first available GPUs
gpus = available_gpus[:gpu_queues.get(queue)]
available_gpus = available_gpus[gpu_queues.get(queue):]
key='available_gpus', value=','.join(str(g) for g in available_gpus))
os.environ['CUDA_VISIBLE_DEVICES'] = \
os.environ['NVIDIA_VISIBLE_DEVICES'] = ','.join(str(g) for g in gpus)
self.worker_id = ':'.join(self.worker_id.split(':')[:-1] + ['gpu'+','.join(str(g) for g in gpus)])
lines=["task {} pulled from {} by worker {}\n".format(task_id, queue, self.worker_id)],
self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id))
self.run_one_task(queue, task_id, worker_params)
if gpu_queues:
self.worker_id = worker_id
os.environ['CUDA_VISIBLE_DEVICES'] = \
os.environ['NVIDIA_VISIBLE_DEVICES'] = org_gpus
queue_tags = None
@ -699,6 +785,39 @@ class Worker(ServiceCommandSection):
if self._session.config["agent.reload_config"]:
def _dynamic_gpu_get_available(self, gpu_indexes):
response = self._session.send_api(workers_api.GetAllRequest(last_seen=60))
except Exception:
return None
worker_name = self._session.config["agent.worker_name"] + ':'
our_workers = [
w.id for w in response.workers
if w.id.startswith(worker_name) and w.id != self.worker_id]
gpus = []
for w in our_workers:
gpus += [int(g) for g in w.split(':')[-1].lower().replace('gpu', '').split(',')]
available_gpus = list(set(gpu_indexes) - set(gpus))
return available_gpus
def _setup_dynamic_gpus(self, gpu_queues):
available_gpus = self.get_runtime_properties()
if available_gpus is None:
raise ValueError("Dynamic GPU allocation is not supported by the ClearML-server")
available_gpus = [prop["value"] for prop in available_gpus if prop["key"] == 'available_gpus']
if available_gpus:
available_gpus = [int(g) for g in available_gpus[-1].split(',')]
if not isinstance(gpu_queues, dict):
gpu_queues = dict(gpu_queues)
if not self.set_runtime_properties(
key='available_gpus', value=','.join(str(g) for g in available_gpus)):
raise ValueError("Dynamic GPU allocation is not supported by the ClearML-server")
return available_gpus, gpu_queues
def get_worker_properties(self, queue_ids):
queue_tags = {
q.id: {'name': q.name, 'tags': q.tags}
@ -714,11 +833,11 @@ class Worker(ServiceCommandSection):
# either not supported or never tested
if self._runtime_props_support == self._session.api_version:
# tested against latest api_version, not supported
return []
return None
if not self._session.check_min_api_version(UptimeConf.min_api_version):
# not supported due to insufficient api_version
self._runtime_props_support = self._session.api_version
return []
return None
res = self.get("get_runtime_properties", worker=self.worker_id)["runtime_properties"]
# definitely supported
@ -726,12 +845,40 @@ class Worker(ServiceCommandSection):
return res
except APIError:
self._runtime_props_support = self._session.api_version
return []
return None
def set_runtime_properties(self, key, value):
return True
if self._runtime_props_support is not True:
# either not supported or never tested
if self._runtime_props_support == self._session.api_version:
# tested against latest api_version, not supported
return False
if not self._session.check_min_api_version(UptimeConf.min_api_version):
# not supported due to insufficient api_version
self._runtime_props_support = self._session.api_version
return False
# noinspection PyBroadException
self.post("set_runtime_properties", worker=self.worker_id, json={key: value})
# definitely supported
self._runtime_props_support = True
return True
except APIError:
self._runtime_props_support = self._session.api_version
except Exception as ex:
# not sure what happened
return False
def should_be_currently_active(self, current_queue, runtime_properties):
Checks if a worker is active according to queue tags, worker's runtime properties and uptime schedule.
runtime_properties = runtime_properties or []
if UptimeConf.queue_tag_off in current_queue['tags']:
self.log.debug("Queue {} is tagged '{}', worker will not pull tasks".format(
current_queue['name'], UptimeConf.queue_tag_off)
@ -832,6 +979,9 @@ class Worker(ServiceCommandSection):
self._uptime_config = None
self._downtime_config = None
# support --dynamic-gpus
dynamic_gpus, gpu_indexes, queues = self._parse_dynamic_gpus(kwargs, queues)
# We are not running a daemon we are killing one.
# find the pid send termination signal and leave
if kwargs.get('stop', False):
@ -948,6 +1098,8 @@ class Worker(ServiceCommandSection):
priority_order=not order_fairness,
except Exception:
tb = six.text_type(traceback.format_exc())
@ -968,6 +1120,45 @@ class Worker(ServiceCommandSection):
def _parse_dynamic_gpus(self, kwargs, queues):
gpu_indexes = kwargs.get('gpus')
dynamic_gpus = kwargs.get('dynamic_gpus', None)
if dynamic_gpus:
# test gpus were passed correctly
if not gpu_indexes or len(gpu_indexes.split('-')) > 2 or (',' in gpu_indexes and '-' in gpu_indexes):
raise ValueError('--gpus must be provided, in one of two ways: '
'comma separated \'0,1,2,3\' or range \'0-3\'')
if '-' in gpu_indexes:
gpu_indexes = list(range(int(gpu_indexes.split('-')[0]), 1 + int(gpu_indexes.split('-')[1])))
gpu_indexes = [int(g) for g in gpu_indexes.split(',')]
except Exception:
raise ValueError('Failed parsing --gpus {}'.format(kwargs.get('gpus')))
dynamic_gpus = [(s[:-1 - len(s.split('=')[-1])], int(s.split('=')[-1])) for s in dynamic_gpus]
# resolve queue ids
dynamic_gpus_q = self._resolve_queue_names([q for q, _ in dynamic_gpus], create_if_missing=False)
dynamic_gpus = list(zip(dynamic_gpus_q, [i for _, i in dynamic_gpus]))
if set(dynamic_gpus_q) != set(queues):
raise ValueError(
"--dynamic-gpus [{}] and --queues [{}] must contain the same queues".format(
dynamic_gpus, queues))
dynamic_gpus = sorted(
dynamic_gpus, reverse=True, key=cmp_to_key(
lambda x, y: -1 if x[1] < y[1] or x[1] == y[1] and queues.index(x[0]) > queues.index(y[0])
else +1))
# order queue priority based on the combination we have
queues = [q for q, _ in dynamic_gpus]
# test server support
available_gpus = self._dynamic_gpu_get_available(gpu_indexes)
if not self.set_runtime_properties(
key='available_gpus', value=','.join(str(g) for g in available_gpus)):
raise ValueError("Dynamic GPU allocation is not supported by the ClearML-server")
return dynamic_gpus, gpu_indexes, queues
def report_monitor(self, report):
if not self.monitor:
@ -1438,6 +1629,11 @@ class Worker(ServiceCommandSection):
# We expect the same behaviour in case full_monitoring was set, and in case docker mode is used
if full_monitoring or docker is not False:
if full_monitoring and not (ENV_WORKER_ID.get() or '').strip():
self._session.config["agent"]["worker_id"] = ''
# make sure we support multiple instances if we need to
worker_params = WorkerParams(
@ -2684,15 +2880,15 @@ class Worker(ServiceCommandSection):
queues = return_list(queues)
if not create_if_missing:
return [self._resolve_name(q.name, "queues") for q in queues]
return [self._resolve_name(q if isinstance(q, str) else q.name, "queues") for q in queues]
queue_ids = []
for q in queues:
q_id = self._resolve_name(q.name, "queues")
q_id = self._resolve_name(q if isinstance(q, str) else q.name, "queues")
q_id = self._resolve_name(q.name, "queues")
self._session.send_api(queues_api.CreateRequest(name=q if isinstance(q, str) else q.name))
q_id = self._resolve_name(q if isinstance(q, str) else q.name, "queues")
return queue_ids
@ -129,8 +129,9 @@ def get_uptime_string(entry):
def get_runtime_properties_string(runtime_properties):
# type: (List[dict]) -> Tuple[Optional[str], str]
# type: (Optional[List[dict]]) -> Tuple[Optional[str], str]
server_string = []
runtime_properties = runtime_properties or []
force_flag = next(
(prop for prop in runtime_properties if prop["key"] == UptimeConf.worker_key),
@ -78,7 +78,10 @@ DAEMON_ARGS = dict({
'--services-mode': {
'help': 'Launch multiple long-term docker services. Implies docker & cpu-only flags.',
'action': 'store_true',
'nargs': '?',
'const': -1,
'type': int,
'default': None,
'--create-queue': {
'help': 'Create requested queue if it does not exist already.',
@ -93,6 +96,12 @@ DAEMON_ARGS = dict({
'help': 'Stop the running agent (based on the same set of arguments)',
'action': 'store_true',
'--dynamic-gpus': {
'help': 'Allow to dynamically allocate gpus based on queue properties, pass \'<queue_name>=<num_gpus>\'.'
' Example: \'dual_gpus=2 single_gpu=1\'',
'nargs': '*',
'default': None,
'--uptime': {
'help': 'Specify uptime for clearml-agent in "<hours> <days>" format. for example, use "17-20 TUE" to set '
'Tuesday\'s uptime to 17-20'
Reference in New Issue
Block a user