Compare commits

...

70 Commits

Author SHA1 Message Date
allegroai
4c056a17b9 Add support for k8s jobs execution
Strip docker container obtained from task in k8s apply
2023-07-04 14:45:00 +03:00
allegroai
21d98afca5 Add support for extra docker arguments referencing machines environment variables using the agent.docker_allow_host_environ configuration option to allow users to also be able to use $ENV in the task's docker arguments 2023-07-04 14:42:28 +03:00
allegroai
6a1bf11549 Fix Task docker arguments passed twice 2023-07-04 14:41:07 +03:00
allegroai
7115a9b9a7 Add CLEARML_EXTRA_PIP_INSTALL_FLAGS / agent.package_manager.extra_pip_install_flags to control additional pip install flags
Fix pip version marking in "installed packages" is now preserved for and reinstalled
2023-07-04 14:39:40 +03:00
allegroai
450df2f8d3 Support skipping agent pip upgrade in container bash script using the CLEARML_AGENT_NO_UPDATE env var 2023-07-04 14:38:50 +03:00
allegroai
ccf752c4e4 Add support for setting mode on files applied by the agent 2023-07-04 14:37:58 +03:00
allegroai
3ed63e2154 Fix docker container backwards compatibility for API <2.13
Fix default docker match rules resolver (used incorrect field "container" instead of "image")
Remove "container" (image) match rule option from default docker image resolver
2023-07-04 14:37:18 +03:00
allegroai
a535f93cd6 Add support for CLEARML_AGENT_FORCE_MAX_API_VERSION for testing 2023-07-04 14:35:54 +03:00
allegroai
b380ec54c6 Improve config file comments 2023-07-04 14:34:43 +03:00
allegroai
a1274299ce Add support for CLEARML_AGENT_EXTRA_DOCKER_LABELS env var 2023-07-03 11:08:59 +03:00
allegroai
c77224af68 Add support for task field injection into container docker name 2023-07-03 11:07:12 +03:00
allegroai
95dadca45c Refactor k8s glue running/used pods getter 2023-05-21 22:56:12 +03:00
allegroai
685918fd9b Version bump to v1.5.3rc3 2023-05-21 22:54:38 +03:00
allegroai
bc85ddf78d Fix pytorch direct resolve replacing wheel link with directly installed version 2023-05-21 22:53:51 +03:00
allegroai
5b5fb0b8a6 Add agent.package_manager.pytorch_resolve configuration setting with pip or direct values. pip sets extra index based on cuda and lets pip resolve, direct is the previous parsing algorithm that does the matching and downloading (default pip) 2023-05-21 22:53:11 +03:00
allegroai
fec0ce1756 Better message for agent init when an existing clearml.conf is found 2023-05-21 22:51:11 +03:00
allegroai
1e09b88b7a Add alias CLEARML_AGENT_DOCKER_AGENT_REPO env var for the FORCE_CLEARML_AGENT_REPO env var 2023-05-21 22:50:01 +03:00
allegroai
b6ca0fa6a5 Print error on resource monitor failure 2023-05-11 16:18:11 +03:00
allegroai
307ec9213e Fix git+ssh:// links inside installed packages not being converted properly to HTTPS authenticated and vice versa 2023-05-11 16:16:51 +03:00
allegroai
a78a25d966 Support new Retry.DEFAULT_BACKOFF_MAX in a backwards-compatible way 2023-05-11 16:16:18 +03:00
allegroai
ebb6231f5a Add CLEARML_AGENT_STANDALONE_CONFIG_BC to support backwards compatibility in standalone mode 2023-05-11 16:15:06 +03:00
pollfly
e1d65cb280 Update clearml-agent gif (#137) 2023-04-10 10:58:10 +03:00
allegroai
3fe92a92ba Version bump to v1.5.2 2023-03-29 12:49:33 +03:00
allegroai
154db59ce6 Add agent.package_manager.poetry_install_extra_args configuration option 2023-03-28 14:37:48 +03:00
allegroai
afffa83063 Fix git+ssh:// links inside installed packages not being converted properly to https authenticated links 2023-03-28 14:35:51 +03:00
allegroai
787c7d88bb Fix additional poetry cwd support feature 2023-03-28 14:35:41 +03:00
allegroai
667c2ced3d Fix very old pip version support (<20) 2023-03-28 14:34:19 +03:00
allegroai
7f5b3c8df4 Fix None config file in session causes k8s agent to raise exception 2023-03-28 14:33:55 +03:00
allegroai
46ded2864d Fix restart feature should be tested against agent session 2023-03-28 14:33:33 +03:00
allegroai
40456be948 Black formatting
Refactor path support
2023-03-05 18:05:00 +02:00
allegroai
8d51aed679 Protect against cache folders without permission 2023-03-05 18:05:00 +02:00
allegroai
bfc4ba38cd Fix torch inside nvidia containers to use preinstalled version (i.e. ==x.y.z.* matching) 2023-03-05 18:05:00 +02:00
Niels ten Boom
3cedc104df Add poetry cwd support (#142)
Closes #138
2023-03-05 14:19:57 +02:00
Marijan Smetko
b367c80477 Switch entrypoint shell from sh to bash (#141) 2023-02-28 21:55:16 +02:00
allegroai
262b6d3a00 Update services agent entrypoint 2023-02-05 10:40:02 +02:00
allegroai
95e996bfda Reintroduce CLEARML_AGENT_SERVICES_DOCKER_RESTART accidentally reverted by a previous merge 2023-02-05 10:34:38 +02:00
allegroai
b6d132b226 Fix build fails when target is relative path 2023-02-05 10:33:32 +02:00
allegroai
4f17a2c17d Fix K8s glue does not delete pending pods if the tasks they represent were aborted 2023-02-05 10:32:16 +02:00
allegroai
00e8e9eb5a Do not allow request exceptions (only on the initial login call) 2023-02-05 10:30:45 +02:00
allegroai
af6a77918f Fix _ is allowed in k8s label names 2023-02-05 10:29:48 +02:00
allegroai
855622fd30 Support custom service on Worker.get() calls 2023-02-05 10:29:09 +02:00
allegroai
8cd12810f3 Fix login uses GET with payload which breaks when trying to connect a server running in GCP 2023-02-05 10:28:41 +02:00
achaiah
ebb955187d Fix agent update version (#132)
* Fix agent update version

Pip install command is missing the '=='  to execute successfully

* allow for fuzzy and direct version spec

adding logic to allow for flexible version specification

* Added regex to parse 1.2.3rc4 patterns
2023-01-08 19:10:26 +02:00
pollfly
85e1fadf9b Fix typos (#131) 2022-12-28 19:39:59 +02:00
allegroai
249b51a31b Version bump 2022-12-13 15:29:10 +02:00
allegroai
da19ef26c4 Fix pinging running task (and change default to once a minute) 2022-12-13 15:26:26 +02:00
allegroai
f69e16ea9d Fix clearml-agent build --docker stuck on certain containers 2022-12-13 15:24:32 +02:00
allegroai
efa1f71dac Version bump to v1.5.1 2022-12-10 22:18:21 +02:00
allegroai
692cb8cf13 Update six requirements 2022-12-10 22:18:10 +02:00
allegroai
ebdc215632 Remove " from pip commands in venv 2022-12-10 20:58:30 +02:00
allegroai
b2da639582 Add CLEARML_AGENT_FORCE_SYSTEM_SITE_PACKAGES env var (default true) to allow overriding default "system_site_packages: true" behavior when running tasks in containers (docker mode and k8s-glue) 2022-12-10 20:00:46 +02:00
allegroai
71fdb43f10 Version bump to v1.5.1rc0 2022-12-07 22:09:40 +02:00
allegroai
ca2791c65e Fix pip support allowing multiple pip version constraints (by default, one for <PY3.10 and one for >=PY3.10) 2022-12-07 22:09:25 +02:00
allegroai
dd75cedaab Upgrade requirements for attrs, jsonschema, pyparsing and pyjwt 2022-12-07 22:08:15 +02:00
allegroai
669fb1a6e5 Fix using deprecated types validator argument raises an error (deprecated even before jsonschema 3.0.0 and unsupported since 4.0.0) 2022-12-07 22:07:53 +02:00
allegroai
5d517c91b5 Add agent.disable_task_docker_override configuration option to disable docker override specified in executing tasks 2022-12-07 22:07:11 +02:00
allegroai
6be75abc86 Add default output URI selection to "clearml-agent init" 2022-12-07 22:06:10 +02:00
allegroai
4c777fa2ee Version bump to v1.5.0 2022-12-05 16:42:44 +02:00
allegroai
dc5e0033c8 Remove support for kubectl run
Allow customizing pod name prefix and limit pod label
Return deleted pods from cleanup
Some refactoring
2022-12-05 11:40:19 +02:00
allegroai
3dd5973734 Filter by phase when detecting hanging pods
More debug print-outs
Use task session when possible
Push task into k8s scheduler queue only if running from the same tenant
Make sure we pass git_user/pass to the task pod
Fix cleanup command not issued when no pods exist in a multi-queue setup
2022-12-05 11:29:59 +02:00
allegroai
53d379205f Support raise_error in get_bash_output() 2022-12-05 11:26:40 +02:00
allegroai
57cde21c48 Send task.ping for executing tasks every 120 seconds (set using the agent.task_ping_interval_sec configuration option) 2022-12-05 11:22:25 +02:00
allegroai
396abf13b6 Fix get_task_session() may cause an old copy of the APIClient to be used containing a reference to the previous session 2022-12-05 11:20:32 +02:00
allegroai
6e7fb5f331 Fix sending task logs fails when agent is not running in the same tenant 2022-12-05 11:19:14 +02:00
allegroai
1d5c118b70 Fix setting CLEARML_API_DEFAULT_REQ_METHOD raises an error 2022-12-05 11:18:12 +02:00
allegroai
18612aac4d Improve configuration examples 2022-12-05 11:17:27 +02:00
allegroai
76c533a2e8 Fix access to config object 2022-11-11 13:34:17 +02:00
Niels ten Boom
9eee213683 Add option to crash agent on exception using agent.crash_on_exception configuration setting (#123) 2022-11-06 17:15:39 +02:00
allegroai
e4861fc0fb Add missing settings in clearml.conf 2022-11-06 12:36:01 +02:00
allegroai
53ef984065 Update README 2022-11-06 11:53:16 +02:00
39 changed files with 1923 additions and 684 deletions

3
.gitignore vendored
View File

@@ -11,3 +11,6 @@ build/
dist/
*.egg-info
# VSCode
.vscode

View File

@@ -24,7 +24,7 @@ ML-Ops scheduler & orchestration solution supporting Linux, macOS and Windows**
* Launch-and-Forget service containers
* [Cloud autoscaling](https://clear.ml/docs/latest/docs/guides/services/aws_autoscaler)
* [Customizable cleanup](https://clear.ml/docs/latest/docs/guides/services/cleanup_service)
*
Advanced [pipeline building and execution](https://clear.ml/docs/latest/docs/guides/frameworks/pytorch/notebooks/table/tabular_training_pipeline)
It is a zero configuration fire-and-forget execution agent, providing a full ML/DL cluster solution.
@@ -33,12 +33,12 @@ It is a zero configuration fire-and-forget execution agent, providing a full ML/
1. ClearML Server [self-hosted](https://github.com/allegroai/clearml-server)
or [free tier hosting](https://app.clear.ml)
2. `pip install clearml-agent` ([install](#installing-the-clearml-agent) the ClearML Agent on any CPU/GPU machine:
2. `pip install clearml-agent` ([install](#installing-the-clearml-agent) the ClearML Agent on any GPU machine:
on-premises / cloud / ...)
3. Create a [job](https://github.com/allegroai/clearml/docs/clearml-task.md) or
Add [ClearML](https://github.com/allegroai/clearml) to your code with just 2 lines
4. Change the [parameters](#using-the-clearml-agent) in the UI & schedule for [execution](#using-the-clearml-agent) (or
automate with a [pipelines](#automl-and-orchestration-pipelines-))
automate with an [AutoML pipeline](#automl-and-orchestration-pipelines-))
5. :chart_with_downwards_trend: :chart_with_upwards_trend: :eyes: :beer:
"All the Deep/Machine-Learning DevOps your research needs, and then some... Because ain't nobody got time for that"
@@ -313,31 +313,28 @@ clearml-agent daemon --services-mode --detached --queue services --create-queue
**Note**: It is the user's responsibility to make sure the proper tasks are pushed into the specified queue.
### Orchestration and Pipelines <a name="automl-pipes"></a>
### AutoML and Orchestration Pipelines <a name="automl-pipes"></a>
The ClearML Agent can also be used to orchestrate and automate Pipelines in conjunction with the
The ClearML Agent can also be used to implement AutoML orchestration and Experiment Pipelines in conjunction with the
ClearML package.
Sample automation examples can be found in the
ClearML [pipelines](https://github.com/allegroai/clearml/tree/master/examples/pipeline) / [automation](https://github.com/allegroai/clearml/tree/master/examples/automation) folder.
Sample AutoML & Orchestration examples can be found in the
ClearML [example/automation](https://github.com/allegroai/clearml/tree/master/examples/automation) folder.
HPO examples
AutoML examples
- [Toy Keras training experiment](https://github.com/allegroai/clearml/blob/master/examples/optimization/hyper-parameter-optimization/base_template_keras_simple.py)
- In order to create an experiment-template in the system, this code must be executed once manually
- [Manual Search over the above Keras experiment-template](https://github.com/allegroai/clearml/blob/master/examples/automation/manual_random_param_search_example.py)
- [Random Search over the above Keras experiment-template](https://github.com/allegroai/clearml/blob/master/examples/automation/manual_random_param_search_example.py)
- This example will create multiple copies of the Keras experiment-template, with different hyper-parameter
combinations
- [Optimized Bayesian search over the above Keras experiment-template](https://github.com/allegroai/clearml/blob/master/examples/optimization/hyper-parameter-optimization/hyper_parameter_optimizer.py)
- This example will create multiple copies of the Keras experiment-template, with different hyper-parameter combinations launch them on remote machines, monitor the metric (i.e. loss) decide which one has the best potential and abort the others
Experiment Pipeline examples
- [Build DAG from Tasks](https://github.com/allegroai/clearml/blob/master/examples/pipeline/pipeline_from_tasks.py)
- This example will build a DAG processing flow from existing Tasks and launch them on remote machines
- [Logic Driven Pipeline](https://github.com/allegroai/clearml/blob/master/examples/pipeline/pipeline_from_decorator.py)
- This example will run any component function as a standalone Task on a remote machine, it will auto-parallelize jobs, cache results and automatically serialize data between remote machines.
- [First step experiment](https://github.com/allegroai/clearml/blob/master/examples/automation/task_piping_example.py)
- This example will "process data", and once done, will launch a copy of the 'second step' experiment-template
- [Second step experiment](https://github.com/allegroai/clearml/blob/master/examples/automation/toy_base_task.py)
- In order to create an experiment-template in the system, this code must be executed once manually
### License

View File

@@ -18,6 +18,8 @@
# https://docs.gitlab.com/ee/user/profile/personal_access_tokens.html
# git_user: ""
# git_pass: ""
# Limit credentials to a single domain, for example: github.com,
# all other domains will use public access (no user/pass). Default: always send user/pass for any VCS domain
# git_host: ""
# Force GIT protocol to use SSH regardless of the git url (Assumes GIT user/pass are blank)
@@ -64,18 +66,29 @@
type: pip,
# specify pip version to use (examples "<20.2", "==19.3.1", "", empty string will install the latest version)
pip_version: "<21",
pip_version: ["<20.2 ; python_version < '3.10'", "<22.3 ; python_version >= '3.10'"],
# specify poetry version to use (examples "<2", "==1.1.1", "", empty string will install the latest version)
# poetry_version: "<2",
# poetry_install_extra_args: ["-v"]
# virtual environment inheres packages from system
# virtual environment inherits packages from system
system_site_packages: false,
# install with --upgrade
force_upgrade: false,
# additional artifact repositories to use when installing python packages
# extra_index_url: ["https://allegroai.jfrog.io/clearmlai/api/pypi/public/simple"]
# extra_index_url: ["https://allegroai.jfrog.io/clearml/api/pypi/public/simple"]
# control the pytorch wheel resolving algorithm, options are: "pip", "direct"
# "pip" (default): would automatically detect the cuda version, and supply pip with the correct
# extra-index-url, based on pytorch.org tables
# "direct": would resolve a direct link to the pytorch wheel by parsing the pytorch.org pip repository
# and matching the automatically detected cuda version with the required pytorch wheel.
# if the exact cuda version is not found for the required pytorch wheel, it will try
# a lower cuda version until a match is found
#
# pytorch_resolve: "pip"
# additional conda channels to use when installing with conda package manager
conda_channels: ["pytorch", "conda-forge", "defaults", ]
@@ -85,24 +98,32 @@
# force_repo_requirements_txt: false
# set the priority packages to be installed before the rest of the required packages
# Note: this only controls the installation order of existing requirement packages (and does not add additional packages)
# priority_packages: ["cython", "numpy", "setuptools", ]
# set the optional priority packages to be installed before the rest of the required packages,
# In case a package installation fails, the package will be ignored,
# and the virtual environment process will continue
# Note: this only controls the installation order of existing requirement packages (and does not add additional packages)
priority_optional_packages: ["pygobject", ]
# set the post packages to be installed after all the rest of the required packages
# Note: this only controls the installation order of existing requirement packages (and does not add additional packages)
# post_packages: ["horovod", ]
# set the optional post packages to be installed after all the rest of the required packages,
# In case a package installation fails, the package will be ignored,
# and the virtual environment process will continue
# Note: this only controls the installation order of existing requirement packages (and does not add additional packages)
# post_optional_packages: []
# set to True to support torch nightly build installation,
# notice: torch nightly builds are ephemeral and are deleted from time to time
torch_nightly: false,
# if set to true, the agent will look for the "poetry.lock" file
# in the passed current working directory instead of the repository's root directory.
poetry_files_from_repo_working_dir: false
},
# target folder for virtual environments builds, created when executing experiment
@@ -215,8 +236,8 @@
# default is True, report a single \r line in a sequence of consecutive lines, per 5 seconds.
# suppress_carriage_return: true
# cuda versions used for solving pytorch wheel packages
# should be detected automatically. Override with os environment CUDA_VERSION / CUDNN_VERSION
# CUDA versions used for Conda setup & solving PyTorch wheel packages
# Should be detected automatically. Override with os environment CUDA_VERSION / CUDNN_VERSION
# cuda_version: 10.1
# cudnn_version: 7.6
@@ -252,9 +273,14 @@
# Name docker containers created by the daemon using the following string format (supported from Docker 0.6.5)
# Allowed variables are task_id, worker_id and rand_string (random lower-case letters string, up to 32 characters)
# Note: resulting name must start with an alphanumeric character and continue with alphanumeric characters,
# underscores (_), dots (.) and/or dashes (-)
#docker_container_name_format: "clearml-id-{task_id}-{rand_string:.8}"
# Custom variables may be specified using the docker_container_name_format_fields option.
# Note: resulting name must start with an alphanumeric character and
# continue with alphanumeric characters, underscores (_), dots (.) and/or dashes (-)
# docker_container_name_format: "clearml-id-{task_id}-{rand_string:.8}"
# Specify custom variables for the docker_container_name_format option using a mapping of variable name
# to a (nested) task field (using "." as a task field separator, digits specify array index)
# docker_container_name_format_fields: { foo: "bar.moo" }
# Apply top-level environment section from configuration into os.environ
apply_environment: true
@@ -325,4 +351,50 @@
# into the file specified in CLEARML_CUSTOM_BUILD_OUTPUT, the agent will emit a warning and continue with the
# standard flow.
custom_build_script: ""
# Crash on exception: by default when encountering an exception while running a task,
# the agent will catch the exception, log it and continue running.
# Set this to `true` to propagate exceptions and crash the agent.
# crash_on_exception: true
# Disable task docker override. If true, the agent will use the default docker image and ignore any docker image
# and arguments specified in the task's container section (setup shell script from the task container section will
# be used in any case, if specified).
disable_task_docker_override: false
# Choose the default docker based on the Task properties,
# Examples: 'script.requirements', 'script.binary', 'script.repository', 'script.branch', 'project'
# Notice: Matching is done via regular expression, for example "^searchme$" will match exactly "searchme$" string
#
# "default_docker": {
# "image": "nvidia/cuda:10.2-cudnn7-runtime-ubuntu18.04",
# # optional arguments to pass to docker image
# # arguments: ["--ipc=host", ]
# "match_rules": [
# {
# "image": "sample_container:tag",
# "arguments": "-e VALUE=1 --ipc=host",
# "match": {
# "script": {
# "requirements": {
# "pip": {
# "tensorflow": "~=1.6"
# }
# },
# "repository": "",
# "branch": "master"
# },
# "project": "example"
# }
# },
# {
# "image": "another_container:tag",
# "arguments": "",
# "match": {
# "project": "^examples", # anything that starts with "examples", e.g. "examples", "examples/sub_project"
# }
# }
# ]
# },
#
}

View File

@@ -66,11 +66,16 @@ class DataModel(object):
}
def validate(self, schema=None):
jsonschema.validate(
self.to_dict(),
schema or self._schema,
types=dict(array=(list, tuple), integer=six.integer_types),
schema = schema or self._schema
validator = jsonschema.validators.validator_for(schema)
validator_cls = jsonschema.validators.extend(
validator=validator,
type_checker=validator.TYPE_CHECKER.redefine_many({
"array": lambda s, instance: isinstance(instance, (list, tuple)),
"integer": lambda s, instance: isinstance(instance, six.integer_types),
}),
)
jsonschema.validate(self.to_dict(), schema, cls=validator_cls)
def __repr__(self):
return '<{}.{}: {}>'.format(

View File

@@ -20,6 +20,7 @@ ENV_PROPAGATE_EXITCODE = EnvEntry("CLEARML_AGENT_PROPAGATE_EXITCODE", type=bool,
ENV_INITIAL_CONNECT_RETRY_OVERRIDE = EnvEntry(
'CLEARML_AGENT_INITIAL_CONNECT_RETRY_OVERRIDE', default=True, converter=safe_text_to_bool
)
ENV_FORCE_MAX_API_VERSION = EnvEntry("CLEARML_AGENT_FORCE_MAX_API_VERSION", type=str)
"""
Experimental option to set the request method for all API requests and auth login.

View File

@@ -2,22 +2,25 @@
import json as json_lib
import os
import sys
import time
import types
from random import SystemRandom
from socket import gethostname
from typing import Optional
import jwt
import requests
import six
from requests import RequestException
from requests.auth import HTTPBasicAuth
from six.moves.urllib.parse import urlparse, urlunparse
from clearml_agent.external.pyhocon import ConfigTree, ConfigFactory
from .callresult import CallResult
from .defs import (
ENV_VERBOSE, ENV_HOST, ENV_ACCESS_KEY, ENV_SECRET_KEY, ENV_WEB_HOST, ENV_FILES_HOST, ENV_AUTH_TOKEN,
ENV_NO_DEFAULT_SERVER, ENV_DISABLE_VAULT_SUPPORT, ENV_INITIAL_CONNECT_RETRY_OVERRIDE, ENV_API_DEFAULT_REQ_METHOD, )
ENV_NO_DEFAULT_SERVER, ENV_DISABLE_VAULT_SUPPORT, ENV_INITIAL_CONNECT_RETRY_OVERRIDE, ENV_API_DEFAULT_REQ_METHOD,
ENV_FORCE_MAX_API_VERSION)
from .request import Request, BatchRequest
from .token_manager import TokenManager
from ..config import load
@@ -25,6 +28,8 @@ from ..utils import get_http_session_with_retry, urllib_log_warning_setup
from ...backend_config.environment import backward_compatibility_support
from ...version import __version__
sys_random = SystemRandom()
class LoginError(Exception):
pass
@@ -49,6 +54,7 @@ class Session(TokenManager):
_session_initial_retry_connect_override = 4
_write_session_data_size = 15000
_write_session_timeout = (30.0, 30.)
_request_exception_retry_timeout = (2.0, 3.0)
api_version = '2.1'
feature_set = 'basic'
@@ -57,6 +63,7 @@ class Session(TokenManager):
default_files = "https://demofiles.demo.clear.ml"
default_key = "EGRTCO8JMSIGI6S39GTP43NFWXDQOW"
default_secret = "x!XTov_G-#vspE*Y(h$Anm&DIc5Ou-F)jsl$PdOyj5wG1&E!Z8"
force_max_api_version = ENV_FORCE_MAX_API_VERSION.get()
# TODO: add requests.codes.gateway_timeout once we support async commits
_retry_codes = [
@@ -111,19 +118,9 @@ class Session(TokenManager):
self._verbose = verbose if verbose is not None else ENV_VERBOSE.get()
self._logger = logger
self.__auth_token = None
self._propagate_exceptions_on_send = True
if ENV_API_DEFAULT_REQ_METHOD.get(default=None):
# Make sure we update the config object, so we pass it into the new containers when we map them
self.config["api.http.default_method"] = ENV_API_DEFAULT_REQ_METHOD.get()
# notice the default setting of Request.def_method are already set by the OS environment
elif self.config.get("api.http.default_method", None):
def_method = str(self.config.get("api.http.default_method", None)).strip()
if def_method.upper() not in ("GET", "POST", "PUT"):
raise ValueError(
"api.http.default_method variable must be 'get' or 'post' (any case is allowed)."
)
Request.def_method = def_method
Request._method = Request.def_method
self.update_default_api_method()
if ENV_AUTH_TOKEN.get(
value_cb=lambda key, value: print("Using environment access token {}=********".format(key))
@@ -178,6 +175,10 @@ class Session(TokenManager):
)
# try to connect with the server
self.refresh_token()
# for resilience, from now on we won't allow propagating exceptions when sending requests
self._propagate_exceptions_on_send = False
# create the default session with many retries
http_retries_config, self.__http_session = self._setup_session(http_retries_config)
@@ -198,6 +199,10 @@ class Session(TokenManager):
# notice: this is across the board warning omission
urllib_log_warning_setup(total_retries=http_retries_config.get('total', 0), display_warning_after=3)
if self.force_max_api_version and self.check_min_api_version(self.force_max_api_version):
print("Using forced API version {}".format(self.force_max_api_version))
Session.max_api_version = Session.api_version = str(self.force_max_api_version)
def _setup_session(self, http_retries_config, initial_session=False, default_initial_connect_override=None):
# type: (dict, bool, Optional[bool]) -> (dict, requests.Session)
http_retries_config = http_retries_config or self.config.get(
@@ -223,7 +228,22 @@ class Session(TokenManager):
return http_retries_config, get_http_session_with_retry(config=self.config or None, **http_retries_config)
def update_default_api_method(self):
if ENV_API_DEFAULT_REQ_METHOD.get(default=None):
# Make sure we update the config object, so we pass it into the new containers when we map them
self.config.put("api.http.default_method", ENV_API_DEFAULT_REQ_METHOD.get())
# notice the default setting of Request.def_method are already set by the OS environment
elif self.config.get("api.http.default_method", None):
def_method = str(self.config.get("api.http.default_method", None)).strip()
if def_method.upper() not in ("GET", "POST", "PUT"):
raise ValueError(
"api.http.default_method variable must be 'get', 'post' or 'put' (any case is allowed)."
)
Request.def_method = def_method
Request._method = Request.def_method
def load_vaults(self):
# () -> Optional[bool]
if not self.check_min_api_version("2.15") or self.feature_set == "basic":
return
@@ -244,12 +264,14 @@ class Session(TokenManager):
# noinspection PyBroadException
try:
res = self.send_request("users", "get_vaults", json={"enabled": True, "types": ["config"]})
# Use params and not data/json otherwise payload might be dropped if we're using GET with a strict firewall
res = self.send_request("users", "get_vaults", params="enabled=true&types=config&types=config")
if res.ok:
vaults = res.json().get("data", {}).get("vaults", [])
data = list(filter(None, map(parse, vaults)))
if data:
self.config.set_overrides(*data)
return True
elif res.status_code != 404:
raise Exception(res.json().get("meta", {}).get("result_msg", res.text))
except Exception as ex:
@@ -272,6 +294,7 @@ class Session(TokenManager):
data=None,
json=None,
refresh_token_if_unauthorized=True,
params=None,
):
""" Internal implementation for making a raw API request.
- Constructs the api endpoint name
@@ -295,6 +318,7 @@ class Session(TokenManager):
if version
else "{host}/{service}.{action}"
).format(**locals())
while True:
if data and len(data) > self._write_session_data_size:
timeout = self._write_session_timeout
@@ -302,16 +326,29 @@ class Session(TokenManager):
timeout = self._session_initial_timeout
else:
timeout = self._session_timeout
res = self.__http_session.request(
method, url, headers=headers, auth=auth, data=data, json=json, timeout=timeout)
try:
res = self.__http_session.request(
method, url, headers=headers, auth=auth, data=data, json=json, timeout=timeout, params=params)
except RequestException as ex:
if self._propagate_exceptions_on_send:
raise
sleep_time = sys_random.uniform(*self._request_exception_retry_timeout)
self._logger.error(
"{} exception sending {} {}: {} (retrying in {:.1f}sec)".format(
type(ex).__name__, method.upper(), url, str(ex), sleep_time
)
)
time.sleep(sleep_time)
continue
if (
refresh_token_if_unauthorized
and res.status_code == requests.codes.unauthorized
and not token_refreshed_on_error
):
# it seems we're unauthorized, so we'll try to refresh our token once in case permissions changed since
# the last time we got the token, and try again
# it seems we're unauthorized, so we'll try to refresh our token once in case permissions changed
# since the last time we got the token, and try again
self.refresh_token()
token_refreshed_on_error = True
# try again
@@ -348,6 +385,7 @@ class Session(TokenManager):
data=None,
json=None,
async_enable=False,
params=None,
):
"""
Send a raw API request.
@@ -360,6 +398,7 @@ class Session(TokenManager):
content type will be application/json)
:param data: Dictionary, bytes, or file-like object to send in the request body
:param async_enable: whether request is asynchronous
:param params: additional query parameters
:return: requests Response instance
"""
headers = self.add_auth_headers(
@@ -376,6 +415,7 @@ class Session(TokenManager):
headers=headers,
data=data,
json=json,
params=params,
)
def send_request_batch(
@@ -628,15 +668,14 @@ class Session(TokenManager):
res = None
try:
data = {"expiration_sec": exp} if exp else {}
res = self._send_request(
method=Request.def_method,
service="auth",
action="login",
auth=auth,
json=data,
headers=headers,
refresh_token_if_unauthorized=False,
params={"expiration_sec": exp} if exp else {},
)
try:
resp = res.json()
@@ -675,3 +714,13 @@ class Session(TokenManager):
return "{self.__class__.__name__}[{self.host}, {self.access_key}/{secret_key}]".format(
self=self, secret_key=self.secret_key[:5] + "*" * (len(self.secret_key) - 5)
)
@property
def propagate_exceptions_on_send(self):
# type: () -> bool
return self._propagate_exceptions_on_send
@propagate_exceptions_on_send.setter
def propagate_exceptions_on_send(self, value):
# type: (bool) -> None
self._propagate_exceptions_on_send = value

View File

@@ -86,7 +86,10 @@ def get_http_session_with_retry(
session = requests.Session()
if backoff_max is not None:
Retry.BACKOFF_MAX = backoff_max
if "BACKOFF_MAX" in vars(Retry):
Retry.BACKOFF_MAX = backoff_max
else:
Retry.DEFAULT_BACKOFF_MAX = backoff_max
retry = Retry(
total=total, connect=connect, read=read, redirect=redirect, status=status,

View File

@@ -294,6 +294,12 @@ class Config(object):
)
return value
def put(self, key, value):
self._config.put(key, value)
def pop(self, key, default=None):
return self._config.pop(key, default=default)
def to_dict(self):
return self._config.as_plain_ordered_dict()

View File

@@ -14,6 +14,14 @@ except ImportError:
ConverterType = TypeVar("ConverterType", bound=Callable[[Any], Any])
def text_to_int(value, default=0):
# type: (Any, int) -> int
try:
return int(value)
except (ValueError, TypeError):
return default
def base64_to_text(value):
# type: (Any) -> Text
return base64.b64decode(value).decode("utf-8")

View File

@@ -52,6 +52,7 @@ def apply_files(config):
target_fmt = data.get("target_format", "string")
overwrite = bool(data.get("overwrite", True))
contents = data.get("contents")
mode = data.get("mode")
target = Path(expanduser(expandvars(path)))
@@ -110,3 +111,14 @@ def apply_files(config):
except Exception as ex:
print("Skipped [{}]: failed saving file {} ({})".format(key, target, ex))
continue
try:
if mode:
if isinstance(mode, int):
mode = int(str(mode), 8)
else:
mode = int(mode, 8)
target.chmod(mode)
except Exception as ex:
print("Skipped [{}]: failed setting mode {} for {} ({})".format(key, mode, target, ex))
continue

View File

@@ -118,13 +118,15 @@ class ServiceCommandSection(BaseCommandSection):
""" The name of the REST service used by this command """
pass
def get(self, endpoint, *args, session=None, **kwargs):
def get(self, endpoint, *args, service=None, session=None, **kwargs):
session = session or self._session
return session.get(service=self.service, action=endpoint, *args, **kwargs)
service = service or self.service
return session.get(service=service, action=endpoint, *args, **kwargs)
def post(self, endpoint, *args, session=None, **kwargs):
def post(self, endpoint, *args, service=None, session=None, **kwargs):
session = session or self._session
return session.post(service=self.service, action=endpoint, *args, **kwargs)
service = service or self.service
return session.post(service=service, action=endpoint, *args, **kwargs)
def get_with_act_as(self, endpoint, *args, **kwargs):
return self._session.get_with_act_as(service=self.service, action=endpoint, *args, **kwargs)

View File

@@ -1,14 +1,15 @@
from __future__ import print_function
from six.moves import input
from typing import Dict, Optional
from pathlib2 import Path
from six.moves import input
from six.moves.urllib.parse import urlparse
from clearml_agent.external.pyhocon import ConfigFactory, ConfigMissingException
from clearml_agent.backend_api.session import Session
from clearml_agent.backend_api.session.defs import ENV_HOST
from clearml_agent.backend_config.defs import LOCAL_CONFIG_FILES
from clearml_agent.external.pyhocon import ConfigFactory, ConfigMissingException
description = """
Please create new clearml credentials through the settings page in your `clearml-server` web app,
@@ -43,7 +44,7 @@ def main():
if conf_file.exists() and conf_file.is_file() and conf_file.stat().st_size > 0:
print('Configuration file already exists: {}'.format(str(conf_file)))
print('Leaving setup, feel free to edit the configuration file.')
print('Leaving setup. If you\'ve previously initialized the ClearML SDK on this machine, manually add an \'agent\' section to this file.')
return
print(description, end='')
@@ -112,6 +113,21 @@ def main():
print('Exiting setup without creating configuration file')
return
selection = input_options(
'Default Output URI (used to automatically store models and artifacts)',
{'N': 'None', 'S': 'ClearML Server', 'C': 'Custom'},
default='None'
)
if selection == 'Custom':
print('Custom Default Output URI: ', end='')
default_output_uri = input().strip()
elif selection == "ClearML Server":
default_output_uri = files_host
else:
default_output_uri = None
print('\nDefault Output URI: {}'.format(default_output_uri if default_output_uri else 'not set'))
# get GIT User/Pass for cloning
print('Enter git username for repository cloning (leave blank for SSH key authentication): [] ', end='')
git_user = input()
@@ -179,6 +195,13 @@ def main():
'agent.package_manager.extra_index_url= ' \
'[\n{}\n]\n\n'.format("\n".join(map("\"{}\"".format, extra_index_urls)))
f.write(extra_index_str)
if default_output_uri:
default_output_url_str = '# Default Task output_uri. if output_uri is not provided to Task.init, ' \
'default_output_uri will be used instead.\n' \
'sdk.development.default_output_uri="{}"\n' \
'\n'.format(default_output_uri.strip('"'))
f.write(default_output_url_str)
default_conf = default_conf.replace('default_output_uri: ""', '# default_output_uri: ""')
f.write(default_conf)
except Exception:
print('Error! Could not write configuration file at: {}'.format(str(conf_file)))
@@ -305,6 +328,25 @@ def input_url(host_type, host=None):
return host
def input_options(message, options, default=None):
# type: (str, Dict[str, str], Optional[str]) -> str
options_msg = "/".join(
"".join(('(' + c.upper() + ')') if c == o else c for c in option)
for o, option in options.items()
)
if default:
options_msg += " [{}]".format(default)
while True:
print('{}: {} '.format(message, options_msg), end='')
res = input().strip()
if not res:
return default
elif res.lower() in options:
return options[res.lower()]
elif res.upper() in options:
return options[res.upper()]
def input_host_port(host_type, parsed_host):
print('Enter port for {} host '.format(host_type), end='')
replace_port = input().lower()

View File

@@ -109,15 +109,15 @@ def resolve_default_container(session, task_id, container_config):
match.get('script.binary', None), entry))
continue
if match.get('container', None):
# noinspection PyBroadException
try:
if not re.search(match.get('container', None), requested_container.get('image', '')):
continue
except Exception:
print('Failed parsing regular expression \"{}\" in rule: {}'.format(
match.get('container', None), entry))
continue
# if match.get('image', None):
# # noinspection PyBroadException
# try:
# if not re.search(match.get('image', None), requested_container.get('image', '')):
# continue
# except Exception:
# print('Failed parsing regular expression \"{}\" in rule: {}'.format(
# match.get('image', None), entry))
# continue
matched = True
for req_section in ['script.requirements.pip', 'script.requirements.conda']:
@@ -156,8 +156,8 @@ def resolve_default_container(session, task_id, container_config):
break
if matched:
if not container_config.get('container'):
container_config['container'] = entry.get('image', None)
if not container_config.get('image'):
container_config['image'] = entry.get('image', None)
if not container_config.get('arguments'):
container_config['arguments'] = entry.get('arguments', None)
container_config['arguments'] = shlex.split(str(container_config.get('arguments') or '').strip())

View File

@@ -1,6 +1,7 @@
from __future__ import print_function, division, unicode_literals
import errno
import functools
import json
import logging
import os
@@ -11,6 +12,7 @@ import shlex
import shutil
import signal
import string
import socket
import subprocess
import sys
import traceback
@@ -23,7 +25,7 @@ from functools import partial
from os.path import basename
from tempfile import mkdtemp, NamedTemporaryFile
from time import sleep, time
from typing import Text, Optional, Any, Tuple, List
from typing import Text, Optional, Any, Tuple, List, Dict, Mapping, Union
import attr
import six
@@ -39,8 +41,10 @@ from clearml_agent.backend_api.session import CallResult, Request
from clearml_agent.backend_api.session.defs import (
ENV_ENABLE_ENV_CONFIG_SECTION, ENV_ENABLE_FILES_CONFIG_SECTION,
ENV_VENV_CONFIGURED, ENV_PROPAGATE_EXITCODE, )
from clearml_agent.backend_config import Config
from clearml_agent.backend_config.defs import UptimeConf
from clearml_agent.backend_config.utils import apply_environment, apply_files
from clearml_agent.backend_config.converters import text_to_int
from clearml_agent.commands.base import resolve_names, ServiceCommandSection
from clearml_agent.commands.resolver import resolve_default_container
from clearml_agent.definitions import (
@@ -56,10 +60,7 @@ from clearml_agent.definitions import (
ENV_WORKER_ID,
ENV_WORKER_TAGS,
ENV_DOCKER_SKIP_GPUS_FLAG,
ENV_AGENT_SECRET_KEY,
ENV_AGENT_AUTH_TOKEN,
ENV_AWS_SECRET_KEY,
ENV_AZURE_ACCOUNT_KEY,
ENV_AGENT_DISABLE_SSH_MOUNT,
ENV_SSH_AUTH_SOCK,
ENV_AGENT_SKIP_PIP_VENV_INSTALL,
@@ -70,6 +71,11 @@ from clearml_agent.definitions import (
ENV_DEBUG_INFO,
ENV_CHILD_AGENTS_COUNT_CMD,
ENV_DOCKER_ARGS_FILTERS,
ENV_FORCE_SYSTEM_SITE_PACKAGES,
ENV_SERVICES_DOCKER_RESTART,
ENV_CONFIG_BC_IN_STANDALONE,
ENV_FORCE_DOCKER_AGENT_REPO,
ENV_EXTRA_DOCKER_LABELS,
)
from clearml_agent.definitions import WORKING_REPOSITORY_DIR, PIP_EXTRA_INDICES
from clearml_agent.errors import (
@@ -315,6 +321,37 @@ def get_next_task(session, queue, get_task_info=False):
return data
def get_task_fields(session, task_id, fields: list, log=None) -> dict:
"""
Returns dict with Task docker container setup {container: '', arguments: '', setup_shell_script: ''}
"""
result = session.send_request(
service='tasks',
action='get_all',
json={'id': [task_id], 'only_fields': list(fields), 'search_hidden': True},
method=Request.def_method,
async_enable=False,
)
# noinspection PyBroadException
try:
results = {}
result = result.json()['data']['tasks'][0]
for field in fields:
cur = result
for part in field.split("."):
if part.isdigit():
cur = cur[part]
else:
cur = cur.get(part, {})
results[field] = cur
return results
except Exception as ex:
if log:
log.error("Failed obtaining values for task fields {}: {}", fields, ex)
pass
return {}
def get_task_container(session, task_id):
"""
Returns dict with Task docker container setup {container: '', arguments: '', setup_shell_script: ''}
@@ -332,20 +369,25 @@ def get_task_container(session, task_id):
container = result.json()['data']['tasks'][0]['container'] if result.ok else {}
if container.get('arguments'):
container['arguments'] = shlex.split(str(container.get('arguments')).strip())
if container.get('image'):
container['image'] = container.get('image').strip()
except (ValueError, TypeError):
container = {}
else:
response = get_task(session, task_id, only_fields=["execution.docker_cmd"])
task_docker_cmd_parts = shlex.split(str(response.execution.docker_cmd or '').strip())
try:
container = dict(
container=task_docker_cmd_parts[0],
arguments=task_docker_cmd_parts[1:] if len(task_docker_cmd_parts[0]) > 1 else ''
)
except (ValueError, TypeError):
container = {}
container = {}
if response.execution:
task_docker_cmd_parts = shlex.split(str(response.execution.docker_cmd or '').strip())
if task_docker_cmd_parts:
try:
container = dict(
image=task_docker_cmd_parts[0],
arguments=task_docker_cmd_parts[1:] if len(task_docker_cmd_parts[0]) > 1 else ''
)
except (ValueError, TypeError):
pass
if (not container or not container.get('container')) and session.check_min_api_version("2.13"):
if (not container or not container.get('image')) and session.check_min_api_version("2.13"):
container = resolve_default_container(session=session, task_id=task_id, container_config=container)
return container
@@ -596,6 +638,8 @@ class Worker(ServiceCommandSection):
_docker_fixed_user_cache = '/clearml_agent_cache'
_temp_cleanup_list = []
hostname_task_runtime_prop = "_exec_agent_hostname"
@property
def service(self):
""" Worker command service endpoint """
@@ -686,6 +730,10 @@ class Worker(ServiceCommandSection):
else:
self._docker_args_filters = []
self._task_ping_interval_sec = max(
0, text_to_int(self._session.config.get("agent.task_ping_interval_sec", 60.0))
)
@classmethod
def _verify_command_states(cls, kwargs):
"""
@@ -731,8 +779,68 @@ class Worker(ServiceCommandSection):
except Exception:
pass
def _get_docker_restart_value(self, task_session, task_id: str):
try:
self._session.verify_feature_set('advanced')
except ValueError:
return
restart = (ENV_SERVICES_DOCKER_RESTART.get() or "").strip()
if not restart:
return
# Parse value and selector
restart_value, _, selector = restart.partition(";")
if restart_value not in ("unless-stopped", "no", "always") and not restart_value.startswith("on-failure"):
self.log.error(
"Invalid value \"{}\" provided for {}, ignoring".format(restart, ENV_SERVICES_DOCKER_RESTART.vars[0])
)
return
if not selector:
return restart_value
path, _, expected_value = selector.partition("=")
result = task_session.send_request(
service='tasks',
action='get_all',
json={'id': [task_id], 'only_fields': [path], 'search_hidden': True},
method=Request.def_method,
)
if not result.ok:
result_msg = self._get_path(result.json(), 'meta', 'result_msg')
self.log.error(
"Failed obtaining selector value for restart option \"{}\", ignoring: {}".format(selector, result_msg)
)
return
not_found = object()
try:
value = self._get_path(result.json(), 'data', 'tasks', 0, *path.split("."), default=not_found)
except (ValueError, TypeError):
return
if value is not_found:
return
if not expected_value:
return restart_value
# noinspection PyBroadException
try:
if (
(isinstance(value, bool) and value == strtobool(expected_value)) # check first - bool is also an int
or (isinstance(value, (int, float)) and value == float(expected_value))
or (str(value) == str(expected_value))
):
return restart_value
except Exception as ex:
pass
def run_one_task(self, queue, task_id, worker_args, docker=None, task_session=None):
# type: (Text, Text, WorkerParams, Optional[Text]) -> int
# type: (Text, Text, WorkerParams, Optional[Text], Optional[Session]) -> Optional[int]
"""
Run one task pulled from queue.
:param queue: ID of queue that task was pulled from
@@ -747,6 +855,20 @@ class Worker(ServiceCommandSection):
# "Running task '{}'".format(task_id)
print(self._task_logging_start_message.format(task_id))
task_session = task_session or self._session
# noinspection PyBroadException
try:
res = task_session.send_request(
service='tasks', action='edit', method=Request.def_method,
json={
"task": task_id, "force": True, "runtime": {self.hostname_task_runtime_prop: socket.gethostname()}
},
)
if not res.ok:
raise Exception("failed setting runtime property")
except Exception as ex:
print("Warning: failed obtaining/setting hostname for task '{}': {}".format(task_id, ex))
# set task status to in_progress so we know it was popped from the queue
# noinspection PyBroadException
try:
@@ -777,10 +899,18 @@ class Worker(ServiceCommandSection):
except Exception:
task_container = {}
default_docker = not bool(task_container.get('image'))
docker_image = task_container.get('image') or self._docker_image
docker_arguments = task_container.get(
'arguments', self._docker_arguments if default_docker else None)
default_docker = (
self._session.config.get('agent.disable_task_docker_override', False)
or not bool(task_container.get('image'))
)
if default_docker:
docker_image = self._docker_image
docker_arguments = self._docker_arguments
else:
docker_image = task_container.get('image') or self._docker_image
docker_arguments = task_container.get(
'arguments', self._docker_arguments if default_docker else None)
docker_setup_script = task_container.get('setup_shell_script')
self.send_logs(
@@ -799,6 +929,7 @@ class Worker(ServiceCommandSection):
docker_image=docker_image,
docker_arguments=docker_arguments,
docker_bash_setup_script=docker_setup_script,
restart=self._get_docker_restart_value(task_session, task_id),
)
if self._impersonate_as_task_owner:
docker_params["auth_token"] = task_session.token
@@ -813,11 +944,21 @@ class Worker(ServiceCommandSection):
name_format = self._session.config.get('agent.docker_container_name_format', None)
if name_format:
custom_fields = {}
name_format_fields = self._session.config.get('agent.docker_container_name_format_fields', None)
if name_format_fields:
field_values = get_task_fields(task_session, task_id, name_format_fields.values(), log=self.log)
custom_fields = {
k: field_values.get(v)
for k, v in name_format_fields.items()
}
try:
name = name_format.format(
task_id=re.sub(r'[^a-zA-Z0-9._-]', '-', task_id),
worker_id=re.sub(r'[^a-zA-Z0-9._-]', '-', worker_id),
rand_string="".join(sys_random.choice(string.ascii_lowercase) for _ in range(32))
rand_string="".join(sys_random.choice(string.ascii_lowercase) for _ in range(32)),
**custom_fields,
)
except Exception as ex:
print("Warning: failed generating docker container name: {}".format(ex))
@@ -958,6 +1099,7 @@ class Worker(ServiceCommandSection):
if not (result.ok() and result.response):
return
new_session = copy(session)
new_session.api_client = None
new_session.set_auth_token(result.response.token)
return new_session
@@ -1166,7 +1308,7 @@ class Worker(ServiceCommandSection):
print("No tasks in Queues, sleeping for {:.1f} seconds".format(self._polling_interval))
sleep(self._polling_interval)
if self._session.config["agent.reload_config"]:
if self._session.config.get("agent.reload_config", False):
self.reload_config()
finally:
# if we are in dynamic gpus mode, shutdown all active runs
@@ -1197,7 +1339,7 @@ class Worker(ServiceCommandSection):
except Exception:
return None
worker_name = self._session.config["agent.worker_name"] + ':gpu'
worker_name = self._session.config.get("agent.worker_name", "") + ':gpu'
our_workers = [
w.id for w in response.workers
if w.id.startswith(worker_name) and w.id != self.worker_id]
@@ -1548,10 +1690,14 @@ class Worker(ServiceCommandSection):
gpu_indexes=gpu_indexes,
gpu_queues=dynamic_gpus,
)
except Exception:
except Exception as e:
tb = six.text_type(traceback.format_exc())
print("FATAL ERROR:")
print(tb)
if self._session.config.get("agent.crash_on_exception", False):
raise e
crash_file, name = safe_mkstemp(prefix=".clearml_agent-crash", suffix=".log")
try:
with crash_file:
@@ -1657,7 +1803,9 @@ class Worker(ServiceCommandSection):
# noinspection PyBroadException
try:
config_data = self._session.config.as_plain_ordered_dict() if config is None else config.as_plain_ordered_dict()
config_data = (
self._session.config.as_plain_ordered_dict() if config is None else config.as_plain_ordered_dict()
)
if clean_api_credentials:
api = config_data.get("api")
if api:
@@ -1730,6 +1878,7 @@ class Worker(ServiceCommandSection):
stopping = False
status = None
process = None
last_task_ping = 0
try:
_last_machine_update_ts = time()
stop_reason = None
@@ -1765,6 +1914,18 @@ class Worker(ServiceCommandSection):
if stderr:
stderr.flush()
if not stopping and self._task_ping_interval_sec and \
time() - last_task_ping > self._task_ping_interval_sec:
# noinspection PyBroadException
try:
res = (session or self._session).send(tasks_api.PingRequest(task=task_id))
if not res:
self.log.error("Failed sending ping for task %s: %s", task_id, res.response)
except Exception as ex:
self.log.error("Failed sending ping: %s", str(ex))
finally:
last_task_ping = time()
# get diff from previous poll
printed_lines, stdout_pos_count = _print_file(stdout_path, stdout_pos_count)
if self._services_mode and not stopping and status is None:
@@ -1936,6 +2097,11 @@ class Worker(ServiceCommandSection):
except Exception as ex:
print("Error: failed applying files from configuration: {}".format(ex))
try:
self._session.update_default_api_method()
except Exception as ex:
print("Error: failed updating default API method: {}".format(ex))
@resolve_names
def build(
self,
@@ -1950,6 +2116,10 @@ class Worker(ServiceCommandSection):
):
if not task_id:
raise CommandFailedError("Worker build must have valid task id")
if target and not os.path.isabs(target):
# Non absolute target path will lead to errors with relative python executable
target = os.path.abspath(target)
self._session.print_configuration()
@@ -2055,7 +2225,10 @@ class Worker(ServiceCommandSection):
# noinspection PyBroadException
try:
task_container = get_task_container(self._session, task_id)
if task_container.get('image'):
if (
task_container.get('image')
and not self._session.config.get('agent.disable_task_docker_override', False)
):
docker_image = task_container.get('image')
print('Ignoring default docker image, using task docker image {}'.format(docker_image))
docker_arguments = task_container.get('arguments')
@@ -2066,12 +2239,14 @@ class Worker(ServiceCommandSection):
print('Building Task {} inside docker image: {} {} setup_script={}\n'.format(
task_id, docker_image, docker_arguments or '', docker_setup_script or ''))
full_docker_cmd = self.docker_image_func(
docker_image=docker_image, docker_arguments=docker_arguments, docker_bash_setup_script=docker_setup_script)
docker_image=docker_image, docker_arguments=docker_arguments, docker_bash_setup_script=docker_setup_script
)
end_of_build_marker = "build.done=true"
docker_cmd_suffix = ' build --id {task_id} --install-globally; ' \
'echo "" >> {conf_file} ; ' \
'echo {end_of_build_marker} >> {conf_file} ; ' \
'ORG=$(stat -c "%u:%g" {conf_file}) ; chown $(whoami):$(whoami) {conf_file} ; ' \
'echo "" >> {conf_file} ; echo {end_of_build_marker} >> {conf_file} ; ' \
'chown $ORG {conf_file} ; ' \
'bash'.format(
task_id=task_id,
end_of_build_marker=end_of_build_marker,
@@ -2090,10 +2265,16 @@ class Worker(ServiceCommandSection):
# now we need to wait until the line shows on our configuration file.
while True:
while temp_config.stat().st_mtime == base_time_stamp:
sleep(5.0)
with open(temp_config.as_posix()) as f:
lines = [l.strip() for l in f.readlines()]
# noinspection PyBroadException
try:
while temp_config.stat().st_mtime == base_time_stamp:
sleep(5.0)
with open(temp_config.as_posix()) as f:
lines = [l.strip() for l in f.readlines()]
except Exception as ex:
# print("Failed reading status file [{}], retrying in 2 seconds".format(ex))
sleep(2.0)
if 'build.done=true' in lines:
break
base_time_stamp = temp_config.stat().st_mtime
@@ -2122,6 +2303,8 @@ class Worker(ServiceCommandSection):
print(commit_docker(container_name=target, docker_id=docker_id, apply_change=change))
shutdown_docker_process(docker_id=docker_id)
safe_remove_file(temp_config.as_posix())
return
def _get_task_python_version(self, task):
@@ -2188,6 +2371,7 @@ class Worker(ServiceCommandSection):
raise CommandFailedError("Cloning failed")
else:
# make sure this task is not stuck in an execution queue, it shouldn't have been, but just in case.
# noinspection PyBroadException
try:
res = self._session.api_client.tasks.dequeue(task=current_task.id)
if require_queue and res.meta.result_code != 200:
@@ -2801,8 +2985,8 @@ class Worker(ServiceCommandSection):
# Todo: add support for poetry caching
if not self.poetry.enabled:
# add to cache
print('Adding venv into cache: {}'.format(add_venv_folder_cache))
if add_venv_folder_cache:
print('Adding venv into cache: {}'.format(add_venv_folder_cache))
self.package_api.add_cached_venv(
requirements=[freeze, previous_reqs],
docker_cmd=execution_info.docker_cmd if execution_info else None,
@@ -2823,19 +3007,27 @@ class Worker(ServiceCommandSection):
self.log_traceback(e)
return freeze
def _install_poetry_requirements(self, repo_info):
# type: (Optional[RepoInfo]) -> Optional[PoetryAPI]
def _install_poetry_requirements(self, repo_info, working_dir=None):
# type: (Optional[RepoInfo], Optional[str]) -> Optional[PoetryAPI]
if not repo_info:
return None
files_from_working_dir = self._session.config.get(
"agent.package_manager.poetry_files_from_repo_working_dir", False)
lockfile_path = Path(repo_info.root) / ((working_dir or "") if files_from_working_dir else "")
try:
if not self.poetry.enabled:
return None
self.poetry.initialize(cwd=repo_info.root)
api = self.poetry.get_api(repo_info.root)
self.poetry.initialize(cwd=lockfile_path)
api = self.poetry.get_api(lockfile_path)
if api.enabled:
print('Poetry Enabled: Ignoring requested python packages, using repository poetry lock file!')
api.install()
return api
print(f"Could not find pyproject.toml or poetry.lock file in {lockfile_path} \n")
except Exception as ex:
self.log.error("failed installing poetry requirements: {}".format(ex))
return None
@@ -2866,7 +3058,8 @@ class Worker(ServiceCommandSection):
"""
if package_api:
package_api.cwd = cwd
api = self._install_poetry_requirements(repo_info)
api = self._install_poetry_requirements(repo_info, execution.working_dir)
if api:
# update back the package manager, this hack should be fixed
if package_api == self.package_api:
@@ -3390,6 +3583,11 @@ class Worker(ServiceCommandSection):
requirements_manager.translator.enabled = False
print(requirements_manager.replace(contents))
def remove_non_backwards_compatible_entries(self, config: Config):
if not self._standalone_mode or not ENV_CONFIG_BC_IN_STANDALONE.get() or self._session.feature_set == "basic":
return
config.pop("agent.package_manager.pip_version") # removed due to a breaking change in v1.5.1
def get_docker_config_cmd(self, docker_args, clean_api_credentials=False):
docker_image = str(ENV_DOCKER_IMAGE.get() or
self._session.config.get("agent.default_docker.image", "nvidia/cuda")) \
@@ -3412,6 +3610,7 @@ class Worker(ServiceCommandSection):
DockerArgsSanitizer.sanitize_docker_command(self._session, self._docker_arguments) or ''))
temp_config = deepcopy(self._session.config)
self.remove_non_backwards_compatible_entries(temp_config)
mounted_cache_dir = temp_config.get(
"agent.docker_internal_mounts.sdk_cache", self._docker_fixed_user_cache)
mounted_pip_dl_dir = temp_config.get(
@@ -3423,7 +3622,6 @@ class Worker(ServiceCommandSection):
temp_config.put("sdk.storage.cache.default_base_dir", mounted_cache_dir)
temp_config.put("agent.pip_download_cache.path", mounted_pip_dl_dir)
temp_config.put("agent.vcs_cache.path", mounted_vcs_cache)
temp_config.put("agent.package_manager.system_site_packages", True)
temp_config.put("agent.package_manager.conda_env_as_base_docker", False)
temp_config.put("agent.default_python", "")
temp_config.put("agent.python_binary", "")
@@ -3435,6 +3633,11 @@ class Worker(ServiceCommandSection):
temp_config.put("agent.git_pass", (ENV_AGENT_GIT_PASS.get() or
self._session.config.get("agent.git_pass", None)))
force_system_site_packages = ENV_FORCE_SYSTEM_SITE_PACKAGES.get()
force_system_site_packages = force_system_site_packages if force_system_site_packages is not None else True
if force_system_site_packages:
temp_config.put("agent.package_manager.system_site_packages", True)
if temp_config.get("agent.venvs_cache.path", None):
temp_config.put("agent.venvs_cache.path", '/root/.clearml/venvs-cache')
@@ -3637,6 +3840,60 @@ class Worker(ServiceCommandSection):
pass
return results
@staticmethod
def _resolve_docker_env_args(docker_args):
# type: (List[str]) -> List[str]
"""
Resolve -e / --env docker environment args matching $VAR or ${VAR} from the host environment
:argument docker_args: List of docker argument strings (flags and values)
"""
non_list_args = (
"rm", "read-only", "sig-proxy", "tty", "privileged", "publish-all", "interactive", "init", "help", "detach"
)
non_list_args_single = (
"t", "P", "i", "d",
)
# if no filtering, do nothing
if not docker_args:
return docker_args
args = docker_args[:]
skip_arg = False
for i, cmd in enumerate(docker_args):
if skip_arg and not cmd.startswith("-"):
continue
skip_arg = False
if cmd.startswith("--"):
# jump over single command
if cmd[2:] in non_list_args:
continue
elif cmd.startswith("-"):
# jump over single character non args
if cmd[1:] in non_list_args_single:
continue
# if we are here we have a command to bypass and the list after it
if cmd in ('-e', '--env'):
skip_arg = True
for j in range(i+1, len(args)):
if args[j].startswith("-"):
break
parts = args[j].split("=", 1)
if len(parts) != 2:
continue
args[j] = "{}={}".format(parts[0], os.path.expandvars(parts[1]))
elif cmd.startswith("-"):
skip_arg = True
return args
def _get_docker_cmd(
self,
worker_id, parent_worker_id,
@@ -3662,11 +3919,20 @@ class Worker(ServiceCommandSection):
name=None,
mount_ssh=None, mount_ssh_ro=None, mount_apt_cache=None, mount_pip_cache=None, mount_poetry_cache=None,
env_task_id=None,
restart=None,
):
self.debug("Constructing docker command", context="docker")
docker = 'docker'
base_cmd = [docker, 'run', '-t']
use_rm = True
if restart:
if restart in ("unless-stopped", "no", "always") or restart.startswith("on-failure"):
base_cmd += ["--restart", restart]
use_rm = False
else:
self.log.error("Invalid restart value \"{}\" , ignoring".format(restart))
update_scheme = ""
dockers_nvidia_visible_devices = 'all'
gpu_devices = Session.get_nvidia_visible_env()
@@ -3691,9 +3957,14 @@ class Worker(ServiceCommandSection):
docker_arguments = list(docker_arguments) \
if isinstance(docker_arguments, (list, tuple)) else [docker_arguments]
docker_arguments = self._filter_docker_args(docker_arguments)
if self._session.config.get("agent.docker_allow_host_environ", None):
docker_arguments = self._resolve_docker_env_args(docker_arguments)
base_cmd += [a for a in docker_arguments if a]
if extra_docker_arguments:
# we always resolve environments in the `extra_docker_arguments` becuase the admin set them (not users)
extra_docker_arguments = self._resolve_docker_env_args(extra_docker_arguments)
extra_docker_arguments = [extra_docker_arguments] \
if isinstance(extra_docker_arguments, six.string_types) else extra_docker_arguments
base_cmd += [str(a) for a in extra_docker_arguments if a]
@@ -3702,6 +3973,10 @@ class Worker(ServiceCommandSection):
base_cmd += ['-l', self._worker_label.format(worker_id)]
base_cmd += ['-l', self._parent_worker_label.format(parent_worker_id)]
extra_labels = ENV_EXTRA_DOCKER_LABELS.get()
for label in (extra_labels or []):
base_cmd += ['-l', label]
self.debug("Command: {}".format(base_cmd), context="docker")
# check if running inside a kubernetes
@@ -3765,7 +4040,7 @@ class Worker(ServiceCommandSection):
base_cmd += ['-e', 'CLEARML_WORKER_ID='+worker_id, ]
# update the docker image, so the system knows where it runs
base_cmd += ['-e', 'CLEARML_DOCKER_IMAGE={} {}'.format(docker_image, ' '.join(docker_arguments or [])).strip()]
base_cmd += ['-e', 'CLEARML_DOCKER_IMAGE={}'.format(docker_image)]
if env_task_id:
base_cmd += ['-e', 'CLEARML_TASK_ID={}'.format(env_task_id), ]
@@ -3784,6 +4059,7 @@ class Worker(ServiceCommandSection):
# if we are running a RC version, install the same version in the docker
# because the default latest, will be a release version (not RC)
specify_version = ''
# noinspection PyBroadException
try:
from clearml_agent.version import __version__
_version_parts = __version__.split('.')
@@ -3792,14 +4068,15 @@ class Worker(ServiceCommandSection):
except:
pass
agent_install_bash_script = []
force_agent_repo = ENV_FORCE_DOCKER_AGENT_REPO.get()
if os.environ.get('FORCE_LOCAL_CLEARML_AGENT_WHEEL'):
local_wheel = os.path.expanduser(os.environ.get('FORCE_LOCAL_CLEARML_AGENT_WHEEL'))
docker_wheel = '/tmp/{}'.format(basename(local_wheel))
base_cmd += ['-v', local_wheel + ':' + docker_wheel]
clearml_agent_wheel = '\"{}\"'.format(docker_wheel)
elif os.environ.get('FORCE_CLEARML_AGENT_REPO'):
clearml_agent_wheel = os.environ.get('FORCE_CLEARML_AGENT_REPO')
elif force_agent_repo:
clearml_agent_wheel = force_agent_repo
else:
# clearml-agent{specify_version}
clearml_agent_wheel = 'clearml-agent{specify_version}'.format(specify_version=specify_version)
@@ -3833,9 +4110,6 @@ class Worker(ServiceCommandSection):
if preprocess_bash_script:
bash_script = preprocess_bash_script + bash_script
if agent_install_bash_script:
bash_script += agent_install_bash_script
docker_bash_script = " ; ".join([line for line in bash_script if line]) \
if not isinstance(bash_script, str) else bash_script
@@ -3844,10 +4118,10 @@ class Worker(ServiceCommandSection):
update_scheme += (
docker_bash_script + " ; " +
"[ ! -z $LOCAL_PYTHON ] || export LOCAL_PYTHON={python} ; " +
"$LOCAL_PYTHON -m pip install -U \"pip{pip_version}\" ; " +
"$LOCAL_PYTHON -m pip install -U {pip_version} ; " +
"$LOCAL_PYTHON -m pip install -U {clearml_agent_wheel} ; ").format(
python_single_digit=python_version.split('.')[0],
python=python_version, pip_version=PackageManager.get_pip_version(),
python=python_version, pip_version=" ".join(PackageManager.get_pip_versions(wrap='\"')),
clearml_agent_wheel=clearml_agent_wheel,
mount_ssh_ro=mount_ssh_ro, mount_ssh=mount_ssh,
)
@@ -3883,7 +4157,8 @@ class Worker(ServiceCommandSection):
(['-v', host_cache+':'+mounted_cache] if host_cache else []) +
(['-v', host_vcs_cache+':'+mounted_vcs_cache] if host_vcs_cache else []) +
(['-v', host_venvs_cache + ':' + mounted_venvs_cache] if host_venvs_cache else []) +
['--rm', docker_image, 'bash', '-c',
(['--rm'] if use_rm else []) +
[docker_image, 'bash', '-c',
update_scheme +
extra_shell_script +
"cp {} {} ; ".format(DOCKER_ROOT_CONF_FILE, DOCKER_DEFAULT_CONF_FILE) +
@@ -4089,6 +4364,15 @@ class Worker(ServiceCommandSection):
" found {})".format(role)
)
@staticmethod
def _get_path(d, *path, default=None):
try:
return functools.reduce(
lambda a, b: a[b], path, d
)
except (IndexError, KeyError):
return default
if __name__ == "__main__":
pass

View File

@@ -5,9 +5,9 @@ from enum import IntEnum
from os import getenv, environ
from typing import Text, Optional, Union, Tuple, Any
import six
from pathlib2 import Path
import six
from clearml_agent.helper.base import normalize_path
PROGRAM_NAME = "clearml-agent"
@@ -69,41 +69,65 @@ ENV_AWS_SECRET_KEY = EnvironmentConfig("AWS_SECRET_ACCESS_KEY")
ENV_AZURE_ACCOUNT_KEY = EnvironmentConfig("AZURE_STORAGE_KEY")
ENVIRONMENT_CONFIG = {
"api.api_server": EnvironmentConfig("CLEARML_API_HOST", "TRAINS_API_HOST", ),
"api.files_server": EnvironmentConfig("CLEARML_FILES_HOST", "TRAINS_FILES_HOST", ),
"api.web_server": EnvironmentConfig("CLEARML_WEB_HOST", "TRAINS_WEB_HOST", ),
"api.api_server": EnvironmentConfig(
"CLEARML_API_HOST",
"TRAINS_API_HOST",
),
"api.files_server": EnvironmentConfig(
"CLEARML_FILES_HOST",
"TRAINS_FILES_HOST",
),
"api.web_server": EnvironmentConfig(
"CLEARML_WEB_HOST",
"TRAINS_WEB_HOST",
),
"api.credentials.access_key": EnvironmentConfig(
"CLEARML_API_ACCESS_KEY", "TRAINS_API_ACCESS_KEY",
"CLEARML_API_ACCESS_KEY",
"TRAINS_API_ACCESS_KEY",
),
"api.credentials.secret_key": ENV_AGENT_SECRET_KEY,
"agent.worker_name": EnvironmentConfig("CLEARML_WORKER_NAME", "TRAINS_WORKER_NAME", ),
"agent.worker_id": EnvironmentConfig("CLEARML_WORKER_ID", "TRAINS_WORKER_ID", ),
"agent.cuda_version": EnvironmentConfig(
"CLEARML_CUDA_VERSION", "TRAINS_CUDA_VERSION", "CUDA_VERSION"
"agent.worker_name": EnvironmentConfig(
"CLEARML_WORKER_NAME",
"TRAINS_WORKER_NAME",
),
"agent.cudnn_version": EnvironmentConfig(
"CLEARML_CUDNN_VERSION", "TRAINS_CUDNN_VERSION", "CUDNN_VERSION"
),
"agent.cpu_only": EnvironmentConfig(
names=("CLEARML_CPU_ONLY", "TRAINS_CPU_ONLY", "CPU_ONLY"), type=bool
"agent.worker_id": EnvironmentConfig(
"CLEARML_WORKER_ID",
"TRAINS_WORKER_ID",
),
"agent.cuda_version": EnvironmentConfig("CLEARML_CUDA_VERSION", "TRAINS_CUDA_VERSION", "CUDA_VERSION"),
"agent.cudnn_version": EnvironmentConfig("CLEARML_CUDNN_VERSION", "TRAINS_CUDNN_VERSION", "CUDNN_VERSION"),
"agent.cpu_only": EnvironmentConfig(names=("CLEARML_CPU_ONLY", "TRAINS_CPU_ONLY", "CPU_ONLY"), type=bool),
"agent.crash_on_exception": EnvironmentConfig("CLEAMRL_AGENT_CRASH_ON_EXCEPTION", type=bool),
"sdk.aws.s3.key": EnvironmentConfig("AWS_ACCESS_KEY_ID"),
"sdk.aws.s3.secret": ENV_AWS_SECRET_KEY,
"sdk.aws.s3.region": EnvironmentConfig("AWS_DEFAULT_REGION"),
"sdk.azure.storage.containers.0": {'account_name': EnvironmentConfig("AZURE_STORAGE_ACCOUNT"),
'account_key': ENV_AZURE_ACCOUNT_KEY},
"sdk.azure.storage.containers.0": {
"account_name": EnvironmentConfig("AZURE_STORAGE_ACCOUNT"),
"account_key": ENV_AZURE_ACCOUNT_KEY,
},
"sdk.google.storage.credentials_json": EnvironmentConfig("GOOGLE_APPLICATION_CREDENTIALS"),
}
ENVIRONMENT_SDK_PARAMS = {
"task_id": ("CLEARML_TASK_ID", "TRAINS_TASK_ID", ),
"config_file": ("CLEARML_CONFIG_FILE", "TRAINS_CONFIG_FILE", ),
"log_level": ("CLEARML_LOG_LEVEL", "TRAINS_LOG_LEVEL", ),
"log_to_backend": ("CLEARML_LOG_TASK_TO_BACKEND", "TRAINS_LOG_TASK_TO_BACKEND", ),
"task_id": (
"CLEARML_TASK_ID",
"TRAINS_TASK_ID",
),
"config_file": (
"CLEARML_CONFIG_FILE",
"TRAINS_CONFIG_FILE",
),
"log_level": (
"CLEARML_LOG_LEVEL",
"TRAINS_LOG_LEVEL",
),
"log_to_backend": (
"CLEARML_LOG_TASK_TO_BACKEND",
"TRAINS_LOG_TASK_TO_BACKEND",
),
}
ENVIRONMENT_BACKWARD_COMPATIBLE = EnvironmentConfig(
names=("CLEARML_AGENT_ALG_ENV", "TRAINS_AGENT_ALG_ENV"), type=bool)
ENVIRONMENT_BACKWARD_COMPATIBLE = EnvironmentConfig(names=("CLEARML_AGENT_ALG_ENV", "TRAINS_AGENT_ALG_ENV"), type=bool)
VIRTUAL_ENVIRONMENT_PATH = {
"python2": normalize_path(CONFIG_DIR, "py2venv"),
@@ -122,38 +146,67 @@ TOKEN_EXPIRATION_SECONDS = int(timedelta(days=2).total_seconds())
METADATA_EXTENSION = ".json"
DEFAULT_VENV_UPDATE_URL = (
"https://raw.githubusercontent.com/Yelp/venv-update/v3.2.4/venv_update.py"
)
DEFAULT_VENV_UPDATE_URL = "https://raw.githubusercontent.com/Yelp/venv-update/v3.2.4/venv_update.py"
WORKING_REPOSITORY_DIR = "task_repository"
WORKING_STANDALONE_DIR = "code"
DEFAULT_VCS_CACHE = normalize_path(CONFIG_DIR, "vcs-cache")
PIP_EXTRA_INDICES = [
]
PIP_EXTRA_INDICES = []
DEFAULT_PIP_DOWNLOAD_CACHE = normalize_path(CONFIG_DIR, "pip-download-cache")
ENV_DOCKER_IMAGE = EnvironmentConfig('CLEARML_DOCKER_IMAGE', 'TRAINS_DOCKER_IMAGE')
ENV_WORKER_ID = EnvironmentConfig('CLEARML_WORKER_ID', 'TRAINS_WORKER_ID')
ENV_WORKER_TAGS = EnvironmentConfig('CLEARML_WORKER_TAGS')
ENV_AGENT_SKIP_PIP_VENV_INSTALL = EnvironmentConfig('CLEARML_AGENT_SKIP_PIP_VENV_INSTALL')
ENV_AGENT_SKIP_PYTHON_ENV_INSTALL = EnvironmentConfig('CLEARML_AGENT_SKIP_PYTHON_ENV_INSTALL', type=bool)
ENV_DOCKER_SKIP_GPUS_FLAG = EnvironmentConfig('CLEARML_DOCKER_SKIP_GPUS_FLAG', 'TRAINS_DOCKER_SKIP_GPUS_FLAG')
ENV_AGENT_GIT_USER = EnvironmentConfig('CLEARML_AGENT_GIT_USER', 'TRAINS_AGENT_GIT_USER')
ENV_AGENT_GIT_PASS = EnvironmentConfig('CLEARML_AGENT_GIT_PASS', 'TRAINS_AGENT_GIT_PASS')
ENV_AGENT_GIT_HOST = EnvironmentConfig('CLEARML_AGENT_GIT_HOST', 'TRAINS_AGENT_GIT_HOST')
ENV_AGENT_DISABLE_SSH_MOUNT = EnvironmentConfig('CLEARML_AGENT_DISABLE_SSH_MOUNT', type=bool)
ENV_SSH_AUTH_SOCK = EnvironmentConfig('SSH_AUTH_SOCK')
ENV_TASK_EXECUTE_AS_USER = EnvironmentConfig('CLEARML_AGENT_EXEC_USER', 'TRAINS_AGENT_EXEC_USER')
ENV_TASK_EXTRA_PYTHON_PATH = EnvironmentConfig('CLEARML_AGENT_EXTRA_PYTHON_PATH', 'TRAINS_AGENT_EXTRA_PYTHON_PATH')
ENV_DOCKER_HOST_MOUNT = EnvironmentConfig('CLEARML_AGENT_K8S_HOST_MOUNT', 'CLEARML_AGENT_DOCKER_HOST_MOUNT',
'TRAINS_AGENT_K8S_HOST_MOUNT', 'TRAINS_AGENT_DOCKER_HOST_MOUNT')
ENV_VENV_CACHE_PATH = EnvironmentConfig('CLEARML_AGENT_VENV_CACHE_PATH')
ENV_EXTRA_DOCKER_ARGS = EnvironmentConfig('CLEARML_AGENT_EXTRA_DOCKER_ARGS', type=list)
ENV_DEBUG_INFO = EnvironmentConfig('CLEARML_AGENT_DEBUG_INFO')
ENV_CHILD_AGENTS_COUNT_CMD = EnvironmentConfig('CLEARML_AGENT_CHILD_AGENTS_COUNT_CMD')
ENV_DOCKER_ARGS_FILTERS = EnvironmentConfig('CLEARML_AGENT_DOCKER_ARGS_FILTERS')
ENV_DOCKER_ARGS_HIDE_ENV = EnvironmentConfig('CLEARML_AGENT_DOCKER_ARGS_HIDE_ENV')
ENV_PIP_EXTRA_INSTALL_FLAGS = EnvironmentConfig("CLEARML_EXTRA_PIP_INSTALL_FLAGS", type=list)
ENV_DOCKER_IMAGE = EnvironmentConfig("CLEARML_DOCKER_IMAGE", "TRAINS_DOCKER_IMAGE")
ENV_WORKER_ID = EnvironmentConfig("CLEARML_WORKER_ID", "TRAINS_WORKER_ID")
ENV_WORKER_TAGS = EnvironmentConfig("CLEARML_WORKER_TAGS")
ENV_AGENT_SKIP_PIP_VENV_INSTALL = EnvironmentConfig("CLEARML_AGENT_SKIP_PIP_VENV_INSTALL")
ENV_AGENT_SKIP_PYTHON_ENV_INSTALL = EnvironmentConfig("CLEARML_AGENT_SKIP_PYTHON_ENV_INSTALL", type=bool)
ENV_DOCKER_SKIP_GPUS_FLAG = EnvironmentConfig("CLEARML_DOCKER_SKIP_GPUS_FLAG", "TRAINS_DOCKER_SKIP_GPUS_FLAG")
ENV_AGENT_GIT_USER = EnvironmentConfig("CLEARML_AGENT_GIT_USER", "TRAINS_AGENT_GIT_USER")
ENV_AGENT_GIT_PASS = EnvironmentConfig("CLEARML_AGENT_GIT_PASS", "TRAINS_AGENT_GIT_PASS")
ENV_AGENT_GIT_HOST = EnvironmentConfig("CLEARML_AGENT_GIT_HOST", "TRAINS_AGENT_GIT_HOST")
ENV_AGENT_DISABLE_SSH_MOUNT = EnvironmentConfig("CLEARML_AGENT_DISABLE_SSH_MOUNT", type=bool)
ENV_SSH_AUTH_SOCK = EnvironmentConfig("SSH_AUTH_SOCK")
ENV_TASK_EXECUTE_AS_USER = EnvironmentConfig("CLEARML_AGENT_EXEC_USER", "TRAINS_AGENT_EXEC_USER")
ENV_TASK_EXTRA_PYTHON_PATH = EnvironmentConfig("CLEARML_AGENT_EXTRA_PYTHON_PATH", "TRAINS_AGENT_EXTRA_PYTHON_PATH")
ENV_DOCKER_HOST_MOUNT = EnvironmentConfig(
"CLEARML_AGENT_K8S_HOST_MOUNT",
"CLEARML_AGENT_DOCKER_HOST_MOUNT",
"TRAINS_AGENT_K8S_HOST_MOUNT",
"TRAINS_AGENT_DOCKER_HOST_MOUNT",
)
ENV_VENV_CACHE_PATH = EnvironmentConfig("CLEARML_AGENT_VENV_CACHE_PATH")
ENV_EXTRA_DOCKER_ARGS = EnvironmentConfig("CLEARML_AGENT_EXTRA_DOCKER_ARGS", type=list)
ENV_EXTRA_DOCKER_LABELS = EnvironmentConfig("CLEARML_AGENT_EXTRA_DOCKER_LABELS", type=list)
ENV_DEBUG_INFO = EnvironmentConfig("CLEARML_AGENT_DEBUG_INFO")
ENV_CHILD_AGENTS_COUNT_CMD = EnvironmentConfig("CLEARML_AGENT_CHILD_AGENTS_COUNT_CMD")
ENV_DOCKER_ARGS_FILTERS = EnvironmentConfig("CLEARML_AGENT_DOCKER_ARGS_FILTERS")
ENV_DOCKER_ARGS_HIDE_ENV = EnvironmentConfig("CLEARML_AGENT_DOCKER_ARGS_HIDE_ENV")
ENV_CONFIG_BC_IN_STANDALONE = EnvironmentConfig("CLEARML_AGENT_STANDALONE_CONFIG_BC", type=bool)
""" Maintain backwards compatible configuration when launching in standalone mode """
ENV_CUSTOM_BUILD_SCRIPT = EnvironmentConfig('CLEARML_AGENT_CUSTOM_BUILD_SCRIPT')
ENV_FORCE_DOCKER_AGENT_REPO = EnvironmentConfig("FORCE_CLEARML_AGENT_REPO", "CLEARML_AGENT_DOCKER_AGENT_REPO")
ENV_SERVICES_DOCKER_RESTART = EnvironmentConfig("CLEARML_AGENT_SERVICES_DOCKER_RESTART")
"""
Specify a restart value for a services agent task containers.
Note that when a restart value is provided, task containers will not be run with the '--rm' flag and will
not be cleaned up automatically when completed (this will need to be done externally using the
'docker container prune' command to free up resources).
Value format for this env var is "<restart-value>;<task-selector>", where:
- <restart-value> can be any valid restart value for docker-run (see https://docs.docker.com/engine/reference/commandline/run/#restart)
- <task-selector> is optional, allowing to restrict this behaviour to specific tasks. The format is:
"<path-to-task-field>=<value>" where:
* <path-to-task-field> is a dot-separated path to a task field (e.g. "container.image")
* <value> is optional. If not provided, the restart policy till be applied for the task container if the
path provided exists. If provided, the restart policy will be applied if the value matches the value
obtained from the task (value parsing and comparison is based on the type of value obtained from the task)
For example:
CLEARML_AGENT_SERVICES_DOCKER_RESTART=unless-stopped
CLEARML_AGENT_SERVICES_DOCKER_RESTART=unless-stopped;container.image=some-image
"""
ENV_FORCE_SYSTEM_SITE_PACKAGES = EnvironmentConfig("CLEARML_AGENT_FORCE_SYSTEM_SITE_PACKAGES", type=bool)
""" Force system_site_packages: true when running tasks in containers (i.e. docker mode or k8s glue) """
ENV_CUSTOM_BUILD_SCRIPT = EnvironmentConfig("CLEARML_AGENT_CUSTOM_BUILD_SCRIPT")
"""
Specifies a custom environment setup script to be executed instead of installing a virtual environment.
If provided, this script is executed following Git cloning. Script command may include environment variable and

View File

@@ -0,0 +1,15 @@
from threading import Thread
from clearml_agent.session import Session
class K8sDaemon(Thread):
def __init__(self, agent):
super(K8sDaemon, self).__init__(target=self.target)
self.daemon = True
self._agent = agent
self.log = agent.log
self._session: Session = agent._session
def target(self):
pass

View File

@@ -0,0 +1,12 @@
class GetPodsError(Exception):
pass
class GetJobsError(Exception):
pass
class GetPodCountError(Exception):
pass

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,223 @@
from time import sleep
from typing import Dict, Tuple, Optional, List
from clearml_agent.backend_api.session import Request
from clearml_agent.glue.utilities import get_bash_output
from clearml_agent.helper.process import stringify_bash_output
from .daemon import K8sDaemon
from .utilities import get_path
from .errors import GetPodsError
class PendingPodsDaemon(K8sDaemon):
def __init__(self, polling_interval: float, agent):
super(PendingPodsDaemon, self).__init__(agent=agent)
self._polling_interval = polling_interval
self._last_tasks_msgs = {} # last msg updated for every task
def get_pods(self):
if self._agent.using_jobs:
return self._agent.get_pods_for_jobs(
job_condition="status.active=1",
pod_filters=["status.phase=Pending"],
debug_msg="Detecting pending pods: {cmd}"
)
return self._agent.get_pods(
filters=["status.phase=Pending"],
debug_msg="Detecting pending pods: {cmd}"
)
def _get_pod_name(self, pod: dict):
return get_path(pod, "metadata", "name")
def _get_k8s_resource_name(self, pod: dict):
if self._agent.using_jobs:
return get_path(pod, "metadata", "labels", "job-name")
return get_path(pod, "metadata", "name")
def _get_task_id(self, pod: dict):
return self._get_k8s_resource_name(pod).rpartition('-')[-1]
@staticmethod
def _get_k8s_resource_namespace(pod: dict):
return pod.get('metadata', {}).get('namespace', None)
def target(self):
"""
Handle pending objects (pods or jobs, depending on the agent mode).
- Delete any pending objects that are not expected to recover
- Delete any pending objects for whom the associated task was aborted
"""
while True:
# noinspection PyBroadException
try:
# Get pods (standalone pods if we're in pods mode, or pods associated to jobs if we're in jobs mode)
pods = self.get_pods()
if pods is None:
raise GetPodsError()
task_id_to_pod = dict()
for pod in pods:
pod_name = self._get_pod_name(pod)
if not pod_name:
continue
task_id = self._get_task_id(pod)
if not task_id:
continue
namespace = self._get_k8s_resource_namespace(pod)
if not namespace:
continue
task_id_to_pod[task_id] = pod
msg = None
tags = []
waiting = get_path(pod, 'status', 'containerStatuses', 0, 'state', 'waiting')
if not waiting:
condition = get_path(pod, 'status', 'conditions', 0)
if condition:
reason = condition.get('reason')
if reason == 'Unschedulable':
message = condition.get('message')
msg = reason + (" ({})".format(message) if message else "")
else:
reason = waiting.get("reason", None)
message = waiting.get("message", None)
msg = reason + (" ({})".format(message) if message else "")
if reason == 'ImagePullBackOff':
self.delete_k8s_resource(k8s_resource=pod, msg=reason)
try:
self._session.api_client.tasks.failed(
task=task_id,
status_reason="K8S glue error: {}".format(msg),
status_message="Changed by K8S glue",
force=True
)
self._agent.command.send_logs(
task_id, ["K8S Error: {}".format(msg)],
session=self._session
)
except Exception as ex:
self.log.warning(
'K8S Glue pending monitor: Failed deleting task "{}"\nEX: {}'.format(task_id, ex)
)
# clean up any msg for this task
self._last_tasks_msgs.pop(task_id, None)
continue
self._update_pending_task_msg(task_id, msg, tags)
if task_id_to_pod:
self._process_tasks_for_pending_pods(task_id_to_pod)
# clean up any last message for a task that wasn't seen as a pod
self._last_tasks_msgs = {k: v for k, v in self._last_tasks_msgs.items() if k in task_id_to_pod}
except GetPodsError:
pass
except Exception:
self.log.exception("Hanging pods daemon loop")
sleep(self._polling_interval)
def delete_k8s_resource(self, k8s_resource: dict, msg: str = None):
delete_cmd = "kubectl delete {kind} {name} -n {namespace} --output name".format(
kind=self._agent.kind,
name=self._get_k8s_resource_name(k8s_resource),
namespace=self._get_k8s_resource_namespace(k8s_resource)
).strip()
self.log.debug(" - deleting {} {}: {}".format(self._agent.kind, (" " + msg) if msg else "", delete_cmd))
return get_bash_output(delete_cmd).strip()
def _process_tasks_for_pending_pods(self, task_id_to_details: Dict[str, dict]):
self._handle_aborted_tasks(task_id_to_details)
def _handle_aborted_tasks(self, pending_tasks_details: Dict[str, dict]):
try:
result = self._session.get(
service='tasks',
action='get_all',
json={
"id": list(pending_tasks_details),
"status": ["stopped"],
"only_fields": ["id"]
},
method=Request.def_method,
async_enable=False,
)
aborted_task_ids = list(filter(None, (task.get("id") for task in result["tasks"])))
for task_id in aborted_task_ids:
pod = pending_tasks_details.get(task_id)
if not pod:
self.log.error("Failed locating aborted task {} in pending pods list".format(task_id))
continue
resource_name = self._get_k8s_resource_name(pod)
self.log.info(
"K8S Glue pending monitor: task {} was aborted but the k8s resource {} is still pending, "
"deleting pod".format(task_id, resource_name)
)
output = self.delete_k8s_resource(k8s_resource=pod, msg="Pending resource of an aborted task")
if not output:
self.log.warning("K8S Glue pending monitor: failed deleting resource {}".format(resource_name))
except Exception as ex:
self.log.warning(
'K8S Glue pending monitor: failed checking aborted tasks for pending resources: {}'.format(ex)
)
def _update_pending_task_msg(self, task_id: str, msg: str, tags: List[str] = None):
if not msg or self._last_tasks_msgs.get(task_id, None) == (msg, tags):
return
try:
# Make sure the task is queued
result = self._session.send_request(
service='tasks',
action='get_all',
json={"id": task_id, "only_fields": ["status"]},
method=Request.def_method,
async_enable=False,
)
if result.ok:
status = get_path(result.json(), 'data', 'tasks', 0, 'status')
# if task is in progress, change its status to enqueued
if status == "in_progress":
result = self._session.send_request(
service='tasks', action='enqueue',
json={
"task": task_id, "force": True, "queue": self._agent.k8s_pending_queue_id
},
method=Request.def_method,
async_enable=False,
)
if not result.ok:
result_msg = get_path(result.json(), 'meta', 'result_msg')
self.log.debug(
"K8S Glue pods monitor: failed forcing task status change"
" for pending task {}: {}".format(task_id, result_msg)
)
# Update task status message
payload = {"task": task_id, "status_message": "K8S glue status: {}".format(msg)}
if tags:
payload["tags"] = tags
result = self._session.send_request('tasks', 'update', json=payload, method=Request.def_method)
if not result.ok:
result_msg = get_path(result.json(), 'meta', 'result_msg')
raise Exception(result_msg or result.text)
# update last msg for this task
self._last_tasks_msgs[task_id] = msg
except Exception as ex:
self.log.warning(
'K8S Glue pods monitor: Failed setting status message for task "{}"\nMSG: {}\nEX: {}'.format(
task_id, msg, ex
)
)

View File

@@ -0,0 +1,18 @@
import functools
from subprocess import DEVNULL
from clearml_agent.helper.process import get_bash_output as _get_bash_output
def get_path(d, *path, default=None):
try:
return functools.reduce(
lambda a, b: a[b], path, d
)
except (IndexError, KeyError):
return default
def get_bash_output(cmd, stderr=DEVNULL, raise_error=False):
return _get_bash_output(cmd, stderr=stderr, raise_error=raise_error)

View File

@@ -20,20 +20,22 @@ from typing import Text, Dict, Any, Optional, AnyStr, IO, Union
import attr
import furl
import six
import yaml
from attr import fields_dict
from pathlib2 import Path
import six
from six.moves import reduce
from clearml_agent.external import pyhocon
from clearml_agent.errors import CommandFailedError
from clearml_agent.external import pyhocon
from clearml_agent.helper.dicts import filter_keys
pretty_lines = False
log = logging.getLogger(__name__)
use_powershell = os.getenv("CLEARML_AGENT_USE_POWERSHELL", None)
def which(cmd, path=None):
result = find_executable(cmd, path)
@@ -52,7 +54,7 @@ def select_for_platform(linux, windows):
def bash_c():
return 'bash -c' if not is_windows_platform() else 'cmd /c'
return 'bash -c' if not is_windows_platform() else ('powershell -Command' if use_powershell else 'cmd /c')
def return_list(arg):

View File

@@ -50,7 +50,7 @@ class PackageManager(object):
pass
@abc.abstractmethod
def freeze(self):
def freeze(self, freeze_full_environment=False):
pass
@abc.abstractmethod
@@ -80,7 +80,12 @@ class PackageManager(object):
def upgrade_pip(self):
result = self._install(
select_for_platform(windows='pip{}', linux='pip{}').format(self.get_pip_version()), "--upgrade")
*select_for_platform(
windows=self.get_pip_versions(),
linux=self.get_pip_versions()
),
"--upgrade"
)
packages = self.run_with_env(('list',), output=True).splitlines()
# p.split is ('pip', 'x.y.z')
pip = [p.split() for p in packages if len(p.split()) == 2 and p.split()[0] == 'pip']
@@ -136,8 +141,9 @@ class PackageManager(object):
@classmethod
def out_of_scope_install_package(cls, package_name, *args):
if PackageManager._selected_manager is not None:
# noinspection PyBroadException
try:
result = PackageManager._selected_manager._install(package_name, *args)
result = PackageManager._selected_manager.install_packages(package_name, *args)
if result not in (0, None, True):
return False
except Exception:
@@ -145,10 +151,11 @@ class PackageManager(object):
return True
@classmethod
def out_of_scope_freeze(cls):
def out_of_scope_freeze(cls, freeze_full_environment=False):
if PackageManager._selected_manager is not None:
# noinspection PyBroadException
try:
return PackageManager._selected_manager.freeze()
return PackageManager._selected_manager.freeze(freeze_full_environment)
except Exception:
pass
return []
@@ -157,15 +164,26 @@ class PackageManager(object):
def set_pip_version(cls, version):
if not version:
return
version = version.replace(' ', '')
if ('=' in version) or ('~' in version) or ('<' in version) or ('>' in version):
cls._pip_version = version
if isinstance(version, (list, tuple)):
versions = version
else:
cls._pip_version = "=="+version
versions = [version]
cls._pip_version = []
for version in versions:
version = version.strip()
if ('=' in version) or ('~' in version) or ('<' in version) or ('>' in version):
cls._pip_version.append(version)
else:
cls._pip_version.append("==" + version)
@classmethod
def get_pip_version(cls):
return cls._pip_version or ''
def get_pip_versions(cls, pip="pip", wrap=''):
return [
(wrap + pip + version + wrap)
for version in cls._pip_version or [pip]
]
def get_cached_venv(self, requirements, docker_cmd, python_version, cuda_version, destination_folder):
# type: (Dict, Optional[Union[dict, str]], Optional[str], Optional[str], Path) -> Optional[Path]
@@ -176,8 +194,13 @@ class PackageManager(object):
if not self._get_cache_manager():
return None
keys = self._generate_reqs_hash_keys(requirements, docker_cmd, python_version, cuda_version)
return self._get_cache_manager().copy_cached_entry(keys, destination_folder)
try:
keys = self._generate_reqs_hash_keys(requirements, docker_cmd, python_version, cuda_version)
return self._get_cache_manager().copy_cached_entry(keys, destination_folder)
except Exception as ex:
print("WARNING: Failed accessing venvs cache at {}: {}".format(destination_folder, ex))
print("WARNING: Skipping venv cache - folder not accessible!")
return None
def add_cached_venv(
self,
@@ -194,9 +217,15 @@ class PackageManager(object):
"""
if not self._get_cache_manager():
return
keys = self._generate_reqs_hash_keys(requirements, docker_cmd, python_version, cuda_version)
return self._get_cache_manager().add_entry(
keys=keys, source_folder=source_folder, exclude_sub_folders=exclude_sub_folders)
try:
keys = self._generate_reqs_hash_keys(requirements, docker_cmd, python_version, cuda_version)
return self._get_cache_manager().add_entry(
keys=keys, source_folder=source_folder, exclude_sub_folders=exclude_sub_folders)
except Exception as ex:
print("WARNING: Failed accessing venvs cache at {}: {}".format(source_folder, ex))
print("WARNING: Skipping venv cache - folder not accessible!")
return None
def get_cache_folder(self):
# type: () -> Optional[Path]
@@ -264,12 +293,19 @@ class PackageManager(object):
def _get_cache_manager(self):
if not self._cache_manager:
cache_folder = ENV_VENV_CACHE_PATH.get() or self.session.config.get(self._config_cache_folder, None)
if not cache_folder:
cache_folder = None
try:
cache_folder = ENV_VENV_CACHE_PATH.get() or self.session.config.get(self._config_cache_folder, None)
if not cache_folder:
return None
max_entries = int(self.session.config.get(self._config_cache_max_entries, 10))
free_space_threshold = float(self.session.config.get(self._config_cache_free_space_threshold, 0))
self._cache_manager = FolderCache(
cache_folder, max_cache_entries=max_entries, min_free_space_gb=free_space_threshold)
except Exception as ex:
print("WARNING: Failed accessing venvs cache at {}: {}".format(cache_folder, ex))
print("WARNING: Skipping venv cache - folder not accessible!")
return None
max_entries = int(self.session.config.get(self._config_cache_max_entries, 10))
free_space_threshold = float(self.session.config.get(self._config_cache_free_space_threshold, 0))
self._cache_manager = FolderCache(
cache_folder, max_cache_entries=max_entries, min_free_space_gb=free_space_threshold)
return self._cache_manager

View File

@@ -135,7 +135,12 @@ class CondaAPI(PackageManager):
if self.env_read_only:
print('Conda environment in read-only mode, skipping pip upgrade.')
return ''
return self._install(select_for_platform(windows='pip{}', linux='pip{}').format(self.pip.get_pip_version()))
return self._install(
*select_for_platform(
windows=self.pip.get_pip_versions(),
linux=self.pip.get_pip_versions()
)
)
def create(self):
"""

View File

@@ -50,6 +50,14 @@ class ExternalRequirements(SimpleSubstitution):
print("No need to reinstall \'{}\' from VCS, "
"the exact same version is already installed".format(req.name))
continue
if not req.pip_new_version:
# noinspection PyBroadException
try:
freeze_base = PackageManager.out_of_scope_freeze() or dict(pip=[])
except Exception:
freeze_base = dict(pip=[])
req_line = self._add_vcs_credentials(req, session)
# if we have older pip version we have to make sure we replace back the package name with the
@@ -58,14 +66,14 @@ class ExternalRequirements(SimpleSubstitution):
PackageManager.out_of_scope_install_package(req_line, "--no-deps")
# noinspection PyBroadException
try:
freeze_post = PackageManager.out_of_scope_freeze() or ''
freeze_post = PackageManager.out_of_scope_freeze() or dict(pip=[])
package_name = list(set(freeze_post['pip']) - set(freeze_base['pip']))
if package_name and package_name[0] not in self.post_install_req_lookup:
self.post_install_req_lookup[package_name[0]] = req.req.line
except Exception:
pass
# no need to force reinstall, pip will always rebuilt if the package comes from git
# no need to force reinstall, pip will always rebuild if the package comes from git
# and make sure the required packages are installed (if they are not it will install them)
if not PackageManager.out_of_scope_install_package(req_line):
raise ValueError("Failed installing GIT/HTTPs package \'{}\'".format(req_line))
@@ -84,20 +92,14 @@ class ExternalRequirements(SimpleSubstitution):
vcs_url = req_line[4:]
# reverse replace
vcs_url = vcs_url[::-1].replace(fragment[::-1], '', 1)[::-1]
# remove ssh:// or git:// prefix for git detection and credentials
scheme = ''
if vcs_url and (vcs_url.startswith('ssh://') or vcs_url.startswith('git://')):
scheme = 'ssh://' # notice git:// is actually ssh://
vcs_url = vcs_url[6:]
# notice git:// is actually ssh://
if vcs_url and vcs_url.startswith('git://'):
vcs_url = vcs_url.replace('git://', 'ssh://', 1)
from ..repo import Git
vcs = Git(session=session, url=vcs_url, location=None, revision=None)
vcs._set_ssh_url()
new_req_line = 'git+{}{}{}'.format(
'' if scheme and '://' in vcs.url else scheme,
vcs_url if session.config.get('agent.force_git_ssh_protocol', None) else vcs.url_with_auth,
fragment
)
new_req_line = 'git+{}{}'.format(vcs.url_with_auth, fragment)
if new_req_line != req_line:
furl_line = furl(new_req_line)
print('Replacing original pip vcs \'{}\' with \'{}\''.format(

View File

@@ -4,7 +4,7 @@ from itertools import chain
from pathlib import Path
from typing import Text, Optional
from clearml_agent.definitions import PIP_EXTRA_INDICES, PROGRAM_NAME
from clearml_agent.definitions import PIP_EXTRA_INDICES, PROGRAM_NAME, ENV_PIP_EXTRA_INSTALL_FLAGS
from clearml_agent.helper.package.base import PackageManager
from clearml_agent.helper.process import Argv, DEVNULL
from clearml_agent.session import Session
@@ -52,7 +52,7 @@ class SystemPip(PackageManager):
package,
'--dest', cache_dir,
'--no-deps',
) + self.install_flags()
) + self.download_flags()
)
def load_requirements(self, requirements):
@@ -65,13 +65,14 @@ class SystemPip(PackageManager):
def uninstall(self, package):
self.run_with_env(('uninstall', '-y', package))
def freeze(self):
def freeze(self, freeze_full_environment=False):
"""
pip freeze to all install packages except the running program
:return: Dict contains pip as key and pip's packages to install
:rtype: Dict[str: List[str]]
"""
packages = self.run_with_env(('freeze',), output=True).splitlines()
packages = self.run_with_env(
('freeze',) if not freeze_full_environment else ('freeze', '--all'), output=True).splitlines()
packages_without_program = [package for package in packages if PROGRAM_NAME not in package]
return {'pip': packages_without_program}
@@ -87,6 +88,11 @@ class SystemPip(PackageManager):
# make sure we are not running it with our own PYTHONPATH
env = dict(**os.environ)
env.pop('PYTHONPATH', None)
# Debug print
if self.session.debug_mode:
print(command)
return (command.get_output if output else command.check_call)(stdin=DEVNULL, env=env, **kwargs)
def _make_command(self, command):
@@ -97,4 +103,17 @@ class SystemPip(PackageManager):
self.indices_args = tuple(
chain.from_iterable(('--extra-index-url', x) for x in PIP_EXTRA_INDICES)
)
extra_pip_flags = \
ENV_PIP_EXTRA_INSTALL_FLAGS.get() or \
self.session.config.get("agent.package_manager.extra_pip_install_flags", None)
return (self.indices_args + tuple(extra_pip_flags)) if extra_pip_flags else self.indices_args
def download_flags(self):
if self.indices_args is None:
self.indices_args = tuple(
chain.from_iterable(('--extra-index-url', x) for x in PIP_EXTRA_INDICES)
)
return self.indices_args

View File

@@ -69,6 +69,11 @@ class PoetryConfig:
path = path.replace(':'+sys.base_prefix, ':'+sys.real_prefix, 1)
kwargs['env']['PATH'] = path
if self.session and self.session.config:
extra_args = self.session.config.get("agent.package_manager.poetry_install_extra_args", None)
if extra_args:
args = args + tuple(extra_args)
if check_if_command_exists("poetry"):
argv = Argv("poetry", *args)
else:
@@ -142,7 +147,7 @@ class PoetryAPI(object):
any((self.path / indicator).exists() for indicator in self.INDICATOR_FILES)
)
def freeze(self):
def freeze(self, freeze_full_environment=False):
lines = self.config.run("show", cwd=str(self.path)).splitlines()
lines = [[p for p in line.split(' ') if p] for line in lines]
return {"pip": [parts[0]+'=='+parts[1]+' # '+' '.join(parts[2:]) for parts in lines]}

View File

@@ -7,7 +7,7 @@ from .requirements import SimpleSubstitution
class PriorityPackageRequirement(SimpleSubstitution):
name = ("cython", "numpy", "setuptools", )
name = ("cython", "numpy", "setuptools", "pip", )
optional_package_names = tuple()
def __init__(self, *args, **kwargs):
@@ -50,31 +50,39 @@ class PriorityPackageRequirement(SimpleSubstitution):
"""
# if we replaced setuptools, it means someone requested it, and since freeze will not contain it,
# we need to add it manually
if not self._replaced_packages or "setuptools" not in self._replaced_packages:
if not self._replaced_packages:
return list_of_requirements
try:
for k, lines in list_of_requirements.items():
# k is either pip/conda
if k not in ('pip', 'conda'):
continue
for i, line in enumerate(lines):
if not line or line.lstrip().startswith('#'):
continue
parts = [p for p in re.split(r'\s|=|\.|<|>|~|!|@|#', line) if p]
if not parts:
continue
# if we found setuptools, do nothing
if parts[0] == "setuptools":
return list_of_requirements
if "pip" in self._replaced_packages:
full_freeze = PackageManager.out_of_scope_freeze(freeze_full_environment=True)
# now let's look for pip
pips = [line for line in full_freeze.get("pip", []) if line.split("==")[0] == "pip"]
if pips and "pip" in list_of_requirements:
list_of_requirements["pip"] = [pips[0]] + list_of_requirements["pip"]
# if we are here it means we have not found setuptools
# we should add it:
if "pip" in list_of_requirements:
list_of_requirements["pip"] = [self._replaced_packages["setuptools"]] + list_of_requirements["pip"]
if "setuptools" in self._replaced_packages:
try:
for k, lines in list_of_requirements.items():
# k is either pip/conda
if k not in ('pip', 'conda'):
continue
for i, line in enumerate(lines):
if not line or line.lstrip().startswith('#'):
continue
parts = [p for p in re.split(r'\s|=|\.|<|>|~|!|@|#', line) if p]
if not parts:
continue
# if we found setuptools, do nothing
if parts[0] == "setuptools":
return list_of_requirements
except Exception as ex: # noqa
return list_of_requirements
# if we are here it means we have not found setuptools
# we should add it:
if "pip" in list_of_requirements:
list_of_requirements["pip"] = [self._replaced_packages["setuptools"]] + list_of_requirements["pip"]
except Exception as ex: # noqa
return list_of_requirements
return list_of_requirements

View File

@@ -310,6 +310,12 @@ class PytorchRequirement(SimpleSubstitution):
# yes this is for linux python 2.7 support, this is the only python 2.7 we support...
if py_ver and py_ver[0] == '2' and len(parts) > 3 and not parts[3].endswith('u'):
continue
# check if this an actual match
if not req.compare_version(v) or \
(last_v and SimpleVersion.compare_versions(last_v, '>', v, ignore_sub_versions=False)):
continue
# update the closest matched version (from above)
if not closest_v:
closest_v = v
@@ -318,10 +324,6 @@ class PytorchRequirement(SimpleSubstitution):
SimpleVersion.compare_versions(
version_a=v, op='>=', version_b=req.specs[0][1], num_parts=3):
closest_v = v
# check if this an actual match
if not req.compare_version(v) or \
(last_v and SimpleVersion.compare_versions(last_v, '>', v, ignore_sub_versions=False)):
continue
url = '/'.join(torch_url.split('/')[:-1] + l.split('/'))
last_v = v
@@ -475,6 +477,23 @@ class PytorchRequirement(SimpleSubstitution):
return self.match_version(req, base).replace(" ", "\n")
def replace(self, req):
# we first try to resolve things ourselves because pytorch pip is not always picking the correct
# versions from their pip repository
resolve_algorithm = str(self.config.get("agent.package_manager.pytorch_resolve", "pip")).lower()
if resolve_algorithm == "direct":
# noinspection PyBroadException
try:
new_req = self._replace(req)
if new_req:
self._original_req.append((req, new_req))
return new_req
except Exception:
pass
elif resolve_algorithm not in ("direct", "pip"):
print("Warning: `agent.package_manager.pytorch_resolve={}` "
"unrecognized, default to `pip`".format(resolve_algorithm))
# check if package is already installed with system packages
self.validate_python_version()
@@ -493,7 +512,15 @@ class PytorchRequirement(SimpleSubstitution):
if req.specs and len(req.specs) == 1 and req.specs[0][0] == "==":
# remove any +cu extension and let pip resolve that
line = "{} {}".format(req.name, req.format_specs(max_num_parts=3))
# and add .* if we have 3 parts version to deal with nvidia container 'a' version
# i.e. "1.13.0" -> "1.13.0.*" so it should match preinstalled "1.13.0a0+936e930"
spec_3_parts = req.format_specs(num_parts=3)
spec_max3_parts = req.format_specs(max_num_parts=3)
if spec_3_parts == spec_max3_parts and not spec_max3_parts.endswith("*"):
line = "{} {}.*".format(req.name, spec_max3_parts)
else:
line = "{} {}".format(req.name, spec_max3_parts)
if req.marker:
line += " ; {}".format(req.marker)
else:
@@ -558,6 +585,19 @@ class PytorchRequirement(SimpleSubstitution):
:param list_of_requirements: {'pip': ['a==1.0', ]}
:return: {'pip': ['a==1.0', ]}
"""
def build_specific_version_req(a_line, a_name, a_new_req):
try:
r = Requirement.parse(a_line)
wheel_parts = r.uri.split("/")[-1].split('-')
version = str(wheel_parts[1].split('%')[0].split('+')[0])
new_r = Requirement.parse("{} == {} # {}".format(a_name, version, str(a_new_req)))
if new_r.line:
# great it worked!
return new_r.line
except: # noqa
pass
return None
if not self._original_req:
return list_of_requirements
try:
@@ -581,9 +621,18 @@ class PytorchRequirement(SimpleSubstitution):
if req.local_file:
lines[i] = '{}'.format(str(new_req))
else:
lines[i] = '{} # {}'.format(str(req), str(new_req))
# try to rebuild requirements with specific version:
new_line = build_specific_version_req(line, req.req.name, new_req)
if new_line:
lines[i] = new_line
else:
lines[i] = '{} # {}'.format(str(req), str(new_req))
else:
lines[i] = '{} # {}'.format(line, str(new_req))
new_line = build_specific_version_req(line, req.req.name, new_req)
if new_line:
lines[i] = new_line
else:
lines[i] = '{} # {}'.format(line, str(new_req))
break
except:
pass

View File

@@ -240,6 +240,23 @@ class SimpleVersion:
if not version_b:
return True
# remove trailing "*" in both
if "*" in version_a:
ignore_sub_versions = True
while version_a.endswith(".*"):
version_a = version_a[:-2]
if version_a == "*":
version_a = ""
num_parts = min(len(version_a.split('.')), len(version_b.split('.')), )
if "*" in version_b:
ignore_sub_versions = True
while version_b.endswith(".*"):
version_b = version_b[:-2]
if version_b == "*":
version_b = ""
num_parts = min(len(version_a.split('.')), len(version_b.split('.')), )
if not num_parts:
num_parts = max(len(version_a.split('.')), len(version_b.split('.')), )

View File

@@ -1,3 +1,4 @@
from tempfile import mkdtemp
from typing import Text
from furl import furl
@@ -20,7 +21,16 @@ class RequirementsTranslator(object):
config = session.config
self.cache_dir = cache_dir or Path(config["agent.pip_download_cache.path"]).expanduser().as_posix()
self.enabled = config["agent.pip_download_cache.enabled"]
Path(self.cache_dir).mkdir(parents=True, exist_ok=True)
# noinspection PyBroadException
try:
Path(self.cache_dir).mkdir(parents=True, exist_ok=True)
except Exception:
temp_cache_folder = mkdtemp(prefix='pip_download_cache.')
print("Failed creating pip download cache folder at `{}` reverting to `{}`".format(
self.cache_dir, temp_cache_folder))
self.cache_dir = temp_cache_folder
Path(self.cache_dir).mkdir(parents=True, exist_ok=True)
self.config = Config()
self.pip = SystemPip(interpreter=interpreter, session=self._session)
self._translate_back = {}

View File

@@ -25,7 +25,7 @@ from clearml_agent.helper.base import bash_c, is_windows_platform, select_for_pl
PathLike = Union[Text, Path]
def get_bash_output(cmd, strip=False, stderr=subprocess.STDOUT, stdin=False):
def get_bash_output(cmd, strip=False, stderr=subprocess.STDOUT, stdin=False, raise_error=False):
try:
output = (
subprocess.check_output(
@@ -37,10 +37,16 @@ def get_bash_output(cmd, strip=False, stderr=subprocess.STDOUT, stdin=False):
.strip()
)
except subprocess.CalledProcessError:
if raise_error:
raise
output = None
return output if not strip or not output else output.strip()
def stringify_bash_output(value):
return '' if not value else (value if isinstance(value, str) else value.decode('utf-8'))
def terminate_process(pid, timeout=10., ignore_zombie=True, include_children=False):
# noinspection PyBroadException
try:
@@ -111,10 +117,11 @@ def terminate_all_child_processes(pid=None, timeout=10., include_parent=True):
def get_docker_id(docker_cmd_contains):
# noinspection PyBroadException
try:
containers_running = get_bash_output(cmd='docker ps --no-trunc --format \"{{.ID}}: {{.Command}}\"')
for docker_line in containers_running.split('\n'):
parts = docker_line.split(':')
parts = docker_line.split(':', 1)
if docker_cmd_contains in parts[-1]:
# we found our docker, return it
return parts[0]

View File

@@ -320,6 +320,7 @@ class VCS(object):
self.url, new_url))
self.url = new_url
return
# rewrite ssh URLs only if either ssh port or ssh user are forced in config
if parsed_url.scheme == "ssh" and (
self.session.config.get('agent.force_git_ssh_port', None) or
@@ -334,6 +335,9 @@ class VCS(object):
print("Using SSH credentials - ssh url '{}' with ssh url '{}'".format(
self.url, new_url))
self.url = new_url
return
elif parsed_url.scheme == "ssh":
return
if not self.session.config.agent.translate_ssh:
return
@@ -343,7 +347,7 @@ class VCS(object):
(ENV_AGENT_GIT_PASS.get() or self.session.config.get('agent.git_pass', None)):
# only apply to a specific domain (if requested)
config_domain = \
ENV_AGENT_GIT_HOST.get() or self.session.config.get("git_host", None)
ENV_AGENT_GIT_HOST.get() or self.session.config.get("agent.git_host", None)
if config_domain and config_domain != furl(self.url).host:
return

View File

@@ -139,42 +139,45 @@ class ResourceMonitor(object):
def _daemon(self):
seconds_since_started = 0
reported = 0
while True:
last_report = time()
current_report_frequency = (
self._report_frequency if reported != 0 else self._first_report_sec
)
while (time() - last_report) < current_report_frequency:
# wait for self._sample_frequency seconds, if event set quit
if self._exit_event.wait(1 / self._sample_frequency):
return
# noinspection PyBroadException
try:
self._update_readouts()
except Exception as ex:
log.warning("failed getting machine stats: %s", report_error(ex))
self._failure()
try:
while True:
last_report = time()
current_report_frequency = (
self._report_frequency if reported != 0 else self._first_report_sec
)
while (time() - last_report) < current_report_frequency:
# wait for self._sample_frequency seconds, if event set quit
if self._exit_event.wait(1 / self._sample_frequency):
return
# noinspection PyBroadException
try:
self._update_readouts()
except Exception as ex:
log.warning("failed getting machine stats: %s", report_error(ex))
self._failure()
seconds_since_started += int(round(time() - last_report))
# check if we do not report any metric (so it means the last iteration will not be changed)
seconds_since_started += int(round(time() - last_report))
# check if we do not report any metric (so it means the last iteration will not be changed)
# if we do not have last_iteration, we just use seconds as iteration
# if we do not have last_iteration, we just use seconds as iteration
# start reporting only when we figured out, if this is seconds based, or iterations based
average_readouts = self._get_average_readouts()
stats = {
# 3 points after the dot
key: round(value, 3) if isinstance(value, float) else [round(v, 3) for v in value]
for key, value in average_readouts.items()
}
# start reporting only when we figured out, if this is seconds based, or iterations based
average_readouts = self._get_average_readouts()
stats = {
# 3 points after the dot
key: round(value, 3) if isinstance(value, float) else [round(v, 3) for v in value]
for key, value in average_readouts.items()
}
# send actual report
if self.send_report(stats):
# clear readouts if this is update was sent
self._clear_readouts()
# send actual report
if self.send_report(stats):
# clear readouts if this is update was sent
self._clear_readouts()
# count reported iterations
reported += 1
# count reported iterations
reported += 1
except Exception as ex:
log.exception("Error reporting monitoring info: %s", str(ex))
def _update_readouts(self):
readouts = self._machine_stats()

View File

@@ -106,7 +106,7 @@ class Session(_Session):
if os.path.exists(os.path.expanduser(os.path.expandvars(f))):
self._config_file = f
break
self.api_client = APIClient(session=self, api_version="2.5")
self._api_client = None
# HACK make sure we have python version to execute,
# if nothing was specific, use the one that runs us
def_python = ConfigValue(self.config, "agent.default_python")
@@ -133,7 +133,7 @@ class Session(_Session):
# override with environment variables
# cuda_version & cudnn_version are overridden with os.environ here, and normalized in the next section
for config_key, env_config in ENVIRONMENT_CONFIG.items():
# check if the propery is of a list:
# check if the property is of a list:
if config_key.endswith('.0'):
if all(not i.get() for i in env_config.values()):
continue
@@ -167,6 +167,16 @@ class Session(_Session):
if not kwargs.get('only_load_config'):
self.create_cache_folders()
@property
def api_client(self):
if self._api_client is None:
self._api_client = APIClient(session=self, api_version="2.5")
return self._api_client
@api_client.setter
def api_client(self, value):
self._api_client = value
@staticmethod
def get_logger(name):
logger = logging.getLogger(name)

View File

@@ -1 +1 @@
__version__ = '1.5.0rc0'
__version__ = '1.5.3rc4'

View File

@@ -1,16 +1,36 @@
#!/bin/sh
#!/bin/bash +x
CLEARML_FILES_HOST=${CLEARML_FILES_HOST:-$TRAINS_FILES_HOST}
if [ -n "$SHUTDOWN_IF_NO_ACCESS_KEY" ] && [ -z "$CLEARML_API_ACCESS_KEY" ] && [ -z "$TRAINS_API_ACCESS_KEY" ]; then
echo "CLEARML_API_ACCESS_KEY was not provided, service will not be started"
exit 0
fi
export CLEARML_FILES_HOST=${CLEARML_FILES_HOST:-$TRAINS_FILES_HOST}
if [ -z "$CLEARML_FILES_HOST" ]; then
CLEARML_HOST_IP=${CLEARML_HOST_IP:-${TRAINS_HOST_IP:-$(curl -s https://ifconfig.me/ip)}}
fi
CLEARML_FILES_HOST=${CLEARML_FILES_HOST:-${TRAINS_FILES_HOST:-"http://$CLEARML_HOST_IP:8081"}}
CLEARML_WEB_HOST=${CLEARML_WEB_HOST:-${TRAINS_WEB_HOST:-"http://$CLEARML_HOST_IP:8080"}}
CLEARML_API_HOST=${CLEARML_API_HOST:-${TRAINS_API_HOST:-"http://$CLEARML_HOST_IP:8008"}}
export CLEARML_FILES_HOST=${CLEARML_FILES_HOST:-${TRAINS_FILES_HOST:-"http://$CLEARML_HOST_IP:8081"}}
export CLEARML_WEB_HOST=${CLEARML_WEB_HOST:-${TRAINS_WEB_HOST:-"http://$CLEARML_HOST_IP:8080"}}
export CLEARML_API_HOST=${CLEARML_API_HOST:-${TRAINS_API_HOST:-"http://$CLEARML_HOST_IP:8008"}}
echo $CLEARML_FILES_HOST $CLEARML_WEB_HOST $CLEARML_API_HOST 1>&2
python3 -m pip install -q -U "clearml-agent${CLEARML_AGENT_UPDATE_VERSION:-$TRAINS_AGENT_UPDATE_VERSION}"
clearml-agent daemon --services-mode --queue services --create-queue --docker "${CLEARML_AGENT_DEFAULT_BASE_DOCKER:-$TRAINS_AGENT_DEFAULT_BASE_DOCKER}" --cpu-only ${CLEARML_AGENT_EXTRA_ARGS:-$TRAINS_AGENT_EXTRA_ARGS}
if [[ "$CLEARML_AGENT_UPDATE_VERSION" =~ ^[0-9]{1,3}\.[0-9]{1,3}(\.[0-9]{1,3}([a-zA-Z]{1,3}[0-9]{1,3})?)?$ ]]
then
CLEARML_AGENT_UPDATE_VERSION="==$CLEARML_AGENT_UPDATE_VERSION"
fi
DAEMON_OPTIONS=${CLEARML_AGENT_DAEMON_OPTIONS:---services-mode --create-queue}
QUEUES=${CLEARML_AGENT_QUEUES:-services}
if [ -z "$CLEARML_AGENT_NO_UPDATE" ]; then
if [ -n "$CLEARML_AGENT_UPDATE_REPO" ]; then
python3 -m pip install -q -U $CLEARML_AGENT_UPDATE_REPO
else
python3 -m pip install -q -U "clearml-agent${CLEARML_AGENT_UPDATE_VERSION:-$TRAINS_AGENT_UPDATE_VERSION}"
fi
fi
clearml-agent daemon $DAEMON_OPTIONS --queue $QUEUES --docker "${CLEARML_AGENT_DEFAULT_BASE_DOCKER:-$TRAINS_AGENT_DEFAULT_BASE_DOCKER}" --cpu-only ${CLEARML_AGENT_EXTRA_ARGS:-$TRAINS_AGENT_EXTRA_ARGS}

View File

@@ -13,6 +13,15 @@ api {
}
agent {
# unique name of this worker, if None, created based on hostname:process_id
# Override with os environment: CLEARML_WORKER_ID
# worker_id: "clearml-agent-machine1:gpu0"
worker_id: ""
# worker name, replaces the hostname when creating a unique name for this worker
# Override with os environment: CLEARML_WORKER_NAME
# worker_name: "clearml-agent-machine1"
worker_name: ""
# Set GIT user/pass credentials (if user/pass are set, GIT protocol will be set to https)
# leave blank for GIT SSH credentials (set force_git_ssh_protocol=true to force SSH protocol)
# **Notice**: GitHub personal token is equivalent to password, you can put it directly into `git_pass`
@@ -20,11 +29,11 @@ agent {
# https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/creating-a-personal-access-token
# https://support.atlassian.com/bitbucket-cloud/docs/app-passwords/
# https://docs.gitlab.com/ee/user/profile/personal_access_tokens.html
git_user=""
git_pass=""
# git_user: ""
# git_pass: ""
# Limit credentials to a single domain, for example: github.com,
# all other domains will use public access (no user/pass). Default: always send user/pass for any VCS domain
git_host=""
# git_host: ""
# Force GIT protocol to use SSH regardless of the git url (Assumes GIT user/pass are blank)
force_git_ssh_protocol: false
@@ -33,16 +42,6 @@ agent {
# Force a specific SSH username when converting http to ssh links (the default username is 'git')
# force_git_ssh_user: git
# unique name of this worker, if None, created based on hostname:process_id
# Overridden with os environment: CLEARML_WORKER_ID
# worker_id: "clearml-agent-machine1:gpu0"
worker_id: ""
# worker name, replaces the hostname when creating a unique name for this worker
# Overridden with os environment: CLEARML_WORKER_NAME
# worker_name: "clearml-agent-machine1"
worker_name: ""
# Set the python version to use when creating the virtual environment and launching the experiment
# Example values: "/usr/bin/python3" or "/usr/local/bin/python3.6"
# The default is the python executing the clearml_agent
@@ -51,6 +50,22 @@ agent {
# specific python version and the system supports multiple python the agent will use the requested python version)
# ignore_requested_python_version: true
# Force the root folder of the git repository (instead of the working directory) into the PYHTONPATH
# default false, only the working directory will be added to the PYHTONPATH
# force_git_root_python_path: false
# if set, use GIT_ASKPASS to pass user/pass when cloning / fetch repositories
# it solves passing user/token to git submodules.
# this is a safer way to ensure multiple users using the same repository will
# not accidentally leak credentials
# Only supported on Linux systems, it will be the default in future releases
# enable_git_ask_pass: false
# in docker mode, if container's entrypoint automatically activated a virtual environment
# use the activated virtual environment and install everything there
# set to False to disable, and always create a new venv inheriting from the system_site_packages
# docker_use_activated_venv: true
# select python package manager:
# currently supported: pip, conda and poetry
# if "pip" or "conda" are used, the agent installs the required packages
@@ -64,9 +79,10 @@ agent {
type: pip,
# specify pip version to use (examples "<20.2", "==19.3.1", "", empty string will install the latest version)
# pip_version: "<21"
# pip_version: ["<20.2 ; python_version < '3.10'", "<22.3 ; python_version >= '3.10'"]
# specify poetry version to use (examples "<2", "==1.1.1", "", empty string will install the latest version)
# poetry_version: "<2",
# poetry_install_extra_args: ["-v"]
# virtual environment inheres packages from system
system_site_packages: false,
@@ -77,25 +93,42 @@ agent {
# extra_index_url: ["https://allegroai.jfrog.io/clearml/api/pypi/public/simple"]
extra_index_url: []
# additional flags to use when calling pip install, example: ["--use-deprecated=legacy-resolver", ]
# extra_pip_install_flags: []
# control the pytorch wheel resolving algorithm, options are: "pip", "direct"
# "pip" (default): would automatically detect the cuda version, and supply pip with the correct
# extra-index-url, based on pytorch.org tables
# "direct": would resolve a direct link to the pytorch wheel by parsing the pytorch.org pip repository
# and matching the automatically detected cuda version with the required pytorch wheel.
# if the exact cuda version is not found for the required pytorch wheel, it will try
# a lower cuda version until a match is found
#
# pytorch_resolve: "pip"
# additional conda channels to use when installing with conda package manager
conda_channels: ["pytorch", "conda-forge", "defaults", ]
# conda_full_env_update: false
# conda_env_as_base_docker: false
# set the priority packages to be installed before the rest of the required packages
# Note: this only controls the installation order of existing requirement packages (and does not add additional packages)
# priority_packages: ["cython", "numpy", "setuptools", ]
# set the optional priority packages to be installed before the rest of the required packages,
# In case a package installation fails, the package will be ignored,
# and the virtual environment process will continue
# Note: this only controls the installation order of existing requirement packages (and does not add additional packages)
# priority_optional_packages: ["pygobject", ]
# set the post packages to be installed after all the rest of the required packages
# Note: this only controls the installation order of existing requirement packages (and does not add additional packages)
# post_packages: ["horovod", ]
# set the optional post packages to be installed after all the rest of the required packages,
# In case a package installation fails, the package will be ignored,
# and the virtual environment process will continue
# Note: this only controls the installation order of existing requirement packages (and does not add additional packages)
# post_optional_packages: []
# set to True to support torch nightly build installation,
@@ -152,6 +185,7 @@ agent {
# optional arguments to pass to docker image
# these are local for this agent and will not be updated in the experiment's docker_cmd section
# You can also pass host environments into the container with ["-e", "HOST_NAME=$HOST_NAME"]
# extra_docker_arguments: ["--ipc=host", "-v", "/mnt/host/data:/mnt/data"]
# optional shell script to run in docker when started before the experiment is started
@@ -162,6 +196,12 @@ agent {
# change to false to skip installation and decrease docker spin up time
# docker_install_opencv_libs: true
# Allow passing host environments into docker container with Task's docker container args
# Example "-e HOST_NAME=$HOST_NAME"
# NOTICE this might introduce security risk allowing access to keys/secret on the host machine1
# Use with care!
# docker_allow_host_environ: false
# set to true in order to force "docker pull" before running an experiment using a docker image.
# This makes sure the docker image is updated.
docker_force_pull: false
@@ -212,8 +252,6 @@ agent {
# # no repository matching required
# repository: ""
# }
# # no container image matching required (allow to replace one requested container with another)
# container: ""
# # no repository matching required
# project: ""
# }
@@ -225,7 +263,7 @@ agent {
enable_task_env: false
# CUDA versions used for Conda setup & solving PyTorch wheel packages
# it Should be detected automatically. Override with os environment CUDA_VERSION / CUDNN_VERSION
# Should be detected automatically. Override with os environment CUDA_VERSION / CUDNN_VERSION
# cuda_version: 10.1
# cudnn_version: 7.6
@@ -429,42 +467,47 @@ sdk {
# Apply top-level environment section from configuration into os.environ
apply_environment: true
# Top-level environment section is in the form of:
# environment {
# key: value
# ...
# }
# and is applied to the OS environment as `key=value` for each key/value pair
# Apply top-level files section from configuration into local file system
apply_files: true
# Top-level files section allows auto-generating files at designated paths with a predefined contents
# and target format. Options include:
# contents: the target file's content, typically a string (or any base type int/float/list/dict etc.)
# format: a custom format for the contents. Currently supported value is `base64` to automatically decode a
# base64-encoded contents string, otherwise ignored
# path: the target file's path, may include ~ and inplace env vars
# target_format: format used to encode contents before writing into the target file. Supported values are json,
# yaml, yml and bytes (in which case the file will be written in binary mode). Default is text mode.
# overwrite: overwrite the target file in case it exists. Default is true.
#
# Example:
# files {
# myfile1 {
# contents: "The quick brown fox jumped over the lazy dog"
# path: "/tmp/fox.txt"
# }
# myjsonfile {
# contents: {
# some {
# nested {
# value: [1, 2, 3, 4]
# }
# }
# }
# path: "/tmp/test.json"
# target_format: json
# }
# }
}
# Environment section (top-level) is applied to the OS environment as `key=value` for each key/value pair
# * enable/disable with `agent.apply_environment` OR `sdk.apply_environment`
# Example:
#
# environment {
# key_a: value_a
# key_b: value_b
# }
# Files section (top-level) allows auto-generating files at designated paths with
# predefined content and target format.
# * enable/disable with `agent.apply_files` OR `sdk.apply_files`
# Files content options include:
# contents: the target file's content, typically a string (or any base type int/float/list/dict etc.)
# format: a custom format for the contents. Currently supported value is `base64` to automatically decode a
# base64-encoded contents string, otherwise ignored
# path: the target file's path, may include ~ and inplace env vars
# target_format: format used to encode contents before writing into the target file. Supported values are json,
# yaml, yml and bytes (in which case the file will be written in binary mode). Default is text mode.
# overwrite: overwrite the target file in case it exists. Default is true.
# mode: set the file mode after writing. use an integer value or a string (e.g. 600 / 777 etc.)
#
# Example:
# files {
# myfile1 {
# contents: "The quick brown fox jumped over the lazy dog"
# path: "/tmp/fox.txt"
# }
# myjsonfile {
# contents: {
# some {
# nested {
# value: [1, 2, 3, 4]
# }
# }
# }
# path: "/tmp/test.json"
# target_format: json
# }
# }

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.1 MiB

After

Width:  |  Height:  |  Size: 1018 KiB

View File

@@ -1,15 +1,15 @@
attrs>=18.0,<20.4.0
attrs>=18.0,<23.0.0
enum34>=0.9,<1.2.0 ; python_version < '3.6'
furl>=2.0.0,<2.2.0
jsonschema>=2.6.0,<3.3.0
jsonschema>=2.6.0,<5.0.0
pathlib2>=2.3.0,<2.4.0
psutil>=3.4.2,<5.10.0
pyparsing>=2.0.3,<2.5.0
pyparsing>=2.0.3,<3.1.0
python-dateutil>=2.4.2,<2.9.0
pyjwt>=2.4.0,<2.5.0
pyjwt>=2.4.0,<2.7.0
PyYAML>=3.12,<6.1
requests>=2.20.0,<2.29.0
six>=1.13.0,<1.16.0
six>=1.13.0,<1.17.0
typing>=3.6.4,<3.8.0 ; python_version < '3.5'
urllib3>=1.21.1,<1.27.0
virtualenv>=16,<21