From 822984301889327ae1a703ffdc56470ad006a951 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Tue, 26 Jan 2021 20:00:18 +0200 Subject: [PATCH] Add base-pod-number parameter to k8s glue and example --- clearml_agent/glue/k8s.py | 21 ++++++++++++++++----- examples/k8s_glue_example.py | 14 ++++++++++---- 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/clearml_agent/glue/k8s.py b/clearml_agent/glue/k8s.py index b9878c7..02ad890 100644 --- a/clearml_agent/glue/k8s.py +++ b/clearml_agent/glue/k8s.py @@ -88,6 +88,7 @@ class K8sIntegration(Worker): debug=False, ports_mode=False, num_of_services=20, + base_pod_num=1, user_props_cb=None, overrides_yaml=None, template_yaml=None, @@ -111,6 +112,7 @@ class K8sIntegration(Worker): Requires the `num_of_services` parameter. :param int num_of_services: Number of k8s services configured in the cluster. Required if `port_mode` is True. (default: 20) + :param int base_pod_num: Used when `ports_mode` is True, sets the base pod number to a given value (default: 1) :param callable user_props_cb: An Optional callable allowing additional user properties to be specified when scheduling a task to run in a pod. Callable can receive an optional pod number and should return a dictionary of user properties (name and value). Signature is [[Optional[int]], Dict[str,str]] @@ -133,6 +135,7 @@ class K8sIntegration(Worker): self.log.logger.setLevel(logging.INFO) self.ports_mode = ports_mode self.num_of_services = num_of_services + self.base_pod_num = base_pod_num self._edit_hyperparams_support = None self._user_props_cb = user_props_cb self.conf_file_content = None @@ -306,8 +309,10 @@ class K8sIntegration(Worker): safe_queue_name = re.sub(r'\W+', '', safe_queue_name).replace('_', '').replace('-', '') # Search for a free pod number - pod_number = 1 + pod_count = 0 + pod_number = self.base_pod_num while self.ports_mode: + pod_number = self.base_pod_num + pod_count kubectl_cmd_new = "kubectl get pods -l {pod_label},{agent_label} -n {namespace}".format( pod_label=self.LIMIT_POD_LABEL.format(pod_number=pod_number), agent_label=self.AGENT_LABEL, @@ -321,7 +326,7 @@ class K8sIntegration(Worker): if not output: # No such pod exist so we can use the pod_number we found break - if pod_number >= self.num_of_services: + if pod_count >= self.num_of_services - 1: # All pod numbers are taken, exit self.log.warning( "kubectl last result: {}\n{}\nAll k8s services are in use, task '{}' " @@ -333,12 +338,12 @@ class K8sIntegration(Worker): self._session.api_client.tasks.enqueue( task_id, queue=queue, status_reason='k8s max pod limit (no free k8s service)') return - pod_number += 1 + pod_count += 1 labels = ([self.LIMIT_POD_LABEL.format(pod_number=pod_number)] if self.ports_mode else []) + [self.AGENT_LABEL] if self.ports_mode: - print("Kubernetes scheduling task id={} on pod={}".format(task_id, pod_number)) + print("Kubernetes scheduling task id={} on pod={} (pod_count={})".format(task_id, pod_number, pod_count)) else: print("Kubernetes scheduling task id={}".format(task_id)) @@ -364,7 +369,13 @@ class K8sIntegration(Worker): user_props = {"k8s-queue": str(queue_name)} if self.ports_mode: - user_props.update({"k8s-pod-number": pod_number, "k8s-pod-label": labels[0]}) + user_props.update( + { + "k8s-pod-number": pod_number, + "k8s-pod-label": labels[0], + "k8s-internal-pod-count": pod_count, + } + ) if self._user_props_cb: # noinspection PyBroadException diff --git a/examples/k8s_glue_example.py b/examples/k8s_glue_example.py index 5603148..b368b50 100644 --- a/examples/k8s_glue_example.py +++ b/examples/k8s_glue_example.py @@ -24,7 +24,13 @@ def parse_args(): parser.add_argument( "--base-port", type=int, help="Used in conjunction with ports-mode, specifies the base port exposed by the services. " - "For pod #X, the port will be +X" + "For pod #X, the port will be +X. Note that pod number is calculated based on base-pod-num" + "e.g. if base-port=20000 and base-pod-num=3, the port for the first pod will be 20003" + ) + parser.add_argument( + "--base-pod-num", type=int, default=1, + help="Used in conjunction with ports-mode and base-port, specifies the base pod number to be used by the " + "service (default: %(default)s)" ) parser.add_argument( "--gateway-address", type=str, default=None, @@ -67,9 +73,9 @@ def main(): user_props_cb = k8s_user_props_cb k8s = K8sIntegration( - ports_mode=args.ports_mode, num_of_services=args.num_of_services, user_props_cb=user_props_cb, - overrides_yaml=args.overrides_yaml, clearml_conf_file=args.pod_clearml_conf, template_yaml=args.template_yaml, - extra_bash_init_script=K8sIntegration.get_ssh_server_bash( + ports_mode=args.ports_mode, num_of_services=args.num_of_services, base_pod_num=args.base_pod_num, + user_props_cb=user_props_cb, overrides_yaml=args.overrides_yaml, clearml_conf_file=args.pod_clearml_conf, + template_yaml=args.template_yaml, extra_bash_init_script=K8sIntegration.get_ssh_server_bash( ssh_port_number=args.ssh_server_port) if args.ssh_server_port else None, namespace=args.namespace, )