From 3ae174134356eb8f294efd64847b36791903d67a Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Fri, 25 Jun 2021 17:35:01 +0300 Subject: [PATCH] Fix k8s glue task container arguments not supported in kubectl_run command Fix k8s glue not passing required extra_docker_bash_script to string format --- clearml_agent/glue/k8s.py | 85 ++++++++++++++++++++++++++------------- 1 file changed, 56 insertions(+), 29 deletions(-) diff --git a/clearml_agent/glue/k8s.py b/clearml_agent/glue/k8s.py index 25d93f9..176ec99 100644 --- a/clearml_agent/glue/k8s.py +++ b/clearml_agent/glue/k8s.py @@ -12,7 +12,7 @@ from copy import deepcopy from pathlib import Path from threading import Thread from time import sleep -from typing import Text, List +from typing import Text, List, Callable, Any, Collection, Optional, Union import yaml @@ -37,7 +37,7 @@ class K8sIntegration(Worker): KUBECTL_APPLY_CMD = "kubectl apply --namespace={namespace} -f" KUBECTL_RUN_CMD = "kubectl run clearml-id-{task_id} " \ - "--image {docker_image} " \ + "--image {docker_image} {docker_args} " \ "--restart=Never " \ "--namespace={namespace}" @@ -104,7 +104,7 @@ class K8sIntegration(Worker): :param str k8s_pending_queue_name: queue name to use when task is pending in the k8s scheduler :param str|callable kubectl_cmd: kubectl command line str, supports formatting (default: KUBECTL_RUN_CMD) example: "task={task_id} image={docker_image} queue_id={queue_id}" - or a callable function: kubectl_cmd(task_id, docker_image, queue_id, task_data) + or a callable function: kubectl_cmd(task_id, docker_image, docker_args, queue_id, task_data) :param str container_bash_script: container bash script to be executed in k8s (default: CONTAINER_BASH_SCRIPT) Notice this string will use format() call, if you have curly brackets they should be doubled { -> {{ Format arguments passed: {task_id} and {extra_bash_init_cmd} @@ -374,20 +374,20 @@ class K8sIntegration(Worker): else: print("Kubernetes scheduling task id={}".format(task_id)) + kubectl_kwargs = dict( + create_clearml_conf=create_clearml_conf, + labels=labels, + docker_image=container['image'], + docker_args=container['arguments'], + docker_bash=container.get('setup_shell_script'), + task_id=task_id, + queue=queue + ) + if self.template_dict: - output, error = self._kubectl_apply( - create_clearml_conf=create_clearml_conf, - labels=labels, - docker_image=container['container'], - docker_args=container['arguments'], - docker_bash=container.get('setup_shell_script'), - task_id=task_id, queue=queue) + output, error = self._kubectl_apply(**kubectl_kwargs) else: - output, error = self._kubectl_run( - create_clearml_conf=create_clearml_conf, - labels=labels, docker_image=container['container'], - task_data=task_data, - task_id=task_id, queue=queue) + output, error = self._kubectl_run(task_data=task_data, **kubectl_kwargs) error = '' if not error else (error if isinstance(error, str) else error.decode('utf-8')) output = '' if not output else (output if isinstance(output, str) else output.decode('utf-8')) @@ -421,18 +421,31 @@ class K8sIntegration(Worker): **user_props ) - def _parse_docker_args(self, docker_args): - # type: (list) -> dict - kube_args = [] - while docker_args: - cmd = docker_args.pop(0).strip() - if cmd in ('-e', '--env',): - env = docker_args.pop(0).strip() - key, value = env.split('=', 1) - kube_args.append({'name': key, 'value': value}) + def _get_docker_args(self, docker_args, flags, target=None, convert=None): + # type: (List[str], Collection[str], Optional[str], Callable[[str], Any]) -> Union[dict, List[str]] + """ + Get docker args matching specific flags. + + :argument docker_args: List of docker argument strings (flags and values) + :argument flags: List of flags/names to intercept (e.g. "--env" etc.) + :argument target: Controls return format. If provided, returns a dict with a target field containing a list + of result strings, otherwise returns a list of result strings + :argument convert: Optional conversion function for each result string + """ + args = docker_args[:] if docker_args else [] + results = [] + while args: + cmd = args.pop(0).strip() + if cmd in flags: + env = args.pop(0).strip() + if convert: + env = convert(env) + results.append(env) else: self.log.warning('skipping docker argument {} (only -e --env supported)'.format(cmd)) - return {'env': kube_args} if kube_args else {} + if target: + return {target: results} if results else {} + return results def _kubectl_apply(self, create_clearml_conf, docker_image, docker_args, docker_bash, labels, queue, task_id): template = deepcopy(self.template_dict) @@ -448,7 +461,13 @@ class K8sIntegration(Worker): labels_dict = dict(pair.split('=', 1) for pair in labels) template['metadata'].setdefault('labels', {}) template['metadata']['labels'].update(labels_dict) - container = self._parse_docker_args(docker_args) + + container = self._get_docker_args( + docker_args, + target="env", + flags={"-e", "--env"}, + convert=lambda env: {'name': env.partition("=")[0], 'value': env.partition("=")[2]}, + ) container_bash_script = [self.container_bash_script] if isinstance(self.container_bash_script, str) \ else self.container_bash_script @@ -511,13 +530,18 @@ class K8sIntegration(Worker): return output, error - def _kubectl_run(self, create_clearml_conf, docker_image, labels, queue, task_data, task_id): + def _kubectl_run( + self, create_clearml_conf, docker_image, docker_args, docker_bash, labels, queue, task_data, task_id + ): if callable(self.kubectl_cmd): - kubectl_cmd = self.kubectl_cmd(task_id, docker_image, queue, task_data) + kubectl_cmd = self.kubectl_cmd(task_id, docker_image, docker_args, queue, task_data) else: kubectl_cmd = self.kubectl_cmd.format( task_id=task_id, docker_image=docker_image, + docker_args=" ".join(self._get_docker_args( + docker_args, flags={"-e", "--env"}, convert=lambda env: '--env={}'.format(env)) + ), queue_id=queue, namespace=self.namespace, ) @@ -544,7 +568,10 @@ class K8sIntegration(Worker): "/bin/sh", "-c", "{} ; {}".format(create_clearml_conf, container_bash_script.format( - extra_bash_init_cmd=self.extra_bash_init_script, task_id=task_id)), + extra_bash_init_cmd=self.extra_bash_init_script or "", + extra_docker_bash_script=docker_bash or "", + task_id=task_id + )), ] process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) output, error = process.communicate()