Improve max pod limit check

This commit is contained in:
allegroai 2021-07-15 10:26:49 +03:00
parent 6470b16b70
commit ca360b7d43

View File

@ -260,6 +260,32 @@ class K8sIntegration(Worker):
if error.code == 404:
self._edit_hyperparams_support = self._session.api_version
def _get_number_used_pods(self):
# noinspection PyBroadException
try:
kubectl_cmd_new = "kubectl get pods -l {agent_label} -n {namespace} -o json".format(
agent_label=self._get_agent_label(),
namespace=self.namespace,
)
process = subprocess.Popen(kubectl_cmd_new.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate()
output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
error = '' if not error else error if isinstance(error, str) else error.decode('utf-8')
if not output:
# No such pod exist so we can use the pod_number we found
return 0
try:
current_pod_count = len(json.loads(output).get("items", []))
except (ValueError, TypeError) as ex:
return -1
return current_pod_count
except Exception as ex:
print('Failed getting number of used pods: {}'.format(ex))
return -2
def run_one_task(self, queue: Text, task_id: Text, worker_args=None, **_):
print('Pulling task {} launching on kubernetes cluster'.format(task_id))
task_data = self._session.api_client.tasks.get_all(id=[task_id])[0]
@ -611,6 +637,20 @@ class K8sIntegration(Worker):
_last_machine_update_ts = 0
while True:
# check if have pod limit, then check if we hit it.
if self.max_pods_limit:
current_pods = self._get_number_used_pods()
if current_pods >= self.max_pods_limit:
print("Maximum pod limit reached {}/{}, sleeping for {:.1f} seconds".format(
current_pods, self.max_pods_limit, self._polling_interval))
# delete old completed / failed pods
get_bash_output(
self.KUBECTL_DELETE_CMD.format(namespace=self.namespace, selector=self._get_agent_label())
)
# go to sleep
sleep(self._polling_interval)
continue
# iterate over queues (priority style, queues[0] is highest)
for queue in queues:
# delete old completed / failed pods