diff --git a/clearml_agent/backend_api/config/default/agent.conf b/clearml_agent/backend_api/config/default/agent.conf
index 0de3472..a2a6ffb 100644
--- a/clearml_agent/backend_api/config/default/agent.conf
+++ b/clearml_agent/backend_api/config/default/agent.conf
@@ -221,4 +221,44 @@
     # Note: resulting name must start with an alphanumeric character and continue with alphanumeric characters,
     # underscores (_), dots (.) and/or dashes (-)
     #docker_container_name_format: "clearml-id-{task_id}-{rand_string:.8}"
+
+    # Apply top-level environment section from configuration into os.environ
+    apply_environment: true
+    # Top-level environment section is in the form of:
+    #   environment {
+    #     key: value
+    #     ...
+    #   }
+    # and is applied to the OS environment as `key=value` for each key/value pair
+
+    # Apply top-level files section from configuration into local file system
+    apply_files: true
+    # Top-level files section allows auto-generating files at designated paths with a predefined contents
+    # and target format. Options include:
+    #   contents: the target file's content, typically a string (or any base type int/float/list/dict etc.)
+    #   format: a custom format for the contents. Currently supported value is `base64` to automatically decode a
+    #           base64-encoded contents string, otherwise ignored
+    #   path: the target file's path, may include ~ and inplace env vars
+    #   target_format: format used to encode contents before writing into the target file. Supported values are json,
+    #                  yaml, yml and bytes (in which case the file will be written in binary mode). Default is text mode.
+    #   overwrite: overwrite the target file in case it exists. Default is true.
+    #
+    # Example:
+    #   files {
+    #     myfile1 {
+    #       contents: "The quick brown fox jumped over the lazy dog"
+    #       path: "/tmp/fox.txt"
+    #     }
+    #     myjsonfile {
+    #       contents: {
+    #         some {
+    #           nested {
+    #             value: [1, 2, 3, 4]
+    #           }
+    #         }
+    #       }
+    #       path: "/tmp/test.json"
+    #       target_format: json
+    #     }
+    #   }
 }
diff --git a/clearml_agent/backend_api/session/defs.py b/clearml_agent/backend_api/session/defs.py
index d311660..d49f920 100644
--- a/clearml_agent/backend_api/session/defs.py
+++ b/clearml_agent/backend_api/session/defs.py
@@ -13,6 +13,8 @@ ENV_HOST_VERIFY_CERT = EnvEntry("CLEARML_API_HOST_VERIFY_CERT", "TRAINS_API_HOST
 ENV_CONDA_ENV_PACKAGE = EnvEntry("CLEARML_CONDA_ENV_PACKAGE", "TRAINS_CONDA_ENV_PACKAGE")
 ENV_NO_DEFAULT_SERVER = EnvEntry("CLEARML_NO_DEFAULT_SERVER", "TRAINS_NO_DEFAULT_SERVER", type=bool, default=True)
 ENV_DISABLE_VAULT_SUPPORT = EnvEntry('CLEARML_AGENT_DISABLE_VAULT_SUPPORT', type=bool)
+ENV_ENABLE_ENV_CONFIG_SECTION = EnvEntry('CLEARML_AGENT_ENABLE_ENV_CONFIG_SECTION', type=bool)
+ENV_ENABLE_FILES_CONFIG_SECTION = EnvEntry('CLEARML_AGENT_ENABLE_FILES_CONFIG_SECTION', type=bool)
 ENV_INITIAL_CONNECT_RETRY_OVERRIDE = EnvEntry(
     'CLEARML_AGENT_INITIAL_CONNECT_RETRY_OVERRIDE', default=True, converter=safe_text_to_bool
 )
diff --git a/clearml_agent/backend_api/session/session.py b/clearml_agent/backend_api/session/session.py
index cbe9074..71a9ea1 100644
--- a/clearml_agent/backend_api/session/session.py
+++ b/clearml_agent/backend_api/session/session.py
@@ -183,8 +183,6 @@ class Session(TokenManager):
             # notice: this is across the board warning omission
             urllib_log_warning_setup(total_retries=http_retries_config.get('total', 0), display_warning_after=3)
 
-        self._load_vaults()
-
     def _setup_session(self, http_retries_config, initial_session=False, default_initial_connect_override=None):
         # type: (dict, bool, Optional[bool]) -> (dict, requests.Session)
         http_retries_config = http_retries_config or self.config.get(
@@ -210,7 +208,7 @@ class Session(TokenManager):
 
         return http_retries_config, get_http_session_with_retry(**http_retries_config)
 
-    def _load_vaults(self):
+    def load_vaults(self):
         if not self.check_min_api_version("2.15") or self.feature_set == "basic":
             return
diff --git a/clearml_agent/backend_api/session/token_manager.py b/clearml_agent/backend_api/session/token_manager.py
index c00722c..16aba32 100644
--- a/clearml_agent/backend_api/session/token_manager.py
+++ b/clearml_agent/backend_api/session/token_manager.py
@@ -87,10 +87,16 @@ class TokenManager(object):
     @classmethod
     def get_decoded_token(cls, token, verify=False):
         """ Get token expiration time. If not present, assume forever """
+        if hasattr(jwt, '__version__') and jwt.__version__[0] == '1':
+            return jwt.decode(
+                token,
+                verify=verify,
+                algorithms=get_default_algorithms(),
+            )
+
         return jwt.decode(
             token,
-            verify=verify,
-            options=dict(verify_signature=False),
+            options=dict(verify_signature=verify),
             algorithms=get_default_algorithms(),
         )
diff --git a/clearml_agent/backend_config/config.py b/clearml_agent/backend_config/config.py
index cc3fbe3..88a53f4 100644
--- a/clearml_agent/backend_config/config.py
+++ b/clearml_agent/backend_config/config.py
@@ -82,7 +82,7 @@ class Config(object):
         relative_to=None,
         app=None,
         is_server=False,
-        **_,
+        **_
     ):
         self._app = app
         self._verbose = verbose
@@ -214,7 +214,7 @@ class Config(object):
                 .lower()
             )
             result = ConfigTree.merge_configs(
-                result, ConfigFactory.parse_string(f"{path}: {os.environ[key]}")
+                result, ConfigFactory.parse_string("{}: {}".format(path, os.environ[key]))
             )
         return result
diff --git a/clearml_agent/backend_config/utils.py b/clearml_agent/backend_config/utils.py
index f5a2923..951b553 100644
--- a/clearml_agent/backend_config/utils.py
+++ b/clearml_agent/backend_config/utils.py
@@ -1,3 +1,14 @@
+import base64
+import os
+from os.path import expandvars, expanduser
+from pathlib import Path
+from typing import List, TYPE_CHECKING
+
+from pyhocon import HOCONConverter, ConfigTree
+
+if TYPE_CHECKING:
+    from .config import Config
+
 
 def get_items(cls):
     """ get key/value items from an enum-like class (members represent enumeration key/value) """
@@ -7,3 +18,95 @@ def get_items(cls):
 def get_options(cls):
     """ get options from an enum-like class (members represent enumeration key/value) """
     return get_items(cls).values()
+
+
+def apply_environment(config):
+    # type: (Config) -> List[str]
+    env_vars = config.get("environment", None)
+    if not env_vars:
+        return []
+    if isinstance(env_vars, (list, tuple)):
+        env_vars = dict(env_vars)
+
+    keys = list(filter(None, env_vars.keys()))
+
+    for key in keys:
+        os.environ[str(key)] = str(env_vars[key] or "")
+
+    return keys
+
+
+def apply_files(config):
+    # type: (Config) -> None
+    files = config.get("files", None)
+    if not files:
+        return
+
+    if isinstance(files, (list, tuple)):
+        files = dict(files)
+
+    print("Creating files from configuration")
+    for key, data in files.items():
+        path = data.get("path")
+        fmt = data.get("format", "string")
+        target_fmt = data.get("target_format", "string")
+        overwrite = bool(data.get("overwrite", True))
+        contents = data.get("contents")
+
+        target = Path(expanduser(expandvars(path)))
+
+        # noinspection PyBroadException
+        try:
+            if target.is_dir():
+                print("Skipped [{}]: is a directory {}".format(key, target))
+                continue
+
+            if not overwrite and target.is_file():
+                print("Skipped [{}]: file exists {}".format(key, target))
+                continue
+        except Exception as ex:
+            print("Skipped [{}]: can't access {} ({})".format(key, target, ex))
+            continue
+
+        if contents:
+            try:
+                if fmt == "base64":
+                    contents = base64.b64decode(contents)
+                    if target_fmt != "bytes":
+                        contents = contents.decode("utf-8")
+            except Exception as ex:
+                print("Skipped [{}]: failed decoding {} ({})".format(key, fmt, ex))
+                continue
+
+        # noinspection PyBroadException
+        try:
+            target.parent.mkdir(parents=True, exist_ok=True)
+        except Exception as ex:
+            print("Skipped [{}]: failed creating path {} ({})".format(key, target.parent, ex))
+            continue
+
+        try:
+            if target_fmt == "bytes":
+                try:
+                    target.write_bytes(contents)
+                except TypeError:
+                    # simpler error so the user won't get confused
+                    raise TypeError("a bytes-like object is required")
+            else:
+                try:
+                    if target_fmt == "json":
+                        text = HOCONConverter.to_json(contents)
+                    elif target_fmt in ("yaml", "yml"):
+                        text = HOCONConverter.to_yaml(contents)
+                    else:
+                        if isinstance(contents, ConfigTree):
+                            contents = contents.as_plain_ordered_dict()
+                        text = str(contents)
+                except Exception as ex:
+                    print("Skipped [{}]: failed encoding to {} ({})".format(key, target_fmt, ex))
+                    continue
+                target.write_text(text)
+            print("Saved [{}]: {}".format(key, target))
+        except Exception as ex:
+            print("Skipped [{}]: failed saving file {} ({})".format(key, target, ex))
+            continue
diff --git a/clearml_agent/commands/worker.py b/clearml_agent/commands/worker.py
index 07d4daa..7023716 100644
--- a/clearml_agent/commands/worker.py
+++ b/clearml_agent/commands/worker.py
@@ -37,7 +37,9 @@ from clearml_agent.backend_api.services import queues as queues_api
 from clearml_agent.backend_api.services import tasks as tasks_api
 from clearml_agent.backend_api.services import workers as workers_api
 from clearml_agent.backend_api.session import CallResult
+from clearml_agent.backend_api.session.defs import ENV_ENABLE_ENV_CONFIG_SECTION, ENV_ENABLE_FILES_CONFIG_SECTION
 from clearml_agent.backend_config.defs import UptimeConf
+from clearml_agent.backend_config.utils import apply_environment, apply_files
 from clearml_agent.commands.base import resolve_names, ServiceCommandSection
 from clearml_agent.definitions import (
     ENVIRONMENT_SDK_PARAMS,
@@ -60,6 +62,7 @@ from clearml_agent.definitions import (
     ENV_SSH_AUTH_SOCK,
     ENV_AGENT_SKIP_PIP_VENV_INSTALL,
     ENV_EXTRA_DOCKER_ARGS,
+
 )
 from clearml_agent.definitions import WORKING_REPOSITORY_DIR, PIP_EXTRA_INDICES
 from clearml_agent.errors import APIError, CommandFailedError, Sigterm
@@ -567,6 +570,7 @@ class Worker(ServiceCommandSection):
         self._downtime_config = self._session.config.get("agent.downtime", None)
         self._suppress_cr = self._session.config.get("agent.suppress_carriage_return", True)
         self._host_ssh_cache = None
+        self._truncate_task_output_files = bool(self._session.config.get("agent.truncate_task_output_files", False))
 
         # True - supported
         # None - not initialized
@@ -1278,15 +1282,15 @@ class Worker(ServiceCommandSection):
         if self._services_mode and dynamic_gpus:
             raise ValueError("Combining --dynamic-gpus and --services-mode is not supported")
 
-        # if we do not need to create queues, make sure they are valid
-        # match previous behaviour when we validated queue names before everything else
-        queues = self._resolve_queue_names(queues, create_if_missing=kwargs.get('create_queue', False))
-
         # We are not running a daemon we are killing one.
         # find the pid send termination signal and leave
         if kwargs.get('stop', False):
             return 1 if not self._kill_daemon(dynamic_gpus=dynamic_gpus) else 0
 
+        # if we do not need to create queues, make sure they are valid
+        # match previous behaviour when we validated queue names before everything else
+        queues = self._resolve_queue_names(queues, create_if_missing=kwargs.get('create_queue', False))
+
         queues_info = [
             q.to_dict()
             for q in self._session.send_api(
@@ -1544,10 +1548,16 @@ class Worker(ServiceCommandSection):
     ):
         # type: (...) -> Tuple[Optional[int], Optional[TaskStopReason]]
         def _print_file(file_path, prev_pos=0):
-            with open(file_path, "rb") as f:
+            with open(file_path, "ab+") as f:
                 f.seek(prev_pos)
                 binary_text = f.read()
-                pos = f.tell()
+                if not self._truncate_task_output_files:
+                    # non-buffered behavior
+                    pos = f.tell()
+                else:
+                    # buffered - read everything and truncate
+                    f.truncate(0)
+                    pos = 0
             # skip the previously printed lines,
             blines = binary_text.split(b'\n') if binary_text else []
             if not blines:
@@ -1563,6 +1573,21 @@ class Worker(ServiceCommandSection):
         stderr = open(stderr_path, "wt") if stderr_path else stdout
         stdout_line_count, stdout_pos_count, stdout_last_lines = 0, 0, []
         stderr_line_count, stderr_pos_count, stderr_last_lines = 0, 0, []
+        lines_buffer = defaultdict(list)
+
+        def report_lines(lines, source):
+            if not self._truncate_task_output_files:
+                # non-buffered
+                return self.send_logs(task_id, lines, session=session)
+
+            buffer = lines_buffer[source]
+            buffer += lines
+
+            sent = self.send_logs(task_id, buffer, session=session)
+            if sent > 0:
+                lines_buffer[source] = buffer[sent:]
+            return sent
+
         service_mode_internal_agent_started = None
         stopping = False
         status = None
@@ -1613,10 +1638,11 @@ class Worker(ServiceCommandSection):
                         if status is not None:
                             stop_reason = 'Service started'
 
-            stdout_line_count += self.send_logs(task_id, printed_lines, session=session)
+            stdout_line_count += report_lines(printed_lines, "stdout")
+
             if stderr_path:
                 printed_lines, stderr_pos_count = _print_file(stderr_path, stderr_pos_count)
-                stderr_line_count += self.send_logs(task_id, printed_lines, session=session)
+                stderr_line_count += report_lines(printed_lines, "stderr")
 
         except subprocess.CalledProcessError as ex:
             # non zero return code
@@ -1630,10 +1656,10 @@ class Worker(ServiceCommandSection):
         except Exception:
             # we should not get here, but better safe than sorry
             printed_lines, stdout_pos_count = _print_file(stdout_path, stdout_pos_count)
-            stdout_line_count += self.send_logs(task_id, printed_lines, session=session)
+            stdout_line_count += report_lines(printed_lines, "stdout")
             if stderr_path:
                 printed_lines, stderr_pos_count = _print_file(stderr_path, stderr_pos_count)
-                stderr_line_count += self.send_logs(task_id, printed_lines, session=session)
+                stderr_line_count += report_lines(printed_lines, "stderr")
             stop_reason = TaskStopReason.exception
             status = -1
@@ -1652,10 +1678,10 @@ class Worker(ServiceCommandSection):
 
         # Send last lines
         printed_lines, stdout_pos_count = _print_file(stdout_path, stdout_pos_count)
-        stdout_line_count += self.send_logs(task_id, printed_lines, session=session)
+        stdout_line_count += report_lines(printed_lines, "stdout")
         if stderr_path:
             printed_lines, stderr_pos_count = _print_file(stderr_path, stderr_pos_count)
-            stderr_line_count += self.send_logs(task_id, printed_lines, session=session)
+            stderr_line_count += report_lines(printed_lines, "stderr")
 
         return status, stop_reason
 
@@ -1737,6 +1763,29 @@ class Worker(ServiceCommandSection):
             raise ValueError("Failed applying git diff:\n{}\n\n"
                              "ERROR! Failed applying git diff, see diff above.".format(diff))
 
+    def _apply_extra_configuration(self):
+        try:
+            self._session.load_vaults()
+        except Exception as ex:
+            print("Error: failed applying extra configuration: {}".format(ex))
+
+        config = self._session.config
+        default = config.get("agent.apply_environment", False)
+        if ENV_ENABLE_ENV_CONFIG_SECTION.get(default=default):
+            try:
+                keys = apply_environment(config)
+                if keys:
+                    print("Environment variables set from configuration: {}".format(keys))
+            except Exception as ex:
+                print("Error: failed applying environment from configuration: {}".format(ex))
+
+        default = config.get("agent.apply_files", default=False)
+        if ENV_ENABLE_FILES_CONFIG_SECTION.get(default=default):
+            try:
+                apply_files(config)
+            except Exception as ex:
+                print("Error: failed applying files from configuration: {}".format(ex))
+
     @resolve_names
     def build(
         self,
@@ -2017,6 +2066,8 @@ class Worker(ServiceCommandSection):
                 Singleton.close_pid_file()
                 return
 
+        self._apply_extra_configuration()
+
         self._session.print_configuration()
 
         # now mark the task as started
@@ -2779,7 +2830,7 @@ class Worker(ServiceCommandSection):
            path itself can be passed in this variable)
         :return: virtualenv directory, requirements manager to use with task, True if there is a cached venv entry
         """
-        skip_pip_venv_install = ENV_AGENT_SKIP_PIP_VENV_INSTALL.get() if self._session.feature_set != "basic" else None
+        skip_pip_venv_install = ENV_AGENT_SKIP_PIP_VENV_INSTALL.get()
         if skip_pip_venv_install:
             try:
                 skip_pip_venv_install = bool(strtobool(skip_pip_venv_install))
@@ -3519,8 +3570,13 @@ class Worker(ServiceCommandSection):
 
     def _resolve_queue_names(self, queues, create_if_missing=False):
         if not queues:
-            default_queue = self._session.send_api(queues_api.GetDefaultRequest())
-            return [default_queue.id]
+            # try to look for queues with "default" tag
+            try:
+                default_queue = self._session.send_api(queues_api.GetDefaultRequest())
+                return [default_queue.id]
+            except APIError:
+                # if we cannot find one with "default" tag, look for a queue named "default"
+                queues = ["default"]
 
         queues = return_list(queues)
         if not create_if_missing:
diff --git a/clearml_agent/helper/package/pytorch.py b/clearml_agent/helper/package/pytorch.py
index 7b5a277..6b8b03a 100644
--- a/clearml_agent/helper/package/pytorch.py
+++ b/clearml_agent/helper/package/pytorch.py
@@ -263,6 +263,9 @@ class PytorchRequirement(SimpleSubstitution):
                 continue
             if len(parts) < 5 or platform_wheel not in parts[4]:
                 continue
+            # yes this is for linux python 2.7 support, this is the only python 2.7 we support...
+            if py_ver and py_ver[0] == '2' and len(parts) > 3 and not parts[3].endswith('u'):
+                continue
             # update the closest matched version (from above)
             if not closest_v:
                 closest_v = v
diff --git a/clearml_agent/helper/repo.py b/clearml_agent/helper/repo.py
index 0bab3f2..0a4ccc3 100644
--- a/clearml_agent/helper/repo.py
+++ b/clearml_agent/helper/repo.py
@@ -482,7 +482,7 @@ class VCS(object):
             parsed_url = furl(url)
         except ValueError:
             return url
-        if parsed_url.scheme in ["", "ssh"] or parsed_url.scheme.startswith("git"):
+        if parsed_url.scheme in ["", "ssh"] or (parsed_url.scheme or '').startswith("git"):
             return parsed_url.url
         config_user = ENV_AGENT_GIT_USER.get() or config.get("agent.{}_user".format(cls.executable_name), None)
         config_pass = ENV_AGENT_GIT_PASS.get() or config.get("agent.{}_pass".format(cls.executable_name), None)
diff --git a/clearml_agent/version.py b/clearml_agent/version.py
index 1a72d32..b3ddbc4 100644
--- a/clearml_agent/version.py
+++ b/clearml_agent/version.py
@@ -1 +1 @@
-__version__ = '1.1.0'
+__version__ = '1.1.1'
diff --git a/docs/clearml.conf b/docs/clearml.conf
index 7444b3a..29b5dbd 100644
--- a/docs/clearml.conf
+++ b/docs/clearml.conf
@@ -285,6 +285,7 @@ sdk {
                 #     secret: "12345678"
                 #     multipart: false
                 #     secure: false
+                #     verify: /path/to/ca/bundle.crt OR false to not verify
                 # }
             ]
         }
@@ -359,5 +360,45 @@ sdk {
             log_stdout: True
         }
     }
+
+    # Apply top-level environment section from configuration into os.environ
+    apply_environment: true
+    # Top-level environment section is in the form of:
+    #   environment {
+    #     key: value
+    #     ...
+    #   }
+    # and is applied to the OS environment as `key=value` for each key/value pair
+
+    # Apply top-level files section from configuration into local file system
+    apply_files: true
+    # Top-level files section allows auto-generating files at designated paths with a predefined contents
+    # and target format. Options include:
+    #   contents: the target file's content, typically a string (or any base type int/float/list/dict etc.)
+    #   format: a custom format for the contents. Currently supported value is `base64` to automatically decode a
+    #           base64-encoded contents string, otherwise ignored
+    #   path: the target file's path, may include ~ and inplace env vars
+    #   target_format: format used to encode contents before writing into the target file. Supported values are json,
+    #                  yaml, yml and bytes (in which case the file will be written in binary mode). Default is text mode.
+    #   overwrite: overwrite the target file in case it exists. Default is true.
+    #
+    # Example:
+    #   files {
+    #     myfile1 {
+    #       contents: "The quick brown fox jumped over the lazy dog"
+    #       path: "/tmp/fox.txt"
+    #     }
+    #     myjsonfile {
+    #       contents: {
+    #         some {
+    #           nested {
+    #             value: [1, 2, 3, 4]
+    #           }
+    #         }
+    #       }
+    #       path: "/tmp/test.json"
+    #       target_format: json
+    #     }
+    #   }
 }
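
Note (not part of the diff): the two helpers introduced in clearml_agent/backend_config/utils.py can be exercised on their own. A minimal sketch, assuming only an object that exposes .get() the way Config/ConfigTree do; the keys, values and file path below are made up for illustration:

    # Standalone sketch of the new helpers; the configuration content is hypothetical.
    import os

    from pyhocon import ConfigFactory

    from clearml_agent.backend_config.utils import apply_environment, apply_files

    conf = ConfigFactory.parse_string("""
    environment {
      OMP_NUM_THREADS: 4          # values are stringified before being exported
    }
    files {
      motd {
        contents: "The quick brown fox jumped over the lazy dog"
        path: "/tmp/fox.txt"      # ~ and environment variables are expanded
        overwrite: false          # skip the entry if /tmp/fox.txt already exists
      }
    }
    """)

    exported = apply_environment(conf)   # returns the exported keys, e.g. ['OMP_NUM_THREADS']
    apply_files(conf)                    # prints a Saved/Skipped line per files entry
    print(exported, os.environ.get("OMP_NUM_THREADS"))

Running the sketch exports OMP_NUM_THREADS into os.environ and writes /tmp/fox.txt unless the file already exists.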
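Note (not part of the diff): at task start the agent only applies these sections when enabled. Per Worker._apply_extra_configuration above, the agent.apply_environment / agent.apply_files keys supply the default, and the CLEARML_AGENT_ENABLE_ENV_CONFIG_SECTION / CLEARML_AGENT_ENABLE_FILES_CONFIG_SECTION environment variables, when set, take precedence. A hedged sketch of that gating (apply_extra_sections is a hypothetical helper mirroring the diff, not part of it):

    from clearml_agent.backend_api.session.defs import (
        ENV_ENABLE_ENV_CONFIG_SECTION,
        ENV_ENABLE_FILES_CONFIG_SECTION,
    )
    from clearml_agent.backend_config.utils import apply_environment, apply_files


    def apply_extra_sections(config):
        # `config` is assumed to be the session's Config object;
        # the config keys are only the fallback when the env vars are unset
        if ENV_ENABLE_ENV_CONFIG_SECTION.get(default=config.get("agent.apply_environment", False)):
            apply_environment(config)
        if ENV_ENABLE_FILES_CONFIG_SECTION.get(default=config.get("agent.apply_files", False)):
            apply_files(config)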