From 0cf485f7a939857131b70dfac957a50a6911c838 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Thu, 26 Nov 2020 01:15:49 +0200 Subject: [PATCH] Improve k8s nvidia container integration --- trains_agent/glue/k8s.py | 99 ++++++++++++++++++++++++---------------- 1 file changed, 60 insertions(+), 39 deletions(-) diff --git a/trains_agent/glue/k8s.py b/trains_agent/glue/k8s.py index 60aeee2..feb1cd4 100644 --- a/trains_agent/glue/k8s.py +++ b/trains_agent/glue/k8s.py @@ -12,8 +12,6 @@ import json from time import sleep from typing import Text, List -from pyhocon import HOCONConverter - from trains_agent.commands.events import Events from trains_agent.commands.worker import Worker from trains_agent.errors import APIError @@ -40,34 +38,34 @@ class K8sIntegration(Worker): "--field-selector=status.phase!=Pending,status.phase!=Running " \ "--namespace=trains" - BASH_INSTALL_SSH_CMD = \ - "apt-get install -y openssh-server ; " \ - "mkdir -p /var/run/sshd ; " \ - "echo 'root:training' | chpasswd ; " \ - "echo 'PermitRootLogin yes' >> /etc/ssh/sshd_config ; " \ - "sed -i 's/PermitRootLogin prohibit-password/PermitRootLogin yes/' /etc/ssh/sshd_config ; " \ - "sed 's@session\s*required\s*pam_loginuid.so@session optional pam_loginuid.so@g' -i /etc/pam.d/sshd ; " \ - "echo 'AcceptEnv TRAINS_API_ACCESS_KEY TRAINS_API_SECRET_KEY' >> /etc/ssh/sshd_config ; " \ - 'echo "export VISIBLE=now" >> /etc/profile ; ' \ - 'echo "export PATH=$PATH" >> /etc/profile ; ' \ - 'echo "ldconfig" >> /etc/profile ; ' \ - "/usr/sbin/sshd -p {port} & ; " + BASH_INSTALL_SSH_CMD = [ + "apt-get install -y openssh-server", + "mkdir -p /var/run/sshd", + "echo 'root:training' | chpasswd", + "echo 'PermitRootLogin yes' >> /etc/ssh/sshd_config", + "sed -i 's/PermitRootLogin prohibit-password/PermitRootLogin yes/' /etc/ssh/sshd_config", + r"sed 's@session\s*required\s*pam_loginuid.so@session optional pam_loginuid.so@g' -i /etc/pam.d/sshd", + "echo 'AcceptEnv TRAINS_API_ACCESS_KEY TRAINS_API_SECRET_KEY' >> /etc/ssh/sshd_config", + 'echo "export VISIBLE=now" >> /etc/profile', + 'echo "export PATH=$PATH" >> /etc/profile', + 'echo "ldconfig" >> /etc/profile', + "/usr/sbin/sshd -p {port}"] - CONTAINER_BASH_SCRIPT = \ - "export DEBIAN_FRONTEND='noninteractive'; " \ - "echo 'Binary::apt::APT::Keep-Downloaded-Packages \"true\";' > /etc/apt/apt.conf.d/docker-clean ; " \ - "chown -R root /root/.cache/pip ; " \ - "apt-get update ; " \ - "apt-get install -y git libsm6 libxext6 libxrender-dev libglib2.0-0 ; " \ - "declare LOCAL_PYTHON ; " \ - "for i in {{10..5}}; do which python3.$i && " \ - "python3.$i -m pip --version && " \ - "export LOCAL_PYTHON=$(which python3.$i) && break ; done ; " \ - "[ ! -z $LOCAL_PYTHON ] || apt-get install -y python3-pip ; " \ - "[ ! -z $LOCAL_PYTHON ] || export LOCAL_PYTHON=python3 ; " \ - "$LOCAL_PYTHON -m pip install trains-agent ; " \ - "{extra_bash_init_cmd} ; " \ - "$LOCAL_PYTHON -m trains_agent execute --full-monitoring --require-queue --id {task_id} ; " + CONTAINER_BASH_SCRIPT = [ + "export DEBIAN_FRONTEND='noninteractive'", + "echo 'Binary::apt::APT::Keep-Downloaded-Packages \"true\";' > /etc/apt/apt.conf.d/docker-clean", + "chown -R root /root/.cache/pip", + "apt-get update", + "apt-get install -y git libsm6 libxext6 libxrender-dev libglib2.0-0", + "declare LOCAL_PYTHON", + "for i in {{10..5}}; do which python3.$i && python3.$i -m pip --version && " + "export LOCAL_PYTHON=$(which python3.$i) && break ; done", + "[ ! -z $LOCAL_PYTHON ] || apt-get install -y python3-pip", + "[ ! -z $LOCAL_PYTHON ] || export LOCAL_PYTHON=python3", + "$LOCAL_PYTHON -m pip install trains-agent", + "{extra_bash_init_cmd}", + "$LOCAL_PYTHON -m trains_agent execute --full-monitoring --require-queue --id {task_id}" + ] AGENT_LABEL = "TRAINS=agent" LIMIT_POD_LABEL = "ai.allegro.agent.serial=pod-{pod_number}" @@ -131,7 +129,7 @@ class K8sIntegration(Worker): self.template_dict = None self.extra_bash_init_script = extra_bash_init_script or None if self.extra_bash_init_script and not isinstance(self.extra_bash_init_script, str): - self.extra_bash_init_script = ' ; '.join(self.extra_bash_init_script) + self.extra_bash_init_script = ' ; '.join(self.extra_bash_init_script) # noqa self.pod_limits = [] self.pod_requests = [] if overrides_yaml: @@ -163,6 +161,8 @@ class K8sIntegration(Worker): if trains_conf_file: with open(os.path.expandvars(os.path.expanduser(str(trains_conf_file))), 'rt') as f: self.trains_conf_file = f.read() + # make sure we use system packages! + self.trains_conf_file += '\nagent.package_manager.system_site_packages=true\n' def _set_task_user_properties(self, task_id: str, **properties: str): if self._edit_hyperparams_support is not True: @@ -223,7 +223,7 @@ class K8sIntegration(Worker): # get the trains.conf encoded file # noinspection PyProtectedMember hocon_config_encoded = (self.trains_conf_file or self._session._config_file).encode('ascii') - create_trains_conf = "echo '{}' | base64 --decode >> ~/trains.conf && ".format( + create_trains_conf = "echo '{}' | base64 --decode >> ~/trains.conf".format( base64.b64encode( hocon_config_encoded ).decode('ascii') @@ -252,7 +252,7 @@ class K8sIntegration(Worker): self.log.warning( "kubectl last result: {}\n{}\nAll k8s services are in use, task '{}' " "will be enqueued back to queue '{}'".format( - error,output, task_id, queue + error, output, task_id, queue ) ) self._session.api_client.tasks.reset(task_id) @@ -326,13 +326,28 @@ class K8sIntegration(Worker): template['metadata'].setdefault('labels', {}) template['metadata']['labels'].update(labels_dict) container = self._parse_docker_args(docker_args) + + container_bash_script = [self.container_bash_script] if isinstance(self.container_bash_script, str) \ + else self.container_bash_script + + script_encoded = '\n'.join( + ['#!/bin/bash', ] + + [line.format(extra_bash_init_cmd=self.extra_bash_init_script or '', task_id=task_id) + for line in container_bash_script]) + + create_init_script = \ + "echo '{}' | base64 --decode >> ~/__start_agent__.sh ; " \ + "/bin/bash ~/__start_agent__.sh".format( + base64.b64encode( + script_encoded.encode('ascii') + ).decode('ascii')) + container = merge_dicts( container, dict(name=name, image=docker_image, command=['/bin/bash'], - args=['-c', create_trains_conf + self.container_bash_script.format( - extra_bash_init_cmd=self.extra_bash_init_script, - task_id=task_id)])) + args=['-c', '{} ; {}'.format(create_trains_conf, create_init_script)]) + ) if template['spec']['containers']: template['spec']['containers'][0] = merge_dicts(template['spec']['containers'][0], container) @@ -386,14 +401,18 @@ class K8sIntegration(Worker): if self.pod_requests: kubectl_cmd += ['--requests', ",".join(self.pod_requests)] + container_bash_script = [self.container_bash_script] if isinstance(self.container_bash_script, str) \ + else self.container_bash_script + container_bash_script = ' ; '.join(container_bash_script) + kubectl_cmd += [ "--labels=" + ",".join(labels), "--command", "--", "/bin/sh", "-c", - create_trains_conf + self.container_bash_script.format( - extra_bash_init_cmd=self.extra_bash_init_script, task_id=task_id), + "{} ; {}".format(create_trains_conf, container_bash_script.format( + extra_bash_init_cmd=self.extra_bash_init_script, task_id=task_id)), ] process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) output, error = process.communicate() @@ -413,6 +432,7 @@ class K8sIntegration(Worker): events_service = self.get_service(Events) # make sure we have a k8s pending queue + # noinspection PyBroadException try: self._session.api_client.queues.create(self.k8s_pending_queue_name) except Exception: @@ -471,8 +491,9 @@ class K8sIntegration(Worker): :param list(str) queue: queue name to pull from """ - return self.daemon(queues=[ObjectID(name=queue)] if queue else None, log_level=logging.INFO, foreground=True, docker=False) + return self.daemon(queues=[ObjectID(name=queue)] if queue else None, + log_level=logging.INFO, foreground=True, docker=False) @classmethod def get_ssh_server_bash(cls, ssh_port_number): - return cls.BASH_INSTALL_SSH_CMD.format(port=ssh_port_number) + return ' ; '.join(line.format(port=ssh_port_number) for line in cls.BASH_INSTALL_SSH_CMD)