k8s glue: optionally ignore kubectl stderr, and re-create a missing pending queue

Summary of the changes in this patch (text before the first "diff --git" header
is commentary, not part of any hunk):

* definitions.py
  - Adds ENV_KUBECTL_IGNORE_ERROR (K8S_GLUE_IGNORE_KUBECTL_ERROR, default None).
    K8sIntegration compiles it as a regular expression and tests kubectl's error
    output with `.match()`, i.e. the pattern is anchored at the start of the text.
  - Adds ENV_DEFAULT_SCHEDULER_QUEUE_TAGS (K8S_GLUE_DEFAULT_SCHEDULER_QUEUE_TAGS,
    default ["k8s-glue"], converted with shlex.split).
* k8s.py
  - When kubectl reports an error that matches the ignore pattern, a notice is
    printed and processing continues; otherwise the task is removed from the
    pending queue and marked failed, as before.
  - Pushing a task back to the k8s pending queue now goes through
    `send_request("tasks", "enqueue", ...)` with `update_execution_queue: False`.
    On HTTP 400 with result_subcode 701 the pending queue is resolved/created
    again and the enqueue is retried exactly once.
  - The pending-queue resolve/create logic is moved into
    `_ensure_pending_queue_exists()`, which also passes the default scheduler
    queue tags as `create_system_tags`.

NOTE(review): the line structure of this patch was restored from a
whitespace-collapsed copy. Hunk line counts are consistent with the headers, but
leading indentation and blank-line placement follow convention - confirm against
the target tree before applying (or apply with whitespace tolerance).

diff --git a/clearml_agent/glue/definitions.py b/clearml_agent/glue/definitions.py
index c28d422..22f09a9 100644
--- a/clearml_agent/glue/definitions.py
+++ b/clearml_agent/glue/definitions.py
@@ -1,3 +1,5 @@
+import shlex
+
 from clearml_agent.helper.environment import EnvEntry
 
 ENV_START_AGENT_SCRIPT_PATH = EnvEntry("CLEARML_K8S_GLUE_START_AGENT_SCRIPT_PATH", default="~/__start_agent__.sh")
@@ -17,4 +19,14 @@ ENV_POD_USE_IMAGE_ENTRYPOINT = EnvEntry("K8S_GLUE_POD_USE_IMAGE_ENTRYPOINT", def
 """
 Do not inject a cmd and args to the container's image when building the k8s template (depend on the built-in image
 entrypoint)
-"""
\ No newline at end of file
+"""
+
+ENV_KUBECTL_IGNORE_ERROR = EnvEntry("K8S_GLUE_IGNORE_KUBECTL_ERROR", default=None)
+"""
+Ignore kubectl errors matching this string pattern (allows ignoring warnings sent on stderr while
+kubectl actually works and starts the pod)
+"""
+
+ENV_DEFAULT_SCHEDULER_QUEUE_TAGS = EnvEntry(
+    "K8S_GLUE_DEFAULT_SCHEDULER_QUEUE_TAGS", default=["k8s-glue"], converter=shlex.split
+)
diff --git a/clearml_agent/glue/k8s.py b/clearml_agent/glue/k8s.py
index cbf82c0..d2719f6 100644
--- a/clearml_agent/glue/k8s.py
+++ b/clearml_agent/glue/k8s.py
@@ -42,6 +42,8 @@ from clearml_agent.glue.definitions import (
     ENV_DEFAULT_EXECUTION_AGENT_ARGS,
     ENV_POD_AGENT_INSTALL_ARGS,
     ENV_POD_USE_IMAGE_ENTRYPOINT,
+    ENV_KUBECTL_IGNORE_ERROR,
+    ENV_DEFAULT_SCHEDULER_QUEUE_TAGS,
 )
 
 
@@ -208,6 +210,10 @@ class K8sIntegration(Worker):
             self._session.feature_set != "basic" and self._session.check_min_server_version("3.22.3")
         )
 
+        self.ignore_kubectl_errors_re = (
+            re.compile(ENV_KUBECTL_IGNORE_ERROR.get()) if ENV_KUBECTL_IGNORE_ERROR.get() else None
+        )
+
     @property
     def agent_label(self):
         return self._get_agent_label()
@@ -466,13 +472,34 @@ class K8sIntegration(Worker):
                 queue=self.k8s_pending_queue_id,
             )
 
-            res = self._session.api_client.tasks.enqueue(
-                task_id,
-                queue=self.k8s_pending_queue_id,
-                status_reason='k8s pending scheduler',
-            )
-            if res.meta.result_code != 200:
-                raise Exception(res.meta.result_msg)
+            for attempt in range(2):
+                res = self._session.send_request(
+                    "tasks",
+                    "enqueue",
+                    json={
+                        "task": task_id,
+                        "queue": self.k8s_pending_queue_id,
+                        "status_reason": "k8s pending scheduler",
+                        "update_execution_queue": False,
+                    }
+                )
+                if res.ok:
+                    break
+
+                # noinspection PyBroadException
+                try:
+                    result_subcode = res.json()["meta"]["result_subcode"]
+                    result_msg = res.json()["meta"]["result_msg"]
+                except Exception:
+                    result_subcode = None
+                    result_msg = res.text
+
+                if attempt == 0 and res.status_code == 400 and result_subcode == 701:
+                    # Invalid queue ID, only retry once
+                    self._ensure_pending_queue_exists()
+                    continue
+                raise Exception(result_msg)
+
         except Exception as e:
             self.log.error("ERROR: Could not push back task [{}] to k8s pending queue {} [{}], error: {}".format(
                 task_id, self.k8s_pending_queue_name, self.k8s_pending_queue_id, e))
@@ -627,18 +654,21 @@ class K8sIntegration(Worker):
         print('kubectl output:\n{}\n{}'.format(error, output))
 
         if error:
-            send_log = "Running kubectl encountered an error: {}".format(error)
-            self.log.error(send_log)
-            self.send_logs(task_id, send_log.splitlines())
+            if self.ignore_kubectl_errors_re and self.ignore_kubectl_errors_re.match(error):
+                print(f"Ignoring error due to {ENV_KUBECTL_IGNORE_ERROR.key}")
+            else:
+                send_log = "Running kubectl encountered an error: {}".format(error)
+                self.log.error(send_log)
+                self.send_logs(task_id, send_log.splitlines())
 
-            # Make sure to remove the task from our k8s pending queue
-            self._session.api_client.queues.remove_task(
-                task=task_id,
-                queue=self.k8s_pending_queue_id,
-            )
-            # Set task as failed
-            session.api_client.tasks.failed(task_id, force=True)
-            return
+                # Make sure to remove the task from our k8s pending queue
+                self._session.api_client.queues.remove_task(
+                    task=task_id,
+                    queue=self.k8s_pending_queue_id,
+                )
+                # Set task as failed
+                session.api_client.tasks.failed(task_id, force=True)
+                return
 
         if pod_name:
             self.resource_applied(
@@ -1089,6 +1119,18 @@ class K8sIntegration(Worker):
     def check_if_suspended(self) -> bool:
         pass
 
+    def _ensure_pending_queue_exists(self):
+        resolved_ids = self._resolve_queue_names(
+            [self.k8s_pending_queue_name],
+            create_if_missing=True,
+            create_system_tags=ENV_DEFAULT_SCHEDULER_QUEUE_TAGS.get()
+        )
+        if not resolved_ids:
+            raise ValueError(
+                "Failed resolving or creating k8s pending queue {}".format(self.k8s_pending_queue_name)
+            )
+        self.k8s_pending_queue_id = resolved_ids[0]
+
     def run_tasks_loop(self, queues: List[Text], worker_params, **kwargs):
         """
         :summary: Pull and run tasks from queues.
@@ -1104,14 +1146,8 @@ class K8sIntegration(Worker):
 
         events_service = self.get_service(Events)
 
-        # make sure we have a k8s pending queue
         if not self.k8s_pending_queue_id:
-            resolved_ids = self._resolve_queue_names([self.k8s_pending_queue_name], create_if_missing=True)
-            if not resolved_ids:
-                raise ValueError(
-                    "Failed resolving or creating k8s pending queue {}".format(self.k8s_pending_queue_name)
-                )
-            self.k8s_pending_queue_id = resolved_ids[0]
+            self._ensure_pending_queue_exists()
 
         _last_machine_update_ts = 0
         while True: