From 92fc8e838f6b078c348069eb6b6a2feeb88f7c61 Mon Sep 17 00:00:00 2001
From: allegroai <>
Date: Tue, 20 Oct 2020 14:17:30 +0300
Subject: [PATCH] Add K8s glue support for limited number of services exposing

 examples/ |  34 +++++++++++
 trains_agent/glue/     | 107 ++++++++++++++++++++++++++++-------
 2 files changed, 122 insertions(+), 19 deletions(-)
 create mode 100644 examples/

diff --git a/examples/ b/examples/
new file mode 100644
index 0000000..6c1b6bf
--- /dev/null
+++ b/examples/
@@ -0,0 +1,34 @@
+This example assumes you have preconfigured services with selectors in the form of
+ "ai.allegro.agent.serial=pod-<number>" and a targetPort of 10022.
+The K8sIntegration component will label each pod accordingly.
+from argparse import ArgumentParser
+from trains_agent.glue.k8s import K8sIntegration
+def parse_args():
+    parser = ArgumentParser()
+    parser.add_argument(
+        "--queue", type=str, help="Queue to pull tasks from"
+    )
+    parser.add_argument(
+        "--ports-mode", action='store_true', default=False,
+        help="Ports-mode will add a label to the pod which can be used in services in order to expose ports"
+    )
+    parser.add_argument(
+        "--num-of-services", type=int, default=20,
+        help="Specify the number of k8s services to be used. Use only with ports-mode."
+    )
+    return parser.parse_args()
+def main():
+    args = parse_args()
+    k8s = K8sIntegration(ports_mode=args.ports_mode, num_of_services=args.num_of_services)
+    k8s.k8s_daemon(args.queue)
+if __name__ == "__main__":
+    main()
diff --git a/trains_agent/glue/ b/trains_agent/glue/
index 9f0a061..9fea7a9 100644
--- a/trains_agent/glue/
+++ b/trains_agent/glue/
@@ -1,5 +1,6 @@
 from __future__ import print_function, division, unicode_literals
+import base64
 import logging
 import os
 import subprocess
@@ -12,39 +13,58 @@ from import Events
 from trains_agent.commands.worker import Worker
 from trains_agent.helper.process import get_bash_output
 from trains_agent.helper.resource_monitor import ResourceMonitor
+from trains_agent.interface.base import ObjectID
 class K8sIntegration(Worker):
     K8S_PENDING_QUEUE = "k8s_scheduler"
-    KUBECTL_RUN_CMD = "kubectl run trains_id_{task_id} " \
+    KUBECTL_RUN_CMD = "kubectl run trains-id-{task_id} " \
                       "--image {docker_image} " \
                       "--restart=Never --replicas=1 " \
-                      "--generator=run-pod/v1"
+                      "--generator=run-pod/v1 " \
+                      "--namespace=trains"
     KUBECTL_DELETE_CMD = "kubectl delete pods " \
                          "--selector=TRAINS=agent " \
-                         "--field-selector=status.phase!=Pending,status.phase!=Running"
+                         "--field-selector=status.phase!=Pending,status.phase!=Running " \
+                         "--namespace=trains"
+        "export DEBIAN_FRONTEND='noninteractive'; " \
         "echo 'Binary::apt::APT::Keep-Downloaded-Packages \"true\";' > /etc/apt/apt.conf.d/docker-clean ; " \
         "chown -R root /root/.cache/pip ; " \
         "apt-get update ; " \
         "apt-get install -y git libsm6 libxext6 libxrender-dev libglib2.0-0 ; " \
         "(which python3 && python3 -m pip --version) || apt-get install -y python3-pip ; " \
         "python3 -m pip install trains-agent ; " \
-        "python3 -m trains_agent execute --full-monitoring --require-queue --id {}"
+        "python3 -m trains_agent execute --full-monitoring --require-queue --id {} ; "
-    def __init__(self, k8s_pending_queue_name=None, kubectl_cmd=None, container_bash_script=None, debug=False):
+    AGENT_LABEL = "TRAINS=agent"
+    LIMIT_POD_LABEL = "ai.allegro.agent.serial=pod-{pod_number}"
+    def __init__(
+            self,
+            k8s_pending_queue_name=None,
+            kubectl_cmd=None,
+            container_bash_script=None,
+            debug=False,
+            ports_mode=False,
+            num_of_services=20,
+    ):
         Initialize the k8s integration glue layer daemon
         :param str k8s_pending_queue_name: queue name to use when task is pending in the k8s scheduler
-        :param str|callable kubectl_cmd: kubectl command line str, supports formating (default: KUBECTL_RUN_CMD)
+        :param str|callable kubectl_cmd: kubectl command line str, supports formatting (default: KUBECTL_RUN_CMD)
             example: "task={task_id} image={docker_image} queue_id={queue_id}"
             or a callable function: kubectl_cmd(task_id, docker_image, queue_id, task_data)
         :param str container_bash_script: container bash script to be executed in k8s (default: CONTAINER_BASH_SCRIPT)
         :param bool debug: Switch logging on
+        :param bool ports_mode: Adds a label to each pod which can be used in services in order to expose ports.
+            Requires the `num_of_services` parameter.
+        :param int num_of_services: Number of k8s services configured in the cluster. Required if `port_mode` is True.
+            (default: 20)
         super(K8sIntegration, self).__init__()
         self.k8s_pending_queue_name = k8s_pending_queue_name or self.K8S_PENDING_QUEUE
@@ -56,12 +76,15 @@ class K8sIntegration(Worker):
         if debug:
             self.log.logger.disabled = False
+        self.ports_mode = ports_mode
+        self.num_of_services = num_of_services
-    def run_one_task(self, queue: Text, task_id: Text, worker_args=None):
+    def run_one_task(self, queue: Text, task_id: Text, worker_args=None, **_):
         task_data = self._session.api_client.tasks.get_all(id=[task_id])[0]
         # push task into the k8s queue, so we have visibility on pending tasks in the k8s scheduler
+            self._session.api_client.tasks.reset(task_id)
             self._session.api_client.tasks.enqueue(task_id, queue=self.k8s_pending_queue_name,
                                                    status_reason='k8s pending scheduler')
         except Exception as e:
@@ -78,28 +101,74 @@ class K8sIntegration(Worker):
         # take the first part, this is the docker image name (not arguments)
         docker_image = docker_image.split()[0]
-        create_trains_conf = "echo '{}' >> ~/trains.conf && ".format(
-            HOCONConverter.to_hocon(self._session.config._config))
+        hocon_config_encoded = HOCONConverter.to_hocon(
+                self._session.config._config
+            ).encode('ascii')
+        create_trains_conf = "echo '{}' | base64 --decode >> ~/trains.conf && ".format(
+            base64.b64encode(
+                hocon_config_encoded
+            ).decode('ascii')
+        )
         if callable(self.kubectl_cmd):
             kubectl_cmd = self.kubectl_cmd(task_id, docker_image, queue, task_data)
-            kubectl_cmd = self.kubectl_cmd.format(task_id=task_id, docker_image=docker_image, queue_id=queue)
+            kubectl_cmd = self.kubectl_cmd.format(
+                task_id=task_id,
+                docker_image=docker_image,
+                queue_id=queue
+            )
-        # make sure we gave a list
+        # Search for a free pod number
+        pod_number = 1
+        while self.ports_mode:
+            kubectl_cmd_new = "kubectl get pods -l {pod_label},{agent_label} -n trains".format(
+                pod_label=self.LIMIT_POD_LABEL.format(pod_number=pod_number),
+                agent_label=self.AGENT_LABEL
+            )
+            process = subprocess.Popen(kubectl_cmd_new.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+            output, error = process.communicate()
+            if not output:
+                # No such pod exist so we can use the pod_number we found
+                break
+            if pod_number >= self.num_of_services:
+                # All pod numbers are taken, exit
+                    "All k8s services are in use, task '{}' will be enqueued back to queue '{}'".format(
+                        task_id, queue
+                    )
+                )
+                self._session.api_client.tasks.reset(task_id)
+                self._session.api_client.tasks.enqueue(task_id, queue=queue)
+                return
+            pod_number += 1
+        # make sure we provide a list
         if isinstance(kubectl_cmd, str):
             kubectl_cmd = kubectl_cmd.split()
-        kubectl_cmd += ["--labels=TRAINS=agent", "--command", "--", "/bin/sh", "-c",
-                        create_trains_conf + self.container_bash_script.format(task_id)]
+        labels = [self.AGENT_LABEL]
+        message = "K8s scheduling experiment task id={}".format(task_id)
+        if self.ports_mode:
+            labels.insert(0, self.LIMIT_POD_LABEL.format(pod_number=pod_number))
+            message += " pod #{}".format(pod_number)
+        kubectl_cmd += [
+            "--labels=" + ",".join(labels),
+            "--command",
+            "--",
+            "/bin/sh",
+            "-c",
+            create_trains_conf + self.container_bash_script.format(task_id),
+        ]
         process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
         output, error = process.communicate()
-"K8s scheduling experiment task id={}".format(task_id))
         if error:
             self.log.error("Running kubectl encountered an error: {}".format(
                 error if isinstance(error, str) else error.decode()))
-    def run_tasks_loop(self, queues: List[Text], worker_params):
+    def run_tasks_loop(self, queues: List[Text], worker_params, **kwargs):
         :summary: Pull and run tasks from queues.
         :description: 1. Go through ``queues`` by order.
@@ -160,15 +229,15 @@ class K8sIntegration(Worker):
             if self._session.config["agent.reload_config"]:
-    def k8s_daemon(self, queues):
+    def k8s_daemon(self, queue):
         Start the k8s Glue service.
-        This service will be pulling tasks from *queues* and scheduling them for execution using kubectl.
+        This service will be pulling tasks from *queue* and scheduling them for execution using kubectl.
         Notice all scheduled tasks are pushed back into K8S_PENDING_QUEUE,
         and popped when execution actually starts. This creates full visibility into the k8s scheduler.
         Manually popping a task from the K8S_PENDING_QUEUE,
         will cause the k8s scheduler to skip the execution once the scheduled tasks needs to be executed
-        :param list(str) queues: List of queue names to pull from
+        :param list(str) queue: queue name to pull from
-        return self.daemon(queues=queues, log_level=logging.INFO, foreground=True, docker=False)
+        return self.daemon(queues=[ObjectID(name=queue)], log_level=logging.INFO, foreground=True, docker=False)