Filter by phase when detecting hanging pods

More debug print-outs
Use task session when possible
Push task into k8s scheduler queue only if running from the same tenant
Make sure we pass git_user/pass to the task pod
Fix cleanup command not being issued when no pods exist in a multi-queue setup
allegroai 2022-12-05 11:29:59 +02:00
parent 53d379205f
commit 3dd5973734


@@ -9,26 +9,27 @@ import os
import re
import subprocess
import tempfile
from collections import defaultdict
from copy import deepcopy
from pathlib import Path
from pprint import pformat
from threading import Thread
from time import sleep
from typing import Text, List, Callable, Any, Collection, Optional, Union
from time import sleep, time
from typing import Text, List, Callable, Any, Collection, Optional, Union, Iterable, Dict, Tuple, Set
import yaml
from clearml_agent.backend_api.session import Request
from clearml_agent.commands.events import Events
from clearml_agent.commands.worker import Worker, get_task_container, set_task_container, get_next_task
from clearml_agent.definitions import ENV_DOCKER_IMAGE
from clearml_agent.definitions import ENV_DOCKER_IMAGE, ENV_AGENT_GIT_USER, ENV_AGENT_GIT_PASS
from clearml_agent.errors import APIError
from clearml_agent.glue.definitions import ENV_START_AGENT_SCRIPT_PATH
from clearml_agent.helper.base import safe_remove_file
from clearml_agent.helper.dicts import merge_dicts
from clearml_agent.helper.process import get_bash_output
from clearml_agent.helper.resource_monitor import ResourceMonitor
from clearml_agent.interface.base import ObjectID
from clearml_agent.backend_api.session import Request
from clearml_agent.glue.definitions import ENV_START_AGENT_SCRIPT_PATH
class K8sIntegration(Worker):
@@ -46,7 +47,7 @@ class K8sIntegration(Worker):
"--namespace={namespace}"
KUBECTL_DELETE_CMD = "kubectl delete pods " \
"--selector={selector} " \
"-l={agent_label} " \
"--field-selector=status.phase!=Pending,status.phase!=Running " \
"--namespace={namespace}"
@@ -65,6 +66,9 @@ class K8sIntegration(Worker):
'echo "ldconfig" >> /etc/profile',
"/usr/sbin/sshd -p {port}"]
DEFAULT_EXECUTION_AGENT_ARGS = os.getenv("K8S_GLUE_DEF_EXEC_AGENT_ARGS", "--full-monitoring --require-queue")
POD_AGENT_INSTALL_ARGS = os.getenv("K8S_GLUE_POD_AGENT_INSTALL_ARGS", "")
CONTAINER_BASH_SCRIPT = [
"export DEBIAN_FRONTEND='noninteractive'",
"echo 'Binary::apt::APT::Keep-Downloaded-Packages \"true\";' > /etc/apt/apt.conf.d/docker-clean",
@@ -77,9 +81,9 @@ class K8sIntegration(Worker):
"[ ! -z $LOCAL_PYTHON ] || apt-get install -y python3-pip",
"[ ! -z $LOCAL_PYTHON ] || export LOCAL_PYTHON=python3",
"{extra_bash_init_cmd}",
"$LOCAL_PYTHON -m pip install clearml-agent",
"$LOCAL_PYTHON -m pip install clearml-agent{agent_install_args}",
"{extra_docker_bash_script}",
"$LOCAL_PYTHON -m clearml_agent execute --full-monitoring --require-queue --id {task_id}"
"$LOCAL_PYTHON -m clearml_agent execute {default_execution_agent_args} --id {task_id}"
]
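Both the agent's execution arguments and the pip install arguments are now template fields with environment-variable overrides (K8S_GLUE_DEF_EXEC_AGENT_ARGS and K8S_GLUE_POD_AGENT_INSTALL_ARGS). A small sketch of the substitution; the version pin is hypothetical, and the real values are read once at import time rather than per call:

    import os

    os.environ["K8S_GLUE_POD_AGENT_INSTALL_ARGS"] = "==1.4.1"  # hypothetical pin

    agent_install_args = os.getenv("K8S_GLUE_POD_AGENT_INSTALL_ARGS", "")
    line = "$LOCAL_PYTHON -m pip install clearml-agent{agent_install_args}".format(
        agent_install_args=agent_install_args
    )
    print(line)  # $LOCAL_PYTHON -m pip install clearml-agent==1.4.1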
_edit_hyperparams_version = "2.9"
@@ -190,6 +194,9 @@ class K8sIntegration(Worker):
self._monitor_hanging_pods()
self._min_cleanup_interval_per_ns_sec = 1.0
self._last_pod_cleanup_per_ns = defaultdict(lambda: 0.)
def _monitor_hanging_pods(self):
_check_pod_thread = Thread(target=self._monitor_hanging_pods_daemon)
_check_pod_thread.daemon = True
@@ -209,16 +216,20 @@ class K8sIntegration(Worker):
except (IndexError, KeyError):
return default
def _get_kubectl_options(self, command, extra_labels=None):
def _get_kubectl_options(self, command, extra_labels=None, filters=None, output="json"):
# type: (str, Iterable[str], Iterable[str], str) -> Dict
labels = [self._get_agent_label()] + (list(extra_labels) if extra_labels else [])
return {
d = {
"-l": ",".join(labels),
"-n": str(self.namespace),
"-o": "json"
"-o": output,
}
if filters:
d["--field-selector"] = ",".join(filters)
return d
def get_kubectl_command(self, command, extra_labels=None):
opts = self._get_kubectl_options(command, extra_labels)
def get_kubectl_command(self, command, output="json", **args):
opts = self._get_kubectl_options(command, output=output, **args)
return 'kubectl {command} {opts}'.format(
command=command, opts=" ".join(x for item in opts.items() for x in item)
)
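_get_kubectl_options() now also accepts field-selector filters and an output format, and get_kubectl_command() forwards both. A standalone sketch of how the Pending-pods command below is assembled (label and namespace are hypothetical):

    opts = {
        "-l": "clearml-agent=wid-1234",   # hypothetical agent label
        "-n": "clearml",                  # hypothetical namespace
        "-o": "json",
        "--field-selector": "status.phase=Pending",
    }
    cmd = "kubectl {command} {opts}".format(
        command="get pods",
        opts=" ".join(x for item in opts.items() for x in item),
    )
    print(cmd)
    # kubectl get pods -l clearml-agent=wid-1234 -n clearml -o json --field-selector status.phase=Pending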
@@ -227,7 +238,7 @@ class K8sIntegration(Worker):
last_tasks_msgs = {} # last msg updated for every task
while True:
kubectl_cmd = self.get_kubectl_command("get pods")
kubectl_cmd = self.get_kubectl_command("get pods", filters=["status.phase=Pending"])
self.log.debug("Detecting hanging pods: {}".format(kubectl_cmd))
output = get_bash_output(kubectl_cmd)
output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
@@ -240,9 +251,6 @@ class K8sIntegration(Worker):
pods = output_config.get('items', [])
task_ids = set()
for pod in pods:
if self._get_path(pod, 'status', 'phase') != "Pending":
continue
pod_name = pod.get('metadata', {}).get('name', None)
if not pod_name:
continue
@@ -275,8 +283,10 @@ class K8sIntegration(Worker):
if reason == 'ImagePullBackOff':
delete_pod_cmd = 'kubectl delete pods {} -n {}'.format(pod_name, namespace)
self.log.debug(" - deleting pod due to ImagePullBackOff: {}".format(delete_pod_cmd))
get_bash_output(delete_pod_cmd)
try:
self.log.debug(" - Detecting hanging pods: {}".format(kubectl_cmd))
self._session.api_client.tasks.failed(
task=task_id,
status_reason="K8S glue error: {}".format(msg),
@@ -308,8 +318,8 @@ class K8sIntegration(Worker):
last_tasks_msgs[task_id] = msg
except Exception as ex:
self.log.warning(
'K8S Glue pods monitor: Failed setting status message for task "{}"\nEX: {}'.format(
task_id, ex
'K8S Glue pods monitor: Failed setting status message for task "{}"\nMSG: {}\nEX: {}'.format(
task_id, msg, ex
)
)
@@ -318,7 +328,8 @@ class K8sIntegration(Worker):
sleep(self._polling_interval)
def _set_task_user_properties(self, task_id: str, **properties: str):
def _set_task_user_properties(self, task_id: str, task_session=None, **properties: str):
session = task_session or self._session
if self._edit_hyperparams_support is not True:
# either not supported or never tested
if self._edit_hyperparams_support == self._session.api_version:
@@ -329,7 +340,7 @@ class K8sIntegration(Worker):
self._edit_hyperparams_support = self._session.api_version
return
try:
self._session.get(
session.get(
service="tasks",
action="edit_hyper_params",
task=task_id,
@@ -361,67 +372,94 @@ class K8sIntegration(Worker):
return self._agent_label
def _get_used_pods(self):
# type: () -> Tuple[int, Set[str]]
# noinspection PyBroadException
try:
kubectl_cmd_new = self.get_kubectl_command("get pods")
self.log.debug("Getting used pods: {}".format(kubectl_cmd_new))
process = subprocess.Popen(kubectl_cmd_new.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate()
kubectl_cmd = self.get_kubectl_command(
"get pods",
output="jsonpath=\"{range .items[*]}{.metadata.name}{' '}{.metadata.namespace}{'\\n'}{end}\""
)
self.log.debug("Getting used pods: {}".format(kubectl_cmd))
output = get_bash_output(kubectl_cmd, raise_error=True)
output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
error = '' if not error else error if isinstance(error, str) else error.decode('utf-8')
if not output:
# No such pods exist, so we can use the pod_number we found
return 0, {}
return 0, set([])
try:
items = json.loads(output).get("items", [])
items = output.splitlines()
current_pod_count = len(items)
namespaces = {item["metadata"]["namespace"] for item in items}
namespaces = {item.rpartition(" ")[-1] for item in items}
self.log.debug(" - found {} pods in namespaces {}".format(current_pod_count, ", ".join(namespaces)))
except (KeyError, ValueError, TypeError, AttributeError) as ex:
print("Failed parsing used pods command response for cleanup: {}".format(ex))
return -1, {}
return -1, set([])
return current_pod_count, namespaces
except Exception as ex:
print('Failed obtaining used pods information: {}'.format(ex))
return -2, {}
return -2, set([])
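_get_used_pods() no longer parses the full JSON dump; it asks kubectl for a jsonpath listing of `<name> <namespace>` pairs, one pod per line, and recovers the namespaces with rpartition(). A sketch over assumed output:

    output = (
        "clearml-id-abc123 clearml\n"
        "clearml-id-def456 tenant-a\n"
    )  # assumed kubectl jsonpath output
    items = output.splitlines()
    current_pod_count = len(items)                             # 2
    namespaces = {item.rpartition(" ")[-1] for item in items}
    print(current_pod_count, namespaces)                       # 2 {'clearml', 'tenant-a'}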
def _is_same_tenant(self, task_session):
if not task_session or task_session is self._session:
return True
# noinspection PyBroadException
try:
tenant = self._session.get_decoded_token(self._session.token, verify=False)["tenant"]
task_tenant = task_session.get_decoded_token(task_session.token, verify=False)["tenant"]
return tenant == task_tenant
except Exception as ex:
print("ERROR: Failed getting tenant for task session: {}".format(ex))
def run_one_task(self, queue: Text, task_id: Text, worker_args=None, task_session=None, **_):
print('Pulling task {} launching on kubernetes cluster'.format(task_id))
task_data = self._session.api_client.tasks.get_all(id=[task_id])[0]
session = task_session or self._session
task_data = session.api_client.tasks.get_all(id=[task_id])[0]
# push task into the k8s queue, so we have visibility on pending tasks in the k8s scheduler
try:
print('Pushing task {} into temporary pending queue'.format(task_id))
_ = self._session.api_client.tasks.stop(task_id, force=True)
res = self._session.api_client.tasks.enqueue(
task_id,
queue=self.k8s_pending_queue_id,
status_reason='k8s pending scheduler',
)
if res.meta.result_code != 200:
raise Exception(res.meta.result_msg)
except Exception as e:
self.log.error("ERROR: Could not push back task [{}] to k8s pending queue {} [{}], error: {}".format(
task_id, self.k8s_pending_queue_name, self.k8s_pending_queue_id, e))
return
if self._is_same_tenant(task_session):
try:
print('Pushing task {} into temporary pending queue'.format(task_id))
_ = session.api_client.tasks.stop(task_id, force=True)
container = get_task_container(self._session, task_id)
res = self._session.api_client.tasks.enqueue(
task_id,
queue=self.k8s_pending_queue_id,
status_reason='k8s pending scheduler',
)
if res.meta.result_code != 200:
raise Exception(res.meta.result_msg)
except Exception as e:
self.log.error("ERROR: Could not push back task [{}] to k8s pending queue {} [{}], error: {}".format(
task_id, self.k8s_pending_queue_name, self.k8s_pending_queue_id, e))
return
container = get_task_container(session, task_id)
if not container.get('image'):
container['image'] = str(
ENV_DOCKER_IMAGE.get() or self._session.config.get("agent.default_docker.image", "nvidia/cuda")
ENV_DOCKER_IMAGE.get() or session.config.get("agent.default_docker.image", "nvidia/cuda")
)
container['arguments'] = self._session.config.get("agent.default_docker.arguments", None)
container['arguments'] = session.config.get("agent.default_docker.arguments", None)
set_task_container(
self._session, task_id, docker_image=container['image'], docker_arguments=container['arguments']
session, task_id, docker_image=container['image'], docker_arguments=container['arguments']
)
# get the clearml.conf encoded file, make sure we use system packages!
git_user = ENV_AGENT_GIT_USER.get() or self._session.config.get("agent.git_user", None)
git_pass = ENV_AGENT_GIT_PASS.get() or self._session.config.get("agent.git_pass", None)
extra_config_values = [
'agent.package_manager.system_site_packages: true',
'agent.git_user: "{}"'.format(git_user) if git_user else '',
'agent.git_pass: "{}"'.format(git_pass) if git_pass else '',
]
# noinspection PyProtectedMember
config_content = (
self.conf_file_content or Path(self._session._config_file).read_text() or ""
) + '\nagent.package_manager.system_site_packages=true\n'
self.conf_file_content or Path(session._config_file).read_text() or ""
) + '\n{}\n'.format('\n'.join(x for x in extra_config_values if x))
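Besides forcing system site-packages, the configuration appended to the pod's ~/clearml.conf now also carries the git credentials resolved from ENV_AGENT_GIT_USER / ENV_AGENT_GIT_PASS or the agent.git_user / agent.git_pass settings, so the task pod can clone private repositories. A sketch of the appended block with placeholder credentials:

    git_user, git_pass = "ci-bot", "s3cr3t"  # placeholders
    extra_config_values = [
        'agent.package_manager.system_site_packages: true',
        'agent.git_user: "{}"'.format(git_user) if git_user else '',
        'agent.git_pass: "{}"'.format(git_pass) if git_pass else '',
    ]
    print('\n'.join(x for x in extra_config_values if x))
    # agent.package_manager.system_site_packages: true
    # agent.git_user: "ci-bot"
    # agent.git_pass: "s3cr3t"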
hocon_config_encoded = config_content.encode("ascii")
create_clearml_conf = ["echo '{}' | base64 --decode >> ~/clearml.conf".format(
@@ -471,8 +509,12 @@ class K8sIntegration(Worker):
output, task_id, queue, ex
)
)
self._session.api_client.tasks.stop(task_id, force=True)
self._session.api_client.tasks.enqueue(task_id, queue=queue, status_reason='kubectl parsing error')
session.api_client.tasks.stop(task_id, force=True)
# noinspection PyBroadException
try:
self._session.api_client.tasks.enqueue(task_id, queue=queue, status_reason='kubectl parsing error')
except:
self.log.warning("Failed enqueuing task to queue '{}'".format(queue))
return
if not items_count:
@@ -496,9 +538,14 @@ class K8sIntegration(Worker):
task_id, queue
)
)
self._session.api_client.tasks.stop(task_id, force=True)
self._session.api_client.tasks.enqueue(
task_id, queue=queue, status_reason='k8s max pod limit (no free k8s service)')
session.api_client.tasks.stop(task_id, force=True)
# noinspection PyBroadException
try:
self._session.api_client.tasks.enqueue(
task_id, queue=queue, status_reason='k8s max pod limit (no free k8s service)'
)
except:
self.log.warning("Failed enqueuing task to queue '{}'".format(queue))
return
elif self.max_pods_limit:
# max pods limit hasn't reached yet, so we can create the pod
@@ -521,7 +568,7 @@ class K8sIntegration(Worker):
docker_args=container['arguments'],
docker_bash=container.get('setup_shell_script'),
task_id=task_id,
queue=queue
queue=queue,
)
try:
@@ -564,6 +611,7 @@ class K8sIntegration(Worker):
if user_props:
self._set_task_user_properties(
task_id=task_id,
task_session=task_session,
**user_props
)
@@ -641,7 +689,9 @@ class K8sIntegration(Worker):
['#!/bin/bash', ] +
[line.format(extra_bash_init_cmd=self.extra_bash_init_script or '',
task_id=task_id,
extra_docker_bash_script=extra_docker_bash_script)
extra_docker_bash_script=extra_docker_bash_script,
default_execution_agent_args=self.DEFAULT_EXECUTION_AGENT_ARGS,
agent_install_args=self.POD_AGENT_INSTALL_ARGS)
for line in container_bash_script])
extra_bash_commands = list(create_clearml_conf or [])
@@ -735,7 +785,17 @@ class K8sIntegration(Worker):
container_bash_script = [self.container_bash_script] if isinstance(self.container_bash_script, str) \
else self.container_bash_script
container_bash_script = ' ; '.join(container_bash_script)
container_bash_script = [
line.format(
extra_bash_init_cmd=self.extra_bash_init_script or "",
extra_docker_bash_script=docker_bash or "",
task_id=task_id,
default_execution_agent_args=self.DEFAULT_EXECUTION_AGENT_ARGS,
agent_install_args=self.POD_AGENT_INSTALL_ARGS
)
for line in container_bash_script
]
kubectl_cmd += [
"--labels=" + ",".join(labels),
@@ -743,16 +803,30 @@ class K8sIntegration(Worker):
"--",
"/bin/sh",
"-c",
"{} ; {}".format(" ; ".join(create_clearml_conf or []), container_bash_script.format(
extra_bash_init_cmd=self.extra_bash_init_script or "",
extra_docker_bash_script=docker_bash or "",
task_id=task_id
)),
"{} ; {}".format(
" ; ".join(create_clearml_conf or []),
' ; '.join(line for line in container_bash_script if line.strip())
),
]
process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate()
return output, error
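Each template line is now formatted on its own and empty lines are dropped before joining, so an unset {extra_bash_init_cmd} or {extra_docker_bash_script} no longer leaves a dangling " ; " in the pod command. A tiny sketch with a hypothetical two-line template:

    template = ["{extra_bash_init_cmd}", "echo running task {task_id}"]
    lines = [line.format(extra_bash_init_cmd="", task_id="abc123") for line in template]
    print(" ; ".join(line for line in lines if line.strip()))
    # echo running task abc123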
def _cleanup_old_pods(self, namespaces, extra_msg=None):
# type: (Iterable[str], Optional[str]) -> None
self.log.debug("Cleaning up pods")
for namespace in namespaces:
if time() - self._last_pod_cleanup_per_ns[namespace] < self._min_cleanup_interval_per_ns_sec:
# Do not try to cleanup the same namespace too quickly
continue
kubectl_cmd = self.KUBECTL_DELETE_CMD.format(namespace=namespace, agent_label=self._get_agent_label())
self.log.debug("Deleting old/failed pods{}: {}".format(
extra_msg or "",
kubectl_cmd
))
get_bash_output(kubectl_cmd)
self._last_pod_cleanup_per_ns[namespace] = time()
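_cleanup_old_pods() centralizes the delete loop and throttles it with a per-namespace timestamp, so several queues sharing a namespace cannot hammer kubectl; together with adding the base namespace as a fallback below, the delete command is still issued when no pods were found. A stripped-down sketch of the throttle:

    from collections import defaultdict
    from time import sleep, time

    MIN_INTERVAL_SEC = 1.0
    last_cleanup = defaultdict(lambda: 0.)

    def cleanup(namespaces):
        for namespace in namespaces:
            if time() - last_cleanup[namespace] < MIN_INTERVAL_SEC:
                continue  # this namespace was cleaned too recently
            print("deleting old/failed pods in", namespace)
            last_cleanup[namespace] = time()

    cleanup({"clearml"})  # issues the delete
    cleanup({"clearml"})  # skipped, inside the 1s window
    sleep(1.0)
    cleanup({"clearml"})  # issues the delete again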
def run_tasks_loop(self, queues: List[Text], worker_params, **kwargs):
"""
:summary: Pull and run tasks from queues.
@@ -780,16 +854,16 @@ class K8sIntegration(Worker):
# Get used pods and namespaces
current_pods, namespaces = self._get_used_pods()
# just in case there are no pods, make sure we look at our base namespace
namespaces.add(self.namespace)
# check if we have a pod limit, then check if we hit it.
if self.max_pods_limit:
if current_pods >= self.max_pods_limit:
print("Maximum pod limit reached {}/{}, sleeping for {:.1f} seconds".format(
current_pods, self.max_pods_limit, self._polling_interval))
# delete old completed / failed pods
for namespace in namespaces:
kubectl_cmd = self.KUBECTL_DELETE_CMD.format(namespace=namespace, selector=self._get_agent_label())
self.log.debug("Deleting old/failed pods due to pod limit: {}".format(kubectl_cmd))
get_bash_output(kubectl_cmd)
self._cleanup_old_pods(namespaces, " due to pod limit")
# go to sleep
sleep(self._polling_interval)
continue
@@ -797,10 +871,7 @@ class K8sIntegration(Worker):
# iterate over queues (priority style, queues[0] is highest)
for queue in queues:
# delete old completed / failed pods
for namespace in namespaces:
kubectl_cmd = self.KUBECTL_DELETE_CMD.format(namespace=namespace, selector=self._get_agent_label())
self.log.debug("Deleting old/failed pods: {}".format(kubectl_cmd))
get_bash_output(kubectl_cmd)
self._cleanup_old_pods(namespaces)
# get next task in queue
try: