def _get_number_used_pods(self):
    """
    Count the pods that currently carry this agent's label in the agent's namespace.

    Runs ``kubectl get pods -l <agent label> -n <namespace> -o json`` and counts the
    entries of the ``items`` list in the JSON it prints.

    :return: The number of matching pods, or 0 when kubectl printed nothing on stdout.
        -1 if the kubectl output could not be parsed.
        -2 if anything else failed (for example kubectl could not be executed).
        The error codes are negative so they never compare as "limit reached".
    """
    # noinspection PyBroadException
    try:
        # Build argv explicitly rather than splitting a formatted string, so a label
        # selector or namespace containing whitespace stays a single argument.
        kubectl_cmd = [
            "kubectl", "get", "pods",
            "-l", str(self._get_agent_label()),
            "-n", str(self.namespace),
            "-o", "json",
        ]
        process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        output, error = process.communicate()
        # communicate() returns bytes here; normalize both streams to text
        output = output.decode('utf-8') if isinstance(output, bytes) else (output or '')
        error = error.decode('utf-8') if isinstance(error, bytes) else (error or '')

        if not output:
            # Nothing on stdout is treated as "no pods", but do not hide a kubectl failure
            if process.returncode or error:
                print('kubectl get pods returned no output (exit code {}): {}'.format(
                    process.returncode, error.strip()))
            return 0

        try:
            return len(json.loads(output).get("items", []))
        except (ValueError, TypeError) as ex:
            print('Failed parsing kubectl get pods output: {}'.format(ex))
            return -1
    except Exception as ex:
        print('Failed getting number of used pods: {}'.format(ex))
        return -2
+ if self.max_pods_limit: + current_pods = self._get_number_used_pods() + if current_pods >= self.max_pods_limit: + print("Maximum pod limit reached {}/{}, sleeping for {:.1f} seconds".format( + current_pods, self.max_pods_limit, self._polling_interval)) + # delete old completed / failed pods + get_bash_output( + self.KUBECTL_DELETE_CMD.format(namespace=self.namespace, selector=self._get_agent_label()) + ) + # go to sleep + sleep(self._polling_interval) + continue + # iterate over queues (priority style, queues[0] is highest) for queue in queues: # delete old completed / failed pods