diff --git a/clearml_agent/glue/pending_pods_daemon.py b/clearml_agent/glue/pending_pods_daemon.py index 006f562..59a19bc 100644 --- a/clearml_agent/glue/pending_pods_daemon.py +++ b/clearml_agent/glue/pending_pods_daemon.py @@ -17,17 +17,16 @@ class PendingPodsDaemon(K8sDaemon): self._polling_interval = polling_interval self._last_tasks_msgs = {} # last msg updated for every task - def get_pods(self): + def get_pods(self, pod_name=None): + filters = ["status.phase=Pending"] + if pod_name: + filters.append(f"metadata.name={pod_name}") + if self._agent.using_jobs: return self._agent.get_pods_for_jobs( - job_condition="status.active=1", - pod_filters=["status.phase=Pending"], - debug_msg="Detecting pending pods: {cmd}" + job_condition="status.active=1", pod_filters=filters, debug_msg="Detecting pending pods: {cmd}" ) - return self._agent.get_pods( - filters=["status.phase=Pending"], - debug_msg="Detecting pending pods: {cmd}" - ) + return self._agent.get_pods(filters=filters, debug_msg="Detecting pending pods: {cmd}") def _get_pod_name(self, pod: dict): return get_path(pod, "metadata", "name") @@ -149,9 +148,7 @@ class PendingPodsDaemon(K8sDaemon): "id": list(pending_tasks_details), "status": ["stopped"], "only_fields": ["id"] - }, - method=Request.def_method, - async_enable=False, + } ) aborted_task_ids = list(filter(None, (task.get("id") for task in result["tasks"]))) @@ -160,11 +157,27 @@ class PendingPodsDaemon(K8sDaemon): if not pod: self.log.error("Failed locating aborted task {} in pending pods list".format(task_id)) continue + + pod_name = self._get_pod_name(pod) + if not self.get_pods(pod_name=pod_name): + self.log.debug("K8S Glue pending monitor: pod {} is no longer pending, skipping".format(pod_name)) + continue + resource_name = self._get_k8s_resource_name(pod) self.log.info( "K8S Glue pending monitor: task {} was aborted but the k8s resource {} is still pending, " "deleting pod".format(task_id, resource_name) ) + + result = self._session.get( + service='tasks', + action='get_all', + json={"id": [task_id], "status": ["stopped"], "only_fields": ["id"]}, + ) + if not result["tasks"]: + self.log.debug("K8S Glue pending monitor: task {} is no longer aborted, skipping".format(task_id)) + continue + output = self.delete_k8s_resource(k8s_resource=pod, msg="Pending resource of an aborted task") if not output: self.log.warning("K8S Glue pending monitor: failed deleting resource {}".format(resource_name))