From 410cc8c7bec82e303d11ff0aa9b2d1c205c1452a Mon Sep 17 00:00:00 2001
From: allegroai <>
Date: Thu, 11 Feb 2021 14:46:37 +0200
Subject: [PATCH] Add --dynamic-gpus and limit in --services-mode

---
 clearml_agent/commands/worker.py             | 234 +++++++++++++++++--
 clearml_agent/helper/runtime_verification.py |   3 +-
 clearml_agent/interface/worker.py            |  11 +-
 3 files changed, 227 insertions(+), 21 deletions(-)

diff --git a/clearml_agent/commands/worker.py b/clearml_agent/commands/worker.py
index 6b91668..566a030 100644
--- a/clearml_agent/commands/worker.py
+++ b/clearml_agent/commands/worker.py
@@ -15,7 +15,7 @@ from collections import defaultdict
 from copy import deepcopy
 from datetime import datetime
 from distutils.spawn import find_executable
-from functools import partial
+from functools import partial, cmp_to_key
 from itertools import chain
 from tempfile import mkdtemp, NamedTemporaryFile
 from time import sleep, time
@@ -26,6 +26,7 @@ import psutil
 import six
 from clearml_agent.backend_api.services import queues as queues_api
 from clearml_agent.backend_api.services import tasks as tasks_api
+from clearml_agent.backend_api.services import workers as workers_api
 from pathlib2 import Path
 from pyhocon import ConfigTree, ConfigFactory
 from six.moves.urllib.parse import quote
@@ -433,7 +434,7 @@ class Worker(ServiceCommandSection):
         """
         if kwargs.get('services_mode'):
             kwargs['cpu_only'] = True
-            kwargs['docker'] = kwargs.get('docker') or []
+            # kwargs['docker'] = kwargs.get('docker') or []
             kwargs['gpus'] = None
         return kwargs
 
@@ -551,7 +552,7 @@ class Worker(ServiceCommandSection):
             print('Running Docker:\n{}\n'.format(str(cmd)))
         else:
             cmd = worker_args.get_argv_for_command("execute") + (
-                "--disable-monitoring",
+                '--full-monitoring' if self._services_mode else '--disable-monitoring',
                 "--id",
                 task_id,
             )
@@ -592,7 +593,11 @@ class Worker(ServiceCommandSection):
                 errors = temp_stderr_name and Path(temp_stderr_name).read_text()
                 if errors:
                     print("\nEncountered errors:\n\n{}\n".format(errors))
-                if status is None:
+                if status is None and self._services_mode:
+                    print(
+                        "Service bootstrap completed: task '{}'".format(task_id)
+                    )
+                elif status is None:
                     print(
                         "DONE: Running task '{}' (user aborted)".format(task_id)
                     )
@@ -612,29 +617,71 @@ class Worker(ServiceCommandSection):
         safe_remove_file(temp_stdout_name)
         safe_remove_file(temp_stderr_name)
 
-    def run_tasks_loop(self, queues, worker_params, priority_order=True):
+    def run_tasks_loop(self, queues, worker_params, priority_order=True, gpu_indexes=None, gpu_queues=None):
         """
         :summary: Pull and run tasks from queues.
         :description: 1. Go through ``queues`` by order.
                       2. Try getting the next task for each and run the first one that returns.
                       3. Go to step 1
-        :param queues: IDs of queues to pull tasks from
-        :type queues: list of ``Text``
-        :param worker_params: Worker command line arguments
-        :type worker_params: ``clearml_agent.helper.process.WorkerParams``
-        :param priority_order: If True pull order in priority manner. always from the first
+        :param list(str) queues: IDs of queues to pull tasks from
+        :param WorkerParams worker_params: Worker command line arguments
+        :param bool priority_order: If True, pull in priority order, always starting from the first queue.
             If False, pull from each queue once in a round robin manner
-        :type priority_order: bool
+        :param list gpu_indexes: list of GPU indexes to allocate from. Needs special backend support
+        :param list gpu_queues: list of pairs (queue_id, num_gpus). Needs special backend support
         """
         if not self._daemon_foreground:
             print('Starting infinite task polling loop...')
 
         _last_machine_update_ts = 0
+        if self._services_mode:
+            max_num_instances = int(self._services_mode)
+        else:
+            max_num_instances = None
+
+        # store in runtime configuration
+        if max_num_instances and not self.set_runtime_properties(key='max_num_instances', value=max_num_instances):
+            warning('Maximum number of service instances not supported, removing limit.')
+            max_num_instances = -1
+
+        # dynamic gpus support
+        available_gpus = None
+        if gpu_indexes and gpu_queues:
+            available_gpus, gpu_queues = self._setup_dynamic_gpus(gpu_queues)
+            # multi instance support
+            self._services_mode = True
+
         while True:
             queue_tags = None
             runtime_props = None
+
+            if max_num_instances and max_num_instances > 0:
+                # make sure we do not have too many instances to run
+                if len(Singleton.get_running_pids()) >= max_num_instances:
+                    if self._daemon_foreground or worker_params.debug:
+                        print("Reached max number of services, sleeping for {:.1f} seconds".format(
+                            self._polling_interval))
+                    sleep(self._polling_interval)
+                    continue
+
+            # update available gpus
+            if gpu_queues:
+                available_gpus = self._dynamic_gpu_get_available(gpu_indexes)
+                # if something went wrong or we have no free gpus,
+                # start over from the highest priority queue
+                if not available_gpus:
+                    if self._daemon_foreground or worker_params.debug:
+                        print("All GPUs allocated, sleeping for {:.1f} seconds".format(self._polling_interval))
+                    sleep(self._polling_interval)
+                    continue
+
             # iterate over queues (priority style, queues[0] is highest)
             for queue in queues:
 
@@ -644,6 +691,25 @@ class Worker(ServiceCommandSection):
                 if not self.should_be_currently_active(queue_tags[queue], runtime_props):
                     continue
 
+                if gpu_queues:
+                    # peek into the queue - check if there is a pending task
+                    try:
+                        response = self._session.send_api(queues_api.GetByIdRequest(queue=queue))
+                    except Exception:
+                        # if something went wrong start over from the highest priority queue
+                        break
+                    if not len(response.queue.entries):
+                        continue
+                    # check if we have enough available gpus
+                    if gpu_queues[queue] > len(available_gpus):
+                        # not enough available_gpus, we should sleep and start over
+                        if self._daemon_foreground or worker_params.debug:
+                            print("Not enough free GPUs {}/{}, sleeping for {:.1f} seconds".format(
+                                len(available_gpus), gpu_queues[queue], self._polling_interval))
+                        sleep(self._polling_interval)
+                        break
+
                 # get next task in queue
                 try:
                     response = self._session.send_api(
@@ -674,13 +740,33 @@ class Worker(ServiceCommandSection):
                 except:
                     pass
 
+                self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id))
+
+                org_gpus = os.environ.get('NVIDIA_VISIBLE_DEVICES')
+                worker_id = self.worker_id
+                if gpu_queues and gpu_queues.get(queue):
+                    # pick the first available GPUs
+                    gpus = available_gpus[:gpu_queues.get(queue)]
+                    available_gpus = available_gpus[gpu_queues.get(queue):]
+                    self.set_runtime_properties(
+                        key='available_gpus', value=','.join(str(g) for g in available_gpus))
+                    os.environ['CUDA_VISIBLE_DEVICES'] = \
+                        os.environ['NVIDIA_VISIBLE_DEVICES'] = ','.join(str(g) for g in gpus)
+                    self.worker_id = ':'.join(self.worker_id.split(':')[:-1] + ['gpu'+','.join(str(g) for g in gpus)])
+
                 self.send_logs(
                     task_id=task_id,
                     lines=["task {} pulled from {} by worker {}\n".format(task_id, queue, self.worker_id)],
                     level="INFO",
                 )
-                self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id))
+
                 self.run_one_task(queue, task_id, worker_params)
+
+                if gpu_queues:
+                    self.worker_id = worker_id
+                    os.environ['CUDA_VISIBLE_DEVICES'] = \
+                        os.environ['NVIDIA_VISIBLE_DEVICES'] = org_gpus
+
                 self.report_monitor(ResourceMonitor.StatusReport(queues=self.queues))
 
                 queue_tags = None
@@ -699,6 +785,39 @@ class Worker(ServiceCommandSection):
             if self._session.config["agent.reload_config"]:
                 self.reload_config()
 
+    def _dynamic_gpu_get_available(self, gpu_indexes):
+        try:
+            response = self._session.send_api(workers_api.GetAllRequest(last_seen=60))
+        except Exception:
+            return None
+
+        worker_name = self._session.config["agent.worker_name"] + ':'
+        our_workers = [
+            w.id for w in response.workers
+            if w.id.startswith(worker_name) and w.id != self.worker_id]
+        gpus = []
+        for w in our_workers:
+            gpus += [int(g) for g in w.split(':')[-1].lower().replace('gpu', '').split(',')]
+        available_gpus = list(set(gpu_indexes) - set(gpus))
+
+        return available_gpus
+
+    def _setup_dynamic_gpus(self, gpu_queues):
+        available_gpus = self.get_runtime_properties()
+        if available_gpus is None:
+            raise ValueError("Dynamic GPU allocation is not supported by the ClearML-server")
+        available_gpus = [prop["value"] for prop in available_gpus if prop["key"] == 'available_gpus']
+        if available_gpus:
+            available_gpus = [int(g) for g in available_gpus[-1].split(',')]
+        if not isinstance(gpu_queues, dict):
+            gpu_queues = dict(gpu_queues)
+
+        if not self.set_runtime_properties(
+                key='available_gpus', value=','.join(str(g) for g in available_gpus)):
+            raise ValueError("Dynamic GPU allocation is not supported by the ClearML-server")
+
+        return available_gpus, gpu_queues
+
     def get_worker_properties(self, queue_ids):
         queue_tags = {
             q.id: {'name': q.name, 'tags': q.tags}
@@ -714,11 +833,11 @@ class Worker(ServiceCommandSection):
             # either not supported or never tested
             if self._runtime_props_support == self._session.api_version:
                 # tested against latest api_version, not supported
-                return []
+                return None
             if not self._session.check_min_api_version(UptimeConf.min_api_version):
                 # not supported due to insufficient api_version
                 self._runtime_props_support = self._session.api_version
-                return []
+                return None
         try:
             res = self.get("get_runtime_properties", worker=self.worker_id)["runtime_properties"]
             # definitely supported
@@ -726,12 +845,40 @@ class Worker(ServiceCommandSection):
             return res
         except APIError:
             self._runtime_props_support = self._session.api_version
-            return []
+            return None
+
+    def set_runtime_properties(self, key, value):
+        if self._runtime_props_support is not True:
+            # either not supported or never tested
+            if self._runtime_props_support == self._session.api_version:
+                # tested against latest api_version, not supported
+                return False
+            if not self._session.check_min_api_version(UptimeConf.min_api_version):
+                # not supported due to insufficient api_version
+                self._runtime_props_support = self._session.api_version
+                return False
+
+        # noinspection PyBroadException
+        try:
+            self.post("set_runtime_properties", worker=self.worker_id, json={key: value})
+            # definitely supported
+            self._runtime_props_support = True
+            return True
+        except APIError:
+            self._runtime_props_support = self._session.api_version
+        except Exception:
+            # not sure what happened, assume not supported
+            pass
+
+        return False
 
     def should_be_currently_active(self, current_queue,
runtime_properties): """ Checks if a worker is active according to queue tags, worker's runtime properties and uptime schedule. """ + runtime_properties = runtime_properties or [] if UptimeConf.queue_tag_off in current_queue['tags']: self.log.debug("Queue {} is tagged '{}', worker will not pull tasks".format( current_queue['name'], UptimeConf.queue_tag_off) @@ -832,6 +979,9 @@ class Worker(ServiceCommandSection): self._uptime_config = None self._downtime_config = None + # support --dynamic-gpus + dynamic_gpus, gpu_indexes, queues = self._parse_dynamic_gpus(kwargs, queues) + # We are not running a daemon we are killing one. # find the pid send termination signal and leave if kwargs.get('stop', False): @@ -948,6 +1098,8 @@ class Worker(ServiceCommandSection): trace=self._session.trace, ), priority_order=not order_fairness, + gpu_indexes=gpu_indexes, + gpu_queues=dynamic_gpus, ) except Exception: tb = six.text_type(traceback.format_exc()) @@ -968,6 +1120,45 @@ class Worker(ServiceCommandSection): self._unregister(queues) safe_remove_file(self.temp_config_path) + def _parse_dynamic_gpus(self, kwargs, queues): + gpu_indexes = kwargs.get('gpus') + dynamic_gpus = kwargs.get('dynamic_gpus', None) + if dynamic_gpus: + # test gpus were passed correctly + if not gpu_indexes or len(gpu_indexes.split('-')) > 2 or (',' in gpu_indexes and '-' in gpu_indexes): + raise ValueError('--gpus must be provided, in one of two ways: ' + 'comma separated \'0,1,2,3\' or range \'0-3\'') + try: + if '-' in gpu_indexes: + gpu_indexes = list(range(int(gpu_indexes.split('-')[0]), 1 + int(gpu_indexes.split('-')[1]))) + else: + gpu_indexes = [int(g) for g in gpu_indexes.split(',')] + except Exception: + raise ValueError('Failed parsing --gpus {}'.format(kwargs.get('gpus'))) + + dynamic_gpus = [(s[:-1 - len(s.split('=')[-1])], int(s.split('=')[-1])) for s in dynamic_gpus] + # resolve queue ids + dynamic_gpus_q = self._resolve_queue_names([q for q, _ in dynamic_gpus], create_if_missing=False) + dynamic_gpus = list(zip(dynamic_gpus_q, [i for _, i in dynamic_gpus])) + if set(dynamic_gpus_q) != set(queues): + raise ValueError( + "--dynamic-gpus [{}] and --queues [{}] must contain the same queues".format( + dynamic_gpus, queues)) + dynamic_gpus = sorted( + dynamic_gpus, reverse=True, key=cmp_to_key( + lambda x, y: -1 if x[1] < y[1] or x[1] == y[1] and queues.index(x[0]) > queues.index(y[0]) + else +1)) + # order queue priority based on the combination we have + queues = [q for q, _ in dynamic_gpus] + + # test server support + available_gpus = self._dynamic_gpu_get_available(gpu_indexes) + if not self.set_runtime_properties( + key='available_gpus', value=','.join(str(g) for g in available_gpus)): + raise ValueError("Dynamic GPU allocation is not supported by the ClearML-server") + + return dynamic_gpus, gpu_indexes, queues + def report_monitor(self, report): if not self.monitor: self.new_monitor(report=report) @@ -1438,6 +1629,11 @@ class Worker(ServiceCommandSection): # We expect the same behaviour in case full_monitoring was set, and in case docker mode is used if full_monitoring or docker is not False: + if full_monitoring and not (ENV_WORKER_ID.get() or '').strip(): + self._session.config["agent"]["worker_id"] = '' + # make sure we support multiple instances if we need to + self._singleton() + worker_params = WorkerParams( log_level=log_level, config_file=self._session.config_file, @@ -2684,15 +2880,15 @@ class Worker(ServiceCommandSection): queues = return_list(queues) if not create_if_missing: - return 
[self._resolve_name(q.name, "queues") for q in queues]
+            return [self._resolve_name(q if isinstance(q, str) else q.name, "queues") for q in queues]
 
         queue_ids = []
         for q in queues:
             try:
-                q_id = self._resolve_name(q.name, "queues")
+                q_id = self._resolve_name(q if isinstance(q, str) else q.name, "queues")
             except:
-                self._session.send_api(queues_api.CreateRequest(name=q.name))
-                q_id = self._resolve_name(q.name, "queues")
+                self._session.send_api(queues_api.CreateRequest(name=q if isinstance(q, str) else q.name))
+                q_id = self._resolve_name(q if isinstance(q, str) else q.name, "queues")
             queue_ids.append(q_id)
 
         return queue_ids
diff --git a/clearml_agent/helper/runtime_verification.py b/clearml_agent/helper/runtime_verification.py
index 685807b..d5b7aa5 100644
--- a/clearml_agent/helper/runtime_verification.py
+++ b/clearml_agent/helper/runtime_verification.py
@@ -129,8 +129,9 @@ def get_uptime_string(entry):
 
 
 def get_runtime_properties_string(runtime_properties):
-    # type: (List[dict]) -> Tuple[Optional[str], str]
+    # type: (Optional[List[dict]]) -> Tuple[Optional[str], str]
     server_string = []
+    runtime_properties = runtime_properties or []
     force_flag = next(
         (prop for prop in runtime_properties if prop["key"] == UptimeConf.worker_key),
         None,
diff --git a/clearml_agent/interface/worker.py b/clearml_agent/interface/worker.py
index 3bf8fd7..b98538c 100644
--- a/clearml_agent/interface/worker.py
+++ b/clearml_agent/interface/worker.py
@@ -78,7 +78,10 @@ DAEMON_ARGS = dict({
     },
     '--services-mode': {
         'help': 'Launch multiple long-term docker services. Implies docker & cpu-only flags.',
-        'action': 'store_true',
+        'nargs': '?',
+        'const': -1,
+        'type': int,
+        'default': None,
     },
     '--create-queue': {
         'help': 'Create requested queue if it does not exist already.',
@@ -93,6 +96,12 @@ DAEMON_ARGS = dict({
         'help': 'Stop the running agent (based on the same set of arguments)',
         'action': 'store_true',
     },
+    '--dynamic-gpus': {
+        'help': 'Allow dynamic gpu allocation based on queue properties, pass \'<queue_name>=<num_gpus>\' pairs.'
+                ' Example: \'dual_gpus=2 single_gpu=1\'',
+        'nargs': '*',
+        'default': None,
+    },
     '--uptime': {
         'help': 'Specify uptime for clearml-agent in "<hours> <days>" format. for example, use "17-20 TUE" to set '
                 'Tuesday\'s uptime to 17-20'
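
Usage note (illustrative, not part of the patch): with these changes a daemon invocation along the lines of `clearml-agent daemon --queue dual_gpus single_gpu --gpus 0-3 --dynamic-gpus dual_gpus=2 single_gpu=1` should allocate GPUs per queue, and `--services-mode 5` should cap concurrent service instances at five; exact flags beyond those added above are assumptions. The sketch below is a minimal, hypothetical Python illustration of the allocation bookkeeping the patch performs: free GPUs are derived from sibling worker ids (which encode their GPUs as a ":gpuN,M" suffix), and the queue with the largest GPU requirement that still fits is served first. The function and variable names here are made up for illustration and do not exist in clearml-agent.

# Illustrative sketch only - mirrors the idea of _dynamic_gpu_get_available()
# and the descending-GPU-count queue ordering, not the actual agent code.
def free_gpus(gpu_indexes, sibling_worker_ids, worker_name):
    # GPUs already held by our other worker instances, parsed from ids such as "host:gpu0,1"
    used = set()
    for wid in sibling_worker_ids:
        suffix = wid.split(':')[-1].lower()
        if wid.startswith(worker_name + ':') and suffix.startswith('gpu'):
            used.update(int(g) for g in suffix.replace('gpu', '').split(','))
    return [g for g in gpu_indexes if g not in used]

def pick_queue(gpu_queues, available):
    # gpu_queues: {queue_name: num_gpus}; prefer the largest request that still fits
    for queue, needed in sorted(gpu_queues.items(), key=lambda kv: -kv[1]):
        if needed <= len(available):
            return queue, available[:needed], available[needed:]
    return None, [], available

if __name__ == '__main__':
    available = free_gpus([0, 1, 2, 3], ['box:gpu0,1', 'other:gpu2'], 'box')
    print(available)                                                  # -> [2, 3]
    print(pick_queue({'dual_gpus': 2, 'single_gpu': 1}, available))   # -> ('dual_gpus', [2, 3], [])

The design choice this mirrors is that GPU ownership is recorded in the worker id itself (":gpu0,1" suffix) and in the 'available_gpus' runtime property, so any agent instance can recompute the free set from the server's worker list without a separate allocation service.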