diff --git a/clearml_agent/glue/k8s.py b/clearml_agent/glue/k8s.py index b5e2a43..b9878c7 100644 --- a/clearml_agent/glue/k8s.py +++ b/clearml_agent/glue/k8s.py @@ -30,18 +30,20 @@ from clearml_agent.interface.base import ObjectID class K8sIntegration(Worker): K8S_PENDING_QUEUE = "k8s_scheduler" + K8S_DEFAULT_NAMESPACE = "clearml" + KUBECTL_APPLY_CMD = "kubectl apply -f" KUBECTL_RUN_CMD = "kubectl run clearml-{queue_name}-id-{task_id} " \ "--image {docker_image} " \ "--restart=Never --replicas=1 " \ "--generator=run-pod/v1 " \ - "--namespace=clearml" + "--namespace={namespace}" KUBECTL_DELETE_CMD = "kubectl delete pods " \ "--selector=TRAINS=agent " \ "--field-selector=status.phase!=Pending,status.phase!=Running " \ - "--namespace=clearml" + "--namespace={namespace}" BASH_INSTALL_SSH_CMD = [ "apt-get install -y openssh-server", @@ -91,6 +93,7 @@ class K8sIntegration(Worker): template_yaml=None, clearml_conf_file=None, extra_bash_init_script=None, + namespace=None, **kwargs ): """ @@ -116,6 +119,7 @@ class K8sIntegration(Worker): If provided the pod is scheduled with kubectl apply and overrides are ignored, otherwise with kubectl run. :param str clearml_conf_file: clearml.conf file to be use by the pod itself (optional) :param str extra_bash_init_script: Additional bash script to run before starting the Task inside the container + :param str namespace: K8S namespace to be used when creating the new pods (default: clearml) """ super(K8sIntegration, self).__init__() self.k8s_pending_queue_name = k8s_pending_queue_name or self.K8S_PENDING_QUEUE @@ -137,6 +141,7 @@ class K8sIntegration(Worker): self.extra_bash_init_script = extra_bash_init_script or None if self.extra_bash_init_script and not isinstance(self.extra_bash_init_script, str): self.extra_bash_init_script = ' ; '.join(self.extra_bash_init_script) # noqa + self.namespace = namespace or self.K8S_DEFAULT_NAMESPACE self.pod_limits = [] self.pod_requests = [] if overrides_yaml: @@ -182,7 +187,9 @@ class K8sIntegration(Worker): def _monitor_hanging_pods_daemon(self): while True: - output = get_bash_output('kubectl get pods -n clearml -o=JSON') + output = get_bash_output('kubectl get pods -n {namespace} -o=JSON'.format( + namespace=self.namespace + )) output = '' if not output else output if isinstance(output, str) else output.decode('utf-8') try: output_config = json.loads(output) @@ -202,7 +209,7 @@ class K8sIntegration(Worker): pod_name = pod.get('metadata', {}).get('name', None) if pod_name: task_id = pod_name.rpartition('-')[-1] - delete_pod_cmd = 'kubectl delete pods {} -n clearml'.format(pod_name) + delete_pod_cmd = 'kubectl delete pods {} -n {}'.format(pod_name, self.namespace) get_bash_output(delete_pod_cmd) try: self._session.api_client.tasks.failed( @@ -301,9 +308,10 @@ class K8sIntegration(Worker): # Search for a free pod number pod_number = 1 while self.ports_mode: - kubectl_cmd_new = "kubectl get pods -l {pod_label},{agent_label} -n clearml".format( + kubectl_cmd_new = "kubectl get pods -l {pod_label},{agent_label} -n {namespace}".format( pod_label=self.LIMIT_POD_LABEL.format(pod_number=pod_number), - agent_label=self.AGENT_LABEL + agent_label=self.AGENT_LABEL, + namespace=self.namespace, ) process = subprocess.Popen(kubectl_cmd_new.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) output, error = process.communicate() @@ -436,6 +444,7 @@ class K8sIntegration(Worker): task_id=task_id, docker_image=docker_image, queue_id=queue, + namespace=self.namespace ) # make sure we provide a list if isinstance(kubectl_cmd, str): @@ -461,7 +470,8 @@ class K8sIntegration(Worker): queue_name=queue_name, task_id=task_id, docker_image=docker_image, - queue_id=queue + queue_id=queue, + namespace=self.namespace, ) # make sure we provide a list if isinstance(kubectl_cmd, str): @@ -519,7 +529,7 @@ class K8sIntegration(Worker): # iterate over queues (priority style, queues[0] is highest) for queue in queues: # delete old completed / failed pods - get_bash_output(self.KUBECTL_DELETE_CMD) + get_bash_output(self.KUBECTL_DELETE_CMD.format(namespace=self.namespace)) # get next task in queue try: diff --git a/examples/k8s_glue_example.py b/examples/k8s_glue_example.py index 0ad694a..5603148 100644 --- a/examples/k8s_glue_example.py +++ b/examples/k8s_glue_example.py @@ -47,6 +47,10 @@ def parse_args(): "--ssh-server-port", type=int, default=0, help="If non-zero, every pod will also start an SSH server on the selected port (default: zero, not active)" ) + parser.add_argument( + "--namespace", type=str, + help="Specify the namespace in which pods will be created (default: %(default)s)", default="clearml" + ) return parser.parse_args() @@ -66,7 +70,8 @@ def main(): ports_mode=args.ports_mode, num_of_services=args.num_of_services, user_props_cb=user_props_cb, overrides_yaml=args.overrides_yaml, clearml_conf_file=args.pod_clearml_conf, template_yaml=args.template_yaml, extra_bash_init_script=K8sIntegration.get_ssh_server_bash( - ssh_port_number=args.ssh_server_port) if args.ssh_server_port else None + ssh_port_number=args.ssh_server_port) if args.ssh_server_port else None, + namespace=args.namespace, ) k8s.k8s_daemon(args.queue)