Support ignoring kubectl errors

This commit is contained in:
clearml 2024-12-12 23:41:31 +02:00
parent 26d748a4d8
commit 4fa61dde1f
2 changed files with 74 additions and 26 deletions

View File

@ -1,3 +1,5 @@
import shlex
from clearml_agent.helper.environment import EnvEntry
ENV_START_AGENT_SCRIPT_PATH = EnvEntry("CLEARML_K8S_GLUE_START_AGENT_SCRIPT_PATH", default="~/__start_agent__.sh")
@ -17,4 +19,14 @@ ENV_POD_USE_IMAGE_ENTRYPOINT = EnvEntry("K8S_GLUE_POD_USE_IMAGE_ENTRYPOINT", def
"""
Do not inject a cmd and args to the container's image when building the k8s template (depend on the built-in image
entrypoint)
"""
"""
ENV_KUBECTL_IGNORE_ERROR = EnvEntry("K8S_GLUE_IGNORE_KUBECTL_ERROR", default=None)
"""
Ignore kubectl errors matching this string pattern (allows ignoring warnings sent on stderr while
kubectl actually works and starts the pod)
"""
ENV_DEFAULT_SCHEDULER_QUEUE_TAGS = EnvEntry(
"K8S_GLUE_DEFAULT_SCHEDULER_QUEUE_TAGS", default=["k8s-glue"], converter=shlex.split
)

View File

@ -42,6 +42,8 @@ from clearml_agent.glue.definitions import (
ENV_DEFAULT_EXECUTION_AGENT_ARGS,
ENV_POD_AGENT_INSTALL_ARGS,
ENV_POD_USE_IMAGE_ENTRYPOINT,
ENV_KUBECTL_IGNORE_ERROR,
ENV_DEFAULT_SCHEDULER_QUEUE_TAGS,
)
@ -208,6 +210,10 @@ class K8sIntegration(Worker):
self._session.feature_set != "basic" and self._session.check_min_server_version("3.22.3")
)
self.ignore_kubectl_errors_re = (
re.compile(ENV_KUBECTL_IGNORE_ERROR.get()) if ENV_KUBECTL_IGNORE_ERROR.get() else None
)
@property
def agent_label(self):
return self._get_agent_label()
@ -466,13 +472,34 @@ class K8sIntegration(Worker):
queue=self.k8s_pending_queue_id,
)
res = self._session.api_client.tasks.enqueue(
task_id,
queue=self.k8s_pending_queue_id,
status_reason='k8s pending scheduler',
)
if res.meta.result_code != 200:
raise Exception(res.meta.result_msg)
for attempt in range(2):
res = self._session.send_request(
"tasks",
"enqueue",
json={
"task": task_id,
"queue": self.k8s_pending_queue_id,
"status_reason": "k8s pending scheduler",
"update_execution_queue": False,
}
)
if res.ok:
break
# noinspection PyBroadException
try:
result_subcode = res.json()["meta"]["result_subcode"]
result_msg = res.json()["meta"]["result_msg"]
except Exception:
result_subcode = None
result_msg = res.text
if attempt == 0 and res.status_code == 400 and result_subcode == 701:
# Invalid queue ID, only retry once
self._ensure_pending_queue_exists()
continue
raise Exception(result_msg)
except Exception as e:
self.log.error("ERROR: Could not push back task [{}] to k8s pending queue {} [{}], error: {}".format(
task_id, self.k8s_pending_queue_name, self.k8s_pending_queue_id, e))
@ -627,18 +654,21 @@ class K8sIntegration(Worker):
print('kubectl output:\n{}\n{}'.format(error, output))
if error:
send_log = "Running kubectl encountered an error: {}".format(error)
self.log.error(send_log)
self.send_logs(task_id, send_log.splitlines())
if self.ignore_kubectl_errors_re and self.ignore_kubectl_errors_re.match(error):
print(f"Ignoring error due to {ENV_KUBECTL_IGNORE_ERROR.key}")
else:
send_log = "Running kubectl encountered an error: {}".format(error)
self.log.error(send_log)
self.send_logs(task_id, send_log.splitlines())
# Make sure to remove the task from our k8s pending queue
self._session.api_client.queues.remove_task(
task=task_id,
queue=self.k8s_pending_queue_id,
)
# Set task as failed
session.api_client.tasks.failed(task_id, force=True)
return
# Make sure to remove the task from our k8s pending queue
self._session.api_client.queues.remove_task(
task=task_id,
queue=self.k8s_pending_queue_id,
)
# Set task as failed
session.api_client.tasks.failed(task_id, force=True)
return
if pod_name:
self.resource_applied(
@ -1089,6 +1119,18 @@ class K8sIntegration(Worker):
def check_if_suspended(self) -> bool:
pass
def _ensure_pending_queue_exists(self):
resolved_ids = self._resolve_queue_names(
[self.k8s_pending_queue_name],
create_if_missing=True,
create_system_tags=ENV_DEFAULT_SCHEDULER_QUEUE_TAGS.get()
)
if not resolved_ids:
raise ValueError(
"Failed resolving or creating k8s pending queue {}".format(self.k8s_pending_queue_name)
)
self.k8s_pending_queue_id = resolved_ids[0]
def run_tasks_loop(self, queues: List[Text], worker_params, **kwargs):
"""
:summary: Pull and run tasks from queues.
@ -1104,14 +1146,8 @@ class K8sIntegration(Worker):
events_service = self.get_service(Events)
# make sure we have a k8s pending queue
if not self.k8s_pending_queue_id:
resolved_ids = self._resolve_queue_names([self.k8s_pending_queue_name], create_if_missing=True)
if not resolved_ids:
raise ValueError(
"Failed resolving or creating k8s pending queue {}".format(self.k8s_pending_queue_name)
)
self.k8s_pending_queue_id = resolved_ids[0]
self._ensure_pending_queue_exists()
_last_machine_update_ts = 0
while True: