Fix K8s glue not deleting pending pods when the tasks they represent were aborted

allegroai 2023-02-05 10:32:16 +02:00
parent 00e8e9eb5a
commit 4f17a2c17d

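At a glance, the fix has two halves. First, the pods monitor now records the pod name and namespace of every pending task, asks the server which of those tasks were already stopped (i.e. aborted), and deletes the matching pods. Second, the periodic pod cleanup now dequeues and fails any task whose pod it just deleted but which is still marked queued or in progress. Hedged sketches of both flows follow the relevant hunks below.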

@@ -37,7 +37,6 @@ from clearml_agent.helper.resource_monitor import ResourceMonitor
 from clearml_agent.interface.base import ObjectID


 class K8sIntegration(Worker):
     K8S_PENDING_QUEUE = "k8s_scheduler"
@@ -46,7 +45,7 @@ class K8sIntegration(Worker):
     KUBECTL_APPLY_CMD = "kubectl apply --namespace={namespace} -f"

-    KUBECTL_DELETE_CMD = "kubectl delete pods " \
+    KUBECTL_CLEANUP_DELETE_CMD = "kubectl delete pods " \
         "-l={agent_label} " \
         "--field-selector=status.phase!=Pending,status.phase!=Running " \
         "--namespace={namespace} " \
@@ -263,7 +262,7 @@ class K8sIntegration(Worker):
                 sleep(self._polling_interval)
                 continue
             pods = output_config.get('items', [])
-            task_ids = set()
+            task_id_to_details = dict()
             for pod in pods:
                 pod_name = pod.get('metadata', {}).get('name', None)
                 if not pod_name:
@@ -277,7 +276,7 @@ class K8sIntegration(Worker):
                 if not namespace:
                     continue
-                task_ids.add(task_id)
+                task_id_to_details[task_id] = (pod_name, namespace)

                 msg = None
@@ -337,8 +336,41 @@ class K8sIntegration(Worker):
                     )
                 )

+            if task_id_to_details:
+                try:
+                    result = self._session.get(
+                        service='tasks',
+                        action='get_all',
+                        json={"id": list(task_id_to_details), "status": ["stopped"], "only_fields": ["id"]},
+                        method=Request.def_method,
+                        async_enable=False,
+                    )
+                    aborted_task_ids = list(filter(None, (task.get("id") for task in result["tasks"])))
+
+                    for task_id in aborted_task_ids:
+                        pod_name, namespace = task_id_to_details.get(task_id)
+                        if not pod_name:
+                            self.log.error("Failed locating aborted task {} in pending pods list".format(task_id))
+                            continue
+                        self.log.info(
+                            "K8S Glue pods monitor: task {} was aborted but its pod {} is still pending, "
+                            "deleting pod".format(task_id, pod_name)
+                        )
+                        kubectl_cmd = "kubectl delete pod {pod_name} --output name {namespace}".format(
+                            namespace=f"--namespace={namespace}" if namespace else "", pod_name=pod_name,
+                        ).strip()
+                        self.log.debug("Deleting aborted task pending pod: {}".format(kubectl_cmd))
+                        output = stringify_bash_output(get_bash_output(kubectl_cmd))
+                        if not output:
+                            self.log.warning("K8S Glue pods monitor: failed deleting pod {}".format(pod_name))
+                except Exception as ex:
+                    self.log.warning(
+                        'K8S Glue pods monitor: failed checking aborted tasks for hanging pods: {}'.format(ex)
+                    )
+
             # clean up any last message for a task that wasn't seen as a pod
-            last_tasks_msgs = {k: v for k, v in last_tasks_msgs.items() if k in task_ids}
+            last_tasks_msgs = {k: v for k, v in last_tasks_msgs.items() if k in task_id_to_details}

             sleep(self._polling_interval)
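For orientation, here is a minimal standalone sketch of the monitor-side flow added above. It assumes only that session exposes the same get(service=..., action=..., json=...) call used in the diff; the helper name delete_pods_of_aborted_tasks and the use of subprocess in place of get_bash_output are illustrative, not the actual K8sIntegration API:

import subprocess

def delete_pods_of_aborted_tasks(session, task_id_to_details, log):
    """Delete pending pods whose tasks were already aborted (hypothetical helper).

    task_id_to_details maps task_id -> (pod_name, namespace), as built by the
    monitor loop above from the current kubectl pod listing.
    """
    if not task_id_to_details:
        return
    # Ask the server which of the still-pending tasks are already stopped
    result = session.get(
        service="tasks",
        action="get_all",
        json={"id": list(task_id_to_details), "status": ["stopped"], "only_fields": ["id"]},
    )
    for task in result.get("tasks", []):
        task_id = task.get("id")
        if not task_id:
            continue
        pod_name, namespace = task_id_to_details[task_id]
        # The task was aborted but its pod still exists: delete the pod
        cmd = ["kubectl", "delete", "pod", pod_name, "--output", "name"]
        if namespace:
            cmd += ["--namespace", namespace]
        log.debug("Deleting aborted task pending pod: %s", " ".join(cmd))
        deleted = subprocess.run(cmd, capture_output=True, text=True).stdout
        if not deleted:
            log.warning("failed deleting pod %s", pod_name)

Because kubectl delete --output name prints the deleted resource name on success, empty output is treated as a failed delete, mirroring the check in the diff.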
@@ -612,6 +644,7 @@ class K8sIntegration(Worker):
                 "k8s-pod-number": pod_number,
                 "k8s-pod-label": labels[0],
                 "k8s-internal-pod-count": pod_count,
+                "k8s-agent": self._get_agent_label(),
             }
         )
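The new k8s-agent entry stamps each pod the glue creates with the agent's label value (self._get_agent_label()); presumably this lets pods be traced back to the agent that created them.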
@@ -779,7 +812,9 @@ class K8sIntegration(Worker):
             if time() - self._last_pod_cleanup_per_ns[namespace] < self._min_cleanup_interval_per_ns_sec:
                 # Do not try to cleanup the same namespace too quickly
                 continue
-            kubectl_cmd = self.KUBECTL_DELETE_CMD.format(namespace=namespace, agent_label=self._get_agent_label())
+            kubectl_cmd = self.KUBECTL_CLEANUP_DELETE_CMD.format(
+                namespace=namespace, agent_label=self._get_agent_label()
+            )
             self.log.debug("Deleting old/failed pods{} for ns {}: {}".format(
                 extra_msg or "", namespace, kubectl_cmd
             ))
@@ -796,6 +831,51 @@ class K8sIntegration(Worker):
                 self.log.error("Failed deleting old/failed pods for ns %s: %s", namespace, str(ex))
             finally:
                 self._last_pod_cleanup_per_ns[namespace] = time()

+        # Locate tasks belonging to deleted pods that are still marked as pending or running
+        tasks_to_abort = []
+        try:
+            task_ids = list(filter(None, (
+                pod_name[len(self.pod_name_prefix):].strip()
+                for pod_names in deleted_pods.values()
+                for pod_name in pod_names
+            )))
+            if task_ids:
+                result = self._session.get(
+                    service='tasks',
+                    action='get_all',
+                    json={"id": task_ids, "status": ["in_progress", "queued"], "only_fields": ["id", "status"]},
+                    method=Request.def_method,
+                )
+                tasks_to_abort = result["tasks"]
+        except Exception as ex:
+            self.log.warning('Failed getting running tasks for deleted pods: {}'.format(ex))
+
+        for task in tasks_to_abort:
+            task_id = task.get("id")
+            status = task.get("status")
+            if not task_id or not status:
+                self.log.warning('Failed getting task information: id={}, status={}'.format(task_id, status))
+                continue
+            try:
+                if status == "queued":
+                    self._session.get(
+                        service='tasks',
+                        action='dequeue',
+                        json={
+                            "task": task_id,
+                            "force": True,
+                            "status_reason": "Pod deleted (not pending or running)",
+                            "status_message": "Pod deleted by agent {}".format(self.worker_id or "unknown"),
+                        },
+                        method=Request.def_method,
+                    )
+                self._session.get(
+                    service='tasks',
+                    action='failed',
+                    json={
+                        "task": task_id,
+                        "force": True,
+                        "status_reason": "Pod deleted (not pending or running)",
+                        "status_message": "Pod deleted by agent {}".format(self.worker_id or "unknown"),
+                    },
+                    method=Request.def_method,
+                )
+            except Exception as ex:
+                self.log.warning('Failed setting task {} to status "failed": {}'.format(task_id, ex))
+
         return deleted_pods

     def run_tasks_loop(self, queues: List[Text], worker_params, **kwargs):
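And a matching sketch of the cleanup-side flow above: recover task ids from the names of the deleted pods, then dequeue and fail any of those tasks still marked queued or in progress. As before, abort_tasks_of_deleted_pods is a hypothetical helper; only the session call shapes are taken from the diff:

def abort_tasks_of_deleted_pods(session, deleted_pods, pod_name_prefix, worker_id, log):
    """Fail tasks whose pods were deleted by the cleanup (hypothetical helper).

    deleted_pods maps namespace -> list of deleted pod names; pod names are
    assumed to be the task id prefixed with pod_name_prefix, as in the diff.
    """
    task_ids = [
        pod_name[len(pod_name_prefix):].strip()
        for pod_names in deleted_pods.values()
        for pod_name in pod_names
        if pod_name.startswith(pod_name_prefix)
    ]
    if not task_ids:
        return
    # Only tasks that still look alive need fixing up
    result = session.get(
        service="tasks",
        action="get_all",
        json={"id": task_ids, "status": ["in_progress", "queued"], "only_fields": ["id", "status"]},
    )
    reason = "Pod deleted (not pending or running)"
    message = "Pod deleted by agent {}".format(worker_id or "unknown")
    for task in result.get("tasks", []):
        task_id, status = task.get("id"), task.get("status")
        if not task_id or not status:
            continue
        try:
            if status == "queued":
                # A queued task is dequeued first, then marked failed
                session.get(service="tasks", action="dequeue",
                            json={"task": task_id, "force": True,
                                  "status_reason": reason, "status_message": message})
            session.get(service="tasks", action="failed",
                        json={"task": task_id, "force": True,
                              "status_reason": reason, "status_message": message})
        except Exception as ex:
            log.warning('failed setting task %s to "failed": %s', task_id, ex)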