Change k8s pod naming scheme to include queue name

This commit is contained in:
allegroai 2020-12-10 14:19:19 +02:00
parent dd42423482
commit 0b36cb0f85
2 changed files with 31 additions and 19 deletions

View File

@ -55,7 +55,7 @@ def main():
user_props_cb = None
if args.ports_mode and args.base_port:
def k8s_user_props_cb(pod_number):
def k8s_user_props_cb(pod_number=0):
user_prop = {"k8s-pod-port": args.base_port + pod_number}
if args.gateway_address:
user_prop["k8s-gateway-address"] = args.gateway_address

View File

@ -27,7 +27,7 @@ class K8sIntegration(Worker):
KUBECTL_APPLY_CMD = "kubectl apply -f"
KUBECTL_RUN_CMD = "kubectl run trains-id-{task_id} " \
KUBECTL_RUN_CMD = "kubectl run trains-{queue_name}-id-{task_id} " \
"--image {docker_image} " \
"--restart=Never --replicas=1 " \
"--generator=run-pod/v1 " \
@ -232,6 +232,12 @@ class K8sIntegration(Worker):
if self.ports_mode:
print("Kubernetes looking for available pod to use")
# noinspection PyBroadException
try:
queue_name = self._session.api_client.queues.get_by_id(queue=queue).name
except Exception:
queue_name = 'k8s'
# Search for a free pod number
pod_number = 1
while self.ports_mode:
@ -256,7 +262,8 @@ class K8sIntegration(Worker):
)
)
self._session.api_client.tasks.reset(task_id)
self._session.api_client.tasks.enqueue(task_id, queue=queue)
self._session.api_client.tasks.enqueue(
task_id, queue=queue, status_reason='k8s max pod limit (no free k8s service)')
return
pod_number += 1
@ -271,29 +278,33 @@ class K8sIntegration(Worker):
output, error = self._kubectl_apply(
create_trains_conf=create_trains_conf,
labels=labels, docker_image=docker_image, docker_args=docker_args,
task_id=task_id, queue=queue)
task_id=task_id, queue=queue, queue_name=queue_name)
else:
output, error = self._kubectl_run(
create_trains_conf=create_trains_conf,
labels=labels, docker_image=docker_image,
task_data=task_data,
task_id=task_id, queue=queue)
task_id=task_id, queue=queue, queue_name=queue_name)
error = '' if not error else (error if isinstance(error, str) else error.decode('utf-8'))
output = '' if not output else (output if isinstance(output, str) else output.decode('utf-8'))
print('kubectl output:\n{}\n{}'.format(error, output))
if error:
self.log.error("Running kubectl encountered an error: {}".format(error))
elif self.ports_mode:
user_props = {"k8s-pod-number": pod_number, "k8s-pod-label": labels[0]}
if self._user_props_cb:
# noinspection PyBroadException
try:
custom_props = self._user_props_cb(pod_number) if self.ports_mode else self._user_props_cb()
user_props.update(custom_props)
except Exception:
pass
user_props = {"k8s-queue": str(queue_name)}
if self.ports_mode:
user_props.update({"k8s-pod-number": pod_number, "k8s-pod-label": labels[0]})
if self._user_props_cb:
# noinspection PyBroadException
try:
custom_props = self._user_props_cb(pod_number) if self.ports_mode else self._user_props_cb()
user_props.update(custom_props)
except Exception:
pass
if user_props:
self._set_task_user_properties(
task_id=task_id,
**user_props
@ -312,12 +323,12 @@ class K8sIntegration(Worker):
self.log.warning('skipping docker argument {} (only -e --env supported)'.format(cmd))
return kube_args
def _kubectl_apply(self, create_trains_conf, docker_image, docker_args, labels, queue, task_id):
def _kubectl_apply(self, create_trains_conf, docker_image, docker_args, labels, queue, task_id, queue_name):
template = deepcopy(self.template_dict)
template.setdefault('apiVersion', 'v1')
template['kind'] = 'Pod'
template.setdefault('metadata', {})
name = 'trains-id-{task_id}'.format(task_id=task_id)
name = 'trains-{queue}-id-{task_id}'.format(queue=queue_name, task_id=task_id)
template['metadata']['name'] = name
template.setdefault('spec', {})
template['spec'].setdefault('containers', [])
@ -380,11 +391,12 @@ class K8sIntegration(Worker):
return output, error
def _kubectl_run(self, create_trains_conf, docker_image, labels, queue, task_data, task_id):
def _kubectl_run(self, create_trains_conf, docker_image, labels, queue, task_data, task_id, queue_name):
if callable(self.kubectl_cmd):
kubectl_cmd = self.kubectl_cmd(task_id, docker_image, queue, task_data)
kubectl_cmd = self.kubectl_cmd(task_id, docker_image, queue, task_data, queue_name)
else:
kubectl_cmd = self.kubectl_cmd.format(
queue_name=queue_name,
task_id=task_id,
docker_image=docker_image,
queue_id=queue