Refactor k8s glue

clearml 2024-12-26 18:58:00 +02:00
parent 4fa61dde1f
commit fc1abbab0b
2 changed files with 92 additions and 23 deletions

View File

@@ -638,18 +638,65 @@ class K8sIntegration(Worker):
print("ERROR: no template for task {}, skipping".format(task_id))
return
pod_name = self.pod_name_prefix + str(task_id)
self.apply_template_and_handle_result(
pod_name=pod_name,
clearml_conf_create_script=clearml_conf_create_script,
labels=labels,
queue=queue,
task_id=task_id,
namespace=namespace,
template=template,
docker_image=container['image'],
docker_args=container.get('arguments'),
docker_bash=container.get('setup_shell_script'),
session=session,
task_session=task_session,
pod_number=pod_number,
queue_name=queue_name,
task_data=task_data,
ports_mode=ports_mode,
pod_count=pod_count,
)
def apply_template_and_handle_result(
self,
pod_name,
clearml_conf_create_script: List[str],
labels,
queue,
task_id,
namespace,
template,
docker_image,
docker_args,
docker_bash,
session,
task_session,
queue_name,
task_data,
ports_mode,
pod_count,
pod_number=None,
base_spec: dict = None,
):
"""Apply the provided template with all custom settings and handle bookkeeping for the reaults"""
output, error, pod_name = self._kubectl_apply(
pod_name=pod_name,
template=template,
pod_number=pod_number,
clearml_conf_create_script=clearml_conf_create_script,
labels=labels,
docker_image=container['image'],
docker_args=container.get('arguments'),
docker_bash=container.get('setup_shell_script'),
docker_image=docker_image,
docker_args=docker_args,
docker_bash=docker_bash,
task_id=task_id,
queue=queue,
namespace=namespace,
task_token=task_session.token.encode("ascii") if task_session else None,
base_spec=base_spec,
)
print('kubectl output:\n{}\n{}'.format(error, output))
@@ -657,17 +704,9 @@ class K8sIntegration(Worker):
if self.ignore_kubectl_errors_re and self.ignore_kubectl_errors_re.match(error):
print(f"Ignoring error due to {ENV_KUBECTL_IGNORE_ERROR.key}")
else:
send_log = "Running kubectl encountered an error: {}".format(error)
self.log.error(send_log)
self.send_logs(task_id, send_log.splitlines())
# Make sure to remove the task from our k8s pending queue
self._session.api_client.queues.remove_task(
task=task_id,
queue=self.k8s_pending_queue_id,
self._set_task_failed_while_applying(
session, task_id, f"Running kubectl encountered an error: {error}"
)
# Set task as failed
session.api_client.tasks.failed(task_id, force=True)
return
if pod_name:
@@ -680,6 +719,19 @@ class K8sIntegration(Worker):
pod_number=pod_number, pod_count=pod_count, task_data=task_data
)
def _set_task_failed_while_applying(self, session, task_id: str, error: str):
send_log = "Running kubectl encountered an error: {}".format(error)
self.log.error(send_log)
self.send_logs(task_id, send_log.splitlines())
# Make sure to remove the task from our k8s pending queue
self._session.api_client.queues.remove_task(
task=task_id,
queue=self.k8s_pending_queue_id,
)
# Set task as failed
session.api_client.tasks.failed(task_id, force=True)
def set_task_info(
self, task_id: str, task_session, task_data, queue_name: str, ports_mode: bool, pod_number, pod_count
):
@@ -860,6 +912,7 @@ class K8sIntegration(Worker):
def _kubectl_apply(
self,
pod_name,
clearml_conf_create_script: List[str],
docker_image,
docker_args,
@@ -871,6 +924,7 @@ class K8sIntegration(Worker):
template,
pod_number=None,
task_token=None,
base_spec: dict = None, # base values for the spec (might be overridden)
):
if "apiVersion" not in template:
template["apiVersion"] = "batch/v1" if self.using_jobs else "v1"
@@ -885,8 +939,7 @@ class K8sIntegration(Worker):
template["kind"] = self.kind.capitalize()
metadata = template.setdefault('metadata', {})
name = self.pod_name_prefix + str(task_id)
metadata['name'] = name
metadata['name'] = pod_name
def place_labels(metadata_dict):
labels_dict = dict(pair.split('=', 1) for pair in labels)
@@ -906,13 +959,16 @@ class K8sIntegration(Worker):
spec = spec_template.setdefault('spec', {})
if base_spec:
merge_dicts(spec, base_spec)
containers = spec.setdefault('containers', [])
spec.setdefault('restartPolicy', 'Never')
task_worker_id = self.get_task_worker_id(template, task_id, name, namespace, queue)
task_worker_id = self.get_task_worker_id(template, task_id, pod_name, namespace, queue)
container = self._create_template_container(
pod_name=name,
pod_name=pod_name,
task_id=task_id,
docker_image=docker_image,
docker_args=docker_args,
@@ -958,7 +1014,7 @@ class K8sIntegration(Worker):
finally:
safe_remove_file(yaml_file)
return stringify_bash_output(output), stringify_bash_output(error), name
return stringify_bash_output(output), stringify_bash_output(error), pod_name
def _process_bash_lines_response(self, bash_cmd: str, raise_error=True):
res = get_bash_output(bash_cmd, raise_error=raise_error)
@@ -1043,7 +1099,7 @@ class K8sIntegration(Worker):
deleted_pods = defaultdict(list)
for namespace in namespaces:
if time() - self._last_pod_cleanup_per_ns[namespace] < self._min_cleanup_interval_per_ns_sec:
# Do not try to cleanup the same namespace too quickly
# Do not try to clean up the same namespace too quickly
continue
try:
@@ -1119,6 +1175,9 @@ class K8sIntegration(Worker):
def check_if_suspended(self) -> bool:
pass
def check_if_schedulable(self, queue: str) -> bool:
return True
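The new check_if_schedulable hook defaults to True and is consulted by the polling loop further down this diff, which skips a queue when it returns False. Below is a minimal sketch of a possible override in a hypothetical subclass; the import path, namespace, label-free pod listing, and pod cap are illustrative assumptions, not part of this commit:

from clearml_agent.glue.k8s import K8sIntegration        # import path assumed
from clearml_agent.glue.utilities import get_bash_output


class CapacityAwareK8sIntegration(K8sIntegration):
    """Hypothetical subclass: stop pulling tasks while the cluster is saturated."""

    max_concurrent_pods = 10       # illustrative cap
    watched_namespace = "clearml"  # illustrative namespace

    def check_if_schedulable(self, queue: str) -> bool:
        # List running pods in the watched namespace and count them in Python
        out = get_bash_output(
            "kubectl get pods -n {} --no-headers "
            "--field-selector=status.phase=Running".format(self.watched_namespace)
        )
        running = len([line for line in str(out or "").splitlines() if line.strip()])
        # Fail open: below the cap (or nothing listed) means the queue may be polled
        return running < self.max_concurrent_pods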
def _ensure_pending_queue_exists(self):
resolved_ids = self._resolve_queue_names(
[self.k8s_pending_queue_name],
@@ -1179,6 +1238,9 @@ class K8sIntegration(Worker):
sleep(self._polling_interval)
break
if not self.check_if_schedulable(queue):
continue
# get next task in queue
try:
# print(f"debug> getting tasks for queue {queue}")

View File

@@ -1,11 +1,9 @@
from time import sleep
from typing import Dict, Tuple, Optional, List
from typing import Dict, List
from clearml_agent.backend_api.session import Request
from clearml_agent.glue.utilities import get_bash_output
from clearml_agent.helper.process import stringify_bash_output
from .daemon import K8sDaemon
from .utilities import get_path
from .errors import GetPodsError
@@ -38,7 +36,11 @@ class PendingPodsDaemon(K8sDaemon):
return get_path(pod, "metadata", "name")
def _get_task_id(self, pod: dict):
return self._get_k8s_resource_name(pod).rpartition('-')[-1]
prefix, _, value = self._get_k8s_resource_name(pod).rpartition('-')
if len(value) > 4:
return value
# we assume this is a multi-node rank x (>0) pod
return prefix.rpartition('-')[-1] or value
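The reworked _get_task_id handles multi-node pods, whose names carry a short rank suffix after the task id. A standalone sketch of the same rpartition logic follows; the pod names are hypothetical examples, only the parsing mirrors the code above:

def task_id_from_pod_name(resource_name: str) -> str:
    # Same logic as PendingPodsDaemon._get_task_id in this diff
    prefix, _, value = resource_name.rpartition('-')
    if len(value) > 4:
        # long last segment => the pod name ends with the task id itself
        return value
    # short last segment => treat it as a multi-node rank suffix; the task id is one segment earlier
    return prefix.rpartition('-')[-1] or value

# Hypothetical names: a single/rank-0 pod vs. a rank-3 pod of the same multi-node task
task_id_from_pod_name("clearml-id-4f2a9c0de1b24bd6a7c3")    # -> "4f2a9c0de1b24bd6a7c3"
task_id_from_pod_name("clearml-id-4f2a9c0de1b24bd6a7c3-3")  # -> "4f2a9c0de1b24bd6a7c3"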
@staticmethod
def _get_k8s_resource_namespace(pod: dict):
@@ -239,6 +241,11 @@ class PendingPodsDaemon(K8sDaemon):
result_msg = get_path(result.json(), 'meta', 'result_msg')
raise Exception(result_msg or result.text)
self._agent.send_logs(
task_id, ["Kubernetes Pod status: {}".format(msg)],
session=self._session
)
# update last msg for this task
self._last_tasks_msgs[task_id] = msg
except Exception as ex: