diff --git a/examples/k8s_glue_example.py b/examples/k8s_glue_example.py index cd928ad..7625ece 100644 --- a/examples/k8s_glue_example.py +++ b/examples/k8s_glue_example.py @@ -55,7 +55,7 @@ def main(): user_props_cb = None if args.ports_mode and args.base_port: - def k8s_user_props_cb(pod_number): + def k8s_user_props_cb(pod_number=0): user_prop = {"k8s-pod-port": args.base_port + pod_number} if args.gateway_address: user_prop["k8s-gateway-address"] = args.gateway_address diff --git a/trains_agent/glue/k8s.py b/trains_agent/glue/k8s.py index feb1cd4..25f8f2b 100644 --- a/trains_agent/glue/k8s.py +++ b/trains_agent/glue/k8s.py @@ -27,7 +27,7 @@ class K8sIntegration(Worker): KUBECTL_APPLY_CMD = "kubectl apply -f" - KUBECTL_RUN_CMD = "kubectl run trains-id-{task_id} " \ + KUBECTL_RUN_CMD = "kubectl run trains-{queue_name}-id-{task_id} " \ "--image {docker_image} " \ "--restart=Never --replicas=1 " \ "--generator=run-pod/v1 " \ @@ -232,6 +232,12 @@ class K8sIntegration(Worker): if self.ports_mode: print("Kubernetes looking for available pod to use") + # noinspection PyBroadException + try: + queue_name = self._session.api_client.queues.get_by_id(queue=queue).name + except Exception: + queue_name = 'k8s' + # Search for a free pod number pod_number = 1 while self.ports_mode: @@ -256,7 +262,8 @@ class K8sIntegration(Worker): ) ) self._session.api_client.tasks.reset(task_id) - self._session.api_client.tasks.enqueue(task_id, queue=queue) + self._session.api_client.tasks.enqueue( + task_id, queue=queue, status_reason='k8s max pod limit (no free k8s service)') return pod_number += 1 @@ -271,29 +278,33 @@ class K8sIntegration(Worker): output, error = self._kubectl_apply( create_trains_conf=create_trains_conf, labels=labels, docker_image=docker_image, docker_args=docker_args, - task_id=task_id, queue=queue) + task_id=task_id, queue=queue, queue_name=queue_name) else: output, error = self._kubectl_run( create_trains_conf=create_trains_conf, labels=labels, docker_image=docker_image, task_data=task_data, - task_id=task_id, queue=queue) + task_id=task_id, queue=queue, queue_name=queue_name) error = '' if not error else (error if isinstance(error, str) else error.decode('utf-8')) output = '' if not output else (output if isinstance(output, str) else output.decode('utf-8')) print('kubectl output:\n{}\n{}'.format(error, output)) - if error: self.log.error("Running kubectl encountered an error: {}".format(error)) - elif self.ports_mode: - user_props = {"k8s-pod-number": pod_number, "k8s-pod-label": labels[0]} - if self._user_props_cb: - # noinspection PyBroadException - try: - custom_props = self._user_props_cb(pod_number) if self.ports_mode else self._user_props_cb() - user_props.update(custom_props) - except Exception: - pass + + user_props = {"k8s-queue": str(queue_name)} + if self.ports_mode: + user_props.update({"k8s-pod-number": pod_number, "k8s-pod-label": labels[0]}) + + if self._user_props_cb: + # noinspection PyBroadException + try: + custom_props = self._user_props_cb(pod_number) if self.ports_mode else self._user_props_cb() + user_props.update(custom_props) + except Exception: + pass + + if user_props: self._set_task_user_properties( task_id=task_id, **user_props @@ -312,12 +323,12 @@ class K8sIntegration(Worker): self.log.warning('skipping docker argument {} (only -e --env supported)'.format(cmd)) return kube_args - def _kubectl_apply(self, create_trains_conf, docker_image, docker_args, labels, queue, task_id): + def _kubectl_apply(self, create_trains_conf, docker_image, docker_args, labels, queue, task_id, queue_name): template = deepcopy(self.template_dict) template.setdefault('apiVersion', 'v1') template['kind'] = 'Pod' template.setdefault('metadata', {}) - name = 'trains-id-{task_id}'.format(task_id=task_id) + name = 'trains-{queue}-id-{task_id}'.format(queue=queue_name, task_id=task_id) template['metadata']['name'] = name template.setdefault('spec', {}) template['spec'].setdefault('containers', []) @@ -380,11 +391,12 @@ class K8sIntegration(Worker): return output, error - def _kubectl_run(self, create_trains_conf, docker_image, labels, queue, task_data, task_id): + def _kubectl_run(self, create_trains_conf, docker_image, labels, queue, task_data, task_id, queue_name): if callable(self.kubectl_cmd): - kubectl_cmd = self.kubectl_cmd(task_id, docker_image, queue, task_data) + kubectl_cmd = self.kubectl_cmd(task_id, docker_image, queue, task_data, queue_name) else: kubectl_cmd = self.kubectl_cmd.format( + queue_name=queue_name, task_id=task_id, docker_image=docker_image, queue_id=queue