Compare commits


27 Commits

Author SHA1 Message Date
allegroai
d9b9b4984b Version bump to v0.17.2 2021-03-04 20:12:50 +02:00
allegroai
8a46dc6b03 Update default_docker in docs 2021-03-04 20:07:34 +02:00
allegroai
205f9dd816 Fix k8s glue not passing docker environment variables
Remove deprecated flags
2021-03-03 15:07:06 +02:00
allegroai
9dfa1294e2 Add agent.enable_task_env to set the OS environment based on the Environment section of the Task. 2021-02-28 19:47:44 +02:00
allegroai
f019905720 Fix venv cache support for local folders 2021-02-28 19:47:09 +02:00
allegroai
9c257858dd Fix venv cache support for local folders 2021-02-23 18:54:38 +02:00
allegroai
2006ab20dd Fix conda support for git+http links 2021-02-23 12:46:06 +02:00
allegroai
0caf31719c Fix venv caching: always reinstall git repositories and local repositories 2021-02-23 12:45:34 +02:00
allegroai
5da7184276 Add agent.ignore_requested_python_version (control for multi python environments) 2021-02-23 12:45:00 +02:00
allegroai
50fccdab96 PEP8 2021-02-23 12:44:26 +02:00
allegroai
77d6ff6630 Fix docker mode without venvs cache dir 2021-02-17 00:04:07 +02:00
allegroai
99614702ea Add missing default configuration value 2021-02-17 00:03:42 +02:00
allegroai
58cb344ee6 Upgrade pynvml, add CUDA version detection from the driver level 2021-02-17 00:03:16 +02:00
allegroai
22d5892b12 Use shared git cache between multiple agents on the same machine 2021-02-14 13:49:29 +02:00
allegroai
f619969efc Add venvs_cache configuration 2021-02-14 13:48:57 +02:00
allegroai
ca242424ab Fix service-mode support for venvs
Fix --services-mode with venvs
2021-02-14 13:45:17 +02:00
allegroai
407deb84e9 Fix multi instances on Windows 2021-02-14 13:44:39 +02:00
allegroai
14589aa094 Fix CPU mode 2021-02-14 13:44:00 +02:00
allegroai
1260e3d942 Update cache entries on conda package manager 2021-02-11 14:47:26 +02:00
allegroai
b22d926d94 Fix cache to take cuda version into account 2021-02-11 14:47:05 +02:00
allegroai
410cc8c7be Add --dynamic-gpus and an instance limit to --services-mode 2021-02-11 14:46:37 +02:00
allegroai
784c676f5b Fix "from clearml" runtime diff patching (make sure we move it to after all the __future__ imports) include handling triple quotes in comments 2021-02-11 14:46:06 +02:00
allegroai
296f7970df Fix file not found error (errno 2) being interpreted as aborted (i.e. Ctrl-C) 2021-02-11 14:44:54 +02:00
allegroai
cd59933c9c Remove unused packages 2021-02-11 14:44:35 +02:00
allegroai
b95d3f5300 Add venv caching with docker mode support 2021-02-11 14:44:19 +02:00
allegroai
fa0d5d8469 Fix --detached not supported on Windows, ignore and issue warning 2021-02-11 14:40:09 +02:00
allegroai
8229843018 Add base-pod-number parameter to k8s glue and example 2021-01-26 20:00:18 +02:00
25 changed files with 3349 additions and 193 deletions

View File

@@ -77,6 +77,16 @@
# target folder for virtual environments builds, created when executing experiment
venvs_dir = ~/.clearml/venvs-builds
# cached virtual environment folder
venvs_cache: {
# maximum number of cached venvs
max_entries: 10
# minimum required free space to allow for cache entry, disable by passing 0 or negative value
free_space_threshold_gb: 2.0
# uncomment to enable virtual environment caching
# path: ~/.clearml/venvs-cache
},
# cached git clone folder
vcs_cache: {
enabled: true,
@@ -139,6 +149,9 @@
# arguments: ["--ipc=host", ]
}
# set OS environment variables based on the Task's Environment section before launching the Task process.
enable_task_env: false
# set the initial bash script to execute at the startup of any docker.
# all lines will be executed regardless of their exit code.
# {python_single_digit} is translated to 'python3' or 'python2' according to requested python version
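
For reference, a hedged clearml.conf sketch that turns on the new venv cache introduced above (values mirror the defaults in this diff; the path line is the one to uncomment):

agent {
    venvs_cache: {
        max_entries: 10
        free_space_threshold_gb: 2.0
        path: ~/.clearml/venvs-cache
    }
    # also added in this changeset, off by default
    enable_task_env: false
}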

View File

@@ -12,10 +12,10 @@ import sys
import shutil
import traceback
from collections import defaultdict
from copy import deepcopy
from copy import deepcopy, copy
from datetime import datetime
from distutils.spawn import find_executable
from functools import partial
from functools import partial, cmp_to_key
from itertools import chain
from tempfile import mkdtemp, NamedTemporaryFile
from time import sleep, time
@@ -26,6 +26,7 @@ import psutil
import six
from clearml_agent.backend_api.services import queues as queues_api
from clearml_agent.backend_api.services import tasks as tasks_api
from clearml_agent.backend_api.services import workers as workers_api
from pathlib2 import Path
from pyhocon import ConfigTree, ConfigFactory
from six.moves.urllib.parse import quote
@@ -72,7 +73,7 @@ from clearml_agent.helper.os.daemonize import daemonize_process
from clearml_agent.helper.package.base import PackageManager
from clearml_agent.helper.package.conda_api import CondaAPI
from clearml_agent.helper.package.post_req import PostRequirement
from clearml_agent.helper.package.external_req import ExternalRequirements
from clearml_agent.helper.package.external_req import ExternalRequirements, OnlyExternalRequirements
from clearml_agent.helper.package.pip_api.system import SystemPip
from clearml_agent.helper.package.pip_api.venv import VirtualenvPip
from clearml_agent.helper.package.poetry_api import PoetryConfig, PoetryAPI
@@ -92,7 +93,7 @@ from clearml_agent.helper.process import (
commit_docker, terminate_process,
)
from clearml_agent.helper.package.priority_req import PriorityPackageRequirement, PackageCollectorRequirement
from clearml_agent.helper.repo import clone_repository_cached, RepoInfo, VCS
from clearml_agent.helper.repo import clone_repository_cached, RepoInfo, VCS, fix_package_import_diff_patch
from clearml_agent.helper.resource_monitor import ResourceMonitor
from clearml_agent.helper.runtime_verification import check_runtime, print_uptime_properties
from clearml_agent.session import Session
@@ -411,6 +412,7 @@ class Worker(ServiceCommandSection):
self._daemon_foreground = None
self._standalone_mode = None
self._services_mode = None
self._dynamic_gpus = None
self._force_current_version = None
self._redirected_stdout_file_no = None
self._uptime_config = self._session.config.get("agent.uptime", None)
@@ -433,16 +435,17 @@ class Worker(ServiceCommandSection):
"""
if kwargs.get('services_mode'):
kwargs['cpu_only'] = True
kwargs['docker'] = kwargs.get('docker') or []
# kwargs['docker'] = kwargs.get('docker') or []
kwargs['gpus'] = None
return kwargs
def _get_requirements_manager(self, os_override=None, base_interpreter=None):
def _get_requirements_manager(self, os_override=None, base_interpreter=None, requirement_substitutions=None):
requirements_manager = RequirementsManager(
self._session, base_interpreter=base_interpreter
)
for requirement_cls in self._requirement_substitutions:
requirement_substitutions = requirement_substitutions or self._requirement_substitutions
for requirement_cls in requirement_substitutions:
if os_override and issubclass(requirement_cls, PytorchRequirement):
requirement_cls = partial(requirement_cls, os_override=os_override)
requirements_manager.register(requirement_cls)
@@ -498,6 +501,9 @@ class Worker(ServiceCommandSection):
)
docker_image = None
worker_id = '{}:service:{}'.format(self.worker_id, task_id) \
if self._services_mode and not self._dynamic_gpus else self.worker_id
if self.docker_image_func:
try:
response = get_task(self._session, task_id, only_fields=["execution.docker_cmd"])
@@ -525,7 +531,7 @@ class Worker(ServiceCommandSection):
if self._services_mode:
# if this is services mode, give the docker a unique worker id, as it will register itself.
full_docker_cmd = self.docker_image_func(
worker_id='{}:service:{}'.format(self.worker_id, task_id),
worker_id=worker_id,
docker_image=docker_image, docker_arguments=docker_arguments)
else:
full_docker_cmd = self.docker_image_func(docker_image=docker_image, docker_arguments=docker_arguments)
@@ -551,7 +557,7 @@ class Worker(ServiceCommandSection):
print('Running Docker:\n{}\n'.format(str(cmd)))
else:
cmd = worker_args.get_argv_for_command("execute") + (
"--disable-monitoring",
'--full-monitoring' if self._services_mode else '--disable-monitoring',
"--id",
task_id,
)
@@ -567,7 +573,7 @@ class Worker(ServiceCommandSection):
status = ExitStatus.failure
try:
# set WORKER ID
ENV_WORKER_ID.set(self.worker_id)
ENV_WORKER_ID.set(worker_id)
if self._docker_force_pull and docker_image:
full_pull_cmd = ['docker', 'pull', docker_image]
@@ -592,7 +598,11 @@ class Worker(ServiceCommandSection):
errors = temp_stderr_name and Path(temp_stderr_name).read_text()
if errors:
print("\nEncountered errors:\n\n{}\n".format(errors))
if status is None:
if status is None and self._services_mode:
print(
"Service bootstrap completed: task '{}'".format(task_id)
)
elif status is None:
print(
"DONE: Running task '{}' (user aborted)".format(task_id)
)
@@ -612,29 +622,71 @@ class Worker(ServiceCommandSection):
safe_remove_file(temp_stdout_name)
safe_remove_file(temp_stderr_name)
def run_tasks_loop(self, queues, worker_params, priority_order=True):
def run_tasks_loop(self, queues, worker_params, priority_order=True, gpu_indexes=None, gpu_queues=None):
"""
:summary: Pull and run tasks from queues.
:description: 1. Go through ``queues`` by order.
2. Try getting the next task for each and run the first one that returns.
3. Go to step 1
:param queues: IDs of queues to pull tasks from
:type queues: list of ``Text``
:param worker_params: Worker command line arguments
:type worker_params: ``clearml_agent.helper.process.WorkerParams``
:param priority_order: If True pull order in priority manner. always from the first
:param list(str) queues: IDs of queues to pull tasks from
:param worker_params worker_params: Worker command line arguments
:param bool priority_order: If True, pull in priority order, always from the first queue.
If False, pull from each queue once in a round robin manner
:type priority_order: bool
:param list gpu_indexes: list of gpu_indexes. Needs special backend support
:param list gpu_queues: list of pairs (queue_id, num_gpus). Needs special backend support
"""
if not self._daemon_foreground:
print('Starting infinite task polling loop...')
_last_machine_update_ts = 0
if self._services_mode:
max_num_instances = int(self._services_mode)
else:
max_num_instances = None
# store in runtime configuration,
if max_num_instances and not self.set_runtime_properties(key='max_num_instances', value=max_num_instances):
warning('Maximum number of service instances not supported, removing limit.')
max_num_instances = -1
# get current running instances
available_gpus = None
if gpu_indexes and gpu_queues:
available_gpus, gpu_queues = self._setup_dynamic_gpus(gpu_queues)
# multi instance support
self._services_mode = True
while True:
queue_tags = None
runtime_props = None
if max_num_instances and max_num_instances > 0:
# make sure we do not have too many instances to run
if len(Singleton.get_running_pids()) >= max_num_instances:
if self._daemon_foreground or worker_params.debug:
print("Reached max number of services, sleeping for {:.1f} seconds".format(
self._polling_interval))
sleep(self._polling_interval)
continue
# update available gpus
if gpu_queues:
try:
response = self._session.send_api(workers_api.GetAllRequest(last_seen=60))
except Exception:
# if something went wrong start over from the highest priority queue
sleep(self._polling_interval)
continue
available_gpus = self._dynamic_gpu_get_available(gpu_indexes)
# if something went wrong or we have no free gpus
# start over from the highest priority queue
if not available_gpus:
if self._daemon_foreground or worker_params.debug:
print("All GPUs allocated, sleeping for {:.1f} seconds".format(self._polling_interval))
sleep(self._polling_interval)
continue
# iterate over queues (priority style, queues[0] is highest)
for queue in queues:
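
A hedged invocation sketch for the instance limit handled above; the int() cast suggests --services-mode accepts an optional limit, and the queue name here is illustrative:

clearml-agent daemon --services-mode 5 --queue services --docker --detached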
@@ -644,6 +696,25 @@ class Worker(ServiceCommandSection):
if not self.should_be_currently_active(queue_tags[queue], runtime_props):
continue
if gpu_queues:
# peek into queue
# get next task in queue
try:
response = self._session.send_api(queues_api.GetByIdRequest(queue=queue))
except Exception:
# if something went wrong start over from the highest priority queue
break
if not len(response.queue.entries):
continue
# check if we have enough available gpus
if gpu_queues[queue] > len(available_gpus):
# not enough available_gpus, we should sleep and start over
if self._daemon_foreground or worker_params.debug:
print("Not enough free GPUs {}/{}, sleeping for {:.1f} seconds".format(
len(available_gpus), gpu_queues[queue], self._polling_interval))
sleep(self._polling_interval)
break
# get next task in queue
try:
response = self._session.send_api(
@@ -674,13 +745,33 @@ class Worker(ServiceCommandSection):
except:
pass
self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id))
org_gpus = os.environ.get('NVIDIA_VISIBLE_DEVICES')
worker_id = self.worker_id
if gpu_queues and gpu_queues.get(queue):
# pick the first available GPUs
gpus = available_gpus[:gpu_queues.get(queue)]
available_gpus = available_gpus[gpu_queues.get(queue):]
self.set_runtime_properties(
key='available_gpus', value=','.join(str(g) for g in available_gpus))
os.environ['CUDA_VISIBLE_DEVICES'] = \
os.environ['NVIDIA_VISIBLE_DEVICES'] = ','.join(str(g) for g in gpus)
self.worker_id = ':'.join(self.worker_id.split(':')[:-1] + ['gpu'+','.join(str(g) for g in gpus)])
self.send_logs(
task_id=task_id,
lines=["task {} pulled from {} by worker {}\n".format(task_id, queue, self.worker_id)],
level="INFO",
)
self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id))
self.run_one_task(queue, task_id, worker_params)
if gpu_queues:
self.worker_id = worker_id
os.environ['CUDA_VISIBLE_DEVICES'] = \
os.environ['NVIDIA_VISIBLE_DEVICES'] = org_gpus
self.report_monitor(ResourceMonitor.StatusReport(queues=self.queues))
queue_tags = None
@@ -699,6 +790,39 @@ class Worker(ServiceCommandSection):
if self._session.config["agent.reload_config"]:
self.reload_config()
def _dynamic_gpu_get_available(self, gpu_indexes):
try:
response = self._session.send_api(workers_api.GetAllRequest(last_seen=60))
except Exception:
return None
worker_name = self._session.config["agent.worker_name"] + ':'
our_workers = [
w.id for w in response.workers
if w.id.startswith(worker_name) and w.id != self.worker_id]
gpus = []
for w in our_workers:
gpus += [int(g) for g in w.split(':')[-1].lower().replace('gpu', '').split(',')]
available_gpus = list(set(gpu_indexes) - set(gpus))
return available_gpus
def _setup_dynamic_gpus(self, gpu_queues):
available_gpus = self.get_runtime_properties()
if available_gpus is None:
raise ValueError("Dynamic GPU allocation is not supported by the ClearML-server")
available_gpus = [prop["value"] for prop in available_gpus if prop["key"] == 'available_gpus']
if available_gpus:
available_gpus = [int(g) for g in available_gpus[-1].split(',')]
if not isinstance(gpu_queues, dict):
gpu_queues = dict(gpu_queues)
if not self.set_runtime_properties(
key='available_gpus', value=','.join(str(g) for g in available_gpus)):
raise ValueError("Dynamic GPU allocation is not supported by the ClearML-server")
return available_gpus, gpu_queues
def get_worker_properties(self, queue_ids):
queue_tags = {
q.id: {'name': q.name, 'tags': q.tags}
@@ -714,11 +838,11 @@ class Worker(ServiceCommandSection):
# either not supported or never tested
if self._runtime_props_support == self._session.api_version:
# tested against latest api_version, not supported
return []
return None
if not self._session.check_min_api_version(UptimeConf.min_api_version):
# not supported due to insufficient api_version
self._runtime_props_support = self._session.api_version
return []
return None
try:
res = self.get("get_runtime_properties", worker=self.worker_id)["runtime_properties"]
# definitely supported
@@ -726,12 +850,38 @@ class Worker(ServiceCommandSection):
return res
except APIError:
self._runtime_props_support = self._session.api_version
return []
return None
def set_runtime_properties(self, key, value):
if self._runtime_props_support is not True:
# either not supported or never tested
if self._runtime_props_support == self._session.api_version:
# tested against latest api_version, not supported
return False
if not self._session.check_min_api_version(UptimeConf.min_api_version):
# not supported due to insufficient api_version
self._runtime_props_support = self._session.api_version
return False
# noinspection PyBroadException
try:
self.post("set_runtime_properties", worker=self.worker_id, json={key: value})
# definitely supported
self._runtime_props_support = True
return True
except APIError:
self._runtime_props_support = self._session.api_version
except Exception as ex:
# not sure what happened
pass
return False
def should_be_currently_active(self, current_queue, runtime_properties):
"""
Checks if a worker is active according to queue tags, worker's runtime properties and uptime schedule.
"""
runtime_properties = runtime_properties or []
if UptimeConf.queue_tag_off in current_queue['tags']:
self.log.debug("Queue {} is tagged '{}', worker will not pull tasks".format(
current_queue['name'], UptimeConf.queue_tag_off)
@@ -813,16 +963,12 @@ class Worker(ServiceCommandSection):
self._session.print_configuration()
def daemon(self, queues, log_level, foreground=False, docker=False, detached=False, order_fairness=False, **kwargs):
# if we do not need to create queues, make sure they are valid
# match previous behaviour when we validated queue names before everything else
queues = self._resolve_queue_names(queues, create_if_missing=kwargs.get('create_queue', False))
self._standalone_mode = kwargs.get('standalone_mode', False)
self._services_mode = kwargs.get('services_mode', False)
# must have docker in services_mode
if self._services_mode:
kwargs = self._verify_command_states(kwargs)
docker = docker or kwargs.get('docker')
self._uptime_config = kwargs.get('uptime', None) or self._uptime_config
self._downtime_config = kwargs.get('downtime', None) or self._downtime_config
if self._uptime_config and self._downtime_config:
@@ -832,6 +978,16 @@ class Worker(ServiceCommandSection):
self._uptime_config = None
self._downtime_config = None
# support --dynamic-gpus
dynamic_gpus, gpu_indexes, queues = self._parse_dynamic_gpus(kwargs, queues)
if self._services_mode and dynamic_gpus:
raise ValueError("Combining --dynamic-gpus and --services-mode is not supported")
# if we do not need to create queues, make sure they are valid
# match previous behaviour when we validated queue names before everything else
queues = self._resolve_queue_names(queues, create_if_missing=kwargs.get('create_queue', False))
# We are not running a daemon we are killing one.
# find the pid send termination signal and leave
if kwargs.get('stop', False):
@@ -854,7 +1010,7 @@ class Worker(ServiceCommandSection):
# make sure we only have a single instance,
# also make sure we set worker_id properly and cache folders
self._singleton()
self._singleton(dynamic=bool(dynamic_gpus))
# check if we have the latest version
start_check_update_daemon()
@@ -915,6 +1071,11 @@ class Worker(ServiceCommandSection):
if not self._session.debug_mode:
self._temp_cleanup_list.append(name)
# on Windows we do nothing
if detached and is_windows_platform():
print('Detached not supported on Windows, ignoring --detached')
detached = False
if not detached:
# redirect std out/err to new file
sys.stdout = sys.stderr = out_file
@@ -943,6 +1104,8 @@ class Worker(ServiceCommandSection):
trace=self._session.trace,
),
priority_order=not order_fairness,
gpu_indexes=gpu_indexes,
gpu_queues=dynamic_gpus,
)
except Exception:
tb = six.text_type(traceback.format_exc())
@@ -963,6 +1126,54 @@ class Worker(ServiceCommandSection):
self._unregister(queues)
safe_remove_file(self.temp_config_path)
def _parse_dynamic_gpus(self, kwargs, queues):
dynamic_gpus = kwargs.get('dynamic_gpus', None)
if not dynamic_gpus:
return None, None, queues
queue_names = [q.name for q in queues]
if not all('=' in q for q in queue_names):
raise ValueError("using --dynamic-gpus, --queues [{}], "
"queue must be in format <queue_name>=<num_gpus>".format(queue_names))
gpu_indexes = kwargs.get('gpus')
# test gpus were passed correctly
if not gpu_indexes or len(gpu_indexes.split('-')) > 2 or (',' in gpu_indexes and '-' in gpu_indexes):
raise ValueError('--gpus must be provided, in one of two ways: '
'comma separated \'0,1,2,3\' or range \'0-3\'')
try:
if '-' in gpu_indexes:
gpu_indexes = list(range(int(gpu_indexes.split('-')[0]), 1 + int(gpu_indexes.split('-')[1])))
else:
gpu_indexes = [int(g) for g in gpu_indexes.split(',')]
except Exception:
raise ValueError('Failed parsing --gpus {}'.format(kwargs.get('gpus')))
dynamic_gpus = [(s[:-1 - len(s.split('=')[-1])], int(s.split('=')[-1])) for s in queue_names]
queue_names = [q for q, _ in dynamic_gpus]
# resolve queue ids
dynamic_gpus_q = self._resolve_queue_names(
queue_names, create_if_missing=kwargs.get('create_queue', False))
dynamic_gpus = list(zip(dynamic_gpus_q, [i for _, i in dynamic_gpus]))
dynamic_gpus = sorted(
dynamic_gpus, reverse=True, key=cmp_to_key(
lambda x, y: -1 if x[1] < y[1] or x[1] == y[1] and
dynamic_gpus_q.index(x[0]) > dynamic_gpus_q.index(y[0])
else +1))
# order queue priority based on the combination we have
queues = [q for q, _ in dynamic_gpus]
# test server support
available_gpus = self._dynamic_gpu_get_available(gpu_indexes)
if not self.set_runtime_properties(
key='available_gpus', value=','.join(str(g) for g in available_gpus)):
raise ValueError("Dynamic GPU allocation is not supported by the ClearML-server")
self._dynamic_gpus = True
return dynamic_gpus, gpu_indexes, queues
def report_monitor(self, report):
if not self.monitor:
self.new_monitor(report=report)
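
A hedged invocation sketch matching the parsing rules above (--gpus given as a comma list or a range, each queue suffixed with =<num_gpus>; queue names are illustrative):

clearml-agent daemon --dynamic-gpus --gpus 0-3 --queue dual_gpu=2 single_gpu=1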
@@ -1246,10 +1457,11 @@ class Worker(ServiceCommandSection):
except:
python_version = None
venv_folder, requirements_manager = self.install_virtualenv(
venv_dir=target, requested_python_version=python_version, execution_info=execution)
venv_folder, requirements_manager, is_cached = self.install_virtualenv(
venv_dir=target, requested_python_version=python_version, execution_info=execution,
cached_requirements=requirements)
if self._default_pip:
if not is_cached and self._default_pip:
if install_globally and self.global_package_api:
self.global_package_api.install_packages(*self._default_pip)
else:
@@ -1257,15 +1469,30 @@ class Worker(ServiceCommandSection):
directory, vcs, repo_info = self.get_repo_info(execution, current_task, venv_folder.as_posix())
self.install_requirements(
execution,
repo_info,
requirements_manager=requirements_manager,
cached_requirements=requirements,
cwd=vcs.location if vcs and vcs.location else directory,
package_api=self.global_package_api if install_globally else None,
)
freeze = self.freeze_task_environment(requirements_manager=requirements_manager)
if is_cached:
# reinstalling git / local packages
package_api = copy(self.package_api)
package_api.requirements_manager = self._get_requirements_manager(
base_interpreter=package_api.requirements_manager.get_interpreter(),
requirement_substitutions=[OnlyExternalRequirements]
)
# make sure we run the handlers
cached_requirements = \
{k: package_api.requirements_manager.replace(requirements[k] or '')
for k in requirements}
package_api.load_requirements(cached_requirements)
else:
self.install_requirements(
execution,
repo_info,
requirements_manager=requirements_manager,
cached_requirements=requirements,
cwd=vcs.location if vcs and vcs.location else directory,
package_api=self.global_package_api if install_globally else None,
)
freeze = self.freeze_task_environment(
task_id=task_id, requirements_manager=requirements_manager, update_requirements=False)
script_dir = directory
# Summary
@@ -1431,6 +1658,17 @@ class Worker(ServiceCommandSection):
# We expect the same behaviour in case full_monitoring was set, and in case docker mode is used
if full_monitoring or docker is not False:
if full_monitoring:
if not (ENV_WORKER_ID.get() or '').strip():
self._session.config["agent"]["worker_id"] = ''
# make sure we support multiple instances if we need to
self._singleton()
self.temp_config_path = self.temp_config_path or safe_mkstemp(
suffix=".cfg", prefix=".clearml_agent.", text=True, name_only=True
)
self.dump_config(self.temp_config_path)
self._session._config_file = self.temp_config_path
worker_params = WorkerParams(
log_level=log_level,
config_file=self._session.config_file,
@@ -1442,6 +1680,10 @@ class Worker(ServiceCommandSection):
self.stop_monitor()
self._unregister()
if full_monitoring and self.temp_config_path:
safe_remove_file(self._session.config_file)
Singleton.close_pid_file()
return
self._session.print_configuration()
@@ -1477,10 +1719,11 @@ class Worker(ServiceCommandSection):
except:
python_ver = None
venv_folder, requirements_manager = self.install_virtualenv(
standalone_mode=standalone_mode, requested_python_version=python_ver, execution_info=execution)
venv_folder, requirements_manager, is_cached = self.install_virtualenv(
standalone_mode=standalone_mode, requested_python_version=python_ver,
execution_info=execution, cached_requirements=requirements)
if not standalone_mode:
if not is_cached and not standalone_mode:
if self._default_pip:
self.package_api.install_packages(*self._default_pip)
@@ -1492,7 +1735,23 @@ class Worker(ServiceCommandSection):
print("\n")
if not standalone_mode:
if is_cached and not standalone_mode:
# reinstalling git / local packages
package_api = copy(self.package_api)
package_api.requirements_manager = self._get_requirements_manager(
base_interpreter=package_api.requirements_manager.get_interpreter(),
requirement_substitutions=[OnlyExternalRequirements]
)
package_api.cwd = vcs.location if vcs and vcs.location else directory
# make sure we run the handlers
cached_requirements = \
{k: package_api.requirements_manager.replace(requirements[k] or '')
for k in requirements}
if str(cached_requirements.get('pip', '')).strip() \
or str(cached_requirements.get('conda', '')).strip():
package_api.load_requirements(cached_requirements)
elif not is_cached and not standalone_mode:
self.install_requirements(
execution,
repo_info,
@@ -1506,8 +1765,13 @@ class Worker(ServiceCommandSection):
skip_freeze_update = self.is_conda and not self._session.config.get(
"agent.package_manager.conda_full_env_update", False)
freeze = self.freeze_task_environment(current_task.id if not skip_freeze_update else None,
requirements_manager=requirements_manager)
freeze = self.freeze_task_environment(
task_id=current_task.id,
requirements_manager=requirements_manager,
add_venv_folder_cache=venv_folder,
execution_info=execution,
update_requirements=not skip_freeze_update,
)
script_dir = (directory if isinstance(directory, Path) else Path(directory)).absolute().as_posix()
# run code
@@ -1559,6 +1823,12 @@ class Worker(ServiceCommandSection):
if repo_info:
self._update_commit_id(current_task.id, execution, repo_info)
# get Task Environments and update the process
if self._session.config.get('agent.enable_task_env', None):
hyper_params = self._get_task_os_env(current_task)
if hyper_params:
os.environ.update(hyper_params)
# Add the script CWD to the python path
python_path = get_python_path(script_dir, execution.entry_point, self.package_api, is_conda_env=self.is_conda)
if ENV_TASK_EXTRA_PYTHON_PATH.get():
@@ -1636,6 +1906,20 @@ class Worker(ServiceCommandSection):
return 1 if exit_code is None else exit_code
def _get_task_os_env(self, current_task):
if not self._session.check_min_api_version('2.9'):
return None
# noinspection PyBroadException
try:
hyper_params = self._session.get(
service="tasks", action="get_hyper_params", tasks=[current_task.id])
hyper_params = {
str(p['name']): str(p['value'])
for p in hyper_params['params'][0]['hyperparams'] if p['section'] == 'Environment'}
return hyper_params
except Exception:
return None
def set_docker_variables(self, docker):
temp_config, docker_image_func = self.get_docker_config_cmd(docker)
self.dump_config(self.temp_config_path, config=temp_config)
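
To feed _get_task_os_env, the Task needs parameters in its Environment section; a minimal SDK-side sketch, assuming the 'Section/name' notation of Task.set_parameters (the variable name is illustrative):

from clearml import Task

task = Task.init(project_name='examples', task_name='env-demo')
# with agent.enable_task_env set to true, parameters in the
# 'Environment' section become OS environment variables of the
# task process spawned by the agent
task.set_parameters({'Environment/MY_TOKEN': 'secret-value'})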
@@ -1676,6 +1960,7 @@ class Worker(ServiceCommandSection):
repo_info = None
directory = None
vcs = None
script_file = None
if has_repository:
vcs, repo_info = self._get_repo_info(execution, task, venv_folder)
directory = Path(repo_info.root, execution.working_dir or ".")
@@ -1686,17 +1971,26 @@ class Worker(ServiceCommandSection):
self.apply_diff(
task=task, vcs=vcs, execution_info=execution, repo_info=repo_info
)
script_file = Path(directory) / execution.entry_point
if is_literal_script:
self.log.info("found literal script in `script.diff`")
directory, script = literal_script.create_notebook_file(
task, execution, repo_info
)
execution.entry_point = script
if not has_repository:
return directory, None, None
script_file = Path(execution.entry_point)
else:
# in case of no literal script, there is not difference between empty working dir and `.`
execution.working_dir = execution.working_dir or "."
# fix our import patch (in case we have __future__)
if script_file and script_file.is_file():
fix_package_import_diff_patch(script_file.as_posix())
if is_literal_script and not has_repository:
return directory, None, None
if not directory:
assert False, "unreachable code"
return directory, vcs, repo_info
@@ -1801,7 +2095,8 @@ class Worker(ServiceCommandSection):
status_message=self._task_status_change_message,
)
def freeze_task_environment(self, task_id=None, requirements_manager=None):
def freeze_task_environment(self, task_id=None, requirements_manager=None,
add_venv_folder_cache=None, execution_info=None, update_requirements=False):
try:
freeze = self.package_api.freeze()
except Exception as e:
@@ -1816,13 +2111,36 @@ class Worker(ServiceCommandSection):
return freeze
# get original requirements and update with the new frozen requirements
previous_reqs = {}
# noinspection PyBroadException
try:
current_task = get_task(self._session, task_id, only_fields=["script.requirements"])
requirements = current_task.script.requirements
previous_reqs = dict(**requirements)
# replace only once.
if requirements.get('pip') and not requirements.get('org_pip'):
requirements['org_pip'] = requirements.pop('pip')
if requirements.get('conda') and not requirements.get('org_conda'):
requirements['org_conda'] = requirements.pop('conda')
requirements.update(freeze)
except Exception:
requirements = freeze
# add to cache
if add_venv_folder_cache:
print('Adding venv into cache: {}'.format(add_venv_folder_cache))
self.package_api.add_cached_venv(
requirements=[freeze, previous_reqs],
docker_cmd=execution_info.docker_cmd if execution_info else None,
python_version=getattr(self.package_api, 'python', ''),
cuda_version=self._session.config.get("agent.cuda_version"),
source_folder=add_venv_folder_cache,
exclude_sub_folders=['task_repository', 'code'])
# if we should not update the requirements back into the Task
if not update_requirements:
return freeze
request = tasks_api.SetRequirementsRequest(task=task_id, requirements=requirements)
try:
self._session.send_api(request)
@@ -2059,15 +2377,21 @@ class Worker(ServiceCommandSection):
)
def install_virtualenv(
self, venv_dir=None, requested_python_version=None, standalone_mode=False, execution_info=None):
# type: (str, str, bool, ExecutionInfo) -> Tuple[Path, RequirementsManager]
self, venv_dir=None, requested_python_version=None, standalone_mode=False,
execution_info=None, cached_requirements=None):
# type: (str, str, bool, ExecutionInfo, dict) -> Tuple[Path, RequirementsManager, bool]
"""
Install a new python virtual environment, removing the old one if exists
:return: virtualenv directory and requirements manager to use with task
:return: virtualenv directory, requirements manager to use with task, True if there is a cached venv entry
"""
requested_python_version = requested_python_version or \
Text(self._session.config.get("agent.python_binary", None)) or \
Text(self._session.config.get("agent.default_python", None))
if self._session.config.get("agent.ignore_requested_python_version", None):
requested_python_version = ''
requested_python_version = \
requested_python_version or \
Text(self._session.config.get("agent.python_binary", None)) or \
Text(self._session.config.get("agent.default_python", None))
if self.is_conda:
executable_version_suffix = \
requested_python_version[max(requested_python_version.find('python'), 0):].replace('python', '')
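
The override above is driven by configuration; a hedged clearml.conf sketch that makes the agent ignore the python version recorded on the Task:

agent {
    # always use the agent's own interpreter choice
    ignore_requested_python_version: true
}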
@@ -2102,6 +2426,7 @@ class Worker(ServiceCommandSection):
if not standalone_mode:
rm_tree(normalize_path(venv_dir, WORKING_REPOSITORY_DIR))
package_manager_params = dict(
session=self._session,
python=executable_version_suffix if self.is_conda else executable_name,
@@ -2115,6 +2440,8 @@ class Worker(ServiceCommandSection):
session=self._session,
)
call_package_manager_create = False
if not self.is_conda and standalone_mode:
# pip with standalone mode
get_pip = partial(VirtualenvPip, **package_manager_params)
@@ -2130,7 +2457,7 @@ class Worker(ServiceCommandSection):
self.package_api = VirtualenvPip(**package_manager_params)
if first_time:
self.package_api.remove()
self.package_api.create()
call_package_manager_create = True
self.global_package_api = SystemPip(**global_package_manager_params)
elif standalone_mode:
# conda with standalone mode
@@ -2142,6 +2469,7 @@ class Worker(ServiceCommandSection):
self.package_api = get_conda()
# no support for reusing Conda environments
self.package_api.remove()
call_package_manager_create = True
if venv_dir.exists():
timestamp = datetime.utcnow().strftime("%Y-%m-%d-%H-%M-%S")
@@ -2156,9 +2484,22 @@ class Worker(ServiceCommandSection):
venv_dir = new_venv_folder
self.package_api = get_conda(path=venv_dir)
# check if we have a cached folder
if cached_requirements and self.package_api.get_cached_venv(
requirements=cached_requirements,
docker_cmd=execution_info.docker_cmd if execution_info else None,
python_version=package_manager_params['python'],
cuda_version=self._session.config.get("agent.cuda_version"),
destination_folder=Path(venv_dir)
):
print('::: Using Cached environment {} :::'.format(self.package_api.get_last_used_entry_cache()))
return venv_dir, requirements_manager, True
# create the initial venv
if call_package_manager_create:
self.package_api.create()
return venv_dir, requirements_manager
return venv_dir, requirements_manager, False
def parse_requirements(self, reqs_file=None, overrides=None):
os = None
@@ -2216,6 +2557,9 @@ class Worker(ServiceCommandSection):
temp_config.put("agent.git_pass", (ENV_AGENT_GIT_PASS.get() or
self._session.config.get("agent.git_pass", None)))
if temp_config.get("agent.venvs_cache.path", None):
temp_config.put("agent.venvs_cache.path", '/root/.clearml/venvs-cache')
self._host_ssh_cache = mkdtemp(prefix='clearml_agent.ssh.')
self._temp_cleanup_list.append(self._host_ssh_cache)
@@ -2228,6 +2572,9 @@ class Worker(ServiceCommandSection):
self._session.config["agent.pip_download_cache.path"])).expanduser().as_posix()
host_vcs_cache = Path(os.path.expandvars(
self._session.config["agent.vcs_cache.path"])).expanduser().as_posix()
host_venvs_cache = Path(os.path.expandvars(
self._session.config["agent.venvs_cache.path"])).expanduser().as_posix() \
if self._session.config.get("agent.venvs_cache.path", None) else None
host_ssh_cache = self._host_ssh_cache
host_apt_cache = Path(os.path.expandvars(self._session.config.get(
@@ -2242,6 +2589,8 @@ class Worker(ServiceCommandSection):
Path(host_pip_dl).mkdir(parents=True, exist_ok=True)
Path(host_vcs_cache).mkdir(parents=True, exist_ok=True)
Path(host_ssh_cache).mkdir(parents=True, exist_ok=True)
if host_venvs_cache:
Path(host_venvs_cache).mkdir(parents=True, exist_ok=True)
# copy the .ssh folder to a temp folder, to be mapped into docker
# noinspection PyBroadException
@@ -2278,6 +2627,7 @@ class Worker(ServiceCommandSection):
mounted_cache_dir = temp_config.get("sdk.storage.cache.default_base_dir")
mounted_pip_dl_dir = temp_config.get("agent.pip_download_cache.path")
mounted_vcs_cache = temp_config.get("agent.vcs_cache.path")
mounted_venvs_cache = temp_config.get("agent.venvs_cache.path", "")
# Make sure we have created the configuration file for the executor
if not self.dump_config(self.temp_config_path, config=temp_config):
@@ -2297,6 +2647,7 @@ class Worker(ServiceCommandSection):
host_cache=host_cache, mounted_cache=mounted_cache_dir,
host_pip_dl=host_pip_dl, mounted_pip_dl=mounted_pip_dl_dir,
host_vcs_cache=host_vcs_cache, mounted_vcs_cache=mounted_vcs_cache,
host_venvs_cache=host_venvs_cache, mounted_venvs_cache=mounted_venvs_cache,
standalone_mode=self._standalone_mode,
force_current_version=self._force_current_version,
bash_script=bash_script,
@@ -2315,6 +2666,7 @@ class Worker(ServiceCommandSection):
host_cache, mounted_cache,
host_pip_dl, mounted_pip_dl,
host_vcs_cache, mounted_vcs_cache,
host_venvs_cache, mounted_venvs_cache,
standalone_mode=False, extra_docker_arguments=None, extra_shell_script=None,
force_current_version=None, host_git_credentials=None,
bash_script=None, preprocess_bash_script=None):
@@ -2370,14 +2722,16 @@ class Worker(ServiceCommandSection):
# expect CLEARML_AGENT_K8S_HOST_MOUNT = '/mnt/host/data:/root/.clearml'
k8s_node_mnt, _, k8s_pod_mnt = ENV_DOCKER_HOST_MOUNT.get().partition(':')
# search and replace all the host folders with the k8s
host_mounts = [host_apt_cache, host_pip_cache, host_pip_dl, host_cache, host_vcs_cache]
host_mounts = [host_apt_cache, host_pip_cache, host_pip_dl, host_cache, host_vcs_cache, host_venvs_cache]
for i, m in enumerate(host_mounts):
if not m:
continue
if k8s_pod_mnt not in m:
print('Warning: K8S mount missing, ignoring cached folder {}'.format(m))
host_mounts[i] = None
else:
host_mounts[i] = m.replace(k8s_pod_mnt, k8s_node_mnt, 1)
host_apt_cache, host_pip_cache, host_pip_dl, host_cache, host_vcs_cache = host_mounts
host_apt_cache, host_pip_cache, host_pip_dl, host_cache, host_vcs_cache, host_venvs_cache = host_mounts
# copy the configuration file into the mounted folder
new_conf_file = os.path.join(k8s_pod_mnt, '.clearml_agent.{}.cfg'.format(quote(worker_id, safe="")))
@@ -2467,6 +2821,7 @@ class Worker(ServiceCommandSection):
(['-v', host_pip_dl+':'+mounted_pip_dl] if host_pip_dl else []) +
(['-v', host_cache+':'+mounted_cache] if host_cache else []) +
(['-v', host_vcs_cache+':'+mounted_vcs_cache] if host_vcs_cache else []) +
(['-v', host_venvs_cache + ':' + mounted_venvs_cache] if host_venvs_cache else []) +
['--rm', docker_image, 'bash', '-c',
update_scheme +
extra_shell_script +
@@ -2576,9 +2931,9 @@ class Worker(ServiceCommandSection):
worker_name, worker_id))
return False
def _singleton(self):
def _singleton(self, dynamic=False):
# ensure singleton
worker_id, worker_name = self._generate_worker_id_name()
worker_id, worker_name = self._generate_worker_id_name(dynamic=dynamic)
# if we are running in services mode, we allow double register since
# docker-compose will kill instances before they clean up
@@ -2593,13 +2948,14 @@ class Worker(ServiceCommandSection):
# update folders based on free slot
self._session.create_cache_folders(slot_index=worker_slot)
def _generate_worker_id_name(self):
def _generate_worker_id_name(self, dynamic=False):
worker_id = self._session.config["agent.worker_id"]
worker_name = self._session.config["agent.worker_name"]
if not worker_id and os.environ.get('NVIDIA_VISIBLE_DEVICES') is not None:
nvidia_visible_devices = os.environ.get('NVIDIA_VISIBLE_DEVICES')
if nvidia_visible_devices and nvidia_visible_devices.lower() != 'none':
worker_id = '{}:gpu{}'.format(worker_name, nvidia_visible_devices)
worker_id = '{}:{}gpu{}'.format(
worker_name, 'd' if dynamic else '', nvidia_visible_devices)
elif nvidia_visible_devices == '':
pass
else:
@@ -2613,15 +2969,15 @@ class Worker(ServiceCommandSection):
queues = return_list(queues)
if not create_if_missing:
return [self._resolve_name(q.name, "queues") for q in queues]
return [self._resolve_name(q if isinstance(q, str) else q.name, "queues") for q in queues]
queue_ids = []
for q in queues:
try:
q_id = self._resolve_name(q.name, "queues")
q_id = self._resolve_name(q if isinstance(q, str) else q.name, "queues")
except:
self._session.send_api(queues_api.CreateRequest(name=q.name))
q_id = self._resolve_name(q.name, "queues")
self._session.send_api(queues_api.CreateRequest(name=q if isinstance(q, str) else q.name))
q_id = self._resolve_name(q if isinstance(q, str) else q.name, "queues")
queue_ids.append(q_id)
return queue_ids

View File

@@ -16,7 +16,7 @@ def parse(reqstr):
filename = getattr(reqstr, 'name', None)
try:
# Python 2.x compatibility
if not isinstance(reqstr, basestring):
if not isinstance(reqstr, basestring): # noqa
reqstr = reqstr.read()
except NameError:
# Python 3.x only

View File

@@ -36,8 +36,7 @@ class K8sIntegration(Worker):
KUBECTL_RUN_CMD = "kubectl run clearml-{queue_name}-id-{task_id} " \
"--image {docker_image} " \
"--restart=Never --replicas=1 " \
"--generator=run-pod/v1 " \
"--restart=Never " \
"--namespace={namespace}"
KUBECTL_DELETE_CMD = "kubectl delete pods " \
@@ -88,6 +87,7 @@ class K8sIntegration(Worker):
debug=False,
ports_mode=False,
num_of_services=20,
base_pod_num=1,
user_props_cb=None,
overrides_yaml=None,
template_yaml=None,
@@ -111,6 +111,7 @@ class K8sIntegration(Worker):
Requires the `num_of_services` parameter.
:param int num_of_services: Number of k8s services configured in the cluster. Required if `port_mode` is True.
(default: 20)
:param int base_pod_num: Used when `ports_mode` is True, sets the base pod number to a given value (default: 1)
:param callable user_props_cb: An Optional callable allowing additional user properties to be specified
when scheduling a task to run in a pod. Callable can receive an optional pod number and should return
a dictionary of user properties (name and value). Signature is [[Optional[int]], Dict[str,str]]
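
A hedged instantiation sketch for the new parameter (the import path follows the k8s glue example this changeset updates; queue name and counts are illustrative):

from clearml_agent.glue.k8s import K8sIntegration

k8s = K8sIntegration(ports_mode=True, num_of_services=20, base_pod_num=1)
# poll a queue and schedule tasks as pods, numbered from base_pod_num
k8s.k8s_daemon(queue='k8s_scheduler')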
@@ -133,6 +134,7 @@ class K8sIntegration(Worker):
self.log.logger.setLevel(logging.INFO)
self.ports_mode = ports_mode
self.num_of_services = num_of_services
self.base_pod_num = base_pod_num
self._edit_hyperparams_support = None
self._user_props_cb = user_props_cb
self.conf_file_content = None
@@ -270,13 +272,13 @@ class K8sIntegration(Worker):
return
if task_data.execution.docker_cmd:
docker_parts = task_data.execution.docker_cmd
docker_cmd = task_data.execution.docker_cmd
else:
docker_parts = str(ENV_DOCKER_IMAGE.get() or
self._session.config.get("agent.default_docker.image", "nvidia/cuda"))
docker_cmd = str(ENV_DOCKER_IMAGE.get() or
self._session.config.get("agent.default_docker.image", "nvidia/cuda"))
# take the first part, this is the docker image name (not arguments)
docker_parts = docker_parts.split()
docker_parts = docker_cmd.split()
docker_image = docker_parts[0]
docker_args = docker_parts[1:] if len(docker_parts) > 1 else []
@@ -306,8 +308,10 @@ class K8sIntegration(Worker):
safe_queue_name = re.sub(r'\W+', '', safe_queue_name).replace('_', '').replace('-', '')
# Search for a free pod number
pod_number = 1
pod_count = 0
pod_number = self.base_pod_num
while self.ports_mode:
pod_number = self.base_pod_num + pod_count
kubectl_cmd_new = "kubectl get pods -l {pod_label},{agent_label} -n {namespace}".format(
pod_label=self.LIMIT_POD_LABEL.format(pod_number=pod_number),
agent_label=self.AGENT_LABEL,
@@ -321,7 +325,7 @@ class K8sIntegration(Worker):
if not output:
# No such pod exist so we can use the pod_number we found
break
if pod_number >= self.num_of_services:
if pod_count >= self.num_of_services - 1:
# All pod numbers are taken, exit
self.log.warning(
"kubectl last result: {}\n{}\nAll k8s services are in use, task '{}' "
@@ -333,12 +337,12 @@ class K8sIntegration(Worker):
self._session.api_client.tasks.enqueue(
task_id, queue=queue, status_reason='k8s max pod limit (no free k8s service)')
return
pod_number += 1
pod_count += 1
labels = ([self.LIMIT_POD_LABEL.format(pod_number=pod_number)] if self.ports_mode else []) + [self.AGENT_LABEL]
if self.ports_mode:
print("Kubernetes scheduling task id={} on pod={}".format(task_id, pod_number))
print("Kubernetes scheduling task id={} on pod={} (pod_count={})".format(task_id, pod_number, pod_count))
else:
print("Kubernetes scheduling task id={}".format(task_id))
@@ -350,7 +354,7 @@ class K8sIntegration(Worker):
else:
output, error = self._kubectl_run(
create_clearml_conf=create_clearml_conf,
labels=labels, docker_image=docker_image,
labels=labels, docker_image=docker_cmd,
task_data=task_data,
task_id=task_id, queue=queue, queue_name=safe_queue_name)
@@ -364,7 +368,13 @@ class K8sIntegration(Worker):
user_props = {"k8s-queue": str(queue_name)}
if self.ports_mode:
user_props.update({"k8s-pod-number": pod_number, "k8s-pod-label": labels[0]})
user_props.update(
{
"k8s-pod-number": pod_number,
"k8s-pod-label": labels[0],
"k8s-internal-pod-count": pod_count,
}
)
if self._user_props_cb:
# noinspection PyBroadException

View File

@@ -24,7 +24,6 @@ import pyhocon
import yaml
from attr import fields_dict
from pathlib2 import Path
from tqdm import tqdm
import six
from six.moves import reduce
@@ -399,12 +398,6 @@ class TqdmStream(object):
self.buffer.write('\n')
class TqdmLog(tqdm):
def __init__(self, iterable=None, file=None, **kwargs):
super(TqdmLog, self).__init__(iterable, file=TqdmStream(file or sys.stderr), **kwargs)
def url_join(first, *rest):
"""
Join url parts similarly to Path.join

View File

@@ -20,6 +20,7 @@ import platform
import sys
import time
from datetime import datetime
from typing import Optional
import psutil
from ..gpu import pynvml as N
@@ -390,3 +391,34 @@ def new_query(shutdown=False, per_process_stats=False, get_driver_info=False):
'''
return GPUStatCollection.new_query(shutdown=shutdown, per_process_stats=per_process_stats,
get_driver_info=get_driver_info)
def get_driver_cuda_version():
# type: () -> Optional[str]
"""
:return: The CUDA version detected from the driver, or None on failure.
Example: `110` is CUDA version 11.0
"""
# noinspection PyBroadException
try:
N.nvmlInit()
except BaseException:
return None
# noinspection PyBroadException
try:
cuda_version = str(N.nvmlSystemGetCudaDriverVersion())
except BaseException:
# noinspection PyBroadException
try:
cuda_version = str(N.nvmlSystemGetCudaDriverVersion_v2())
except BaseException:
cuda_version = ''
# noinspection PyBroadException
try:
N.nvmlShutdown()
except BaseException:
return None
return cuda_version[:3] if cuda_version else None
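
A quick usage sketch; the import path assumes this helper lives in the vendored gpustat module:

# assumed module path
from clearml_agent.helper.gpu.gpustat import get_driver_cuda_version

# '110' means CUDA 11.0; None when no NVIDIA driver is available
print(get_driver_cuda_version())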

File diff suppressed because it is too large

View File

@@ -0,0 +1,218 @@
import os
import shutil
from logging import warning
from random import random
from time import time
from typing import List, Optional, Sequence
import psutil
from pathlib2 import Path
from .locks import FileLock
class FolderCache(object):
_lock_filename = '.clearml.lock'
_lock_timeout_seconds = 30
_temp_entry_prefix = '_temp.'
def __init__(self, cache_folder, max_cache_entries=5, min_free_space_gb=None):
self._cache_folder = Path(os.path.expandvars(cache_folder)).expanduser().absolute()
self._cache_folder.mkdir(parents=True, exist_ok=True)
self._max_cache_entries = max_cache_entries
self._last_copied_entry_folder = None
self._min_free_space_gb = min_free_space_gb if min_free_space_gb and min_free_space_gb > 0 else None
self._lock = FileLock((self._cache_folder / self._lock_filename).as_posix())
def get_cache_folder(self):
# type: () -> Path
"""
:return: Return the base cache folder
"""
return self._cache_folder
def copy_cached_entry(self, keys, destination):
# type: (List[str], Path) -> Optional[Path]
"""
Copy a cached entry into a destination directory; if the cached entry does not exist, return None
:param keys:
:param destination:
:return: Target path, None if cached entry does not exist
"""
self._last_copied_entry_folder = None
if not keys:
return None
# lock so we make sure no one deletes it before we copy it
# noinspection PyBroadException
try:
self._lock.acquire(timeout=self._lock_timeout_seconds)
except BaseException as ex:
warning('Could not lock cache folder {}: {}'.format(self._cache_folder, ex))
return None
src = None
try:
src = self.get_entry(keys)
if src:
destination = Path(destination).absolute()
destination.mkdir(parents=True, exist_ok=True)
shutil.rmtree(destination.as_posix())
shutil.copytree(src.as_posix(), dst=destination.as_posix(), symlinks=True)
except BaseException as ex:
warning('Could not copy cache folder {} to {}: {}'.format(src, destination, ex))
self._lock.release()
return None
# release Lock
self._lock.release()
self._last_copied_entry_folder = src
return destination if src else None
def get_entry(self, keys):
# type: (List[str]) -> Optional[Path]
"""
Return a folder (a sub-folder inside the cache_folder) matching one of the keys
:param keys: List of keys, return the first match to one of the keys, notice keys cannot contain '.'
:return: Path to the sub-folder or None if none was found
"""
if not keys:
return None
# conform keys
keys = [keys] if isinstance(keys, str) else keys
keys = sorted([k.replace('.', '_') for k in keys])
for cache_folder in self._cache_folder.glob('*'):
if cache_folder.is_dir() and any(True for k in cache_folder.name.split('.') if k in keys):
cache_folder.touch()
return cache_folder
return None
def add_entry(self, keys, source_folder, exclude_sub_folders=None):
# type: (List[str], Path, Optional[Sequence[str]]) -> bool
"""
Add a local folder into the cache, copy all sub-folders inside `source_folder`
excluding folders matching `exclude_sub_folders` list
:param keys: Cache entry keys list (str)
:param source_folder: Folder to copy into the cache
:param exclude_sub_folders: List of sub-folders to exclude from the copy operation
:return: return True if a new entry was added to the cache
"""
if not keys:
return False
keys = [keys] if isinstance(keys, str) else keys
keys = sorted([k.replace('.', '_') for k in keys])
# If entry already exists skip it
cached_entry = self.get_entry(keys)
if cached_entry:
# make sure the entry contains all keys
cached_keys = cached_entry.name.split('.')
if set(keys) - set(cached_keys):
# noinspection PyBroadException
try:
self._lock.acquire(timeout=self._lock_timeout_seconds)
except BaseException as ex:
warning('Could not lock cache folder {}: {}'.format(self._cache_folder, ex))
# failed locking do nothing
return True
keys = sorted(list(set(keys) | set(cached_keys)))
dst = cached_entry.parent / '.'.join(keys)
# rename
try:
shutil.move(src=cached_entry.as_posix(), dst=dst.as_posix())
except BaseException as ex:
warning('Could not rename cache entry {} to {}: {}'.format(
cached_entry.as_posix(), dst.as_posix(), ex))
# release lock
self._lock.release()
return True
# make sure we remove old entries
self._remove_old_entries()
# if we do not have enough free space, do nothing.
if not self._check_min_free_space():
warning('Could not add cache entry, not enough free space on drive, '
'free space threshold {} GB. Clearing all cache entries!'.format(self._min_free_space_gb))
self._remove_old_entries(max_cache_entries=0)
return False
# create the new entry for us
exclude_sub_folders = exclude_sub_folders or []
source_folder = Path(source_folder).absolute()
# create temp folder
temp_folder = \
self._temp_entry_prefix + \
'{}.{}'.format(str(time()).replace('.', '_'), str(random()).replace('.', '_'))
temp_folder = self._cache_folder / temp_folder
temp_folder.mkdir(parents=True, exist_ok=False)
for f in source_folder.glob('*'):
if f.name in exclude_sub_folders:
continue
shutil.copytree(src=f.as_posix(), dst=(temp_folder / f.name).as_posix(), symlinks=True)
# rename the target folder
target_cache_folder = self._cache_folder / '.'.join(keys)
# if we failed moving it means someone else created the cached entry before us, we can just leave
# noinspection PyBroadException
try:
shutil.move(src=temp_folder.as_posix(), dst=target_cache_folder.as_posix())
except BaseException:
# noinspection PyBroadException
try:
shutil.rmtree(path=temp_folder.as_posix())
except BaseException:
return False
return True
def get_last_copied_entry(self):
# type: () -> Optional[Path]
"""
:return: the last copied cached entry folder inside the cache
"""
return self._last_copied_entry_folder
def _remove_old_entries(self, max_cache_entries=None):
# type: (Optional[int]) -> ()
"""
Notice we only keep self._max_cache_entries-1, assuming we will be adding a new entry soon
:param int max_cache_entries: if not None use instead of self._max_cache_entries
"""
folder_entries = [(cache_folder, cache_folder.stat().st_mtime)
for cache_folder in self._cache_folder.glob('*')
if cache_folder.is_dir() and not cache_folder.name.startswith(self._temp_entry_prefix)]
folder_entries = sorted(folder_entries, key=lambda x: x[1], reverse=True)
# lock so we make sure no one copies an entry while we delete it
# noinspection PyBroadException
try:
self._lock.acquire(timeout=self._lock_timeout_seconds)
except BaseException as ex:
warning('Could not lock cache folder {}: {}'.format(self._cache_folder, ex))
return
number_of_entries_to_keep = self._max_cache_entries - 1 \
if max_cache_entries is None else max(0, int(max_cache_entries))
for folder, ts in folder_entries[number_of_entries_to_keep:]:
try:
shutil.rmtree(folder.as_posix(), ignore_errors=True)
except BaseException as ex:
warning('Could not delete cache entry {}: {}'.format(folder.as_posix(), ex))
self._lock.release()
def _check_min_free_space(self):
# type: () -> bool
"""
:return: return False if we hit the free space limit.
If no free space limit was provided, always return True
"""
if not self._min_free_space_gb or not self._cache_folder:
return True
free_space = float(psutil.disk_usage(self._cache_folder.as_posix()).free)
free_space /= 2**30
return free_space > self._min_free_space_gb
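
A minimal usage sketch for the new FolderCache (the import path is an assumption; keys and folders are illustrative):

# assumed module path for the new file
# from clearml_agent.helper.cache import FolderCache

cache = FolderCache('~/.clearml/venvs-cache', max_cache_entries=10, min_free_space_gb=2.0)
# copy a built venv into the cache under one or more lookup keys
cache.add_entry(keys=['py3_8_cu110'],
                source_folder='/tmp/venvs-builds/task',
                exclude_sub_folders=['task_repository', 'code'])
# later, restore the first entry matching any key into a destination
restored = cache.copy_cached_entry(['py3_8_cu110'], destination='/tmp/venvs-builds/next-task')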

View File

@@ -0,0 +1,211 @@
import os
import time
import tempfile
import contextlib
from .portalocker import constants, exceptions, lock, unlock
current_time = getattr(time, "monotonic", time.time)
DEFAULT_TIMEOUT = 10 ** 8
DEFAULT_CHECK_INTERVAL = 0.25
LOCK_METHOD = constants.LOCK_EX | constants.LOCK_NB
__all__ = [
'FileLock',
'open_atomic',
]
@contextlib.contextmanager
def open_atomic(filename, binary=True):
"""Open a file for atomic writing. Instead of locking this method allows
you to write the entire file and move it to the actual location. Note that
this makes the assumption that a rename is atomic on your platform which
is generally the case but not a guarantee.
http://docs.python.org/library/os.html#os.rename
>>> filename = 'test_file.txt'
>>> if os.path.exists(filename):
... os.remove(filename)
>>> with open_atomic(filename) as fh:
... written = fh.write(b'test')
>>> assert os.path.exists(filename)
>>> os.remove(filename)
"""
assert not os.path.exists(filename), '%r exists' % filename
path, name = os.path.split(filename)
# Create the parent directory if it doesn't exist
if path and not os.path.isdir(path): # pragma: no cover
os.makedirs(path)
temp_fh = tempfile.NamedTemporaryFile(
mode=binary and 'wb' or 'w',
dir=path,
delete=False,
)
yield temp_fh
temp_fh.flush()
os.fsync(temp_fh.fileno())
temp_fh.close()
try:
os.rename(temp_fh.name, filename)
finally:
try:
os.remove(temp_fh.name)
except Exception: # noqa
pass
class FileLock(object):
def __init__(
self, filename, mode='a', timeout=DEFAULT_TIMEOUT,
check_interval=DEFAULT_CHECK_INTERVAL, fail_when_locked=False,
flags=LOCK_METHOD, **file_open_kwargs):
"""Lock manager with build-in timeout
filename -- filename
mode -- the open mode, 'a' or 'ab' should be used for writing
truncate -- use truncate to emulate 'w' mode, None is disabled, 0 is
truncate to 0 bytes
timeout -- timeout when trying to acquire a lock
check_interval -- check interval while waiting
fail_when_locked -- after the initial lock failed, return an error
or lock the file
**file_open_kwargs -- The kwargs for the `open(...)` call
fail_when_locked is useful when multiple threads/processes can race
when creating a file. If set to true, an AlreadyLocked exception is
raised after the first failed attempt instead of waiting for the lock.
Note that the file is opened first and locked later. So using 'w' as
mode will result in truncate _BEFORE_ the lock is checked.
"""
if 'w' in mode:
truncate = True
mode = mode.replace('w', 'a')
else:
truncate = False
self.fh = None
self.filename = filename
self.mode = mode
self.truncate = truncate
self.timeout = timeout
self.check_interval = check_interval
self.fail_when_locked = fail_when_locked
self.flags = flags
self.file_open_kwargs = file_open_kwargs
def acquire(
self, timeout=None, check_interval=None, fail_when_locked=None):
"""Acquire the locked filehandle"""
if timeout is None:
timeout = self.timeout
if timeout is None:
timeout = 0
if check_interval is None:
check_interval = self.check_interval
if fail_when_locked is None:
fail_when_locked = self.fail_when_locked
# If we already have a filehandle, return it
fh = self.fh
if fh:
return fh
# Get a new filehandler
fh = self._get_fh()
try:
# Try to lock
fh = self._get_lock(fh)
except exceptions.LockException as exception:
# Try till the timeout has passed
timeoutend = current_time() + timeout
while timeoutend > current_time():
# Wait a bit
time.sleep(check_interval)
# Try again
try:
# We already tried to get the lock
# If fail_when_locked is true, then stop trying
if fail_when_locked:
raise exceptions.AlreadyLocked(exception)
else: # pragma: no cover
# We've got the lock
fh = self._get_lock(fh)
break
except exceptions.LockException:
pass
else:
# We got a timeout... reraising
raise exceptions.LockException(exception)
# Prepare the filehandle (truncate if needed)
fh = self._prepare_fh(fh)
self.fh = fh
return fh
def release(self):
"""Releases the currently locked file handle"""
if self.fh:
# noinspection PyBroadException
try:
unlock(self.fh)
except Exception:
pass
# noinspection PyBroadException
try:
self.fh.close()
except Exception:
pass
self.fh = None
def _get_fh(self):
"""Get a new filehandle"""
return open(self.filename, self.mode, **self.file_open_kwargs)
def _get_lock(self, fh):
"""
Try to lock the given filehandle
raises LockException if it fails"""
lock(fh, self.flags)
return fh
def _prepare_fh(self, fh):
"""
Prepare the filehandle for usage
If truncate is a number, the file will be truncated to that amount of
bytes
"""
if self.truncate:
fh.seek(0)
fh.truncate(0)
return fh
def __enter__(self):
return self.acquire()
def __exit__(self, type_, value, tb):
self.release()
def __delete__(self, instance): # pragma: no cover
instance.release()
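
A small usage sketch of the class above (the lock-file path is illustrative): acquire() returns the locked filehandle, and the context-manager form releases it automatically.

from clearml_agent.helper.os.locks import FileLock

# exclusive, non-blocking lock, retried for up to 10 seconds
with FileLock('/tmp/demo.lock', timeout=10, check_interval=0.25) as fh:
    fh.write('held by this process\n')  # fh is the locked filehandle
# __exit__ unlocked and closed the filehandle here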

View File

@@ -0,0 +1,193 @@
import os
import sys
class exceptions:
class BaseLockException(Exception):
# Error codes:
LOCK_FAILED = 1
def __init__(self, *args, **kwargs):
self.fh = kwargs.pop('fh', None)
Exception.__init__(self, *args, **kwargs)
class LockException(BaseLockException):
pass
class AlreadyLocked(BaseLockException):
pass
class FileToLarge(BaseLockException):
pass
class constants:
# The actual tests will execute the code anyhow so the following code can
# safely be ignored from the coverage tests
if os.name == 'nt': # pragma: no cover
import msvcrt
LOCK_EX = 0x1 #: exclusive lock
LOCK_SH = 0x2 #: shared lock
LOCK_NB = 0x4 #: non-blocking
LOCK_UN = msvcrt.LK_UNLCK #: unlock
LOCKFILE_FAIL_IMMEDIATELY = 1
LOCKFILE_EXCLUSIVE_LOCK = 2
elif os.name == 'posix': # pragma: no cover
import fcntl
LOCK_EX = fcntl.LOCK_EX #: exclusive lock
LOCK_SH = fcntl.LOCK_SH #: shared lock
LOCK_NB = fcntl.LOCK_NB #: non-blocking
LOCK_UN = fcntl.LOCK_UN #: unlock
else: # pragma: no cover
raise RuntimeError('PortaLocker only defined for nt and posix platforms')
if os.name == 'nt': # pragma: no cover
import msvcrt
if sys.version_info.major == 2:
lock_length = -1
else:
lock_length = int(2**31 - 1)
def lock(file_, flags):
if flags & constants.LOCK_SH:
import win32file
import pywintypes
import winerror
__overlapped = pywintypes.OVERLAPPED()
if sys.version_info.major == 2:
if flags & constants.LOCK_NB:
mode = constants.LOCKFILE_FAIL_IMMEDIATELY
else:
mode = 0
else:
if flags & constants.LOCK_NB:
mode = msvcrt.LK_NBRLCK
else:
mode = msvcrt.LK_RLCK
# is there any reason not to reuse the following structure?
hfile = win32file._get_osfhandle(file_.fileno())
try:
win32file.LockFileEx(hfile, mode, 0, -0x10000, __overlapped)
except pywintypes.error as exc_value:
# error: (33, 'LockFileEx', 'The process cannot access the file
# because another process has locked a portion of the file.')
if exc_value.winerror == winerror.ERROR_LOCK_VIOLATION:
raise exceptions.LockException(
exceptions.LockException.LOCK_FAILED,
exc_value.strerror,
fh=file_)
else:
# Q: Are there exceptions/codes we should be dealing with
# here?
raise
else:
mode = constants.LOCKFILE_EXCLUSIVE_LOCK
if flags & constants.LOCK_NB:
mode |= constants.LOCKFILE_FAIL_IMMEDIATELY
if flags & constants.LOCK_NB:
mode = msvcrt.LK_NBLCK
else:
mode = msvcrt.LK_LOCK
# windows locks byte ranges, so make sure to lock from file start
try:
savepos = file_.tell()
if savepos:
# [ ] test exclusive lock fails on seek here
# [ ] test if shared lock passes this point
file_.seek(0)
# [x] check if 0 param locks entire file (not documented in
# Python)
# [x] fails with "IOError: [Errno 13] Permission denied",
# but -1 seems to do the trick
try:
msvcrt.locking(file_.fileno(), mode, lock_length)
except IOError as exc_value:
# [ ] be more specific here
raise exceptions.LockException(
exceptions.LockException.LOCK_FAILED,
exc_value.strerror,
fh=file_)
finally:
if savepos:
file_.seek(savepos)
except IOError as exc_value:
raise exceptions.LockException(
exceptions.LockException.LOCK_FAILED, exc_value.strerror,
fh=file_)
def unlock(file_):
try:
savepos = file_.tell()
if savepos:
file_.seek(0)
try:
msvcrt.locking(file_.fileno(), constants.LOCK_UN, lock_length)
except IOError as exc_value:
if exc_value.strerror == 'Permission denied':
import pywintypes
import win32file
import winerror
__overlapped = pywintypes.OVERLAPPED()
hfile = win32file._get_osfhandle(file_.fileno())
try:
win32file.UnlockFileEx(
hfile, 0, -0x10000, __overlapped)
except pywintypes.error as exc_value:
if exc_value.winerror == winerror.ERROR_NOT_LOCKED:
# error: (158, 'UnlockFileEx',
# 'The segment is already unlocked.')
# To match the 'posix' implementation, silently
# ignore this error
pass
else:
# Q: Are there exceptions/codes we should be
# dealing with here?
raise
else:
raise exceptions.LockException(
exceptions.LockException.LOCK_FAILED,
exc_value.strerror,
fh=file_)
finally:
if savepos:
file_.seek(savepos)
except IOError as exc_value:
raise exceptions.LockException(
exceptions.LockException.LOCK_FAILED, exc_value.strerror,
fh=file_)
elif os.name == 'posix': # pragma: no cover
import fcntl
def lock(file_, flags):
locking_exceptions = IOError,
try: # pragma: no cover
locking_exceptions += BlockingIOError,
except NameError: # pragma: no cover
pass
try:
fcntl.flock(file_.fileno(), flags)
except locking_exceptions as exc_value:
# The exception code varies on different systems so we'll catch
# every IO error
raise exceptions.LockException(exc_value, fh=file_)
def unlock(file_):
fcntl.flock(file_.fileno(), constants.LOCK_UN)
else: # pragma: no cover
raise RuntimeError('PortaLocker only defined for nt and posix platforms')
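
The low-level API in a sketch (module path as implied by the import in locks.py above): a non-blocking exclusive lock raises exceptions.LockException when another process already holds the file.

from clearml_agent.helper.os.portalocker import constants, exceptions, lock, unlock

fh = open('/tmp/demo-portalock.txt', 'a')
try:
    lock(fh, constants.LOCK_EX | constants.LOCK_NB)  # raises LockException if busy
    fh.write('locked\n')
except exceptions.LockException:
    print('already locked by another process')
finally:
    unlock(fh)
    fh.close()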

View File

@@ -1,11 +1,16 @@
from __future__ import unicode_literals
import abc
from collections import OrderedDict
from contextlib import contextmanager
from typing import Text, Iterable, Union
from typing import Text, Iterable, Union, Optional, Dict, List
from pathlib2 import Path
from hashlib import md5
import six
from clearml_agent.helper.base import mkstemp, safe_remove_file, join_lines, select_for_platform
from clearml_agent.helper.console import ensure_binary
from clearml_agent.helper.os.folder_cache import FolderCache
from clearml_agent.helper.process import Executable, Argv, PathLike
@@ -18,6 +23,12 @@ class PackageManager(object):
_selected_manager = None
_cwd = None
_pip_version = None
_config_cache_folder = 'agent.venvs_cache.path'
_config_cache_max_entries = 'agent.venvs_cache.max_entries'
_config_cache_free_space_threshold = 'agent.venvs_cache.free_space_threshold_gb'
def __init__(self):
self._cache_manager = None
@abc.abstractproperty
def bin(self):
@@ -153,3 +164,100 @@ class PackageManager(object):
@classmethod
def get_pip_version(cls):
return cls._pip_version or ''
def get_cached_venv(self, requirements, docker_cmd, python_version, cuda_version, destination_folder):
# type: (Dict, Optional[Union[dict, str]], Optional[str], Optional[str], Path) -> Optional[Path]
"""
Copy a cached copy of the venv (based on the requirements) into destination_folder.
Return None if failed or cached entry does not exist
"""
if not self._get_cache_manager():
return None
keys = self._generate_reqs_hash_keys(requirements, docker_cmd, python_version, cuda_version)
return self._get_cache_manager().copy_cached_entry(keys, destination_folder)
def add_cached_venv(
self,
requirements, # type: Union[Dict, List[Dict]]
docker_cmd, # type: Optional[Union[dict, str]]
python_version, # type: Optional[str]
cuda_version, # type: Optional[str]
source_folder, # type: Path
exclude_sub_folders=None # type: Optional[List[str]]
):
# type: (...) -> ()
"""
Copy the local venv folder into the venv cache (keys are based on the requirements+python+docker).
"""
if not self._get_cache_manager():
return
keys = self._generate_reqs_hash_keys(requirements, docker_cmd, python_version, cuda_version)
return self._get_cache_manager().add_entry(
keys=keys, source_folder=source_folder, exclude_sub_folders=exclude_sub_folders)
def get_cache_folder(self):
# type: () -> Optional[Path]
if not self._get_cache_manager():
return
return self._get_cache_manager().get_cache_folder()
def get_last_used_entry_cache(self):
# type: () -> Optional[Path]
"""
:return: the last used cached folder entry
"""
if not self._get_cache_manager():
return
return self._get_cache_manager().get_last_copied_entry()
@classmethod
def _generate_reqs_hash_keys(cls, requirements_list, docker_cmd, python_version, cuda_version):
# type: (Union[Dict, List[Dict]], Optional[Union[dict, str]], Optional[str], Optional[str]) -> List[str]
requirements_list = requirements_list or dict()
if not isinstance(requirements_list, (list, tuple)):
requirements_list = [requirements_list]
docker_cmd = dict(docker_cmd=docker_cmd) if isinstance(docker_cmd, str) else docker_cmd or dict()
docker_cmd = OrderedDict(sorted(docker_cmd.items(), key=lambda t: t[0]))
if 'docker_cmd' in docker_cmd:
# we only take the first part of the docker_cmd which is the docker image name
docker_cmd['docker_cmd'] = docker_cmd['docker_cmd'].strip('\r\n\t ').split(' ')[0]
keys = []
strip_chars = '\n\r\t '
for requirements in requirements_list:
pip, conda = ('pip', 'conda')
pip_reqs = requirements.get(pip, '')
conda_reqs = requirements.get(conda, '')
if isinstance(pip_reqs, str):
pip_reqs = pip_reqs.split('\n')
if isinstance(conda_reqs, str):
conda_reqs = conda_reqs.split('\n')
pip_reqs = sorted([p.strip(strip_chars) for p in pip_reqs
if p.strip(strip_chars) and not p.strip(strip_chars).startswith('#')])
conda_reqs = sorted([p.strip(strip_chars) for p in conda_reqs
if p.strip(strip_chars) and not p.strip(strip_chars).startswith('#')])
if not pip_reqs and not conda_reqs:
continue
hash_text = '{class_type}\n{docker_cmd}\n{cuda_ver}\n{python_version}\n{pip_reqs}\n{conda_reqs}'.format(
class_type=str(cls),
docker_cmd=str(docker_cmd or ''),
cuda_ver=str(cuda_version or ''),
python_version=str(python_version or ''),
pip_reqs=str(pip_reqs or ''),
conda_reqs=str(conda_reqs or ''),
)
keys.append(md5(ensure_binary(hash_text)).hexdigest())
return sorted(list(set(keys)))
def _get_cache_manager(self):
if not self._cache_manager:
cache_folder = self.session.config.get(self._config_cache_folder, None)
if not cache_folder:
return None
max_entries = int(self.session.config.get(self._config_cache_max_entries, 10))
free_space_threshold = float(self.session.config.get(self._config_cache_free_space_threshold, 0))
self._cache_manager = FolderCache(
cache_folder, max_cache_entries=max_entries, min_free_space_gb=free_space_threshold)
return self._cache_manager
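
To see how the cache keys behave, a sketch calling the key generator directly (the module path is an assumption): comments and blank requirement lines are dropped, and only the image name survives from the docker command, so extra docker arguments do not invalidate a cached venv.

from clearml_agent.helper.package.base import PackageManager  # assumed path

reqs = {'pip': 'numpy==1.19.5\n# pinned for repro\nrequests>=2.20\n'}
keys = PackageManager._generate_reqs_hash_keys(
    reqs,
    docker_cmd='nvidia/cuda:10.1-cudnn7-runtime-ubuntu18.04 --ipc=host',
    python_version='3.7',
    cuda_version='102',
)
print(keys)  # a single md5 hex digest identifying this venv configuration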

View File

@@ -69,6 +69,7 @@ class CondaAPI(PackageManager):
:param python: base python version to use (e.g python3.6)
:param path: path of env
"""
super(CondaAPI, self).__init__()
self.session = session
self.python = python
self.source = None
@@ -140,19 +141,7 @@ class CondaAPI(PackageManager):
"""
if self.conda_env_as_base_docker and self.conda_pre_build_env_path:
if Path(self.conda_pre_build_env_path).is_dir():
print("Using pre-existing Conda environment from {}".format(self.conda_pre_build_env_path))
self.path = Path(self.conda_pre_build_env_path)
self.source = ("conda", "activate", self.path.as_posix())
self.pip = CondaPip(
session=self.session,
source=self.source,
python=self.python,
requirements_manager=self.requirements_manager,
path=self.path,
)
conda_env = self._get_conda_sh()
self.source = self.pip.source = CommandSequence(('source', conda_env.as_posix()), self.source)
self.env_read_only = True
self._init_existing_environment(self.conda_pre_build_env_path)
return self
elif Path(self.conda_pre_build_env_path).is_file():
print("Restoring Conda environment from {}".format(self.conda_pre_build_env_path))
@@ -210,6 +199,21 @@ class CondaAPI(PackageManager):
pass
return self
def _init_existing_environment(self, conda_pre_build_env_path):
print("Using pre-existing Conda environment from {}".format(conda_pre_build_env_path))
self.path = Path(conda_pre_build_env_path)
self.source = ("conda", "activate", self.path.as_posix())
self.pip = CondaPip(
session=self.session,
source=self.source,
python=self.python,
requirements_manager=self.requirements_manager,
path=self.path,
)
conda_env = self._get_conda_sh()
self.source = self.pip.source = CommandSequence(('source', conda_env.as_posix()), self.source)
self.env_read_only = True
def remove(self):
"""
Delete a conda environment.
@@ -501,6 +505,8 @@ class CondaAPI(PackageManager):
reqs.append(m)
# if we have a conda list, the rest should be installed with pip;
# this means any experiment that was executed with a pip environment
# will be installed using pip
if requirements.get('conda', None) is not None:
for r in requirements['pip']:
try:
@@ -514,7 +520,7 @@ class CondaAPI(PackageManager):
# skip over local files (we cannot change the version to a local file)
if m.local_file:
continue
m_name = m.name.lower()
m_name = (m.name or '').lower()
if m_name in conda_supported_req_names:
# this package is in the conda list,
# make sure that if we changed version and we match it in conda
@@ -551,7 +557,7 @@ class CondaAPI(PackageManager):
# conform conda packages (version/name)
for r in reqs:
# change _ to - in name but not the prefix _ (as this is conda prefix)
if not r.name.startswith('_') and not requirements.get('conda', None):
if r.name and not r.name.startswith('_') and not requirements.get('conda', None):
r.name = r.name.replace('_', '-')
# remove .post from version numbers, it fails ~= version, and change == to ~=
if r.specs and r.specs[0]:
@@ -665,6 +671,8 @@ class CondaAPI(PackageManager):
return result
def get_python_command(self, extra=()):
if not self.source:
self._init_existing_environment(self.path)
return CommandSequence(self.source, self.pip.get_python_command(extra=extra))
def _get_conda_sh(self):

View File

@@ -17,6 +17,16 @@ class ExternalRequirements(SimpleSubstitution):
self.post_install_req_lookup = OrderedDict()
def match(self, req):
# match local folder building:
# noinspection PyBroadException
try:
if not req.name and req.req and not req.req.editable and not req.req.vcs and \
req.req.line and req.req.line.strip().split('#')[0] and \
not req.req.line.strip().split('#')[0].lower().endswith('.whl'):
return True
except Exception:
pass
# match both editable or code or unparsed
if not (not req.name or req.req and (req.req.editable or req.req.vcs)):
return False
@@ -104,3 +114,20 @@ class ExternalRequirements(SimpleSubstitution):
list_of_requirements[k] += [self.post_install_req_lookup.get(r, '')
for r in self.post_install_req_lookup.keys() if r in original_requirements]
return list_of_requirements
class OnlyExternalRequirements(ExternalRequirements):
def __init__(self, *args, **kwargs):
super(OnlyExternalRequirements, self).__init__(*args, **kwargs)
def match(self, req):
return not super(OnlyExternalRequirements, self).match(req)
def replace(self, req):
"""
Replace a requirement
:raises: ValueError if version is pre-release
"""
# Do not store the skipped requirements
# mark the package as skipped by returning an empty line
return Text('')
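
The subclass inverts the parent's match(), so the two handlers split any requirements list cleanly between external (vcs/editable/local) packages and everything else; a toy sketch of that contract, with stand-in classes rather than the real ones:

class External:  # stand-in for ExternalRequirements.match
    @staticmethod
    def match(req):
        return bool(req.get('vcs') or req.get('editable') or not req.get('name'))

class OnlyExternal(External):  # stand-in for OnlyExternalRequirements.match
    @staticmethod
    def match(req):
        return not External.match(req)  # the exact inversion from the diff

for req in ({'name': 'numpy'}, {'name': 'mypkg', 'vcs': 'git'}):
    # every requirement matches exactly one of the two handlers
    assert External.match(req) != OnlyExternal.match(req)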

View File

@@ -17,6 +17,7 @@ class SystemPip(PackageManager):
"""
Program interface to the system pip.
"""
super(SystemPip, self).__init__()
self._bin = interpreter or sys.executable
self.session = session

View File

@@ -17,6 +17,7 @@ import six
from clearml_agent.definitions import PIP_EXTRA_INDICES
from clearml_agent.helper.base import warning, is_conda, which, join_lines, is_windows_platform
from clearml_agent.helper.process import Argv, PathLike
from clearml_agent.helper.gpu.gpustat import get_driver_cuda_version
from clearml_agent.session import Session, normalize_cuda_version
from clearml_agent.external.requirements_parser import parse
from clearml_agent.external.requirements_parser.requirement import Requirement
@@ -446,6 +447,7 @@ class RequirementsManager(object):
'cu'+agent['cuda_version'] if self.found_cuda else 'cpu')
self.translator = RequirementsTranslator(session, interpreter=base_interpreter,
cache_dir=pip_cache_dir.as_posix())
self._base_interpreter = base_interpreter
def register(self, cls): # type: (Type[RequirementSubstitution]) -> None
self.handlers.append(cls(self._session))
@@ -529,6 +531,9 @@ class RequirementsManager(object):
pass
return requirements
def get_interpreter(self):
return self._base_interpreter
@staticmethod
def get_cuda_version(config): # type: (ConfigTree) -> (Text, Text)
# we assume os.environ already updated the config['agent.cuda_version'] & config['agent.cudnn_version']
@@ -537,6 +542,9 @@ class RequirementsManager(object):
if cuda_version and cudnn_version:
return normalize_cuda_version(cuda_version), normalize_cuda_version(cudnn_version)
if not cuda_version:
cuda_version = get_driver_cuda_version()
if not cuda_version and is_windows_platform():
try:
cuda_vers = [int(k.replace('CUDA_PATH_V', '').replace('_', '')) for k in os.environ.keys()
@@ -601,4 +609,3 @@ class RequirementsManager(object):
return (normalize_cuda_version(cuda_version or 0),
normalize_cuda_version(cudnn_version or 0))
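
get_driver_cuda_version() lets the agent fall back to the CUDA version advertised by the NVIDIA driver when none is configured; roughly equivalent pynvml usage (a sketch, requires an NVIDIA driver to be present):

import pynvml

pynvml.nvmlInit()
try:
    raw = pynvml.nvmlSystemGetCudaDriverVersion()  # e.g. 11020 for CUDA 11.2
    cuda_version = '{}{}'.format(raw // 1000, (raw % 1000) // 10)  # -> '112'
finally:
    pynvml.nvmlShutdown()
print(cuda_version)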

View File

@@ -7,7 +7,7 @@ import re
import subprocess
import sys
from contextlib import contextmanager
from copy import deepcopy
from copy import copy
from distutils.spawn import find_executable
from itertools import chain, repeat, islice
from os.path import devnull
@@ -276,9 +276,9 @@ class CommandSequence(Executable):
self.commands = []
for c in commands:
if isinstance(c, CommandSequence):
self.commands.extend(deepcopy(c.commands))
self.commands.extend([copy(p) for p in c.commands])
elif isinstance(c, Argv):
self.commands.append(deepcopy(c))
self.commands.append(copy(c))
else:
self.commands.append(Argv(*c, log=self._log))
@@ -420,7 +420,7 @@ SOURCE_COMMAND = select_for_platform(linux="source", windows="call")
class ExitStatus(object):
success = 0
failure = 1
interrupted = 2
interrupted = -2
COMMAND_SUCCESS = 0

View File

@@ -5,6 +5,8 @@ import subprocess
from distutils.spawn import find_executable
from hashlib import md5
from os import environ
from random import random
from threading import Lock
from typing import Text, Sequence, Mapping, Iterable, TypeVar, Callable, Tuple, Optional
import attr
@@ -23,6 +25,7 @@ from clearml_agent.helper.base import (
normalize_path,
create_file_if_not_exists,
)
from clearml_agent.helper.os.locks import FileLock
from clearml_agent.helper.process import DEVNULL, Argv, PathLike, COMMAND_SUCCESS
from clearml_agent.session import Session
@@ -585,6 +588,9 @@ def clone_repository_cached(session, execution, destination):
:return: repository information
:raises: CommandFailedError if git/hg is not installed
"""
# mock lock
repo_lock = Lock()
repo_lock_timeout_sec = 300
repo_url = execution.repository # type: str
parsed_url = furl(repo_url)
no_password_url = parsed_url.copy().remove(password=True).url
@@ -596,37 +602,48 @@ def clone_repository_cached(session, execution, destination):
if standalone_mode:
cached_repo_path = clone_folder
else:
cached_repo_path = (
Path(session.config["agent.vcs_cache.path"]).expanduser()
/ "{}.{}".format(clone_folder_name, md5(ensure_binary(repo_url)).hexdigest())
/ clone_folder_name
) # type: Path
vcs_cache_path = Path(session.config["agent.vcs_cache.path"]).expanduser()
repo_hash = md5(ensure_binary(repo_url)).hexdigest()
# create lock
repo_lock = FileLock(filename=(vcs_cache_path / '{}.lock'.format(repo_hash)).as_posix())
# noinspection PyBroadException
try:
repo_lock.acquire(timeout=repo_lock_timeout_sec)
except BaseException:
print('Could not lock cache folder "{}" (timeout {} sec), using temp vcs cache.'.format(
clone_folder_name, repo_lock_timeout_sec))
repo_hash = '{}_{}'.format(repo_hash, str(random()).replace('.', ''))
# use mock lock for the context
repo_lock = Lock()
# select vcs cache folder
cached_repo_path = vcs_cache_path / "{}.{}".format(clone_folder_name, repo_hash) / clone_folder_name
vcs = VcsFactory.create(
session, execution_info=execution, location=cached_repo_path
)
if not find_executable(vcs.executable_name):
raise CommandFailedError(vcs.executable_not_found_error_help())
with repo_lock:
vcs = VcsFactory.create(
session, execution_info=execution, location=cached_repo_path
)
if not find_executable(vcs.executable_name):
raise CommandFailedError(vcs.executable_not_found_error_help())
if not standalone_mode:
if session.config["agent.vcs_cache.enabled"] and cached_repo_path.exists():
print('Using cached repository in "{}"'.format(cached_repo_path))
if not standalone_mode:
if session.config["agent.vcs_cache.enabled"] and cached_repo_path.exists():
print('Using cached repository in "{}"'.format(cached_repo_path))
else:
print("cloning: {}".format(no_password_url))
rm_tree(cached_repo_path)
# We clone the entire repository, not a specific branch
vcs.clone() # branch=execution.branch)
else:
print("cloning: {}".format(no_password_url))
rm_tree(cached_repo_path)
# We clone the entire repository, not a specific branch
vcs.clone() # branch=execution.branch)
vcs.pull()
rm_tree(destination)
shutil.copytree(Text(cached_repo_path), Text(clone_folder))
if not clone_folder.is_dir():
raise CommandFailedError(
"copying of repository failed: from {} to {}".format(
cached_repo_path, clone_folder
vcs.pull()
rm_tree(destination)
shutil.copytree(Text(cached_repo_path), Text(clone_folder))
if not clone_folder.is_dir():
raise CommandFailedError(
"copying of repository failed: from {} to {}".format(
cached_repo_path, clone_folder
)
)
)
# checkout in the newly copy destination
vcs.location = Text(clone_folder)
@@ -638,3 +655,70 @@ def clone_repository_cached(session, execution, destination):
repo_info = attr.evolve(repo_info, url=no_password_url)
return vcs, repo_info
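
The shared-cache locking above, reduced to its essentials (paths and hash are illustrative): agents on the same machine serialize clone/pull through a per-repository lock file, and fall back to a unique temporary cache entry plus a mock in-process lock when the file lock cannot be acquired in time.

from random import random
from threading import Lock
from clearml_agent.helper.os.locks import FileLock

repo_hash = 'abc123'  # md5 of the repository url in the real code
repo_lock = FileLock('/tmp/{}.lock'.format(repo_hash))
try:
    repo_lock.acquire(timeout=300)
except BaseException:
    # could not lock the shared cache entry: use a throwaway one instead
    repo_hash = '{}_{}'.format(repo_hash, str(random()).replace('.', ''))
    repo_lock = Lock()
with repo_lock:
    pass  # clone/pull into the cache entry, then copy it to the destination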
def fix_package_import_diff_patch(entry_script_file):
# noinspection PyBroadException
try:
with open(entry_script_file, 'rt') as f:
lines = f.readlines()
except Exception:
return
# make sure we are the first import (i.e. we patched the source code)
if not lines or not lines[0].strip().startswith('from clearml ') or 'Task.init' not in lines[1]:
return
original_lines = lines
# skip over the first two lines, they are ours
# then skip over empty or comment lines
lines = [(i, line.split('#', 1)[0].rstrip()) for i, line in enumerate(lines)
if i >= 2 and line.strip('\r\n\t ') and not line.strip().startswith('#')]
# remove triple quotes ' """ '
nested_c = -1
skip_lines = []
for i, line_pair in enumerate(lines):
for _ in line_pair[1].split('"""')[1:]:
if nested_c >= 0:
skip_lines.extend(list(range(nested_c, i+1)))
nested_c = -1
else:
nested_c = i
# now keep only the lines that are not inside a triple-quoted string
lines = [pair for i, pair in enumerate(lines) if i not in skip_lines]
from_future = re.compile(r"^from[\s]*__future__[\s]*")
import_future = re.compile(r"^import[\s]*__future__[\s]*")
# test if we have __future__ import
found_index = -1
for a_i, (_, a_line) in enumerate(lines):
if found_index >= a_i:
continue
if from_future.match(a_line) or import_future.match(a_line):
found_index = a_i
# check the last import block
i, line = lines[found_index]
# either the line ends with a \\ continuation or the import is parenthesized
parenthesized_lines = '(' in line and ')' not in line
while line.endswith('\\') or parenthesized_lines:
found_index += 1
i, line = lines[found_index]
if ')' in line:
break
else:
break
# no imports found
if found_index < 0:
return
# now we need to move the two patched lines to just after the last import
entry_line = lines[found_index][0]
new_lines = original_lines[2:entry_line + 1] + original_lines[0:2] + original_lines[entry_line + 1:]
# noinspection PyBroadException
try:
with open(entry_script_file, 'wt') as f:
f.writelines(new_lines)
except Exception:
return
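
The net effect of fix_package_import_diff_patch on a hypothetical auto-instrumented entry script, driven end to end (the module path is an assumption): the two injected clearml lines are relocated to just after the last __future__ import.

import tempfile
from clearml_agent.helper.repo import fix_package_import_diff_patch  # assumed path

src = (
    "from clearml import Task\n"
    "Task.init(project_name='demo', task_name='demo')\n"
    "from __future__ import print_function\n"
    "print('hello')\n"
)
with tempfile.NamedTemporaryFile('wt', suffix='.py', delete=False) as f:
    f.write(src)
fix_package_import_diff_patch(f.name)
print(open(f.name).read())  # the __future__ import now comes first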

View File

@@ -129,8 +129,9 @@ def get_uptime_string(entry):
def get_runtime_properties_string(runtime_properties):
# type: (List[dict]) -> Tuple[Optional[str], str]
# type: (Optional[List[dict]]) -> Tuple[Optional[str], str]
server_string = []
runtime_properties = runtime_properties or []
force_flag = next(
(prop for prop in runtime_properties if prop["key"] == UptimeConf.worker_key),
None,

View File

@@ -7,7 +7,7 @@ from tempfile import gettempdir, NamedTemporaryFile
from typing import List, Tuple, Optional
from clearml_agent.definitions import ENV_DOCKER_HOST_MOUNT
from clearml_agent.helper.base import warning
from clearml_agent.helper.base import warning, is_windows_platform, safe_remove_file
class Singleton(object):
@@ -22,6 +22,13 @@ class Singleton(object):
_lock_timeout = 10
_pid = None
@classmethod
def close_pid_file(cls):
if cls._pid_file:
cls._pid_file.close()
safe_remove_file(cls._pid_file.name)
cls._pid_file = None
@classmethod
def update_pid_file(cls):
new_pid = str(os.getpid())
@@ -115,7 +122,7 @@ class Singleton(object):
@classmethod
def _register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None, allow_double=False):
if cls.worker_id:
if cls.worker_id and cls.instance_slot is not None:
return cls.worker_id, cls.instance_slot
# make sure we have a unique name
instance_num = 0
@@ -167,7 +174,9 @@ class Singleton(object):
# create lock
cls._pid = str(os.getpid())
cls._pid_file = NamedTemporaryFile(
dir=cls._get_temp_folder(), prefix=cls.prefix + cls.sep + cls._pid + cls.sep, suffix=cls.ext)
dir=cls._get_temp_folder(), prefix=cls.prefix + cls.sep + cls._pid + cls.sep, suffix=cls.ext,
delete=False if is_windows_platform() else True
)
cls._pid_file.write(('{}\n{}'.format(unique_worker_id, cls.instance_slot)).encode())
cls._pid_file.flush()
cls.worker_id = unique_worker_id
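
The delete=False branch exists because on Windows a NamedTemporaryFile is held with delete-on-close semantics, which breaks reopening and external cleanup; a sketch of the pattern together with the explicit removal performed by close_pid_file():

import os
import sys
from tempfile import NamedTemporaryFile

is_windows = sys.platform == 'win32'
pid_file = NamedTemporaryFile(
    prefix='worker_', suffix='.tmp',
    delete=not is_windows)  # on Windows, keep the file and remove it ourselves
pid_file.write('{}\n{}'.format(os.getpid(), 0).encode())
pid_file.flush()
# ... on shutdown:
pid_file.close()
if is_windows and os.path.exists(pid_file.name):
    os.remove(pid_file.name)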

View File

@@ -78,7 +78,10 @@ DAEMON_ARGS = dict({
},
'--services-mode': {
'help': 'Launch multiple long-term docker services. Implies docker & cpu-only flags.',
'action': 'store_true',
'nargs': '?',
'const': -1,
'type': int,
'default': None,
},
'--create-queue': {
'help': 'Create requested queue if it does not exist already.',
@@ -93,6 +96,12 @@ DAEMON_ARGS = dict({
'help': 'Stop the running agent (based on the same set of arguments)',
'action': 'store_true',
},
'--dynamic-gpus': {
'help': 'Allow dynamic allocation of GPUs based on queue properties, '
'configure with \'--queue <queue_name>=<num_gpus>\'.'
' Example: \'--dynamic-gpus --queue dual_gpus=2 single_gpu=1\'',
'action': 'store_true',
},
'--uptime': {
'help': 'Specify uptime for clearml-agent in "<hours> <days>" format. For example, use "17-20 TUE" to set '
'Tuesday\'s uptime to 17-20'
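
With nargs='?' and const=-1, --services-mode now parses in three forms: absent (disabled), bare flag (const, presumably meaning no explicit instance limit), or with an instance cap; a minimal argparse sketch:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--services-mode', nargs='?', const=-1, type=int, default=None)

print(parser.parse_args([]).services_mode)                         # None
print(parser.parse_args(['--services-mode']).services_mode)        # -1
print(parser.parse_args(['--services-mode', '5']).services_mode)   # 5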

View File

@@ -204,7 +204,7 @@ class Session(_Session):
folder_keys = ('agent.venvs_dir', 'agent.vcs_cache.path',
'agent.pip_download_cache.path',
'agent.docker_pip_cache', 'agent.docker_apt_cache')
singleton_folders = ('agent.venvs_dir', 'agent.vcs_cache.path', 'agent.docker_apt_cache')
singleton_folders = ('agent.venvs_dir', 'agent.docker_apt_cache')
if ENV_TASK_EXECUTE_AS_USER.get():
folder_keys = tuple(list(folder_keys) + ['sdk.storage.cache.default_base_dir'])

View File

@@ -1 +1 @@
__version__ = '0.17.1'
__version__ = '0.17.2'

View File

@@ -24,7 +24,9 @@ agent {
# Force GIT protocol to use SSH regardless of the git url (Assumes GIT user/pass are blank)
force_git_ssh_protocol: false
# Force a specific SSH port when converting http to ssh links (the domain is kept the same)
# force_git_ssh_port: ""
# force_git_ssh_port: 0
# Force a specific SSH username when converting http to ssh links (the default username is 'git')
# force_git_ssh_user: git
# unique name of this worker, if None, created based on hostname:process_id
# Overridden with os environment: CLEARML_WORKER_NAME
@@ -89,6 +91,16 @@ agent {
# target folder for virtual environments builds, created when executing experiment
venvs_dir = ~/.clearml/venvs-builds
# cached virtual environment folder
venvs_cache: {
# maximum number of cached venvs
max_entries: 10
# minimum required free space to allow for cache entry, disable by passing 0 or negative value
free_space_threshold_gb: 2.0
# uncomment to enable virtual environment caching
# path: ~/.clearml/venvs-cache
},
# cached git clone folder
vcs_cache: {
enabled: true,
@@ -129,12 +141,15 @@ agent {
default_docker: {
# default docker image to use when running in docker mode
image: "nvidia/cuda:10.1-runtime-ubuntu18.04"
image: "nvidia/cuda:10.1-cudnn7-runtime-ubuntu18.04"
# optional arguments to pass to docker image
# arguments: ["--ipc=host"]
}
# set the OS environments based on the Task's Environment section before launching the Task process.
enable_task_env: false
# CUDA versions used for Conda setup & solving PyTorch wheel packages
# it Should be detected automatically. Override with os environment CUDA_VERSION / CUDNN_VERSION
# cuda_version: 10.1

View File

@@ -24,7 +24,13 @@ def parse_args():
parser.add_argument(
"--base-port", type=int,
help="Used in conjunction with ports-mode, specifies the base port exposed by the services. "
"For pod #X, the port will be <base-port>+X"
"For pod #X, the port will be <base-port>+X. Note that pod number is calculated based on base-pod-num"
"e.g. if base-port=20000 and base-pod-num=3, the port for the first pod will be 20003"
)
parser.add_argument(
"--base-pod-num", type=int, default=1,
help="Used in conjunction with ports-mode and base-port, specifies the base pod number to be used by the "
"service (default: %(default)s)"
)
parser.add_argument(
"--gateway-address", type=str, default=None,
@@ -67,9 +73,9 @@ def main():
user_props_cb = k8s_user_props_cb
k8s = K8sIntegration(
ports_mode=args.ports_mode, num_of_services=args.num_of_services, user_props_cb=user_props_cb,
overrides_yaml=args.overrides_yaml, clearml_conf_file=args.pod_clearml_conf, template_yaml=args.template_yaml,
extra_bash_init_script=K8sIntegration.get_ssh_server_bash(
ports_mode=args.ports_mode, num_of_services=args.num_of_services, base_pod_num=args.base_pod_num,
user_props_cb=user_props_cb, overrides_yaml=args.overrides_yaml, clearml_conf_file=args.pod_clearml_conf,
template_yaml=args.template_yaml, extra_bash_init_script=K8sIntegration.get_ssh_server_bash(
ssh_port_number=args.ssh_server_port) if args.ssh_server_port else None,
namespace=args.namespace,
)

View File

@@ -2,7 +2,6 @@ attrs>=18.0,<20.4.0
enum34>=0.9,<1.2.0 ; python_version < '3.6'
furl>=2.0.0,<2.2.0
future>=0.16.0,<0.19.0
humanfriendly>=2.1,<9.2
jsonschema>=2.6.0,<3.3.0
pathlib2>=2.3.0,<2.4.0
psutil>=3.4.2,<5.9.0
@@ -11,10 +10,8 @@ pyparsing>=2.0.3,<2.5.0
python-dateutil>=2.4.2,<2.9.0
pyjwt>=1.6.4,<1.8.0
PyYAML>=3.12,<5.4.0
requests-file>=1.4.2,<1.6.0
requests>=2.20.0,<2.26.0
six>=1.11.0,<1.16.0
tqdm>=4.19.5,<4.55.0
typing>=3.6.4,<3.8.0
urllib3>=1.21.1,<1.27.0
virtualenv>=16,<20