mirror of
				https://github.com/clearml/clearml-agent
				synced 2025-06-26 18:16:15 +00:00 
			
		
		
		
	Add support for ${CLEARML_TASK.xx} as docker args arguments parsed based on Tasks value
This commit is contained in:
		
							parent
							
								
									bb9ad6b213
								
							
						
					
					
						commit
						55790b3c3a
					
				| @ -192,6 +192,9 @@ | |||||||
| 
 | 
 | ||||||
|     # optional arguments to pass to docker image |     # optional arguments to pass to docker image | ||||||
|     # these are local for this agent and will not be updated in the experiment's docker_cmd section |     # these are local for this agent and will not be updated in the experiment's docker_cmd section | ||||||
|  |     # Supports parsing ${CLEARML_TASK:default} and  ${CLEARML_QUEUE_NAME:default} values based on Task object | ||||||
|  |     # replace with real-time values. | ||||||
|  |     # Example: "${CLEARML_TASK.project}", "${TASK.hyperparams.properties.user_key.value:default_value}" | ||||||
|     # extra_docker_arguments: ["--ipc=host", ] |     # extra_docker_arguments: ["--ipc=host", ] | ||||||
| 
 | 
 | ||||||
|     # Allow the extra docker arg to override task level docker arg (if the same argument is passed on both), |     # Allow the extra docker arg to override task level docker arg (if the same argument is passed on both), | ||||||
|  | |||||||
| @ -150,7 +150,7 @@ from clearml_agent.helper.repo import clone_repository_cached, RepoInfo, VCS, fi | |||||||
| from clearml_agent.helper.resource_monitor import ResourceMonitor | from clearml_agent.helper.resource_monitor import ResourceMonitor | ||||||
| from clearml_agent.helper.runtime_verification import check_runtime, print_uptime_properties | from clearml_agent.helper.runtime_verification import check_runtime, print_uptime_properties | ||||||
| from clearml_agent.helper.singleton import Singleton | from clearml_agent.helper.singleton import Singleton | ||||||
| from clearml_agent.helper.docker_args import DockerArgsSanitizer | from clearml_agent.helper.docker_args import DockerArgsSanitizer, DockerArgsTemplateResolver | ||||||
| from clearml_agent.session import Session | from clearml_agent.session import Session | ||||||
| from .events import Events | from .events import Events | ||||||
| 
 | 
 | ||||||
| @ -1240,6 +1240,15 @@ class Worker(ServiceCommandSection): | |||||||
|                     else: |                     else: | ||||||
|                         print("Warning: generated docker container name is invalid: {}".format(name)) |                         print("Warning: generated docker container name is invalid: {}".format(name)) | ||||||
| 
 | 
 | ||||||
|  |             # convert template arguments now (i.e. ${CLEARML_} ), this is important for the docker arg | ||||||
|  |             # resolve the Task's docker arguments before everything else, because | ||||||
|  |             # unlike the vault/config these are not running as the agent's user, they are the user's, | ||||||
|  |             # we need to filter them post template parsing limitation to happen before the `docker_image_func` call | ||||||
|  |             docker_args_template_resolver = DockerArgsTemplateResolver(task_session=self._session, task_id=task_id) | ||||||
|  |             if docker_params.get("docker_arguments"): | ||||||
|  |                 docker_params["docker_arguments"] = docker_args_template_resolver.resolve_docker_args_from_template( | ||||||
|  |                     full_docker_cmd=docker_params["docker_arguments"]) | ||||||
|  | 
 | ||||||
|             full_docker_cmd = self.docker_image_func(env_task_id=task_id, **docker_params) |             full_docker_cmd = self.docker_image_func(env_task_id=task_id, **docker_params) | ||||||
| 
 | 
 | ||||||
|             # if we are using the default docker, update back the Task: |             # if we are using the default docker, update back the Task: | ||||||
| @ -1256,6 +1265,12 @@ class Worker(ServiceCommandSection): | |||||||
|                 except Exception: |                 except Exception: | ||||||
|                     pass |                     pass | ||||||
| 
 | 
 | ||||||
|  |             # convert template arguments now (i.e. ${CLEARML_} ) | ||||||
|  |             # Notice we do not parse the last part of the docker cmd because that's | ||||||
|  |             # the actual command to be executed inside the docker | ||||||
|  |             full_docker_cmd = docker_args_template_resolver.resolve_docker_args_from_template( | ||||||
|  |                 full_docker_cmd=full_docker_cmd[:-1]) + [full_docker_cmd[-1]] | ||||||
|  | 
 | ||||||
|             # if this is services_mode, change the worker_id to a unique name |             # if this is services_mode, change the worker_id to a unique name | ||||||
|             # abd use full-monitoring, ot it registers itself as a worker for this specific service. |             # abd use full-monitoring, ot it registers itself as a worker for this specific service. | ||||||
|             # notice, the internal agent will monitor itself once the docker is up and running |             # notice, the internal agent will monitor itself once the docker is up and running | ||||||
| @ -2711,6 +2726,11 @@ class Worker(ServiceCommandSection): | |||||||
|             docker_image=docker_image, docker_arguments=docker_arguments, docker_bash_setup_script=docker_setup_script |             docker_image=docker_image, docker_arguments=docker_arguments, docker_bash_setup_script=docker_setup_script | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|  |         # convert docker template arguments (i.e. ${CLEARML_} ) based on the current Task | ||||||
|  |         docker_args_template_resolver = DockerArgsTemplateResolver(task_session=self._session, task_id=task_id) | ||||||
|  |         full_docker_cmd = docker_args_template_resolver.resolve_docker_args_from_template( | ||||||
|  |             full_docker_cmd=full_docker_cmd) | ||||||
|  | 
 | ||||||
|         end_of_build_marker = "build.done=true" |         end_of_build_marker = "build.done=true" | ||||||
|         docker_cmd_suffix = ' build --id {task_id} --install-globally; ' \ |         docker_cmd_suffix = ' build --id {task_id} --install-globally; ' \ | ||||||
|                             'ORG=$(stat -c "%u:%g" {conf_file}) ; chown $(whoami):$(whoami) {conf_file} ; ' \ |                             'ORG=$(stat -c "%u:%g" {conf_file}) ; chown $(whoami):$(whoami) {conf_file} ; ' \ | ||||||
|  | |||||||
| @ -1,8 +1,12 @@ | |||||||
| import re | import re | ||||||
| import shlex | import shlex | ||||||
|  | from functools import partial | ||||||
| from typing import Tuple, List, TYPE_CHECKING, Optional | from typing import Tuple, List, TYPE_CHECKING, Optional | ||||||
| from urllib.parse import urlunparse, urlparse | from urllib.parse import urlunparse, urlparse | ||||||
|  | from string import Template | ||||||
| 
 | 
 | ||||||
|  | from clearml_agent.backend_api.services import queues as queues_api | ||||||
|  | from clearml_agent.backend_api.session import Request | ||||||
| from clearml_agent.definitions import ( | from clearml_agent.definitions import ( | ||||||
|     ENV_AGENT_GIT_PASS, |     ENV_AGENT_GIT_PASS, | ||||||
|     ENV_AGENT_SECRET_KEY, |     ENV_AGENT_SECRET_KEY, | ||||||
| @ -270,3 +274,168 @@ class DockerArgsSanitizer: | |||||||
|         additional_task_runtime = {"_external_host_tcp_port_mapping": ports} |         additional_task_runtime = {"_external_host_tcp_port_mapping": ports} | ||||||
| 
 | 
 | ||||||
|         return docker_arguments+additional_cmd, additional_task_runtime |         return docker_arguments+additional_cmd, additional_task_runtime | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | class CustomTemplate(Template): | ||||||
|  |     """ | ||||||
|  |     Parse ${CLEARML_<something>:default} values based on Task object and replace with real-time value | ||||||
|  |     Example: "-e project_id=${CLEARML_TASK.project}" will be replaced with the | ||||||
|  |              Task actual project ID from the Task object "-e project_id=<task.project>" | ||||||
|  |              "-e queue_name=${CLEARML_QUEUE_NAME}" | ||||||
|  |              "-e user_key=${TASK.hyperparams.properties.user_key.value:default_value}" | ||||||
|  | 
 | ||||||
|  |     It supports: | ||||||
|  | 
 | ||||||
|  |     ${QUEUE_NAME} - name of the queue | ||||||
|  |     # Task object nested variables: | ||||||
|  |     ${TASK.id} | ||||||
|  |     ${TASK.name} | ||||||
|  |     ${TASK.project} | ||||||
|  |     ${TASK.hyperparams.properties.user_key.value} | ||||||
|  |     """ | ||||||
|  | 
 | ||||||
|  |     idpattern = r'(?a:[_a-z][_a-z0-9|.|:]*)' | ||||||
|  |     prefix = "CLEARML_" | ||||||
|  |     queue_id_to_name_map = {} | ||||||
|  | 
 | ||||||
|  |     @classmethod | ||||||
|  |     def get_queue_name(cls, task_session, queue_id): | ||||||
|  |         if queue_id in cls.queue_id_to_name_map: | ||||||
|  |             return cls.queue_id_to_name_map[queue_id] | ||||||
|  | 
 | ||||||
|  |         # noinspection PyBroadException | ||||||
|  |         try: | ||||||
|  |             response = task_session.send_api(queues_api.GetByIdRequest(queue=queue_id)) | ||||||
|  |             cls.queue_id_to_name_map[queue_id] = response.queue.name | ||||||
|  |         except Exception: | ||||||
|  |             # if something went wrong start over from the highest priority queue | ||||||
|  |             return None | ||||||
|  |         return cls.queue_id_to_name_map.get(queue_id) | ||||||
|  | 
 | ||||||
|  |     def default_custom_substitute(self, task_info, queue_name): | ||||||
|  |         return self.custom_substitute(partial(CustomTemplate.default_resolve_template, task_info, queue_name)) | ||||||
|  | 
 | ||||||
|  |     def custom_substitute(self, mapping_func): | ||||||
|  |         # Helper function for .sub() | ||||||
|  |         def convert(mo): | ||||||
|  |             named = mo.group('named') or mo.group('braced') | ||||||
|  |             if not named or not str(named).startswith(self.prefix): | ||||||
|  |                 return mo.group() | ||||||
|  |             named = named[len(self.prefix):] | ||||||
|  |             if named is not None: | ||||||
|  |                 default_value = None | ||||||
|  |                 try: | ||||||
|  |                     if ":" in named: | ||||||
|  |                         named, default_value = named.split(":", 1) | ||||||
|  | 
 | ||||||
|  |                     return str(mapping_func(named, default_value)) | ||||||
|  |                 except KeyError: | ||||||
|  |                     return mo.group() | ||||||
|  |             if mo.group('escaped') is not None: | ||||||
|  |                 return self.delimiter | ||||||
|  |             if mo.group('invalid') is not None: | ||||||
|  |                 return mo.group() | ||||||
|  |             raise ValueError('Unrecognized named group in pattern', self.pattern) | ||||||
|  | 
 | ||||||
|  |         return self.pattern.sub(convert, self.template) | ||||||
|  | 
 | ||||||
|  |     def substitute(self, *args, **kwds): | ||||||
|  |         raise ValueError("Unsupported") | ||||||
|  | 
 | ||||||
|  |     def safe_substitute(self, *args, **kwds): | ||||||
|  |         raise ValueError("Unsupported") | ||||||
|  | 
 | ||||||
|  |     @classmethod | ||||||
|  |     def default_resolve_template(cls, task_info, queue, key, default): | ||||||
|  |         """ | ||||||
|  |         Notice CLEARML_ prefix omitted! (i.e. ${QUEUE_ID} is ${CLEARML_QUEUE_ID}) | ||||||
|  | 
 | ||||||
|  |         we support: | ||||||
|  |         ${QUEUE_NAME} - name of the queue | ||||||
|  |         ${WORKER_ID} - FUTURE | ||||||
|  | 
 | ||||||
|  |         # we also complex variables: | ||||||
|  |         ${TASK.id} | ||||||
|  |         ${TASK.name} | ||||||
|  |         ${TASK.project.id} | ||||||
|  |         ${TASK.project.name} | ||||||
|  |         ${TASK.hyperparams.properties.user_key.value} | ||||||
|  | 
 | ||||||
|  |         :param task_info: nested dict with task information | ||||||
|  |         :param queue: queue_id (str) | ||||||
|  |         :param key: key to be replaced | ||||||
|  |         :param default: default value, None will raise exception | ||||||
|  |         :return: string value | ||||||
|  |         """ | ||||||
|  |         try: | ||||||
|  |             parts = key.split(".") | ||||||
|  |             main_part = parts[0] | ||||||
|  |             if main_part == "QUEUE_NAME": | ||||||
|  |                 if len(parts) == 1: | ||||||
|  |                     return queue or default | ||||||
|  |                 raise ValueError() | ||||||
|  |             elif main_part == "QUEUE_NAME": | ||||||
|  |                 # future support | ||||||
|  |                 raise ValueError() | ||||||
|  |             elif main_part == "WORKER_ID": | ||||||
|  |                 # future support | ||||||
|  |                 raise ValueError() | ||||||
|  |             elif main_part == "TASK": | ||||||
|  |                 for part in parts[1:]: | ||||||
|  | 
 | ||||||
|  |                     task_info = task_info.get(part) | ||||||
|  |                     if task_info is None: | ||||||
|  |                         break | ||||||
|  | 
 | ||||||
|  |                 if isinstance(task_info, str): | ||||||
|  |                     return task_info | ||||||
|  | 
 | ||||||
|  |                 if default: | ||||||
|  |                     return default | ||||||
|  |                 raise ValueError() | ||||||
|  | 
 | ||||||
|  |         except Exception: | ||||||
|  |             raise KeyError((key,)) | ||||||
|  | 
 | ||||||
|  |         # default, nothing | ||||||
|  |         raise KeyError((key,)) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | class DockerArgsTemplateResolver: | ||||||
|  |     def __init__(self, task_session, task_id): | ||||||
|  |         self._task_session = task_session | ||||||
|  |         self.task_info = None | ||||||
|  |         self.queue_name = None | ||||||
|  |         self.task_id = task_id | ||||||
|  | 
 | ||||||
|  |     def resolve_docker_args_from_template(self, full_docker_cmd): | ||||||
|  |         if not full_docker_cmd or not self._task_session.check_min_api_version("2.20"): | ||||||
|  |             return full_docker_cmd | ||||||
|  | 
 | ||||||
|  |         # convert docker template arguments (i.e. ${CLEARML_} ) based on the current Task | ||||||
|  |         for i, token in enumerate(full_docker_cmd[:-1]): | ||||||
|  |             # skip the ones which are obviously not our prefix | ||||||
|  |             if not CustomTemplate.delimiter in token or not CustomTemplate.prefix in token: | ||||||
|  |                 continue | ||||||
|  | 
 | ||||||
|  |             if self.task_info is None: | ||||||
|  |                 result = self._task_session.send_request( | ||||||
|  |                     service='tasks', | ||||||
|  |                     action='get_all', | ||||||
|  |                     version='2.20', | ||||||
|  |                     method=Request.def_method, | ||||||
|  |                     json={'id': [self.task_id], 'search_hidden': True} | ||||||
|  |                 ) | ||||||
|  |                 # we should not fail here | ||||||
|  |                 self.task_info = result.json().get("data", {}).get("tasks", [])[0] or {} | ||||||
|  |                 queue_id = self.task_info.get("execution", {}).get("queue") | ||||||
|  |                 self.queue_name = CustomTemplate.get_queue_name(self._task_session, queue_id) | ||||||
|  | 
 | ||||||
|  |             tmpl = CustomTemplate(token) | ||||||
|  |             # replace it | ||||||
|  |             try: | ||||||
|  |                 full_docker_cmd[i] = tmpl.default_custom_substitute(self.task_info, self.queue_name) | ||||||
|  |             except Exception as ex: | ||||||
|  |                 print("Failed parsing ClearML Template argument [{}] skipped: error ()".format(token, ex)) | ||||||
|  | 
 | ||||||
|  |         return full_docker_cmd | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user
	 clearml
						clearml