Mirror of https://github.com/clearml/clearml-agent (synced 2025-06-26 18:16:15 +00:00)

Compare commits: v0.17.2rc0...0.17.2 (27 commits)
Commits:
d9b9b4984b
8a46dc6b03
205f9dd816
9dfa1294e2
f019905720
9c257858dd
2006ab20dd
0caf31719c
5da7184276
50fccdab96
77d6ff6630
99614702ea
58cb344ee6
22d5892b12
f619969efc
ca242424ab
407deb84e9
14589aa094
1260e3d942
b22d926d94
410cc8c7be
784c676f5b
296f7970df
cd59933c9c
b95d3f5300
fa0d5d8469
8229843018
@@ -77,6 +77,16 @@
    # target folder for virtual environments builds, created when executing experiment
    venvs_dir = ~/.clearml/venvs-builds

    # cached virtual environment folder
    venvs_cache: {
        # maximum number of cached venvs
        max_entries: 10
        # minimum required free space to allow for cache entry, disable by passing 0 or negative value
        free_space_threshold_gb: 2.0
        # uncomment to enable virtual environment caching
        # path: ~/.clearml/venvs-cache
    },

    # cached git clone folder
    vcs_cache: {
        enabled: true,
@@ -139,6 +149,9 @@
        # arguments: ["--ipc=host", ]
    }

    # set the OS environments based on the Task's Environment section before launching the Task process.
    enable_task_env: false

    # set the initial bash script to execute at the startup of any docker.
    # all lines will be executed regardless of their exit code.
    # {python_single_digit} is translated to 'python3' or 'python2' according to requested python version
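The venvs_cache settings above line up with the constructor of the FolderCache helper added at the end of this diff; a minimal sketch of that mapping (the wiring shown is an assumption for illustration, not the agent's actual code):

# Illustrative only: feeding the venvs_cache config keys above into the
# FolderCache helper introduced later in this diff.
from clearml_agent.helper.os.folder_cache import FolderCache

venvs_cache_cfg = {
    'path': '~/.clearml/venvs-cache',  # commented out by default, i.e. caching disabled
    'max_entries': 10,
    'free_space_threshold_gb': 2.0,
}

cache = FolderCache(
    cache_folder=venvs_cache_cfg['path'],
    max_cache_entries=venvs_cache_cfg['max_entries'],
    min_free_space_gb=venvs_cache_cfg['free_space_threshold_gb'],
)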
@@ -12,10 +12,10 @@ import sys
import shutil
import traceback
from collections import defaultdict
from copy import deepcopy
from copy import deepcopy, copy
from datetime import datetime
from distutils.spawn import find_executable
from functools import partial
from functools import partial, cmp_to_key
from itertools import chain
from tempfile import mkdtemp, NamedTemporaryFile
from time import sleep, time
@@ -26,6 +26,7 @@ import psutil
import six
from clearml_agent.backend_api.services import queues as queues_api
from clearml_agent.backend_api.services import tasks as tasks_api
from clearml_agent.backend_api.services import workers as workers_api
from pathlib2 import Path
from pyhocon import ConfigTree, ConfigFactory
from six.moves.urllib.parse import quote
@@ -72,7 +73,7 @@ from clearml_agent.helper.os.daemonize import daemonize_process
from clearml_agent.helper.package.base import PackageManager
from clearml_agent.helper.package.conda_api import CondaAPI
from clearml_agent.helper.package.post_req import PostRequirement
from clearml_agent.helper.package.external_req import ExternalRequirements
from clearml_agent.helper.package.external_req import ExternalRequirements, OnlyExternalRequirements
from clearml_agent.helper.package.pip_api.system import SystemPip
from clearml_agent.helper.package.pip_api.venv import VirtualenvPip
from clearml_agent.helper.package.poetry_api import PoetryConfig, PoetryAPI
@@ -92,7 +93,7 @@ from clearml_agent.helper.process import (
    commit_docker, terminate_process,
)
from clearml_agent.helper.package.priority_req import PriorityPackageRequirement, PackageCollectorRequirement
from clearml_agent.helper.repo import clone_repository_cached, RepoInfo, VCS
from clearml_agent.helper.repo import clone_repository_cached, RepoInfo, VCS, fix_package_import_diff_patch
from clearml_agent.helper.resource_monitor import ResourceMonitor
from clearml_agent.helper.runtime_verification import check_runtime, print_uptime_properties
from clearml_agent.session import Session
@@ -411,6 +412,7 @@ class Worker(ServiceCommandSection):
        self._daemon_foreground = None
        self._standalone_mode = None
        self._services_mode = None
        self._dynamic_gpus = None
        self._force_current_version = None
        self._redirected_stdout_file_no = None
        self._uptime_config = self._session.config.get("agent.uptime", None)
@@ -433,16 +435,17 @@ class Worker(ServiceCommandSection):
        """
        if kwargs.get('services_mode'):
            kwargs['cpu_only'] = True
            kwargs['docker'] = kwargs.get('docker') or []
            # kwargs['docker'] = kwargs.get('docker') or []
            kwargs['gpus'] = None

        return kwargs

    def _get_requirements_manager(self, os_override=None, base_interpreter=None):
    def _get_requirements_manager(self, os_override=None, base_interpreter=None, requirement_substitutions=None):
        requirements_manager = RequirementsManager(
            self._session, base_interpreter=base_interpreter
        )
        for requirement_cls in self._requirement_substitutions:
        requirement_substitutions = requirement_substitutions or self._requirement_substitutions
        for requirement_cls in requirement_substitutions:
            if os_override and issubclass(requirement_cls, PytorchRequirement):
                requirement_cls = partial(requirement_cls, os_override=os_override)
            requirements_manager.register(requirement_cls)
@@ -498,6 +501,9 @@ class Worker(ServiceCommandSection):
        )

        docker_image = None
        worker_id = '{}:service:{}'.format(self.worker_id, task_id) \
            if self._services_mode and not self._dynamic_gpus else self.worker_id

        if self.docker_image_func:
            try:
                response = get_task(self._session, task_id, only_fields=["execution.docker_cmd"])
@@ -525,7 +531,7 @@ class Worker(ServiceCommandSection):
            if self._services_mode:
                # if this is services mode, give the docker a unique worker id, as it will register itself.
                full_docker_cmd = self.docker_image_func(
                    worker_id='{}:service:{}'.format(self.worker_id, task_id),
                    worker_id=worker_id,
                    docker_image=docker_image, docker_arguments=docker_arguments)
            else:
                full_docker_cmd = self.docker_image_func(docker_image=docker_image, docker_arguments=docker_arguments)
@@ -551,7 +557,7 @@ class Worker(ServiceCommandSection):
            print('Running Docker:\n{}\n'.format(str(cmd)))
        else:
            cmd = worker_args.get_argv_for_command("execute") + (
                "--disable-monitoring",
                '--full-monitoring' if self._services_mode else '--disable-monitoring',
                "--id",
                task_id,
            )
@@ -567,7 +573,7 @@ class Worker(ServiceCommandSection):
        status = ExitStatus.failure
        try:
            # set WORKER ID
            ENV_WORKER_ID.set(self.worker_id)
            ENV_WORKER_ID.set(worker_id)

            if self._docker_force_pull and docker_image:
                full_pull_cmd = ['docker', 'pull', docker_image]
@@ -592,7 +598,11 @@ class Worker(ServiceCommandSection):
            errors = temp_stderr_name and Path(temp_stderr_name).read_text()
            if errors:
                print("\nEncountered errors:\n\n{}\n".format(errors))
            if status is None:
            if status is None and self._services_mode:
                print(
                    "Service bootstrap completed: task '{}'".format(task_id)
                )
            elif status is None:
                print(
                    "DONE: Running task '{}' (user aborted)".format(task_id)
                )
@@ -612,29 +622,71 @@ class Worker(ServiceCommandSection):
            safe_remove_file(temp_stdout_name)
            safe_remove_file(temp_stderr_name)

    def run_tasks_loop(self, queues, worker_params, priority_order=True):
    def run_tasks_loop(self, queues, worker_params, priority_order=True, gpu_indexes=None, gpu_queues=None):
        """
        :summary: Pull and run tasks from queues.
        :description: 1. Go through ``queues`` by order.
            2. Try getting the next task for each and run the first one that returns.
            3. Go to step 1
        :param queues: IDs of queues to pull tasks from
        :type queues: list of ``Text``
        :param worker_params: Worker command line arguments
        :type worker_params: ``clearml_agent.helper.process.WorkerParams``
        :param priority_order: If True pull order in priority manner. always from the first
        :param list(str) queues: IDs of queues to pull tasks from
        :param worker_params worker_params: Worker command line arguments
        :param bool priority_order: If True pull order in priority manner. always from the first
            If False, pull from each queue once in a round robin manner
        :type priority_order: bool
        :param list gpu_indexes: list of gpu_indexes. Needs special backend support
        :param list gpu_queues: list of pairs (queue_id, num_gpus). Needs special backend support
        """

        if not self._daemon_foreground:
            print('Starting infinite task polling loop...')

        _last_machine_update_ts = 0
        if self._services_mode:
            max_num_instances = int(self._services_mode)
        else:
            max_num_instances = None

        # store in runtime configuration,
        if max_num_instances and not self.set_runtime_properties(key='max_num_instances', value=max_num_instances):
            warning('Maximum number of service instance not supported, removing limit.')
            max_num_instances = -1

        # get current running instances
        available_gpus = None
        if gpu_indexes and gpu_queues:
            available_gpus, gpu_queues = self._setup_dynamic_gpus(gpu_queues)
            # multi instance support
            self._services_mode = True

        while True:
            queue_tags = None
            runtime_props = None

            if max_num_instances and max_num_instances > 0:
                # make sure we do not have too many instances to run
                if len(Singleton.get_running_pids()) >= max_num_instances:
                    if self._daemon_foreground or worker_params.debug:
                        print("Reached max number of services, sleeping for {:.1f} seconds".format(
                            self._polling_interval))
                    sleep(self._polling_interval)
                    continue

            # update available gpus
            if gpu_queues:
                try:
                    response = self._session.send_api(workers_api.GetAllRequest(last_seen=60))
                except Exception:
                    # if something went wrong start over from the highest priority queue
                    sleep(self._polling_interval)
                    continue
                available_gpus = self._dynamic_gpu_get_available(gpu_indexes)
                # if something went wrong or we have no free gpus
                # start over from the highest priority queue
                if not available_gpus:
                    if self._daemon_foreground or worker_params.debug:
                        print("All GPUs allocated, sleeping for {:.1f} seconds".format(self._polling_interval))
                    sleep(self._polling_interval)
                    continue

            # iterate over queues (priority style, queues[0] is highest)
            for queue in queues:
@@ -644,6 +696,25 @@ class Worker(ServiceCommandSection):
                if not self.should_be_currently_active(queue_tags[queue], runtime_props):
                    continue

                if gpu_queues:
                    # peek into queue
                    # get next task in queue
                    try:
                        response = self._session.send_api(queues_api.GetByIdRequest(queue=queue))
                    except Exception:
                        # if something went wrong start over from the highest priority queue
                        break
                    if not len(response.queue.entries):
                        continue
                    # check if we have enough available gpus
                    if gpu_queues[queue] > len(available_gpus):
                        # not enough available_gpus, we should sleep and start over
                        if self._daemon_foreground or worker_params.debug:
                            print("Not enough free GPUs {}/{}, sleeping for {:.1f} seconds".format(
                                len(available_gpus), gpu_queues[queue], self._polling_interval))
                        sleep(self._polling_interval)
                        break

                # get next task in queue
                try:
                    response = self._session.send_api(
@@ -674,13 +745,33 @@ class Worker(ServiceCommandSection):
                except:
                    pass

                self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id))

                org_gpus = os.environ.get('NVIDIA_VISIBLE_DEVICES')
                worker_id = self.worker_id
                if gpu_queues and gpu_queues.get(queue):
                    # pick the first available GPUs
                    gpus = available_gpus[:gpu_queues.get(queue)]
                    available_gpus = available_gpus[gpu_queues.get(queue):]
                    self.set_runtime_properties(
                        key='available_gpus', value=','.join(str(g) for g in available_gpus))
                    os.environ['CUDA_VISIBLE_DEVICES'] = \
                        os.environ['NVIDIA_VISIBLE_DEVICES'] = ','.join(str(g) for g in gpus)
                    self.worker_id = ':'.join(self.worker_id.split(':')[:-1] + ['gpu'+','.join(str(g) for g in gpus)])

                self.send_logs(
                    task_id=task_id,
                    lines=["task {} pulled from {} by worker {}\n".format(task_id, queue, self.worker_id)],
                    level="INFO",
                )
                self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id))

                self.run_one_task(queue, task_id, worker_params)

                if gpu_queues:
                    self.worker_id = worker_id
                    os.environ['CUDA_VISIBLE_DEVICES'] = \
                        os.environ['NVIDIA_VISIBLE_DEVICES'] = org_gpus

                self.report_monitor(ResourceMonitor.StatusReport(queues=self.queues))

                queue_tags = None
@@ -699,6 +790,39 @@ class Worker(ServiceCommandSection):
            if self._session.config["agent.reload_config"]:
                self.reload_config()

    def _dynamic_gpu_get_available(self, gpu_indexes):
        try:
            response = self._session.send_api(workers_api.GetAllRequest(last_seen=60))
        except Exception:
            return None

        worker_name = self._session.config["agent.worker_name"] + ':'
        our_workers = [
            w.id for w in response.workers
            if w.id.startswith(worker_name) and w.id != self.worker_id]
        gpus = []
        for w in our_workers:
            gpus += [int(g) for g in w.split(':')[-1].lower().replace('gpu', '').split(',')]
        available_gpus = list(set(gpu_indexes) - set(gpus))

        return available_gpus

    def _setup_dynamic_gpus(self, gpu_queues):
        available_gpus = self.get_runtime_properties()
        if available_gpus is None:
            raise ValueError("Dynamic GPU allocation is not supported by the ClearML-server")
        available_gpus = [prop["value"] for prop in available_gpus if prop["key"] == 'available_gpus']
        if available_gpus:
            available_gpus = [int(g) for g in available_gpus[-1].split(',')]
        if not isinstance(gpu_queues, dict):
            gpu_queues = dict(gpu_queues)

        if not self.set_runtime_properties(
                key='available_gpus', value=','.join(str(g) for g in available_gpus)):
            raise ValueError("Dynamic GPU allocation is not supported by the ClearML-server")

        return available_gpus, gpu_queues

    def get_worker_properties(self, queue_ids):
        queue_tags = {
            q.id: {'name': q.name, 'tags': q.tags}
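A standalone sketch of the bookkeeping in _dynamic_gpu_get_available above: allocated GPUs are recovered from sibling worker ids of the form '<worker_name>:gpu<i,j,...>' and subtracted from the configured indexes (the worker ids below are made-up examples):

# Made-up worker ids, mirroring the parsing in _dynamic_gpu_get_available
gpu_indexes = [0, 1, 2, 3]
our_workers = ['agent-host:gpu0,1', 'agent-host:gpu3']

gpus = []
for w in our_workers:
    gpus += [int(g) for g in w.split(':')[-1].lower().replace('gpu', '').split(',')]

available_gpus = list(set(gpu_indexes) - set(gpus))
print(available_gpus)  # -> [2]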
@@ -714,11 +838,11 @@ class Worker(ServiceCommandSection):
            # either not supported or never tested
            if self._runtime_props_support == self._session.api_version:
                # tested against latest api_version, not supported
                return []
                return None
            if not self._session.check_min_api_version(UptimeConf.min_api_version):
                # not supported due to insufficient api_version
                self._runtime_props_support = self._session.api_version
                return []
                return None
        try:
            res = self.get("get_runtime_properties", worker=self.worker_id)["runtime_properties"]
            # definitely supported
@@ -726,12 +850,38 @@ class Worker(ServiceCommandSection):
            return res
        except APIError:
            self._runtime_props_support = self._session.api_version
            return []
            return None

    def set_runtime_properties(self, key, value):
        if self._runtime_props_support is not True:
            # either not supported or never tested
            if self._runtime_props_support == self._session.api_version:
                # tested against latest api_version, not supported
                return False
            if not self._session.check_min_api_version(UptimeConf.min_api_version):
                # not supported due to insufficient api_version
                self._runtime_props_support = self._session.api_version
                return False

        # noinspection PyBroadException
        try:
            self.post("set_runtime_properties", worker=self.worker_id, json={key: value})
            # definitely supported
            self._runtime_props_support = True
            return True
        except APIError:
            self._runtime_props_support = self._session.api_version
        except Exception as ex:
            # not sure what happened
            pass

        return False

    def should_be_currently_active(self, current_queue, runtime_properties):
        """
        Checks if a worker is active according to queue tags, worker's runtime properties and uptime schedule.
        """
        runtime_properties = runtime_properties or []
        if UptimeConf.queue_tag_off in current_queue['tags']:
            self.log.debug("Queue {} is tagged '{}', worker will not pull tasks".format(
                current_queue['name'], UptimeConf.queue_tag_off)
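The two methods above probe server support at most once per api_version and return a tri-state result; a small sketch of how a caller consumes get_runtime_properties() (the data is made up):

# None means the server lacks runtime-properties support; otherwise a list of
# {'key': ..., 'value': ...} dicts is returned, as used by _setup_dynamic_gpus.
props = [{'key': 'available_gpus', 'value': '2,3'}]  # would be None when unsupported

if props is None:
    available_gpus = None  # dynamic GPU allocation cannot be used
else:
    available_gpus = [p['value'] for p in props if p['key'] == 'available_gpus']
print(available_gpus)  # -> ['2,3']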
@@ -813,16 +963,12 @@ class Worker(ServiceCommandSection):
        self._session.print_configuration()

    def daemon(self, queues, log_level, foreground=False, docker=False, detached=False, order_fairness=False, **kwargs):
        # if we do not need to create queues, make sure they are valid
        # match previous behaviour when we validated queue names before everything else
        queues = self._resolve_queue_names(queues, create_if_missing=kwargs.get('create_queue', False))

        self._standalone_mode = kwargs.get('standalone_mode', False)
        self._services_mode = kwargs.get('services_mode', False)
        # must have docker in services_mode
        if self._services_mode:
            kwargs = self._verify_command_states(kwargs)
            docker = docker or kwargs.get('docker')
        self._uptime_config = kwargs.get('uptime', None) or self._uptime_config
        self._downtime_config = kwargs.get('downtime', None) or self._downtime_config
        if self._uptime_config and self._downtime_config:
@@ -832,6 +978,16 @@ class Worker(ServiceCommandSection):
            self._uptime_config = None
            self._downtime_config = None

        # support --dynamic-gpus
        dynamic_gpus, gpu_indexes, queues = self._parse_dynamic_gpus(kwargs, queues)

        if self._services_mode and dynamic_gpus:
            raise ValueError("Combining --dynamic-gpus and --services-mode is not supported")

        # if we do not need to create queues, make sure they are valid
        # match previous behaviour when we validated queue names before everything else
        queues = self._resolve_queue_names(queues, create_if_missing=kwargs.get('create_queue', False))

        # We are not running a daemon we are killing one.
        # find the pid send termination signal and leave
        if kwargs.get('stop', False):
@@ -854,7 +1010,7 @@ class Worker(ServiceCommandSection):

        # make sure we only have a single instance,
        # also make sure we set worker_id properly and cache folders
        self._singleton()
        self._singleton(dynamic=bool(dynamic_gpus))

        # check if we have the latest version
        start_check_update_daemon()
@@ -915,6 +1071,11 @@ class Worker(ServiceCommandSection):
        if not self._session.debug_mode:
            self._temp_cleanup_list.append(name)

        # on windows we do nothing
        if detached and is_windows_platform():
            print('Detached not supported on Windows, ignoring --detached')
            detached = False

        if not detached:
            # redirect std out/err to new file
            sys.stdout = sys.stderr = out_file
@@ -943,6 +1104,8 @@ class Worker(ServiceCommandSection):
                    trace=self._session.trace,
                ),
                priority_order=not order_fairness,
                gpu_indexes=gpu_indexes,
                gpu_queues=dynamic_gpus,
            )
        except Exception:
            tb = six.text_type(traceback.format_exc())
@@ -963,6 +1126,54 @@ class Worker(ServiceCommandSection):
        self._unregister(queues)
        safe_remove_file(self.temp_config_path)

    def _parse_dynamic_gpus(self, kwargs, queues):
        dynamic_gpus = kwargs.get('dynamic_gpus', None)
        if not dynamic_gpus:
            return None, None, queues

        queue_names = [q.name for q in queues]
        if not all('=' in q for q in queue_names):
            raise ValueError("using --dynamic-gpus, --queues [{}], "
                             "queue must be in format <queue_name>=<num_gpus>".format(queue_names))

        gpu_indexes = kwargs.get('gpus')

        # test gpus were passed correctly
        if not gpu_indexes or len(gpu_indexes.split('-')) > 2 or (',' in gpu_indexes and '-' in gpu_indexes):
            raise ValueError('--gpus must be provided, in one of two ways: '
                             'comma separated \'0,1,2,3\' or range \'0-3\'')
        try:
            if '-' in gpu_indexes:
                gpu_indexes = list(range(int(gpu_indexes.split('-')[0]), 1 + int(gpu_indexes.split('-')[1])))
            else:
                gpu_indexes = [int(g) for g in gpu_indexes.split(',')]
        except Exception:
            raise ValueError('Failed parsing --gpus {}'.format(kwargs.get('gpus')))

        dynamic_gpus = [(s[:-1 - len(s.split('=')[-1])], int(s.split('=')[-1])) for s in queue_names]
        queue_names = [q for q, _ in dynamic_gpus]
        # resolve queue ids
        dynamic_gpus_q = self._resolve_queue_names(
            queue_names, create_if_missing=kwargs.get('create_queue', False))
        dynamic_gpus = list(zip(dynamic_gpus_q, [i for _, i in dynamic_gpus]))
        dynamic_gpus = sorted(
            dynamic_gpus, reverse=True, key=cmp_to_key(
                lambda x, y: -1 if x[1] < y[1] or x[1] == y[1] and
                dynamic_gpus_q.index(x[0]) > dynamic_gpus_q.index(y[0])
                else +1))
        # order queue priority based on the combination we have
        queues = [q for q, _ in dynamic_gpus]

        # test server support
        available_gpus = self._dynamic_gpu_get_available(gpu_indexes)
        if not self.set_runtime_properties(
                key='available_gpus', value=','.join(str(g) for g in available_gpus)):
            raise ValueError("Dynamic GPU allocation is not supported by the ClearML-server")

        self._dynamic_gpus = True

        return dynamic_gpus, gpu_indexes, queues

    def report_monitor(self, report):
        if not self.monitor:
            self.new_monitor(report=report)
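A minimal sketch of the --dynamic-gpus argument handling implemented above: each queue is specified as <queue_name>=<num_gpus> and --gpus accepts either '0,1,2,3' or a range '0-3' (the queue names here are hypothetical):

# Hypothetical inputs, parsed exactly as in _parse_dynamic_gpus above
queue_names = ['dual_gpu=2', 'single_gpu=1']
gpus = '0-3'

if '-' in gpus:
    gpu_indexes = list(range(int(gpus.split('-')[0]), 1 + int(gpus.split('-')[1])))
else:
    gpu_indexes = [int(g) for g in gpus.split(',')]

dynamic_gpus = [(s[:-1 - len(s.split('=')[-1])], int(s.split('=')[-1])) for s in queue_names]
print(gpu_indexes)   # -> [0, 1, 2, 3]
print(dynamic_gpus)  # -> [('dual_gpu', 2), ('single_gpu', 1)]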
@@ -1246,10 +1457,11 @@ class Worker(ServiceCommandSection):
        except:
            python_version = None

        venv_folder, requirements_manager = self.install_virtualenv(
            venv_dir=target, requested_python_version=python_version, execution_info=execution)
        venv_folder, requirements_manager, is_cached = self.install_virtualenv(
            venv_dir=target, requested_python_version=python_version, execution_info=execution,
            cached_requirements=requirements)

        if self._default_pip:
        if not is_cached and self._default_pip:
            if install_globally and self.global_package_api:
                self.global_package_api.install_packages(*self._default_pip)
            else:
@@ -1257,15 +1469,30 @@ class Worker(ServiceCommandSection):

        directory, vcs, repo_info = self.get_repo_info(execution, current_task, venv_folder.as_posix())

        self.install_requirements(
            execution,
            repo_info,
            requirements_manager=requirements_manager,
            cached_requirements=requirements,
            cwd=vcs.location if vcs and vcs.location else directory,
            package_api=self.global_package_api if install_globally else None,
        )
        freeze = self.freeze_task_environment(requirements_manager=requirements_manager)
        if is_cached:
            # reinstalling git / local packages
            package_api = copy(self.package_api)
            package_api.requirements_manager = self._get_requirements_manager(
                base_interpreter=package_api.requirements_manager.get_interpreter(),
                requirement_substitutions=[OnlyExternalRequirements]
            )
            # make sure we run the handlers
            cached_requirements = \
                {k: package_api.requirements_manager.replace(requirements[k] or '')
                 for k in requirements}
            package_api.load_requirements(cached_requirements)
        else:
            self.install_requirements(
                execution,
                repo_info,
                requirements_manager=requirements_manager,
                cached_requirements=requirements,
                cwd=vcs.location if vcs and vcs.location else directory,
                package_api=self.global_package_api if install_globally else None,
            )

        freeze = self.freeze_task_environment(
            task_id=task_id, requirements_manager=requirements_manager, update_requirements=False)
        script_dir = directory

        # Summary
@@ -1431,6 +1658,17 @@ class Worker(ServiceCommandSection):

        # We expect the same behaviour in case full_monitoring was set, and in case docker mode is used
        if full_monitoring or docker is not False:
            if full_monitoring:
                if not (ENV_WORKER_ID.get() or '').strip():
                    self._session.config["agent"]["worker_id"] = ''
                # make sure we support multiple instances if we need to
                self._singleton()
                self.temp_config_path = self.temp_config_path or safe_mkstemp(
                    suffix=".cfg", prefix=".clearml_agent.", text=True, name_only=True
                )
                self.dump_config(self.temp_config_path)
                self._session._config_file = self.temp_config_path

            worker_params = WorkerParams(
                log_level=log_level,
                config_file=self._session.config_file,
@@ -1442,6 +1680,10 @@ class Worker(ServiceCommandSection):

            self.stop_monitor()
            self._unregister()

            if full_monitoring and self.temp_config_path:
                safe_remove_file(self._session.config_file)
                Singleton.close_pid_file()
            return

        self._session.print_configuration()
@@ -1477,10 +1719,11 @@ class Worker(ServiceCommandSection):
        except:
            python_ver = None

        venv_folder, requirements_manager = self.install_virtualenv(
            standalone_mode=standalone_mode, requested_python_version=python_ver, execution_info=execution)
        venv_folder, requirements_manager, is_cached = self.install_virtualenv(
            standalone_mode=standalone_mode, requested_python_version=python_ver,
            execution_info=execution, cached_requirements=requirements)

        if not standalone_mode:
        if not is_cached and not standalone_mode:
            if self._default_pip:
                self.package_api.install_packages(*self._default_pip)

@@ -1492,7 +1735,23 @@ class Worker(ServiceCommandSection):

        print("\n")

        if not standalone_mode:
        if is_cached and not standalone_mode:
            # reinstalling git / local packages
            package_api = copy(self.package_api)
            package_api.requirements_manager = self._get_requirements_manager(
                base_interpreter=package_api.requirements_manager.get_interpreter(),
                requirement_substitutions=[OnlyExternalRequirements]
            )
            package_api.cwd = vcs.location if vcs and vcs.location else directory
            # make sure we run the handlers
            cached_requirements = \
                {k: package_api.requirements_manager.replace(requirements[k] or '')
                 for k in requirements}
            if str(cached_requirements.get('pip', '')).strip() \
                    or str(cached_requirements.get('conda', '')).strip():
                package_api.load_requirements(cached_requirements)

        elif not is_cached and not standalone_mode:
            self.install_requirements(
                execution,
                repo_info,
@@ -1506,8 +1765,13 @@ class Worker(ServiceCommandSection):
        skip_freeze_update = self.is_conda and not self._session.config.get(
            "agent.package_manager.conda_full_env_update", False)

        freeze = self.freeze_task_environment(current_task.id if not skip_freeze_update else None,
                                              requirements_manager=requirements_manager)
        freeze = self.freeze_task_environment(
            task_id=current_task.id,
            requirements_manager=requirements_manager,
            add_venv_folder_cache=venv_folder,
            execution_info=execution,
            update_requirements=not skip_freeze_update,
        )
        script_dir = (directory if isinstance(directory, Path) else Path(directory)).absolute().as_posix()

        # run code
@@ -1559,6 +1823,12 @@ class Worker(ServiceCommandSection):
        if repo_info:
            self._update_commit_id(current_task.id, execution, repo_info)

        # get Task Environments and update the process
        if self._session.config.get('agent.enable_task_env', None):
            hyper_params = self._get_task_os_env(current_task)
            if hyper_params:
                os.environ.update(hyper_params)

        # Add the script CWD to the python path
        python_path = get_python_path(script_dir, execution.entry_point, self.package_api, is_conda_env=self.is_conda)
        if ENV_TASK_EXTRA_PYTHON_PATH.get():
@@ -1636,6 +1906,20 @@ class Worker(ServiceCommandSection):

        return 1 if exit_code is None else exit_code

    def _get_task_os_env(self, current_task):
        if not self._session.check_min_api_version('2.9'):
            return None
        # noinspection PyBroadException
        try:
            hyper_params = self._session.get(
                service="tasks", action="get_hyper_params", tasks=[current_task.id])
            hyper_params = {
                str(p['name']): str(p['value'])
                for p in hyper_params['params'][0]['hyperparams'] if p['section'] == 'Environment'}
            return hyper_params
        except Exception:
            return None

    def set_docker_variables(self, docker):
        temp_config, docker_image_func = self.get_docker_config_cmd(docker)
        self.dump_config(self.temp_config_path, config=temp_config)
@@ -1676,6 +1960,7 @@ class Worker(ServiceCommandSection):
        repo_info = None
        directory = None
        vcs = None
        script_file = None
        if has_repository:
            vcs, repo_info = self._get_repo_info(execution, task, venv_folder)
            directory = Path(repo_info.root, execution.working_dir or ".")
@@ -1686,17 +1971,26 @@ class Worker(ServiceCommandSection):
            self.apply_diff(
                task=task, vcs=vcs, execution_info=execution, repo_info=repo_info
            )
            script_file = Path(directory) / execution.entry_point

        if is_literal_script:
            self.log.info("found literal script in `script.diff`")
            directory, script = literal_script.create_notebook_file(
                task, execution, repo_info
            )
            execution.entry_point = script
            if not has_repository:
                return directory, None, None
            script_file = Path(execution.entry_point)
        else:
            # in case of no literal script, there is no difference between empty working dir and `.`
            execution.working_dir = execution.working_dir or "."

        # fix our import patch (in case we have __future__)
        if script_file and script_file.is_file():
            fix_package_import_diff_patch(script_file.as_posix())

        if is_literal_script and not has_repository:
            return directory, None, None

        if not directory:
            assert False, "unreachable code"
        return directory, vcs, repo_info
@@ -1801,7 +2095,8 @@ class Worker(ServiceCommandSection):
            status_message=self._task_status_change_message,
        )

    def freeze_task_environment(self, task_id=None, requirements_manager=None):
    def freeze_task_environment(self, task_id=None, requirements_manager=None,
                                add_venv_folder_cache=None, execution_info=None, update_requirements=False):
        try:
            freeze = self.package_api.freeze()
        except Exception as e:
@@ -1816,13 +2111,36 @@ class Worker(ServiceCommandSection):
            return freeze

        # get original requirements and update with the new frozen requirements
        previous_reqs = {}
        # noinspection PyBroadException
        try:
            current_task = get_task(self._session, task_id, only_fields=["script.requirements"])
            requirements = current_task.script.requirements
            previous_reqs = dict(**requirements)
            # replace only once.
            if requirements.get('pip') and not requirements.get('org_pip'):
                requirements['org_pip'] = requirements.pop('pip')
            if requirements.get('conda') and not requirements.get('org_conda'):
                requirements['org_conda'] = requirements.pop('conda')
            requirements.update(freeze)
        except Exception:
            requirements = freeze

        # add to cache
        print('Adding venv into cache: {}'.format(add_venv_folder_cache))
        if add_venv_folder_cache:
            self.package_api.add_cached_venv(
                requirements=[freeze, previous_reqs],
                docker_cmd=execution_info.docker_cmd if execution_info else None,
                python_version=getattr(self.package_api, 'python', ''),
                cuda_version=self._session.config.get("agent.cuda_version"),
                source_folder=add_venv_folder_cache,
                exclude_sub_folders=['task_repository', 'code'])

        # If do not update back requirements
        if not update_requirements:
            return freeze

        request = tasks_api.SetRequirementsRequest(task=task_id, requirements=requirements)
        try:
            self._session.send_api(request)
@@ -2059,15 +2377,21 @@ class Worker(ServiceCommandSection):
        )

    def install_virtualenv(
            self, venv_dir=None, requested_python_version=None, standalone_mode=False, execution_info=None):
        # type: (str, str, bool, ExecutionInfo) -> Tuple[Path, RequirementsManager]
            self, venv_dir=None, requested_python_version=None, standalone_mode=False,
            execution_info=None, cached_requirements=None):
        # type: (str, str, bool, ExecutionInfo, dict) -> Tuple[Path, RequirementsManager, bool]
        """
        Install a new python virtual environment, removing the old one if exists
        :return: virtualenv directory and requirements manager to use with task
        :return: virtualenv directory, requirements manager to use with task, True if there is a cached venv entry
        """
        requested_python_version = requested_python_version or \
            Text(self._session.config.get("agent.python_binary", None)) or \
            Text(self._session.config.get("agent.default_python", None))
        if self._session.config.get("agent.ignore_requested_python_version", None):
            requested_python_version = ''

        requested_python_version = \
            requested_python_version or \
            Text(self._session.config.get("agent.python_binary", None)) or \
            Text(self._session.config.get("agent.default_python", None))

        if self.is_conda:
            executable_version_suffix = \
                requested_python_version[max(requested_python_version.find('python'), 0):].replace('python', '')
@@ -2102,6 +2426,7 @@ class Worker(ServiceCommandSection):

        if not standalone_mode:
            rm_tree(normalize_path(venv_dir, WORKING_REPOSITORY_DIR))

        package_manager_params = dict(
            session=self._session,
            python=executable_version_suffix if self.is_conda else executable_name,
@@ -2115,6 +2440,8 @@ class Worker(ServiceCommandSection):
            session=self._session,
        )

        call_package_manager_create = False

        if not self.is_conda and standalone_mode:
            # pip with standalone mode
            get_pip = partial(VirtualenvPip, **package_manager_params)
@@ -2130,7 +2457,7 @@ class Worker(ServiceCommandSection):
            self.package_api = VirtualenvPip(**package_manager_params)
            if first_time:
                self.package_api.remove()
                self.package_api.create()
                call_package_manager_create = True
            self.global_package_api = SystemPip(**global_package_manager_params)
        elif standalone_mode:
            # conda with standalone mode
@@ -2142,6 +2469,7 @@ class Worker(ServiceCommandSection):
            self.package_api = get_conda()
            # no support for reusing Conda environments
            self.package_api.remove()
            call_package_manager_create = True

        if venv_dir.exists():
            timestamp = datetime.utcnow().strftime("%Y-%m-%d-%H-%M-%S")
@@ -2156,9 +2484,22 @@ class Worker(ServiceCommandSection):
                venv_dir = new_venv_folder
                self.package_api = get_conda(path=venv_dir)

        # check if we have a cached folder
        if cached_requirements and self.package_api.get_cached_venv(
                requirements=cached_requirements,
                docker_cmd=execution_info.docker_cmd if execution_info else None,
                python_version=package_manager_params['python'],
                cuda_version=self._session.config.get("agent.cuda_version"),
                destination_folder=Path(venv_dir)
        ):
            print('::: Using Cached environment {} :::'.format(self.package_api.get_last_used_entry_cache()))
            return venv_dir, requirements_manager, True

        # create the initial venv
        if call_package_manager_create:
            self.package_api.create()

        return venv_dir, requirements_manager
        return venv_dir, requirements_manager, False

    def parse_requirements(self, reqs_file=None, overrides=None):
        os = None
@@ -2216,6 +2557,9 @@ class Worker(ServiceCommandSection):
        temp_config.put("agent.git_pass", (ENV_AGENT_GIT_PASS.get() or
                                           self._session.config.get("agent.git_pass", None)))

        if temp_config.get("agent.venvs_cache.path", None):
            temp_config.put("agent.venvs_cache.path", '/root/.clearml/venvs-cache')

        self._host_ssh_cache = mkdtemp(prefix='clearml_agent.ssh.')
        self._temp_cleanup_list.append(self._host_ssh_cache)

@@ -2228,6 +2572,9 @@ class Worker(ServiceCommandSection):
            self._session.config["agent.pip_download_cache.path"])).expanduser().as_posix()
        host_vcs_cache = Path(os.path.expandvars(
            self._session.config["agent.vcs_cache.path"])).expanduser().as_posix()
        host_venvs_cache = Path(os.path.expandvars(
            self._session.config["agent.venvs_cache.path"])).expanduser().as_posix() \
            if self._session.config.get("agent.venvs_cache.path", None) else None
        host_ssh_cache = self._host_ssh_cache

        host_apt_cache = Path(os.path.expandvars(self._session.config.get(
@@ -2242,6 +2589,8 @@ class Worker(ServiceCommandSection):
        Path(host_pip_dl).mkdir(parents=True, exist_ok=True)
        Path(host_vcs_cache).mkdir(parents=True, exist_ok=True)
        Path(host_ssh_cache).mkdir(parents=True, exist_ok=True)
        if host_venvs_cache:
            Path(host_venvs_cache).mkdir(parents=True, exist_ok=True)

        # copy the .ssh folder to a temp folder, to be mapped into docker
        # noinspection PyBroadException
@@ -2278,6 +2627,7 @@ class Worker(ServiceCommandSection):
        mounted_cache_dir = temp_config.get("sdk.storage.cache.default_base_dir")
        mounted_pip_dl_dir = temp_config.get("agent.pip_download_cache.path")
        mounted_vcs_cache = temp_config.get("agent.vcs_cache.path")
        mounted_venvs_cache = temp_config.get("agent.venvs_cache.path", "")

        # Make sure we have created the configuration file for the executor
        if not self.dump_config(self.temp_config_path, config=temp_config):
@@ -2297,6 +2647,7 @@ class Worker(ServiceCommandSection):
            host_cache=host_cache, mounted_cache=mounted_cache_dir,
            host_pip_dl=host_pip_dl, mounted_pip_dl=mounted_pip_dl_dir,
            host_vcs_cache=host_vcs_cache, mounted_vcs_cache=mounted_vcs_cache,
            host_venvs_cache=host_venvs_cache, mounted_venvs_cache=mounted_venvs_cache,
            standalone_mode=self._standalone_mode,
            force_current_version=self._force_current_version,
            bash_script=bash_script,
@@ -2315,6 +2666,7 @@ class Worker(ServiceCommandSection):
                          host_cache, mounted_cache,
                          host_pip_dl, mounted_pip_dl,
                          host_vcs_cache, mounted_vcs_cache,
                          host_venvs_cache, mounted_venvs_cache,
                          standalone_mode=False, extra_docker_arguments=None, extra_shell_script=None,
                          force_current_version=None, host_git_credentials=None,
                          bash_script=None, preprocess_bash_script=None):
@@ -2370,14 +2722,16 @@ class Worker(ServiceCommandSection):
            # expect CLEARML_AGENT_K8S_HOST_MOUNT = '/mnt/host/data:/root/.clearml'
            k8s_node_mnt, _, k8s_pod_mnt = ENV_DOCKER_HOST_MOUNT.get().partition(':')
            # search and replace all the host folders with the k8s
            host_mounts = [host_apt_cache, host_pip_cache, host_pip_dl, host_cache, host_vcs_cache]
            host_mounts = [host_apt_cache, host_pip_cache, host_pip_dl, host_cache, host_vcs_cache, host_venvs_cache]
            for i, m in enumerate(host_mounts):
                if not m:
                    continue
                if k8s_pod_mnt not in m:
                    print('Warning: K8S mount missing, ignoring cached folder {}'.format(m))
                    host_mounts[i] = None
                else:
                    host_mounts[i] = m.replace(k8s_pod_mnt, k8s_node_mnt, 1)
            host_apt_cache, host_pip_cache, host_pip_dl, host_cache, host_vcs_cache = host_mounts
            host_apt_cache, host_pip_cache, host_pip_dl, host_cache, host_vcs_cache, host_venvs_cache = host_mounts

            # copy the configuration file into the mounted folder
            new_conf_file = os.path.join(k8s_pod_mnt, '.clearml_agent.{}.cfg'.format(quote(worker_id, safe="")))
@@ -2467,6 +2821,7 @@ class Worker(ServiceCommandSection):
                (['-v', host_pip_dl+':'+mounted_pip_dl] if host_pip_dl else []) +
                (['-v', host_cache+':'+mounted_cache] if host_cache else []) +
                (['-v', host_vcs_cache+':'+mounted_vcs_cache] if host_vcs_cache else []) +
                (['-v', host_venvs_cache + ':' + mounted_venvs_cache] if host_venvs_cache else []) +
                ['--rm', docker_image, 'bash', '-c',
                 update_scheme +
                 extra_shell_script +
@@ -2576,9 +2931,9 @@ class Worker(ServiceCommandSection):
                worker_name, worker_id))
            return False

    def _singleton(self):
    def _singleton(self, dynamic=False):
        # ensure singleton
        worker_id, worker_name = self._generate_worker_id_name()
        worker_id, worker_name = self._generate_worker_id_name(dynamic=dynamic)

        # if we are running in services mode, we allow double register since
        # docker-compose will kill instances before they cleanup
@@ -2593,13 +2948,14 @@ class Worker(ServiceCommandSection):
        # update folders based on free slot
        self._session.create_cache_folders(slot_index=worker_slot)

    def _generate_worker_id_name(self):
    def _generate_worker_id_name(self, dynamic=False):
        worker_id = self._session.config["agent.worker_id"]
        worker_name = self._session.config["agent.worker_name"]
        if not worker_id and os.environ.get('NVIDIA_VISIBLE_DEVICES') is not None:
            nvidia_visible_devices = os.environ.get('NVIDIA_VISIBLE_DEVICES')
            if nvidia_visible_devices and nvidia_visible_devices.lower() != 'none':
                worker_id = '{}:gpu{}'.format(worker_name, nvidia_visible_devices)
                worker_id = '{}:{}gpu{}'.format(
                    worker_name, 'd' if dynamic else '', nvidia_visible_devices)
            elif nvidia_visible_devices == '':
                pass
            else:
@@ -2613,15 +2969,15 @@ class Worker(ServiceCommandSection):

        queues = return_list(queues)
        if not create_if_missing:
            return [self._resolve_name(q.name, "queues") for q in queues]
            return [self._resolve_name(q if isinstance(q, str) else q.name, "queues") for q in queues]

        queue_ids = []
        for q in queues:
            try:
                q_id = self._resolve_name(q.name, "queues")
                q_id = self._resolve_name(q if isinstance(q, str) else q.name, "queues")
            except:
                self._session.send_api(queues_api.CreateRequest(name=q.name))
                q_id = self._resolve_name(q.name, "queues")
                self._session.send_api(queues_api.CreateRequest(name=q if isinstance(q, str) else q.name))
                q_id = self._resolve_name(q if isinstance(q, str) else q.name, "queues")
            queue_ids.append(q_id)
        return queue_ids
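The net effect of the _generate_worker_id_name change above is a 'd' marker in GPU worker ids when dynamic GPU mode is on; for example (the values are illustrative):

# Worker-id naming rule from _generate_worker_id_name above
worker_name = 'agent-host'
nvidia_visible_devices = '0,1'
for dynamic in (False, True):
    print('{}:{}gpu{}'.format(worker_name, 'd' if dynamic else '', nvidia_visible_devices))
# -> agent-host:gpu0,1
# -> agent-host:dgpu0,1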
@@ -16,7 +16,7 @@ def parse(reqstr):
    filename = getattr(reqstr, 'name', None)
    try:
        # Python 2.x compatibility
        if not isinstance(reqstr, basestring):
        if not isinstance(reqstr, basestring):  # noqa
            reqstr = reqstr.read()
    except NameError:
        # Python 3.x only
@@ -36,8 +36,7 @@ class K8sIntegration(Worker):

    KUBECTL_RUN_CMD = "kubectl run clearml-{queue_name}-id-{task_id} " \
                      "--image {docker_image} " \
                      "--restart=Never --replicas=1 " \
                      "--generator=run-pod/v1 " \
                      "--restart=Never " \
                      "--namespace={namespace}"

    KUBECTL_DELETE_CMD = "kubectl delete pods " \
@@ -88,6 +87,7 @@ class K8sIntegration(Worker):
            debug=False,
            ports_mode=False,
            num_of_services=20,
            base_pod_num=1,
            user_props_cb=None,
            overrides_yaml=None,
            template_yaml=None,
@@ -111,6 +111,7 @@ class K8sIntegration(Worker):
            Requires the `num_of_services` parameter.
        :param int num_of_services: Number of k8s services configured in the cluster. Required if `ports_mode` is True.
            (default: 20)
        :param int base_pod_num: Used when `ports_mode` is True, sets the base pod number to a given value (default: 1)
        :param callable user_props_cb: An Optional callable allowing additional user properties to be specified
            when scheduling a task to run in a pod. Callable can receive an optional pod number and should return
            a dictionary of user properties (name and value). Signature is [[Optional[int]], Dict[str,str]]
@@ -133,6 +134,7 @@ class K8sIntegration(Worker):
        self.log.logger.setLevel(logging.INFO)
        self.ports_mode = ports_mode
        self.num_of_services = num_of_services
        self.base_pod_num = base_pod_num
        self._edit_hyperparams_support = None
        self._user_props_cb = user_props_cb
        self.conf_file_content = None
@@ -270,13 +272,13 @@ class K8sIntegration(Worker):
            return

        if task_data.execution.docker_cmd:
            docker_parts = task_data.execution.docker_cmd
            docker_cmd = task_data.execution.docker_cmd
        else:
            docker_parts = str(ENV_DOCKER_IMAGE.get() or
                               self._session.config.get("agent.default_docker.image", "nvidia/cuda"))
            docker_cmd = str(ENV_DOCKER_IMAGE.get() or
                             self._session.config.get("agent.default_docker.image", "nvidia/cuda"))

        # take the first part, this is the docker image name (not arguments)
        docker_parts = docker_parts.split()
        docker_parts = docker_cmd.split()
        docker_image = docker_parts[0]
        docker_args = docker_parts[1:] if len(docker_parts) > 1 else []
@@ -306,8 +308,10 @@ class K8sIntegration(Worker):
        safe_queue_name = re.sub(r'\W+', '', safe_queue_name).replace('_', '').replace('-', '')

        # Search for a free pod number
        pod_number = 1
        pod_count = 0
        pod_number = self.base_pod_num
        while self.ports_mode:
            pod_number = self.base_pod_num + pod_count
            kubectl_cmd_new = "kubectl get pods -l {pod_label},{agent_label} -n {namespace}".format(
                pod_label=self.LIMIT_POD_LABEL.format(pod_number=pod_number),
                agent_label=self.AGENT_LABEL,
@@ -321,7 +325,7 @@ class K8sIntegration(Worker):
            if not output:
                # No such pod exist so we can use the pod_number we found
                break
            if pod_number >= self.num_of_services:
            if pod_count >= self.num_of_services - 1:
                # All pod numbers are taken, exit
                self.log.warning(
                    "kubectl last result: {}\n{}\nAll k8s services are in use, task '{}' "
@@ -333,12 +337,12 @@ class K8sIntegration(Worker):
                self._session.api_client.tasks.enqueue(
                    task_id, queue=queue, status_reason='k8s max pod limit (no free k8s service)')
                return
            pod_number += 1
            pod_count += 1

        labels = ([self.LIMIT_POD_LABEL.format(pod_number=pod_number)] if self.ports_mode else []) + [self.AGENT_LABEL]

        if self.ports_mode:
            print("Kubernetes scheduling task id={} on pod={}".format(task_id, pod_number))
            print("Kubernetes scheduling task id={} on pod={} (pod_count={})".format(task_id, pod_number, pod_count))
        else:
            print("Kubernetes scheduling task id={}".format(task_id))
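A small sketch of the reworked ports-mode search above: pod_number now starts from base_pod_num and pod_count counts attempts, so exhaustion is detected by comparing pod_count with num_of_services - 1 (the taken set is hypothetical):

base_pod_num = 1
num_of_services = 3
taken = {1, 2}  # hypothetical pod numbers already in use

pod_count = 0
pod_number = base_pod_num
while True:
    pod_number = base_pod_num + pod_count
    if pod_number not in taken:
        break  # free slot found
    if pod_count >= num_of_services - 1:
        pod_number = None  # all k8s services are in use
        break
    pod_count += 1
print(pod_number)  # -> 3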
@@ -350,7 +354,7 @@ class K8sIntegration(Worker):
        else:
            output, error = self._kubectl_run(
                create_clearml_conf=create_clearml_conf,
                labels=labels, docker_image=docker_image,
                labels=labels, docker_image=docker_cmd,
                task_data=task_data,
                task_id=task_id, queue=queue, queue_name=safe_queue_name)

@@ -364,7 +368,13 @@ class K8sIntegration(Worker):

        user_props = {"k8s-queue": str(queue_name)}
        if self.ports_mode:
            user_props.update({"k8s-pod-number": pod_number, "k8s-pod-label": labels[0]})
            user_props.update(
                {
                    "k8s-pod-number": pod_number,
                    "k8s-pod-label": labels[0],
                    "k8s-internal-pod-count": pod_count,
                }
            )

        if self._user_props_cb:
            # noinspection PyBroadException
@@ -24,7 +24,6 @@ import pyhocon
import yaml
from attr import fields_dict
from pathlib2 import Path
from tqdm import tqdm

import six
from six.moves import reduce
@@ -399,12 +398,6 @@
        self.buffer.write('\n')


class TqdmLog(tqdm):

    def __init__(self, iterable=None, file=None, **kwargs):
        super(TqdmLog, self).__init__(iterable, file=TqdmStream(file or sys.stderr), **kwargs)


def url_join(first, *rest):
    """
    Join url parts similarly to Path.join
@@ -20,6 +20,7 @@ import platform
import sys
import time
from datetime import datetime
from typing import Optional

import psutil
from ..gpu import pynvml as N
@@ -390,3 +391,34 @@ def new_query(shutdown=False, per_process_stats=False, get_driver_info=False):
    '''
    return GPUStatCollection.new_query(shutdown=shutdown, per_process_stats=per_process_stats,
                                       get_driver_info=get_driver_info)


def get_driver_cuda_version():
    # type: () -> Optional[str]
    """
    :return: Return detected CUDA version from driver. On fail return value is None.
        Example: `110` is cuda version 11.0
    """
    # noinspection PyBroadException
    try:
        N.nvmlInit()
    except BaseException:
        return None

    # noinspection PyBroadException
    try:
        cuda_version = str(N.nvmlSystemGetCudaDriverVersion())
    except BaseException:
        # noinspection PyBroadException
        try:
            cuda_version = str(N.nvmlSystemGetCudaDriverVersion_v2())
        except BaseException:
            cuda_version = ''

    # noinspection PyBroadException
    try:
        N.nvmlShutdown()
    except BaseException:
        return None

    return cuda_version[:3] if cuda_version else None
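Usage sketch for the new get_driver_cuda_version() helper (the import path is assumed from this repository's layout):

from clearml_agent.helper.gpu.gpustat import get_driver_cuda_version  # assumed module path

cuda = get_driver_cuda_version()  # e.g. '110' for CUDA 11.0, or None without a driver
if cuda:
    print('driver CUDA version: {}.{}'.format(cuda[:-1], cuda[-1]))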
File diff suppressed because it is too large

clearml_agent/helper/os/folder_cache.py (new file, 218 lines)
@@ -0,0 +1,218 @@
import os
import shutil
from logging import warning
from random import random
from time import time
from typing import List, Optional, Sequence

import psutil
from pathlib2 import Path

from .locks import FileLock


class FolderCache(object):
    _lock_filename = '.clearml.lock'
    _lock_timeout_seconds = 30
    _temp_entry_prefix = '_temp.'

    def __init__(self, cache_folder, max_cache_entries=5, min_free_space_gb=None):
        self._cache_folder = Path(os.path.expandvars(cache_folder)).expanduser().absolute()
        self._cache_folder.mkdir(parents=True, exist_ok=True)
        self._max_cache_entries = max_cache_entries
        self._last_copied_entry_folder = None
        self._min_free_space_gb = min_free_space_gb if min_free_space_gb and min_free_space_gb > 0 else None
        self._lock = FileLock((self._cache_folder / self._lock_filename).as_posix())

    def get_cache_folder(self):
        # type: () -> Path
        """
        :return: Return the base cache folder
        """
        return self._cache_folder

    def copy_cached_entry(self, keys, destination):
        # type: (List[str], Path) -> Optional[Path]
        """
        Copy a cached entry into a destination directory, if the cached entry does not exist return None
        :param keys:
        :param destination:
        :return: Target path, None if cached entry does not exist
        """
        self._last_copied_entry_folder = None
        if not keys:
            return None

        # lock so we make sure no one deletes it before we copy it
        # noinspection PyBroadException
        try:
            self._lock.acquire(timeout=self._lock_timeout_seconds)
        except BaseException as ex:
            warning('Could not lock cache folder {}: {}'.format(self._cache_folder, ex))
            return None

        src = None
        try:
            src = self.get_entry(keys)
            if src:
                destination = Path(destination).absolute()
                destination.mkdir(parents=True, exist_ok=True)
                shutil.rmtree(destination.as_posix())
                shutil.copytree(src.as_posix(), dst=destination.as_posix(), symlinks=True)
        except BaseException as ex:
            warning('Could not copy cache folder {} to {}: {}'.format(src, destination, ex))
            self._lock.release()
            return None

        # release Lock
        self._lock.release()

        self._last_copied_entry_folder = src
        return destination if src else None

    def get_entry(self, keys):
        # type: (List[str]) -> Optional[Path]
        """
        Return a folder (a sub-folder inside the cache_folder) matching one of the keys
        :param keys: List of keys, return the first match to one of the keys, notice keys cannot contain '.'
        :return: Path to the sub-folder or None if none was found
        """
        if not keys:
            return None
        # conform keys
        keys = [keys] if isinstance(keys, str) else keys
        keys = sorted([k.replace('.', '_') for k in keys])
        for cache_folder in self._cache_folder.glob('*'):
            if cache_folder.is_dir() and any(True for k in cache_folder.name.split('.') if k in keys):
                cache_folder.touch()
                return cache_folder
        return None

    def add_entry(self, keys, source_folder, exclude_sub_folders=None):
        # type: (List[str], Path, Optional[Sequence[str]]) -> bool
        """
        Add a local folder into the cache, copy all sub-folders inside `source_folder`
        excluding folders matching `exclude_sub_folders` list
        :param keys: Cache entry keys list (str)
        :param source_folder: Folder to copy into the cache
        :param exclude_sub_folders: List of sub-folders to exclude from the copy operation
        :return: return True if a new entry was added to cache
        """
        if not keys:
            return False

        keys = [keys] if isinstance(keys, str) else keys
        keys = sorted([k.replace('.', '_') for k in keys])

        # If entry already exists skip it
        cached_entry = self.get_entry(keys)
        if cached_entry:
            # make sure the entry contains all keys
            cached_keys = cached_entry.name.split('.')
            if set(keys) - set(cached_keys):
                # noinspection PyBroadException
                try:
                    self._lock.acquire(timeout=self._lock_timeout_seconds)
                except BaseException as ex:
                    warning('Could not lock cache folder {}: {}'.format(self._cache_folder, ex))
                    # failed locking do nothing
                    return True
                keys = sorted(list(set(keys) | set(cached_keys)))
                dst = cached_entry.parent / '.'.join(keys)
                # rename
                try:
                    shutil.move(src=cached_entry.as_posix(), dst=dst.as_posix())
                except BaseException as ex:
                    warning('Could not rename cache entry {} to {}: {}'.format(
                        cached_entry.as_posix(), dst.as_posix(), ex))
                # release lock
                self._lock.release()
            return True

        # make sure we remove old entries
        self._remove_old_entries()

        # if we do not have enough free space, do nothing.
        if not self._check_min_free_space():
            warning('Could not add cache entry, not enough free space on drive, '
                    'free space threshold {} GB. Clearing all cache entries!'.format(self._min_free_space_gb))
            self._remove_old_entries(max_cache_entries=0)
            return False

        # create the new entry for us
        exclude_sub_folders = exclude_sub_folders or []
        source_folder = Path(source_folder).absolute()
        # create temp folder
        temp_folder = \
            self._temp_entry_prefix + \
            '{}.{}'.format(str(time()).replace('.', '_'), str(random()).replace('.', '_'))
        temp_folder = self._cache_folder / temp_folder
        temp_folder.mkdir(parents=True, exist_ok=False)

        for f in source_folder.glob('*'):
            if f.name in exclude_sub_folders:
                continue
            shutil.copytree(src=f.as_posix(), dst=(temp_folder / f.name).as_posix(), symlinks=True)

        # rename the target folder
        target_cache_folder = self._cache_folder / '.'.join(keys)
        # if we failed moving it means someone else created the cached entry before us, we can just leave
        # noinspection PyBroadException
        try:
            shutil.move(src=temp_folder.as_posix(), dst=target_cache_folder.as_posix())
        except BaseException:
            # noinspection PyBroadException
            try:
                shutil.rmtree(path=temp_folder.as_posix())
            except BaseException:
                return False

        return True

    def get_last_copied_entry(self):
        # type: () -> Optional[Path]
        """
        :return: the last copied cached entry folder inside the cache
        """
        return self._last_copied_entry_folder

    def _remove_old_entries(self, max_cache_entries=None):
        # type: (Optional[int]) -> ()
        """
        Notice we only keep self._max_cache_entries-1, assuming we will be adding a new entry soon
        :param int max_cache_entries: if not None use instead of self._max_cache_entries
        """
        folder_entries = [(cache_folder, cache_folder.stat().st_mtime)
                          for cache_folder in self._cache_folder.glob('*')
                          if cache_folder.is_dir() and not cache_folder.name.startswith(self._temp_entry_prefix)]
        folder_entries = sorted(folder_entries, key=lambda x: x[1], reverse=True)

        # lock so we make sure no one deletes it before we copy it
        # noinspection PyBroadException
        try:
            self._lock.acquire(timeout=self._lock_timeout_seconds)
        except BaseException as ex:
            warning('Could not lock cache folder {}: {}'.format(self._cache_folder, ex))
            return

        number_of_entries_to_keep = self._max_cache_entries - 1 \
            if max_cache_entries is None else max(0, int(max_cache_entries))
        for folder, ts in folder_entries[number_of_entries_to_keep:]:
            try:
                shutil.rmtree(folder.as_posix(), ignore_errors=True)
            except BaseException as ex:
                warning('Could not delete cache entry {}: {}'.format(folder.as_posix(), ex))
|
||||
|
||||
self._lock.release()
|
||||
|
||||
def _check_min_free_space(self):
|
||||
# type: () -> bool
|
||||
"""
|
||||
:return: return False if we hit the free space limit.
|
||||
If not free space limit provided, always return True
|
||||
"""
|
||||
if not self._min_free_space_gb or not self._cache_folder:
|
||||
return True
|
||||
free_space = float(psutil.disk_usage(self._cache_folder.as_posix()).free)
|
||||
free_space /= 2**30
|
||||
return free_space > self._min_free_space_gb
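A minimal usage sketch of the FolderCache above (illustrative only, not taken from the commit; the paths and the excluded sub-folder name are hypothetical):

    from hashlib import md5
    from pathlib2 import Path
    from clearml_agent.helper.os.folder_cache import FolderCache

    cache = FolderCache('/opt/venvs-cache', max_cache_entries=10, min_free_space_gb=2.0)
    # entry keys may not contain '.', so md5 hex digests are a natural fit
    keys = [md5(b'python3.6\npip==20.1').hexdigest()]

    if not cache.copy_cached_entry(keys, Path('/opt/venvs-builds/3.6')):
        # cache miss: build the folder contents here, then store them for next time
        cache.add_entry(keys, Path('/opt/venvs-builds/3.6'),
                        exclude_sub_folders=['task_repository'])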
211  clearml_agent/helper/os/locks.py  (new file)
@@ -0,0 +1,211 @@
import os
import time
import tempfile
import contextlib

from .portalocker import constants, exceptions, lock, unlock


current_time = getattr(time, "monotonic", time.time)

DEFAULT_TIMEOUT = 10 ** 8
DEFAULT_CHECK_INTERVAL = 0.25
LOCK_METHOD = constants.LOCK_EX | constants.LOCK_NB

__all__ = [
    'FileLock',
    'open_atomic',
]


@contextlib.contextmanager
def open_atomic(filename, binary=True):
    """Open a file for atomic writing. Instead of locking this method allows
    you to write the entire file and move it to the actual location. Note that
    this makes the assumption that a rename is atomic on your platform which
    is generally the case but not a guarantee.

    http://docs.python.org/library/os.html#os.rename

    >>> filename = 'test_file.txt'
    >>> if os.path.exists(filename):
    ...     os.remove(filename)

    >>> with open_atomic(filename) as fh:
    ...     written = fh.write(b'test')
    >>> assert os.path.exists(filename)
    >>> os.remove(filename)

    """
    assert not os.path.exists(filename), '%r exists' % filename
    path, name = os.path.split(filename)

    # Create the parent directory if it doesn't exist
    if path and not os.path.isdir(path):  # pragma: no cover
        os.makedirs(path)

    temp_fh = tempfile.NamedTemporaryFile(
        mode=binary and 'wb' or 'w',
        dir=path,
        delete=False,
    )
    yield temp_fh
    temp_fh.flush()
    os.fsync(temp_fh.fileno())
    temp_fh.close()
    try:
        os.rename(temp_fh.name, filename)
    finally:
        try:
            os.remove(temp_fh.name)
        except Exception:  # noqa
            pass


class FileLock(object):

    def __init__(
            self, filename, mode='a', timeout=DEFAULT_TIMEOUT,
            check_interval=DEFAULT_CHECK_INTERVAL, fail_when_locked=False,
            flags=LOCK_METHOD, **file_open_kwargs):
        """Lock manager with built-in timeout

        filename -- filename
        mode -- the open mode, 'a' or 'ab' should be used for writing
        truncate -- use truncate to emulate 'w' mode, None is disabled, 0 is
            truncate to 0 bytes
        timeout -- timeout when trying to acquire a lock
        check_interval -- check interval while waiting
        fail_when_locked -- after the initial lock failed, return an error
            or lock the file
        **file_open_kwargs -- The kwargs for the `open(...)` call

        fail_when_locked is useful when multiple threads/processes can race
        when creating a file. If set to true then the system will wait until
        the lock was acquired and then raise an AlreadyLocked exception.

        Note that the file is opened first and locked later. So using 'w' as
        mode will result in truncate _BEFORE_ the lock is checked.
        """

        if 'w' in mode:
            truncate = True
            mode = mode.replace('w', 'a')
        else:
            truncate = False

        self.fh = None
        self.filename = filename
        self.mode = mode
        self.truncate = truncate
        self.timeout = timeout
        self.check_interval = check_interval
        self.fail_when_locked = fail_when_locked
        self.flags = flags
        self.file_open_kwargs = file_open_kwargs

    def acquire(
            self, timeout=None, check_interval=None, fail_when_locked=None):
        """Acquire the locked filehandle"""
        if timeout is None:
            timeout = self.timeout
        if timeout is None:
            timeout = 0

        if check_interval is None:
            check_interval = self.check_interval

        if fail_when_locked is None:
            fail_when_locked = self.fail_when_locked

        # If we already have a filehandle, return it
        fh = self.fh
        if fh:
            return fh

        # Get a new filehandle
        fh = self._get_fh()
        try:
            # Try to lock
            fh = self._get_lock(fh)
        except exceptions.LockException as exception:
            # Try until the timeout has passed
            timeoutend = current_time() + timeout
            while timeoutend > current_time():
                # Wait a bit
                time.sleep(check_interval)

                # Try again
                try:

                    # We already tried to get the lock
                    # If fail_when_locked is true, then stop trying
                    if fail_when_locked:
                        raise exceptions.AlreadyLocked(exception)

                    else:  # pragma: no cover
                        # We've got the lock
                        fh = self._get_lock(fh)
                        break

                except exceptions.LockException:
                    pass

            else:
                # We got a timeout... reraising
                raise exceptions.LockException(exception)

        # Prepare the filehandle (truncate if needed)
        fh = self._prepare_fh(fh)

        self.fh = fh
        return fh

    def release(self):
        """Releases the currently locked file handle"""
        if self.fh:
            # noinspection PyBroadException
            try:
                unlock(self.fh)
            except Exception:
                pass
            # noinspection PyBroadException
            try:
                self.fh.close()
            except Exception:
                pass
            self.fh = None

    def _get_fh(self):
        """Get a new filehandle"""
        return open(self.filename, self.mode, **self.file_open_kwargs)

    def _get_lock(self, fh):
        """
        Try to lock the given filehandle

        raises LockException if it fails"""
        lock(fh, self.flags)
        return fh

    def _prepare_fh(self, fh):
        """
        Prepare the filehandle for usage

        If truncate is a number, the file will be truncated to that amount of
        bytes
        """
        if self.truncate:
            fh.seek(0)
            fh.truncate(0)

        return fh

    def __enter__(self):
        return self.acquire()

    def __exit__(self, type_, value, tb):
        self.release()

    def __delete__(self, instance):  # pragma: no cover
        instance.release()
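An illustrative use of the FileLock above (not part of the commit; the lock-file path is made up). acquire() blocks for up to the timeout, polling every check_interval seconds, and raises LockException on timeout; the class also works as a context manager:

    from clearml_agent.helper.os.locks import FileLock

    lock = FileLock('/tmp/vcs-cache.lock')
    try:
        lock.acquire(timeout=300)
        # ... exclusive access to the shared resource ...
    finally:
        lock.release()

    # equivalent, using __enter__/__exit__:
    with FileLock('/tmp/vcs-cache.lock') as fh:
        pass  # critical section; fh is the locked filehandle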
193  clearml_agent/helper/os/portalocker.py  (new file)
@@ -0,0 +1,193 @@
import os
import sys


class exceptions:
    class BaseLockException(Exception):
        # Error codes:
        LOCK_FAILED = 1

        def __init__(self, *args, **kwargs):
            self.fh = kwargs.pop('fh', None)
            Exception.__init__(self, *args, **kwargs)

    class LockException(BaseLockException):
        pass

    class AlreadyLocked(BaseLockException):
        pass

    class FileToLarge(BaseLockException):
        pass


class constants:
    # The actual tests will execute the code anyhow so the following code can
    # safely be ignored from the coverage tests
    if os.name == 'nt':  # pragma: no cover
        import msvcrt

        LOCK_EX = 0x1  #: exclusive lock
        LOCK_SH = 0x2  #: shared lock
        LOCK_NB = 0x4  #: non-blocking
        LOCK_UN = msvcrt.LK_UNLCK  #: unlock

        LOCKFILE_FAIL_IMMEDIATELY = 1
        LOCKFILE_EXCLUSIVE_LOCK = 2

    elif os.name == 'posix':  # pragma: no cover
        import fcntl

        LOCK_EX = fcntl.LOCK_EX  #: exclusive lock
        LOCK_SH = fcntl.LOCK_SH  #: shared lock
        LOCK_NB = fcntl.LOCK_NB  #: non-blocking
        LOCK_UN = fcntl.LOCK_UN  #: unlock

    else:  # pragma: no cover
        raise RuntimeError('PortaLocker only defined for nt and posix platforms')


if os.name == 'nt':  # pragma: no cover
    import msvcrt

    if sys.version_info.major == 2:
        lock_length = -1
    else:
        lock_length = int(2**31 - 1)

    def lock(file_, flags):
        if flags & constants.LOCK_SH:
            import win32file
            import pywintypes
            import winerror
            __overlapped = pywintypes.OVERLAPPED()
            if sys.version_info.major == 2:
                if flags & constants.LOCK_NB:
                    mode = constants.LOCKFILE_FAIL_IMMEDIATELY
                else:
                    mode = 0

            else:
                if flags & constants.LOCK_NB:
                    mode = msvcrt.LK_NBRLCK
                else:
                    mode = msvcrt.LK_RLCK

            # is there any reason not to reuse the following structure?
            hfile = win32file._get_osfhandle(file_.fileno())
            try:
                win32file.LockFileEx(hfile, mode, 0, -0x10000, __overlapped)
            except pywintypes.error as exc_value:
                # error: (33, 'LockFileEx', 'The process cannot access the file
                # because another process has locked a portion of the file.')
                if exc_value.winerror == winerror.ERROR_LOCK_VIOLATION:
                    raise exceptions.LockException(
                        exceptions.LockException.LOCK_FAILED,
                        exc_value.strerror,
                        fh=file_)
                else:
                    # Q: Are there exceptions/codes we should be dealing with
                    # here?
                    raise
        else:
            mode = constants.LOCKFILE_EXCLUSIVE_LOCK
            if flags & constants.LOCK_NB:
                mode |= constants.LOCKFILE_FAIL_IMMEDIATELY

            if flags & constants.LOCK_NB:
                mode = msvcrt.LK_NBLCK
            else:
                mode = msvcrt.LK_LOCK

            # windows locks byte ranges, so make sure to lock from file start
            try:
                savepos = file_.tell()
                if savepos:
                    # [ ] test exclusive lock fails on seek here
                    # [ ] test if shared lock passes this point
                    file_.seek(0)
                    # [x] check if 0 param locks entire file (not documented in
                    #     Python)
                    # [x] fails with "IOError: [Errno 13] Permission denied",
                    #     but -1 seems to do the trick

                try:
                    msvcrt.locking(file_.fileno(), mode, lock_length)
                except IOError as exc_value:
                    # [ ] be more specific here
                    raise exceptions.LockException(
                        exceptions.LockException.LOCK_FAILED,
                        exc_value.strerror,
                        fh=file_)
                finally:
                    if savepos:
                        file_.seek(savepos)
            except IOError as exc_value:
                raise exceptions.LockException(
                    exceptions.LockException.LOCK_FAILED, exc_value.strerror,
                    fh=file_)

    def unlock(file_):
        try:
            savepos = file_.tell()
            if savepos:
                file_.seek(0)

            try:
                msvcrt.locking(file_.fileno(), constants.LOCK_UN, lock_length)
            except IOError as exc_value:
                if exc_value.strerror == 'Permission denied':
                    import pywintypes
                    import win32file
                    import winerror
                    __overlapped = pywintypes.OVERLAPPED()
                    hfile = win32file._get_osfhandle(file_.fileno())
                    try:
                        win32file.UnlockFileEx(
                            hfile, 0, -0x10000, __overlapped)
                    except pywintypes.error as exc_value:
                        if exc_value.winerror == winerror.ERROR_NOT_LOCKED:
                            # error: (158, 'UnlockFileEx',
                            #         'The segment is already unlocked.')
                            # To match the 'posix' implementation, silently
                            # ignore this error
                            pass
                        else:
                            # Q: Are there exceptions/codes we should be
                            #    dealing with here?
                            raise
                else:
                    raise exceptions.LockException(
                        exceptions.LockException.LOCK_FAILED,
                        exc_value.strerror,
                        fh=file_)
        finally:
            if savepos:
                file_.seek(savepos)
        except IOError as exc_value:
            raise exceptions.LockException(
                exceptions.LockException.LOCK_FAILED, exc_value.strerror,
                fh=file_)

elif os.name == 'posix':  # pragma: no cover
    import fcntl

    def lock(file_, flags):
        locking_exceptions = IOError,
        try:  # pragma: no cover
            locking_exceptions += BlockingIOError,
        except NameError:  # pragma: no cover
            pass

        try:
            fcntl.flock(file_.fileno(), flags)
        except locking_exceptions as exc_value:
            # The exception code varies on different systems so we'll catch
            # every IO error
            raise exceptions.LockException(exc_value, fh=file_)

    def unlock(file_):
        fcntl.flock(file_.fileno(), constants.LOCK_UN)

else:  # pragma: no cover
    raise RuntimeError('PortaLocker only defined for nt and posix platforms')
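The FileLock above is built on this lower-level interface. A direct, non-blocking exclusive lock might look like this (illustrative sketch for the posix branch; the file path is made up):

    from clearml_agent.helper.os.portalocker import constants, exceptions, lock, unlock

    fh = open('/tmp/example.lock', 'a')
    try:
        lock(fh, constants.LOCK_EX | constants.LOCK_NB)
        # ... exclusive work ...
    except exceptions.LockException:
        print('already locked by another process')
    finally:
        unlock(fh)
        fh.close()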
@@ -1,11 +1,16 @@
from __future__ import unicode_literals

import abc
from collections import OrderedDict
from contextlib import contextmanager
from typing import Text, Iterable, Union
from typing import Text, Iterable, Union, Optional, Dict, List
from pathlib2 import Path
from hashlib import md5

import six
from clearml_agent.helper.base import mkstemp, safe_remove_file, join_lines, select_for_platform
from clearml_agent.helper.console import ensure_binary
from clearml_agent.helper.os.folder_cache import FolderCache
from clearml_agent.helper.process import Executable, Argv, PathLike


@@ -18,6 +23,12 @@ class PackageManager(object):
    _selected_manager = None
    _cwd = None
    _pip_version = None
    _config_cache_folder = 'agent.venvs_cache.path'
    _config_cache_max_entries = 'agent.venvs_cache.max_entries'
    _config_cache_free_space_threshold = 'agent.venvs_cache.free_space_threshold_gb'

    def __init__(self):
        self._cache_manager = None

    @abc.abstractproperty
    def bin(self):
@@ -153,3 +164,100 @@ class PackageManager(object):
    @classmethod
    def get_pip_version(cls):
        return cls._pip_version or ''

    def get_cached_venv(self, requirements, docker_cmd, python_version, cuda_version, destination_folder):
        # type: (Dict, Optional[Union[dict, str]], Optional[str], Optional[str], Path) -> Optional[Path]
        """
        Copy a cached copy of the venv (based on the requirements) into destination_folder.
        Return None if failed or cached entry does not exist
        """
        if not self._get_cache_manager():
            return None

        keys = self._generate_reqs_hash_keys(requirements, docker_cmd, python_version, cuda_version)
        return self._get_cache_manager().copy_cached_entry(keys, destination_folder)

    def add_cached_venv(
            self,
            requirements,  # type: Union[Dict, List[Dict]]
            docker_cmd,  # type: Optional[Union[dict, str]]
            python_version,  # type: Optional[str]
            cuda_version,  # type: Optional[str]
            source_folder,  # type: Path
            exclude_sub_folders=None  # type: Optional[List[str]]
    ):
        # type: (...) -> ()
        """
        Copy the local venv folder into the venv cache (keys are based on the requirements+python+docker).
        """
        if not self._get_cache_manager():
            return
        keys = self._generate_reqs_hash_keys(requirements, docker_cmd, python_version, cuda_version)
        return self._get_cache_manager().add_entry(
            keys=keys, source_folder=source_folder, exclude_sub_folders=exclude_sub_folders)

    def get_cache_folder(self):
        # type: () -> Optional[Path]
        if not self._get_cache_manager():
            return
        return self._get_cache_manager().get_cache_folder()

    def get_last_used_entry_cache(self):
        # type: () -> Optional[Path]
        """
        :return: the last used cached folder entry
        """
        if not self._get_cache_manager():
            return
        return self._get_cache_manager().get_last_copied_entry()

    @classmethod
    def _generate_reqs_hash_keys(cls, requirements_list, docker_cmd, python_version, cuda_version):
        # type: (Union[Dict, List[Dict]], Optional[Union[dict, str]], Optional[str], Optional[str]) -> List[str]
        requirements_list = requirements_list or dict()
        if not isinstance(requirements_list, (list, tuple)):
            requirements_list = [requirements_list]
        docker_cmd = dict(docker_cmd=docker_cmd) if isinstance(docker_cmd, str) else docker_cmd or dict()
        docker_cmd = OrderedDict(sorted(docker_cmd.items(), key=lambda t: t[0]))
        if 'docker_cmd' in docker_cmd:
            # we only take the first part of the docker_cmd, which is the docker image name
            docker_cmd['docker_cmd'] = docker_cmd['docker_cmd'].strip('\r\n\t ').split(' ')[0]

        keys = []
        strip_chars = '\n\r\t '
        for requirements in requirements_list:
            pip, conda = ('pip', 'conda')
            pip_reqs = requirements.get(pip, '')
            conda_reqs = requirements.get(conda, '')
            if isinstance(pip_reqs, str):
                pip_reqs = pip_reqs.split('\n')
            if isinstance(conda_reqs, str):
                conda_reqs = conda_reqs.split('\n')
            pip_reqs = sorted([p.strip(strip_chars) for p in pip_reqs
                               if p.strip(strip_chars) and not p.strip(strip_chars).startswith('#')])
            conda_reqs = sorted([p.strip(strip_chars) for p in conda_reqs
                                 if p.strip(strip_chars) and not p.strip(strip_chars).startswith('#')])
            if not pip_reqs and not conda_reqs:
                continue
            hash_text = '{class_type}\n{docker_cmd}\n{cuda_ver}\n{python_version}\n{pip_reqs}\n{conda_reqs}'.format(
                class_type=str(cls),
                docker_cmd=str(docker_cmd or ''),
                cuda_ver=str(cuda_version or ''),
                python_version=str(python_version or ''),
                pip_reqs=str(pip_reqs or ''),
                conda_reqs=str(conda_reqs or ''),
            )
            keys.append(md5(ensure_binary(hash_text)).hexdigest())
        return sorted(list(set(keys)))

    def _get_cache_manager(self):
        if not self._cache_manager:
            cache_folder = self.session.config.get(self._config_cache_folder, None)
            if not cache_folder:
                return None

            max_entries = int(self.session.config.get(self._config_cache_max_entries, 10))
            free_space_threshold = float(self.session.config.get(self._config_cache_free_space_threshold, 0))
            self._cache_manager = FolderCache(
                cache_folder, max_cache_entries=max_entries, min_free_space_gb=free_space_threshold)
        return self._cache_manager
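Putting the cache API together (an illustrative sketch, not taken from the commit; `manager` stands for any PackageManager-derived instance and the paths are hypothetical):

    from pathlib2 import Path

    requirements = {'pip': 'numpy==1.19.0\nrequests>=2.20.0'}
    target = Path('/opt/venvs-builds/3.6')

    # try to restore a cached venv matching the requirements hash keys
    if not manager.get_cached_venv(requirements, docker_cmd=None, python_version='3.6',
                                   cuda_version='10.1', destination_folder=target):
        # cache miss: create the venv and install the requirements here, then:
        manager.add_cached_venv(requirements, docker_cmd=None, python_version='3.6',
                                cuda_version='10.1', source_folder=target)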
@@ -69,6 +69,7 @@ class CondaAPI(PackageManager):
        :param python: base python version to use (e.g python3.6)
        :param path: path of env
        """
        super(CondaAPI, self).__init__()
        self.session = session
        self.python = python
        self.source = None
@@ -140,19 +141,7 @@ class CondaAPI(PackageManager):
        """
        if self.conda_env_as_base_docker and self.conda_pre_build_env_path:
            if Path(self.conda_pre_build_env_path).is_dir():
                print("Using pre-existing Conda environment from {}".format(self.conda_pre_build_env_path))
                self.path = Path(self.conda_pre_build_env_path)
                self.source = ("conda", "activate", self.path.as_posix())
                self.pip = CondaPip(
                    session=self.session,
                    source=self.source,
                    python=self.python,
                    requirements_manager=self.requirements_manager,
                    path=self.path,
                )
                conda_env = self._get_conda_sh()
                self.source = self.pip.source = CommandSequence(('source', conda_env.as_posix()), self.source)
                self.env_read_only = True
                self._init_existing_environment(self.conda_pre_build_env_path)
                return self
            elif Path(self.conda_pre_build_env_path).is_file():
                print("Restoring Conda environment from {}".format(self.conda_pre_build_env_path))
@@ -210,6 +199,21 @@ class CondaAPI(PackageManager):
            pass
        return self

    def _init_existing_environment(self, conda_pre_build_env_path):
        print("Using pre-existing Conda environment from {}".format(conda_pre_build_env_path))
        self.path = Path(conda_pre_build_env_path)
        self.source = ("conda", "activate", self.path.as_posix())
        self.pip = CondaPip(
            session=self.session,
            source=self.source,
            python=self.python,
            requirements_manager=self.requirements_manager,
            path=self.path,
        )
        conda_env = self._get_conda_sh()
        self.source = self.pip.source = CommandSequence(('source', conda_env.as_posix()), self.source)
        self.env_read_only = True

    def remove(self):
        """
        Delete a conda environment.
@@ -501,6 +505,8 @@ class CondaAPI(PackageManager):
                reqs.append(m)

        # if we have a conda list, the rest should be installed with pip,
        # this means any experiment that was executed with a pip environment
        # will be installed using pip
        if requirements.get('conda', None) is not None:
            for r in requirements['pip']:
                try:
@@ -514,7 +520,7 @@ class CondaAPI(PackageManager):
            # skip over local files (we cannot change the version to a local file)
            if m.local_file:
                continue
            m_name = m.name.lower()
            m_name = (m.name or '').lower()
            if m_name in conda_supported_req_names:
                # this package is in the conda list,
                # make sure that if we changed the version we also match it in conda
@@ -551,7 +557,7 @@ class CondaAPI(PackageManager):
        # conform conda packages (version/name)
        for r in reqs:
            # change _ to - in the name, but not a leading _ (as this is a conda prefix)
            if not r.name.startswith('_') and not requirements.get('conda', None):
            if r.name and not r.name.startswith('_') and not requirements.get('conda', None):
                r.name = r.name.replace('_', '-')
            # remove .post from version numbers, it fails ~= version, and change == to ~=
            if r.specs and r.specs[0]:
@@ -665,6 +671,8 @@ class CondaAPI(PackageManager):
        return result

    def get_python_command(self, extra=()):
        if not self.source:
            self._init_existing_environment(self.path)
        return CommandSequence(self.source, self.pip.get_python_command(extra=extra))

    def _get_conda_sh(self):
@@ -17,6 +17,16 @@ class ExternalRequirements(SimpleSubstitution):
        self.post_install_req_lookup = OrderedDict()

    def match(self, req):
        # match local folder builds:
        # noinspection PyBroadException
        try:
            if not req.name and req.req and not req.req.editable and not req.req.vcs and \
                    req.req.line and req.req.line.strip().split('#')[0] and \
                    not req.req.line.strip().split('#')[0].lower().endswith('.whl'):
                return True
        except Exception:
            pass

        # match both editable or code or unparsed
        if not (not req.name or req.req and (req.req.editable or req.req.vcs)):
            return False
@@ -104,3 +114,20 @@ class ExternalRequirements(SimpleSubstitution):
            list_of_requirements[k] += [self.post_install_req_lookup.get(r, '')
                                        for r in self.post_install_req_lookup.keys() if r in original_requirements]
        return list_of_requirements


class OnlyExternalRequirements(ExternalRequirements):
    def __init__(self, *args, **kwargs):
        super(OnlyExternalRequirements, self).__init__(*args, **kwargs)

    def match(self, req):
        return not super(OnlyExternalRequirements, self).match(req)

    def replace(self, req):
        """
        Replace a requirement
        :raises: ValueError if version is pre-release
        """
        # Do not store the skipped requirements
        # mark skip package
        return Text('')
@@ -17,6 +17,7 @@ class SystemPip(PackageManager):
        """
        Program interface to the system pip.
        """
        super(SystemPip, self).__init__()
        self._bin = interpreter or sys.executable
        self.session = session
@@ -17,6 +17,7 @@ import six
from clearml_agent.definitions import PIP_EXTRA_INDICES
from clearml_agent.helper.base import warning, is_conda, which, join_lines, is_windows_platform
from clearml_agent.helper.process import Argv, PathLike
from clearml_agent.helper.gpu.gpustat import get_driver_cuda_version
from clearml_agent.session import Session, normalize_cuda_version
from clearml_agent.external.requirements_parser import parse
from clearml_agent.external.requirements_parser.requirement import Requirement
@@ -446,6 +447,7 @@ class RequirementsManager(object):
                                 'cu'+agent['cuda_version'] if self.found_cuda else 'cpu')
        self.translator = RequirementsTranslator(session, interpreter=base_interpreter,
                                                 cache_dir=pip_cache_dir.as_posix())
        self._base_interpreter = base_interpreter

    def register(self, cls):  # type: (Type[RequirementSubstitution]) -> None
        self.handlers.append(cls(self._session))
@@ -529,6 +531,9 @@ class RequirementsManager(object):
            pass
        return requirements

    def get_interpreter(self):
        return self._base_interpreter

    @staticmethod
    def get_cuda_version(config):  # type: (ConfigTree) -> (Text, Text)
        # we assume os.environ already updated the config['agent.cuda_version'] & config['agent.cudnn_version']
@@ -537,6 +542,9 @@ class RequirementsManager(object):
        if cuda_version and cudnn_version:
            return normalize_cuda_version(cuda_version), normalize_cuda_version(cudnn_version)

        if not cuda_version:
            cuda_version = get_driver_cuda_version()

        if not cuda_version and is_windows_platform():
            try:
                cuda_vers = [int(k.replace('CUDA_PATH_V', '').replace('_', '')) for k in os.environ.keys()
@@ -601,4 +609,3 @@ class RequirementsManager(object):

        return (normalize_cuda_version(cuda_version or 0),
                normalize_cuda_version(cudnn_version or 0))
@@ -7,7 +7,7 @@ import re
import subprocess
import sys
from contextlib import contextmanager
from copy import deepcopy
from copy import copy
from distutils.spawn import find_executable
from itertools import chain, repeat, islice
from os.path import devnull
@@ -276,9 +276,9 @@ class CommandSequence(Executable):
        self.commands = []
        for c in commands:
            if isinstance(c, CommandSequence):
                self.commands.extend(deepcopy(c.commands))
                self.commands.extend([copy(p) for p in c.commands])
            elif isinstance(c, Argv):
                self.commands.append(deepcopy(c))
                self.commands.append(copy(c))
            else:
                self.commands.append(Argv(*c, log=self._log))

@@ -420,7 +420,7 @@ SOURCE_COMMAND = select_for_platform(linux="source", windows="call")
class ExitStatus(object):
    success = 0
    failure = 1
    interrupted = 2
    interrupted = -2


COMMAND_SUCCESS = 0
@@ -5,6 +5,8 @@ import subprocess
from distutils.spawn import find_executable
from hashlib import md5
from os import environ
from random import random
from threading import Lock
from typing import Text, Sequence, Mapping, Iterable, TypeVar, Callable, Tuple, Optional

import attr
@@ -23,6 +25,7 @@ from clearml_agent.helper.base import (
    normalize_path,
    create_file_if_not_exists,
)
from clearml_agent.helper.os.locks import FileLock
from clearml_agent.helper.process import DEVNULL, Argv, PathLike, COMMAND_SUCCESS
from clearml_agent.session import Session

@@ -585,6 +588,9 @@ def clone_repository_cached(session, execution, destination):
    :return: repository information
    :raises: CommandFailedError if git/hg is not installed
    """
    # mock lock, replaced with a FileLock when the vcs cache is used
    repo_lock = Lock()
    repo_lock_timeout_sec = 300
    repo_url = execution.repository  # type: str
    parsed_url = furl(repo_url)
    no_password_url = parsed_url.copy().remove(password=True).url
@@ -596,37 +602,48 @@ def clone_repository_cached(session, execution, destination):
    if standalone_mode:
        cached_repo_path = clone_folder
    else:
        cached_repo_path = (
            Path(session.config["agent.vcs_cache.path"]).expanduser()
            / "{}.{}".format(clone_folder_name, md5(ensure_binary(repo_url)).hexdigest())
            / clone_folder_name
        )  # type: Path
        vcs_cache_path = Path(session.config["agent.vcs_cache.path"]).expanduser()
        repo_hash = md5(ensure_binary(repo_url)).hexdigest()
        # create lock
        repo_lock = FileLock(filename=(vcs_cache_path / '{}.lock'.format(repo_hash)).as_posix())
        # noinspection PyBroadException
        try:
            repo_lock.acquire(timeout=repo_lock_timeout_sec)
        except BaseException:
            print('Could not lock cache folder "{}" (timeout {} sec), using temp vcs cache.'.format(
                clone_folder_name, repo_lock_timeout_sec))
            repo_hash = '{}_{}'.format(repo_hash, str(random()).replace('.', ''))
            # use mock lock for the context
            repo_lock = Lock()
        # select vcs cache folder
        cached_repo_path = vcs_cache_path / "{}.{}".format(clone_folder_name, repo_hash) / clone_folder_name

    vcs = VcsFactory.create(
        session, execution_info=execution, location=cached_repo_path
    )
    if not find_executable(vcs.executable_name):
        raise CommandFailedError(vcs.executable_not_found_error_help())
    with repo_lock:
        vcs = VcsFactory.create(
            session, execution_info=execution, location=cached_repo_path
        )
        if not find_executable(vcs.executable_name):
            raise CommandFailedError(vcs.executable_not_found_error_help())

    if not standalone_mode:
        if session.config["agent.vcs_cache.enabled"] and cached_repo_path.exists():
            print('Using cached repository in "{}"'.format(cached_repo_path))
        if not standalone_mode:
            if session.config["agent.vcs_cache.enabled"] and cached_repo_path.exists():
                print('Using cached repository in "{}"'.format(cached_repo_path))

        else:
            print("cloning: {}".format(no_password_url))
            rm_tree(cached_repo_path)
            # We clone the entire repository, not a specific branch
            vcs.clone()  # branch=execution.branch)
            else:
                print("cloning: {}".format(no_password_url))
                rm_tree(cached_repo_path)
                # We clone the entire repository, not a specific branch
                vcs.clone()  # branch=execution.branch)

        vcs.pull()
        rm_tree(destination)
        shutil.copytree(Text(cached_repo_path), Text(clone_folder))
        if not clone_folder.is_dir():
            raise CommandFailedError(
                "copying of repository failed: from {} to {}".format(
                    cached_repo_path, clone_folder
            vcs.pull()
            rm_tree(destination)
            shutil.copytree(Text(cached_repo_path), Text(clone_folder))
            if not clone_folder.is_dir():
                raise CommandFailedError(
                    "copying of repository failed: from {} to {}".format(
                        cached_repo_path, clone_folder
                    )
                )
            )

    # checkout in the newly copied destination
    vcs.location = Text(clone_folder)
@@ -638,3 +655,70 @@ def clone_repository_cached(session, execution, destination):
        repo_info = attr.evolve(repo_info, url=no_password_url)

    return vcs, repo_info


def fix_package_import_diff_patch(entry_script_file):
    # noinspection PyBroadException
    try:
        with open(entry_script_file, 'rt') as f:
            lines = f.readlines()
    except Exception:
        return
    # make sure we are the first import (i.e. we patched the source code)
    if not lines or not lines[0].strip().startswith('from clearml ') or 'Task.init' not in lines[1]:
        return

    original_lines = lines
    # skip over the first two lines, they are ours
    # then skip over empty or comment lines
    lines = [(i, line.split('#', 1)[0].rstrip()) for i, line in enumerate(lines)
             if i >= 2 and line.strip('\r\n\t ') and not line.strip().startswith('#')]

    # remove triple quotes ' """ '
    nested_c = -1
    skip_lines = []
    for i, line_pair in enumerate(lines):
        for _ in line_pair[1].split('"""')[1:]:
            if nested_c >= 0:
                skip_lines.extend(list(range(nested_c, i+1)))
                nested_c = -1
            else:
                nested_c = i
    # now select all the lines that are not inside a triple-quoted block
    lines = [pair for i, pair in enumerate(lines) if i not in skip_lines]

    from_future = re.compile(r"^from[\s]*__future__[\s]*")
    import_future = re.compile(r"^import[\s]*__future__[\s]*")
    # test if we have a __future__ import
    found_index = -1
    for a_i, (_, a_line) in enumerate(lines):
        if found_index >= a_i:
            continue
        if from_future.match(a_line) or import_future.match(a_line):
            found_index = a_i
            # check the last import block
            i, line = lines[found_index]
            # either the line ends with a \\ continuation or the import is parenthesized
            parenthesized_lines = '(' in line and ')' not in line
            while line.endswith('\\') or parenthesized_lines:
                found_index += 1
                i, line = lines[found_index]
                if ')' in line:
                    break

        else:
            break

    # no __future__ imports found
    if found_index < 0:
        return

    # now we need to move back the patched two lines
    entry_line = lines[found_index][0]
    new_lines = original_lines[2:entry_line + 1] + original_lines[0:2] + original_lines[entry_line + 1:]
    # noinspection PyBroadException
    try:
        with open(entry_script_file, 'wt') as f:
            f.writelines(new_lines)
    except Exception:
        return
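To make the behavior concrete, a simplified before/after of an entry script processed by fix_package_import_diff_patch (illustrative; the exact two injected lines are an assumption here, only their position matters):

    # before: the agent's two lines were prepended above a __future__ import
    from clearml import Task
    Task.init()
    from __future__ import print_function
    import sys

    # after: the injected pair is moved below the __future__ import block
    from __future__ import print_function
    from clearml import Task
    Task.init()
    import sys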
@@ -129,8 +129,9 @@ def get_uptime_string(entry):


def get_runtime_properties_string(runtime_properties):
    # type: (List[dict]) -> Tuple[Optional[str], str]
    # type: (Optional[List[dict]]) -> Tuple[Optional[str], str]
    server_string = []
    runtime_properties = runtime_properties or []
    force_flag = next(
        (prop for prop in runtime_properties if prop["key"] == UptimeConf.worker_key),
        None,
@@ -7,7 +7,7 @@ from tempfile import gettempdir, NamedTemporaryFile
from typing import List, Tuple, Optional

from clearml_agent.definitions import ENV_DOCKER_HOST_MOUNT
from clearml_agent.helper.base import warning
from clearml_agent.helper.base import warning, is_windows_platform, safe_remove_file


class Singleton(object):
@@ -22,6 +22,13 @@ class Singleton(object):
    _lock_timeout = 10
    _pid = None

    @classmethod
    def close_pid_file(cls):
        if cls._pid_file:
            cls._pid_file.close()
            safe_remove_file(cls._pid_file.name)
            cls._pid_file = None

    @classmethod
    def update_pid_file(cls):
        new_pid = str(os.getpid())
@@ -115,7 +122,7 @@ class Singleton(object):

    @classmethod
    def _register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None, allow_double=False):
        if cls.worker_id:
        if cls.worker_id and cls.instance_slot is not None:
            return cls.worker_id, cls.instance_slot
        # make sure we have a unique name
        instance_num = 0
@@ -167,7 +174,9 @@ class Singleton(object):
        # create lock
        cls._pid = str(os.getpid())
        cls._pid_file = NamedTemporaryFile(
            dir=cls._get_temp_folder(), prefix=cls.prefix + cls.sep + cls._pid + cls.sep, suffix=cls.ext)
            dir=cls._get_temp_folder(), prefix=cls.prefix + cls.sep + cls._pid + cls.sep, suffix=cls.ext,
            delete=False if is_windows_platform() else True
        )
        cls._pid_file.write(('{}\n{}'.format(unique_worker_id, cls.instance_slot)).encode())
        cls._pid_file.flush()
        cls.worker_id = unique_worker_id
@@ -78,7 +78,10 @@ DAEMON_ARGS = dict({
    },
    '--services-mode': {
        'help': 'Launch multiple long-term docker services. Implies docker & cpu-only flags.',
        'action': 'store_true',
        'nargs': '?',
        'const': -1,
        'type': int,
        'default': None,
    },
    '--create-queue': {
        'help': 'Create requested queue if it does not exist already.',
@@ -93,6 +96,12 @@ DAEMON_ARGS = dict({
        'help': 'Stop the running agent (based on the same set of arguments)',
        'action': 'store_true',
    },
    '--dynamic-gpus': {
        'help': 'Dynamically allocate gpus based on queue properties, '
                'configure with \'--queues <queue_name>=<num_gpus>\'.'
                ' Example: \'--dynamic-gpus --queue dual_gpus=2 single_gpu=1\'',
        'action': 'store_true',
    },
    '--uptime': {
        'help': 'Specify uptime for clearml-agent in "<hours> <days>" format. For example, use "17-20 TUE" to set '
                'Tuesday\'s uptime to 17-20'
@@ -204,7 +204,7 @@ class Session(_Session):
        folder_keys = ('agent.venvs_dir', 'agent.vcs_cache.path',
                       'agent.pip_download_cache.path',
                       'agent.docker_pip_cache', 'agent.docker_apt_cache')
        singleton_folders = ('agent.venvs_dir', 'agent.vcs_cache.path', 'agent.docker_apt_cache')
        singleton_folders = ('agent.venvs_dir', 'agent.docker_apt_cache')

        if ENV_TASK_EXECUTE_AS_USER.get():
            folder_keys = tuple(list(folder_keys) + ['sdk.storage.cache.default_base_dir'])
@@ -1 +1 @@
__version__ = '0.17.1'
__version__ = '0.17.2'
@@ -24,7 +24,9 @@ agent {
    # Force GIT protocol to use SSH regardless of the git url (Assumes GIT user/pass are blank)
    force_git_ssh_protocol: false
    # Force a specific SSH port when converting http to ssh links (the domain is kept the same)
    # force_git_ssh_port: ""
    # force_git_ssh_port: 0
    # Force a specific SSH username when converting http to ssh links (the default username is 'git')
    # force_git_ssh_user: git

    # unique name of this worker, if None, created based on hostname:process_id
    # Overridden with os environment: CLEARML_WORKER_NAME
@@ -89,6 +91,16 @@ agent {
    # target folder for virtual environments builds, created when executing experiment
    venvs_dir = ~/.clearml/venvs-builds

    # cached virtual environment folder
    venvs_cache: {
        # maximum number of cached venvs
        max_entries: 10
        # minimum required free space to allow for a cache entry, disable by passing 0 or a negative value
        free_space_threshold_gb: 2.0
        # uncomment to enable virtual environment caching
        # path: ~/.clearml/venvs-cache
    },

    # cached git clone folder
    vcs_cache: {
        enabled: true,
@@ -129,12 +141,15 @@ agent {

    default_docker: {
        # default docker image to use when running in docker mode
        image: "nvidia/cuda:10.1-runtime-ubuntu18.04"
        image: "nvidia/cuda:10.1-cudnn7-runtime-ubuntu18.04"

        # optional arguments to pass to the docker image
        # arguments: ["--ipc=host"]
    }

    # set the OS environment variables based on the Task's Environment section before launching the Task process.
    enable_task_env: false

    # CUDA versions used for Conda setup & solving PyTorch wheel packages
    # it should be detected automatically. Override with os environment CUDA_VERSION / CUDNN_VERSION
    # cuda_version: 10.1
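For example, enabling the new venv cache only requires uncommenting the path; the other values shown are the defaults from the section above:

    agent {
        venvs_cache: {
            max_entries: 10
            free_space_threshold_gb: 2.0
            path: ~/.clearml/venvs-cache
        }
    }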
@@ -24,7 +24,13 @@ def parse_args():
    parser.add_argument(
        "--base-port", type=int,
        help="Used in conjunction with ports-mode, specifies the base port exposed by the services. "
             "For pod #X, the port will be <base-port>+X"
             "For pod #X, the port will be <base-port>+X. Note that the pod number is calculated based "
             "on base-pod-num, e.g. if base-port=20000 and base-pod-num=3, the port for the first pod will be 20003"
    )
    parser.add_argument(
        "--base-pod-num", type=int, default=1,
        help="Used in conjunction with ports-mode and base-port, specifies the base pod number to be used by the "
             "service (default: %(default)s)"
    )
    parser.add_argument(
        "--gateway-address", type=str, default=None,
@@ -67,9 +73,9 @@ def main():
    user_props_cb = k8s_user_props_cb

    k8s = K8sIntegration(
        ports_mode=args.ports_mode, num_of_services=args.num_of_services, user_props_cb=user_props_cb,
        overrides_yaml=args.overrides_yaml, clearml_conf_file=args.pod_clearml_conf, template_yaml=args.template_yaml,
        extra_bash_init_script=K8sIntegration.get_ssh_server_bash(
        ports_mode=args.ports_mode, num_of_services=args.num_of_services, base_pod_num=args.base_pod_num,
        user_props_cb=user_props_cb, overrides_yaml=args.overrides_yaml, clearml_conf_file=args.pod_clearml_conf,
        template_yaml=args.template_yaml, extra_bash_init_script=K8sIntegration.get_ssh_server_bash(
            ssh_port_number=args.ssh_server_port) if args.ssh_server_port else None,
        namespace=args.namespace,
    )
@@ -2,7 +2,6 @@ attrs>=18.0,<20.4.0
enum34>=0.9,<1.2.0 ; python_version < '3.6'
furl>=2.0.0,<2.2.0
future>=0.16.0,<0.19.0
humanfriendly>=2.1,<9.2
jsonschema>=2.6.0,<3.3.0
pathlib2>=2.3.0,<2.4.0
psutil>=3.4.2,<5.9.0
@@ -11,10 +10,8 @@ pyparsing>=2.0.3,<2.5.0
python-dateutil>=2.4.2,<2.9.0
pyjwt>=1.6.4,<1.8.0
PyYAML>=3.12,<5.4.0
requests-file>=1.4.2,<1.6.0
requests>=2.20.0,<2.26.0
six>=1.11.0,<1.16.0
tqdm>=4.19.5,<4.55.0
typing>=3.6.4,<3.8.0
urllib3>=1.21.1,<1.27.0
virtualenv>=16,<20