diff --git a/clearml_agent/glue/k8s.py b/clearml_agent/glue/k8s.py index d2719f6..6aed40b 100644 --- a/clearml_agent/glue/k8s.py +++ b/clearml_agent/glue/k8s.py @@ -638,18 +638,65 @@ class K8sIntegration(Worker): print("ERROR: no template for task {}, skipping".format(task_id)) return + pod_name = self.pod_name_prefix + str(task_id) + + self.apply_template_and_handle_result( + pod_name=pod_name, + clearml_conf_create_script=clearml_conf_create_script, + labels=labels, + queue=queue, + task_id=task_id, + namespace=namespace, + template=template, + docker_image=container['image'], + docker_args=container.get('arguments'), + docker_bash=container.get('setup_shell_script'), + session=session, + task_session=task_session, + pod_number=pod_number, + queue_name=queue_name, + task_data=task_data, + ports_mode=ports_mode, + pod_count=pod_count, + ) + + def apply_template_and_handle_result( + self, + pod_name, + clearml_conf_create_script: List[str], + labels, + queue, + task_id, + namespace, + template, + docker_image, + docker_args, + docker_bash, + session, + task_session, + queue_name, + task_data, + ports_mode, + pod_count, + pod_number=None, + base_spec: dict = None, + ): + """Apply the provided template with all custom settings and handle bookkeeping for the reaults""" + output, error, pod_name = self._kubectl_apply( + pod_name=pod_name, template=template, pod_number=pod_number, clearml_conf_create_script=clearml_conf_create_script, labels=labels, - docker_image=container['image'], - docker_args=container.get('arguments'), - docker_bash=container.get('setup_shell_script'), + docker_image=docker_image, + docker_args=docker_args, + docker_bash=docker_bash, task_id=task_id, queue=queue, namespace=namespace, task_token=task_session.token.encode("ascii") if task_session else None, + base_spec=base_spec, ) print('kubectl output:\n{}\n{}'.format(error, output)) @@ -657,17 +704,9 @@ class K8sIntegration(Worker): if self.ignore_kubectl_errors_re and self.ignore_kubectl_errors_re.match(error): print(f"Ignoring error due to {ENV_KUBECTL_IGNORE_ERROR.key}") else: - send_log = "Running kubectl encountered an error: {}".format(error) - self.log.error(send_log) - self.send_logs(task_id, send_log.splitlines()) - - # Make sure to remove the task from our k8s pending queue - self._session.api_client.queues.remove_task( - task=task_id, - queue=self.k8s_pending_queue_id, + self._set_task_failed_while_applying( + session, task_id, f"Running kubectl encountered an error: {error}" ) - # Set task as failed - session.api_client.tasks.failed(task_id, force=True) return if pod_name: @@ -680,6 +719,19 @@ class K8sIntegration(Worker): pod_number=pod_number, pod_count=pod_count, task_data=task_data ) + def _set_task_failed_while_applying(self, session, task_id: str, error: str): + send_log = "Running kubectl encountered an error: {}".format(error) + self.log.error(send_log) + self.send_logs(task_id, send_log.splitlines()) + + # Make sure to remove the task from our k8s pending queue + self._session.api_client.queues.remove_task( + task=task_id, + queue=self.k8s_pending_queue_id, + ) + # Set task as failed + session.api_client.tasks.failed(task_id, force=True) + def set_task_info( self, task_id: str, task_session, task_data, queue_name: str, ports_mode: bool, pod_number, pod_count ): @@ -860,6 +912,7 @@ class K8sIntegration(Worker): def _kubectl_apply( self, + pod_name, clearml_conf_create_script: List[str], docker_image, docker_args, @@ -871,6 +924,7 @@ class K8sIntegration(Worker): template, pod_number=None, task_token=None, + base_spec: dict = None, # base values for the spec (might be overridden) ): if "apiVersion" not in template: template["apiVersion"] = "batch/v1" if self.using_jobs else "v1" @@ -885,8 +939,7 @@ class K8sIntegration(Worker): template["kind"] = self.kind.capitalize() metadata = template.setdefault('metadata', {}) - name = self.pod_name_prefix + str(task_id) - metadata['name'] = name + metadata['name'] = pod_name def place_labels(metadata_dict): labels_dict = dict(pair.split('=', 1) for pair in labels) @@ -906,13 +959,16 @@ class K8sIntegration(Worker): spec = spec_template.setdefault('spec', {}) + if base_spec: + merge_dicts(spec, base_spec) + containers = spec.setdefault('containers', []) spec.setdefault('restartPolicy', 'Never') - task_worker_id = self.get_task_worker_id(template, task_id, name, namespace, queue) + task_worker_id = self.get_task_worker_id(template, task_id, pod_name, namespace, queue) container = self._create_template_container( - pod_name=name, + pod_name=pod_name, task_id=task_id, docker_image=docker_image, docker_args=docker_args, @@ -958,7 +1014,7 @@ class K8sIntegration(Worker): finally: safe_remove_file(yaml_file) - return stringify_bash_output(output), stringify_bash_output(error), name + return stringify_bash_output(output), stringify_bash_output(error), pod_name def _process_bash_lines_response(self, bash_cmd: str, raise_error=True): res = get_bash_output(bash_cmd, raise_error=raise_error) @@ -1043,7 +1099,7 @@ class K8sIntegration(Worker): deleted_pods = defaultdict(list) for namespace in namespaces: if time() - self._last_pod_cleanup_per_ns[namespace] < self._min_cleanup_interval_per_ns_sec: - # Do not try to cleanup the same namespace too quickly + # Do not try to clean up the same namespace too quickly continue try: @@ -1119,6 +1175,9 @@ class K8sIntegration(Worker): def check_if_suspended(self) -> bool: pass + def check_if_schedulable(self, queue: str) -> bool: + return True + def _ensure_pending_queue_exists(self): resolved_ids = self._resolve_queue_names( [self.k8s_pending_queue_name], @@ -1179,6 +1238,9 @@ class K8sIntegration(Worker): sleep(self._polling_interval) break + if not self.check_if_schedulable(queue): + continue + # get next task in queue try: # print(f"debug> getting tasks for queue {queue}") diff --git a/clearml_agent/glue/pending_pods_daemon.py b/clearml_agent/glue/pending_pods_daemon.py index 01d996e..6cfb9c3 100644 --- a/clearml_agent/glue/pending_pods_daemon.py +++ b/clearml_agent/glue/pending_pods_daemon.py @@ -1,11 +1,9 @@ from time import sleep -from typing import Dict, Tuple, Optional, List +from typing import Dict, List from clearml_agent.backend_api.session import Request from clearml_agent.glue.utilities import get_bash_output -from clearml_agent.helper.process import stringify_bash_output - from .daemon import K8sDaemon from .utilities import get_path from .errors import GetPodsError @@ -38,7 +36,11 @@ class PendingPodsDaemon(K8sDaemon): return get_path(pod, "metadata", "name") def _get_task_id(self, pod: dict): - return self._get_k8s_resource_name(pod).rpartition('-')[-1] + prefix, _, value = self._get_k8s_resource_name(pod).rpartition('-') + if len(value) > 4: + return value + # we assume this is a multi-node rank x (>0) pod + return prefix.rpartition('-')[-1] or value @staticmethod def _get_k8s_resource_namespace(pod: dict): @@ -239,6 +241,11 @@ class PendingPodsDaemon(K8sDaemon): result_msg = get_path(result.json(), 'meta', 'result_msg') raise Exception(result_msg or result.text) + self._agent.send_logs( + task_id, ["Kubernetes Pod status: {}".format(msg)], + session=self._session + ) + # update last msg for this task self._last_tasks_msgs[task_id] = msg except Exception as ex: