Refactor k8s glue running/used pods getter

allegroai 2023-05-21 22:56:12 +03:00
parent 685918fd9b
commit 95dadca45c


@@ -9,7 +9,7 @@ import os
 import re
 import subprocess
 import tempfile
-from collections import defaultdict
+from collections import defaultdict, namedtuple
 from copy import deepcopy
 from pathlib import Path
 from pprint import pformat
@@ -42,6 +42,7 @@ class K8sIntegration(Worker):
     K8S_DEFAULT_NAMESPACE = "clearml"
     AGENT_LABEL = "CLEARML=agent"
+    QUEUE_LABEL = "clearml-agent-queue"
 
     KUBECTL_APPLY_CMD = "kubectl apply --namespace={namespace} -f"
@@ -408,34 +409,50 @@ class K8sIntegration(Worker):
         return self._agent_label
 
-    def _get_used_pods(self):
-        # type: () -> Tuple[int, Set[str]]
-        # noinspection PyBroadException
+    RunningPod = namedtuple("RunningPod", "name queue namespace")
+
+    def _get_running_pods(self):
         try:
             kubectl_cmd = self.get_kubectl_command(
                 "get pods",
-                output="jsonpath=\"{range .items[*]}{.metadata.name}{' '}{.metadata.namespace}{'\\n'}{end}\""
+                output="jsonpath=\"{{range .items[*]}}{{.metadata.name}}{{' '}}{{.metadata.namespace}}{{' '}}"
+                       "{{.metadata.labels.{}}}{{'\\n'}}{{end}}\"".format(self.QUEUE_LABEL)
             )
             self.log.debug("Getting used pods: {}".format(kubectl_cmd))
             output = stringify_bash_output(get_bash_output(kubectl_cmd, raise_error=True))
-
             if not output:
                 # No such pod exist so we can use the pod_number we found
-                return 0, set([])
+                return []
 
             try:
-                items = output.splitlines()
-                current_pod_count = len(items)
-                namespaces = {item.rpartition(" ")[-1] for item in items}
-                self.log.debug(" - found {} pods in namespaces {}".format(current_pod_count, ", ".join(namespaces)))
-            except (KeyError, ValueError, TypeError, AttributeError) as ex:
-                print("Failed parsing used pods command response for cleanup: {}".format(ex))
-                return -1, set([])
+                return [
+                    self.RunningPod(
+                        name=parts[0],
+                        namespace=parts[1],
+                        queue=parts[2]
+                    )
+                    for parts in (line.split(" ") for line in output.splitlines())
+                ]
+            except Exception as ex:
+                raise Exception("Failed parsing used pods command response for cleanup: {}".format(ex))
+        except Exception as ex:
+            raise Exception('Failed obtaining used pods information: {}'.format(ex))
+
+    def _get_used_pods(self):
+        # type: () -> Tuple[int, Set[str]]
+        # noinspection PyBroadException
+        try:
+            items = self._get_running_pods()
+            if not items:
+                return 0, set([])
+
+            current_pod_count = len(items)
+            namespaces = {item.namespace for item in items}
+            self.log.debug(" - found {} pods in namespaces {}".format(current_pod_count, ", ".join(namespaces)))
 
             return current_pod_count, namespaces
         except Exception as ex:
-            print('Failed obtaining used pods information: {}'.format(ex))
-            return -2, set([])
+            self.log.debug("Failed getting used pods: {}", ex)
+            return -1, set([])
 
     def _is_same_tenant(self, task_session):
         if not task_session or task_session is self._session:
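
For context, a minimal standalone sketch of what this refactor does with the kubectl output; the pod names, namespaces and queue IDs below are hypothetical samples, not taken from the commit. Each line of the jsonpath output is split into name, namespace and queue-label fields and wrapped in the new RunningPod namedtuple, and _get_used_pods() then reduces that list to a pod count plus the set of namespaces:

from collections import namedtuple

RunningPod = namedtuple("RunningPod", "name queue namespace")

# Hypothetical jsonpath output: "<pod-name> <namespace> <clearml-agent-queue label>" per line
sample_output = (
    "clearml-id-aabb1122 clearml 4f6c8e2a0b1d4e5f\n"
    "clearml-id-ccdd3344 default 4f6c8e2a0b1d4e5f\n"
)

# Same parsing as the new _get_running_pods()
pods = [
    RunningPod(name=parts[0], namespace=parts[1], queue=parts[2])
    for parts in (line.split(" ") for line in sample_output.splitlines())
]

# Same reduction as the new _get_used_pods()
current_pod_count = len(pods)
namespaces = {pod.namespace for pod in pods}
print(current_pod_count, sorted(namespaces))  # -> 2 ['clearml', 'default']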
@@ -657,8 +674,8 @@ class K8sIntegration(Worker):
     def _get_pod_labels(self, queue, queue_name):
         return [
             self._get_agent_label(),
-            "clearml-agent-queue={}".format(self._safe_k8s_label_value(queue)),
-            "clearml-agent-queue-name={}".format(self._safe_k8s_label_value(queue_name))
+            "{}={}".format(self.QUEUE_LABEL, self._safe_k8s_label_value(queue)),
+            "{}-name={}".format(self.QUEUE_LABEL, self._safe_k8s_label_value(queue_name))
         ]
 
     def _get_docker_args(self, docker_args, flags, target=None, convert=None):
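
And a small sketch of the label strings the updated _get_pod_labels() builds from QUEUE_LABEL; the queue ID and queue name are hypothetical, and _safe_k8s_label_value() is assumed here to return these already-valid values unchanged:

QUEUE_LABEL = "clearml-agent-queue"

queue = "4f6c8e2a0b1d4e5f"   # hypothetical queue ID
queue_name = "gpu-queue"     # hypothetical queue name

labels = [
    "{}={}".format(QUEUE_LABEL, queue),
    "{}-name={}".format(QUEUE_LABEL, queue_name),
]
print(labels)
# ['clearml-agent-queue=4f6c8e2a0b1d4e5f', 'clearml-agent-queue-name=gpu-queue']

The first of these labels is exactly what the new _get_running_pods() jsonpath query reads back via .metadata.labels.clearml-agent-queue, which is what ties the two halves of this commit together.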