Fix multiple k8s glue instances with pod limits

Version bump
This commit is contained in:
allegroai 2021-07-15 10:28:43 +03:00
parent 499b3dfa66
commit 42606d9247
2 changed files with 23 additions and 5 deletions

View File

@ -2,6 +2,7 @@ from __future__ import print_function, division, unicode_literals
import base64
import functools
import hashlib
import json
import logging
import os
@ -184,6 +185,8 @@ class K8sIntegration(Worker):
# make sure we use system packages!
self.conf_file_content += '\nagent.package_manager.system_site_packages=true\n'
self._agent_label = None
self._monitor_hanging_pods()
def _monitor_hanging_pods(self):
@ -260,6 +263,18 @@ class K8sIntegration(Worker):
if error.code == 404:
self._edit_hyperparams_support = self._session.api_version
def _get_agent_label(self):
    """Return the pod label selector identifying pods of this agent instance.

    The label is ``self.AGENT_LABEL`` suffixed with the first 8 hex digits
    of the MD5 digest of the worker ID, so each worker ID maps to its own
    label. The computed label is cached in ``self._agent_label``.

    If no worker ID is available, a warning is printed and the plain
    ``self.AGENT_LABEL`` is returned; nothing is cached in that case.
    """
    worker_id = self.worker_id
    if not worker_id:
        print('WARNING! no worker ID found!!!')
        return self.AGENT_LABEL

    if not self._agent_label:
        # MD5 is used only to derive a short, stable suffix, not for security
        digest = hashlib.md5(str(worker_id).encode('utf-8')).hexdigest()
        self._agent_label = '{}-{}'.format(self.AGENT_LABEL, digest[:8])
    return self._agent_label
def _get_number_used_pods(self):
# noinspection PyBroadException
try:
@ -343,12 +358,12 @@ class K8sIntegration(Worker):
if self.ports_mode:
kubectl_cmd_new = "kubectl get pods -l {pod_label},{agent_label} -n {namespace}".format(
pod_label=self.LIMIT_POD_LABEL.format(pod_number=pod_number),
agent_label=self.AGENT_LABEL,
agent_label=self._get_agent_label(),
namespace=self.namespace,
)
else:
kubectl_cmd_new = "kubectl get pods -l {agent_label} -n {namespace} -o json".format(
agent_label=self.AGENT_LABEL,
agent_label=self._get_agent_label(),
namespace=self.namespace,
)
process = subprocess.Popen(kubectl_cmd_new.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
@ -397,7 +412,8 @@ class K8sIntegration(Worker):
break
pod_count += 1
labels = ([self.LIMIT_POD_LABEL.format(pod_number=pod_number)] if self.ports_mode else []) + [self.AGENT_LABEL]
labels = ([self.LIMIT_POD_LABEL.format(pod_number=pod_number)] if self.ports_mode else []) + \
[self._get_agent_label()]
labels.append("clearml-agent-queue={}".format(self._safe_k8s_label_value(queue)))
labels.append("clearml-agent-queue-name={}".format(self._safe_k8s_label_value(queue_name)))
@ -657,7 +673,9 @@ class K8sIntegration(Worker):
# iterate over queues (priority style, queues[0] is highest)
for queue in queues:
# delete old completed / failed pods
get_bash_output(self.KUBECTL_DELETE_CMD.format(namespace=self.namespace, selector=self.AGENT_LABEL))
get_bash_output(
self.KUBECTL_DELETE_CMD.format(namespace=self.namespace, selector=self._get_agent_label())
)
# get next task in queue
try:

View File

@ -1 +1 @@
__version__ = '1.0.1rc1'
__version__ = '1.0.1rc4'