This commit is contained in:
Revital 2021-12-13 13:36:50 +02:00
commit f8a7f3f15c
11 changed files with 273 additions and 24 deletions

View File

@ -221,4 +221,44 @@
# Note: resulting name must start with an alphanumeric character and continue with alphanumeric characters,
# underscores (_), dots (.) and/or dashes (-)
#docker_container_name_format: "clearml-id-{task_id}-{rand_string:.8}"
# Apply top-level environment section from configuration into os.environ
apply_environment: true
# Top-level environment section is in the form of:
# environment {
# key: value
# ...
# }
# and is applied to the OS environment as `key=value` for each key/value pair
# Apply top-level files section from configuration into local file system
apply_files: true
# Top-level files section allows auto-generating files at designated paths with predefined contents
# and target format. Options include:
# contents: the target file's content, typically a string (or any base type int/float/list/dict etc.)
# format: a custom format for the contents. Currently supported value is `base64` to automatically decode a
# base64-encoded contents string, otherwise ignored
# path: the target file's path, may include ~ and inplace env vars
# target_format: format used to encode contents before writing into the target file. Supported values are json,
# yaml, yml and bytes (in which case the file will be written in binary mode). Default is text mode.
# overwrite: overwrite the target file in case it exists. Default is true.
#
# Example:
# files {
# myfile1 {
# contents: "The quick brown fox jumped over the lazy dog"
# path: "/tmp/fox.txt"
# }
# myjsonfile {
# contents: {
# some {
# nested {
# value: [1, 2, 3, 4]
# }
# }
# }
# path: "/tmp/test.json"
# target_format: json
# }
# }
}

View File

@ -13,6 +13,8 @@ ENV_HOST_VERIFY_CERT = EnvEntry("CLEARML_API_HOST_VERIFY_CERT", "TRAINS_API_HOST
ENV_CONDA_ENV_PACKAGE = EnvEntry("CLEARML_CONDA_ENV_PACKAGE", "TRAINS_CONDA_ENV_PACKAGE")
ENV_NO_DEFAULT_SERVER = EnvEntry("CLEARML_NO_DEFAULT_SERVER", "TRAINS_NO_DEFAULT_SERVER", type=bool, default=True)
ENV_DISABLE_VAULT_SUPPORT = EnvEntry('CLEARML_AGENT_DISABLE_VAULT_SUPPORT', type=bool)
ENV_ENABLE_ENV_CONFIG_SECTION = EnvEntry('CLEARML_AGENT_ENABLE_ENV_CONFIG_SECTION', type=bool)
ENV_ENABLE_FILES_CONFIG_SECTION = EnvEntry('CLEARML_AGENT_ENABLE_FILES_CONFIG_SECTION', type=bool)
ENV_INITIAL_CONNECT_RETRY_OVERRIDE = EnvEntry(
'CLEARML_AGENT_INITIAL_CONNECT_RETRY_OVERRIDE', default=True, converter=safe_text_to_bool
)

View File

@ -183,8 +183,6 @@ class Session(TokenManager):
# notice: this is across the board warning omission
urllib_log_warning_setup(total_retries=http_retries_config.get('total', 0), display_warning_after=3)
self._load_vaults()
def _setup_session(self, http_retries_config, initial_session=False, default_initial_connect_override=None):
# type: (dict, bool, Optional[bool]) -> (dict, requests.Session)
http_retries_config = http_retries_config or self.config.get(
@ -210,7 +208,7 @@ class Session(TokenManager):
return http_retries_config, get_http_session_with_retry(**http_retries_config)
def _load_vaults(self):
def load_vaults(self):
if not self.check_min_api_version("2.15") or self.feature_set == "basic":
return

View File

@ -87,10 +87,16 @@ class TokenManager(object):
@classmethod
def get_decoded_token(cls, token, verify=False):
    """Decode a JWT and return its payload (claims) as a dict.

    :param token: the encoded JWT string
    :param verify: if True, verify the token signature; if False (default),
        decode without signature verification
    :return: decoded token payload dict
    """
    # NOTE: the diff residue contained both the old hard-coded
    # `verify_signature=False` and the fixed `verify_signature=verify`;
    # the corrected version below honors the `verify` argument.
    # PyJWT 1.x accepts a `verify` keyword directly; PyJWT 2.x removed it
    # in favor of the `options` dict, so branch on the installed major version.
    if hasattr(jwt, '__version__') and jwt.__version__[0] == '1':
        return jwt.decode(
            token,
            verify=verify,
            algorithms=get_default_algorithms(),
        )
    return jwt.decode(
        token,
        options=dict(verify_signature=verify),
        algorithms=get_default_algorithms(),
    )

View File

@ -82,7 +82,7 @@ class Config(object):
relative_to=None,
app=None,
is_server=False,
**_,
**_
):
self._app = app
self._verbose = verbose
@ -214,7 +214,7 @@ class Config(object):
.lower()
)
result = ConfigTree.merge_configs(
result, ConfigFactory.parse_string(f"{path}: {os.environ[key]}")
result, ConfigFactory.parse_string("{}: {}".format(path, os.environ[key]))
)
return result

View File

@ -1,3 +1,14 @@
import base64
import os
from os.path import expandvars, expanduser
from pathlib import Path
from typing import List, TYPE_CHECKING
from pyhocon import HOCONConverter, ConfigTree
if TYPE_CHECKING:
from .config import Config
def get_items(cls):
""" get key/value items from an enum-like class (members represent enumeration key/value) """
@ -7,3 +18,95 @@ def get_items(cls):
def get_options(cls):
    """Return only the option values of an enum-like class.

    Thin wrapper over ``get_items`` that discards the member names and
    keeps the enumeration values.
    """
    items = get_items(cls)
    return items.values()
def apply_environment(config):
    # type: (Config) -> List[str]
    """Export the top-level `environment` configuration section into os.environ.

    Each key/value pair in the section is applied as ``key=value``; a falsy
    value is written as an empty string, and falsy keys are ignored.

    :param config: configuration object exposing ``get(key, default)``
    :return: list of the environment keys that were applied (empty if the
        section is missing or empty)
    """
    section = config.get("environment", None)
    if not section:
        return []
    # a list/tuple of pairs is accepted as well as a mapping
    if isinstance(section, (list, tuple)):
        section = dict(section)
    # skip falsy keys (e.g. empty strings)
    applied = [k for k in section.keys() if k]
    for name in applied:
        value = section[name]
        os.environ[str(name)] = str(value) if value else ""
    return applied
def apply_files(config):
    # type: (Config) -> None
    """Create files on the local file system from the top-level `files` config section.

    Each entry supports:
      - path: target file path; `~` and environment variables are expanded
      - contents: the file contents (string or any base type)
      - format: "base64" to base64-decode the contents first; otherwise ignored
      - target_format: "json"/"yaml"/"yml" to encode via pyhocon, "bytes" for
        binary mode; anything else is written as plain text
      - overwrite: overwrite an existing file (default: True)

    Entries that fail at any stage are skipped with a printed message; this
    function never raises.
    """
    files = config.get("files", None)
    if not files:
        return
    # a list/tuple of pairs is accepted as well as a mapping
    if isinstance(files, (list, tuple)):
        files = dict(files)
    print("Creating files from configuration")
    for key, data in files.items():
        path = data.get("path")
        fmt = data.get("format", "string")
        target_fmt = data.get("target_format", "string")
        overwrite = bool(data.get("overwrite", True))
        contents = data.get("contents")
        target = Path(expanduser(expandvars(path)))
        # best-effort pre-checks: skip directories and (unless overwriting)
        # existing files; any access error also skips the entry
        # noinspection PyBroadException
        try:
            if target.is_dir():
                print("Skipped [{}]: is a directory {}".format(key, target))
                continue
            if not overwrite and target.is_file():
                print("Skipped [{}]: file exists {}".format(key, target))
                continue
        except Exception as ex:
            print("Skipped [{}]: can't access {} ({})".format(key, target, ex))
            continue
        if contents:
            try:
                if fmt == "base64":
                    contents = base64.b64decode(contents)
                    # keep raw bytes only when the target is binary
                    if target_fmt != "bytes":
                        contents = contents.decode("utf-8")
            except Exception as ex:
                print("Skipped [{}]: failed decoding {} ({})".format(key, fmt, ex))
                continue
        # make sure the parent directory exists before writing
        # noinspection PyBroadException
        try:
            target.parent.mkdir(parents=True, exist_ok=True)
        except Exception as ex:
            print("Skipped [{}]: failed creating path {} ({})".format(key, target.parent, ex))
            continue
        try:
            if target_fmt == "bytes":
                try:
                    target.write_bytes(contents)
                except TypeError:
                    # simpler error so the user won't get confused
                    raise TypeError("a bytes-like object is required")
            else:
                try:
                    if target_fmt == "json":
                        text = HOCONConverter.to_json(contents)
                    elif target_fmt in ("yaml", "yml"):
                        text = HOCONConverter.to_yaml(contents)
                    else:
                        # plain text: unwrap ConfigTree into a dict first
                        if isinstance(contents, ConfigTree):
                            contents = contents.as_plain_ordered_dict()
                        text = str(contents)
                except Exception as ex:
                    print("Skipped [{}]: failed encoding to {} ({})".format(key, target_fmt, ex))
                    continue
                target.write_text(text)
            print("Saved [{}]: {}".format(key, target))
        except Exception as ex:
            print("Skipped [{}]: failed saving file {} ({})".format(key, target, ex))
            continue

View File

@ -37,7 +37,9 @@ from clearml_agent.backend_api.services import queues as queues_api
from clearml_agent.backend_api.services import tasks as tasks_api
from clearml_agent.backend_api.services import workers as workers_api
from clearml_agent.backend_api.session import CallResult
from clearml_agent.backend_api.session.defs import ENV_ENABLE_ENV_CONFIG_SECTION, ENV_ENABLE_FILES_CONFIG_SECTION
from clearml_agent.backend_config.defs import UptimeConf
from clearml_agent.backend_config.utils import apply_environment, apply_files
from clearml_agent.commands.base import resolve_names, ServiceCommandSection
from clearml_agent.definitions import (
ENVIRONMENT_SDK_PARAMS,
@ -60,6 +62,7 @@ from clearml_agent.definitions import (
ENV_SSH_AUTH_SOCK,
ENV_AGENT_SKIP_PIP_VENV_INSTALL,
ENV_EXTRA_DOCKER_ARGS,
)
from clearml_agent.definitions import WORKING_REPOSITORY_DIR, PIP_EXTRA_INDICES
from clearml_agent.errors import APIError, CommandFailedError, Sigterm
@ -567,6 +570,7 @@ class Worker(ServiceCommandSection):
self._downtime_config = self._session.config.get("agent.downtime", None)
self._suppress_cr = self._session.config.get("agent.suppress_carriage_return", True)
self._host_ssh_cache = None
self._truncate_task_output_files = bool(self._session.config.get("agent.truncate_task_output_files", False))
# True - supported
# None - not initialized
@ -1278,15 +1282,15 @@ class Worker(ServiceCommandSection):
if self._services_mode and dynamic_gpus:
raise ValueError("Combining --dynamic-gpus and --services-mode is not supported")
# if we do not need to create queues, make sure they are valid
# match previous behaviour when we validated queue names before everything else
queues = self._resolve_queue_names(queues, create_if_missing=kwargs.get('create_queue', False))
# We are not running a daemon we are killing one.
# find the pid send termination signal and leave
if kwargs.get('stop', False):
return 1 if not self._kill_daemon(dynamic_gpus=dynamic_gpus) else 0
# if we do not need to create queues, make sure they are valid
# match previous behaviour when we validated queue names before everything else
queues = self._resolve_queue_names(queues, create_if_missing=kwargs.get('create_queue', False))
queues_info = [
q.to_dict()
for q in self._session.send_api(
@ -1544,10 +1548,16 @@ class Worker(ServiceCommandSection):
):
# type: (...) -> Tuple[Optional[int], Optional[TaskStopReason]]
def _print_file(file_path, prev_pos=0):
with open(file_path, "rb") as f:
with open(file_path, "ab+") as f:
f.seek(prev_pos)
binary_text = f.read()
pos = f.tell()
if not self._truncate_task_output_files:
# non-buffered behavior
pos = f.tell()
else:
# buffered - read everything and truncate
f.truncate(0)
pos = 0
# skip the previously printed lines,
blines = binary_text.split(b'\n') if binary_text else []
if not blines:
@ -1563,6 +1573,21 @@ class Worker(ServiceCommandSection):
stderr = open(stderr_path, "wt") if stderr_path else stdout
stdout_line_count, stdout_pos_count, stdout_last_lines = 0, 0, []
stderr_line_count, stderr_pos_count, stderr_last_lines = 0, 0, []
lines_buffer = defaultdict(list)
def report_lines(lines, source):
if not self._truncate_task_output_files:
# non-buffered
return self.send_logs(task_id, lines, session=session)
buffer = lines_buffer[source]
buffer += lines
sent = self.send_logs(task_id, buffer, session=session)
if sent > 0:
lines_buffer[source] = buffer[sent:]
return sent
service_mode_internal_agent_started = None
stopping = False
status = None
@ -1613,10 +1638,11 @@ class Worker(ServiceCommandSection):
if status is not None:
stop_reason = 'Service started'
stdout_line_count += self.send_logs(task_id, printed_lines, session=session)
stdout_line_count += report_lines(printed_lines, "stdout")
if stderr_path:
printed_lines, stderr_pos_count = _print_file(stderr_path, stderr_pos_count)
stderr_line_count += self.send_logs(task_id, printed_lines, session=session)
stderr_line_count += report_lines(printed_lines, "stderr")
except subprocess.CalledProcessError as ex:
# non zero return code
@ -1630,10 +1656,10 @@ class Worker(ServiceCommandSection):
except Exception:
# we should not get here, but better safe than sorry
printed_lines, stdout_pos_count = _print_file(stdout_path, stdout_pos_count)
stdout_line_count += self.send_logs(task_id, printed_lines, session=session)
stdout_line_count += report_lines(printed_lines, "stdout")
if stderr_path:
printed_lines, stderr_pos_count = _print_file(stderr_path, stderr_pos_count)
stderr_line_count += self.send_logs(task_id, printed_lines, session=session)
stderr_line_count += report_lines(printed_lines, "stderr")
stop_reason = TaskStopReason.exception
status = -1
@ -1652,10 +1678,10 @@ class Worker(ServiceCommandSection):
# Send last lines
printed_lines, stdout_pos_count = _print_file(stdout_path, stdout_pos_count)
stdout_line_count += self.send_logs(task_id, printed_lines, session=session)
stdout_line_count += report_lines(printed_lines, "stdout")
if stderr_path:
printed_lines, stderr_pos_count = _print_file(stderr_path, stderr_pos_count)
stderr_line_count += self.send_logs(task_id, printed_lines, session=session)
stderr_line_count += report_lines(printed_lines, "stderr")
return status, stop_reason
@ -1737,6 +1763,29 @@ class Worker(ServiceCommandSection):
raise ValueError("Failed applying git diff:\n{}\n\n"
"ERROR! Failed applying git diff, see diff above.".format(diff))
def _apply_extra_configuration(self):
    """Load server vaults, then optionally apply the `environment` and `files`
    configuration sections.

    Each section is applied only when its enabling env-var (or the
    corresponding `agent.apply_*` config default) allows it; all failures
    are reported and swallowed so startup continues.
    """
    try:
        self._session.load_vaults()
    except Exception as ex:
        print("Error: failed applying extra configuration: {}".format(ex))

    config = self._session.config

    env_section_enabled = ENV_ENABLE_ENV_CONFIG_SECTION.get(
        default=config.get("agent.apply_environment", False)
    )
    if env_section_enabled:
        try:
            applied_keys = apply_environment(config)
            if applied_keys:
                print("Environment variables set from configuration: {}".format(applied_keys))
        except Exception as ex:
            print("Error: failed applying environment from configuration: {}".format(ex))

    files_section_enabled = ENV_ENABLE_FILES_CONFIG_SECTION.get(
        default=config.get("agent.apply_files", default=False)
    )
    if files_section_enabled:
        try:
            apply_files(config)
        except Exception as ex:
            print("Error: failed applying files from configuration: {}".format(ex))
@resolve_names
def build(
self,
@ -2017,6 +2066,8 @@ class Worker(ServiceCommandSection):
Singleton.close_pid_file()
return
self._apply_extra_configuration()
self._session.print_configuration()
# now mark the task as started
@ -2779,7 +2830,7 @@ class Worker(ServiceCommandSection):
path itself can be passed in this variable)
:return: virtualenv directory, requirements manager to use with task, True if there is a cached venv entry
"""
skip_pip_venv_install = ENV_AGENT_SKIP_PIP_VENV_INSTALL.get() if self._session.feature_set != "basic" else None
skip_pip_venv_install = ENV_AGENT_SKIP_PIP_VENV_INSTALL.get()
if skip_pip_venv_install:
try:
skip_pip_venv_install = bool(strtobool(skip_pip_venv_install))
@ -3519,8 +3570,13 @@ class Worker(ServiceCommandSection):
def _resolve_queue_names(self, queues, create_if_missing=False):
if not queues:
default_queue = self._session.send_api(queues_api.GetDefaultRequest())
return [default_queue.id]
# try to look for queues with "default" tag
try:
default_queue = self._session.send_api(queues_api.GetDefaultRequest())
return [default_queue.id]
except APIError:
# if we cannot find one with "default" tag, look for a queue named "default"
queues = ["default"]
queues = return_list(queues)
if not create_if_missing:

View File

@ -263,6 +263,9 @@ class PytorchRequirement(SimpleSubstitution):
continue
if len(parts) < 5 or platform_wheel not in parts[4]:
continue
# Linux Python 2.7 support: skip wheels whose ABI tag does not end with 'u'
# (wide-unicode/UCS-4 build) — that is the only Python 2.7 build we support
if py_ver and py_ver[0] == '2' and len(parts) > 3 and not parts[3].endswith('u'):
continue
# update the closest matched version (from above)
if not closest_v:
closest_v = v

View File

@ -482,7 +482,7 @@ class VCS(object):
parsed_url = furl(url)
except ValueError:
return url
if parsed_url.scheme in ["", "ssh"] or parsed_url.scheme.startswith("git"):
if parsed_url.scheme in ["", "ssh"] or (parsed_url.scheme or '').startswith("git"):
return parsed_url.url
config_user = ENV_AGENT_GIT_USER.get() or config.get("agent.{}_user".format(cls.executable_name), None)
config_pass = ENV_AGENT_GIT_PASS.get() or config.get("agent.{}_pass".format(cls.executable_name), None)

View File

@ -1 +1 @@
__version__ = '1.1.0'
__version__ = '1.1.1'

View File

@ -285,6 +285,7 @@ sdk {
# secret: "12345678"
# multipart: false
# secure: false
# verify: /path/to/ca/bundle.crt OR false to not verify
# }
]
}
@ -359,5 +360,45 @@ sdk {
log_stdout: True
}
}
# Apply top-level environment section from configuration into os.environ
apply_environment: true
# Top-level environment section is in the form of:
# environment {
# key: value
# ...
# }
# and is applied to the OS environment as `key=value` for each key/value pair
# Apply top-level files section from configuration into local file system
apply_files: true
# Top-level files section allows auto-generating files at designated paths with predefined contents
# and target format. Options include:
# contents: the target file's content, typically a string (or any base type int/float/list/dict etc.)
# format: a custom format for the contents. Currently supported value is `base64` to automatically decode a
# base64-encoded contents string, otherwise ignored
# path: the target file's path, may include ~ and inplace env vars
# target_format: format used to encode contents before writing into the target file. Supported values are json,
# yaml, yml and bytes (in which case the file will be written in binary mode). Default is text mode.
# overwrite: overwrite the target file in case it exists. Default is true.
#
# Example:
# files {
# myfile1 {
# contents: "The quick brown fox jumped over the lazy dog"
# path: "/tmp/fox.txt"
# }
# myjsonfile {
# contents: {
# some {
# nested {
# value: [1, 2, 3, 4]
# }
# }
# }
# path: "/tmp/test.json"
# target_format: json
# }
# }
}