mirror of
https://github.com/clearml/clearml-agent
synced 2025-03-13 06:58:37 +00:00
Improve k8s nvidia container integration
This commit is contained in:
parent
ea63e4f66e
commit
0cf485f7a9
@ -12,8 +12,6 @@ import json
|
||||
from time import sleep
|
||||
from typing import Text, List
|
||||
|
||||
from pyhocon import HOCONConverter
|
||||
|
||||
from trains_agent.commands.events import Events
|
||||
from trains_agent.commands.worker import Worker
|
||||
from trains_agent.errors import APIError
|
||||
@ -40,34 +38,34 @@ class K8sIntegration(Worker):
|
||||
"--field-selector=status.phase!=Pending,status.phase!=Running " \
|
||||
"--namespace=trains"
|
||||
|
||||
BASH_INSTALL_SSH_CMD = \
|
||||
"apt-get install -y openssh-server ; " \
|
||||
"mkdir -p /var/run/sshd ; " \
|
||||
"echo 'root:training' | chpasswd ; " \
|
||||
"echo 'PermitRootLogin yes' >> /etc/ssh/sshd_config ; " \
|
||||
"sed -i 's/PermitRootLogin prohibit-password/PermitRootLogin yes/' /etc/ssh/sshd_config ; " \
|
||||
"sed 's@session\s*required\s*pam_loginuid.so@session optional pam_loginuid.so@g' -i /etc/pam.d/sshd ; " \
|
||||
"echo 'AcceptEnv TRAINS_API_ACCESS_KEY TRAINS_API_SECRET_KEY' >> /etc/ssh/sshd_config ; " \
|
||||
'echo "export VISIBLE=now" >> /etc/profile ; ' \
|
||||
'echo "export PATH=$PATH" >> /etc/profile ; ' \
|
||||
'echo "ldconfig" >> /etc/profile ; ' \
|
||||
"/usr/sbin/sshd -p {port} & ; "
|
||||
BASH_INSTALL_SSH_CMD = [
|
||||
"apt-get install -y openssh-server",
|
||||
"mkdir -p /var/run/sshd",
|
||||
"echo 'root:training' | chpasswd",
|
||||
"echo 'PermitRootLogin yes' >> /etc/ssh/sshd_config",
|
||||
"sed -i 's/PermitRootLogin prohibit-password/PermitRootLogin yes/' /etc/ssh/sshd_config",
|
||||
r"sed 's@session\s*required\s*pam_loginuid.so@session optional pam_loginuid.so@g' -i /etc/pam.d/sshd",
|
||||
"echo 'AcceptEnv TRAINS_API_ACCESS_KEY TRAINS_API_SECRET_KEY' >> /etc/ssh/sshd_config",
|
||||
'echo "export VISIBLE=now" >> /etc/profile',
|
||||
'echo "export PATH=$PATH" >> /etc/profile',
|
||||
'echo "ldconfig" >> /etc/profile',
|
||||
"/usr/sbin/sshd -p {port}"]
|
||||
|
||||
CONTAINER_BASH_SCRIPT = \
|
||||
"export DEBIAN_FRONTEND='noninteractive'; " \
|
||||
"echo 'Binary::apt::APT::Keep-Downloaded-Packages \"true\";' > /etc/apt/apt.conf.d/docker-clean ; " \
|
||||
"chown -R root /root/.cache/pip ; " \
|
||||
"apt-get update ; " \
|
||||
"apt-get install -y git libsm6 libxext6 libxrender-dev libglib2.0-0 ; " \
|
||||
"declare LOCAL_PYTHON ; " \
|
||||
"for i in {{10..5}}; do which python3.$i && " \
|
||||
"python3.$i -m pip --version && " \
|
||||
"export LOCAL_PYTHON=$(which python3.$i) && break ; done ; " \
|
||||
"[ ! -z $LOCAL_PYTHON ] || apt-get install -y python3-pip ; " \
|
||||
"[ ! -z $LOCAL_PYTHON ] || export LOCAL_PYTHON=python3 ; " \
|
||||
"$LOCAL_PYTHON -m pip install trains-agent ; " \
|
||||
"{extra_bash_init_cmd} ; " \
|
||||
"$LOCAL_PYTHON -m trains_agent execute --full-monitoring --require-queue --id {task_id} ; "
|
||||
CONTAINER_BASH_SCRIPT = [
|
||||
"export DEBIAN_FRONTEND='noninteractive'",
|
||||
"echo 'Binary::apt::APT::Keep-Downloaded-Packages \"true\";' > /etc/apt/apt.conf.d/docker-clean",
|
||||
"chown -R root /root/.cache/pip",
|
||||
"apt-get update",
|
||||
"apt-get install -y git libsm6 libxext6 libxrender-dev libglib2.0-0",
|
||||
"declare LOCAL_PYTHON",
|
||||
"for i in {{10..5}}; do which python3.$i && python3.$i -m pip --version && "
|
||||
"export LOCAL_PYTHON=$(which python3.$i) && break ; done",
|
||||
"[ ! -z $LOCAL_PYTHON ] || apt-get install -y python3-pip",
|
||||
"[ ! -z $LOCAL_PYTHON ] || export LOCAL_PYTHON=python3",
|
||||
"$LOCAL_PYTHON -m pip install trains-agent",
|
||||
"{extra_bash_init_cmd}",
|
||||
"$LOCAL_PYTHON -m trains_agent execute --full-monitoring --require-queue --id {task_id}"
|
||||
]
|
||||
|
||||
AGENT_LABEL = "TRAINS=agent"
|
||||
LIMIT_POD_LABEL = "ai.allegro.agent.serial=pod-{pod_number}"
|
||||
@ -131,7 +129,7 @@ class K8sIntegration(Worker):
|
||||
self.template_dict = None
|
||||
self.extra_bash_init_script = extra_bash_init_script or None
|
||||
if self.extra_bash_init_script and not isinstance(self.extra_bash_init_script, str):
|
||||
self.extra_bash_init_script = ' ; '.join(self.extra_bash_init_script)
|
||||
self.extra_bash_init_script = ' ; '.join(self.extra_bash_init_script) # noqa
|
||||
self.pod_limits = []
|
||||
self.pod_requests = []
|
||||
if overrides_yaml:
|
||||
@ -163,6 +161,8 @@ class K8sIntegration(Worker):
|
||||
if trains_conf_file:
|
||||
with open(os.path.expandvars(os.path.expanduser(str(trains_conf_file))), 'rt') as f:
|
||||
self.trains_conf_file = f.read()
|
||||
# make sure we use system packages!
|
||||
self.trains_conf_file += '\nagent.package_manager.system_site_packages=true\n'
|
||||
|
||||
def _set_task_user_properties(self, task_id: str, **properties: str):
|
||||
if self._edit_hyperparams_support is not True:
|
||||
@ -223,7 +223,7 @@ class K8sIntegration(Worker):
|
||||
# get the trains.conf encoded file
|
||||
# noinspection PyProtectedMember
|
||||
hocon_config_encoded = (self.trains_conf_file or self._session._config_file).encode('ascii')
|
||||
create_trains_conf = "echo '{}' | base64 --decode >> ~/trains.conf && ".format(
|
||||
create_trains_conf = "echo '{}' | base64 --decode >> ~/trains.conf".format(
|
||||
base64.b64encode(
|
||||
hocon_config_encoded
|
||||
).decode('ascii')
|
||||
@ -252,7 +252,7 @@ class K8sIntegration(Worker):
|
||||
self.log.warning(
|
||||
"kubectl last result: {}\n{}\nAll k8s services are in use, task '{}' "
|
||||
"will be enqueued back to queue '{}'".format(
|
||||
error,output, task_id, queue
|
||||
error, output, task_id, queue
|
||||
)
|
||||
)
|
||||
self._session.api_client.tasks.reset(task_id)
|
||||
@ -326,13 +326,28 @@ class K8sIntegration(Worker):
|
||||
template['metadata'].setdefault('labels', {})
|
||||
template['metadata']['labels'].update(labels_dict)
|
||||
container = self._parse_docker_args(docker_args)
|
||||
|
||||
container_bash_script = [self.container_bash_script] if isinstance(self.container_bash_script, str) \
|
||||
else self.container_bash_script
|
||||
|
||||
script_encoded = '\n'.join(
|
||||
['#!/bin/bash', ] +
|
||||
[line.format(extra_bash_init_cmd=self.extra_bash_init_script or '', task_id=task_id)
|
||||
for line in container_bash_script])
|
||||
|
||||
create_init_script = \
|
||||
"echo '{}' | base64 --decode >> ~/__start_agent__.sh ; " \
|
||||
"/bin/bash ~/__start_agent__.sh".format(
|
||||
base64.b64encode(
|
||||
script_encoded.encode('ascii')
|
||||
).decode('ascii'))
|
||||
|
||||
container = merge_dicts(
|
||||
container,
|
||||
dict(name=name, image=docker_image,
|
||||
command=['/bin/bash'],
|
||||
args=['-c', create_trains_conf + self.container_bash_script.format(
|
||||
extra_bash_init_cmd=self.extra_bash_init_script,
|
||||
task_id=task_id)]))
|
||||
args=['-c', '{} ; {}'.format(create_trains_conf, create_init_script)])
|
||||
)
|
||||
|
||||
if template['spec']['containers']:
|
||||
template['spec']['containers'][0] = merge_dicts(template['spec']['containers'][0], container)
|
||||
@ -386,14 +401,18 @@ class K8sIntegration(Worker):
|
||||
if self.pod_requests:
|
||||
kubectl_cmd += ['--requests', ",".join(self.pod_requests)]
|
||||
|
||||
container_bash_script = [self.container_bash_script] if isinstance(self.container_bash_script, str) \
|
||||
else self.container_bash_script
|
||||
container_bash_script = ' ; '.join(container_bash_script)
|
||||
|
||||
kubectl_cmd += [
|
||||
"--labels=" + ",".join(labels),
|
||||
"--command",
|
||||
"--",
|
||||
"/bin/sh",
|
||||
"-c",
|
||||
create_trains_conf + self.container_bash_script.format(
|
||||
extra_bash_init_cmd=self.extra_bash_init_script, task_id=task_id),
|
||||
"{} ; {}".format(create_trains_conf, container_bash_script.format(
|
||||
extra_bash_init_cmd=self.extra_bash_init_script, task_id=task_id)),
|
||||
]
|
||||
process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
output, error = process.communicate()
|
||||
@ -413,6 +432,7 @@ class K8sIntegration(Worker):
|
||||
events_service = self.get_service(Events)
|
||||
|
||||
# make sure we have a k8s pending queue
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
self._session.api_client.queues.create(self.k8s_pending_queue_name)
|
||||
except Exception:
|
||||
@ -471,8 +491,9 @@ class K8sIntegration(Worker):
|
||||
|
||||
:param list(str) queue: queue name to pull from
|
||||
"""
|
||||
return self.daemon(queues=[ObjectID(name=queue)] if queue else None, log_level=logging.INFO, foreground=True, docker=False)
|
||||
return self.daemon(queues=[ObjectID(name=queue)] if queue else None,
|
||||
log_level=logging.INFO, foreground=True, docker=False)
|
||||
|
||||
@classmethod
|
||||
def get_ssh_server_bash(cls, ssh_port_number):
|
||||
return cls.BASH_INSTALL_SSH_CMD.format(port=ssh_port_number)
|
||||
return ' ; '.join(line.format(port=ssh_port_number) for line in cls.BASH_INSTALL_SSH_CMD)
|
||||
|
Loading…
Reference in New Issue
Block a user