From 95dadca45ca07cd2680e5a4e33e0f2fd90bc7aef Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Sun, 21 May 2023 22:56:12 +0300 Subject: [PATCH] Refactor k8s glue running/used pods getter --- clearml_agent/glue/k8s.py | 51 ++++++++++++++++++++++++++------------- 1 file changed, 34 insertions(+), 17 deletions(-) diff --git a/clearml_agent/glue/k8s.py b/clearml_agent/glue/k8s.py index 5d1c9e0..2eb64ca 100644 --- a/clearml_agent/glue/k8s.py +++ b/clearml_agent/glue/k8s.py @@ -9,7 +9,7 @@ import os import re import subprocess import tempfile -from collections import defaultdict +from collections import defaultdict, namedtuple from copy import deepcopy from pathlib import Path from pprint import pformat @@ -42,6 +42,7 @@ class K8sIntegration(Worker): K8S_DEFAULT_NAMESPACE = "clearml" AGENT_LABEL = "CLEARML=agent" + QUEUE_LABEL = "clearml-agent-queue" KUBECTL_APPLY_CMD = "kubectl apply --namespace={namespace} -f" @@ -408,34 +409,50 @@ class K8sIntegration(Worker): return self._agent_label - def _get_used_pods(self): - # type: () -> Tuple[int, Set[str]] - # noinspection PyBroadException + RunningPod = namedtuple("RunningPod", "name queue namespace") + + def _get_running_pods(self): try: kubectl_cmd = self.get_kubectl_command( "get pods", - output="jsonpath=\"{range .items[*]}{.metadata.name}{' '}{.metadata.namespace}{'\\n'}{end}\"" + output="jsonpath=\"{{range .items[*]}}{{.metadata.name}}{{' '}}{{.metadata.namespace}}{{' '}}" + "{{.metadata.labels.{}}}{{'\\n'}}{{end}}\"".format(self.QUEUE_LABEL) ) self.log.debug("Getting used pods: {}".format(kubectl_cmd)) output = stringify_bash_output(get_bash_output(kubectl_cmd, raise_error=True)) if not output: # No such pod exist so we can use the pod_number we found - return 0, set([]) + return [] try: - items = output.splitlines() - current_pod_count = len(items) - namespaces = {item.rpartition(" ")[-1] for item in items} - self.log.debug(" - found {} pods in namespaces {}".format(current_pod_count, ", ".join(namespaces))) - except (KeyError, ValueError, TypeError, AttributeError) as ex: - print("Failed parsing used pods command response for cleanup: {}".format(ex)) - return -1, set([]) + return [ + self.RunningPod( + name=parts[0], + namespace=parts[1], + queue=parts[2] + ) + for parts in (line.split(" ") for line in output.splitlines()) + ] + except Exception as ex: + raise Exception("Failed parsing used pods command response for cleanup: {}".format(ex)) + except Exception as ex: + raise Exception('Failed obtaining used pods information: {}'.format(ex)) + def _get_used_pods(self): + # type: () -> Tuple[int, Set[str]] + # noinspection PyBroadException + try: + items = self._get_running_pods() + if not items: + return 0, set([]) + current_pod_count = len(items) + namespaces = {item.namespace for item in items} + self.log.debug(" - found {} pods in namespaces {}".format(current_pod_count, ", ".join(namespaces))) return current_pod_count, namespaces except Exception as ex: - print('Failed obtaining used pods information: {}'.format(ex)) - return -2, set([]) + self.log.debug("Failed getting used pods: {}", ex) + return -1, set([]) def _is_same_tenant(self, task_session): if not task_session or task_session is self._session: @@ -657,8 +674,8 @@ class K8sIntegration(Worker): def _get_pod_labels(self, queue, queue_name): return [ self._get_agent_label(), - "clearml-agent-queue={}".format(self._safe_k8s_label_value(queue)), - "clearml-agent-queue-name={}".format(self._safe_k8s_label_value(queue_name)) + "{}={}".format(self.QUEUE_LABEL, self._safe_k8s_label_value(queue)), + "{}-name={}".format(self.QUEUE_LABEL, self._safe_k8s_label_value(queue_name)) ] def _get_docker_args(self, docker_args, flags, target=None, convert=None):