diff --git a/clearml_agent/glue/k8s.py b/clearml_agent/glue/k8s.py index d00abd2..8fdf2be 100644 --- a/clearml_agent/glue/k8s.py +++ b/clearml_agent/glue/k8s.py @@ -32,7 +32,7 @@ class K8sIntegration(Worker): K8S_DEFAULT_NAMESPACE = "clearml" - KUBECTL_APPLY_CMD = "kubectl apply -f" + KUBECTL_APPLY_CMD = "kubectl apply --namespace={namespace} -f" KUBECTL_RUN_CMD = "kubectl run clearml-{queue_name}-id-{task_id} " \ "--image {docker_image} " \ @@ -95,6 +95,7 @@ class K8sIntegration(Worker): clearml_conf_file=None, extra_bash_init_script=None, namespace=None, + max_pods_limit=None, **kwargs ): """ @@ -122,6 +123,7 @@ class K8sIntegration(Worker): :param str clearml_conf_file: clearml.conf file to be use by the pod itself (optional) :param str extra_bash_init_script: Additional bash script to run before starting the Task inside the container :param str namespace: K8S namespace to be used when creating the new pods (default: clearml) + :param int max_pods_limit: Maximum number of pods that K8S glue can run at the same time """ super(K8sIntegration, self).__init__() self.k8s_pending_queue_name = k8s_pending_queue_name or self.K8S_PENDING_QUEUE @@ -147,6 +149,7 @@ class K8sIntegration(Worker): self.namespace = namespace or self.K8S_DEFAULT_NAMESPACE self.pod_limits = [] self.pod_requests = [] + self.max_pods_limit = max_pods_limit if not self.ports_mode else None if overrides_yaml: with open(os.path.expandvars(os.path.expanduser(str(overrides_yaml))), 'rt') as f: overrides = yaml.load(f, Loader=getattr(yaml, 'FullLoader', None)) @@ -311,13 +314,19 @@ class K8sIntegration(Worker): # Search for a free pod number pod_count = 0 pod_number = self.base_pod_num - while self.ports_mode: + while self.ports_mode or self.max_pods_limit: pod_number = self.base_pod_num + pod_count - kubectl_cmd_new = "kubectl get pods -l {pod_label},{agent_label} -n {namespace}".format( - pod_label=self.LIMIT_POD_LABEL.format(pod_number=pod_number), - agent_label=self.AGENT_LABEL, - 
namespace=self.namespace, - ) + if self.ports_mode: + kubectl_cmd_new = "kubectl get pods -l {pod_label},{agent_label} -n {namespace}".format( + pod_label=self.LIMIT_POD_LABEL.format(pod_number=pod_number), + agent_label=self.AGENT_LABEL, + namespace=self.namespace, + ) + else: + kubectl_cmd_new = "kubectl get pods -l {agent_label} -n {namespace} -o json".format( + agent_label=self.AGENT_LABEL, + namespace=self.namespace, + ) process = subprocess.Popen(kubectl_cmd_new.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) output, error = process.communicate() output = '' if not output else output if isinstance(output, str) else output.decode('utf-8') @@ -326,18 +335,42 @@ class K8sIntegration(Worker): if not output: # No such pod exist so we can use the pod_number we found break - if pod_count >= self.num_of_services - 1: - # All pod numbers are taken, exit + + if self.max_pods_limit: + try: + current_pod_count = len(json.loads(output).get("items", [])) + except (ValueError, TypeError) as ex: + self.log.warning( + "K8S Glue pods monitor: Failed parsing kubectl output:\n{}\ntask '{}' " + "will be enqueued back to queue '{}'\nEx: {}".format( + output, task_id, queue, ex + ) + ) + self._session.api_client.tasks.reset(task_id) + self._session.api_client.tasks.enqueue(task_id, queue=queue, status_reason='kubectl parsing error') + return + max_count = self.max_pods_limit + else: + current_pod_count = pod_count + max_count = self.num_of_services - 1 + + if current_pod_count >= max_count: + # All pods are taken, exit + self.log.debug( + "kubectl last result: {}\n{}".format(error, output)) self.log.warning( - "kubectl last result: {}\n{}\nAll k8s services are in use, task '{}' " + "All k8s services are in use, task '{}' " "will be enqueued back to queue '{}'".format( - error, output, task_id, queue + task_id, queue ) ) self._session.api_client.tasks.reset(task_id) self._session.api_client.tasks.enqueue( task_id, queue=queue, status_reason='k8s max pod limit (no free k8s 
service)') return + elif self.max_pods_limit: + # max pods limit hasn't been reached yet, so we can create the pod + break pod_count += 1 labels = ([self.LIMIT_POD_LABEL.format(pod_number=pod_number)] if self.ports_mode else []) + [self.AGENT_LABEL]