Compare commits

..

27 Commits

Author SHA1 Message Date
allegroai
d74b9f105b Fix resolving k8s pending queue may cause a queue with a uuid name to be created 2022-09-02 23:45:24 +03:00
allegroai
d0b8eeed15 Add docker ssh_ro_folder (default: "/.ssh") changed docker ssh_folder (default: "~/.ssh") 2022-09-02 23:44:36 +03:00
allegroai
221db3e175 Fix second .ssh temp mount fails if container changes the files inside 2022-09-02 23:43:58 +03:00
allegroai
2c71f9a821 Fix name not escaped as regex (all services "get_all" use regex for name) 2022-09-02 23:43:42 +03:00
allegroai
9006c2d28f Add support for abort callback registration 2022-08-29 18:06:59 +03:00
allegroai
ec216198a0 Add agent.enable_git_ask_pass to improve passing user/pass to git commands 2022-08-29 18:06:26 +03:00
allegroai
fe6adbf110 Fix package @ file:// with quoted (url style) links should not be ignored 2022-08-29 18:06:09 +03:00
allegroai
2693c565ba Fix docker mode use "~/.clearml/venvs-builds" as default for easier user-mode containers 2022-08-29 18:05:53 +03:00
allegroai
9054ea37c2 Fix home folder 2022-08-23 23:16:56 +03:00
allegroai
7292263f86 Add CLEARML_K8S_GLUE_START_AGENT_SCRIPT_PATH to allow customizing the agent startup script location for k8s glue agent 2022-08-23 23:16:36 +03:00
allegroai
f8a6cd697f Add k8s agent debug env var 2022-08-23 23:15:53 +03:00
allegroai
ec9d027678 Add support for MIG devices, use 0:1 for GPU 0 slice 1 (or use 0.1) 2022-08-01 18:58:42 +03:00
allegroai
48a145a8bd Fix messages 2022-08-01 18:57:36 +03:00
allegroai
71d2ab4ce7 Add missing use_credentials_chain to config file example 2022-08-01 18:57:04 +03:00
allegroai
12a8872b27 Fix Python 3.10+ support 2022-08-01 18:56:37 +03:00
allegroai
820ab4dc0c Fix k8s glue debug mode, refactoring 2022-08-01 18:55:49 +03:00
allegroai
1d1ffd17fb Fix README 2022-07-31 19:36:48 +03:00
allegroai
d96b8ff906 Fix template namespace should override default namespace 2022-07-22 22:44:32 +03:00
allegroai
e687418194 Refactor k8s glue template handling 2022-07-22 22:43:07 +03:00
allegroai
a5a797ec5e Version bump to v1.3.0 2022-06-16 23:24:28 +03:00
allegroai
ff6cee4a44 Fix requirements --extra-index-url line with trailing comment
Fix --extra-index-url is added for different command line switches
2022-06-16 23:22:29 +03:00
allegroai
9acbad28f7 Fix repository URL contains credentials even when agent.force_git_ssh_protocolagent.force_git_ssh_protocol is true 2022-06-16 23:20:53 +03:00
allegroai
560e689ccd Fix always make pygobject an optional package (i.e. if installation fails continue the Task package environment setup) 2022-06-16 23:18:55 +03:00
allegroai
f66e42ddb1 Fix optional priority packaged always compare lower case package name 2022-06-16 23:18:31 +03:00
allegroai
d9856d5de5 Add Python 3.10 support 2022-06-16 23:16:06 +03:00
Niels ten Boom
24177cc5a9 Support private repos from requirements.txt file (#107)
* support private repos
* fix double indices
2022-06-15 10:26:24 +03:00
allegroai
178af0dee8 Bump PyJWT version due to "Key confusion through non-blocklisted public key formats" vulnerability 2022-05-25 16:41:26 +03:00
20 changed files with 642 additions and 244 deletions

196
README.md
View File

@@ -9,14 +9,14 @@ ML-Ops scheduler & orchestration solution supporting Linux, macOS and Windows**
[![PyPI pyversions](https://img.shields.io/pypi/pyversions/clearml-agent.svg)](https://img.shields.io/pypi/pyversions/clearml-agent.svg)
[![PyPI version shields.io](https://img.shields.io/pypi/v/clearml-agent.svg)](https://img.shields.io/pypi/v/clearml-agent.svg)
[![PyPI Downloads](https://pepy.tech/badge/clearml-agent/month)](https://pypi.org/project/clearml-agent/)
[![Artifact Hub](https://img.shields.io/endpoint?url=https://artifacthub.io/badge/repository/allegroai)](https://artifacthub.io/packages/search?repo=allegroai)
[![Artifact Hub](https://img.shields.io/endpoint?url=https://artifacthub.io/badge/repository/allegroai)](https://artifacthub.io/packages/search?repo=allegroai)
</div>
---
### ClearML-Agent
#### *Formerly known as Trains Agent*
#### *Formerly known as Trains Agent*
* Run jobs (experiments) on any local or cloud based resource
* Implement optimized resource utilization policies
@@ -24,23 +24,31 @@ ML-Ops scheduler & orchestration solution supporting Linux, macOS and Windows**
* Launch-and-Forget service containers
* [Cloud autoscaling](https://clear.ml/docs/latest/docs/guides/services/aws_autoscaler)
* [Customizable cleanup](https://clear.ml/docs/latest/docs/guides/services/cleanup_service)
* Advanced [pipeline building and execution](https://clear.ml/docs/latest/docs/guides/frameworks/pytorch/notebooks/table/tabular_training_pipeline)
*
Advanced [pipeline building and execution](https://clear.ml/docs/latest/docs/guides/frameworks/pytorch/notebooks/table/tabular_training_pipeline)
It is a zero configuration fire-and-forget execution agent, providing a full ML/DL cluster solution.
**Full Automation in 5 steps**
1. ClearML Server [self-hosted](https://github.com/allegroai/clearml-server) or [free tier hosting](https://app.clear.ml)
2. `pip install clearml-agent` ([install](#installing-the-clearml-agent) the ClearML Agent on any GPU machine: on-premises / cloud / ...)
3. Create a [job](https://github.com/allegroai/clearml/docs/clearml-task.md) or Add [ClearML](https://github.com/allegroai/clearml) to your code with just 2 lines
4. Change the [parameters](#using-the-clearml-agent) in the UI & schedule for [execution](#using-the-clearml-agent) (or automate with an [AutoML pipeline](#automl-and-orchestration-pipelines-))
1. ClearML Server [self-hosted](https://github.com/allegroai/clearml-server)
or [free tier hosting](https://app.clear.ml)
2. `pip install clearml-agent` ([install](#installing-the-clearml-agent) the ClearML Agent on any GPU machine:
on-premises / cloud / ...)
3. Create a [job](https://github.com/allegroai/clearml/docs/clearml-task.md) or
Add [ClearML](https://github.com/allegroai/clearml) to your code with just 2 lines
4. Change the [parameters](#using-the-clearml-agent) in the UI & schedule for [execution](#using-the-clearml-agent) (or
automate with an [AutoML pipeline](#automl-and-orchestration-pipelines-))
5. :chart_with_downwards_trend: :chart_with_upwards_trend: :eyes: :beer:
"All the Deep/Machine-Learning DevOps your research needs, and then some... Because ain't nobody got time for that"
**Try ClearML now** [Self Hosted](https://github.com/allegroai/clearml-server) or [Free tier Hosting](https://app.clear.ml)
**Try ClearML now** [Self Hosted](https://github.com/allegroai/clearml-server)
or [Free tier Hosting](https://app.clear.ml)
<a href="https://app.clear.ml"><img src="https://github.com/allegroai/clearml-agent/blob/master/docs/screenshots.gif?raw=true" width="100%"></a>
### Simple, Flexible Experiment Orchestration
**The ClearML Agent was built to address the DL/ML R&D DevOps needs:**
* Easily add & remove machines from the cluster
@@ -56,20 +64,23 @@ It is a zero configuration fire-and-forget execution agent, providing a full ML/
*epsilon - Because we are :triangular_ruler: and nothing is really zero work
### Kubernetes Integration (Optional)
We think Kubernetes is awesome, but it should be a choice.
We designed `clearml-agent` so you can run bare-metal or inside a pod with any mix that fits your environment.
We think Kubernetes is awesome, but it should be a choice. We designed `clearml-agent` so you can run bare-metal or
inside a pod with any mix that fits your environment.
Find Dockerfiles in the [docker](./docker) dir and a helm Chart in https://github.com/allegroai/clearml-helm-charts
#### Benefits of integrating existing K8s with ClearML-Agent
#### Benefits of integrating existing K8s with ClearML-Agent
- ClearML-Agent adds the missing scheduling capabilities to K8s
- Allowing for more flexible automation from code
- A programmatic interface for easier learning curve (and debugging)
- Seamless integration with ML/DL experiment manager
- Web UI for customization, scheduling & prioritization of jobs
- Web UI for customization, scheduling & prioritization of jobs
**Two K8s integration flavours**
**Two K8s integration flavours**
- Spin ClearML-Agent as a long-lasting service pod
- use [clearml-agent](https://hub.docker.com/r/allegroai/clearml-agent) docker image
- map docker socket into the pod (soon replaced by [podman](https://github.com/containers/podman))
@@ -77,57 +88,66 @@ Find Dockerfiles in the [docker](./docker) dir and a helm Chart in https://githu
- benefits: full use of the ClearML scheduling, no need to worry about wrong container images / lost pods etc.
- downside: Sibling containers
- Kubernetes Glue, map ClearML jobs directly to K8s jobs
- Run the [clearml-k8s glue](https://github.com/allegroai/clearml-agent/blob/master/examples/k8s_glue_example.py) on a K8s cpu node
- The clearml-k8s glue pulls jobs from the ClearML job execution queue and prepares a K8s job (based on provided yaml template)
- Inside the pod itself the clearml-agent will install the job (experiment) environment and spin and monitor the experiment's process
- Run the [clearml-k8s glue](https://github.com/allegroai/clearml-agent/blob/master/examples/k8s_glue_example.py) on
a K8s cpu node
- The clearml-k8s glue pulls jobs from the ClearML job execution queue and prepares a K8s job (based on provided
yaml template)
- Inside the pod itself the clearml-agent will install the job (experiment) environment and spin and monitor the
experiment's process
- benefits: Kubernetes full view of all running jobs in the system
- downside: No real scheduling (k8s scheduler), no docker image verification (post-mortem only)
- downside: No real scheduling (k8s scheduler), no docker image verification (post-mortem only)
### Using the ClearML Agent
**Full scale HPC with a click of a button**
The ClearML Agent is a job scheduler that listens on job queue(s), pulls jobs, sets the job environments, executes the job and monitors its progress.
The ClearML Agent is a job scheduler that listens on job queue(s), pulls jobs, sets the job environments, executes the
job and monitors its progress.
Any 'Draft' experiment can be scheduled for execution by a ClearML agent.
A previously run experiment can be put into 'Draft' state by either of two methods:
* Using the **'Reset'** action from the experiment right-click context menu in the
ClearML UI - This will clear any results and artifacts the previous run had created.
* Using the **'Clone'** action from the experiment right-click context menu in the
ClearML UI - This will create a new 'Draft' experiment with the same configuration as the original experiment.
An experiment is scheduled for execution using the **'Enqueue'** action from the experiment
right-click context menu in the ClearML UI and selecting the execution queue.
* Using the **'Reset'** action from the experiment right-click context menu in the ClearML UI - This will clear any
results and artifacts the previous run had created.
* Using the **'Clone'** action from the experiment right-click context menu in the ClearML UI - This will create a new '
Draft' experiment with the same configuration as the original experiment.
An experiment is scheduled for execution using the **'Enqueue'** action from the experiment right-click context menu in
the ClearML UI and selecting the execution queue.
See [creating an experiment and enqueuing it for execution](#from-scratch).
Once an experiment is enqueued, it will be picked up and executed by a ClearML agent monitoring this queue.
The ClearML UI Workers & Queues page provides ongoing execution information:
- Workers Tab: Monitor you cluster
- Workers Tab: Monitor you cluster
- Review available resources
- Monitor machines statistics (CPU / GPU / Disk / Network)
- Queues Tab:
- Queues Tab:
- Control the scheduling order of jobs
- Cancel or abort job execution
- Move jobs between execution queues
#### What The ClearML Agent Actually Does
The ClearML Agent executes experiments using the following process:
- Create a new virtual environment (or launch the selected docker image)
- Clone the code into the virtual-environment (or inside the docker)
- Install python packages based on the package requirements listed for the experiment
- Special note for PyTorch: The ClearML Agent will automatically select the
torch packages based on the CUDA_VERSION environment variable of the machine
- Execute the code, while monitoring the process
- Log all stdout/stderr in the ClearML UI, including the cloning and installation process, for easy debugging
- Monitor the execution and allow you to manually abort the job using the ClearML UI (or, in the unfortunate case of a code crash, catch the error and signal the experiment has failed)
- Create a new virtual environment (or launch the selected docker image)
- Clone the code into the virtual-environment (or inside the docker)
- Install python packages based on the package requirements listed for the experiment
- Special note for PyTorch: The ClearML Agent will automatically select the torch packages based on the CUDA_VERSION
environment variable of the machine
- Execute the code, while monitoring the process
- Log all stdout/stderr in the ClearML UI, including the cloning and installation process, for easy debugging
- Monitor the execution and allow you to manually abort the job using the ClearML UI (or, in the unfortunate case of a
code crash, catch the error and signal the experiment has failed)
#### System Design & Flow
<img src="https://github.com/allegroai/clearml-agent/blob/master/docs/clearml_architecture.png" width="100%" alt="clearml-architecture">
#### Installing the ClearML Agent
```bash
@@ -137,6 +157,7 @@ pip install clearml-agent
#### ClearML Agent Usage Examples
Full Interface and capabilities are available with
```bash
clearml-agent --help
clearml-agent daemon --help
@@ -148,7 +169,8 @@ clearml-agent daemon --help
clearml-agent init
```
Note: The ClearML Agent uses a cache folder to cache pip packages, apt packages and cloned repositories. The default ClearML Agent cache folder is `~/.clearml`
Note: The ClearML Agent uses a cache folder to cache pip packages, apt packages and cloned repositories. The default
ClearML Agent cache folder is `~/.clearml`
See full details in your configuration file at `~/clearml.conf`
@@ -158,29 +180,36 @@ They are designed to share the same configuration file, see example [here](docs/
#### Running the ClearML Agent
For debug and experimentation, start the ClearML agent in `foreground` mode, where all the output is printed to screen
```bash
clearml-agent daemon --queue default --foreground
```
For actual service mode, all the stdout will be stored automatically into a temporary file (no need to pipe)
Notice: with `--detached` flag, the *clearml-agent* will be running in the background
```bash
clearml-agent daemon --detached --queue default
```
GPU allocation is controlled via the standard OS environment `NVIDIA_VISIBLE_DEVICES` or `--gpus` flag (or disabled with `--cpu-only`).
GPU allocation is controlled via the standard OS environment `NVIDIA_VISIBLE_DEVICES` or `--gpus` flag (or disabled
with `--cpu-only`).
If no flag is set, and `NVIDIA_VISIBLE_DEVICES` variable doesn't exist, all GPU's will be allocated for the `clearml-agent` <br>
If `--cpu-only` flag is set, or `NVIDIA_VISIBLE_DEVICES` is an empty string (""), no gpu will be allocated for the `clearml-agent`
If no flag is set, and `NVIDIA_VISIBLE_DEVICES` variable doesn't exist, all GPU's will be allocated for
the `clearml-agent` <br>
If `--cpu-only` flag is set, or `NVIDIA_VISIBLE_DEVICES` is an empty string (""), no gpu will be allocated for
the `clearml-agent`
Example: spin two agents, one per gpu on the same machine:
Notice: with `--detached` flag, the *clearml-agent* will be running in the background
```bash
clearml-agent daemon --detached --gpus 0 --queue default
clearml-agent daemon --detached --gpus 1 --queue default
```
Example: spin two agents, pulling from dedicated `dual_gpu` queue, two gpu's per agent
```bash
clearml-agent daemon --detached --gpus 0,1 --queue dual_gpu
clearml-agent daemon --detached --gpus 2,3 --queue dual_gpu
@@ -189,23 +218,29 @@ clearml-agent daemon --detached --gpus 2,3 --queue dual_gpu
##### Starting the ClearML Agent in docker mode
For debug and experimentation, start the ClearML agent in `foreground` mode, where all the output is printed to screen
```bash
clearml-agent daemon --queue default --docker --foreground
```
For actual service mode, all the stdout will be stored automatically into a file (no need to pipe)
Notice: with `--detached` flag, the *clearml-agent* will be running in the background
```bash
clearml-agent daemon --detached --queue default --docker
```
Example: spin two agents, one per gpu on the same machine, with default nvidia/cuda:10.1-cudnn7-runtime-ubuntu18.04 docker:
Example: spin two agents, one per gpu on the same machine, with default nvidia/cuda:10.1-cudnn7-runtime-ubuntu18.04
docker:
```bash
clearml-agent daemon --detached --gpus 0 --queue default --docker nvidia/cuda:10.1-cudnn7-runtime-ubuntu18.04
clearml-agent daemon --detached --gpus 1 --queue default --docker nvidia/cuda:10.1-cudnn7-runtime-ubuntu18.04
```
Example: spin two agents, pulling from dedicated `dual_gpu` queue, two gpu's per agent, with default nvidia/cuda:10.1-cudnn7-runtime-ubuntu18.04 docker:
Example: spin two agents, pulling from dedicated `dual_gpu` queue, two gpu's per agent, with default nvidia/cuda:
10.1-cudnn7-runtime-ubuntu18.04 docker:
```bash
clearml-agent daemon --detached --gpus 0,1 --queue dual_gpu --docker nvidia/cuda:10.1-cudnn7-runtime-ubuntu18.04
clearml-agent daemon --detached --gpus 2,3 --queue dual_gpu --docker nvidia/cuda:10.1-cudnn7-runtime-ubuntu18.04
@@ -216,55 +251,61 @@ clearml-agent daemon --detached --gpus 2,3 --queue dual_gpu --docker nvidia/cuda
Priority Queues are also supported, example use case:
High priority queue: `important_jobs` Low priority queue: `default`
```bash
clearml-agent daemon --queue important_jobs default
```
The **ClearML Agent** will first try to pull jobs from the `important_jobs` queue, only then it will fetch a job from the `default` queue.
Adding queues, managing job order within a queue and moving jobs between queues, is available using the Web UI, see example on our [free server](https://app.clear.ml/workers-and-queues/queues)
The **ClearML Agent** will first try to pull jobs from the `important_jobs` queue, only then it will fetch a job from
the `default` queue.
Adding queues, managing job order within a queue and moving jobs between queues, is available using the Web UI, see
example on our [free server](https://app.clear.ml/workers-and-queues/queues)
##### Stopping the ClearML Agent
To stop a **ClearML Agent** running in the background, run the same command line used to start the agent with `--stop` appended.
For example, to stop the first of the above shown same machine, single gpu agents:
To stop a **ClearML Agent** running in the background, run the same command line used to start the agent with `--stop`
appended. For example, to stop the first of the above shown same machine, single gpu agents:
```bash
clearml-agent daemon --detached --gpus 0 --queue default --docker nvidia/cuda:10.1-cudnn7-runtime-ubuntu18.04 --stop
```
### How do I create an experiment on the ClearML Server? <a name="from-scratch"></a>
* Integrate [ClearML](https://github.com/allegroai/clearml) with your code
* Execute the code on your machine (Manually / PyCharm / Jupyter Notebook)
* As your code is running, **ClearML** creates an experiment logging all the necessary execution information:
- Git repository link and commit ID (or an entire jupyter notebook)
- Git diff (were not saying you never commit and push, but still...)
- Python packages used by your code (including specific versions used)
- Hyper-Parameters
- Input Artifacts
- Git repository link and commit ID (or an entire jupyter notebook)
- Git diff (were not saying you never commit and push, but still...)
- Python packages used by your code (including specific versions used)
- Hyper-Parameters
- Input Artifacts
You now have a 'template' of your experiment with everything required for automated execution
* In the ClearML UI, Right click on the experiment and select 'clone'. A copy of your experiment will be created.
* In the ClearML UI, Right-click on the experiment and select 'clone'. A copy of your experiment will be created.
* You now have a new draft experiment cloned from your original experiment, feel free to edit it
- Change the Hyper-Parameters
- Switch to the latest code base of the repository
- Update package versions
- Select a specific docker image to run in (see docker execution mode section)
- Or simply change nothing to run the same experiment again...
- Change the Hyper-Parameters
- Switch to the latest code base of the repository
- Update package versions
- Select a specific docker image to run in (see docker execution mode section)
- Or simply change nothing to run the same experiment again...
* Schedule the newly created experiment for execution: Right-click the experiment and select 'enqueue'
### ClearML-Agent Services Mode <a name="services"></a>
ClearML-Agent Services is a special mode of ClearML-Agent that provides the ability to launch long-lasting jobs
that previously had to be executed on local / dedicated machines. It allows a single agent to
launch multiple dockers (Tasks) for different use cases. To name a few use cases, auto-scaler service (spinning instances
when the need arises and the budget allows), Controllers (Implementing pipelines and more sophisticated DevOps logic),
Optimizer (such as Hyper-parameter Optimization or sweeping), and Application (such as interactive Bokeh apps for
increased data transparency)
ClearML-Agent Services is a special mode of ClearML-Agent that provides the ability to launch long-lasting jobs that
previously had to be executed on local / dedicated machines. It allows a single agent to launch multiple dockers (Tasks)
for different use cases. To name a few use cases, auto-scaler service (spinning instances when the need arises and the
budget allows), Controllers (Implementing pipelines and more sophisticated DevOps logic), Optimizer (such as
Hyper-parameter Optimization or sweeping), and Application (such as interactive Bokeh apps for increased data
transparency)
ClearML-Agent Services mode will spin **any** task enqueued into the specified queue.
Every task launched by ClearML-Agent Services will be registered as a new node in the system,
providing tracking and transparency capabilities.
Currently clearml-agent in services-mode supports cpu only configuration. ClearML-agent services mode can be launched alongside GPU agents.
ClearML-Agent Services mode will spin **any** task enqueued into the specified queue. Every task launched by
ClearML-Agent Services will be registered as a new node in the system, providing tracking and transparency capabilities.
Currently clearml-agent in services-mode supports cpu only configuration. ClearML-agent services mode can be launched
alongside GPU agents.
```bash
clearml-agent daemon --services-mode --detached --queue services --create-queue --docker ubuntu:18.04 --cpu-only
@@ -272,22 +313,27 @@ clearml-agent daemon --services-mode --detached --queue services --create-queue
**Note**: It is the user's responsibility to make sure the proper tasks are pushed into the specified queue.
### AutoML and Orchestration Pipelines <a name="automl-pipes"></a>
The ClearML Agent can also be used to implement AutoML orchestration and Experiment Pipelines in conjunction with the ClearML package.
Sample AutoML & Orchestration examples can be found in the ClearML [example/automation](https://github.com/allegroai/clearml/tree/master/examples/automation) folder.
The ClearML Agent can also be used to implement AutoML orchestration and Experiment Pipelines in conjunction with the
ClearML package.
Sample AutoML & Orchestration examples can be found in the
ClearML [example/automation](https://github.com/allegroai/clearml/tree/master/examples/automation) folder.
AutoML examples
- [Toy Keras training experiment](https://github.com/allegroai/clearml/blob/master/examples/optimization/hyper-parameter-optimization/base_template_keras_simple.py)
- [Toy Keras training experiment](https://github.com/allegroai/clearml/blob/master/examples/optimization/hyper-parameter-optimization/base_template_keras_simple.py)
- In order to create an experiment-template in the system, this code must be executed once manually
- [Random Search over the above Keras experiment-template](https://github.com/allegroai/clearml/blob/master/examples/automation/manual_random_param_search_example.py)
- This example will create multiple copies of the Keras experiment-template, with different hyper-parameter combinations
- [Random Search over the above Keras experiment-template](https://github.com/allegroai/clearml/blob/master/examples/automation/manual_random_param_search_example.py)
- This example will create multiple copies of the Keras experiment-template, with different hyper-parameter
combinations
Experiment Pipeline examples
- [First step experiment](https://github.com/allegroai/clearml/blob/master/examples/automation/task_piping_example.py)
- [First step experiment](https://github.com/allegroai/clearml/blob/master/examples/automation/task_piping_example.py)
- This example will "process data", and once done, will launch a copy of the 'second step' experiment-template
- [Second step experiment](https://github.com/allegroai/clearml/blob/master/examples/automation/toy_base_task.py)
- [Second step experiment](https://github.com/allegroai/clearml/blob/master/examples/automation/toy_base_task.py)
- In order to create an experiment-template in the system, this code must be executed once manually
### License

View File

@@ -39,6 +39,13 @@
# default false, only the working directory will be added to the PYHTONPATH
# force_git_root_python_path: false
# if set, use GIT_ASKPASS to pass user/pass when cloning / fetch repositories
# it solves passing user/token to git submodules.
# this is a safer way to ensure multiple users using the same repository will
# not accidentally leak credentials
# Only supported on Linux systems, it will be the default in future releases
# enable_git_ask_pass: false
# in docker mode, if container's entrypoint automatically activated a virtual environment
# use the activated virtual environment and install everything there
# set to False to disable, and always create a new venv inheriting from the system_site_packages
@@ -83,7 +90,7 @@
# set the optional priority packages to be installed before the rest of the required packages,
# In case a package installation fails, the package will be ignored,
# and the virtual environment process will continue
# priority_optional_packages: ["pygobject", ]
priority_optional_packages: ["pygobject", ]
# set the post packages to be installed after all the rest of the required packages
# post_packages: ["horovod", ]
@@ -220,16 +227,20 @@
parse_embedded_urls: true
}
# Maximum execution time (in seconds) for Task's abort function call
abort_callback_max_timeout: 1800
# allow to set internal mount points inside the docker,
# especially useful for non-root docker container images.
docker_internal_mounts {
sdk_cache: "/clearml_agent_cache"
apt_cache: "/var/cache/apt/archives"
ssh_folder: "/root/.ssh"
ssh_folder: "~/.ssh"
ssh_ro_folder: "/.ssh"
pip_cache: "/root/.cache/pip"
poetry_cache: "/root/.cache/pypoetry"
vcs_cache: "/root/.clearml/vcs-cache"
venv_build: "/root/.clearml/venvs-builds"
venv_build: "~/.clearml/venvs-builds"
pip_download: "/root/.clearml/pip-download-cache"
}

View File

@@ -347,7 +347,7 @@ class ServiceCommandSection(BaseCommandSection):
except AttributeError:
raise NameResolutionError('Name resolution unavailable for {}'.format(service))
request = request_cls.from_dict(dict(name=name, only_fields=['name', 'id']))
request = request_cls.from_dict(dict(name=re.escape(name), only_fields=['name', 'id']))
# from_dict will ignore unrecognised keyword arguments - not all GetAll's have only_fields
response = getattr(self._session.send_api(request), service)
matches = [db_object for db_object in response if name.lower() == db_object.name.lower()]

View File

@@ -122,7 +122,7 @@ def main():
" Bitbucket: https://support.atlassian.com/bitbucket-cloud/docs/app-passwords/\n"
" GitLab: https://docs.gitlab.com/ee/user/profile/personal_access_tokens.html\n"
)
print('Enter git password token for user \'{}\': '.format(git_user), end='')
print('Enter git personal token for user \'{}\': '.format(git_user), end='')
git_pass = input()
print('Git repository cloning will be using user={} token={}'.format(git_user, git_pass))
else:

View File

@@ -67,8 +67,10 @@ from clearml_agent.definitions import (
ENV_SSH_AUTH_SOCK,
ENV_AGENT_SKIP_PIP_VENV_INSTALL,
ENV_EXTRA_DOCKER_ARGS,
ENV_CUSTOM_BUILD_SCRIPT, ENV_AGENT_SKIP_PYTHON_ENV_INSTALL, WORKING_STANDALONE_DIR,
ENV_CUSTOM_BUILD_SCRIPT,
ENV_AGENT_SKIP_PYTHON_ENV_INSTALL,
WORKING_STANDALONE_DIR,
ENV_DEBUG_INFO,
)
from clearml_agent.definitions import WORKING_REPOSITORY_DIR, PIP_EXTRA_INDICES
from clearml_agent.errors import (
@@ -406,6 +408,10 @@ class TaskStopSignal(object):
self.worker_id = command.worker_id
self._task_reset_state_counter = 0
self.task_id = task_id
self._support_callback = None
self._active_callback_timestamp = None
self._active_callback_timeout = None
self._abort_callback_max_timeout = float(self.session.config.get('agent.abort_callback_max_timeout', 1800))
def test(self):
# type: () -> TaskStopReason
@@ -423,11 +429,84 @@ class TaskStopSignal(object):
# make sure we break nothing
return TaskStopSignal.default
def _wait_for_abort_callback(self):
if not self._support_callback:
return None
if self._active_callback_timestamp:
if time() - self._active_callback_timestamp < self._active_callback_timeout:
# print("waiting for callback to complete")
self.command.log("waiting for callback to complete")
# check state
cb_completed = None
try:
task_info = self.session.get(
service="tasks", action="get_all", version="2.13", id=[self.task_id],
only_fields=["status", "status_message", "runtime._abort_callback_completed"])
cb_completed = task_info['tasks'][0]['runtime'].get('_abort_callback_completed', None)
except: # noqa
pass
if not bool(cb_completed):
return False
msg = "Task abort callback completed in {:.2f} seconds".format(
time() - self._active_callback_timestamp)
else:
msg = "Task abort callback timed out [timeout: {}, elapsed: {:.2f}]".format(
self._active_callback_timeout, time() - self._active_callback_timestamp)
self.command.send_logs(self.task_id, ["### " + msg + " ###"], session=self.session)
return True
# check if abort callback is turned on
cb_completed = None
# TODO: add retries on network error with timeout
try:
task_info = self.session.get(
service="tasks", action="get_all", version="2.13", id=[self.task_id],
only_fields=["status", "status_message", "runtime._abort_callback_timeout",
"runtime._abort_poll_freq", "runtime._abort_callback_completed"])
abort_timeout = task_info['tasks'][0]['runtime'].get('_abort_callback_timeout', 0)
poll_timeout = task_info['tasks'][0]['runtime'].get('_abort_poll_freq', 0)
cb_completed = task_info['tasks'][0]['runtime'].get('_abort_callback_completed', None)
except: # noqa
abort_timeout = None
poll_timeout = None
if not abort_timeout:
# no callback set we can leave
return None
try:
timeout = min(float(abort_timeout) + float(poll_timeout), self._abort_callback_max_timeout)
except: # noqa
self.command.log("Failed parsing runtime timeout shutdown callback [{}, {}]".format(
abort_timeout, poll_timeout))
return None
self.command.send_logs(
self.task_id,
["### Task abort callback timeout set, waiting for max {} sec ###".format(timeout)],
session=self.session
)
self._active_callback_timestamp = time()
self._active_callback_timeout = timeout
return bool(cb_completed)
def was_abort_function_called(self):
return bool(self._active_callback_timestamp)
def _test(self):
# type: () -> TaskStopReason
"""
"Unsafe" version of test()
"""
if self._support_callback is None:
# test if backend support callback
self._support_callback = self.session.check_min_api_version("2.13")
task_info = get_task(
self.session, self.task_id, only_fields=["status", "status_message"]
)
@@ -439,10 +518,16 @@ class TaskStopSignal(object):
"task status_message has '%s', task will terminate",
self.stopping_message,
)
# actively waiting for task to complete
if self._wait_for_abort_callback() is False:
return TaskStopReason.no_stop
return TaskStopReason.stopped
if status in self.unexpected_statuses: # ## and "worker" not in message:
self.command.log("unexpected status change, task will terminate")
# actively waiting for task to complete
if self._wait_for_abort_callback() is False:
return TaskStopReason.no_stop
return TaskStopReason.status_changed
if status == self.statuses.created:
@@ -451,13 +536,18 @@ class TaskStopSignal(object):
>= self._number_of_consecutive_reset_tests
):
self.command.log("task was reset, task will terminate")
# actively waiting for task to complete
if self._wait_for_abort_callback() is False:
return TaskStopReason.no_stop
return TaskStopReason.reset
self._task_reset_state_counter += 1
warning_msg = "Warning: Task {} was reset! if state is consistent we shall terminate ({}/{}).".format(
self.task_id,
self._task_reset_state_counter,
self._number_of_consecutive_reset_tests,
)
if self.events_service:
self.events_service.send_log_events(
self.worker_id,
@@ -526,6 +616,7 @@ class Worker(ServiceCommandSection):
def __init__(self, *args, **kwargs):
super(Worker, self).__init__(*args, **kwargs)
self._debug_context = ENV_DEBUG_INFO.get()
self.monitor = None
self.log = self._session.get_logger(__name__)
self.register_signal_handler()
@@ -555,6 +646,7 @@ class Worker(ServiceCommandSection):
self.worker_id = self._session.config["agent.worker_id"] or "{}:{}".format(
self._session.config["agent.worker_name"], os.getpid()
)
self.parent_worker_id = None # maybe add os env for overriding
self._last_stats = defaultdict(lambda: 0)
self._last_report_timestamp = psutil.time.time()
self.temp_config_path = None
@@ -943,7 +1035,7 @@ class Worker(ServiceCommandSection):
# update available gpus
if gpu_queues:
available_gpus = self._dynamic_gpu_get_available(gpu_indexes)
# if something went wrong or we have no free gpus
# if something went wrong, or we have no free gpus
# start over from the highest priority queue
if not available_gpus:
if self._daemon_foreground or worker_params.debug:
@@ -1029,7 +1121,7 @@ class Worker(ServiceCommandSection):
self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id))
org_gpus = os.environ.get('NVIDIA_VISIBLE_DEVICES')
org_gpus = Session.get_nvidia_visible_env()
dynamic_gpus_worker_id = self.worker_id
# the following is only executed in dynamic gpus mode
if gpu_queues and gpu_queues.get(queue):
@@ -1040,10 +1132,10 @@ class Worker(ServiceCommandSection):
available_gpus = available_gpus[gpu_queues.get(queue)[1]:]
self.set_runtime_properties(
key='available_gpus', value=','.join(str(g) for g in available_gpus))
os.environ['CUDA_VISIBLE_DEVICES'] = \
os.environ['NVIDIA_VISIBLE_DEVICES'] = ','.join(str(g) for g in gpus)
Session.set_nvidia_visible_env(gpus)
list_task_gpus_ids.update({str(g): task_id for g in gpus})
self.worker_id = ':'.join(self.worker_id.split(':')[:-1] + ['gpu'+','.join(str(g) for g in gpus)])
self.worker_id = ':'.join(
self.worker_id.split(':')[:-1] + ['gpu'+','.join(str(g) for g in gpus)])
self.send_logs(
task_id=task_id,
@@ -1056,8 +1148,7 @@ class Worker(ServiceCommandSection):
if gpu_queues:
self.worker_id = dynamic_gpus_worker_id
os.environ['CUDA_VISIBLE_DEVICES'] = \
os.environ['NVIDIA_VISIBLE_DEVICES'] = org_gpus
Session.set_nvidia_visible_env(org_gpus)
self.report_monitor(ResourceMonitor.StatusReport(queues=self.queues))
@@ -1065,7 +1156,7 @@ class Worker(ServiceCommandSection):
runtime_props = None
# if we are using priority start pulling from the first always,
# if we are doing round robin, pull from the next one
# if we are doing roundrobin, pull from the next one
if priority_order:
break
else:
@@ -1097,6 +1188,8 @@ class Worker(ServiceCommandSection):
self._unregister()
def _dynamic_gpu_get_available(self, gpu_indexes):
# cast to string
gpu_indexes = [str(g) for g in gpu_indexes]
# noinspection PyBroadException
try:
response = self._session.send_api(workers_api.GetAllRequest(last_seen=600))
@@ -1111,7 +1204,8 @@ class Worker(ServiceCommandSection):
for w in our_workers:
for g in w.split(':')[-1].lower().replace('gpu', '').split(','):
try:
gpus += [int(g.strip())]
# verify "int.int"
gpus += [str(g).strip()] if float(g.strip()) >= 0 else []
except (ValueError, TypeError):
print("INFO: failed parsing GPU int('{}') - skipping".format(g))
available_gpus = list(set(gpu_indexes) - set(gpus))
@@ -1127,10 +1221,12 @@ class Worker(ServiceCommandSection):
gpus = []
for g in available_gpus[-1].split(','):
try:
gpus += [int(g.strip())]
# verify "int.int"
gpus += [str(g).strip()] if float(g.strip()) >= 0 else []
except (ValueError, TypeError):
print("INFO: failed parsing GPU int('{}') - skipping".format(g))
available_gpus = gpus
if not isinstance(gpu_queues, dict):
gpu_queues = dict(gpu_queues)
@@ -1487,12 +1583,14 @@ class Worker(ServiceCommandSection):
if '-' in gpu_indexes:
gpu_indexes = list(range(int(gpu_indexes.split('-')[0]), 1 + int(gpu_indexes.split('-')[1])))
else:
gpu_indexes = [int(g) for g in gpu_indexes.split(',')]
gpu_indexes = [str(g).replace(":", ".").strip() for g in gpu_indexes.split(',')]
# verify (basically numbers with single "." dot)
gpu_indexes = [str(g) for g in gpu_indexes if float(g) >= 0]
except Exception:
raise ValueError(
'Failed parsing --gpus "{}". '
'--dynamic_gpus must be use with '
'specific gpus for example "0-7" or "0,1,2,3"'.format(kwargs.get('gpus')))
'specific gpus for example "0-7" or "0,1,2,3" or "0:0,0:1,1:0,1:1"'.format(kwargs.get('gpus')))
dynamic_gpus = []
for s in queue_names:
@@ -1719,6 +1817,10 @@ class Worker(ServiceCommandSection):
printed_lines, stderr_pos_count = _print_file(stderr_path, stderr_pos_count)
stderr_line_count += report_lines(printed_lines, "stderr")
# make sure that if the abort function was called, the task is marked as aborted
if stop_signal and stop_signal.was_abort_function_called():
stop_reason = TaskStopReason.stopped
return status, stop_reason
def _check_if_internal_agent_started(self, printed_lines, service_mode_internal_agent_started, task_id):
@@ -2024,7 +2126,10 @@ class Worker(ServiceCommandSection):
python_ver = task.script.binary
python_ver = python_ver.split('/')[-1].replace('python', '')
# if we can cast it, we are good
return '{:.1f}'.format(float(python_ver))
return '{}.{}'.format(
int(python_ver.partition(".")[0]),
int(python_ver.partition(".")[-1].partition(".")[0] or 0)
)
except Exception:
pass
@@ -2873,8 +2978,8 @@ class Worker(ServiceCommandSection):
if self._session.debug_mode:
self.log(traceback.format_exc())
def debug(self, message):
if self._session.debug_mode:
def debug(self, message, context=None):
if self._session.debug_mode and (not context or context == self._debug_context):
print("clearml_agent: {}".format(message))
@staticmethod
@@ -3303,7 +3408,7 @@ class Worker(ServiceCommandSection):
mounted_vcs_cache = temp_config.get(
"agent.docker_internal_mounts.vcs_cache", '/root/.clearml/vcs-cache')
mounted_venv_dir = temp_config.get(
"agent.docker_internal_mounts.venv_build", '/root/.clearml/venvs-builds')
"agent.docker_internal_mounts.venv_build", '~/.clearml/venvs-builds')
temp_config.put("sdk.storage.cache.default_base_dir", mounted_cache_dir)
temp_config.put("agent.pip_download_cache.path", mounted_pip_dl_dir)
temp_config.put("agent.vcs_cache.path", mounted_vcs_cache)
@@ -3341,27 +3446,36 @@ class Worker(ServiceCommandSection):
)
def _get_docker_config_cmd(self, temp_config, clean_api_credentials=False, **kwargs):
self.debug("Setting up docker config command")
host_cache = Path(os.path.expandvars(
self._session.config["sdk.storage.cache.default_base_dir"])).expanduser().as_posix()
self.debug("host_cache: {}".format(host_cache))
host_pip_dl = Path(os.path.expandvars(
self._session.config["agent.pip_download_cache.path"])).expanduser().as_posix()
self.debug("host_pip_dl: {}".format(host_pip_dl))
host_vcs_cache = Path(os.path.expandvars(
self._session.config["agent.vcs_cache.path"])).expanduser().as_posix()
self.debug("host_vcs_cache: {}".format(host_vcs_cache))
host_venvs_cache = Path(os.path.expandvars(
self._session.config["agent.venvs_cache.path"])).expanduser().as_posix() \
if self._session.config.get("agent.venvs_cache.path", None) else None
self.debug("host_venvs_cache: {}".format(host_venvs_cache))
host_ssh_cache = self._host_ssh_cache
self.debug("host_ssh_cache: {}".format(host_ssh_cache))
host_apt_cache = Path(os.path.expandvars(self._session.config.get(
"agent.docker_apt_cache", '~/.clearml/apt-cache'))).expanduser().as_posix()
self.debug("host_apt_cache: {}".format(host_apt_cache))
host_pip_cache = Path(os.path.expandvars(self._session.config.get(
"agent.docker_pip_cache", '~/.clearml/pip-cache'))).expanduser().as_posix()
self.debug("host_pip_cache: {}".format(host_pip_cache))
if self.poetry.enabled:
host_poetry_cache = Path(os.path.expandvars(self._session.config.get(
"agent.docker_poetry_cache", '~/.clearml/poetry-cache'))).expanduser().as_posix()
else:
host_poetry_cache = None
self.debug("host_poetry_cache: {}".format(host_poetry_cache))
# make sure all folders are valid
if host_apt_cache:
@@ -3389,8 +3503,16 @@ class Worker(ServiceCommandSection):
shutil.rmtree(host_ssh_cache, ignore_errors=True)
shutil.copytree(Path('~/.ssh').expanduser().as_posix(), host_ssh_cache)
except Exception:
host_ssh_cache = None
self.log.warning('Failed creating temporary copy of ~/.ssh for git credential')
# if we failed to copy / delete, let's see if we
self.log.warning('Failed creating temporary copy of ~/.ssh for git credential, '
'creating a new temp folder')
# noinspection PyBroadException
try:
host_ssh_cache = mkdtemp(prefix='clearml_agent.ssh.')
shutil.copytree(Path('~/.ssh').expanduser().as_posix(), host_ssh_cache)
except Exception:
self.log.warning('Failed creating temporary copy of ~/.ssh for git credential, removing mount!')
host_ssh_cache = None
# check if the .git credentials exist:
try:
@@ -3420,6 +3542,7 @@ class Worker(ServiceCommandSection):
mounted_vcs_cache = temp_config.get("agent.vcs_cache.path")
mounted_venvs_cache = temp_config.get("agent.venvs_cache.path", "")
mount_ssh = temp_config.get("agent.docker_internal_mounts.ssh_folder", None)
mount_ssh_ro = temp_config.get("agent.docker_internal_mounts.ssh_ro_folder", None)
mount_apt_cache = temp_config.get("agent.docker_internal_mounts.apt_cache", None)
mount_pip_cache = temp_config.get("agent.docker_internal_mounts.pip_cache", None)
mount_poetry_cache = temp_config.get("agent.docker_internal_mounts.poetry_cache", None)
@@ -3430,7 +3553,7 @@ class Worker(ServiceCommandSection):
docker_cmd = dict(
worker_id=self.worker_id,
parent_worker_id=self.worker_id,
parent_worker_id=self.parent_worker_id or self.worker_id,
# docker_image=docker_image,
# docker_arguments=docker_arguments,
extra_docker_arguments=self._extra_docker_arguments,
@@ -3451,6 +3574,7 @@ class Worker(ServiceCommandSection):
preprocess_bash_script=preprocess_bash_script,
install_opencv_libs=install_opencv_libs,
mount_ssh=mount_ssh,
mount_ssh_ro=mount_ssh_ro,
mount_apt_cache=mount_apt_cache,
mount_pip_cache=mount_pip_cache,
mount_poetry_cache=mount_poetry_cache,
@@ -3481,9 +3605,8 @@ class Worker(ServiceCommandSection):
return len(output.splitlines()) if output else 0
@classmethod
def _get_docker_cmd(
cls,
self,
worker_id, parent_worker_id,
docker_image, docker_arguments,
python_version,
@@ -3505,18 +3628,19 @@ class Worker(ServiceCommandSection):
auth_token=None,
worker_tags=None,
name=None,
mount_ssh=None, mount_apt_cache=None, mount_pip_cache=None, mount_poetry_cache=None,
mount_ssh=None, mount_ssh_ro=None, mount_apt_cache=None, mount_pip_cache=None, mount_poetry_cache=None,
env_task_id=None,
):
self.debug("Constructing docker command", context="docker")
docker = 'docker'
base_cmd = [docker, 'run', '-t']
update_scheme = ""
dockers_nvidia_visible_devices = 'all'
gpu_devices = os.environ.get('NVIDIA_VISIBLE_DEVICES', None)
gpu_devices = Session.get_nvidia_visible_env()
if gpu_devices is None or gpu_devices.lower().strip() == 'all':
if ENV_DOCKER_SKIP_GPUS_FLAG.get():
dockers_nvidia_visible_devices = os.environ.get('NVIDIA_VISIBLE_DEVICES') or \
dockers_nvidia_visible_devices = Session.get_nvidia_visible_env() or \
dockers_nvidia_visible_devices
else:
base_cmd += ['--gpus', 'all', ]
@@ -3524,7 +3648,8 @@ class Worker(ServiceCommandSection):
if ENV_DOCKER_SKIP_GPUS_FLAG.get():
dockers_nvidia_visible_devices = gpu_devices
else:
base_cmd += ['--gpus', '\"device={}\"'.format(gpu_devices), ]
# replace back "." to ":" MIG support
base_cmd += ['--gpus', '\"device={}\"'.format(gpu_devices.replace(".", ":")), ]
# We are using --gpu, so we should not pass NVIDIA_VISIBLE_DEVICES, I think.
# base_cmd += ['-e', 'NVIDIA_VISIBLE_DEVICES=' + gpu_devices, ]
elif gpu_devices.strip() == 'none':
@@ -3541,8 +3666,10 @@ class Worker(ServiceCommandSection):
base_cmd += [str(a) for a in extra_docker_arguments if a]
# set docker labels
base_cmd += ['-l', cls._worker_label.format(worker_id)]
base_cmd += ['-l', cls._parent_worker_label.format(parent_worker_id)]
base_cmd += ['-l', self._worker_label.format(worker_id)]
base_cmd += ['-l', self._parent_worker_label.format(parent_worker_id)]
self.debug("Command: {}".format(base_cmd), context="docker")
# check if running inside a kubernetes
if ENV_DOCKER_HOST_MOUNT.get() or (os.environ.get('KUBERNETES_SERVICE_HOST') and
@@ -3559,6 +3686,8 @@ class Worker(ServiceCommandSection):
pass
base_cmd += ['-e', 'NVIDIA_VISIBLE_DEVICES={}'.format(dockers_nvidia_visible_devices)]
self.debug("Running in k8s: {}".format(base_cmd), context="docker")
# check if we need to map host folders
if ENV_DOCKER_HOST_MOUNT.get():
# expect CLEARML_AGENT_K8S_HOST_MOUNT = '/mnt/host/data:/root/.clearml'
@@ -3566,6 +3695,7 @@ class Worker(ServiceCommandSection):
# search and replace all the host folders with the k8s
host_mounts = [host_apt_cache, host_pip_cache, host_poetry_cache, host_pip_dl,
host_cache, host_vcs_cache, host_venvs_cache]
self.debug("Mapping host mounts: {}".format(host_mounts), context="docker")
for i, m in enumerate(host_mounts):
if not m:
continue
@@ -3574,6 +3704,7 @@ class Worker(ServiceCommandSection):
host_mounts[i] = None
else:
host_mounts[i] = m.replace(k8s_pod_mnt, k8s_node_mnt, 1)
self.debug("Mapped host mounts: {}".format(host_mounts), context="docker")
host_apt_cache, host_pip_cache, host_poetry_cache, host_pip_dl, \
host_cache, host_vcs_cache, host_venvs_cache = host_mounts
@@ -3587,6 +3718,8 @@ class Worker(ServiceCommandSection):
except Exception:
raise ValueError('Error: could not copy configuration file into: {}'.format(new_conf_file))
self.debug("Config file target: {}, host: {}".format(new_conf_file, conf_file), context="docker")
if host_ssh_cache:
new_ssh_cache = os.path.join(k8s_pod_mnt, '.clearml_agent.{}.ssh'.format(quote(worker_id, safe="")))
try:
@@ -3595,6 +3728,7 @@ class Worker(ServiceCommandSection):
host_ssh_cache = new_ssh_cache.replace(k8s_pod_mnt, k8s_node_mnt)
except Exception:
raise ValueError('Error: could not copy .ssh directory into: {}'.format(new_ssh_cache))
self.debug("Copied host SSH cache to: {}, host {}".format(new_ssh_cache, host_ssh_cache), context="docker")
base_cmd += ['-e', 'CLEARML_WORKER_ID='+worker_id, ]
# update the docker image, so the system knows where it runs
@@ -3637,6 +3771,12 @@ class Worker(ServiceCommandSection):
# clearml-agent{specify_version}
clearml_agent_wheel = 'clearml-agent{specify_version}'.format(specify_version=specify_version)
mount_ssh = mount_ssh or '/root/.ssh'
mount_ssh_ro = mount_ssh_ro or "{}_ro".format(mount_ssh.rstrip("/"))
mount_apt_cache = mount_apt_cache or '/var/cache/apt/archives'
mount_pip_cache = mount_pip_cache or '/root/.cache/pip'
mount_poetry_cache = mount_poetry_cache or '/root/.cache/pypoetry'
if not standalone_mode:
if not bash_script:
# Find the highest python version installed, or install from apt-get
@@ -3647,6 +3787,7 @@ class Worker(ServiceCommandSection):
"export DEBIAN_FRONTEND=noninteractive",
"export CLEARML_APT_INSTALL=\"$CLEARML_APT_INSTALL{}\"".format(
' libsm6 libxext6 libxrender-dev libglib2.0-0' if install_opencv_libs else ""),
"cp -Rf {mount_ssh_ro} -T {mount_ssh}" if host_ssh_cache else "",
"[ ! -z $(which git) ] || export CLEARML_APT_INSTALL=\"$CLEARML_APT_INSTALL git\"",
"declare LOCAL_PYTHON",
"[ ! -z $LOCAL_PYTHON ] || for i in {{15..5}}; do which {python_single_digit}.$i && " +
@@ -3674,7 +3815,9 @@ class Worker(ServiceCommandSection):
"$LOCAL_PYTHON -m pip install -U {clearml_agent_wheel} ; ").format(
python_single_digit=python_version.split('.')[0],
python=python_version, pip_version=PackageManager.get_pip_version(),
clearml_agent_wheel=clearml_agent_wheel)
clearml_agent_wheel=clearml_agent_wheel,
mount_ssh_ro=mount_ssh_ro, mount_ssh=mount_ssh,
)
if host_git_credentials:
for git_credentials in host_git_credentials:
@@ -3686,16 +3829,20 @@ class Worker(ServiceCommandSection):
for line in docker_bash_setup_script.split('\n') if line.strip()) + \
' ; '
mount_ssh = mount_ssh or '/root/.ssh'
mount_apt_cache = mount_apt_cache or '/var/cache/apt/archives'
mount_pip_cache = mount_pip_cache or '/root/.cache/pip'
mount_poetry_cache = mount_poetry_cache or '/root/.cache/pypoetry'
self.debug(
"Adding mounts: host_ssh_cache={}, host_apt_cache={}, host_pip_cache={}, host_poetry_cache={}, "
"host_pip_dl={}, host_cache={}, host_vcs_cache={}, host_venvs_cache={}".format(
host_ssh_cache, host_apt_cache, host_pip_cache, host_poetry_cache, host_pip_dl, host_cache,
host_vcs_cache, host_venvs_cache,
),
context="docker"
)
base_cmd += (
(['--name', name] if name else []) +
['-v', conf_file+':'+DOCKER_ROOT_CONF_FILE] +
['-e', "CLEARML_CONFIG_FILE={}".format(DOCKER_ROOT_CONF_FILE)] +
(['-v', host_ssh_cache+':'+mount_ssh] if host_ssh_cache else []) +
(['-v', host_ssh_cache+':'+mount_ssh_ro] if host_ssh_cache else []) +
(['-v', host_apt_cache+':'+mount_apt_cache] if host_apt_cache else []) +
(['-v', host_pip_cache+':'+mount_pip_cache] if host_pip_cache else []) +
(['-v', host_poetry_cache + ':'+mount_poetry_cache] if host_poetry_cache else []) +
@@ -3841,6 +3988,9 @@ class Worker(ServiceCommandSection):
unique_worker_id=worker_id, worker_name=worker_name, api_client=self._session.api_client,
allow_double=bool(ENV_DOCKER_HOST_MOUNT.get()) # and bool(self._services_mode),
)
# set the parent ID the first time we have a worker ID (it might change for services-mode / dgpus)
if not self.parent_worker_id:
self.parent_worker_id = self.worker_id
if self.worker_id is None:
error('Instance with the same WORKER_ID [{}] is already running'.format(worker_id))
@@ -3851,8 +4001,8 @@ class Worker(ServiceCommandSection):
def _generate_worker_id_name(self, dynamic_gpus=False):
worker_id = self._session.config["agent.worker_id"]
worker_name = self._session.config["agent.worker_name"]
if not worker_id and os.environ.get('NVIDIA_VISIBLE_DEVICES') is not None:
nvidia_visible_devices = os.environ.get('NVIDIA_VISIBLE_DEVICES')
if not worker_id and Session.get_nvidia_visible_env() is not None:
nvidia_visible_devices = Session.get_nvidia_visible_env()
if nvidia_visible_devices and nvidia_visible_devices.lower() != 'none':
worker_id = '{}:{}gpu{}'.format(
worker_name, 'd' if dynamic_gpus else '', nvidia_visible_devices)
@@ -3969,6 +4119,13 @@ class Worker(ServiceCommandSection):
if self._session.feature_set == "basic":
raise ValueError("Server does not support --use-owner-token option")
role = self._session.get_decoded_token(self._session.token).get("identity", {}).get("role", None)
if role and role not in ["admin", "root", "system"]:
raise ValueError(
"User role not suitable for --use-owner-token option (requires at least admin,"
" found {})".format(role)
)
if __name__ == "__main__":
pass

View File

@@ -148,6 +148,7 @@ ENV_DOCKER_HOST_MOUNT = EnvironmentConfig('CLEARML_AGENT_K8S_HOST_MOUNT', 'CLEAR
'TRAINS_AGENT_K8S_HOST_MOUNT', 'TRAINS_AGENT_DOCKER_HOST_MOUNT')
ENV_VENV_CACHE_PATH = EnvironmentConfig('CLEARML_AGENT_VENV_CACHE_PATH')
ENV_EXTRA_DOCKER_ARGS = EnvironmentConfig('CLEARML_AGENT_EXTRA_DOCKER_ARGS', type=list)
ENV_DEBUG_INFO = EnvironmentConfig('CLEARML_AGENT_DEBUG_INFO')
ENV_CUSTOM_BUILD_SCRIPT = EnvironmentConfig('CLEARML_AGENT_CUSTOM_BUILD_SCRIPT')
"""

View File

@@ -1,6 +1,9 @@
import os
import re
import warnings
from clearml_agent.definitions import PIP_EXTRA_INDICES
from .requirement import Requirement
@@ -42,9 +45,14 @@ def parse(reqstr, cwd=None):
yield requirement
elif line.startswith('-f') or line.startswith('--find-links') or \
line.startswith('-i') or line.startswith('--index-url') or \
line.startswith('--extra-index-url') or \
line.startswith('--no-index'):
warnings.warn('Private repos not supported. Skipping.')
elif line.startswith('--extra-index-url'):
extra_index = line[len('--extra-index-url'):].strip()
extra_index = re.sub(r"\s+#.*$", "", extra_index) # strip comments
if extra_index and extra_index not in PIP_EXTRA_INDICES:
PIP_EXTRA_INDICES.append(extra_index)
print(f"appended {extra_index} to list of extra pip indices")
continue
elif line.startswith('-Z') or line.startswith('--always-unzip'):
warnings.warn('Unused option --always-unzip. Skipping.')

View File

@@ -0,0 +1,7 @@
from clearml_agent.definitions import EnvironmentConfig
ENV_START_AGENT_SCRIPT_PATH = EnvironmentConfig('CLEARML_K8S_GLUE_START_AGENT_SCRIPT_PATH')
"""
Script path to use when creating the bash script to run the agent inside the scheduled pod's docker container.
Script will be appended to the specified file.
"""

View File

@@ -27,6 +27,8 @@ from clearml_agent.helper.process import get_bash_output
from clearml_agent.helper.resource_monitor import ResourceMonitor
from clearml_agent.interface.base import ObjectID
from .definitions import ENV_START_AGENT_SCRIPT_PATH
class K8sIntegration(Worker):
K8S_PENDING_QUEUE = "k8s_scheduler"
@@ -119,7 +121,7 @@ class K8sIntegration(Worker):
when scheduling a task to run in a pod. Callable can receive an optional pod number and should return
a dictionary of user properties (name and value). Signature is [[Optional[int]], Dict[str,str]]
:param str overrides_yaml: YAML file containing the overrides for the pod (optional)
:param str template_yaml: YAML file containing the template for the pod (optional).
:param str template_yaml: YAML file containing the template for the pod (optional).
If provided the pod is scheduled with kubectl apply and overrides are ignored, otherwise with kubectl run.
:param str clearml_conf_file: clearml.conf file to be use by the pod itself (optional)
:param str extra_bash_init_script: Additional bash script to run before starting the Task inside the container
@@ -128,6 +130,7 @@ class K8sIntegration(Worker):
"""
super(K8sIntegration, self).__init__()
self.k8s_pending_queue_name = k8s_pending_queue_name or self.K8S_PENDING_QUEUE
self.k8s_pending_queue_id = None
self.kubectl_cmd = kubectl_cmd or self.KUBECTL_RUN_CMD
self.container_bash_script = container_bash_script or self.CONTAINER_BASH_SCRIPT
# Always do system packages, because by we will be running inside a docker
@@ -135,7 +138,8 @@ class K8sIntegration(Worker):
# Add debug logging
if debug:
self.log.logger.disabled = False
self.log.logger.setLevel(logging.INFO)
self.log.logger.setLevel(logging.DEBUG)
self.log.logger.addHandler(logging.StreamHandler())
self.ports_mode = ports_mode
self.num_of_services = num_of_services
self.base_pod_num = base_pod_num
@@ -152,8 +156,7 @@ class K8sIntegration(Worker):
self.pod_requests = []
self.max_pods_limit = max_pods_limit if not self.ports_mode else None
if overrides_yaml:
with open(os.path.expandvars(os.path.expanduser(str(overrides_yaml))), 'rt') as f:
overrides = yaml.load(f, Loader=getattr(yaml, 'FullLoader', None))
overrides = self._load_template_file(overrides_yaml)
if overrides:
containers = overrides.get('spec', {}).get('containers', [])
for c in containers:
@@ -174,8 +177,7 @@ class K8sIntegration(Worker):
self.log.warning('Removing containers section: {}'.format(overrides['spec'].pop('containers')))
self.overrides_json_string = json.dumps(overrides)
if template_yaml:
with open(os.path.expandvars(os.path.expanduser(str(template_yaml))), 'rt') as f:
self.template_dict = yaml.load(f, Loader=getattr(yaml, 'FullLoader', None))
self.template_dict = self._load_template_file(template_yaml)
clearml_conf_file = clearml_conf_file or kwargs.get('trains_conf_file')
@@ -194,6 +196,11 @@ class K8sIntegration(Worker):
_check_pod_thread.daemon = True
_check_pod_thread.start()
@staticmethod
def _load_template_file(path):
with open(os.path.expandvars(os.path.expanduser(str(path))), 'rt') as f:
return yaml.load(f, Loader=getattr(yaml, 'FullLoader', None))
@staticmethod
def _get_path(d, *path, default=None):
try:
@@ -203,13 +210,27 @@ class K8sIntegration(Worker):
except (IndexError, KeyError):
return default
def _get_kubectl_options(self, command, extra_labels=None):
labels = [self._get_agent_label()] + (list(extra_labels) if extra_labels else [])
return {
"-l": ",".join(labels),
"-n": str(self.namespace),
"-o": "json"
}
def get_kubectl_command(self, command, extra_labels=None):
opts = self._get_kubectl_options(command, extra_labels)
return 'kubectl {command} {opts}'.format(
command=command, opts=" ".join(x for item in opts.items() for x in item)
)
def _monitor_hanging_pods_daemon(self):
last_tasks_msgs = {} # last msg updated for every task
while True:
output = get_bash_output('kubectl get pods -n {namespace} -o=JSON'.format(
namespace=self.namespace
))
kubectl_cmd = self.get_kubectl_command("get pods")
self.log.debug("Detecting hanging pods: {}".format(kubectl_cmd))
output = get_bash_output(kubectl_cmd)
output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
try:
output_config = json.loads(output)
@@ -231,6 +252,10 @@ class K8sIntegration(Worker):
if not task_id:
continue
namespace = pod.get('metadata', {}).get('namespace', None)
if not namespace:
continue
task_ids.add(task_id)
msg = None
@@ -250,7 +275,7 @@ class K8sIntegration(Worker):
msg = reason + (" ({})".format(message) if message else "")
if reason == 'ImagePullBackOff':
delete_pod_cmd = 'kubectl delete pods {} -n {}'.format(pod_name, self.namespace)
delete_pod_cmd = 'kubectl delete pods {} -n {}'.format(pod_name, namespace)
get_bash_output(delete_pod_cmd)
try:
self._session.api_client.tasks.failed(
@@ -336,13 +361,11 @@ class K8sIntegration(Worker):
return self._agent_label
def _get_number_used_pods(self):
def _get_used_pods(self):
# noinspection PyBroadException
try:
kubectl_cmd_new = "kubectl get pods -l {agent_label} -n {namespace} -o json".format(
agent_label=self._get_agent_label(),
namespace=self.namespace,
)
kubectl_cmd_new = self.get_kubectl_command("get pods")
self.log.debug("Getting used pods: {}".format(kubectl_cmd_new))
process = subprocess.Popen(kubectl_cmd_new.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate()
output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
@@ -350,17 +373,20 @@ class K8sIntegration(Worker):
if not output:
# No such pod exist so we can use the pod_number we found
return 0
return 0, {}
try:
current_pod_count = len(json.loads(output).get("items", []))
except (ValueError, TypeError) as ex:
return -1
items = json.loads(output).get("items", [])
current_pod_count = len(items)
namespaces = {item["metadata"]["namespace"] for item in items}
except (KeyError, ValueError, TypeError, AttributeError) as ex:
print("Failed parsing used pods command response for cleanup: {}".format(ex))
return -1, {}
return current_pod_count
return current_pod_count, namespaces
except Exception as ex:
print('Failed getting number of used pods: {}'.format(ex))
return -2
print('Failed obtaining used pods information: {}'.format(ex))
return -2, {}
def run_one_task(self, queue: Text, task_id: Text, worker_args=None, task_session=None, **_):
print('Pulling task {} launching on kubernetes cluster'.format(task_id))
@@ -369,17 +395,17 @@ class K8sIntegration(Worker):
# push task into the k8s queue, so we have visibility on pending tasks in the k8s scheduler
try:
print('Pushing task {} into temporary pending queue'.format(task_id))
res = self._session.api_client.tasks.stop(task_id, force=True)
_ = self._session.api_client.tasks.stop(task_id, force=True)
res = self._session.api_client.tasks.enqueue(
task_id,
queue=self.k8s_pending_queue_name,
queue=self.k8s_pending_queue_id,
status_reason='k8s pending scheduler',
)
if res.meta.result_code != 200:
raise Exception(res.meta.result_msg)
except Exception as e:
self.log.error("ERROR: Could not push back task [{}] to k8s pending queue [{}], error: {}".format(
task_id, self.k8s_pending_queue_name, e))
self.log.error("ERROR: Could not push back task [{}] to k8s pending queue {} [{}], error: {}".format(
task_id, self.k8s_pending_queue_name, self.k8s_pending_queue_id, e))
return
container = get_task_container(self._session, task_id)
@@ -426,39 +452,36 @@ class K8sIntegration(Worker):
pod_number = self.base_pod_num
while self.ports_mode or self.max_pods_limit:
pod_number = self.base_pod_num + pod_count
if self.ports_mode:
kubectl_cmd_new = "kubectl get pods -l {pod_label},{agent_label} -n {namespace}".format(
pod_label=self.LIMIT_POD_LABEL.format(pod_number=pod_number),
agent_label=self._get_agent_label(),
namespace=self.namespace,
)
else:
kubectl_cmd_new = "kubectl get pods -l {agent_label} -n {namespace} -o json".format(
agent_label=self._get_agent_label(),
namespace=self.namespace,
)
kubectl_cmd_new = self.get_kubectl_command(
"get pods",
extra_labels=[self.LIMIT_POD_LABEL.format(pod_number=pod_number)] if self.ports_mode else None
)
self.log.debug("Looking for a free pod/port: {}".format(kubectl_cmd_new))
process = subprocess.Popen(kubectl_cmd_new.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate()
output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
error = '' if not error else error if isinstance(error, str) else error.decode('utf-8')
if not output:
# No such pod exist so we can use the pod_number we found
try:
items_count = len(json.loads(output).get("items", []))
except (ValueError, TypeError) as ex:
self.log.warning(
"K8S Glue pods monitor: Failed parsing kubectl output:\n{}\ntask '{}' "
"will be enqueued back to queue '{}'\nEx: {}".format(
output, task_id, queue, ex
)
)
self._session.api_client.tasks.stop(task_id, force=True)
self._session.api_client.tasks.enqueue(task_id, queue=queue, status_reason='kubectl parsing error')
return
if not items_count:
# No such pod exist so we can use the pod_number we found (result exists but with no items)
break
if self.max_pods_limit:
try:
current_pod_count = len(json.loads(output).get("items", []))
except (ValueError, TypeError) as ex:
self.log.warning(
"K8S Glue pods monitor: Failed parsing kubectl output:\n{}\ntask '{}' "
"will be enqueued back to queue '{}'\nEx: {}".format(
output, task_id, queue, ex
)
)
self._session.api_client.tasks.stop(task_id, force=True)
self._session.api_client.tasks.enqueue(task_id, queue=queue, status_reason='kubectl parsing error')
return
current_pod_count = items_count
max_count = self.max_pods_limit
else:
current_pod_count = pod_count
@@ -483,10 +506,9 @@ class K8sIntegration(Worker):
break
pod_count += 1
labels = ([self.LIMIT_POD_LABEL.format(pod_number=pod_number)] if self.ports_mode else []) + \
[self._get_agent_label()]
labels.append("clearml-agent-queue={}".format(self._safe_k8s_label_value(queue)))
labels.append("clearml-agent-queue-name={}".format(self._safe_k8s_label_value(queue_name)))
labels = self._get_pod_labels(queue, queue_name)
if self.ports_mode:
labels.append(self.LIMIT_POD_LABEL.format(pod_number=pod_number))
if self.ports_mode:
print("Kubernetes scheduling task id={} on pod={} (pod_count={})".format(task_id, pod_number, pod_count))
@@ -503,8 +525,14 @@ class K8sIntegration(Worker):
queue=queue
)
if self.template_dict:
output, error = self._kubectl_apply(**kubectl_kwargs)
try:
template = self._resolve_template(task_session, task_data, queue)
except Exception as ex:
print("ERROR: Failed resolving template (skipping): {}".format(ex))
return
if template:
output, error = self._kubectl_apply(template=template, **kubectl_kwargs)
else:
output, error = self._kubectl_run(task_data=task_data, **kubectl_kwargs)
@@ -540,6 +568,13 @@ class K8sIntegration(Worker):
**user_props
)
def _get_pod_labels(self, queue, queue_name):
return [
self._get_agent_label(),
"clearml-agent-queue={}".format(self._safe_k8s_label_value(queue)),
"clearml-agent-queue-name={}".format(self._safe_k8s_label_value(queue_name))
]
def _get_docker_args(self, docker_args, flags, target=None, convert=None):
# type: (List[str], Collection[str], Optional[str], Callable[[str], Any]) -> Union[dict, List[str]]
"""
@@ -566,8 +601,16 @@ class K8sIntegration(Worker):
return {target: results} if results else {}
return results
def _kubectl_apply(self, create_clearml_conf, docker_image, docker_args, docker_bash, labels, queue, task_id):
template = deepcopy(self.template_dict)
def _kubectl_apply(
self, create_clearml_conf, docker_image, docker_args, docker_bash, labels, queue, task_id, template=None
):
template = template or deepcopy(self.template_dict)
try:
namespace = template['metadata']['namespace'] or self.namespace
except (KeyError, TypeError, AttributeError):
namespace = self.namespace
template.setdefault('apiVersion', 'v1')
template['kind'] = 'Pod'
template.setdefault('metadata', {})
@@ -604,12 +647,15 @@ class K8sIntegration(Worker):
extra_bash_commands = list(create_clearml_conf or [])
start_agent_script_path = ENV_START_AGENT_SCRIPT_PATH.get() or "~/__start_agent__.sh"
extra_bash_commands.append(
"echo '{}' | base64 --decode >> ~/__start_agent__.sh ; "
"/bin/bash ~/__start_agent__.sh".format(
base64.b64encode(
"echo '{content}' | base64 --decode >> {script_path} ; /bin/bash {script_path}".format(
content=base64.b64encode(
script_encoded.encode('ascii')
).decode('ascii'))
).decode('ascii'),
script_path=start_agent_script_path
)
)
# Notice: we always leave with exit code 0, so pods are never restarted
@@ -638,7 +684,7 @@ class K8sIntegration(Worker):
task_id=task_id,
docker_image=docker_image,
queue_id=queue,
namespace=self.namespace
namespace=namespace
)
# make sure we provide a list
if isinstance(kubectl_cmd, str):
@@ -720,26 +766,29 @@ class K8sIntegration(Worker):
events_service = self.get_service(Events)
# make sure we have a k8s pending queue
# noinspection PyBroadException
try:
self._session.api_client.queues.create(self.k8s_pending_queue_name)
except Exception:
pass
# get queue id
self.k8s_pending_queue_name = self._resolve_name(self.k8s_pending_queue_name, "queues")
if not self.k8s_pending_queue_id:
resolved_ids = self._resolve_queue_names([self.k8s_pending_queue_name], create_if_missing=True)
if not resolved_ids:
raise ValueError(
"Failed resolving or creating k8s pending queue {}".format(self.k8s_pending_queue_name)
)
self.k8s_pending_queue_id = resolved_ids[0]
_last_machine_update_ts = 0
while True:
# Get used pods and namespaces
current_pods, namespaces = self._get_used_pods()
# check if have pod limit, then check if we hit it.
if self.max_pods_limit:
current_pods = self._get_number_used_pods()
if current_pods >= self.max_pods_limit:
print("Maximum pod limit reached {}/{}, sleeping for {:.1f} seconds".format(
current_pods, self.max_pods_limit, self._polling_interval))
# delete old completed / failed pods
get_bash_output(
self.KUBECTL_DELETE_CMD.format(namespace=self.namespace, selector=self._get_agent_label())
)
for namespace in namespaces:
kubectl_cmd = self.KUBECTL_DELETE_CMD.format(namespace=namespace, selector=self._get_agent_label())
self.log.debug("Deleting old/failed pods due to pod limit: {}".format(kubectl_cmd))
get_bash_output(kubectl_cmd)
# go to sleep
sleep(self._polling_interval)
continue
@@ -747,19 +796,20 @@ class K8sIntegration(Worker):
# iterate over queues (priority style, queues[0] is highest)
for queue in queues:
# delete old completed / failed pods
get_bash_output(
self.KUBECTL_DELETE_CMD.format(namespace=self.namespace, selector=self._get_agent_label())
)
for namespace in namespaces:
kubectl_cmd = self.KUBECTL_DELETE_CMD.format(namespace=namespace, selector=self._get_agent_label())
self.log.debug("Deleting old/failed pods: {}".format(kubectl_cmd))
get_bash_output(kubectl_cmd)
# get next task in queue
try:
response = get_next_task(
self._session, queue=queue, get_task_info=self._impersonate_as_task_owner
)
response = self._get_next_task(queue=queue, get_task_info=self._impersonate_as_task_owner)
except Exception as e:
print("Warning: Could not access task queue [{}], error: {}".format(queue, e))
continue
else:
if not response:
continue
try:
task_id = response["entry"]["task"]
except (KeyError, TypeError, AttributeError):
@@ -820,6 +870,15 @@ class K8sIntegration(Worker):
log_level=logging.INFO, foreground=True, docker=False, **kwargs,
)
def _get_next_task(self, queue, get_task_info):
return get_next_task(
self._session, queue=queue, get_task_info=get_task_info
)
def _resolve_template(self, task_session, task_data, queue):
if self.template_dict:
return deepcopy(self.template_dict)
@classmethod
def get_ssh_server_bash(cls, ssh_port_number):
return ' ; '.join(line.format(port=ssh_port_number) for line in cls.BASH_INSTALL_SSH_CMD)

View File

@@ -95,7 +95,8 @@ class ExternalRequirements(SimpleSubstitution):
vcs._set_ssh_url()
new_req_line = 'git+{}{}{}'.format(
'' if scheme and '://' in vcs.url else scheme,
vcs.url_with_auth, fragment
vcs_url if session.config.get('agent.force_git_ssh_protocol', None) else vcs.url_with_auth,
fragment
)
if new_req_line != req_line:
furl_line = furl(new_req_line)

View File

@@ -16,10 +16,10 @@ class PriorityPackageRequirement(SimpleSubstitution):
# check if we need to replace the packages:
priority_packages = self.config.get('agent.package_manager.priority_packages', None)
if priority_packages:
self.__class__.name = priority_packages
self.__class__.name = [p.lower() for p in priority_packages]
priority_optional_packages = self.config.get('agent.package_manager.priority_optional_packages', None)
if priority_optional_packages:
self.__class__.optional_package_names = priority_optional_packages
self.__class__.optional_package_names = [p.lower() for p in priority_optional_packages]
def match(self, req):
# match both Cython & cython
@@ -32,7 +32,7 @@ class PriorityPackageRequirement(SimpleSubstitution):
"""
self._replaced_packages[req.name] = req.line
if req.name in self.optional_package_names:
if req.name.lower() in self.optional_package_names:
# noinspection PyBroadException
try:
if PackageManager.out_of_scope_install_package(str(req)):

View File

@@ -14,6 +14,7 @@ from pathlib2 import Path
from pyhocon import ConfigTree
import six
from six.moves.urllib.parse import unquote
import logging
from clearml_agent.definitions import PIP_EXTRA_INDICES
from clearml_agent.helper.base import (
@@ -175,11 +176,13 @@ class MarkerRequirement(object):
return
local_path = Path(self.uri[len("file://"):])
if not local_path.exists():
line = self.line
if self.remove_local_file_ref():
# print warning
logging.getLogger(__name__).warning(
'Local file not found [{}], references removed'.format(line))
local_path = Path(unquote(self.uri)[len("file://"):])
if not local_path.exists():
line = self.line
if self.remove_local_file_ref():
# print warning
logging.getLogger(__name__).warning(
'Local file not found [{}], references removed'.format(line))
class SimpleVersion:

View File

@@ -1,7 +1,11 @@
import abc
import os
import re
import shutil
import stat
import subprocess
import sys
import tempfile
from distutils.spawn import find_executable
from hashlib import md5
from os import environ
@@ -23,7 +27,7 @@ from clearml_agent.helper.base import (
rm_tree,
ExecutionInfo,
normalize_path,
create_file_if_not_exists,
create_file_if_not_exists, safe_remove_file,
)
from clearml_agent.helper.os.locks import FileLock
from clearml_agent.helper.process import DEVNULL, Argv, PathLike, COMMAND_SUCCESS
@@ -118,6 +122,13 @@ class VCS(object):
"""
return self.add_auth(self.session.config, self.url)
@property
def url_without_auth(self):
"""
Return URL without configured user/password
"""
return self.add_auth(self.session.config, self.url, reset_auth=True)
@abc.abstractmethod
def executable_name(self):
"""
@@ -349,7 +360,9 @@ class VCS(object):
If not in debug mode, filter VCS password from output.
"""
self._set_ssh_url()
clone_command = ("clone", self.url_with_auth, self.location) + self.clone_flags
# if we are on linux no need for the full auth url because we use GIT_ASKPASS
url = self.url_without_auth if self._use_ask_pass else self.url_with_auth
clone_command = ("clone", url, self.location) + self.clone_flags
# clone all branches regardless of when we want to later checkout
# if branch:
# clone_command += ("-b", branch)
@@ -357,34 +370,35 @@ class VCS(object):
self.call(*clone_command)
return
def normalize_output(result):
"""
Returns result string without user's password.
NOTE: ``self.get_stderr``'s result might or might not have the same type as ``e.output`` in case of error.
"""
string_type = (
ensure_text
if isinstance(result, six.text_type)
else ensure_binary
)
return result.replace(
string_type(self.url),
string_type(furl(self.url).remove(password=True).tostr()),
)
def print_output(output):
print(ensure_text(output))
try:
print_output(normalize_output(self.get_stderr(*clone_command)))
self._print_output(self._normalize_output(self.get_stderr(*clone_command)))
except subprocess.CalledProcessError as e:
# In Python 3, subprocess.CalledProcessError has a `stderr` attribute,
# but since stderr is redirect to `subprocess.PIPE` it will appear in the usual `output` attribute
if e.output:
e.output = normalize_output(e.output)
print_output(e.output)
e.output = self._normalize_output(e.output)
self._print_output(e.output)
raise
def _normalize_output(self, result):
"""
Returns result string without user's password.
NOTE: ``self.get_stderr``'s result might or might not have the same type as ``e.output`` in case of error.
"""
string_type = (
ensure_text
if isinstance(result, six.text_type)
else ensure_binary
)
return result.replace(
string_type(self.url),
string_type(furl(self.url).remove(password=True).tostr()),
)
@staticmethod
def _print_output(output):
print(ensure_text(output))
def checkout(self):
# type: () -> None
"""
@@ -473,10 +487,12 @@ class VCS(object):
return Argv(self.executable_name, *argv)
@classmethod
def add_auth(cls, config, url):
def add_auth(cls, config, url, reset_auth=False):
"""
Add username and password to URL if missing from URL and present in config.
Does not modify ssh URLs.
:param reset_auth: If true remove the user/pass from the URL (default False)
"""
try:
parsed_url = furl(url)
@@ -493,7 +509,10 @@ class VCS(object):
and config_pass
and (not config_domain or config_domain.lower() == parsed_url.host)
):
parsed_url.set(username=config_user, password=config_pass)
if reset_auth:
parsed_url.set(username=None, password=None)
else:
parsed_url.set(username=config_user, password=config_pass)
return parsed_url.url
@abc.abstractmethod
@@ -531,6 +550,10 @@ class Git(VCS):
def __init__(self, *args, **kwargs):
super(Git, self).__init__(*args, **kwargs)
self._use_ask_pass = False if not self.session.config.get('agent.enable_git_ask_pass', None) \
else sys.platform == "linux"
try:
self.call("config", "--global", "--replace-all", "safe.directory", "*", cwd=self.location)
except: # noqa
@@ -558,6 +581,66 @@ class Git(VCS):
def pull(self):
self.call("fetch", "--all", "--recurse-submodules", cwd=self.location)
def _git_pass_auth_wrapper(self, func, *args, **kwargs):
try:
url_with_auth = furl(self.url_with_auth)
password = url_with_auth.password if url_with_auth else None
username = url_with_auth.username if url_with_auth else None
except: # noqa
password = None
username = None
# if this is not linux or we do not have a password, just run as is
if not self._use_ask_pass or not password or not username:
return func(*args, **kwargs)
# create the password file
fp, pass_file = tempfile.mkstemp(prefix='clearml_git_', suffix='.sh')
os.close(fp)
with open(pass_file, 'wt') as f:
# get first letter only (username / password are the argument options)
# then echo the correct information
f.writelines([
'#!/bin/bash\n',
'c="$1"\n',
'c="${c%"${c#?}"}"\n',
'if [ "$c" == "u" ] || [ "$c" == "U" ]; then echo "{}"; else echo "{}"; fi\n'.format(
username.replace('"', '\\"'), password.replace('"', '\\"')
)
])
# mark executable
st = os.stat(pass_file)
os.chmod(pass_file, st.st_mode | stat.S_IEXEC)
# let GIT use it
self.COMMAND_ENV["GIT_ASKPASS"] = pass_file
# call git command
try:
ret = func(*args, **kwargs)
finally:
# delete temp password file
self.COMMAND_ENV.pop("GIT_ASKPASS", None)
safe_remove_file(pass_file)
return ret
def get_stderr(self, *argv, **kwargs):
"""
Wrapper with git password authentication
"""
return self._git_pass_auth_wrapper(super(Git, self).get_stderr, *argv, **kwargs)
def call_with_stdin(self, *argv, **kwargs):
"""
Wrapper with git password authentication
"""
return self._git_pass_auth_wrapper(super(Git, self).call_with_stdin, *argv, **kwargs)
def call(self, *argv, **kwargs):
"""
Wrapper with git password authentication
"""
return self._git_pass_auth_wrapper(super(Git, self).call, *argv, **kwargs)
def checkout(self): # type: () -> None
"""
Checkout repository at specified revision

View File

@@ -82,7 +82,7 @@ class ResourceMonitor(object):
if not worker_tags and ENV_WORKER_TAGS.get():
worker_tags = shlex.split(ENV_WORKER_TAGS.get())
self._worker_tags = worker_tags
if os.environ.get('NVIDIA_VISIBLE_DEVICES') == 'none':
if Session.get_nvidia_visible_env() == 'none':
# NVIDIA_VISIBLE_DEVICES set to none, marks cpu_only flag
# active_gpus == False means no GPU reporting
self._active_gpus = False
@@ -92,10 +92,9 @@ class ResourceMonitor(object):
# None means no filtering, report all gpus
self._active_gpus = None
try:
active_gpus = os.environ.get('NVIDIA_VISIBLE_DEVICES', '') or \
os.environ.get('CUDA_VISIBLE_DEVICES', '')
active_gpus = Session.get_nvidia_visible_env() or ""
if active_gpus:
self._active_gpus = [int(g.strip()) for g in active_gpus.split(',')]
self._active_gpus = [g.strip() for g in active_gpus.split(',')]
except Exception:
pass
@@ -263,7 +262,7 @@ class ResourceMonitor(object):
gpu_stat = self._gpustat.new_query()
for i, g in enumerate(gpu_stat.gpus):
# only monitor the active gpu's, if none were selected, monitor everything
if self._active_gpus and i not in self._active_gpus:
if self._active_gpus and str(i) not in self._active_gpus:
continue
stats["gpu_temperature_{:d}".format(i)] = g["temperature.gpu"]
stats["gpu_utilization_{:d}".format(i)] = g["utilization.gpu"]

View File

@@ -22,7 +22,7 @@ WORKER_ARGS = {
'help': 'git username for repository access',
},
'--git-pass': {
'help': 'git password for repository access',
'help': 'git password (personal access tokens) for repository access',
},
'--log-level': {
'help': 'SDK log level',

View File

@@ -76,7 +76,7 @@ class Session(_Session):
cpu_only = kwargs.get('cpu_only')
if cpu_only:
os.environ['CUDA_VISIBLE_DEVICES'] = os.environ['NVIDIA_VISIBLE_DEVICES'] = 'none'
Session.set_nvidia_visible_env('none')
if kwargs.get('gpus') and not os.environ.get('KUBERNETES_SERVICE_HOST') \
and not os.environ.get('KUBERNETES_PORT'):
@@ -85,7 +85,7 @@ class Session(_Session):
os.environ.pop('CUDA_VISIBLE_DEVICES', None)
os.environ['NVIDIA_VISIBLE_DEVICES'] = kwargs.get('gpus')
else:
os.environ['CUDA_VISIBLE_DEVICES'] = os.environ['NVIDIA_VISIBLE_DEVICES'] = kwargs.get('gpus')
Session.set_nvidia_visible_env(kwargs.get('gpus'))
if kwargs.get('only_load_config'):
from clearml_agent.backend_api.config import load
@@ -327,6 +327,23 @@ class Session(_Session):
def command(self, *args):
return Argv(*args, log=self.get_logger(Argv.__module__))
@staticmethod
def set_nvidia_visible_env(gpus):
if not gpus:
gpus = ""
visible_env = gpus.replace(".", ":") if isinstance(gpus, str) else \
','.join(str(g).replace(".", ":") for g in gpus)
os.environ['CUDA_VISIBLE_DEVICES'] = os.environ['NVIDIA_VISIBLE_DEVICES'] = visible_env
@staticmethod
def get_nvidia_visible_env():
visible_env = os.environ.get('NVIDIA_VISIBLE_DEVICES') or os.environ.get('CUDA_VISIBLE_DEVICES')
if visible_env is None:
return None
visible_env = str(visible_env).replace(":", ".")
return visible_env
@attr.s
class TrainsAgentLogger(object):

View File

@@ -1 +1 @@
__version__ = '1.2.4rc3'
__version__ = '1.3.0'

View File

@@ -245,7 +245,7 @@ agent {
# pip_cache: "/root/.cache/pip"
# poetry_cache: "/root/.cache/pypoetry"
# vcs_cache: "/root/.clearml/vcs-cache"
# venv_build: "/root/.clearml/venvs-builds"
# venv_build: "~/.clearml/venvs-builds"
# pip_download: "/root/.clearml/pip-download-cache"
# }
@@ -325,6 +325,11 @@ sdk {
key: ""
secret: ""
region: ""
# Or enable credentials chain to let Boto3 pick the right credentials.
# This includes picking credentials from environment variables,
# credential file and IAM role using metadata service.
# Refer to the latest Boto3 docs
use_credentials_chain: false
credentials: [
# specifies key/secret credentials to use when handling s3 urls (read or write)

View File

@@ -8,7 +8,7 @@ psutil>=3.4.2,<5.9.0
pyhocon>=0.3.38,<0.4.0
pyparsing>=2.0.3,<2.5.0
python-dateutil>=2.4.2,<2.9.0
pyjwt>=1.6.4,<2.1.0
pyjwt>=2.4.0,<2.5.0
PyYAML>=3.12,<5.5.0
requests>=2.20.0,<2.26.0
six>=1.13.0,<1.16.0

View File

@@ -61,6 +61,7 @@ setup(
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'License :: OSI Approved :: Apache Software License',
],