Remove queue name from pod name in k8s glue, add queue name and ID to pod labels (issue )

This commit is contained in:
allegroai 2021-05-05 11:58:37 +03:00
parent cec6420c8f
commit a2db1f5ab5

View File

@ -34,7 +34,7 @@ class K8sIntegration(Worker):
KUBECTL_APPLY_CMD = "kubectl apply --namespace={namespace} -f"
KUBECTL_RUN_CMD = "kubectl run clearml-{queue_name}-id-{task_id} " \
KUBECTL_RUN_CMD = "kubectl run clearml-id-{task_id} " \
"--image {docker_image} " \
"--restart=Never " \
"--namespace={namespace}"
@ -307,10 +307,6 @@ class K8sIntegration(Worker):
except Exception:
queue_name = 'k8s'
# conform queue name to k8s standards
safe_queue_name = queue_name.lower().strip()
safe_queue_name = re.sub(r'\W+', '', safe_queue_name).replace('_', '').replace('-', '')
# Search for a free pod number
pod_count = 0
pod_number = self.base_pod_num
@ -374,6 +370,8 @@ class K8sIntegration(Worker):
pod_count += 1
labels = ([self.LIMIT_POD_LABEL.format(pod_number=pod_number)] if self.ports_mode else []) + [self.AGENT_LABEL]
labels.append("clearml-agent-queue={}".format(self._safe_k8s_label_value(queue)))
labels.append("clearml-agent-queue-name={}".format(self._safe_k8s_label_value(queue_name)))
if self.ports_mode:
print("Kubernetes scheduling task id={} on pod={} (pod_count={})".format(task_id, pod_number, pod_count))
@ -384,13 +382,13 @@ class K8sIntegration(Worker):
output, error = self._kubectl_apply(
create_clearml_conf=create_clearml_conf,
labels=labels, docker_image=docker_image, docker_args=docker_args,
task_id=task_id, queue=queue, queue_name=safe_queue_name)
task_id=task_id, queue=queue)
else:
output, error = self._kubectl_run(
create_clearml_conf=create_clearml_conf,
labels=labels, docker_image=docker_cmd,
task_data=task_data,
task_id=task_id, queue=queue, queue_name=safe_queue_name)
task_id=task_id, queue=queue)
error = '' if not error else (error if isinstance(error, str) else error.decode('utf-8'))
output = '' if not output else (output if isinstance(output, str) else output.decode('utf-8'))
@ -437,12 +435,12 @@ class K8sIntegration(Worker):
self.log.warning('skipping docker argument {} (only -e --env supported)'.format(cmd))
return {'env': kube_args} if kube_args else {}
def _kubectl_apply(self, create_clearml_conf, docker_image, docker_args, labels, queue, task_id, queue_name):
def _kubectl_apply(self, create_clearml_conf, docker_image, docker_args, labels, queue, task_id):
template = deepcopy(self.template_dict)
template.setdefault('apiVersion', 'v1')
template['kind'] = 'Pod'
template.setdefault('metadata', {})
name = 'clearml-{queue}-id-{task_id}'.format(queue=queue_name, task_id=task_id)
name = 'clearml-id-{task_id}'.format(task_id=task_id)
template['metadata']['name'] = name
template.setdefault('spec', {})
template['spec'].setdefault('containers', [])
@ -508,12 +506,11 @@ class K8sIntegration(Worker):
return output, error
def _kubectl_run(self, create_clearml_conf, docker_image, labels, queue, task_data, task_id, queue_name):
def _kubectl_run(self, create_clearml_conf, docker_image, labels, queue, task_data, task_id):
if callable(self.kubectl_cmd):
kubectl_cmd = self.kubectl_cmd(task_id, docker_image, queue, task_data, queue_name)
kubectl_cmd = self.kubectl_cmd(task_id, docker_image, queue, task_data)
else:
kubectl_cmd = self.kubectl_cmd.format(
queue_name=queue_name,
task_id=task_id,
docker_image=docker_image,
queue_id=queue,
@ -641,3 +638,13 @@ class K8sIntegration(Worker):
return merge_dicts(
c1, c2, custom_merge_func=merge_env
)
@staticmethod
def _safe_k8s_label_value(value):
""" Conform string to k8s standards for a label value """
value = value.lower().strip()
value = re.sub(r'^[^A-Za-z0-9]+', '', value) # strip leading non-alphanumeric chars
value = re.sub(r'[^A-Za-z0-9]+$', '', value) # strip trailing non-alphanumeric chars
value = re.sub(r'\W+', '-', value) # allow only word chars (this removed "." which is supported, but nvm)
value = re.sub(r'-+', '-', value) # don't leave messy "--" after replacing previous chars
return value[:63]