diff --git a/clearml_agent/glue/k8s.py b/clearml_agent/glue/k8s.py index 4b5375a..25d93f9 100644 --- a/clearml_agent/glue/k8s.py +++ b/clearml_agent/glue/k8s.py @@ -17,7 +17,7 @@ from typing import Text, List import yaml from clearml_agent.commands.events import Events -from clearml_agent.commands.worker import Worker +from clearml_agent.commands.worker import Worker, get_task_container from clearml_agent.definitions import ENV_DOCKER_IMAGE from clearml_agent.errors import APIError from clearml_agent.helper.base import safe_remove_file @@ -31,6 +31,8 @@ class K8sIntegration(Worker): K8S_PENDING_QUEUE = "k8s_scheduler" K8S_DEFAULT_NAMESPACE = "clearml" + AGENT_LABEL = "CLEARML=agent" + LIMIT_POD_LABEL = "ai.allegro.agent.serial=pod-{pod_number}" KUBECTL_APPLY_CMD = "kubectl apply --namespace={namespace} -f" @@ -40,7 +42,7 @@ class K8sIntegration(Worker): "--namespace={namespace}" KUBECTL_DELETE_CMD = "kubectl delete pods " \ - "--selector=TRAINS=agent " \ + "--selector={selector} " \ "--field-selector=status.phase!=Pending,status.phase!=Running " \ "--namespace={namespace}" @@ -72,12 +74,10 @@ class K8sIntegration(Worker): "[ ! -z $LOCAL_PYTHON ] || export LOCAL_PYTHON=python3", "$LOCAL_PYTHON -m pip install clearml-agent", "{extra_bash_init_cmd}", + "{extra_docker_bash_script}", "$LOCAL_PYTHON -m clearml_agent execute --full-monitoring --require-queue --id {task_id}" ] - AGENT_LABEL = "TRAINS=agent" - LIMIT_POD_LABEL = "ai.allegro.agent.serial=pod-{pod_number}" - _edit_hyperparams_version = "2.9" def __init__( @@ -275,16 +275,12 @@ class K8sIntegration(Worker): task_id, self.k8s_pending_queue_name, e)) return - if task_data.execution.docker_cmd: - docker_cmd = task_data.execution.docker_cmd - else: - docker_cmd = str(ENV_DOCKER_IMAGE.get() or - self._session.config.get("agent.default_docker.image", "nvidia/cuda")) - - # take the first part, this is the docker image name (not arguments) - docker_parts = docker_cmd.split() - docker_image = docker_parts[0] - docker_args = docker_parts[1:] if len(docker_parts) > 1 else [] + container = get_task_container(self._session, task_id) + if not container.get('image'): + container['image'] = str( + ENV_DOCKER_IMAGE.get() or self._session.config.get("agent.default_docker.image", "nvidia/cuda") + ) + container['arguments'] = self._session.config.get("agent.default_docker.arguments", None) # get the clearml.conf encoded file # noinspection PyProtectedMember @@ -381,12 +377,15 @@ class K8sIntegration(Worker): if self.template_dict: output, error = self._kubectl_apply( create_clearml_conf=create_clearml_conf, - labels=labels, docker_image=docker_image, docker_args=docker_args, + labels=labels, + docker_image=container['container'], + docker_args=container['arguments'], + docker_bash=container.get('setup_shell_script'), task_id=task_id, queue=queue) else: output, error = self._kubectl_run( create_clearml_conf=create_clearml_conf, - labels=labels, docker_image=docker_cmd, + labels=labels, docker_image=container['container'], task_data=task_data, task_id=task_id, queue=queue) @@ -435,7 +434,7 @@ class K8sIntegration(Worker): self.log.warning('skipping docker argument {} (only -e --env supported)'.format(cmd)) return {'env': kube_args} if kube_args else {} - def _kubectl_apply(self, create_clearml_conf, docker_image, docker_args, labels, queue, task_id): + def _kubectl_apply(self, create_clearml_conf, docker_image, docker_args, docker_bash, labels, queue, task_id): template = deepcopy(self.template_dict) template.setdefault('apiVersion', 'v1') template['kind'] = 'Pod' @@ -454,9 +453,15 @@ class K8sIntegration(Worker): container_bash_script = [self.container_bash_script] if isinstance(self.container_bash_script, str) \ else self.container_bash_script + extra_docker_bash_script = '\n'.join(self._session.config.get("agent.extra_docker_shell_script", None) or []) + if docker_bash: + extra_docker_bash_script += '\n' + str(docker_bash) + '\n' + script_encoded = '\n'.join( ['#!/bin/bash', ] + - [line.format(extra_bash_init_cmd=self.extra_bash_init_script or '', task_id=task_id) + [line.format(extra_bash_init_cmd=self.extra_bash_init_script or '', + task_id=task_id, + extra_docker_bash_script=extra_docker_bash_script) for line in container_bash_script]) create_init_script = \ @@ -572,7 +577,7 @@ class K8sIntegration(Worker): # iterate over queues (priority style, queues[0] is highest) for queue in queues: # delete old completed / failed pods - get_bash_output(self.KUBECTL_DELETE_CMD.format(namespace=self.namespace)) + get_bash_output(self.KUBECTL_DELETE_CMD.format(namespace=self.namespace, selector=self.AGENT_LABEL)) # get next task in queue try: