Add k8s glue update task status_message in hanging pods daemon

Fix k8s glue not throwing error when failing to push to queue
allegroai 2021-08-02 22:59:31 +03:00
parent 5ed47d2d2c
commit cd046927f3
4 changed files with 161 additions and 20 deletions

View File

@@ -2,14 +2,15 @@ import json as json_lib
 import sys
 import types
 from socket import gethostname
-from six.moves.urllib.parse import urlparse, urlunparse
 
 import jwt
 import requests
 import six
-from pyhocon import ConfigTree
+from pyhocon import ConfigTree, ConfigFactory
 from requests.auth import HTTPBasicAuth
+from six.moves.urllib.parse import urlparse, urlunparse
 
+from clearml_agent.definitions import ENV_DISABLE_VAULT_SUPPORT
 from .callresult import CallResult
 from .defs import ENV_VERBOSE, ENV_HOST, ENV_ACCESS_KEY, ENV_SECRET_KEY, ENV_WEB_HOST, ENV_FILES_HOST, ENV_AUTH_TOKEN, \
     ENV_NO_DEFAULT_SERVER
@@ -186,6 +187,40 @@ class Session(TokenManager):
             # notice: this is across the board warning omission
             urllib_log_warning_setup(total_retries=http_retries_config.get('total', 0), display_warning_after=3)
 
+        self._load_vaults()
+
+    def _load_vaults(self):
+        if not self.check_min_api_version("2.15") or self.feature_set == "basic":
+            return
+
+        if ENV_DISABLE_VAULT_SUPPORT.get():
+            print("Vault support is disabled")
+            return
+
+        def parse(vault):
+            # noinspection PyBroadException
+            try:
+                d = vault.get('data', None)
+                if d:
+                    r = ConfigFactory.parse_string(d)
+                    if isinstance(r, (ConfigTree, dict)):
+                        return r
+            except Exception as e:
+                print("Failed parsing vault {}: {}".format(vault.get("description", "<unknown>"), e))
+
+        # noinspection PyBroadException
+        try:
+            res = self.send_request("users", "get_vaults", json={"enabled": True, "types": ["config"]})
+            if res.ok:
+                vaults = res.json().get("data", {}).get("vaults", [])
+                data = list(filter(None, map(parse, vaults)))
+                if data:
+                    self.config.set_overrides(*data)
+            elif res.status_code != 404:
+                raise Exception(res.json().get("meta", {}).get("result_msg", res.text))
+        except Exception as ex:
+            print("Failed getting vaults: {}".format(ex))
+
     def _send_request(
         self,
         service,
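Note: each vault returned by `users.get_vaults` carries its configuration as a HOCON string in its `data` field; `parse` turns that into a ConfigTree which `config.set_overrides` then layers on top of the loaded configuration. A minimal standalone sketch of the parsing step (the vault contents below are illustrative, not a real server response):

from pyhocon import ConfigFactory, ConfigTree

# Illustrative vault entry; only the "data" payload matters to parse()
vault = {
    "description": "team defaults",
    "data": 'agent { default_docker { image: "nvidia/cuda:10.1-runtime-ubuntu18.04" } }',
}

overrides = ConfigFactory.parse_string(vault["data"])
assert isinstance(overrides, (ConfigTree, dict))
print(overrides.get("agent.default_docker.image"))  # nvidia/cuda:10.1-runtime-ubuntu18.04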

View File

@@ -4,15 +4,13 @@ import functools
 import json
 import os
 import sys
-import warnings
-from fnmatch import fnmatch
 from os.path import expanduser
 from typing import Any
 
 import pyhocon
 import six
 from pathlib2 import Path
-from pyhocon import ConfigTree
+from pyhocon import ConfigTree, ConfigFactory
 from pyparsing import (
     ParseFatalException,
     ParseException,
@@ -71,6 +69,10 @@ class Config(object):
     # used in place of None in Config.get as default value because None is a valid value
     _MISSING = object()
 
+    extra_config_values_env_key_sep = "__"
+    extra_config_values_env_key_prefix = [
+        "CLEARML_AGENT" + extra_config_values_env_key_sep,
+    ]
 
     def __init__(
         self,
@@ -80,7 +82,7 @@
         relative_to=None,
         app=None,
         is_server=False,
-        **_
+        **_,
     ):
         self._app = app
         self._verbose = verbose
@@ -90,6 +92,7 @@
         self._env = env or os.environ.get("TRAINS_ENV", Environment.default)
         self.config_paths = set()
         self.is_server = is_server
+        self._overrides_configs = None
 
         if self._verbose:
             print("Config env:%s" % str(self._env))
@@ -100,6 +103,7 @@
             )
         if self._env not in get_options(Environment):
             raise ValueError("Invalid environment %s" % env)
+
         if relative_to is not None:
             self.load_relative_to(relative_to)
@@ -158,7 +162,9 @@
         if LOCAL_CONFIG_PATHS:
             config = functools.reduce(
                 lambda cfg, path: ConfigTree.merge_configs(
-                    cfg, self._read_recursive(path, verbose=self._verbose), copy_trees=True
+                    cfg,
+                    self._read_recursive(path, verbose=self._verbose),
+                    copy_trees=True,
                 ),
                 LOCAL_CONFIG_PATHS,
                 config,
@@ -181,9 +187,38 @@
                 config,
             )
 
+        config = ConfigTree.merge_configs(
+            config, self._read_extra_env_config_values(), copy_trees=True
+        )
+
+        if self._overrides_configs:
+            config = functools.reduce(
+                lambda cfg, override: ConfigTree.merge_configs(cfg, override, copy_trees=True),
+                self._overrides_configs,
+                config,
+            )
+
         config["env"] = env
         return config
 
+    def _read_extra_env_config_values(self) -> ConfigTree:
+        """ Loads extra configuration from environment-injected values """
+        result = ConfigTree()
+        for prefix in self.extra_config_values_env_key_prefix:
+            keys = sorted(k for k in os.environ if k.startswith(prefix))
+            for key in keys:
+                path = (
+                    key[len(prefix) :]
+                    .replace(self.extra_config_values_env_key_sep, ".")
+                    .lower()
+                )
+                result = ConfigTree.merge_configs(
+                    result, ConfigFactory.parse_string(f"{path}: {os.environ[key]}")
+                )
+        return result
+
     def replace(self, config):
         self._config = config
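Note: `_read_extra_env_config_values` turns every `CLEARML_AGENT__`-prefixed environment key into a configuration path by stripping the prefix, replacing the `__` separator with dots and lowercasing, so single underscores inside a segment survive. A standalone sketch of that mapping (variable name and value are illustrative):

import os
from pyhocon import ConfigFactory, ConfigTree

sep, prefix = "__", "CLEARML_AGENT__"
os.environ[prefix + "AGENT__DEFAULT_DOCKER__IMAGE"] = "nvidia/cuda"  # illustrative value

result = ConfigTree()
for key in sorted(k for k in os.environ if k.startswith(prefix)):
    path = key[len(prefix):].replace(sep, ".").lower()  # -> agent.default_docker.image
    result = ConfigTree.merge_configs(
        result, ConfigFactory.parse_string("{}: {}".format(path, os.environ[key]))
    )

print(result.get("agent.default_docker.image"))  # nvidia/cuda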
@@ -340,3 +375,10 @@
         except Exception as ex:
             print("Failed loading %s: %s" % (file_path, ex))
             raise
+
+    def set_overrides(self, *dicts):
+        """ Set several override dictionaries or ConfigTree objects which should be merged onto the configuration """
+        self._overrides_configs = [
+            d if isinstance(d, ConfigTree) else pyhocon.ConfigFactory.from_dict(d) for d in dicts
+        ]
+        self.reload()
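Note: `set_overrides` accepts ConfigTree objects and plain dicts alike; anything that is not already a ConfigTree goes through `pyhocon.ConfigFactory.from_dict` before `reload()` merges it in last, i.e. with the highest precedence. A hypothetical caller, assuming `config` is an instance of this Config class:

# dict content is illustrative; overrides win over file- and env-based values
config.set_overrides({"agent": {"venvs_cache": {"path": "~/.clearml/venvs-cache"}}})
print(config.get("agent.venvs_cache.path"))  # ~/.clearml/venvs-cache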

View File

@@ -146,6 +146,7 @@ ENV_DOCKER_HOST_MOUNT = EnvironmentConfig('CLEARML_AGENT_K8S_HOST_MOUNT', 'CLEAR
                                           'TRAINS_AGENT_K8S_HOST_MOUNT', 'TRAINS_AGENT_DOCKER_HOST_MOUNT')
 ENV_VENV_CACHE_PATH = EnvironmentConfig('CLEARML_AGENT_VENV_CACHE_PATH')
 ENV_EXTRA_DOCKER_ARGS = EnvironmentConfig('CLEARML_AGENT_EXTRA_DOCKER_ARGS', type=list)
+ENV_DISABLE_VAULT_SUPPORT = EnvironmentConfig('CLEARML_AGENT_DISABLE_VAULT_SUPPORT', type=bool)
 
 
 class FileBuffering(IntEnum):
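Note: assuming `EnvironmentConfig` with `type=bool` follows the usual truthy-string handling, setting the variable before the Session is created is enough to make `_load_vaults` return early:

import os
os.environ["CLEARML_AGENT_DISABLE_VAULT_SUPPORT"] = "1"  # assumed truthy value; set before Session init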

View File

@@ -194,7 +194,18 @@ class K8sIntegration(Worker):
         _check_pod_thread.daemon = True
         _check_pod_thread.start()
 
+    @staticmethod
+    def _get_path(d, *path, default=None):
+        try:
+            return functools.reduce(
+                lambda a, b: a[b], path, d
+            )
+        except (IndexError, KeyError):
+            return default
+
     def _monitor_hanging_pods_daemon(self):
+        last_tasks_msgs = {}  # last msg updated for every task
         while True:
             output = get_bash_output('kubectl get pods -n {namespace} -o=JSON'.format(
                 namespace=self.namespace
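Note: `_get_path` is a reduce over successive `[]` lookups, so it traverses dicts and lists alike and falls back to `default` on a missing key or index. A standalone sketch with an abbreviated kubectl pod dict (values illustrative):

import functools

def get_path(d, *path, default=None):
    # same reduce-based traversal as K8sIntegration._get_path
    try:
        return functools.reduce(lambda a, b: a[b], path, d)
    except (IndexError, KeyError):
        return default

pod = {"status": {"containerStatuses": [{"state": {"waiting": {"reason": "ImagePullBackOff"}}}]}}
print(get_path(pod, "status", "containerStatuses", 0, "state", "waiting", "reason"))  # ImagePullBackOff
print(get_path(pod, "status", "conditions", 0, default="n/a"))  # n/a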
@@ -207,23 +218,44 @@
                 sleep(self._polling_interval)
                 continue
 
             pods = output_config.get('items', [])
+            task_ids = set()
             for pod in pods:
-                try:
-                    reason = functools.reduce(
-                        lambda a, b: a[b], ('status', 'containerStatuses', 0, 'state', 'waiting', 'reason'), pod
-                    )
-                except (IndexError, KeyError):
+                if self._get_path(pod, 'status', 'phase') != "Pending":
                     continue
-                if reason == 'ImagePullBackOff':
-                    pod_name = pod.get('metadata', {}).get('name', None)
-                    if pod_name:
-                        task_id = pod_name.rpartition('-')[-1]
+
+                pod_name = pod.get('metadata', {}).get('name', None)
+                if not pod_name:
+                    continue
+
+                task_id = pod_name.rpartition('-')[-1]
+                if not task_id:
+                    continue
+
+                task_ids.add(task_id)
+
+                msg = None
+
+                waiting = self._get_path(pod, 'status', 'containerStatuses', 0, 'state', 'waiting')
+                if not waiting:
+                    condition = self._get_path(pod, 'status', 'conditions', 0)
+                    if condition:
+                        reason = condition.get('reason')
+                        if reason == 'Unschedulable':
+                            message = condition.get('message')
+                            msg = reason + (" ({})".format(message) if message else "")
+                else:
+                    reason = waiting.get("reason", None)
+                    message = waiting.get("message", None)
+                    msg = reason + (" ({})".format(message) if message else "")
+
+                    if reason == 'ImagePullBackOff':
                         delete_pod_cmd = 'kubectl delete pods {} -n {}'.format(pod_name, self.namespace)
                         get_bash_output(delete_pod_cmd)
                         try:
                             self._session.api_client.tasks.failed(
                                 task=task_id,
-                                status_reason="K8S glue error due to ImagePullBackOff",
+                                status_reason="K8S glue error: {}".format(msg),
                                 status_message="Changed by K8S glue",
                                 force=True
                             )
@@ -231,6 +263,35 @@
                         except Exception as ex:
                             self.log.warning(
                                 'K8S Glue pods monitor: Failed deleting task "{}"\nEX: {}'.format(task_id, ex)
                             )
+
+                        # clean up any msg for this task
+                        last_tasks_msgs.pop(task_id, None)
+                        continue
+
+                if msg and last_tasks_msgs.get(task_id, None) != msg:
+                    try:
+                        result = self._session.send_request(
+                            service='tasks',
+                            action='update',
+                            json={"task": task_id, "status_message": "K8S glue status: {}".format(msg)},
+                            method='get',
+                            async_enable=False,
+                        )
+                        if not result.ok:
+                            result_msg = self._get_path(result.json(), 'meta', 'result_msg')
+                            raise Exception(result_msg or result.text)
+
+                        # update last msg for this task
+                        last_tasks_msgs[task_id] = msg
+                    except Exception as ex:
+                        self.log.warning(
+                            'K8S Glue pods monitor: Failed setting status message for task "{}"\nEX: {}'.format(
+                                task_id, ex
+                            )
+                        )
+
+            # clean up any last message for a task that wasn't seen as a pod
+            last_tasks_msgs = {k: v for k, v in last_tasks_msgs.items() if k in task_ids}
+
             sleep(self._polling_interval)
 
     def _set_task_user_properties(self, task_id: str, **properties: str):
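Note: the daemon now reports on every Pending pod rather than only ImagePullBackOff ones: a container stuck in a `waiting` state takes precedence, otherwise the first pod condition is used when its reason is `Unschedulable`, and the resulting message is sent through `tasks.update` only when it differs from the last message recorded for that task in `last_tasks_msgs`. A standalone sketch of the message derivation (abbreviated pod dict, illustrative values):

import functools

def get_path(d, *path, default=None):
    try:
        return functools.reduce(lambda a, b: a[b], path, d)
    except (IndexError, KeyError):
        return default

def pod_status_msg(pod):
    # container waiting state (e.g. ImagePullBackOff) takes precedence
    waiting = get_path(pod, "status", "containerStatuses", 0, "state", "waiting")
    if waiting:
        reason, message = waiting.get("reason"), waiting.get("message")
    else:
        # otherwise report only unschedulable pods, via the first condition
        condition = get_path(pod, "status", "conditions", 0) or {}
        reason = condition.get("reason")
        if reason != "Unschedulable":
            return None
        message = condition.get("message")
    return reason and reason + (" ({})".format(message) if message else "")

pod = {"status": {"phase": "Pending", "conditions": [
    {"reason": "Unschedulable", "message": "0/3 nodes are available: 3 Insufficient nvidia.com/gpu."}]}}
print(pod_status_msg(pod))  # Unschedulable (0/3 nodes are available: 3 Insufficient nvidia.com/gpu.)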
@@ -308,12 +369,14 @@
         # push task into the k8s queue, so we have visibility on pending tasks in the k8s scheduler
         try:
             print('Pushing task {} into temporary pending queue'.format(task_id))
-            self._session.api_client.tasks.stop(task_id, force=True)
-            self._session.api_client.tasks.enqueue(
+            res = self._session.api_client.tasks.stop(task_id, force=True)
+            res = self._session.api_client.tasks.enqueue(
                 task_id,
                 queue=self.k8s_pending_queue_name,
-                status_reason='k8s pending scheduler'
+                status_reason='k8s pending scheduler',
             )
+            if res.meta.result_code != 200:
+                raise Exception(res.meta.result_msg)
         except Exception as e:
             self.log.error("ERROR: Could not push back task [{}] to k8s pending queue [{}], error: {}".format(
                 task_id, self.k8s_pending_queue_name, e))
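Note: this is the commit's second fix. The responses of `tasks.stop`/`tasks.enqueue` were previously discarded, so a rejected enqueue (for example, a missing pending queue) passed silently; checking `meta.result_code` on the enqueue response turns any non-200 server result into an exception that the surrounding try/except logs as an error.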