Compare commits

...

53 Commits

Author SHA1 Message Date
clearml
3273f76b46 Version bump to v1.9.2 2024-10-28 18:33:04 +02:00
clearml
9af0f9fe41 Fix: ensure the reload method is found in the config object 2024-10-28 18:12:22 +02:00
clearml
205cd47cb9 Fix: use req_token_expiration_sec when creating a task session instead of the default value 2024-10-28 18:11:42 +02:00
clearml
0ff428bb96 Fix report index not advancing in resource monitoring, which caused all GPUs after the first not to be reported 2024-10-28 18:11:00 +02:00
Matteo Destro
bf8d9c96e9 Handle OSError when checking for is_file (#215) 2024-10-13 10:08:03 +03:00
allegroai
a88487ff25 Add support for pip legacy resolver for versions specified in the agent.package_manager.pip_legacy_resolver configuration option
Add support for skipping pre-installed (existing) packages
2024-09-22 22:36:06 +03:00
Jake Henning
785e22dc87 Version bump to v1.9.1 2024-09-02 01:04:49 +03:00
Jake Henning
6a2b778d53 Add default pip version support for Python 3.12 2024-09-02 01:03:52 +03:00
allegroai
b2c3702830 Version bump to v1.9.0 2024-08-28 23:18:26 +03:00
allegroai
6302d43990 Add support for skipping container apt installs using CLEARML_AGENT_SKIP_CONTAINER_APT env var in k8s
Add runtime callback support for setting runtime properties per task in k8s
Fix: remove task from the pending queue and set it to failed when kubectl apply fails
2024-08-27 23:01:27 +03:00
allegroai
760bbca74e Fix failed Task in services mode being logged as "User aborted" instead of failed; add Task status reason string 2024-08-27 22:56:37 +03:00
allegroai
e63fd31420 Fix string format 2024-08-27 22:55:49 +03:00
allegroai
2ff9985db7 Add user ID to the vault loading print 2024-08-27 22:55:32 +03:00
allegroai
b8c762401b Fix: use the same-state transition if supported by the server (instead of stopping the task before re-enqueuing) 2024-08-27 22:54:45 +03:00
allegroai
99e1e54f94 Add support for tasks containing only bash script or python module command 2024-08-27 22:53:14 +03:00
allegroai
a4d3b5bad6 Fix only set Task started status on node rank 0 2024-08-27 22:52:31 +03:00
allegroai
b21665ed6e Fix: do not cache the venv if the venv/python skip env var was set 2024-08-27 22:52:01 +03:00
Surya Teja
f877aa96e2 Update Docker base image to Ubuntu 22.04 and Kubectl to 1.29.3 (#201) 2024-07-29 18:41:50 +03:00
pollfly
f99344d194 Add queue priority info to CLI help (#211)
* add queue priority comment

* Add --order-fairness info

---------

Co-authored-by: Jake Henning <59198928+jkhenning@users.noreply.github.com>
2024-07-29 18:40:38 +03:00
allegroai
d9f2a1999a Fix: only send pip freeze update on RANK 0, and only update task status on exit on RANK 0 2024-07-29 17:40:24 +03:00
Valentin Schabschneider
79d0abe707 Add NO_DOCKER flag to clearml-agent-services entrypoint (#206) 2024-07-26 19:09:54 +03:00
allegroai
6213ef4c02 Add /bin/bash -c "command" support. Task binary should be set to /bin/bash and entry_point should be set to -c command 2024-07-24 18:00:13 +03:00
allegroai
aef6aa9fc8 Fix a rare race condition where popping an aborted Task from a queue did not set it to started before the watchdog killed it (does not happen in k8s/slurm) 2024-07-24 17:59:46 +03:00
allegroai
0bb267115b Add venvs_cache.path mount override for non-root containers (use: agent.docker_internal_mounts.venvs_cache) 2024-07-24 17:59:18 +03:00
allegroai
f89a92556f Fix check logger is not None 2024-07-24 17:55:02 +03:00
allegroai
8ba4d75e80 Add CLEARML_TASK_ID and auth token to pod env vars in original entrypoint flow 2024-07-24 17:47:48 +03:00
allegroai
edc333ba5f Add K8S_GLUE_POD_USE_IMAGE_ENTRYPOINT to allow running images without overriding the entrypoint (useful for agents using prebuilt images in k8s) 2024-07-24 17:46:27 +03:00
allegroai
2f0553b873 Fix: CLEARML_MULTI_NODE_SINGLE_TASK should be read once, not on every reported line 2024-07-24 17:45:02 +03:00
allegroai
b2a4bf08ac Fix passing --docker alone (i.e. no default container image) with the --dynamic-gpus feature 2024-07-24 17:44:35 +03:00
allegroai
f18c6b809f Fix slurm multi-node rank detection 2024-07-24 17:44:05 +03:00
allegroai
cd5b4d2186 Add "-m module args" in script entry now supports standalone script, standalone script is converted to "untitled.py" by default or if specified in working_dir such as <dir>:<target_file> for example ".:standalone.py" 2024-07-24 17:43:21 +03:00
allegroai
5f1bab6711 Add default docker match_rules for enterprise users,
NOTICE: matching_rules are ignored if `--docker container` is passed on the command line
2024-07-24 17:42:55 +03:00
allegroai
ab9b9db0c9 Add CLEARML_MULTI_NODE_SINGLE_TASK (values -1, 0, 1, 2) for easier multi-node single-Task workloads 2024-07-24 17:42:25 +03:00
allegroai
93df021108 Add support for .ipynb script entry files (installs nbconvert at runtime, converts the notebook to Python and executes the resulting script), including CLEARML_AGENT_FORCE_TASK_INIT patching of ipynb files (post Python conversion) 2024-07-24 17:41:59 +03:00
allegroai
700ae85de0 Fix file mode should be optional in configuration files section 2024-07-24 17:41:06 +03:00
allegroai
f367c5a571 Fix git fetch did not update new tags #209 2024-07-24 17:39:53 +03:00
allegroai
ebc5944b44 Fix setting tasks that were just marked as aborted to started: only force a Task to started after dequeuing it, otherwise leave it as is 2024-07-24 17:39:26 +03:00
allegroai
8f41002845 Add task.script.binary /bin/bash support
Fix -m module $env to support parsing the $env before launching
2024-07-24 17:37:26 +03:00
allegroai
7e8670d57f Find the correct python version when using a pre-installed python environment 2024-07-21 14:10:38 +03:00
allegroai
77de343863 Use "venv" module if virtualenv is not supported 2024-07-19 13:18:07 +03:00
allegroai
6b31883e45 Fix queue resolution when no queue is passed 2024-05-15 18:30:24 +03:00
allegroai
e48b4756fa Add Python 3.12 support 2024-05-15 18:25:29 +03:00
allegroai
47147e3237 Fix cached repositories not passing user/token when pulling; agent.vcs_cache.clone_on_pull_fail now defaults to false 2024-04-19 23:50:17 +03:00
allegroai
41fc4ec646 Fix disabling vcs cache should not add vcs mount point to container 2024-04-19 23:48:50 +03:00
allegroai
441e5a73b2 Fix: the conda env should not be cached when installing into the base conda environment or into an existing conda environment 2024-04-19 23:48:10 +03:00
allegroai
27ed6821c4 Add MirrorD config files to .gitignore 2024-04-19 23:47:34 +03:00
allegroai
10c6629982 Support skipping re-enqueue on suspected preempted k8s pods 2024-04-19 23:46:57 +03:00
allegroai
6fb48a4c6e Revert version to v1.8.1 2024-04-19 23:44:31 +03:00
allegroai
105ade31f1 Version bump to v1.8.2 2024-04-14 18:18:10 +03:00
allegroai
502e266b6b Fix polling interval missing when not using daemon mode 2024-04-14 18:17:57 +03:00
allegroai
cd9a3b9f4e Version bump to v1.8.1 2024-04-12 20:30:11 +03:00
allegroai
4179ac5234 Fix git pulling on a cached invalid git entry: on error, re-clone the entire repo (disable using "agent.vcs_cache.clone_on_pull_fail: false") 2024-04-12 20:29:36 +03:00
Liron Ilouz
98cc0d86ba Add option to set daemon polling interval (#197)
* add option to set worker polling interval

* polling interval minimum value

---------

Co-authored-by: Liron <liron@tapwithus.com>
2024-04-03 14:33:52 +03:00
34 changed files with 1189 additions and 310 deletions

.gitignore (vendored): 2 changes
View File

@@ -14,3 +14,5 @@ dist/
# VSCode
.vscode
# MirrorD
.mirrord

View File

@@ -66,7 +66,7 @@
type: pip,
# specify pip version to use (examples "<20.2", "==19.3.1", "", empty string will install the latest version)
pip_version: ["<20.2 ; python_version < '3.10'", "<22.3 ; python_version >= '3.10'"],
pip_version: ["<20.2 ; python_version < '3.10'", "<22.3 ; python_version >= '3.10' and python_version <= '3.11'", ">=23,<24.3 ; python_version >= '3.12'"]
# specify poetry version to use (examples "<2", "==1.1.1", "", empty string will install the latest version)
# poetry_version: "<2",
# poetry_install_extra_args: ["-v"]
@@ -80,6 +80,14 @@
# additional artifact repositories to use when installing python packages
# extra_index_url: ["https://allegroai.jfrog.io/clearml/api/pypi/public/simple"]
# turn on the "--use-deprecated=legacy-resolver" flag for pip, to avoid package dependency version mismatch
# if any version restriction is matched we add the "--use-deprecated=legacy-resolver" flag
# example: pip_legacy_resolver = [">=20.3,<24.3", ">99"]
# if pip==20.2 or pip==29.0 is installed we do nothing,
# if pip==21.1 or pip==101.1 is installed the flag is added
# disable the feature by passing an empty list
pip_legacy_resolver = [">=20.3,<24.3"]
# control the pytorch wheel resolving algorithm, options are: "pip", "direct", "none"
# Override with environment variable CLEARML_AGENT_PACKAGE_PYTORCH_RESOLVE
# "pip" (default): would automatically detect the cuda version, and supply pip with the correct
@@ -218,6 +226,76 @@
# optional arguments to pass to docker image
# arguments: ["--ipc=host", ]
# Choose the default docker based on the Task properties,
# Notice: Enterprise feature, ignored otherwise
# Examples: 'script.requirements', 'script.binary', 'script.repository', 'script.branch', 'project'
# Notice: Matching is done via regular expression, for example "^searchme$" will match exactly "searchme" string
"match_rules": [
{
"image": "python:3.6-bullseye",
"arguments": "--ipc=host",
"match": {
"script": {
"binary": "python3.6$",
},
}
},
{
"image": "python:3.7-bullseye",
"arguments": "--ipc=host",
"match": {
"script": {
"binary": "python3.7$",
},
}
},
{
"image": "python:3.8-bullseye",
"arguments": "--ipc=host",
"match": {
"script": {
"binary": "python3.8$",
},
}
},
{
"image": "python:3.9-bullseye",
"arguments": "--ipc=host",
"match": {
"script": {
"binary": "python3.9$",
},
}
},
{
"image": "python:3.10-bullseye",
"arguments": "--ipc=host",
"match": {
"script": {
"binary": "python3.10$",
},
}
},
{
"image": "python:3.11-bullseye",
"arguments": "--ipc=host",
"match": {
"script": {
"binary": "python3.11$",
},
}
},
{
"image": "python:3.12-bullseye",
"arguments": "--ipc=host",
"match": {
"script": {
"binary": "python3.12$",
},
}
},
]
}
# set the OS environments based on the Task's Environment section before launching the Task process.
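Conceptually, the first rule whose regular expressions all match the corresponding Task properties wins, and its image/arguments fill in whatever the Task's container configuration left empty. A simplified, illustrative sketch (not the agent's actual implementation):

import re

def resolve_container(task_script_binary, match_rules):
    # first rule whose "binary" regex matches wins
    for rule in match_rules:
        pattern = rule.get("match", {}).get("script", {}).get("binary")
        if pattern and re.search(pattern, task_script_binary):
            return {"image": rule["image"], "arguments": rule["arguments"]}
    return None

rules = [{"image": "python:3.9-bullseye", "arguments": "--ipc=host",
          "match": {"script": {"binary": "python3.9$"}}}]
print(resolve_container("python3.9", rules))   # matches the 3.9 rule
print(resolve_container("python3.12", rules))  # None, no rule matches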
@@ -289,6 +367,7 @@
pip_cache: "/root/.cache/pip"
poetry_cache: "/root/.cache/pypoetry"
vcs_cache: "/root/.clearml/vcs-cache"
venvs_cache: "/root/.clearml/venvs-cache"
venv_build: "~/.clearml/venvs-builds"
pip_download: "/root/.clearml/pip-download-cache"
}

View File

@@ -22,6 +22,9 @@ ENV_INITIAL_CONNECT_RETRY_OVERRIDE = EnvEntry(
'CLEARML_AGENT_INITIAL_CONNECT_RETRY_OVERRIDE', default=True, converter=safe_text_to_bool
)
ENV_FORCE_MAX_API_VERSION = EnvEntry("CLEARML_AGENT_FORCE_MAX_API_VERSION", type=str)
# values are 0/None (task per node), 1/2 (multi-node reporting, colored console), -1 (only report rank 0 node)
ENV_MULTI_NODE_SINGLE_TASK = EnvEntry("CLEARML_MULTI_NODE_SINGLE_TASK", type=int, default=None)
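Following the value legend in the comment above, reporting only the rank-0 node in a multi-node single-Task run means setting the variable to -1; normally this is done in the shell or container environment, but for illustration:

import os
# -1: only report the rank 0 node; 1/2: multi-node reporting with colored console
os.environ["CLEARML_MULTI_NODE_SINGLE_TASK"] = "-1"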
"""
Experimental option to set the request method for all API requests and auth login.

View File

@@ -64,6 +64,8 @@ class Session(TokenManager):
default_key = "EGRTCO8JMSIGI6S39GTP43NFWXDQOW"
default_secret = "x!XTov_G-#vspE*Y(h$Anm&DIc5Ou-F)jsl$PdOyj5wG1&E!Z8"
force_max_api_version = ENV_FORCE_MAX_API_VERSION.get()
server_version = "1.0.0"
user_id = None
# TODO: add requests.codes.gateway_timeout once we support async commits
_retry_codes = [
@@ -191,6 +193,8 @@ class Session(TokenManager):
Session.api_version = str(api_version)
Session.feature_set = str(token_dict.get('feature_set', self.feature_set) or "basic")
Session.server_version = token_dict.get('server_version', self.server_version)
Session.user_id = (token_dict.get("identity") or {}).get("user") or ""
except (jwt.DecodeError, ValueError):
pass
@@ -256,8 +260,9 @@ class Session(TokenManager):
def parse(vault):
# noinspection PyBroadException
try:
print("Loaded {} vault: {}".format(
print("Loaded {} vault{}: {}".format(
vault.get("scope", ""),
"" if not self.user_id else " for user {}".format(self.user_id),
(vault.get("description", None) or "")[:50] or vault.get("id", ""))
)
d = vault.get("data", None)
@@ -341,11 +346,12 @@ class Session(TokenManager):
if self._propagate_exceptions_on_send:
raise
sleep_time = sys_random.uniform(*self._request_exception_retry_timeout)
self._logger.error(
"{} exception sending {} {}: {} (retrying in {:.1f}sec)".format(
type(ex).__name__, method.upper(), url, str(ex), sleep_time
if self._logger:
self._logger.error(
"{} exception sending {} {}: {} (retrying in {:.1f}sec)".format(
type(ex).__name__, method.upper(), url, str(ex), sleep_time
)
)
)
time.sleep(sleep_time)
continue
@@ -364,11 +370,12 @@ class Session(TokenManager):
res.status_code == requests.codes.service_unavailable
and self.config.get("api.http.wait_on_maintenance_forever", True)
):
self._logger.warning(
"Service unavailable: {} is undergoing maintenance, retrying...".format(
host
if self._logger:
self._logger.warning(
"Service unavailable: {} is undergoing maintenance, retrying...".format(
host
)
)
)
continue
break
self._session_requests += 1
@@ -649,11 +656,14 @@ class Session(TokenManager):
"""
Return True if Session.api_version is greater or equal >= to min_api_version
"""
def version_tuple(v):
v = tuple(map(int, (v.split("."))))
return v + (0,) * max(0, 3 - len(v))
return version_tuple(cls.api_version) >= version_tuple(str(min_api_version))
@classmethod
def check_min_server_version(cls, min_server_version):
"""
Return True if Session.server_version is greater or equal >= to min_server_version
"""
return version_tuple(cls.server_version) >= version_tuple(str(min_server_version))
def _do_refresh_token(self, current_token, exp=None):
""" TokenManager abstract method implementation.
Here we ignore the old token and simply obtain a new token.
@@ -731,3 +741,8 @@ class Session(TokenManager):
def propagate_exceptions_on_send(self, value):
# type: (bool) -> None
self._propagate_exceptions_on_send = value
def version_tuple(v):
v = tuple(map(int, (v.split("."))))
return v + (0,) * max(0, 3 - len(v))
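For reference, version_tuple as defined above zero-pads short versions to three parts, so tuple comparisons behave as expected:

assert version_tuple("3.22.3") == (3, 22, 3)
assert version_tuple("1.9") == (1, 9, 0)                 # padded with zeros
assert version_tuple("3.22.3") >= version_tuple("3.22")  # (3, 22, 3) >= (3, 22, 0)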

View File

@@ -53,7 +53,7 @@ def apply_files(config):
target_fmt = data.get("target_format", "string")
overwrite = bool(data.get("overwrite", True))
contents = data.get("contents")
mode = data.get("mode")
mode = data.get("mode", None)
target = Path(expanduser(expandvars(path)))

View File

@@ -1,14 +1,16 @@
import json
import re
import shlex
from copy import copy
from clearml_agent.backend_api.session import Request
from clearml_agent.helper.docker_args import DockerArgsSanitizer
from clearml_agent.helper.package.requirements import (
RequirementsManager, MarkerRequirement,
compare_version_rules, )
def resolve_default_container(session, task_id, container_config):
def resolve_default_container(session, task_id, container_config, ignore_match_rules=False):
container_lookup = session.config.get('agent.default_docker.match_rules', None)
if not session.check_min_api_version("2.13") or not container_lookup:
return container_config
@@ -17,6 +19,12 @@ def resolve_default_container(session, task_id, container_config):
try:
session.verify_feature_set('advanced')
except ValueError:
# ignoring matching rules only supported in higher tiers
return container_config
if ignore_match_rules:
print("INFO: default docker command line override, ignoring default docker container match rules")
# ignoring matching rules only supported in higher tiers
return container_config
result = session.send_request(
@@ -159,9 +167,10 @@ def resolve_default_container(session, task_id, container_config):
if not container_config.get('image'):
container_config['image'] = entry.get('image', None)
if not container_config.get('arguments'):
container_config['arguments'] = entry.get('arguments', None)
container_config['arguments'] = shlex.split(str(container_config.get('arguments') or '').strip())
print('Matching default container with rule:\n{}'.format(json.dumps(entry)))
container_config['arguments'] = entry.get('arguments', None) or ''
if isinstance(container_config.get('arguments'), str):
container_config['arguments'] = shlex.split(str(container_config.get('arguments') or '').strip())
print('INFO: Matching default container with rule:\n{}'.format(json.dumps(entry)))
return container_config
return container_config
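The shlex normalization above means a rule's "arguments" value may be a single string, which is split the way a shell would split it. A quick illustration (the variable name is arbitrary):

import shlex
print(shlex.split("--ipc=host -e MY_VAR='a b'"))
# -> ['--ipc=host', '-e', 'MY_VAR=a b']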

File diff suppressed because it is too large

View File

@@ -167,6 +167,7 @@ ENV_AGENT_GIT_USER = EnvironmentConfig("CLEARML_AGENT_GIT_USER", "TRAINS_AGENT_G
ENV_AGENT_GIT_PASS = EnvironmentConfig("CLEARML_AGENT_GIT_PASS", "TRAINS_AGENT_GIT_PASS")
ENV_AGENT_GIT_HOST = EnvironmentConfig("CLEARML_AGENT_GIT_HOST", "TRAINS_AGENT_GIT_HOST")
ENV_AGENT_DISABLE_SSH_MOUNT = EnvironmentConfig("CLEARML_AGENT_DISABLE_SSH_MOUNT", type=bool)
ENV_AGENT_DEBUG_GET_NEXT_TASK = EnvironmentConfig("CLEARML_AGENT_DEBUG_GET_NEXT_TASK", type=bool)
ENV_SSH_AUTH_SOCK = EnvironmentConfig("SSH_AUTH_SOCK")
ENV_TASK_EXECUTE_AS_USER = EnvironmentConfig("CLEARML_AGENT_EXEC_USER", "TRAINS_AGENT_EXEC_USER")
ENV_TASK_EXTRA_PYTHON_PATH = EnvironmentConfig("CLEARML_AGENT_EXTRA_PYTHON_PATH", "TRAINS_AGENT_EXTRA_PYTHON_PATH")

View File

@@ -9,3 +9,12 @@ Script will be appended to the specified file.
ENV_DEFAULT_EXECUTION_AGENT_ARGS = EnvEntry("K8S_GLUE_DEF_EXEC_AGENT_ARGS", default="--full-monitoring --require-queue")
ENV_POD_AGENT_INSTALL_ARGS = EnvEntry("K8S_GLUE_POD_AGENT_INSTALL_ARGS", default="", lstrip=False)
ENV_POD_MONITOR_LOG_BATCH_SIZE = EnvEntry("K8S_GLUE_POD_MONITOR_LOG_BATCH_SIZE", default=5, converter=int)
ENV_POD_MONITOR_DISABLE_ENQUEUE_ON_PREEMPTION = EnvEntry(
"K8S_GLUE_POD_MONITOR_DISABLE_ENQUEUE_ON_PREEMPTION", default=False, converter=bool
)
ENV_POD_USE_IMAGE_ENTRYPOINT = EnvEntry("K8S_GLUE_POD_USE_IMAGE_ENTRYPOINT", default=False, converter=bool)
"""
Do not inject a cmd and args into the container's image when building the k8s template (relies on the built-in
image entrypoint)
"""

View File

@@ -25,6 +25,7 @@ from clearml_agent.definitions import (
ENV_AGENT_GIT_USER,
ENV_AGENT_GIT_PASS,
ENV_FORCE_SYSTEM_SITE_PACKAGES,
ENV_AGENT_DEBUG_GET_NEXT_TASK,
)
from clearml_agent.errors import APIError, UsageError
from clearml_agent.glue.errors import GetPodCountError
@@ -40,6 +41,7 @@ from clearml_agent.glue.definitions import (
ENV_START_AGENT_SCRIPT_PATH,
ENV_DEFAULT_EXECUTION_AGENT_ARGS,
ENV_POD_AGENT_INSTALL_ARGS,
ENV_POD_USE_IMAGE_ENTRYPOINT,
)
@@ -67,16 +69,23 @@ class K8sIntegration(Worker):
'echo "ldconfig" >> /etc/profile',
"/usr/sbin/sshd -p {port}"]
CONTAINER_BASH_SCRIPT = [
_CONTAINER_APT_SCRIPT_SECTION = [
"export DEBIAN_FRONTEND='noninteractive'",
"echo 'Binary::apt::APT::Keep-Downloaded-Packages \"true\";' > /etc/apt/apt.conf.d/docker-clean",
"chown -R root /root/.cache/pip",
"apt-get update",
"apt-get install -y git libsm6 libxext6 libxrender-dev libglib2.0-0",
]
CONTAINER_BASH_SCRIPT = [
*(
'[ ! -z "$CLEARML_AGENT_SKIP_CONTAINER_APT" ] || {}'.format(line)
for line in _CONTAINER_APT_SCRIPT_SECTION
),
"declare LOCAL_PYTHON",
"[ ! -z $LOCAL_PYTHON ] || for i in {{15..5}}; do which python3.$i && python3.$i -m pip --version && "
"export LOCAL_PYTHON=$(which python3.$i) && break ; done",
"[ ! -z $LOCAL_PYTHON ] || apt-get install -y python3-pip",
'[ ! -z "$CLEARML_AGENT_SKIP_CONTAINER_APT" ] || [ ! -z "$LOCAL_PYTHON" ] || apt-get install -y python3-pip',
"[ ! -z $LOCAL_PYTHON ] || export LOCAL_PYTHON=python3",
"{extra_bash_init_cmd}",
"[ ! -z $CLEARML_AGENT_NO_UPDATE ] || $LOCAL_PYTHON -m pip install clearml-agent{agent_install_args}",
@@ -98,6 +107,7 @@ class K8sIntegration(Worker):
num_of_services=20,
base_pod_num=1,
user_props_cb=None,
runtime_cb=None,
overrides_yaml=None,
template_yaml=None,
clearml_conf_file=None,
@@ -125,6 +135,7 @@ class K8sIntegration(Worker):
:param callable user_props_cb: An Optional callable allowing additional user properties to be specified
when scheduling a task to run in a pod. Callable can receive an optional pod number and should return
a dictionary of user properties (name and value). Signature is [[Optional[int]], Dict[str,str]]
:param callable runtime_cb: An Optional callable allowing additional task runtime to be specified (see user_props_cb)
:param str overrides_yaml: YAML file containing the overrides for the pod (optional)
:param str template_yaml: YAML file containing the template for the pod (optional).
If provided the pod is scheduled with kubectl apply and overrides are ignored, otherwise with kubectl run.
@@ -159,6 +170,7 @@ class K8sIntegration(Worker):
self.base_pod_num = base_pod_num
self._edit_hyperparams_support = None
self._user_props_cb = user_props_cb
self._runtime_cb = runtime_cb
self.conf_file_content = None
self.overrides_json_string = None
self.template_dict = None
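Per the docstring above, runtime_cb follows the same contract as user_props_cb: it optionally receives a pod number (in ports mode) and returns a dict, which is merged into the task's runtime section via tasks.edit (see set_task_info below). A hypothetical callback; the keys and values here are purely illustrative:

from typing import Dict, Optional

def my_runtime_cb(pod_number: Optional[int] = None) -> Dict[str, str]:
    # returned values are merged into the task's existing runtime properties
    runtime = {"scheduler": "k8s-glue"}
    if pod_number is not None:
        runtime["pod_number"] = str(pod_number)
    return runtime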
@@ -192,6 +204,14 @@ class K8sIntegration(Worker):
self._min_cleanup_interval_per_ns_sec = 1.0
self._last_pod_cleanup_per_ns = defaultdict(lambda: 0.)
self._server_supports_same_state_transition = (
self._session.feature_set != "basic" and self._session.check_min_server_version("3.22.3")
)
@property
def agent_label(self):
return self._get_agent_label()
def _create_daemon_instance(self, cls_, **kwargs):
return cls_(agent=self, **kwargs)
@@ -424,6 +444,9 @@ class K8sIntegration(Worker):
""" Called when a resource (pod/job) was applied """
pass
def ports_mode_supported_for_task(self, task_id: str, task_data):
return self.ports_mode
def run_one_task(self, queue: Text, task_id: Text, worker_args=None, task_session=None, **_):
print('Pulling task {} launching on kubernetes cluster'.format(task_id))
session = task_session or self._session
@@ -433,7 +456,9 @@ class K8sIntegration(Worker):
if self._is_same_tenant(task_session):
try:
print('Pushing task {} into temporary pending queue'.format(task_id))
_ = session.api_client.tasks.stop(task_id, force=True, status_reason="moving to k8s pending queue")
if not self._server_supports_same_state_transition:
_ = session.api_client.tasks.stop(task_id, force=True, status_reason="moving to k8s pending queue")
# Just make sure to clean up in case the task is stuck in the queue (known issue)
self._session.api_client.queues.remove_task(
@@ -493,8 +518,10 @@ class K8sIntegration(Worker):
)
)
if self.ports_mode:
ports_mode = False
if self.ports_mode_supported_for_task(task_id, task_data):
print("Kubernetes looking for available pod to use")
ports_mode = True
# noinspection PyBroadException
try:
@@ -505,12 +532,12 @@ class K8sIntegration(Worker):
# Search for a free pod number
pod_count = 0
pod_number = self.base_pod_num
while self.ports_mode or self.max_pods_limit:
while ports_mode or self.max_pods_limit:
pod_number = self.base_pod_num + pod_count
try:
items_count = self._get_pod_count(
extra_labels=[self.limit_pod_label.format(pod_number=pod_number)] if self.ports_mode else None,
extra_labels=[self.limit_pod_label.format(pod_number=pod_number)] if ports_mode else None,
msg="Looking for a free pod/port"
)
except GetPodCountError:
@@ -560,11 +587,11 @@ class K8sIntegration(Worker):
break
pod_count += 1
labels = self._get_pod_labels(queue, queue_name)
if self.ports_mode:
labels = self._get_pod_labels(queue, queue_name, task_data)
if ports_mode:
labels.append(self.limit_pod_label.format(pod_number=pod_number))
if self.ports_mode:
if ports_mode:
print("Kubernetes scheduling task id={} on pod={} (pod_count={})".format(task_id, pod_number, pod_count))
else:
print("Kubernetes scheduling task id={}".format(task_id))
@@ -595,6 +622,7 @@ class K8sIntegration(Worker):
task_id=task_id,
queue=queue,
namespace=namespace,
task_token=task_session.token.encode("ascii") if task_session else None,
)
print('kubectl output:\n{}\n{}'.format(error, output))
@@ -602,6 +630,14 @@ class K8sIntegration(Worker):
send_log = "Running kubectl encountered an error: {}".format(error)
self.log.error(send_log)
self.send_logs(task_id, send_log.splitlines())
# Make sure to remove the task from our k8s pending queue
self._session.api_client.queues.remove_task(
task=task_id,
queue=self.k8s_pending_queue_id,
)
# Set task as failed
session.api_client.tasks.failed(task_id, force=True)
return
if pod_name:
@@ -609,25 +645,41 @@ class K8sIntegration(Worker):
resource_name=pod_name, namespace=namespace, task_id=task_id, session=session
)
self.set_task_info(
task_id=task_id, task_session=task_session, queue_name=queue_name, ports_mode=ports_mode,
pod_number=pod_number, pod_count=pod_count, task_data=task_data
)
def set_task_info(
self, task_id: str, task_session, task_data, queue_name: str, ports_mode: bool, pod_number, pod_count
):
user_props = {"k8s-queue": str(queue_name)}
if self.ports_mode:
user_props.update(
{
"k8s-pod-number": pod_number,
"k8s-pod-label": labels[0],
"k8s-internal-pod-count": pod_count,
"k8s-agent": self._get_agent_label(),
}
)
runtime = {}
if ports_mode:
agent_label = self._get_agent_label()
user_props.update({
"k8s-pod-number": pod_number,
"k8s-pod-label": agent_label, # backwards-compatibility / legacy
"k8s-internal-pod-count": pod_count,
"k8s-agent": agent_label,
})
if self._user_props_cb:
# noinspection PyBroadException
try:
custom_props = self._user_props_cb(pod_number) if self.ports_mode else self._user_props_cb()
custom_props = self._user_props_cb(pod_number) if ports_mode else self._user_props_cb()
user_props.update(custom_props)
except Exception:
pass
if self._runtime_cb:
# noinspection PyBroadException
try:
custom_runtime = self._runtime_cb(pod_number) if ports_mode else self._runtime_cb()
runtime.update(custom_runtime)
except Exception:
pass
if user_props:
self._set_task_user_properties(
task_id=task_id,
@@ -635,7 +687,38 @@ class K8sIntegration(Worker):
**user_props
)
def _get_pod_labels(self, queue, queue_name):
if runtime:
task_runtime = self._get_task_runtime(task_id) or {}
task_runtime.update(runtime)
try:
res = task_session.send_request(
service='tasks', action='edit', method=Request.def_method,
json={
"task": task_id, "force": True, "runtime": task_runtime
},
)
if not res.ok:
raise Exception("failed setting runtime property")
except Exception as ex:
print("WARNING: failed setting custom runtime properties for task '{}': {}".format(task_id, ex))
def _get_task_runtime(self, task_id) -> Optional[dict]:
try:
res = self._session.send_request(
service='tasks', action='get_by_id', method=Request.def_method,
json={"task": task_id, "only_fields": ["runtime"]},
)
if not res.ok:
raise ValueError(f"request returned {res.status_code}")
data = res.json().get("data")
if not data or "task" not in data:
raise ValueError("empty data in result")
return data["task"].get("runtime", {})
except Exception as ex:
print(f"ERROR: Failed getting runtime properties for task {task_id}: {ex}")
def _get_pod_labels(self, queue, queue_name, task_data):
return [
self._get_agent_label(),
"{}={}".format(self.QUEUE_LABEL, self._safe_k8s_label_value(queue)),
@@ -673,7 +756,7 @@ class K8sIntegration(Worker):
def _create_template_container(
self, pod_name: str, task_id: str, docker_image: str, docker_args: List[str],
docker_bash: str, clearml_conf_create_script: List[str], task_worker_id: str
docker_bash: str, clearml_conf_create_script: List[str], task_worker_id: str, task_token: str = None
) -> dict:
container = self._get_docker_args(
docker_args,
@@ -682,16 +765,32 @@ class K8sIntegration(Worker):
convert=lambda env: {'name': env.partition("=")[0], 'value': env.partition("=")[2]},
)
# Set worker ID
env_vars = container.get('env', [])
found_worker_id = False
for entry in env_vars:
if entry.get('name') == 'CLEARML_WORKER_ID':
entry['name'] = task_worker_id
found_worker_id = True
if not found_worker_id:
container['env'] = env_vars + [{'name': 'CLEARML_WORKER_ID', 'value': task_worker_id}]
def add_or_update_env_var(name, value):
env_vars = container.get('env', [])
for entry in env_vars:
if entry.get('name') == name:
entry['value'] = value
break
else:
container['env'] = env_vars + [{'name': name, 'value': value}]
# Set worker ID
add_or_update_env_var('CLEARML_WORKER_ID', task_worker_id)
if ENV_POD_USE_IMAGE_ENTRYPOINT.get():
# Don't add a cmd and args, just the image
# Add the task ID and token since we need them (they're usually in the init script passed to us)
add_or_update_env_var('CLEARML_TASK_ID', task_id)
if task_token:
# TODO: find a way to base64 encode the token
add_or_update_env_var('CLEARML_AUTH_TOKEN', task_token)
return self._merge_containers(
container, dict(name=pod_name, image=docker_image)
)
# Create bash script for container and
container_bash_script = [self.container_bash_script] if isinstance(self.container_bash_script, str) \
else self.container_bash_script
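Note that add_or_update_env_var replaces the earlier CLEARML_WORKER_ID-specific loop, which overwrote the matching entry's name instead of its value; the new helper is a generic upsert over the container's env list. A minimal standalone illustration (the values are hypothetical):

container = {"env": [{"name": "CLEARML_WORKER_ID", "value": "stale"}]}

def add_or_update_env_var(name, value):
    env_vars = container.get("env", [])
    for entry in env_vars:
        if entry.get("name") == name:
            entry["value"] = value
            break
    else:
        container["env"] = env_vars + [{"name": name, "value": value}]

add_or_update_env_var("CLEARML_WORKER_ID", "agent:gpu0")  # updates in place
add_or_update_env_var("CLEARML_TASK_ID", "abc123")        # appends a new entry
print(container["env"])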
@@ -740,7 +839,8 @@ class K8sIntegration(Worker):
task_id,
namespace,
template,
pod_number=None
pod_number=None,
task_token=None,
):
if "apiVersion" not in template:
template["apiVersion"] = "batch/v1" if self.using_jobs else "v1"
@@ -771,7 +871,7 @@ class K8sIntegration(Worker):
spec.setdefault('backoffLimit', 0)
spec_template = spec.setdefault('template', {})
if labels:
# Place same labels fro any pod spawned by the job
# Place same labels for any pod spawned by the job
place_labels(spec_template.setdefault('metadata', {}))
spec = spec_template.setdefault('spec', {})
@@ -788,7 +888,8 @@ class K8sIntegration(Worker):
docker_args=docker_args,
docker_bash=docker_bash,
clearml_conf_create_script=clearml_conf_create_script,
task_worker_id=task_worker_id
task_worker_id=task_worker_id,
task_token=task_token,
)
if containers:
@@ -935,7 +1036,7 @@ class K8sIntegration(Worker):
result = self._session.get(
service='tasks',
action='get_all',
json={"id": task_ids, "status": ["in_progress", "queued"], "only_fields": ["id", "status"]},
json={"id": task_ids, "status": ["in_progress", "queued"], "only_fields": ["id", "status", "status_reason"]},
method=Request.def_method,
)
tasks_to_abort = result["tasks"]
@@ -945,9 +1046,13 @@ class K8sIntegration(Worker):
for task in tasks_to_abort:
task_id = task.get("id")
status = task.get("status")
status_reason = (task.get("status_reason") or "").lower()
if not task_id or not status:
self.log.warning('Failed getting task information: id={}, status={}'.format(task_id, status))
continue
if status == "queued" and "pushed back by policy manager" in status_reason:
# Task was pushed back to policy queue by policy manager, don't touch it
continue
try:
if status == "queued":
self._session.get(
@@ -981,6 +1086,9 @@ class K8sIntegration(Worker):
return deleted_pods
def check_if_suspended(self) -> bool:
pass
def run_tasks_loop(self, queues: List[Text], worker_params, **kwargs):
"""
:summary: Pull and run tasks from queues.
@@ -992,6 +1100,8 @@ class K8sIntegration(Worker):
:param worker_params: Worker command line arguments
:type worker_params: ``clearml_agent.helper.process.WorkerParams``
"""
# print("debug> running tasks loop")
events_service = self.get_service(Events)
# make sure we have a k8s pending queue
@@ -1023,12 +1133,19 @@ class K8sIntegration(Worker):
continue
# iterate over queues (priority style, queues[0] is highest)
# print("debug> iterating over queues")
for queue in queues:
# delete old completed / failed pods
self._cleanup_old_pods(namespaces, extra_msg="Cleanup cycle {cmd}")
if self.check_if_suspended():
print("Agent is suspended, sleeping for {:.1f} seconds".format(self._polling_interval))
sleep(self._polling_interval)
break
# get next task in queue
try:
# print(f"debug> getting tasks for queue {queue}")
response = self._get_next_task(queue=queue, get_task_info=self._impersonate_as_task_owner)
except Exception as e:
print("Warning: Could not access task queue [{}], error: {}".format(queue, e))

View File

@@ -9,6 +9,7 @@ from clearml_agent.helper.process import stringify_bash_output
from .daemon import K8sDaemon
from .utilities import get_path
from .errors import GetPodsError
from .definitions import ENV_POD_MONITOR_DISABLE_ENQUEUE_ON_PREEMPTION
class PendingPodsDaemon(K8sDaemon):
@@ -17,16 +18,16 @@ class PendingPodsDaemon(K8sDaemon):
self._polling_interval = polling_interval
self._last_tasks_msgs = {} # last msg updated for every task
def get_pods(self, pod_name=None):
def get_pods(self, pod_name=None, debug_msg="Detecting pending pods: {cmd}"):
filters = ["status.phase=Pending"]
if pod_name:
filters.append(f"metadata.name={pod_name}")
if self._agent.using_jobs:
return self._agent.get_pods_for_jobs(
job_condition="status.active=1", pod_filters=filters, debug_msg="Detecting pending pods: {cmd}"
job_condition="status.active=1", pod_filters=filters, debug_msg=debug_msg
)
return self._agent.get_pods(filters=filters, debug_msg="Detecting pending pods: {cmd}")
return self._agent.get_pods(filters=filters, debug_msg=debug_msg)
def _get_pod_name(self, pod: dict):
return get_path(pod, "metadata", "name")
@@ -72,6 +73,11 @@ class PendingPodsDaemon(K8sDaemon):
if not namespace:
continue
updated_pod = self.get_pods(pod_name=pod_name, debug_msg="Refreshing pod information: {cmd}")
if not updated_pod:
continue
pod = updated_pod[0]
task_id_to_pod[task_id] = pod
msg = None
@@ -190,32 +196,39 @@ class PendingPodsDaemon(K8sDaemon):
if not msg or self._last_tasks_msgs.get(task_id, None) == (msg, tags):
return
try:
# Make sure the task is queued
result = self._session.send_request(
service='tasks',
action='get_all',
json={"id": task_id, "only_fields": ["status"]},
method=Request.def_method,
async_enable=False,
)
if result.ok:
status = get_path(result.json(), 'data', 'tasks', 0, 'status')
# if task is in progress, change its status to enqueued
if status == "in_progress":
result = self._session.send_request(
service='tasks', action='enqueue',
json={
"task": task_id, "force": True, "queue": self._agent.k8s_pending_queue_id
},
method=Request.def_method,
async_enable=False,
)
if not result.ok:
result_msg = get_path(result.json(), 'meta', 'result_msg')
self.log.debug(
"K8S Glue pods monitor: failed forcing task status change"
" for pending task {}: {}".format(task_id, result_msg)
if ENV_POD_MONITOR_DISABLE_ENQUEUE_ON_PREEMPTION.get():
# This disables the option to enqueue the task which is supposed to sync the ClearML task status
# in case the pod was preempted. In some cases this does not happen due to preemption but due to
# cluster communication lag issues that cause us not to discover the pod is no longer pending and
# enqueue the task when it's actually already running, thus essentially killing the task
pass
else:
# Make sure the task is queued
result = self._session.send_request(
service='tasks',
action='get_all',
json={"id": task_id, "only_fields": ["status"]},
method=Request.def_method,
async_enable=False,
)
if result.ok:
status = get_path(result.json(), 'data', 'tasks', 0, 'status')
# if task is in progress, change its status to enqueued
if status == "in_progress":
result = self._session.send_request(
service='tasks', action='enqueue',
json={
"task": task_id, "force": True, "queue": self._agent.k8s_pending_queue_id
},
method=Request.def_method,
async_enable=False,
)
if not result.ok:
result_msg = get_path(result.json(), 'meta', 'result_msg')
self.log.debug(
"K8S Glue pods monitor: failed forcing task status change"
" for pending task {}: {}".format(task_id, result_msg)
)
# Update task status message
payload = {"task": task_id, "status_message": "K8S glue status: {}".format(msg)}

View File

@@ -543,6 +543,36 @@ def convert_cuda_version_to_int_10_base_str(cuda_version):
return str(int(float(cuda_version)*10))
def get_python_version(python_executable, log=None):
from clearml_agent.helper.process import Argv
try:
output = Argv(python_executable, "--version").get_output(
stderr=subprocess.STDOUT
)
except subprocess.CalledProcessError as ex:
# Windows returns 9009 code and suggests to install Python from Windows Store
if is_windows_platform() and ex.returncode == 9009:
if log:
log.debug("version not found: {}".format(ex))
else:
if log:
log.warning("error getting %s version: %s", python_executable, ex)
return None
except FileNotFoundError as ex:
if log:
log.debug("version not found: {}".format(ex))
return None
match = re.search(r"Python ({}(?:\.\d+)*)".format(r"\d+"), output)
if match:
if log:
log.debug("Found: {}".format(python_executable))
# only return major.minor version
return ".".join(str(match.group(1)).split(".")[:2])
return None
class NonStrictAttrs(object):
@classmethod
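In short, get_python_version above shells out to "<python> --version", tolerates missing interpreters (including Windows' store-stub exit code 9009), and returns only major.minor. A self-contained sketch mirroring that behavior (not the agent's exact code):

import re
import subprocess

def major_minor(python_executable):
    try:
        out = subprocess.check_output([python_executable, "--version"],
                                      stderr=subprocess.STDOUT, text=True)
    except (subprocess.CalledProcessError, FileNotFoundError):
        return None  # interpreter missing or broken
    m = re.search(r"Python (\d+(?:\.\d+)*)", out)
    return ".".join(m.group(1).split(".")[:2]) if m else None

print(major_minor("python3"))   # e.g. "3.10"
print(major_minor("python99"))  # None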

View File

@@ -69,7 +69,7 @@ def or_(*converters, **kwargs):
return wrapper
def strtobool (val):
def strtobool(val):
"""Convert a string representation of truth to true (1) or false (0).
True values are 'y', 'yes', 't', 'true', 'on', and '1'; false values

View File

@@ -29,9 +29,12 @@ class PackageManager(object):
_config_cache_max_entries = 'agent.venvs_cache.max_entries'
_config_cache_free_space_threshold = 'agent.venvs_cache.free_space_threshold_gb'
_config_cache_lock_timeout = 'agent.venvs_cache.lock_timeout'
_config_pip_legacy_resolver = 'agent.package_manager.pip_legacy_resolver'
def __init__(self):
self._cache_manager = None
self._existing_packages = []
self._base_install_flags = []
@abc.abstractproperty
def bin(self):
@@ -79,6 +82,23 @@ class PackageManager(object):
# type: (Iterable[Text]) -> None
pass
def add_extra_install_flags(self, extra_flags): # type: (List[str]) -> None
if extra_flags:
extra_flags = [
e for e in extra_flags if e not in list(self._base_install_flags)
]
self._base_install_flags = list(self._base_install_flags) + list(extra_flags)
def remove_extra_install_flags(self, extra_flags): # type: (List[str]) -> bool
if extra_flags:
_base_install_flags = [
e for e in self._base_install_flags if e not in list(extra_flags)
]
if self._base_install_flags != _base_install_flags:
self._base_install_flags = _base_install_flags
return True
return False
def upgrade_pip(self):
result = self._install(
*select_for_platform(
@@ -87,19 +107,58 @@ class PackageManager(object):
),
"--upgrade"
)
packages = self.run_with_env(('list',), output=True).splitlines()
# p.split is ('pip', 'x.y.z')
pip = [p.split() for p in packages if len(p.split()) == 2 and p.split()[0] == 'pip']
if pip:
# noinspection PyBroadException
packages = (self.freeze(freeze_full_environment=True) or dict()).get("pip")
if packages:
from clearml_agent.helper.package.requirements import RequirementsManager
from .requirements import MarkerRequirement, SimpleVersion
# store existing packages so that we can check if we can skip preinstalled packages
# we will only check "@ file" "@ vcs" for exact match
self._existing_packages = RequirementsManager.parse_requirements_section_to_marker_requirements(
packages, skip_local_file_validation=True)
try:
from .requirements import MarkerRequirement
pip = pip[0][1].split('.')
MarkerRequirement.pip_new_version = bool(int(pip[0]) >= 20)
except Exception:
pass
pip_pkg = next(p for p in self._existing_packages if p.name == "pip")
except StopIteration:
pip_pkg = None
# check if we need to list the pip version as well
if pip_pkg:
MarkerRequirement.pip_new_version = SimpleVersion.compare_versions(pip_pkg.version, ">=", "20")
# add --use-deprecated=legacy-resolver to pip install to avoid mismatched packages issues
self._add_legacy_resolver_flag(pip_pkg.version)
return result
def _add_legacy_resolver_flag(self, pip_pkg_version):
if not self.session.config.get(self._config_pip_legacy_resolver, None):
return
from .requirements import SimpleVersion
match_versions = self.session.config.get(self._config_pip_legacy_resolver)
matched = False
for rule in match_versions:
matched = False
# make sure we match all the parts of the rule
for a_version in rule.split(","):
o, v = SimpleVersion.split_op_version(a_version.strip())
matched = SimpleVersion.compare_versions(pip_pkg_version, o, v)
if not matched:
break
# if the rule is fully matched we have a match
if matched:
break
legacy_resolver_flags = ["--use-deprecated=legacy-resolver"]
if matched:
print("INFO: Using legacy resolver for PIP to avoid inconsistency with package versions!")
self.add_extra_install_flags(legacy_resolver_flags)
elif self.remove_extra_install_flags(legacy_resolver_flags):
print("INFO: removing pip legacy resolver!")
def get_python_command(self, extra=()):
# type: (...) -> Executable
return Argv(self.bin, *extra)
@@ -149,6 +208,18 @@ class PackageManager(object):
return False
except Exception:
return False
try:
from .requirements import Requirement, MarkerRequirement
req = MarkerRequirement(Requirement.parse(package_name))
# if pip was part of the requirements, make sure we update the flags
# add --use-deprecated=legacy-resolver to pip install to avoid mismatched packages issues
if req.name == "pip" and req.version:
PackageManager._selected_manager._add_legacy_resolver_flag(req.version)
except Exception as e:
print("WARNING: Error while parsing pip version legacy [{}]".format(e))
return True
@classmethod
@@ -219,6 +290,8 @@ class PackageManager(object):
if not self._get_cache_manager():
return
print('Adding venv into cache: {}'.format(source_folder))
try:
keys = self._generate_reqs_hash_keys(requirements, docker_cmd, python_version, cuda_version)
return self._get_cache_manager().add_entry(

View File

@@ -208,7 +208,6 @@ class CondaAPI(PackageManager):
except Exception as ex:
print("WARNING: Failed using base conda environment, reverting to new environment: {}".format(ex))
command = Argv(
self.conda,
"create",
@@ -273,7 +272,7 @@ class CondaAPI(PackageManager):
Conda seems to load "vcruntime140.dll" from all its environment on startup.
This means environment have to be deleted using 'conda env remove'.
If necessary, conda can be fooled into deleting a partially-deleted environment by creating an empty file
in '<ENV>\conda-meta\history' (value found in 'conda.gateways.disk.test.PREFIX_MAGIC_FILE').
in '<ENV>\\conda-meta\\history' (value found in 'conda.gateways.disk.test.PREFIX_MAGIC_FILE').
Otherwise, it complains that said directory is not a conda environment.
See: https://github.com/conda/conda/issues/7682
@@ -801,6 +800,25 @@ class CondaAPI(PackageManager):
return conda_env
return base_conda_env
def add_cached_venv(self, *args, **kwargs):
"""
Copy the local venv folder into the venv cache (keys are based on the requirements+python+docker).
"""
# do not cache if this is a base conda environment
if self.conda_env_as_base_docker or self.use_conda_base_env:
return
return super().add_cached_venv(*args, **kwargs)
def get_cached_venv(self, *args, **kwargs):
"""
Copy a cached copy of the venv (based on the requirements) into destination_folder.
Return None if failed or cached entry does not exist
"""
# do not cache if this is a base conda environment
if self.conda_env_as_base_docker or self.use_conda_base_env:
return
return super().get_cached_venv(*args, **kwargs)
# enable hashing with cmp=False because pdb fails on un-hashable exceptions
exception = attrs(str=True, cmp=False)

View File

@@ -97,7 +97,7 @@ class SystemPip(PackageManager):
return Argv(self.bin, '-m', 'pip', '--disable-pip-version-check', *command)
def install_flags(self):
indices_args = tuple(
base_args = tuple(self._base_install_flags or []) + tuple(
chain.from_iterable(('--extra-index-url', x) for x in PIP_EXTRA_INDICES)
)
@@ -105,7 +105,7 @@ class SystemPip(PackageManager):
ENV_PIP_EXTRA_INSTALL_FLAGS.get() or \
self.session.config.get("agent.package_manager.extra_pip_install_flags", None)
return (indices_args + tuple(extra_pip_flags)) if extra_pip_flags else indices_args
return (base_args + tuple(extra_pip_flags)) if extra_pip_flags else base_args
def download_flags(self):
indices_args = tuple(

View File

@@ -37,7 +37,9 @@ class VirtualenvPip(SystemPip, PackageManager):
def load_requirements(self, requirements):
if isinstance(requirements, dict) and requirements.get("pip"):
requirements["pip"] = self.requirements_manager.replace(requirements["pip"])
requirements["pip"] = self.requirements_manager.replace(
requirements["pip"], existing_packages=self._existing_packages
)
super(VirtualenvPip, self).load_requirements(requirements)
self.requirements_manager.post_install(self.session, package_manager=self)
@@ -64,9 +66,18 @@ class VirtualenvPip(SystemPip, PackageManager):
Only valid if instantiated with path.
Use self.python as self.bin does not exist.
"""
self.session.command(
self.python, "-m", "virtualenv", self.path, *self.create_flags()
).check_call()
# noinspection PyBroadException
try:
self.session.command(
self.python, "-m", "virtualenv", self.path, *self.create_flags()
).check_call()
except Exception as ex:
# let's try with std library instead
print("WARNING: virtualenv call failed: {}\n INFO: Creating virtual environment with venv".format(ex))
self.session.command(
self.python, "-m", "venv", self.path, *self.create_flags()
).check_call()
return self
def remove(self):

View File

@@ -53,12 +53,18 @@ class PriorityPackageRequirement(SimpleSubstitution):
if not self._replaced_packages:
return list_of_requirements
# we assume that both pip & setuptools are not in list_of_requirements, and we need to add them
if "pip" in self._replaced_packages:
full_freeze = PackageManager.out_of_scope_freeze(freeze_full_environment=True)
# now let's look for pip
pips = [line for line in full_freeze.get("pip", []) if line.split("==")[0] == "pip"]
if pips and "pip" in list_of_requirements:
list_of_requirements["pip"] = [pips[0]] + list_of_requirements["pip"]
if not full_freeze:
if "pip" in list_of_requirements:
list_of_requirements["pip"] = [self._replaced_packages["pip"]] + list_of_requirements["pip"]
else:
# now let's look for pip
pips = [line for line in full_freeze.get("pip", []) if str(line.split("==")[0]).strip() == "pip"]
if pips and "pip" in list_of_requirements:
list_of_requirements["pip"] = [pips[0]] + list_of_requirements["pip"]
if "setuptools" in self._replaced_packages:
try:
@@ -87,6 +93,20 @@ class PriorityPackageRequirement(SimpleSubstitution):
return list_of_requirements
class CachedPackageRequirement(PriorityPackageRequirement):
name = ("setuptools", "pip", )
optional_package_names = tuple()
def replace(self, req):
"""
Put the requirement in the list for later conversion
:raises: ValueError if version is pre-release
"""
self._replaced_packages[req.name] = req.line
return Text(req)
class PackageCollectorRequirement(SimpleSubstitution):
"""
This RequirementSubstitution class will allow you to have multiple instances of the same

View File

@@ -19,7 +19,7 @@ import logging
from clearml_agent.definitions import PIP_EXTRA_INDICES
from clearml_agent.helper.base import (
warning, is_conda, which, join_lines, is_windows_platform,
convert_cuda_version_to_int_10_base_str, )
convert_cuda_version_to_int_10_base_str, dump_yaml, )
from clearml_agent.helper.process import Argv, PathLike
from clearml_agent.helper.gpu.gpustat import get_driver_cuda_version
from clearml_agent.session import Session, normalize_cuda_version
@@ -94,6 +94,12 @@ class MarkerRequirement(object):
def __repr__(self):
return '{self.__class__.__name__}[{self}]'.format(self=self)
def __eq__(self, other):
return isinstance(other, MarkerRequirement) and str(self) == str(other)
def __hash__(self):
return str(self).__hash__()
def format_specs(self, num_parts=None, max_num_parts=None):
max_num_parts = max_num_parts or num_parts
if max_num_parts is None or not self.specs:
@@ -116,6 +122,10 @@ class MarkerRequirement(object):
def specs(self): # type: () -> List[Tuple[Text, Text]]
return self.req.specs
@property
def version(self): # type: () -> Text
return self.specs[0][1] if self.specs else ""
@specs.setter
def specs(self, value): # type: (List[Tuple[Text, Text]]) -> None
self.req.specs = value
@@ -143,6 +153,8 @@ class MarkerRequirement(object):
If the requested version is 1.2 the self.spec should be 1.2*
etc.
usage: it returns the value of the following comparison: requested_version "op" self.version
:param str requested_version:
:param str op: '==', '>', '>=', '<=', '<', '~='
:param int num_parts: number of parts to compare
@@ -152,7 +164,7 @@ class MarkerRequirement(object):
if not self.specs:
return True
version = self.specs[0][1]
version = self.version
op = (op or self.specs[0][0]).strip()
return SimpleVersion.compare_versions(
@@ -170,11 +182,21 @@ class MarkerRequirement(object):
self.req.local_file = False
return True
def validate_local_file_ref(self):
def is_local_package_ref(self):
# if local file does not exist, remove the reference to it
if self.vcs or self.editable or self.path or not self.local_file or not self.name or \
not self.uri or not self.uri.startswith("file://"):
return False
return True
def is_vcs_ref(self):
return bool(self.vcs)
def validate_local_file_ref(self):
# if local file does not exist, remove the reference to it
if not self.is_local_package_ref():
return
local_path = Path(self.uri[len("file://"):])
if not local_path.exists():
local_path = Path(unquote(self.uri)[len("file://"):])
@@ -221,6 +243,19 @@ class SimpleVersion:
_local_version_separators = re.compile(r"[\._-]")
_regex = re.compile(r"^\s*" + VERSION_PATTERN + r"\s*$", re.VERBOSE | re.IGNORECASE)
@classmethod
def split_op_version(cls, line):
"""
Split a string in the form of ">=1.2.3" into a (op, version), i.e. (">=", "1.2.3")
Notice is calling with only a version string (e.g. "1.2.3") default operator is "=="
which means you get ("==", "1.2.3")
:param line: string examples: "<=0.1.2"
:return: tuple of (op, version) example ("<=", "0.1.2")
"""
match = r"\s*([>=<~!]*)\s*(\S*)\s*"
groups = re.match(match, line).groups()
return groups[0] or "==", groups[1]
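The split itself is easy to verify standalone; note that a bare version defaults to the "==" operator:

import re

def split_op_version(line):
    op, version = re.match(r"\s*([>=<~!]*)\s*(\S*)\s*", line).groups()
    return op or "==", version

assert split_op_version("<=0.1.2") == ("<=", "0.1.2")
assert split_op_version("1.2.3") == ("==", "1.2.3")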
@classmethod
def compare_versions(cls, version_a, op, version_b, ignore_sub_versions=True, num_parts=3):
"""
@@ -624,14 +659,54 @@ class RequirementsManager(object):
return handler.replace(req)
return None
def replace(self, requirements): # type: (Text) -> Text
def replace(
self,
requirements, # type: Text
existing_packages=None, # type: List[MarkerRequirement]
pkg_skip_existing_local=True, # type: bool
pkg_skip_existing_vcs=True, # type: bool
pkg_skip_existing=True, # type: bool
): # type: (...) -> Text
parsed_requirements = self.parse_requirements_section_to_marker_requirements(
requirements=requirements, cwd=self._cwd)
requirements=requirements, cwd=self._cwd, skip_local_file_validation=True)
if parsed_requirements and existing_packages:
skipped_packages = None
if pkg_skip_existing:
skipped_packages = set(parsed_requirements) & set(existing_packages)
elif pkg_skip_existing_local or pkg_skip_existing_vcs:
existing_packages = [
p for p in existing_packages if (
(pkg_skip_existing_local and p.is_local_package_ref()) or
(pkg_skip_existing_vcs and p.is_vcs_ref())
)
]
skipped_packages = set(parsed_requirements) & set(existing_packages)
if skipped_packages:
# maintain order
num_skipped_packages = len(parsed_requirements)
parsed_requirements = [p for p in parsed_requirements if p not in skipped_packages]
num_skipped_packages -= len(parsed_requirements)
print("Skipping {} pre-installed packages:\n{}Remaining {} additional packages to install".format(
num_skipped_packages,
dump_yaml(sorted([str(p) for p in skipped_packages])),
len(parsed_requirements)
))
# nothing to install!
if not parsed_requirements:
return ""
# sanity check
if not parsed_requirements:
# return the original requirements just in case
return requirements
# remove local file references that do not exist
for p in parsed_requirements:
p.validate_local_file_ref()
def replace_one(i, req):
# type: (int, MarkerRequirement) -> Optional[Text]
try:
@@ -805,7 +880,7 @@ class RequirementsManager(object):
normalize_cuda_version(cudnn_version or 0))
@staticmethod
def parse_requirements_section_to_marker_requirements(requirements, cwd=None):
def parse_requirements_section_to_marker_requirements(requirements, cwd=None, skip_local_file_validation=False):
def safe_parse(req_str):
# noinspection PyBroadException
try:
@@ -815,7 +890,8 @@ class RequirementsManager(object):
def create_req(x):
r = MarkerRequirement(x)
r.validate_local_file_ref()
if not skip_local_file_validation:
r.validate_local_file_ref()
return r
if not requirements:
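At its core, the skip-existing logic above is a set intersection between the parsed requirements and the pre-installed packages (optionally restricted to local-file and VCS references), relying on the MarkerRequirement __eq__/__hash__ added earlier in this diff. A simplified sketch using plain strings; the package names and wheel path are hypothetical:

existing = {"torch @ file:///opt/wheels/torch-2.1.0.whl", "clearml==1.16.4"}
requested = ["clearml==1.16.4", "numpy==1.26.4",
             "torch @ file:///opt/wheels/torch-2.1.0.whl"]

skipped = set(requested) & existing
remaining = [r for r in requested if r not in skipped]  # original order preserved
print(sorted(skipped))  # pre-installed packages we do not reinstall
print(remaining)        # ['numpy==1.26.4']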

View File

@@ -322,6 +322,8 @@ class VCS(object):
return
# rewrite ssh URLs only if either ssh port or ssh user are forced in config
# TODO: fix, when url is in the form of `git@domain.com:user/project.git` we will fail to get scheme
# need to add ssh:// and replace first ":" with / , unless port is specified
if parsed_url.scheme == "ssh" and (
self.session.config.get('agent.force_git_ssh_port', None) or
self.session.config.get('agent.force_git_ssh_user', None)
@@ -595,7 +597,8 @@ class Git(VCS):
)
def pull(self):
self.call("fetch", "--all", "--recurse-submodules", cwd=self.location)
self._set_ssh_url()
self.call("fetch", "--all", "--tags", "--recurse-submodules", cwd=self.location)
def _git_pass_auth_wrapper(self, func, *args, **kwargs):
try:
@@ -777,7 +780,22 @@ def clone_repository_cached(session, execution, destination):
# We clone the entire repository, not a specific branch
vcs.clone() # branch=execution.branch)
vcs.pull()
print("pulling git")
try:
vcs.pull()
except Exception as ex:
print("git pull failed: {}".format(ex))
if (
session.config.get("agent.vcs_cache.enabled", False) and
session.config.get("agent.vcs_cache.clone_on_pull_fail", False)
):
print("pulling git failed, re-cloning: {}".format(no_password_url))
rm_tree(cached_repo_path)
vcs.clone()
else:
raise ex
print("pulling git completed")
rm_tree(destination)
shutil.copytree(Text(cached_repo_path), Text(clone_folder),
symlinks=select_for_platform(linux=True, windows=False),
@@ -918,7 +936,7 @@ def _locate_future_import(lines):
def patch_add_task_init_call(local_filename):
if not local_filename or not Path(local_filename).is_file():
if not local_filename or not Path(local_filename).is_file() or not str(local_filename).lower().endswith(".py"):
return
idx_a = 0

View File

@@ -401,6 +401,7 @@ class ResourceMonitor(object):
fractions = self._fractions_handler.fractions
stats["gpu_fraction_{}".format(report_index)] = \
(fractions[i] if i < len(fractions) else fractions[-1]) if fractions else 1.0
report_index += 1
except Exception as ex:
# something happened and we can't use gpu stats,
@@ -438,6 +439,7 @@ class ResourceMonitor(object):
class GpuFractionsHandler:
_number_re = re.compile(r"^clear\.ml/fraction(-\d+)?$")
_mig_re = re.compile(r"^nvidia\.com/mig-(?P<compute>[0-9]+)g\.(?P<memory>[0-9]+)gb$")
_frac_gpu_injector_re = re.compile(r"^clearml-injector/fraction$")
_gpu_name_to_memory_gb = {
"A30": 24,
@@ -514,10 +516,14 @@ class GpuFractionsHandler:
return 0
@classmethod
def encode_fractions(cls, limits: dict) -> str:
if any(cls._number_re.match(x) for x in (limits or {})):
return ",".join(str(v) for k, v in sorted(limits.items()) if cls._number_re.match(k))
return ",".join(("{}:{}".format(k, v) for k, v in (limits or {}).items() if cls._mig_re.match(k)))
def encode_fractions(cls, limits: dict, annotations: dict) -> str:
if limits:
if any(cls._number_re.match(x) for x in (limits or {})):
return ",".join(str(v) for k, v in sorted(limits.items()) if cls._number_re.match(k))
return ",".join(("{}:{}".format(k, v) for k, v in (limits or {}).items() if cls._mig_re.match(k)))
elif annotations:
if any(cls._frac_gpu_injector_re.match(x) for x in (annotations or {})):
return ",".join(str(v) for k, v in sorted(annotations.items()) if cls._frac_gpu_injector_re.match(k))
@staticmethod
def decode_fractions(fractions: str) -> Union[List[float], Dict[str, int]]:
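Following the first branch of encode_fractions above, clear.ml/fraction-N limits are emitted as a comma-separated list ordered by key. A quick standalone check (the fraction values are illustrative):

import re

_number_re = re.compile(r"^clear\.ml/fraction(-\d+)?$")
limits = {"clear.ml/fraction-1": "0.25", "clear.ml/fraction-0": "0.5"}
print(",".join(str(v) for k, v in sorted(limits.items()) if _number_re.match(k)))
# -> "0.5,0.25" (fraction-0 sorts first)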

View File

@@ -44,6 +44,11 @@ WORKER_ARGS = {
}
DAEMON_ARGS = dict({
'--polling-interval': {
'help': 'Polling interval in seconds. Minimum is 5 (default 5)',
'type': int,
'default': 5,
},
'--foreground': {
'help': 'Pipe full log to stdout/stderr, should not be used if running in background',
'action': 'store_true',
@@ -62,7 +67,10 @@ DAEMON_ARGS = dict({
'group': 'Docker support',
},
'--queue': {
'help': 'Queue ID(s)/Name(s) to pull tasks from (\'default\' queue)',
'help': 'Queue ID(s)/Name(s) to pull tasks from (\'default\' queue).'
' Note that the queue list order determines priority, with the first listed queue having the'
' highest priority. To change this behavior, use --order-fairness to pull from each queue in a'
' round-robin order',
'nargs': '+',
'default': tuple(),
'dest': 'queues',
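Putting the help text into practice: `clearml-agent daemon --queue high default --polling-interval 10` pulls from `high` before `default`, while adding `--order-fairness` switches to round-robin pulling across the queues (the queue names here are illustrative).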
@@ -107,8 +115,11 @@ DAEMON_ARGS = dict({
'--dynamic-gpus': {
'help': 'Allow to dynamically allocate gpus based on queue properties, '
'configure with \'--queue <queue_name>=<num_gpus>\'.'
' Example: \'--dynamic-gpus --gpus 0-3 --queue dual_gpus=2 single_gpu=1\''
' Example Opportunistic: \'--dynamic-gpus --gpus 0-3 --queue dual_gpus=2 max_quad_gpus=1-4 \'',
' Example: \'--dynamic-gpus --gpus 0-3 --queue dual_gpus=2 single_gpu=1\'.'
' Example Opportunistic: \'--dynamic-gpus --gpus 0-3 --queue dual_gpus=2 max_quad_gpus=1-4\'.'
' Note that the queue list order determines priority, with the first listed queue having the'
' highest priority. To change this behavior, use --order-fairness to pull from each queue in a'
' round-robin order',
'action': 'store_true',
},
'--uptime': {

View File

@@ -1 +1 @@
__version__ = '1.8.0'
__version__ = '1.9.2'

View File

@@ -1,4 +1,4 @@
FROM ubuntu:18.04
FROM ubuntu:22.04
USER root
WORKDIR /root

View File

@@ -4,7 +4,8 @@ curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip
 unzip awscliv2.zip
 ./aws/install
-curl -o kubectl https://amazon-eks.s3-us-west-2.amazonaws.com/1.21.2/2021-07-05/bin/linux/amd64/kubectl
+curl -o kubectl https://s3.us-west-2.amazonaws.com/amazon-eks/1.29.3/2024-04-19/bin/linux/amd64/kubectl
+#curl -o kubectl https://amazon-eks.s3-us-west-2.amazonaws.com/1.21.2/2021-07-05/bin/linux/amd64/kubectl
 #curl -o kubectl https://amazon-eks.s3.us-west-2.amazonaws.com/1.19.6/2021-01-05/bin/linux/amd64/kubectl
 chmod +x ./kubectl && mkdir -p $HOME/bin && cp ./kubectl $HOME/bin/kubectl && export PATH=$PATH:$HOME/bin

View File
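With the EKS kubectl download bumped to 1.29.3, a quick post-install sanity check can confirm the binary on PATH is the expected release; a sketch (assumes kubectl is already installed and reachable):

import json
import subprocess

# ask kubectl for its client version as JSON and compare the git version tag
out = subprocess.run(
    ["kubectl", "version", "--client", "--output", "json"],
    capture_output=True, text=True, check=True,
)
git_version = json.loads(out.stdout)["clientVersion"]["gitVersion"]
assert git_version.startswith("v1.29"), "unexpected kubectl version: {}".format(git_version)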

@@ -1,4 +1,4 @@
-FROM ubuntu:18.04
+FROM ubuntu:22.04

 USER root
 WORKDIR /root

View File

@@ -1,6 +1,6 @@
 #!/bin/bash

-curl -LO https://dl.k8s.io/release/v1.21.0/bin/linux/amd64/kubectl
+curl -LO https://dl.k8s.io/release/v1.29.3/bin/linux/amd64/kubectl

 install -o root -g root -m 0755 kubectl /usr/local/bin/kubectl

View File

@@ -20,7 +20,7 @@ FROM python:${TAG} as target
 WORKDIR /app

-ARG KUBECTL_VERSION=1.24.0
+ARG KUBECTL_VERSION=1.29.3

 # Not sure about these ENV vars
 # ENV LC_ALL=en_US.UTF-8

View File

@@ -2,7 +2,7 @@ ARG TAG=3.7.17-slim-bullseye
 FROM python:${TAG} as target

-ARG KUBECTL_VERSION=1.22.4
+ARG KUBECTL_VERSION=1.29.3

 WORKDIR /app

View File

@@ -1,4 +1,4 @@
-FROM ubuntu:18.04
+FROM ubuntu:22.04

 USER root
 WORKDIR /root

View File

@@ -33,4 +33,10 @@ if [ -z "$CLEARML_AGENT_NO_UPDATE" ]; then
     fi
 fi

-clearml-agent daemon $DAEMON_OPTIONS --queue $QUEUES --docker "${CLEARML_AGENT_DEFAULT_BASE_DOCKER:-$TRAINS_AGENT_DEFAULT_BASE_DOCKER}" --cpu-only ${CLEARML_AGENT_EXTRA_ARGS:-$TRAINS_AGENT_EXTRA_ARGS}
+DOCKER_ARGS="--docker \"${CLEARML_AGENT_DEFAULT_BASE_DOCKER:-$TRAINS_AGENT_DEFAULT_BASE_DOCKER}\""
+if [ -n "$CLEARML_AGENT_NO_DOCKER" ]; then
+    DOCKER_ARGS=""
+fi
+
+clearml-agent daemon $DAEMON_OPTIONS --queue $QUEUES $DOCKER_ARGS --cpu-only ${CLEARML_AGENT_EXTRA_ARGS:-$TRAINS_AGENT_EXTRA_ARGS}

View File
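The entrypoint change above lets CLEARML_AGENT_NO_DOCKER (any non-empty value) strip the --docker flag from the daemon invocation. A small Python rendering of the same decision, assuming the same environment variables (note that the bash version expands $DOCKER_ARGS unquoted, so the escaped quotes survive as literal characters if the image name contains spaces; building an argument list, as below, avoids that class of problem):

import os
import shlex

image = os.getenv("CLEARML_AGENT_DEFAULT_BASE_DOCKER") or os.getenv("TRAINS_AGENT_DEFAULT_BASE_DOCKER", "")
cmd = ["clearml-agent", "daemon", "--queue", os.getenv("QUEUES", "default")]
if not os.getenv("CLEARML_AGENT_NO_DOCKER"):
    # docker mode is the default; setting CLEARML_AGENT_NO_DOCKER disables it
    cmd += ["--docker", image]
cmd.append("--cpu-only")
print(shlex.join(cmd))  # the command the entrypoint would run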

@@ -161,6 +161,9 @@ agent {
     vcs_cache: {
         enabled: true,
         path: ~/.clearml/vcs-cache
+
+        # if git pull fails, always revert to re-cloning the repo (protects against changed user names in cached repo URLs)
+        # clone_on_pull_fail: false
     },

     # DEPRECATED: please use `venvs_cache` and set `venvs_cache.path`
@@ -305,6 +308,7 @@ agent {
     # pip_cache: "/root/.cache/pip"
     # poetry_cache: "/root/.cache/pypoetry"
     # vcs_cache: "/root/.clearml/vcs-cache"
+    # venvs_cache: "/root/.clearml/venvs-cache"
     # venv_build: "~/.clearml/venvs-builds"
     # pip_download: "/root/.clearml/pip-download-cache"
     # }

View File
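The new clone_on_pull_fail option documented above (commented out, i.e. off by default) makes the agent fall back to a fresh clone whenever git pull on a cached repository fails; enabling it means uncommenting the line and setting it to true under agent.vcs_cache. Conceptually the fallback behaves like this sketch, where cache_dir and repo_url are placeholders and the real implementation lives inside the agent:

import shutil
import subprocess

def update_cached_repo(cache_dir, repo_url, clone_on_pull_fail=False):
    try:
        # fast path: refresh the cached clone in place
        subprocess.run(["git", "-C", cache_dir, "pull"], check=True)
    except subprocess.CalledProcessError:
        if not clone_on_pull_fail:
            raise
        # slow path: discard the stale cache (e.g. an old user name baked into
        # the remote URL) and clone from scratch
        shutil.rmtree(cache_dir, ignore_errors=True)
        subprocess.run(["git", "clone", repo_url, cache_dir], check=True)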

@@ -13,65 +13,86 @@ def parse_args():
     group = parser.add_mutually_exclusive_group()
     parser.add_argument(
-        "--queue", type=str, help="Queues to pull tasks from. If multiple queues, use comma separated list, e.g. 'queue1,queue2'",
+        "--queue",
+        type=str,
+        help="Queues to pull tasks from. If multiple queues, use comma separated list, e.g. 'queue1,queue2'",
     )
     group.add_argument(
-        "--ports-mode", action='store_true', default=False,
+        "--ports-mode",
+        action="store_true",
+        default=False,
         help="Ports-Mode will add a label to the pod which can be used as service, in order to expose ports. "
              "Should not be used with max-pods"
     )
     parser.add_argument(
-        "--num-of-services", type=int, default=20,
-        help="Specify the number of k8s services to be used. Use only with ports-mode."
+        "--num-of-services",
+        type=int,
+        default=20,
+        help="Specify the number of k8s services to be used. Use only with ports-mode.",
     )
     parser.add_argument(
-        "--base-port", type=int,
+        "--base-port",
+        type=int,
         help="Used in conjunction with ports-mode, specifies the base port exposed by the services. "
              "For pod #X, the port will be <base-port>+X. Note that pod number is calculated based on base-pod-num, "
              "e.g. if base-port=20000 and base-pod-num=3, the port for the first pod will be 20003"
     )
     parser.add_argument(
-        "--base-pod-num", type=int, default=1,
+        "--base-pod-num",
+        type=int,
+        default=1,
         help="Used in conjunction with ports-mode and base-port, specifies the base pod number to be used by the "
              "service (default: %(default)s)"
     )
     parser.add_argument(
-        "--gateway-address", type=str, default=None,
-        help="Used in conjunction with ports-mode, specify the external address of the k8s ingress / ELB"
+        "--gateway-address",
+        type=str,
+        default=None,
+        help="Used in conjunction with ports-mode, specify the external address of the k8s ingress / ELB",
     )
     parser.add_argument(
-        "--pod-clearml-conf", type=str,
-        help="Configuration file to be used by the pod itself (if not provided, current configuration is used)"
+        "--pod-clearml-conf",
+        type=str,
+        help="Configuration file to be used by the pod itself (if not provided, current configuration is used)",
     )
     parser.add_argument(
-        "--overrides-yaml", type=str,
-        help="YAML file containing pod overrides to be used when launching a new pod"
+        "--overrides-yaml", type=str, help="YAML file containing pod overrides to be used when launching a new pod"
     )
     parser.add_argument(
-        "--template-yaml", type=str,
+        "--template-yaml",
+        type=str,
         help="YAML file containing pod template. If provided pod will be scheduled with kubectl apply "
              "and overrides are ignored, otherwise it will be scheduled with kubectl run"
     )
     parser.add_argument(
-        "--ssh-server-port", type=int, default=0,
-        help="If non-zero, every pod will also start an SSH server on the selected port (default: zero, not active)"
+        "--ssh-server-port",
+        type=int,
+        default=0,
+        help="If non-zero, every pod will also start an SSH server on the selected port (default: zero, not active)",
     )
     parser.add_argument(
-        "--namespace", type=str,
-        help="Specify the namespace in which pods will be created (default: %(default)s)", default="clearml"
+        "--namespace",
+        type=str,
+        help="Specify the namespace in which pods will be created (default: %(default)s)",
+        default="clearml",
     )
     group.add_argument(
-        "--max-pods", type=int,
+        "--max-pods",
+        type=int,
         help="Limit the maximum number of pods that this service can run at the same time. "
              "Should not be used with ports-mode"
     )
     parser.add_argument(
-        "--use-owner-token", action="store_true", default=False,
-        help="Generate and use task owner token for the execution of each task"
+        "--use-owner-token",
+        action="store_true",
+        default=False,
+        help="Generate and use task owner token for the execution of each task",
     )
     parser.add_argument(
-        "--create-queue", action="store_true", default=False,
-        help="Create the queue if it does not exist (default: %(default)s)"
+        "--create-queue",
+        action="store_true",
+        default=False,
+        help="Create the queue if it does not exist (default: %(default)s)",
     )

     return parser.parse_args()
@@ -81,23 +102,32 @@ def main():
     user_props_cb = None
     if args.ports_mode and args.base_port:
         def k8s_user_props_cb(pod_number=0):
             user_prop = {"k8s-pod-port": args.base_port + pod_number}
             if args.gateway_address:
                 user_prop["k8s-gateway-address"] = args.gateway_address
             return user_prop

         user_props_cb = k8s_user_props_cb

     k8s = K8sIntegration(
-        ports_mode=args.ports_mode, num_of_services=args.num_of_services, base_pod_num=args.base_pod_num,
-        user_props_cb=user_props_cb, overrides_yaml=args.overrides_yaml, clearml_conf_file=args.pod_clearml_conf,
-        template_yaml=args.template_yaml, extra_bash_init_script=K8sIntegration.get_ssh_server_bash(
-            ssh_port_number=args.ssh_server_port) if args.ssh_server_port else None,
-        namespace=args.namespace, max_pods_limit=args.max_pods or None,
+        ports_mode=args.ports_mode,
+        num_of_services=args.num_of_services,
+        base_pod_num=args.base_pod_num,
+        user_props_cb=user_props_cb,
+        overrides_yaml=args.overrides_yaml,
+        clearml_conf_file=args.pod_clearml_conf,
+        template_yaml=args.template_yaml,
+        extra_bash_init_script=K8sIntegration.get_ssh_server_bash(ssh_port_number=args.ssh_server_port)
+        if args.ssh_server_port
+        else None,
+        namespace=args.namespace,
+        max_pods_limit=args.max_pods or None,
     )

-    args.queue = [q.strip() for q in args.queue.split(",") if q.strip()]
+    queue = [q.strip() for q in args.queue.split(",") if q.strip()] if args.queue else None

-    k8s.k8s_daemon(args.queue, use_owner_token=args.use_owner_token, create_queue=args.create_queue)
+    k8s.k8s_daemon(queue, use_owner_token=args.use_owner_token, create_queue=args.create_queue)


 if __name__ == "__main__":

View File
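The last change in this file guards the queue parsing: with no --queue argument, args.queue is None, and the old args.queue.split(",") raised AttributeError before k8s_daemon ever started. The guarded form in isolation, with None presumably letting k8s_daemon fall back to its own queue handling:

def split_queues(arg):
    # "queue1, queue2" -> ["queue1", "queue2"]; None or empty string -> None
    return [q.strip() for q in arg.split(",") if q.strip()] if arg else None

assert split_queues("queue1, queue2") == ["queue1", "queue2"]
assert split_queues(None) is None
assert split_queues("") is None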

@@ -63,6 +63,7 @@ setup(
         'Programming Language :: Python :: 3.9',
         'Programming Language :: Python :: 3.10',
         'Programming Language :: Python :: 3.11',
+        'Programming Language :: Python :: 3.12',
         'License :: OSI Approved :: Apache Software License',
     ],