From dc5e0033c8d7fec26e82e68444d1650d60fdfb13 Mon Sep 17 00:00:00 2001
From: allegroai <>
Date: Mon, 5 Dec 2022 11:40:19 +0200
Subject: [PATCH] Remove support for `kubectl run`

Allow customizing pod name prefix and limit pod label
Return deleted pods from cleanup
Some refactoring
---
 clearml_agent/glue/k8s.py       | 235 ++++++++++++++------------------
 clearml_agent/helper/process.py |   4 +
 2 files changed, 103 insertions(+), 136 deletions(-)

diff --git a/clearml_agent/glue/k8s.py b/clearml_agent/glue/k8s.py
index 1bbd411..7c87790 100644
--- a/clearml_agent/glue/k8s.py
+++ b/clearml_agent/glue/k8s.py
@@ -27,29 +27,25 @@ from clearml_agent.errors import APIError
 from clearml_agent.glue.definitions import ENV_START_AGENT_SCRIPT_PATH
 from clearml_agent.helper.base import safe_remove_file
 from clearml_agent.helper.dicts import merge_dicts
-from clearml_agent.helper.process import get_bash_output
+from clearml_agent.helper.process import get_bash_output, stringify_bash_output
 from clearml_agent.helper.resource_monitor import ResourceMonitor
 from clearml_agent.interface.base import ObjectID
+
 
 
 class K8sIntegration(Worker):
     K8S_PENDING_QUEUE = "k8s_scheduler"
 
     K8S_DEFAULT_NAMESPACE = "clearml"
     AGENT_LABEL = "CLEARML=agent"
-    LIMIT_POD_LABEL = "ai.allegro.agent.serial=pod-{pod_number}"
 
     KUBECTL_APPLY_CMD = "kubectl apply --namespace={namespace} -f"
 
-    KUBECTL_RUN_CMD = "kubectl run clearml-id-{task_id} " \
-                      "--image {docker_image} {docker_args} " \
-                      "--restart=Never " \
-                      "--namespace={namespace}"
-
     KUBECTL_DELETE_CMD = "kubectl delete pods " \
                          "-l={agent_label} " \
                          "--field-selector=status.phase!=Pending,status.phase!=Running " \
-                         "--namespace={namespace}"
+                         "--namespace={namespace} " \
+                         "--output name"
 
     BASH_INSTALL_SSH_CMD = [
         "apt-get update",
@@ -86,12 +82,14 @@ class K8sIntegration(Worker):
         "$LOCAL_PYTHON -m clearml_agent execute {default_execution_agent_args} --id {task_id}"
     ]
 
+    DEFAULT_POD_NAME_PREFIX = "clearml-id-"
+    DEFAULT_LIMIT_POD_LABEL = "ai.allegro.agent.serial=pod-{pod_number}"
+
     _edit_hyperparams_version = "2.9"
 
     def __init__(
             self,
             k8s_pending_queue_name=None,
-            kubectl_cmd=None,
             container_bash_script=None,
             debug=False,
             ports_mode=False,
@@ -104,15 +102,14 @@ class K8sIntegration(Worker):
             extra_bash_init_script=None,
             namespace=None,
             max_pods_limit=None,
+            pod_name_prefix=None,
+            limit_pod_label=None,
             **kwargs
     ):
         """
         Initialize the k8s integration glue layer daemon
 
         :param str k8s_pending_queue_name: queue name to use when task is pending in the k8s scheduler
-        :param str|callable kubectl_cmd: kubectl command line str, supports formatting (default: KUBECTL_RUN_CMD)
-            example: "task={task_id} image={docker_image} queue_id={queue_id}"
-            or a callable function: kubectl_cmd(task_id, docker_image, docker_args, queue_id, task_data)
         :param str container_bash_script: container bash script to be executed in k8s (default: CONTAINER_BASH_SCRIPT)
             Notice this string will use format() call, if you have curly brackets they should be doubled { -> {{
             Format arguments passed: {task_id} and {extra_bash_init_cmd}
@@ -134,9 +131,10 @@ class K8sIntegration(Worker):
         :param int max_pods_limit: Maximum number of pods that K8S glue can run at the same time
         """
         super(K8sIntegration, self).__init__()
+        self.pod_name_prefix = pod_name_prefix or self.DEFAULT_POD_NAME_PREFIX
+        self.limit_pod_label = limit_pod_label or self.DEFAULT_LIMIT_POD_LABEL
         self.k8s_pending_queue_name = k8s_pending_queue_name or self.K8S_PENDING_QUEUE
         self.k8s_pending_queue_id = None
-        self.kubectl_cmd = kubectl_cmd or self.KUBECTL_RUN_CMD
         self.container_bash_script = container_bash_script or self.CONTAINER_BASH_SCRIPT
         # Always do system packages, because by we will be running inside a docker
         self._session.config.put("agent.package_manager.system_site_packages", True)
@@ -160,27 +158,9 @@ class K8sIntegration(Worker):
         self.pod_limits = []
         self.pod_requests = []
         self.max_pods_limit = max_pods_limit if not self.ports_mode else None
-        if overrides_yaml:
-            overrides = self._load_template_file(overrides_yaml)
-            if overrides:
-                containers = overrides.get('spec', {}).get('containers', [])
-                for c in containers:
-                    resources = {str(k).lower(): v for k, v in c.get('resources', {}).items()}
-                    if not resources:
-                        continue
-                    if resources.get('limits'):
-                        self.pod_limits += ['{}={}'.format(k, v) for k, v in resources['limits'].items()]
-                    if resources.get('requests'):
-                        self.pod_requests += ['{}={}'.format(k, v) for k, v in resources['requests'].items()]
-                # remove double entries
-                self.pod_limits = list(set(self.pod_limits))
-                self.pod_requests = list(set(self.pod_requests))
-                if self.pod_limits or self.pod_requests:
-                    self.log.warning('Found pod container requests={} limits={}'.format(
-                        self.pod_limits, self.pod_requests))
-                if containers:
-                    self.log.warning('Removing containers section: {}'.format(overrides['spec'].pop('containers')))
-                self.overrides_json_string = json.dumps(overrides)
+
+        self._load_overrides_yaml(overrides_yaml)
+
         if template_yaml:
             self.template_dict = self._load_template_file(template_yaml)
 
@@ -197,6 +177,31 @@ class K8sIntegration(Worker):
         self._min_cleanup_interval_per_ns_sec = 1.0
         self._last_pod_cleanup_per_ns = defaultdict(lambda: 0.)
 
+    def _load_overrides_yaml(self, overrides_yaml):
+        if not overrides_yaml:
+            return
+        overrides = self._load_template_file(overrides_yaml)
+        if not overrides:
+            return
+        containers = overrides.get('spec', {}).get('containers', [])
+        for c in containers:
+            resources = {str(k).lower(): v for k, v in c.get('resources', {}).items()}
+            if not resources:
+                continue
+            if resources.get('limits'):
+                self.pod_limits += ['{}={}'.format(k, v) for k, v in resources['limits'].items()]
+            if resources.get('requests'):
+                self.pod_requests += ['{}={}'.format(k, v) for k, v in resources['requests'].items()]
+        # remove double entries
+        self.pod_limits = list(set(self.pod_limits))
+        self.pod_requests = list(set(self.pod_requests))
+        if self.pod_limits or self.pod_requests:
+            self.log.warning('Found pod container requests={} limits={}'.format(
+                self.pod_limits, self.pod_requests))
+        if containers:
+            self.log.warning('Removing containers section: {}'.format(overrides['spec'].pop('containers')))
+        self.overrides_json_string = json.dumps(overrides)
+
     def _monitor_hanging_pods(self):
         _check_pod_thread = Thread(target=self._monitor_hanging_pods_daemon)
         _check_pod_thread.daemon = True
@@ -216,9 +221,11 @@ class K8sIntegration(Worker):
         except (IndexError, KeyError):
             return default
 
-    def _get_kubectl_options(self, command, extra_labels=None, filters=None, output="json"):
-        # type: (str, Iterable[str], Iterable[str], str) -> Dict
-        labels = [self._get_agent_label()] + (list(extra_labels) if extra_labels else [])
+    def _get_kubectl_options(self, command, extra_labels=None, filters=None, output="json", labels=None):
+        # type: (str, Iterable[str], Iterable[str], str, Iterable[str]) -> Dict
+        if not labels:
+            labels = [self._get_agent_label()]
+        labels = list(labels) + (list(extra_labels) if extra_labels else [])
         d = {
             "-l": ",".join(labels),
             "-n": str(self.namespace),
@@ -240,8 +247,7 @@ class K8sIntegration(Worker):
         while True:
             kubectl_cmd = self.get_kubectl_command("get pods", filters=["status.phase=Pending"])
             self.log.debug("Detecting hanging pods: {}".format(kubectl_cmd))
-            output = get_bash_output(kubectl_cmd)
-            output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
+            output = stringify_bash_output(get_bash_output(kubectl_cmd))
             try:
                 output_config = json.loads(output)
             except Exception as ex:
@@ -380,8 +386,7 @@ class K8sIntegration(Worker):
             output="jsonpath=\"{range .items[*]}{.metadata.name}{' '}{.metadata.namespace}{'\\n'}{end}\""
         )
        self.log.debug("Getting used pods: {}".format(kubectl_cmd))
-        output = get_bash_output(kubectl_cmd, raise_error=True)
-        output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
+        output = stringify_bash_output(get_bash_output(kubectl_cmd, raise_error=True))
 
         if not output:
             # No such pod exist so we can use the pod_number we found
@@ -492,13 +497,13 @@ class K8sIntegration(Worker):
 
                 kubectl_cmd_new = self.get_kubectl_command(
                     "get pods",
-                    extra_labels=[self.LIMIT_POD_LABEL.format(pod_number=pod_number)] if self.ports_mode else None
+                    extra_labels=[self.limit_pod_label.format(pod_number=pod_number)] if self.ports_mode else None
                 )
                 self.log.debug("Looking for a free pod/port: {}".format(kubectl_cmd_new))
                 process = subprocess.Popen(kubectl_cmd_new.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                 output, error = process.communicate()
-                output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
-                error = '' if not error else error if isinstance(error, str) else error.decode('utf-8')
+                output = stringify_bash_output(output)
+                error = stringify_bash_output(error)
 
                 try:
                     items_count = len(json.loads(output).get("items", []))
@@ -554,36 +559,38 @@ class K8sIntegration(Worker):
         labels = self._get_pod_labels(queue, queue_name)
 
         if self.ports_mode:
-            labels.append(self.LIMIT_POD_LABEL.format(pod_number=pod_number))
+            labels.append(self.limit_pod_label.format(pod_number=pod_number))
 
         if self.ports_mode:
             print("Kubernetes scheduling task id={} on pod={} (pod_count={})".format(task_id, pod_number, pod_count))
         else:
             print("Kubernetes scheduling task id={}".format(task_id))
 
-        kubectl_kwargs = dict(
-            create_clearml_conf=create_clearml_conf,
-            labels=labels,
-            docker_image=container['image'],
-            docker_args=container['arguments'],
-            docker_bash=container.get('setup_shell_script'),
-            task_id=task_id,
-            queue=queue,
-        )
-
         try:
             template = self._resolve_template(task_session, task_data, queue)
         except Exception as ex:
             print("ERROR: Failed resolving template (skipping): {}".format(ex))
             return
 
-        if template:
-            output, error = self._kubectl_apply(template=template, **kubectl_kwargs)
-        else:
-            output, error = self._kubectl_run(task_data=task_data, **kubectl_kwargs)
+        try:
+            namespace = template['metadata']['namespace'] or self.namespace
+        except (KeyError, TypeError, AttributeError):
+            namespace = self.namespace
+
+        if template:
+            output, error = self._kubectl_apply(
+                template=template,
+                pod_number=pod_number,
+                create_clearml_conf=create_clearml_conf,
+                labels=labels,
+                docker_image=container['image'],
+                docker_args=container['arguments'],
+                docker_bash=container.get('setup_shell_script'),
+                task_id=task_id,
+                queue=queue,
+                namespace=namespace,
+            )
 
-        error = '' if not error else (error if isinstance(error, str) else error.decode('utf-8'))
-        output = '' if not output else (output if isinstance(output, str) else output.decode('utf-8'))
         print('kubectl output:\n{}\n{}'.format(error, output))
         if error:
             send_log = "Running kubectl encountered an error: {}".format(error)
@@ -649,19 +656,22 @@ class K8sIntegration(Worker):
         return results
 
     def _kubectl_apply(
-            self, create_clearml_conf, docker_image, docker_args, docker_bash, labels, queue, task_id, template=None
+            self,
+            create_clearml_conf,
+            docker_image,
+            docker_args,
+            docker_bash,
+            labels,
+            queue,
+            task_id,
+            namespace,
+            template=None,
+            pod_number=None
     ):
-        template = template or deepcopy(self.template_dict)
-
-        try:
-            namespace = template['metadata']['namespace'] or self.namespace
-        except (KeyError, TypeError, AttributeError):
-            namespace = self.namespace
-
         template.setdefault('apiVersion', 'v1')
         template['kind'] = 'Pod'
         template.setdefault('metadata', {})
-        name = 'clearml-id-{task_id}'.format(task_id=task_id)
+        name = self.pod_name_prefix + str(task_id)
         template['metadata']['name'] = name
         template.setdefault('spec', {})
         template['spec'].setdefault('containers', [])
@@ -751,81 +761,34 @@ class K8sIntegration(Worker):
         finally:
             safe_remove_file(yaml_file)
 
-        return output, error
-
-    def _kubectl_run(
-            self, create_clearml_conf, docker_image, docker_args, docker_bash, labels, queue, task_data, task_id
-    ):
-        if callable(self.kubectl_cmd):
-            kubectl_cmd = self.kubectl_cmd(task_id, docker_image, docker_args, queue, task_data)
-        else:
-            kubectl_cmd = self.kubectl_cmd.format(
-                task_id=task_id,
-                docker_image=docker_image,
-                docker_args=" ".join(self._get_docker_args(
-                    docker_args, flags={"-e", "--env"}, convert=lambda env: '--env={}'.format(env))
-                ),
-                queue_id=queue,
-                namespace=self.namespace,
-            )
-        # make sure we provide a list
-        if isinstance(kubectl_cmd, str):
-            kubectl_cmd = kubectl_cmd.split()
-
-        if self.overrides_json_string:
-            kubectl_cmd += ['--overrides=' + self.overrides_json_string]
-
-        if self.pod_limits:
-            kubectl_cmd += ['--limits', ",".join(self.pod_limits)]
-        if self.pod_requests:
-            kubectl_cmd += ['--requests', ",".join(self.pod_requests)]
-
-        if self._docker_force_pull and not any(x.startswith("--image-pull-policy=") for x in kubectl_cmd):
-            kubectl_cmd += ["--image-pull-policy='always'"]
-
-        container_bash_script = [self.container_bash_script] if isinstance(self.container_bash_script, str) \
-            else self.container_bash_script
-
-        container_bash_script = [
-            line.format(
-                extra_bash_init_cmd=self.extra_bash_init_script or "",
-                extra_docker_bash_script=docker_bash or "",
-                task_id=task_id,
-                default_execution_agent_args=self.DEFAULT_EXECUTION_AGENT_ARGS,
-                agent_install_args=self.POD_AGENT_INSTALL_ARGS
-            )
-            for line in container_bash_script
-        ]
-
-        kubectl_cmd += [
-            "--labels=" + ",".join(labels),
-            "--command",
-            "--",
-            "/bin/sh",
-            "-c",
-            "{} ; {}".format(
-                " ; ".join(create_clearml_conf or []),
-                ' ; '.join(line for line in container_bash_script if line.strip())
-            ),
-        ]
-        process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-        output, error = process.communicate()
-        return output, error
+        return stringify_bash_output(output), stringify_bash_output(error)
 
     def _cleanup_old_pods(self, namespaces, extra_msg=None):
-        # type: (Iterable[str], Optional[str]) -> None
+        # type: (Iterable[str], Optional[str]) -> Dict[str, List[str]]
         self.log.debug("Cleaning up pods")
+        deleted_pods = defaultdict(list)
        for namespace in namespaces:
             if time() - self._last_pod_cleanup_per_ns[namespace] < self._min_cleanup_interval_per_ns_sec:
                 # Do not try to cleanup the same namespace too quickly
                 continue
             kubectl_cmd = self.KUBECTL_DELETE_CMD.format(namespace=namespace, agent_label=self._get_agent_label())
-            self.log.debug("Deleting old/failed pods{}: {}".format(
-                extra_msg or "",
-                kubectl_cmd
+            self.log.debug("Deleting old/failed pods{} for ns {}: {}".format(
+                extra_msg or "", namespace, kubectl_cmd
             ))
-            get_bash_output(kubectl_cmd)
-            self._last_pod_cleanup_per_ns[namespace] = time()
+            try:
+                res = get_bash_output(kubectl_cmd, raise_error=True)
+                lines = [
+                    line for line in
+                    (r.strip().rpartition("/")[-1] for r in res.splitlines())
+                    if line.startswith(self.pod_name_prefix)
+                ]
+                self.log.debug(" - deleted pod(s) %s", ", ".join(lines))
+                deleted_pods[namespace].extend(lines)
+            except Exception as ex:
+                self.log.error("Failed deleting old/failed pods for ns %s: %s", namespace, str(ex))
+            finally:
+                self._last_pod_cleanup_per_ns[namespace] = time()
+        return deleted_pods
 
     def run_tasks_loop(self, queues: List[Text], worker_params, **kwargs):
         """
diff --git a/clearml_agent/helper/process.py b/clearml_agent/helper/process.py
index 1494e4b..689910b 100644
--- a/clearml_agent/helper/process.py
+++ b/clearml_agent/helper/process.py
@@ -43,6 +43,10 @@ def get_bash_output(cmd, strip=False, stderr=subprocess.STDOUT, stdin=False, rai
     return output if not strip or not output else output.strip()
 
 
+def stringify_bash_output(value):
+    return '' if not value else (value if isinstance(value, str) else value.decode('utf-8'))
+
+
 def terminate_process(pid, timeout=10., ignore_zombie=True, include_children=False):
     # noinspection PyBroadException
     try:
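Usage note (not part of the patch above): a minimal sketch of how the new
constructor arguments might be wired up. The pod_name_prefix and
limit_pod_label arguments and the K8sIntegration import path come from the
diff; the queue name is made up, and the k8s_daemon(queue) entry point is
assumed from the standard k8s glue example script rather than from this patch.

    # sketch only -- assumes a configured clearml.conf and cluster credentials
    from clearml_agent.glue.k8s import K8sIntegration

    k8s = K8sIntegration(
        namespace="clearml",
        ports_mode=False,
        # pods are named "<pod_name_prefix><task_id>"; cleanup also uses this
        # prefix to recognize its own pods in "kubectl delete ... --output name"
        pod_name_prefix="clearml-id-",
        # only used when ports_mode=True; one serial label per pod slot
        limit_pod_label="ai.allegro.agent.serial=pod-{pod_number}",
    )

    # hypothetical queue name; the daemon polls it and applies a pod per task
    k8s.k8s_daemon("k8s_glue_queue")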
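Behavior note (not part of the patch above): with the added "--output name"
flag, "kubectl delete pods" prints one "pod/<name>" line per deleted pod, and
the reworked _cleanup_old_pods() turns those lines into its return value. A
small self-contained illustration of that parsing; the sample output string
and prefix are made up:

    # mirrors the list comprehension added in _cleanup_old_pods()
    res = "pod/clearml-id-aaa111\npod/clearml-id-bbb222\npod/unrelated-pod\n"
    pod_name_prefix = "clearml-id-"

    lines = [
        line for line in
        (r.strip().rpartition("/")[-1] for r in res.splitlines())  # drop the "pod/" resource prefix
        if line.startswith(pod_name_prefix)  # keep only pods this glue created
    ]
    print(lines)  # ['clearml-id-aaa111', 'clearml-id-bbb222']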