Add support for use-owner-token in k8s glue

This commit is contained in:
allegroai
2022-04-27 14:59:27 +03:00
parent c9ffb8a053
commit 2e5298b737
3 changed files with 64 additions and 22 deletions

View File

@@ -18,7 +18,7 @@ from typing import Text, List, Callable, Any, Collection, Optional, Union
import yaml
from clearml_agent.commands.events import Events
from clearml_agent.commands.worker import Worker, get_task_container, set_task_container
from clearml_agent.commands.worker import Worker, get_task_container, set_task_container, get_next_task
from clearml_agent.definitions import ENV_DOCKER_IMAGE
from clearml_agent.errors import APIError
from clearml_agent.helper.base import safe_remove_file
@@ -362,7 +362,7 @@ class K8sIntegration(Worker):
print('Failed getting number of used pods: {}'.format(ex))
return -2
def run_one_task(self, queue: Text, task_id: Text, worker_args=None, **_):
def run_one_task(self, queue: Text, task_id: Text, worker_args=None, task_session=None, **_):
print('Pulling task {} launching on kubernetes cluster'.format(task_id))
task_data = self._session.api_client.tasks.get_all(id=[task_id])[0]
@@ -398,11 +398,19 @@ class K8sIntegration(Worker):
self.conf_file_content
or Path(self._session._config_file).read_text()
).encode("ascii")
create_clearml_conf = "echo '{}' | base64 --decode >> ~/clearml.conf".format(
create_clearml_conf = ["echo '{}' | base64 --decode >> ~/clearml.conf".format(
base64.b64encode(
hocon_config_encoded
).decode('ascii')
)
)]
if task_session:
create_clearml_conf.append(
"export CLEARML_AUTH_TOKEN=$(echo '{}' | base64 --decode)".format(
base64.b64encode(task_session.token.encode("ascii")).decode('ascii')
)
)
if self.ports_mode:
print("Kubernetes looking for available pod to use")
@@ -594,19 +602,22 @@ class K8sIntegration(Worker):
extra_docker_bash_script=extra_docker_bash_script)
for line in container_bash_script])
create_init_script = \
"echo '{}' | base64 --decode >> ~/__start_agent__.sh ; " \
extra_bash_commands = list(create_clearml_conf or [])
extra_bash_commands.append(
"echo '{}' | base64 --decode >> ~/__start_agent__.sh ; "
"/bin/bash ~/__start_agent__.sh".format(
base64.b64encode(
script_encoded.encode('ascii')
).decode('ascii'))
)
# Notice: we always leave with exit code 0, so pods are never restarted
container = self._merge_containers(
container,
dict(name=name, image=docker_image,
command=['/bin/bash'],
args=['-c', '{} ; {} ; exit 0'.format(create_clearml_conf, create_init_script)])
args=['-c', '{} ; exit 0'.format(' ; '.join(extra_bash_commands))])
)
if template['spec']['containers']:
@@ -685,7 +696,7 @@ class K8sIntegration(Worker):
"--",
"/bin/sh",
"-c",
"{} ; {}".format(create_clearml_conf, container_bash_script.format(
"{} ; {}".format(" ; ".join(create_clearml_conf or []), container_bash_script.format(
extra_bash_init_cmd=self.extra_bash_init_script or "",
extra_docker_bash_script=docker_bash or "",
task_id=task_id
@@ -742,14 +753,16 @@ class K8sIntegration(Worker):
# get next task in queue
try:
response = self._session.api_client.queues.get_next_task(queue=queue)
response = get_next_task(
self._session, queue=queue, get_task_info=self._impersonate_as_task_owner
)
except Exception as e:
print("Warning: Could not access task queue [{}], error: {}".format(queue, e))
continue
else:
try:
task_id = response.entry.task
except AttributeError:
task_id = response["entry"]["task"]
except (KeyError, TypeError, AttributeError):
print("No tasks in queue {}".format(queue))
continue
events_service.send_log_events(
@@ -761,8 +774,26 @@ class K8sIntegration(Worker):
level="INFO",
)
task_session = None
if self._impersonate_as_task_owner:
try:
task_user = response["task_info"]["user"]
task_company = response["task_info"]["company"]
except (KeyError, TypeError, AttributeError):
print("Error: cannot retrieve owner user for the task '{}', skipping".format(task_id))
continue
task_session = self.get_task_session(task_user, task_company)
if not task_session:
print(
"Error: Could not login as the user '{}' for the task '{}', skipping".format(
task_user, task_id
)
)
continue
self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id))
self.run_one_task(queue, task_id, worker_params)
self.run_one_task(queue, task_id, worker_params, task_session)
self.report_monitor(ResourceMonitor.StatusReport(queues=self.queues))
break
else:
@@ -773,7 +804,7 @@ class K8sIntegration(Worker):
if self._session.config["agent.reload_config"]:
self.reload_config()
def k8s_daemon(self, queue):
def k8s_daemon(self, queue, **kwargs):
"""
Start the k8s Glue service.
This service will be pulling tasks from *queue* and scheduling them for execution using kubectl.
@@ -784,8 +815,10 @@ class K8sIntegration(Worker):
:param list(str) queue: queue name to pull from
"""
return self.daemon(queues=[ObjectID(name=queue)] if queue else None,
log_level=logging.INFO, foreground=True, docker=False)
return self.daemon(
queues=[ObjectID(name=queue)] if queue else None,
log_level=logging.INFO, foreground=True, docker=False, **kwargs,
)
@classmethod
def get_ssh_server_bash(cls, ssh_port_number):