mirror of
				https://github.com/clearml/clearml-agent
				synced 2025-06-26 18:16:15 +00:00 
			
		
		
		
	Fix queue handling in K8sIntegration and k8s_glue_example.py (#183)
* Fix queue handling in K8sIntegration and k8s_glue_example.py * Update Dockerfile and k8s_glue_example.py * Add executable permission to provider_entrypoint.sh * ADJUST docker * Update clearml-agent version * ADJUST stuff * ADJUST queue string handling * DELETE pip install from own repo
This commit is contained in:
		
							parent
							
								
									01e8ffd854
								
							
						
					
					
						commit
						a2758250b2
					
				| @ -1096,7 +1096,7 @@ class K8sIntegration(Worker): | ||||
|         :param list(str) queue: queue name to pull from | ||||
|         """ | ||||
|         return self.daemon( | ||||
|             queues=[ObjectID(name=queue)] if queue else None, | ||||
|             queues=[ObjectID(name=q) for q in queue] if queue else None, | ||||
|             log_level=logging.INFO, foreground=True, docker=False, **kwargs, | ||||
|         ) | ||||
| 
 | ||||
|  | ||||
							
								
								
									
										37
									
								
								docker/k8s-glue/build-image-helper.sh
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										37
									
								
								docker/k8s-glue/build-image-helper.sh
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,37 @@ | ||||
| #!/bin/bash | ||||
| 
 | ||||
| # Check if image name and Dockerfile path are provided | ||||
| if [ -z "$1" ] || [ -z "$2" ]; then | ||||
|     echo "Usage: $0 <image_name> <dockerfile_path> <build_context>" | ||||
|     exit 1 | ||||
| fi | ||||
| 
 | ||||
| # Build the Docker image | ||||
| image_name=$1 | ||||
| dockerfile_path=$2 | ||||
| build_context=$3 | ||||
| 
 | ||||
| if [ $build_context == "glue-build-aws" ] || [ $build_context == "glue-build-gcp" ]; then | ||||
|     if [ ! -f $build_context/clearml.conf ]; then | ||||
|         cp build-resources/clearml.conf $build_context | ||||
|     fi | ||||
|     if [ ! -f $build_context/entrypoint.sh ]; then | ||||
|         cp build-resources/entrypoint.sh $build_context | ||||
|         chmod +x $build_context/entrypoint.sh | ||||
|     fi | ||||
|     if [ ! -f $build_context/setup.sh ]; then | ||||
|         cp build-resources/setup.sh $build_context | ||||
|         chmod +x $build_context/setup.sh | ||||
|     fi | ||||
| fi | ||||
| cp ../../examples/k8s_glue_example.py $build_context | ||||
| 
 | ||||
| docker build -f $dockerfile_path -t $image_name $build_context | ||||
| 
 | ||||
| # cleanup | ||||
| if [ $build_context == "glue-build-aws" ] || [ $build_context == "glue-build-gcp" ]; then | ||||
|     rm $build_context/clearml.conf | ||||
|     rm $build_context/entrypoint.sh | ||||
|     rm $build_context/setup.sh | ||||
| fi | ||||
| rm $build_context/k8s_glue_example.py | ||||
| @ -1,112 +0,0 @@ | ||||
| """ | ||||
| This example assumes you have preconfigured services with selectors in the form of | ||||
|  "ai.allegro.agent.serial=pod-<number>" and a targetPort of 10022. | ||||
| The K8sIntegration component will label each pod accordingly. | ||||
| """ | ||||
| from argparse import ArgumentParser | ||||
| 
 | ||||
| from clearml_agent.glue.k8s import K8sIntegration | ||||
| 
 | ||||
| 
 | ||||
| def parse_args(): | ||||
|     parser = ArgumentParser() | ||||
|     group = parser.add_mutually_exclusive_group() | ||||
| 
 | ||||
|     parser.add_argument( | ||||
|         "--queue", type=str, help="Queue to pull tasks from" | ||||
|     ) | ||||
|     group.add_argument( | ||||
|         "--ports-mode", action='store_true', default=False, | ||||
|         help="Ports-Mode will add a label to the pod which can be used as service, in order to expose ports" | ||||
|              "Should not be used with max-pods" | ||||
|     ) | ||||
|     parser.add_argument( | ||||
|         "--num-of-services", type=int, default=20, | ||||
|         help="Specify the number of k8s services to be used. Use only with ports-mode." | ||||
|     ) | ||||
|     parser.add_argument( | ||||
|         "--base-port", type=int, | ||||
|         help="Used in conjunction with ports-mode, specifies the base port exposed by the services. " | ||||
|              "For pod #X, the port will be <base-port>+X. Note that pod number is calculated based on base-pod-num" | ||||
|              "e.g. if base-port=20000 and base-pod-num=3, the port for the first pod will be 20003" | ||||
|     ) | ||||
|     parser.add_argument( | ||||
|         "--base-pod-num", type=int, default=1, | ||||
|         help="Used in conjunction with ports-mode and base-port, specifies the base pod number to be used by the " | ||||
|              "service (default: %(default)s)" | ||||
|     ) | ||||
|     parser.add_argument( | ||||
|         "--gateway-address", type=str, default=None, | ||||
|         help="Used in conjunction with ports-mode, specify the external address of the k8s ingress / ELB" | ||||
|     ) | ||||
|     parser.add_argument( | ||||
|         "--pod-clearml-conf", type=str, | ||||
|         help="Configuration file to be used by the pod itself (if not provided, current configuration is used)" | ||||
|     ) | ||||
|     parser.add_argument( | ||||
|         "--overrides-yaml", type=str, | ||||
|         help="YAML file containing pod overrides to be used when launching a new pod" | ||||
|     ) | ||||
|     parser.add_argument( | ||||
|         "--template-yaml", type=str, | ||||
|         help="YAML file containing pod template. If provided pod will be scheduled with kubectl apply " | ||||
|              "and overrides are ignored, otherwise it will be scheduled with kubectl run" | ||||
|     ) | ||||
|     parser.add_argument( | ||||
|         "--ssh-server-port", type=int, default=0, | ||||
|         help="If non-zero, every pod will also start an SSH server on the selected port (default: zero, not active)" | ||||
|     ) | ||||
|     parser.add_argument( | ||||
|         "--namespace", type=str, | ||||
|         help="Specify the namespace in which pods will be created (default: %(default)s)", default="clearml" | ||||
|     ) | ||||
|     group.add_argument( | ||||
|         "--max-pods", type=int, | ||||
|         help="Limit the maximum number of pods that this service can run at the same time." | ||||
|              "Should not be used with ports-mode" | ||||
|     ) | ||||
|     parser.add_argument( | ||||
|         "--use-owner-token", action="store_true", default=False, | ||||
|         help="Generate and use task owner token for the execution of each task" | ||||
|     ) | ||||
|     parser.add_argument( | ||||
|         "--standalone-mode", action="store_true", default=False, | ||||
|         help="Do not use any network connects, assume everything is pre-installed" | ||||
|     ) | ||||
|     parser.add_argument( | ||||
|         "--child-report-tags", type=str, nargs="+", default=None, | ||||
|         help="List of tags to send with the status reports from a worker that runs a task" | ||||
|     ) | ||||
| 
 | ||||
|     return parser.parse_args() | ||||
| 
 | ||||
| 
 | ||||
| def main(): | ||||
|     args = parse_args() | ||||
| 
 | ||||
|     user_props_cb = None | ||||
|     if args.ports_mode and args.base_port: | ||||
|         def k8s_user_props_cb(pod_number=0): | ||||
|             user_prop = {"k8s-pod-port": args.base_port + pod_number} | ||||
|             if args.gateway_address: | ||||
|                 user_prop["k8s-gateway-address"] = args.gateway_address | ||||
|             return user_prop | ||||
|         user_props_cb = k8s_user_props_cb | ||||
| 
 | ||||
|     k8s = K8sIntegration( | ||||
|         ports_mode=args.ports_mode, num_of_services=args.num_of_services, base_pod_num=args.base_pod_num, | ||||
|         user_props_cb=user_props_cb, overrides_yaml=args.overrides_yaml, clearml_conf_file=args.pod_clearml_conf, | ||||
|         template_yaml=args.template_yaml, extra_bash_init_script=K8sIntegration.get_ssh_server_bash( | ||||
|             ssh_port_number=args.ssh_server_port) if args.ssh_server_port else None, | ||||
|         namespace=args.namespace, max_pods_limit=args.max_pods or None | ||||
|     ) | ||||
|     k8s.k8s_daemon( | ||||
|         args.queue, | ||||
|         use_owner_token=args.use_owner_token, | ||||
|         standalone_mode=args.standalone_mode, | ||||
|         child_report_tags=args.child_report_tags | ||||
|     ) | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == "__main__": | ||||
|     main() | ||||
| @ -8,15 +8,16 @@ ENV LANG=en_US.UTF-8 | ||||
| ENV LANGUAGE=en_US.UTF-8 | ||||
| ENV PYTHONIOENCODING=UTF-8 | ||||
| 
 | ||||
| COPY ../build-resources/setup.sh /root/setup.sh | ||||
| COPY ./setup.sh /root/setup.sh | ||||
| RUN /root/setup.sh | ||||
| 
 | ||||
| COPY ./setup_aws.sh /root/setup_aws.sh | ||||
| RUN /root/setup_aws.sh | ||||
| RUN chmod +x /root/setup_aws.sh && /root/setup_aws.sh | ||||
| 
 | ||||
| COPY ../build-resources/entrypoint.sh /root/entrypoint.sh | ||||
| COPY ./entrypoint.sh /root/entrypoint.sh | ||||
| COPY ./provider_entrypoint.sh /root/provider_entrypoint.sh | ||||
| COPY ./build-resources/k8s_glue_example.py /root/k8s_glue_example.py | ||||
| RUN chmod +x /root/provider_entrypoint.sh | ||||
| COPY ./k8s_glue_example.py /root/k8s_glue_example.py | ||||
| COPY ./clearml.conf /root/clearml.conf | ||||
| 
 | ||||
| ENTRYPOINT ["/root/entrypoint.sh"] | ||||
| @ -8,15 +8,15 @@ ENV LANG=en_US.UTF-8 | ||||
| ENV LANGUAGE=en_US.UTF-8 | ||||
| ENV PYTHONIOENCODING=UTF-8 | ||||
| 
 | ||||
| COPY ../build-resources/setup.sh /root/setup.sh | ||||
| COPY ./setup.sh /root/setup.sh | ||||
| RUN /root/setup.sh | ||||
| 
 | ||||
| COPY ./setup_gcp.sh /root/setup_gcp.sh | ||||
| RUN /root/setup_gcp.sh | ||||
| RUN chmod +x /root/setup_gcp.sh && /root/setup_gcp.sh | ||||
| 
 | ||||
| COPY ../build-resources/entrypoint.sh /root/entrypoint.sh | ||||
| COPY ./entrypoint.sh /root/entrypoint.sh | ||||
| COPY ./provider_entrypoint.sh /root/provider_entrypoint.sh | ||||
| COPY ./build-resources/k8s_glue_example.py /root/k8s_glue_example.py | ||||
| COPY ./k8s_glue_example.py /root/k8s_glue_example.py | ||||
| COPY ./clearml.conf /root/clearml.conf | ||||
| 
 | ||||
| ENTRYPOINT ["/root/entrypoint.sh"] | ||||
| @ -1,98 +0,0 @@ | ||||
| """ | ||||
| This example assumes you have preconfigured services with selectors in the form of | ||||
|  "ai.allegro.agent.serial=pod-<number>" and a targetPort of 10022. | ||||
| The K8sIntegration component will label each pod accordingly. | ||||
| """ | ||||
| from argparse import ArgumentParser | ||||
| 
 | ||||
| from clearml_agent.glue.k8s import K8sIntegration | ||||
| 
 | ||||
| 
 | ||||
| def parse_args(): | ||||
|     parser = ArgumentParser() | ||||
|     group = parser.add_mutually_exclusive_group() | ||||
| 
 | ||||
|     parser.add_argument( | ||||
|         "--queue", type=str, help="Queue to pull tasks from" | ||||
|     ) | ||||
|     group.add_argument( | ||||
|         "--ports-mode", action='store_true', default=False, | ||||
|         help="Ports-Mode will add a label to the pod which can be used as service, in order to expose ports" | ||||
|              "Should not be used with max-pods" | ||||
|     ) | ||||
|     parser.add_argument( | ||||
|         "--num-of-services", type=int, default=20, | ||||
|         help="Specify the number of k8s services to be used. Use only with ports-mode." | ||||
|     ) | ||||
|     parser.add_argument( | ||||
|         "--base-port", type=int, | ||||
|         help="Used in conjunction with ports-mode, specifies the base port exposed by the services. " | ||||
|              "For pod #X, the port will be <base-port>+X. Note that pod number is calculated based on base-pod-num" | ||||
|              "e.g. if base-port=20000 and base-pod-num=3, the port for the first pod will be 20003" | ||||
|     ) | ||||
|     parser.add_argument( | ||||
|         "--base-pod-num", type=int, default=1, | ||||
|         help="Used in conjunction with ports-mode and base-port, specifies the base pod number to be used by the " | ||||
|              "service (default: %(default)s)" | ||||
|     ) | ||||
|     parser.add_argument( | ||||
|         "--gateway-address", type=str, default=None, | ||||
|         help="Used in conjunction with ports-mode, specify the external address of the k8s ingress / ELB" | ||||
|     ) | ||||
|     parser.add_argument( | ||||
|         "--pod-clearml-conf", type=str, | ||||
|         help="Configuration file to be used by the pod itself (if not provided, current configuration is used)" | ||||
|     ) | ||||
|     parser.add_argument( | ||||
|         "--overrides-yaml", type=str, | ||||
|         help="YAML file containing pod overrides to be used when launching a new pod" | ||||
|     ) | ||||
|     parser.add_argument( | ||||
|         "--template-yaml", type=str, | ||||
|         help="YAML file containing pod template. If provided pod will be scheduled with kubectl apply " | ||||
|              "and overrides are ignored, otherwise it will be scheduled with kubectl run" | ||||
|     ) | ||||
|     parser.add_argument( | ||||
|         "--ssh-server-port", type=int, default=0, | ||||
|         help="If non-zero, every pod will also start an SSH server on the selected port (default: zero, not active)" | ||||
|     ) | ||||
|     parser.add_argument( | ||||
|         "--namespace", type=str, | ||||
|         help="Specify the namespace in which pods will be created (default: %(default)s)", default="clearml" | ||||
|     ) | ||||
|     group.add_argument( | ||||
|         "--max-pods", type=int, | ||||
|         help="Limit the maximum number of pods that this service can run at the same time." | ||||
|              "Should not be used with ports-mode" | ||||
|     ) | ||||
|     parser.add_argument( | ||||
|         "--use-owner-token", action="store_true", default=False, | ||||
|         help="Generate and use task owner token for the execution of each task" | ||||
|     ) | ||||
|     return parser.parse_args() | ||||
| 
 | ||||
| 
 | ||||
| def main(): | ||||
|     args = parse_args() | ||||
| 
 | ||||
|     user_props_cb = None | ||||
|     if args.ports_mode and args.base_port: | ||||
|         def k8s_user_props_cb(pod_number=0): | ||||
|             user_prop = {"k8s-pod-port": args.base_port + pod_number} | ||||
|             if args.gateway_address: | ||||
|                 user_prop["k8s-gateway-address"] = args.gateway_address | ||||
|             return user_prop | ||||
|         user_props_cb = k8s_user_props_cb | ||||
| 
 | ||||
|     k8s = K8sIntegration( | ||||
|         ports_mode=args.ports_mode, num_of_services=args.num_of_services, base_pod_num=args.base_pod_num, | ||||
|         user_props_cb=user_props_cb, overrides_yaml=args.overrides_yaml, clearml_conf_file=args.pod_clearml_conf, | ||||
|         template_yaml=args.template_yaml, extra_bash_init_script=K8sIntegration.get_ssh_server_bash( | ||||
|             ssh_port_number=args.ssh_server_port) if args.ssh_server_port else None, | ||||
|         namespace=args.namespace, max_pods_limit=args.max_pods or None, | ||||
|     ) | ||||
|     k8s.k8s_daemon(args.queue, use_owner_token=args.use_owner_token) | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == "__main__": | ||||
|     main() | ||||
| @ -13,7 +13,7 @@ def parse_args(): | ||||
|     group = parser.add_mutually_exclusive_group() | ||||
| 
 | ||||
|     parser.add_argument( | ||||
|         "--queue", type=str, help="Queue to pull tasks from" | ||||
|         "--queue", type=str, help="Queues to pull tasks from. If multiple queues, use comma separated list, e.g. 'queue1,queue2'", | ||||
|     ) | ||||
|     group.add_argument( | ||||
|         "--ports-mode", action='store_true', default=False, | ||||
| @ -69,6 +69,10 @@ def parse_args(): | ||||
|         "--use-owner-token", action="store_true", default=False, | ||||
|         help="Generate and use task owner token for the execution of each task" | ||||
|     ) | ||||
|     parser.add_argument( | ||||
|         "--create-queue", action="store_true", default=False, | ||||
|         help="Create the queue if it does not exist (default: %(default)s)" | ||||
|     ) | ||||
|     return parser.parse_args() | ||||
| 
 | ||||
| 
 | ||||
| @ -91,7 +95,9 @@ def main(): | ||||
|             ssh_port_number=args.ssh_server_port) if args.ssh_server_port else None, | ||||
|         namespace=args.namespace, max_pods_limit=args.max_pods or None, | ||||
|     ) | ||||
|     k8s.k8s_daemon(args.queue, use_owner_token=args.use_owner_token) | ||||
|     args.queue = [q.strip() for q in args.queue.split(",") if q.strip()] | ||||
| 
 | ||||
|     k8s.k8s_daemon(args.queue, use_owner_token=args.use_owner_token, create_queue=args.create_queue) | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == "__main__": | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user