diff --git a/clearml_agent/backend_api/session/session.py b/clearml_agent/backend_api/session/session.py
index 757b9b6..97b4e4e 100644
--- a/clearml_agent/backend_api/session/session.py
+++ b/clearml_agent/backend_api/session/session.py
@@ -2,14 +2,15 @@ import json as json_lib
 import sys
 import types
 from socket import gethostname
-from six.moves.urllib.parse import urlparse, urlunparse
 
 import jwt
 import requests
 import six
-from pyhocon import ConfigTree
+from pyhocon import ConfigTree, ConfigFactory
 from requests.auth import HTTPBasicAuth
+from six.moves.urllib.parse import urlparse, urlunparse
 
+from clearml_agent.definitions import ENV_DISABLE_VAULT_SUPPORT
 from .callresult import CallResult
 from .defs import ENV_VERBOSE, ENV_HOST, ENV_ACCESS_KEY, ENV_SECRET_KEY, ENV_WEB_HOST, ENV_FILES_HOST, ENV_AUTH_TOKEN, \
     ENV_NO_DEFAULT_SERVER
@@ -186,6 +187,40 @@ class Session(TokenManager):
         # notice: this is across the board warning omission
         urllib_log_warning_setup(total_retries=http_retries_config.get('total', 0), display_warning_after=3)
 
+        self._load_vaults()
+
+    def _load_vaults(self):
+        if not self.check_min_api_version("2.15") or self.feature_set == "basic":
+            return
+
+        if ENV_DISABLE_VAULT_SUPPORT.get():
+            print("Vault support is disabled")
+            return
+
+        def parse(vault):
+            # noinspection PyBroadException
+            try:
+                d = vault.get('data', None)
+                if d:
+                    r = ConfigFactory.parse_string(d)
+                    if isinstance(r, (ConfigTree, dict)):
+                        return r
+            except Exception as e:
+                print("Failed parsing vault {}: {}".format(vault.get("description", ""), e))
+
+        # noinspection PyBroadException
+        try:
+            res = self.send_request("users", "get_vaults", json={"enabled": True, "types": ["config"]})
+            if res.ok:
+                vaults = res.json().get("data", {}).get("vaults", [])
+                data = list(filter(None, map(parse, vaults)))
+                if data:
+                    self.config.set_overrides(*data)
+            elif res.status_code != 404:
+                raise Exception(res.json().get("meta", {}).get("result_msg", res.text))
+        except Exception as ex:
+            print("Failed getting vaults: {}".format(ex))
+
     def _send_request(
         self,
         service,
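# --- Illustrative aside (not part of the patch): how a vault payload fetched by
# _load_vaults() above becomes a configuration override. The vault dict below is a
# hypothetical example; only its "data" field matters, and it is expected to hold
# HOCON text that pyhocon can parse.
from pyhocon import ConfigFactory

vault = {
    "description": "team defaults",             # hypothetical vault entry
    "data": 'agent { worker_name: "gpu-01" }',  # HOCON payload
}

override = ConfigFactory.parse_string(vault["data"])
print(override.get("agent.worker_name"))  # -> gpu-01
# The session then calls self.config.set_overrides(override), which reloads the
# configuration with this tree merged on top (see the config.py changes below).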
diff --git a/clearml_agent/backend_config/config.py b/clearml_agent/backend_config/config.py
index 73c202e..cc3fbe3 100644
--- a/clearml_agent/backend_config/config.py
+++ b/clearml_agent/backend_config/config.py
@@ -4,15 +4,13 @@ import functools
 import json
 import os
 import sys
-import warnings
-from fnmatch import fnmatch
 from os.path import expanduser
 from typing import Any
 
 import pyhocon
 import six
 from pathlib2 import Path
-from pyhocon import ConfigTree
+from pyhocon import ConfigTree, ConfigFactory
 from pyparsing import (
     ParseFatalException,
     ParseException,
@@ -71,6 +69,10 @@ class Config(object):
     # used in place of None in Config.get as default value because None is a valid value
     _MISSING = object()
 
+    extra_config_values_env_key_sep = "__"
+    extra_config_values_env_key_prefix = [
+        "CLEARML_AGENT" + extra_config_values_env_key_sep,
+    ]
 
     def __init__(
         self,
@@ -80,7 +82,7 @@
         relative_to=None,
         app=None,
         is_server=False,
-        **_
+        **_,
     ):
         self._app = app
         self._verbose = verbose
@@ -90,6 +92,7 @@
         self._env = env or os.environ.get("TRAINS_ENV", Environment.default)
         self.config_paths = set()
         self.is_server = is_server
+        self._overrides_configs = None
 
         if self._verbose:
             print("Config env:%s" % str(self._env))
@@ -100,6 +103,7 @@
             )
         if self._env not in get_options(Environment):
             raise ValueError("Invalid environment %s" % env)
+
         if relative_to is not None:
             self.load_relative_to(relative_to)
 
@@ -158,7 +162,9 @@ class Config(object):
         if LOCAL_CONFIG_PATHS:
             config = functools.reduce(
                 lambda cfg, path: ConfigTree.merge_configs(
-                    cfg, self._read_recursive(path, verbose=self._verbose), copy_trees=True
+                    cfg,
+                    self._read_recursive(path, verbose=self._verbose),
+                    copy_trees=True,
                 ),
                 LOCAL_CONFIG_PATHS,
                 config,
             )
@@ -181,9 +187,38 @@
                 config,
             )
 
+        config = ConfigTree.merge_configs(
+            config, self._read_extra_env_config_values(), copy_trees=True
+        )
+
+        if self._overrides_configs:
+            config = functools.reduce(
+                lambda cfg, override: ConfigTree.merge_configs(cfg, override, copy_trees=True),
+                self._overrides_configs,
+                config,
+            )
+
         config["env"] = env
         return config
 
+    def _read_extra_env_config_values(self) -> ConfigTree:
+        """ Loads extra configuration from environment-injected values """
+        result = ConfigTree()
+
+        for prefix in self.extra_config_values_env_key_prefix:
+            keys = sorted(k for k in os.environ if k.startswith(prefix))
+            for key in keys:
+                path = (
+                    key[len(prefix) :]
+                    .replace(self.extra_config_values_env_key_sep, ".")
+                    .lower()
+                )
+                result = ConfigTree.merge_configs(
+                    result, ConfigFactory.parse_string(f"{path}: {os.environ[key]}")
+                )
+
+        return result
+
     def replace(self, config):
         self._config = config
 
@@ -340,3 +375,10 @@ class Config(object):
         except Exception as ex:
             print("Failed loading %s: %s" % (file_path, ex))
             raise
+
+    def set_overrides(self, *dicts):
+        """ Set several override dictionaries or ConfigTree objects which should be merged onto the configuration """
+        self._overrides_configs = [
+            d if isinstance(d, ConfigTree) else pyhocon.ConfigFactory.from_dict(d) for d in dicts
+        ]
+        self.reload()
diff --git a/clearml_agent/definitions.py b/clearml_agent/definitions.py
index b20b357..eceb719 100644
--- a/clearml_agent/definitions.py
+++ b/clearml_agent/definitions.py
@@ -146,6 +146,7 @@ ENV_DOCKER_HOST_MOUNT = EnvironmentConfig('CLEARML_AGENT_K8S_HOST_MOUNT', 'CLEAR
                                           'TRAINS_AGENT_K8S_HOST_MOUNT', 'TRAINS_AGENT_DOCKER_HOST_MOUNT')
 ENV_VENV_CACHE_PATH = EnvironmentConfig('CLEARML_AGENT_VENV_CACHE_PATH')
 ENV_EXTRA_DOCKER_ARGS = EnvironmentConfig('CLEARML_AGENT_EXTRA_DOCKER_ARGS', type=list)
+ENV_DISABLE_VAULT_SUPPORT = EnvironmentConfig('CLEARML_AGENT_DISABLE_VAULT_SUPPORT', type=bool)
 
 
 class FileBuffering(IntEnum):
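# --- Illustrative aside (not part of the patch): what _read_extra_env_config_values()
# above does with a matching environment variable. The variable name used here is a
# made-up example; any key starting with "CLEARML_AGENT__" is picked up, "__" becomes
# ".", the key is lower-cased, and the value is parsed as a one-line HOCON document.
import os
from pyhocon import ConfigFactory, ConfigTree

os.environ["CLEARML_AGENT__API__VERBOSE"] = "true"   # hypothetical override

prefix, sep = "CLEARML_AGENT__", "__"
result = ConfigTree()
for key in sorted(k for k in os.environ if k.startswith(prefix)):
    path = key[len(prefix):].replace(sep, ".").lower()   # "API__VERBOSE" -> "api.verbose"
    result = ConfigTree.merge_configs(
        result, ConfigFactory.parse_string("{}: {}".format(path, os.environ[key]))
    )

print(result.get("api.verbose"))  # -> True (parsed by pyhocon, not kept as a string)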
diff --git a/clearml_agent/glue/k8s.py b/clearml_agent/glue/k8s.py
index 93fa28d..643c80a 100644
--- a/clearml_agent/glue/k8s.py
+++ b/clearml_agent/glue/k8s.py
@@ -194,7 +194,18 @@ class K8sIntegration(Worker):
         _check_pod_thread.daemon = True
         _check_pod_thread.start()
 
+    @staticmethod
+    def _get_path(d, *path, default=None):
+        try:
+            return functools.reduce(
+                lambda a, b: a[b], path, d
+            )
+        except (IndexError, KeyError):
+            return default
+
     def _monitor_hanging_pods_daemon(self):
+        last_tasks_msgs = {}  # last msg updated for every task
+
         while True:
             output = get_bash_output('kubectl get pods -n {namespace} -o=JSON'.format(
                 namespace=self.namespace
@@ -207,23 +218,44 @@
                 sleep(self._polling_interval)
                 continue
             pods = output_config.get('items', [])
+            task_ids = set()
             for pod in pods:
-                try:
-                    reason = functools.reduce(
-                        lambda a, b: a[b], ('status', 'containerStatuses', 0, 'state', 'waiting', 'reason'), pod
-                    )
-                except (IndexError, KeyError):
+                if self._get_path(pod, 'status', 'phase') != "Pending":
                     continue
-                if reason == 'ImagePullBackOff':
-                    pod_name = pod.get('metadata', {}).get('name', None)
-                    if pod_name:
-                        task_id = pod_name.rpartition('-')[-1]
+
+                pod_name = pod.get('metadata', {}).get('name', None)
+                if not pod_name:
+                    continue
+
+                task_id = pod_name.rpartition('-')[-1]
+                if not task_id:
+                    continue
+
+                task_ids.add(task_id)
+
+                msg = None
+
+                waiting = self._get_path(pod, 'status', 'containerStatuses', 0, 'state', 'waiting')
+                if not waiting:
+                    condition = self._get_path(pod, 'status', 'conditions', 0)
+                    if condition:
+                        reason = condition.get('reason')
+                        if reason == 'Unschedulable':
+                            message = condition.get('message')
+                            msg = reason + (" ({})".format(message) if message else "")
+                else:
+                    reason = waiting.get("reason", None)
+                    message = waiting.get("message", None)
+
+                    msg = reason + (" ({})".format(message) if message else "")
+
+                    if reason == 'ImagePullBackOff':
                         delete_pod_cmd = 'kubectl delete pods {} -n {}'.format(pod_name, self.namespace)
                         get_bash_output(delete_pod_cmd)
                         try:
                             self._session.api_client.tasks.failed(
                                 task=task_id,
-                                status_reason="K8S glue error due to ImagePullBackOff",
+                                status_reason="K8S glue error: {}".format(msg),
                                 status_message="Changed by K8S glue",
                                 force=True
                             )
@@ -231,6 +263,35 @@
                             self.log.warning(
                                 'K8S Glue pods monitor: Failed deleting task "{}"\nEX: {}'.format(task_id, ex)
                             )
+
+                        # clean up any msg for this task
+                        last_tasks_msgs.pop(task_id, None)
+                        continue
+
+                if msg and last_tasks_msgs.get(task_id, None) != msg:
+                    try:
+                        result = self._session.send_request(
+                            service='tasks',
+                            action='update',
+                            json={"task": task_id, "status_message": "K8S glue status: {}".format(msg)},
+                            method='get',
+                            async_enable=False,
+                        )
+                        if not result.ok:
+                            result_msg = self._get_path(result.json(), 'meta', 'result_msg')
+                            raise Exception(result_msg or result.text)
+
+                        # update last msg for this task
+                        last_tasks_msgs[task_id] = msg
+                    except Exception as ex:
+                        self.log.warning(
+                            'K8S Glue pods monitor: Failed setting status message for task "{}"\nEX: {}'.format(
+                                task_id, ex
+                            )
+                        )
+
+            # clean up any last message for a task that wasn't seen as a pod
+            last_tasks_msgs = {k: v for k, v in last_tasks_msgs.items() if k in task_ids}
+
             sleep(self._polling_interval)
 
     def _set_task_user_properties(self, task_id: str, **properties: str):
@@ -308,12 +369,14 @@ class K8sIntegration(Worker):
         # push task into the k8s queue, so we have visibility on pending tasks in the k8s scheduler
         try:
             print('Pushing task {} into temporary pending queue'.format(task_id))
-            self._session.api_client.tasks.stop(task_id, force=True)
-            self._session.api_client.tasks.enqueue(
+            res = self._session.api_client.tasks.stop(task_id, force=True)
+            res = self._session.api_client.tasks.enqueue(
                 task_id,
                 queue=self.k8s_pending_queue_name,
-                status_reason='k8s pending scheduler'
+                status_reason='k8s pending scheduler',
             )
+            if res.meta.result_code != 200:
+                raise Exception(res.meta.result_msg)
         except Exception as e:
             self.log.error("ERROR: Could not push back task [{}] to k8s pending queue [{}], error: {}".format(
                 task_id, self.k8s_pending_queue_name, e))
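# --- Illustrative aside (not part of the patch): how the _get_path() helper added to
# K8sIntegration walks the kubectl JSON output. The pod dict below is a trimmed-down,
# hypothetical example of one item from `kubectl get pods -o=JSON`.
import functools

def get_path(d, *path, default=None):
    # same reduce-based traversal as K8sIntegration._get_path
    try:
        return functools.reduce(lambda a, b: a[b], path, d)
    except (IndexError, KeyError):
        return default

pod = {
    "status": {
        "phase": "Pending",
        "containerStatuses": [
            {"state": {"waiting": {"reason": "ImagePullBackOff", "message": "pull failed"}}}
        ],
    }
}

waiting = get_path(pod, "status", "containerStatuses", 0, "state", "waiting")
print(waiting["reason"])                         # -> ImagePullBackOff
print(get_path(pod, "status", "conditions", 0))  # -> None: missing path falls back to default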