When cleaning up pending pods, verify task is still aborted and pod is still pending before deleting the pod

This commit is contained in:
allegroai 2023-11-01 15:04:01 +02:00
parent 52405c343d
commit 9c6cb421b3

View File

@ -17,17 +17,16 @@ class PendingPodsDaemon(K8sDaemon):
self._polling_interval = polling_interval
self._last_tasks_msgs = {} # last msg updated for every task
def get_pods(self):
def get_pods(self, pod_name=None):
filters = ["status.phase=Pending"]
if pod_name:
filters.append(f"metadata.name={pod_name}")
if self._agent.using_jobs:
return self._agent.get_pods_for_jobs(
job_condition="status.active=1",
pod_filters=["status.phase=Pending"],
debug_msg="Detecting pending pods: {cmd}"
)
return self._agent.get_pods(
filters=["status.phase=Pending"],
debug_msg="Detecting pending pods: {cmd}"
job_condition="status.active=1", pod_filters=filters, debug_msg="Detecting pending pods: {cmd}"
)
return self._agent.get_pods(filters=filters, debug_msg="Detecting pending pods: {cmd}")
def _get_pod_name(self, pod: dict):
return get_path(pod, "metadata", "name")
@ -149,9 +148,7 @@ class PendingPodsDaemon(K8sDaemon):
"id": list(pending_tasks_details),
"status": ["stopped"],
"only_fields": ["id"]
},
method=Request.def_method,
async_enable=False,
}
)
aborted_task_ids = list(filter(None, (task.get("id") for task in result["tasks"])))
@ -160,11 +157,27 @@ class PendingPodsDaemon(K8sDaemon):
if not pod:
self.log.error("Failed locating aborted task {} in pending pods list".format(task_id))
continue
pod_name = self._get_pod_name(pod)
if not self.get_pods(pod_name=pod_name):
self.log.debug("K8S Glue pending monitor: pod {} is no longer pending, skipping".format(pod_name))
continue
resource_name = self._get_k8s_resource_name(pod)
self.log.info(
"K8S Glue pending monitor: task {} was aborted but the k8s resource {} is still pending, "
"deleting pod".format(task_id, resource_name)
)
result = self._session.get(
service='tasks',
action='get_all',
json={"id": [task_id], "status": ["stopped"], "only_fields": ["id"]},
)
if not result["tasks"]:
self.log.debug("K8S Glue pending monitor: task {} is no longer aborted, skipping".format(task_id))
continue
output = self.delete_k8s_resource(k8s_resource=pod, msg="Pending resource of an aborted task")
if not output:
self.log.warning("K8S Glue pending monitor: failed deleting resource {}".format(resource_name))