From 820ab4dc0c545364a3b2e3854b09a395a58b6dc7 Mon Sep 17 00:00:00 2001
From: allegroai <>
Date: Mon, 1 Aug 2022 18:55:49 +0300
Subject: [PATCH] Fix k8s glue debug mode, refactoring

---
 clearml_agent/glue/k8s.py | 150 +++++++++++++++++++++++---------------
 1 file changed, 92 insertions(+), 58 deletions(-)

diff --git a/clearml_agent/glue/k8s.py b/clearml_agent/glue/k8s.py
index bb7af4c..3bdf486 100644
--- a/clearml_agent/glue/k8s.py
+++ b/clearml_agent/glue/k8s.py
@@ -135,7 +135,8 @@ class K8sIntegration(Worker):
         # Add debug logging
         if debug:
             self.log.logger.disabled = False
-            self.log.logger.setLevel(logging.INFO)
+            self.log.logger.setLevel(logging.DEBUG)
+            self.log.logger.addHandler(logging.StreamHandler())
         self.ports_mode = ports_mode
         self.num_of_services = num_of_services
         self.base_pod_num = base_pod_num
@@ -152,8 +153,7 @@ class K8sIntegration(Worker):
         self.pod_requests = []
         self.max_pods_limit = max_pods_limit if not self.ports_mode else None
         if overrides_yaml:
-            with open(os.path.expandvars(os.path.expanduser(str(overrides_yaml))), 'rt') as f:
-                overrides = yaml.load(f, Loader=getattr(yaml, 'FullLoader', None))
+            overrides = self._load_template_file(overrides_yaml)
             if overrides:
                 containers = overrides.get('spec', {}).get('containers', [])
                 for c in containers:
@@ -174,8 +174,7 @@ class K8sIntegration(Worker):
                     self.log.warning('Removing containers section: {}'.format(overrides['spec'].pop('containers')))
             self.overrides_json_string = json.dumps(overrides)
         if template_yaml:
-            with open(os.path.expandvars(os.path.expanduser(str(template_yaml))), 'rt') as f:
-                self.template_dict = yaml.load(f, Loader=getattr(yaml, 'FullLoader', None))
+            self.template_dict = self._load_template_file(template_yaml)
 
         clearml_conf_file = clearml_conf_file or kwargs.get('trains_conf_file')
 
@@ -194,6 +193,11 @@ class K8sIntegration(Worker):
         _check_pod_thread.daemon = True
         _check_pod_thread.start()
 
+    @staticmethod
+    def _load_template_file(path):
+        with open(os.path.expandvars(os.path.expanduser(str(path))), 'rt') as f:
+            return yaml.load(f, Loader=getattr(yaml, 'FullLoader', None))
+
     @staticmethod
     def _get_path(d, *path, default=None):
         try:
@@ -203,13 +207,27 @@
             )
         except (IndexError, KeyError):
             return default
 
+    def _get_kubectl_options(self, command, extra_labels=None):
+        labels = [self._get_agent_label()] + (list(extra_labels) if extra_labels else [])
+        return {
+            "-l": ",".join(labels),
+            "-n": str(self.namespace),
+            "-o": "json"
+        }
+
+    def get_kubectl_command(self, command, extra_labels=None):
+        opts = self._get_kubectl_options(command, extra_labels)
+        return 'kubectl {command} {opts}'.format(
+            command=command, opts=" ".join(x for item in opts.items() for x in item)
+        )
+
     def _monitor_hanging_pods_daemon(self):
         last_tasks_msgs = {}  # last msg updated for every task
         while True:
-            output = get_bash_output('kubectl get pods -n {namespace} -o=JSON'.format(
-                namespace=self.namespace
-            ))
+            kubectl_cmd = self.get_kubectl_command("get pods")
+            self.log.debug("Detecting hanging pods: {}".format(kubectl_cmd))
+            output = get_bash_output(kubectl_cmd)
             output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
             try:
                 output_config = json.loads(output)
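The two helpers above centralize how kubectl commands are assembled: every listing now filters on the agent label, pins the namespace, and requests JSON output. A minimal standalone sketch of the command strings this produces, where the agent label, namespace, and extra label values are illustrative stand-ins for the instance attributes:

# Standalone sketch of get_kubectl_command(); "clearml-agent" and "clearml"
# stand in for self._get_agent_label() and self.namespace in the real class.
def get_kubectl_command(command, extra_labels=None,
                        agent_label="clearml-agent", namespace="clearml"):
    labels = [agent_label] + (list(extra_labels) if extra_labels else [])
    opts = {"-l": ",".join(labels), "-n": str(namespace), "-o": "json"}
    # Flatten the {flag: value} dict into "flag value flag value ..."
    return 'kubectl {command} {opts}'.format(
        command=command, opts=" ".join(x for item in opts.items() for x in item)
    )

print(get_kubectl_command("get pods"))
# kubectl get pods -l clearml-agent -n clearml -o json
print(get_kubectl_command("get pods", extra_labels=["pod-number=3"]))
# kubectl get pods -l clearml-agent,pod-number=3 -n clearml -o json

Note that _get_kubectl_options accepts the command argument but does not use it yet; the option set is currently command-independent.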
@@ -231,6 +249,10 @@
                 if not task_id:
                     continue
 
+                namespace = pod.get('metadata', {}).get('namespace', None)
+                if not namespace:
+                    continue
+
                 task_ids.add(task_id)
 
                 msg = None
@@ -250,7 +272,7 @@
                     msg = reason + (" ({})".format(message) if message else "")
 
                     if reason == 'ImagePullBackOff':
-                        delete_pod_cmd = 'kubectl delete pods {} -n {}'.format(pod_name, self.namespace)
+                        delete_pod_cmd = 'kubectl delete pods {} -n {}'.format(pod_name, namespace)
                         get_bash_output(delete_pod_cmd)
                         try:
                             self._session.api_client.tasks.failed(
@@ -336,13 +358,11 @@
 
         return self._agent_label
 
-    def _get_number_used_pods(self):
+    def _get_used_pods(self):
         # noinspection PyBroadException
         try:
-            kubectl_cmd_new = "kubectl get pods -l {agent_label} -n {namespace} -o json".format(
-                agent_label=self._get_agent_label(),
-                namespace=self.namespace,
-            )
+            kubectl_cmd_new = self.get_kubectl_command("get pods")
+            self.log.debug("Getting used pods: {}".format(kubectl_cmd_new))
             process = subprocess.Popen(kubectl_cmd_new.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
             output, error = process.communicate()
             output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
@@ -350,17 +370,20 @@
 
             if not output:
                 # No such pod exist so we can use the pod_number we found
-                return 0
+                return 0, {}
 
             try:
-                current_pod_count = len(json.loads(output).get("items", []))
-            except (ValueError, TypeError) as ex:
-                return -1
+                items = json.loads(output).get("items", [])
+                current_pod_count = len(items)
+                namespaces = {item["metadata"]["namespace"] for item in items}
+            except (KeyError, ValueError, TypeError, AttributeError) as ex:
+                print("Failed parsing used pods command response for cleanup: {}".format(ex))
+                return -1, {}
 
-            return current_pod_count
+            return current_pod_count, namespaces
         except Exception as ex:
-            print('Failed getting number of used pods: {}'.format(ex))
-            return -2
+            print('Failed obtaining used pods information: {}'.format(ex))
+            return -2, {}
 
     def run_one_task(self, queue: Text, task_id: Text, worker_args=None, task_session=None, **_):
         print('Pulling task {} launching on kubernetes cluster'.format(task_id))
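_get_used_pods() (previously _get_number_used_pods) now returns a (count, namespaces) tuple, so the polling loop can both enforce max_pods_limit and clean up in every namespace that actually hosts agent pods. A runnable sketch of the parsing step against a hand-made kubectl-style payload (the pod and namespace names are made up):

import json

# Hand-made stand-in for `kubectl get pods -o json` output
sample = json.dumps({"items": [
    {"metadata": {"name": "clearml-id-aaa", "namespace": "clearml"}},
    {"metadata": {"name": "clearml-id-bbb", "namespace": "clearml-gpu"}},
]})

try:
    items = json.loads(sample).get("items", [])
    current_pod_count = len(items)
    # Collect every namespace that currently hosts an agent pod
    namespaces = {item["metadata"]["namespace"] for item in items}
except (KeyError, ValueError, TypeError, AttributeError):
    current_pod_count, namespaces = -1, {}

print(current_pod_count, sorted(namespaces))  # 2 ['clearml', 'clearml-gpu']

On the error paths the method returns {} rather than an empty set for the namespaces; both iterate as empty, so the cleanup loops further down simply do nothing when pod information is unavailable.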
@@ -426,39 +449,36 @@
         pod_number = self.base_pod_num
         while self.ports_mode or self.max_pods_limit:
             pod_number = self.base_pod_num + pod_count
-            if self.ports_mode:
-                kubectl_cmd_new = "kubectl get pods -l {pod_label},{agent_label} -n {namespace}".format(
-                    pod_label=self.LIMIT_POD_LABEL.format(pod_number=pod_number),
-                    agent_label=self._get_agent_label(),
-                    namespace=self.namespace,
-                )
-            else:
-                kubectl_cmd_new = "kubectl get pods -l {agent_label} -n {namespace} -o json".format(
-                    agent_label=self._get_agent_label(),
-                    namespace=self.namespace,
-                )
+
+            kubectl_cmd_new = self.get_kubectl_command(
+                "get pods",
+                extra_labels=[self.LIMIT_POD_LABEL.format(pod_number=pod_number)] if self.ports_mode else None
+            )
+            self.log.debug("Looking for a free pod/port: {}".format(kubectl_cmd_new))
             process = subprocess.Popen(kubectl_cmd_new.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
             output, error = process.communicate()
             output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
             error = '' if not error else error if isinstance(error, str) else error.decode('utf-8')
 
-            if not output:
-                # No such pod exist so we can use the pod_number we found
+            try:
+                items_count = len(json.loads(output).get("items", []))
+            except (ValueError, TypeError) as ex:
+                self.log.warning(
+                    "K8S Glue pods monitor: Failed parsing kubectl output:\n{}\ntask '{}' "
+                    "will be enqueued back to queue '{}'\nEx: {}".format(
+                        output, task_id, queue, ex
+                    )
+                )
+                self._session.api_client.tasks.stop(task_id, force=True)
+                self._session.api_client.tasks.enqueue(task_id, queue=queue, status_reason='kubectl parsing error')
+                return
+
+            if not items_count:
+                # No such pod exist so we can use the pod_number we found (result exists but with no items)
                 break
 
             if self.max_pods_limit:
-                try:
-                    current_pod_count = len(json.loads(output).get("items", []))
-                except (ValueError, TypeError) as ex:
-                    self.log.warning(
-                        "K8S Glue pods monitor: Failed parsing kubectl output:\n{}\ntask '{}' "
-                        "will be enqueued back to queue '{}'\nEx: {}".format(
-                            output, task_id, queue, ex
-                        )
-                    )
-                    self._session.api_client.tasks.stop(task_id, force=True)
-                    self._session.api_client.tasks.enqueue(task_id, queue=queue, status_reason='kubectl parsing error')
-                    return
+                current_pod_count = items_count
                 max_count = self.max_pods_limit
             else:
                 current_pod_count = pod_count
@@ -483,10 +503,9 @@
                 break
             pod_count += 1
 
-        labels = ([self.LIMIT_POD_LABEL.format(pod_number=pod_number)] if self.ports_mode else []) + \
-                 [self._get_agent_label()]
-        labels.append("clearml-agent-queue={}".format(self._safe_k8s_label_value(queue)))
-        labels.append("clearml-agent-queue-name={}".format(self._safe_k8s_label_value(queue_name)))
+        labels = self._get_pod_labels(queue, queue_name)
+        if self.ports_mode:
+            labels.append(self.LIMIT_POD_LABEL.format(pod_number=pod_number))
 
         if self.ports_mode:
             print("Kubernetes scheduling task id={} on pod={} (pod_count={})".format(task_id, pod_number, pod_count))
@@ -503,7 +522,11 @@
             queue=queue
         )
 
-        template = self._resolve_template(task_session, task_data, queue)
+        try:
+            template = self._resolve_template(task_session, task_data, queue)
+        except Exception as ex:
+            print("ERROR: Failed resolving template (skipping): {}".format(ex))
+            return
 
         if template:
             output, error = self._kubectl_apply(template=template, **kubectl_kwargs)
@@ -542,6 +565,13 @@
             **user_props
         )
 
+    def _get_pod_labels(self, queue, queue_name):
+        return [
+            self._get_agent_label(),
+            "clearml-agent-queue={}".format(self._safe_k8s_label_value(queue)),
+            "clearml-agent-queue-name={}".format(self._safe_k8s_label_value(queue_name))
+        ]
+
     def _get_docker_args(self, docker_args, flags, target=None, convert=None):
         # type: (List[str], Collection[str], Optional[str], Callable[[str], Any]) -> Union[dict, List[str]]
         """
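_get_pod_labels() consolidates the label assembly that run_one_task previously built inline, with ports mode appending its per-pod-number label afterwards. A sketch of the resulting label list; _safe_k8s_label_value and the LIMIT_POD_LABEL template are defined elsewhere in the class, so simplified stand-ins are used here:

import re

def safe_k8s_label_value(value):
    # Simplified stand-in: keep K8s-legal label characters, cap at 63 chars
    return re.sub(r"[^A-Za-z0-9\-_.]", "-", str(value))[:63]

def get_pod_labels(queue, queue_name, agent_label="clearml-agent",
                   ports_mode=False, pod_number=0):
    labels = [
        agent_label,
        "clearml-agent-queue={}".format(safe_k8s_label_value(queue)),
        "clearml-agent-queue-name={}".format(safe_k8s_label_value(queue_name)),
    ]
    if ports_mode:
        # Stand-in for self.LIMIT_POD_LABEL.format(pod_number=pod_number)
        labels.append("pod-number={}".format(pod_number))
    return labels

print(get_pod_labels("4f6e8b2a", "default", ports_mode=True, pod_number=2))
# ['clearml-agent', 'clearml-agent-queue=4f6e8b2a',
#  'clearml-agent-queue-name=default', 'pod-number=2']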
@@ -740,16 +770,19 @@
 
         _last_machine_update_ts = 0
         while True:
+            # Get used pods and namespaces
+            current_pods, namespaces = self._get_used_pods()
+
             # check if have pod limit, then check if we hit it.
             if self.max_pods_limit:
-                current_pods = self._get_number_used_pods()
                 if current_pods >= self.max_pods_limit:
                     print("Maximum pod limit reached {}/{}, sleeping for {:.1f} seconds".format(
                         current_pods, self.max_pods_limit, self._polling_interval))
                     # delete old completed / failed pods
-                    get_bash_output(
-                        self.KUBECTL_DELETE_CMD.format(namespace=self.namespace, selector=self._get_agent_label())
-                    )
+                    for namespace in namespaces:
+                        kubectl_cmd = self.KUBECTL_DELETE_CMD.format(namespace=namespace, selector=self._get_agent_label())
+                        self.log.debug("Deleting old/failed pods due to pod limit: {}".format(kubectl_cmd))
+                        get_bash_output(kubectl_cmd)
                     # go to sleep
                     sleep(self._polling_interval)
                     continue
@@ -757,9 +790,10 @@
             # iterate over queues (priority style, queues[0] is highest)
             for queue in queues:
                 # delete old completed / failed pods
-                get_bash_output(
-                    self.KUBECTL_DELETE_CMD.format(namespace=self.namespace, selector=self._get_agent_label())
-                )
+                for namespace in namespaces:
+                    kubectl_cmd = self.KUBECTL_DELETE_CMD.format(namespace=namespace, selector=self._get_agent_label())
+                    self.log.debug("Deleting old/failed pods: {}".format(kubectl_cmd))
+                    get_bash_output(kubectl_cmd)
 
                 # get next task in queue
                 try:
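With _get_used_pods() now called unconditionally at the top of the polling loop, stale-pod cleanup runs once per namespace that hosts agent pods instead of only against the agent's own namespace. A sketch of that loop; the real KUBECTL_DELETE_CMD is a class constant defined elsewhere in the file, so an illustrative stand-in is used:

# Illustrative stand-in for the KUBECTL_DELETE_CMD class constant
KUBECTL_DELETE_CMD = "kubectl delete pods -l {selector} -n {namespace}"

def cleanup_old_pods(namespaces, selector="clearml-agent"):
    # One delete invocation per namespace hosting agent-labeled pods
    # (sorted here only to make this sketch's output deterministic)
    for namespace in sorted(namespaces):
        kubectl_cmd = KUBECTL_DELETE_CMD.format(namespace=namespace, selector=selector)
        print("Deleting old/failed pods: {}".format(kubectl_cmd))
        # in the real code: get_bash_output(kubectl_cmd)

cleanup_old_pods({"clearml", "clearml-gpu"})
# Deleting old/failed pods: kubectl delete pods -l clearml-agent -n clearml
# Deleting old/failed pods: kubectl delete pods -l clearml-agent -n clearml-gpu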