mirror of
https://github.com/clearml/clearml-agent
synced 2025-03-04 02:57:36 +00:00
Fix k8s glue task container handling fails parsing docker image
Fix k8s glue uses task container image arguments when no image is specified
This commit is contained in:
parent
44fc7dffe6
commit
53c106c3af
@ -17,7 +17,7 @@ from typing import Text, List
|
|||||||
import yaml
|
import yaml
|
||||||
|
|
||||||
from clearml_agent.commands.events import Events
|
from clearml_agent.commands.events import Events
|
||||||
from clearml_agent.commands.worker import Worker
|
from clearml_agent.commands.worker import Worker, get_task_container
|
||||||
from clearml_agent.definitions import ENV_DOCKER_IMAGE
|
from clearml_agent.definitions import ENV_DOCKER_IMAGE
|
||||||
from clearml_agent.errors import APIError
|
from clearml_agent.errors import APIError
|
||||||
from clearml_agent.helper.base import safe_remove_file
|
from clearml_agent.helper.base import safe_remove_file
|
||||||
@ -31,6 +31,8 @@ class K8sIntegration(Worker):
|
|||||||
K8S_PENDING_QUEUE = "k8s_scheduler"
|
K8S_PENDING_QUEUE = "k8s_scheduler"
|
||||||
|
|
||||||
K8S_DEFAULT_NAMESPACE = "clearml"
|
K8S_DEFAULT_NAMESPACE = "clearml"
|
||||||
|
AGENT_LABEL = "CLEARML=agent"
|
||||||
|
LIMIT_POD_LABEL = "ai.allegro.agent.serial=pod-{pod_number}"
|
||||||
|
|
||||||
KUBECTL_APPLY_CMD = "kubectl apply --namespace={namespace} -f"
|
KUBECTL_APPLY_CMD = "kubectl apply --namespace={namespace} -f"
|
||||||
|
|
||||||
@ -40,7 +42,7 @@ class K8sIntegration(Worker):
|
|||||||
"--namespace={namespace}"
|
"--namespace={namespace}"
|
||||||
|
|
||||||
KUBECTL_DELETE_CMD = "kubectl delete pods " \
|
KUBECTL_DELETE_CMD = "kubectl delete pods " \
|
||||||
"--selector=TRAINS=agent " \
|
"--selector={selector} " \
|
||||||
"--field-selector=status.phase!=Pending,status.phase!=Running " \
|
"--field-selector=status.phase!=Pending,status.phase!=Running " \
|
||||||
"--namespace={namespace}"
|
"--namespace={namespace}"
|
||||||
|
|
||||||
@ -72,12 +74,10 @@ class K8sIntegration(Worker):
|
|||||||
"[ ! -z $LOCAL_PYTHON ] || export LOCAL_PYTHON=python3",
|
"[ ! -z $LOCAL_PYTHON ] || export LOCAL_PYTHON=python3",
|
||||||
"$LOCAL_PYTHON -m pip install clearml-agent",
|
"$LOCAL_PYTHON -m pip install clearml-agent",
|
||||||
"{extra_bash_init_cmd}",
|
"{extra_bash_init_cmd}",
|
||||||
|
"{extra_docker_bash_script}",
|
||||||
"$LOCAL_PYTHON -m clearml_agent execute --full-monitoring --require-queue --id {task_id}"
|
"$LOCAL_PYTHON -m clearml_agent execute --full-monitoring --require-queue --id {task_id}"
|
||||||
]
|
]
|
||||||
|
|
||||||
AGENT_LABEL = "TRAINS=agent"
|
|
||||||
LIMIT_POD_LABEL = "ai.allegro.agent.serial=pod-{pod_number}"
|
|
||||||
|
|
||||||
_edit_hyperparams_version = "2.9"
|
_edit_hyperparams_version = "2.9"
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
@ -275,16 +275,12 @@ class K8sIntegration(Worker):
|
|||||||
task_id, self.k8s_pending_queue_name, e))
|
task_id, self.k8s_pending_queue_name, e))
|
||||||
return
|
return
|
||||||
|
|
||||||
if task_data.execution.docker_cmd:
|
container = get_task_container(self._session, task_id)
|
||||||
docker_cmd = task_data.execution.docker_cmd
|
if not container.get('image'):
|
||||||
else:
|
container['image'] = str(
|
||||||
docker_cmd = str(ENV_DOCKER_IMAGE.get() or
|
ENV_DOCKER_IMAGE.get() or self._session.config.get("agent.default_docker.image", "nvidia/cuda")
|
||||||
self._session.config.get("agent.default_docker.image", "nvidia/cuda"))
|
)
|
||||||
|
container['arguments'] = self._session.config.get("agent.default_docker.arguments", None)
|
||||||
# take the first part, this is the docker image name (not arguments)
|
|
||||||
docker_parts = docker_cmd.split()
|
|
||||||
docker_image = docker_parts[0]
|
|
||||||
docker_args = docker_parts[1:] if len(docker_parts) > 1 else []
|
|
||||||
|
|
||||||
# get the clearml.conf encoded file
|
# get the clearml.conf encoded file
|
||||||
# noinspection PyProtectedMember
|
# noinspection PyProtectedMember
|
||||||
@ -381,12 +377,15 @@ class K8sIntegration(Worker):
|
|||||||
if self.template_dict:
|
if self.template_dict:
|
||||||
output, error = self._kubectl_apply(
|
output, error = self._kubectl_apply(
|
||||||
create_clearml_conf=create_clearml_conf,
|
create_clearml_conf=create_clearml_conf,
|
||||||
labels=labels, docker_image=docker_image, docker_args=docker_args,
|
labels=labels,
|
||||||
|
docker_image=container['container'],
|
||||||
|
docker_args=container['arguments'],
|
||||||
|
docker_bash=container.get('setup_shell_script'),
|
||||||
task_id=task_id, queue=queue)
|
task_id=task_id, queue=queue)
|
||||||
else:
|
else:
|
||||||
output, error = self._kubectl_run(
|
output, error = self._kubectl_run(
|
||||||
create_clearml_conf=create_clearml_conf,
|
create_clearml_conf=create_clearml_conf,
|
||||||
labels=labels, docker_image=docker_cmd,
|
labels=labels, docker_image=container['container'],
|
||||||
task_data=task_data,
|
task_data=task_data,
|
||||||
task_id=task_id, queue=queue)
|
task_id=task_id, queue=queue)
|
||||||
|
|
||||||
@ -435,7 +434,7 @@ class K8sIntegration(Worker):
|
|||||||
self.log.warning('skipping docker argument {} (only -e --env supported)'.format(cmd))
|
self.log.warning('skipping docker argument {} (only -e --env supported)'.format(cmd))
|
||||||
return {'env': kube_args} if kube_args else {}
|
return {'env': kube_args} if kube_args else {}
|
||||||
|
|
||||||
def _kubectl_apply(self, create_clearml_conf, docker_image, docker_args, labels, queue, task_id):
|
def _kubectl_apply(self, create_clearml_conf, docker_image, docker_args, docker_bash, labels, queue, task_id):
|
||||||
template = deepcopy(self.template_dict)
|
template = deepcopy(self.template_dict)
|
||||||
template.setdefault('apiVersion', 'v1')
|
template.setdefault('apiVersion', 'v1')
|
||||||
template['kind'] = 'Pod'
|
template['kind'] = 'Pod'
|
||||||
@ -454,9 +453,15 @@ class K8sIntegration(Worker):
|
|||||||
container_bash_script = [self.container_bash_script] if isinstance(self.container_bash_script, str) \
|
container_bash_script = [self.container_bash_script] if isinstance(self.container_bash_script, str) \
|
||||||
else self.container_bash_script
|
else self.container_bash_script
|
||||||
|
|
||||||
|
extra_docker_bash_script = '\n'.join(self._session.config.get("agent.extra_docker_shell_script", None) or [])
|
||||||
|
if docker_bash:
|
||||||
|
extra_docker_bash_script += '\n' + str(docker_bash) + '\n'
|
||||||
|
|
||||||
script_encoded = '\n'.join(
|
script_encoded = '\n'.join(
|
||||||
['#!/bin/bash', ] +
|
['#!/bin/bash', ] +
|
||||||
[line.format(extra_bash_init_cmd=self.extra_bash_init_script or '', task_id=task_id)
|
[line.format(extra_bash_init_cmd=self.extra_bash_init_script or '',
|
||||||
|
task_id=task_id,
|
||||||
|
extra_docker_bash_script=extra_docker_bash_script)
|
||||||
for line in container_bash_script])
|
for line in container_bash_script])
|
||||||
|
|
||||||
create_init_script = \
|
create_init_script = \
|
||||||
@ -572,7 +577,7 @@ class K8sIntegration(Worker):
|
|||||||
# iterate over queues (priority style, queues[0] is highest)
|
# iterate over queues (priority style, queues[0] is highest)
|
||||||
for queue in queues:
|
for queue in queues:
|
||||||
# delete old completed / failed pods
|
# delete old completed / failed pods
|
||||||
get_bash_output(self.KUBECTL_DELETE_CMD.format(namespace=self.namespace))
|
get_bash_output(self.KUBECTL_DELETE_CMD.format(namespace=self.namespace, selector=self.AGENT_LABEL))
|
||||||
|
|
||||||
# get next task in queue
|
# get next task in queue
|
||||||
try:
|
try:
|
||||||
|
Loading…
Reference in New Issue
Block a user