Compare commits

...

88 Commits

Author SHA1 Message Date
allegroai
3a07bfe1d7 Version bump 2021-05-31 23:19:46 +03:00
allegroai
0694b9e8af Fix PyYAML supported versions 2021-05-26 18:33:35 +03:00
allegroai
742cbf5767 Add docker environment arguments log masking support (issue #67) 2021-05-25 19:31:45 +03:00
allegroai
e93384b99b Fix --stop with dynamic gpus 2021-05-20 10:58:46 +03:00
allegroai
3c4e976093 Add agent.ignore_requested_python_version to config file 2021-05-19 15:20:44 +03:00
allegroai
1e795beec8 Fix support for spaces in docker arguments (issue #358) 2021-05-19 15:20:03 +03:00
allegroai
4f7407084d Fix standalone script with pre-existing conda venv 2021-05-12 15:46:25 +03:00
allegroai
ae3d034531 Protect against None in execution.repository 2021-05-12 15:45:31 +03:00
allegroai
a2db1f5ab5 Remove queue name from pod name in k8s glue, add queue name and ID to pod labels (issue #64) 2021-05-05 12:03:35 +03:00
allegroai
cec6420c8f Version bump to v1.0.0 2021-05-03 18:33:53 +03:00
allegroai
4f18bb7ea0 Add k8s glue default restartPolicy=Never to template to prevent pods from restarting 2021-04-28 13:20:13 +03:00
allegroai
3ec2a3a92e Add k8s pod limit to k8s glue example 2021-04-28 13:19:34 +03:00
allegroai
823b67a3ce Deprecate venv_update (replaced by the more robust venvs_cache) 2021-04-28 13:17:37 +03:00
Revital
24dc59e31f add space to help message 2021-04-27 13:50:44 +03:00
allegroai
08ff5e6db7 Add number of pods limit to k8s glue 2021-04-25 10:47:49 +03:00
allegroai
e60a6f9d14 Fix --stop support for dynamic gpus 2021-04-25 10:46:43 +03:00
Allegro AI
8569c02b33 Merge pull request #58 from pollfly/master
fix --downtime help
2021-04-21 15:27:47 +03:00
Revital
35e714d8d9 fix --downtime help 2021-04-21 09:13:47 +03:00
allegroai
6f8d5710d6 Fix dynamic gpus priority queue 2021-04-20 18:11:59 +03:00
allegroai
a671692832 Fix --services-mode with instance limit 2021-04-20 18:11:36 +03:00
allegroai
5c8675e43a Add support for dynamic gpus opportunistic scheduling (with min/max gpus per queue) 2021-04-20 18:11:16 +03:00
allegroai
60a58f6fad Fix poetry support (issue #57) 2021-04-14 11:22:07 +03:00
allegroai
948fc4c6ce Add python 3.9 to the support table 2021-04-12 23:01:40 +03:00
allegroai
5be5f3209d Fix documentation links 2021-04-12 23:01:22 +03:00
allegroai
537b67e0cd Fix agent can return non-zero error code and pods will end up restarting forever (issue #56) 2021-04-12 23:00:59 +03:00
allegroai
82c5e55fe4 Fix usage of not_set in k8s template merge 2021-04-07 21:30:13 +03:00
allegroai
5f0d51d485 Add documentation for agent.docker_install_opencv_libs 2021-04-07 18:48:30 +03:00
allegroai
945dd816ad Fix no docker arguments 2021-04-07 18:47:13 +03:00
allegroai
45009e6cc2 Add support for updating back docker on new API v2.13 2021-04-07 18:46:58 +03:00
allegroai
8eace6d57b Bump virtualenv dependency version 2021-04-07 18:46:35 +03:00
allegroai
3774fa6abd Add support for new container base setup script feature 2021-04-07 18:46:14 +03:00
allegroai
e71e6865d2 Add agent.docker_install_opencv_libs (default: True) to enable auto opencv libs install for faster docker spin-up 2021-04-07 18:45:44 +03:00
allegroai
0e8f1528b1 Remove redundant py2 code 2021-04-07 18:44:59 +03:00
allegroai
c331babf51 Add stopping message on Task process termination
Fix --stop on dynamic gpus venv mode
2021-04-07 18:44:33 +03:00
allegroai
c59d268995 Fix venv cache crash on bad symbolic links 2021-04-07 18:44:11 +03:00
allegroai
9e9fcb0ba9 Add dynamic mode terminate dockers on sig_term 2021-04-07 18:43:44 +03:00
allegroai
f33e0b2f78 Verify docker command exists when running in docker mode 2021-04-07 18:42:27 +03:00
allegroai
0e4b99351f Add --stop support for dynamic gpus
Fix --stop mark tasks as aborted (not failed as before)
2021-04-07 18:42:10 +03:00
allegroai
81edd2860f Fix --dynamic-gpus should keep original queue priority order 2021-03-31 23:55:12 +03:00
allegroai
14ac584577 Support k8s glue container env vars merging 2021-03-31 23:53:58 +03:00
allegroai
9ce6baf074 Fix broken k8s glue docker args parsing
Fix empty env prevents override when merging template
2021-03-26 12:26:15 +03:00
allegroai
92a1e07b33 Fix local path replace back when using cache 2021-03-26 12:16:05 +03:00
allegroai
cb6bdece39 Fix cuda version from driver does not return minor version 2021-03-18 10:07:59 +02:00
allegroai
2ea38364bb Change the default conda channel order, so it pulls the correct pytorch 2021-03-18 10:07:58 +02:00
allegroai
cf6fdc0d81 Add support for PyJWT v2 2021-03-18 10:07:58 +02:00
allegroai
91eec99563 Add conda debug prints (--debug) 2021-03-18 10:07:58 +02:00
allegroai
f8cbaa9a06 documentation 2021-03-18 03:05:26 +02:00
allegroai
d9b9b4984b Version bump to v0.17.2 2021-03-04 20:12:50 +02:00
allegroai
8a46dc6b03 Update default_docker in docs 2021-03-04 20:07:34 +02:00
allegroai
205f9dd816 Fix k8s glue does not pass docker environment variables
Remove deprecated flags
2021-03-03 15:07:06 +02:00
allegroai
9dfa1294e2 Add agent.enable_task_env set the OS environment based on the Environment section of the Task. 2021-02-28 19:47:44 +02:00
allegroai
f019905720 Fix venv cache support for local folders 2021-02-28 19:47:09 +02:00
allegroai
9c257858dd Fix venv cache support for local folders 2021-02-23 18:54:38 +02:00
allegroai
2006ab20dd Fix conda support for git+http links 2021-02-23 12:46:06 +02:00
allegroai
0caf31719c Fix venv caching always reinstall git repositories and local repositories 2021-02-23 12:45:34 +02:00
allegroai
5da7184276 Add agent.ignore_requested_python_version (control for multi python environments) 2021-02-23 12:45:00 +02:00
allegroai
50fccdab96 PEP8 2021-02-23 12:44:26 +02:00
allegroai
77d6ff6630 Fix docker mode without venvs cache dir 2021-02-17 00:04:07 +02:00
allegroai
99614702ea Add missing default configuration value 2021-02-17 00:03:42 +02:00
allegroai
58cb344ee6 Upgrade pynvml add detect CUDA version from driver level 2021-02-17 00:03:16 +02:00
allegroai
22d5892b12 Use shared git cache between multiple agents on the same machine 2021-02-14 13:49:29 +02:00
allegroai
f619969efc Add venvs_cache configuration 2021-02-14 13:48:57 +02:00
allegroai
ca242424ab Fix service-mode support for venvs
Fix --services-mode with venvs
2021-02-14 13:45:17 +02:00
allegroai
407deb84e9 Fix multi instances on Windows 2021-02-14 13:44:39 +02:00
allegroai
14589aa094 Fix CPU mode 2021-02-14 13:44:00 +02:00
allegroai
1260e3d942 Update cache entries on conda package manager 2021-02-11 14:47:26 +02:00
allegroai
b22d926d94 Fix cache to take cuda version into account 2021-02-11 14:47:05 +02:00
allegroai
410cc8c7be Add --dynamic-gpus and limit in --services-mode 2021-02-11 14:46:37 +02:00
allegroai
784c676f5b Fix "from clearml" runtime diff patching (make sure we move it to after all the __future__ imports) include handling triple quotes in comments 2021-02-11 14:46:06 +02:00
allegroai
296f7970df Fix file not found error (no 2) interpreted as aborted (i.e. ctrl-c) 2021-02-11 14:44:54 +02:00
allegroai
cd59933c9c Remove unused packages 2021-02-11 14:44:35 +02:00
allegroai
b95d3f5300 Add venv caching with docker mode support 2021-02-11 14:44:19 +02:00
allegroai
fa0d5d8469 Fix --detached not supported on Windows, ignore and issue warning 2021-02-11 14:40:09 +02:00
allegroai
8229843018 Add base-pod-number parameter to k8s glue and example 2021-01-26 20:00:18 +02:00
allegroai
c578b37c6d Change dump configuration and ssh on every docker run 2021-01-24 08:48:10 +02:00
allegroai
8ea062c0bd Fix environment variables CLEARML_WEB_HOST/CLEARML_FILES_HOST not passed to running tasks (or updated on the config object) 2021-01-24 08:47:33 +02:00
allegroai
5d8bbde434 Fix applying git diff on new added file 2021-01-24 08:46:42 +02:00
allegroai
0462af6a3d Allow providing namespace in k8s glue and k8s glue example 2021-01-20 19:01:03 +02:00
allegroai
5a94a4048e Update agent and services docker files 2021-01-18 11:40:11 +02:00
allegroai
2602301e1d Improve agent.extra_docker_arguments documentation 2021-01-10 12:40:24 +02:00
allegroai
161993f66f Add agent.force_git_ssh_user configuration value (issue #42)
Change default docker to nvidia/cuda:10.1-cudnn7-runtime-ubuntu18.04
2021-01-10 12:38:45 +02:00
allegroai
b7f87fb8d3 Detect and delete "stuck" k8s pods k8s glue 2021-01-10 12:37:13 +02:00
allegroai
8fdb87f1f5 Fix docker --network returns None 2020-12-30 16:57:04 +02:00
Allegro AI
a9a68d230e Update README.md 2020-12-25 04:23:12 +02:00
allegroai
a1f2941ffd version bump 2020-12-25 02:10:06 +02:00
allegroai
c548eeacfc status stable 2020-12-25 02:09:54 +02:00
allegroai
428781af86 Fix support for Windows pip and Conda requirements.txt 2020-12-25 02:06:40 +02:00
Allegro AI
72efe2e9fe Update README.md 2020-12-23 01:42:10 +02:00
37 changed files with 4214 additions and 549 deletions

View File

@@ -5,10 +5,9 @@
**ClearML Agent - ML-Ops made easy
ML-Ops scheduler & orchestration solution supporting Linux, macOS and Windows**
[![GitHub license](https://img.shields.io/github/license/allegroai/trains-agent.svg)](https://img.shields.io/github/license/allegroai/trains-agent.svg)
[![GitHub license](https://img.shields.io/github/license/allegroai/clearml-agent.svg)](https://img.shields.io/github/license/allegroai/clearml-agent.svg)
[![PyPI pyversions](https://img.shields.io/pypi/pyversions/clearml-agent.svg)](https://img.shields.io/pypi/pyversions/clearml-agent.svg)
[![PyPI version shields.io](https://img.shields.io/pypi/v/clearml-agent.svg)](https://img.shields.io/pypi/v/clearml-agent.svg)
[![PyPI status](https://img.shields.io/pypi/status/clearml-agent.svg)](https://pypi.python.org/pypi/clearml-agent/)
</div>
@@ -22,23 +21,23 @@ ML-Ops scheduler & orchestration solution supporting Linux, macOS and Windows**
* Implement optimized resource utilization policies
* Deploy execution environments with either virtualenv or fully docker containerized with zero effort
* Launch-and-Forget service containers
* [Cloud autoscaling](https://allegro.ai/clearml/docs/examples/services/aws_autoscaler/aws_autoscaler/)
* [Customizable cleanup](https://allegro.ai/clearml/docs/examples/services/cleanup/cleanup_service/)
* Advanced [pipeline building and execution](https://allegro.ai/clearml/docs/examples/frameworks/pytorch/notebooks/table/tabular_training_pipeline/)
* [Cloud autoscaling](https://allegro.ai/clearml/docs/docs/examples/services/aws_autoscaler/aws_autoscaler.html)
* [Customizable cleanup](https://allegro.ai/clearml/docs/docs/examples/services/cleanup/cleanup_service.html)
* Advanced [pipeline building and execution](https://allegro.ai/clearml/docs/docs/examples/frameworks/pytorch/notebooks/table/tabular_training_pipeline.html)
It is a zero configuration fire-and-forget execution agent, providing a full ML/DL cluster solution.
**Full Automation in 5 steps**
1. ClearML Server [self-hosted](https://github.com/allegroai/trains-server) or [free tier hosting](https://app.community.clear.ml)
1. ClearML Server [self-hosted](https://github.com/allegroai/clearml-server) or [free tier hosting](https://app.community.clear.ml)
2. `pip install clearml-agent` ([install](#installing-the-clearml-agent) the ClearML Agent on any GPU machine: on-premises / cloud / ...)
3. Create a [job](https://github.com/allegroai/clearml/docs/clearml-task.md) or Add [ClearML](https://github.com/allegroai/trains) to your code with just 2 lines
3. Create a [job](https://github.com/allegroai/clearml/docs/clearml-task.md) or Add [ClearML](https://github.com/allegroai/clearml) to your code with just 2 lines
4. Change the [parameters](#using-the-clearml-agent) in the UI & schedule for [execution](#using-the-clearml-agent) (or automate with an [AutoML pipeline](#automl-and-orchestration-pipelines-))
5. :chart_with_downwards_trend: :chart_with_upwards_trend: :eyes: :beer:
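For reference, the "just 2 lines" mentioned in step 3 are the standard ClearML task initialization; a minimal sketch (project and task names are placeholders, this snippet is not part of the diff):

```python
from clearml import Task

# registers the run with the ClearML Server so an agent can later reproduce it
task = Task.init(project_name="examples", task_name="my experiment")
```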
"All the Deep/Machine-Learning DevOps your research needs, and then some... Because ain't nobody got time for that"
**Try ClearML now** [Self Hosted](https://github.com/allegroai/trains-server) or [Free tier Hosting](https://app.community.clear.ml)
<a href="https://app.community.clear.ml"><img src="https://raw.githubusercontent.com/allegroai/trains-agent/9f1e86c1ca45c984ee13edc9353c7b10c55d7257/docs/screenshots.gif" width="100%"></a>
**Try ClearML now** [Self Hosted](https://github.com/allegroai/clearml-server) or [Free tier Hosting](https://app.community.clear.ml)
<a href="https://app.community.clear.ml"><img src="https://github.com/allegroai/clearml/blob/master/docs/webapp_screenshots.gif?raw=true" width="100%"></a>
### Simple, Flexible Experiment Orchestration
**The ClearML Agent was built to address the DL/ML R&D DevOps needs:**
@@ -69,13 +68,13 @@ We designed `clearml-agent` so you can run bare-metal or inside a pod with any m
**Two K8s integration flavours**
- Spin ClearML-Agent as a long-lasting service pod
- use [clearml-agent](https://hub.docker.com/r/allegroai/trains-agent) docker image
- use [clearml-agent](https://hub.docker.com/r/allegroai/clearml-agent) docker image
- map docker socket into the pod (soon replaced by [podman](https://github.com/containers/podman))
- allow the clearml-agent to manage sibling dockers
- benefits: full use of the ClearML scheduling, no need to worry about wrong container images / lost pods etc.
- downside: Sibling containers
- Kubernetes Glue, map ClearML jobs directly to K8s jobs
- Run the [clearml-k8s glue](https://github.com/allegroai/trains-agent/blob/master/examples/k8s_glue_example.py) on a K8s cpu node
- Run the [clearml-k8s glue](https://github.com/allegroai/clearml-agent/blob/master/examples/k8s_glue_example.py) on a K8s cpu node
- The clearml-k8s glue pulls jobs from the ClearML job execution queue and prepares a K8s job (based on provided yaml template)
- Inside the pod itself the clearml-agent will install the job (experiment) environment and spin and monitor the experiment's process
- benefits: Kubernetes full view of all running jobs in the system
@@ -123,7 +122,7 @@ The ClearML Agent executes experiments using the following process:
#### System Design & Flow
<img src="https://allegro.ai/clearml/docs/img/ClearML_Architecture.png" width="100%" alt="clearml-architecture">
<img src="https://allegro.ai/clearml/docs/_images/ClearML_Architecture.png" width="100%" alt="clearml-architecture">
#### Installing the ClearML Agent
@@ -197,16 +196,16 @@ Notice: with `--detached` flag, the *clearml-agent* will be running in the backg
clearml-agent daemon --detached --queue default --docker
```
Example: spin two agents, one per gpu on the same machine, with default nvidia/cuda docker:
Example: spin two agents, one per gpu on the same machine, with default nvidia/cuda:10.1-cudnn7-runtime-ubuntu18.04 docker:
```bash
clearml-agent daemon --detached --gpus 0 --queue default --docker nvidia/cuda
clearml-agent daemon --detached --gpus 1 --queue default --docker nvidia/cuda
clearml-agent daemon --detached --gpus 0 --queue default --docker nvidia/cuda:10.1-cudnn7-runtime-ubuntu18.04
clearml-agent daemon --detached --gpus 1 --queue default --docker nvidia/cuda:10.1-cudnn7-runtime-ubuntu18.04
```
Example: spin two agents, pulling from dedicated `dual_gpu` queue, two gpu's per agent, with default nvidia/cuda docker:
Example: spin two agents, pulling from dedicated `dual_gpu` queue, two gpu's per agent, with default nvidia/cuda:10.1-cudnn7-runtime-ubuntu18.04 docker:
```bash
clearml-agent daemon --detached --gpus 0,1 --queue dual_gpu --docker nvidia/cuda
clearml-agent daemon --detached --gpus 2,3 --queue dual_gpu --docker nvidia/cuda
clearml-agent daemon --detached --gpus 0,1 --queue dual_gpu --docker nvidia/cuda:10.1-cudnn7-runtime-ubuntu18.04
clearml-agent daemon --detached --gpus 2,3 --queue dual_gpu --docker nvidia/cuda:10.1-cudnn7-runtime-ubuntu18.04
```
##### Starting the ClearML Agent - Priority Queues
@@ -226,11 +225,11 @@ Adding queues, managing job order within a queue and moving jobs between queues,
To stop a **ClearML Agent** running in the background, run the same command line used to start the agent with `--stop` appended.
For example, to stop the first of the above shown same machine, single gpu agents:
```bash
clearml-agent daemon --detached --gpus 0 --queue default --docker nvidia/cuda --stop
clearml-agent daemon --detached --gpus 0 --queue default --docker nvidia/cuda:10.1-cudnn7-runtime-ubuntu18.04 --stop
```
### How do I create an experiment on the ClearML Server? <a name="from-scratch"></a>
* Integrate [ClearML](https://github.com/allegroai/trains) with your code
* Integrate [ClearML](https://github.com/allegroai/clearml) with your code
* Execute the code on your machine (Manually / PyCharm / Jupyter Notebook)
* As your code is running, **ClearML** creates an experiment logging all the necessary execution information:
- Git repository link and commit ID (or an entire jupyter notebook)
@@ -274,18 +273,18 @@ clearml-agent daemon --services-mode --detached --queue services --create-queue
### AutoML and Orchestration Pipelines <a name="automl-pipes"></a>
The ClearML Agent can also be used to implement AutoML orchestration and Experiment Pipelines in conjunction with the ClearML package.
Sample AutoML & Orchestration examples can be found in the ClearML [example/automation](https://github.com/allegroai/trains/tree/master/examples/automation) folder.
Sample AutoML & Orchestration examples can be found in the ClearML [example/automation](https://github.com/allegroai/clearml/tree/master/examples/automation) folder.
AutoML examples
- [Toy Keras training experiment](https://github.com/allegroai/trains/blob/master/examples/optimization/hyper-parameter-optimization/base_template_keras_simple.py)
- [Toy Keras training experiment](https://github.com/allegroai/clearml/blob/master/examples/optimization/hyper-parameter-optimization/base_template_keras_simple.py)
- In order to create an experiment-template in the system, this code must be executed once manually
- [Random Search over the above Keras experiment-template](https://github.com/allegroai/trains/blob/master/examples/automation/manual_random_param_search_example.py)
- [Random Search over the above Keras experiment-template](https://github.com/allegroai/clearml/blob/master/examples/automation/manual_random_param_search_example.py)
- This example will create multiple copies of the Keras experiment-template, with different hyper-parameter combinations
Experiment Pipeline examples
- [First step experiment](https://github.com/allegroai/trains/blob/master/examples/automation/task_piping_example.py)
- [First step experiment](https://github.com/allegroai/clearml/blob/master/examples/automation/task_piping_example.py)
- This example will "process data", and once done, will launch a copy of the 'second step' experiment-template
- [Second step experiment](https://github.com/allegroai/trains/blob/master/examples/automation/toy_base_task.py)
- [Second step experiment](https://github.com/allegroai/clearml/blob/master/examples/automation/toy_base_task.py)
- In order to create an experiment-template in the system, this code must be executed once manually
### License

View File

@@ -19,11 +19,16 @@
force_git_ssh_protocol: false
# Force a specific SSH port when converting http to ssh links (the domain is kept the same)
# force_git_ssh_port: 0
# Force a specific SSH username when converting http to ssh links (the default username is 'git')
# force_git_ssh_user: git
# Set the python version to use when creating the virtual environment and launching the experiment
# Example values: "/usr/bin/python3" or "/usr/local/bin/python3.6"
# The default is the python executing the clearml_agent
python_binary: ""
# ignore any requested python version (Default: False, if a Task was using a
# specific python version and the system supports multiple python the agent will use the requested python version)
# ignore_requested_python_version: true
# select python package manager:
# currently supported pip and conda
@@ -45,7 +50,7 @@
# extra_index_url: ["https://allegroai.jfrog.io/clearmlai/api/pypi/public/simple"]
# additional conda channels to use when installing with conda package manager
conda_channels: ["defaults", "conda-forge", "pytorch", ]
conda_channels: ["pytorch", "conda-forge", "defaults", ]
# If set to true, Task's "installed packages" are ignored,
# and the repository's "requirements.txt" is used instead
@@ -75,6 +80,16 @@
# target folder for virtual environments builds, created when executing experiment
venvs_dir = ~/.clearml/venvs-builds
# cached virtual environment folder
venvs_cache: {
# maximum number of cached venvs
max_entries: 10
# minimum required free space to allow for cache entry, disable by passing 0 or negative value
free_space_threshold_gb: 2.0
# unmark to enable virtual environment caching
# path: ~/.clearml/venvs-cache
},
# cached git clone folder
vcs_cache: {
enabled: true,
@@ -109,6 +124,11 @@
# optional shell script to run in docker when started before the experiment is started
# extra_docker_shell_script: ["apt-get install -y bindfs", ]
# Install the required packages for opencv libraries (libsm6 libxext6 libxrender-dev libglib2.0-0),
# for backwards compatibility reasons, true as default,
# change to false to skip installation and decrease docker spin up time
# docker_install_opencv_libs: true
# optional uptime configuration, make sure to use only one of 'uptime/downtime' and not both.
# If uptime is specified, agent will actively poll (and execute) tasks in the time-spans defined here.
# Outside of the specified time-spans, the agent will be idle.
@@ -131,12 +151,15 @@
default_docker: {
# default docker image to use when running in docker mode
image: "nvidia/cuda:10.1-runtime-ubuntu18.04"
image: "nvidia/cuda:10.1-cudnn7-runtime-ubuntu18.04"
# optional arguments to pass to docker image
# arguments: ["--ipc=host", ]
}
# set the OS environments based on the Task's Environment section before launching the Task process.
enable_task_env: false
# set the initial bash script to execute at the startup of any docker.
# all lines will be executed regardless of their exit code.
# {python_single_digit} is translated to 'python3' or 'python2' according to requested python version
@@ -162,4 +185,16 @@
# should be detected automatically. Override with os environment CUDA_VERSION / CUDNN_VERSION
# cuda_version: 10.1
# cudnn_version: 7.6
# Hide docker environment variables containing secrets when printing out the docker command by replacing their
# values with "********". Turning this feature on will hide the following environment variables values:
# CLEARML_API_SECRET_KEY, CLEARML_AGENT_GIT_PASS, AWS_SECRET_ACCESS_KEY, AZURE_STORAGE_KEY
# To include more environment variables, add their keys to the "extra_keys" list. E.g. to make sure the value of
# your custom environment variable named MY_SPECIAL_PASSWORD will not show in the logs when included in the
# docker command, set:
# extra_keys: ["MY_SPECIAL_PASSWORD"]
hide_docker_command_env_vars {
enabled: true
extra_keys: []
}
}
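A rough illustration of the masking behaviour described by `hide_docker_command_env_vars`; the helper below is hypothetical (not the agent's internal API) and only shows the intended effect on the printed docker command:

```python
import re

# default keys per the comment above, plus anything added via extra_keys
HIDDEN_KEYS = ["CLEARML_API_SECRET_KEY", "CLEARML_AGENT_GIT_PASS",
               "AWS_SECRET_ACCESS_KEY", "AZURE_STORAGE_KEY"]

def mask_docker_command(cmd, hidden_keys=HIDDEN_KEYS):
    # replace the value of every "KEY=value" pair for the configured keys
    for key in hidden_keys:
        cmd = re.sub(r"({}=)\S+".format(re.escape(key)), r"\1********", cmd)
    return cmd

print(mask_docker_command(
    "docker run -e CLEARML_API_SECRET_KEY=abc123 -e FOO=bar nvidia/cuda"))
# docker run -e CLEARML_API_SECRET_KEY=******** -e FOO=bar nvidia/cuda
```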

View File

@@ -155,7 +155,7 @@ class Session(TokenManager):
# update api version from server response
try:
token_dict = jwt.decode(self.token, verify=False)
token_dict = TokenManager.get_decoded_token(self.token, verify=False)
api_version = token_dict.get('api_version')
if not api_version:
api_version = '2.2' if token_dict.get('env', '') == 'prod' else Session.api_version

View File

@@ -3,6 +3,7 @@ from abc import ABCMeta, abstractmethod
from time import time
import jwt
from jwt.algorithms import get_default_algorithms
import six
@@ -66,10 +67,18 @@ class TokenManager(object):
pass
return 0
@classmethod
def get_decoded_token(cls, token, verify=False):
""" Get token expiration time. If not present, assume forever """
return jwt.decode(
token, verify=verify,
options=dict(verify_signature=False),
algorithms=get_default_algorithms())
@classmethod
def _get_token_exp(cls, token):
""" Get token expiration time. If not present, assume forever """
return jwt.decode(token, verify=False).get('exp', sys.maxsize)
return cls.get_decoded_token(token).get('exp', sys.maxsize)
def _set_token(self, token):
if token:
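The change above is what enables PyJWT v2 support: 2.x dropped the old `verify=False` keyword, so unverified decoding goes through `options` instead. A minimal sketch that works on both 1.x and 2.x (the claim name read at the end is only an example):

```python
import jwt  # PyJWT

def decode_without_verification(token):
    # options disables signature verification on both PyJWT 1.x and 2.x;
    # algorithms are passed explicitly, mirroring the change above
    return jwt.decode(
        token,
        options={"verify_signature": False},
        algorithms=["HS256", "RS256"],
    )

# payload = decode_without_verification(session.token)
# api_version = payload.get("api_version")
```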

View File

@@ -6,16 +6,9 @@ import requests
from requests.adapters import HTTPAdapter
from urllib3.util import Retry
from urllib3 import PoolManager
import six
from .session.defs import ENV_HOST_VERIFY_CERT
if six.PY3:
from functools import lru_cache
elif six.PY2:
# python 2 support
from backports.functools_lru_cache import lru_cache
__disable_certificate_verification_warning = 0

View File

@@ -55,7 +55,7 @@ def backward_compatibility_support():
continue
# set OS environ:
keys = environ.keys()
keys = list(environ.keys())
for k in keys:
if not k.startswith('CLEARML_'):
continue
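The `list(...)` wrapping above matters because the loop mutates `os.environ` while iterating; in Python 3 `keys()` is a live view and mutating during iteration raises `RuntimeError`. A standalone sketch (the exact variables being mirrored are an assumption, the diff only shows the iteration):

```python
import os

keys = list(os.environ.keys())  # materialize first: environ is modified in the loop
for k in keys:
    if not k.startswith("CLEARML_"):
        continue
    # e.g. mirror CLEARML_* variables to their legacy TRAINS_* names
    os.environ.setdefault("TRAINS_" + k[len("CLEARML_"):], os.environ[k])
```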

File diff suppressed because it is too large

View File

@@ -62,14 +62,18 @@ class EnvironmentConfig(object):
return None
ENV_AGENT_SECRET_KEY = EnvironmentConfig("CLEARML_API_SECRET_KEY", "TRAINS_API_SECRET_KEY")
ENV_AWS_SECRET_KEY = EnvironmentConfig("AWS_SECRET_ACCESS_KEY")
ENV_AZURE_ACCOUNT_KEY = EnvironmentConfig("AZURE_STORAGE_KEY")
ENVIRONMENT_CONFIG = {
"api.api_server": EnvironmentConfig("CLEARML_API_HOST", "TRAINS_API_HOST", ),
"api.files_server": EnvironmentConfig("CLEARML_FILES_HOST", "TRAINS_FILES_HOST", ),
"api.web_server": EnvironmentConfig("CLEARML_WEB_HOST", "TRAINS_WEB_HOST", ),
"api.credentials.access_key": EnvironmentConfig(
"CLEARML_API_ACCESS_KEY", "TRAINS_API_ACCESS_KEY",
),
"api.credentials.secret_key": EnvironmentConfig(
"CLEARML_API_SECRET_KEY", "TRAINS_API_SECRET_KEY",
),
"api.credentials.secret_key": ENV_AGENT_SECRET_KEY,
"agent.worker_name": EnvironmentConfig("CLEARML_WORKER_NAME", "TRAINS_WORKER_NAME", ),
"agent.worker_id": EnvironmentConfig("CLEARML_WORKER_ID", "TRAINS_WORKER_ID", ),
"agent.cuda_version": EnvironmentConfig(
@@ -82,10 +86,10 @@ ENVIRONMENT_CONFIG = {
names=("CLEARML_CPU_ONLY", "TRAINS_CPU_ONLY", "CPU_ONLY"), type=bool
),
"sdk.aws.s3.key": EnvironmentConfig("AWS_ACCESS_KEY_ID"),
"sdk.aws.s3.secret": EnvironmentConfig("AWS_SECRET_ACCESS_KEY"),
"sdk.aws.s3.secret": ENV_AWS_SECRET_KEY,
"sdk.aws.s3.region": EnvironmentConfig("AWS_DEFAULT_REGION"),
"sdk.azure.storage.containers.0": {'account_name': EnvironmentConfig("AZURE_STORAGE_ACCOUNT"),
'account_key': EnvironmentConfig("AZURE_STORAGE_KEY")},
'account_key': ENV_AZURE_ACCOUNT_KEY},
"sdk.google.storage.credentials_json": EnvironmentConfig("GOOGLE_APPLICATION_CREDENTIALS"),
}
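For reference, the mapping above means these OS environment variables are picked up by the agent and applied to the matching configuration keys; a sketch with placeholder values:

```python
import os

os.environ["CLEARML_API_HOST"] = "https://api.example.com"  # api.api_server
os.environ["CLEARML_API_ACCESS_KEY"] = "<access-key>"       # api.credentials.access_key
os.environ["CLEARML_API_SECRET_KEY"] = "<secret-key>"       # api.credentials.secret_key
os.environ["AWS_SECRET_ACCESS_KEY"] = "<aws-secret>"        # sdk.aws.s3.secret
```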

View File

@@ -16,7 +16,7 @@ def parse(reqstr):
filename = getattr(reqstr, 'name', None)
try:
# Python 2.x compatibility
if not isinstance(reqstr, basestring):
if not isinstance(reqstr, basestring): # noqa
reqstr = reqstr.read()
except NameError:
# Python 3.x only

View File

@@ -1,18 +1,21 @@
from __future__ import print_function, division, unicode_literals
import base64
import functools
import json
import logging
import os
import re
import subprocess
import tempfile
from copy import deepcopy
import yaml
import json
from pathlib import Path
from threading import Thread
from time import sleep
from typing import Text, List
import yaml
from clearml_agent.commands.events import Events
from clearml_agent.commands.worker import Worker
from clearml_agent.definitions import ENV_DOCKER_IMAGE
@@ -27,20 +30,22 @@ from clearml_agent.interface.base import ObjectID
class K8sIntegration(Worker):
K8S_PENDING_QUEUE = "k8s_scheduler"
KUBECTL_APPLY_CMD = "kubectl apply -f"
K8S_DEFAULT_NAMESPACE = "clearml"
KUBECTL_RUN_CMD = "kubectl run clearml-{queue_name}-id-{task_id} " \
KUBECTL_APPLY_CMD = "kubectl apply --namespace={namespace} -f"
KUBECTL_RUN_CMD = "kubectl run clearml-id-{task_id} " \
"--image {docker_image} " \
"--restart=Never --replicas=1 " \
"--generator=run-pod/v1 " \
"--namespace=clearml"
"--restart=Never " \
"--namespace={namespace}"
KUBECTL_DELETE_CMD = "kubectl delete pods " \
"--selector=TRAINS=agent " \
"--field-selector=status.phase!=Pending,status.phase!=Running " \
"--namespace=clearml"
"--namespace={namespace}"
BASH_INSTALL_SSH_CMD = [
"apt-get update",
"apt-get install -y openssh-server",
"mkdir -p /var/run/sshd",
"echo 'root:training' | chpasswd",
@@ -83,11 +88,15 @@ class K8sIntegration(Worker):
debug=False,
ports_mode=False,
num_of_services=20,
base_pod_num=1,
user_props_cb=None,
overrides_yaml=None,
template_yaml=None,
trains_conf_file=None,
clearml_conf_file=None,
extra_bash_init_script=None,
namespace=None,
max_pods_limit=None,
**kwargs
):
"""
Initialize the k8s integration glue layer daemon
@@ -104,14 +113,17 @@ class K8sIntegration(Worker):
Requires the `num_of_services` parameter.
:param int num_of_services: Number of k8s services configured in the cluster. Required if `port_mode` is True.
(default: 20)
:param int base_pod_num: Used when `ports_mode` is True, sets the base pod number to a given value (default: 1)
:param callable user_props_cb: An Optional callable allowing additional user properties to be specified
when scheduling a task to run in a pod. Callable can receive an optional pod number and should return
a dictionary of user properties (name and value). Signature is [[Optional[int]], Dict[str,str]]
:param str overrides_yaml: YAML file containing the overrides for the pod (optional)
:param str template_yaml: YAML file containing the template for the pod (optional).
If provided the pod is scheduled with kubectl apply and overrides are ignored, otherwise with kubectl run.
:param str trains_conf_file: clearml.conf file to be used by the pod itself (optional)
:param str clearml_conf_file: clearml.conf file to be used by the pod itself (optional)
:param str extra_bash_init_script: Additional bash script to run before starting the Task inside the container
:param str namespace: K8S namespace to be used when creating the new pods (default: clearml)
:param int max_pods_limit: Maximum number of pods that K8S glue can run at the same time
"""
super(K8sIntegration, self).__init__()
self.k8s_pending_queue_name = k8s_pending_queue_name or self.K8S_PENDING_QUEUE
@@ -125,16 +137,19 @@ class K8sIntegration(Worker):
self.log.logger.setLevel(logging.INFO)
self.ports_mode = ports_mode
self.num_of_services = num_of_services
self.base_pod_num = base_pod_num
self._edit_hyperparams_support = None
self._user_props_cb = user_props_cb
self.trains_conf_file = None
self.conf_file_content = None
self.overrides_json_string = None
self.template_dict = None
self.extra_bash_init_script = extra_bash_init_script or None
if self.extra_bash_init_script and not isinstance(self.extra_bash_init_script, str):
self.extra_bash_init_script = ' ; '.join(self.extra_bash_init_script) # noqa
self.namespace = namespace or self.K8S_DEFAULT_NAMESPACE
self.pod_limits = []
self.pod_requests = []
self.max_pods_limit = max_pods_limit if not self.ports_mode else None
if overrides_yaml:
with open(os.path.expandvars(os.path.expanduser(str(overrides_yaml))), 'rt') as f:
overrides = yaml.load(f, Loader=getattr(yaml, 'FullLoader', None))
@@ -161,11 +176,59 @@ class K8sIntegration(Worker):
with open(os.path.expandvars(os.path.expanduser(str(template_yaml))), 'rt') as f:
self.template_dict = yaml.load(f, Loader=getattr(yaml, 'FullLoader', None))
if trains_conf_file:
with open(os.path.expandvars(os.path.expanduser(str(trains_conf_file))), 'rt') as f:
self.trains_conf_file = f.read()
clearml_conf_file = clearml_conf_file or kwargs.get('trains_conf_file')
if clearml_conf_file:
with open(os.path.expandvars(os.path.expanduser(str(clearml_conf_file))), 'rt') as f:
self.conf_file_content = f.read()
# make sure we use system packages!
self.trains_conf_file += '\nagent.package_manager.system_site_packages=true\n'
self.conf_file_content += '\nagent.package_manager.system_site_packages=true\n'
self._monitor_hanging_pods()
def _monitor_hanging_pods(self):
_check_pod_thread = Thread(target=self._monitor_hanging_pods_daemon)
_check_pod_thread.daemon = True
_check_pod_thread.start()
def _monitor_hanging_pods_daemon(self):
while True:
output = get_bash_output('kubectl get pods -n {namespace} -o=JSON'.format(
namespace=self.namespace
))
output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
try:
output_config = json.loads(output)
except Exception as ex:
self.log.warning('K8S Glue pods monitor: Failed parsing kubectl output:\n{}\nEx: {}'.format(output, ex))
sleep(self._polling_interval)
continue
pods = output_config.get('items', [])
for pod in pods:
try:
reason = functools.reduce(
lambda a, b: a[b], ('status', 'containerStatuses', 0, 'state', 'waiting', 'reason'), pod
)
except (IndexError, KeyError):
continue
if reason == 'ImagePullBackOff':
pod_name = pod.get('metadata', {}).get('name', None)
if pod_name:
task_id = pod_name.rpartition('-')[-1]
delete_pod_cmd = 'kubectl delete pods {} -n {}'.format(pod_name, self.namespace)
get_bash_output(delete_pod_cmd)
try:
self._session.api_client.tasks.failed(
task=task_id,
status_reason="K8S glue error due to ImagePullBackOff",
status_message="Changed by K8S glue",
force=True
)
except Exception as ex:
self.log.warning(
'K8S Glue pods monitor: Failed deleting task "{}"\nEX: {}'.format(task_id, ex)
)
sleep(self._polling_interval)
def _set_task_user_properties(self, task_id: str, **properties: str):
if self._edit_hyperparams_support is not True:
@@ -213,20 +276,23 @@ class K8sIntegration(Worker):
return
if task_data.execution.docker_cmd:
docker_parts = task_data.execution.docker_cmd
docker_cmd = task_data.execution.docker_cmd
else:
docker_parts = str(ENV_DOCKER_IMAGE.get() or
self._session.config.get("agent.default_docker.image", "nvidia/cuda"))
docker_cmd = str(ENV_DOCKER_IMAGE.get() or
self._session.config.get("agent.default_docker.image", "nvidia/cuda"))
# take the first part, this is the docker image name (not arguments)
docker_parts = docker_parts.split()
docker_parts = docker_cmd.split()
docker_image = docker_parts[0]
docker_args = docker_parts[1:] if len(docker_parts) > 1 else []
# get the clearml.conf encoded file
# noinspection PyProtectedMember
hocon_config_encoded = (self.trains_conf_file or self._session._config_file).encode('ascii')
create_trains_conf = "echo '{}' | base64 --decode >> ~/clearml.conf".format(
hocon_config_encoded = (
self.conf_file_content
or Path(self._session._config_file).read_text()
).encode("ascii")
create_clearml_conf = "echo '{}' | base64 --decode >> ~/clearml.conf".format(
base64.b64encode(
hocon_config_encoded
).decode('ascii')
@@ -241,17 +307,22 @@ class K8sIntegration(Worker):
except Exception:
queue_name = 'k8s'
# conform queue name to k8s standards
safe_queue_name = queue_name.lower().strip()
safe_queue_name = re.sub(r'\W+', '', safe_queue_name).replace('_', '').replace('-', '')
# Search for a free pod number
pod_number = 1
while self.ports_mode:
kubectl_cmd_new = "kubectl get pods -l {pod_label},{agent_label} -n clearml".format(
pod_label=self.LIMIT_POD_LABEL.format(pod_number=pod_number),
agent_label=self.AGENT_LABEL
)
pod_count = 0
pod_number = self.base_pod_num
while self.ports_mode or self.max_pods_limit:
pod_number = self.base_pod_num + pod_count
if self.ports_mode:
kubectl_cmd_new = "kubectl get pods -l {pod_label},{agent_label} -n {namespace}".format(
pod_label=self.LIMIT_POD_LABEL.format(pod_number=pod_number),
agent_label=self.AGENT_LABEL,
namespace=self.namespace,
)
else:
kubectl_cmd_new = "kubectl get pods -l {agent_label} -n {namespace} -o json".format(
agent_label=self.AGENT_LABEL,
namespace=self.namespace,
)
process = subprocess.Popen(kubectl_cmd_new.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate()
output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
@@ -260,48 +331,82 @@ class K8sIntegration(Worker):
if not output:
# No such pod exist so we can use the pod_number we found
break
if pod_number >= self.num_of_services:
# All pod numbers are taken, exit
if self.max_pods_limit:
try:
current_pod_count = len(json.loads(output).get("items", []))
except (ValueError, TypeError) as ex:
self.log.warning(
"K8S Glue pods monitor: Failed parsing kubectl output:\n{}\ntask '{}' "
"will be enqueued back to queue '{}'\nEx: {}".format(
output, task_id, queue, ex
)
)
self._session.api_client.tasks.reset(task_id)
self._session.api_client.tasks.enqueue(task_id, queue=queue, status_reason='kubectl parsing error')
return
max_count = self.max_pods_limit
else:
current_pod_count = pod_count
max_count = self.num_of_services - 1
if current_pod_count >= max_count:
# All pods are taken, exit
self.log.debug(
"kubectl last result: {}\n{}".format(error, output))
self.log.warning(
"kubectl last result: {}\n{}\nAll k8s services are in use, task '{}' "
"All k8s services are in use, task '{}' "
"will be enqueued back to queue '{}'".format(
error, output, task_id, queue
task_id, queue
)
)
self._session.api_client.tasks.reset(task_id)
self._session.api_client.tasks.enqueue(
task_id, queue=queue, status_reason='k8s max pod limit (no free k8s service)')
return
pod_number += 1
elif self.max_pods_limit:
# max pods limit hasn't reached yet, so we can create the pod
break
pod_count += 1
labels = ([self.LIMIT_POD_LABEL.format(pod_number=pod_number)] if self.ports_mode else []) + [self.AGENT_LABEL]
labels.append("clearml-agent-queue={}".format(self._safe_k8s_label_value(queue)))
labels.append("clearml-agent-queue-name={}".format(self._safe_k8s_label_value(queue_name)))
if self.ports_mode:
print("Kubernetes scheduling task id={} on pod={}".format(task_id, pod_number))
print("Kubernetes scheduling task id={} on pod={} (pod_count={})".format(task_id, pod_number, pod_count))
else:
print("Kubernetes scheduling task id={}".format(task_id))
if self.template_dict:
output, error = self._kubectl_apply(
create_trains_conf=create_trains_conf,
create_clearml_conf=create_clearml_conf,
labels=labels, docker_image=docker_image, docker_args=docker_args,
task_id=task_id, queue=queue, queue_name=safe_queue_name)
task_id=task_id, queue=queue)
else:
output, error = self._kubectl_run(
create_trains_conf=create_trains_conf,
labels=labels, docker_image=docker_image,
create_clearml_conf=create_clearml_conf,
labels=labels, docker_image=docker_cmd,
task_data=task_data,
task_id=task_id, queue=queue, queue_name=safe_queue_name)
task_id=task_id, queue=queue)
error = '' if not error else (error if isinstance(error, str) else error.decode('utf-8'))
output = '' if not output else (output if isinstance(output, str) else output.decode('utf-8'))
print('kubectl output:\n{}\n{}'.format(error, output))
if error:
self.log.error("Running kubectl encountered an error: {}".format(error))
send_log = "Running kubectl encountered an error: {}".format(error)
self.log.error(send_log)
self.send_logs(task_id, send_log.splitlines())
user_props = {"k8s-queue": str(queue_name)}
if self.ports_mode:
user_props.update({"k8s-pod-number": pod_number, "k8s-pod-label": labels[0]})
user_props.update(
{
"k8s-pod-number": pod_number,
"k8s-pod-label": labels[0],
"k8s-internal-pod-count": pod_count,
}
)
if self._user_props_cb:
# noinspection PyBroadException
@@ -319,26 +424,27 @@ class K8sIntegration(Worker):
def _parse_docker_args(self, docker_args):
# type: (list) -> dict
kube_args = {'env': []}
kube_args = []
while docker_args:
cmd = docker_args.pop().strip()
cmd = docker_args.pop(0).strip()
if cmd in ('-e', '--env',):
env = docker_args.pop().strip()
env = docker_args.pop(0).strip()
key, value = env.split('=', 1)
kube_args[key] += {key: value}
kube_args.append({'name': key, 'value': value})
else:
self.log.warning('skipping docker argument {} (only -e --env supported)'.format(cmd))
return kube_args
return {'env': kube_args} if kube_args else {}
def _kubectl_apply(self, create_trains_conf, docker_image, docker_args, labels, queue, task_id, queue_name):
def _kubectl_apply(self, create_clearml_conf, docker_image, docker_args, labels, queue, task_id):
template = deepcopy(self.template_dict)
template.setdefault('apiVersion', 'v1')
template['kind'] = 'Pod'
template.setdefault('metadata', {})
name = 'clearml-{queue}-id-{task_id}'.format(queue=queue_name, task_id=task_id)
name = 'clearml-id-{task_id}'.format(task_id=task_id)
template['metadata']['name'] = name
template.setdefault('spec', {})
template['spec'].setdefault('containers', [])
template['spec'].setdefault('restartPolicy', 'Never')
if labels:
labels_dict = dict(pair.split('=', 1) for pair in labels)
template['metadata'].setdefault('labels', {})
@@ -360,15 +466,16 @@ class K8sIntegration(Worker):
script_encoded.encode('ascii')
).decode('ascii'))
container = merge_dicts(
# Notice: we always leave with exit code 0, so pods are never restarted
container = self._merge_containers(
container,
dict(name=name, image=docker_image,
command=['/bin/bash'],
args=['-c', '{} ; {}'.format(create_trains_conf, create_init_script)])
args=['-c', '{} ; {} ; exit 0'.format(create_clearml_conf, create_init_script)])
)
if template['spec']['containers']:
template['spec']['containers'][0] = merge_dicts(template['spec']['containers'][0], container)
template['spec']['containers'][0] = self._merge_containers(template['spec']['containers'][0], container)
else:
template['spec']['containers'].append(container)
@@ -381,6 +488,7 @@ class K8sIntegration(Worker):
task_id=task_id,
docker_image=docker_image,
queue_id=queue,
namespace=self.namespace
)
# make sure we provide a list
if isinstance(kubectl_cmd, str):
@@ -398,15 +506,15 @@ class K8sIntegration(Worker):
return output, error
def _kubectl_run(self, create_trains_conf, docker_image, labels, queue, task_data, task_id, queue_name):
def _kubectl_run(self, create_clearml_conf, docker_image, labels, queue, task_data, task_id):
if callable(self.kubectl_cmd):
kubectl_cmd = self.kubectl_cmd(task_id, docker_image, queue, task_data, queue_name)
kubectl_cmd = self.kubectl_cmd(task_id, docker_image, queue, task_data)
else:
kubectl_cmd = self.kubectl_cmd.format(
queue_name=queue_name,
task_id=task_id,
docker_image=docker_image,
queue_id=queue
queue_id=queue,
namespace=self.namespace,
)
# make sure we provide a list
if isinstance(kubectl_cmd, str):
@@ -430,7 +538,7 @@ class K8sIntegration(Worker):
"--",
"/bin/sh",
"-c",
"{} ; {}".format(create_trains_conf, container_bash_script.format(
"{} ; {}".format(create_clearml_conf, container_bash_script.format(
extra_bash_init_cmd=self.extra_bash_init_script, task_id=task_id)),
]
process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
@@ -464,7 +572,7 @@ class K8sIntegration(Worker):
# iterate over queues (priority style, queues[0] is highest)
for queue in queues:
# delete old completed / failed pods
get_bash_output(self.KUBECTL_DELETE_CMD)
get_bash_output(self.KUBECTL_DELETE_CMD.format(namespace=self.namespace))
# get next task in queue
try:
@@ -516,3 +624,27 @@ class K8sIntegration(Worker):
@classmethod
def get_ssh_server_bash(cls, ssh_port_number):
return ' ; '.join(line.format(port=ssh_port_number) for line in cls.BASH_INSTALL_SSH_CMD)
@staticmethod
def _merge_containers(c1, c2):
def merge_env(k, d1, d2, not_set):
if k != "env":
return not_set
# Merge environment lists, second list overrides first
return list({
item['name']: item for envs in (d1, d2) for item in envs
}.values())
return merge_dicts(
c1, c2, custom_merge_func=merge_env
)
@staticmethod
def _safe_k8s_label_value(value):
""" Conform string to k8s standards for a label value """
value = value.lower().strip()
value = re.sub(r'^[^A-Za-z0-9]+', '', value) # strip leading non-alphanumeric chars
value = re.sub(r'[^A-Za-z0-9]+$', '', value) # strip trailing non-alphanumeric chars
value = re.sub(r'\W+', '-', value) # allow only word chars (this removed "." which is supported, but nvm)
value = re.sub(r'-+', '-', value) # don't leave messy "--" after replacing previous chars
return value[:63]
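A standalone sketch of the label sanitization added in `_safe_k8s_label_value` (it mirrors the regexes above, written here as a plain function for illustration):

```python
import re

def safe_k8s_label_value(value):
    value = value.lower().strip()
    value = re.sub(r'^[^A-Za-z0-9]+', '', value)  # strip leading non-alphanumerics
    value = re.sub(r'[^A-Za-z0-9]+$', '', value)  # strip trailing non-alphanumerics
    value = re.sub(r'\W+', '-', value)            # replace other non-word runs with a dash
    value = re.sub(r'-+', '-', value)             # collapse repeated dashes
    return value[:63]                             # k8s limits label values to 63 chars

print(safe_k8s_label_value("  My Queue (GPU) "))  # -> "my-queue-gpu"
```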

View File

@@ -24,7 +24,6 @@ import pyhocon
import yaml
from attr import fields_dict
from pathlib2 import Path
from tqdm import tqdm
import six
from six.moves import reduce
@@ -399,12 +398,6 @@ class TqdmStream(object):
self.buffer.write('\n')
class TqdmLog(tqdm):
def __init__(self, iterable=None, file=None, **kwargs):
super(TqdmLog, self).__init__(iterable, file=TqdmStream(file or sys.stderr), **kwargs)
def url_join(first, *rest):
"""
Join url parts similarly to Path.join

View File

@@ -1,17 +1,23 @@
from typing import Callable, Dict, Any
from typing import Callable, Dict, Any, Optional
_not_set = object()
def filter_keys(filter_, dct): # type: (Callable[[Any], bool], Dict) -> Dict
return {key: value for key, value in dct.items() if filter_(key)}
def merge_dicts(dict1, dict2):
def merge_dicts(dict1, dict2, custom_merge_func=None):
# type: (Any, Any, Optional[Callable[[str, Any, Any, Any], Any]]) -> Any
""" Recursively merges dict2 into dict1 """
if not isinstance(dict1, dict) or not isinstance(dict2, dict):
return dict2
for k in dict2:
if k in dict1:
dict1[k] = merge_dicts(dict1[k], dict2[k])
res = None
if custom_merge_func:
res = custom_merge_func(k, dict1[k], dict2[k], _not_set)
dict1[k] = merge_dicts(dict1[k], dict2[k], custom_merge_func) if res is _not_set else res
else:
dict1[k] = dict2[k]
return dict1
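A usage sketch for the new `custom_merge_func` hook (assuming `merge_dicts` as defined above is in scope; the data is illustrative). This is essentially what the k8s glue's `_merge_containers` does for container `env` lists:

```python
def merge_env(key, v1, v2, not_set):
    # merge_dicts passes its private sentinel as `not_set`
    if key != "env":
        return not_set  # fall back to the default recursive merge
    # merge env lists by variable name, second list wins on conflicts
    return list({item["name"]: item for envs in (v1, v2) for item in envs}.values())

template = {"image": "nvidia/cuda", "env": [{"name": "A", "value": "1"}]}
override = {"env": [{"name": "A", "value": "2"}, {"name": "B", "value": "3"}]}

merged = merge_dicts(template, override, custom_merge_func=merge_env)
# merged["env"] == [{"name": "A", "value": "2"}, {"name": "B", "value": "3"}]
```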

View File

@@ -20,6 +20,7 @@ import platform
import sys
import time
from datetime import datetime
from typing import Optional
import psutil
from ..gpu import pynvml as N
@@ -390,3 +391,38 @@ def new_query(shutdown=False, per_process_stats=False, get_driver_info=False):
'''
return GPUStatCollection.new_query(shutdown=shutdown, per_process_stats=per_process_stats,
get_driver_info=get_driver_info)
def get_driver_cuda_version():
# type: () -> Optional[str]
"""
:return: The CUDA version detected from the driver, or None on failure.
Example: `110` is cuda version 11.0
"""
# noinspection PyBroadException
try:
N.nvmlInit()
except BaseException:
return None
# noinspection PyBroadException
try:
cuda_version = str(N.nvmlSystemGetCudaDriverVersion())
except BaseException:
# noinspection PyBroadException
try:
cuda_version = str(N.nvmlSystemGetCudaDriverVersion_v2())
except BaseException:
cuda_version = ''
# noinspection PyBroadException
try:
N.nvmlShutdown()
except BaseException:
return None
# for some reason we get CUDA version 11020 instead of 11200, so this is the fix
if cuda_version and len(cuda_version) >= 4 and cuda_version[2] == '0' and cuda_version[3] != '0':
return cuda_version[:2]+cuda_version[3]
return cuda_version[:3] if cuda_version else None
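A usage sketch for the new driver-level detection (per the docstring above, the returned string encodes major and minor, e.g. "110" is CUDA 11.0):

```python
version = get_driver_cuda_version()  # e.g. "110", or None if detection failed
if version:
    major, minor = version[:-1], version[-1]
    print("driver reports CUDA {}.{}".format(major, minor))  # e.g. "CUDA 11.0"
else:
    print("could not detect CUDA version from the driver")
```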

File diff suppressed because it is too large

View File

@@ -0,0 +1,225 @@
import os
import shutil
from logging import warning
from random import random
from time import time
from typing import List, Optional, Sequence
import psutil
from pathlib2 import Path
from .locks import FileLock
class FolderCache(object):
_lock_filename = '.clearml.lock'
_lock_timeout_seconds = 30
_temp_entry_prefix = '_temp.'
def __init__(self, cache_folder, max_cache_entries=5, min_free_space_gb=None):
self._cache_folder = Path(os.path.expandvars(cache_folder)).expanduser().absolute()
self._cache_folder.mkdir(parents=True, exist_ok=True)
self._max_cache_entries = max_cache_entries
self._last_copied_entry_folder = None
self._min_free_space_gb = min_free_space_gb if min_free_space_gb and min_free_space_gb > 0 else None
self._lock = FileLock((self._cache_folder / self._lock_filename).as_posix())
def get_cache_folder(self):
# type: () -> Path
"""
:return: Return the base cache folder
"""
return self._cache_folder
def copy_cached_entry(self, keys, destination):
# type: (List[str], Path) -> Optional[Path]
"""
Copy a cached entry into a destination directory, if the cached entry does not exist return None
:param keys:
:param destination:
:return: Target path, None if cached entry does not exist
"""
self._last_copied_entry_folder = None
if not keys:
return None
# lock so we make sure no one deletes it before we copy it
# noinspection PyBroadException
try:
self._lock.acquire(timeout=self._lock_timeout_seconds)
except BaseException as ex:
warning('Could not lock cache folder {}: {}'.format(self._cache_folder, ex))
return None
src = None
try:
src = self.get_entry(keys)
if src:
destination = Path(destination).absolute()
destination.mkdir(parents=True, exist_ok=True)
shutil.rmtree(destination.as_posix())
shutil.copytree(src.as_posix(), dst=destination.as_posix(), symlinks=True)
except BaseException as ex:
warning('Could not copy cache folder {} to {}: {}'.format(src, destination, ex))
self._lock.release()
return None
# release Lock
self._lock.release()
self._last_copied_entry_folder = src
return destination if src else None
def get_entry(self, keys):
# type: (List[str]) -> Optional[Path]
"""
Return a folder (a sub-folder inside the cache_folder) matching one of the keys
:param keys: List of keys, return the first match to one of the keys, notice keys cannot contain '.'
:return: Path to the sub-folder or None if none was found
"""
if not keys:
return None
# conform keys
keys = [keys] if isinstance(keys, str) else keys
keys = sorted([k.replace('.', '_') for k in keys])
for cache_folder in self._cache_folder.glob('*'):
if cache_folder.is_dir() and any(True for k in cache_folder.name.split('.') if k in keys):
cache_folder.touch()
return cache_folder
return None
def add_entry(self, keys, source_folder, exclude_sub_folders=None):
# type: (List[str], Path, Optional[Sequence[str]]) -> bool
"""
Add a local folder into the cache, copy all sub-folders inside `source_folder`
excluding folders matching `exclude_sub_folders` list
:param keys: Cache entry keys list (str)
:param source_folder: Folder to copy into the cache
:param exclude_sub_folders: List of sub-folders to exclude from the copy operation
:return: return True if a new entry was added to the cache
"""
if not keys:
return False
keys = [keys] if isinstance(keys, str) else keys
keys = sorted([k.replace('.', '_') for k in keys])
# If entry already exists skip it
cached_entry = self.get_entry(keys)
if cached_entry:
# make sure the entry contains all keys
cached_keys = cached_entry.name.split('.')
if set(keys) - set(cached_keys):
# noinspection PyBroadException
try:
self._lock.acquire(timeout=self._lock_timeout_seconds)
except BaseException as ex:
warning('Could not lock cache folder {}: {}'.format(self._cache_folder, ex))
# failed locking do nothing
return True
keys = sorted(list(set(keys) | set(cached_keys)))
dst = cached_entry.parent / '.'.join(keys)
# rename
try:
shutil.move(src=cached_entry.as_posix(), dst=dst.as_posix())
except BaseException as ex:
warning('Could not rename cache entry {} to {}: {}'.format(
cached_entry.as_posix(), dst.as_posix(), ex))
# release lock
self._lock.release()
return True
# make sure we remove old entries
self._remove_old_entries()
# if we do not have enough free space, do nothing.
if not self._check_min_free_space():
warning('Could not add cache entry, not enough free space on drive, '
'free space threshold {} GB. Clearing all cache entries!'.format(self._min_free_space_gb))
self._remove_old_entries(max_cache_entries=0)
return False
# create the new entry for us
exclude_sub_folders = exclude_sub_folders or []
source_folder = Path(source_folder).absolute()
# create temp folder
temp_folder = \
self._temp_entry_prefix + \
'{}.{}'.format(str(time()).replace('.', '_'), str(random()).replace('.', '_'))
temp_folder = self._cache_folder / temp_folder
temp_folder.mkdir(parents=True, exist_ok=False)
for f in source_folder.glob('*'):
if f.name in exclude_sub_folders:
continue
if f.is_dir():
shutil.copytree(
src=f.as_posix(), dst=(temp_folder / f.name).as_posix(),
symlinks=True, ignore_dangling_symlinks=True)
else:
shutil.copy(
src=f.as_posix(), dst=(temp_folder / f.name).as_posix(),
follow_symlinks=False)
# rename the target folder
target_cache_folder = self._cache_folder / '.'.join(keys)
# if we failed moving it means someone else created the cached entry before us, we can just leave
# noinspection PyBroadException
try:
shutil.move(src=temp_folder.as_posix(), dst=target_cache_folder.as_posix())
except BaseException:
# noinspection PyBroadException
try:
shutil.rmtree(path=temp_folder.as_posix())
except BaseException:
return False
return True
def get_last_copied_entry(self):
# type: () -> Optional[Path]
"""
:return: the last copied cached entry folder inside the cache
"""
return self._last_copied_entry_folder
def _remove_old_entries(self, max_cache_entries=None):
# type: (Optional[int]) -> ()
"""
Notice we only keep self._max_cache_entries-1, assuming we will be adding a new entry soon
:param int max_cache_entries: if not None use instead of self._max_cache_entries
"""
folder_entries = [(cache_folder, cache_folder.stat().st_mtime)
for cache_folder in self._cache_folder.glob('*')
if cache_folder.is_dir() and not cache_folder.name.startswith(self._temp_entry_prefix)]
folder_entries = sorted(folder_entries, key=lambda x: x[1], reverse=True)
# lock so we make sure no one deletes it before we copy it
# noinspection PyBroadException
try:
self._lock.acquire(timeout=self._lock_timeout_seconds)
except BaseException as ex:
warning('Could not lock cache folder {}: {}'.format(self._cache_folder, ex))
return
number_of_entries_to_keep = self._max_cache_entries - 1 \
if max_cache_entries is None else max(0, int(max_cache_entries))
for folder, ts in folder_entries[number_of_entries_to_keep:]:
try:
shutil.rmtree(folder.as_posix(), ignore_errors=True)
except BaseException as ex:
warning('Could not delete cache entry {}: {}'.format(folder.as_posix(), ex))
self._lock.release()
def _check_min_free_space(self):
# type: () -> bool
"""
:return: return False if we hit the free space limit.
If no free space limit was provided, always return True
"""
if not self._min_free_space_gb or not self._cache_folder:
return True
free_space = float(psutil.disk_usage(self._cache_folder.as_posix()).free)
free_space /= 2**30
return free_space > self._min_free_space_gb
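A usage sketch for the new `FolderCache` (paths and keys are illustrative; note that keys may not contain dots, which are replaced with underscores internally):

```python
import os

cache = FolderCache(os.path.expanduser("~/.clearml/venvs-cache"),
                    max_cache_entries=10, min_free_space_gb=2.0)

build_dir = os.path.expanduser("~/.clearml/venvs-builds/task_env")

# store a freshly built virtual environment under one or more keys
cache.add_entry(keys=["py3_8_torch1_8"], source_folder=build_dir)

# later (possibly another agent on the same machine): restore it if present
restored = cache.copy_cached_entry(["py3_8_torch1_8"], destination=build_dir)
if restored is None:
    print("cache miss - building the environment from scratch")
```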

View File

@@ -0,0 +1,211 @@
import os
import time
import tempfile
import contextlib
from .portalocker import constants, exceptions, lock, unlock
current_time = getattr(time, "monotonic", time.time)
DEFAULT_TIMEOUT = 10 ** 8
DEFAULT_CHECK_INTERVAL = 0.25
LOCK_METHOD = constants.LOCK_EX | constants.LOCK_NB
__all__ = [
'FileLock',
'open_atomic',
]
@contextlib.contextmanager
def open_atomic(filename, binary=True):
"""Open a file for atomic writing. Instead of locking this method allows
you to write the entire file and move it to the actual location. Note that
this makes the assumption that a rename is atomic on your platform which
is generally the case but not a guarantee.
http://docs.python.org/library/os.html#os.rename
>>> filename = 'test_file.txt'
>>> if os.path.exists(filename):
... os.remove(filename)
>>> with open_atomic(filename) as fh:
... written = fh.write(b'test')
>>> assert os.path.exists(filename)
>>> os.remove(filename)
"""
assert not os.path.exists(filename), '%r exists' % filename
path, name = os.path.split(filename)
# Create the parent directory if it doesn't exist
if path and not os.path.isdir(path): # pragma: no cover
os.makedirs(path)
temp_fh = tempfile.NamedTemporaryFile(
mode=binary and 'wb' or 'w',
dir=path,
delete=False,
)
yield temp_fh
temp_fh.flush()
os.fsync(temp_fh.fileno())
temp_fh.close()
try:
os.rename(temp_fh.name, filename)
finally:
try:
os.remove(temp_fh.name)
except Exception: # noqa
pass
class FileLock(object):
def __init__(
self, filename, mode='a', timeout=DEFAULT_TIMEOUT,
check_interval=DEFAULT_CHECK_INTERVAL, fail_when_locked=False,
flags=LOCK_METHOD, **file_open_kwargs):
"""Lock manager with build-in timeout
filename -- filename
mode -- the open mode, 'a' or 'ab' should be used for writing
truncate -- use truncate to emulate 'w' mode, None is disabled, 0 is
truncate to 0 bytes
timeout -- timeout when trying to acquire a lock
check_interval -- check interval while waiting
fail_when_locked -- after the initial lock failed, return an error
or lock the file
**file_open_kwargs -- The kwargs for the `open(...)` call
fail_when_locked is useful when multiple threads/processes can race
when creating a file. If set to true then the system will wait till
the lock was acquired and then return an AlreadyLocked exception.
Note that the file is opened first and locked later. So using 'w' as
mode will result in truncate _BEFORE_ the lock is checked.
"""
if 'w' in mode:
truncate = True
mode = mode.replace('w', 'a')
else:
truncate = False
self.fh = None
self.filename = filename
self.mode = mode
self.truncate = truncate
self.timeout = timeout
self.check_interval = check_interval
self.fail_when_locked = fail_when_locked
self.flags = flags
self.file_open_kwargs = file_open_kwargs
def acquire(
self, timeout=None, check_interval=None, fail_when_locked=None):
"""Acquire the locked filehandle"""
if timeout is None:
timeout = self.timeout
if timeout is None:
timeout = 0
if check_interval is None:
check_interval = self.check_interval
if fail_when_locked is None:
fail_when_locked = self.fail_when_locked
# If we already have a filehandle, return it
fh = self.fh
if fh:
return fh
# Get a new filehandler
fh = self._get_fh()
try:
# Try to lock
fh = self._get_lock(fh)
except exceptions.LockException as exception:
# Try till the timeout has passed
timeoutend = current_time() + timeout
while timeoutend > current_time():
# Wait a bit
time.sleep(check_interval)
# Try again
try:
# We already tried to get the lock
# If fail_when_locked is true, then stop trying
if fail_when_locked:
raise exceptions.AlreadyLocked(exception)
else: # pragma: no cover
# We've got the lock
fh = self._get_lock(fh)
break
except exceptions.LockException:
pass
else:
# We got a timeout... reraising
raise exceptions.LockException(exception)
# Prepare the filehandle (truncate if needed)
fh = self._prepare_fh(fh)
self.fh = fh
return fh
def release(self):
"""Releases the currently locked file handle"""
if self.fh:
# noinspection PyBroadException
try:
unlock(self.fh)
except Exception:
pass
# noinspection PyBroadException
try:
self.fh.close()
except Exception:
pass
self.fh = None
def _get_fh(self):
"""Get a new filehandle"""
return open(self.filename, self.mode, **self.file_open_kwargs)
def _get_lock(self, fh):
"""
Try to lock the given filehandle
raises LockException if it fails"""
lock(fh, self.flags)
return fh
def _prepare_fh(self, fh):
"""
Prepare the filehandle for usage
If truncate is set, the file will be truncated to 0 bytes
"""
if self.truncate:
fh.seek(0)
fh.truncate(0)
return fh
def __enter__(self):
return self.acquire()
def __exit__(self, type_, value, tb):
self.release()
def __delete__(self, instance): # pragma: no cover
instance.release()
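
A minimal usage sketch of the FileLock class above; the import path matches the `from clearml_agent.helper.os.locks import FileLock` statement that appears later in this diff, and the file name is illustrative:

from clearml_agent.helper.os.locks import FileLock

# __enter__ calls acquire() and returns the locked file handle,
# __exit__ calls release(), which unlocks and closes it
lock = FileLock('shared_state.txt', mode='a', timeout=30, check_interval=0.5)
with lock as fh:
    fh.write('one writer at a time\n')
    fh.flush()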

View File

@@ -0,0 +1,193 @@
import os
import sys
class exceptions:
class BaseLockException(Exception):
# Error codes:
LOCK_FAILED = 1
def __init__(self, *args, **kwargs):
self.fh = kwargs.pop('fh', None)
Exception.__init__(self, *args, **kwargs)
class LockException(BaseLockException):
pass
class AlreadyLocked(BaseLockException):
pass
class FileToLarge(BaseLockException):
pass
class constants:
# The actual tests will execute the code anyhow so the following code can
# safely be ignored from the coverage tests
if os.name == 'nt': # pragma: no cover
import msvcrt
LOCK_EX = 0x1 #: exclusive lock
LOCK_SH = 0x2 #: shared lock
LOCK_NB = 0x4 #: non-blocking
LOCK_UN = msvcrt.LK_UNLCK #: unlock
LOCKFILE_FAIL_IMMEDIATELY = 1
LOCKFILE_EXCLUSIVE_LOCK = 2
elif os.name == 'posix': # pragma: no cover
import fcntl
LOCK_EX = fcntl.LOCK_EX #: exclusive lock
LOCK_SH = fcntl.LOCK_SH #: shared lock
LOCK_NB = fcntl.LOCK_NB #: non-blocking
LOCK_UN = fcntl.LOCK_UN #: unlock
else: # pragma: no cover
raise RuntimeError('PortaLocker only defined for nt and posix platforms')
if os.name == 'nt': # pragma: no cover
import msvcrt
if sys.version_info.major == 2:
lock_length = -1
else:
lock_length = int(2**31 - 1)
def lock(file_, flags):
if flags & constants.LOCK_SH:
import win32file
import pywintypes
import winerror
__overlapped = pywintypes.OVERLAPPED()
if sys.version_info.major == 2:
if flags & constants.LOCK_NB:
mode = constants.LOCKFILE_FAIL_IMMEDIATELY
else:
mode = 0
else:
if flags & constants.LOCK_NB:
mode = msvcrt.LK_NBRLCK
else:
mode = msvcrt.LK_RLCK
# is there any reason not to reuse the following structure?
hfile = win32file._get_osfhandle(file_.fileno())
try:
win32file.LockFileEx(hfile, mode, 0, -0x10000, __overlapped)
except pywintypes.error as exc_value:
# error: (33, 'LockFileEx', 'The process cannot access the file
# because another process has locked a portion of the file.')
if exc_value.winerror == winerror.ERROR_LOCK_VIOLATION:
raise exceptions.LockException(
exceptions.LockException.LOCK_FAILED,
exc_value.strerror,
fh=file_)
else:
# Q: Are there exceptions/codes we should be dealing with
# here?
raise
else:
mode = constants.LOCKFILE_EXCLUSIVE_LOCK
if flags & constants.LOCK_NB:
mode |= constants.LOCKFILE_FAIL_IMMEDIATELY
if flags & constants.LOCK_NB:
mode = msvcrt.LK_NBLCK
else:
mode = msvcrt.LK_LOCK
# windows locks byte ranges, so make sure to lock from file start
try:
savepos = file_.tell()
if savepos:
# [ ] test exclusive lock fails on seek here
# [ ] test if shared lock passes this point
file_.seek(0)
# [x] check if 0 param locks entire file (not documented in
# Python)
# [x] fails with "IOError: [Errno 13] Permission denied",
# but -1 seems to do the trick
try:
msvcrt.locking(file_.fileno(), mode, lock_length)
except IOError as exc_value:
# [ ] be more specific here
raise exceptions.LockException(
exceptions.LockException.LOCK_FAILED,
exc_value.strerror,
fh=file_)
finally:
if savepos:
file_.seek(savepos)
except IOError as exc_value:
raise exceptions.LockException(
exceptions.LockException.LOCK_FAILED, exc_value.strerror,
fh=file_)
def unlock(file_):
try:
savepos = file_.tell()
if savepos:
file_.seek(0)
try:
msvcrt.locking(file_.fileno(), constants.LOCK_UN, lock_length)
except IOError as exc_value:
if exc_value.strerror == 'Permission denied':
import pywintypes
import win32file
import winerror
__overlapped = pywintypes.OVERLAPPED()
hfile = win32file._get_osfhandle(file_.fileno())
try:
win32file.UnlockFileEx(
hfile, 0, -0x10000, __overlapped)
except pywintypes.error as exc_value:
if exc_value.winerror == winerror.ERROR_NOT_LOCKED:
# error: (158, 'UnlockFileEx',
# 'The segment is already unlocked.')
# To match the 'posix' implementation, silently
# ignore this error
pass
else:
# Q: Are there exceptions/codes we should be
# dealing with here?
raise
else:
raise exceptions.LockException(
exceptions.LockException.LOCK_FAILED,
exc_value.strerror,
fh=file_)
finally:
if savepos:
file_.seek(savepos)
except IOError as exc_value:
raise exceptions.LockException(
exceptions.LockException.LOCK_FAILED, exc_value.strerror,
fh=file_)
elif os.name == 'posix': # pragma: no cover
import fcntl
def lock(file_, flags):
locking_exceptions = IOError,
try: # pragma: no cover
locking_exceptions += BlockingIOError,
except NameError: # pragma: no cover
pass
try:
fcntl.flock(file_.fileno(), flags)
except locking_exceptions as exc_value:
# The exception code varies on different systems so we'll catch
# every IO error
raise exceptions.LockException(exc_value, fh=file_)
def unlock(file_):
fcntl.flock(file_.fileno(), constants.LOCK_UN)
else: # pragma: no cover
raise RuntimeError('PortaLocker only defined for nt and posix platforms')
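
A hedged sketch of using the low-level lock()/unlock() helpers directly; the module path is assumed from the relative `from .portalocker import ...` in the locks module, and the file name is illustrative:

from clearml_agent.helper.os.portalocker import constants, exceptions, lock, unlock

with open('state.txt', 'a') as fh:
    try:
        # non-blocking exclusive lock, the same combination as LOCK_METHOD above
        lock(fh, constants.LOCK_EX | constants.LOCK_NB)
    except exceptions.LockException:
        print('file is currently locked by another process')
    else:
        try:
            fh.write('exclusive write\n')
        finally:
            unlock(fh)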

View File

@@ -1,11 +1,16 @@
from __future__ import unicode_literals
import abc
from collections import OrderedDict
from contextlib import contextmanager
from typing import Text, Iterable, Union
from typing import Text, Iterable, Union, Optional, Dict, List
from pathlib2 import Path
from hashlib import md5
import six
from clearml_agent.helper.base import mkstemp, safe_remove_file, join_lines, select_for_platform
from clearml_agent.helper.console import ensure_binary
from clearml_agent.helper.os.folder_cache import FolderCache
from clearml_agent.helper.process import Executable, Argv, PathLike
@@ -18,6 +23,12 @@ class PackageManager(object):
_selected_manager = None
_cwd = None
_pip_version = None
_config_cache_folder = 'agent.venvs_cache.path'
_config_cache_max_entries = 'agent.venvs_cache.max_entries'
_config_cache_free_space_threshold = 'agent.venvs_cache.free_space_threshold_gb'
def __init__(self):
self._cache_manager = None
@abc.abstractproperty
def bin(self):
@@ -67,7 +78,7 @@ class PackageManager(object):
def upgrade_pip(self):
result = self._install(
select_for_platform(windows='"pip{}"', linux='pip{}').format(self.get_pip_version()), "--upgrade")
select_for_platform(windows='pip{}', linux='pip{}').format(self.get_pip_version()), "--upgrade")
packages = self.run_with_env(('list',), output=True).splitlines()
# p.split is ('pip', 'x.y.z')
pip = [p.split() for p in packages if len(p.split()) == 2 and p.split()[0] == 'pip']
@@ -153,3 +164,100 @@ class PackageManager(object):
@classmethod
def get_pip_version(cls):
return cls._pip_version or ''
def get_cached_venv(self, requirements, docker_cmd, python_version, cuda_version, destination_folder):
# type: (Dict, Optional[Union[dict, str]], Optional[str], Optional[str], Path) -> Optional[Path]
"""
Copy a cached copy of the venv (based on the requirements) into destination_folder.
Return None if failed or cached entry does not exist
"""
if not self._get_cache_manager():
return None
keys = self._generate_reqs_hash_keys(requirements, docker_cmd, python_version, cuda_version)
return self._get_cache_manager().copy_cached_entry(keys, destination_folder)
def add_cached_venv(
self,
requirements, # type: Union[Dict, List[Dict]]
docker_cmd, # type: Optional[Union[dict, str]]
python_version, # type: Optional[str]
cuda_version, # type: Optional[str]
source_folder, # type: Path
exclude_sub_folders=None # type: Optional[List[str]]
):
# type: (...) -> ()
"""
Copy the local venv folder into the venv cache (keys are based on the requirements+python+docker).
"""
if not self._get_cache_manager():
return
keys = self._generate_reqs_hash_keys(requirements, docker_cmd, python_version, cuda_version)
return self._get_cache_manager().add_entry(
keys=keys, source_folder=source_folder, exclude_sub_folders=exclude_sub_folders)
def get_cache_folder(self):
# type: () -> Optional[Path]
if not self._get_cache_manager():
return
return self._get_cache_manager().get_cache_folder()
def get_last_used_entry_cache(self):
# type: () -> Optional[Path]
"""
:return: the last used cached folder entry
"""
if not self._get_cache_manager():
return
return self._get_cache_manager().get_last_copied_entry()
@classmethod
def _generate_reqs_hash_keys(cls, requirements_list, docker_cmd, python_version, cuda_version):
# type: (Union[Dict, List[Dict]], Optional[Union[dict, str]], Optional[str], Optional[str]) -> List[str]
requirements_list = requirements_list or dict()
if not isinstance(requirements_list, (list, tuple)):
requirements_list = [requirements_list]
docker_cmd = dict(docker_cmd=docker_cmd) if isinstance(docker_cmd, str) else docker_cmd or dict()
docker_cmd = OrderedDict(sorted(docker_cmd.items(), key=lambda t: t[0]))
if 'docker_cmd' in docker_cmd:
# we only take the first part of the docker_cmd which is the docker image name
docker_cmd['docker_cmd'] = docker_cmd['docker_cmd'].strip('\r\n\t ').split(' ')[0]
keys = []
strip_chars = '\n\r\t '
for requirements in requirements_list:
pip, conda = ('pip', 'conda')
pip_reqs = requirements.get(pip, '')
conda_reqs = requirements.get(conda, '')
if isinstance(pip_reqs, str):
pip_reqs = pip_reqs.split('\n')
if isinstance(conda_reqs, str):
conda_reqs = conda_reqs.split('\n')
pip_reqs = sorted([p.strip(strip_chars) for p in pip_reqs
if p.strip(strip_chars) and not p.strip(strip_chars).startswith('#')])
conda_reqs = sorted([p.strip(strip_chars) for p in conda_reqs
if p.strip(strip_chars) and not p.strip(strip_chars).startswith('#')])
if not pip_reqs and not conda_reqs:
continue
hash_text = '{class_type}\n{docker_cmd}\n{cuda_ver}\n{python_version}\n{pip_reqs}\n{conda_reqs}'.format(
class_type=str(cls),
docker_cmd=str(docker_cmd or ''),
cuda_ver=str(cuda_version or ''),
python_version=str(python_version or ''),
pip_reqs=str(pip_reqs or ''),
conda_reqs=str(conda_reqs or ''),
)
keys.append(md5(ensure_binary(hash_text)).hexdigest())
return sorted(list(set(keys)))
def _get_cache_manager(self):
if not self._cache_manager:
cache_folder = self.session.config.get(self._config_cache_folder, None)
if not cache_folder:
return None
max_entries = int(self.session.config.get(self._config_cache_max_entries, 10))
free_space_threshold = float(self.session.config.get(self._config_cache_free_space_threshold, 0))
self._cache_manager = FolderCache(
cache_folder, max_cache_entries=max_entries, min_free_space_gb=free_space_threshold)
return self._cache_manager
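
A simplified, illustrative sketch of how a venv cache key is derived from the same inputs used by _generate_reqs_hash_keys above; it omits the conda requirements and the docker_cmd normalization, and the class_type placeholder is illustrative, so the resulting keys will not match the agent's exactly:

from hashlib import md5

def venv_cache_key(pip_requirements, docker_image, python_version, cuda_version):
    strip_chars = '\n\r\t '
    # keep only real requirement lines, sorted so ordering does not change the key
    pip_reqs = sorted(p.strip(strip_chars) for p in pip_requirements
                      if p.strip(strip_chars) and not p.strip(strip_chars).startswith('#'))
    hash_text = '{class_type}\n{docker_cmd}\n{cuda_ver}\n{python_version}\n{pip_reqs}\n{conda_reqs}'.format(
        class_type='VirtualenvPip', docker_cmd=docker_image or '', cuda_ver=cuda_version or '',
        python_version=python_version or '', pip_reqs=pip_reqs or '', conda_reqs='')
    return md5(hash_text.encode('utf-8')).hexdigest()

print(venv_cache_key(['numpy==1.19.5', 'torch==1.8.1'],
                     'nvidia/cuda:10.1-cudnn7-runtime-ubuntu18.04', '3.8', '10.1'))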

View File

@@ -69,6 +69,7 @@ class CondaAPI(PackageManager):
:param python: base python version to use (e.g python3.6)
:param path: path of env
"""
super(CondaAPI, self).__init__()
self.session = session
self.python = python
self.source = None
@@ -132,7 +133,7 @@ class CondaAPI(PackageManager):
if self.env_read_only:
print('Conda environment in read-only mode, skipping pip upgrade.')
return ''
return self._install(select_for_platform(windows='"pip{}"', linux='pip{}').format(self.pip.get_pip_version()))
return self._install(select_for_platform(windows='pip{}', linux='pip{}').format(self.pip.get_pip_version()))
def create(self):
"""
@@ -140,19 +141,7 @@ class CondaAPI(PackageManager):
"""
if self.conda_env_as_base_docker and self.conda_pre_build_env_path:
if Path(self.conda_pre_build_env_path).is_dir():
print("Using pre-existing Conda environment from {}".format(self.conda_pre_build_env_path))
self.path = Path(self.conda_pre_build_env_path)
self.source = ("conda", "activate", self.path.as_posix())
self.pip = CondaPip(
session=self.session,
source=self.source,
python=self.python,
requirements_manager=self.requirements_manager,
path=self.path,
)
conda_env = self._get_conda_sh()
self.source = self.pip.source = CommandSequence(('source', conda_env.as_posix()), self.source)
self.env_read_only = True
self._init_existing_environment(self.conda_pre_build_env_path)
return self
elif Path(self.conda_pre_build_env_path).is_file():
print("Restoring Conda environment from {}".format(self.conda_pre_build_env_path))
@@ -210,6 +199,21 @@ class CondaAPI(PackageManager):
pass
return self
def _init_existing_environment(self, conda_pre_build_env_path):
print("Using pre-existing Conda environment from {}".format(conda_pre_build_env_path))
self.path = Path(conda_pre_build_env_path)
self.source = ("conda", "activate", self.path.as_posix())
self.pip = CondaPip(
session=self.session,
source=self.source,
python=self.python,
requirements_manager=self.requirements_manager,
path=self.path,
)
conda_env = self._get_conda_sh()
self.source = self.pip.source = CommandSequence(('source', conda_env.as_posix()), self.source)
self.env_read_only = True
def remove(self):
"""
Delete a conda environment.
@@ -284,17 +288,11 @@ class CondaAPI(PackageManager):
"""
Try to install packages from conda. Install packages which are not available from conda with pip.
"""
try:
self._install_from_file(path)
return
except PackageNotFoundError as e:
pip_packages = [e.pkg]
except PackagesNotFoundError as e:
pip_packages = package_set(e.packages)
with self.temp_file("conda_reqs", _package_diff(path, pip_packages)) as reqs:
self.install_from_file(reqs)
with self.temp_file("pip_reqs", pip_packages) as reqs:
self.pip.install_from_file(reqs)
requirements = {}
# assume requirements.txt
with open(path, 'rt') as f:
requirements['pip'] = f.read()
self.load_requirements(requirements)
def freeze(self, freeze_full_environment=False):
requirements = self.pip.freeze()
@@ -507,6 +505,8 @@ class CondaAPI(PackageManager):
reqs.append(m)
# if we have a conda list, the rest should be installed with pip;
# this means any experiment that was executed with a pip environment
# will be installed using pip
if requirements.get('conda', None) is not None:
for r in requirements['pip']:
try:
@@ -520,7 +520,7 @@ class CondaAPI(PackageManager):
# skip over local files (we cannot change the version to a local file)
if m.local_file:
continue
m_name = m.name.lower()
m_name = (m.name or '').lower()
if m_name in conda_supported_req_names:
# this package is in the conda list,
# make sure that if we changed version and we match it in conda
@@ -557,7 +557,7 @@ class CondaAPI(PackageManager):
# conform conda packages (version/name)
for r in reqs:
# change _ to - in name but not the prefix _ (as this is conda prefix)
if not r.name.startswith('_') and not requirements.get('conda', None):
if r.name and not r.name.startswith('_') and not requirements.get('conda', None):
r.name = r.name.replace('_', '-')
# remove .post from version numbers, it fails ~= version, and change == to ~=
if r.specs and r.specs[0]:
@@ -573,6 +573,8 @@ class CondaAPI(PackageManager):
conda_env['dependencies'] = [clean_ver(r) for r in reqs]
with self.temp_file("conda_env", yaml.dump(conda_env), suffix=".yml") as name:
print('Conda: Trying to install requirements:\n{}'.format(conda_env['dependencies']))
if self.session.debug_mode:
print('{}:\n{}'.format(name, yaml.dump(conda_env)))
result = self._run_command(
("env", "update", "-p", self.path, "--file", name)
)
@@ -603,6 +605,8 @@ class CondaAPI(PackageManager):
pip_req_str = [r.tostr() for r in pip_requirements if r.name not in ('pip', 'virtualenv', )]
print('Conda: Installing requirements: step 2 - using pip:\n{}'.format(pip_req_str))
PackageManager._selected_manager = self.pip
if self.session.debug_mode:
print('pip requirements.txt:\n{}'.format('\n'.join(pip_req_str)))
self.pip.load_requirements({'pip': '\n'.join(pip_req_str)})
except Exception as e:
print(e)
@@ -646,12 +650,16 @@ class CondaAPI(PackageManager):
ansi_escape = re.compile(r'(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]')
return ansi_escape.sub('', line)
# make sure we are not running it with our own PYTHONPATH
env = dict(**os.environ)
env.pop('PYTHONPATH', None)
command = Argv(*command) # type: Executable
if not raw:
command = (self.conda,) + command + ("--quiet", "--json")
try:
print('Executing Conda: {}'.format(command.serialize()))
result = command.get_output(stdin=DEVNULL, **kwargs)
result = command.get_output(stdin=DEVNULL, env=env, **kwargs)
if self.session.debug_mode:
print(result)
except Exception as e:
@@ -671,6 +679,8 @@ class CondaAPI(PackageManager):
return result
def get_python_command(self, extra=()):
if not self.source:
self._init_existing_environment(self.path)
return CommandSequence(self.source, self.pip.get_python_command(extra=extra))
def _get_conda_sh(self):

View File

@@ -2,6 +2,8 @@ import re
from collections import OrderedDict
from typing import Text
from pathlib2 import Path
from .base import PackageManager
from .requirements import SimpleSubstitution
from ..base import safe_furl as furl
@@ -10,13 +12,27 @@ from ..base import safe_furl as furl
class ExternalRequirements(SimpleSubstitution):
name = "external_link"
cwd = None
def __init__(self, *args, **kwargs):
super(ExternalRequirements, self).__init__(*args, **kwargs)
self.post_install_req = []
self.post_install_req_lookup = OrderedDict()
self.post_install_local_req_lookup = OrderedDict()
def match(self, req):
# match local folder building:
if self.is_local_folder_package(req):
# noinspection PyBroadException
try:
folder_path = req.req.line.strip().split('#')[0].strip()
if self.cwd and not Path(folder_path).is_absolute():
folder_path = (Path(self.cwd) / Path(folder_path)).absolute().as_posix()
self.post_install_local_req_lookup['file://{}'.format(folder_path)] = req.req.line
except Exception:
pass
return True
# match both editable or code or unparsed
if not (not req.name or req.req and (req.req.editable or req.req.vcs)):
return False
@@ -103,4 +119,45 @@ class ExternalRequirements(SimpleSubstitution):
if r not in self.post_install_req_lookup]
list_of_requirements[k] += [self.post_install_req_lookup.get(r, '')
for r in self.post_install_req_lookup.keys() if r in original_requirements]
if self.post_install_local_req_lookup:
original_requirements = list_of_requirements[k]
list_of_requirements[k] = [
r for r in original_requirements
if len(r.split('@', 1)) != 2 or r.split('@', 1)[1].strip() not in self.post_install_local_req_lookup]
list_of_requirements[k] += [
self.post_install_local_req_lookup.get(r.split('@', 1)[1].strip(), '')
for r in original_requirements
if len(r.split('@', 1)) == 2 and r.split('@', 1)[1].strip() in self.post_install_local_req_lookup]
return list_of_requirements
@classmethod
def is_local_folder_package(cls, req):
# noinspection PyBroadException
try:
if not req.name and req.req and not req.req.editable and not req.req.vcs and \
req.req.line and req.req.line.strip().split('#')[0] and \
not req.req.line.strip().split('#')[0].lower().endswith('.whl'):
return True
except Exception:
pass
return False
class OnlyExternalRequirements(ExternalRequirements):
def __init__(self, *args, **kwargs):
super(OnlyExternalRequirements, self).__init__(*args, **kwargs)
def match(self, req):
return not super(OnlyExternalRequirements, self).match(req)
def replace(self, req):
"""
Replace a requirement
:raises: ValueError if version is pre-release
"""
# Do not store the skipped requirements
# mark skip package
return Text('')

View File

@@ -1,3 +1,4 @@
import os
import sys
from itertools import chain
from typing import Text, Optional
@@ -17,6 +18,7 @@ class SystemPip(PackageManager):
"""
Program interface to the system pip.
"""
super(SystemPip, self).__init__()
self._bin = interpreter or sys.executable
self.session = session
@@ -81,7 +83,10 @@ class SystemPip(PackageManager):
:param kwargs: kwargs for get_output/check_output command
"""
command = self._make_command(command)
return (command.get_output if output else command.check_call)(stdin=DEVNULL, **kwargs)
# make sure we are not running it with our own PYTHONPATH
env = dict(**os.environ)
env.pop('PYTHONPATH', None)
return (command.get_output if output else command.check_call)(stdin=DEVNULL, env=env, **kwargs)
def _make_command(self, command):
return Argv(self.bin, '-m', 'pip', '--disable-pip-version-check', *command)
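
The change above strips PYTHONPATH from the environment of the pip subprocess. A standalone sketch of the same pattern (the `pip list` command is illustrative):

import os
import subprocess
import sys

# copy the current environment and drop PYTHONPATH so the agent's own
# source tree does not leak into the pip subprocess
env = dict(**os.environ)
env.pop('PYTHONPATH', None)
subprocess.check_call(
    [sys.executable, '-m', 'pip', '--disable-pip-version-check', 'list'],
    env=env)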

View File

@@ -17,6 +17,7 @@ import six
from clearml_agent.definitions import PIP_EXTRA_INDICES
from clearml_agent.helper.base import warning, is_conda, which, join_lines, is_windows_platform
from clearml_agent.helper.process import Argv, PathLike
from clearml_agent.helper.gpu.gpustat import get_driver_cuda_version
from clearml_agent.session import Session, normalize_cuda_version
from clearml_agent.external.requirements_parser import parse
from clearml_agent.external.requirements_parser.requirement import Requirement
@@ -446,6 +447,7 @@ class RequirementsManager(object):
'cu'+agent['cuda_version'] if self.found_cuda else 'cpu')
self.translator = RequirementsTranslator(session, interpreter=base_interpreter,
cache_dir=pip_cache_dir.as_posix())
self._base_interpreter = base_interpreter
def register(self, cls): # type: (Type[RequirementSubstitution]) -> None
self.handlers.append(cls(self._session))
@@ -529,6 +531,9 @@ class RequirementsManager(object):
pass
return requirements
def get_interpreter(self):
return self._base_interpreter
@staticmethod
def get_cuda_version(config): # type: (ConfigTree) -> (Text, Text)
# we assume os.environ already updated the config['agent.cuda_version'] & config['agent.cudnn_version']
@@ -537,6 +542,9 @@ class RequirementsManager(object):
if cuda_version and cudnn_version:
return normalize_cuda_version(cuda_version), normalize_cuda_version(cudnn_version)
if not cuda_version:
cuda_version = get_driver_cuda_version()
if not cuda_version and is_windows_platform():
try:
cuda_vers = [int(k.replace('CUDA_PATH_V', '').replace('_', '')) for k in os.environ.keys()
@@ -601,4 +609,3 @@ class RequirementsManager(object):
return (normalize_cuda_version(cuda_version or 0),
normalize_cuda_version(cudnn_version or 0))
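
The new fallback asks the installed NVIDIA driver for its CUDA version when the configuration does not provide one. A hedged usage sketch (the helper is assumed to return a falsy value when no driver is present):

from clearml_agent.helper.gpu.gpustat import get_driver_cuda_version
from clearml_agent.session import normalize_cuda_version

driver_cuda = get_driver_cuda_version()
if driver_cuda:
    print('driver reports CUDA', normalize_cuda_version(driver_cuda))
else:
    print('no NVIDIA driver detected, keeping the configured (or empty) CUDA version')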

View File

@@ -7,7 +7,7 @@ import re
import subprocess
import sys
from contextlib import contextmanager
from copy import deepcopy
from copy import copy
from distutils.spawn import find_executable
from itertools import chain, repeat, islice
from os.path import devnull
@@ -42,20 +42,31 @@ def get_bash_output(cmd, strip=False, stderr=subprocess.STDOUT, stdin=False):
return output if not strip or not output else output.strip()
def terminate_process(pid, timeout=10.):
def terminate_process(pid, timeout=10., ignore_zombie=True, include_children=False):
# noinspection PyBroadException
try:
proc = psutil.Process(pid)
children = proc.children(recursive=True) if include_children else []
proc.terminate()
cnt = 0
while proc.is_running() and cnt < timeout:
while proc.is_running() and (ignore_zombie or proc.status() != 'zombie') and cnt < timeout:
sleep(1.)
cnt += 1
proc.terminate()
# terminate children
for c in children:
c.terminate()
cnt = 0
while proc.is_running() and cnt < timeout:
while proc.is_running() and (ignore_zombie or proc.status() != 'zombie') and cnt < timeout:
sleep(1.)
cnt += 1
# kill children
for c in children:
c.kill()
proc.kill()
except Exception:
pass
@@ -66,9 +77,8 @@ def terminate_process(pid, timeout=10.):
return True
def kill_all_child_processes(pid=None):
def kill_all_child_processes(pid=None, include_parent=True):
# get current process if pid not provided
include_parent = True
if not pid:
pid = os.getpid()
include_parent = False
@@ -84,6 +94,23 @@ def kill_all_child_processes(pid=None):
parent.kill()
def terminate_all_child_processes(pid=None, timeout=10., include_parent=True):
# get current process if pid not provided
if not pid:
pid = os.getpid()
include_parent = False
try:
parent = psutil.Process(pid)
except psutil.Error:
# could not find parent process id
return
for child in parent.children(recursive=False):
print('Terminating child process {}'.format(child.pid))
terminate_process(child.pid, timeout=timeout, ignore_zombie=False, include_children=True)
if include_parent:
terminate_process(parent.pid, timeout=timeout, ignore_zombie=False)
def get_docker_id(docker_cmd_contains):
try:
containers_running = get_bash_output(cmd='docker ps --no-trunc --format \"{{.ID}}: {{.Command}}\"')
@@ -103,9 +130,10 @@ def shutdown_docker_process(docker_cmd_contains=None, docker_id=None):
docker_id = get_docker_id(docker_cmd_contains=docker_cmd_contains)
if docker_id:
# we found our docker, stop it
get_bash_output(cmd='docker stop -t 1 {}'.format(docker_id))
return get_bash_output(cmd='docker stop -t 1 {}'.format(docker_id))
except Exception:
pass
return None
def commit_docker(container_name, docker_cmd_contains=None, docker_id=None, apply_change=None):
@@ -193,6 +221,7 @@ class Argv(Executable):
"""
self.argv = argv
self._log = kwargs.pop("log", None)
self._display_argv = kwargs.pop("display_argv", argv)
if not self._log:
self._log = logging.getLogger(__name__)
self._log.propagate = False
@@ -217,10 +246,10 @@ class Argv(Executable):
return self.argv
def __repr__(self):
return "<Argv{}>".format(self.argv)
return "<Argv{}>".format(self._display_argv)
def __str__(self):
return "Executing: {}".format(self.argv)
return "Executing: {}".format(self._display_argv)
def __iter__(self):
if is_windows_platform():
@@ -276,9 +305,9 @@ class CommandSequence(Executable):
self.commands = []
for c in commands:
if isinstance(c, CommandSequence):
self.commands.extend(deepcopy(c.commands))
self.commands.extend([copy(p) for p in c.commands])
elif isinstance(c, Argv):
self.commands.append(deepcopy(c))
self.commands.append(copy(c))
else:
self.commands.append(Argv(*c, log=self._log))
@@ -420,7 +449,7 @@ SOURCE_COMMAND = select_for_platform(linux="source", windows="call")
class ExitStatus(object):
success = 0
failure = 1
interrupted = 2
interrupted = -2
COMMAND_SUCCESS = 0
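
A minimal usage sketch of the new terminate_all_child_processes helper defined above (the import path is taken from elsewhere in this diff):

from clearml_agent.helper.process import terminate_all_child_processes

# with no pid given, the current process is used and include_parent is forced off,
# so only the direct children (and their subtrees) are terminated gracefully
terminate_all_child_processes(timeout=10.)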

View File

@@ -4,7 +4,9 @@ import shutil
import subprocess
from distutils.spawn import find_executable
from hashlib import md5
from os import environ, getenv
from os import environ
from random import random
from threading import Lock
from typing import Text, Sequence, Mapping, Iterable, TypeVar, Callable, Tuple, Optional
import attr
@@ -23,6 +25,7 @@ from clearml_agent.helper.base import (
normalize_path,
create_file_if_not_exists,
)
from clearml_agent.helper.os.locks import FileLock
from clearml_agent.helper.process import DEVNULL, Argv, PathLike, COMMAND_SUCCESS
from clearml_agent.session import Session
@@ -88,7 +91,7 @@ class VCS(object):
# additional environment variables for VCS commands
COMMAND_ENV = {}
PATCH_ADDED_FILE_RE = re.compile(r"^\+\+\+ b/(?P<path>.*)")
PATCH_ADDED_FILE_RE = re.compile(r"^--- a/(?P<path>.*)")
def __init__(self, session, url, location, revision):
# type: (Session, Text, PathLike, Text) -> ()
@@ -115,21 +118,21 @@ class VCS(object):
"""
return self.add_auth(self.session.config, self.url)
@abc.abstractproperty
@abc.abstractmethod
def executable_name(self):
"""
Name of command executable
"""
pass
@abc.abstractproperty
@abc.abstractmethod
def main_branch(self):
"""
Name of default/main branch
"""
pass
@abc.abstractproperty
@abc.abstractmethod
def checkout_flags(self):
# type: () -> Sequence[Text]
"""
@@ -137,7 +140,7 @@ class VCS(object):
"""
pass
@abc.abstractproperty
@abc.abstractmethod
def patch_base(self):
# type: () -> Sequence[Text]
"""
@@ -254,15 +257,15 @@ class VCS(object):
return url
@classmethod
def replace_http_url(cls, url, port=None):
# type: (Text, Optional[int]) -> Text
def replace_http_url(cls, url, port=None, username=None):
# type: (Text, Optional[int], Optional[str]) -> Text
"""
Replace HTTPS URL with SSH URL when applicable
"""
parsed_url = furl(url)
if parsed_url.scheme == "https":
parsed_url.scheme = "ssh"
parsed_url.username = "git"
parsed_url.username = username or "git"
parsed_url.password = None
# make sure there is no port in the final url (safe_furl support)
# the original port was an https port, and we do not know if there is a different ssh port,
@@ -285,7 +288,10 @@ class VCS(object):
return
if parsed_url.scheme == "https":
new_url = self.replace_http_url(
self.url, port=self.session.config.get('agent.force_git_ssh_port', None))
self.url,
port=self.session.config.get('agent.force_git_ssh_port', None),
username=self.session.config.get('agent.force_git_ssh_user', None)
)
if new_url != self.url:
print("Using SSH credentials - replacing https url '{}' with ssh url '{}'".format(
self.url, new_url))
@@ -464,7 +470,7 @@ class VCS(object):
parsed_url.set(username=config_user, password=config_pass)
return parsed_url.url
@abc.abstractproperty
@abc.abstractmethod
def info_commands(self):
# type: () -> Mapping[Text, Argv]
"""
@@ -524,7 +530,7 @@ class Git(VCS):
self.call("checkout", self.revision, *self.checkout_flags, cwd=self.location)
try:
self.call("submodule", "update", "--recursive", cwd=self.location)
except:
except: # noqa
pass
info_commands = dict(
@@ -582,7 +588,10 @@ def clone_repository_cached(session, execution, destination):
:return: repository information
:raises: CommandFailedError if git/hg is not installed
"""
repo_url = execution.repository # type: str
# mock lock
repo_lock = Lock()
repo_lock_timeout_sec = 300
repo_url = execution.repository or '' # type: str
parsed_url = furl(repo_url)
no_password_url = parsed_url.copy().remove(password=True).url
@@ -593,37 +602,48 @@ def clone_repository_cached(session, execution, destination):
if standalone_mode:
cached_repo_path = clone_folder
else:
cached_repo_path = (
Path(session.config["agent.vcs_cache.path"]).expanduser()
/ "{}.{}".format(clone_folder_name, md5(ensure_binary(repo_url)).hexdigest())
/ clone_folder_name
) # type: Path
vcs_cache_path = Path(session.config["agent.vcs_cache.path"]).expanduser()
repo_hash = md5(ensure_binary(repo_url)).hexdigest()
# create lock
repo_lock = FileLock(filename=(vcs_cache_path / '{}.lock'.format(repo_hash)).as_posix())
# noinspection PyBroadException
try:
repo_lock.acquire(timeout=repo_lock_timeout_sec)
except BaseException:
print('Could not lock cache folder "{}" (timeout {} sec), using temp vcs cache.'.format(
clone_folder_name, repo_lock_timeout_sec))
repo_hash = '{}_{}'.format(repo_hash, str(random()).replace('.', ''))
# use mock lock for the context
repo_lock = Lock()
# select vcs cache folder
cached_repo_path = vcs_cache_path / "{}.{}".format(clone_folder_name, repo_hash) / clone_folder_name
vcs = VcsFactory.create(
session, execution_info=execution, location=cached_repo_path
)
if not find_executable(vcs.executable_name):
raise CommandFailedError(vcs.executable_not_found_error_help())
with repo_lock:
vcs = VcsFactory.create(
session, execution_info=execution, location=cached_repo_path
)
if not find_executable(vcs.executable_name):
raise CommandFailedError(vcs.executable_not_found_error_help())
if not standalone_mode:
if session.config["agent.vcs_cache.enabled"] and cached_repo_path.exists():
print('Using cached repository in "{}"'.format(cached_repo_path))
if not standalone_mode:
if session.config["agent.vcs_cache.enabled"] and cached_repo_path.exists():
print('Using cached repository in "{}"'.format(cached_repo_path))
else:
print("cloning: {}".format(no_password_url))
rm_tree(cached_repo_path)
# We clone the entire repository, not a specific branch
vcs.clone() # branch=execution.branch)
else:
print("cloning: {}".format(no_password_url))
rm_tree(cached_repo_path)
# We clone the entire repository, not a specific branch
vcs.clone() # branch=execution.branch)
vcs.pull()
rm_tree(destination)
shutil.copytree(Text(cached_repo_path), Text(clone_folder))
if not clone_folder.is_dir():
raise CommandFailedError(
"copying of repository failed: from {} to {}".format(
cached_repo_path, clone_folder
vcs.pull()
rm_tree(destination)
shutil.copytree(Text(cached_repo_path), Text(clone_folder))
if not clone_folder.is_dir():
raise CommandFailedError(
"copying of repository failed: from {} to {}".format(
cached_repo_path, clone_folder
)
)
)
# checkout in the newly copy destination
vcs.location = Text(clone_folder)
@@ -635,3 +655,70 @@ def clone_repository_cached(session, execution, destination):
repo_info = attr.evolve(repo_info, url=no_password_url)
return vcs, repo_info
def fix_package_import_diff_patch(entry_script_file):
# noinspection PyBroadException
try:
with open(entry_script_file, 'rt') as f:
lines = f.readlines()
except Exception:
return
# make sure we are the first import (i.e. we patched the source code)
if not lines or not lines[0].strip().startswith('from clearml ') or 'Task.init' not in lines[1]:
return
original_lines = lines
# skip over the first two lines, they are ours
# then skip over empty or comment lines
lines = [(i, line.split('#', 1)[0].rstrip()) for i, line in enumerate(lines)
if i >= 2 and line.strip('\r\n\t ') and not line.strip().startswith('#')]
# remove triple quotes ' """ '
nested_c = -1
skip_lines = []
for i, line_pair in enumerate(lines):
for _ in line_pair[1].split('"""')[1:]:
if nested_c >= 0:
skip_lines.extend(list(range(nested_c, i+1)))
nested_c = -1
else:
nested_c = i
# now select all the lines that are not inside a triple-quoted block
lines = [pair for i, pair in enumerate(lines) if i not in skip_lines]
from_future = re.compile(r"^from[\s]*__future__[\s]*")
import_future = re.compile(r"^import[\s]*__future__[\s]*")
# test if we have __future__ import
found_index = -1
for a_i, (_, a_line) in enumerate(lines):
if found_index >= a_i:
continue
if from_future.match(a_line) or import_future.match(a_line):
found_index = a_i
# check the last import block
i, line = lines[found_index]
# either the line ends with a \\ continuation character or the import is parenthesized across lines
parenthesized_lines = '(' in line and ')' not in line
while line.endswith('\\') or parenthesized_lines:
found_index += 1
i, line = lines[found_index]
if ')' in line:
break
else:
break
# no imports found
if found_index < 0:
return
# now we need to move back the patched two lines
entry_line = lines[found_index][0]
new_lines = original_lines[2:entry_line + 1] + original_lines[0:2] + original_lines[entry_line + 1:]
# noinspection PyBroadException
try:
with open(entry_script_file, 'wt') as f:
f.writelines(new_lines)
except Exception:
return
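
To illustrate what fix_package_import_diff_patch does, here is a hedged, self-contained sketch: the module path is assumed to be clearml_agent.helper.repo, and the script contents are illustrative of an agent-patched entry point whose two injected lines land above a `from __future__` import (which Python requires to appear first):

from tempfile import NamedTemporaryFile
from clearml_agent.helper.repo import fix_package_import_diff_patch  # module path assumed

script = (
    "from clearml import Task\n"
    "task = Task.init(project_name='examples', task_name='demo')\n"
    "from __future__ import print_function\n"
    "print('hello')\n"
)
with NamedTemporaryFile('wt', suffix='.py', delete=False) as f:
    f.write(script)

fix_package_import_diff_patch(f.name)
# the __future__ import is now first, followed by the two injected clearml lines
with open(f.name, 'rt') as fh:
    print(fh.read())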

View File

@@ -129,8 +129,9 @@ def get_uptime_string(entry):
def get_runtime_properties_string(runtime_properties):
# type: (List[dict]) -> Tuple[Optional[str], str]
# type: (Optional[List[dict]]) -> Tuple[Optional[str], str]
server_string = []
runtime_properties = runtime_properties or []
force_flag = next(
(prop for prop in runtime_properties if prop["key"] == UptimeConf.worker_key),
None,

View File

@@ -7,7 +7,7 @@ from tempfile import gettempdir, NamedTemporaryFile
from typing import List, Tuple, Optional
from clearml_agent.definitions import ENV_DOCKER_HOST_MOUNT
from clearml_agent.helper.base import warning
from clearml_agent.helper.base import warning, is_windows_platform, safe_remove_file
class Singleton(object):
@@ -22,6 +22,13 @@ class Singleton(object):
_lock_timeout = 10
_pid = None
@classmethod
def close_pid_file(cls):
if cls._pid_file:
cls._pid_file.close()
safe_remove_file(cls._pid_file.name)
cls._pid_file = None
@classmethod
def update_pid_file(cls):
new_pid = str(os.getpid())
@@ -115,7 +122,7 @@ class Singleton(object):
@classmethod
def _register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None, allow_double=False):
if cls.worker_id:
if cls.worker_id and cls.instance_slot is not None:
return cls.worker_id, cls.instance_slot
# make sure we have a unique name
instance_num = 0
@@ -167,7 +174,9 @@ class Singleton(object):
# create lock
cls._pid = str(os.getpid())
cls._pid_file = NamedTemporaryFile(
dir=cls._get_temp_folder(), prefix=cls.prefix + cls.sep + cls._pid + cls.sep, suffix=cls.ext)
dir=cls._get_temp_folder(), prefix=cls.prefix + cls.sep + cls._pid + cls.sep, suffix=cls.ext,
delete=False if is_windows_platform() else True
)
cls._pid_file.write(('{}\n{}'.format(unique_worker_id, cls.instance_slot)).encode())
cls._pid_file.flush()
cls.worker_id = unique_worker_id

View File

@@ -50,7 +50,7 @@ DAEMON_ARGS = dict({
},
'--docker': {
'help': 'Run execution task inside a docker (v19.03 and above). Optional args <image> <arguments> or '
'specify default docker image in agent.default_docker.image / agent.default_docker.arguments'
'specify default docker image in agent.default_docker.image / agent.default_docker.arguments '
'use --gpus/--cpu-only (or set NVIDIA_VISIBLE_DEVICES) to limit gpu visibility for docker',
'nargs': '*',
'default': False,
@@ -78,7 +78,10 @@ DAEMON_ARGS = dict({
},
'--services-mode': {
'help': 'Launch multiple long-term docker services. Implies docker & cpu-only flags.',
'action': 'store_true',
'nargs': '?',
'const': -1,
'type': int,
'default': None,
},
'--create-queue': {
'help': 'Create requested queue if it does not exist already.',
@@ -93,6 +96,13 @@ DAEMON_ARGS = dict({
'help': 'Stop the running agent (based on the same set of arguments)',
'action': 'store_true',
},
'--dynamic-gpus': {
'help': 'Allow dynamic gpu allocation based on queue properties, '
'configure with \'--queue <queue_name>=<num_gpus>\'.'
' Example: \'--dynamic-gpus --gpus 0-3 --queue dual_gpus=2 single_gpu=1\''
' Example Opportunistic: \'--dynamic-gpus --gpus 0-3 --queue dual_gpus=2 max_quad_gpus=1-4 \'',
'action': 'store_true',
},
'--uptime': {
'help': 'Specify uptime for clearml-agent in "<hours> <days>" format. for example, use "17-20 TUE" to set '
'Tuesday\'s uptime to 17-20'
@@ -101,7 +111,7 @@ DAEMON_ARGS = dict({
'default': None,
},
'--downtime': {
'help': 'Specify uptime for clearml-agent in "<hours> <days>" format. for example, use "09-13 TUE" to set '
'help': 'Specify downtime for clearml-agent in "<hours> <days>" format. for example, use "09-13 TUE" to set '
'Tuesday\'s downtime to 09-13'
'Note: Make sure to have only one of the uptime/downtime configurations, not both.',
'nargs': '*',

View File

@@ -204,7 +204,7 @@ class Session(_Session):
folder_keys = ('agent.venvs_dir', 'agent.vcs_cache.path',
'agent.pip_download_cache.path',
'agent.docker_pip_cache', 'agent.docker_apt_cache')
singleton_folders = ('agent.venvs_dir', 'agent.vcs_cache.path', 'agent.docker_apt_cache')
singleton_folders = ('agent.venvs_dir', 'agent.docker_apt_cache')
if ENV_TASK_EXECUTE_AS_USER.get():
folder_keys = tuple(list(folder_keys) + ['sdk.storage.cache.default_base_dir'])

View File

@@ -1 +1 @@
__version__ = '0.17.0'
__version__ = '1.0.1rc0'

View File

@@ -10,9 +10,9 @@ RUN apt-get dist-upgrade -y
RUN apt-get install -y curl python3-pip git
RUN curl -sSL https://get.docker.com/ | sh
RUN python3 -m pip install -U pip
RUN python3 -m pip install trains-agent
RUN python3 -m pip install clearml-agent
RUN python3 -m pip install -U "cryptography>=2.9"
ENV TRAINS_DOCKER_SKIP_GPUS_FLAG=1
ENV CLEARML_DOCKER_SKIP_GPUS_FLAG=1
ENTRYPOINT ["/usr/agent/entrypoint.sh"]

View File

@@ -1,7 +1,7 @@
#!/bin/sh
LOWER_PIP_UPDATE_VERSION="$(echo "$PIP_UPDATE_VERSION" | tr '[:upper:]' '[:lower:]')"
LOWER_TRAINS_AGENT_UPDATE_VERSION="$(echo "$TRAINS_AGENT_UPDATE_VERSION" | tr '[:upper:]' '[:lower:]')"
LOWER_CLEARML_AGENT_UPDATE_VERSION="$(echo "${CLEARML_AGENT_UPDATE_VERSION:-$TRAINS_AGENT_UPDATE_VERSION}" | tr '[:upper:]' '[:lower:]')"
if [ "$LOWER_PIP_UPDATE_VERSION" = "yes" ] || [ "$LOWER_PIP_UPDATE_VERSION" = "true" ] ; then
python3 -m pip install -U pip
@@ -9,11 +9,11 @@ elif [ ! -z "$LOWER_PIP_UPDATE_VERSION" ] ; then
python3 -m pip install pip$LOWER_PIP_UPDATE_VERSION ;
fi
echo "TRAINS_AGENT_UPDATE_VERSION = $LOWER_TRAINS_AGENT_UPDATE_VERSION"
if [ "$LOWER_TRAINS_AGENT_UPDATE_VERSION" = "yes" ] || [ "$LOWER_TRAINS_AGENT_UPDATE_VERSION" = "true" ] ; then
python3 -m pip install trains-agent -U
elif [ ! -z "$LOWER_TRAINS_AGENT_UPDATE_VERSION" ] ; then
python3 -m pip install trains-agent$LOWER_TRAINS_AGENT_UPDATE_VERSION ;
echo "CLEARML_AGENT_UPDATE_VERSION = $LOWER_CLEARML_AGENT_UPDATE_VERSION"
if [ "$LOWER_CLEARML_AGENT_UPDATE_VERSION" = "yes" ] || [ "$LOWER_CLEARML_AGENT_UPDATE_VERSION" = "true" ] ; then
python3 -m pip install clearml-agent -U
elif [ ! -z "$LOWER_CLEARML_AGENT_UPDATE_VERSION" ] ; then
python3 -m pip install clearml-agent$LOWER_CLEARML_AGENT_UPDATE_VERSION ;
fi
python3 -m trains_agent daemon --docker "$TRAINS_AGENT_DEFAULT_BASE_DOCKER" --force-current-version $TRAINS_AGENT_EXTRA_ARGS
python3 -m clearml_agent daemon --docker "${CLEARML_AGENT_DEFAULT_BASE_DOCKER:-$TRAINS_AGENT_DEFAULT_BASE_DOCKER}" --force-current-version ${CLEARML_AGENT_EXTRA_ARGS:-$TRAINS_AGENT_EXTRA_ARGS}

View File

@@ -19,7 +19,7 @@ RUN locale-gen en_US.UTF-8
RUN apt-get install -y curl python3-pip git
RUN curl -sSL https://get.docker.com/ | sh
RUN python3 -m pip install -U pip
RUN python3 -m pip install trains-agent
RUN python3 -m pip install clearml-agent
RUN python3 -m pip install -U "cryptography>=2.9"
ENTRYPOINT ["/usr/agent/entrypoint.sh"]

View File

@@ -1,14 +1,16 @@
#!/bin/sh
if [ -z "$TRAINS_FILES_HOST" ]; then
TRAINS_HOST_IP=${TRAINS_HOST_IP:-$(curl -s https://ifconfig.me/ip)}
CLEARML_FILES_HOST=${CLEARML_FILES_HOST:-$TRAINS_FILES_HOST}
if [ -z "$CLEARML_FILES_HOST" ]; then
CLEARML_HOST_IP=${CLEARML_HOST_IP:-${TRAINS_HOST_IP:-$(curl -s https://ifconfig.me/ip)}}
fi
TRAINS_FILES_HOST=${TRAINS_FILES_HOST:-"http://$TRAINS_HOST_IP:8081"}
TRAINS_WEB_HOST=${TRAINS_WEB_HOST:-"http://$TRAINS_HOST_IP:8080"}
TRAINS_API_HOST=${TRAINS_API_HOST:-"http://$TRAINS_HOST_IP:8008"}
CLEARML_FILES_HOST=${CLEARML_FILES_HOST:-${TRAINS_FILES_HOST:-"http://$CLEARML_HOST_IP:8081"}}
CLEARML_WEB_HOST=${CLEARML_WEB_HOST:-${TRAINS_WEB_HOST:-"http://$CLEARML_HOST_IP:8080"}}
CLEARML_API_HOST=${CLEARML_API_HOST:-${TRAINS_API_HOST:-"http://$CLEARML_HOST_IP:8008"}}
echo $TRAINS_FILES_HOST $TRAINS_WEB_HOST $TRAINS_API_HOST 1>&2
echo $CLEARML_FILES_HOST $CLEARML_WEB_HOST $CLEARML_API_HOST 1>&2
python3 -m pip install -q -U "trains-agent${TRAINS_AGENT_UPDATE_VERSION}"
trains-agent daemon --services-mode --queue services --create-queue --docker "$TRAINS_AGENT_DEFAULT_BASE_DOCKER" --cpu-only $TRAINS_AGENT_EXTRA_ARGS
python3 -m pip install -q -U "clearml-agent${CLEARML_AGENT_UPDATE_VERSION:-$TRAINS_AGENT_UPDATE_VERSION}"
clearml-agent daemon --services-mode --queue services --create-queue --docker "${CLEARML_AGENT_DEFAULT_BASE_DOCKER:-$TRAINS_AGENT_DEFAULT_BASE_DOCKER}" --cpu-only ${CLEARML_AGENT_EXTRA_ARGS:-$TRAINS_AGENT_EXTRA_ARGS}

View File

@@ -24,7 +24,9 @@ agent {
# Force GIT protocol to use SSH regardless of the git url (Assumes GIT user/pass are blank)
force_git_ssh_protocol: false
# Force a specific SSH port when converting http to ssh links (the domain is kept the same)
# force_git_ssh_port: ""
# force_git_ssh_port: 0
# Force a specific SSH username when converting http to ssh links (the default username is 'git')
# force_git_ssh_user: git
# unique name of this worker, if None, created based on hostname:process_id
# Overridden with os environment: CLEARML_WORKER_NAME
@@ -40,6 +42,9 @@ agent {
# Example values: "/usr/bin/python3" or "/usr/local/bin/python3.6"
# The default is the python executing the clearml_agent
python_binary: ""
# ignore any requested python version (Default: False; if a Task was executed with a
# specific python version and the system has multiple python versions available, the agent will use the requested python version)
# ignore_requested_python_version: true
# select python package manager:
# currently supported pip and conda
@@ -61,7 +66,7 @@ agent {
extra_index_url: []
# additional conda channels to use when installing with conda package manager
conda_channels: ["pytorch", "conda-forge", ]
conda_channels: ["pytorch", "conda-forge", "defaults", ]
# conda_full_env_update: false
# conda_env_as_base_docker: false
@@ -89,17 +94,28 @@ agent {
# target folder for virtual environments builds, created when executing experiment
venvs_dir = ~/.clearml/venvs-builds
# cached virtual environment folder
venvs_cache: {
# maximum number of cached venvs
max_entries: 10
# minimum required free space to allow for cache entry, disable by passing 0 or negative value
free_space_threshold_gb: 2.0
# uncomment to enable virtual environment caching
# path: ~/.clearml/venvs-cache
},
# cached git clone folder
vcs_cache: {
enabled: true,
path: ~/.clearml/vcs-cache
},
# DEPRECATED: please use `venvs_cache` and set `venvs_cache.path`
# use venv-update in order to accelerate python virtual environment building
# Still in beta, turned off by default
venv_update: {
enabled: false,
},
# venv_update: {
# enabled: false,
# },
# cached folder for specific python package download (mostly pytorch versions)
pip_download_cache {
@@ -118,23 +134,31 @@ agent {
# optional arguments to pass to docker image
# these are local for this agent and will not be updated in the experiment's docker_cmd section
# extra_docker_arguments: ["--ipc=host", ]
# extra_docker_arguments: ["--ipc=host", "-v", "/mnt/host/data:/mnt/data"]
# optional shell script to run in docker when started before the experiment is started
# extra_docker_shell_script: ["apt-get install -y bindfs", ]
# Install the required packages for opencv libraries (libsm6 libxext6 libxrender-dev libglib2.0-0),
# for backwards compatibility reasons, true as default,
# change to false to skip installation and decrease docker spin up time
# docker_install_opencv_libs: true
# set to true in order to force "docker pull" before running an experiment using a docker image.
# This makes sure the docker image is updated.
docker_force_pull: false
default_docker: {
# default docker image to use when running in docker mode
image: "nvidia/cuda:10.1-runtime-ubuntu18.04"
image: "nvidia/cuda:10.1-cudnn7-runtime-ubuntu18.04"
# optional arguments to pass to docker image
# arguments: ["--ipc=host"]
}
# set the OS environments based on the Task's Environment section before launching the Task process.
enable_task_env: false
# CUDA versions used for Conda setup & solving PyTorch wheel packages
# It should be detected automatically. Override with os environment CUDA_VERSION / CUDNN_VERSION
# cuda_version: 10.1

View File

@@ -10,12 +10,15 @@ from clearml_agent.glue.k8s import K8sIntegration
def parse_args():
parser = ArgumentParser()
group = parser.add_mutually_exclusive_group()
parser.add_argument(
"--queue", type=str, help="Queue to pull tasks from"
)
parser.add_argument(
group.add_argument(
"--ports-mode", action='store_true', default=False,
help="Ports-Mode will add a label to the pod which can be used as service, in order to expose ports"
"Should not be used with max-pods"
)
parser.add_argument(
"--num-of-services", type=int, default=20,
@@ -24,7 +27,13 @@ def parse_args():
parser.add_argument(
"--base-port", type=int,
help="Used in conjunction with ports-mode, specifies the base port exposed by the services. "
"For pod #X, the port will be <base-port>+X"
"For pod #X, the port will be <base-port>+X. Note that pod number is calculated based on base-pod-num"
"e.g. if base-port=20000 and base-pod-num=3, the port for the first pod will be 20003"
)
parser.add_argument(
"--base-pod-num", type=int, default=1,
help="Used in conjunction with ports-mode and base-port, specifies the base pod number to be used by the "
"service (default: %(default)s)"
)
parser.add_argument(
"--gateway-address", type=str, default=None,
@@ -47,6 +56,15 @@ def parse_args():
"--ssh-server-port", type=int, default=0,
help="If non-zero, every pod will also start an SSH server on the selected port (default: zero, not active)"
)
parser.add_argument(
"--namespace", type=str,
help="Specify the namespace in which pods will be created (default: %(default)s)", default="clearml"
)
group.add_argument(
"--max-pods", type=int,
help="Limit the maximum number of pods that this service can run at the same time."
"Should not be used with ports-mode"
)
return parser.parse_args()
@@ -63,10 +81,11 @@ def main():
user_props_cb = k8s_user_props_cb
k8s = K8sIntegration(
ports_mode=args.ports_mode, num_of_services=args.num_of_services, user_props_cb=user_props_cb,
overrides_yaml=args.overrides_yaml, trains_conf_file=args.pod_trains_conf, template_yaml=args.template_yaml,
extra_bash_init_script=K8sIntegration.get_ssh_server_bash(
ssh_port_number=args.ssh_server_port) if args.ssh_server_port else None
ports_mode=args.ports_mode, num_of_services=args.num_of_services, base_pod_num=args.base_pod_num,
user_props_cb=user_props_cb, overrides_yaml=args.overrides_yaml, clearml_conf_file=args.pod_clearml_conf,
template_yaml=args.template_yaml, extra_bash_init_script=K8sIntegration.get_ssh_server_bash(
ssh_port_number=args.ssh_server_port) if args.ssh_server_port else None,
namespace=args.namespace, max_pods_limit=args.max_pods or None,
)
k8s.k8s_daemon(args.queue)

View File

@@ -2,19 +2,16 @@ attrs>=18.0,<20.4.0
enum34>=0.9,<1.2.0 ; python_version < '3.6'
furl>=2.0.0,<2.2.0
future>=0.16.0,<0.19.0
humanfriendly>=2.1,<9.2
jsonschema>=2.6.0,<3.3.0
pathlib2>=2.3.0,<2.4.0
psutil>=3.4.2,<5.9.0
pyhocon>=0.3.38,<0.4.0
pyparsing>=2.0.3,<2.5.0
python-dateutil>=2.4.2,<2.9.0
pyjwt>=1.6.4,<1.8.0
PyYAML>=3.12,<5.4.0
requests-file>=1.4.2,<1.6.0
pyjwt>=1.6.4,<2.1.0
PyYAML>=3.12,<5.5.0
requests>=2.20.0,<2.26.0
six>=1.11.0,<1.16.0
tqdm>=4.19.5,<4.55.0
typing>=3.6.4,<3.8.0
urllib3>=1.21.1,<1.27.0
virtualenv>=16,<20
virtualenv>=16,<21

View File

@@ -44,7 +44,7 @@ setup(
author_email='clearml@allegro.ai',
license='Apache License 2.0',
classifiers=[
'Development Status :: 4 - Beta',
'Development Status :: 5 - Production/Stable',
'Intended Audience :: Developers',
'Intended Audience :: System Administrators',
@@ -60,6 +60,7 @@ setup(
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'License :: OSI Approved :: Apache Software License',
],