From 3dd5973734dc733e2b4adbfcc6a0017453242994 Mon Sep 17 00:00:00 2001
From: allegroai <>
Date: Mon, 5 Dec 2022 11:29:59 +0200
Subject: [PATCH] Filter by phase when detecting hanging pods; More debug
 print-outs; Use task session when possible; Push task into k8s scheduler queue
 only if running from the same tenant; Make sure we pass git_user/pass to the
 task pod; Fix cleanup command not issued when no pods exist in a multi-queue
 setup

---
 clearml_agent/glue/k8s.py | 217 +++++++++++++++++++++++++-------------
 1 file changed, 144 insertions(+), 73 deletions(-)

diff --git a/clearml_agent/glue/k8s.py b/clearml_agent/glue/k8s.py
index 79e09bd..1bbd411 100644
--- a/clearml_agent/glue/k8s.py
+++ b/clearml_agent/glue/k8s.py
@@ -9,26 +9,27 @@ import os
 import re
 import subprocess
 import tempfile
+from collections import defaultdict
 from copy import deepcopy
 from pathlib import Path
 from pprint import pformat
 from threading import Thread
-from time import sleep
-from typing import Text, List, Callable, Any, Collection, Optional, Union
+from time import sleep, time
+from typing import Text, List, Callable, Any, Collection, Optional, Union, Iterable, Dict, Tuple, Set
 
 import yaml
 
+from clearml_agent.backend_api.session import Request
 from clearml_agent.commands.events import Events
 from clearml_agent.commands.worker import Worker, get_task_container, set_task_container, get_next_task
-from clearml_agent.definitions import ENV_DOCKER_IMAGE
+from clearml_agent.definitions import ENV_DOCKER_IMAGE, ENV_AGENT_GIT_USER, ENV_AGENT_GIT_PASS
 from clearml_agent.errors import APIError
+from clearml_agent.glue.definitions import ENV_START_AGENT_SCRIPT_PATH
 from clearml_agent.helper.base import safe_remove_file
 from clearml_agent.helper.dicts import merge_dicts
 from clearml_agent.helper.process import get_bash_output
 from clearml_agent.helper.resource_monitor import ResourceMonitor
 from clearml_agent.interface.base import ObjectID
-from clearml_agent.backend_api.session import Request
-from clearml_agent.glue.definitions import ENV_START_AGENT_SCRIPT_PATH
 
 
 class K8sIntegration(Worker):
@@ -46,7 +47,7 @@ class K8sIntegration(Worker):
                       "--namespace={namespace}"
 
     KUBECTL_DELETE_CMD = "kubectl delete pods " \
-                         "--selector={selector} " \
+                         "-l={agent_label} " \
                          "--field-selector=status.phase!=Pending,status.phase!=Running " \
                          "--namespace={namespace}"
 
@@ -65,6 +66,9 @@ class K8sIntegration(Worker):
         'echo "ldconfig" >> /etc/profile',
         "/usr/sbin/sshd -p {port}"]
 
+    DEFAULT_EXECUTION_AGENT_ARGS = os.getenv("K8S_GLUE_DEF_EXEC_AGENT_ARGS", "--full-monitoring --require-queue")
+    POD_AGENT_INSTALL_ARGS = os.getenv("K8S_GLUE_POD_AGENT_INSTALL_ARGS", "")
+
     CONTAINER_BASH_SCRIPT = [
         "export DEBIAN_FRONTEND='noninteractive'",
         "echo 'Binary::apt::APT::Keep-Downloaded-Packages \"true\";' > /etc/apt/apt.conf.d/docker-clean",
@@ -77,9 +81,9 @@ class K8sIntegration(Worker):
         "[ ! -z $LOCAL_PYTHON ] || apt-get install -y python3-pip",
         "[ ! -z $LOCAL_PYTHON ] || export LOCAL_PYTHON=python3",
         "{extra_bash_init_cmd}",
-        "$LOCAL_PYTHON -m pip install clearml-agent",
+        "$LOCAL_PYTHON -m pip install clearml-agent{agent_install_args}",
         "{extra_docker_bash_script}",
-        "$LOCAL_PYTHON -m clearml_agent execute --full-monitoring --require-queue --id {task_id}"
+        "$LOCAL_PYTHON -m clearml_agent execute {default_execution_agent_args} --id {task_id}"
     ]
 
     _edit_hyperparams_version = "2.9"
@@ -190,6 +194,9 @@ class K8sIntegration(Worker):
 
         self._monitor_hanging_pods()
 
+        self._min_cleanup_interval_per_ns_sec = 1.0
+        self._last_pod_cleanup_per_ns = defaultdict(lambda: 0.)
+
     def _monitor_hanging_pods(self):
         _check_pod_thread = Thread(target=self._monitor_hanging_pods_daemon)
         _check_pod_thread.daemon = True
@@ -209,16 +216,20 @@ class K8sIntegration(Worker):
         except (IndexError, KeyError):
             return default
 
-    def _get_kubectl_options(self, command, extra_labels=None):
+    def _get_kubectl_options(self, command, extra_labels=None, filters=None, output="json"):
+        # type: (str, Optional[Iterable[str]], Optional[Iterable[str]], str) -> Dict[str, str]
         labels = [self._get_agent_label()] + (list(extra_labels) if extra_labels else [])
-        return {
+        d = {  # kubectl flag -> value (the `command` argument is not used here)
             "-l": ",".join(labels),
             "-n": str(self.namespace),
-            "-o": "json"
+            "-o": output,  # kubectl output format, e.g. "json" or a "jsonpath=..." expression
         }
+        if filters:  # optional field selectors, e.g. "status.phase=Pending"
+            d["--field-selector"] = ",".join(filters)  # multiple selectors are comma-joined
+        return d
 
-    def get_kubectl_command(self, command, extra_labels=None):
-        opts = self._get_kubectl_options(command, extra_labels)
+    def get_kubectl_command(self, command, output="json", **args):  # NOTE: `extra_labels` is now keyword-only
+        opts = self._get_kubectl_options(command, output=output, **args)  # **args: extra_labels, filters
         return 'kubectl {command} {opts}'.format(
             command=command, opts=" ".join(x for item in opts.items() for x in item)
         )
@@ -227,7 +238,7 @@ class K8sIntegration(Worker):
         last_tasks_msgs = {}  # last msg updated for every task
 
         while True:
-            kubectl_cmd = self.get_kubectl_command("get pods")
+            kubectl_cmd = self.get_kubectl_command("get pods", filters=["status.phase=Pending"])
             self.log.debug("Detecting hanging pods: {}".format(kubectl_cmd))
             output = get_bash_output(kubectl_cmd)
             output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
@@ -240,9 +251,6 @@ class K8sIntegration(Worker):
             pods = output_config.get('items', [])
             task_ids = set()
             for pod in pods:
-                if self._get_path(pod, 'status', 'phase') != "Pending":
-                    continue
-
                 pod_name = pod.get('metadata', {}).get('name', None)
                 if not pod_name:
                     continue
@@ -275,8 +283,10 @@ class K8sIntegration(Worker):
 
                     if reason == 'ImagePullBackOff':
                         delete_pod_cmd = 'kubectl delete pods {} -n {}'.format(pod_name, namespace)
+                        self.log.debug(" - deleting pod due to ImagePullBackOff: {}".format(delete_pod_cmd))
                         get_bash_output(delete_pod_cmd)
                         try:
+                            self.log.debug(" - Detecting hanging pods: {}".format(kubectl_cmd))
                             self._session.api_client.tasks.failed(
                                 task=task_id,
                                 status_reason="K8S glue error: {}".format(msg),
@@ -308,8 +318,8 @@ class K8sIntegration(Worker):
                         last_tasks_msgs[task_id] = msg
                     except Exception as ex:
                         self.log.warning(
-                            'K8S Glue pods monitor: Failed setting status message for task "{}"\nEX: {}'.format(
-                                task_id, ex
+                            'K8S Glue pods monitor: Failed setting status message for task "{}"\nMSG: {}\nEX: {}'.format(
+                                task_id, msg, ex
                             )
                         )
 
@@ -318,7 +328,8 @@ class K8sIntegration(Worker):
 
             sleep(self._polling_interval)
 
-    def _set_task_user_properties(self, task_id: str, **properties: str):
+    def _set_task_user_properties(self, task_id: str, task_session=None, **properties: str):
+        session = task_session or self._session
         if self._edit_hyperparams_support is not True:
             # either not supported or never tested
             if self._edit_hyperparams_support == self._session.api_version:
@@ -329,7 +340,7 @@ class K8sIntegration(Worker):
                 self._edit_hyperparams_support = self._session.api_version
                 return
         try:
-            self._session.get(
+            session.get(
                 service="tasks",
                 action="edit_hyper_params",
                 task=task_id,
@@ -361,67 +372,94 @@ class K8sIntegration(Worker):
         return self._agent_label
 
     def _get_used_pods(self):
+        # type: () -> Tuple[int, Set[str]]
         # noinspection PyBroadException
         try:
-            kubectl_cmd_new = self.get_kubectl_command("get pods")
-            self.log.debug("Getting used pods: {}".format(kubectl_cmd_new))
-            process = subprocess.Popen(kubectl_cmd_new.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-            output, error = process.communicate()
+            kubectl_cmd = self.get_kubectl_command(
+                "get pods",
+                output="jsonpath=\"{range .items[*]}{.metadata.name}{' '}{.metadata.namespace}{'\\n'}{end}\""
+            )
+            self.log.debug("Getting used pods: {}".format(kubectl_cmd))
+            output = get_bash_output(kubectl_cmd, raise_error=True)  # errors are handled by the outer except
             output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
-            error = '' if not error else error if isinstance(error, str) else error.decode('utf-8')
 
             if not output:
                 # No such pod exist so we can use the pod_number we found
-                return 0, {}
+                return 0, set([])  # a set (not a dict) so the result matches the declared Set[str]
 
             try:
-                items = json.loads(output).get("items", [])
+                items = output.splitlines()  # one "<pod-name> <namespace>" line per pod (see jsonpath above)
                 current_pod_count = len(items)
-                namespaces = {item["metadata"]["namespace"] for item in items}
+                namespaces = {item.rpartition(" ")[-1] for item in items}  # text after the last space
+                self.log.debug(" - found {} pods in namespaces {}".format(current_pod_count, ", ".join(namespaces)))
             except (KeyError, ValueError, TypeError, AttributeError) as ex:
                 print("Failed parsing used pods command response for cleanup: {}".format(ex))
-                return -1, {}
+                return -1, set([])  # negative count: the command output could not be parsed
 
             return current_pod_count, namespaces
         except Exception as ex:
             print('Failed obtaining used pods information: {}'.format(ex))
-            return -2, {}
+            return -2, set([])  # negative count: obtaining the pods information failed
+
+    def _is_same_tenant(self, task_session):  # True if no task session or both sessions share a tenant
+        if not task_session or task_session is self._session:
+            return True
+        # noinspection PyBroadException
+        try:
+            tenant = self._session.get_decoded_token(self._session.token, verify=False)["tenant"]
+            task_tenant = task_session.get_decoded_token(task_session.token, verify=False)["tenant"]
+            return tenant == task_tenant
+        except Exception as ex:  # best effort: on failure falls through and returns None (falsy)
+            print("ERROR: Failed getting tenant for task session: {}".format(ex))
 
     def run_one_task(self, queue: Text, task_id: Text, worker_args=None, task_session=None, **_):
         print('Pulling task {} launching on kubernetes cluster'.format(task_id))
-        task_data = self._session.api_client.tasks.get_all(id=[task_id])[0]
+        session = task_session or self._session
+        task_data = session.api_client.tasks.get_all(id=[task_id])[0]
 
         # push task into the k8s queue, so we have visibility on pending tasks in the k8s scheduler
-        try:
-            print('Pushing task {} into temporary pending queue'.format(task_id))
-            _ = self._session.api_client.tasks.stop(task_id, force=True)
-            res = self._session.api_client.tasks.enqueue(
-                task_id,
-                queue=self.k8s_pending_queue_id,
-                status_reason='k8s pending scheduler',
-            )
-            if res.meta.result_code != 200:
-                raise Exception(res.meta.result_msg)
-        except Exception as e:
-            self.log.error("ERROR: Could not push back task [{}] to k8s pending queue {} [{}], error: {}".format(
-                task_id, self.k8s_pending_queue_name, self.k8s_pending_queue_id, e))
-            return
+        if self._is_same_tenant(task_session):
+            try:
+                print('Pushing task {} into temporary pending queue'.format(task_id))
+                _ = session.api_client.tasks.stop(task_id, force=True)
 
-        container = get_task_container(self._session, task_id)
+                res = self._session.api_client.tasks.enqueue(
+                    task_id,
+                    queue=self.k8s_pending_queue_id,
+                    status_reason='k8s pending scheduler',
+                )
+                if res.meta.result_code != 200:
+                    raise Exception(res.meta.result_msg)
+            except Exception as e:
+                self.log.error("ERROR: Could not push back task [{}] to k8s pending queue {} [{}], error: {}".format(
+                    task_id, self.k8s_pending_queue_name, self.k8s_pending_queue_id, e))
+                return
+
+        container = get_task_container(session, task_id)
         if not container.get('image'):
             container['image'] = str(
-                ENV_DOCKER_IMAGE.get() or self._session.config.get("agent.default_docker.image", "nvidia/cuda")
+                ENV_DOCKER_IMAGE.get() or session.config.get("agent.default_docker.image", "nvidia/cuda")
             )
-            container['arguments'] = self._session.config.get("agent.default_docker.arguments", None)
+            container['arguments'] = session.config.get("agent.default_docker.arguments", None)
             set_task_container(
-                self._session, task_id, docker_image=container['image'], docker_arguments=container['arguments']
+                session, task_id, docker_image=container['image'], docker_arguments=container['arguments']
             )
 
         # get the clearml.conf encoded file, make sure we use system packages!
+
+        git_user = ENV_AGENT_GIT_USER.get() or self._session.config.get("agent.git_user", None)
+        git_pass = ENV_AGENT_GIT_PASS.get() or self._session.config.get("agent.git_pass", None)
+        extra_config_values = [
+            'agent.package_manager.system_site_packages: true',
+            'agent.git_user: "{}"'.format(git_user) if git_user else '',
+            'agent.git_pass: "{}"'.format(git_pass) if git_pass else '',
+        ]
+
         # noinspection PyProtectedMember
         config_content = (
-            self.conf_file_content or Path(self._session._config_file).read_text() or ""
-        ) + '\nagent.package_manager.system_site_packages=true\n'
+            self.conf_file_content or Path(session._config_file).read_text() or ""
+        ) + '\n{}\n'.format('\n'.join(x for x in extra_config_values if x))
+
         hocon_config_encoded = config_content.encode("ascii")
 
         create_clearml_conf = ["echo '{}' | base64 --decode >> ~/clearml.conf".format(
@@ -471,8 +509,12 @@ class K8sIntegration(Worker):
                         output, task_id, queue, ex
                     )
                 )
-                self._session.api_client.tasks.stop(task_id, force=True)
-                self._session.api_client.tasks.enqueue(task_id, queue=queue, status_reason='kubectl parsing error')
+                session.api_client.tasks.stop(task_id, force=True)
+                # noinspection PyBroadException
+                try:
+                    self._session.api_client.tasks.enqueue(task_id, queue=queue, status_reason='kubectl parsing error')
+                except:
+                    self.log.warning("Failed enqueuing task to queue '{}'".format(queue))
                 return
 
             if not items_count:
@@ -496,9 +538,14 @@ class K8sIntegration(Worker):
                         task_id, queue
                     )
                 )
-                self._session.api_client.tasks.stop(task_id, force=True)
-                self._session.api_client.tasks.enqueue(
-                    task_id, queue=queue, status_reason='k8s max pod limit (no free k8s service)')
+                session.api_client.tasks.stop(task_id, force=True)
+                # noinspection PyBroadException
+                try:
+                    self._session.api_client.tasks.enqueue(
+                        task_id, queue=queue, status_reason='k8s max pod limit (no free k8s service)'
+                    )
+                except:
+                    self.log.warning("Failed enqueuing task to queue '{}'".format(queue))
                 return
             elif self.max_pods_limit:
                 # max pods limit hasn't reached yet, so we can create the pod
@@ -521,7 +568,7 @@ class K8sIntegration(Worker):
             docker_args=container['arguments'],
             docker_bash=container.get('setup_shell_script'),
             task_id=task_id,
-            queue=queue
+            queue=queue,
         )
 
         try:
@@ -564,6 +611,7 @@ class K8sIntegration(Worker):
         if user_props:
             self._set_task_user_properties(
                 task_id=task_id,
+                task_session=task_session,
                 **user_props
             )
 
@@ -641,7 +689,9 @@ class K8sIntegration(Worker):
             ['#!/bin/bash', ] +
             [line.format(extra_bash_init_cmd=self.extra_bash_init_script or '',
                          task_id=task_id,
-                         extra_docker_bash_script=extra_docker_bash_script)
+                         extra_docker_bash_script=extra_docker_bash_script,
+                         default_execution_agent_args=self.DEFAULT_EXECUTION_AGENT_ARGS,
+                         agent_install_args=self.POD_AGENT_INSTALL_ARGS)
              for line in container_bash_script])
 
         extra_bash_commands = list(create_clearml_conf or [])
@@ -735,7 +785,17 @@ class K8sIntegration(Worker):
 
         container_bash_script = [self.container_bash_script] if isinstance(self.container_bash_script, str) \
             else self.container_bash_script
-        container_bash_script = ' ; '.join(container_bash_script)
+
+        container_bash_script = [
+            line.format(
+                extra_bash_init_cmd=self.extra_bash_init_script or "",
+                extra_docker_bash_script=docker_bash or "",
+                task_id=task_id,
+                default_execution_agent_args=self.DEFAULT_EXECUTION_AGENT_ARGS,
+                agent_install_args=self.POD_AGENT_INSTALL_ARGS
+            )
+            for line in container_bash_script
+        ]
 
         kubectl_cmd += [
             "--labels=" + ",".join(labels),
@@ -743,16 +803,30 @@ class K8sIntegration(Worker):
             "--",
             "/bin/sh",
             "-c",
-            "{} ; {}".format(" ; ".join(create_clearml_conf or []), container_bash_script.format(
-                extra_bash_init_cmd=self.extra_bash_init_script or "",
-                extra_docker_bash_script=docker_bash or "",
-                task_id=task_id
-            )),
+            "{} ; {}".format(
+                " ; ".join(create_clearml_conf or []),
+                ' ; '.join(line for line in container_bash_script if line.strip())
+            ),
         ]
         process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
         output, error = process.communicate()
         return output, error
 
+    def _cleanup_old_pods(self, namespaces, extra_msg=None):
+        # type: (Iterable[str], Optional[str]) -> None
+        self.log.debug("Cleaning up pods")  # deletes this agent's pods that are neither Pending nor Running
+        for namespace in namespaces:
+            if time() - self._last_pod_cleanup_per_ns[namespace] < self._min_cleanup_interval_per_ns_sec:
+                # Throttle: this namespace was cleaned less than the minimum interval ago, skip it for now
+                continue
+            kubectl_cmd = self.KUBECTL_DELETE_CMD.format(namespace=namespace, agent_label=self._get_agent_label())
+            self.log.debug("Deleting old/failed pods{}: {}".format(
+                extra_msg or "",  # optional suffix for the log line, e.g. " due to pod limit"
+                kubectl_cmd
+            ))
+            get_bash_output(kubectl_cmd)  # output is ignored
+            self._last_pod_cleanup_per_ns[namespace] = time()  # remember when this namespace was last cleaned
+
     def run_tasks_loop(self, queues: List[Text], worker_params, **kwargs):
         """
         :summary: Pull and run tasks from queues.
@@ -780,16 +854,16 @@ class K8sIntegration(Worker):
             # Get used pods and namespaces
             current_pods, namespaces = self._get_used_pods()
 
+            # just in case there are no pods, make sure we look at our base namespace
+            namespaces.add(self.namespace)
+
             # check if have pod limit, then check if we hit it.
             if self.max_pods_limit:
                 if current_pods >= self.max_pods_limit:
                     print("Maximum pod limit reached {}/{}, sleeping for {:.1f} seconds".format(
                         current_pods, self.max_pods_limit, self._polling_interval))
                     # delete old completed / failed pods
-                    for namespace in namespaces:
-                        kubectl_cmd = self.KUBECTL_DELETE_CMD.format(namespace=namespace, selector=self._get_agent_label())
-                        self.log.debug("Deleting old/failed pods due to pod limit: {}".format(kubectl_cmd))
-                        get_bash_output(kubectl_cmd)
+                    self._cleanup_old_pods(namespaces, " due to pod limit")
                     # go to sleep
                     sleep(self._polling_interval)
                     continue
@@ -797,10 +871,7 @@ class K8sIntegration(Worker):
             # iterate over queues (priority style, queues[0] is highest)
             for queue in queues:
                 # delete old completed / failed pods
-                for namespace in namespaces:
-                    kubectl_cmd = self.KUBECTL_DELETE_CMD.format(namespace=namespace, selector=self._get_agent_label())
-                    self.log.debug("Deleting old/failed pods: {}".format(kubectl_cmd))
-                    get_bash_output(kubectl_cmd)
+                self._cleanup_old_pods(namespaces)
 
                 # get next task in queue
                 try: