diff --git a/clearml_agent/commands/worker.py b/clearml_agent/commands/worker.py
index 389bdb2..f3a21a7 100644
--- a/clearml_agent/commands/worker.py
+++ b/clearml_agent/commands/worker.py
@@ -12,6 +12,7 @@ import shlex
 import shutil
 import signal
 import string
+import socket
 import subprocess
 import sys
 import traceback
@@ -24,7 +25,7 @@ from functools import partial
 from os.path import basename
 from tempfile import mkdtemp, NamedTemporaryFile
 from time import sleep, time
-from typing import Text, Optional, Any, Tuple, List
+from typing import Text, Optional, Any, Tuple, List, Dict, Mapping, Union
 
 import attr
 import six
@@ -368,6 +369,8 @@ def get_task_container(session, task_id):
             container = result.json()['data']['tasks'][0]['container'] if result.ok else {}
             if container.get('arguments'):
                 container['arguments'] = shlex.split(str(container.get('arguments')).strip())
+            if container.get('image'):
+                container['image'] = container.get('image').strip()
         except (ValueError, TypeError):
             container = {}
     else:
@@ -635,6 +638,8 @@ class Worker(ServiceCommandSection):
     _docker_fixed_user_cache = '/clearml_agent_cache'
     _temp_cleanup_list = []
 
+    hostname_task_runtime_prop = "_exec_agent_hostname"
+
     @property
     def service(self):
         """ Worker command service endpoint """
@@ -850,6 +855,20 @@ class Worker(ServiceCommandSection):
         # "Running task '{}'".format(task_id)
         print(self._task_logging_start_message.format(task_id))
         task_session = task_session or self._session
+
+        # noinspection PyBroadException
+        try:
+            res = task_session.send_request(
+                service='tasks', action='edit', method=Request.def_method,
+                json={
+                    "task": task_id, "force": True, "runtime": {self.hostname_task_runtime_prop: socket.gethostname()}
+                },
+            )
+            if not res.ok:
+                raise Exception("failed setting runtime property")
+        except Exception as ex:
+            print("Warning: failed obtaining/setting hostname for task '{}': {}".format(task_id, ex))
+
         # set task status to in_progress so we know it was popped from the queue
         # noinspection PyBroadException
         try:
@@ -2352,6 +2371,7 @@ class Worker(ServiceCommandSection):
                 raise CommandFailedError("Cloning failed")
         else:
             # make sure this task is not stuck in an execution queue, it shouldn't have been, but just in case.
+            # noinspection PyBroadException
             try:
                 res = self._session.api_client.tasks.dequeue(task=current_task.id)
                 if require_queue and res.meta.result_code != 200:
diff --git a/clearml_agent/glue/daemon.py b/clearml_agent/glue/daemon.py
new file mode 100644
index 0000000..793ebec
--- /dev/null
+++ b/clearml_agent/glue/daemon.py
@@ -0,0 +1,15 @@
+from threading import Thread
+from clearml_agent.session import Session
+
+
+class K8sDaemon(Thread):
+
+    def __init__(self, agent):
+        super(K8sDaemon, self).__init__(target=self.target)
+        self.daemon = True
+        self._agent = agent
+        self.log = agent.log
+        self._session: Session = agent._session
+
+    def target(self):
+        pass
diff --git a/clearml_agent/glue/errors.py b/clearml_agent/glue/errors.py
new file mode 100644
index 0000000..8d42d5e
--- /dev/null
+++ b/clearml_agent/glue/errors.py
@@ -0,0 +1,12 @@
+
+class GetPodsError(Exception):
+    pass
+
+
+class GetJobsError(Exception):
+    pass
+
+
+class GetPodCountError(Exception):
+    pass
+
diff --git a/clearml_agent/glue/k8s.py b/clearml_agent/glue/k8s.py
index 929f1e9..009f351 100644
--- a/clearml_agent/glue/k8s.py
+++ b/clearml_agent/glue/k8s.py
@@ -13,7 +13,6 @@ from collections import defaultdict, namedtuple
 from copy import deepcopy
 from pathlib import Path
 from pprint import pformat
-from threading import Thread
 from time import sleep, time
 from typing import Text, List, Callable, Any, Collection, Optional, Union, Iterable, Dict, Tuple, Set
 
@@ -28,8 +27,11 @@ from clearml_agent.definitions import (
     ENV_AGENT_GIT_PASS,
     ENV_FORCE_SYSTEM_SITE_PACKAGES,
 )
-from clearml_agent.errors import APIError
+from clearml_agent.errors import APIError, UsageError
 from clearml_agent.glue.definitions import ENV_START_AGENT_SCRIPT_PATH
+from clearml_agent.glue.errors import GetPodCountError
+from clearml_agent.glue.utilities import get_path, get_bash_output
+from clearml_agent.glue.pending_pods_daemon import PendingPodsDaemon
 from clearml_agent.helper.base import safe_remove_file
 from clearml_agent.helper.dicts import merge_dicts
 from clearml_agent.helper.process import get_bash_output, stringify_bash_output
@@ -38,6 +40,7 @@ from clearml_agent.interface.base import ObjectID
 
 
 class K8sIntegration(Worker):
+    SUPPORTED_KIND = ("pod", "job")
     K8S_PENDING_QUEUE = "k8s_scheduler"
 
     K8S_DEFAULT_NAMESPACE = "clearml"
@@ -46,12 +49,6 @@ class K8sIntegration(Worker):
 
     KUBECTL_APPLY_CMD = "kubectl apply --namespace={namespace} -f"
 
-    KUBECTL_CLEANUP_DELETE_CMD = "kubectl delete pods " \
-                                 "-l={agent_label} " \
-                                 "--field-selector=status.phase!=Pending,status.phase!=Running " \
-                                 "--namespace={namespace} " \
-                                 "--output name"
-
     BASH_INSTALL_SSH_CMD = [
         "apt-get update",
         "apt-get install -y openssh-server",
@@ -136,6 +133,10 @@ class K8sIntegration(Worker):
         :param int max_pods_limit: Maximum number of pods that K8S glue can run at the same time
         """
         super(K8sIntegration, self).__init__()
+        self.kind = os.environ.get("CLEARML_K8S_GLUE_KIND", "pod").strip().lower()
+        if self.kind not in self.SUPPORTED_KIND:
+            raise UsageError(f"Kind '{self.kind}' not supported (expected {','.join(self.SUPPORTED_KIND)})")
+        self.using_jobs = self.kind == "job"
         self.pod_name_prefix = pod_name_prefix or self.DEFAULT_POD_NAME_PREFIX
         self.limit_pod_label = limit_pod_label or self.DEFAULT_LIMIT_POD_LABEL
         self.k8s_pending_queue_name = k8s_pending_queue_name or self.K8S_PENDING_QUEUE
@@ -180,11 +181,18 @@ class K8sIntegration(Worker):
 
         self._agent_label = None
 
-        self._monitor_hanging_pods()
+        self._pending_pods_daemon = self._create_pending_pods_daemon(
+            cls_=PendingPodsDaemon,
+            polling_interval=self._polling_interval
+        )
+        self._pending_pods_daemon.start()
 
         self._min_cleanup_interval_per_ns_sec = 1.0
         self._last_pod_cleanup_per_ns = defaultdict(lambda: 0.)
 
+    def _create_pending_pods_daemon(self, cls_, **kwargs):
+        return cls_(agent=self, **kwargs)
+
     def _load_overrides_yaml(self, overrides_yaml):
         if not overrides_yaml:
             return
@@ -210,26 +218,33 @@ class K8sIntegration(Worker):
             self.log.warning('Removing containers section: {}'.format(overrides['spec'].pop('containers')))
         self.overrides_json_string = json.dumps(overrides)
 
-    def _monitor_hanging_pods(self):
-        _check_pod_thread = Thread(target=self._monitor_hanging_pods_daemon)
-        _check_pod_thread.daemon = True
-        _check_pod_thread.start()
-
     @staticmethod
     def _load_template_file(path):
         with open(os.path.expandvars(os.path.expanduser(str(path))), 'rt') as f:
             return yaml.load(f, Loader=getattr(yaml, 'FullLoader', None))
 
-    def _get_kubectl_options(self, command, extra_labels=None, filters=None, output="json", labels=None):
-        # type: (str, Iterable[str], Iterable[str], str, Iterable[str]) -> Dict
-        if not labels:
+    @staticmethod
+    def _get_path(d, *path, default=None):
+        try:
+            return functools.reduce(
+                lambda a, b: a[b], path, d
+            )
+        except (IndexError, KeyError):
+            return default
+
+    def _get_kubectl_options(self, command, extra_labels=None, filters=None, output="json", labels=None, ns=None):
+        # type: (str, Iterable[str], Iterable[str], str, Iterable[str], str) -> Dict
+        if labels is False:
+            labels = []
+        elif not labels:
             labels = [self._get_agent_label()]
         labels = list(labels) + (list(extra_labels) if extra_labels else [])
         d = {
-            "-l": ",".join(labels),
-            "-n": str(self.namespace),
+            "-n": ns or str(self.namespace),
             "-o": output,
         }
+        if labels:
+            d["-l"] = ",".join(labels)
         if filters:
             d["--field-selector"] = ",".join(filters)
         return d
@@ -240,132 +255,6 @@ class K8sIntegration(Worker):
             command=command, opts=" ".join(x for item in opts.items() for x in item)
         )
 
-    def _monitor_hanging_pods_daemon(self):
-        last_tasks_msgs = {}  # last msg updated for every task
-
-        while True:
-            kubectl_cmd = self.get_kubectl_command("get pods", filters=["status.phase=Pending"])
-            self.log.debug("Detecting hanging pods: {}".format(kubectl_cmd))
-            output = stringify_bash_output(get_bash_output(kubectl_cmd))
-            try:
-                output_config = json.loads(output)
-            except Exception as ex:
-                self.log.warning('K8S Glue pods monitor: Failed parsing kubectl output:\n{}\nEx: {}'.format(output, ex))
-                sleep(self._polling_interval)
-                continue
-            pods = output_config.get('items', [])
-            task_id_to_details = dict()
-            for pod in pods:
-                pod_name = pod.get('metadata', {}).get('name', None)
-                if not pod_name:
-                    continue
-
-                task_id = pod_name.rpartition('-')[-1]
-                if not task_id:
-                    continue
-
-                namespace = pod.get('metadata', {}).get('namespace', None)
-                if not namespace:
-                    continue
-
-                task_id_to_details[task_id] = (pod_name, namespace)
-
-                msg = None
-
-                waiting = self._get_path(pod, 'status', 'containerStatuses', 0, 'state', 'waiting')
-                if not waiting:
-                    condition = self._get_path(pod, 'status', 'conditions', 0)
-                    if condition:
-                        reason = condition.get('reason')
-                        if reason == 'Unschedulable':
-                            message = condition.get('message')
-                            msg = reason + (" ({})".format(message) if message else "")
-                else:
-                    reason = waiting.get("reason", None)
-                    message = waiting.get("message", None)
-
-                    msg = reason + (" ({})".format(message) if message else "")
-
-                    if reason == 'ImagePullBackOff':
-                        delete_pod_cmd = 'kubectl delete pods {} -n {}'.format(pod_name, namespace)
-                        self.log.debug(" - deleting pod due to ImagePullBackOff: {}".format(delete_pod_cmd))
-                        get_bash_output(delete_pod_cmd)
-                        try:
-                            self.log.debug(" - Detecting hanging pods: {}".format(kubectl_cmd))
-                            self._session.api_client.tasks.failed(
-                                task=task_id,
-                                status_reason="K8S glue error: {}".format(msg),
-                                status_message="Changed by K8S glue",
-                                force=True
-                            )
-                        except Exception as ex:
-                            self.log.warning(
-                                'K8S Glue pods monitor: Failed deleting task "{}"\nEX: {}'.format(task_id, ex)
-                            )
-
-                        # clean up any msg for this task
-                        last_tasks_msgs.pop(task_id, None)
-                        continue
-                if msg and last_tasks_msgs.get(task_id, None) != msg:
-                    try:
-                        result = self._session.send_request(
-                            service='tasks',
-                            action='update',
-                            json={"task": task_id, "status_message": "K8S glue status: {}".format(msg)},
-                            method=Request.def_method,
-                            async_enable=False,
-                        )
-                        if not result.ok:
-                            result_msg = self._get_path(result.json(), 'meta', 'result_msg')
-                            raise Exception(result_msg or result.text)
-
-                        # update last msg for this task
-                        last_tasks_msgs[task_id] = msg
-                    except Exception as ex:
-                        self.log.warning(
-                            'K8S Glue pods monitor: Failed setting status message for task "{}"\nMSG: {}\nEX: {}'.format(
-                                task_id, msg, ex
-                            )
-                        )
-
-            if task_id_to_details:
-                try:
-                    result = self._session.get(
-                        service='tasks',
-                        action='get_all',
-                        json={"id": list(task_id_to_details), "status": ["stopped"], "only_fields": ["id"]},
-                        method=Request.def_method,
-                        async_enable=False,
-                    )
-                    aborted_task_ids = list(filter(None, (task.get("id") for task in result["tasks"])))
-
-                    for task_id in aborted_task_ids:
-                        pod_name, namespace = task_id_to_details.get(task_id)
-                        if not pod_name:
-                            self.log.error("Failed locating aborted task {} in pending pods list".format(task_id))
-                            continue
-                        self.log.info(
-                            "K8S Glue pods monitor: task {} was aborted by its pod {} is still pending, "
-                            "deleting pod".format(task_id, pod_name)
-                        )
-
-                        kubectl_cmd = "kubectl delete pod {pod_name} --output name {namespace}".format(
-                            namespace=f"--namespace={namespace}" if namespace else "", pod_name=pod_name,
-                        ).strip()
-                        self.log.debug("Deleting aborted task pending pod: {}".format(kubectl_cmd))
-                        output = stringify_bash_output(get_bash_output(kubectl_cmd))
-                        if not output:
-                            self.log.warning("K8S Glue pods monitor: failed deleting pod {}".format(pod_name))
-                except Exception as ex:
-                    self.log.warning(
-                        'K8S Glue pods monitor: failed checking aborted tasks for hanging pods: {}'.format(ex)
-                    )
-
-            # clean up any last message for a task that wasn't seen as a pod
-            last_tasks_msgs = {k: v for k, v in last_tasks_msgs.items() if k in task_id_to_details}
-
-            sleep(self._polling_interval)
-
     def _set_task_user_properties(self, task_id: str, task_session=None, **properties: str):
         session = task_session or self._session
         if self._edit_hyperparams_support is not True:
@@ -465,6 +354,69 @@ class K8sIntegration(Worker):
         except Exception as ex:
             print("ERROR: Failed getting tenant for task session: {}".format(ex))
 
+    def get_jobs_info(self, info_path: str, condition: str = None, namespace=None, debug_msg: str = None)\
+            -> Dict[str, str]:
+        cond = "==".join((x.strip("=") for x in condition.partition("=")[::2]))
+        output = f"jsonpath='{{range .items[?(@.{cond})]}}{{@.{info_path}}}{{\" \"}}{{@.metadata.namespace}}{{\"\\n\"}}{{end}}'"
+        kubectl_cmd = self.get_kubectl_command("get job", output=output, ns=namespace)
+        if debug_msg:
+            self.log.debug(debug_msg.format(cmd=kubectl_cmd))
+        output = stringify_bash_output(get_bash_output(kubectl_cmd))
+        output = output.strip("'")  # for Windows debugging :(
+        try:
+            data_items = dict(l.strip().partition(" ")[::2] for l in output.splitlines())
+            return data_items
+        except Exception as ex:
+            self.log.warning('Failed parsing kubectl output:\n{}\nEx: {}'.format(output, ex))
+
+    def get_pods_for_jobs(self, job_condition: str = None, pod_filters: List[str] = None, debug_msg: str = None):
+        controller_uids = self.get_jobs_info(
+            "spec.selector.matchLabels.controller-uid", condition=job_condition, debug_msg=debug_msg
+        )
+        if not controller_uids:
+            # No pods were found for these jobs
+            return []
+        pods = self.get_pods(filters=pod_filters, debug_msg=debug_msg)
+        return [
+            pod for pod in pods
+            if get_path(pod, "metadata", "labels", "controller-uid") in controller_uids
+        ]
+
+    def get_pods(self, filters: List[str] = None, debug_msg: str = None):
+        kubectl_cmd = self.get_kubectl_command(
+            "get pods",
+            filters=filters,
+            labels=False if self.using_jobs else None,
+        )
+        if debug_msg:
+            self.log.debug(debug_msg.format(cmd=kubectl_cmd))
+        output = stringify_bash_output(get_bash_output(kubectl_cmd))
+        try:
+            output_config = json.loads(output)
+        except Exception as ex:
+            self.log.warning('Failed parsing kubectl output:\n{}\nEx: {}'.format(output, ex))
+            return
+        return output_config.get('items', [])
+
+    def _get_pod_count(self, extra_labels: List[str] = None, msg: str = None):
+        kubectl_cmd_new = self.get_kubectl_command(
+            f"get {self.kind}s",
+            extra_labels=extra_labels
+        )
+        self.log.debug("{}{}".format((msg + ": ") if msg else "", kubectl_cmd_new))
+        process = subprocess.Popen(kubectl_cmd_new.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        output, error = process.communicate()
+        output = stringify_bash_output(output)
+        error = stringify_bash_output(error)
+
+        try:
+            return len(json.loads(output).get("items", []))
+        except (ValueError, TypeError) as ex:
+            self.log.warning(
+                "K8S Glue pods monitor: Failed parsing kubectl output:\n{}\nEx: {}".format(output, ex)
+            )
+            raise GetPodCountError()
+
     def run_one_task(self, queue: Text, task_id: Text, worker_args=None, task_session=None, **_):
         print('Pulling task {} launching on kubernetes cluster'.format(task_id))
         session = task_session or self._session
@@ -476,6 +428,12 @@ class K8sIntegration(Worker):
         print('Pushing task {} into temporary pending queue'.format(task_id))
         _ = session.api_client.tasks.stop(task_id, force=True)
 
+        # Just make sure to clean up in case the task is stuck in the queue (known issue)
+        self._session.api_client.queues.remove_task(
+            task=task_id,
+            queue=self.k8s_pending_queue_id,
+        )
+
         res = self._session.api_client.tasks.enqueue(
             task_id,
             queue=self.k8s_pending_queue_id,
@@ -515,14 +473,14 @@ class K8sIntegration(Worker):
 
         hocon_config_encoded = config_content.encode("ascii")
 
-        create_clearml_conf = ["echo '{}' | base64 --decode >> ~/clearml.conf".format(
+        clearml_conf_create_script = ["echo '{}' | base64 --decode >> ~/clearml.conf".format(
             base64.b64encode(
                 hocon_config_encoded
            ).decode('ascii')
         )]
 
         if task_session:
-            create_clearml_conf.append(
+            clearml_conf_create_script.append(
                 "export CLEARML_AUTH_TOKEN=$(echo '{}' | base64 --decode)".format(
                     base64.b64encode(task_session.token.encode("ascii")).decode('ascii')
                 )
@@ -543,23 +501,15 @@ class K8sIntegration(Worker):
         while self.ports_mode or self.max_pods_limit:
             pod_number = self.base_pod_num + pod_count
 
-            kubectl_cmd_new = self.get_kubectl_command(
-                "get pods",
-                extra_labels=[self.limit_pod_label.format(pod_number=pod_number)] if self.ports_mode else None
-            )
self.log.debug("Looking for a free pod/port: {}".format(kubectl_cmd_new)) - process = subprocess.Popen(kubectl_cmd_new.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) - output, error = process.communicate() - output = stringify_bash_output(output) - error = stringify_bash_output(error) - try: - items_count = len(json.loads(output).get("items", [])) - except (ValueError, TypeError) as ex: + items_count = self._get_pod_count( + extra_labels=[self.limit_pod_label.format(pod_number=pod_number)] if self.ports_mode else None, + msg="Looking for a free pod/port" + ) + except GetPodCountError: self.log.warning( - "K8S Glue pods monitor: Failed parsing kubectl output:\n{}\ntask '{}' " - "will be enqueued back to queue '{}'\nEx: {}".format( - output, task_id, queue, ex + "K8S Glue pods monitor: task '{}' will be enqueued back to queue '{}'".format( + task_id, queue ) ) session.api_client.tasks.stop(task_id, force=True) @@ -583,8 +533,6 @@ class K8sIntegration(Worker): if current_pod_count >= max_count: # All pods are taken, exit - self.log.debug( - "kubectl last result: {}\n{}".format(error, output)) self.log.warning( "All k8s services are in use, task '{}' " "will be enqueued back to queue '{}'".format( @@ -629,7 +577,7 @@ class K8sIntegration(Worker): output, error = self._kubectl_apply( template=template, pod_number=pod_number, - create_clearml_conf=create_clearml_conf, + clearml_conf_create_script=clearml_conf_create_script, labels=labels, docker_image=container['image'], docker_args=container['arguments'], @@ -639,11 +587,11 @@ class K8sIntegration(Worker): namespace=namespace, ) - print('kubectl output:\n{}\n{}'.format(error, output)) - if error: - send_log = "Running kubectl encountered an error: {}".format(error) - self.log.error(send_log) - self.send_logs(task_id, send_log.splitlines()) + print('kubectl output:\n{}\n{}'.format(error, output)) + if error: + send_log = "Running kubectl encountered an error: {}".format(error) + self.log.error(send_log) + self.send_logs(task_id, send_log.splitlines()) user_props = {"k8s-queue": str(queue_name)} if self.ports_mode: @@ -704,32 +652,10 @@ class K8sIntegration(Worker): return {target: results} if results else {} return results - def _kubectl_apply( - self, - create_clearml_conf, - docker_image, - docker_args, - docker_bash, - labels, - queue, - task_id, - namespace, - template=None, - pod_number=None - ): - template.setdefault('apiVersion', 'v1') - template['kind'] = 'Pod' - template.setdefault('metadata', {}) - name = self.pod_name_prefix + str(task_id) - template['metadata']['name'] = name - template.setdefault('spec', {}) - template['spec'].setdefault('containers', []) - template['spec'].setdefault('restartPolicy', 'Never') - if labels: - labels_dict = dict(pair.split('=', 1) for pair in labels) - template['metadata'].setdefault('labels', {}) - template['metadata']['labels'].update(labels_dict) - + def _create_template_container( + self, pod_name: str, task_id: str, docker_image: str, docker_args: List[str], + docker_bash: str, clearml_conf_create_script: List[str] + ) -> dict: container = self._get_docker_args( docker_args, target="env", @@ -753,7 +679,7 @@ class K8sIntegration(Worker): agent_install_args=self.POD_AGENT_INSTALL_ARGS) for line in container_bash_script]) - extra_bash_commands = list(create_clearml_conf or []) + extra_bash_commands = list(clearml_conf_create_script or []) start_agent_script_path = ENV_START_AGENT_SCRIPT_PATH.get() or "~/__start_agent__.sh" @@ -767,20 +693,77 @@ class K8sIntegration(Worker): ) # Notice: we 
         # Notice: we always leave with exit code 0, so pods are never restarted
-        container = self._merge_containers(
+        return self._merge_containers(
             container,
-            dict(name=name, image=docker_image,
+            dict(name=pod_name, image=docker_image,
                  command=['/bin/bash'],
                  args=['-c', '{} ; exit 0'.format(' ; '.join(extra_bash_commands))])
         )
 
-        if template['spec']['containers']:
-            template['spec']['containers'][0] = self._merge_containers(template['spec']['containers'][0], container)
+    def _kubectl_apply(
+        self,
+        clearml_conf_create_script: List[str],
+        docker_image,
+        docker_args,
+        docker_bash,
+        labels,
+        queue,
+        task_id,
+        namespace,
+        template=None,
+        pod_number=None
+    ):
+        if "apiVersion" not in template:
+            template["apiVersion"] = "batch/v1" if self.using_jobs else "v1"
+        if "kind" in template:
+            if template["kind"].lower() != self.kind:
+                return (
+                    "", f"Template kind {template['kind']} does not match kind {self.kind.capitalize()} set for agent"
+                )
         else:
-            template['spec']['containers'].append(container)
+            template["kind"] = self.kind.capitalize()
+
+        metadata = template.setdefault('metadata', {})
+        name = self.pod_name_prefix + str(task_id)
+        metadata['name'] = name
+
+        def place_labels(metadata_dict):
+            labels_dict = dict(pair.split('=', 1) for pair in labels)
+            metadata_dict.setdefault('labels', {}).update(labels_dict)
+
+        if labels:
+            # Place labels on base resource (job or single pod)
+            place_labels(metadata)
+
+        spec = template.setdefault('spec', {})
+        if self.using_jobs:
+            spec.setdefault('backoffLimit', 0)
+            spec_template = spec.setdefault('template', {})
+            if labels:
+                # Place same labels for any pod spawned by the job
+                place_labels(spec_template.setdefault('metadata', {}))
+
+            spec = spec_template.setdefault('spec', {})
+
+        containers = spec.setdefault('containers', [])
+        spec.setdefault('restartPolicy', 'Never')
+
+        container = self._create_template_container(
+            pod_name=name,
+            task_id=task_id,
+            docker_image=docker_image,
+            docker_args=docker_args,
+            docker_bash=docker_bash,
+            clearml_conf_create_script=clearml_conf_create_script
+        )
+
+        if containers:
+            containers[0] = self._merge_containers(containers[0], container)
+        else:
+            containers.append(container)
 
         if self._docker_force_pull:
-            for c in template['spec']['containers']:
+            for c in containers:
                 c.setdefault('imagePullPolicy', 'Always')
 
         fp, yaml_file = tempfile.mkstemp(prefix='clearml_k8stmpl_', suffix='.yml')
@@ -812,6 +795,83 @@ class K8sIntegration(Worker):
 
         return stringify_bash_output(output), stringify_bash_output(error)
 
+    def _process_bash_lines_response(self, bash_cmd: str, raise_error=True):
+        res = get_bash_output(bash_cmd, raise_error=raise_error)
+        lines = [
+            line for line in
+            (r.strip().rpartition("/")[-1] for r in res.splitlines())
+            if line.startswith(self.pod_name_prefix)
+        ]
+        return lines
+
+    def _delete_pods(self, selectors: List[str], namespace: str, msg: str = None) -> List[str]:
+        kubectl_cmd = \
+            "kubectl delete pod -l={agent_label} " \
+            "--namespace={namespace} --field-selector={selector} --output name".format(
+                selector=",".join(selectors),
+                agent_label=self._get_agent_label(),
+                namespace=namespace,
+            )
+        self.log.debug("Deleting old/failed pods{} for ns {}: {}".format(
+            msg or "", namespace, kubectl_cmd
+        ))
+        lines = self._process_bash_lines_response(kubectl_cmd)
+        self.log.debug(" - deleted pods %s", ", ".join(lines))
+        return lines
+
+    def _delete_jobs_by_names(self, names_to_ns: Dict[str, str], msg: str = None) -> List[str]:
+        if not names_to_ns:
+            return []
+        ns_to_names = defaultdict(list)
+        for name, ns in names_to_ns.items():
+            ns_to_names[ns].append(name)
+
+        results = []
+        for ns, names in ns_to_names.items():
+            kubectl_cmd = "kubectl delete job --namespace={ns} --output=name {names}".format(
+                ns=ns, names=" ".join(names)
+            )
+            self.log.debug("Deleting jobs {}: {}".format(
+                msg or "", kubectl_cmd
+            ))
+            lines = self._process_bash_lines_response(kubectl_cmd)
+            if not lines:
+                continue
+            self.log.debug(" - deleted jobs %s", ", ".join(lines))
+            results.extend(lines)
+        return results
+
+    def _delete_completed_or_failed_pods(self, namespace, msg: str = None):
+        if not self.using_jobs:
+            return self._delete_pods(
+                selectors=["status.phase!=Pending", "status.phase!=Running"], namespace=namespace, msg=msg
+            )
+
+        job_names_to_delete = {}
+
+        # locate failed pods for jobs
+        failed_pods = self.get_pods_for_jobs(
+            job_condition="status.active=1",
+            pod_filters=["status.phase!=Pending", "status.phase!=Running", "status.phase!=Terminating"],
+            debug_msg="Deleting failed pods: {cmd}"
+        )
+        if failed_pods:
+            job_names_to_delete = {
+                get_path(pod, "metadata", "labels", "job-name"): get_path(pod, "metadata", "namespace")
+                for pod in failed_pods
+                if get_path(pod, "metadata", "labels", "job-name")
+            }
+            self.log.debug(f" - found jobs with failed pods: {' '.join(job_names_to_delete)}")
+
+        completed_job_names = self.get_jobs_info(
+            "metadata.name", condition="status.succeeded=1", namespace=namespace, debug_msg=msg
+        )
+        if completed_job_names:
+            self.log.debug(f" - found completed jobs: {' '.join(completed_job_names)}")
+            job_names_to_delete.update(completed_job_names)
+
+        return self._delete_jobs_by_names(names_to_ns=job_names_to_delete, msg=msg)
+
     def _cleanup_old_pods(self, namespaces, extra_msg=None):
         # type: (Iterable[str], Optional[str]) -> Dict[str, List[str]]
         self.log.debug("Cleaning up pods")
@@ -820,23 +880,12 @@ class K8sIntegration(Worker):
             if time() - self._last_pod_cleanup_per_ns[namespace] < self._min_cleanup_interval_per_ns_sec:
                 # Do not try to cleanup the same namespace too quickly
                 continue
-            kubectl_cmd = self.KUBECTL_CLEANUP_DELETE_CMD.format(
-                namespace=namespace, agent_label=self._get_agent_label()
-            )
-            self.log.debug("Deleting old/failed pods{} for ns {}: {}".format(
-                extra_msg or "", namespace, kubectl_cmd
-            ))
+
             try:
-                res = get_bash_output(kubectl_cmd, raise_error=True)
-                lines = [
-                    line for line in
-                    (r.strip().rpartition("/")[-1] for r in res.splitlines())
-                    if line.startswith(self.pod_name_prefix)
-                ]
-                self.log.debug(" - deleted pod(s) %s", ", ".join(lines))
-                deleted_pods[namespace].extend(lines)
+                res = self._delete_completed_or_failed_pods(namespace, extra_msg)
+                deleted_pods[namespace].extend(res)
             except Exception as ex:
-                self.log.error("Failed deleting old/failed pods for ns %s: %s", namespace, str(ex))
+                self.log.error("Failed deleting completed/failed pods for ns %s: %s", namespace, str(ex))
             finally:
                 self._last_pod_cleanup_per_ns[namespace] = time()
 
@@ -857,7 +906,7 @@ class K8sIntegration(Worker):
             )
             tasks_to_abort = result["tasks"]
         except Exception as ex:
-            self.log.warning('Failed getting running tasks for deleted pods: {}'.format(ex))
+            self.log.warning('Failed getting running tasks for deleted {}(s): {}'.format(self.kind, ex))
 
         for task in tasks_to_abort:
             task_id = task.get("id")
@@ -870,15 +919,27 @@ class K8sIntegration(Worker):
                 self._session.get(
                     service='tasks',
                     action='dequeue',
-                    json={"task": task_id, "force": True, "status_reason": "Pod deleted (not pending or running)",
-                          "status_message": "Pod deleted by agent {}".format(self.worker_id or "unknown")},
+                    json={
+                        "task": task_id,
+                        "force": True,
+                        "status_reason": "Pod deleted (not pending or running)",
+                        "status_message": "{} deleted by agent {}".format(
+                            self.kind.capitalize(), self.worker_id or "unknown"
+                        )
+                    },
                     method=Request.def_method,
                 )
                 self._session.get(
                     service='tasks',
                     action='failed',
-                    json={"task": task_id, "force": True, "status_reason": "Pod deleted (not pending or running)",
-                          "status_message": "Pod deleted by agent {}".format(self.worker_id or "unknown")},
+                    json={
+                        "task": task_id,
+                        "force": True,
+                        "status_reason": "Pod deleted (not pending or running)",
+                        "status_message": "{} deleted by agent {}".format(
+                            self.kind.capitalize(), self.worker_id or "unknown"
+                        )
+                    },
                     method=Request.def_method,
                 )
             except Exception as ex:
@@ -919,10 +980,10 @@ class K8sIntegration(Worker):
             # check if have pod limit, then check if we hit it.
             if self.max_pods_limit:
                 if current_pods >= self.max_pods_limit:
-                    print("Maximum pod limit reached {}/{}, sleeping for {:.1f} seconds".format(
-                        current_pods, self.max_pods_limit, self._polling_interval))
+                    print("Maximum {} limit reached {}/{}, sleeping for {:.1f} seconds".format(
+                        self.kind, current_pods, self.max_pods_limit, self._polling_interval))
                     # delete old completed / failed pods
-                    self._cleanup_old_pods(namespaces, " due to pod limit")
+                    self._cleanup_old_pods(namespaces, f" due to {self.kind} limit")
                     # go to sleep
                     sleep(self._polling_interval)
                     continue
@@ -930,7 +991,7 @@ class K8sIntegration(Worker):
             # iterate over queues (priority style, queues[0] is highest)
             for queue in queues:
                 # delete old completed / failed pods
-                self._cleanup_old_pods(namespaces)
+                self._cleanup_old_pods(namespaces, extra_msg="Cleanup cycle {cmd}")
 
                 # get next task in queue
                 try:
diff --git a/clearml_agent/glue/pending_pods_daemon.py b/clearml_agent/glue/pending_pods_daemon.py
new file mode 100644
index 0000000..d6d0707
--- /dev/null
+++ b/clearml_agent/glue/pending_pods_daemon.py
@@ -0,0 +1,223 @@
+from time import sleep
+from typing import Dict, Tuple, Optional, List
+
+from clearml_agent.backend_api.session import Request
+from clearml_agent.glue.utilities import get_bash_output
+
+from clearml_agent.helper.process import stringify_bash_output
+
+from .daemon import K8sDaemon
+from .utilities import get_path
+from .errors import GetPodsError
+
+
+class PendingPodsDaemon(K8sDaemon):
+    def __init__(self, polling_interval: float, agent):
+        super(PendingPodsDaemon, self).__init__(agent=agent)
+        self._polling_interval = polling_interval
+        self._last_tasks_msgs = {}  # last msg updated for every task
+
+    def get_pods(self):
+        if self._agent.using_jobs:
+            return self._agent.get_pods_for_jobs(
+                job_condition="status.active=1",
+                pod_filters=["status.phase=Pending"],
+                debug_msg="Detecting pending pods: {cmd}"
+            )
+        return self._agent.get_pods(
+            filters=["status.phase=Pending"],
+            debug_msg="Detecting pending pods: {cmd}"
+        )
+
+    def _get_pod_name(self, pod: dict):
+        return get_path(pod, "metadata", "name")
+
+    def _get_k8s_resource_name(self, pod: dict):
+        if self._agent.using_jobs:
+            return get_path(pod, "metadata", "labels", "job-name")
+        return get_path(pod, "metadata", "name")
+
+    def _get_task_id(self, pod: dict):
+        return self._get_k8s_resource_name(pod).rpartition('-')[-1]
+
+    @staticmethod
+    def _get_k8s_resource_namespace(pod: dict):
+        return pod.get('metadata', {}).get('namespace', None)
+
+    def target(self):
+        """
+        Handle pending objects (pods or jobs, depending on the agent mode).
+          - Delete any pending objects that are not expected to recover
+          - Delete any pending objects for which the associated task was aborted
+        """
+        while True:
+            # noinspection PyBroadException
+            try:
+                # Get pods (standalone pods if we're in pods mode, or pods associated to jobs if we're in jobs mode)
+                pods = self.get_pods()
+                if pods is None:
+                    raise GetPodsError()
+
+                task_id_to_pod = dict()
+
+                for pod in pods:
+                    pod_name = self._get_pod_name(pod)
+                    if not pod_name:
+                        continue
+
+                    task_id = self._get_task_id(pod)
+                    if not task_id:
+                        continue
+
+                    namespace = self._get_k8s_resource_namespace(pod)
+                    if not namespace:
+                        continue
+
+                    task_id_to_pod[task_id] = pod
+
+                    msg = None
+                    tags = []
+
+                    waiting = get_path(pod, 'status', 'containerStatuses', 0, 'state', 'waiting')
+                    if not waiting:
+                        condition = get_path(pod, 'status', 'conditions', 0)
+                        if condition:
+                            reason = condition.get('reason')
+                            if reason == 'Unschedulable':
+                                message = condition.get('message')
+                                msg = reason + (" ({})".format(message) if message else "")
+                    else:
+                        reason = waiting.get("reason", None)
+                        message = waiting.get("message", None)
+
+                        msg = reason + (" ({})".format(message) if message else "")
+
+                        if reason == 'ImagePullBackOff':
+                            self.delete_k8s_resource(k8s_resource=pod, msg=reason)
+                            try:
+                                self._session.api_client.tasks.failed(
+                                    task=task_id,
+                                    status_reason="K8S glue error: {}".format(msg),
+                                    status_message="Changed by K8S glue",
+                                    force=True
+                                )
+                                self._agent.command.send_logs(
+                                    task_id, ["K8S Error: {}".format(msg)],
+                                    session=self._session
+                                )
+                            except Exception as ex:
+                                self.log.warning(
+                                    'K8S Glue pending monitor: Failed deleting task "{}"\nEX: {}'.format(task_id, ex)
+                                )
+
+                            # clean up any msg for this task
+                            self._last_tasks_msgs.pop(task_id, None)
+                            continue
+
+                    self._update_pending_task_msg(task_id, msg, tags)
+
+                if task_id_to_pod:
+                    self._process_tasks_for_pending_pods(task_id_to_pod)
+
+                # clean up any last message for a task that wasn't seen as a pod
+                self._last_tasks_msgs = {k: v for k, v in self._last_tasks_msgs.items() if k in task_id_to_pod}
+            except GetPodsError:
+                pass
+            except Exception:
+                self.log.exception("Hanging pods daemon loop")
+
+            sleep(self._polling_interval)
+
+    def delete_k8s_resource(self, k8s_resource: dict, msg: str = None):
+        delete_cmd = "kubectl delete {kind} {name} -n {namespace} --output name".format(
+            kind=self._agent.kind,
+            name=self._get_k8s_resource_name(k8s_resource),
+            namespace=self._get_k8s_resource_namespace(k8s_resource)
+        ).strip()
+        self.log.debug(" - deleting {} {}: {}".format(self._agent.kind, (" " + msg) if msg else "", delete_cmd))
+        return get_bash_output(delete_cmd).strip()
+
+    def _process_tasks_for_pending_pods(self, task_id_to_details: Dict[str, dict]):
+        self._handle_aborted_tasks(task_id_to_details)
+
+    def _handle_aborted_tasks(self, pending_tasks_details: Dict[str, dict]):
+        try:
+            result = self._session.get(
+                service='tasks',
+                action='get_all',
+                json={
+                    "id": list(pending_tasks_details),
+                    "status": ["stopped"],
+                    "only_fields": ["id"]
+                },
+                method=Request.def_method,
+                async_enable=False,
+            )
+            aborted_task_ids = list(filter(None, (task.get("id") for task in result["tasks"])))
+
+            for task_id in aborted_task_ids:
+                pod = pending_tasks_details.get(task_id)
+                if not pod:
+                    self.log.error("Failed locating aborted task {} in pending pods list".format(task_id))
+                    continue
+                resource_name = self._get_k8s_resource_name(pod)
+                self.log.info(
+                    "K8S Glue pending monitor: task {} was aborted but the k8s resource {} is still pending, "
+                    "deleting pod".format(task_id, resource_name)
+                )
+                output = self.delete_k8s_resource(k8s_resource=pod, msg="Pending resource of an aborted task")
+                if not output:
+                    self.log.warning("K8S Glue pending monitor: failed deleting resource {}".format(resource_name))
+        except Exception as ex:
+            self.log.warning(
+                'K8S Glue pending monitor: failed checking aborted tasks for pending resources: {}'.format(ex)
+            )
+
+    def _update_pending_task_msg(self, task_id: str, msg: str, tags: List[str] = None):
+        if not msg or self._last_tasks_msgs.get(task_id, None) == (msg, tags):
+            return
+        try:
+            # Make sure the task is queued
+            result = self._session.send_request(
+                service='tasks',
+                action='get_all',
+                json={"id": task_id, "only_fields": ["status"]},
+                method=Request.def_method,
+                async_enable=False,
+            )
+            if result.ok:
+                status = get_path(result.json(), 'data', 'tasks', 0, 'status')
+                # if task is in progress, change its status to enqueued
+                if status == "in_progress":
+                    result = self._session.send_request(
+                        service='tasks', action='enqueue',
+                        json={
+                            "task": task_id, "force": True, "queue": self._agent.k8s_pending_queue_id
+                        },
+                        method=Request.def_method,
+                        async_enable=False,
+                    )
+                    if not result.ok:
+                        result_msg = get_path(result.json(), 'meta', 'result_msg')
+                        self.log.debug(
+                            "K8S Glue pods monitor: failed forcing task status change"
+                            " for pending task {}: {}".format(task_id, result_msg)
+                        )
+
+            # Update task status message
+            payload = {"task": task_id, "status_message": "K8S glue status: {}".format(msg)}
+            if tags:
+                payload["tags"] = tags
+            result = self._session.send_request('tasks', 'update', json=payload, method=Request.def_method)
+            if not result.ok:
+                result_msg = get_path(result.json(), 'meta', 'result_msg')
+                raise Exception(result_msg or result.text)
+
+            # update last msg for this task
+            self._last_tasks_msgs[task_id] = msg
+        except Exception as ex:
+            self.log.warning(
+                'K8S Glue pods monitor: Failed setting status message for task "{}"\nMSG: {}\nEX: {}'.format(
+                    task_id, msg, ex
+                )
+            )
diff --git a/clearml_agent/glue/utilities.py b/clearml_agent/glue/utilities.py
new file mode 100644
index 0000000..178a6c2
--- /dev/null
+++ b/clearml_agent/glue/utilities.py
@@ -0,0 +1,18 @@
+import functools
+
+from subprocess import DEVNULL
+
+from clearml_agent.helper.process import get_bash_output as _get_bash_output
+
+
+def get_path(d, *path, default=None):
+    try:
+        return functools.reduce(
+            lambda a, b: a[b], path, d
+        )
+    except (IndexError, KeyError):
+        return default
+
+
+def get_bash_output(cmd, stderr=DEVNULL, raise_error=False):
+    return _get_bash_output(cmd, stderr=stderr, raise_error=raise_error)
diff --git a/clearml_agent/helper/base.py b/clearml_agent/helper/base.py
index ce786af..9600a2d 100644
--- a/clearml_agent/helper/base.py
+++ b/clearml_agent/helper/base.py
@@ -20,20 +20,22 @@ from typing import Text, Dict, Any, Optional, AnyStr, IO, Union
 
 import attr
 import furl
+import six
 import yaml
 from attr import fields_dict
 from pathlib2 import Path
-
-import six
 from six.moves import reduce
 
-from clearml_agent.external import pyhocon
+
 from clearml_agent.errors import CommandFailedError
+from clearml_agent.external import pyhocon
 from clearml_agent.helper.dicts import filter_keys
 
 pretty_lines = False
 
 log = logging.getLogger(__name__)
 
+use_powershell = os.getenv("CLEARML_AGENT_USE_POWERSHELL", None)
+
 
 def which(cmd, path=None):
     result = find_executable(cmd, path)
@@ -52,7 +54,7 @@ def select_for_platform(linux, windows):
 
 
 def bash_c():
-    return 'bash -c' if not is_windows_platform() else 'cmd /c'
+    return 'bash -c' if not is_windows_platform() else ('powershell -Command' if use_powershell else 'cmd /c')
 
 
 def return_list(arg):
diff --git a/clearml_agent/version.py b/clearml_agent/version.py
index 76a03c4..33ae61d 100644
--- a/clearml_agent/version.py
+++ b/clearml_agent/version.py
@@ -1 +1 @@
-__version__ = '1.5.3rc3'
+__version__ = '1.5.3rc4'