Compare commits

...

57 Commits

Author SHA1 Message Date
clearml
4158146420 Version bump to v1.9.3 2025-01-19 16:17:56 +02:00
clearml
b9ef1a55cd Fix dependency on windows 2025-01-19 16:16:54 +02:00
clearml
9fa8d72640 Update github repo link 2025-01-13 18:36:16 +02:00
clearml
e535390815 Add win32file dependency on windows 2025-01-13 18:34:48 +02:00
clearml
91dfa09466 Fix Python 3.13 support 2025-01-05 12:14:24 +02:00
clearml
f110bbf5b4 Remove Python 3.5 support 2025-01-05 12:13:57 +02:00
clearml
070919973b Fix python 3.6 compatibility, no := operator 2025-01-05 12:13:21 +02:00
clearml
47d35ef48f Fix managed Python environment inside container (PEP 668) by removing /usr/lib/python3.*/EXTERNALLY-MANAGED 2024-12-26 18:59:42 +02:00
clearml
54ed234fca Add agent.docker_args_filters to configuration docs 2024-12-26 18:58:58 +02:00
clearml
a26860e79f Fix default value handling in merge_dicts() 2024-12-26 18:58:24 +02:00
clearml
fc1abbab0b Refactor k8s glue 2024-12-26 18:58:00 +02:00
clearml
4fa61dde1f Support ignoring kubectl errors 2024-12-12 23:41:31 +02:00
clearml
26d748a4d8 Support creating queue with tags 2024-12-12 23:40:57 +02:00
clearml
5419fd84ae Add support for Python 3.13 2024-12-12 23:39:11 +02:00
clearml
d8366dedc6 Fix UV priority
Fix UV cache being disabled (UV handles its own cache)
Fix UV freeze
Fix: make sure we do not use the pip cache if poetry/uv is used (even if we reverted to pip, we cannot know whether someone changed the repository so that a lock file now exists in a new version)
2024-12-12 23:38:42 +02:00
mads-oestergaard
cc656e2969 Add support for uv as package manager (#218)
* add uv as a package manager

* update configs

* update worker and defs

* update environ

* Update configs to highlight sync command

* rename to sync_extra_args and set UV_CACHE_DIR
2024-11-27 13:44:55 +02:00
clearml
b65e5fed94 Scan more Python 3 versions 2024-11-17 13:55:51 +02:00
clearml
3273f76b46 Version bump to v1.9.2 2024-10-28 18:33:04 +02:00
clearml
9af0f9fe41 Fix: make sure the reload method is found in the config object 2024-10-28 18:12:22 +02:00
clearml
205cd47cb9 Fix: use req_token_expiration_sec when creating a task session instead of the default value 2024-10-28 18:11:42 +02:00
clearml
0ff428bb96 Fix report index not advancing in resource monitoring, preventing more than one GPU from being reported 2024-10-28 18:11:00 +02:00
Matteo Destro
bf8d9c96e9 Handle OSError when checking for is_file (#215) 2024-10-13 10:08:03 +03:00
allegroai
a88487ff25 Add support for pip legacy resolver for versions specified in the agent.package_manager.pip_legacy_resolver configuration option
Add skipping of existing packages
2024-09-22 22:36:06 +03:00
Jake Henning
785e22dc87 Version bump to v1.9.1 2024-09-02 01:04:49 +03:00
Jake Henning
6a2b778d53 Add default pip version support for Python 3.12 2024-09-02 01:03:52 +03:00
allegroai
b2c3702830 Version bump to v1.9.0 2024-08-28 23:18:26 +03:00
allegroai
6302d43990 Add support for skipping container apt installs using CLEARML_AGENT_SKIP_CONTAINER_APT env var in k8s
Add runtime callback support for setting runtime properties per task in k8s
Fix remove task from pending queue and set to failed when kubectl apply fails
2024-08-27 23:01:27 +03:00
allegroai
760bbca74e Fix: failed Task in services mode was logged as "User aborted" instead of failed; add Task reason string 2024-08-27 22:56:37 +03:00
allegroai
e63fd31420 Fix string format 2024-08-27 22:55:49 +03:00
allegroai
2ff9985db7 Add user ID to the vault loading print 2024-08-27 22:55:32 +03:00
allegroai
b8c762401b Fix use same state transition if supported by the server (instead of stopping the task before re-enqueue) 2024-08-27 22:54:45 +03:00
allegroai
99e1e54f94 Add support for tasks containing only bash script or python module command 2024-08-27 22:53:14 +03:00
allegroai
a4d3b5bad6 Fix only set Task started status on node rank 0 2024-08-27 22:52:31 +03:00
allegroai
b21665ed6e Fix: do not cache the venv if the venv/python skip env var was set 2024-08-27 22:52:01 +03:00
Surya Teja
f877aa96e2 Update Docker base image to Ubuntu 22.04 and Kubectl to 1.29.3 (#201) 2024-07-29 18:41:50 +03:00
pollfly
f99344d194 Add queue priority info to CLI help (#211)
* add queue priority comment

* Add --order-fairness info

---------

Co-authored-by: Jake Henning <59198928+jkhenning@users.noreply.github.com>
2024-07-29 18:40:38 +03:00
allegroai
d9f2a1999a Fix: only send pip freeze update on RANK 0, and only update task status on exit on RANK 0 2024-07-29 17:40:24 +03:00
Valentin Schabschneider
79d0abe707 Add NO_DOCKER flag to clearml-agent-services entrypoint (#206) 2024-07-26 19:09:54 +03:00
allegroai
6213ef4c02 Add /bin/bash -c "command" support. Task binary should be set to /bin/bash and entry_point should be set to -c command 2024-07-24 18:00:13 +03:00
allegroai
aef6aa9fc8 Fix a rare race condition where popping an aborted Task from a queue did not set it to started before the watchdog killed it. Does not happen in k8s/slurm 2024-07-24 17:59:46 +03:00
allegroai
0bb267115b Add venvs_cache.path mount override for non-root containers (use: agent.docker_internal_mounts.venvs_cache) 2024-07-24 17:59:18 +03:00
allegroai
f89a92556f Fix check logger is not None 2024-07-24 17:55:02 +03:00
allegroai
8ba4d75e80 Add CLEARML_TASK_ID and auth token to pod env vars in original entrypoint flow 2024-07-24 17:47:48 +03:00
allegroai
edc333ba5f Add K8S_GLUE_POD_USE_IMAGE_ENTRYPOINT to allow running images without overriding the entrypoint (useful for agents using prebuilt images in k8s) 2024-07-24 17:46:27 +03:00
allegroai
2f0553b873 Fix CLEARML_MULTI_NODE_SINGLE_TASK should be read once, not on every reported line 2024-07-24 17:45:02 +03:00
allegroai
b2a4bf08ac Fix pass --docker only (i.e. no default container image) for --dynamic-gpus feature 2024-07-24 17:44:35 +03:00
allegroai
f18c6b809f Fix slurm multi-node rank detection 2024-07-24 17:44:05 +03:00
allegroai
cd5b4d2186 Add "-m module args" in script entry now supports standalone script, standalone script is converted to "untitled.py" by default or if specified in working_dir such as <dir>:<target_file> for example ".:standalone.py" 2024-07-24 17:43:21 +03:00
allegroai
5f1bab6711 Add default docker match_rules for enterprise users,
NOTICE: matching_rules are ignored if `--docker container` is passed on the command line
2024-07-24 17:42:55 +03:00
allegroai
ab9b9db0c9 Add CLEARML_MULTI_NODE_SINGLE_TASK (values -1, 0, 1, 2) for easier multi-node single Task workloads 2024-07-24 17:42:25 +03:00
allegroai
93df021108 Add support for .ipynb script entry files (installs nbconvert at runtime, converts to Python and executes the resulting script), including CLEARML_AGENT_FORCE_TASK_INIT patching of ipynb files (after Python conversion) 2024-07-24 17:41:59 +03:00
allegroai
700ae85de0 Fix file mode should be optional in configuration files section 2024-07-24 17:41:06 +03:00
allegroai
f367c5a571 Fix git fetch did not update new tags #209 2024-07-24 17:39:53 +03:00
allegroai
ebc5944b44 Fix setting tasks that someone just marked as aborted to started - only force Task to started after dequeuing it, otherwise leave it as is 2024-07-24 17:39:26 +03:00
allegroai
8f41002845 Add task.script.binary /bin/bash support
Fix -m module $env to support parsing the $env before launching
2024-07-24 17:37:26 +03:00
allegroai
7e8670d57f Find the correct python version when using a pre-installed python environment 2024-07-21 14:10:38 +03:00
allegroai
77de343863 Use "venv" module if virtualenv is not supported 2024-07-19 13:18:07 +03:00
41 changed files with 1549 additions and 330 deletions

View File

@@ -186,7 +186,7 @@
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright 2019 allegro.ai
Copyright 2025 ClearML Inc
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.

View File

@@ -1,15 +1,15 @@
<div align="center">
<img src="https://github.com/allegroai/clearml-agent/blob/master/docs/clearml_agent_logo.png?raw=true" width="250px">
<img src="https://github.com/clearml/clearml-agent/blob/master/docs/clearml_agent_logo.png?raw=true" width="250px">
**ClearML Agent - MLOps/LLMOps made easy
MLOps/LLMOps scheduler & orchestration solution supporting Linux, macOS and Windows**
[![GitHub license](https://img.shields.io/github/license/allegroai/clearml-agent.svg)](https://img.shields.io/github/license/allegroai/clearml-agent.svg)
[![GitHub license](https://img.shields.io/github/license/clearml/clearml-agent.svg)](https://img.shields.io/github/license/clearml/clearml-agent.svg)
[![PyPI pyversions](https://img.shields.io/pypi/pyversions/clearml-agent.svg)](https://img.shields.io/pypi/pyversions/clearml-agent.svg)
[![PyPI version shields.io](https://img.shields.io/pypi/v/clearml-agent.svg)](https://img.shields.io/pypi/v/clearml-agent.svg)
[![PyPI Downloads](https://pepy.tech/badge/clearml-agent/month)](https://pypi.org/project/clearml-agent/)
[![Artifact Hub](https://img.shields.io/endpoint?url=https://artifacthub.io/badge/repository/allegroai)](https://artifacthub.io/packages/search?repo=allegroai)
[![Artifact Hub](https://img.shields.io/endpoint?url=https://artifacthub.io/badge/repository/clearml)](https://artifacthub.io/packages/search?repo=clearml)
`🌟 ClearML is open-source - Leave a star to support the project! 🌟`
@@ -33,21 +33,21 @@ It is a zero configuration fire-and-forget execution agent, providing a full ML/
**Full Automation in 5 steps**
1. ClearML Server [self-hosted](https://github.com/allegroai/clearml-server)
1. ClearML Server [self-hosted](https://github.com/clearml/clearml-server)
or [free tier hosting](https://app.clear.ml)
2. `pip install clearml-agent` ([install](#installing-the-clearml-agent) the ClearML Agent on any GPU machine:
on-premises / cloud / ...)
3. Create a [job](https://clear.ml/docs/latest/docs/apps/clearml_task) or
add [ClearML](https://github.com/allegroai/clearml) to your code with just 2 lines of code
add [ClearML](https://github.com/clearml/clearml) to your code with just 2 lines of code
4. Change the [parameters](#using-the-clearml-agent) in the UI & schedule for [execution](#using-the-clearml-agent) (or
automate with an [AutoML pipeline](#automl-and-orchestration-pipelines-))
5. :chart_with_downwards_trend: :chart_with_upwards_trend: :eyes: :beer:
"All the Deep/Machine-Learning DevOps your research needs, and then some... Because ain't nobody got time for that"
**Try ClearML now** [Self Hosted](https://github.com/allegroai/clearml-server)
**Try ClearML now** [Self Hosted](https://github.com/clearml/clearml-server)
or [Free tier Hosting](https://app.clear.ml)
<a href="https://app.clear.ml"><img src="https://github.com/allegroai/clearml-agent/blob/master/docs/screenshots.gif?raw=true" width="100%"></a>
<a href="https://app.clear.ml"><img src="https://github.com/clearml/clearml-agent/blob/master/docs/screenshots.gif?raw=true" width="100%"></a>
### Simple, Flexible Experiment Orchestration
@@ -71,7 +71,7 @@ or [Free tier Hosting](https://app.clear.ml)
We think Kubernetes is awesome, but it is not a must to get started with remote execution agents and cluster management.
We designed `clearml-agent` so you can run both bare-metal and on top of Kubernetes, in any combination that fits your environment.
You can find the Dockerfiles in the [docker folder](./docker) and the helm Chart in https://github.com/allegroai/clearml-helm-charts
You can find the Dockerfiles in the [docker folder](./docker) and the helm Chart in https://github.com/clearml/clearml-helm-charts
#### Benefits of integrating existing Kubernetes cluster with ClearML
@@ -86,8 +86,8 @@ You can find the Dockerfiles in the [docker folder](./docker) and the helm Chart
- **Enterprise Features**: RBAC, vault, multi-tenancy, scheduler, quota management, fractional GPU support
**Run the agent in Kubernetes Glue mode and map ClearML jobs directly to K8s jobs:**
- Use the [ClearML Agent Helm Chart](https://github.com/allegroai/clearml-helm-charts/tree/main/charts/clearml-agent) to spin an agent pod acting as a controller
- Or run the [clearml-k8s glue](https://github.com/allegroai/clearml-agent/blob/master/examples/k8s_glue_example.py) on
- Use the [ClearML Agent Helm Chart](https://github.com/clearml/clearml-helm-charts/tree/main/charts/clearml-agent) to spin an agent pod acting as a controller
- Or run the [clearml-k8s glue](https://github.com/clearml/clearml-agent/blob/master/examples/k8s_glue_example.py) on
a Kubernetes cpu node
- The clearml-k8s glue pulls jobs from the ClearML job execution queue and prepares a Kubernetes job (based on provided
yaml template)
@@ -151,7 +151,7 @@ The ClearML Agent executes experiments using the following process:
#### System Design & Flow
<img src="https://github.com/allegroai/clearml-agent/blob/master/docs/clearml_architecture.png" width="100%" alt="clearml-architecture">
<img src="https://github.com/clearml/clearml-agent/blob/master/docs/clearml_architecture.png" width="100%" alt="clearml-architecture">
#### Installing the ClearML Agent
@@ -279,7 +279,7 @@ clearml-agent daemon --detached --gpus 0 --queue default --docker nvidia/cuda:11
### How do I create an experiment on the ClearML Server? <a name="from-scratch"></a>
* Integrate [ClearML](https://github.com/allegroai/clearml) with your code
* Integrate [ClearML](https://github.com/clearml/clearml) with your code
* Execute the code on your machine (Manually / PyCharm / Jupyter Notebook)
* As your code is running, **ClearML** creates an experiment logging all the necessary execution information:
- Git repository link and commit ID (or an entire jupyter notebook)
@@ -326,21 +326,21 @@ The ClearML Agent can also be used to implement AutoML orchestration and Experim
ClearML package.
Sample AutoML & Orchestration examples can be found in the
ClearML [example/automation](https://github.com/allegroai/clearml/tree/master/examples/automation) folder.
ClearML [example/automation](https://github.com/clearml/clearml/tree/master/examples/automation) folder.
AutoML examples:
- [Toy Keras training experiment](https://github.com/allegroai/clearml/blob/master/examples/optimization/hyper-parameter-optimization/base_template_keras_simple.py)
- [Toy Keras training experiment](https://github.com/clearml/clearml/blob/master/examples/optimization/hyper-parameter-optimization/base_template_keras_simple.py)
- In order to create an experiment-template in the system, this code must be executed once manually
- [Random Search over the above Keras experiment-template](https://github.com/allegroai/clearml/blob/master/examples/automation/manual_random_param_search_example.py)
- [Random Search over the above Keras experiment-template](https://github.com/clearml/clearml/blob/master/examples/automation/manual_random_param_search_example.py)
- This example will create multiple copies of the Keras experiment-template, with different hyperparameter
combinations
Experiment Pipeline examples:
- [First step experiment](https://github.com/allegroai/clearml/blob/master/examples/automation/task_piping_example.py)
- [First step experiment](https://github.com/clearml/clearml/blob/master/examples/automation/task_piping_example.py)
- This example will "process data", and once done, will launch a copy of the 'second step' experiment-template
- [Second step experiment](https://github.com/allegroai/clearml/blob/master/examples/automation/toy_base_task.py)
- [Second step experiment](https://github.com/clearml/clearml/blob/master/examples/automation/toy_base_task.py)
- In order to create an experiment-template in the system, this code must be executed once manually
### License

View File

@@ -54,22 +54,26 @@
# docker_use_activated_venv: true
# select python package manager:
# currently supported: pip, conda and poetry
# currently supported: pip, conda, uv and poetry
# if "pip" or "conda" are used, the agent installs the required packages
# based on the "installed packages" section of the Task. If the "installed packages" is empty,
# it will revert to using `requirements.txt` from the repository's root directory.
# If Poetry is selected and the root repository contains `poetry.lock` or `pyproject.toml`,
# the "installed packages" section is ignored, and poetry is used.
# If Poetry is selected and no lock file is found, it reverts to "pip" package manager behaviour.
# If uv is selected and the root repository contains `uv.lock` or `pyproject.toml`,
# the "installed packages" section is ignored, and uv is used.
package_manager: {
# supported options: pip, conda, poetry
# supported options: pip, conda, poetry, uv
type: pip,
# specify pip version to use (examples "<20.2", "==19.3.1", "", empty string will install the latest version)
pip_version: ["<20.2 ; python_version < '3.10'", "<22.3 ; python_version >= '3.10'"],
pip_version: ["<20.2 ; python_version < '3.10'", "<22.3 ; python_version >= '3.10' and python_version <= '3.11'", ">=23,<24.3 ; python_version >= '3.12'"]
# specify poetry version to use (examples "<2", "==1.1.1", "", empty string will install the latest version)
# poetry_version: "<2",
# poetry_install_extra_args: ["-v"]
# uv_version: ">0.4",
# uv_sync_extra_args: ["--all-extras"]
# virtual environment inherits packages from system
system_site_packages: false,
@@ -80,6 +84,14 @@
# additional artifact repositories to use when installing python packages
# extra_index_url: ["https://allegroai.jfrog.io/clearml/api/pypi/public/simple"]
# turn on the "--use-deprecated=legacy-resolver" flag for pip, to avoid package dependency version mismatch
# if any version restriction is matched we add the "--use-deprecated=legacy-resolver" flag
# example: pip_legacy_resolver = [">=20.3,<24.3", ">99"]
# if pip==20.2 or pip==29.0 is installed we do nothing,
# if pip==21.1 or pip==101.1 is installed the flag is added
# disable the feature by passing an empty list
pip_legacy_resolver = [">=20.3,<24.3"]
# control the pytorch wheel resolving algorithm, options are: "pip", "direct", "none"
# Override with environment variable CLEARML_AGENT_PACKAGE_PYTORCH_RESOLVE
# "pip" (default): would automatically detect the cuda version, and supply pip with the correct
@@ -125,6 +137,10 @@
# if set to true, the agent will look for the "poetry.lock" file
# in the passed current working directory instead of the repository's root directory.
poetry_files_from_repo_working_dir: false
# if set to true, the agent will look for the "uv.lock" file
# in the passed current working directory instead of the repository's root directory.
uv_files_from_repo_working_dir: false
},
# target folder for virtual environments builds, created when executing experiment
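The comments above describe the lock-file fallback: uv or poetry is only used when its lock file (or pyproject.toml) is present, otherwise the agent reverts to pip behaviour. A minimal standalone sketch of that decision, using a hypothetical helper (not the agent's actual code):

import os

def pick_package_manager(repo_root, configured="uv"):
    # mirror the fallback described in the config comments (illustration only)
    lock_files = {"uv": ("uv.lock", "pyproject.toml"), "poetry": ("poetry.lock", "pyproject.toml")}
    if configured in lock_files:
        if any(os.path.isfile(os.path.join(repo_root, f)) for f in lock_files[configured]):
            return configured  # lock file found: "installed packages" section is ignored
    return "pip"  # no lock file: revert to pip / Task "installed packages"

print(pick_package_manager(".", configured="uv"))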
@@ -184,6 +200,12 @@
# allows the following task docker args to be overridden by the extra_docker_arguments
# protected_docker_extra_args: ["privileged", "security-opt", "network", "ipc"]
# Enforce filter whitelist on docker arguments, allowing only those matching these filters to be used when running
# a task. These can also be provided using the CLEARML_AGENT_DOCKER_ARGS_FILTERS environment variable
# (using shlex.split whitespace-separated format).
# For example, allow only environment variables:
# docker_args_filters: ["^--env$", "^-e$"]
# optional shell script to run in docker when started before the experiment is started
# extra_docker_shell_script: ["apt-get install -y bindfs", ]
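A rough sketch of how such a regex whitelist could be enforced (hypothetical helper, shown for illustration; the actual filtering is implemented inside the agent):

import re
import shlex

def filter_docker_args(args, filters):
    # keep only flags whose name matches a whitelist regex; keep a flag's value with it
    patterns = [re.compile(f) for f in filters]
    allowed, skip_next = [], False
    for i, token in enumerate(args):
        if skip_next:
            skip_next = False
            continue
        if any(p.match(token.split("=", 1)[0]) for p in patterns):
            allowed.append(token)
            if "=" not in token and i + 1 < len(args) and not args[i + 1].startswith("-"):
                allowed.append(args[i + 1])
                skip_next = True
    return allowed

args = shlex.split("-e FOO=bar --privileged --env BAZ=1")
print(filter_docker_args(args, ["^--env$", "^-e$"]))  # ['-e', 'FOO=bar', '--env', 'BAZ=1']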
@@ -218,6 +240,76 @@
# optional arguments to pass to docker image
# arguments: ["--ipc=host", ]
# Choose the default docker based on the Task properties,
# Notice: Enterprise feature, ignored otherwise
# Examples: 'script.requirements', 'script.binary', 'script.repository', 'script.branch', 'project'
# Notice: Matching is done via regular expression, for example "^searchme$" will match exactly the string "searchme"
"match_rules": [
{
"image": "python:3.6-bullseye",
"arguments": "--ipc=host",
"match": {
"script": {
"binary": "python3.6$",
},
}
},
{
"image": "python:3.7-bullseye",
"arguments": "--ipc=host",
"match": {
"script": {
"binary": "python3.7$",
},
}
},
{
"image": "python:3.8-bullseye",
"arguments": "--ipc=host",
"match": {
"script": {
"binary": "python3.8$",
},
}
},
{
"image": "python:3.9-bullseye",
"arguments": "--ipc=host",
"match": {
"script": {
"binary": "python3.9$",
},
}
},
{
"image": "python:3.10-bullseye",
"arguments": "--ipc=host",
"match": {
"script": {
"binary": "python3.10$",
},
}
},
{
"image": "python:3.11-bullseye",
"arguments": "--ipc=host",
"match": {
"script": {
"binary": "python3.11$",
},
}
},
{
"image": "python:3.12-bullseye",
"arguments": "--ipc=host",
"match": {
"script": {
"binary": "python3.12$",
},
}
},
]
}
# set the OS environments based on the Task's Environment section before launching the Task process.
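A toy illustration of the match_rules resolution described above, regex-matching a Task's script.binary against each rule in order (sample data, not the agent's implementation):

import re

match_rules = [
    {"image": "python:3.11-bullseye", "arguments": "--ipc=host",
     "match": {"script": {"binary": "python3.11$"}}},
    {"image": "python:3.12-bullseye", "arguments": "--ipc=host",
     "match": {"script": {"binary": "python3.12$"}}},
]

def resolve_image(task, rules):
    # first rule whose regex matches wins
    for rule in rules:
        if re.search(rule["match"]["script"]["binary"], task["script"]["binary"]):
            return rule["image"], rule["arguments"]
    return None, None

task = {"script": {"binary": "/usr/bin/python3.12"}}
print(resolve_image(task, match_rules))  # ('python:3.12-bullseye', '--ipc=host')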
@@ -289,6 +381,7 @@
pip_cache: "/root/.cache/pip"
poetry_cache: "/root/.cache/pypoetry"
vcs_cache: "/root/.clearml/vcs-cache"
venvs_cache: "/root/.clearml/venvs-cache"
venv_build: "~/.clearml/venvs-builds"
pip_download: "/root/.clearml/pip-download-cache"
}

View File

@@ -22,6 +22,9 @@ ENV_INITIAL_CONNECT_RETRY_OVERRIDE = EnvEntry(
'CLEARML_AGENT_INITIAL_CONNECT_RETRY_OVERRIDE', default=True, converter=safe_text_to_bool
)
ENV_FORCE_MAX_API_VERSION = EnvEntry("CLEARML_AGENT_FORCE_MAX_API_VERSION", type=str)
# values are 0/None (task per node), 1/2 (multi-node reporting, colored console), -1 (only report rank 0 node)
ENV_MULTI_NODE_SINGLE_TASK = EnvEntry("CLEARML_MULTI_NODE_SINGLE_TASK", type=int, default=None)
"""
Experimental option to set the request method for all API requests and auth login.

View File

@@ -134,7 +134,7 @@ class BaseField(object):
def _validate_name(self):
if self.name is None:
return
if not re.match('^[A-Za-z_](([\w\-]*)?\w+)?$', self.name):
if not re.match(r'^[A-Za-z_](([\w\-]*)?\w+)?$', self.name):
raise ValueError('Wrong name', self.name)
def structue_name(self, default):

View File

@@ -19,8 +19,19 @@ class Request(ApiModel):
_method = ENV_API_DEFAULT_REQ_METHOD.get(default="get")
def __init__(self, **kwargs):
if kwargs:
allow_extra_fields = kwargs.pop("_allow_extra_fields_", False)
if not allow_extra_fields and kwargs:
raise ValueError('Unsupported keyword arguments: %s' % ', '.join(kwargs.keys()))
elif allow_extra_fields and kwargs:
self._extra_fields = kwargs
else:
self._extra_fields = {}
def to_dict(self, *args, **kwargs):
res = super(Request, self).to_dict(*args, **kwargs)
if self._extra_fields:
res.update(self._extra_fields)
return res
@six.add_metaclass(abc.ABCMeta)

View File

@@ -64,6 +64,8 @@ class Session(TokenManager):
default_key = "EGRTCO8JMSIGI6S39GTP43NFWXDQOW"
default_secret = "x!XTov_G-#vspE*Y(h$Anm&DIc5Ou-F)jsl$PdOyj5wG1&E!Z8"
force_max_api_version = ENV_FORCE_MAX_API_VERSION.get()
server_version = "1.0.0"
user_id = None
# TODO: add requests.codes.gateway_timeout once we support async commits
_retry_codes = [
@@ -191,6 +193,8 @@ class Session(TokenManager):
Session.api_version = str(api_version)
Session.feature_set = str(token_dict.get('feature_set', self.feature_set) or "basic")
Session.server_version = token_dict.get('server_version', self.server_version)
Session.user_id = (token_dict.get("identity") or {}).get("user") or ""
except (jwt.DecodeError, ValueError):
pass
@@ -256,8 +260,9 @@ class Session(TokenManager):
def parse(vault):
# noinspection PyBroadException
try:
print("Loaded {} vault: {}".format(
print("Loaded {} vault{}: {}".format(
vault.get("scope", ""),
"" if not self.user_id else " for user {}".format(self.user_id),
(vault.get("description", None) or "")[:50] or vault.get("id", ""))
)
d = vault.get("data", None)
@@ -341,11 +346,12 @@ class Session(TokenManager):
if self._propagate_exceptions_on_send:
raise
sleep_time = sys_random.uniform(*self._request_exception_retry_timeout)
self._logger.error(
"{} exception sending {} {}: {} (retrying in {:.1f}sec)".format(
type(ex).__name__, method.upper(), url, str(ex), sleep_time
if self._logger:
self._logger.error(
"{} exception sending {} {}: {} (retrying in {:.1f}sec)".format(
type(ex).__name__, method.upper(), url, str(ex), sleep_time
)
)
)
time.sleep(sleep_time)
continue
@@ -364,11 +370,12 @@ class Session(TokenManager):
res.status_code == requests.codes.service_unavailable
and self.config.get("api.http.wait_on_maintenance_forever", True)
):
self._logger.warning(
"Service unavailable: {} is undergoing maintenance, retrying...".format(
host
if self._logger:
self._logger.warning(
"Service unavailable: {} is undergoing maintenance, retrying...".format(
host
)
)
)
continue
break
self._session_requests += 1
@@ -649,11 +656,14 @@ class Session(TokenManager):
"""
Return True if Session.api_version is greater or equal >= to min_api_version
"""
def version_tuple(v):
v = tuple(map(int, (v.split("."))))
return v + (0,) * max(0, 3 - len(v))
return version_tuple(cls.api_version) >= version_tuple(str(min_api_version))
@classmethod
def check_min_server_version(cls, min_server_version):
"""
Return True if Session.server_version is greater or equal >= to min_server_version
"""
return version_tuple(cls.server_version) >= version_tuple(str(min_server_version))
def _do_refresh_token(self, current_token, exp=None):
""" TokenManager abstract method implementation.
Here we ignore the old token and simply obtain a new token.
@@ -731,3 +741,8 @@ class Session(TokenManager):
def propagate_exceptions_on_send(self, value):
# type: (bool) -> None
self._propagate_exceptions_on_send = value
def version_tuple(v):
v = tuple(map(int, (v.split("."))))
return v + (0,) * max(0, 3 - len(v))
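As a quick sanity check of the helper above: versions are zero-padded to three components, so tuple comparison behaves numerically rather than lexicographically:

def version_tuple(v):
    v = tuple(map(int, (v.split("."))))
    return v + (0,) * max(0, 3 - len(v))

assert version_tuple("3.22") == (3, 22, 0)              # padded to three components
assert version_tuple("3.22.3") >= version_tuple("3.22")
assert version_tuple("1.9") < version_tuple("1.10")     # 9 < 10, no string-compare pitfall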

View File

@@ -53,7 +53,7 @@ def apply_files(config):
target_fmt = data.get("target_format", "string")
overwrite = bool(data.get("overwrite", True))
contents = data.get("contents")
mode = data.get("mode")
mode = data.get("mode", None)
target = Path(expanduser(expandvars(path)))

View File

@@ -327,7 +327,7 @@ class ServiceCommandSection(BaseCommandSection):
def get_service(self, service_class):
return service_class(config=self._session.config)
def _resolve_name(self, name, service=None):
def _resolve_name(self, name, service=None, search_hidden=False):
"""
Resolve an object name to an object ID.
Operation:
@@ -349,7 +349,11 @@ class ServiceCommandSection(BaseCommandSection):
except AttributeError:
raise NameResolutionError('Name resolution unavailable for {}'.format(service))
request = request_cls.from_dict(dict(name=re.escape(name), only_fields=['name', 'id']))
req_dict = {"name": re.escape(name), "only_fields": ['name', 'id']}
if search_hidden:
req_dict["_allow_extra_fields_"] = True
req_dict["search_hidden"] = True
request = request_cls.from_dict(req_dict)
# from_dict will ignore unrecognised keyword arguments - not all GetAll's have only_fields
response = getattr(self._session.send_api(request), service)
matches = [db_object for db_object in response if name.lower() == db_object.name.lower()]

View File

@@ -1,14 +1,16 @@
import json
import re
import shlex
from copy import copy
from clearml_agent.backend_api.session import Request
from clearml_agent.helper.docker_args import DockerArgsSanitizer
from clearml_agent.helper.package.requirements import (
RequirementsManager, MarkerRequirement,
compare_version_rules, )
def resolve_default_container(session, task_id, container_config):
def resolve_default_container(session, task_id, container_config, ignore_match_rules=False):
container_lookup = session.config.get('agent.default_docker.match_rules', None)
if not session.check_min_api_version("2.13") or not container_lookup:
return container_config
@@ -17,6 +19,12 @@ def resolve_default_container(session, task_id, container_config):
try:
session.verify_feature_set('advanced')
except ValueError:
# ignoring matching rules only supported in higher tiers
return container_config
if ignore_match_rules:
print("INFO: default docker command line override, ignoring default docker container match rules")
# ignoring matching rules only supported in higher tiers
return container_config
result = session.send_request(
@@ -159,9 +167,10 @@ def resolve_default_container(session, task_id, container_config):
if not container_config.get('image'):
container_config['image'] = entry.get('image', None)
if not container_config.get('arguments'):
container_config['arguments'] = entry.get('arguments', None)
container_config['arguments'] = shlex.split(str(container_config.get('arguments') or '').strip())
print('Matching default container with rule:\n{}'.format(json.dumps(entry)))
container_config['arguments'] = entry.get('arguments', None) or ''
if isinstance(container_config.get('arguments'), str):
container_config['arguments'] = shlex.split(str(container_config.get('arguments') or '').strip())
print('INFO: Matching default container with rule:\n{}'.format(json.dumps(entry)))
return container_config
return container_config
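The shlex.split normalization above converts a rule's argument string into the token list docker expects, for instance (standalone illustration with sample arguments):

import shlex

print(shlex.split("--ipc=host -e NVIDIA_VISIBLE_DEVICES=all".strip()))
# ['--ipc=host', '-e', 'NVIDIA_VISIBLE_DEVICES=all']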

File diff suppressed because it is too large

View File

@@ -161,12 +161,14 @@ ENV_AGENT_SKIP_PYTHON_ENV_INSTALL = EnvironmentConfig("CLEARML_AGENT_SKIP_PYTHON
ENV_AGENT_FORCE_CODE_DIR = EnvironmentConfig("CLEARML_AGENT_FORCE_CODE_DIR")
ENV_AGENT_FORCE_EXEC_SCRIPT = EnvironmentConfig("CLEARML_AGENT_FORCE_EXEC_SCRIPT")
ENV_AGENT_FORCE_POETRY = EnvironmentConfig("CLEARML_AGENT_FORCE_POETRY", type=bool)
ENV_AGENT_FORCE_UV = EnvironmentConfig("CLEARML_AGENT_FORCE_UV", type=bool)
ENV_AGENT_FORCE_TASK_INIT = EnvironmentConfig("CLEARML_AGENT_FORCE_TASK_INIT", type=bool)
ENV_DOCKER_SKIP_GPUS_FLAG = EnvironmentConfig("CLEARML_DOCKER_SKIP_GPUS_FLAG", "TRAINS_DOCKER_SKIP_GPUS_FLAG")
ENV_AGENT_GIT_USER = EnvironmentConfig("CLEARML_AGENT_GIT_USER", "TRAINS_AGENT_GIT_USER")
ENV_AGENT_GIT_PASS = EnvironmentConfig("CLEARML_AGENT_GIT_PASS", "TRAINS_AGENT_GIT_PASS")
ENV_AGENT_GIT_HOST = EnvironmentConfig("CLEARML_AGENT_GIT_HOST", "TRAINS_AGENT_GIT_HOST")
ENV_AGENT_DISABLE_SSH_MOUNT = EnvironmentConfig("CLEARML_AGENT_DISABLE_SSH_MOUNT", type=bool)
ENV_AGENT_DEBUG_GET_NEXT_TASK = EnvironmentConfig("CLEARML_AGENT_DEBUG_GET_NEXT_TASK", type=bool)
ENV_SSH_AUTH_SOCK = EnvironmentConfig("SSH_AUTH_SOCK")
ENV_TASK_EXECUTE_AS_USER = EnvironmentConfig("CLEARML_AGENT_EXEC_USER", "TRAINS_AGENT_EXEC_USER")
ENV_TASK_EXTRA_PYTHON_PATH = EnvironmentConfig("CLEARML_AGENT_EXTRA_PYTHON_PATH", "TRAINS_AGENT_EXTRA_PYTHON_PATH")

View File

@@ -1,3 +1,5 @@
import shlex
from clearml_agent.helper.environment import EnvEntry
ENV_START_AGENT_SCRIPT_PATH = EnvEntry("CLEARML_K8S_GLUE_START_AGENT_SCRIPT_PATH", default="~/__start_agent__.sh")
@@ -12,3 +14,19 @@ ENV_POD_MONITOR_LOG_BATCH_SIZE = EnvEntry("K8S_GLUE_POD_MONITOR_LOG_BATCH_SIZE",
ENV_POD_MONITOR_DISABLE_ENQUEUE_ON_PREEMPTION = EnvEntry(
"K8S_GLUE_POD_MONITOR_DISABLE_ENQUEUE_ON_PREEMPTION", default=False, converter=bool
)
ENV_POD_USE_IMAGE_ENTRYPOINT = EnvEntry("K8S_GLUE_POD_USE_IMAGE_ENTRYPOINT", default=False, converter=bool)
"""
Do not inject a cmd and args into the container's image when building the k8s template (rely on the built-in image
entrypoint)
"""
ENV_KUBECTL_IGNORE_ERROR = EnvEntry("K8S_GLUE_IGNORE_KUBECTL_ERROR", default=None)
"""
Ignore kubectl errors matching this string pattern (allows ignoring warnings sent on stderr while
kubectl actually works and starts the pod)
"""
ENV_DEFAULT_SCHEDULER_QUEUE_TAGS = EnvEntry(
"K8S_GLUE_DEFAULT_SCHEDULER_QUEUE_TAGS", default=["k8s-glue"], converter=shlex.split
)
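For instance, the kubectl error filter above is a plain regular expression matched against kubectl's stderr; a small sketch with a hypothetical pattern and message (neither is taken from the source):

import re

ignore_re = re.compile(r"Warning: .*deprecat")  # e.g. a K8S_GLUE_IGNORE_KUBECTL_ERROR value
error = "Warning: batch/v1beta1 CronJob is deprecated"
print(bool(ignore_re.match(error)))  # True -> the error is ignored and pod startup proceeds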

View File

@@ -25,6 +25,7 @@ from clearml_agent.definitions import (
ENV_AGENT_GIT_USER,
ENV_AGENT_GIT_PASS,
ENV_FORCE_SYSTEM_SITE_PACKAGES,
ENV_AGENT_DEBUG_GET_NEXT_TASK,
)
from clearml_agent.errors import APIError, UsageError
from clearml_agent.glue.errors import GetPodCountError
@@ -40,6 +41,9 @@ from clearml_agent.glue.definitions import (
ENV_START_AGENT_SCRIPT_PATH,
ENV_DEFAULT_EXECUTION_AGENT_ARGS,
ENV_POD_AGENT_INSTALL_ARGS,
ENV_POD_USE_IMAGE_ENTRYPOINT,
ENV_KUBECTL_IGNORE_ERROR,
ENV_DEFAULT_SCHEDULER_QUEUE_TAGS,
)
@@ -67,17 +71,25 @@ class K8sIntegration(Worker):
'echo "ldconfig" >> /etc/profile',
"/usr/sbin/sshd -p {port}"]
CONTAINER_BASH_SCRIPT = [
_CONTAINER_APT_SCRIPT_SECTION = [
"export DEBIAN_FRONTEND='noninteractive'",
"echo 'Binary::apt::APT::Keep-Downloaded-Packages \"true\";' > /etc/apt/apt.conf.d/docker-clean",
"chown -R root /root/.cache/pip",
"apt-get update",
"apt-get install -y git libsm6 libxext6 libxrender-dev libglib2.0-0",
]
CONTAINER_BASH_SCRIPT = [
*(
'[ ! -z "$CLEARML_AGENT_SKIP_CONTAINER_APT" ] || {}'.format(line)
for line in _CONTAINER_APT_SCRIPT_SECTION
),
"declare LOCAL_PYTHON",
"[ ! -z $LOCAL_PYTHON ] || for i in {{15..5}}; do which python3.$i && python3.$i -m pip --version && "
"[ ! -z $LOCAL_PYTHON ] || for i in {{20..5}}; do which python3.$i && python3.$i -m pip --version && "
"export LOCAL_PYTHON=$(which python3.$i) && break ; done",
"[ ! -z $LOCAL_PYTHON ] || apt-get install -y python3-pip",
'[ ! -z "$CLEARML_AGENT_SKIP_CONTAINER_APT" ] || [ ! -z "$LOCAL_PYTHON" ] || apt-get install -y python3-pip',
"[ ! -z $LOCAL_PYTHON ] || export LOCAL_PYTHON=python3",
"rm /usr/lib/python3.*/EXTERNALLY-MANAGED", # remove PEP 668
"{extra_bash_init_cmd}",
"[ ! -z $CLEARML_AGENT_NO_UPDATE ] || $LOCAL_PYTHON -m pip install clearml-agent{agent_install_args}",
"{extra_docker_bash_script}",
@@ -98,6 +110,7 @@ class K8sIntegration(Worker):
num_of_services=20,
base_pod_num=1,
user_props_cb=None,
runtime_cb=None,
overrides_yaml=None,
template_yaml=None,
clearml_conf_file=None,
@@ -125,6 +138,7 @@ class K8sIntegration(Worker):
:param callable user_props_cb: An Optional callable allowing additional user properties to be specified
when scheduling a task to run in a pod. Callable can receive an optional pod number and should return
a dictionary of user properties (name and value). Signature is [[Optional[int]], Dict[str,str]]
:param callable runtime_cb: An Optional callable allowing additional task runtime to be specified (see user_props_cb)
:param str overrides_yaml: YAML file containing the overrides for the pod (optional)
:param str template_yaml: YAML file containing the template for the pod (optional).
If provided the pod is scheduled with kubectl apply and overrides are ignored, otherwise with kubectl run.
@@ -159,6 +173,7 @@ class K8sIntegration(Worker):
self.base_pod_num = base_pod_num
self._edit_hyperparams_support = None
self._user_props_cb = user_props_cb
self._runtime_cb = runtime_cb
self.conf_file_content = None
self.overrides_json_string = None
self.template_dict = None
@@ -192,6 +207,18 @@ class K8sIntegration(Worker):
self._min_cleanup_interval_per_ns_sec = 1.0
self._last_pod_cleanup_per_ns = defaultdict(lambda: 0.)
self._server_supports_same_state_transition = (
self._session.feature_set != "basic" and self._session.check_min_server_version("3.22.3")
)
self.ignore_kubectl_errors_re = (
re.compile(ENV_KUBECTL_IGNORE_ERROR.get()) if ENV_KUBECTL_IGNORE_ERROR.get() else None
)
@property
def agent_label(self):
return self._get_agent_label()
def _create_daemon_instance(self, cls_, **kwargs):
return cls_(agent=self, **kwargs)
@@ -424,6 +451,9 @@ class K8sIntegration(Worker):
""" Called when a resource (pod/job) was applied """
pass
def ports_mode_supported_for_task(self, task_id: str, task_data):
return self.ports_mode
def run_one_task(self, queue: Text, task_id: Text, worker_args=None, task_session=None, **_):
print('Pulling task {} launching on kubernetes cluster'.format(task_id))
session = task_session or self._session
@@ -433,7 +463,9 @@ class K8sIntegration(Worker):
if self._is_same_tenant(task_session):
try:
print('Pushing task {} into temporary pending queue'.format(task_id))
_ = session.api_client.tasks.stop(task_id, force=True, status_reason="moving to k8s pending queue")
if not self._server_supports_same_state_transition:
_ = session.api_client.tasks.stop(task_id, force=True, status_reason="moving to k8s pending queue")
# Just make sure to clean up in case the task is stuck in the queue (known issue)
self._session.api_client.queues.remove_task(
@@ -441,13 +473,34 @@ class K8sIntegration(Worker):
queue=self.k8s_pending_queue_id,
)
res = self._session.api_client.tasks.enqueue(
task_id,
queue=self.k8s_pending_queue_id,
status_reason='k8s pending scheduler',
)
if res.meta.result_code != 200:
raise Exception(res.meta.result_msg)
for attempt in range(2):
res = self._session.send_request(
"tasks",
"enqueue",
json={
"task": task_id,
"queue": self.k8s_pending_queue_id,
"status_reason": "k8s pending scheduler",
"update_execution_queue": False,
}
)
if res.ok:
break
# noinspection PyBroadException
try:
result_subcode = res.json()["meta"]["result_subcode"]
result_msg = res.json()["meta"]["result_msg"]
except Exception:
result_subcode = None
result_msg = res.text
if attempt == 0 and res.status_code == 400 and result_subcode == 701:
# Invalid queue ID, only retry once
self._ensure_pending_queue_exists()
continue
raise Exception(result_msg)
except Exception as e:
self.log.error("ERROR: Could not push back task [{}] to k8s pending queue {} [{}], error: {}".format(
task_id, self.k8s_pending_queue_name, self.k8s_pending_queue_id, e))
@@ -493,8 +546,10 @@ class K8sIntegration(Worker):
)
)
if self.ports_mode:
ports_mode = False
if self.ports_mode_supported_for_task(task_id, task_data):
print("Kubernetes looking for available pod to use")
ports_mode = True
# noinspection PyBroadException
try:
@@ -505,12 +560,12 @@ class K8sIntegration(Worker):
# Search for a free pod number
pod_count = 0
pod_number = self.base_pod_num
while self.ports_mode or self.max_pods_limit:
while ports_mode or self.max_pods_limit:
pod_number = self.base_pod_num + pod_count
try:
items_count = self._get_pod_count(
extra_labels=[self.limit_pod_label.format(pod_number=pod_number)] if self.ports_mode else None,
extra_labels=[self.limit_pod_label.format(pod_number=pod_number)] if ports_mode else None,
msg="Looking for a free pod/port"
)
except GetPodCountError:
@@ -560,11 +615,11 @@ class K8sIntegration(Worker):
break
pod_count += 1
labels = self._get_pod_labels(queue, queue_name)
if self.ports_mode:
labels = self._get_pod_labels(queue, queue_name, task_data)
if ports_mode:
labels.append(self.limit_pod_label.format(pod_number=pod_number))
if self.ports_mode:
if ports_mode:
print("Kubernetes scheduling task id={} on pod={} (pod_count={})".format(task_id, pod_number, pod_count))
else:
print("Kubernetes scheduling task id={}".format(task_id))
@@ -584,50 +639,130 @@ class K8sIntegration(Worker):
print("ERROR: no template for task {}, skipping".format(task_id))
return
pod_name = self.pod_name_prefix + str(task_id)
self.apply_template_and_handle_result(
pod_name=pod_name,
clearml_conf_create_script=clearml_conf_create_script,
labels=labels,
queue=queue,
task_id=task_id,
namespace=namespace,
template=template,
docker_image=container['image'],
docker_args=container.get('arguments'),
docker_bash=container.get('setup_shell_script'),
session=session,
task_session=task_session,
pod_number=pod_number,
queue_name=queue_name,
task_data=task_data,
ports_mode=ports_mode,
pod_count=pod_count,
)
def apply_template_and_handle_result(
self,
pod_name,
clearml_conf_create_script: List[str],
labels,
queue,
task_id,
namespace,
template,
docker_image,
docker_args,
docker_bash,
session,
task_session,
queue_name,
task_data,
ports_mode,
pod_count,
pod_number=None,
base_spec: dict = None,
):
"""Apply the provided template with all custom settings and handle bookkeeping for the reaults"""
output, error, pod_name = self._kubectl_apply(
pod_name=pod_name,
template=template,
pod_number=pod_number,
clearml_conf_create_script=clearml_conf_create_script,
labels=labels,
docker_image=container['image'],
docker_args=container.get('arguments'),
docker_bash=container.get('setup_shell_script'),
docker_image=docker_image,
docker_args=docker_args,
docker_bash=docker_bash,
task_id=task_id,
queue=queue,
namespace=namespace,
task_token=task_session.token.encode("ascii") if task_session else None,
base_spec=base_spec,
)
print('kubectl output:\n{}\n{}'.format(error, output))
if error:
send_log = "Running kubectl encountered an error: {}".format(error)
self.log.error(send_log)
self.send_logs(task_id, send_log.splitlines())
return
if self.ignore_kubectl_errors_re and self.ignore_kubectl_errors_re.match(error):
print(f"Ignoring error due to {ENV_KUBECTL_IGNORE_ERROR.key}")
else:
self._set_task_failed_while_applying(
session, task_id, f"Running kubectl encountered an error: {error}"
)
return
if pod_name:
self.resource_applied(
resource_name=pod_name, namespace=namespace, task_id=task_id, session=session
)
self.set_task_info(
task_id=task_id, task_session=task_session, queue_name=queue_name, ports_mode=ports_mode,
pod_number=pod_number, pod_count=pod_count, task_data=task_data
)
def _set_task_failed_while_applying(self, session, task_id: str, error: str):
send_log = "Running kubectl encountered an error: {}".format(error)
self.log.error(send_log)
self.send_logs(task_id, send_log.splitlines())
# Make sure to remove the task from our k8s pending queue
self._session.api_client.queues.remove_task(
task=task_id,
queue=self.k8s_pending_queue_id,
)
# Set task as failed
session.api_client.tasks.failed(task_id, force=True)
def set_task_info(
self, task_id: str, task_session, task_data, queue_name: str, ports_mode: bool, pod_number, pod_count
):
user_props = {"k8s-queue": str(queue_name)}
if self.ports_mode:
user_props.update(
{
"k8s-pod-number": pod_number,
"k8s-pod-label": labels[0],
"k8s-internal-pod-count": pod_count,
"k8s-agent": self._get_agent_label(),
}
)
runtime = {}
if ports_mode:
agent_label = self._get_agent_label()
user_props.update({
"k8s-pod-number": pod_number,
"k8s-pod-label": agent_label, # backwards-compatibility / legacy
"k8s-internal-pod-count": pod_count,
"k8s-agent": agent_label,
})
if self._user_props_cb:
# noinspection PyBroadException
try:
custom_props = self._user_props_cb(pod_number) if self.ports_mode else self._user_props_cb()
custom_props = self._user_props_cb(pod_number) if ports_mode else self._user_props_cb()
user_props.update(custom_props)
except Exception:
pass
if self._runtime_cb:
# noinspection PyBroadException
try:
custom_runtime = self._runtime_cb(pod_number) if ports_mode else self._runtime_cb()
runtime.update(custom_runtime)
except Exception:
pass
if user_props:
self._set_task_user_properties(
task_id=task_id,
@@ -635,7 +770,38 @@ class K8sIntegration(Worker):
**user_props
)
def _get_pod_labels(self, queue, queue_name):
if runtime:
task_runtime = self._get_task_runtime(task_id) or {}
task_runtime.update(runtime)
try:
res = task_session.send_request(
service='tasks', action='edit', method=Request.def_method,
json={
"task": task_id, "force": True, "runtime": task_runtime
},
)
if not res.ok:
raise Exception("failed setting runtime property")
except Exception as ex:
print("WARNING: failed setting custom runtime properties for task '{}': {}".format(task_id, ex))
def _get_task_runtime(self, task_id) -> Optional[dict]:
try:
res = self._session.send_request(
service='tasks', action='get_by_id', method=Request.def_method,
json={"task": task_id, "only_fields": ["runtime"]},
)
if not res.ok:
raise ValueError(f"request returned {res.status_code}")
data = res.json().get("data")
if not data or "task" not in data:
raise ValueError("empty data in result")
return data["task"].get("runtime", {})
except Exception as ex:
print(f"ERROR: Failed getting runtime properties for task {task_id}: {ex}")
def _get_pod_labels(self, queue, queue_name, task_data):
return [
self._get_agent_label(),
"{}={}".format(self.QUEUE_LABEL, self._safe_k8s_label_value(queue)),
@@ -673,7 +839,7 @@ class K8sIntegration(Worker):
def _create_template_container(
self, pod_name: str, task_id: str, docker_image: str, docker_args: List[str],
docker_bash: str, clearml_conf_create_script: List[str], task_worker_id: str
docker_bash: str, clearml_conf_create_script: List[str], task_worker_id: str, task_token: str = None
) -> dict:
container = self._get_docker_args(
docker_args,
@@ -682,16 +848,32 @@ class K8sIntegration(Worker):
convert=lambda env: {'name': env.partition("=")[0], 'value': env.partition("=")[2]},
)
# Set worker ID
env_vars = container.get('env', [])
found_worker_id = False
for entry in env_vars:
if entry.get('name') == 'CLEARML_WORKER_ID':
entry['name'] = task_worker_id
found_worker_id = True
if not found_worker_id:
container['env'] = env_vars + [{'name': 'CLEARML_WORKER_ID', 'value': task_worker_id}]
def add_or_update_env_var(name, value):
env_vars = container.get('env', [])
for entry in env_vars:
if entry.get('name') == name:
entry['value'] = value
break
else:
container['env'] = env_vars + [{'name': name, 'value': value}]
# Set worker ID
add_or_update_env_var('CLEARML_WORKER_ID', task_worker_id)
if ENV_POD_USE_IMAGE_ENTRYPOINT.get():
# Don't add a cmd and args, just the image
# Add the task ID and token since we need them (they're usually passed to us in the init script)
add_or_update_env_var('CLEARML_TASK_ID', task_id)
if task_token:
# TODO: find a way to base64 encode the token
add_or_update_env_var('CLEARML_AUTH_TOKEN', task_token)
return self._merge_containers(
container, dict(name=pod_name, image=docker_image)
)
# Create bash script for container and
container_bash_script = [self.container_bash_script] if isinstance(self.container_bash_script, str) \
else self.container_bash_script
@@ -731,6 +913,7 @@ class K8sIntegration(Worker):
def _kubectl_apply(
self,
pod_name,
clearml_conf_create_script: List[str],
docker_image,
docker_args,
@@ -740,7 +923,9 @@ class K8sIntegration(Worker):
task_id,
namespace,
template,
pod_number=None
pod_number=None,
task_token=None,
base_spec: dict = None, # base values for the spec (might be overridden)
):
if "apiVersion" not in template:
template["apiVersion"] = "batch/v1" if self.using_jobs else "v1"
@@ -755,8 +940,7 @@ class K8sIntegration(Worker):
template["kind"] = self.kind.capitalize()
metadata = template.setdefault('metadata', {})
name = self.pod_name_prefix + str(task_id)
metadata['name'] = name
metadata['name'] = pod_name
def place_labels(metadata_dict):
labels_dict = dict(pair.split('=', 1) for pair in labels)
@@ -771,24 +955,28 @@ class K8sIntegration(Worker):
spec.setdefault('backoffLimit', 0)
spec_template = spec.setdefault('template', {})
if labels:
# Place same labels fro any pod spawned by the job
# Place same labels for any pod spawned by the job
place_labels(spec_template.setdefault('metadata', {}))
spec = spec_template.setdefault('spec', {})
if base_spec:
merge_dicts(spec, base_spec)
containers = spec.setdefault('containers', [])
spec.setdefault('restartPolicy', 'Never')
task_worker_id = self.get_task_worker_id(template, task_id, name, namespace, queue)
task_worker_id = self.get_task_worker_id(template, task_id, pod_name, namespace, queue)
container = self._create_template_container(
pod_name=name,
pod_name=pod_name,
task_id=task_id,
docker_image=docker_image,
docker_args=docker_args,
docker_bash=docker_bash,
clearml_conf_create_script=clearml_conf_create_script,
task_worker_id=task_worker_id
task_worker_id=task_worker_id,
task_token=task_token,
)
if containers:
@@ -827,7 +1015,7 @@ class K8sIntegration(Worker):
finally:
safe_remove_file(yaml_file)
return stringify_bash_output(output), stringify_bash_output(error), name
return stringify_bash_output(output), stringify_bash_output(error), pod_name
def _process_bash_lines_response(self, bash_cmd: str, raise_error=True):
res = get_bash_output(bash_cmd, raise_error=raise_error)
@@ -912,7 +1100,7 @@ class K8sIntegration(Worker):
deleted_pods = defaultdict(list)
for namespace in namespaces:
if time() - self._last_pod_cleanup_per_ns[namespace] < self._min_cleanup_interval_per_ns_sec:
# Do not try to cleanup the same namespace too quickly
# Do not try to clean up the same namespace too quickly
continue
try:
@@ -935,7 +1123,7 @@ class K8sIntegration(Worker):
result = self._session.get(
service='tasks',
action='get_all',
json={"id": task_ids, "status": ["in_progress", "queued"], "only_fields": ["id", "status"]},
json={"id": task_ids, "status": ["in_progress", "queued"], "only_fields": ["id", "status", "status_reason"]},
method=Request.def_method,
)
tasks_to_abort = result["tasks"]
@@ -945,9 +1133,13 @@ class K8sIntegration(Worker):
for task in tasks_to_abort:
task_id = task.get("id")
status = task.get("status")
status_reason = (task.get("status_reason") or "").lower()
if not task_id or not status:
self.log.warning('Failed getting task information: id={}, status={}'.format(task_id, status))
continue
if status == "queued" and "pushed back by policy manager" in status_reason:
# Task was pushed back to policy queue by policy manager, don't touch it
continue
try:
if status == "queued":
self._session.get(
@@ -981,6 +1173,24 @@ class K8sIntegration(Worker):
return deleted_pods
def check_if_suspended(self) -> bool:
pass
def check_if_schedulable(self, queue: str) -> bool:
return True
def _ensure_pending_queue_exists(self):
resolved_ids = self._resolve_queue_names(
[self.k8s_pending_queue_name],
create_if_missing=True,
create_system_tags=ENV_DEFAULT_SCHEDULER_QUEUE_TAGS.get()
)
if not resolved_ids:
raise ValueError(
"Failed resolving or creating k8s pending queue {}".format(self.k8s_pending_queue_name)
)
self.k8s_pending_queue_id = resolved_ids[0]
def run_tasks_loop(self, queues: List[Text], worker_params, **kwargs):
"""
:summary: Pull and run tasks from queues.
@@ -992,16 +1202,12 @@ class K8sIntegration(Worker):
:param worker_params: Worker command line arguments
:type worker_params: ``clearml_agent.helper.process.WorkerParams``
"""
# print("debug> running tasks loop")
events_service = self.get_service(Events)
# make sure we have a k8s pending queue
if not self.k8s_pending_queue_id:
resolved_ids = self._resolve_queue_names([self.k8s_pending_queue_name], create_if_missing=True)
if not resolved_ids:
raise ValueError(
"Failed resolving or creating k8s pending queue {}".format(self.k8s_pending_queue_name)
)
self.k8s_pending_queue_id = resolved_ids[0]
self._ensure_pending_queue_exists()
_last_machine_update_ts = 0
while True:
@@ -1023,12 +1229,22 @@ class K8sIntegration(Worker):
continue
# iterate over queues (priority style, queues[0] is highest)
# print("debug> iterating over queues")
for queue in queues:
# delete old completed / failed pods
self._cleanup_old_pods(namespaces, extra_msg="Cleanup cycle {cmd}")
if self.check_if_suspended():
print("Agent is suspended, sleeping for {:.1f} seconds".format(self._polling_interval))
sleep(self._polling_interval)
break
if not self.check_if_schedulable(queue):
continue
# get next task in queue
try:
# print(f"debug> getting tasks for queue {queue}")
response = self._get_next_task(queue=queue, get_task_info=self._impersonate_as_task_owner)
except Exception as e:
print("Warning: Could not access task queue [{}], error: {}".format(queue, e))

View File

@@ -1,11 +1,9 @@
from time import sleep
from typing import Dict, Tuple, Optional, List
from typing import Dict, List
from clearml_agent.backend_api.session import Request
from clearml_agent.glue.utilities import get_bash_output
from clearml_agent.helper.process import stringify_bash_output
from .daemon import K8sDaemon
from .utilities import get_path
from .errors import GetPodsError
@@ -38,7 +36,11 @@ class PendingPodsDaemon(K8sDaemon):
return get_path(pod, "metadata", "name")
def _get_task_id(self, pod: dict):
return self._get_k8s_resource_name(pod).rpartition('-')[-1]
prefix, _, value = self._get_k8s_resource_name(pod).rpartition('-')
if len(value) > 4:
return value
# we assume this is a multi-node rank x (>0) pod
return prefix.rpartition('-')[-1] or value
@staticmethod
def _get_k8s_resource_namespace(pod: dict):
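To see what the new _get_task_id parsing does, consider two hypothetical pod names (the task ID and rank suffix are invented for illustration):

def get_task_id(resource_name):
    # same logic as above: a short trailing segment indicates a multi-node rank pod
    prefix, _, value = resource_name.rpartition('-')
    if len(value) > 4:
        return value
    return prefix.rpartition('-')[-1] or value

print(get_task_id("clearml-id-4f2a9c1d8e"))    # '4f2a9c1d8e' (regular pod)
print(get_task_id("clearml-id-4f2a9c1d8e-1"))  # '4f2a9c1d8e' (rank-1 pod, short suffix)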
@@ -239,6 +241,11 @@ class PendingPodsDaemon(K8sDaemon):
result_msg = get_path(result.json(), 'meta', 'result_msg')
raise Exception(result_msg or result.text)
self._agent.send_logs(
task_id, ["Kubernetes Pod status: {}".format(msg)],
session=self._session
)
# update last msg for this task
self._last_tasks_msgs[task_id] = msg
except Exception as ex:

View File

@@ -543,6 +543,36 @@ def convert_cuda_version_to_int_10_base_str(cuda_version):
return str(int(float(cuda_version)*10))
def get_python_version(python_executable, log=None):
from clearml_agent.helper.process import Argv
try:
output = Argv(python_executable, "--version").get_output(
stderr=subprocess.STDOUT
)
except subprocess.CalledProcessError as ex:
# Windows returns 9009 code and suggests to install Python from Windows Store
if is_windows_platform() and ex.returncode == 9009:
if log:
log.debug("version not found: {}".format(ex))
else:
if log:
log.warning("error getting %s version: %s", python_executable, ex)
return None
except FileNotFoundError as ex:
if log:
log.debug("version not found: {}".format(ex))
return None
match = re.search(r"Python ({}(?:\.\d+)*)".format(r"\d+"), output)
if match:
if log:
log.debug("Found: {}".format(python_executable))
# only return major.minor version
return ".".join(str(match.group(1)).split(".")[:2])
return None
class NonStrictAttrs(object):
@classmethod

View File

@@ -14,7 +14,7 @@ def merge_dicts(dict1, dict2, custom_merge_func=None):
return dict2
for k in dict2:
if k in dict1:
res = None
res = _not_set
if custom_merge_func:
res = custom_merge_func(k, dict1[k], dict2[k], _not_set)
dict1[k] = merge_dicts(dict1[k], dict2[k], custom_merge_func) if res is _not_set else res
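The fix replaces None with the _not_set sentinel because None is a legitimate value for a custom merge function to return, so it cannot double as the "callback declined" marker. A minimal sketch of the sentinel pattern (simplified, not the full merge_dicts):

_not_set = object()  # unique sentinel: can never collide with a real merge result

def merged_value(key, v1, v2, custom_merge_func=None):
    res = _not_set
    if custom_merge_func:
        res = custom_merge_func(key, v1, v2, _not_set)
    # fall back to the default merge only when the callback declined (or was absent)
    return v2 if res is _not_set else res

print(merged_value("k", 1, 2))                            # 2 (default merge)
print(merged_value("k", 1, 2, lambda k, a, b, ns: None))  # None is now respected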

View File

@@ -69,7 +69,7 @@ def or_(*converters, **kwargs):
return wrapper
def strtobool (val):
def strtobool(val):
"""Convert a string representation of truth to true (1) or false (0).
True values are 'y', 'yes', 't', 'true', 'on', and '1'; false values

View File

@@ -29,9 +29,12 @@ class PackageManager(object):
_config_cache_max_entries = 'agent.venvs_cache.max_entries'
_config_cache_free_space_threshold = 'agent.venvs_cache.free_space_threshold_gb'
_config_cache_lock_timeout = 'agent.venvs_cache.lock_timeout'
_config_pip_legacy_resolver = 'agent.package_manager.pip_legacy_resolver'
def __init__(self):
self._cache_manager = None
self._existing_packages = []
self._base_install_flags = []
@abc.abstractproperty
def bin(self):
@@ -79,6 +82,23 @@ class PackageManager(object):
# type: (Iterable[Text]) -> None
pass
def add_extra_install_flags(self, extra_flags): # type: (List[str]) -> None
if extra_flags:
extra_flags = [
e for e in extra_flags if e not in list(self._base_install_flags)
]
self._base_install_flags = list(self._base_install_flags) + list(extra_flags)
def remove_extra_install_flags(self, extra_flags): # type: (List[str]) -> bool
if extra_flags:
_base_install_flags = [
e for e in self._base_install_flags if e not in list(extra_flags)
]
if self._base_install_flags != _base_install_flags:
self._base_install_flags = _base_install_flags
return True
return False
def upgrade_pip(self):
result = self._install(
*select_for_platform(
@@ -87,19 +107,58 @@ class PackageManager(object):
),
"--upgrade"
)
packages = self.run_with_env(('list',), output=True).splitlines()
# p.split is ('pip', 'x.y.z')
pip = [p.split() for p in packages if len(p.split()) == 2 and p.split()[0] == 'pip']
if pip:
# noinspection PyBroadException
packages = (self.freeze(freeze_full_environment=True) or dict()).get("pip")
if packages:
from clearml_agent.helper.package.requirements import RequirementsManager
from .requirements import MarkerRequirement, SimpleVersion
# store existing packages so that we can check if we can skip preinstalled packages
# we will only check "@ file" "@ vcs" for exact match
self._existing_packages = RequirementsManager.parse_requirements_section_to_marker_requirements(
packages, skip_local_file_validation=True)
try:
from .requirements import MarkerRequirement
pip = pip[0][1].split('.')
MarkerRequirement.pip_new_version = bool(int(pip[0]) >= 20)
except Exception:
pass
pip_pkg = next(p for p in self._existing_packages if p.name == "pip")
except StopIteration:
pip_pkg = None
# check if we need to list the pip version as well
if pip_pkg:
MarkerRequirement.pip_new_version = SimpleVersion.compare_versions(pip_pkg.version, ">=", "20")
# add --use-deprecated=legacy-resolver to pip install to avoid mismatched packages issues
self._add_legacy_resolver_flag(pip_pkg.version)
return result
def _add_legacy_resolver_flag(self, pip_pkg_version):
if not self.session.config.get(self._config_pip_legacy_resolver, None):
return
from .requirements import SimpleVersion
match_versions = self.session.config.get(self._config_pip_legacy_resolver)
matched = False
for rule in match_versions:
matched = False
# make sure we match all the parts of the rule
for a_version in rule.split(","):
o, v = SimpleVersion.split_op_version(a_version.strip())
matched = SimpleVersion.compare_versions(pip_pkg_version, o, v)
if not matched:
break
# if the rule is fully matched we have a match
if matched:
break
legacy_resolver_flags = ["--use-deprecated=legacy-resolver"]
if matched:
print("INFO: Using legacy resolver for PIP to avoid inconsistency with package versions!")
self.add_extra_install_flags(legacy_resolver_flags)
elif self.remove_extra_install_flags(legacy_resolver_flags):
print("INFO: removing pip legacy resolver!")
def get_python_command(self, extra=()):
# type: (...) -> Executable
return Argv(self.bin, *extra)
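The rule matching above treats each configured pip_legacy_resolver rule as a comma-separated set of version restrictions that must all hold. A rough standalone equivalent using the packaging library (illustration only; the agent uses its internal SimpleVersion helpers):

from packaging.specifiers import SpecifierSet
from packaging.version import Version

def matches_any_rule(pip_version, rules):
    # every comma-separated part of a rule must match for the rule to match
    return any(Version(pip_version) in SpecifierSet(rule) for rule in rules)

rules = [">=20.3,<24.3"]
print(matches_any_rule("21.1", rules))  # True  -> legacy resolver flag is added
print(matches_any_rule("20.2", rules))  # False -> flag is removed / not added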
@@ -149,6 +208,18 @@ class PackageManager(object):
return False
except Exception:
return False
try:
from .requirements import Requirement, MarkerRequirement
req = MarkerRequirement(Requirement.parse(package_name))
# if pip was part of the requirements, make sure we update the flags
# add --use-deprecated=legacy-resolver to pip install to avoid mismatched package version issues
if req.name == "pip" and req.version:
PackageManager._selected_manager._add_legacy_resolver_flag(req.version)
except Exception as e:
print("WARNING: Error while parsing pip version legacy [{}]".format(e))
return True
@classmethod
@@ -219,6 +290,8 @@ class PackageManager(object):
if not self._get_cache_manager():
return
print('Adding venv into cache: {}'.format(source_folder))
try:
keys = self._generate_reqs_hash_keys(requirements, docker_cmd, python_version, cuda_version)
return self._get_cache_manager().add_entry(

View File

@@ -97,7 +97,7 @@ class SystemPip(PackageManager):
return Argv(self.bin, '-m', 'pip', '--disable-pip-version-check', *command)
def install_flags(self):
indices_args = tuple(
base_args = tuple(self._base_install_flags or []) + tuple(
chain.from_iterable(('--extra-index-url', x) for x in PIP_EXTRA_INDICES)
)
@@ -105,7 +105,7 @@ class SystemPip(PackageManager):
ENV_PIP_EXTRA_INSTALL_FLAGS.get() or \
self.session.config.get("agent.package_manager.extra_pip_install_flags", None)
return (indices_args + tuple(extra_pip_flags)) if extra_pip_flags else indices_args
return (base_args + tuple(extra_pip_flags)) if extra_pip_flags else base_args
def download_flags(self):
indices_args = tuple(

View File

@@ -37,7 +37,9 @@ class VirtualenvPip(SystemPip, PackageManager):
def load_requirements(self, requirements):
if isinstance(requirements, dict) and requirements.get("pip"):
requirements["pip"] = self.requirements_manager.replace(requirements["pip"])
requirements["pip"] = self.requirements_manager.replace(
requirements["pip"], existing_packages=self._existing_packages
)
super(VirtualenvPip, self).load_requirements(requirements)
self.requirements_manager.post_install(self.session, package_manager=self)
@@ -64,9 +66,18 @@ class VirtualenvPip(SystemPip, PackageManager):
Only valid if instantiated with path.
Use self.python as self.bin does not exist.
"""
self.session.command(
self.python, "-m", "virtualenv", self.path, *self.create_flags()
).check_call()
# noinspection PyBroadException
try:
self.session.command(
self.python, "-m", "virtualenv", self.path, *self.create_flags()
).check_call()
except Exception as ex:
# let's try with std library instead
print("WARNING: virtualenv call failed: {}\n INFO: Creating virtual environment with venv".format(ex))
self.session.command(
self.python, "-m", "venv", self.path, *self.create_flags()
).check_call()
return self
def remove(self):

View File

@@ -7,7 +7,7 @@ from .requirements import SimpleSubstitution
class PriorityPackageRequirement(SimpleSubstitution):
name = ("cython", "numpy", "setuptools", "pip", )
name = ("cython", "numpy", "setuptools", "pip", "uv", )
optional_package_names = tuple()
def __init__(self, *args, **kwargs):
@@ -53,12 +53,18 @@ class PriorityPackageRequirement(SimpleSubstitution):
if not self._replaced_packages:
return list_of_requirements
# we assume that both pip & setuptools are not in list_of_requirements, and we need to add them
if "pip" in self._replaced_packages:
full_freeze = PackageManager.out_of_scope_freeze(freeze_full_environment=True)
# now let's look for pip
pips = [line for line in full_freeze.get("pip", []) if line.split("==")[0] == "pip"]
if pips and "pip" in list_of_requirements:
list_of_requirements["pip"] = [pips[0]] + list_of_requirements["pip"]
if not full_freeze:
if "pip" in list_of_requirements:
list_of_requirements["pip"] = [self._replaced_packages["pip"]] + list_of_requirements["pip"]
else:
# now let's look for pip
pips = [line for line in full_freeze.get("pip", []) if str(line.split("==")[0]).strip() == "pip"]
if pips and "pip" in list_of_requirements:
list_of_requirements["pip"] = [pips[0]] + list_of_requirements["pip"]
if "setuptools" in self._replaced_packages:
try:
@@ -87,6 +93,20 @@ class PriorityPackageRequirement(SimpleSubstitution):
return list_of_requirements
class CachedPackageRequirement(PriorityPackageRequirement):
name = ("setuptools", "pip", )
optional_package_names = tuple()
def replace(self, req):
"""
Put the requirement in the list for later conversion
:raises: ValueError if version is pre-release
"""
self._replaced_packages[req.name] = req.line
return Text(req)
class PackageCollectorRequirement(SimpleSubstitution):
"""
This RequirementSubstitution class will allow you to have multiple instances of the same

View File

@@ -19,7 +19,7 @@ import logging
from clearml_agent.definitions import PIP_EXTRA_INDICES
from clearml_agent.helper.base import (
warning, is_conda, which, join_lines, is_windows_platform,
convert_cuda_version_to_int_10_base_str, )
convert_cuda_version_to_int_10_base_str, dump_yaml, )
from clearml_agent.helper.process import Argv, PathLike
from clearml_agent.helper.gpu.gpustat import get_driver_cuda_version
from clearml_agent.session import Session, normalize_cuda_version
@@ -94,6 +94,12 @@ class MarkerRequirement(object):
def __repr__(self):
return '{self.__class__.__name__}[{self}]'.format(self=self)
def __eq__(self, other):
return isinstance(other, MarkerRequirement) and str(self) == str(other)
def __hash__(self):
return str(self).__hash__()
def format_specs(self, num_parts=None, max_num_parts=None):
max_num_parts = max_num_parts or num_parts
if max_num_parts is None or not self.specs:
@@ -116,6 +122,10 @@ class MarkerRequirement(object):
def specs(self): # type: () -> List[Tuple[Text, Text]]
return self.req.specs
@property
def version(self): # type: () -> Text
return self.specs[0][1] if self.specs else ""
@specs.setter
def specs(self, value): # type: (List[Tuple[Text, Text]]) -> None
self.req.specs = value
@@ -143,6 +153,8 @@ class MarkerRequirement(object):
If the requested version is 1.2 the self.spec should be 1.2*
etc.
usage: it returns the value of the following comparison: requested_version "op" self.version
:param str requested_version:
:param str op: '==', '>', '>=', '<=', '<', '~='
:param int num_parts: number of parts to compare
@@ -152,7 +164,7 @@ class MarkerRequirement(object):
if not self.specs:
return True
version = self.specs[0][1]
version = self.version
op = (op or self.specs[0][0]).strip()
return SimpleVersion.compare_versions(
@@ -170,11 +182,21 @@ class MarkerRequirement(object):
self.req.local_file = False
return True
def validate_local_file_ref(self):
def is_local_package_ref(self):
# if local file does not exist, remove the reference to it
if self.vcs or self.editable or self.path or not self.local_file or not self.name or \
not self.uri or not self.uri.startswith("file://"):
return False
return True
def is_vcs_ref(self):
return bool(self.vcs)
def validate_local_file_ref(self):
# if local file does not exist, remove the reference to it
if not self.is_local_package_ref():
return
local_path = Path(self.uri[len("file://"):])
if not local_path.exists():
local_path = Path(unquote(self.uri)[len("file://"):])
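A toy illustration of the two path forms tried above, using a hypothetical percent-encoded wheel path (unquote is assumed here from the Python 3 stdlib):

    from urllib.parse import unquote

    uri = "file:///tmp/wheels/my%20pkg-1.0-py3-none-any.whl"  # hypothetical reference
    print(uri[len("file://"):])           # /tmp/wheels/my%20pkg-1.0-py3-none-any.whl
    print(unquote(uri)[len("file://"):])  # /tmp/wheels/my pkg-1.0-py3-none-any.whl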
@@ -221,6 +243,19 @@ class SimpleVersion:
_local_version_separators = re.compile(r"[\._-]")
_regex = re.compile(r"^\s*" + VERSION_PATTERN + r"\s*$", re.VERBOSE | re.IGNORECASE)
@classmethod
def split_op_version(cls, line):
"""
Split a string in the form of ">=1.2.3" into a (op, version), i.e. (">=", "1.2.3")
Notice: if called with only a version string (e.g. "1.2.3"), the default operator is "=="
which means you get ("==", "1.2.3")
:param line: string examples: "<=0.1.2"
:return: tuple of (op, version) example ("<=", "0.1.2")
"""
match = r"\s*([>=<~!]*)\s*(\S*)\s*"
groups = re.match(match, line).groups()
return groups[0] or "==", groups[1]
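Example behaviour, matching the docstring above:

    print(SimpleVersion.split_op_version("<=0.1.2"))  # ('<=', '0.1.2')
    print(SimpleVersion.split_op_version(">=1.2.3"))  # ('>=', '1.2.3')
    print(SimpleVersion.split_op_version("1.2.3"))    # ('==', '1.2.3') -- default operator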
@classmethod
def compare_versions(cls, version_a, op, version_b, ignore_sub_versions=True, num_parts=3):
"""
@@ -624,14 +659,54 @@ class RequirementsManager(object):
return handler.replace(req)
return None
def replace(self, requirements): # type: (Text) -> Text
def replace(
self,
requirements, # type: Text
existing_packages=None, # type: List[MarkerRequirement]
pkg_skip_existing_local=True, # type: bool
pkg_skip_existing_vcs=True, # type: bool
pkg_skip_existing=True, # type: bool
): # type: (...) -> Text
parsed_requirements = self.parse_requirements_section_to_marker_requirements(
requirements=requirements, cwd=self._cwd)
requirements=requirements, cwd=self._cwd, skip_local_file_validation=True)
if parsed_requirements and existing_packages:
skipped_packages = None
if pkg_skip_existing:
skipped_packages = set(parsed_requirements) & set(existing_packages)
elif pkg_skip_existing_local or pkg_skip_existing_vcs:
existing_packages = [
p for p in existing_packages if (
(pkg_skip_existing_local and p.is_local_package_ref()) or
(pkg_skip_existing_vcs and p.is_vcs_ref())
)
]
skipped_packages = set(parsed_requirements) & set(existing_packages)
if skipped_packages:
# maintain order
num_skipped_packages = len(parsed_requirements)
parsed_requirements = [p for p in parsed_requirements if p not in skipped_packages]
num_skipped_packages -= len(parsed_requirements)
print("Skipping {} pre-installed packages:\n{}Remaining {} additional packages to install".format(
num_skipped_packages,
dump_yaml(sorted([str(p) for p in skipped_packages])),
len(parsed_requirements)
))
# nothing to install!
if not parsed_requirements:
return ""
# sanity check
if not parsed_requirements:
# return the original requirements just in case
return requirements
# remove local file references that do not exist
for p in parsed_requirements:
p.validate_local_file_ref()
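Because MarkerRequirement equality and hashing are string-based (see __eq__/__hash__ earlier in this file), the pre-installed overlap above reduces to a plain set intersection. A self-contained sketch with a hypothetical Req stand-in:

    class Req(object):
        def __init__(self, line):
            self.line = line
        def __str__(self):
            return self.line
        def __eq__(self, other):
            return isinstance(other, Req) and str(self) == str(other)
        def __hash__(self):
            return str(self).__hash__()

    parsed = [Req("numpy==1.26.4"), Req("torch==2.3.0"), Req("requests==2.31.0")]
    existing = [Req("numpy==1.26.4"), Req("requests==2.31.0")]
    skipped = set(parsed) & set(existing)
    remaining = [p for p in parsed if p not in skipped]  # original order is kept
    print(sorted(str(p) for p in skipped))  # ['numpy==1.26.4', 'requests==2.31.0']
    print([str(p) for p in remaining])      # ['torch==2.3.0']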
def replace_one(i, req):
# type: (int, MarkerRequirement) -> Optional[Text]
try:
@@ -805,7 +880,7 @@ class RequirementsManager(object):
normalize_cuda_version(cudnn_version or 0))
@staticmethod
def parse_requirements_section_to_marker_requirements(requirements, cwd=None):
def parse_requirements_section_to_marker_requirements(requirements, cwd=None, skip_local_file_validation=False):
def safe_parse(req_str):
# noinspection PyBroadException
try:
@@ -815,7 +890,8 @@ class RequirementsManager(object):
def create_req(x):
r = MarkerRequirement(x)
r.validate_local_file_ref()
if not skip_local_file_validation:
r.validate_local_file_ref()
return r
if not requirements:

View File

@@ -0,0 +1,234 @@
from copy import deepcopy
from functools import wraps
import attr
import sys
import os
from pathlib2 import Path
from clearml_agent.definitions import ENV_AGENT_FORCE_UV
from clearml_agent.helper.base import select_for_platform
from clearml_agent.helper.process import Argv, DEVNULL, check_if_command_exists
from clearml_agent.session import Session, UV
def prop_guard(prop, log_prop=None):
assert isinstance(prop, property)
assert not log_prop or isinstance(log_prop, property)
def decorator(func):
message = "%s:%s calling {}, {} = %s".format(func.__name__, prop.fget.__name__)
@wraps(func)
def new_func(self, *args, **kwargs):
prop_value = prop.fget(self)
if log_prop:
log_prop.fget(self).debug(
message,
type(self).__name__,
"" if prop_value else " not",
prop_value,
)
if prop_value:
return func(self, *args, **kwargs)
return new_func
return decorator
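A minimal usage sketch for prop_guard, assuming the definition above (the Demo class is hypothetical); the same pattern appears below as UvConfig._guard_enabled:

    class Demo(object):
        def __init__(self, enabled):
            self._enabled = enabled

        @property
        def enabled(self):
            return self._enabled

        _guard_enabled = prop_guard(enabled)

        @_guard_enabled
        def run(self):
            return "ran"

    print(Demo(True).run())   # ran
    print(Demo(False).run())  # None -- guard short-circuits, run() body is skipped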
class UvConfig:
def __init__(self, session):
# type: (Session, str) -> None
self.session = session
self._log = session.get_logger(__name__)
self._python = (
sys.executable
) # default, overwritten from session config in initialize()
self._initialized = False
@property
def log(self):
return self._log
@property
def enabled(self):
return (
ENV_AGENT_FORCE_UV.get()
or self.session.config["agent.package_manager.type"] == UV
)
_guard_enabled = prop_guard(enabled, log)
def run(self, *args, **kwargs):
func = kwargs.pop("func", Argv.get_output)
kwargs.setdefault("stdin", DEVNULL)
kwargs["env"] = deepcopy(os.environ)
if "VIRTUAL_ENV" in kwargs["env"] or "CONDA_PREFIX" in kwargs["env"]:
kwargs["env"].pop("VIRTUAL_ENV", None)
kwargs["env"].pop("CONDA_PREFIX", None)
kwargs["env"].pop("PYTHONPATH", None)
if hasattr(sys, "real_prefix") and hasattr(sys, "base_prefix"):
path = ":" + kwargs["env"]["PATH"]
path = path.replace(":" + sys.base_prefix, ":" + sys.real_prefix, 1)
kwargs["env"]["PATH"] = path
if self.session and self.session.config and args and args[0] == "sync":
# Set the cache dir to venvs dir
cache_dir = self.session.config.get("agent.venvs_dir", None)
if cache_dir is not None:
os.environ["UV_CACHE_DIR"] = cache_dir
extra_args = self.session.config.get(
"agent.package_manager.uv_sync_extra_args", None
)
if extra_args:
args = args + tuple(extra_args)
if check_if_command_exists("uv"):
argv = Argv("uv", *args)
else:
argv = Argv(self._python, "-m", "uv", *args)
self.log.debug("running: %s", argv)
return func(argv, **kwargs)
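A toy walk-through of the sync-specific handling above, with hypothetical configuration values:

    import os

    args = ("sync", "--locked")
    venvs_dir = "/root/.clearml/venvs-builds"  # assumed agent.venvs_dir value
    uv_sync_extra_args = ["--all-extras"]      # assumed uv_sync_extra_args value

    if args and args[0] == "sync":
        if venvs_dir is not None:
            os.environ["UV_CACHE_DIR"] = venvs_dir  # uv keeps its own cache here
        if uv_sync_extra_args:
            args = args + tuple(uv_sync_extra_args)

    print(args)  # ('sync', '--locked', '--all-extras')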
@_guard_enabled
def initialize(self, cwd=None):
if not self._initialized:
# use correct python version -- detected in Worker.install_virtualenv() and written to
# session
if self.session.config.get("agent.python_binary", None):
self._python = self.session.config.get("agent.python_binary")
if (
self.session.config.get("agent.package_manager.uv_version", None)
is not None
):
version = str(
self.session.config.get("agent.package_manager.uv_version")
)
# get uv version
version = version.replace(" ", "")
if (
("=" in version)
or ("~" in version)
or ("<" in version)
or (">" in version)
):
version = version
elif version:
version = "==" + version
# (we are not running it yet)
argv = Argv(
self._python,
"-m",
"pip",
"install",
"uv{}".format(version),
"--upgrade",
"--disable-pip-version-check",
)
# this is just for beauty and checks, we already set the version in the Argv
if not version:
version = "latest"
else:
# mark to install uv if not already installed (we are not running it yet)
argv = Argv(
self._python,
"-m",
"pip",
"install",
"uv",
"--disable-pip-version-check",
)
version = ""
# first upgrade pip if we need to
try:
from clearml_agent.helper.package.pip_api.venv import VirtualenvPip
pip = VirtualenvPip(
session=self.session,
python=self._python,
requirements_manager=None,
path=None,
interpreter=self._python,
)
pip.upgrade_pip()
except Exception as ex:
self.log.warning("failed upgrading pip: {}".format(ex))
# if no specific version is required and uv is already available, skip the installation
if not version and check_if_command_exists("uv"):
print(
"Notice: uv was found, no specific version required, skipping uv installation"
)
else:
print("Installing / Upgrading uv package to {}".format(version))
# now install uv
try:
print(argv.get_output())
except Exception as ex:
self.log.warning("failed installing uv: {}".format(ex))
# all done.
self._initialized = True
def get_api(self, path):
# type: (Path) -> UvAPI
return UvAPI(self, path)
@attr.s
class UvAPI(object):
config = attr.ib(type=UvConfig)
path = attr.ib(type=Path, converter=Path)
INDICATOR_FILES = "pyproject.toml", "uv.lock"
def install(self):
# type: () -> bool
if self.enabled:
self.config.run("sync", "--locked", cwd=str(self.path), func=Argv.check_call)
return True
return False
@property
def enabled(self):
return self.config.enabled and (
any((self.path / indicator).exists() for indicator in self.INDICATOR_FILES)
)
def freeze(self, freeze_full_environment=False):
python = Path(self.path) / ".venv" / select_for_platform(linux="bin/python", windows="scripts/python.exe")
lines = self.config.run("pip", "freeze", "--python", str(python), cwd=str(self.path)).splitlines()
# fix local filesystem references in the freeze output
from clearml_agent.external.requirements_parser.requirement import Requirement
packages = [Requirement.parse(p) for p in lines]
for p in packages:
if p.local_file and p.editable:
p.path = str(Path(p.path).relative_to(self.path))
p.line = "-e {}".format(p.path)
return {
"pip": [p.line for p in packages]
}
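A toy illustration of the editable-path rewrite in freeze() above (pathlib stands in for pathlib2; paths are hypothetical):

    from pathlib import Path

    repo_root = Path("/builds/my_repo")     # self.path in the code above
    editable = "/builds/my_repo/src/mypkg"  # p.path as reported by the freeze
    rel = str(Path(editable).relative_to(repo_root))
    print("-e {}".format(rel))              # -e src/mypkg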
def get_python_command(self, extra):
if check_if_command_exists("uv"):
return Argv("uv", "run", "python", *extra)
else:
return Argv(self.config._python, "-m", "uv", "run", "python", *extra)
def upgrade_pip(self, *args, **kwargs):
pass
def set_selected_package_manager(self, *args, **kwargs):
pass
def out_of_scope_install_package(self, *args, **kwargs):
pass
def install_from_file(self, *args, **kwargs):
pass

View File

@@ -598,7 +598,7 @@ class Git(VCS):
def pull(self):
self._set_ssh_url()
self.call("fetch", "--all", "--recurse-submodules", cwd=self.location)
self.call("fetch", "--all", "--tags", "--recurse-submodules", cwd=self.location)
def _git_pass_auth_wrapper(self, func, *args, **kwargs):
try:
@@ -936,7 +936,7 @@ def _locate_future_import(lines):
def patch_add_task_init_call(local_filename):
if not local_filename or not Path(local_filename).is_file():
if not local_filename or not Path(local_filename).is_file() or not str(local_filename).lower().endswith(".py"):
return
idx_a = 0

View File

@@ -401,6 +401,7 @@ class ResourceMonitor(object):
fractions = self._fractions_handler.fractions
stats["gpu_fraction_{}".format(report_index)] = \
(fractions[i] if i < len(fractions) else fractions[-1]) if fractions else 1.0
report_index += 1
except Exception as ex:
# something happened and we can't use gpu stats,
@@ -438,6 +439,7 @@ class ResourceMonitor(object):
class GpuFractionsHandler:
_number_re = re.compile(r"^clear\.ml/fraction(-\d+)?$")
_mig_re = re.compile(r"^nvidia\.com/mig-(?P<compute>[0-9]+)g\.(?P<memory>[0-9]+)gb$")
_frac_gpu_injector_re = re.compile(r"^clearml-injector/fraction$")
_gpu_name_to_memory_gb = {
"A30": 24,
@@ -514,10 +516,14 @@ class GpuFractionsHandler:
return 0
@classmethod
def encode_fractions(cls, limits: dict) -> str:
if any(cls._number_re.match(x) for x in (limits or {})):
return ",".join(str(v) for k, v in sorted(limits.items()) if cls._number_re.match(k))
return ",".join(("{}:{}".format(k, v) for k, v in (limits or {}).items() if cls._mig_re.match(k)))
def encode_fractions(cls, limits: dict, annotations: dict) -> str:
if limits:
if any(cls._number_re.match(x) for x in (limits or {})):
return ",".join(str(v) for k, v in sorted(limits.items()) if cls._number_re.match(k))
return ",".join(("{}:{}".format(k, v) for k, v in (limits or {}).items() if cls._mig_re.match(k)))
elif annotations:
if any(cls._frac_gpu_injector_re.match(x) for x in (annotations or {})):
return ",".join(str(v) for k, v in sorted(annotations.items()) if cls._frac_gpu_injector_re.match(k))
@staticmethod
def decode_fractions(fractions: str) -> Union[List[float], Dict[str, int]]:
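A toy run of the limits/annotations branches of encode_fractions above (the MIG branch is omitted; the input dicts are hypothetical):

    import re

    _number_re = re.compile(r"^clear\.ml/fraction(-\d+)?$")
    _frac_gpu_injector_re = re.compile(r"^clearml-injector/fraction$")

    limits = {"clear.ml/fraction-0": "0.50", "clear.ml/fraction-1": "0.25"}
    print(",".join(str(v) for k, v in sorted(limits.items()) if _number_re.match(k)))
    # -> 0.50,0.25

    annotations = {"clearml-injector/fraction": "0.5"}
    print(",".join(str(v) for k, v in sorted(annotations.items())
                   if _frac_gpu_injector_re.match(k)))
    # -> 0.5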

View File

@@ -67,7 +67,10 @@ DAEMON_ARGS = dict({
'group': 'Docker support',
},
'--queue': {
'help': 'Queue ID(s)/Name(s) to pull tasks from (\'default\' queue)',
'help': 'Queue ID(s)/Name(s) to pull tasks from (\'default\' queue).'
' Note that the queue list order determines priority, with the first listed queue having the'
' highest priority. To change this behavior, use --order-fairness to pull from each queue in a'
' round-robin order',
'nargs': '+',
'default': tuple(),
'dest': 'queues',
@@ -112,8 +115,11 @@ DAEMON_ARGS = dict({
'--dynamic-gpus': {
'help': 'Allow to dynamically allocate gpus based on queue properties, '
'configure with \'--queue <queue_name>=<num_gpus>\'.'
' Example: \'--dynamic-gpus --gpus 0-3 --queue dual_gpus=2 single_gpu=1\''
' Example Opportunistic: \'--dynamic-gpus --gpus 0-3 --queue dual_gpus=2 max_quad_gpus=1-4 \'',
' Example: \'--dynamic-gpus --gpus 0-3 --queue dual_gpus=2 single_gpu=1\'.'
' Example Opportunistic: \'--dynamic-gpus --gpus 0-3 --queue dual_gpus=2 max_quad_gpus=1-4\'.'
' Note that the queue list order determines priority, with the first listed queue having the'
' highest priority. To change this behavior, use --order-fairness to pull from each queue in a'
' round-robin order',
'action': 'store_true',
},
'--uptime': {

View File

@@ -24,6 +24,7 @@ from clearml_agent.helper.docker_args import DockerArgsSanitizer, sanitize_urls
from .version import __version__
POETRY = "poetry"
UV = "uv"
@attr.s

View File

@@ -1 +1 @@
__version__ = '1.8.1'
__version__ = '1.9.3'

View File

@@ -53,8 +53,9 @@ agent {
# select python package manager:
# currently supported: pip and conda
# poetry is used if pip selected and repository contains poetry.lock file
# uv is used if pip selected and repository contains uv.lock file
package_manager: {
# supported options: pip, conda, poetry
# supported options: pip, conda, poetry, uv
type: pip,
# specify pip version to use (examples "<20.2", "==19.3.1", "", empty string will install the latest version)

View File

@@ -1,4 +1,4 @@
FROM ubuntu:18.04
FROM ubuntu:22.04
USER root
WORKDIR /root

View File

@@ -4,7 +4,8 @@ curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip
unzip awscliv2.zip
./aws/install
curl -o kubectl https://amazon-eks.s3-us-west-2.amazonaws.com/1.21.2/2021-07-05/bin/linux/amd64/kubectl
curl -o kubectl https://s3.us-west-2.amazonaws.com/amazon-eks/1.29.3/2024-04-19/bin/linux/amd64/kubectl
#curl -o kubectl https://amazon-eks.s3-us-west-2.amazonaws.com/1.21.2/2021-07-05/bin/linux/amd64/kubectl
#curl -o kubectl https://amazon-eks.s3.us-west-2.amazonaws.com/1.19.6/2021-01-05/bin/linux/amd64/kubectl
chmod +x ./kubectl && mkdir -p $HOME/bin && cp ./kubectl $HOME/bin/kubectl && export PATH=$PATH:$HOME/bin

View File

@@ -1,4 +1,4 @@
FROM ubuntu:18.04
FROM ubuntu:22.04
USER root
WORKDIR /root

View File

@@ -1,6 +1,6 @@
#!/bin/bash
curl -LO https://dl.k8s.io/release/v1.21.0/bin/linux/amd64/kubectl
curl -LO https://dl.k8s.io/release/v1.29.3/bin/linux/amd64/kubectl
install -o root -g root -m 0755 kubectl /usr/local/bin/kubectl

View File

@@ -20,7 +20,7 @@ FROM python:${TAG} as target
WORKDIR /app
ARG KUBECTL_VERSION=1.24.0
ARG KUBECTL_VERSION=1.29.3
# Not sure about these ENV vars
# ENV LC_ALL=en_US.UTF-8

View File

@@ -2,7 +2,7 @@ ARG TAG=3.7.17-slim-bullseye
FROM python:${TAG} as target
ARG KUBECTL_VERSION=1.22.4
ARG KUBECTL_VERSION=1.29.3
WORKDIR /app

View File

@@ -1,4 +1,4 @@
FROM ubuntu:18.04
FROM ubuntu:22.04
USER root
WORKDIR /root

View File

@@ -33,4 +33,10 @@ if [ -z "$CLEARML_AGENT_NO_UPDATE" ]; then
fi
fi
clearml-agent daemon $DAEMON_OPTIONS --queue $QUEUES --docker "${CLEARML_AGENT_DEFAULT_BASE_DOCKER:-$TRAINS_AGENT_DEFAULT_BASE_DOCKER}" --cpu-only ${CLEARML_AGENT_EXTRA_ARGS:-$TRAINS_AGENT_EXTRA_ARGS}
DOCKER_ARGS="--docker \"${CLEARML_AGENT_DEFAULT_BASE_DOCKER:-$TRAINS_AGENT_DEFAULT_BASE_DOCKER}\""
if [ -n "$CLEARML_AGENT_NO_DOCKER" ]; then
DOCKER_ARGS=""
fi
clearml-agent daemon $DAEMON_OPTIONS --queue $QUEUES $DOCKER_ARGS --cpu-only ${CLEARML_AGENT_EXTRA_ARGS:-$TRAINS_AGENT_EXTRA_ARGS}

View File

@@ -74,8 +74,10 @@ agent {
# If Poetry is selected and the root repository contains `poetry.lock` or `pyproject.toml`,
# the "installed packages" section is ignored, and poetry is used.
# If Poetry is selected and no lock file is found, it reverts to "pip" package manager behaviour.
# If uv is selected and the root repository contains `uv.lock` or `pyproject.toml`,
# the "installed packages" section is ignored, and uv is used.
package_manager: {
# supported options: pip, conda, poetry
# supported options: pip, conda, poetry, uv
type: pip,
# specify pip version to use (examples "<20.2", "==19.3.1", "", empty string will install the latest version)
@@ -83,6 +85,8 @@ agent {
# specify poetry version to use (examples "<2", "==1.1.1", "", empty string will install the latest version)
# poetry_version: "<2",
# poetry_install_extra_args: ["-v"]
# uv_version: ">0.4",
# uv_sync_extra_args: ["--all-extras"]
# virtual environment inherits packages from system
system_site_packages: false,
@@ -308,6 +312,7 @@ agent {
# pip_cache: "/root/.cache/pip"
# poetry_cache: "/root/.cache/pypoetry"
# vcs_cache: "/root/.clearml/vcs-cache"
# venvs_cache: "/root/.clearml/venvs-cache"
# venv_build: "~/.clearml/venvs-builds"
# pip_download: "/root/.clearml/pip-download-cache"
# }

View File

@@ -9,7 +9,9 @@ python-dateutil>=2.4.2,<2.9.0
pyjwt>=2.4.0,<2.9.0
PyYAML>=3.12,<6.1
requests>=2.20.0,<=2.31.0
setuptools ; python_version > '3.11'
six>=1.13.0,<1.17.0
typing>=3.6.4,<3.8.0 ; python_version < '3.5'
urllib3>=1.21.1,<2
virtualenv>=16,<21
pywin32 ; sys_platform == 'win32'

View File

@@ -1,7 +1,7 @@
"""
ClearML - Artificial Intelligence Version Control
ClearML Inc.
CLEARML-AGENT DevOps for machine/deep learning
https://github.com/allegroai/clearml-agent
https://github.com/clearml/clearml-agent
"""
import os.path
@@ -39,9 +39,9 @@ setup(
long_description=long_description,
long_description_content_type='text/markdown',
# The project's main homepage.
url='https://github.com/allegroai/clearml-agent',
author='Allegroai',
author_email='clearml@allegro.ai',
url='https://github.com/clearml/clearml-agent',
author='clearml',
author_email='clearml@clearml.ai',
license='Apache License 2.0',
classifiers=[
'Development Status :: 5 - Production/Stable',
@@ -56,7 +56,6 @@ setup(
'Topic :: Scientific/Engineering :: Image Recognition',
'Topic :: System :: Logging',
'Topic :: System :: Monitoring',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
@@ -64,6 +63,7 @@ setup(
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
'Programming Language :: Python :: 3.12',
'Programming Language :: Python :: 3.13',
'License :: OSI Approved :: Apache Software License',
],