mirror of
https://github.com/clearml/clearml-agent
synced 2025-06-26 18:16:15 +00:00
Compare commits
22 Commits
| SHA1 |
|---|
| 3fe92a92ba |
| 154db59ce6 |
| afffa83063 |
| 787c7d88bb |
| 667c2ced3d |
| 7f5b3c8df4 |
| 46ded2864d |
| 40456be948 |
| 8d51aed679 |
| bfc4ba38cd |
| 3cedc104df |
| b367c80477 |
| 262b6d3a00 |
| 95e996bfda |
| b6d132b226 |
| 4f17a2c17d |
| 00e8e9eb5a |
| af6a77918f |
| 855622fd30 |
| 8cd12810f3 |
| ebb955187d |
| 85e1fadf9b |
@@ -69,8 +69,9 @@
     pip_version: ["<20.2 ; python_version < '3.10'", "<22.3 ; python_version >= '3.10'"],
     # specify poetry version to use (examples "<2", "==1.1.1", "", empty string will install the latest version)
     # poetry_version: "<2",
+    # poetry_install_extra_args: ["-v"]

-    # virtual environment inheres packages from system
+    # virtual environment inherits packages from system
     system_site_packages: false,

     # install with --upgrade
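The two pip_version entries above select a pin per interpreter using PEP 508 environment markers. A quick way to sanity-check such a marker outside the agent, a sketch using the packaging library (not part of clearml-agent):

```python
from packaging.markers import Marker

# the marker guarding the first pin above
marker = Marker("python_version < '3.10'")
print(marker.evaluate({"python_version": "3.9"}))   # True  -> pip "<20.2" applies
print(marker.evaluate({"python_version": "3.10"}))  # False -> pip "<22.3" applies
```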
@@ -105,6 +106,10 @@
     # set to True to support torch nightly build installation,
     # notice: torch nightly builds are ephemeral and are deleted from time to time
     torch_nightly: false,
+
+    # if set to true, the agent will look for the "poetry.lock" file
+    # in the passed current working directory instead of the repository's root directory.
+    poetry_files_from_repo_working_dir: false
 },

 # target folder for virtual environments builds, created when executing experiment
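The new poetry_files_from_repo_working_dir flag changes where the agent looks for poetry.lock / pyproject.toml. A minimal sketch of the resulting lookup, mirroring the lockfile_path expression added to the worker further down (the helper name is hypothetical, the worker inlines this expression):

```python
from pathlib import Path

def poetry_lookup_root(repo_root, working_dir, files_from_working_dir):
    # empty path components are dropped by pathlib, so False -> repo root
    return Path(repo_root) / ((working_dir or "") if files_from_working_dir else "")

print(poetry_lookup_root("/repo", "services/train", True))   # /repo/services/train
print(poetry_lookup_root("/repo", "services/train", False))  # /repo
```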
@@ -335,6 +340,49 @@
     # Disable task docker override. If true, the agent will use the default docker image and ignore any docker image
     # and arguments specified in the task's container section (setup shell script from the task container section will
-    # be used in any case, of specified).
+    # be used in any case, if specified).
     disable_task_docker_override: false
+
+    # Choose the default docker based on the Task properties,
+    # Examples: 'script.requirements', 'script.binary', 'script.repository', 'script.branch', 'project'
+    # Notice: Matching is done via regular expression, for example "^searchme$" will match exactly the "searchme" string
+    #
+    # "default_docker": {
+    #     "image": "nvidia/cuda:10.2-cudnn7-runtime-ubuntu18.04",
+    #     # optional arguments to pass to docker image
+    #     # arguments: ["--ipc=host", ]
+    #     "match_rules": [
+    #         {
+    #             "image": "sample_container:tag",
+    #             "arguments": "-e VALUE=1 --ipc=host",
+    #             "match": {
+    #                 "script": {
+    #                     "requirements": {
+    #                         "pip": {
+    #                             "tensorflow": "~=1.6"
+    #                         }
+    #                     },
+    #                     "repository": "",
+    #                     "branch": "master"
+    #                 },
+    #                 "project": "example"
+    #             }
+    #         },
+    #         {
+    #             "image": "better_container:tag",
+    #             "arguments": "",
+    #             "match": {
+    #                 "container": "replace_me_please"
+    #             }
+    #         },
+    #         {
+    #             "image": "another_container:tag",
+    #             "arguments": "",
+    #             "match": {
+    #                 "project": "^examples",  # anything that starts with "examples", e.g. "examples", "examples/sub_project"
+    #             }
+    #         }
+    #     ]
+    # },
+    #
 }
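Since matching is regular-expression based, a rule applies when every pattern in its "match" section finds a match in the corresponding task property. A toy illustration of that evaluation (flat fields only and a hypothetical helper; the real config nests fields such as script.requirements.pip):

```python
import re

rule = {"image": "another_container:tag", "match": {"project": "^examples"}}
task = {"project": "examples/sub_project"}

def rule_matches(rule, task):
    # every pattern in "match" must hit its task field
    return all(
        re.search(pattern, str(task.get(field, "")))
        for field, pattern in rule["match"].items()
    )

print(rule_matches(rule, task))  # True -> this task would get "another_container:tag"
```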
@@ -2,13 +2,16 @@
 import json as json_lib
 import os
 import sys
+import time
 import types
+from random import SystemRandom
 from socket import gethostname
 from typing import Optional

 import jwt
 import requests
 import six
+from requests import RequestException
 from requests.auth import HTTPBasicAuth
 from six.moves.urllib.parse import urlparse, urlunparse
@@ -26,6 +29,9 @@ from ...backend_config.environment import backward_compatibility_support
 from ...version import __version__


+sys_random = SystemRandom()
+
+
 class LoginError(Exception):
     pass
@@ -49,6 +55,7 @@ class Session(TokenManager):
     _session_initial_retry_connect_override = 4
     _write_session_data_size = 15000
     _write_session_timeout = (30.0, 30.)
+    _request_exception_retry_timeout = (2.0, 3.0)

     api_version = '2.1'
     feature_set = 'basic'
@@ -111,19 +118,9 @@ class Session(TokenManager):
         self._verbose = verbose if verbose is not None else ENV_VERBOSE.get()
         self._logger = logger
         self.__auth_token = None
+        self._propagate_exceptions_on_send = True

-        if ENV_API_DEFAULT_REQ_METHOD.get(default=None):
-            # Make sure we update the config object, so we pass it into the new containers when we map them
-            self.config.put("api.http.default_method", ENV_API_DEFAULT_REQ_METHOD.get())
-            # notice the default setting of Request.def_method are already set by the OS environment
-        elif self.config.get("api.http.default_method", None):
-            def_method = str(self.config.get("api.http.default_method", None)).strip()
-            if def_method.upper() not in ("GET", "POST", "PUT"):
-                raise ValueError(
-                    "api.http.default_method variable must be 'get' or 'post' (any case is allowed)."
-                )
-            Request.def_method = def_method
-            Request._method = Request.def_method
+        self.update_default_api_method()

         if ENV_AUTH_TOKEN.get(
             value_cb=lambda key, value: print("Using environment access token {}=********".format(key))
@@ -178,6 +175,10 @@ class Session(TokenManager):
         )
         # try to connect with the server
         self.refresh_token()

+        # for resilience, from now on we won't allow propagating exceptions when sending requests
+        self._propagate_exceptions_on_send = False
+
         # create the default session with many retries
         http_retries_config, self.__http_session = self._setup_session(http_retries_config)
@@ -223,7 +224,22 @@ class Session(TokenManager):

         return http_retries_config, get_http_session_with_retry(config=self.config or None, **http_retries_config)

+    def update_default_api_method(self):
+        if ENV_API_DEFAULT_REQ_METHOD.get(default=None):
+            # Make sure we update the config object, so we pass it into the new containers when we map them
+            self.config.put("api.http.default_method", ENV_API_DEFAULT_REQ_METHOD.get())
+            # notice the default setting of Request.def_method are already set by the OS environment
+        elif self.config.get("api.http.default_method", None):
+            def_method = str(self.config.get("api.http.default_method", None)).strip()
+            if def_method.upper() not in ("GET", "POST", "PUT"):
+                raise ValueError(
+                    "api.http.default_method variable must be 'get', 'post' or 'put' (any case is allowed)."
+                )
+            Request.def_method = def_method
+            Request._method = Request.def_method
+
     def load_vaults(self):
         # () -> Optional[bool]
         if not self.check_min_api_version("2.15") or self.feature_set == "basic":
             return
@@ -244,12 +260,14 @@ class Session(TokenManager):

         # noinspection PyBroadException
         try:
-            res = self.send_request("users", "get_vaults", json={"enabled": True, "types": ["config"]})
+            # Use params and not data/json otherwise payload might be dropped if we're using GET with a strict firewall
+            res = self.send_request("users", "get_vaults", params="enabled=true&types=config&types=config")
             if res.ok:
                 vaults = res.json().get("data", {}).get("vaults", [])
                 data = list(filter(None, map(parse, vaults)))
                 if data:
                     self.config.set_overrides(*data)
                 return True
             elif res.status_code != 404:
                 raise Exception(res.json().get("meta", {}).get("result_msg", res.text))
         except Exception as ex:
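The comment in the hunk explains the switch: a strict firewall may drop a GET request that carries a body, so the vault query now travels in the URL instead. What the resulting request URL looks like can be checked with requests alone (hypothetical host, a sketch only):

```python
from requests.models import PreparedRequest

req = PreparedRequest()
req.prepare_url("https://api.example.com/users.get_vaults", "enabled=true&types=config&types=config")
print(req.url)
# https://api.example.com/users.get_vaults?enabled=true&types=config&types=config
```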
@@ -272,6 +290,7 @@ class Session(TokenManager):
         data=None,
         json=None,
         refresh_token_if_unauthorized=True,
+        params=None,
     ):
         """ Internal implementation for making a raw API request.
         - Constructs the api endpoint name
@@ -295,6 +314,7 @@ class Session(TokenManager):
             if version
             else "{host}/{service}.{action}"
         ).format(**locals())

         while True:
             if data and len(data) > self._write_session_data_size:
                 timeout = self._write_session_timeout
@@ -302,16 +322,29 @@ class Session(TokenManager):
                 timeout = self._session_initial_timeout
             else:
                 timeout = self._session_timeout
-            res = self.__http_session.request(
-                method, url, headers=headers, auth=auth, data=data, json=json, timeout=timeout)
+
+            try:
+                res = self.__http_session.request(
+                    method, url, headers=headers, auth=auth, data=data, json=json, timeout=timeout, params=params)
+            except RequestException as ex:
+                if self._propagate_exceptions_on_send:
+                    raise
+                sleep_time = sys_random.uniform(*self._request_exception_retry_timeout)
+                self._logger.error(
+                    "{} exception sending {} {}: {} (retrying in {:.1f}sec)".format(
+                        type(ex).__name__, method.upper(), url, str(ex), sleep_time
+                    )
+                )
+                time.sleep(sleep_time)
+                continue

             if (
                 refresh_token_if_unauthorized
                 and res.status_code == requests.codes.unauthorized
                 and not token_refreshed_on_error
             ):
-                # it seems we're unauthorized, so we'll try to refresh our token once in case permissions changed since
-                # the last time we got the token, and try again
+                # it seems we're unauthorized, so we'll try to refresh our token once in case permissions changed
+                # since the last time we got the token, and try again
                 self.refresh_token()
                 token_refreshed_on_error = True
                 # try again
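The pattern added here: swallow the transport exception (unless propagation is enabled), sleep a small random interval drawn from _request_exception_retry_timeout, and retry. Distilled to a self-contained sketch, bounded to a few attempts for illustration (the agent loops indefinitely):

```python
import random
import time

retry_timeout = (2.0, 3.0)  # mirrors _request_exception_retry_timeout
rng = random.SystemRandom()

def flaky_request(state={"calls": 0}):
    # stand-in for the HTTP call; fails twice, then succeeds
    state["calls"] += 1
    if state["calls"] < 3:
        raise IOError("connection reset")
    return "ok"

for _ in range(10):
    try:
        print(flaky_request())
        break
    except IOError as ex:
        delay = rng.uniform(*retry_timeout)
        print("{} (retrying in {:.1f}sec)".format(ex, delay))
        time.sleep(delay)
```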
@@ -348,6 +381,7 @@ class Session(TokenManager):
         data=None,
         json=None,
         async_enable=False,
+        params=None,
     ):
         """
         Send a raw API request.
@@ -360,6 +394,7 @@ class Session(TokenManager):
            content type will be application/json)
         :param data: Dictionary, bytes, or file-like object to send in the request body
         :param async_enable: whether request is asynchronous
+        :param params: additional query parameters
         :return: requests Response instance
         """
         headers = self.add_auth_headers(
@@ -376,6 +411,7 @@ class Session(TokenManager):
             headers=headers,
             data=data,
             json=json,
+            params=params,
         )

     def send_request_batch(
@@ -628,15 +664,14 @@ class Session(TokenManager):

         res = None
         try:
-            data = {"expiration_sec": exp} if exp else {}
             res = self._send_request(
                 method=Request.def_method,
                 service="auth",
                 action="login",
                 auth=auth,
-                json=data,
                 headers=headers,
                 refresh_token_if_unauthorized=False,
+                params={"expiration_sec": exp} if exp else {},
             )
             try:
                 resp = res.json()
@@ -675,3 +710,13 @@ class Session(TokenManager):
         return "{self.__class__.__name__}[{self.host}, {self.access_key}/{secret_key}]".format(
             self=self, secret_key=self.secret_key[:5] + "*" * (len(self.secret_key) - 5)
         )
+
+    @property
+    def propagate_exceptions_on_send(self):
+        # type: () -> bool
+        return self._propagate_exceptions_on_send
+
+    @propagate_exceptions_on_send.setter
+    def propagate_exceptions_on_send(self, value):
+        # type: (bool) -> None
+        self._propagate_exceptions_on_send = value
@@ -118,13 +118,15 @@ class ServiceCommandSection(BaseCommandSection):
         """ The name of the REST service used by this command """
         pass

-    def get(self, endpoint, *args, session=None, **kwargs):
+    def get(self, endpoint, *args, service=None, session=None, **kwargs):
         session = session or self._session
-        return session.get(service=self.service, action=endpoint, *args, **kwargs)
+        service = service or self.service
+        return session.get(service=service, action=endpoint, *args, **kwargs)

-    def post(self, endpoint, *args, session=None, **kwargs):
+    def post(self, endpoint, *args, service=None, session=None, **kwargs):
         session = session or self._session
-        return session.post(service=self.service, action=endpoint, *args, **kwargs)
+        service = service or self.service
+        return session.post(service=service, action=endpoint, *args, **kwargs)

     def get_with_act_as(self, endpoint, *args, **kwargs):
         return self._session.get_with_act_as(service=self.service, action=endpoint, *args, **kwargs)
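With the new optional service argument, a command section can call an endpoint on a different REST service without touching the session directly. A hedged illustration (names assumed, not actual agent call sites):

```python
# Inside a ServiceCommandSection subclass whose self.service is "workers":
#
#   self.get("get_all")                    # -> workers.get_all (unchanged behaviour)
#   self.get("get_all", service="queues")  # -> queues.get_all  (new per-call override)
```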
@@ -1,6 +1,7 @@
 from __future__ import print_function, division, unicode_literals

 import errno
+import functools
 import json
 import logging
 import os
@@ -69,6 +70,7 @@ from clearml_agent.definitions import (
     ENV_CHILD_AGENTS_COUNT_CMD,
     ENV_DOCKER_ARGS_FILTERS,
     ENV_FORCE_SYSTEM_SITE_PACKAGES,
+    ENV_SERVICES_DOCKER_RESTART,
 )
 from clearml_agent.definitions import WORKING_REPOSITORY_DIR, PIP_EXTRA_INDICES
 from clearml_agent.errors import (
@@ -734,8 +736,68 @@ class Worker(ServiceCommandSection):
         except Exception:
             pass

+    def _get_docker_restart_value(self, task_session, task_id: str):
+        try:
+            self._session.verify_feature_set('advanced')
+        except ValueError:
+            return
+
+        restart = (ENV_SERVICES_DOCKER_RESTART.get() or "").strip()
+        if not restart:
+            return
+
+        # Parse value and selector
+        restart_value, _, selector = restart.partition(";")
+
+        if restart_value not in ("unless-stopped", "no", "always") and not restart_value.startswith("on-failure"):
+            self.log.error(
+                "Invalid value \"{}\" provided for {}, ignoring".format(restart, ENV_SERVICES_DOCKER_RESTART.vars[0])
+            )
+            return
+
+        if not selector:
+            return restart_value
+
+        path, _, expected_value = selector.partition("=")
+
+        result = task_session.send_request(
+            service='tasks',
+            action='get_all',
+            json={'id': [task_id], 'only_fields': [path], 'search_hidden': True},
+            method=Request.def_method,
+        )
+        if not result.ok:
+            result_msg = self._get_path(result.json(), 'meta', 'result_msg')
+            self.log.error(
+                "Failed obtaining selector value for restart option \"{}\", ignoring: {}".format(selector, result_msg)
+            )
+            return
+
+        not_found = object()
+        try:
+            value = self._get_path(result.json(), 'data', 'tasks', 0, *path.split("."), default=not_found)
+        except (ValueError, TypeError):
+            return
+
+        if value is not_found:
+            return
+
+        if not expected_value:
+            return restart_value
+
+        # noinspection PyBroadException
+        try:
+            if (
+                (isinstance(value, bool) and value == strtobool(expected_value))  # check first - bool is also an int
+                or (isinstance(value, (int, float)) and value == float(expected_value))
+                or (str(value) == str(expected_value))
+            ):
+                return restart_value
+        except Exception:
+            pass
+
     def run_one_task(self, queue, task_id, worker_args, docker=None, task_session=None):
-        # type: (Text, Text, WorkerParams, Optional[Text], Optional[Session]) -> int
+        # type: (Text, Text, WorkerParams, Optional[Text], Optional[Session]) -> Optional[int]
         """
         Run one task pulled from queue.
         :param queue: ID of queue that task was pulled from
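The selector grammar handled above is easiest to see on a concrete value, using the same partition calls as in the diff (example value taken from the definitions docstring further down):

```python
restart = "unless-stopped;container.image=some-image"

restart_value, _, selector = restart.partition(";")
path, _, expected_value = selector.partition("=")

print(restart_value)   # 'unless-stopped'   -> passed to docker run --restart
print(path)            # 'container.image'  -> fetched from the task via _get_path
print(expected_value)  # 'some-image'       -> compared against the fetched value
```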
@@ -810,6 +872,7 @@ class Worker(ServiceCommandSection):
                 docker_image=docker_image,
                 docker_arguments=docker_arguments,
                 docker_bash_setup_script=docker_setup_script,
+                restart=self._get_docker_restart_value(task_session, task_id),
             )
             if self._impersonate_as_task_owner:
                 docker_params["auth_token"] = task_session.token
@@ -1967,6 +2030,11 @@ class Worker(ServiceCommandSection):
         except Exception as ex:
             print("Error: failed applying files from configuration: {}".format(ex))

+        try:
+            self._session.update_default_api_method()
+        except Exception as ex:
+            print("Error: failed updating default API method: {}".format(ex))
+
     @resolve_names
     def build(
         self,
@@ -1981,6 +2049,10 @@ class Worker(ServiceCommandSection):
     ):
         if not task_id:
             raise CommandFailedError("Worker build must have valid task id")

+        if target and not os.path.isabs(target):
+            # Non absolute target path will lead to errors with relative python executable
+            target = os.path.abspath(target)
+
         self._session.print_configuration()
@@ -2100,7 +2172,8 @@ class Worker(ServiceCommandSection):
         print('Building Task {} inside docker image: {} {} setup_script={}\n'.format(
             task_id, docker_image, docker_arguments or '', docker_setup_script or ''))
         full_docker_cmd = self.docker_image_func(
-            docker_image=docker_image, docker_arguments=docker_arguments, docker_bash_setup_script=docker_setup_script)
+            docker_image=docker_image, docker_arguments=docker_arguments, docker_bash_setup_script=docker_setup_script
+        )

         end_of_build_marker = "build.done=true"
         docker_cmd_suffix = ' build --id {task_id} --install-globally; ' \
@@ -2866,19 +2939,27 @@ class Worker(ServiceCommandSection):
             self.log_traceback(e)
         return freeze

-    def _install_poetry_requirements(self, repo_info):
-        # type: (Optional[RepoInfo]) -> Optional[PoetryAPI]
+    def _install_poetry_requirements(self, repo_info, working_dir=None):
+        # type: (Optional[RepoInfo], Optional[str]) -> Optional[PoetryAPI]
         if not repo_info:
             return None
+
+        files_from_working_dir = self._session.config.get(
+            "agent.package_manager.poetry_files_from_repo_working_dir", False)
+        lockfile_path = Path(repo_info.root) / ((working_dir or "") if files_from_working_dir else "")
+
         try:
             if not self.poetry.enabled:
                 return None
-            self.poetry.initialize(cwd=repo_info.root)
-            api = self.poetry.get_api(repo_info.root)
+
+            self.poetry.initialize(cwd=lockfile_path)
+            api = self.poetry.get_api(lockfile_path)
             if api.enabled:
                 print('Poetry Enabled: Ignoring requested python packages, using repository poetry lock file!')
                 api.install()
                 return api
+
+            print(f"Could not find pyproject.toml or poetry.lock file in {lockfile_path} \n")
         except Exception as ex:
             self.log.error("failed installing poetry requirements: {}".format(ex))
         return None
@@ -2909,7 +2990,8 @@ class Worker(ServiceCommandSection):
         """
         if package_api:
            package_api.cwd = cwd
-        api = self._install_poetry_requirements(repo_info)
+
+        api = self._install_poetry_requirements(repo_info, execution.working_dir)
         if api:
             # update back the package manager, this hack should be fixed
             if package_api == self.package_api:
@@ -3709,11 +3791,20 @@ class Worker(ServiceCommandSection):
             name=None,
             mount_ssh=None, mount_ssh_ro=None, mount_apt_cache=None, mount_pip_cache=None, mount_poetry_cache=None,
             env_task_id=None,
+            restart=None,
     ):
         self.debug("Constructing docker command", context="docker")
         docker = 'docker'

         base_cmd = [docker, 'run', '-t']
+        use_rm = True
+        if restart:
+            if restart in ("unless-stopped", "no", "always") or restart.startswith("on-failure"):
+                base_cmd += ["--restart", restart]
+                use_rm = False
+            else:
+                self.log.error("Invalid restart value \"{}\" , ignoring".format(restart))
+
         update_scheme = ""
         dockers_nvidia_visible_devices = 'all'
         gpu_devices = Session.get_nvidia_visible_env()
@@ -3926,7 +4017,8 @@ class Worker(ServiceCommandSection):
             (['-v', host_cache+':'+mounted_cache] if host_cache else []) +
             (['-v', host_vcs_cache+':'+mounted_vcs_cache] if host_vcs_cache else []) +
             (['-v', host_venvs_cache + ':' + mounted_venvs_cache] if host_venvs_cache else []) +
-            ['--rm', docker_image, 'bash', '-c',
+            (['--rm'] if use_rm else []) +
+            [docker_image, 'bash', '-c',
                 update_scheme +
                 extra_shell_script +
                 "cp {} {} ; ".format(DOCKER_ROOT_CONF_FILE, DOCKER_DEFAULT_CONF_FILE) +
@@ -4132,6 +4224,15 @@ class Worker(ServiceCommandSection):
                 " found {})".format(role)
             )

+    @staticmethod
+    def _get_path(d, *path, default=None):
+        try:
+            return functools.reduce(
+                lambda a, b: a[b], path, d
+            )
+        except (IndexError, KeyError):
+            return default
+

 if __name__ == "__main__":
     pass
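The helper moved here from K8sIntegration walks a nested dict/list by key path and returns a default instead of raising. Stand-alone (same body, renamed for the sketch) it behaves like this:

```python
import functools

def get_path(d, *path, default=None):
    # same body as Worker._get_path above
    try:
        return functools.reduce(lambda a, b: a[b], path, d)
    except (IndexError, KeyError):
        return default

doc = {"data": {"tasks": [{"container": {"image": "nvidia/cuda"}}]}}
print(get_path(doc, "data", "tasks", 0, "container", "image"))  # nvidia/cuda
print(get_path(doc, "data", "tasks", 5, "id", default="n/a"))   # n/a (no IndexError)
# note: TypeError is not caught here, which is why the call site in
# _get_docker_restart_value adds its own try/except (ValueError, TypeError)
```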
@@ -5,9 +5,9 @@ from enum import IntEnum
 from os import getenv, environ
 from typing import Text, Optional, Union, Tuple, Any

-import six
 from pathlib2 import Path

+import six
 from clearml_agent.helper.base import normalize_path

 PROGRAM_NAME = "clearml-agent"
@@ -69,42 +69,65 @@ ENV_AWS_SECRET_KEY = EnvironmentConfig("AWS_SECRET_ACCESS_KEY")
 ENV_AZURE_ACCOUNT_KEY = EnvironmentConfig("AZURE_STORAGE_KEY")

 ENVIRONMENT_CONFIG = {
-    "api.api_server": EnvironmentConfig("CLEARML_API_HOST", "TRAINS_API_HOST", ),
-    "api.files_server": EnvironmentConfig("CLEARML_FILES_HOST", "TRAINS_FILES_HOST", ),
-    "api.web_server": EnvironmentConfig("CLEARML_WEB_HOST", "TRAINS_WEB_HOST", ),
+    "api.api_server": EnvironmentConfig(
+        "CLEARML_API_HOST",
+        "TRAINS_API_HOST",
+    ),
+    "api.files_server": EnvironmentConfig(
+        "CLEARML_FILES_HOST",
+        "TRAINS_FILES_HOST",
+    ),
+    "api.web_server": EnvironmentConfig(
+        "CLEARML_WEB_HOST",
+        "TRAINS_WEB_HOST",
+    ),
     "api.credentials.access_key": EnvironmentConfig(
-        "CLEARML_API_ACCESS_KEY", "TRAINS_API_ACCESS_KEY",
+        "CLEARML_API_ACCESS_KEY",
+        "TRAINS_API_ACCESS_KEY",
     ),
     "api.credentials.secret_key": ENV_AGENT_SECRET_KEY,
-    "agent.worker_name": EnvironmentConfig("CLEARML_WORKER_NAME", "TRAINS_WORKER_NAME", ),
-    "agent.worker_id": EnvironmentConfig("CLEARML_WORKER_ID", "TRAINS_WORKER_ID", ),
-    "agent.cuda_version": EnvironmentConfig(
-        "CLEARML_CUDA_VERSION", "TRAINS_CUDA_VERSION", "CUDA_VERSION"
-    ),
-    "agent.cudnn_version": EnvironmentConfig(
-        "CLEARML_CUDNN_VERSION", "TRAINS_CUDNN_VERSION", "CUDNN_VERSION"
-    ),
-    "agent.cpu_only": EnvironmentConfig(
-        names=("CLEARML_CPU_ONLY", "TRAINS_CPU_ONLY", "CPU_ONLY"), type=bool
-    ),
+    "agent.worker_name": EnvironmentConfig(
+        "CLEARML_WORKER_NAME",
+        "TRAINS_WORKER_NAME",
+    ),
+    "agent.worker_id": EnvironmentConfig(
+        "CLEARML_WORKER_ID",
+        "TRAINS_WORKER_ID",
+    ),
+    "agent.cuda_version": EnvironmentConfig("CLEARML_CUDA_VERSION", "TRAINS_CUDA_VERSION", "CUDA_VERSION"),
+    "agent.cudnn_version": EnvironmentConfig("CLEARML_CUDNN_VERSION", "TRAINS_CUDNN_VERSION", "CUDNN_VERSION"),
+    "agent.cpu_only": EnvironmentConfig(names=("CLEARML_CPU_ONLY", "TRAINS_CPU_ONLY", "CPU_ONLY"), type=bool),
     "agent.crash_on_exception": EnvironmentConfig("CLEAMRL_AGENT_CRASH_ON_EXCEPTION", type=bool),
     "sdk.aws.s3.key": EnvironmentConfig("AWS_ACCESS_KEY_ID"),
     "sdk.aws.s3.secret": ENV_AWS_SECRET_KEY,
     "sdk.aws.s3.region": EnvironmentConfig("AWS_DEFAULT_REGION"),
-    "sdk.azure.storage.containers.0": {'account_name': EnvironmentConfig("AZURE_STORAGE_ACCOUNT"),
-                                       'account_key': ENV_AZURE_ACCOUNT_KEY},
+    "sdk.azure.storage.containers.0": {
+        "account_name": EnvironmentConfig("AZURE_STORAGE_ACCOUNT"),
+        "account_key": ENV_AZURE_ACCOUNT_KEY,
+    },
     "sdk.google.storage.credentials_json": EnvironmentConfig("GOOGLE_APPLICATION_CREDENTIALS"),
 }

 ENVIRONMENT_SDK_PARAMS = {
-    "task_id": ("CLEARML_TASK_ID", "TRAINS_TASK_ID", ),
-    "config_file": ("CLEARML_CONFIG_FILE", "TRAINS_CONFIG_FILE", ),
-    "log_level": ("CLEARML_LOG_LEVEL", "TRAINS_LOG_LEVEL", ),
-    "log_to_backend": ("CLEARML_LOG_TASK_TO_BACKEND", "TRAINS_LOG_TASK_TO_BACKEND", ),
+    "task_id": (
+        "CLEARML_TASK_ID",
+        "TRAINS_TASK_ID",
+    ),
+    "config_file": (
+        "CLEARML_CONFIG_FILE",
+        "TRAINS_CONFIG_FILE",
+    ),
+    "log_level": (
+        "CLEARML_LOG_LEVEL",
+        "TRAINS_LOG_LEVEL",
+    ),
+    "log_to_backend": (
+        "CLEARML_LOG_TASK_TO_BACKEND",
+        "TRAINS_LOG_TASK_TO_BACKEND",
+    ),
 }

-ENVIRONMENT_BACKWARD_COMPATIBLE = EnvironmentConfig(
-    names=("CLEARML_AGENT_ALG_ENV", "TRAINS_AGENT_ALG_ENV"), type=bool)
+ENVIRONMENT_BACKWARD_COMPATIBLE = EnvironmentConfig(names=("CLEARML_AGENT_ALG_ENV", "TRAINS_AGENT_ALG_ENV"), type=bool)

 VIRTUAL_ENVIRONMENT_PATH = {
     "python2": normalize_path(CONFIG_DIR, "py2venv"),
@@ -123,43 +146,61 @@ TOKEN_EXPIRATION_SECONDS = int(timedelta(days=2).total_seconds())

 METADATA_EXTENSION = ".json"

-DEFAULT_VENV_UPDATE_URL = (
-    "https://raw.githubusercontent.com/Yelp/venv-update/v3.2.4/venv_update.py"
-)
+DEFAULT_VENV_UPDATE_URL = "https://raw.githubusercontent.com/Yelp/venv-update/v3.2.4/venv_update.py"
 WORKING_REPOSITORY_DIR = "task_repository"
 WORKING_STANDALONE_DIR = "code"
 DEFAULT_VCS_CACHE = normalize_path(CONFIG_DIR, "vcs-cache")
-PIP_EXTRA_INDICES = [
-]
+PIP_EXTRA_INDICES = []
 DEFAULT_PIP_DOWNLOAD_CACHE = normalize_path(CONFIG_DIR, "pip-download-cache")
-ENV_DOCKER_IMAGE = EnvironmentConfig('CLEARML_DOCKER_IMAGE', 'TRAINS_DOCKER_IMAGE')
-ENV_WORKER_ID = EnvironmentConfig('CLEARML_WORKER_ID', 'TRAINS_WORKER_ID')
-ENV_WORKER_TAGS = EnvironmentConfig('CLEARML_WORKER_TAGS')
-ENV_AGENT_SKIP_PIP_VENV_INSTALL = EnvironmentConfig('CLEARML_AGENT_SKIP_PIP_VENV_INSTALL')
-ENV_AGENT_SKIP_PYTHON_ENV_INSTALL = EnvironmentConfig('CLEARML_AGENT_SKIP_PYTHON_ENV_INSTALL', type=bool)
-ENV_DOCKER_SKIP_GPUS_FLAG = EnvironmentConfig('CLEARML_DOCKER_SKIP_GPUS_FLAG', 'TRAINS_DOCKER_SKIP_GPUS_FLAG')
-ENV_AGENT_GIT_USER = EnvironmentConfig('CLEARML_AGENT_GIT_USER', 'TRAINS_AGENT_GIT_USER')
-ENV_AGENT_GIT_PASS = EnvironmentConfig('CLEARML_AGENT_GIT_PASS', 'TRAINS_AGENT_GIT_PASS')
-ENV_AGENT_GIT_HOST = EnvironmentConfig('CLEARML_AGENT_GIT_HOST', 'TRAINS_AGENT_GIT_HOST')
-ENV_AGENT_DISABLE_SSH_MOUNT = EnvironmentConfig('CLEARML_AGENT_DISABLE_SSH_MOUNT', type=bool)
-ENV_SSH_AUTH_SOCK = EnvironmentConfig('SSH_AUTH_SOCK')
-ENV_TASK_EXECUTE_AS_USER = EnvironmentConfig('CLEARML_AGENT_EXEC_USER', 'TRAINS_AGENT_EXEC_USER')
-ENV_TASK_EXTRA_PYTHON_PATH = EnvironmentConfig('CLEARML_AGENT_EXTRA_PYTHON_PATH', 'TRAINS_AGENT_EXTRA_PYTHON_PATH')
-ENV_DOCKER_HOST_MOUNT = EnvironmentConfig('CLEARML_AGENT_K8S_HOST_MOUNT', 'CLEARML_AGENT_DOCKER_HOST_MOUNT',
-                                          'TRAINS_AGENT_K8S_HOST_MOUNT', 'TRAINS_AGENT_DOCKER_HOST_MOUNT')
-ENV_VENV_CACHE_PATH = EnvironmentConfig('CLEARML_AGENT_VENV_CACHE_PATH')
-ENV_EXTRA_DOCKER_ARGS = EnvironmentConfig('CLEARML_AGENT_EXTRA_DOCKER_ARGS', type=list)
-ENV_DEBUG_INFO = EnvironmentConfig('CLEARML_AGENT_DEBUG_INFO')
-ENV_CHILD_AGENTS_COUNT_CMD = EnvironmentConfig('CLEARML_AGENT_CHILD_AGENTS_COUNT_CMD')
-ENV_DOCKER_ARGS_FILTERS = EnvironmentConfig('CLEARML_AGENT_DOCKER_ARGS_FILTERS')
-ENV_DOCKER_ARGS_HIDE_ENV = EnvironmentConfig('CLEARML_AGENT_DOCKER_ARGS_HIDE_ENV')
+ENV_DOCKER_IMAGE = EnvironmentConfig("CLEARML_DOCKER_IMAGE", "TRAINS_DOCKER_IMAGE")
+ENV_WORKER_ID = EnvironmentConfig("CLEARML_WORKER_ID", "TRAINS_WORKER_ID")
+ENV_WORKER_TAGS = EnvironmentConfig("CLEARML_WORKER_TAGS")
+ENV_AGENT_SKIP_PIP_VENV_INSTALL = EnvironmentConfig("CLEARML_AGENT_SKIP_PIP_VENV_INSTALL")
+ENV_AGENT_SKIP_PYTHON_ENV_INSTALL = EnvironmentConfig("CLEARML_AGENT_SKIP_PYTHON_ENV_INSTALL", type=bool)
+ENV_DOCKER_SKIP_GPUS_FLAG = EnvironmentConfig("CLEARML_DOCKER_SKIP_GPUS_FLAG", "TRAINS_DOCKER_SKIP_GPUS_FLAG")
+ENV_AGENT_GIT_USER = EnvironmentConfig("CLEARML_AGENT_GIT_USER", "TRAINS_AGENT_GIT_USER")
+ENV_AGENT_GIT_PASS = EnvironmentConfig("CLEARML_AGENT_GIT_PASS", "TRAINS_AGENT_GIT_PASS")
+ENV_AGENT_GIT_HOST = EnvironmentConfig("CLEARML_AGENT_GIT_HOST", "TRAINS_AGENT_GIT_HOST")
+ENV_AGENT_DISABLE_SSH_MOUNT = EnvironmentConfig("CLEARML_AGENT_DISABLE_SSH_MOUNT", type=bool)
+ENV_SSH_AUTH_SOCK = EnvironmentConfig("SSH_AUTH_SOCK")
+ENV_TASK_EXECUTE_AS_USER = EnvironmentConfig("CLEARML_AGENT_EXEC_USER", "TRAINS_AGENT_EXEC_USER")
+ENV_TASK_EXTRA_PYTHON_PATH = EnvironmentConfig("CLEARML_AGENT_EXTRA_PYTHON_PATH", "TRAINS_AGENT_EXTRA_PYTHON_PATH")
+ENV_DOCKER_HOST_MOUNT = EnvironmentConfig(
+    "CLEARML_AGENT_K8S_HOST_MOUNT",
+    "CLEARML_AGENT_DOCKER_HOST_MOUNT",
+    "TRAINS_AGENT_K8S_HOST_MOUNT",
+    "TRAINS_AGENT_DOCKER_HOST_MOUNT",
+)
+ENV_VENV_CACHE_PATH = EnvironmentConfig("CLEARML_AGENT_VENV_CACHE_PATH")
+ENV_EXTRA_DOCKER_ARGS = EnvironmentConfig("CLEARML_AGENT_EXTRA_DOCKER_ARGS", type=list)
+ENV_DEBUG_INFO = EnvironmentConfig("CLEARML_AGENT_DEBUG_INFO")
+ENV_CHILD_AGENTS_COUNT_CMD = EnvironmentConfig("CLEARML_AGENT_CHILD_AGENTS_COUNT_CMD")
+ENV_DOCKER_ARGS_FILTERS = EnvironmentConfig("CLEARML_AGENT_DOCKER_ARGS_FILTERS")
+ENV_DOCKER_ARGS_HIDE_ENV = EnvironmentConfig("CLEARML_AGENT_DOCKER_ARGS_HIDE_ENV")

-ENV_FORCE_SYSTEM_SITE_PACKAGES = EnvironmentConfig('CLEARML_AGENT_FORCE_SYSTEM_SITE_PACKAGES', type=bool)
+ENV_SERVICES_DOCKER_RESTART = EnvironmentConfig("CLEARML_AGENT_SERVICES_DOCKER_RESTART")
+"""
+Specify a restart value for services agent task containers.
+Note that when a restart value is provided, task containers will not be run with the '--rm' flag and will
+not be cleaned up automatically when completed (this will need to be done externally using the
+'docker container prune' command to free up resources).
+Value format for this env var is "<restart-value>;<task-selector>", where:
+ - <restart-value> can be any valid restart value for docker-run (see https://docs.docker.com/engine/reference/commandline/run/#restart)
+ - <task-selector> is optional, allowing you to restrict this behaviour to specific tasks. The format is:
+   "<path-to-task-field>=<value>" where:
+   * <path-to-task-field> is a dot-separated path to a task field (e.g. "container.image")
+   * <value> is optional. If not provided, the restart policy will be applied for the task container if the
+     path provided exists. If provided, the restart policy will be applied if the value matches the value
+     obtained from the task (value parsing and comparison is based on the type of value obtained from the task)
+For example:
+  CLEARML_AGENT_SERVICES_DOCKER_RESTART=unless-stopped
+  CLEARML_AGENT_SERVICES_DOCKER_RESTART=unless-stopped;container.image=some-image
+"""
+
+ENV_FORCE_SYSTEM_SITE_PACKAGES = EnvironmentConfig("CLEARML_AGENT_FORCE_SYSTEM_SITE_PACKAGES", type=bool)
 """ Force system_site_packages: true when running tasks in containers (i.e. docker mode or k8s glue) """


-ENV_CUSTOM_BUILD_SCRIPT = EnvironmentConfig('CLEARML_AGENT_CUSTOM_BUILD_SCRIPT')
+ENV_CUSTOM_BUILD_SCRIPT = EnvironmentConfig("CLEARML_AGENT_CUSTOM_BUILD_SCRIPT")
 """
 Specifies a custom environment setup script to be executed instead of installing a virtual environment.
 If provided, this script is executed following Git cloning. Script command may include environment variable and
@@ -37,7 +37,6 @@ from clearml_agent.helper.resource_monitor import ResourceMonitor
 from clearml_agent.interface.base import ObjectID


-
 class K8sIntegration(Worker):
     K8S_PENDING_QUEUE = "k8s_scheduler"
@@ -46,11 +45,11 @@ class K8sIntegration(Worker):

     KUBECTL_APPLY_CMD = "kubectl apply --namespace={namespace} -f"

-    KUBECTL_DELETE_CMD = "kubectl delete pods " \
-                         "-l={agent_label} " \
-                         "--field-selector=status.phase!=Pending,status.phase!=Running " \
-                         "--namespace={namespace} " \
-                         "--output name"
+    KUBECTL_CLEANUP_DELETE_CMD = "kubectl delete pods " \
+                                 "-l={agent_label} " \
+                                 "--field-selector=status.phase!=Pending,status.phase!=Running " \
+                                 "--namespace={namespace} " \
+                                 "--output name"

     BASH_INSTALL_SSH_CMD = [
         "apt-get update",
@@ -220,15 +219,6 @@ class K8sIntegration(Worker):
         with open(os.path.expandvars(os.path.expanduser(str(path))), 'rt') as f:
             return yaml.load(f, Loader=getattr(yaml, 'FullLoader', None))

-    @staticmethod
-    def _get_path(d, *path, default=None):
-        try:
-            return functools.reduce(
-                lambda a, b: a[b], path, d
-            )
-        except (IndexError, KeyError):
-            return default
-
     def _get_kubectl_options(self, command, extra_labels=None, filters=None, output="json", labels=None):
         # type: (str, Iterable[str], Iterable[str], str, Iterable[str]) -> Dict
         if not labels:
@@ -263,7 +253,7 @@ class K8sIntegration(Worker):
                 sleep(self._polling_interval)
                 continue
             pods = output_config.get('items', [])
-            task_ids = set()
+            task_id_to_details = dict()
             for pod in pods:
                 pod_name = pod.get('metadata', {}).get('name', None)
                 if not pod_name:
@@ -277,7 +267,7 @@ class K8sIntegration(Worker):
                 if not namespace:
                     continue

-                task_ids.add(task_id)
+                task_id_to_details[task_id] = (pod_name, namespace)

             msg = None
@@ -337,8 +327,41 @@ class K8sIntegration(Worker):
                     )
                 )

+            if task_id_to_details:
+                try:
+                    result = self._session.get(
+                        service='tasks',
+                        action='get_all',
+                        json={"id": list(task_id_to_details), "status": ["stopped"], "only_fields": ["id"]},
+                        method=Request.def_method,
+                        async_enable=False,
+                    )
+                    aborted_task_ids = list(filter(None, (task.get("id") for task in result["tasks"])))
+
+                    for task_id in aborted_task_ids:
+                        pod_name, namespace = task_id_to_details.get(task_id)
+                        if not pod_name:
+                            self.log.error("Failed locating aborted task {} in pending pods list".format(task_id))
+                            continue
+                        self.log.info(
+                            "K8S Glue pods monitor: task {} was aborted but its pod {} is still pending, "
+                            "deleting pod".format(task_id, pod_name)
+                        )
+
+                        kubectl_cmd = "kubectl delete pod {pod_name} --output name {namespace}".format(
+                            namespace=f"--namespace={namespace}" if namespace else "", pod_name=pod_name,
+                        ).strip()
+                        self.log.debug("Deleting aborted task pending pod: {}".format(kubectl_cmd))
+                        output = stringify_bash_output(get_bash_output(kubectl_cmd))
+                        if not output:
+                            self.log.warning("K8S Glue pods monitor: failed deleting pod {}".format(pod_name))
+                except Exception as ex:
+                    self.log.warning(
+                        'K8S Glue pods monitor: failed checking aborted tasks for hanging pods: {}'.format(ex)
+                    )
+
             # clean up any last message for a task that wasn't seen as a pod
-            last_tasks_msgs = {k: v for k, v in last_tasks_msgs.items() if k in task_ids}
+            last_tasks_msgs = {k: v for k, v in last_tasks_msgs.items() if k in task_id_to_details}

             sleep(self._polling_interval)
@@ -470,7 +493,7 @@ class K8sIntegration(Worker):

         # noinspection PyProtectedMember
         config_content = (
-            self.conf_file_content or Path(session._config_file).read_text() or ""
+            self.conf_file_content or (session._config_file and Path(session._config_file).read_text()) or ""
         ) + '\n{}\n'.format('\n'.join(x for x in extra_config_values if x))

         hocon_config_encoded = config_content.encode("ascii")
@@ -612,6 +635,7 @@ class K8sIntegration(Worker):
                 "k8s-pod-number": pod_number,
                 "k8s-pod-label": labels[0],
                 "k8s-internal-pod-count": pod_count,
+                "k8s-agent": self._get_agent_label(),
             }
         )
@@ -779,7 +803,9 @@ class K8sIntegration(Worker):
             if time() - self._last_pod_cleanup_per_ns[namespace] < self._min_cleanup_interval_per_ns_sec:
                 # Do not try to cleanup the same namespace too quickly
                 continue
-            kubectl_cmd = self.KUBECTL_DELETE_CMD.format(namespace=namespace, agent_label=self._get_agent_label())
+            kubectl_cmd = self.KUBECTL_CLEANUP_DELETE_CMD.format(
+                namespace=namespace, agent_label=self._get_agent_label()
+            )
             self.log.debug("Deleting old/failed pods{} for ns {}: {}".format(
                 extra_msg or "", namespace, kubectl_cmd
             ))
@@ -796,6 +822,51 @@ class K8sIntegration(Worker):
                 self.log.error("Failed deleting old/failed pods for ns %s: %s", namespace, str(ex))
             finally:
                 self._last_pod_cleanup_per_ns[namespace] = time()

+        # Locate tasks belonging to deleted pods that are still marked as pending or running
+        tasks_to_abort = []
+        try:
+            task_ids = list(filter(None, (
+                pod_name[len(self.pod_name_prefix):].strip()
+                for pod_names in deleted_pods.values()
+                for pod_name in pod_names
+            )))
+            if task_ids:
+                result = self._session.get(
+                    service='tasks',
+                    action='get_all',
+                    json={"id": task_ids, "status": ["in_progress", "queued"], "only_fields": ["id", "status"]},
+                    method=Request.def_method,
+                )
+                tasks_to_abort = result["tasks"]
+        except Exception as ex:
+            self.log.warning('Failed getting running tasks for deleted pods: {}'.format(ex))
+
+        for task in tasks_to_abort:
+            task_id = task.get("id")
+            status = task.get("status")
+            if not task_id or not status:
+                self.log.warning('Failed getting task information: id={}, status={}'.format(task_id, status))
+                continue
+            try:
+                if status == "queued":
+                    self._session.get(
+                        service='tasks',
+                        action='dequeue',
+                        json={"task": task_id, "force": True, "status_reason": "Pod deleted (not pending or running)",
+                              "status_message": "Pod deleted by agent {}".format(self.worker_id or "unknown")},
+                        method=Request.def_method,
+                    )
+                self._session.get(
+                    service='tasks',
+                    action='failed',
+                    json={"task": task_id, "force": True, "status_reason": "Pod deleted (not pending or running)",
+                          "status_message": "Pod deleted by agent {}".format(self.worker_id or "unknown")},
+                    method=Request.def_method,
+                )
+            except Exception as ex:
+                self.log.warning('Failed setting task {} to status "failed": {}'.format(task_id, ex))
+
         return deleted_pods

     def run_tasks_loop(self, queues: List[Text], worker_params, **kwargs):
@@ -949,5 +1020,6 @@ class K8sIntegration(Worker):
         value = re.sub(r'^[^A-Za-z0-9]+', '', value)  # strip leading non-alphanumeric chars
         value = re.sub(r'[^A-Za-z0-9]+$', '', value)  # strip trailing non-alphanumeric chars
         value = re.sub(r'\W+', '-', value)  # allow only word chars (this removed "." which is supported, but nvm)
+        value = re.sub(r'_+', '-', value)  # "_" is not allowed as well
         value = re.sub(r'-+', '-', value)  # don't leave messy "--" after replacing previous chars
         return value[:63]
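The added `_+` substitution matters because `\W+` leaves underscores alone (underscore is a word character), yet Kubernetes label values may not contain it. Running the whole chain stand-alone, a sketch of the method body above:

```python
import re

def sanitize_label_value(value):
    value = re.sub(r'^[^A-Za-z0-9]+', '', value)  # strip leading non-alphanumeric chars
    value = re.sub(r'[^A-Za-z0-9]+$', '', value)  # strip trailing non-alphanumeric chars
    value = re.sub(r'\W+', '-', value)            # collapse other non-word chars to "-"
    value = re.sub(r'_+', '-', value)             # the newly added step: drop "_" too
    value = re.sub(r'-+', '-', value)             # no "--" leftovers
    return value[:63]

print(sanitize_label_value("__my task__name!!"))  # my-task-name
```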
@@ -192,8 +192,13 @@ class PackageManager(object):
         if not self._get_cache_manager():
             return None

-        keys = self._generate_reqs_hash_keys(requirements, docker_cmd, python_version, cuda_version)
-        return self._get_cache_manager().copy_cached_entry(keys, destination_folder)
+        try:
+            keys = self._generate_reqs_hash_keys(requirements, docker_cmd, python_version, cuda_version)
+            return self._get_cache_manager().copy_cached_entry(keys, destination_folder)
+        except Exception as ex:
+            print("WARNING: Failed accessing venvs cache at {}: {}".format(destination_folder, ex))
+            print("WARNING: Skipping venv cache - folder not accessible!")
+            return None

     def add_cached_venv(
         self,
@@ -210,9 +215,15 @@ class PackageManager(object):
         """
         if not self._get_cache_manager():
             return
-        keys = self._generate_reqs_hash_keys(requirements, docker_cmd, python_version, cuda_version)
-        return self._get_cache_manager().add_entry(
-            keys=keys, source_folder=source_folder, exclude_sub_folders=exclude_sub_folders)
+
+        try:
+            keys = self._generate_reqs_hash_keys(requirements, docker_cmd, python_version, cuda_version)
+            return self._get_cache_manager().add_entry(
+                keys=keys, source_folder=source_folder, exclude_sub_folders=exclude_sub_folders)
+        except Exception as ex:
+            print("WARNING: Failed accessing venvs cache at {}: {}".format(source_folder, ex))
+            print("WARNING: Skipping venv cache - folder not accessible!")
+            return None

     def get_cache_folder(self):
         # type: () -> Optional[Path]
@@ -280,12 +291,19 @@ class PackageManager(object):

     def _get_cache_manager(self):
         if not self._cache_manager:
-            cache_folder = ENV_VENV_CACHE_PATH.get() or self.session.config.get(self._config_cache_folder, None)
-            if not cache_folder:
+            cache_folder = None
+            try:
+                cache_folder = ENV_VENV_CACHE_PATH.get() or self.session.config.get(self._config_cache_folder, None)
+                if not cache_folder:
+                    return None
+
+                max_entries = int(self.session.config.get(self._config_cache_max_entries, 10))
+                free_space_threshold = float(self.session.config.get(self._config_cache_free_space_threshold, 0))
+                self._cache_manager = FolderCache(
+                    cache_folder, max_cache_entries=max_entries, min_free_space_gb=free_space_threshold)
+            except Exception as ex:
+                print("WARNING: Failed accessing venvs cache at {}: {}".format(cache_folder, ex))
+                print("WARNING: Skipping venv cache - folder not accessible!")
                 return None

-            max_entries = int(self.session.config.get(self._config_cache_max_entries, 10))
-            free_space_threshold = float(self.session.config.get(self._config_cache_free_space_threshold, 0))
-            self._cache_manager = FolderCache(
-                cache_folder, max_cache_entries=max_entries, min_free_space_gb=free_space_threshold)
         return self._cache_manager
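All three hunks in this file apply the same fail-open rule: any exception while touching the shared venvs cache prints a warning and degrades to "no cache" rather than failing the task. The shape of that rule, extracted into a hypothetical wrapper (not agent API):

```python
def fail_open(op, *args, location=None):
    # run a cache operation; on any failure, warn and pretend there is no cache
    try:
        return op(*args)
    except Exception as ex:
        print("WARNING: Failed accessing venvs cache at {}: {}".format(location, ex))
        print("WARNING: Skipping venv cache - folder not accessible!")
        return None

# e.g. entry = fail_open(cache.copy_cached_entry, keys, dest, location=dest)
```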
@@ -50,6 +50,14 @@ class ExternalRequirements(SimpleSubstitution):
                     print("No need to reinstall \'{}\' from VCS, "
                           "the exact same version is already installed".format(req.name))
                     continue
+
+            if not req.pip_new_version:
+                # noinspection PyBroadException
+                try:
+                    freeze_base = PackageManager.out_of_scope_freeze() or dict(pip=[])
+                except Exception:
+                    freeze_base = dict(pip=[])
+
             req_line = self._add_vcs_credentials(req, session)

             # if we have older pip version we have to make sure we replace back the package name with the
@@ -58,14 +66,14 @@
                 PackageManager.out_of_scope_install_package(req_line, "--no-deps")
                 # noinspection PyBroadException
                 try:
-                    freeze_post = PackageManager.out_of_scope_freeze() or ''
+                    freeze_post = PackageManager.out_of_scope_freeze() or dict(pip=[])
                     package_name = list(set(freeze_post['pip']) - set(freeze_base['pip']))
                     if package_name and package_name[0] not in self.post_install_req_lookup:
                         self.post_install_req_lookup[package_name[0]] = req.req.line
                 except Exception:
                     pass

-            # no need to force reinstall, pip will always rebuilt if the package comes from git
+            # no need to force reinstall, pip will always rebuild if the package comes from git
             # and make sure the required packages are installed (if they are not it will install them)
             if not PackageManager.out_of_scope_install_package(req_line):
                 raise ValueError("Failed installing GIT/HTTPs package \'{}\'".format(req_line))
@@ -86,12 +94,13 @@
                 vcs_url = vcs_url[::-1].replace(fragment[::-1], '', 1)[::-1]
             # remove ssh:// or git:// prefix for git detection and credentials
             scheme = ''
+            full_vcs_url = vcs_url
             if vcs_url and (vcs_url.startswith('ssh://') or vcs_url.startswith('git://')):
                 scheme = 'ssh://'  # notice git:// is actually ssh://
                 vcs_url = vcs_url[6:]

             from ..repo import Git
-            vcs = Git(session=session, url=vcs_url, location=None, revision=None)
+            vcs = Git(session=session, url=full_vcs_url, location=None, revision=None)
             vcs._set_ssh_url()
             new_req_line = 'git+{}{}{}'.format(
                 '' if scheme and '://' in vcs.url else scheme,
@@ -69,6 +69,11 @@ class PoetryConfig:
             path = path.replace(':'+sys.base_prefix, ':'+sys.real_prefix, 1)
             kwargs['env']['PATH'] = path

+        if self.session and self.session.config:
+            extra_args = self.session.config.get("agent.package_manager.poetry_install_extra_args", None)
+            if extra_args:
+                args = args + tuple(extra_args)
+
         if check_if_command_exists("poetry"):
             argv = Argv("poetry", *args)
         else:
@@ -493,7 +493,15 @@ class PytorchRequirement(SimpleSubstitution):

         if req.specs and len(req.specs) == 1 and req.specs[0][0] == "==":
             # remove any +cu extension and let pip resolve that
-            line = "{} {}".format(req.name, req.format_specs(max_num_parts=3))
+            # and add .* if we have 3 parts version to deal with nvidia container 'a' version
+            # i.e. "1.13.0" -> "1.13.0.*" so it should match preinstalled "1.13.0a0+936e930"
+            spec_3_parts = req.format_specs(num_parts=3)
+            spec_max3_parts = req.format_specs(max_num_parts=3)
+            if spec_3_parts == spec_max3_parts and not spec_max3_parts.endswith("*"):
+                line = "{} {}.*".format(req.name, spec_max3_parts)
+            else:
+                line = "{} {}".format(req.name, spec_max3_parts)

             if req.marker:
                 line += " ; {}".format(req.marker)
             else:
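The `.*` suffix turns an exact pin into a PEP 440 prefix match, which is what lets an NVIDIA-container build such as 1.13.0a0+936e930 satisfy a task that pinned 1.13.0. A quick check with the packaging library (assuming pip's resolver follows the same PEP 440 rules):

```python
from packaging.specifiers import SpecifierSet
from packaging.version import Version

preinstalled = Version("1.13.0a0+936e930")
print(preinstalled in SpecifierSet("==1.13.0"))                      # False: exact pin rejects it
print(preinstalled in SpecifierSet("==1.13.0.*", prereleases=True))  # True: prefix match accepts it
```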
@@ -1,3 +1,4 @@
+from tempfile import mkdtemp
 from typing import Text

 from furl import furl
@@ -20,7 +21,16 @@ class RequirementsTranslator(object):
         config = session.config
         self.cache_dir = cache_dir or Path(config["agent.pip_download_cache.path"]).expanduser().as_posix()
         self.enabled = config["agent.pip_download_cache.enabled"]
-        Path(self.cache_dir).mkdir(parents=True, exist_ok=True)
+        # noinspection PyBroadException
+        try:
+            Path(self.cache_dir).mkdir(parents=True, exist_ok=True)
+        except Exception:
+            temp_cache_folder = mkdtemp(prefix='pip_download_cache.')
+            print("Failed creating pip download cache folder at `{}` reverting to `{}`".format(
+                self.cache_dir, temp_cache_folder))
+            self.cache_dir = temp_cache_folder
+            Path(self.cache_dir).mkdir(parents=True, exist_ok=True)
+
         self.config = Config()
         self.pip = SystemPip(interpreter=interpreter, session=self._session)
         self._translate_back = {}
@@ -1 +1 @@
-__version__ = '1.5.2rc0'
+__version__ = '1.5.2'
@@ -1,16 +1,36 @@
-#!/bin/sh
+#!/bin/bash +x

-CLEARML_FILES_HOST=${CLEARML_FILES_HOST:-$TRAINS_FILES_HOST}
+if [ -n "$SHUTDOWN_IF_NO_ACCESS_KEY" ] && [ -z "$CLEARML_API_ACCESS_KEY" ] && [ -z "$TRAINS_API_ACCESS_KEY" ]; then
+  echo "CLEARML_API_ACCESS_KEY was not provided, service will not be started"
+  exit 0
+fi
+
+export CLEARML_FILES_HOST=${CLEARML_FILES_HOST:-$TRAINS_FILES_HOST}

 if [ -z "$CLEARML_FILES_HOST" ]; then
   CLEARML_HOST_IP=${CLEARML_HOST_IP:-${TRAINS_HOST_IP:-$(curl -s https://ifconfig.me/ip)}}
 fi

-CLEARML_FILES_HOST=${CLEARML_FILES_HOST:-${TRAINS_FILES_HOST:-"http://$CLEARML_HOST_IP:8081"}}
-CLEARML_WEB_HOST=${CLEARML_WEB_HOST:-${TRAINS_WEB_HOST:-"http://$CLEARML_HOST_IP:8080"}}
-CLEARML_API_HOST=${CLEARML_API_HOST:-${TRAINS_API_HOST:-"http://$CLEARML_HOST_IP:8008"}}
+export CLEARML_FILES_HOST=${CLEARML_FILES_HOST:-${TRAINS_FILES_HOST:-"http://$CLEARML_HOST_IP:8081"}}
+export CLEARML_WEB_HOST=${CLEARML_WEB_HOST:-${TRAINS_WEB_HOST:-"http://$CLEARML_HOST_IP:8080"}}
+export CLEARML_API_HOST=${CLEARML_API_HOST:-${TRAINS_API_HOST:-"http://$CLEARML_HOST_IP:8008"}}

 echo $CLEARML_FILES_HOST $CLEARML_WEB_HOST $CLEARML_API_HOST 1>&2

-python3 -m pip install -q -U "clearml-agent${CLEARML_AGENT_UPDATE_VERSION:-$TRAINS_AGENT_UPDATE_VERSION}"
-clearml-agent daemon --services-mode --queue services --create-queue --docker "${CLEARML_AGENT_DEFAULT_BASE_DOCKER:-$TRAINS_AGENT_DEFAULT_BASE_DOCKER}" --cpu-only ${CLEARML_AGENT_EXTRA_ARGS:-$TRAINS_AGENT_EXTRA_ARGS}
+if [[ "$CLEARML_AGENT_UPDATE_VERSION" =~ ^[0-9]{1,3}\.[0-9]{1,3}(\.[0-9]{1,3}([a-zA-Z]{1,3}[0-9]{1,3})?)?$ ]]
+then
+  CLEARML_AGENT_UPDATE_VERSION="==$CLEARML_AGENT_UPDATE_VERSION"
+fi
+
+DAEMON_OPTIONS=${CLEARML_AGENT_DAEMON_OPTIONS:---services-mode --create-queue}
+QUEUES=${CLEARML_AGENT_QUEUES:-services}
+
+if [ -z "$CLEARML_AGENT_NO_UPDATE" ]; then
+  if [ -n "$CLEARML_AGENT_UPDATE_REPO" ]; then
+    python3 -m pip install -q -U $CLEARML_AGENT_UPDATE_REPO
+  else
+    python3 -m pip install -q -U "clearml-agent${CLEARML_AGENT_UPDATE_VERSION:-$TRAINS_AGENT_UPDATE_VERSION}"
+  fi
+fi
+
+clearml-agent daemon $DAEMON_OPTIONS --queue $QUEUES --docker "${CLEARML_AGENT_DEFAULT_BASE_DOCKER:-$TRAINS_AGENT_DEFAULT_BASE_DOCKER}" --cpu-only ${CLEARML_AGENT_EXTRA_ARGS:-$TRAINS_AGENT_EXTRA_ARGS}
@@ -82,6 +82,7 @@ agent {
         # pip_version: ["<20.2 ; python_version < '3.10'", "<22.3 ; python_version >= '3.10'"]
         # specify poetry version to use (examples "<2", "==1.1.1", "", empty string will install the latest version)
         # poetry_version: "<2",
+        # poetry_install_extra_args: ["-v"]

         # virtual environment inheres packages from system
         system_site_packages: false,