Support skipping re-enqueue on suspected preempted k8s pods

allegroai 2024-04-19 23:46:57 +03:00
parent 6fb48a4c6e
commit 10c6629982
2 changed files with 44 additions and 28 deletions


@@ -9,3 +9,6 @@ Script will be appended to the specified file.
ENV_DEFAULT_EXECUTION_AGENT_ARGS = EnvEntry("K8S_GLUE_DEF_EXEC_AGENT_ARGS", default="--full-monitoring --require-queue")
ENV_POD_AGENT_INSTALL_ARGS = EnvEntry("K8S_GLUE_POD_AGENT_INSTALL_ARGS", default="", lstrip=False)
ENV_POD_MONITOR_LOG_BATCH_SIZE = EnvEntry("K8S_GLUE_POD_MONITOR_LOG_BATCH_SIZE", default=5, converter=int)
ENV_POD_MONITOR_DISABLE_ENQUEUE_ON_PREEMPTION = EnvEntry(
"K8S_GLUE_POD_MONITOR_DISABLE_ENQUEUE_ON_PREEMPTION", default=False, converter=bool
)
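For context: the new setting is an opt-in boolean environment variable that defaults to False, so existing deployments keep the current re-enqueue behavior. A minimal sketch of enabling it, assuming EnvEntry's bool converter accepts "1" as truthy (illustrative only, not part of the commit):

import os

# Illustrative: enable the new flag before the k8s glue agent starts.
# The exact truthy values accepted depend on EnvEntry's bool converter; "1" is assumed here.
os.environ["K8S_GLUE_POD_MONITOR_DISABLE_ENQUEUE_ON_PREEMPTION"] = "1"

# The pod monitor then reads the flag through the EnvEntry defined above:
# ENV_POD_MONITOR_DISABLE_ENQUEUE_ON_PREEMPTION.get()  -> True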


@@ -9,6 +9,7 @@ from clearml_agent.helper.process import stringify_bash_output
from .daemon import K8sDaemon
from .utilities import get_path
from .errors import GetPodsError
from .definitions import ENV_POD_MONITOR_DISABLE_ENQUEUE_ON_PREEMPTION
class PendingPodsDaemon(K8sDaemon):
@@ -17,16 +18,16 @@ class PendingPodsDaemon(K8sDaemon):
self._polling_interval = polling_interval
self._last_tasks_msgs = {} # last msg updated for every task
def get_pods(self, pod_name=None):
def get_pods(self, pod_name=None, debug_msg="Detecting pending pods: {cmd}"):
filters = ["status.phase=Pending"]
if pod_name:
filters.append(f"metadata.name={pod_name}")
if self._agent.using_jobs:
return self._agent.get_pods_for_jobs(
job_condition="status.active=1", pod_filters=filters, debug_msg="Detecting pending pods: {cmd}"
job_condition="status.active=1", pod_filters=filters, debug_msg=debug_msg
)
return self._agent.get_pods(filters=filters, debug_msg="Detecting pending pods: {cmd}")
return self._agent.get_pods(filters=filters, debug_msg=debug_msg)
def _get_pod_name(self, pod: dict):
return get_path(pod, "metadata", "name")
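Reviewer note: the get_pods() change above only adds a debug_msg parameter whose default keeps existing call sites unchanged, while letting callers supply their own log message. A hypothetical caller (not repository code) would look like:

# Default message, as before: list all pending pods.
pending = self.get_pods()

# Custom message when re-checking a single pod, as used in the next hunk.
refreshed = self.get_pods(pod_name=pod_name, debug_msg="Refreshing pod information: {cmd}")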
@@ -72,6 +73,11 @@
if not namespace:
continue
updated_pod = self.get_pods(pod_name=pod_name, debug_msg="Refreshing pod information: {cmd}")
if not updated_pod:
continue
pod = updated_pod[0]
task_id_to_pod[task_id] = pod
msg = None
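The refresh above guards against acting on stale pod data cached from the initial listing. In sketch form, with descriptive comments (illustrative; the names are taken from the surrounding loop in the commit):

# Re-query the single pod right before processing it.
updated_pod = self.get_pods(pod_name=pod_name, debug_msg="Refreshing pod information: {cmd}")
if not updated_pod:
    # Assumed meaning: the pod is no longer Pending (it started running or was removed),
    # so the monitor skips it for this cycle.
    continue
# Keep the freshest pod object for the checks that follow.
pod = updated_pod[0]
task_id_to_pod[task_id] = pod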
@@ -190,32 +196,39 @@
if not msg or self._last_tasks_msgs.get(task_id, None) == (msg, tags):
return
try:
# Make sure the task is queued
result = self._session.send_request(
service='tasks',
action='get_all',
json={"id": task_id, "only_fields": ["status"]},
method=Request.def_method,
async_enable=False,
)
if result.ok:
status = get_path(result.json(), 'data', 'tasks', 0, 'status')
# if task is in progress, change its status to enqueued
if status == "in_progress":
result = self._session.send_request(
service='tasks', action='enqueue',
json={
"task": task_id, "force": True, "queue": self._agent.k8s_pending_queue_id
},
method=Request.def_method,
async_enable=False,
)
if not result.ok:
result_msg = get_path(result.json(), 'meta', 'result_msg')
self.log.debug(
"K8S Glue pods monitor: failed forcing task status change"
" for pending task {}: {}".format(task_id, result_msg)
if ENV_POD_MONITOR_DISABLE_ENQUEUE_ON_PREEMPTION.get():
# Skip re-enqueuing the task. Re-enqueuing is meant to re-sync the ClearML task status after
# a pod is preempted, but a pod can also stop showing as pending because of cluster
# communication lag rather than preemption. In that case the task is actually already
# running, and force-enqueuing it would effectively kill it.
pass
else:
# Make sure the task is queued
result = self._session.send_request(
service='tasks',
action='get_all',
json={"id": task_id, "only_fields": ["status"]},
method=Request.def_method,
async_enable=False,
)
if result.ok:
status = get_path(result.json(), 'data', 'tasks', 0, 'status')
# if task is in progress, change its status to enqueued
if status == "in_progress":
result = self._session.send_request(
service='tasks', action='enqueue',
json={
"task": task_id, "force": True, "queue": self._agent.k8s_pending_queue_id
},
method=Request.def_method,
async_enable=False,
)
if not result.ok:
result_msg = get_path(result.json(), 'meta', 'result_msg')
self.log.debug(
"K8S Glue pods monitor: failed forcing task status change"
" for pending task {}: {}".format(task_id, result_msg)
)
# Update task status message
payload = {"task": task_id, "status_message": "K8S glue status: {}".format(msg)}
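Condensed, the control flow added in the hunk above is roughly the following sketch (helper names are hypothetical; error handling and the status-message update are omitted):

def _maybe_requeue_pending_task(self, task_id):
    # Hypothetical helper illustrating the new gating; not the actual method in the commit.
    if ENV_POD_MONITOR_DISABLE_ENQUEUE_ON_PREEMPTION.get():
        # Opted out: never force the task back to "queued", since a pod that merely looks
        # non-pending (e.g. due to cluster communication lag) may actually be running,
        # and re-enqueuing it would effectively kill the task.
        return
    # Default behavior (unchanged): if the backend still reports the task as in_progress,
    # force-enqueue it back into the k8s pending queue so it can be rescheduled.
    status = self._get_task_status(task_id)   # hypothetical wrapper around tasks.get_all
    if status == "in_progress":
        self._force_enqueue(task_id)          # hypothetical wrapper around tasks.enqueue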