diff --git a/clearml_agent/backend_api/config/default/agent.conf b/clearml_agent/backend_api/config/default/agent.conf index f83c883..311135e 100644 --- a/clearml_agent/backend_api/config/default/agent.conf +++ b/clearml_agent/backend_api/config/default/agent.conf @@ -192,6 +192,9 @@ # optional arguments to pass to docker image # these are local for this agent and will not be updated in the experiment's docker_cmd section + # Supports parsing ${CLEARML_TASK:default} and ${CLEARML_QUEUE_NAME:default} values based on Task object + # replace with real-time values. + # Example: "${CLEARML_TASK.project}", "${TASK.hyperparams.properties.user_key.value:default_value}" # extra_docker_arguments: ["--ipc=host", ] # Allow the extra docker arg to override task level docker arg (if the same argument is passed on both), diff --git a/clearml_agent/commands/worker.py b/clearml_agent/commands/worker.py index 59b3cdd..0d4bdad 100644 --- a/clearml_agent/commands/worker.py +++ b/clearml_agent/commands/worker.py @@ -150,7 +150,7 @@ from clearml_agent.helper.repo import clone_repository_cached, RepoInfo, VCS, fi from clearml_agent.helper.resource_monitor import ResourceMonitor from clearml_agent.helper.runtime_verification import check_runtime, print_uptime_properties from clearml_agent.helper.singleton import Singleton -from clearml_agent.helper.docker_args import DockerArgsSanitizer +from clearml_agent.helper.docker_args import DockerArgsSanitizer, DockerArgsTemplateResolver from clearml_agent.session import Session from .events import Events @@ -1240,6 +1240,15 @@ class Worker(ServiceCommandSection): else: print("Warning: generated docker container name is invalid: {}".format(name)) + # convert template arguments now (i.e. ${CLEARML_} ), this is important for the docker arg + # resolve the Task's docker arguments before everything else, because + # unlike the vault/config these are not running as the agent's user, they are the user's, + # we need to filter them post template parsing limitation to happen before the `docker_image_func` call + docker_args_template_resolver = DockerArgsTemplateResolver(task_session=self._session, task_id=task_id) + if docker_params.get("docker_arguments"): + docker_params["docker_arguments"] = docker_args_template_resolver.resolve_docker_args_from_template( + full_docker_cmd=docker_params["docker_arguments"]) + full_docker_cmd = self.docker_image_func(env_task_id=task_id, **docker_params) # if we are using the default docker, update back the Task: @@ -1256,6 +1265,12 @@ class Worker(ServiceCommandSection): except Exception: pass + # convert template arguments now (i.e. ${CLEARML_} ) + # Notice we do not parse the last part of the docker cmd because that's + # the actual command to be executed inside the docker + full_docker_cmd = docker_args_template_resolver.resolve_docker_args_from_template( + full_docker_cmd=full_docker_cmd[:-1]) + [full_docker_cmd[-1]] + # if this is services_mode, change the worker_id to a unique name # abd use full-monitoring, ot it registers itself as a worker for this specific service. # notice, the internal agent will monitor itself once the docker is up and running @@ -2711,6 +2726,11 @@ class Worker(ServiceCommandSection): docker_image=docker_image, docker_arguments=docker_arguments, docker_bash_setup_script=docker_setup_script ) + # convert docker template arguments (i.e. ${CLEARML_} ) based on the current Task + docker_args_template_resolver = DockerArgsTemplateResolver(task_session=self._session, task_id=task_id) + full_docker_cmd = docker_args_template_resolver.resolve_docker_args_from_template( + full_docker_cmd=full_docker_cmd) + end_of_build_marker = "build.done=true" docker_cmd_suffix = ' build --id {task_id} --install-globally; ' \ 'ORG=$(stat -c "%u:%g" {conf_file}) ; chown $(whoami):$(whoami) {conf_file} ; ' \ diff --git a/clearml_agent/helper/docker_args.py b/clearml_agent/helper/docker_args.py index 10c4c69..246105a 100644 --- a/clearml_agent/helper/docker_args.py +++ b/clearml_agent/helper/docker_args.py @@ -1,8 +1,12 @@ import re import shlex +from functools import partial from typing import Tuple, List, TYPE_CHECKING, Optional from urllib.parse import urlunparse, urlparse +from string import Template +from clearml_agent.backend_api.services import queues as queues_api +from clearml_agent.backend_api.session import Request from clearml_agent.definitions import ( ENV_AGENT_GIT_PASS, ENV_AGENT_SECRET_KEY, @@ -270,3 +274,168 @@ class DockerArgsSanitizer: additional_task_runtime = {"_external_host_tcp_port_mapping": ports} return docker_arguments+additional_cmd, additional_task_runtime + + +class CustomTemplate(Template): + """ + Parse ${CLEARML_:default} values based on Task object and replace with real-time value + Example: "-e project_id=${CLEARML_TASK.project}" will be replaced with the + Task actual project ID from the Task object "-e project_id=" + "-e queue_name=${CLEARML_QUEUE_NAME}" + "-e user_key=${TASK.hyperparams.properties.user_key.value:default_value}" + + It supports: + + ${QUEUE_NAME} - name of the queue + # Task object nested variables: + ${TASK.id} + ${TASK.name} + ${TASK.project} + ${TASK.hyperparams.properties.user_key.value} + """ + + idpattern = r'(?a:[_a-z][_a-z0-9|.|:]*)' + prefix = "CLEARML_" + queue_id_to_name_map = {} + + @classmethod + def get_queue_name(cls, task_session, queue_id): + if queue_id in cls.queue_id_to_name_map: + return cls.queue_id_to_name_map[queue_id] + + # noinspection PyBroadException + try: + response = task_session.send_api(queues_api.GetByIdRequest(queue=queue_id)) + cls.queue_id_to_name_map[queue_id] = response.queue.name + except Exception: + # if something went wrong start over from the highest priority queue + return None + return cls.queue_id_to_name_map.get(queue_id) + + def default_custom_substitute(self, task_info, queue_name): + return self.custom_substitute(partial(CustomTemplate.default_resolve_template, task_info, queue_name)) + + def custom_substitute(self, mapping_func): + # Helper function for .sub() + def convert(mo): + named = mo.group('named') or mo.group('braced') + if not named or not str(named).startswith(self.prefix): + return mo.group() + named = named[len(self.prefix):] + if named is not None: + default_value = None + try: + if ":" in named: + named, default_value = named.split(":", 1) + + return str(mapping_func(named, default_value)) + except KeyError: + return mo.group() + if mo.group('escaped') is not None: + return self.delimiter + if mo.group('invalid') is not None: + return mo.group() + raise ValueError('Unrecognized named group in pattern', self.pattern) + + return self.pattern.sub(convert, self.template) + + def substitute(self, *args, **kwds): + raise ValueError("Unsupported") + + def safe_substitute(self, *args, **kwds): + raise ValueError("Unsupported") + + @classmethod + def default_resolve_template(cls, task_info, queue, key, default): + """ + Notice CLEARML_ prefix omitted! (i.e. ${QUEUE_ID} is ${CLEARML_QUEUE_ID}) + + we support: + ${QUEUE_NAME} - name of the queue + ${WORKER_ID} - FUTURE + + # we also complex variables: + ${TASK.id} + ${TASK.name} + ${TASK.project.id} + ${TASK.project.name} + ${TASK.hyperparams.properties.user_key.value} + + :param task_info: nested dict with task information + :param queue: queue_id (str) + :param key: key to be replaced + :param default: default value, None will raise exception + :return: string value + """ + try: + parts = key.split(".") + main_part = parts[0] + if main_part == "QUEUE_NAME": + if len(parts) == 1: + return queue or default + raise ValueError() + elif main_part == "QUEUE_NAME": + # future support + raise ValueError() + elif main_part == "WORKER_ID": + # future support + raise ValueError() + elif main_part == "TASK": + for part in parts[1:]: + + task_info = task_info.get(part) + if task_info is None: + break + + if isinstance(task_info, str): + return task_info + + if default: + return default + raise ValueError() + + except Exception: + raise KeyError((key,)) + + # default, nothing + raise KeyError((key,)) + + +class DockerArgsTemplateResolver: + def __init__(self, task_session, task_id): + self._task_session = task_session + self.task_info = None + self.queue_name = None + self.task_id = task_id + + def resolve_docker_args_from_template(self, full_docker_cmd): + if not full_docker_cmd or not self._task_session.check_min_api_version("2.20"): + return full_docker_cmd + + # convert docker template arguments (i.e. ${CLEARML_} ) based on the current Task + for i, token in enumerate(full_docker_cmd[:-1]): + # skip the ones which are obviously not our prefix + if not CustomTemplate.delimiter in token or not CustomTemplate.prefix in token: + continue + + if self.task_info is None: + result = self._task_session.send_request( + service='tasks', + action='get_all', + version='2.20', + method=Request.def_method, + json={'id': [self.task_id], 'search_hidden': True} + ) + # we should not fail here + self.task_info = result.json().get("data", {}).get("tasks", [])[0] or {} + queue_id = self.task_info.get("execution", {}).get("queue") + self.queue_name = CustomTemplate.get_queue_name(self._task_session, queue_id) + + tmpl = CustomTemplate(token) + # replace it + try: + full_docker_cmd[i] = tmpl.default_custom_substitute(self.task_info, self.queue_name) + except Exception as ex: + print("Failed parsing ClearML Template argument [{}] skipped: error ()".format(token, ex)) + + return full_docker_cmd