Compare commits

...

35 Commits

Author SHA1 Message Date
allegroai
a4315722ab Version bump to v1.2.1 2022-03-28 18:13:20 +03:00
allegroai
c901bd331c Fix git packages are installed even if commit is given and is preinstalled when using cached virtual environment 2022-03-28 18:11:46 +03:00
allegroai
df97f170a2 Fix clearml-agent init
Use app.clear.ml as default server
Add git token references
2022-03-24 22:08:06 +02:00
allegroai
a30a2dad66 Add git personal token docs 2022-03-24 22:07:15 +02:00
allegroai
2432f5bb68 Add CLEARML_AGENT_PROPAGATE_EXITCODE; set to 1 to let clearml-agent execute return a nonzero exit code on failure (notice that by default the return code is kept at 0; the exception is the k8s glue with non-restarting Pods, where users would want visibility into failing Tasks. Do not use unless you know what to expect from k8s) 2022-03-24 22:04:25 +02:00
allegroai
341086d86a Fix vcs packages are reinstalled when same commit version is already installed 2022-03-24 22:03:25 +02:00
allegroai
1163c96438 Add agent.package_manager.force_original_requirements, allowing use of only the "org_pip" requirements coming from the dev execution (using this prevents editing the installed packages from the UI) 2022-03-24 22:00:33 +02:00
allegroai
4c120d7cd0 Add ability to override container LOCAL_PYTHON, add auto python support (max 3.15) 2022-03-24 21:58:07 +02:00
Jan Stratil
966a9758b8 Add condition to requirements for typing package (python < 3.5) (#103)
- According to the maintainer of the typing package, it is recommended
  to install the typing package with a condition on the Python version,
  since for Python 3.5 and later the typing package is useless (as it is
  in the stdlib).
- The typing package can cause some issues, so NOT installing it can solve
  some of them.

Co-authored-by: Jan Stratil <jan.stratil@innovatrics.com>
2022-03-23 15:03:37 +02:00
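For illustration only, the kind of PEP 508 environment marker this commit describes might look as follows in a packaging file (hypothetical snippet; the repository's actual requirements layout may differ):

```python
# Hypothetical setup.py fragment: install `typing` only on Python < 3.5,
# where it is not yet part of the standard library.
from setuptools import setup

setup(
    name="example-package",  # placeholder project name
    version="0.0.1",
    install_requires=[
        "typing ; python_version < '3.5'",  # skipped on 3.5+, where typing is stdlib
    ],
)
```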
allegroai
f58071fc74 Fix README 2022-03-20 23:24:07 +02:00
allegroai
8712c5e636 Fix PyTorch aarch64 and windows support 2022-03-16 17:40:21 +02:00
allegroai
a51f9bed49 Version bump 2022-03-15 10:04:45 +02:00
allegroai
531e514003 Add custom build script support
Add extra configurations when starting daemon
Propagate token to docker in case credentials are not available
2022-03-15 10:04:25 +02:00
allegroai
2cd9e706c8 Fix user-provided " being unnecessarily replaced with \\" 2022-03-15 10:02:28 +02:00
Idan Tene
e3e6a1dda8 Fix virtualenv python interpreter used (#98)
* Add virtualenv version logging
* Force using requested python interpreter
2022-02-27 11:25:25 +02:00
Andrey Okhotnikov
92b5ce61a0 Add additional k8s-glue dockerfiles (#94) 2022-02-21 15:59:50 +02:00
pollfly
36073ad488 Fix links (#100) 2022-02-17 12:04:11 +02:00
allegroai
d89d0f9ff5 Fix pathlib2 six conflict, version bump 2022-02-09 18:29:04 +02:00
allegroai
14c48d0a78 Fix FORCE_LOCAL_CLEARML_AGENT_WHEEL when running from a Windows host 2022-02-09 18:28:17 +02:00
allegroai
b1ee3e105b Version bump 2022-02-07 20:05:03 +02:00
allegroai
1f53c4fd1b Fix agent fails to check out code from main branch when branch/commit is not explicitly specified 2022-02-07 20:04:08 +02:00
allegroai
bfed3ccf4d Fix agent attempts to check out code when in standalone mode 2022-02-07 20:03:08 +02:00
pollfly
d521482409 Add spaces to help menu (#96) 2022-02-06 12:45:21 +02:00
allegroai
53eba5658f Fix conda package manager: listed packages with local links (@ file://) should be ignored if the local package does not exist
Fix cuda patch version support in conda
2022-02-02 16:33:07 +02:00
allegroai
bb64e4a850 Fix hide_docker_command_env_vars mode to include URL passwords and handle env vars containing docker commands 2022-02-02 16:30:34 +02:00
allegroai
771690d5c0 Fix ENV_API_DEFAULT_REQ_METHOD having no default value, which causes a ValueError if not specified 2022-01-31 12:39:39 +02:00
pollfly
d39e30995a Fix links (#93) 2022-01-27 12:15:36 +02:00
allegroai
363aaeaba8 Fix symbolic links not copied from cached VCS into working copy. On Windows, this results in copying the default content instead of the original symbolic link (issue #89) 2022-01-23 10:42:11 +02:00
allegroai
fa1307e62c Add agent.poetry_version to specify poetry version (and force installation of poetry if missing) 2022-01-23 10:40:05 +02:00
allegroai
e7c9e9695b Fix using deprecated abc support 2022-01-23 10:39:13 +02:00
Mal Miller
bf07b7f76d Add environment variable for request method (#91)
* Add environment variable for default request method
2022-01-12 20:29:17 +02:00
allegroai
5afb604e3d Fix default_python set to None 2022-01-07 15:12:27 +02:00
allegroai
b3e8be6296 Add agent.force_git_root_python_path configuration setting to force adding the git repository root folder to the PYTHONPATH (if set, the working directory is not added to the PYTHONPATH) 2022-01-07 15:11:59 +02:00
allegroai
2cb452b1c2 Version bump 2021-12-29 13:21:31 +02:00
allegroai
938fcc4530 Add build --force-docker command line argument to allow ignoring task container data 2021-12-29 13:21:25 +02:00
27 changed files with 1133 additions and 252 deletions

View File

@@ -29,7 +29,7 @@ ML-Ops scheduler & orchestration solution supporting Linux, macOS and Windows**
It is a zero configuration fire-and-forget execution agent, providing a full ML/DL cluster solution.
**Full Automation in 5 steps**
1. ClearML Server [self-hosted](https://github.com/allegroai/clearml-server) or [free tier hosting](https://app.community.clear.ml)
1. ClearML Server [self-hosted](https://github.com/allegroai/clearml-server) or [free tier hosting](https://app.clear.ml)
2. `pip install clearml-agent` ([install](#installing-the-clearml-agent) the ClearML Agent on any GPU machine: on-premises / cloud / ...)
3. Create a [job](https://github.com/allegroai/clearml/docs/clearml-task.md) or Add [ClearML](https://github.com/allegroai/clearml) to your code with just 2 lines
4. Change the [parameters](#using-the-clearml-agent) in the UI & schedule for [execution](#using-the-clearml-agent) (or automate with an [AutoML pipeline](#automl-and-orchestration-pipelines-))
@@ -37,8 +37,8 @@ It is a zero configuration fire-and-forget execution agent, providing a full ML/
"All the Deep/Machine-Learning DevOps your research needs, and then some... Because ain't nobody got time for that"
**Try ClearML now** [Self Hosted](https://github.com/allegroai/clearml-server) or [Free tier Hosting](https://app.community.clear.ml)
<a href="https://app.community.clear.ml"><img src="https://github.com/allegroai/clearml-agent/blob/master/docs/screenshots.gif?raw=true" width="100%"></a>
**Try ClearML now** [Self Hosted](https://github.com/allegroai/clearml-server) or [Free tier Hosting](https://app.clear.ml)
<a href="https://app.clear.ml"><img src="https://github.com/allegroai/clearml-agent/blob/master/docs/screenshots.gif?raw=true" width="100%"></a>
### Simple, Flexible Experiment Orchestration
**The ClearML Agent was built to address the DL/ML R&D DevOps needs:**
@@ -60,6 +60,8 @@ It is a zero configuration fire-and-forget execution agent, providing a full ML/
### Kubernetes Integration (Optional)
We think Kubernetes is awesome, but it should be a choice.
We designed `clearml-agent` so you can run bare-metal or inside a pod with any mix that fits your environment.
Find Dockerfiles in the [docker](./docker) dir and a Helm chart at https://github.com/allegroai/clearml-helm-charts
#### Benefits of integrating existing K8s with ClearML-Agent
- ClearML-Agent adds the missing scheduling capabilities to K8s
- Allowing for more flexible automation from code
@@ -219,7 +221,7 @@ clearml-agent daemon --queue important_jobs default
```
The **ClearML Agent** will first try to pull jobs from the `important_jobs` queue; only then will it fetch a job from the `default` queue.
Adding queues, managing job order within a queue and moving jobs between queues, is available using the Web UI, see example on our [free server](https://app.community.clear.ml/workers-and-queues/queues)
Adding queues, managing job order within a queue and moving jobs between queues, is available using the Web UI, see example on our [free server](https://app.clear.ml/workers-and-queues/queues)
##### Stopping the ClearML Agent

View File

@@ -12,7 +12,7 @@ from clearml_agent.definitions import FileBuffering, CONFIG_FILE
from clearml_agent.helper.base import reverse_home_folder_expansion, chain_map, named_temporary_file
from clearml_agent.helper.process import ExitStatus
from . import interface, session, definitions, commands
from .errors import ConfigFileNotFound, Sigterm, APIError
from .errors import ConfigFileNotFound, Sigterm, APIError, CustomBuildScriptFailed
from .helper.trace import PackageTrace
from .interface import get_parser
@@ -44,6 +44,8 @@ def run_command(parser, args, command_name):
debug = command._session.debug_mode
func = getattr(command, command_name)
return func(**args_dict)
except CustomBuildScriptFailed as e:
command_class.exit(e.message, e.errno)
except ConfigFileNotFound:
message = 'Cannot find configuration file in "{}".\n' \
'To create a configuration file, run:\n' \

View File

@@ -11,6 +11,11 @@
# Set GIT user/pass credentials (if user/pass are set, GIT protocol will be set to https)
# leave blank for GIT SSH credentials (set force_git_ssh_protocol=true to force SSH protocol)
# **Notice**: a GitHub personal token is equivalent to a password; you can put it directly into `git_pass`
# To learn how to generate a git token on GitHub/Bitbucket/GitLab:
# https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/creating-a-personal-access-token
# https://support.atlassian.com/bitbucket-cloud/docs/app-passwords/
# https://docs.gitlab.com/ee/user/profile/personal_access_tokens.html
# git_user: ""
# git_pass: ""
# git_host: ""
@@ -30,6 +35,15 @@
# specific python version and the system supports multiple python the agent will use the requested python version)
# ignore_requested_python_version: true
# Force the root folder of the git repository (instead of the working directory) into the PYTHONPATH
# default false, only the working directory will be added to the PYTHONPATH
# force_git_root_python_path: false
# in docker mode, if the container's entrypoint automatically activated a virtual environment
# use the activated virtual environment and install everything there
# set to False to disable, and always create a new venv inheriting from the system_site_packages
# docker_use_activated_venv: true
# select python package manager:
# currently supported: pip, conda and poetry
# if "pip" or "conda" are used, the agent installs the required packages
@@ -44,6 +58,8 @@
# specify pip version to use (examples "<20", "==19.3.1", "", empty string will install the latest version)
pip_version: "<20.2",
# specify poetry version to use (examples "<2", "==1.1.1", "", empty string will install the latest version)
# poetry_version: "<2",
# virtual environment inherits packages from the system
system_site_packages: false,
@@ -201,6 +217,7 @@
hide_docker_command_env_vars {
enabled: true
extra_keys: []
parse_embedded_urls: true
}
# allow to set internal mount points inside the docker,
@@ -261,4 +278,34 @@
# target_format: json
# }
# }
# Specifies a custom environment setup script to be executed instead of installing a virtual environment.
# If provided, this script is executed following Git cloning. The script command may include environment variables,
# which will be expanded before execution (e.g. "$CLEARML_GIT_ROOT/script.sh").
# The script can also be specified using the CLEARML_AGENT_CUSTOM_BUILD_SCRIPT environment variable.
#
# When running the script, the following environment variables will be set:
# - CLEARML_CUSTOM_BUILD_TASK_CONFIG_JSON: specifies a path to a temporary file containing the complete task
# contents in JSON format
# - CLEARML_TASK_SCRIPT_ENTRY: task entrypoint script as defined in the task's script section
# - CLEARML_TASK_WORKING_DIR: task working directory as defined in the task's script section
# - CLEARML_VENV_PATH: path to the agent's default virtual environment path (as defined in the configuration)
# - CLEARML_GIT_ROOT: path to the cloned Git repository
# - CLEARML_CUSTOM_BUILD_OUTPUT: a path to a non-existing file that may be created by the script. If created,
# this file must be in the following JSON format:
# ```json
# {
# "binary": "/absolute/path/to/python-executable",
# "entry_point": "/absolute/path/to/task-entrypoint-script",
# "working_dir": "/absolute/path/to/task-working/dir"
# }
# ```
# If provided, the agent will use these instead of the predefined task script section to execute the task and will
# skip virtual environment creation.
#
# In case the custom script returns with a non-zero exit code, the agent will fail with the same exit code.
# In case the custom script is specified but does not exist, or if the custom script does not write valid content
# into the file specified in CLEARML_CUSTOM_BUILD_OUTPUT, the agent will emit a warning and continue with the
# standard flow.
custom_build_script: ""
}
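For illustration, a custom build script can be any executable that prepares the environment and then writes the JSON described above into CLEARML_CUSTOM_BUILD_OUTPUT. A minimal sketch, assuming the interpreter and paths below as placeholders (they are not values mandated by the agent):

```python
#!/usr/bin/env python3
# Minimal custom build script sketch: read the env vars the agent sets,
# then write the JSON output the agent expects. A real script would build
# the actual environment (venv, conda, system packages) before this step.
import json
import os

output_path = os.environ["CLEARML_CUSTOM_BUILD_OUTPUT"]   # file to create
git_root = os.environ["CLEARML_GIT_ROOT"]                 # cloned repository root
entry = os.environ["CLEARML_TASK_SCRIPT_ENTRY"]           # task entry point
workdir = os.environ.get("CLEARML_TASK_WORKING_DIR") or "."

with open(output_path, "wt") as f:
    json.dump(
        {
            "binary": "/usr/bin/python3",                  # assumed interpreter path
            "entry_point": os.path.join(git_root, workdir, entry),
            "working_dir": os.path.join(git_root, workdir),
        },
        f,
    )
```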

View File

@@ -15,6 +15,17 @@ ENV_NO_DEFAULT_SERVER = EnvEntry("CLEARML_NO_DEFAULT_SERVER", "TRAINS_NO_DEFAULT
ENV_DISABLE_VAULT_SUPPORT = EnvEntry('CLEARML_AGENT_DISABLE_VAULT_SUPPORT', type=bool)
ENV_ENABLE_ENV_CONFIG_SECTION = EnvEntry('CLEARML_AGENT_ENABLE_ENV_CONFIG_SECTION', type=bool)
ENV_ENABLE_FILES_CONFIG_SECTION = EnvEntry('CLEARML_AGENT_ENABLE_FILES_CONFIG_SECTION', type=bool)
ENV_VENV_CONFIGURED = EnvEntry('VIRTUAL_ENV', type=str)
ENV_PROPAGATE_EXITCODE = EnvEntry("CLEARML_AGENT_PROPAGATE_EXITCODE", type=bool, default=False)
ENV_INITIAL_CONNECT_RETRY_OVERRIDE = EnvEntry(
'CLEARML_AGENT_INITIAL_CONNECT_RETRY_OVERRIDE', default=True, converter=safe_text_to_bool
)
"""
Experimental option to set the request method for all API requests and auth login.
This could be useful when GET requests with payloads are blocked by a server as
POST requests can be used instead.
However, this has not been rigorously tested and may have unintended consequences.
"""
ENV_API_DEFAULT_REQ_METHOD = EnvEntry("CLEARML_API_DEFAULT_REQ_METHOD", default="GET")
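Both new definitions above are plain environment toggles, so they can be exercised without code changes; a hedged illustration (the task id is a placeholder):

```python
# Illustration only: enable the new toggles before launching the agent.
import os
import subprocess

os.environ["CLEARML_AGENT_PROPAGATE_EXITCODE"] = "1"   # propagate task failure code
os.environ["CLEARML_API_DEFAULT_REQ_METHOD"] = "POST"  # must be GET or POST (any case)

# "<task-id>" is a placeholder for an existing Task id
rc = subprocess.call(["clearml-agent", "execute", "--id", "<task-id>"])
print("agent exit code:", rc)  # nonzero on task failure when propagation is enabled
```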

View File

@@ -5,10 +5,17 @@ import six
from .apimodel import ApiModel
from .datamodel import DataModel
from .defs import ENV_API_DEFAULT_REQ_METHOD
if ENV_API_DEFAULT_REQ_METHOD.get().upper() not in ("GET", "POST"):
raise ValueError(
"CLEARML_API_DEFAULT_REQ_METHOD environment variable must be 'get' or 'post' (any case is allowed)."
)
class Request(ApiModel):
_method = 'get'
_method = ENV_API_DEFAULT_REQ_METHOD.get(default="get")
def __init__(self, **kwargs):
if kwargs:

View File

@@ -15,7 +15,7 @@ from six.moves.urllib.parse import urlparse, urlunparse
from .callresult import CallResult
from .defs import ENV_VERBOSE, ENV_HOST, ENV_ACCESS_KEY, ENV_SECRET_KEY, ENV_WEB_HOST, ENV_FILES_HOST, ENV_AUTH_TOKEN, \
ENV_NO_DEFAULT_SERVER, ENV_DISABLE_VAULT_SUPPORT, ENV_INITIAL_CONNECT_RETRY_OVERRIDE
ENV_NO_DEFAULT_SERVER, ENV_DISABLE_VAULT_SUPPORT, ENV_INITIAL_CONNECT_RETRY_OVERRIDE, ENV_API_DEFAULT_REQ_METHOD
from .request import Request, BatchRequest
from .token_manager import TokenManager
from ..config import load
@@ -142,7 +142,7 @@ class Session(TokenManager):
"Could not find host server definition "
"(missing `~/clearml.conf` or Environment CLEARML_API_HOST)\n"
"To get started with ClearML: setup your own `clearml-server`, "
"or create a free account at https://app.community.clear.ml and run `clearml-agent init`"
"or create a free account at https://app.clear.ml and run `clearml-agent init`"
)
self.__host = host.strip("/")
@@ -206,7 +206,7 @@ class Session(TokenManager):
http_retries_config = dict(**http_retries_config)
http_retries_config['connect'] = connect_retries
return http_retries_config, get_http_session_with_retry(**http_retries_config)
return http_retries_config, get_http_session_with_retry(config=self.config or None, **http_retries_config)
def load_vaults(self):
if not self.check_min_api_version("2.15") or self.feature_set == "basic":
@@ -615,6 +615,7 @@ class Session(TokenManager):
try:
data = {"expiration_sec": exp} if exp else {}
res = self._send_request(
method=ENV_API_DEFAULT_REQ_METHOD.get(default="get"),
service="auth",
action="login",
auth=auth,

View File

@@ -11,10 +11,10 @@ from clearml_agent.backend_config.defs import LOCAL_CONFIG_FILES
description = """
Please create new clearml credentials through the profile page in your `clearml-server` web app,
or create a free account at https://app.community.clear.ml/profile
Please create new clearml credentials through the settings page in your `clearml-server` web app,
or create a free account at https://app.clear.ml/settings/webapp-configuration
In the profile page, press "Create new credentials", then press "Copy to clipboard".
In the settings > workspace page, press "Create new credentials", then press "Copy to clipboard".
Paste copied configuration here:
"""
@@ -27,9 +27,9 @@ except Exception:
host_description = """
Editing configuration file: {CONFIG_FILE}
Enter the url of the clearml-server's Web service, for example: {HOST}
Enter the url of the clearml-server's Web service, for example: {HOST} or https://app.clear.ml
""".format(
CONFIG_FILE=LOCAL_CONFIG_FILES[0],
CONFIG_FILE=LOCAL_CONFIG_FILES[-1],
HOST=def_host,
)
@@ -84,7 +84,7 @@ def main():
host = input_url('API Host', api_server)
else:
print(host_description)
host = input_url('WEB Host', '')
host = input_url('WEB Host', 'https://app.clear.ml')
parsed_host = verify_url(host)
api_host, files_host, web_host = parse_host(parsed_host, allow_input=True)
@@ -116,9 +116,15 @@ def main():
print('Enter git username for repository cloning (leave blank for SSH key authentication): [] ', end='')
git_user = input()
if git_user.strip():
print('Enter password for user \'{}\': '.format(git_user), end='')
print(
"Git personal token is equivalent to a password, to learn how to generate a token:\n"
" GitHub: https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/creating-a-personal-access-token\n" # noqa
" Bitbucket: https://support.atlassian.com/bitbucket-cloud/docs/app-passwords/\n"
" GitLab: https://docs.gitlab.com/ee/user/profile/personal_access_tokens.html\n"
)
print('Enter git password token for user \'{}\': '.format(git_user), end='')
git_pass = input()
print('Git repository cloning will be using user={} password={}'.format(git_user, git_pass))
print('Git repository cloning will be using user={} token={}'.format(git_user, git_pass))
else:
git_user = None
git_pass = None
@@ -157,7 +163,7 @@ def main():
' api_server: %s\n' \
' web_server: %s\n' \
' files_server: %s\n' \
' # Credentials are generated using the webapp, %s/profile\n' \
' # Credentials are generated using the webapp, %s/settings\n' \
' # Override with os environment: CLEARML_API_ACCESS_KEY / CLEARML_API_SECRET_KEY\n' \
' credentials {"access_key": "%s", "secret_key": "%s"}\n' \
'}\n\n' % (api_host, web_host, files_host,

View File

@@ -21,6 +21,7 @@ from distutils.spawn import find_executable
from distutils.util import strtobool
from functools import partial
from itertools import chain
from os.path import basename
from tempfile import mkdtemp, NamedTemporaryFile
from time import sleep, time
from typing import Text, Optional, Any, Tuple, List
@@ -31,13 +32,16 @@ import six
from pathlib2 import Path
from pyhocon import ConfigTree, ConfigFactory
from six.moves.urllib.parse import quote
from six.moves.urllib.parse import urlparse, urlunparse
from clearml_agent.backend_api.services import auth as auth_api
from clearml_agent.backend_api.services import queues as queues_api
from clearml_agent.backend_api.services import tasks as tasks_api
from clearml_agent.backend_api.services import workers as workers_api
from clearml_agent.backend_api.session import CallResult
from clearml_agent.backend_api.session.defs import ENV_ENABLE_ENV_CONFIG_SECTION, ENV_ENABLE_FILES_CONFIG_SECTION
from clearml_agent.backend_api.session.defs import (
ENV_ENABLE_ENV_CONFIG_SECTION, ENV_ENABLE_FILES_CONFIG_SECTION,
ENV_VENV_CONFIGURED, ENV_PROPAGATE_EXITCODE, )
from clearml_agent.backend_config.defs import UptimeConf
from clearml_agent.backend_config.utils import apply_environment, apply_files
from clearml_agent.commands.base import resolve_names, ServiceCommandSection
@@ -63,10 +67,17 @@ from clearml_agent.definitions import (
ENV_SSH_AUTH_SOCK,
ENV_AGENT_SKIP_PIP_VENV_INSTALL,
ENV_EXTRA_DOCKER_ARGS,
ENV_CUSTOM_BUILD_SCRIPT, ENV_AGENT_SKIP_PYTHON_ENV_INSTALL, WORKING_STANDALONE_DIR,
)
from clearml_agent.definitions import WORKING_REPOSITORY_DIR, PIP_EXTRA_INDICES
from clearml_agent.errors import APIError, CommandFailedError, Sigterm
from clearml_agent.errors import (
APIError,
CommandFailedError,
Sigterm,
SkippedCustomBuildScript,
CustomBuildScriptFailed,
)
from clearml_agent.helper.base import (
return_list,
print_parameters,
@@ -216,7 +227,7 @@ class LiteralScriptManager(object):
location = None
location = location or (repo_info and repo_info.root)
if not location:
location = Path(self.venv_folder, "code")
location = Path(self.venv_folder, WORKING_STANDALONE_DIR)
location.mkdir(exist_ok=True, parents=True)
log.debug("selected execution directory: %s", location)
return Text(location), self.write(task, location, execution.entry_point)
@@ -628,7 +639,7 @@ class Worker(ServiceCommandSection):
pass
def run_one_task(self, queue, task_id, worker_args, docker=None, task_session=None):
# type: (Text, Text, WorkerParams, Optional[Text]) -> ()
# type: (Text, Text, WorkerParams, Optional[Text]) -> int
"""
Run one task pulled from queue.
:param queue: ID of queue that task was pulled from
@@ -636,6 +647,8 @@ class Worker(ServiceCommandSection):
:param worker_args: Worker command line arguments
:param task_session: The session for running operations on the passed task
:param docker: Docker image in which the execution task will run
:return: exit code (0 is success)
"""
# start new process and execute task id
# "Running task '{}'".format(task_id)
@@ -696,6 +709,9 @@ class Worker(ServiceCommandSection):
)
if self._impersonate_as_task_owner:
docker_params["auth_token"] = task_session.token
elif self._session.access_key is None or self._session.secret_key is None:
# We're using a token right now
docker_params["auth_token"] = self._session.token
if self._worker_tags:
docker_params["worker_tags"] = self._worker_tags
if self._services_mode:
@@ -718,7 +734,7 @@ class Worker(ServiceCommandSection):
else:
print("Warning: generated docker container name is invalid: {}".format(name))
full_docker_cmd = self.docker_image_func(**docker_params)
full_docker_cmd = self.docker_image_func(env_task_id=task_id, **docker_params)
# if we are using the default docker, update back the Task:
if default_docker:
@@ -834,6 +850,8 @@ class Worker(ServiceCommandSection):
# unregister this worker, it was killed
self._unregister()
return status
def get_task_session(self, user, company):
"""
Get task session for the user by cloning the agent session
@@ -1256,6 +1274,7 @@ class Worker(ServiceCommandSection):
self._session.print_configuration()
def daemon(self, queues, log_level, foreground=False, docker=False, detached=False, order_fairness=False, **kwargs):
self._apply_extra_configuration()
# check that we have docker command if we need it
if docker not in (False, None) and not check_if_command_exists("docker"):
@@ -1290,8 +1309,12 @@ class Worker(ServiceCommandSection):
# We are not running a daemon we are killing one.
# find the pid send termination signal and leave
if kwargs.get('stop', False):
return 1 if not self._kill_daemon(dynamic_gpus=dynamic_gpus) else 0
if kwargs.get('stop', False) is not False:
return_code = 0
for worker_id in kwargs.get('stop') or [None]:
if not self._kill_daemon(dynamic_gpus=dynamic_gpus, worker_id=worker_id):
return_code = 1
return return_code
# if we do not need to create queues, make sure they are valid
# match previous behaviour when we validated queue names before everything else
@@ -1770,11 +1793,19 @@ class Worker(ServiceCommandSection):
"ERROR! Failed applying git diff, see diff above.".format(diff))
def _apply_extra_configuration(self):
# store a few things we updated at runtime (TODO: we should list them somewhere)
agent_config = self._session.config["agent"].copy()
agent_config_keys = ["cuda_version", "cudnn_version", "default_python", "worker_id", "debug"]
try:
self._session.load_vaults()
except Exception as ex:
print("Error: failed applying extra configuration: {}".format(ex))
# merge back
for restore_key in agent_config_keys:
if restore_key in agent_config:
self._session.config["agent"][restore_key] = agent_config[restore_key]
config = self._session.config
default = config.get("agent.apply_environment", False)
if ENV_ENABLE_ENV_CONFIG_SECTION.get(default=default):
@@ -1801,6 +1832,7 @@ class Worker(ServiceCommandSection):
docker=None,
entry_point=None,
install_globally=False,
force_docker=False,
**_
):
if not task_id:
@@ -1809,7 +1841,7 @@ class Worker(ServiceCommandSection):
self._session.print_configuration()
if docker is not False and docker is not None:
return self._build_docker(docker, target, task_id, entry_point)
return self._build_docker(docker, target, task_id, entry_point, force_docker=force_docker)
current_task = self._session.api_client.tasks.get_by_id(task_id)
@@ -1826,13 +1858,7 @@ class Worker(ServiceCommandSection):
requirements = None
if not python_version:
try:
python_version = current_task.script.binary
python_version = python_version.split('/')[-1].replace('python', '')
# if we can cast it, we are good
python_version = '{:.1f}'.format(float(python_version))
except:
python_version = None
python_version = self._get_task_python_version(current_task)
venv_folder, requirements_manager, is_cached = self.install_virtualenv(
venv_dir=target, requested_python_version=python_version, execution_info=execution,
@@ -1856,6 +1882,9 @@ class Worker(ServiceCommandSection):
base_interpreter=package_api.requirements_manager.get_interpreter(),
requirement_substitutions=[OnlyExternalRequirements],
)
# manually update the current state,
# for the external git reference change (in the replace callback)
package_api.requirements_manager.update_installed_packages_state(package_api.freeze())
# make sure we run the handlers
cached_requirements = \
{k: package_api.requirements_manager.replace(requirements[k] or '')
@@ -1891,7 +1920,7 @@ class Worker(ServiceCommandSection):
return 0
def _build_docker(self, docker, target, task_id, entry_point=None):
def _build_docker(self, docker, target, task_id, entry_point=None, force_docker=False):
self.temp_config_path = safe_mkstemp(
suffix=".cfg", prefix=".clearml_agent.", text=True, name_only=True
@@ -1902,20 +1931,24 @@ class Worker(ServiceCommandSection):
temp_config, docker_image_func = self.get_docker_config_cmd(docker)
self.dump_config(self.temp_config_path, config=temp_config)
self.docker_image_func = docker_image_func
# noinspection PyBroadException
try:
task_container = get_task_container(self._session, task_id)
except Exception:
task_container = {}
if task_container.get('image'):
docker_image = task_container.get('image')
docker_arguments = task_container.get('arguments')
docker_setup_script = task_container.get('setup_shell_script')
docker_image = self._docker_image
docker_arguments = self._docker_arguments
docker_setup_script = None
if force_docker:
print('Ignoring any task container info, using docker image {}'.format(docker_image))
else:
docker_image = self._docker_image
docker_arguments = self._docker_arguments
docker_setup_script = None
# noinspection PyBroadException
try:
task_container = get_task_container(self._session, task_id)
if task_container.get('image'):
docker_image = task_container.get('image')
print('Ignoring default docker image, using task docker image {}'.format(docker_image))
docker_arguments = task_container.get('arguments')
docker_setup_script = task_container.get('setup_shell_script')
except Exception:
pass
print('Building Task {} inside docker image: {} {} setup_script={}\n'.format(
task_id, docker_image, docker_arguments or '', docker_setup_script or ''))
@@ -1978,6 +2011,16 @@ class Worker(ServiceCommandSection):
return
def _get_task_python_version(self, task):
# noinspection PyBroadException
try:
python_ver = task.script.binary
python_ver = python_ver.split('/')[-1].replace('python', '')
# if we can cast it, we are good
return '{:.1f}'.format(float(python_ver))
except Exception:
pass
@resolve_names
def execute(
self,
@@ -2062,7 +2105,7 @@ class Worker(ServiceCommandSection):
)
try:
self.report_monitor(ResourceMonitor.StatusReport(task=current_task.id))
self.run_one_task(queue='', task_id=current_task.id, worker_args=worker_params, docker=docker)
status = self.run_one_task(queue='', task_id=current_task.id, worker_args=worker_params, docker=docker)
finally:
self.stop_monitor()
self._unregister()
@@ -2070,7 +2113,7 @@ class Worker(ServiceCommandSection):
if full_monitoring and self.temp_config_path:
safe_remove_file(self._session.config_file)
Singleton.close_pid_file()
return
return status if ENV_PROPAGATE_EXITCODE.get() else 0
self._apply_extra_configuration()
@@ -2090,85 +2133,157 @@ class Worker(ServiceCommandSection):
execution = self.get_execution_info(current_task)
if self._session.config.get("agent.package_manager.force_repo_requirements_txt", False):
requirements = None
print("[package_manager.force_repo_requirements_txt=true] "
"Skipping requirements, using repository \"requirements.txt\" ")
else:
python_ver = self._get_task_python_version(current_task)
freeze = None
repo_info = None
script_dir = ""
venv_folder = ""
custom_build_script = self._session.config.get("agent.custom_build_script", "") or ENV_CUSTOM_BUILD_SCRIPT.get()
if custom_build_script:
try:
requirements = current_task.script.requirements
except AttributeError:
venv_folder = Path(self._session.config["agent.venvs_dir"], python_ver or "3")
venv_folder = Path(os.path.expanduser(os.path.expandvars(venv_folder.as_posix())))
directory, vcs, repo_info = self.get_repo_info(
execution, current_task, str(venv_folder)
)
binary, entry_point, working_dir = self.run_custom_build_script(
custom_build_script,
current_task,
execution,
venv_folder=venv_folder,
git_root=vcs.location,
)
execution.entry_point = str(entry_point)
execution.working_dir = str(working_dir)
script_dir = str(working_dir)
self.package_api = VirtualenvPip(
session=self._session,
interpreter=str(binary),
python=str(binary),
requirements_manager=RequirementsManager(self._session),
execution_info=execution,
path=venv_folder,
)
self.global_package_api = SystemPip(
session=self._session,
interpreter=str(binary),
)
except SkippedCustomBuildScript as ex:
print("Warning: {}".format(str(ex)))
custom_build_script = None
if not custom_build_script:
if self._session.config.get("agent.package_manager.force_repo_requirements_txt", False):
requirements = None
print("\n[package_manager.force_repo_requirements_txt=true] "
"Skipping requirements, using repository \"requirements.txt\" \n")
elif self._session.config.get("agent.package_manager.force_original_requirements", False):
try:
requirements = current_task.script.requirements
if isinstance(requirements, dict):
if 'org_pip' in requirements:
requirements['pip'] = requirements['org_pip']
print("\n[package_manager.force_original_requirements=true] "
"Using original requirements: \n{}\n".format(requirements['org_pip']))
if 'org_conda' in requirements:
requirements['conda'] = requirements['org_conda']
print("\n[package_manager.force_original_requirements=true] "
"Using original requirements: \n{}\n".format(requirements['org_conda']))
except AttributeError:
requirements = None
else:
try:
requirements = current_task.script.requirements
except AttributeError:
requirements = None
try:
python_ver = current_task.script.binary
python_ver = python_ver.split('/')[-1].replace('python', '')
# if we can cast it, we are good
python_ver = '{:.1f}'.format(float(python_ver))
except:
python_ver = None
alternative_code_folder = None
if ENV_AGENT_SKIP_PYTHON_ENV_INSTALL.get():
venv_folder, requirements_manager, is_cached = None, None, False
# we need to create a folder for the code to be dumped into
code_folder = self._session.config.get("agent.venvs_dir")
code_folder = Path(os.path.expanduser(os.path.expandvars(code_folder)))
# let's make sure it is clear from previous runs
rm_tree(normalize_path(code_folder, WORKING_REPOSITORY_DIR))
rm_tree(normalize_path(code_folder, WORKING_STANDALONE_DIR))
if not code_folder.exists():
code_folder.mkdir(parents=True, exist_ok=True)
alternative_code_folder = code_folder.as_posix()
else:
venv_folder, requirements_manager, is_cached = self.install_virtualenv(
standalone_mode=standalone_mode,
requested_python_version=python_ver,
execution_info=execution,
cached_requirements=requirements,
)
venv_folder, requirements_manager, is_cached = self.install_virtualenv(
standalone_mode=standalone_mode,
requested_python_version=python_ver,
execution_info=execution,
cached_requirements=requirements,
)
if not is_cached and not standalone_mode:
if self._default_pip:
self.package_api.install_packages(*self._default_pip)
if not is_cached and not standalone_mode:
if self._default_pip:
self.package_api.install_packages(*self._default_pip)
print("\n")
# either use the venvs base folder for code or the cwd
directory, vcs, repo_info = self.get_repo_info(
execution, current_task, str(venv_folder or alternative_code_folder)
)
print("\n")
directory, vcs, repo_info = self.get_repo_info(
execution, current_task, venv_folder
)
cwd = vcs.location if vcs and vcs.location else directory
print("\n")
if not standalone_mode:
if is_cached:
# reinstalling git / local packages
package_api = copy(self.package_api)
OnlyExternalRequirements.cwd = package_api.cwd = cwd
package_api.requirements_manager = self._get_requirements_manager(
base_interpreter=package_api.requirements_manager.get_interpreter(),
requirement_substitutions=[OnlyExternalRequirements]
)
# manually update the current state,
# for the external git reference change (in the replace callback)
package_api.requirements_manager.update_installed_packages_state(package_api.freeze())
# make sure we run the handlers
cached_requirements = \
{k: package_api.requirements_manager.replace(requirements[k] or '')
for k in requirements}
if str(cached_requirements.get('pip', '')).strip() \
or str(cached_requirements.get('conda', '')).strip():
package_api.load_requirements(cached_requirements)
# make sure we call the correct freeze
requirements_manager = package_api.requirements_manager
elif requirements_manager:
self.install_requirements(
execution,
repo_info,
requirements_manager=requirements_manager,
cached_requirements=requirements,
cwd=cwd,
)
elif not self.package_api:
# check if we have to manually configure package API, it will be readonly
self.package_api = SystemPip(session=self._session)
cwd = vcs.location if vcs and vcs.location else directory
# do not update the task packages if we are using conda,
# it will most likely make the task environment unreproducible
skip_freeze_update = self.is_conda and not self._session.config.get(
"agent.package_manager.conda_full_env_update", False)
if is_cached and not standalone_mode:
# reinstalling git / local packages
package_api = copy(self.package_api)
OnlyExternalRequirements.cwd = package_api.cwd = cwd
package_api.requirements_manager = self._get_requirements_manager(
base_interpreter=package_api.requirements_manager.get_interpreter(),
requirement_substitutions=[OnlyExternalRequirements]
)
# make sure we run the handlers
cached_requirements = \
{k: package_api.requirements_manager.replace(requirements[k] or '')
for k in requirements}
if str(cached_requirements.get('pip', '')).strip() \
or str(cached_requirements.get('conda', '')).strip():
package_api.load_requirements(cached_requirements)
# make sure we call the correct freeze
requirements_manager = package_api.requirements_manager
elif not is_cached and not standalone_mode:
self.install_requirements(
execution,
repo_info,
freeze = self.freeze_task_environment(
task_id=current_task.id,
requirements_manager=requirements_manager,
cached_requirements=requirements,
cwd=cwd,
add_venv_folder_cache=venv_folder,
execution_info=execution,
update_requirements=not skip_freeze_update,
)
# do not update the task packages if we are using conda,
# it will most likely make the task environment unreproducible
skip_freeze_update = self.is_conda and not self._session.config.get(
"agent.package_manager.conda_full_env_update", False)
freeze = self.freeze_task_environment(
task_id=current_task.id,
requirements_manager=requirements_manager,
add_venv_folder_cache=venv_folder,
execution_info=execution,
update_requirements=not skip_freeze_update,
)
script_dir = (directory if isinstance(directory, Path) else Path(directory)).absolute().as_posix()
script_dir = (directory if isinstance(directory, Path) else Path(directory)).absolute().as_posix()
# run code
# print("Running task id [%s]:" % current_task.id)
@@ -2178,7 +2293,9 @@ class Worker(ServiceCommandSection):
extra.append(
WorkerParams(optimization=optimization).get_optimization_flag()
)
# check if this is a module load, then load it.
# noinspection PyBroadException
try:
if current_task.script.binary and current_task.script.binary.startswith('python') and \
execution.entry_point and execution.entry_point.split()[0].strip() == '-m':
@@ -2186,7 +2303,7 @@ class Worker(ServiceCommandSection):
extra.extend(shlex.split(execution.entry_point))
else:
extra.append(execution.entry_point)
except:
except Exception:
extra.append(execution.entry_point)
command = self.package_api.get_python_command(extra)
@@ -2225,7 +2342,10 @@ class Worker(ServiceCommandSection):
os.environ.update(hyper_params)
# Add the script CWD to the python path
python_path = get_python_path(script_dir, execution.entry_point, self.package_api, is_conda_env=self.is_conda)
if repo_info and repo_info.root and self._session.config.get('agent.force_git_root_python_path', None):
python_path = get_python_path(repo_info.root, None, self.package_api, is_conda_env=self.is_conda)
else:
python_path = get_python_path(script_dir, execution.entry_point, self.package_api, is_conda_env=self.is_conda)
if ENV_TASK_EXTRA_PYTHON_PATH.get():
python_path = add_python_path(python_path, ENV_TASK_EXTRA_PYTHON_PATH.get())
if python_path:
@@ -2567,7 +2687,7 @@ class Worker(ServiceCommandSection):
python_version=getattr(self.package_api, 'python', ''),
cuda_version=self._session.config.get("agent.cuda_version"),
source_folder=add_venv_folder_cache,
exclude_sub_folders=['task_repository', 'code'])
exclude_sub_folders=[WORKING_REPOSITORY_DIR, WORKING_STANDALONE_DIR])
# If do not update back requirements
if not update_requirements:
@@ -2594,8 +2714,8 @@ class Worker(ServiceCommandSection):
print('Poetry Enabled: Ignoring requested python packages, using repository poetry lock file!')
api.install()
return api
except Exception:
self.log.error("failed installing poetry requirements:")
except Exception as ex:
self.log.error("failed installing poetry requirements: {}".format(ex))
return None
def install_requirements(
@@ -2694,7 +2814,7 @@ class Worker(ServiceCommandSection):
if self._session.debug_mode and temp_file:
rm_file(temp_file.name)
# call post installation callback
requirements_manager.post_install(self._session)
requirements_manager.post_install(self._session, package_manager=package_api)
# mark as successful installation
repo_requirements_installed = True
@@ -2795,7 +2915,7 @@ class Worker(ServiceCommandSection):
".".join, reversed(list(suffixes(self._get_python_version_suffix(config_version).split("."))))
)
]
default_python = None
for version, executable in python_executables:
self.log.debug("Searching for {}".format(executable))
if find_executable(executable):
@@ -2806,42 +2926,158 @@ class Worker(ServiceCommandSection):
except subprocess.CalledProcessError as ex:
self.log.warning("error getting %s version: %s", executable, ex)
continue
if not default_python:
match = re.search(r"Python ({}(?:\.\d+)*)".format(r"\d+"), output)
default_python = (
match.group(1),
version if version and '.' in version else '.'.join(match.group(1).split('.')[:2]),
executable)
match = re.search(
r"Python ({}(?:\.\d+)*)".format(
r"\d+" if not config_version or os.path.sep in config_version else config_version), output
)
if match:
self.log.debug("Found: {}".format(executable))
return match.group(1), version or '.'.join(match.group(1).split('.')[:2]), executable
return (
match.group(1),
version if version and '.' in version else '.'.join(match.group(1).split('.')[:2]),
executable
)
if default_python:
self.log.warning(
"Python executable with version {!r} requested by the Task, "
"not found in path, using \'{}\' (v{}) instead".format(
config_version, find_executable(default_python[-1]), default_python[0]
)
)
return default_python
raise CommandFailedError(
"Python executable with version {!r} defined in configuration file, "
"Python executable with version {!r} requested by the Task, "
"key 'agent.default_python', not found in path, tried: {}".format(
config_version, list(zip(*python_executables))[1]
)
)
def install_virtualenv(
self,
venv_dir=None,
requested_python_version=None,
standalone_mode=False,
execution_info=None,
cached_requirements=None,
):
# type: (str, str, bool, ExecutionInfo, dict) -> Tuple[Path, RequirementsManager, bool]
def run_custom_build_script(self, script, task, execution, venv_folder, git_root):
# type: (str, tasks_api.Task, ExecutionInfo, Path, str)-> Tuple[Path, Path, Path]
"""
Install a new python virtual environment, removing the old one if exists
If CLEARML_SKIP_PIP_VENV_INSTALL is set then an empty virtual env folder is created
and package manager is configured to work with the global python interpreter (the interpreter
path itself can be passed in this variable)
:return: virtualenv directory, requirements manager to use with task, True if there is a cached venv entry
Run a custom env build script
:param script: the build script command to run (may include environment variables)
:return: A tuple containing:
- a full path to a python executable
- a new task entry_point (replacing the entry_point in the task's script section)
- a new working directory (replacing the working_dir in the task's script section)
"""
skip_pip_venv_install = ENV_AGENT_SKIP_PIP_VENV_INSTALL.get()
script = os.path.expanduser(os.path.expandvars(script))
try:
if not os.path.isfile(script):
raise SkippedCustomBuildScript("Build script {} is not found".format(script))
except OSError as ex:
raise SkippedCustomBuildScript(str(ex))
print("Running custom build script {}".format(script))
script_output_file = NamedTemporaryFile(prefix="custom_build_script", suffix=".json", mode="wt", delete=False)
os.environ["CLEARML_AGENT_CUSTOM_BUILD_SCRIPT"] = script
os.environ["CLEARML_CUSTOM_BUILD_TASK_CONFIG_JSON"] = json.dumps(
task.to_dict(), separators=(',', ':'), default=str
)
os.environ["CLEARML_CUSTOM_BUILD_OUTPUT"] = script_output_file.name
os.environ["CLEARML_TASK_SCRIPT_ENTRY"] = execution.entry_point
os.environ["CLEARML_TASK_WORKING_DIR"] = execution.working_dir
os.environ["CLEARML_VENV_PATH"] = str(venv_folder)
os.environ["CLEARML_GIT_ROOT"] = git_root
try:
subprocess.check_call([script])
except subprocess.CalledProcessError as ex:
raise CustomBuildScriptFailed(
message="Custom build script failed with return code {}".format(ex.returncode),
errno=ex.returncode
)
output = Path(script_output_file.name).read_text()
if not output:
raise SkippedCustomBuildScript("Build script {} is not found".format(script))
try:
output = json.loads(output)
binary = Path(output["binary"])
entry_point = Path(output["entry_point"])
working_dir = Path(output["working_dir"])
except ValueError as ex:
raise SkippedCustomBuildScript(
"Failed parsing build script output JSON ({}): {}".format(script_output_file.name, ex)
)
except KeyError as ex:
raise SkippedCustomBuildScript("Build script output missing {} field".format(ex.args[0]))
try:
if not binary.is_file():
raise SkippedCustomBuildScript(
"Invalid binary path returned from custom build script: {}".format(binary)
)
if not entry_point.is_file():
raise SkippedCustomBuildScript(
"Invalid entrypoint path returned from custom build script: {}".format(entry_point)
)
if not working_dir.is_dir():
raise SkippedCustomBuildScript(
"Invalid working dir returned from custom build script: {}".format(working_dir)
)
except OSError as ex:
raise SkippedCustomBuildScript(str(ex))
return binary, entry_point, working_dir
def _get_skip_pip_venv_install(self, skip_pip_venv_install=None):
if skip_pip_venv_install is None:
skip_pip_venv_install = ENV_AGENT_SKIP_PIP_VENV_INSTALL.get()
if skip_pip_venv_install:
try:
skip_pip_venv_install = bool(strtobool(skip_pip_venv_install))
except ValueError:
pass
elif ENV_VENV_CONFIGURED.get() and ENV_DOCKER_IMAGE.get() and \
self._session.config.get("agent.docker_use_activated_venv", True) and \
self._session.config.get("agent.package_manager.system_site_packages", False):
# if we are running inside a container, and virtual environment is already installed,
# we should install directly into it, because we cannot inherit from the system packages
skip_pip_venv_install = find_executable("python") or True
# check if we are running inside a container:
print(
"Warning! Found python virtual environment [{}] already activated inside the container, "
"installing packages into venv (pip does not support inherit/nested venv)".format(
skip_pip_venv_install if isinstance(skip_pip_venv_install, str) else ENV_VENV_CONFIGURED.get())
)
return skip_pip_venv_install
def install_virtualenv(
self,
venv_dir=None,
requested_python_version=None,
standalone_mode=False,
execution_info=None,
cached_requirements=None,
):
# type: (str, str, bool, ExecutionInfo, dict) -> Tuple[Path, RequirementsManager, bool]
"""
Install a new python virtual environment, removing the old one if exists
If skip_pip_venv_install is True or contains a string (or if CLEARML_SKIP_PIP_VENV_INSTALL is set)
then an empty virtual env folder is created and the package manager is configured to work with the global python
interpreter (or using a custom interpreter if an interpreter path is passed in this variable)
:return: virtualenv directory, requirements manager to use with task, True if there is a cached venv entry
"""
skip_pip_venv_install = self._get_skip_pip_venv_install()
if self._session.config.get("agent.ignore_requested_python_version", None):
requested_python_version = ''
@@ -2889,6 +3125,7 @@ class Worker(ServiceCommandSection):
venv_dir = Path(venv_dir) if venv_dir else \
Path(self._session.config["agent.venvs_dir"], executable_version_suffix)
venv_dir = Path(os.path.expanduser(os.path.expandvars(venv_dir.as_posix())))
first_time = not standalone_mode and (
is_windows_platform()
@@ -2897,13 +3134,50 @@ class Worker(ServiceCommandSection):
or not self.is_venv_update
)
if not standalone_mode:
rm_tree(normalize_path(venv_dir, WORKING_REPOSITORY_DIR))
rm_tree(normalize_path(venv_dir, WORKING_STANDALONE_DIR))
call_package_manager_create, requirements_manager = self._setup_package_api(
executable_name=executable_name,
executable_version_suffix=executable_version_suffix,
venv_dir=venv_dir,
execution_info=execution_info,
standalone_mode=standalone_mode,
skip_pip_venv_install=skip_pip_venv_install,
first_time=first_time,
)
# check if we have a cached folder
if cached_requirements and not skip_pip_venv_install and self.package_api.get_cached_venv(
requirements=cached_requirements,
docker_cmd=execution_info.docker_cmd if execution_info else None,
python_version=self.package_api.python,
cuda_version=self._session.config.get("agent.cuda_version"),
destination_folder=Path(venv_dir)
):
print('::: Using Cached environment {} :::'.format(self.package_api.get_last_used_entry_cache()))
return venv_dir, requirements_manager, True
# create the initial venv
if not skip_pip_venv_install:
if call_package_manager_create:
self.package_api.create()
else:
if not venv_dir.exists():
venv_dir.mkdir(parents=True, exist_ok=True)
return venv_dir, requirements_manager, False
def _setup_package_api(
self, executable_name, executable_version_suffix, venv_dir, execution_info,
standalone_mode, skip_pip_venv_install=False, first_time=False
):
# type: (str, str, Path, ExecutionInfo, bool, bool, bool) -> Tuple[bool, RequirementsManager]
requirements_manager = self._get_requirements_manager(
base_interpreter=executable_name
)
if not standalone_mode:
rm_tree(normalize_path(venv_dir, WORKING_REPOSITORY_DIR))
package_manager_params = dict(
session=self._session,
python=executable_version_suffix if self.is_conda else executable_name,
@@ -2918,7 +3192,6 @@ class Worker(ServiceCommandSection):
)
call_package_manager_create = False
if not self.is_conda:
if standalone_mode or skip_pip_venv_install:
# pip with standalone mode
@@ -2926,7 +3199,10 @@ class Worker(ServiceCommandSection):
if standalone_mode:
self.package_api = VirtualenvPip(**package_manager_params)
else:
self.package_api = self.global_package_api
# we can change it, no one is going to use it anyhow
package_manager_params['path'] = None
package_manager_params['interpreter'] = executable_name
self.package_api = VirtualenvPip(**package_manager_params)
else:
if self.is_venv_update:
self.package_api = VenvUpdateAPI(
@@ -2964,26 +3240,7 @@ class Worker(ServiceCommandSection):
venv_dir = new_venv_folder
self.package_api = get_conda(path=venv_dir)
# check if we have a cached folder
if cached_requirements and not skip_pip_venv_install and self.package_api.get_cached_venv(
requirements=cached_requirements,
docker_cmd=execution_info.docker_cmd if execution_info else None,
python_version=package_manager_params['python'],
cuda_version=self._session.config.get("agent.cuda_version"),
destination_folder=Path(venv_dir)
):
print('::: Using Cached environment {} :::'.format(self.package_api.get_last_used_entry_cache()))
return venv_dir, requirements_manager, True
# create the initial venv
if not skip_pip_venv_install:
if call_package_manager_create:
self.package_api.create()
else:
if not venv_dir.exists():
venv_dir.mkdir(parents=True, exist_ok=True)
return venv_dir, requirements_manager, False
return call_package_manager_create, requirements_manager
def parse_requirements(self, reqs_file=None, overrides=None):
os = None
@@ -3018,8 +3275,8 @@ class Worker(ServiceCommandSection):
self._docker_image = docker_image
self._docker_arguments = docker_arguments
print("Running in Docker {} mode (v19.03 and above) - using default docker image: {} {}\n".format(
'*standalone*' if self._standalone_mode else '', self._docker_image,
print("Running in Docker{} mode (v19.03 and above) - using default docker image: {} {}\n".format(
' *standalone*' if self._standalone_mode else '', self._docker_image,
self._sanitize_docker_command(self._docker_arguments) or ''))
temp_config = deepcopy(self._session.config)
@@ -3233,6 +3490,7 @@ class Worker(ServiceCommandSection):
worker_tags=None,
name=None,
mount_ssh=None, mount_apt_cache=None, mount_pip_cache=None, mount_poetry_cache=None,
env_task_id=None,
):
docker = 'docker'
@@ -3326,6 +3584,9 @@ class Worker(ServiceCommandSection):
# update the docker image, so the system knows where it runs
base_cmd += ['-e', 'CLEARML_DOCKER_IMAGE={} {}'.format(docker_image, ' '.join(docker_arguments or [])).strip()]
if env_task_id:
base_cmd += ['-e', 'CLEARML_TASK_ID={}'.format(env_task_id), ]
if auth_token:
# if auth token is passed then put it in the env var
base_cmd += ['-e', '{}={}'.format(ENV_AGENT_AUTH_TOKEN.vars[0], auth_token)]
@@ -3351,7 +3612,7 @@ class Worker(ServiceCommandSection):
agent_install_bash_script = []
if os.environ.get('FORCE_LOCAL_CLEARML_AGENT_WHEEL'):
local_wheel = os.path.expanduser(os.environ.get('FORCE_LOCAL_CLEARML_AGENT_WHEEL'))
docker_wheel = str(Path('/tmp') / Path(local_wheel).name)
docker_wheel = '/tmp/{}'.format(basename(local_wheel))
base_cmd += ['-v', local_wheel + ':' + docker_wheel]
clearml_agent_wheel = '\"{}\"'.format(docker_wheel)
elif os.environ.get('FORCE_CLEARML_AGENT_REPO'):
@@ -3372,7 +3633,7 @@ class Worker(ServiceCommandSection):
' libsm6 libxext6 libxrender-dev libglib2.0-0' if install_opencv_libs else ""),
"[ ! -z $(which git) ] || export CLEARML_APT_INSTALL=\"$CLEARML_APT_INSTALL git\"",
"declare LOCAL_PYTHON",
"for i in {{10..5}}; do which {python_single_digit}.$i && " +
"[ ! -z $LOCAL_PYTHON ] || for i in {{15..5}}; do which {python_single_digit}.$i && " +
"{python_single_digit}.$i -m pip --version && " +
"export LOCAL_PYTHON=$(which {python_single_digit}.$i) && break ; done",
"[ ! -z $LOCAL_PYTHON ] || export CLEARML_APT_INSTALL=\"$CLEARML_APT_INSTALL {python_single_digit}-pip\"", # noqa
@@ -3405,7 +3666,7 @@ class Worker(ServiceCommandSection):
if docker_bash_setup_script and docker_bash_setup_script.strip('\n '):
extra_shell_script = (extra_shell_script or '') + \
' ; '.join(line.strip().replace('\"', '\\\"')
' ; '.join(line.strip()
for line in docker_bash_setup_script.split('\n') if line.strip()) + \
' ; '
@@ -3517,8 +3778,11 @@ class Worker(ServiceCommandSection):
return command, script_dir
def _kill_daemon(self, dynamic_gpus=False):
worker_id, worker_name = self._generate_worker_id_name(dynamic_gpus=dynamic_gpus)
def _kill_daemon(self, dynamic_gpus=False, worker_id=None):
if not worker_id:
worker_id, worker_name = self._generate_worker_id_name(dynamic_gpus=dynamic_gpus)
else:
worker_name = worker_id
# Iterate over all running process
for pid, uid, slot, file in sorted(Singleton.get_running_pids(), key=lambda x: x[1] or ''):
@@ -3600,6 +3864,27 @@ class Worker(ServiceCommandSection):
queue_ids.append(q_id)
return queue_ids
@staticmethod
def _sanitize_urls(s: str) -> Tuple[str, bool]:
""" Replaces passwords in URLs with asterisks """
regex = re.compile("^([^:]*:)[^@]+(.*)$")
tokens = re.split(r"\s", s)
changed = False
for k in range(len(tokens)):
if "@" in tokens[k]:
res = urlparse(tokens[k])
if regex.match(res.netloc):
changed = True
tokens[k] = urlunparse((
res.scheme,
regex.sub("\\1********\\2", res.netloc),
res.path,
res.params,
res.query,
res.fragment
))
return " ".join(tokens) if changed else s, changed
def _sanitize_docker_command(self, docker_command):
# type: (List[str]) -> List[str]
if not docker_command:
@@ -3616,16 +3901,35 @@ class Worker(ServiceCommandSection):
ENV_AGENT_AUTH_TOKEN.vars,
)
parse_embedded_urls = bool(self._session.config.get(
'agent.hide_docker_command_env_vars.parse_embedded_urls', True
))
skip_next = False
result = docker_command[:]
for i, item in enumerate(docker_command):
if skip_next:
skip_next = False
continue
try:
if item not in ("-e", "--env"):
continue
key, sep, _ = result[i + 1].partition("=")
if key not in keys or not sep:
continue
result[i + 1] = "{}={}".format(key, "********")
except KeyError:
if item in ("-e", "--env"):
key, sep, val = result[i + 1].partition("=")
if not sep:
continue
if key in ENV_DOCKER_IMAGE.vars:
# special case - this contains a complete docker command
val = " ".join(self._sanitize_docker_command(re.split(r"\s", val)))
elif key in keys:
val = "********"
elif parse_embedded_urls:
val = self._sanitize_urls(val)[0]
result[i + 1] = "{}={}".format(key, val)
skip_next = True
elif parse_embedded_urls and not item.startswith("-"):
item, changed = self._sanitize_urls(item)
if changed:
result[i] = item
except (KeyError, TypeError):
pass
return result

View File

@@ -126,6 +126,7 @@ DEFAULT_VENV_UPDATE_URL = (
"https://raw.githubusercontent.com/Yelp/venv-update/v3.2.4/venv_update.py"
)
WORKING_REPOSITORY_DIR = "task_repository"
WORKING_STANDALONE_DIR = "code"
DEFAULT_VCS_CACHE = normalize_path(CONFIG_DIR, "vcs-cache")
PIP_EXTRA_INDICES = [
]
@@ -134,6 +135,7 @@ ENV_DOCKER_IMAGE = EnvironmentConfig('CLEARML_DOCKER_IMAGE', 'TRAINS_DOCKER_IMAG
ENV_WORKER_ID = EnvironmentConfig('CLEARML_WORKER_ID', 'TRAINS_WORKER_ID')
ENV_WORKER_TAGS = EnvironmentConfig('CLEARML_WORKER_TAGS')
ENV_AGENT_SKIP_PIP_VENV_INSTALL = EnvironmentConfig('CLEARML_AGENT_SKIP_PIP_VENV_INSTALL')
ENV_AGENT_SKIP_PYTHON_ENV_INSTALL = EnvironmentConfig('CLEARML_AGENT_SKIP_PYTHON_ENV_INSTALL', type=bool)
ENV_DOCKER_SKIP_GPUS_FLAG = EnvironmentConfig('CLEARML_DOCKER_SKIP_GPUS_FLAG', 'TRAINS_DOCKER_SKIP_GPUS_FLAG')
ENV_AGENT_GIT_USER = EnvironmentConfig('CLEARML_AGENT_GIT_USER', 'TRAINS_AGENT_GIT_USER')
ENV_AGENT_GIT_PASS = EnvironmentConfig('CLEARML_AGENT_GIT_PASS', 'TRAINS_AGENT_GIT_PASS')
@@ -147,6 +149,38 @@ ENV_DOCKER_HOST_MOUNT = EnvironmentConfig('CLEARML_AGENT_K8S_HOST_MOUNT', 'CLEAR
ENV_VENV_CACHE_PATH = EnvironmentConfig('CLEARML_AGENT_VENV_CACHE_PATH')
ENV_EXTRA_DOCKER_ARGS = EnvironmentConfig('CLEARML_AGENT_EXTRA_DOCKER_ARGS', type=list)
ENV_CUSTOM_BUILD_SCRIPT = EnvironmentConfig('CLEARML_AGENT_CUSTOM_BUILD_SCRIPT')
"""
Specifies a custom environment setup script to be executed instead of installing a virtual environment.
If provided, this script is executed following Git cloning. The script command may include environment variables,
which will be expanded before execution (e.g. "$CLEARML_GIT_ROOT/script.sh").
The script can also be specified using the `agent.custom_build_script` configuration setting.
When running the script, the following environment variables will be set:
- CLEARML_CUSTOM_BUILD_TASK_CONFIG_JSON: specifies the path to a temporary file containing the complete task
contents in JSON format
- CLEARML_TASK_SCRIPT_ENTRY: task entrypoint script as defined in the task's script section
- CLEARML_TASK_WORKING_DIR: task working directory as defined in the task's script section
- CLEARML_VENV_PATH: the agent's default virtual environment path (as defined in the configuration)
- CLEARML_GIT_ROOT: path to the cloned Git repository
- CLEARML_CUSTOM_BUILD_OUTPUT: a path to a non-existing file that may be created by the script. If created,
this file must be in the following JSON format:
```json
{
"binary": "/absolute/path/to/python-executable",
"entry_point": "/absolute/path/to/task-entrypoint-script",
"working_dir": "/absolute/path/to/task-working/dir"
}
```
If this file is created, the agent will use its contents instead of the predefined task script section to
execute the task, and will skip virtual environment creation.
If the custom script returns a non-zero exit code, the agent will fail with the same exit code.
If the custom script is specified but does not exist, or does not write valid content into the file specified
by CLEARML_CUSTOM_BUILD_OUTPUT, the agent will emit a warning and continue with the
standard flow.
"""
class FileBuffering(IntEnum):
"""

View File

@@ -84,3 +84,13 @@ class MissingPackageError(CommandFailedError):
def __str__(self):
return '{self.__class__.__name__}: ' \
'"{self.name}" package is required. Please run "pip install {self.name}"'.format(self=self)
class CustomBuildScriptFailed(CommandFailedError):
def __init__(self, errno, *args, **kwargs):
super(CustomBuildScriptFailed, self).__init__(*args, **kwargs)
self.errno = errno
class SkippedCustomBuildScript(CommandFailedError):
pass

View File

@@ -69,7 +69,7 @@ class K8sIntegration(Worker):
"apt-get update",
"apt-get install -y git libsm6 libxext6 libxrender-dev libglib2.0-0",
"declare LOCAL_PYTHON",
"for i in {{10..5}}; do which python3.$i && python3.$i -m pip --version && "
"[ ! -z $LOCAL_PYTHON ] || for i in {{15..5}}; do which python3.$i && python3.$i -m pip --version && "
"export LOCAL_PYTHON=$(which python3.$i) && break ; done",
"[ ! -z $LOCAL_PYTHON ] || apt-get install -y python3-pip",
"[ ! -z $LOCAL_PYTHON ] || export LOCAL_PYTHON=python3",

View File

@@ -204,10 +204,13 @@ def get_python_path(script_dir, entry_point, package_api, is_conda_env=False):
["-c", "import sys; print('{}'.join(sys.path))".format(python_path_sep)])
org_python_path = python_path_cmd.get_output(cwd=script_dir)
# Add path of the script directory and executable directory
python_path = '{}{python_path_sep}{}{python_path_sep}'.format(
Path(script_dir).absolute().as_posix(),
(Path(script_dir) / Path(entry_point)).parent.absolute().as_posix(),
python_path_sep=python_path_sep)
python_path = '{}{python_path_sep}'.format(
Path(script_dir).absolute().as_posix(), python_path_sep=python_path_sep)
if entry_point:
python_path += '{}{python_path_sep}'.format(
(Path(script_dir) / Path(entry_point)).parent.absolute().as_posix(),
python_path_sep=python_path_sep)
if is_windows_platform():
python_path = python_path.replace('/', '\\')
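
A minimal standalone sketch of the patched path construction (hypothetical helper name, stdlib pathlib instead of the agent's pathlib2), showing that the entry point's directory is now appended only when an entry point exists:

```python
from pathlib import Path

def build_python_path(script_dir, entry_point, sep=":"):
    # mirrors the patched logic above: script dir first, entry-point dir optional
    python_path = "{}{}".format(Path(script_dir).absolute().as_posix(), sep)
    if entry_point:
        python_path += "{}{}".format(
            (Path(script_dir) / entry_point).parent.absolute().as_posix(), sep)
    return python_path

print(build_python_path("/repo", "src/train.py"))  # -> "/repo:/repo/src:"
print(build_python_path("/repo", None))            # -> "/repo:"
```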
@@ -503,6 +506,38 @@ def is_conda(config):
return config['agent.package_manager.type'].lower() == 'conda'
def convert_cuda_version_to_float_single_digit_str(cuda_version):
"""
Convert a cuda_version (string/float/int) into a float-style version string, e.g. "11.4"
Notice: returns a string with a single decimal digit only!
:return str:
"""
cuda_version = str(cuda_version or 0)
# if we have patch version we parse it here
cuda_version_parts = [int(v) for v in cuda_version.split('.')]
if len(cuda_version_parts) > 1 or cuda_version_parts[0] < 60:
cuda_version = 10 * cuda_version_parts[0]
if len(cuda_version_parts) > 1:
cuda_version += float(".{:d}".format(cuda_version_parts[1]))*10
cuda_version_full = "{:.1f}".format(float(cuda_version) / 10.)
else:
cuda_version = cuda_version_parts[0]
cuda_version_full = "{:.1f}".format(float(cuda_version) / 10.)
return cuda_version_full
def convert_cuda_version_to_int_10_base_str(cuda_version):
"""
Convert a cuda_version (string/float/int) into a 10-base integer version string, e.g. "112" for CUDA 11.2
:return str:
"""
cuda_version = convert_cuda_version_to_float_single_digit_str(cuda_version)
return str(int(float(cuda_version)*10))
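
Hedged usage examples for the two helpers; the expected values follow from the parsing rules above:

```python
# assuming the helpers above are importable from clearml_agent.helper.base
assert convert_cuda_version_to_float_single_digit_str("11.2") == "11.2"
assert convert_cuda_version_to_float_single_digit_str(112) == "11.2"       # 10-base int input
assert convert_cuda_version_to_float_single_digit_str(11) == "11.0"        # major version only
assert convert_cuda_version_to_float_single_digit_str("11.2.1") == "11.2"  # patch version dropped
assert convert_cuda_version_to_int_10_base_str("11.2") == "112"
assert convert_cuda_version_to_int_10_base_str(0) == "0"
```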
class NonStrictAttrs(object):
@classmethod

View File

@@ -2,7 +2,7 @@ from __future__ import unicode_literals, print_function
import csv
import sys
from collections import Iterable
from collections.abc import Iterable
from typing import List, Dict, Text, Any
from attr import attrs, attrib

View File

@@ -19,7 +19,9 @@ from clearml_agent.external.requirements_parser import parse
from clearml_agent.external.requirements_parser.requirement import Requirement
from clearml_agent.errors import CommandFailedError
from clearml_agent.helper.base import rm_tree, NonStrictAttrs, select_for_platform, is_windows_platform, ExecutionInfo
from clearml_agent.helper.base import (
rm_tree, NonStrictAttrs, select_for_platform, is_windows_platform, ExecutionInfo,
convert_cuda_version_to_float_single_digit_str, convert_cuda_version_to_int_10_base_str, )
from clearml_agent.helper.process import Argv, Executable, DEVNULL, CommandSequence, PathLike
from clearml_agent.helper.package.requirements import SimpleVersion
from clearml_agent.session import Session
@@ -167,7 +169,7 @@ class CondaAPI(PackageManager):
raise ValueError("Could not restore Conda environment, cannot find {}".format(
self.conda_pre_build_env_path))
output = Argv(
command = Argv(
self.conda,
"create",
"--yes",
@@ -175,7 +177,9 @@ class CondaAPI(PackageManager):
"--prefix",
self.path,
"python={}".format(self.python),
).get_output(stderr=DEVNULL)
)
print('Executing Conda: {}'.format(command.serialize()))
output = command.get_output(stderr=DEVNULL)
match = re.search(
r"\W*(.*activate) ({})".format(re.escape(str(self.path))), output
)
@@ -420,7 +424,7 @@ class CondaAPI(PackageManager):
finally:
PackageManager._selected_manager = self
self.requirements_manager.post_install(self.session)
self.requirements_manager.post_install(self.session, package_manager=self)
def load_requirements(self, requirements):
# if we are in read only mode, do not uninstall anything
@@ -449,10 +453,17 @@ class CondaAPI(PackageManager):
has_torch = False
has_matplotlib = False
has_cudatoolkit = False
cuda_version_full = 0
# noinspection PyBroadException
try:
# notice this is an integer version: 112 (means 11.2)
cuda_version = int(self.session.config.get('agent.cuda_version', 0))
except:
cuda_version = str(self.session.config.get('agent.cuda_version', "")).strip()
if not cuda_version:
cuda_version = 0
else:
cuda_version_full = convert_cuda_version_to_float_single_digit_str(cuda_version)
cuda_version = int(convert_cuda_version_to_int_10_base_str(cuda_version))
except Exception:
cuda_version = 0
# notice 'conda' entry with empty string is a valid conda requirements list, it means pip only
@@ -469,6 +480,7 @@ class CondaAPI(PackageManager):
continue
m = MarkerRequirement(marker[0])
m.validate_local_file_ref()
# conda does not support version control links
if m.vcs:
pip_requirements.append(m)
@@ -512,7 +524,8 @@ class CondaAPI(PackageManager):
reqs.append(m)
if not has_cudatoolkit and cuda_version:
m = MarkerRequirement(Requirement("cudatoolkit == {}".format(float(cuda_version) / 10.0)))
m = MarkerRequirement(Requirement.parse("cudatoolkit == {}".format(cuda_version_full)))
has_cudatoolkit = True
reqs.append(m)
# if we have a conda list, the rest should be installed with pip,
@@ -528,9 +541,9 @@ class CondaAPI(PackageManager):
continue
m = MarkerRequirement(marker[0])
# skip over local files (we cannot change the version to a local file)
if m.local_file:
continue
# remove local files reference if it does not exist (leave the package name)
m.validate_local_file_ref()
m_name = (m.name or '').lower()
if m_name in conda_supported_req_names:
# this package is in the conda list,
@@ -629,7 +642,7 @@ class CondaAPI(PackageManager):
finally:
PackageManager._selected_manager = self
self.requirements_manager.post_install(self.session)
self.requirements_manager.post_install(self.session, package_manager=self)
return True
def _parse_conda_result_bad_packges(self, result_dict):

View File

@@ -46,11 +46,10 @@ class ExternalRequirements(SimpleSubstitution):
post_install_req = self.post_install_req
self.post_install_req = []
for req in post_install_req:
try:
freeze_base = PackageManager.out_of_scope_freeze() or ''
except:
freeze_base = ''
if self.is_already_installed(req):
print("No need to reinstall \'{}\' from VCS, "
"the exact same version is already installed".format(req.name))
continue
req_line = self._add_vcs_credentials(req, session)
# if we have older pip version we have to make sure we replace back the package name with the
@@ -175,5 +174,11 @@ class OnlyExternalRequirements(ExternalRequirements):
# Do not store the skipped requirements
# mark skip package
if super(OnlyExternalRequirements, self).match(req):
if self.is_already_installed(req):
print("No need to reinstall \'{}\' from VCS, "
"the exact same version is already installed".format(req.name))
return Text('')
return self._add_vcs_credentials(req, self._session)
return Text('')

View File

@@ -12,7 +12,7 @@ from ..requirements import RequirementsManager
class VirtualenvPip(SystemPip, PackageManager):
def __init__(self, session, python, requirements_manager, path, interpreter=None, execution_info=None, **kwargs):
# type: (Session, float, RequirementsManager, PathLike, PathLike, ExecutionInfo, Any) -> ()
# type: (Session, str, RequirementsManager, PathLike, PathLike, ExecutionInfo, Any) -> ()
"""
Program interface to virtualenv pip.
Must be given either path to virtualenv or source command.
@@ -39,7 +39,7 @@ class VirtualenvPip(SystemPip, PackageManager):
if isinstance(requirements, dict) and requirements.get("pip"):
requirements["pip"] = self.requirements_manager.replace(requirements["pip"])
super(VirtualenvPip, self).load_requirements(requirements)
self.requirements_manager.post_install(self.session)
self.requirements_manager.post_install(self.session, package_manager=self)
def create_flags(self):
"""

View File

@@ -5,6 +5,7 @@ import attr
import sys
import os
from pathlib2 import Path
from clearml_agent.helper.process import Argv, DEVNULL, check_if_command_exists
from clearml_agent.session import Session, POETRY
@@ -81,6 +82,32 @@ class PoetryConfig:
@_guard_enabled
def initialize(self, cwd=None):
if not self._initialized:
if self.session.config.get("agent.package_manager.poetry_version", None) is not None:
version = str(self.session.config.get("agent.package_manager.poetry_version"))
print('Upgrading Poetry package {}'.format(version))
# first upgrade pip if we need to
try:
from clearml_agent.helper.package.pip_api.venv import VirtualenvPip
pip = VirtualenvPip(
session=self.session, python=self._python,
requirements_manager=None, path=None, interpreter=self._python)
pip.upgrade_pip()
except Exception as ex:
self.log.warning("failed upgrading pip: {}".format(ex))
# now install poetry
try:
version = version.replace(' ', '')
if ('=' in version) or ('~' in version) or ('<' in version) or ('>' in version):
pass  # version already contains an explicit spec (e.g. '>=1.2'), use as-is
elif version:
version = "==" + version
argv = Argv(self._python, "-m", "pip", "install", "poetry{}".format(version),
"--upgrade", "--disable-pip-version-check")
print(argv.get_output())
except Exception as ex:
self.log.warning("failed upgrading poetry: {}".format(ex))
self._initialized = True
try:
self._config("--local", "virtualenvs.in-project", "true", cwd=cwd)

View File

@@ -2,6 +2,7 @@ from __future__ import unicode_literals
import re
import sys
import platform
from furl import furl
import urllib.parse
from operator import itemgetter
@@ -174,36 +175,42 @@ class PytorchRequirement(SimpleSubstitution):
self.log = self._session.get_logger(__name__)
self.package_manager = self.config["agent.package_manager.type"].lower()
self.os = os_name or self.get_platform()
self.cuda = "cuda{}".format(self.cuda_version).lower()
self.python_version_string = str(self.config["agent.default_python"])
self.python_major_minor_str = '.'.join(self.python_version_string.split('.')[:2])
if '.' not in self.python_major_minor_str:
raise PytorchResolutionError(
"invalid python version {!r} defined in configuration file, key 'agent.default_python': "
"must have both major and minor parts of the version (for example: '3.7')".format(
self.python_version_string
)
)
self.python = "python{}".format(self.python_major_minor_str)
self.exceptions = [
PytorchResolutionError(message)
for message in (
None,
'cuda version "{}" is not supported'.format(self.cuda),
'python version "{}" is not supported'.format(
self.python_version_string
),
)
]
try:
self.validate_python_version()
except PytorchResolutionError as e:
self.log.warn("will not be able to install pytorch wheels: %s", e.args[0])
self.cuda = None
self.python_version_string = None
self.python_major_minor_str = None
self.python = None
self.exceptions = []
self._original_req = []
def _init_python_ver_cuda_ver(self):
if self.cuda is None:
self.cuda = "cuda{}".format(self.cuda_version).lower()
if self.python_version_string is None:
self.python_version_string = str(self.config["agent.default_python"])
if self.python_major_minor_str is None:
self.python_major_minor_str = '.'.join(self.python_version_string.split('.')[:2])
if '.' not in self.python_major_minor_str:
raise PytorchResolutionError(
"invalid python version {!r} defined in configuration file, key 'agent.default_python': "
"must have both major and minor parts of the version (for example: '3.7')".format(
self.python_version_string
)
)
if self.python is None:
self.python = "python{}".format(self.python_major_minor_str)
if not self.exceptions:
self.exceptions = [
PytorchResolutionError(message)
for message in (
None,
'cuda version "{}" is not supported'.format(self.cuda),
'python version "{}" is not supported'.format(
self.python_version_string
),
)
]
@property
def is_conda(self):
return self.package_manager == "conda"
@@ -216,6 +223,8 @@ class PytorchRequirement(SimpleSubstitution):
"""
Make sure python version has both major and minor versions as required for choosing pytorch wheel
"""
self._init_python_ver_cuda_ver()
if self.is_pip and not self.python_major_minor_str:
raise PytorchResolutionError(
"invalid python version {!r} defined in configuration file, key 'agent.default_python': "
@@ -237,10 +246,15 @@ class PytorchRequirement(SimpleSubstitution):
return "macos"
raise RuntimeError("unrecognized OS")
@staticmethod
def get_arch():
return str(platform.machine()).lower()
def _get_link_from_torch_page(self, req, torch_url):
links_parser = LinksHTMLParser()
links_parser.feed(requests.get(torch_url, timeout=10).text)
platform_wheel = "win" if self.get_platform() == "windows" else self.get_platform()
arch_wheel = self.get_arch()
py_ver = self.python_major_minor_str.replace('.', '')
url = None
last_v = None
@@ -261,8 +275,11 @@ class PytorchRequirement(SimpleSubstitution):
continue
if len(parts) < 3 or not parts[2].endswith(py_ver):
continue
if len(parts) < 5 or platform_wheel not in parts[4]:
if len(parts) < 5 or platform_wheel not in parts[4].lower():
continue
if len(parts) < 5 or arch_wheel not in parts[4].lower():
continue
# yes this is for linux python 2.7 support, this is the only python 2.7 we support...
if py_ver and py_ver[0] == '2' and len(parts) > 3 and not parts[3].endswith('u'):
continue
@@ -294,6 +311,7 @@ class PytorchRequirement(SimpleSubstitution):
def get_url_for_platform(self, req):
# check if package is already installed with system packages
self.validate_python_version()
# noinspection PyBroadException
try:
if self.config.get("agent.package_manager.system_site_packages", None):

View File

@@ -14,8 +14,11 @@ from pathlib2 import Path
from pyhocon import ConfigTree
import six
import logging
from clearml_agent.definitions import PIP_EXTRA_INDICES
from clearml_agent.helper.base import warning, is_conda, which, join_lines, is_windows_platform
from clearml_agent.helper.base import (
warning, is_conda, which, join_lines, is_windows_platform,
convert_cuda_version_to_int_10_base_str, )
from clearml_agent.helper.process import Argv, PathLike
from clearml_agent.helper.gpu.gpustat import get_driver_cuda_version
from clearml_agent.session import Session, normalize_cuda_version
@@ -153,6 +156,31 @@ class MarkerRequirement(object):
return SimpleVersion.compare_versions(
version_a=requested_version, op=op, version_b=version, num_parts=num_parts)
def remove_local_file_ref(self):
if not self.local_file or self.vcs or self.editable or self.path:
return False
parts = re.split(r"@\s*{}".format(self.req.uri), self.req.line)
# if we did not find anything do nothing
if len(parts) < 2:
return False
self.req.line = ''.join(parts).strip()
self.req.uri = None
self.req.local_file = False
return True
def validate_local_file_ref(self):
# if local file does not exist, remove the reference to it
if self.vcs or self.editable or self.path or not self.local_file or not self.name or \
not self.uri or not self.uri.startswith("file://"):
return
local_path = Path(self.uri[len("file://"):])
if not local_path.exists():
line = self.line
if self.remove_local_file_ref():
# print warning
logging.getLogger(__name__).warning(
'Local file not found [{}], references removed'.format(line))
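
A hedged illustration of the new validation (import paths as used in this file; the wheel path is hypothetical):

```python
from clearml_agent.external.requirements_parser.requirement import Requirement
from clearml_agent.helper.package.requirements import MarkerRequirement

req = MarkerRequirement(Requirement.parse(
    "mypkg @ file:///tmp/build/mypkg-1.0-py3-none-any.whl"))
req.validate_local_file_ref()
# if the wheel no longer exists on disk, a warning is logged and the line is
# reduced to "mypkg", leaving pip to resolve the package from the index
print(req.line)
```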
class SimpleVersion:
_sub_versions_pep440 = ['a', 'b', 'rc', '.post', '.dev', '+', ]
@@ -409,6 +437,7 @@ class RequirementSubstitution(object):
self.config = session.config # type: ConfigTree
self.suffix = '.post{config[agent.cuda_version]}.dev{config[agent.cudnn_version]}'.format(config=self.config)
self.package_manager = self.config['agent.package_manager.type']
self._is_already_installed_cb = None
@abstractmethod
def match(self, req): # type: (MarkerRequirement) -> bool
@@ -424,6 +453,20 @@ class RequirementSubstitution(object):
"""
pass
def set_is_already_installed_cb(self, cb):
self._is_already_installed_cb = cb
def is_already_installed(self, req):
if not self._is_already_installed_cb:
return False
# noinspection PyBroadException
try:
return self._is_already_installed_cb(req)
except BaseException as ex:
# debug: could not determine whether the package is already installed
print("Warning: Requirements post install callback exception (check if package installed): {}".format(ex))
return False
def post_scan_add_req(self): # type: () -> Optional[MarkerRequirement]
"""
Allows the RequirementSubstitution to add an extra line/requirements after
@@ -448,7 +491,7 @@ class RequirementSubstitution(object):
@property
def cuda_version(self):
return self.config['agent.cuda_version']
return convert_cuda_version_to_int_10_base_str(self.config['agent.cuda_version'])
@property
def cudnn_version(self):
@@ -534,6 +577,7 @@ class RequirementsManager(object):
cache_dir=pip_cache_dir.as_posix())
self._base_interpreter = base_interpreter
self._cwd = None
self._installed_parsed_packages = set()
def register(self, cls): # type: (Type[RequirementSubstitution]) -> None
self.handlers.append(cls(self._session))
@@ -591,7 +635,9 @@ class RequirementsManager(object):
return join_lines(result)
def post_install(self, session):
def post_install(self, session, package_manager=None):
if package_manager:
self.update_installed_packages_state(package_manager.freeze())
for h in self.handlers:
try:
h.post_install(session)
@@ -613,6 +659,34 @@ class RequirementsManager(object):
def get_interpreter(self):
return self._base_interpreter
def update_installed_packages_state(self, requirements):
"""
Updates internal Installed Packages objects, so that later we can detect
if we already have a pre-installed package
:param requirements: the output of a freeze() call, i.e. a dict such as {'pip': "package==version"}
"""
requirements = requirements if not isinstance(requirements, dict) else requirements.get("pip")
self._installed_parsed_packages = self.parse_requirements_section_to_marker_requirements(
requirements=requirements, cwd=self._cwd)
for h in self.handlers:
h.set_is_already_installed_cb(self._callback_is_already_installed)
def _callback_is_already_installed(self, req):
for p in (self._installed_parsed_packages or []):
if p.name != req.name:
continue
# if this is a version-control package, only return True if both the installed and requested packages specify a commit ID
if req.vcs:
return p.vcs and req.revision and req.revision == p.revision
if not req.specs and not p.specs:
return True
# return if this is the same version
return req.specs and p.specs and req.compare_version(p, op="==")
return False
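
A hedged sketch of how this freeze-state check is wired up, assuming `manager` is a RequirementsManager and `pip` a package manager exposing freeze() (imports as in the previous sketch):

```python
# refresh the installed-state snapshot from the package manager
manager.update_installed_packages_state(pip.freeze())  # e.g. {'pip': ['torch==1.10.0']}

req = MarkerRequirement(Requirement.parse("torch==1.10.0"))
# True: same name and equal version; a VCS requirement would additionally
# require matching commit IDs on both the installed and requested sides
print(manager._callback_is_already_installed(req))
```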
@staticmethod
def get_cuda_version(config): # type: (ConfigTree) -> (Text, Text)
# we assume os.environ already updated the config['agent.cuda_version'] & config['agent.cudnn_version']
@@ -698,12 +772,17 @@ class RequirementsManager(object):
except Exception as ex:
return [Requirement(req_str)]
def create_req(x):
r = MarkerRequirement(x)
r.validate_local_file_ref()
return r
if not requirements:
return tuple()
parsed_requirements = tuple(
map(
MarkerRequirement,
create_req,
[r for line in (requirements.splitlines() if isinstance(requirements, str) else requirements)
for r in safe_parse(line)]
)

View File

@@ -108,7 +108,7 @@ class VCS(object):
)
self.url = url
self.location = Text(location)
self.revision = revision
self._revision = revision
self.log = self.session.get_logger(__name__)
@property
@@ -390,7 +390,7 @@ class VCS(object):
"""
Checkout repository at specified revision
"""
self.call("checkout", self.revision, *self.checkout_flags, cwd=self.location)
self.call("checkout", self._revision, *self.checkout_flags, cwd=self.location)
@abc.abstractmethod
def pull(self):
@@ -519,7 +519,7 @@ class VCS(object):
class Git(VCS):
executable_name = "git"
main_branch = "master"
main_branch = ("master", "main")
clone_flags = ("--quiet", "--recursive")
checkout_flags = ("--force",)
COMMAND_ENV = {
@@ -531,7 +531,9 @@ class Git(VCS):
@staticmethod
def remote_branch_name(branch):
return "origin/{}".format(branch)
return [
"origin/{}".format(b) for b in ([branch] if isinstance(branch, str) else branch)
]
def executable_not_found_error_help(self):
return 'Cannot find "{}" executable. {}'.format(
@@ -553,7 +555,15 @@ class Git(VCS):
"""
Checkout repository at specified revision
"""
self.call("checkout", self.revision, *self.checkout_flags, cwd=self.location)
revisions = [self._revision] if isinstance(self._revision, str) else self._revision
for i, revision in enumerate(revisions):
try:
self.call("checkout", revision, *self.checkout_flags, cwd=self.location)
break
except subprocess.CalledProcessError:
if i == len(revisions) - 1:
raise
try:
self.call("submodule", "update", "--recursive", cwd=self.location)
except: # noqa
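
The fallback loop above generalizes to any ordered list of candidate revisions; a minimal standalone sketch of the same pattern (hypothetical function name):

```python
import subprocess

def checkout_first_available(git_call, revisions):
    # try each candidate revision in order; re-raise only if all of them fail
    for i, revision in enumerate(revisions):
        try:
            git_call("checkout", revision)
            return revision
        except subprocess.CalledProcessError:
            if i == len(revisions) - 1:
                raise

# e.g. checkout_first_available(call, ["master", "main"]) checks out "main"
# when the repository has no "master" branch.
```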
@@ -593,7 +603,7 @@ class Hg(VCS):
"pull",
self.url_with_auth,
cwd=self.location,
*(("-r", self.revision) if self.revision else ())
*(("-r", self._revision) if self._revision else ())
)
info_commands = dict(
@@ -663,7 +673,9 @@ def clone_repository_cached(session, execution, destination):
vcs.pull()
rm_tree(destination)
shutil.copytree(Text(cached_repo_path), Text(clone_folder))
shutil.copytree(Text(cached_repo_path), Text(clone_folder),
symlinks=select_for_platform(linux=True, windows=False),
ignore_dangling_symlinks=True)
if not clone_folder.is_dir():
raise CommandFailedError(
"copying of repository failed: from {} to {}".format(
@@ -671,9 +683,9 @@ def clone_repository_cached(session, execution, destination):
)
)
# checkout in the newly copied destination
vcs.location = Text(clone_folder)
vcs.checkout()
# checkout in the newly copied destination
vcs.location = Text(clone_folder)
vcs.checkout()
repo_info = vcs.get_repository_copy_info(clone_folder)

View File

@@ -99,8 +99,10 @@ DAEMON_ARGS = dict({
'aliases': ['-d'],
},
'--stop': {
'help': 'Stop the running agent (based on the same set of arguments)',
'action': 'store_true',
'help': 'Stop the running agent (based on the same set of arguments). '
'Optional: provide a list of specific local worker IDs to stop',
'nargs': '*',
'default': False,
},
'--dynamic-gpus': {
'help': 'Allow to dynamically allocate gpus based on queue properties, '
@@ -165,7 +167,7 @@ COMMANDS = {
},
'--docker': {
'help': 'Run execution task inside a docker (v19.03 and above). Optional args <image> <arguments> or '
'specify default docker image in agent.default_docker.image / agent.default_docker.arguments'
'specify default docker image in agent.default_docker.image / agent.default_docker.arguments '
'use --gpus/--cpu-only (or set NVIDIA_VISIBLE_DEVICES) to limit gpu visibility for docker',
'nargs': '*',
'default': False,
@@ -199,11 +201,18 @@ COMMANDS = {
},
'--docker': {
'help': 'Build the experiment inside a docker (v19.03 and above). Optional args <image> <arguments> or '
'specify default docker image in agent.default_docker.image / agent.default_docker.arguments'
'specify default docker image in agent.default_docker.image / agent.default_docker.arguments '
'use --gpus/--cpu-only (or set NVIDIA_VISIBLE_DEVICES) to limit gpu visibility for docker',
'nargs': '*',
'default': False,
},
'--force-docker': {
'help': 'Force using the agent-specified docker image (either explicitly in the --docker argument or '
'using the agent\'s default docker image). If provided, the agent will not use any docker '
'container information stored on the task itself (default False)',
'default': False,
'action': 'store_true',
},
'--python-version': {
'help': 'Virtual environment python version to use',
},

View File

@@ -1 +1 @@
__version__ = '1.2.0rc0'
__version__ = '1.2.1'

View File

@@ -0,0 +1,75 @@
ARG TAG=3.7.12-alpine3.15
FROM python:${TAG} as build
RUN apk add --no-cache \
gcc \
musl-dev \
libffi-dev
RUN python3 \
-m pip \
install \
--prefix=/install \
--no-cache-dir \
-U \
clearml-agent \
"cryptography>=2.9"
FROM python:${TAG} as target
WORKDIR /app
ARG KUBECTL_VERSION=1.22.4
# Not sure about these ENV vars
# ENV LC_ALL=en_US.UTF-8
# ENV LANG=en_US.UTF-8
# ENV LANGUAGE=en_US.UTF-8
# ENV PYTHONIOENCODING=UTF-8
COPY --from=build /install /usr/local
ADD https://storage.googleapis.com/kubernetes-release/release/v${KUBECTL_VERSION}/bin/linux/amd64/kubectl /usr/bin/
RUN chmod +x /usr/bin/kubectl
RUN apk add --no-cache \
bash
COPY k8s_glue_example.py .
# AWS CLI
# https://github.com/kyleknap/aws-cli/blob/source-proposal/proposals/source-install.md#alpine-linux
# https://github.com/aws/aws-cli/issues/4685
# https://github.com/aws/aws-cli/pull/6352
# https://github.com/GoogleCloudPlatform/cloud-sdk-docker/blob/master/alpine/Dockerfile
FROM target as gcp
ARG CLOUD_SDK_VERSION=371.0.0
ENV CLOUD_SDK_VERSION=$CLOUD_SDK_VERSION
ENV PATH /google-cloud-sdk/bin:$PATH
WORKDIR /
RUN apk --no-cache add \
curl \
python3 \
py3-crcmod \
py3-openssl \
bash \
libc6-compat \
openssh-client \
git \
gnupg \
&& curl -O https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-sdk-${CLOUD_SDK_VERSION}-linux-x86_64.tar.gz && \
tar xzf google-cloud-sdk-${CLOUD_SDK_VERSION}-linux-x86_64.tar.gz && \
rm google-cloud-sdk-${CLOUD_SDK_VERSION}-linux-x86_64.tar.gz && \
gcloud config set core/disable_usage_reporting true && \
gcloud config set component_manager/disable_update_check true && \
gcloud config set metrics/environment github_docker_image && \
gcloud --version
WORKDIR /app

View File

@@ -0,0 +1,82 @@
ARG TAG=3.7.12-slim-bullseye
FROM python:${TAG} as target
ARG KUBECTL_VERSION=1.22.4
WORKDIR /app
RUN python3 \
-m pip \
install \
--no-cache-dir \
-U \
clearml-agent \
"cryptography>=2.9"
# Not sure about these ENV vars
# ENV LC_ALL=en_US.UTF-8
# ENV LANG=en_US.UTF-8
# ENV LANGUAGE=en_US.UTF-8
# ENV PYTHONIOENCODING=UTF-8
ADD https://storage.googleapis.com/kubernetes-release/release/v${KUBECTL_VERSION}/bin/linux/amd64/kubectl /usr/bin/
RUN chmod +x /usr/bin/kubectl
COPY k8s_glue_example.py .
CMD ["python3", "k8s_glue_example.py"]
FROM target as aws
# https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html
# https://docs.aws.amazon.com/eks/latest/userguide/install-aws-iam-authenticator.html
RUN apt-get update -qqy && \
apt-get install -qqy \
unzip && \
rm -rf /var/lib/apt/lists/*
ADD https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip awscliv2.zip
ADD https://amazon-eks.s3.us-west-2.amazonaws.com/1.21.2/2021-07-05/bin/linux/amd64/aws-iam-authenticator /usr/local/bin/aws-iam-authenticator
RUN unzip awscliv2.zip && \
./aws/install && \
rm -r awscliv2.zip aws/ && \
chmod +x /usr/local/bin/aws-iam-authenticator && \
aws --version && \
aws-iam-authenticator version
# https://github.com/GoogleCloudPlatform/cloud-sdk-docker/blob/master/debian_slim/Dockerfile
FROM target as gcp
ARG CLOUD_SDK_VERSION=371.0.0
ENV CLOUD_SDK_VERSION=$CLOUD_SDK_VERSION
ENV PATH "$PATH:/opt/google-cloud-sdk/bin/"
ARG INSTALL_COMPONENTS
RUN mkdir -p /usr/share/man/man1/
RUN apt-get update -qqy && \
apt-get install -qqy \
curl \
gcc \
python3-dev \
python3-pip \
apt-transport-https \
lsb-release \
openssh-client \
git \
gnupg && \
rm -rf /var/lib/apt/lists/* && \
pip3 install -U crcmod && \
export CLOUD_SDK_REPO="cloud-sdk-$(lsb_release -c -s)" && \
echo "deb https://packages.cloud.google.com/apt $CLOUD_SDK_REPO main" > /etc/apt/sources.list.d/google-cloud-sdk.list && \
curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add - && \
apt-get update && apt-get install -y google-cloud-sdk=${CLOUD_SDK_VERSION}-0 $INSTALL_COMPONENTS && \
gcloud config set core/disable_usage_reporting true && \
gcloud config set component_manager/disable_update_check true && \
gcloud config set metrics/environment github_docker_image && \
gcloud --version

View File

@@ -0,0 +1,94 @@
"""
This example assumes you have preconfigured services with selectors in the form of
"ai.allegro.agent.serial=pod-<number>" and a targetPort of 10022.
The K8sIntegration component will label each pod accordingly.
"""
from argparse import ArgumentParser
from clearml_agent.glue.k8s import K8sIntegration
def parse_args():
parser = ArgumentParser()
group = parser.add_mutually_exclusive_group()
parser.add_argument(
"--queue", type=str, help="Queue to pull tasks from"
)
group.add_argument(
"--ports-mode", action='store_true', default=False,
help="Ports-Mode will add a label to the pod which can be used as service, in order to expose ports"
"Should not be used with max-pods"
)
parser.add_argument(
"--num-of-services", type=int, default=20,
help="Specify the number of k8s services to be used. Use only with ports-mode."
)
parser.add_argument(
"--base-port", type=int,
help="Used in conjunction with ports-mode, specifies the base port exposed by the services. "
"For pod #X, the port will be <base-port>+X. Note that pod number is calculated based on base-pod-num"
"e.g. if base-port=20000 and base-pod-num=3, the port for the first pod will be 20003"
)
parser.add_argument(
"--base-pod-num", type=int, default=1,
help="Used in conjunction with ports-mode and base-port, specifies the base pod number to be used by the "
"service (default: %(default)s)"
)
parser.add_argument(
"--gateway-address", type=str, default=None,
help="Used in conjunction with ports-mode, specify the external address of the k8s ingress / ELB"
)
parser.add_argument(
"--pod-clearml-conf", type=str,
help="Configuration file to be used by the pod itself (if not provided, current configuration is used)"
)
parser.add_argument(
"--overrides-yaml", type=str,
help="YAML file containing pod overrides to be used when launching a new pod"
)
parser.add_argument(
"--template-yaml", type=str,
help="YAML file containing pod template. If provided pod will be scheduled with kubectl apply "
"and overrides are ignored, otherwise it will be scheduled with kubectl run"
)
parser.add_argument(
"--ssh-server-port", type=int, default=0,
help="If non-zero, every pod will also start an SSH server on the selected port (default: zero, not active)"
)
parser.add_argument(
"--namespace", type=str,
help="Specify the namespace in which pods will be created (default: %(default)s)", default="clearml"
)
group.add_argument(
"--max-pods", type=int,
help="Limit the maximum number of pods that this service can run at the same time."
"Should not be used with ports-mode"
)
return parser.parse_args()
def main():
args = parse_args()
user_props_cb = None
if args.ports_mode and args.base_port:
def k8s_user_props_cb(pod_number=0):
user_prop = {"k8s-pod-port": args.base_port + pod_number}
if args.gateway_address:
user_prop["k8s-gateway-address"] = args.gateway_address
return user_prop
user_props_cb = k8s_user_props_cb
k8s = K8sIntegration(
ports_mode=args.ports_mode, num_of_services=args.num_of_services, base_pod_num=args.base_pod_num,
user_props_cb=user_props_cb, overrides_yaml=args.overrides_yaml, clearml_conf_file=args.pod_clearml_conf,
template_yaml=args.template_yaml, extra_bash_init_script=K8sIntegration.get_ssh_server_bash(
ssh_port_number=args.ssh_server_port) if args.ssh_server_port else None,
namespace=args.namespace, max_pods_limit=args.max_pods or None,
)
k8s.k8s_daemon(args.queue)
if __name__ == "__main__":
main()

View File

@@ -4,7 +4,7 @@ api {
web_server: https://demoapp.demo.clear.ml
files_server: https://demofiles.demo.clear.ml
# Credentials are generated in the webapp, https://demoapp.demo.clear.ml/profile
# Credentials are generated in the webapp, https://app.clear.ml/settings/workspace-configuration
# Overridden with os environment: CLEARML_API_ACCESS_KEY / CLEARML_API_SECRET_KEY
credentials {"access_key": "EGRTCO8JMSIGI6S39GTP43NFWXDQOW", "secret_key": "x!XTov_G-#vspE*Y(h$Anm&DIc5Ou-F)jsl$PdOyj5wG1&E!Z8"}
@@ -15,6 +15,11 @@ api {
agent {
# Set GIT user/pass credentials (if user/pass are set, GIT protocol will be set to https)
# leave blank for GIT SSH credentials (set force_git_ssh_protocol=true to force SSH protocol)
# **Notice**: a GitHub personal token is equivalent to a password; you can put it directly into `git_pass`
# To learn how to generate a git token on GitHub/Bitbucket/GitLab:
# https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/creating-a-personal-access-token
# https://support.atlassian.com/bitbucket-cloud/docs/app-passwords/
# https://docs.gitlab.com/ee/user/profile/personal_access_tokens.html
git_user=""
git_pass=""
# Limit credentials to a single domain, for example: github.com,
@@ -60,6 +65,8 @@ agent {
# specify pip version to use (examples "<20", "==19.3.1", "", empty string will install the latest version)
# pip_version: "<20"
# specify poetry version to use (examples "<2", "==1.1.1", "", empty string will install the latest version)
# poetry_version: "<2",
# virtual environment inherits packages from system
system_site_packages: false,
@@ -226,6 +233,7 @@ agent {
hide_docker_command_env_vars {
enabled: true
extra_keys: []
parse_embedded_urls: true
}
# allow to set internal mount points inside the docker,

View File

@@ -11,7 +11,7 @@ python-dateutil>=2.4.2,<2.9.0
pyjwt>=1.6.4,<2.1.0
PyYAML>=3.12,<5.5.0
requests>=2.20.0,<2.26.0
six>=1.11.0,<1.16.0
typing>=3.6.4,<3.8.0
six>=1.13.0,<1.16.0
typing>=3.6.4,<3.8.0 ; python_version < '3.5'
urllib3>=1.21.1,<1.27.0
virtualenv>=16,<21