Compare commits

...

35 Commits

Author SHA1 Message Date
allegroai   2006ab20dd  Fix conda support for git+http links  2021-02-23 12:46:06 +02:00
allegroai   0caf31719c  Fix venv caching always reinstall git repositories and local repositories  2021-02-23 12:45:34 +02:00
allegroai   5da7184276  Add agent.ignore_requested_python_version (control for multi python environments)  2021-02-23 12:45:00 +02:00
allegroai   50fccdab96  PEP8  2021-02-23 12:44:26 +02:00
allegroai   77d6ff6630  Fix docker mode without venvs cache dir  2021-02-17 00:04:07 +02:00
allegroai   99614702ea  Add missing default configuration value  2021-02-17 00:03:42 +02:00
allegroai   58cb344ee6  Upgrade pynvml add detect CUDA version from driver level  2021-02-17 00:03:16 +02:00
allegroai   22d5892b12  Use shared git cache between multiple agents on the same machine  2021-02-14 13:49:29 +02:00
allegroai   f619969efc  Add venvs_cache configuration  2021-02-14 13:48:57 +02:00
allegroai   ca242424ab  Fix service-mode support for venvs  2021-02-14 13:45:17 +02:00
                        Fix --services-mode with venvs
allegroai   407deb84e9  Fix multi instances on Windows  2021-02-14 13:44:39 +02:00
allegroai   14589aa094  Fix CPU mode  2021-02-14 13:44:00 +02:00
allegroai   1260e3d942  Update cache entries on conda package manager  2021-02-11 14:47:26 +02:00
allegroai   b22d926d94  Fix cache to take cuda version into account  2021-02-11 14:47:05 +02:00
allegroai   410cc8c7be  Add --dynamic-gpus and limit in --services-mode  2021-02-11 14:46:37 +02:00
allegroai   784c676f5b  Fix "from clearml" runtime diff patching (make sure we move it to after all the __future__ imports) include handling triple quotes in comments  2021-02-11 14:46:06 +02:00
allegroai   296f7970df  Fix file not found error (no 2) interpreted as aborted (i.e. ctrl-c)  2021-02-11 14:44:54 +02:00
allegroai   cd59933c9c  Remove unused packages  2021-02-11 14:44:35 +02:00
allegroai   b95d3f5300  Add venv caching with docker mode support  2021-02-11 14:44:19 +02:00
allegroai   fa0d5d8469  Fix --detached not supported on Windows, ignore and issue warning  2021-02-11 14:40:09 +02:00
allegroai   8229843018  Add base-pod-number parameter to k8s glue and example  2021-01-26 20:00:18 +02:00
allegroai   c578b37c6d  Change dump configuration and ssh on every docker run  2021-01-24 08:48:10 +02:00
allegroai   8ea062c0bd  Fix environment variables CLEARML_WEB_HOST/CLEARML_FILES_HOST not passed to running tasks (or updated on the config object)  2021-01-24 08:47:33 +02:00
allegroai   5d8bbde434  Fix applying git diff on new added file  2021-01-24 08:46:42 +02:00
allegroai   0462af6a3d  Allow providing namespace in k8s glue and k8s glue example  2021-01-20 19:01:03 +02:00
allegroai   5a94a4048e  Update agent and services docker files  2021-01-18 11:40:11 +02:00
allegroai   2602301e1d  Improve agent.extra_docker_arguments documentation  2021-01-10 12:40:24 +02:00
allegroai   161993f66f  Add agent.force_git_ssh_user configuration value (issue #42)  2021-01-10 12:38:45 +02:00
                        Change default docker to nvidia/cuda:10.1-cudnn7-runtime-ubuntu18.04
allegroai   b7f87fb8d3  Detect and delete "stuck" k8s pods k8s glue  2021-01-10 12:37:13 +02:00
allegroai   8fdb87f1f5  Fix docker --network returns None  2020-12-30 16:57:04 +02:00
Allegro AI  a9a68d230e  Update README.md  2020-12-25 04:23:12 +02:00
allegroai   a1f2941ffd  version bump  2020-12-25 02:10:06 +02:00
allegroai   c548eeacfc  status stable  2020-12-25 02:09:54 +02:00
allegroai   428781af86  Fix support for Windows pip and Conda requirements.txt  2020-12-25 02:06:40 +02:00
Allegro AI  72efe2e9fe  Update README.md  2020-12-23 01:42:10 +02:00
33 changed files with 3546 additions and 319 deletions

View File

@@ -8,7 +8,6 @@ ML-Ops scheduler & orchestration solution supporting Linux, macOS and Windows**
[![GitHub license](https://img.shields.io/github/license/allegroai/trains-agent.svg)](https://img.shields.io/github/license/allegroai/trains-agent.svg)
[![PyPI pyversions](https://img.shields.io/pypi/pyversions/clearml-agent.svg)](https://img.shields.io/pypi/pyversions/clearml-agent.svg)
[![PyPI version shields.io](https://img.shields.io/pypi/v/clearml-agent.svg)](https://img.shields.io/pypi/v/clearml-agent.svg)
[![PyPI status](https://img.shields.io/pypi/status/clearml-agent.svg)](https://pypi.python.org/pypi/clearml-agent/)
</div>
@@ -22,9 +21,9 @@ ML-Ops scheduler & orchestration solution supporting Linux, macOS and Windows**
* Implement optimized resource utilization policies
* Deploy execution environments with either virtualenv or fully docker containerized with zero effort
* Launch-and-Forget service containers
* [Cloud autoscaling](https://allegro.ai/clearml/docs/examples/services/aws_autoscaler/aws_autoscaler/)
* [Customizable cleanup](https://allegro.ai/clearml/docs/examples/services/cleanup/cleanup_service/)
* Advanced [pipeline building and execution](https://allegro.ai/clearml/docs/examples/frameworks/pytorch/notebooks/table/tabular_training_pipeline/)
* [Cloud autoscaling](https://allegro.ai/clearml/docs/docs/examples/services/aws_autoscaler/aws_autoscaler.html)
* [Customizable cleanup](https://allegro.ai/clearml/docs/docs/examples/services/cleanup/cleanup_service.html)
* Advanced [pipeline building and execution](https://allegro.ai/clearml/docs/docs/examples/frameworks/pytorch/notebooks/table/tabular_training_pipeline.html)
It is a zero configuration fire-and-forget execution agent, providing a full ML/DL cluster solution.
@@ -123,7 +122,7 @@ The ClearML Agent executes experiments using the following process:
#### System Design & Flow
<img src="https://allegro.ai/clearml/docs/img/ClearML_Architecture.png" width="100%" alt="clearml-architecture">
<img src="https://allegro.ai/clearml/docs/_images/ClearML_Architecture.png" width="100%" alt="clearml-architecture">
#### Installing the ClearML Agent

View File

@@ -19,6 +19,8 @@
force_git_ssh_protocol: false
# Force a specific SSH port when converting http to ssh links (the domain is kept the same)
# force_git_ssh_port: 0
# Force a specific SSH username when converting http to ssh links (the default username is 'git')
# force_git_ssh_user: git
# Set the python version to use when creating the virtual environment and launching the experiment
# Example values: "/usr/bin/python3" or "/usr/local/bin/python3.6"
@@ -75,6 +77,16 @@
# target folder for virtual environments builds, created when executing experiment
venvs_dir = ~/.clearml/venvs-builds
# cached virtual environment folder
venvs_cache: {
# maximum number of cached venvs
max_entries: 10
# minimum required free space to allow for cache entry, disable by passing 0 or negative value
free_space_threshold_gb: 2.0
# unmark to enable virtual environment caching
# path: ~/.clearml/venvs-cache
},
# cached git clone folder
vcs_cache: {
enabled: true,
@@ -131,7 +143,7 @@
default_docker: {
# default docker image to use when running in docker mode
image: "nvidia/cuda:10.1-runtime-ubuntu18.04"
image: "nvidia/cuda:10.1-cudnn7-runtime-ubuntu18.04"
# optional arguments to pass to docker image
# arguments: ["--ipc=host", ]

View File

@@ -55,7 +55,7 @@ def backward_compatibility_support():
continue
# set OS environ:
keys = environ.keys()
keys = list(environ.keys())
for k in keys:
if not k.startswith('CLEARML_'):
continue
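
The `list(...)` wrapper matters because the loop mutates `os.environ` while iterating: on some Python 3 versions, iterating a live `environ.keys()` view while inserting or deleting keys raises `RuntimeError: dictionary changed size during iteration`. A minimal sketch of the safe pattern (the rename direction here is illustrative only):

    import os

    os.environ['CLEARML_DEMO'] = '1'
    for k in list(os.environ.keys()):  # snapshot the keys before mutating
        if k.startswith('CLEARML_'):
            os.environ.setdefault('TRAINS_' + k[len('CLEARML_'):], os.environ[k])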

File diff suppressed because it is too large

View File

@@ -64,6 +64,8 @@ class EnvironmentConfig(object):
ENVIRONMENT_CONFIG = {
"api.api_server": EnvironmentConfig("CLEARML_API_HOST", "TRAINS_API_HOST", ),
"api.files_server": EnvironmentConfig("CLEARML_FILES_HOST", "TRAINS_FILES_HOST", ),
"api.web_server": EnvironmentConfig("CLEARML_WEB_HOST", "TRAINS_WEB_HOST", ),
"api.credentials.access_key": EnvironmentConfig(
"CLEARML_API_ACCESS_KEY", "TRAINS_API_ACCESS_KEY",
),
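
With this entry, `api.web_server` can now be taken from the environment, the new `CLEARML_WEB_HOST` name taking precedence over the legacy `TRAINS_WEB_HOST`. A minimal sketch of that lookup order (`first_set` is hypothetical, not the actual EnvironmentConfig implementation):

    import os

    def first_set(*names):
        # return the value of the first environment variable that is defined
        for name in names:
            if name in os.environ:
                return os.environ[name]
        return None

    web_host = first_set('CLEARML_WEB_HOST', 'TRAINS_WEB_HOST')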

View File

@@ -16,7 +16,7 @@ def parse(reqstr):
filename = getattr(reqstr, 'name', None)
try:
# Python 2.x compatibility
if not isinstance(reqstr, basestring):
if not isinstance(reqstr, basestring): # noqa
reqstr = reqstr.read()
except NameError:
# Python 3.x only

View File

@@ -1,18 +1,21 @@
from __future__ import print_function, division, unicode_literals
import base64
import functools
import json
import logging
import os
import re
import subprocess
import tempfile
from copy import deepcopy
import yaml
import json
from pathlib import Path
from threading import Thread
from time import sleep
from typing import Text, List
import yaml
from clearml_agent.commands.events import Events
from clearml_agent.commands.worker import Worker
from clearml_agent.definitions import ENV_DOCKER_IMAGE
@@ -27,18 +30,20 @@ from clearml_agent.interface.base import ObjectID
class K8sIntegration(Worker):
K8S_PENDING_QUEUE = "k8s_scheduler"
K8S_DEFAULT_NAMESPACE = "clearml"
KUBECTL_APPLY_CMD = "kubectl apply -f"
KUBECTL_RUN_CMD = "kubectl run clearml-{queue_name}-id-{task_id} " \
"--image {docker_image} " \
"--restart=Never --replicas=1 " \
"--generator=run-pod/v1 " \
"--namespace=clearml"
"--namespace={namespace}"
KUBECTL_DELETE_CMD = "kubectl delete pods " \
"--selector=TRAINS=agent " \
"--field-selector=status.phase!=Pending,status.phase!=Running " \
"--namespace=clearml"
"--namespace={namespace}"
BASH_INSTALL_SSH_CMD = [
"apt-get install -y openssh-server",
@@ -83,11 +88,14 @@ class K8sIntegration(Worker):
debug=False,
ports_mode=False,
num_of_services=20,
base_pod_num=1,
user_props_cb=None,
overrides_yaml=None,
template_yaml=None,
trains_conf_file=None,
clearml_conf_file=None,
extra_bash_init_script=None,
namespace=None,
**kwargs
):
"""
Initialize the k8s integration glue layer daemon
@@ -104,14 +112,16 @@ class K8sIntegration(Worker):
Requires the `num_of_services` parameter.
:param int num_of_services: Number of k8s services configured in the cluster. Required if `port_mode` is True.
(default: 20)
:param int base_pod_num: Used when `ports_mode` is True, sets the base pod number to a given value (default: 1)
:param callable user_props_cb: An Optional callable allowing additional user properties to be specified
when scheduling a task to run in a pod. Callable can receive an optional pod number and should return
a dictionary of user properties (name and value). Signature is [[Optional[int]], Dict[str,str]]
:param str overrides_yaml: YAML file containing the overrides for the pod (optional)
:param str template_yaml: YAML file containing the template for the pod (optional).
If provided the pod is scheduled with kubectl apply and overrides are ignored, otherwise with kubectl run.
:param str trains_conf_file: clearml.conf file to be use by the pod itself (optional)
:param str clearml_conf_file: clearml.conf file to be use by the pod itself (optional)
:param str extra_bash_init_script: Additional bash script to run before starting the Task inside the container
:param str namespace: K8S namespace to be used when creating the new pods (default: clearml)
"""
super(K8sIntegration, self).__init__()
self.k8s_pending_queue_name = k8s_pending_queue_name or self.K8S_PENDING_QUEUE
@@ -125,14 +135,16 @@ class K8sIntegration(Worker):
self.log.logger.setLevel(logging.INFO)
self.ports_mode = ports_mode
self.num_of_services = num_of_services
self.base_pod_num = base_pod_num
self._edit_hyperparams_support = None
self._user_props_cb = user_props_cb
self.trains_conf_file = None
self.conf_file_content = None
self.overrides_json_string = None
self.template_dict = None
self.extra_bash_init_script = extra_bash_init_script or None
if self.extra_bash_init_script and not isinstance(self.extra_bash_init_script, str):
self.extra_bash_init_script = ' ; '.join(self.extra_bash_init_script) # noqa
self.namespace = namespace or self.K8S_DEFAULT_NAMESPACE
self.pod_limits = []
self.pod_requests = []
if overrides_yaml:
@@ -161,11 +173,59 @@ class K8sIntegration(Worker):
with open(os.path.expandvars(os.path.expanduser(str(template_yaml))), 'rt') as f:
self.template_dict = yaml.load(f, Loader=getattr(yaml, 'FullLoader', None))
if trains_conf_file:
with open(os.path.expandvars(os.path.expanduser(str(trains_conf_file))), 'rt') as f:
self.trains_conf_file = f.read()
clearml_conf_file = clearml_conf_file or kwargs.get('trains_conf_file')
if clearml_conf_file:
with open(os.path.expandvars(os.path.expanduser(str(clearml_conf_file))), 'rt') as f:
self.conf_file_content = f.read()
# make sure we use system packages!
self.trains_conf_file += '\nagent.package_manager.system_site_packages=true\n'
self.conf_file_content += '\nagent.package_manager.system_site_packages=true\n'
self._monitor_hanging_pods()
def _monitor_hanging_pods(self):
_check_pod_thread = Thread(target=self._monitor_hanging_pods_daemon)
_check_pod_thread.daemon = True
_check_pod_thread.start()
def _monitor_hanging_pods_daemon(self):
while True:
output = get_bash_output('kubectl get pods -n {namespace} -o=JSON'.format(
namespace=self.namespace
))
output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
try:
output_config = json.loads(output)
except Exception as ex:
self.log.warning('K8S Glue pods monitor: Failed parsing kubectl output:\n{}\nEx: {}'.format(output, ex))
sleep(self._polling_interval)
continue
pods = output_config.get('items', [])
for pod in pods:
try:
reason = functools.reduce(
lambda a, b: a[b], ('status', 'containerStatuses', 0, 'state', 'waiting', 'reason'), pod
)
except (IndexError, KeyError):
continue
if reason == 'ImagePullBackOff':
pod_name = pod.get('metadata', {}).get('name', None)
if pod_name:
task_id = pod_name.rpartition('-')[-1]
delete_pod_cmd = 'kubectl delete pods {} -n {}'.format(pod_name, self.namespace)
get_bash_output(delete_pod_cmd)
try:
self._session.api_client.tasks.failed(
task=task_id,
status_reason="K8S glue error due to ImagePullBackOff",
status_message="Changed by K8S glue",
force=True
)
except Exception as ex:
self.log.warning(
'K8S Glue pods monitor: Failed deleting task "{}"\nEX: {}'.format(task_id, ex)
)
sleep(self._polling_interval)
def _set_task_user_properties(self, task_id: str, **properties: str):
if self._edit_hyperparams_support is not True:
@@ -225,8 +285,11 @@ class K8sIntegration(Worker):
# get the clearml.conf encoded file
# noinspection PyProtectedMember
hocon_config_encoded = (self.trains_conf_file or self._session._config_file).encode('ascii')
create_trains_conf = "echo '{}' | base64 --decode >> ~/clearml.conf".format(
hocon_config_encoded = (
self.conf_file_content
or Path(self._session._config_file).read_text()
).encode("ascii")
create_clearml_conf = "echo '{}' | base64 --decode >> ~/clearml.conf".format(
base64.b64encode(
hocon_config_encoded
).decode('ascii')
@@ -246,11 +309,14 @@ class K8sIntegration(Worker):
safe_queue_name = re.sub(r'\W+', '', safe_queue_name).replace('_', '').replace('-', '')
# Search for a free pod number
pod_number = 1
pod_count = 0
pod_number = self.base_pod_num
while self.ports_mode:
kubectl_cmd_new = "kubectl get pods -l {pod_label},{agent_label} -n clearml".format(
pod_number = self.base_pod_num + pod_count
kubectl_cmd_new = "kubectl get pods -l {pod_label},{agent_label} -n {namespace}".format(
pod_label=self.LIMIT_POD_LABEL.format(pod_number=pod_number),
agent_label=self.AGENT_LABEL
agent_label=self.AGENT_LABEL,
namespace=self.namespace,
)
process = subprocess.Popen(kubectl_cmd_new.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate()
@@ -260,7 +326,7 @@ class K8sIntegration(Worker):
if not output:
# No such pod exist so we can use the pod_number we found
break
if pod_number >= self.num_of_services:
if pod_count >= self.num_of_services - 1:
# All pod numbers are taken, exit
self.log.warning(
"kubectl last result: {}\n{}\nAll k8s services are in use, task '{}' "
@@ -272,23 +338,23 @@ class K8sIntegration(Worker):
self._session.api_client.tasks.enqueue(
task_id, queue=queue, status_reason='k8s max pod limit (no free k8s service)')
return
pod_number += 1
pod_count += 1
labels = ([self.LIMIT_POD_LABEL.format(pod_number=pod_number)] if self.ports_mode else []) + [self.AGENT_LABEL]
if self.ports_mode:
print("Kubernetes scheduling task id={} on pod={}".format(task_id, pod_number))
print("Kubernetes scheduling task id={} on pod={} (pod_count={})".format(task_id, pod_number, pod_count))
else:
print("Kubernetes scheduling task id={}".format(task_id))
if self.template_dict:
output, error = self._kubectl_apply(
create_trains_conf=create_trains_conf,
create_clearml_conf=create_clearml_conf,
labels=labels, docker_image=docker_image, docker_args=docker_args,
task_id=task_id, queue=queue, queue_name=safe_queue_name)
else:
output, error = self._kubectl_run(
create_trains_conf=create_trains_conf,
create_clearml_conf=create_clearml_conf,
labels=labels, docker_image=docker_image,
task_data=task_data,
task_id=task_id, queue=queue, queue_name=safe_queue_name)
@@ -297,11 +363,19 @@ class K8sIntegration(Worker):
output = '' if not output else (output if isinstance(output, str) else output.decode('utf-8'))
print('kubectl output:\n{}\n{}'.format(error, output))
if error:
self.log.error("Running kubectl encountered an error: {}".format(error))
send_log = "Running kubectl encountered an error: {}".format(error)
self.log.error(send_log)
self.send_logs(task_id, send_log.splitlines())
user_props = {"k8s-queue": str(queue_name)}
if self.ports_mode:
user_props.update({"k8s-pod-number": pod_number, "k8s-pod-label": labels[0]})
user_props.update(
{
"k8s-pod-number": pod_number,
"k8s-pod-label": labels[0],
"k8s-internal-pod-count": pod_count,
}
)
if self._user_props_cb:
# noinspection PyBroadException
@@ -330,7 +404,7 @@ class K8sIntegration(Worker):
self.log.warning('skipping docker argument {} (only -e --env supported)'.format(cmd))
return kube_args
def _kubectl_apply(self, create_trains_conf, docker_image, docker_args, labels, queue, task_id, queue_name):
def _kubectl_apply(self, create_clearml_conf, docker_image, docker_args, labels, queue, task_id, queue_name):
template = deepcopy(self.template_dict)
template.setdefault('apiVersion', 'v1')
template['kind'] = 'Pod'
@@ -364,7 +438,7 @@ class K8sIntegration(Worker):
container,
dict(name=name, image=docker_image,
command=['/bin/bash'],
args=['-c', '{} ; {}'.format(create_trains_conf, create_init_script)])
args=['-c', '{} ; {}'.format(create_clearml_conf, create_init_script)])
)
if template['spec']['containers']:
@@ -381,6 +455,7 @@ class K8sIntegration(Worker):
task_id=task_id,
docker_image=docker_image,
queue_id=queue,
namespace=self.namespace
)
# make sure we provide a list
if isinstance(kubectl_cmd, str):
@@ -398,7 +473,7 @@ class K8sIntegration(Worker):
return output, error
def _kubectl_run(self, create_trains_conf, docker_image, labels, queue, task_data, task_id, queue_name):
def _kubectl_run(self, create_clearml_conf, docker_image, labels, queue, task_data, task_id, queue_name):
if callable(self.kubectl_cmd):
kubectl_cmd = self.kubectl_cmd(task_id, docker_image, queue, task_data, queue_name)
else:
@@ -406,7 +481,8 @@ class K8sIntegration(Worker):
queue_name=queue_name,
task_id=task_id,
docker_image=docker_image,
queue_id=queue
queue_id=queue,
namespace=self.namespace,
)
# make sure we provide a list
if isinstance(kubectl_cmd, str):
@@ -430,7 +506,7 @@ class K8sIntegration(Worker):
"--",
"/bin/sh",
"-c",
"{} ; {}".format(create_trains_conf, container_bash_script.format(
"{} ; {}".format(create_clearml_conf, container_bash_script.format(
extra_bash_init_cmd=self.extra_bash_init_script, task_id=task_id)),
]
process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
@@ -464,7 +540,7 @@ class K8sIntegration(Worker):
# iterate over queues (priority style, queues[0] is highest)
for queue in queues:
# delete old completed / failed pods
get_bash_output(self.KUBECTL_DELETE_CMD)
get_bash_output(self.KUBECTL_DELETE_CMD.format(namespace=self.namespace))
# get next task in queue
try:
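
Taken together, the new constructor arguments make the glue layer usable outside the default `clearml` namespace. A minimal sketch (the module path is assumed, and a configured agent environment is presumed; every keyword below appears in the constructor diff above):

    from clearml_agent.glue.k8s import K8sIntegration  # assumed import path

    k8s = K8sIntegration(
        ports_mode=True,        # label each task pod with a pod number
        num_of_services=20,     # max concurrent pods in ports mode
        base_pod_num=1,         # new: first pod number to try
        namespace='clearml',    # new: namespace used for all kubectl calls
        clearml_conf_file='~/clearml.conf',  # new name; trains_conf_file still accepted via kwargs
    )
    # scheduling is driven by the daemon loop shown above, which polls the
    # queues and creates one pod per dequeued task via kubectl run/apply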

View File

@@ -24,7 +24,6 @@ import pyhocon
import yaml
from attr import fields_dict
from pathlib2 import Path
from tqdm import tqdm
import six
from six.moves import reduce
@@ -399,12 +398,6 @@ class TqdmStream(object):
self.buffer.write('\n')
class TqdmLog(tqdm):
def __init__(self, iterable=None, file=None, **kwargs):
super(TqdmLog, self).__init__(iterable, file=TqdmStream(file or sys.stderr), **kwargs)
def url_join(first, *rest):
"""
Join url parts similarly to Path.join

View File

@@ -20,6 +20,7 @@ import platform
import sys
import time
from datetime import datetime
from typing import Optional
import psutil
from ..gpu import pynvml as N
@@ -390,3 +391,34 @@ def new_query(shutdown=False, per_process_stats=False, get_driver_info=False):
'''
return GPUStatCollection.new_query(shutdown=shutdown, per_process_stats=per_process_stats,
get_driver_info=get_driver_info)
def get_driver_cuda_version():
# type: () -> Optional[str]
"""
:return: Return detected CUDA version from driver. On fail return value is None.
Example: `110` is cuda version 11.0
"""
# noinspection PyBroadException
try:
N.nvmlInit()
except BaseException:
return None
# noinspection PyBroadException
try:
cuda_version = str(N.nvmlSystemGetCudaDriverVersion())
except BaseException:
# noinspection PyBroadException
try:
cuda_version = str(N.nvmlSystemGetCudaDriverVersion_v2())
except BaseException:
cuda_version = ''
# noinspection PyBroadException
try:
N.nvmlShutdown()
except BaseException:
return None
return cuda_version[:3] if cuda_version else None
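
NVML encodes the driver's CUDA support as `major * 1000 + minor * 10` (e.g. `11000` for CUDA 11.0), so keeping the first three digits yields the compact `110` form the agent uses elsewhere. A minimal usage sketch (the import path matches the one added to requirements.py later in this diff):

    from clearml_agent.helper.gpu.gpustat import get_driver_cuda_version

    ver = get_driver_cuda_version()  # e.g. '110' for CUDA 11.0, None on failure
    if ver:
        print('driver supports CUDA {}.{}'.format(int(ver[:-1]), int(ver[-1])))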

File diff suppressed because it is too large

View File

@@ -0,0 +1,218 @@
import os
import shutil
from logging import warning
from random import random
from time import time
from typing import List, Optional, Sequence
import psutil
from pathlib2 import Path
from .locks import FileLock
class FolderCache(object):
_lock_filename = '.clearml.lock'
_lock_timeout_seconds = 30
_temp_entry_prefix = '_temp.'
def __init__(self, cache_folder, max_cache_entries=5, min_free_space_gb=None):
self._cache_folder = Path(os.path.expandvars(cache_folder)).expanduser().absolute()
self._cache_folder.mkdir(parents=True, exist_ok=True)
self._max_cache_entries = max_cache_entries
self._last_copied_entry_folder = None
self._min_free_space_gb = min_free_space_gb if min_free_space_gb and min_free_space_gb > 0 else None
self._lock = FileLock((self._cache_folder / self._lock_filename).as_posix())
def get_cache_folder(self):
# type: () -> Path
"""
:return: Return the base cache folder
"""
return self._cache_folder
def copy_cached_entry(self, keys, destination):
# type: (List[str], Path) -> Optional[Path]
"""
Copy a cached entry into a destination directory, if the cached entry does not exist return None
:param keys:
:param destination:
:return: Target path, None if cached entry does not exist
"""
self._last_copied_entry_folder = None
if not keys:
return None
# lock so we make sure no one deletes it before we copy it
# noinspection PyBroadException
try:
self._lock.acquire(timeout=self._lock_timeout_seconds)
except BaseException as ex:
warning('Could not lock cache folder {}: {}'.format(self._cache_folder, ex))
return None
src = None
try:
src = self.get_entry(keys)
if src:
destination = Path(destination).absolute()
destination.mkdir(parents=True, exist_ok=True)
shutil.rmtree(destination.as_posix())
shutil.copytree(src.as_posix(), dst=destination.as_posix(), symlinks=True)
except BaseException as ex:
warning('Could not copy cache folder {} to {}: {}'.format(src, destination, ex))
self._lock.release()
return None
# release Lock
self._lock.release()
self._last_copied_entry_folder = src
return destination if src else None
def get_entry(self, keys):
# type: (List[str]) -> Optional[Path]
"""
Return a folder (a sub-folder of inside the cache_folder) matching one of the keys
:param keys: List of keys, return the first match to one of the keys, notice keys cannot contain '.'
:return: Path to the sub-folder or None if none was found
"""
if not keys:
return None
# conform keys
keys = [keys] if isinstance(keys, str) else keys
keys = sorted([k.replace('.', '_') for k in keys])
for cache_folder in self._cache_folder.glob('*'):
if cache_folder.is_dir() and any(True for k in cache_folder.name.split('.') if k in keys):
cache_folder.touch()
return cache_folder
return None
def add_entry(self, keys, source_folder, exclude_sub_folders=None):
# type: (List[str], Path, Optional[Sequence[str]]) -> bool
"""
Add a local folder into the cache, copy all sub-folders inside `source_folder`
excluding folders matching `exclude_sub_folders` list
:param keys: Cache entry keys list (str)
:param source_folder: Folder to copy into the cache
:param exclude_sub_folders: List of sub-folders to exclude from the copy operation
:return: return True is new entry was added to cache
"""
if not keys:
return False
keys = [keys] if isinstance(keys, str) else keys
keys = sorted([k.replace('.', '_') for k in keys])
# If entry already exists skip it
cached_entry = self.get_entry(keys)
if cached_entry:
# make sure the entry contains all keys
cached_keys = cached_entry.name.split('.')
if set(keys) - set(cached_keys):
# noinspection PyBroadException
try:
self._lock.acquire(timeout=self._lock_timeout_seconds)
except BaseException as ex:
warning('Could not lock cache folder {}: {}'.format(self._cache_folder, ex))
# failed locking do nothing
return True
keys = sorted(list(set(keys) | set(cached_keys)))
dst = cached_entry.parent / '.'.join(keys)
# rename
try:
shutil.move(src=cached_entry.as_posix(), dst=dst.as_posix())
except BaseException as ex:
warning('Could not rename cache entry {} to {}: ex'.format(
cached_entry.as_posix(), dst.as_posix(), ex))
# release lock
self._lock.release()
return True
# make sure we remove old entries
self._remove_old_entries()
# if we do not have enough free space, do nothing.
if not self._check_min_free_space():
warning('Could not add cache entry, not enough free space on drive, '
'free space threshold {} GB. Clearing all cache entries!'.format(self._min_free_space_gb))
self._remove_old_entries(max_cache_entries=0)
return False
# create the new entry for us
exclude_sub_folders = exclude_sub_folders or []
source_folder = Path(source_folder).absolute()
# create temp folder
temp_folder = \
self._temp_entry_prefix + \
'{}.{}'.format(str(time()).replace('.', '_'), str(random()).replace('.', '_'))
temp_folder = self._cache_folder / temp_folder
temp_folder.mkdir(parents=True, exist_ok=False)
for f in source_folder.glob('*'):
if f.name in exclude_sub_folders:
continue
shutil.copytree(src=f.as_posix(), dst=(temp_folder / f.name).as_posix(), symlinks=True)
# rename the target folder
target_cache_folder = self._cache_folder / '.'.join(keys)
# if we failed moving it means someone else created the cached entry before us, we can just leave
# noinspection PyBroadException
try:
shutil.move(src=temp_folder.as_posix(), dst=target_cache_folder.as_posix())
except BaseException:
# noinspection PyBroadException
try:
shutil.rmtree(path=temp_folder.as_posix())
except BaseException:
return False
return True
def get_last_copied_entry(self):
# type: () -> Optional[Path]
"""
:return: the last copied cached entry folder inside the cache
"""
return self._last_copied_entry_folder
def _remove_old_entries(self, max_cache_entries=None):
# type: (Optional[int]) -> ()
"""
Notice we only keep self._max_cache_entries-1, assuming we will be adding a new entry soon
:param int max_cache_entries: if not None use instead of self._max_cache_entries
"""
folder_entries = [(cache_folder, cache_folder.stat().st_mtime)
for cache_folder in self._cache_folder.glob('*')
if cache_folder.is_dir() and not cache_folder.name.startswith(self._temp_entry_prefix)]
folder_entries = sorted(folder_entries, key=lambda x: x[1], reverse=True)
# lock so we make sure no one deletes it before we copy it
# noinspection PyBroadException
try:
self._lock.acquire(timeout=self._lock_timeout_seconds)
except BaseException as ex:
warning('Could not lock cache folder {}: {}'.format(self._cache_folder, ex))
return
number_of_entries_to_keep = self._max_cache_entries - 1 \
if max_cache_entries is None else max(0, int(max_cache_entries))
for folder, ts in folder_entries[number_of_entries_to_keep:]:
try:
shutil.rmtree(folder.as_posix(), ignore_errors=True)
except BaseException as ex:
warning('Could not delete cache entry {}: {}'.format(folder.as_posix(), ex))
self._lock.release()
def _check_min_free_space(self):
# type: () -> bool
"""
:return: return False if we hit the free space limit.
If not free space limit provided, always return True
"""
if not self._min_free_space_gb or not self._cache_folder:
return True
free_space = float(psutil.disk_usage(self._cache_folder.as_posix()).free)
free_space /= 2**30
return free_space > self._min_free_space_gb
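
A minimal sketch of the FolderCache API defined above (the key is a hypothetical md5 digest; real keys come from PackageManager._generate_reqs_hash_keys, shown later in this diff):

    from pathlib2 import Path
    from clearml_agent.helper.os.folder_cache import FolderCache

    cache = FolderCache('~/.clearml/venvs-cache', max_cache_entries=10, min_free_space_gb=2.0)
    keys = ['0123456789abcdef0123456789abcdef']  # hypothetical requirements hash
    cache.add_entry(keys, source_folder='/tmp/venv-build')          # copy an existing built venv in
    hit = cache.copy_cached_entry(keys, Path('/tmp/venv-restore'))  # Path on hit, None on miss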

View File

@@ -0,0 +1,211 @@
import os
import time
import tempfile
import contextlib
from .portalocker import constants, exceptions, lock, unlock
current_time = getattr(time, "monotonic", time.time)
DEFAULT_TIMEOUT = 10 ** 8
DEFAULT_CHECK_INTERVAL = 0.25
LOCK_METHOD = constants.LOCK_EX | constants.LOCK_NB
__all__ = [
'FileLock',
'open_atomic',
]
@contextlib.contextmanager
def open_atomic(filename, binary=True):
"""Open a file for atomic writing. Instead of locking this method allows
you to write the entire file and move it to the actual location. Note that
this makes the assumption that a rename is atomic on your platform which
is generally the case but not a guarantee.
http://docs.python.org/library/os.html#os.rename
>>> filename = 'test_file.txt'
>>> if os.path.exists(filename):
... os.remove(filename)
>>> with open_atomic(filename) as fh:
... written = fh.write(b'test')
>>> assert os.path.exists(filename)
>>> os.remove(filename)
"""
assert not os.path.exists(filename), '%r exists' % filename
path, name = os.path.split(filename)
# Create the parent directory if it doesn't exist
if path and not os.path.isdir(path): # pragma: no cover
os.makedirs(path)
temp_fh = tempfile.NamedTemporaryFile(
mode=binary and 'wb' or 'w',
dir=path,
delete=False,
)
yield temp_fh
temp_fh.flush()
os.fsync(temp_fh.fileno())
temp_fh.close()
try:
os.rename(temp_fh.name, filename)
finally:
try:
os.remove(temp_fh.name)
except Exception: # noqa
pass
class FileLock(object):
def __init__(
self, filename, mode='a', timeout=DEFAULT_TIMEOUT,
check_interval=DEFAULT_CHECK_INTERVAL, fail_when_locked=False,
flags=LOCK_METHOD, **file_open_kwargs):
"""Lock manager with build-in timeout
filename -- filename
mode -- the open mode, 'a' or 'ab' should be used for writing
truncate -- use truncate to emulate 'w' mode, None is disabled, 0 is
truncate to 0 bytes
timeout -- timeout when trying to acquire a lock
check_interval -- check interval while waiting
fail_when_locked -- after the initial lock failed, return an error
or lock the file
**file_open_kwargs -- The kwargs for the `open(...)` call
fail_when_locked is useful when multiple threads/processes can race
when creating a file. If set to true than the system will wait till
the lock was acquired and then return an AlreadyLocked exception.
Note that the file is opened first and locked later. So using 'w' as
mode will result in truncate _BEFORE_ the lock is checked.
"""
if 'w' in mode:
truncate = True
mode = mode.replace('w', 'a')
else:
truncate = False
self.fh = None
self.filename = filename
self.mode = mode
self.truncate = truncate
self.timeout = timeout
self.check_interval = check_interval
self.fail_when_locked = fail_when_locked
self.flags = flags
self.file_open_kwargs = file_open_kwargs
def acquire(
self, timeout=None, check_interval=None, fail_when_locked=None):
"""Acquire the locked filehandle"""
if timeout is None:
timeout = self.timeout
if timeout is None:
timeout = 0
if check_interval is None:
check_interval = self.check_interval
if fail_when_locked is None:
fail_when_locked = self.fail_when_locked
# If we already have a filehandle, return it
fh = self.fh
if fh:
return fh
# Get a new filehandler
fh = self._get_fh()
try:
# Try to lock
fh = self._get_lock(fh)
except exceptions.LockException as exception:
# Try till the timeout has passed
timeoutend = current_time() + timeout
while timeoutend > current_time():
# Wait a bit
time.sleep(check_interval)
# Try again
try:
# We already tried to the get the lock
# If fail_when_locked is true, then stop trying
if fail_when_locked:
raise exceptions.AlreadyLocked(exception)
else: # pragma: no cover
# We've got the lock
fh = self._get_lock(fh)
break
except exceptions.LockException:
pass
else:
# We got a timeout... reraising
raise exceptions.LockException(exception)
# Prepare the filehandle (truncate if needed)
fh = self._prepare_fh(fh)
self.fh = fh
return fh
def release(self):
"""Releases the currently locked file handle"""
if self.fh:
# noinspection PyBroadException
try:
unlock(self.fh)
except Exception:
pass
# noinspection PyBroadException
try:
self.fh.close()
except Exception:
pass
self.fh = None
def _get_fh(self):
"""Get a new filehandle"""
return open(self.filename, self.mode, **self.file_open_kwargs)
def _get_lock(self, fh):
"""
Try to lock the given filehandle
returns LockException if it fails"""
lock(fh, self.flags)
return fh
def _prepare_fh(self, fh):
"""
Prepare the filehandle for usage
If truncate is a number, the file will be truncated to that amount of
bytes
"""
if self.truncate:
fh.seek(0)
fh.truncate(0)
return fh
def __enter__(self):
return self.acquire()
def __exit__(self, type_, value, tb):
self.release()
def __delete__(self, instance): # pragma: no cover
instance.release()
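
A minimal usage sketch of FileLock; the same acquire/release-around-a-shared-folder pattern is what the git-cache change in repo.py (later in this diff) relies on:

    from clearml_agent.helper.os.locks import FileLock

    lock = FileLock('/tmp/example.lock', timeout=30, check_interval=0.25)
    lock.acquire()   # blocks up to `timeout` seconds, then raises a LockException
    try:
        pass         # critical section: the lock file is exclusively held
    finally:
        lock.release()
    # FileLock is also a context manager: `with FileLock('/tmp/example.lock'): ...`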

View File

@@ -0,0 +1,193 @@
import os
import sys
class exceptions:
class BaseLockException(Exception):
# Error codes:
LOCK_FAILED = 1
def __init__(self, *args, **kwargs):
self.fh = kwargs.pop('fh', None)
Exception.__init__(self, *args, **kwargs)
class LockException(BaseLockException):
pass
class AlreadyLocked(BaseLockException):
pass
class FileToLarge(BaseLockException):
pass
class constants:
# The actual tests will execute the code anyhow so the following code can
# safely be ignored from the coverage tests
if os.name == 'nt': # pragma: no cover
import msvcrt
LOCK_EX = 0x1 #: exclusive lock
LOCK_SH = 0x2 #: shared lock
LOCK_NB = 0x4 #: non-blocking
LOCK_UN = msvcrt.LK_UNLCK #: unlock
LOCKFILE_FAIL_IMMEDIATELY = 1
LOCKFILE_EXCLUSIVE_LOCK = 2
elif os.name == 'posix': # pragma: no cover
import fcntl
LOCK_EX = fcntl.LOCK_EX #: exclusive lock
LOCK_SH = fcntl.LOCK_SH #: shared lock
LOCK_NB = fcntl.LOCK_NB #: non-blocking
LOCK_UN = fcntl.LOCK_UN #: unlock
else: # pragma: no cover
raise RuntimeError('PortaLocker only defined for nt and posix platforms')
if os.name == 'nt': # pragma: no cover
import msvcrt
if sys.version_info.major == 2:
lock_length = -1
else:
lock_length = int(2**31 - 1)
def lock(file_, flags):
if flags & constants.LOCK_SH:
import win32file
import pywintypes
import winerror
__overlapped = pywintypes.OVERLAPPED()
if sys.version_info.major == 2:
if flags & constants.LOCK_NB:
mode = constants.LOCKFILE_FAIL_IMMEDIATELY
else:
mode = 0
else:
if flags & constants.LOCK_NB:
mode = msvcrt.LK_NBRLCK
else:
mode = msvcrt.LK_RLCK
# is there any reason not to reuse the following structure?
hfile = win32file._get_osfhandle(file_.fileno())
try:
win32file.LockFileEx(hfile, mode, 0, -0x10000, __overlapped)
except pywintypes.error as exc_value:
# error: (33, 'LockFileEx', 'The process cannot access the file
# because another process has locked a portion of the file.')
if exc_value.winerror == winerror.ERROR_LOCK_VIOLATION:
raise exceptions.LockException(
exceptions.LockException.LOCK_FAILED,
exc_value.strerror,
fh=file_)
else:
# Q: Are there exceptions/codes we should be dealing with
# here?
raise
else:
mode = constants.LOCKFILE_EXCLUSIVE_LOCK
if flags & constants.LOCK_NB:
mode |= constants.LOCKFILE_FAIL_IMMEDIATELY
if flags & constants.LOCK_NB:
mode = msvcrt.LK_NBLCK
else:
mode = msvcrt.LK_LOCK
# windows locks byte ranges, so make sure to lock from file start
try:
savepos = file_.tell()
if savepos:
# [ ] test exclusive lock fails on seek here
# [ ] test if shared lock passes this point
file_.seek(0)
# [x] check if 0 param locks entire file (not documented in
# Python)
# [x] fails with "IOError: [Errno 13] Permission denied",
# but -1 seems to do the trick
try:
msvcrt.locking(file_.fileno(), mode, lock_length)
except IOError as exc_value:
# [ ] be more specific here
raise exceptions.LockException(
exceptions.LockException.LOCK_FAILED,
exc_value.strerror,
fh=file_)
finally:
if savepos:
file_.seek(savepos)
except IOError as exc_value:
raise exceptions.LockException(
exceptions.LockException.LOCK_FAILED, exc_value.strerror,
fh=file_)
def unlock(file_):
try:
savepos = file_.tell()
if savepos:
file_.seek(0)
try:
msvcrt.locking(file_.fileno(), constants.LOCK_UN, lock_length)
except IOError as exc_value:
if exc_value.strerror == 'Permission denied':
import pywintypes
import win32file
import winerror
__overlapped = pywintypes.OVERLAPPED()
hfile = win32file._get_osfhandle(file_.fileno())
try:
win32file.UnlockFileEx(
hfile, 0, -0x10000, __overlapped)
except pywintypes.error as exc_value:
if exc_value.winerror == winerror.ERROR_NOT_LOCKED:
# error: (158, 'UnlockFileEx',
# 'The segment is already unlocked.')
# To match the 'posix' implementation, silently
# ignore this error
pass
else:
# Q: Are there exceptions/codes we should be
# dealing with here?
raise
else:
raise exceptions.LockException(
exceptions.LockException.LOCK_FAILED,
exc_value.strerror,
fh=file_)
finally:
if savepos:
file_.seek(savepos)
except IOError as exc_value:
raise exceptions.LockException(
exceptions.LockException.LOCK_FAILED, exc_value.strerror,
fh=file_)
elif os.name == 'posix': # pragma: no cover
import fcntl
def lock(file_, flags):
locking_exceptions = IOError,
try: # pragma: no cover
locking_exceptions += BlockingIOError,
except NameError: # pragma: no cover
pass
try:
fcntl.flock(file_.fileno(), flags)
except locking_exceptions as exc_value:
# The exception code varies on different systems so we'll catch
# every IO error
raise exceptions.LockException(exc_value, fh=file_)
def unlock(file_):
fcntl.flock(file_.fileno(), constants.LOCK_UN)
else: # pragma: no cover
raise RuntimeError('PortaLocker only defined for nt and posix platforms')
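
FileLock above is a thin wrapper over this module's `lock`/`unlock` pair. A minimal sketch of using them directly (the module name follows the relative import in locks.py; the full package path is assumed):

    from clearml_agent.helper.os import portalocker  # assumed package path

    with open('/tmp/example.txt', 'a') as fh:
        portalocker.lock(fh, portalocker.constants.LOCK_EX | portalocker.constants.LOCK_NB)
        fh.write('written under an exclusive, non-blocking lock\n')
        fh.flush()
        portalocker.unlock(fh)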

View File

@@ -1,11 +1,16 @@
from __future__ import unicode_literals
import abc
from collections import OrderedDict
from contextlib import contextmanager
from typing import Text, Iterable, Union
from typing import Text, Iterable, Union, Optional, Dict, List
from pathlib2 import Path
from hashlib import md5
import six
from clearml_agent.helper.base import mkstemp, safe_remove_file, join_lines, select_for_platform
from clearml_agent.helper.console import ensure_binary
from clearml_agent.helper.os.folder_cache import FolderCache
from clearml_agent.helper.process import Executable, Argv, PathLike
@@ -18,6 +23,12 @@ class PackageManager(object):
_selected_manager = None
_cwd = None
_pip_version = None
_config_cache_folder = 'agent.venvs_cache.path'
_config_cache_max_entries = 'agent.venvs_cache.max_entries'
_config_cache_free_space_threshold = 'agent.venvs_cache.free_space_threshold_gb'
def __init__(self):
self._cache_manager = None
@abc.abstractproperty
def bin(self):
@@ -67,7 +78,7 @@ class PackageManager(object):
def upgrade_pip(self):
result = self._install(
select_for_platform(windows='"pip{}"', linux='pip{}').format(self.get_pip_version()), "--upgrade")
select_for_platform(windows='pip{}', linux='pip{}').format(self.get_pip_version()), "--upgrade")
packages = self.run_with_env(('list',), output=True).splitlines()
# p.split is ('pip', 'x.y.z')
pip = [p.split() for p in packages if len(p.split()) == 2 and p.split()[0] == 'pip']
@@ -153,3 +164,100 @@ class PackageManager(object):
@classmethod
def get_pip_version(cls):
return cls._pip_version or ''
def get_cached_venv(self, requirements, docker_cmd, python_version, cuda_version, destination_folder):
# type: (Dict, Optional[Union[dict, str]], Optional[str], Optional[str], Path) -> Optional[Path]
"""
Copy a cached copy of the venv (based on the requirements) into destination_folder.
Return None if failed or cached entry does not exist
"""
if not self._get_cache_manager():
return None
keys = self._generate_reqs_hash_keys(requirements, docker_cmd, python_version, cuda_version)
return self._get_cache_manager().copy_cached_entry(keys, destination_folder)
def add_cached_venv(
self,
requirements, # type: Union[Dict, List[Dict]]
docker_cmd, # type: Optional[Union[dict, str]]
python_version, # type: Optional[str]
cuda_version, # type: Optional[str]
source_folder, # type: Path
exclude_sub_folders=None # type: Optional[List[str]]
):
# type: (...) -> ()
"""
Copy the local venv folder into the venv cache (keys are based on the requirements+python+docker).
"""
if not self._get_cache_manager():
return
keys = self._generate_reqs_hash_keys(requirements, docker_cmd, python_version, cuda_version)
return self._get_cache_manager().add_entry(
keys=keys, source_folder=source_folder, exclude_sub_folders=exclude_sub_folders)
def get_cache_folder(self):
# type: () -> Optional[Path]
if not self._get_cache_manager():
return
return self._get_cache_manager().get_cache_folder()
def get_last_used_entry_cache(self):
# type: () -> Optional[Path]
"""
:return: the last used cached folder entry
"""
if not self._get_cache_manager():
return
return self._get_cache_manager().get_last_copied_entry()
@classmethod
def _generate_reqs_hash_keys(cls, requirements_list, docker_cmd, python_version, cuda_version):
# type: (Union[Dict, List[Dict]], Optional[Union[dict, str]], Optional[str], Optional[str]) -> List[str]
requirements_list = requirements_list or dict()
if not isinstance(requirements_list, (list, tuple)):
requirements_list = [requirements_list]
docker_cmd = dict(docker_cmd=docker_cmd) if isinstance(docker_cmd, str) else docker_cmd or dict()
docker_cmd = OrderedDict(sorted(docker_cmd.items(), key=lambda t: t[0]))
if 'docker_cmd' in docker_cmd:
# we only take the first part of the docker_cmd which is the docker image name
docker_cmd['docker_cmd'] = docker_cmd['docker_cmd'].strip('\r\n\t ').split(' ')[0]
keys = []
strip_chars = '\n\r\t '
for requirements in requirements_list:
pip, conda = ('pip', 'conda')
pip_reqs = requirements.get(pip, '')
conda_reqs = requirements.get(conda, '')
if isinstance(pip_reqs, str):
pip_reqs = pip_reqs.split('\n')
if isinstance(conda_reqs, str):
conda_reqs = conda_reqs.split('\n')
pip_reqs = sorted([p.strip(strip_chars) for p in pip_reqs
if p.strip(strip_chars) and not p.strip(strip_chars).startswith('#')])
conda_reqs = sorted([p.strip(strip_chars) for p in conda_reqs
if p.strip(strip_chars) and not p.strip(strip_chars).startswith('#')])
if not pip_reqs and not conda_reqs:
continue
hash_text = '{class_type}\n{docker_cmd}\n{cuda_ver}\n{python_version}\n{pip_reqs}\n{conda_reqs}'.format(
class_type=str(cls),
docker_cmd=str(docker_cmd or ''),
cuda_ver=str(cuda_version or ''),
python_version=str(python_version or ''),
pip_reqs=str(pip_reqs or ''),
conda_reqs=str(conda_reqs or ''),
)
keys.append(md5(ensure_binary(hash_text)).hexdigest())
return sorted(list(set(keys)))
def _get_cache_manager(self):
if not self._cache_manager:
cache_folder = self.session.config.get(self._config_cache_folder, None)
if not cache_folder:
return None
max_entries = int(self.session.config.get(self._config_cache_max_entries, 10))
free_space_threshold = float(self.session.config.get(self._config_cache_free_space_threshold, 0))
self._cache_manager = FolderCache(
cache_folder, max_cache_entries=max_entries, min_free_space_gb=free_space_threshold)
return self._cache_manager
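
A minimal sketch of how one venv cache key is derived, mirroring `_generate_reqs_hash_keys` above: the manager class, docker image, CUDA and Python versions, and the sorted, comment-stripped requirement lines are concatenated and hashed (the class name below is hypothetical):

    from hashlib import md5

    pip_reqs = sorted(p.strip() for p in ['clearml', 'torch==1.7.1']
                      if p.strip() and not p.strip().startswith('#'))
    hash_text = '{class_type}\n{docker_cmd}\n{cuda_ver}\n{python_version}\n{pip_reqs}\n{conda_reqs}'.format(
        class_type="<class 'VirtualenvPip'>",  # hypothetical manager class
        docker_cmd='nvidia/cuda:10.1-cudnn7-runtime-ubuntu18.04',
        cuda_ver='110',
        python_version='3.6',
        pip_reqs=str(pip_reqs),
        conda_reqs='',
    )
    key = md5(hash_text.encode('utf-8')).hexdigest()  # becomes a FolderCache entry key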

View File

@@ -69,6 +69,7 @@ class CondaAPI(PackageManager):
:param python: base python version to use (e.g python3.6)
:param path: path of env
"""
super(CondaAPI, self).__init__()
self.session = session
self.python = python
self.source = None
@@ -132,7 +133,7 @@ class CondaAPI(PackageManager):
if self.env_read_only:
print('Conda environment in read-only mode, skipping pip upgrade.')
return ''
return self._install(select_for_platform(windows='"pip{}"', linux='pip{}').format(self.pip.get_pip_version()))
return self._install(select_for_platform(windows='pip{}', linux='pip{}').format(self.pip.get_pip_version()))
def create(self):
"""
@@ -140,19 +141,7 @@ class CondaAPI(PackageManager):
"""
if self.conda_env_as_base_docker and self.conda_pre_build_env_path:
if Path(self.conda_pre_build_env_path).is_dir():
print("Using pre-existing Conda environment from {}".format(self.conda_pre_build_env_path))
self.path = Path(self.conda_pre_build_env_path)
self.source = ("conda", "activate", self.path.as_posix())
self.pip = CondaPip(
session=self.session,
source=self.source,
python=self.python,
requirements_manager=self.requirements_manager,
path=self.path,
)
conda_env = self._get_conda_sh()
self.source = self.pip.source = CommandSequence(('source', conda_env.as_posix()), self.source)
self.env_read_only = True
self._init_existing_environment(self.conda_pre_build_env_path)
return self
elif Path(self.conda_pre_build_env_path).is_file():
print("Restoring Conda environment from {}".format(self.conda_pre_build_env_path))
@@ -210,6 +199,21 @@ class CondaAPI(PackageManager):
pass
return self
def _init_existing_environment(self, conda_pre_build_env_path):
print("Using pre-existing Conda environment from {}".format(conda_pre_build_env_path))
self.path = Path(conda_pre_build_env_path)
self.source = ("conda", "activate", self.path.as_posix())
self.pip = CondaPip(
session=self.session,
source=self.source,
python=self.python,
requirements_manager=self.requirements_manager,
path=self.path,
)
conda_env = self._get_conda_sh()
self.source = self.pip.source = CommandSequence(('source', conda_env.as_posix()), self.source)
self.env_read_only = True
def remove(self):
"""
Delete a conda environment.
@@ -284,17 +288,11 @@ class CondaAPI(PackageManager):
"""
Try to install packages from conda. Install packages which are not available from conda with pip.
"""
try:
self._install_from_file(path)
return
except PackageNotFoundError as e:
pip_packages = [e.pkg]
except PackagesNotFoundError as e:
pip_packages = package_set(e.packages)
with self.temp_file("conda_reqs", _package_diff(path, pip_packages)) as reqs:
self.install_from_file(reqs)
with self.temp_file("pip_reqs", pip_packages) as reqs:
self.pip.install_from_file(reqs)
requirements = {}
# assume requirements.txt
with open(path, 'rt') as f:
requirements['pip'] = f.read()
self.load_requirements(requirements)
def freeze(self, freeze_full_environment=False):
requirements = self.pip.freeze()
@@ -507,6 +505,8 @@ class CondaAPI(PackageManager):
reqs.append(m)
# if we have a conda list, the rest should be installed with pip,
# this means any experiment that was executed with pip environment,
# will be installed using pip
if requirements.get('conda', None) is not None:
for r in requirements['pip']:
try:
@@ -520,7 +520,7 @@ class CondaAPI(PackageManager):
# skip over local files (we cannot change the version to a local file)
if m.local_file:
continue
m_name = m.name.lower()
m_name = (m.name or '').lower()
if m_name in conda_supported_req_names:
# this package is in the conda list,
# make sure that if we changed version and we match it in conda
@@ -557,7 +557,7 @@ class CondaAPI(PackageManager):
# conform conda packages (version/name)
for r in reqs:
# change _ to - in name but not the prefix _ (as this is conda prefix)
if not r.name.startswith('_') and not requirements.get('conda', None):
if r.name and not r.name.startswith('_') and not requirements.get('conda', None):
r.name = r.name.replace('_', '-')
# remove .post from version numbers, it fails ~= version, and change == to ~=
if r.specs and r.specs[0]:
@@ -671,6 +671,8 @@ class CondaAPI(PackageManager):
return result
def get_python_command(self, extra=()):
if not self.source:
self._init_existing_environment(self.path)
return CommandSequence(self.source, self.pip.get_python_command(extra=extra))
def _get_conda_sh(self):

View File

@@ -17,6 +17,15 @@ class ExternalRequirements(SimpleSubstitution):
self.post_install_req_lookup = OrderedDict()
def match(self, req):
# match local folder building:
# noinspection PyBroadException
try:
if not req.name and req.req and not req.req.editable and not req.req.vcs and \
req.req.line and not req.req.line.strip().split('#')[0].lower().endswith('.whl'):
return True
except Exception:
pass
# match both editable or code or unparsed
if not (not req.name or req.req and (req.req.editable or req.req.vcs)):
return False
@@ -104,3 +113,20 @@ class ExternalRequirements(SimpleSubstitution):
list_of_requirements[k] += [self.post_install_req_lookup.get(r, '')
for r in self.post_install_req_lookup.keys() if r in original_requirements]
return list_of_requirements
class OnlyExternalRequirements(ExternalRequirements):
def __init__(self, *args, **kwargs):
super(OnlyExternalRequirements, self).__init__(*args, **kwargs)
def match(self, req):
return not super(OnlyExternalRequirements, self).match(req)
def replace(self, req):
"""
Replace a requirement
:raises: ValueError if version is pre-release
"""
# Do not store the skipped requirements
# mark skip package
return Text('')

View File

@@ -17,6 +17,7 @@ class SystemPip(PackageManager):
"""
Program interface to the system pip.
"""
super(SystemPip, self).__init__()
self._bin = interpreter or sys.executable
self.session = session

View File

@@ -17,6 +17,7 @@ import six
from clearml_agent.definitions import PIP_EXTRA_INDICES
from clearml_agent.helper.base import warning, is_conda, which, join_lines, is_windows_platform
from clearml_agent.helper.process import Argv, PathLike
from clearml_agent.helper.gpu.gpustat import get_driver_cuda_version
from clearml_agent.session import Session, normalize_cuda_version
from clearml_agent.external.requirements_parser import parse
from clearml_agent.external.requirements_parser.requirement import Requirement
@@ -446,6 +447,7 @@ class RequirementsManager(object):
'cu'+agent['cuda_version'] if self.found_cuda else 'cpu')
self.translator = RequirementsTranslator(session, interpreter=base_interpreter,
cache_dir=pip_cache_dir.as_posix())
self._base_interpreter = base_interpreter
def register(self, cls): # type: (Type[RequirementSubstitution]) -> None
self.handlers.append(cls(self._session))
@@ -529,6 +531,9 @@ class RequirementsManager(object):
pass
return requirements
def get_interpreter(self):
return self._base_interpreter
@staticmethod
def get_cuda_version(config): # type: (ConfigTree) -> (Text, Text)
# we assume os.environ already updated the config['agent.cuda_version'] & config['agent.cudnn_version']
@@ -537,6 +542,9 @@ class RequirementsManager(object):
if cuda_version and cudnn_version:
return normalize_cuda_version(cuda_version), normalize_cuda_version(cudnn_version)
if not cuda_version:
cuda_version = get_driver_cuda_version()
if not cuda_version and is_windows_platform():
try:
cuda_vers = [int(k.replace('CUDA_PATH_V', '').replace('_', '')) for k in os.environ.keys()
@@ -601,4 +609,3 @@ class RequirementsManager(object):
return (normalize_cuda_version(cuda_version or 0),
normalize_cuda_version(cudnn_version or 0))
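
The truncated Windows fallback above scans `CUDA_PATH_V*` environment variables when neither the configuration nor the driver reports a CUDA version. A minimal sketch of that scan (picking the highest version with `max()` is illustrative; the selection logic is not visible in this hunk):

    import os

    os.environ.setdefault('CUDA_PATH_V11_0', r'C:\CUDA\v11.0')  # illustrative entry
    cuda_vers = [int(k.replace('CUDA_PATH_V', '').replace('_', ''))
                 for k in os.environ if k.startswith('CUDA_PATH_V')]
    cuda_version = max(cuda_vers) if cuda_vers else 0  # 110 -> CUDA 11.0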

View File

@@ -7,7 +7,7 @@ import re
import subprocess
import sys
from contextlib import contextmanager
from copy import deepcopy
from copy import copy
from distutils.spawn import find_executable
from itertools import chain, repeat, islice
from os.path import devnull
@@ -276,9 +276,9 @@ class CommandSequence(Executable):
self.commands = []
for c in commands:
if isinstance(c, CommandSequence):
self.commands.extend(deepcopy(c.commands))
self.commands.extend([copy(p) for p in c.commands])
elif isinstance(c, Argv):
self.commands.append(deepcopy(c))
self.commands.append(copy(c))
else:
self.commands.append(Argv(*c, log=self._log))
@@ -420,7 +420,7 @@ SOURCE_COMMAND = select_for_platform(linux="source", windows="call")
class ExitStatus(object):
success = 0
failure = 1
interrupted = 2
interrupted = -2
COMMAND_SUCCESS = 0
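
Changing `interrupted` from 2 to -2 presumably aligns it with the subprocess convention: on POSIX, `Popen.returncode` is the negative signal number when a child is killed by a signal, so a Ctrl-C'd child reports `-SIGINT`:

    import signal

    # SIGINT is signal number 2, so a child killed by Ctrl-C returns -2
    assert -signal.SIGINT == -2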

View File

@@ -4,7 +4,9 @@ import shutil
import subprocess
from distutils.spawn import find_executable
from hashlib import md5
from os import environ, getenv
from os import environ
from random import random
from threading import Lock
from typing import Text, Sequence, Mapping, Iterable, TypeVar, Callable, Tuple, Optional
import attr
@@ -23,6 +25,7 @@ from clearml_agent.helper.base import (
normalize_path,
create_file_if_not_exists,
)
from clearml_agent.helper.os.locks import FileLock
from clearml_agent.helper.process import DEVNULL, Argv, PathLike, COMMAND_SUCCESS
from clearml_agent.session import Session
@@ -88,7 +91,7 @@ class VCS(object):
# additional environment variables for VCS commands
COMMAND_ENV = {}
PATCH_ADDED_FILE_RE = re.compile(r"^\+\+\+ b/(?P<path>.*)")
PATCH_ADDED_FILE_RE = re.compile(r"^--- a/(?P<path>.*)")
def __init__(self, session, url, location, revision):
# type: (Session, Text, PathLike, Text) -> ()
@@ -115,21 +118,21 @@ class VCS(object):
"""
return self.add_auth(self.session.config, self.url)
@abc.abstractproperty
@abc.abstractmethod
def executable_name(self):
"""
Name of command executable
"""
pass
@abc.abstractproperty
@abc.abstractmethod
def main_branch(self):
"""
Name of default/main branch
"""
pass
@abc.abstractproperty
@abc.abstractmethod
def checkout_flags(self):
# type: () -> Sequence[Text]
"""
@@ -137,7 +140,7 @@ class VCS(object):
"""
pass
@abc.abstractproperty
@abc.abstractmethod
def patch_base(self):
# type: () -> Sequence[Text]
"""
@@ -254,15 +257,15 @@ class VCS(object):
return url
@classmethod
def replace_http_url(cls, url, port=None):
# type: (Text, Optional[int]) -> Text
def replace_http_url(cls, url, port=None, username=None):
# type: (Text, Optional[int], Optional[str]) -> Text
"""
Replace HTTPS URL with SSH URL when applicable
"""
parsed_url = furl(url)
if parsed_url.scheme == "https":
parsed_url.scheme = "ssh"
parsed_url.username = "git"
parsed_url.username = username or "git"
parsed_url.password = None
# make sure there is no port in the final url (safe_furl support)
# the original port was an https port, and we do not know if there is a different ssh port,
@@ -285,7 +288,10 @@ class VCS(object):
return
if parsed_url.scheme == "https":
new_url = self.replace_http_url(
self.url, port=self.session.config.get('agent.force_git_ssh_port', None))
self.url,
port=self.session.config.get('agent.force_git_ssh_port', None),
username=self.session.config.get('agent.force_git_ssh_user', None)
)
if new_url != self.url:
print("Using SSH credentials - replacing https url '{}' with ssh url '{}'".format(
self.url, new_url))
@@ -464,7 +470,7 @@ class VCS(object):
parsed_url.set(username=config_user, password=config_pass)
return parsed_url.url
@abc.abstractproperty
@abc.abstractmethod
def info_commands(self):
# type: () -> Mapping[Text, Argv]
"""
@@ -524,7 +530,7 @@ class Git(VCS):
self.call("checkout", self.revision, *self.checkout_flags, cwd=self.location)
try:
self.call("submodule", "update", "--recursive", cwd=self.location)
except:
except: # noqa
pass
info_commands = dict(
@@ -582,6 +588,9 @@ def clone_repository_cached(session, execution, destination):
:return: repository information
:raises: CommandFailedError if git/hg is not installed
"""
# mock lock
repo_lock = Lock()
repo_lock_timeout_sec = 300
repo_url = execution.repository # type: str
parsed_url = furl(repo_url)
no_password_url = parsed_url.copy().remove(password=True).url
@@ -593,37 +602,48 @@ def clone_repository_cached(session, execution, destination):
if standalone_mode:
cached_repo_path = clone_folder
else:
-        cached_repo_path = (
-            Path(session.config["agent.vcs_cache.path"]).expanduser()
-            / "{}.{}".format(clone_folder_name, md5(ensure_binary(repo_url)).hexdigest())
-            / clone_folder_name
-        )  # type: Path
+        vcs_cache_path = Path(session.config["agent.vcs_cache.path"]).expanduser()
+        repo_hash = md5(ensure_binary(repo_url)).hexdigest()
+        # create lock
+        repo_lock = FileLock(filename=(vcs_cache_path / '{}.lock'.format(repo_hash)).as_posix())
+        # noinspection PyBroadException
+        try:
+            repo_lock.acquire(timeout=repo_lock_timeout_sec)
+        except BaseException:
+            print('Could not lock cache folder "{}" (timeout {} sec), using temp vcs cache.'.format(
+                clone_folder_name, repo_lock_timeout_sec))
+            repo_hash = '{}_{}'.format(repo_hash, str(random()).replace('.', ''))
+            # use mock lock for the context
+            repo_lock = Lock()
+        # select vcs cache folder
+        cached_repo_path = vcs_cache_path / "{}.{}".format(clone_folder_name, repo_hash) / clone_folder_name
-    vcs = VcsFactory.create(
-        session, execution_info=execution, location=cached_repo_path
-    )
-    if not find_executable(vcs.executable_name):
-        raise CommandFailedError(vcs.executable_not_found_error_help())
-    if not standalone_mode:
-        if session.config["agent.vcs_cache.enabled"] and cached_repo_path.exists():
-            print('Using cached repository in "{}"'.format(cached_repo_path))
-        else:
-            print("cloning: {}".format(no_password_url))
-            rm_tree(cached_repo_path)
-            # We clone the entire repository, not a specific branch
-            vcs.clone()  # branch=execution.branch)
-        vcs.pull()
-        rm_tree(destination)
-        shutil.copytree(Text(cached_repo_path), Text(clone_folder))
-        if not clone_folder.is_dir():
-            raise CommandFailedError(
-                "copying of repository failed: from {} to {}".format(
-                    cached_repo_path, clone_folder
-                )
-            )
+    with repo_lock:
+        vcs = VcsFactory.create(
+            session, execution_info=execution, location=cached_repo_path
+        )
+        if not find_executable(vcs.executable_name):
+            raise CommandFailedError(vcs.executable_not_found_error_help())
+        if not standalone_mode:
+            if session.config["agent.vcs_cache.enabled"] and cached_repo_path.exists():
+                print('Using cached repository in "{}"'.format(cached_repo_path))
+            else:
+                print("cloning: {}".format(no_password_url))
+                rm_tree(cached_repo_path)
+                # We clone the entire repository, not a specific branch
+                vcs.clone()  # branch=execution.branch)
+            vcs.pull()
+            rm_tree(destination)
+            shutil.copytree(Text(cached_repo_path), Text(clone_folder))
+            if not clone_folder.is_dir():
+                raise CommandFailedError(
+                    "copying of repository failed: from {} to {}".format(
+                        cached_repo_path, clone_folder
+                    )
+                )
# checkout in the newly copied destination
vcs.location = Text(clone_folder)
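Note: the pattern introduced above — take a cross-process file lock on the shared cache entry, and on timeout fall back to a uniquely named temp entry guarded by a no-op threading.Lock — can be sketched with the PyPI filelock package standing in for the agent's own FileLock helper; `clone_or_pull` is a hypothetical placeholder for the vcs.clone()/vcs.pull() calls:

from pathlib import Path
from random import random
from threading import Lock

from filelock import FileLock, Timeout  # pip install filelock

def clone_or_pull(path):
    # hypothetical stand-in for the actual vcs clone/pull into `path`
    print("fetching into", path)

def fetch_repo_cached(cache_dir, repo_hash, timeout_sec=300):
    cache_dir = Path(cache_dir)
    cache_dir.mkdir(parents=True, exist_ok=True)
    lock = FileLock(str(cache_dir / "{}.lock".format(repo_hash)))
    try:
        with lock.acquire(timeout=timeout_sec):
            # exclusive cross-process access: safe to reuse the shared cache entry
            clone_or_pull(cache_dir / repo_hash)
    except Timeout:
        # entry is busy: fall back to a unique throw-away entry; a plain
        # threading.Lock keeps the with-block structure without cross-process locking
        unique = "{}_{}".format(repo_hash, str(random()).replace(".", ""))
        with Lock():
            clone_or_pull(cache_dir / unique)

fetch_repo_cached("/tmp/vcs-cache", "abc123")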
@@ -635,3 +655,70 @@ def clone_repository_cached(session, execution, destination):
repo_info = attr.evolve(repo_info, url=no_password_url)
return vcs, repo_info
def fix_package_import_diff_patch(entry_script_file):
# noinspection PyBroadException
try:
with open(entry_script_file, 'rt') as f:
lines = f.readlines()
except Exception:
return
# make sure we are the first import (i.e. we patched the source code)
if not lines or not lines[0].strip().startswith('from clearml ') or 'Task.init' not in lines[1]:
return
original_lines = lines
# skip over the first two lines, they are ours
# then skip over empty or comment lines
lines = [(i, line.split('#', 1)[0].rstrip()) for i, line in enumerate(lines)
if i >= 2 and line.strip('\r\n\t ') and not line.strip().startswith('#')]
# remove triple quotes ' """ '
nested_c = -1
skip_lines = []
for i, line_pair in enumerate(lines):
for _ in line_pair[1].split('"""')[1:]:
if nested_c >= 0:
skip_lines.extend(list(range(nested_c, i+1)))
nested_c = -1
else:
nested_c = i
# keep only the lines that are outside the triple-quoted strings
lines = [pair for i, pair in enumerate(lines) if i not in skip_lines]
from_future = re.compile(r"^from[\s]*__future__[\s]*")
import_future = re.compile(r"^import[\s]*__future__[\s]*")
# test if we have __future__ import
found_index = -1
for a_i, (_, a_line) in enumerate(lines):
if found_index >= a_i:
continue
if from_future.match(a_line) or import_future.match(a_line):
found_index = a_i
# check the last import block
i, line = lines[found_index]
# either the line ends with a '\' continuation character or the import is wrapped in parentheses
parenthesized_lines = '(' in line and ')' not in line
while line.endswith('\\') or parenthesized_lines:
found_index += 1
i, line = lines[found_index]
if ')' in line:
break
else:
break
# no imports found
if found_index < 0:
return
# now we need to move back the patched two lines
entry_line = lines[found_index][0]
new_lines = original_lines[2:entry_line + 1] + original_lines[0:2] + original_lines[entry_line + 1:]
# noinspection PyBroadException
try:
with open(entry_script_file, 'wt') as f:
f.writelines(new_lines)
except Exception:
return
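Note: the reordering above is needed because `from __future__ import ...` statements must be the first code in a module; the two lines the agent injects at the top of the entry script would otherwise turn a valid script into a SyntaxError. A hypothetical before/after, just to illustrate the move:

patched_script = """from clearml import Task
Task.init()
from __future__ import print_function   # SyntaxError: must come before any other code
print("hello")
"""

fixed_script = """from __future__ import print_function
from clearml import Task
Task.init()
print("hello")
"""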

View File

@@ -129,8 +129,9 @@ def get_uptime_string(entry):
def get_runtime_properties_string(runtime_properties):
-    # type: (List[dict]) -> Tuple[Optional[str], str]
+    # type: (Optional[List[dict]]) -> Tuple[Optional[str], str]
server_string = []
+    runtime_properties = runtime_properties or []
force_flag = next(
(prop for prop in runtime_properties if prop["key"] == UptimeConf.worker_key),
None,
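Note: together with the existing next(...) lookup, the added `or []` guard makes a missing properties list degrade to "not found" instead of raising. In short (the "force" key below is illustrative, standing in for UptimeConf.worker_key):

runtime_properties = None  # e.g. the server returned no properties
props = runtime_properties or []
force_flag = next((p for p in props if p["key"] == "force"), None)
print(force_flag)  # None, rather than a TypeError from iterating None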

View File

@@ -7,7 +7,7 @@ from tempfile import gettempdir, NamedTemporaryFile
from typing import List, Tuple, Optional
from clearml_agent.definitions import ENV_DOCKER_HOST_MOUNT
-from clearml_agent.helper.base import warning
+from clearml_agent.helper.base import warning, is_windows_platform, safe_remove_file
class Singleton(object):
@@ -22,6 +22,13 @@ class Singleton(object):
_lock_timeout = 10
_pid = None
+    @classmethod
+    def close_pid_file(cls):
+        if cls._pid_file:
+            cls._pid_file.close()
+            safe_remove_file(cls._pid_file.name)
+            cls._pid_file = None
@classmethod
def update_pid_file(cls):
new_pid = str(os.getpid())
@@ -115,7 +122,7 @@ class Singleton(object):
@classmethod
def _register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None, allow_double=False):
-        if cls.worker_id:
+        if cls.worker_id and cls.instance_slot is not None:
return cls.worker_id, cls.instance_slot
# make sure we have a unique name
instance_num = 0
@@ -167,7 +174,9 @@ class Singleton(object):
# create lock
cls._pid = str(os.getpid())
cls._pid_file = NamedTemporaryFile(
-            dir=cls._get_temp_folder(), prefix=cls.prefix + cls.sep + cls._pid + cls.sep, suffix=cls.ext)
+            dir=cls._get_temp_folder(), prefix=cls.prefix + cls.sep + cls._pid + cls.sep, suffix=cls.ext,
+            delete=False if is_windows_platform() else True
+        )
cls._pid_file.write(('{}\n{}'.format(unique_worker_id, cls.instance_slot)).encode())
cls._pid_file.flush()
cls.worker_id = unique_worker_id
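Note: the `delete=False` on Windows works around documented NamedTemporaryFile behavior — on Windows the name of a delete-on-close temp file cannot be reopened by other processes, and the file vanishes as soon as the handle closes — which is why the new close_pid_file() removes it explicitly. A minimal sketch of the same idea (not agent code):

import os
import tempfile

def make_pid_file(folder, prefix):
    pid_file = tempfile.NamedTemporaryFile(
        dir=folder, prefix=prefix, suffix=".pid",
        # on Windows, delete=True keeps the file bound to this handle and
        # removes it on close; opt out and unlink the file manually instead
        delete=False if os.name == "nt" else True,
    )
    pid_file.write(str(os.getpid()).encode())
    pid_file.flush()
    return pid_file  # caller closes it and, on Windows, unlinks pid_file.name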

View File

@@ -78,7 +78,10 @@ DAEMON_ARGS = dict({
},
'--services-mode': {
'help': 'Launch multiple long-term docker services. Implies docker & cpu-only flags.',
-        'action': 'store_true',
+        'nargs': '?',
+        'const': -1,
+        'type': int,
+        'default': None,
},
'--create-queue': {
'help': 'Create requested queue if it does not exist already.',
@@ -93,6 +96,12 @@ DAEMON_ARGS = dict({
'help': 'Stop the running agent (based on the same set of arguments)',
'action': 'store_true',
},
+    '--dynamic-gpus': {
+        'help': 'Allow dynamic gpu allocation based on queue properties, '
+                'configure with \'--queue <queue_name>=<num_gpus>\'.'
+                ' Example: \'--dynamic-gpus --queue dual_gpus=2 single_gpu=1\'',
+        'action': 'store_true',
+    },
'--uptime': {
'help': 'Specify uptime for clearml-agent in "<hours> <days>" format. For example, use "17-20 TUE" to set '
'Tuesday\'s uptime to 17-20'
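Note: the new --services-mode definition relies on argparse's nargs='?' plus const, so the flag can appear bare (unlimited services) or with a numeric limit, while omitting it leaves the mode off. A self-contained sketch of the resulting behavior (illustration only, not the agent's parser):

from argparse import ArgumentParser

parser = ArgumentParser()
parser.add_argument("--services-mode", nargs="?", const=-1, type=int, default=None)

print(parser.parse_args([]).services_mode)                         # None  (flag omitted)
print(parser.parse_args(["--services-mode"]).services_mode)       # -1    (bare flag, no limit)
print(parser.parse_args(["--services-mode", "5"]).services_mode)  # 5     (limit to 5 services)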

View File

@@ -204,7 +204,7 @@ class Session(_Session):
folder_keys = ('agent.venvs_dir', 'agent.vcs_cache.path',
'agent.pip_download_cache.path',
'agent.docker_pip_cache', 'agent.docker_apt_cache')
-    singleton_folders = ('agent.venvs_dir', 'agent.vcs_cache.path', 'agent.docker_apt_cache')
+    singleton_folders = ('agent.venvs_dir', 'agent.docker_apt_cache')
if ENV_TASK_EXECUTE_AS_USER.get():
folder_keys = tuple(list(folder_keys) + ['sdk.storage.cache.default_base_dir'])

View File

@@ -1 +1 @@
-__version__ = '0.17.0'
+__version__ = '0.17.1'

View File

@@ -10,9 +10,9 @@ RUN apt-get dist-upgrade -y
RUN apt-get install -y curl python3-pip git
RUN curl -sSL https://get.docker.com/ | sh
RUN python3 -m pip install -U pip
-RUN python3 -m pip install trains-agent
+RUN python3 -m pip install clearml-agent
RUN python3 -m pip install -U "cryptography>=2.9"
-ENV TRAINS_DOCKER_SKIP_GPUS_FLAG=1
+ENV CLEARML_DOCKER_SKIP_GPUS_FLAG=1
ENTRYPOINT ["/usr/agent/entrypoint.sh"]

View File

@@ -1,7 +1,7 @@
#!/bin/sh
LOWER_PIP_UPDATE_VERSION="$(echo "$PIP_UPDATE_VERSION" | tr '[:upper:]' '[:lower:]')"
-LOWER_TRAINS_AGENT_UPDATE_VERSION="$(echo "$TRAINS_AGENT_UPDATE_VERSION" | tr '[:upper:]' '[:lower:]')"
+LOWER_CLEARML_AGENT_UPDATE_VERSION="$(echo "${CLEARML_AGENT_UPDATE_VERSION:-$TRAINS_AGENT_UPDATE_VERSION}" | tr '[:upper:]' '[:lower:]')"
if [ "$LOWER_PIP_UPDATE_VERSION" = "yes" ] || [ "$LOWER_PIP_UPDATE_VERSION" = "true" ] ; then
python3 -m pip install -U pip
@@ -9,11 +9,11 @@ elif [ ! -z "$LOWER_PIP_UPDATE_VERSION" ] ; then
python3 -m pip install pip$LOWER_PIP_UPDATE_VERSION ;
fi
echo "TRAINS_AGENT_UPDATE_VERSION = $LOWER_TRAINS_AGENT_UPDATE_VERSION"
if [ "$LOWER_TRAINS_AGENT_UPDATE_VERSION" = "yes" ] || [ "$LOWER_TRAINS_AGENT_UPDATE_VERSION" = "true" ] ; then
python3 -m pip install trains-agent -U
elif [ ! -z "$LOWER_TRAINS_AGENT_UPDATE_VERSION" ] ; then
python3 -m pip install trains-agent$LOWER_TRAINS_AGENT_UPDATE_VERSION ;
echo "CLEARML_AGENT_UPDATE_VERSION = $LOWER_CLEARML_AGENT_UPDATE_VERSION"
if [ "$LOWER_CLEARML_AGENT_UPDATE_VERSION" = "yes" ] || [ "$LOWER_CLEARML_AGENT_UPDATE_VERSION" = "true" ] ; then
python3 -m pip install clearml-agent -U
elif [ ! -z "$LOWER_CLEARML_AGENT_UPDATE_VERSION" ] ; then
python3 -m pip install clearml-agent$LOWER_CLEARML_AGENT_UPDATE_VERSION ;
fi
-python3 -m trains_agent daemon --docker "$TRAINS_AGENT_DEFAULT_BASE_DOCKER" --force-current-version $TRAINS_AGENT_EXTRA_ARGS
+python3 -m clearml_agent daemon --docker "${CLEARML_AGENT_DEFAULT_BASE_DOCKER:-$TRAINS_AGENT_DEFAULT_BASE_DOCKER}" --force-current-version ${CLEARML_AGENT_EXTRA_ARGS:-$TRAINS_AGENT_EXTRA_ARGS}

View File

@@ -19,7 +19,7 @@ RUN locale-gen en_US.UTF-8
RUN apt-get install -y curl python3-pip git
RUN curl -sSL https://get.docker.com/ | sh
RUN python3 -m pip install -U pip
-RUN python3 -m pip install trains-agent
+RUN python3 -m pip install clearml-agent
RUN python3 -m pip install -U "cryptography>=2.9"
ENTRYPOINT ["/usr/agent/entrypoint.sh"]

View File

@@ -1,14 +1,16 @@
#!/bin/sh
if [ -z "$TRAINS_FILES_HOST" ]; then
TRAINS_HOST_IP=${TRAINS_HOST_IP:-$(curl -s https://ifconfig.me/ip)}
CLEARML_FILES_HOST=${CLEARML_FILES_HOST:-$TRAINS_FILES_HOST}
if [ -z "$CLEARML_FILES_HOST" ]; then
CLEARML_HOST_IP=${CLEARML_HOST_IP:-${TRAINS_HOST_IP:-$(curl -s https://ifconfig.me/ip)}}
fi
TRAINS_FILES_HOST=${TRAINS_FILES_HOST:-"http://$TRAINS_HOST_IP:8081"}
TRAINS_WEB_HOST=${TRAINS_WEB_HOST:-"http://$TRAINS_HOST_IP:8080"}
TRAINS_API_HOST=${TRAINS_API_HOST:-"http://$TRAINS_HOST_IP:8008"}
CLEARML_FILES_HOST=${CLEARML_FILES_HOST:-${TRAINS_FILES_HOST:-"http://$CLEARML_HOST_IP:8081"}}
CLEARML_WEB_HOST=${CLEARML_WEB_HOST:-${TRAINS_WEB_HOST:-"http://$CLEARML_HOST_IP:8080"}}
CLEARML_API_HOST=${CLEARML_API_HOST:-${TRAINS_API_HOST:-"http://$CLEARML_HOST_IP:8008"}}
echo $TRAINS_FILES_HOST $TRAINS_WEB_HOST $TRAINS_API_HOST 1>&2
echo $CLEARML_FILES_HOST $CLEARML_WEB_HOST $CLEARML_API_HOST 1>&2
python3 -m pip install -q -U "trains-agent${TRAINS_AGENT_UPDATE_VERSION}"
trains-agent daemon --services-mode --queue services --create-queue --docker "$TRAINS_AGENT_DEFAULT_BASE_DOCKER" --cpu-only $TRAINS_AGENT_EXTRA_ARGS
python3 -m pip install -q -U "clearml-agent${CLEARML_AGENT_UPDATE_VERSION:-$TRAINS_AGENT_UPDATE_VERSION}"
clearml-agent daemon --services-mode --queue services --create-queue --docker "${CLEARML_AGENT_DEFAULT_BASE_DOCKER:-$TRAINS_AGENT_DEFAULT_BASE_DOCKER}" --cpu-only ${CLEARML_AGENT_EXTRA_ARGS:-$TRAINS_AGENT_EXTRA_ARGS}
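Note: the ${CLEARML_*:-$TRAINS_*} expansions above keep the renamed script backward compatible: a CLEARML_* variable wins, the legacy TRAINS_* value is the fallback, and a default applies last. The same lookup order sketched in Python (`env_fallback` is a hypothetical helper, not part of the agent):

import os

def env_fallback(new_key, old_key, default=None):
    # mimic shell ${NEW:-${OLD:-default}}: empty strings count as unset
    return os.environ.get(new_key) or os.environ.get(old_key) or default

files_host = env_fallback("CLEARML_FILES_HOST", "TRAINS_FILES_HOST")
web_host = env_fallback("CLEARML_WEB_HOST", "TRAINS_WEB_HOST", "http://localhost:8080")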

View File

@@ -24,7 +24,9 @@ agent {
# Force GIT protocol to use SSH regardless of the git url (Assumes GIT user/pass are blank)
force_git_ssh_protocol: false
# Force a specific SSH port when converting http to ssh links (the domain is kept the same)
# force_git_ssh_port: ""
# force_git_ssh_port: 0
# Force a specific SSH username when converting http to ssh links (the default username is 'git')
# force_git_ssh_user: git
# unique name of this worker, if None, created based on hostname:process_id
# Overridden with os environment: CLEARML_WORKER_NAME
@@ -89,6 +91,16 @@ agent {
# target folder for virtual environments builds, created when executing experiment
venvs_dir = ~/.clearml/venvs-builds
# cached virtual environment folder
venvs_cache: {
# maximum number of cached venvs
max_entries: 10
# minimum required free space to allow for cache entry, disable by passing 0 or negative value
free_space_threshold_gb: 2.0
# unmark to enable virtual environment caching
# path: ~/.clearml/venvs-cache
},
# cached git clone folder
vcs_cache: {
enabled: true,
@@ -118,7 +130,7 @@ agent {
# optional arguments to pass to docker image
# these are local for this agent and will not be updated in the experiment's docker_cmd section
# extra_docker_arguments: ["--ipc=host", ]
# extra_docker_arguments: ["--ipc=host", "-v", "/mnt/host/data:/mnt/data"]
# optional shell script to run in docker when started before the experiment is started
# extra_docker_shell_script: ["apt-get install -y bindfs", ]
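Note: a sketch of what a free_space_threshold_gb-style guard implies (an illustration under that assumption, not the agent's actual implementation): refuse a new cache entry when the cache volume has less than the configured headroom, with 0 or a negative value disabling the check.

import shutil

def has_free_space(cache_path, threshold_gb):
    if threshold_gb <= 0:
        # 0 or a negative value disables the free-space check
        return True
    free_gb = shutil.disk_usage(str(cache_path)).free / (1024 ** 3)
    return free_gb >= threshold_gb

print(has_free_space("/tmp", 2.0))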

View File

@@ -24,7 +24,13 @@ def parse_args():
parser.add_argument(
"--base-port", type=int,
help="Used in conjunction with ports-mode, specifies the base port exposed by the services. "
"For pod #X, the port will be <base-port>+X"
"For pod #X, the port will be <base-port>+X. Note that pod number is calculated based on base-pod-num"
"e.g. if base-port=20000 and base-pod-num=3, the port for the first pod will be 20003"
)
parser.add_argument(
"--base-pod-num", type=int, default=1,
help="Used in conjunction with ports-mode and base-port, specifies the base pod number to be used by the "
"service (default: %(default)s)"
)
parser.add_argument(
"--gateway-address", type=str, default=None,
@@ -47,6 +53,10 @@ def parse_args():
"--ssh-server-port", type=int, default=0,
help="If non-zero, every pod will also start an SSH server on the selected port (default: zero, not active)"
)
+    parser.add_argument(
+        "--namespace", type=str,
+        help="Specify the namespace in which pods will be created (default: %(default)s)", default="clearml"
+    )
return parser.parse_args()
@@ -63,10 +73,11 @@ def main():
user_props_cb = k8s_user_props_cb
k8s = K8sIntegration(
-        ports_mode=args.ports_mode, num_of_services=args.num_of_services, user_props_cb=user_props_cb,
-        overrides_yaml=args.overrides_yaml, trains_conf_file=args.pod_trains_conf, template_yaml=args.template_yaml,
-        extra_bash_init_script=K8sIntegration.get_ssh_server_bash(
-            ssh_port_number=args.ssh_server_port) if args.ssh_server_port else None
+        ports_mode=args.ports_mode, num_of_services=args.num_of_services, base_pod_num=args.base_pod_num,
+        user_props_cb=user_props_cb, overrides_yaml=args.overrides_yaml, clearml_conf_file=args.pod_clearml_conf,
+        template_yaml=args.template_yaml, extra_bash_init_script=K8sIntegration.get_ssh_server_bash(
+            ssh_port_number=args.ssh_server_port) if args.ssh_server_port else None,
+        namespace=args.namespace,
)
k8s.k8s_daemon(args.queue)
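Note: the base-port/base-pod-num arithmetic from the help strings above, as a tiny sketch (`pod_port` is a hypothetical helper, not part of the glue): pod numbering starts at base_pod_num, and each pod exposes base_port plus its pod number.

def pod_port(base_port, base_pod_num, pod_index):
    # pod_index is 0 for the first pod created by this glue instance
    return base_port + base_pod_num + pod_index

assert pod_port(20000, 3, 0) == 20003  # the documented example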

View File

@@ -2,7 +2,6 @@ attrs>=18.0,<20.4.0
enum34>=0.9,<1.2.0 ; python_version < '3.6'
furl>=2.0.0,<2.2.0
future>=0.16.0,<0.19.0
-humanfriendly>=2.1,<9.2
jsonschema>=2.6.0,<3.3.0
pathlib2>=2.3.0,<2.4.0
psutil>=3.4.2,<5.9.0
@@ -11,10 +10,8 @@ pyparsing>=2.0.3,<2.5.0
python-dateutil>=2.4.2,<2.9.0
pyjwt>=1.6.4,<1.8.0
PyYAML>=3.12,<5.4.0
-requests-file>=1.4.2,<1.6.0
requests>=2.20.0,<2.26.0
six>=1.11.0,<1.16.0
tqdm>=4.19.5,<4.55.0
-typing>=3.6.4,<3.8.0
urllib3>=1.21.1,<1.27.0
virtualenv>=16,<20

View File

@@ -44,7 +44,7 @@ setup(
author_email='clearml@allegro.ai',
license='Apache License 2.0',
classifiers=[
-        'Development Status :: 4 - Beta',
+        'Development Status :: 5 - Production/Stable',
'Intended Audience :: Developers',
'Intended Audience :: System Administrators',