diff --git a/clearml_agent/glue/k8s.py b/clearml_agent/glue/k8s.py
index 96e1267..32be4c9 100644
--- a/clearml_agent/glue/k8s.py
+++ b/clearml_agent/glue/k8s.py
@@ -37,7 +37,6 @@
 from clearml_agent.helper.resource_monitor import ResourceMonitor
 from clearml_agent.interface.base import ObjectID
 
 
-
 class K8sIntegration(Worker):
     K8S_PENDING_QUEUE = "k8s_scheduler"
@@ -46,11 +45,11 @@ class K8sIntegration(Worker):
 
     KUBECTL_APPLY_CMD = "kubectl apply --namespace={namespace} -f"
 
-    KUBECTL_DELETE_CMD = "kubectl delete pods " \
-                         "-l={agent_label} " \
-                         "--field-selector=status.phase!=Pending,status.phase!=Running " \
-                         "--namespace={namespace} " \
-                         "--output name"
+    KUBECTL_CLEANUP_DELETE_CMD = "kubectl delete pods " \
+                                 "-l={agent_label} " \
+                                 "--field-selector=status.phase!=Pending,status.phase!=Running " \
+                                 "--namespace={namespace} " \
+                                 "--output name"
 
     BASH_INSTALL_SSH_CMD = [
         "apt-get update",
@@ -263,7 +262,7 @@ class K8sIntegration(Worker):
                 sleep(self._polling_interval)
                 continue
             pods = output_config.get('items', [])
-            task_ids = set()
+            task_id_to_details = dict()
             for pod in pods:
                 pod_name = pod.get('metadata', {}).get('name', None)
                 if not pod_name:
@@ -277,7 +276,7 @@ class K8sIntegration(Worker):
                 if not namespace:
                     continue
 
-                task_ids.add(task_id)
+                task_id_to_details[task_id] = (pod_name, namespace)
 
                 msg = None
 
@@ -337,8 +336,41 @@ class K8sIntegration(Worker):
                     )
                 )
 
+            if task_id_to_details:
+                try:
+                    result = self._session.get(
+                        service='tasks',
+                        action='get_all',
+                        json={"id": list(task_id_to_details), "status": ["stopped"], "only_fields": ["id"]},
+                        method=Request.def_method,
+                        async_enable=False,
+                    )
+                    aborted_task_ids = list(filter(None, (task.get("id") for task in result["tasks"])))
+
+                    for task_id in aborted_task_ids:
+                        pod_name, namespace = task_id_to_details.get(task_id) or (None, None)
+                        if not pod_name:
+                            self.log.error("Failed locating aborted task {} in pending pods list".format(task_id))
+                            continue
+                        self.log.info(
+                            "K8S Glue pods monitor: task {} was aborted but its pod {} is still pending, "
+                            "deleting pod".format(task_id, pod_name)
+                        )
+
+                        kubectl_cmd = "kubectl delete pod {pod_name} --output name {namespace}".format(
+                            namespace=f"--namespace={namespace}" if namespace else "", pod_name=pod_name,
+                        ).strip()
+                        self.log.debug("Deleting aborted task pending pod: {}".format(kubectl_cmd))
+                        output = stringify_bash_output(get_bash_output(kubectl_cmd))
+                        if not output:
+                            self.log.warning("K8S Glue pods monitor: failed deleting pod {}".format(pod_name))
+                except Exception as ex:
+                    self.log.warning(
+                        'K8S Glue pods monitor: failed checking aborted tasks for hanging pods: {}'.format(ex)
+                    )
+
             # clean up any last message for a task that wasn't seen as a pod
-            last_tasks_msgs = {k: v for k, v in last_tasks_msgs.items() if k in task_ids}
+            last_tasks_msgs = {k: v for k, v in last_tasks_msgs.items() if k in task_id_to_details}
 
             sleep(self._polling_interval)
 
@@ -612,6 +644,7 @@ class K8sIntegration(Worker):
                 "k8s-pod-number": pod_number,
                 "k8s-pod-label": labels[0],
                 "k8s-internal-pod-count": pod_count,
+                "k8s-agent": self._get_agent_label(),
             }
         )
 
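[Reviewer note] The pods-monitor hunk above queries tasks.get_all for tasks already stopped server-side and deletes their still-pending pods. For reference only (not part of the patch), a minimal standalone sketch of that lookup; `session` is a hypothetical stand-in assumed to expose the same get(service=..., action=..., json=...) call the glue code uses:

def find_aborted_pending_tasks(session, task_id_to_details):
    # task_id_to_details maps task_id -> (pod_name, namespace), as built in the monitor loop.
    if not task_id_to_details:
        return []
    # Same query the monitor issues: which of these tasks are already "stopped"?
    result = session.get(
        service='tasks',
        action='get_all',
        json={"id": list(task_id_to_details), "status": ["stopped"], "only_fields": ["id"]},
    )
    aborted = []
    for task in result.get("tasks", []):
        task_id = task.get("id")
        pod_name, namespace = task_id_to_details.get(task_id) or (None, None)
        if task_id and pod_name:
            # Task was aborted but its pod is still pending -> candidate for pod deletion
            aborted.append((task_id, pod_name, namespace))
    return aborted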
@@ -779,7 +812,9 @@ class K8sIntegration(Worker):
             if time() - self._last_pod_cleanup_per_ns[namespace] < self._min_cleanup_interval_per_ns_sec:
                 # Do not try to cleanup the same namespace too quickly
                 continue
-            kubectl_cmd = self.KUBECTL_DELETE_CMD.format(namespace=namespace, agent_label=self._get_agent_label())
+            kubectl_cmd = self.KUBECTL_CLEANUP_DELETE_CMD.format(
+                namespace=namespace, agent_label=self._get_agent_label()
+            )
             self.log.debug("Deleting old/failed pods{} for ns {}: {}".format(
                 extra_msg or "", namespace, kubectl_cmd
             ))
@@ -796,6 +831,51 @@ class K8sIntegration(Worker):
                 self.log.error("Failed deleting old/failed pods for ns %s: %s", namespace, str(ex))
             finally:
                 self._last_pod_cleanup_per_ns[namespace] = time()
+
+        # Locate tasks belonging to deleted pods that are still marked as pending or running
+        tasks_to_abort = []
+        try:
+            task_ids = list(filter(None, (
+                pod_name[len(self.pod_name_prefix):].strip()
+                for pod_names in deleted_pods.values()
+                for pod_name in pod_names
+            )))
+            if task_ids:
+                result = self._session.get(
+                    service='tasks',
+                    action='get_all',
+                    json={"id": task_ids, "status": ["in_progress", "queued"], "only_fields": ["id", "status"]},
+                    method=Request.def_method,
+                )
+                tasks_to_abort = result["tasks"]
+        except Exception as ex:
+            self.log.warning('Failed getting running tasks for deleted pods: {}'.format(ex))
+
+        for task in tasks_to_abort:
+            task_id = task.get("id")
+            status = task.get("status")
+            if not task_id or not status:
+                self.log.warning('Failed getting task information: id={}, status={}'.format(task_id, status))
+                continue
+            try:
+                if status == "queued":
+                    self._session.get(
+                        service='tasks',
+                        action='dequeue',
+                        json={"task": task_id, "force": True, "status_reason": "Pod deleted (not pending or running)",
+                              "status_message": "Pod deleted by agent {}".format(self.worker_id or "unknown")},
+                        method=Request.def_method,
+                    )
+                self._session.get(
+                    service='tasks',
+                    action='failed',
+                    json={"task": task_id, "force": True, "status_reason": "Pod deleted (not pending or running)",
+                          "status_message": "Pod deleted by agent {}".format(self.worker_id or "unknown")},
+                    method=Request.def_method,
+                )
+            except Exception as ex:
+                self.log.warning('Failed setting task {} to status "failed": {}'.format(task_id, ex))
+
         return deleted_pods
 
     def run_tasks_loop(self, queues: List[Text], worker_params, **kwargs):
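[Reviewer note] For reference, the dequeue-then-fail sequence the cleanup hunk applies to each task whose pod was deleted, as a standalone sketch (not part of the patch); `session` and `worker_id` are hypothetical stand-ins for the agent's API session and worker identity:

def abort_task_for_deleted_pod(session, task, worker_id=None):
    # Mirrors the patch: a queued task is dequeued first (force=True) so the
    # scheduler does not simply re-run it, then the task is marked failed
    # with the same reason and message.
    status_args = {
        "force": True,
        "status_reason": "Pod deleted (not pending or running)",
        "status_message": "Pod deleted by agent {}".format(worker_id or "unknown"),
    }
    if task.get("status") == "queued":
        session.get(service='tasks', action='dequeue', json=dict(task=task["id"], **status_args))
    session.get(service='tasks', action='failed', json=dict(task=task["id"], **status_args))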