Fix k8s glue task container arguments not supported in kubectl_run command

Fix k8s glue not passing the required extra_docker_bash_script argument when formatting the container bash script
This commit is contained in:
allegroai 2021-06-25 17:35:01 +03:00
parent 53c106c3af
commit 3ae1741343

View File

@ -12,7 +12,7 @@ from copy import deepcopy
from pathlib import Path
from threading import Thread
from time import sleep
from typing import Text, List
from typing import Text, List, Callable, Any, Collection, Optional, Union
import yaml
@ -37,7 +37,7 @@ class K8sIntegration(Worker):
KUBECTL_APPLY_CMD = "kubectl apply --namespace={namespace} -f"
KUBECTL_RUN_CMD = "kubectl run clearml-id-{task_id} " \
"--image {docker_image} " \
"--image {docker_image} {docker_args} " \
"--restart=Never " \
"--namespace={namespace}"
@ -104,7 +104,7 @@ class K8sIntegration(Worker):
:param str k8s_pending_queue_name: queue name to use when task is pending in the k8s scheduler
:param str|callable kubectl_cmd: kubectl command line str, supports formatting (default: KUBECTL_RUN_CMD)
example: "task={task_id} image={docker_image} queue_id={queue_id}"
or a callable function: kubectl_cmd(task_id, docker_image, queue_id, task_data)
or a callable function: kubectl_cmd(task_id, docker_image, docker_args, queue_id, task_data)
:param str container_bash_script: container bash script to be executed in k8s (default: CONTAINER_BASH_SCRIPT)
Notice this string will use format() call, if you have curly brackets they should be doubled { -> {{
Format arguments passed: {task_id}, {extra_bash_init_cmd} and {extra_docker_bash_script}
@ -374,20 +374,20 @@ class K8sIntegration(Worker):
else:
print("Kubernetes scheduling task id={}".format(task_id))
kubectl_kwargs = dict(
create_clearml_conf=create_clearml_conf,
labels=labels,
docker_image=container['image'],
docker_args=container['arguments'],
docker_bash=container.get('setup_shell_script'),
task_id=task_id,
queue=queue
)
if self.template_dict:
output, error = self._kubectl_apply(
create_clearml_conf=create_clearml_conf,
labels=labels,
docker_image=container['container'],
docker_args=container['arguments'],
docker_bash=container.get('setup_shell_script'),
task_id=task_id, queue=queue)
output, error = self._kubectl_apply(**kubectl_kwargs)
else:
output, error = self._kubectl_run(
create_clearml_conf=create_clearml_conf,
labels=labels, docker_image=container['container'],
task_data=task_data,
task_id=task_id, queue=queue)
output, error = self._kubectl_run(task_data=task_data, **kubectl_kwargs)
error = '' if not error else (error if isinstance(error, str) else error.decode('utf-8'))
output = '' if not output else (output if isinstance(output, str) else output.decode('utf-8'))
@ -421,18 +421,31 @@ class K8sIntegration(Worker):
**user_props
)
def _parse_docker_args(self, docker_args):
# type: (list) -> dict
kube_args = []
while docker_args:
cmd = docker_args.pop(0).strip()
if cmd in ('-e', '--env',):
env = docker_args.pop(0).strip()
key, value = env.split('=', 1)
kube_args.append({'name': key, 'value': value})
def _get_docker_args(self, docker_args, flags, target=None, convert=None):
# type: (List[str], Collection[str], Optional[str], Callable[[str], Any]) -> Union[dict, List[str]]
"""
Get docker args matching specific flags.
:argument docker_args: List of docker argument strings (flags and values)
:argument flags: List of flags/names to intercept (e.g. "--env" etc.)
:argument target: Controls return format. If provided, returns a dict with a target field containing a list
of result strings, otherwise returns a list of result strings
:argument convert: Optional conversion function for each result string
"""
args = docker_args[:] if docker_args else []
results = []
while args:
cmd = args.pop(0).strip()
if cmd in flags:
env = args.pop(0).strip()
if convert:
env = convert(env)
results.append(env)
else:
self.log.warning('skipping docker argument {} (only -e --env supported)'.format(cmd))
return {'env': kube_args} if kube_args else {}
if target:
return {target: results} if results else {}
return results
def _kubectl_apply(self, create_clearml_conf, docker_image, docker_args, docker_bash, labels, queue, task_id):
template = deepcopy(self.template_dict)
@ -448,7 +461,13 @@ class K8sIntegration(Worker):
labels_dict = dict(pair.split('=', 1) for pair in labels)
template['metadata'].setdefault('labels', {})
template['metadata']['labels'].update(labels_dict)
container = self._parse_docker_args(docker_args)
container = self._get_docker_args(
docker_args,
target="env",
flags={"-e", "--env"},
convert=lambda env: {'name': env.partition("=")[0], 'value': env.partition("=")[2]},
)
container_bash_script = [self.container_bash_script] if isinstance(self.container_bash_script, str) \
else self.container_bash_script
@ -511,13 +530,18 @@ class K8sIntegration(Worker):
return output, error
def _kubectl_run(self, create_clearml_conf, docker_image, labels, queue, task_data, task_id):
def _kubectl_run(
self, create_clearml_conf, docker_image, docker_args, docker_bash, labels, queue, task_data, task_id
):
if callable(self.kubectl_cmd):
kubectl_cmd = self.kubectl_cmd(task_id, docker_image, queue, task_data)
kubectl_cmd = self.kubectl_cmd(task_id, docker_image, docker_args, queue, task_data)
else:
kubectl_cmd = self.kubectl_cmd.format(
task_id=task_id,
docker_image=docker_image,
docker_args=" ".join(self._get_docker_args(
docker_args, flags={"-e", "--env"}, convert=lambda env: '--env={}'.format(env))
),
queue_id=queue,
namespace=self.namespace,
)
@ -544,7 +568,10 @@ class K8sIntegration(Worker):
"/bin/sh",
"-c",
"{} ; {}".format(create_clearml_conf, container_bash_script.format(
extra_bash_init_cmd=self.extra_bash_init_script, task_id=task_id)),
extra_bash_init_cmd=self.extra_bash_init_script or "",
extra_docker_bash_script=docker_bash or "",
task_id=task_id
)),
]
process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate()