diff --git a/clearml_agent/commands/worker.py b/clearml_agent/commands/worker.py index db1dc14..ac6bb2d 100644 --- a/clearml_agent/commands/worker.py +++ b/clearml_agent/commands/worker.py @@ -1291,12 +1291,9 @@ class Worker(ServiceCommandSection): raise ValueError("Running in Docker mode, 'docker' command was not found") self._worker_tags = kwargs.get('child_report_tags', None) - self._impersonate_as_task_owner = kwargs.get('use_owner_token', False) - if self._impersonate_as_task_owner: - if not self._session.check_min_api_version("2.14"): - raise ValueError("Server does not support --use-owner-token option (incompatible API version)") - if self._session.feature_set == "basic": - raise ValueError("Server does not support --use-owner-token option") + + self._use_owner_token(kwargs.get('use_owner_token', False)) + self._standalone_mode = kwargs.get('standalone_mode', False) self._services_mode = kwargs.get('services_mode', False) # must have docker in services_mode @@ -3953,6 +3950,14 @@ class Worker(ServiceCommandSection): # type: (str) -> bool return re.fullmatch(r"^[a-zA-Z0-9][a-zA-Z0-9_.-]+$", name) is not None + def _use_owner_token(self, use_owner_token=False): + self._impersonate_as_task_owner = use_owner_token + if self._impersonate_as_task_owner: + if not self._session.check_min_api_version("2.14"): + raise ValueError("Server does not support --use-owner-token option (incompatible API version)") + if self._session.feature_set == "basic": + raise ValueError("Server does not support --use-owner-token option") + if __name__ == "__main__": pass diff --git a/clearml_agent/glue/k8s.py b/clearml_agent/glue/k8s.py index 54607f4..43b614d 100644 --- a/clearml_agent/glue/k8s.py +++ b/clearml_agent/glue/k8s.py @@ -18,7 +18,7 @@ from typing import Text, List, Callable, Any, Collection, Optional, Union import yaml from clearml_agent.commands.events import Events -from clearml_agent.commands.worker import Worker, get_task_container, set_task_container +from clearml_agent.commands.worker import Worker, get_task_container, set_task_container, get_next_task from clearml_agent.definitions import ENV_DOCKER_IMAGE from clearml_agent.errors import APIError from clearml_agent.helper.base import safe_remove_file @@ -362,7 +362,7 @@ class K8sIntegration(Worker): print('Failed getting number of used pods: {}'.format(ex)) return -2 - def run_one_task(self, queue: Text, task_id: Text, worker_args=None, **_): + def run_one_task(self, queue: Text, task_id: Text, worker_args=None, task_session=None, **_): print('Pulling task {} launching on kubernetes cluster'.format(task_id)) task_data = self._session.api_client.tasks.get_all(id=[task_id])[0] @@ -398,11 +398,19 @@ class K8sIntegration(Worker): self.conf_file_content or Path(self._session._config_file).read_text() ).encode("ascii") - create_clearml_conf = "echo '{}' | base64 --decode >> ~/clearml.conf".format( + + create_clearml_conf = ["echo '{}' | base64 --decode >> ~/clearml.conf".format( base64.b64encode( hocon_config_encoded ).decode('ascii') - ) + )] + + if task_session: + create_clearml_conf.append( + "export CLEARML_AUTH_TOKEN=$(echo '{}' | base64 --decode)".format( + base64.b64encode(task_session.token.encode("ascii")).decode('ascii') + ) + ) if self.ports_mode: print("Kubernetes looking for available pod to use") @@ -594,19 +602,22 @@ class K8sIntegration(Worker): extra_docker_bash_script=extra_docker_bash_script) for line in container_bash_script]) - create_init_script = \ - "echo '{}' | base64 --decode >> ~/__start_agent__.sh ; " \ + extra_bash_commands = list(create_clearml_conf or []) + + extra_bash_commands.append( + "echo '{}' | base64 --decode >> ~/__start_agent__.sh ; " "/bin/bash ~/__start_agent__.sh".format( base64.b64encode( script_encoded.encode('ascii') ).decode('ascii')) + ) # Notice: we always leave with exit code 0, so pods are never restarted container = self._merge_containers( container, dict(name=name, image=docker_image, command=['/bin/bash'], - args=['-c', '{} ; {} ; exit 0'.format(create_clearml_conf, create_init_script)]) + args=['-c', '{} ; exit 0'.format(' ; '.join(extra_bash_commands))]) ) if template['spec']['containers']: @@ -685,7 +696,7 @@ class K8sIntegration(Worker): "--", "/bin/sh", "-c", - "{} ; {}".format(create_clearml_conf, container_bash_script.format( + "{} ; {}".format(" ; ".join(create_clearml_conf or []), container_bash_script.format( extra_bash_init_cmd=self.extra_bash_init_script or "", extra_docker_bash_script=docker_bash or "", task_id=task_id @@ -742,14 +753,16 @@ class K8sIntegration(Worker): # get next task in queue try: - response = self._session.api_client.queues.get_next_task(queue=queue) + response = get_next_task( + self._session, queue=queue, get_task_info=self._impersonate_as_task_owner + ) except Exception as e: print("Warning: Could not access task queue [{}], error: {}".format(queue, e)) continue else: try: - task_id = response.entry.task - except AttributeError: + task_id = response["entry"]["task"] + except (KeyError, TypeError, AttributeError): print("No tasks in queue {}".format(queue)) continue events_service.send_log_events( @@ -761,8 +774,26 @@ class K8sIntegration(Worker): level="INFO", ) + task_session = None + if self._impersonate_as_task_owner: + try: + task_user = response["task_info"]["user"] + task_company = response["task_info"]["company"] + except (KeyError, TypeError, AttributeError): + print("Error: cannot retrieve owner user for the task '{}', skipping".format(task_id)) + continue + + task_session = self.get_task_session(task_user, task_company) + if not task_session: + print( + "Error: Could not login as the user '{}' for the task '{}', skipping".format( + task_user, task_id + ) + ) + continue + self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id)) - self.run_one_task(queue, task_id, worker_params) + self.run_one_task(queue, task_id, worker_params, task_session) self.report_monitor(ResourceMonitor.StatusReport(queues=self.queues)) break else: @@ -773,7 +804,7 @@ class K8sIntegration(Worker): if self._session.config["agent.reload_config"]: self.reload_config() - def k8s_daemon(self, queue): + def k8s_daemon(self, queue, **kwargs): """ Start the k8s Glue service. This service will be pulling tasks from *queue* and scheduling them for execution using kubectl. @@ -784,8 +815,10 @@ class K8sIntegration(Worker): :param list(str) queue: queue name to pull from """ - return self.daemon(queues=[ObjectID(name=queue)] if queue else None, - log_level=logging.INFO, foreground=True, docker=False) + return self.daemon( + queues=[ObjectID(name=queue)] if queue else None, + log_level=logging.INFO, foreground=True, docker=False, **kwargs, + ) @classmethod def get_ssh_server_bash(cls, ssh_port_number): diff --git a/examples/k8s_glue_example.py b/examples/k8s_glue_example.py index dc69c37..7085238 100644 --- a/examples/k8s_glue_example.py +++ b/examples/k8s_glue_example.py @@ -65,6 +65,10 @@ def parse_args(): help="Limit the maximum number of pods that this service can run at the same time." "Should not be used with ports-mode" ) + parser.add_argument( + "--use-owner-token", action="store_true", default=False, + help="Generate and use task owner token for the execution of each task" + ) return parser.parse_args() @@ -87,7 +91,7 @@ def main(): ssh_port_number=args.ssh_server_port) if args.ssh_server_port else None, namespace=args.namespace, max_pods_limit=args.max_pods or None, ) - k8s.k8s_daemon(args.queue) + k8s.k8s_daemon(args.queue, use_owner_token=args.use_owner_token) if __name__ == "__main__":