From 0b36cb0f85099cd48a0372ddcebcd1c8ce1af7df Mon Sep 17 00:00:00 2001
From: allegroai <>
Date: Thu, 10 Dec 2020 14:19:19 +0200
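Subject: [PATCH] Change k8s pod naming scheme to include queue name

Pods are now named trains-{queue_name}-id-{task_id}. The queue name is
looked up by queue id and falls back to 'k8s' if the lookup fails.

Also in this change:
- The "k8s-queue" user property is now always set on the task, and the
  user properties callback is also invoked outside ports mode (with no
  arguments), so callbacks need a default value for pod_number.
- A callable kubectl_cmd now receives queue_name as a fifth positional
  argument.
- Tasks re-enqueued when the k8s max pod limit is reached now carry a
  status_reason.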
---
 examples/k8s_glue_example.py |  2 +-
 trains_agent/glue/k8s.py     | 48 ++++++++++++++++++++++--------------
 2 files changed, 31 insertions(+), 19 deletions(-)

diff --git a/examples/k8s_glue_example.py b/examples/k8s_glue_example.py
index cd928ad..7625ece 100644
--- a/examples/k8s_glue_example.py
+++ b/examples/k8s_glue_example.py
@@ -55,7 +55,7 @@ def main():
 
     user_props_cb = None
     if args.ports_mode and args.base_port:
-        def k8s_user_props_cb(pod_number):
+        def k8s_user_props_cb(pod_number=0):
             user_prop = {"k8s-pod-port": args.base_port + pod_number}
             if args.gateway_address:
                 user_prop["k8s-gateway-address"] = args.gateway_address
diff --git a/trains_agent/glue/k8s.py b/trains_agent/glue/k8s.py
index feb1cd4..25f8f2b 100644
--- a/trains_agent/glue/k8s.py
+++ b/trains_agent/glue/k8s.py
@@ -27,7 +27,7 @@ class K8sIntegration(Worker):
 
     KUBECTL_APPLY_CMD = "kubectl apply -f"
 
-    KUBECTL_RUN_CMD = "kubectl run trains-id-{task_id} " \
+    KUBECTL_RUN_CMD = "kubectl run trains-{queue_name}-id-{task_id} " \
                       "--image {docker_image} " \
                       "--restart=Never --replicas=1 " \
                       "--generator=run-pod/v1 " \
@@ -232,6 +232,12 @@ class K8sIntegration(Worker):
         if self.ports_mode:
             print("Kubernetes looking for available pod to use")
 
+        # noinspection PyBroadException
+        try:
+            queue_name = self._session.api_client.queues.get_by_id(queue=queue).name
+        except Exception:
+            queue_name = 'k8s'
+
         # Search for a free pod number
         pod_number = 1
         while self.ports_mode:
@@ -256,7 +262,8 @@ class K8sIntegration(Worker):
                     )
                 )
                 self._session.api_client.tasks.reset(task_id)
-                self._session.api_client.tasks.enqueue(task_id, queue=queue)
+                self._session.api_client.tasks.enqueue(
+                    task_id, queue=queue, status_reason='k8s max pod limit (no free k8s service)')
                 return
             pod_number += 1
 
@@ -271,29 +278,33 @@ class K8sIntegration(Worker):
             output, error = self._kubectl_apply(
                 create_trains_conf=create_trains_conf,
                 labels=labels, docker_image=docker_image, docker_args=docker_args,
-                task_id=task_id, queue=queue)
+                task_id=task_id, queue=queue, queue_name=queue_name)
         else:
             output, error = self._kubectl_run(
                 create_trains_conf=create_trains_conf,
                 labels=labels, docker_image=docker_image,
                 task_data=task_data,
-                task_id=task_id, queue=queue)
+                task_id=task_id, queue=queue, queue_name=queue_name)
 
         error = '' if not error else (error if isinstance(error, str) else error.decode('utf-8'))
         output = '' if not output else (output if isinstance(output, str) else output.decode('utf-8'))
         print('kubectl output:\n{}\n{}'.format(error, output))
-
         if error:
             self.log.error("Running kubectl encountered an error: {}".format(error))
-        elif self.ports_mode:
-            user_props = {"k8s-pod-number": pod_number, "k8s-pod-label": labels[0]}
-            if self._user_props_cb:
-                # noinspection PyBroadException
-                try:
-                    custom_props = self._user_props_cb(pod_number) if self.ports_mode else self._user_props_cb()
-                    user_props.update(custom_props)
-                except Exception:
-                    pass
+
+        user_props = {"k8s-queue": str(queue_name)}
+        if self.ports_mode:
+            user_props.update({"k8s-pod-number": pod_number, "k8s-pod-label": labels[0]})
+
+        if self._user_props_cb:
+            # noinspection PyBroadException
+            try:
+                custom_props = self._user_props_cb(pod_number) if self.ports_mode else self._user_props_cb()
+                user_props.update(custom_props)
+            except Exception:
+                pass
+
+        if user_props:
             self._set_task_user_properties(
                 task_id=task_id,
                 **user_props
@@ -312,12 +323,12 @@ class K8sIntegration(Worker):
                 self.log.warning('skipping docker argument {} (only -e --env supported)'.format(cmd))
         return kube_args
 
-    def _kubectl_apply(self, create_trains_conf, docker_image, docker_args, labels, queue, task_id):
+    def _kubectl_apply(self, create_trains_conf, docker_image, docker_args, labels, queue, task_id, queue_name):
         template = deepcopy(self.template_dict)
         template.setdefault('apiVersion', 'v1')
         template['kind'] = 'Pod'
         template.setdefault('metadata', {})
-        name = 'trains-id-{task_id}'.format(task_id=task_id)
+        name = 'trains-{queue}-id-{task_id}'.format(queue=queue_name, task_id=task_id)
         template['metadata']['name'] = name
         template.setdefault('spec', {})
         template['spec'].setdefault('containers', [])
@@ -380,11 +391,12 @@ class K8sIntegration(Worker):
 
         return output, error
 
-    def _kubectl_run(self, create_trains_conf, docker_image, labels, queue, task_data, task_id):
+    def _kubectl_run(self, create_trains_conf, docker_image, labels, queue, task_data, task_id, queue_name):
         if callable(self.kubectl_cmd):
-            kubectl_cmd = self.kubectl_cmd(task_id, docker_image, queue, task_data)
+            kubectl_cmd = self.kubectl_cmd(task_id, docker_image, queue, task_data, queue_name)
         else:
             kubectl_cmd = self.kubectl_cmd.format(
+                queue_name=queue_name,
                 task_id=task_id,
                 docker_image=docker_image,
                 queue_id=queue