Compare commits

...

16 Commits

Author SHA1 Message Date
allegroai
73625bf00f Version bump 2021-12-21 14:29:43 +02:00
allegroai
f41ed09dc1 Add support for custom docker image resolving 2021-12-21 14:29:43 +02:00
allegroai
f03c4576f7 Update default docker image 2021-12-21 14:29:43 +02:00
pshowbs
6c5087e425 Update S3 bucket verify option for minio (#83)
Use verify configuration option to skip verify or set ca bundle path
2021-11-06 14:40:35 +02:00
allegroai
5a6caf6399 Fix "git+git://" requirements 2021-10-29 22:58:28 +03:00
allegroai
a07053d961 Version bump to v1.1.1 2021-10-26 10:12:21 +03:00
allegroai
aa9a9a25fb version bump 2021-10-21 12:03:29 +03:00
allegroai
cd4a39d8fc Fix config example 2021-10-21 12:03:07 +03:00
allegroai
92e3f00435 Add support for truncating task log file after reporting to server 2021-10-21 12:02:31 +03:00
allegroai
a890e36a36 Fix PY2.7 support for pytorch 2021-10-19 10:47:09 +03:00
allegroai
bed94ee431 Add support for configuration env and files section 2021-10-19 10:46:43 +03:00
allegroai
175e99b12b Fix if queue tag default does not exist and --queue not specified, try queue name "default" 2021-10-16 23:21:45 +03:00
allegroai
2a941e3abf Fix --stop checking default queue tag (issue #80) 2021-10-16 23:21:12 +03:00
allegroai
3c8e0ae5db Improve PyJWT resiliency support 2021-10-10 09:08:36 +03:00
allegroai
e416ab526b Fix Python 3.5 compatibility 2021-09-26 00:05:08 +03:00
pollfly
e17246d8ea Fix docstring typos (#79)
* edit doctring typo

* fix typos
2021-09-14 18:42:18 +03:00
17 changed files with 663 additions and 68 deletions

View File

@@ -156,7 +156,7 @@
default_docker: {
# default docker image to use when running in docker mode
image: "nvidia/cuda:10.1-cudnn7-runtime-ubuntu18.04"
image: "nvidia/cuda:10.2-cudnn7-runtime-ubuntu18.04"
# optional arguments to pass to docker image
# arguments: ["--ipc=host", ]
@@ -217,8 +217,48 @@
}
# Name docker containers created by the daemon using the following string format (supported from Docker 0.6.5)
# Allowed variables are task_id, worker_id and rand_string (random lower-case letters string, up to 32 charaters)
# Note: resulting name must start with an alpha-numeric character and continue with a alpha-numeric characters,
# Allowed variables are task_id, worker_id and rand_string (random lower-case letters string, up to 32 characters)
# Note: resulting name must start with an alphanumeric character and continue with alphanumeric characters,
# underscores (_), dots (.) and/or dashes (-)
#docker_container_name_format: "clearml-id-{task_id}-{rand_string:.8}"
# Apply top-level environment section from configuration into os.environ
apply_environment: true
# Top-level environment section is in the form of:
# environment {
# key: value
# ...
# }
# and is applied to the OS environment as `key=value` for each key/value pair
# Apply top-level files section from configuration into local file system
apply_files: true
# Top-level files section allows auto-generating files at designated paths with a predefined contents
# and target format. Options include:
# contents: the target file's content, typically a string (or any base type int/float/list/dict etc.)
# format: a custom format for the contents. Currently supported value is `base64` to automatically decode a
# base64-encoded contents string, otherwise ignored
# path: the target file's path, may include ~ and inplace env vars
# target_format: format used to encode contents before writing into the target file. Supported values are json,
# yaml, yml and bytes (in which case the file will be written in binary mode). Default is text mode.
# overwrite: overwrite the target file in case it exists. Default is true.
#
# Example:
# files {
# myfile1 {
# contents: "The quick brown fox jumped over the lazy dog"
# path: "/tmp/fox.txt"
# }
# myjsonfile {
# contents: {
# some {
# nested {
# value: [1, 2, 3, 4]
# }
# }
# }
# path: "/tmp/test.json"
# target_format: json
# }
# }
}

View File

@@ -13,6 +13,8 @@ ENV_HOST_VERIFY_CERT = EnvEntry("CLEARML_API_HOST_VERIFY_CERT", "TRAINS_API_HOST
ENV_CONDA_ENV_PACKAGE = EnvEntry("CLEARML_CONDA_ENV_PACKAGE", "TRAINS_CONDA_ENV_PACKAGE")
ENV_NO_DEFAULT_SERVER = EnvEntry("CLEARML_NO_DEFAULT_SERVER", "TRAINS_NO_DEFAULT_SERVER", type=bool, default=True)
ENV_DISABLE_VAULT_SUPPORT = EnvEntry('CLEARML_AGENT_DISABLE_VAULT_SUPPORT', type=bool)
ENV_ENABLE_ENV_CONFIG_SECTION = EnvEntry('CLEARML_AGENT_ENABLE_ENV_CONFIG_SECTION', type=bool)
ENV_ENABLE_FILES_CONFIG_SECTION = EnvEntry('CLEARML_AGENT_ENABLE_FILES_CONFIG_SECTION', type=bool)
ENV_INITIAL_CONNECT_RETRY_OVERRIDE = EnvEntry(
'CLEARML_AGENT_INITIAL_CONNECT_RETRY_OVERRIDE', default=True, converter=safe_text_to_bool
)

View File

@@ -183,8 +183,6 @@ class Session(TokenManager):
# notice: this is across the board warning omission
urllib_log_warning_setup(total_retries=http_retries_config.get('total', 0), display_warning_after=3)
self._load_vaults()
def _setup_session(self, http_retries_config, initial_session=False, default_initial_connect_override=None):
# type: (dict, bool, Optional[bool]) -> (dict, requests.Session)
http_retries_config = http_retries_config or self.config.get(
@@ -210,7 +208,7 @@ class Session(TokenManager):
return http_retries_config, get_http_session_with_retry(**http_retries_config)
def _load_vaults(self):
def load_vaults(self):
if not self.check_min_api_version("2.15") or self.feature_set == "basic":
return
@@ -242,6 +240,12 @@ class Session(TokenManager):
except Exception as ex:
print("Failed getting vaults: {}".format(ex))
def verify_feature_set(self, feature_set):
if isinstance(feature_set, str):
feature_set = [feature_set]
if self.feature_set not in feature_set:
raise ValueError('ClearML-server does not support requested feature set {}'.format(feature_set))
def _send_request(
self,
service,

View File

@@ -87,10 +87,16 @@ class TokenManager(object):
@classmethod
def get_decoded_token(cls, token, verify=False):
""" Get token expiration time. If not present, assume forever """
if hasattr(jwt, '__version__') and jwt.__version__[0] == '1':
return jwt.decode(
token,
verify=verify,
algorithms=get_default_algorithms(),
)
return jwt.decode(
token,
verify=verify,
options=dict(verify_signature=False),
options=dict(verify_signature=verify),
algorithms=get_default_algorithms(),
)

View File

@@ -82,7 +82,7 @@ class Config(object):
relative_to=None,
app=None,
is_server=False,
**_,
**_
):
self._app = app
self._verbose = verbose
@@ -214,7 +214,7 @@ class Config(object):
.lower()
)
result = ConfigTree.merge_configs(
result, ConfigFactory.parse_string(f"{path}: {os.environ[key]}")
result, ConfigFactory.parse_string("{}: {}".format(path, os.environ[key]))
)
return result

View File

@@ -1,3 +1,14 @@
import base64
import os
from os.path import expandvars, expanduser
from pathlib import Path
from typing import List, TYPE_CHECKING
from pyhocon import HOCONConverter, ConfigTree
if TYPE_CHECKING:
from .config import Config
def get_items(cls):
""" get key/value items from an enum-like class (members represent enumeration key/value) """
@@ -7,3 +18,95 @@ def get_items(cls):
def get_options(cls):
""" get options from an enum-like class (members represent enumeration key/value) """
return get_items(cls).values()
def apply_environment(config):
# type: (Config) -> List[str]
env_vars = config.get("environment", None)
if not env_vars:
return []
if isinstance(env_vars, (list, tuple)):
env_vars = dict(env_vars)
keys = list(filter(None, env_vars.keys()))
for key in keys:
os.environ[str(key)] = str(env_vars[key] or "")
return keys
def apply_files(config):
# type: (Config) -> None
files = config.get("files", None)
if not files:
return
if isinstance(files, (list, tuple)):
files = dict(files)
print("Creating files from configuration")
for key, data in files.items():
path = data.get("path")
fmt = data.get("format", "string")
target_fmt = data.get("target_format", "string")
overwrite = bool(data.get("overwrite", True))
contents = data.get("contents")
target = Path(expanduser(expandvars(path)))
# noinspection PyBroadException
try:
if target.is_dir():
print("Skipped [{}]: is a directory {}".format(key, target))
continue
if not overwrite and target.is_file():
print("Skipped [{}]: file exists {}".format(key, target))
continue
except Exception as ex:
print("Skipped [{}]: can't access {} ({})".format(key, target, ex))
continue
if contents:
try:
if fmt == "base64":
contents = base64.b64decode(contents)
if target_fmt != "bytes":
contents = contents.decode("utf-8")
except Exception as ex:
print("Skipped [{}]: failed decoding {} ({})".format(key, fmt, ex))
continue
# noinspection PyBroadException
try:
target.parent.mkdir(parents=True, exist_ok=True)
except Exception as ex:
print("Skipped [{}]: failed creating path {} ({})".format(key, target.parent, ex))
continue
try:
if target_fmt == "bytes":
try:
target.write_bytes(contents)
except TypeError:
# simpler error so the user won't get confused
raise TypeError("a bytes-like object is required")
else:
try:
if target_fmt == "json":
text = HOCONConverter.to_json(contents)
elif target_fmt in ("yaml", "yml"):
text = HOCONConverter.to_yaml(contents)
else:
if isinstance(contents, ConfigTree):
contents = contents.as_plain_ordered_dict()
text = str(contents)
except Exception as ex:
print("Skipped [{}]: failed encoding to {} ({})".format(key, target_fmt, ex))
continue
target.write_text(text)
print("Saved [{}]: {}".format(key, target))
except Exception as ex:
print("Skipped [{}]: failed saving file {} ({})".format(key, target, ex))
continue

View File

@@ -0,0 +1,166 @@
import json
import re
import shlex
from clearml_agent.helper.package.requirements import (
RequirementsManager, MarkerRequirement,
compare_version_rules, )
def resolve_default_container(session, task_id, container_config):
container_lookup = session.config.get('agent.default_docker.match_rules', None)
if not session.check_min_api_version("2.13") or not container_lookup:
return container_config
# check backend support before sending any more requests (because they will fail and crash the Task)
try:
session.verify_feature_set('advanced')
except ValueError:
return container_config
result = session.send_request(
service='tasks',
action='get_all',
version='2.14',
json={'id': [task_id],
'only_fields': ['script.requirements', 'script.binary',
'script.repository', 'script.branch',
'project', 'container'],
'search_hidden': True},
method='get',
async_enable=False,
)
try:
task_info = result.json()['data']['tasks'][0] if result.ok else {}
except (ValueError, TypeError):
return container_config
from clearml_agent.external.requirements_parser.requirement import Requirement
# store tasks repository
repository = task_info.get('script', {}).get('repository') or ''
branch = task_info.get('script', {}).get('branch') or ''
binary = task_info.get('script', {}).get('binary') or ''
requested_container = task_info.get('container', {})
# get project full path
project_full_name = ''
if task_info.get('project', None):
result = session.send_request(
service='projects',
action='get_all',
version='2.13',
json={
'id': [task_info.get('project')],
'only_fields': ['name'],
},
method='get',
async_enable=False,
)
try:
if result.ok:
project_full_name = result.json()['data']['projects'][0]['name'] or ''
except (ValueError, TypeError):
pass
task_packages_lookup = {}
for entry in container_lookup:
match = entry.get('match', None)
if not match:
continue
if match.get('project', None):
# noinspection PyBroadException
try:
if not re.search(match.get('project', None), project_full_name):
continue
except Exception:
print('Failed parsing regular expression \"{}\" in rule: {}'.format(
match.get('project', None), entry))
continue
if match.get('script.repository', None):
# noinspection PyBroadException
try:
if not re.search(match.get('script.repository', None), repository):
continue
except Exception:
print('Failed parsing regular expression \"{}\" in rule: {}'.format(
match.get('script.repository', None), entry))
continue
if match.get('script.branch', None):
# noinspection PyBroadException
try:
if not re.search(match.get('script.branch', None), branch):
continue
except Exception:
print('Failed parsing regular expression \"{}\" in rule: {}'.format(
match.get('script.branch', None), entry))
continue
if match.get('script.binary', None):
# noinspection PyBroadException
try:
if not re.search(match.get('script.binary', None), binary):
continue
except Exception:
print('Failed parsing regular expression \"{}\" in rule: {}'.format(
match.get('script.binary', None), entry))
continue
if match.get('container', None):
# noinspection PyBroadException
try:
if not re.search(match.get('container', None), requested_container.get('image', '')):
continue
except Exception:
print('Failed parsing regular expression \"{}\" in rule: {}'.format(
match.get('container', None), entry))
continue
matched = True
for req_section in ['script.requirements.pip', 'script.requirements.conda']:
if not match.get(req_section, None):
continue
match_pip_reqs = [MarkerRequirement(Requirement.parse('{} {}'.format(k, v)))
for k, v in match.get(req_section, None).items()]
if not task_packages_lookup.get(req_section):
req_section_parts = req_section.split('.')
task_packages_lookup[req_section] = \
RequirementsManager.parse_requirements_section_to_marker_requirements(
requirements=task_info.get(req_section_parts[0], {}).get(
req_section_parts[1], {}).get(req_section_parts[2], None)
)
matched_all_reqs = True
for mr in match_pip_reqs:
matched_req = False
for pr in task_packages_lookup[req_section]:
if mr.req.name != pr.req.name:
continue
if compare_version_rules(mr.specs, pr.specs):
matched_req = True
break
if not matched_req:
matched_all_reqs = False
break
# if ew have a match, check second section
if matched_all_reqs:
continue
# no match stop
matched = False
break
if matched:
if not container_config.get('container'):
container_config['container'] = entry.get('image', None)
if not container_config.get('arguments'):
container_config['arguments'] = entry.get('arguments', None)
container_config['arguments'] = shlex.split(str(container_config.get('arguments') or '').strip())
print('Matching default container with rule:\n{}'.format(json.dumps(entry)))
return container_config
return container_config

View File

@@ -37,8 +37,11 @@ from clearml_agent.backend_api.services import queues as queues_api
from clearml_agent.backend_api.services import tasks as tasks_api
from clearml_agent.backend_api.services import workers as workers_api
from clearml_agent.backend_api.session import CallResult
from clearml_agent.backend_api.session.defs import ENV_ENABLE_ENV_CONFIG_SECTION, ENV_ENABLE_FILES_CONFIG_SECTION
from clearml_agent.backend_config.defs import UptimeConf
from clearml_agent.backend_config.utils import apply_environment, apply_files
from clearml_agent.commands.base import resolve_names, ServiceCommandSection
from clearml_agent.commands.resolver import resolve_default_container
from clearml_agent.definitions import (
ENVIRONMENT_SDK_PARAMS,
PROGRAM_NAME,
@@ -60,6 +63,7 @@ from clearml_agent.definitions import (
ENV_SSH_AUTH_SOCK,
ENV_AGENT_SKIP_PIP_VENV_INSTALL,
ENV_EXTRA_DOCKER_ARGS,
)
from clearml_agent.definitions import WORKING_REPOSITORY_DIR, PIP_EXTRA_INDICES
from clearml_agent.errors import APIError, CommandFailedError, Sigterm
@@ -99,7 +103,8 @@ from clearml_agent.helper.package.poetry_api import PoetryConfig, PoetryAPI
from clearml_agent.helper.package.post_req import PostRequirement
from clearml_agent.helper.package.priority_req import PriorityPackageRequirement, PackageCollectorRequirement
from clearml_agent.helper.package.pytorch import PytorchRequirement
from clearml_agent.helper.package.requirements import RequirementsManager
from clearml_agent.helper.package.requirements import (
RequirementsManager, )
from clearml_agent.helper.package.venv_update_api import VenvUpdateAPI
from clearml_agent.helper.process import (
kill_all_child_processes,
@@ -327,6 +332,9 @@ def get_task_container(session, task_id):
except (ValueError, TypeError):
container = {}
if (not container or not container.get('container')) and session.check_min_api_version("2.13"):
container = resolve_default_container(session=session, task_id=task_id, container_config=container)
return container
@@ -567,6 +575,7 @@ class Worker(ServiceCommandSection):
self._downtime_config = self._session.config.get("agent.downtime", None)
self._suppress_cr = self._session.config.get("agent.suppress_carriage_return", True)
self._host_ssh_cache = None
self._truncate_task_output_files = bool(self._session.config.get("agent.truncate_task_output_files", False))
# True - supported
# None - not initialized
@@ -625,7 +634,7 @@ class Worker(ServiceCommandSection):
:param queue: ID of queue that task was pulled from
:param task_id: ID of task to run
:param worker_args: Worker command line arguments
:params task_session: The session for running operations on the passed task
:param task_session: The session for running operations on the passed task
:param docker: Docker image in which the execution task will run
"""
# start new process and execute task id
@@ -1114,6 +1123,7 @@ class Worker(ServiceCommandSection):
return queue_tags, runtime_props
def get_runtime_properties(self):
# TODO: refactor to use the Session env State
if self._runtime_props_support is not True:
# either not supported or never tested
if self._runtime_props_support == self._session.api_version:
@@ -1278,15 +1288,15 @@ class Worker(ServiceCommandSection):
if self._services_mode and dynamic_gpus:
raise ValueError("Combining --dynamic-gpus and --services-mode is not supported")
# if we do not need to create queues, make sure they are valid
# match previous behaviour when we validated queue names before everything else
queues = self._resolve_queue_names(queues, create_if_missing=kwargs.get('create_queue', False))
# We are not running a daemon we are killing one.
# find the pid send termination signal and leave
if kwargs.get('stop', False):
return 1 if not self._kill_daemon(dynamic_gpus=dynamic_gpus) else 0
# if we do not need to create queues, make sure they are valid
# match previous behaviour when we validated queue names before everything else
queues = self._resolve_queue_names(queues, create_if_missing=kwargs.get('create_queue', False))
queues_info = [
q.to_dict()
for q in self._session.send_api(
@@ -1434,7 +1444,7 @@ class Worker(ServiceCommandSection):
queue_names = [q.name for q in queues]
if not all('=' in q for q in queue_names):
raise ValueError("using --dynamic-gpus, --queues [{}], "
raise ValueError("using --dynamic-gpus, --queue [{}], "
"queue must be in format <queue_name>=<num_gpus>".format(queue_names))
gpu_indexes = kwargs.get('gpus')
@@ -1544,10 +1554,16 @@ class Worker(ServiceCommandSection):
):
# type: (...) -> Tuple[Optional[int], Optional[TaskStopReason]]
def _print_file(file_path, prev_pos=0):
with open(file_path, "rb") as f:
with open(file_path, "ab+") as f:
f.seek(prev_pos)
binary_text = f.read()
pos = f.tell()
if not self._truncate_task_output_files:
# non-buffered behavior
pos = f.tell()
else:
# buffered - read everything and truncate
f.truncate(0)
pos = 0
# skip the previously printed lines,
blines = binary_text.split(b'\n') if binary_text else []
if not blines:
@@ -1563,6 +1579,21 @@ class Worker(ServiceCommandSection):
stderr = open(stderr_path, "wt") if stderr_path else stdout
stdout_line_count, stdout_pos_count, stdout_last_lines = 0, 0, []
stderr_line_count, stderr_pos_count, stderr_last_lines = 0, 0, []
lines_buffer = defaultdict(list)
def report_lines(lines, source):
if not self._truncate_task_output_files:
# non-buffered
return self.send_logs(task_id, lines, session=session)
buffer = lines_buffer[source]
buffer += lines
sent = self.send_logs(task_id, buffer, session=session)
if sent > 0:
lines_buffer[source] = buffer[sent:]
return sent
service_mode_internal_agent_started = None
stopping = False
status = None
@@ -1613,10 +1644,11 @@ class Worker(ServiceCommandSection):
if status is not None:
stop_reason = 'Service started'
stdout_line_count += self.send_logs(task_id, printed_lines, session=session)
stdout_line_count += report_lines(printed_lines, "stdout")
if stderr_path:
printed_lines, stderr_pos_count = _print_file(stderr_path, stderr_pos_count)
stderr_line_count += self.send_logs(task_id, printed_lines, session=session)
stderr_line_count += report_lines(printed_lines, "stderr")
except subprocess.CalledProcessError as ex:
# non zero return code
@@ -1630,10 +1662,10 @@ class Worker(ServiceCommandSection):
except Exception:
# we should not get here, but better safe than sorry
printed_lines, stdout_pos_count = _print_file(stdout_path, stdout_pos_count)
stdout_line_count += self.send_logs(task_id, printed_lines, session=session)
stdout_line_count += report_lines(printed_lines, "stdout")
if stderr_path:
printed_lines, stderr_pos_count = _print_file(stderr_path, stderr_pos_count)
stderr_line_count += self.send_logs(task_id, printed_lines, session=session)
stderr_line_count += report_lines(printed_lines, "stderr")
stop_reason = TaskStopReason.exception
status = -1
@@ -1652,10 +1684,10 @@ class Worker(ServiceCommandSection):
# Send last lines
printed_lines, stdout_pos_count = _print_file(stdout_path, stdout_pos_count)
stdout_line_count += self.send_logs(task_id, printed_lines, session=session)
stdout_line_count += report_lines(printed_lines, "stdout")
if stderr_path:
printed_lines, stderr_pos_count = _print_file(stderr_path, stderr_pos_count)
stderr_line_count += self.send_logs(task_id, printed_lines, session=session)
stderr_line_count += report_lines(printed_lines, "stderr")
return status, stop_reason
@@ -1737,6 +1769,29 @@ class Worker(ServiceCommandSection):
raise ValueError("Failed applying git diff:\n{}\n\n"
"ERROR! Failed applying git diff, see diff above.".format(diff))
def _apply_extra_configuration(self):
try:
self._session.load_vaults()
except Exception as ex:
print("Error: failed applying extra configuration: {}".format(ex))
config = self._session.config
default = config.get("agent.apply_environment", False)
if ENV_ENABLE_ENV_CONFIG_SECTION.get(default=default):
try:
keys = apply_environment(config)
if keys:
print("Environment variables set from configuration: {}".format(keys))
except Exception as ex:
print("Error: failed applying environment from configuration: {}".format(ex))
default = config.get("agent.apply_files", default=False)
if ENV_ENABLE_FILES_CONFIG_SECTION.get(default=default):
try:
apply_files(config)
except Exception as ex:
print("Error: failed applying files from configuration: {}".format(ex))
@resolve_names
def build(
self,
@@ -2017,6 +2072,8 @@ class Worker(ServiceCommandSection):
Singleton.close_pid_file()
return
self._apply_extra_configuration()
self._session.print_configuration()
# now mark the task as started
@@ -2779,7 +2836,7 @@ class Worker(ServiceCommandSection):
path itself can be passed in this variable)
:return: virtualenv directory, requirements manager to use with task, True if there is a cached venv entry
"""
skip_pip_venv_install = ENV_AGENT_SKIP_PIP_VENV_INSTALL.get() if self._session.feature_set != "basic" else None
skip_pip_venv_install = ENV_AGENT_SKIP_PIP_VENV_INSTALL.get()
if skip_pip_venv_install:
try:
skip_pip_venv_install = bool(strtobool(skip_pip_venv_install))
@@ -2792,7 +2849,8 @@ class Worker(ServiceCommandSection):
requested_python_version = \
requested_python_version or \
Text(self._session.config.get("agent.python_binary", None)) or \
Text(self._session.config.get("agent.default_python", None))
Text(self._session.config.get("agent.default_python", None)) or \
'{}.{}'.format(sys.version_info.major, sys.version_info.minor)
if self.is_conda:
executable_version_suffix = \
@@ -2819,13 +2877,14 @@ class Worker(ServiceCommandSection):
self.find_python_executable_for_version(requested_python_version)
except Exception:
def_python_version = Text(self._session.config.get("agent.python_binary", None)) or \
Text(self._session.config.get("agent.default_python", None))
Text(self._session.config.get("agent.default_python", None)) or \
'{}.{}'.format(sys.version_info.major, sys.version_info.minor)
print('Warning: could not locate requested Python version {}, reverting to version {}'.format(
requested_python_version, def_python_version))
executable_version, executable_version_suffix, executable_name = \
self.find_python_executable_for_version(def_python_version)
self._session.config.put("agent.default_python", executable_version)
self._session.config.put("agent.default_python", executable_version_suffix)
self._session.config.put("agent.python_binary", executable_name)
venv_dir = Path(venv_dir) if venv_dir else \
@@ -3519,8 +3578,13 @@ class Worker(ServiceCommandSection):
def _resolve_queue_names(self, queues, create_if_missing=False):
if not queues:
default_queue = self._session.send_api(queues_api.GetDefaultRequest())
return [default_queue.id]
# try to look for queues with "default" tag
try:
default_queue = self._session.send_api(queues_api.GetDefaultRequest())
return [default_queue.id]
except APIError:
# if we cannot find one with "default" tag, look for a queue named "default"
queues = ["default"]
queues = return_list(queues)
if not create_if_missing:

View File

@@ -189,14 +189,6 @@ class CondaAPI(PackageManager):
if conda_env.is_file() and not is_windows_platform():
self.source = self.pip.source = CommandSequence(('source', conda_env.as_posix()), self.source)
# install cuda toolkit
# noinspection PyBroadException
try:
cuda_version = float(int(self.session.config['agent.cuda_version'])) / 10.0
if cuda_version > 0:
self._install('cudatoolkit={:.1f}'.format(cuda_version))
except Exception:
pass
return self
def _init_existing_environment(self, conda_pre_build_env_path):
@@ -456,7 +448,9 @@ class CondaAPI(PackageManager):
requirements['conda'] = requirements['conda'].split('\n')
has_torch = False
has_matplotlib = False
has_cudatoolkit = False
try:
# notice this is an integer version: 112 (means 11.2)
cuda_version = int(self.session.config.get('agent.cuda_version', 0))
except:
cuda_version = 0
@@ -488,6 +482,19 @@ class CondaAPI(PackageManager):
if '.' not in m.specs[0][1]:
continue
if m.name.lower() == 'cudatoolkit':
# skip cuda if we are running on CPU
if not cuda_version:
continue
has_cudatoolkit = True
# cuda version, only major.minor
requested_cuda_version = '.'.join(m.specs[0][1].split('.')[:2])
# make sure that the cuda_version we support can install the requested cuda (major version)
if int(float(requested_cuda_version)) > int(float(cuda_version)/10.0):
continue
m.specs = [(m.specs[0][0], str(requested_cuda_version)), ]
conda_supported_req_names.append(m.name.lower())
if m.req.name.lower() == 'matplotlib':
has_matplotlib = True
@@ -504,6 +511,10 @@ class CondaAPI(PackageManager):
reqs.append(m)
if not has_cudatoolkit and cuda_version:
m = MarkerRequirement(Requirement("cudatoolkit == {}".format(float(cuda_version) / 10.0)))
reqs.append(m)
# if we have a conda list, the rest should be installed with pip,
# this means any experiment that was executed with pip environment,
# will be installed using pip
@@ -559,8 +570,12 @@ class CondaAPI(PackageManager):
# change _ to - in name but not the prefix _ (as this is conda prefix)
if r.name and not r.name.startswith('_') and not requirements.get('conda', None):
r.name = r.name.replace('_', '-')
# remove .post from version numbers, it fails ~= version, and change == to ~=
if r.specs and r.specs[0]:
if has_cudatoolkit and r.specs and len(r.specs[0]) > 1 and r.name == 'cudatoolkit':
# select specific cuda version if it came from the requirements
r.specs = [(r.specs[0][0].replace('==', '='), r.specs[0][1].split('.post')[0])]
elif r.specs and r.specs[0] and len(r.specs[0]) > 1:
# remove .post from version numbers it fails with ~= version, and change == to ~=
r.specs = [(r.specs[0][0].replace('==', '~='), r.specs[0][1].split('.post')[0])]
while reqs:

View File

@@ -263,6 +263,9 @@ class PytorchRequirement(SimpleSubstitution):
continue
if len(parts) < 5 or platform_wheel not in parts[4]:
continue
# yes this is for linux python 2.7 support, this is the only python 2.7 we support...
if py_ver and py_ver[0] == '2' and len(parts) > 3 and not parts[3].endswith('u'):
continue
# update the closest matched version (from above)
if not closest_v:
closest_v = v

View File

@@ -208,7 +208,11 @@ class SimpleVersion:
if not version_b:
return True
if not num_parts:
num_parts = max(len(version_a.split('.')), len(version_b.split('.')), )
if op == '~=':
num_parts = len(version_b.split('.')) - 1
num_parts = max(num_parts, 2)
op = '=='
ignore_sub_versions = True
@@ -245,6 +249,16 @@ class SimpleVersion:
return version_a_key < version_b_key
raise ValueError('Unrecognized comparison operator [{}]'.format(op))
@classmethod
def max_version(cls, version_a, version_b):
return version_a if cls.compare_versions(
version_a=version_a, op='>=', version_b=version_b, num_parts=None) else version_b
@classmethod
def min_version(cls, version_a, version_b):
return version_a if cls.compare_versions(
version_a=version_a, op='<=', version_b=version_b, num_parts=None) else version_b
@staticmethod
def _parse_letter_version(
letter, # type: str
@@ -313,6 +327,77 @@ class SimpleVersion:
return ()
def compare_version_rules(specs_a, specs_b):
# specs_a/b are a list of tuples: [('==', '1.2.3'), ] or [('>=', '1.2'), ('<', '1.3')]
# section definition:
class Section(object):
def __init__(self, left=None, left_eq=False, right=None, right_eq=False):
self.left, self.left_eq, self.right, self.right_eq = left, left_eq, right, right_eq
# first create a list of in/out sections for each spec
# >, >= are left rule
# <, <= are right rule
# ~= x.y.z is converted to: >= x.y and < x.y+1
# ==/=== are converted to: >= and <=
# != x.y.z will split a section into: left < x.y.z and right > x.y.z
def create_section(specs):
section = Section()
for op, v in specs:
a = section
if op == '>':
a.left = v
a.left_eq = False
elif op == '>=':
a.left = v
a.left_eq = True
elif op == '<':
a.right = v
a.right_eq = False
elif op == '<=':
a.right = v
a.right_eq = True
elif op == '==':
a.left = v
a.left_eq = True
a.right = v
a.right_eq = True
elif op == '~=':
new_v = v.split('.')
a_left = '.'.join(new_v[:-1])
a.left = a_left if not a.left else SimpleVersion.max_version(a_left, a.left)
a.left_eq = True
a_right = '.'.join(new_v[:-2] + [str(int(new_v[-2])+1)])
a.right = a_right if not a.right else SimpleVersion.min_version(a_right, a.right)
a.right_eq = False if a.right == a_right else a.right_eq
return section
section_a = create_section(specs_a)
section_b = create_section(specs_b)
i = Section()
# then we have a list of sections for spec A/B
if section_a.left == section_b.left:
i.left = section_a.left
i.left_eq = section_a.left_eq and section_b.left_eq
else:
i.left = SimpleVersion.max_version(section_a.left, section_b.left)
i.left_eq = section_a.left_eq if i.left == section_a.left else section_b.left_eq
if section_a.right == section_b.right:
i.right = section_a.right
i.right_eq = section_a.right_eq and section_b.right_eq
else:
i.right = SimpleVersion.min_version(section_a.right, section_b.right)
i.right_eq = section_a.right_eq if i.right == section_a.right else section_b.right_eq
# return true if any section from A intersects a section from B
valid = True
valid &= SimpleVersion.compare_versions(
version_a=i.left, op='<=' if i.left_eq else '<', version_b=i.right, num_parts=None)
valid &= SimpleVersion.compare_versions(
version_a=i.right, op='>=' if i.left_eq else '>', version_b=i.left, num_parts=None)
return valid
@six.add_metaclass(ABCMeta)
class RequirementSubstitution(object):
@@ -468,20 +553,9 @@ class RequirementsManager(object):
return None
def replace(self, requirements): # type: (Text) -> Text
def safe_parse(req_str):
# noinspection PyBroadException
try:
return list(parse(req_str, cwd=self._cwd))
except Exception as ex:
return [Requirement(req_str)]
parsed_requirements = self.parse_requirements_section_to_marker_requirements(
requirements=requirements, cwd=self._cwd)
parsed_requirements = tuple(
map(
MarkerRequirement,
[r for line in (requirements.splitlines() if isinstance(requirements, six.text_type) else requirements)
for r in safe_parse(line)]
)
)
if not parsed_requirements:
# return the original requirements just in case
return requirements
@@ -614,3 +688,24 @@ class RequirementsManager(object):
return (normalize_cuda_version(cuda_version or 0),
normalize_cuda_version(cudnn_version or 0))
@staticmethod
def parse_requirements_section_to_marker_requirements(requirements, cwd=None):
def safe_parse(req_str):
# noinspection PyBroadException
try:
return list(parse(req_str, cwd=cwd))
except Exception as ex:
return [Requirement(req_str)]
if not requirements:
return tuple()
parsed_requirements = tuple(
map(
MarkerRequirement,
[r for line in (requirements.splitlines() if isinstance(requirements, str) else requirements)
for r in safe_parse(line)]
)
)
return parsed_requirements

View File

@@ -482,7 +482,7 @@ class VCS(object):
parsed_url = furl(url)
except ValueError:
return url
if parsed_url.scheme in ["", "ssh"] or parsed_url.scheme.startswith("git"):
if parsed_url.scheme in ["", "ssh"] or (parsed_url.scheme or '').startswith("git"):
return parsed_url.url
config_user = ENV_AGENT_GIT_USER.get() or config.get("agent.{}_user".format(cls.executable_name), None)
config_pass = ENV_AGENT_GIT_PASS.get() or config.get("agent.{}_pass".format(cls.executable_name), None)

View File

@@ -104,7 +104,7 @@ DAEMON_ARGS = dict({
},
'--dynamic-gpus': {
'help': 'Allow to dynamically allocate gpus based on queue properties, '
'configure with \'--queues <queue_name>=<num_gpus>\'.'
'configure with \'--queue <queue_name>=<num_gpus>\'.'
' Example: \'--dynamic-gpus --gpus 0-3 --queue dual_gpus=2 single_gpu=1\''
' Example Opportunistic: \'--dynamic-gpus --gpus 0-3 --queue dual_gpus=2 max_quad_gpus=1-4 \'',
'action': 'store_true',

View File

@@ -229,26 +229,35 @@ class Session(_Session):
except:
pass
def print_configuration(self, remove_secret_keys=("secret", "pass", "token", "account_key")):
def print_configuration(
self,
remove_secret_keys=("secret", "pass", "token", "account_key", "contents"),
skip_value_keys=("environment", )
):
# remove all the secrets from the print
def recursive_remove_secrets(dictionary, secret_keys=()):
def recursive_remove_secrets(dictionary, secret_keys=(), empty_keys=()):
for k in list(dictionary):
for s in secret_keys:
if s in k:
dictionary.pop(k)
break
for s in empty_keys:
if s == k:
dictionary[k] = {key: '****' for key in dictionary[k]} \
if isinstance(dictionary[k], dict) else '****'
break
if isinstance(dictionary.get(k, None), dict):
recursive_remove_secrets(dictionary[k], secret_keys=secret_keys)
recursive_remove_secrets(dictionary[k], secret_keys=secret_keys, empty_keys=empty_keys)
elif isinstance(dictionary.get(k, None), (list, tuple)):
for item in dictionary[k]:
if isinstance(item, dict):
recursive_remove_secrets(item, secret_keys=secret_keys)
recursive_remove_secrets(item, secret_keys=secret_keys, empty_keys=empty_keys)
config = deepcopy(self.config.to_dict())
# remove the env variable, it's not important
config.pop('env', None)
if remove_secret_keys:
recursive_remove_secrets(config, secret_keys=remove_secret_keys)
if remove_secret_keys or skip_value_keys:
recursive_remove_secrets(config, secret_keys=remove_secret_keys, empty_keys=skip_value_keys)
# remove logging.loggers.urllib3.level from the print
try:
config['logging']['loggers']['urllib3'].pop('level', None)

View File

@@ -1 +1 @@
__version__ = '1.1.0'
__version__ = '1.2.0rc0'

View File

@@ -171,7 +171,7 @@ agent {
default_docker: {
# default docker image to use when running in docker mode
image: "nvidia/cuda:10.1-cudnn7-runtime-ubuntu18.04"
image: "nvidia/cuda:10.2-cudnn7-runtime-ubuntu18.04"
# optional arguments to pass to docker image
# arguments: ["--ipc=host", ]

View File

@@ -155,10 +155,57 @@ agent {
default_docker: {
# default docker image to use when running in docker mode
image: "nvidia/cuda:10.1-cudnn7-runtime-ubuntu18.04"
image: "nvidia/cuda:10.2-cudnn7-runtime-ubuntu18.04"
# optional arguments to pass to docker image
# arguments: ["--ipc=host"]
# lookup table rules for default container
# first matched rule will be picked, according to rule order
# enterprise version only
# match_rules: [
# {
# image: "nvidia/cuda:10.1-cudnn7-runtime-ubuntu18.04"
# arguments: "-e define=value"
# match: {
# script{
# # Optional: must match all requirements (not partial)
# requirements: {
# # version selection matching PEP-440
# pip: {
# tensorflow: "~=2.6"
# },
# }
# # Optional: matching based on regular expression, example: "^exact_match$"
# repository: "/my_repository/"
# branch: "main"
# binary: "python3.6"
# }
# # Optional: matching based on regular expression, example: "^exact_match$"
# project: "project/sub_project"
# }
# },
# {
# image: "nvidia/cuda:10.1-cudnn7-runtime-ubuntu18.04"
# arguments: "-e define=value"
# match: {
# # must match all requirements (not partial)
# script{
# requirements: {
# conda: {
# torch: ">=2.6,<2.8"
# }
# }
# # no repository matching required
# repository: ""
# }
# # no container image matching required (allow to replace one requested container with another)
# container: ""
# # no repository matching required
# project: ""
# }
# },
# ]
}
# set the OS environments based on the Task's Environment section before launching the Task process.
@@ -195,9 +242,9 @@ agent {
# }
# Name docker containers created by the daemon using the following string format (supported from Docker 0.6.5)
# Allowed variables are task_id, worker_id and rand_string (random lower-case letters string, up to 32 charaters)
# Note: resulting name must start with an alpha-numeric character and
# continue with a alpha-numeric characters, underscores (_), dots (.) and/or dashes (-)
# Allowed variables are task_id, worker_id and rand_string (random lower-case letters string, up to 32 characters)
# Note: resulting name must start with an alphanumeric character and
# continue with alphanumeric characters, underscores (_), dots (.) and/or dashes (-)
# docker_container_name_format: "clearml-id-{task_id}-{rand_string:.8}"
}
@@ -285,6 +332,7 @@ sdk {
# secret: "12345678"
# multipart: false
# secure: false
# verify: /path/to/ca/bundle.crt OR false to not verify
# }
]
}
@@ -359,5 +407,45 @@ sdk {
log_stdout: True
}
}
# Apply top-level environment section from configuration into os.environ
apply_environment: true
# Top-level environment section is in the form of:
# environment {
# key: value
# ...
# }
# and is applied to the OS environment as `key=value` for each key/value pair
# Apply top-level files section from configuration into local file system
apply_files: true
# Top-level files section allows auto-generating files at designated paths with a predefined contents
# and target format. Options include:
# contents: the target file's content, typically a string (or any base type int/float/list/dict etc.)
# format: a custom format for the contents. Currently supported value is `base64` to automatically decode a
# base64-encoded contents string, otherwise ignored
# path: the target file's path, may include ~ and inplace env vars
# target_format: format used to encode contents before writing into the target file. Supported values are json,
# yaml, yml and bytes (in which case the file will be written in binary mode). Default is text mode.
# overwrite: overwrite the target file in case it exists. Default is true.
#
# Example:
# files {
# myfile1 {
# contents: "The quick brown fox jumped over the lazy dog"
# path: "/tmp/fox.txt"
# }
# myjsonfile {
# contents: {
# some {
# nested {
# value: [1, 2, 3, 4]
# }
# }
# }
# path: "/tmp/test.json"
# target_format: json
# }
# }
}