Add venv caching with docker mode support

This commit is contained in:
allegroai 2021-02-11 14:44:19 +02:00
parent fa0d5d8469
commit b95d3f5300
8 changed files with 811 additions and 26 deletions

View File

@ -77,6 +77,15 @@
# target folder for virtual environments builds, created when executing experiment
venvs_dir = ~/.clearml/venvs-builds
# cached virtual environment folder
venvs_cache: {
# uncomment to enable venv caching
# path: ~/.clearml/venvs-cache
max_entries: 10
# minimum required free space to allow for cache entry, disable by passing 0 or negative value
free_space_threshold_gb: 2.0
},
# cached git clone folder
vcs_cache: {
enabled: true,

View File

@ -1251,10 +1251,11 @@ class Worker(ServiceCommandSection):
except:
python_version = None
venv_folder, requirements_manager = self.install_virtualenv(
venv_dir=target, requested_python_version=python_version, execution_info=execution)
venv_folder, requirements_manager, is_cached = self.install_virtualenv(
venv_dir=target, requested_python_version=python_version, execution_info=execution,
cached_requirements=requirements)
if self._default_pip:
if not is_cached and self._default_pip:
if install_globally and self.global_package_api:
self.global_package_api.install_packages(*self._default_pip)
else:
@ -1262,14 +1263,15 @@ class Worker(ServiceCommandSection):
directory, vcs, repo_info = self.get_repo_info(execution, current_task, venv_folder.as_posix())
self.install_requirements(
execution,
repo_info,
requirements_manager=requirements_manager,
cached_requirements=requirements,
cwd=vcs.location if vcs and vcs.location else directory,
package_api=self.global_package_api if install_globally else None,
)
if not is_cached:
self.install_requirements(
execution,
repo_info,
requirements_manager=requirements_manager,
cached_requirements=requirements,
cwd=vcs.location if vcs and vcs.location else directory,
package_api=self.global_package_api if install_globally else None,
)
freeze = self.freeze_task_environment(requirements_manager=requirements_manager)
script_dir = directory
@ -1482,10 +1484,11 @@ class Worker(ServiceCommandSection):
except:
python_ver = None
venv_folder, requirements_manager = self.install_virtualenv(
standalone_mode=standalone_mode, requested_python_version=python_ver, execution_info=execution)
venv_folder, requirements_manager, is_cached = self.install_virtualenv(
standalone_mode=standalone_mode, requested_python_version=python_ver,
execution_info=execution, cached_requirements=requirements)
if not standalone_mode:
if not is_cached and not standalone_mode:
if self._default_pip:
self.package_api.install_packages(*self._default_pip)
@ -1497,7 +1500,7 @@ class Worker(ServiceCommandSection):
print("\n")
if not standalone_mode:
if not is_cached and not standalone_mode:
self.install_requirements(
execution,
repo_info,
@ -1511,8 +1514,12 @@ class Worker(ServiceCommandSection):
skip_freeze_update = self.is_conda and not self._session.config.get(
"agent.package_manager.conda_full_env_update", False)
freeze = self.freeze_task_environment(current_task.id if not skip_freeze_update else None,
requirements_manager=requirements_manager)
freeze = self.freeze_task_environment(
current_task.id if not skip_freeze_update else None,
requirements_manager=requirements_manager,
add_venv_folder_cache=venv_folder,
execution_info=execution,
)
script_dir = (directory if isinstance(directory, Path) else Path(directory)).absolute().as_posix()
# run code
@ -1806,7 +1813,8 @@ class Worker(ServiceCommandSection):
status_message=self._task_status_change_message,
)
def freeze_task_environment(self, task_id=None, requirements_manager=None):
def freeze_task_environment(self, task_id=None, requirements_manager=None,
add_venv_folder_cache=None, execution_info=None):
try:
freeze = self.package_api.freeze()
except Exception as e:
@ -1821,13 +1829,31 @@ class Worker(ServiceCommandSection):
return freeze
# get original requirements and update with the new frozen requirements
previous_reqs = {}
# noinspection PyBroadException
try:
current_task = get_task(self._session, task_id, only_fields=["script.requirements"])
requirements = current_task.script.requirements
previous_reqs = dict(**requirements)
# replace only once.
if requirements.get('pip') and not requirements.get('org_pip'):
requirements['org_pip'] = requirements.pop('pip')
if requirements.get('conda') and not requirements.get('org_conda'):
requirements['org_conda'] = requirements.pop('conda')
requirements.update(freeze)
except Exception:
requirements = freeze
# add to cache
print('Adding venv into cache: {}'.format(add_venv_folder_cache))
if add_venv_folder_cache:
self.package_api.add_cached_venv(
requirements=[freeze, previous_reqs],
docker_cmd=execution_info.docker_cmd if execution_info else None,
python_version=getattr(self.package_api, 'python', ''),
source_folder=add_venv_folder_cache,
exclude_sub_folders=['task_repository', 'code'])
request = tasks_api.SetRequirementsRequest(task=task_id, requirements=requirements)
try:
self._session.send_api(request)
@ -2064,11 +2090,12 @@ class Worker(ServiceCommandSection):
)
def install_virtualenv(
self, venv_dir=None, requested_python_version=None, standalone_mode=False, execution_info=None):
# type: (str, str, bool, ExecutionInfo) -> Tuple[Path, RequirementsManager]
self, venv_dir=None, requested_python_version=None, standalone_mode=False,
execution_info=None, cached_requirements=None):
# type: (str, str, bool, ExecutionInfo, dict) -> Tuple[Path, RequirementsManager, bool]
"""
Install a new python virtual environment, removing the old one if exists
:return: virtualenv directory and requirements manager to use with task
:return: virtualenv directory, requirements manager to use with task, True if there is a cached venv entry
"""
requested_python_version = requested_python_version or \
Text(self._session.config.get("agent.python_binary", None)) or \
@ -2120,6 +2147,8 @@ class Worker(ServiceCommandSection):
session=self._session,
)
call_package_manager_create = False
if not self.is_conda and standalone_mode:
# pip with standalone mode
get_pip = partial(VirtualenvPip, **package_manager_params)
@ -2135,7 +2164,7 @@ class Worker(ServiceCommandSection):
self.package_api = VirtualenvPip(**package_manager_params)
if first_time:
self.package_api.remove()
self.package_api.create()
call_package_manager_create = True
self.global_package_api = SystemPip(**global_package_manager_params)
elif standalone_mode:
# conda with standalone mode
@ -2147,6 +2176,7 @@ class Worker(ServiceCommandSection):
self.package_api = get_conda()
# no support for reusing Conda environments
self.package_api.remove()
call_package_manager_create = True
if venv_dir.exists():
timestamp = datetime.utcnow().strftime("%Y-%m-%d-%H-%M-%S")
@ -2161,9 +2191,21 @@ class Worker(ServiceCommandSection):
venv_dir = new_venv_folder
self.package_api = get_conda(path=venv_dir)
# check if we have a cached folder
if cached_requirements and self.package_api.get_cached_venv(
requirements=cached_requirements,
docker_cmd=execution_info.docker_cmd if execution_info else None,
python_version=package_manager_params['python'],
destination_folder=Path(venv_dir)
):
print('::: Using Cached environment {} :::'.format(self.package_api.get_last_used_entry_cache()))
return venv_dir, requirements_manager, True
# create the initial venv
if call_package_manager_create:
self.package_api.create()
return venv_dir, requirements_manager
return venv_dir, requirements_manager, False
def parse_requirements(self, reqs_file=None, overrides=None):
os = None
@ -2221,6 +2263,9 @@ class Worker(ServiceCommandSection):
temp_config.put("agent.git_pass", (ENV_AGENT_GIT_PASS.get() or
self._session.config.get("agent.git_pass", None)))
if temp_config.get("agent.venvs_cache.path"):
temp_config.put("agent.venvs_cache.path", '/root/.clearml/venvs-cache')
self._host_ssh_cache = mkdtemp(prefix='clearml_agent.ssh.')
self._temp_cleanup_list.append(self._host_ssh_cache)
@ -2233,6 +2278,9 @@ class Worker(ServiceCommandSection):
self._session.config["agent.pip_download_cache.path"])).expanduser().as_posix()
host_vcs_cache = Path(os.path.expandvars(
self._session.config["agent.vcs_cache.path"])).expanduser().as_posix()
host_venvs_cache = Path(os.path.expandvars(
self._session.config["agent.venvs_cache.path"])).expanduser().as_posix() \
if self._session.config.get("agent.venvs_cache.path") else None
host_ssh_cache = self._host_ssh_cache
host_apt_cache = Path(os.path.expandvars(self._session.config.get(
@ -2247,6 +2295,8 @@ class Worker(ServiceCommandSection):
Path(host_pip_dl).mkdir(parents=True, exist_ok=True)
Path(host_vcs_cache).mkdir(parents=True, exist_ok=True)
Path(host_ssh_cache).mkdir(parents=True, exist_ok=True)
if host_venvs_cache:
Path(host_venvs_cache).mkdir(parents=True, exist_ok=True)
# copy the .ssh folder to a temp folder, to be mapped into docker
# noinspection PyBroadException
@ -2283,6 +2333,7 @@ class Worker(ServiceCommandSection):
mounted_cache_dir = temp_config.get("sdk.storage.cache.default_base_dir")
mounted_pip_dl_dir = temp_config.get("agent.pip_download_cache.path")
mounted_vcs_cache = temp_config.get("agent.vcs_cache.path")
mounted_venvs_cache = temp_config.get("agent.venvs_cache.path")
# Make sure we have created the configuration file for the executor
if not self.dump_config(self.temp_config_path, config=temp_config):
@ -2302,6 +2353,7 @@ class Worker(ServiceCommandSection):
host_cache=host_cache, mounted_cache=mounted_cache_dir,
host_pip_dl=host_pip_dl, mounted_pip_dl=mounted_pip_dl_dir,
host_vcs_cache=host_vcs_cache, mounted_vcs_cache=mounted_vcs_cache,
host_venvs_cache=host_venvs_cache, mounted_venvs_cache=mounted_venvs_cache,
standalone_mode=self._standalone_mode,
force_current_version=self._force_current_version,
bash_script=bash_script,
@ -2320,6 +2372,7 @@ class Worker(ServiceCommandSection):
host_cache, mounted_cache,
host_pip_dl, mounted_pip_dl,
host_vcs_cache, mounted_vcs_cache,
host_venvs_cache, mounted_venvs_cache,
standalone_mode=False, extra_docker_arguments=None, extra_shell_script=None,
force_current_version=None, host_git_credentials=None,
bash_script=None, preprocess_bash_script=None):
@ -2375,14 +2428,16 @@ class Worker(ServiceCommandSection):
# expect CLEARML_AGENT_K8S_HOST_MOUNT = '/mnt/host/data:/root/.clearml'
k8s_node_mnt, _, k8s_pod_mnt = ENV_DOCKER_HOST_MOUNT.get().partition(':')
# search and replace all the host folders with the k8s
host_mounts = [host_apt_cache, host_pip_cache, host_pip_dl, host_cache, host_vcs_cache]
host_mounts = [host_apt_cache, host_pip_cache, host_pip_dl, host_cache, host_vcs_cache, host_venvs_cache]
for i, m in enumerate(host_mounts):
if not m:
continue
if k8s_pod_mnt not in m:
print('Warning: K8S mount missing, ignoring cached folder {}'.format(m))
host_mounts[i] = None
else:
host_mounts[i] = m.replace(k8s_pod_mnt, k8s_node_mnt, 1)
host_apt_cache, host_pip_cache, host_pip_dl, host_cache, host_vcs_cache = host_mounts
host_apt_cache, host_pip_cache, host_pip_dl, host_cache, host_vcs_cache, host_venvs_cache = host_mounts
# copy the configuration file into the mounted folder
new_conf_file = os.path.join(k8s_pod_mnt, '.clearml_agent.{}.cfg'.format(quote(worker_id, safe="")))
@ -2472,6 +2527,7 @@ class Worker(ServiceCommandSection):
(['-v', host_pip_dl+':'+mounted_pip_dl] if host_pip_dl else []) +
(['-v', host_cache+':'+mounted_cache] if host_cache else []) +
(['-v', host_vcs_cache+':'+mounted_vcs_cache] if host_vcs_cache else []) +
(['-v', host_venvs_cache + ':' + mounted_venvs_cache] if host_venvs_cache else []) +
['--rm', docker_image, 'bash', '-c',
update_scheme +
extra_shell_script +

View File

@ -0,0 +1,215 @@
import os
import shutil
from logging import warning
from random import random
from time import time
from typing import List, Optional, Sequence
import psutil
from pathlib2 import Path
from .locks import FileLock
class FolderCache(object):
    """
    File-system based cache of folders (e.g. cached virtual environments).

    Each cache entry is a sub-folder of the base cache folder, named by joining its
    keys with '.' (hence keys themselves may not contain '.').  Cross-process access
    is serialized with a lock file stored inside the cache folder.  Eviction is
    LRU-like, based on the entry folder's mtime (entries are touched on read).
    """
    _lock_filename = '.clearml.lock'
    _lock_timeout_seconds = 30
    _temp_entry_prefix = '_temp.'

    def __init__(self, cache_folder, max_cache_entries=5, min_free_space_gb=None):
        """
        :param cache_folder: base folder holding all cache entries (created if missing)
        :param max_cache_entries: maximum number of entries to keep (oldest evicted first)
        :param min_free_space_gb: minimum free disk space (GB) required for adding a new
            entry; pass 0 or a negative value to disable the check
        """
        self._cache_folder = Path(os.path.expandvars(cache_folder)).expanduser().absolute()
        self._cache_folder.mkdir(parents=True, exist_ok=True)
        self._max_cache_entries = max_cache_entries
        self._last_copied_entry_folder = None
        self._min_free_space_gb = min_free_space_gb if min_free_space_gb and min_free_space_gb > 0 else None
        self._lock = FileLock((self._cache_folder / self._lock_filename).as_posix())

    def get_cache_folder(self):
        # type: () -> Path
        """
        :return: Return the base cache folder
        """
        return self._cache_folder

    def copy_cached_entry(self, keys, destination):
        # type: (List[str], Path) -> Optional[Path]
        """
        Copy a cached entry into a destination directory, if the cached entry does not exist return None

        :param keys: cache entry keys, first match wins
        :param destination: target folder, fully replaced by a copy of the cached entry
        :return: Target path, None if cached entry does not exist
        """
        self._last_copied_entry_folder = None
        if not keys:
            return None

        # lock so we make sure no one deletes the entry before we copy it
        # noinspection PyBroadException
        try:
            self._lock.acquire(timeout=self._lock_timeout_seconds)
        except BaseException as ex:
            warning('Could not lock cache folder {}: {}'.format(self._cache_folder, ex))
            return None

        src = None
        try:
            src = self.get_entry(keys)
            if src:
                destination = Path(destination).absolute()
                # ensure the parent chain exists, then remove the leaf folder so
                # copytree can create it (copytree requires a non-existing dst)
                destination.mkdir(parents=True, exist_ok=True)
                shutil.rmtree(destination.as_posix())
                shutil.copytree(src.as_posix(), dst=destination.as_posix(), symlinks=True)
        except BaseException as ex:
            warning('Could not copy cache folder {} to {}: {}'.format(src, destination, ex))
            self._lock.release()
            return None

        # release Lock
        self._lock.release()

        self._last_copied_entry_folder = src
        return destination if src else None

    def get_entry(self, keys):
        # type: (List[str]) -> Optional[Path]
        """
        Return a folder (a sub-folder of inside the cache_folder) matching one of the keys

        :param keys: List of keys, return the first match to one of the keys, notice keys cannot contain '.'
        :return: Path to the sub-folder or None if none was found
        """
        if not keys:
            return None
        # conform keys: '.' is the separator between keys in an entry folder name
        keys = [keys] if isinstance(keys, str) else keys
        keys = sorted([k.replace('.', '_') for k in keys])
        for cache_folder in self._cache_folder.glob('*'):
            if cache_folder.is_dir() and any(True for k in cache_folder.name.split('.') if k in keys):
                # touch so the LRU eviction sees this entry as recently used
                cache_folder.touch()
                return cache_folder
        return None

    def add_entry(self, keys, source_folder, exclude_sub_folders=None):
        # type: (List[str], Path, Optional[Sequence[str]]) -> bool
        """
        Add a local folder into the cache, copy all sub-folders inside `source_folder`
        excluding folders matching `exclude_sub_folders` list

        :param keys: Cache entry keys list (str)
        :param source_folder: Folder to copy into the cache
        :param exclude_sub_folders: List of sub-folders to exclude from the copy operation
        :return: return True if a new entry was added to cache (or an equivalent entry already exists)
        """
        if not keys:
            return False

        keys = [keys] if isinstance(keys, str) else keys
        keys = sorted([k.replace('.', '_') for k in keys])

        # If entry already exists skip it
        cached_entry = self.get_entry(keys)
        if cached_entry:
            # make sure the entry contains all keys
            cached_keys = cached_entry.name.split('.')
            if set(keys) - set(cached_keys):
                # noinspection PyBroadException
                try:
                    self._lock.acquire(timeout=self._lock_timeout_seconds)
                except BaseException as ex:
                    warning('Could not lock cache folder {}: {}'.format(self._cache_folder, ex))
                    # failed locking do nothing
                    return True
                # merge the new keys into the existing entry name
                keys = sorted(list(set(keys) | set(cached_keys)))
                dst = cached_entry.parent / '.'.join(keys)

                # rename
                try:
                    shutil.move(src=cached_entry.as_posix(), dst=dst.as_posix())
                except BaseException as ex:
                    # NOTE: fixed format string — previously the exception was never interpolated
                    warning('Could not rename cache entry {} to {}: {}'.format(
                        cached_entry.as_posix(), dst.as_posix(), ex))

                # release lock
                self._lock.release()
            return True

        # make sure we remove old entries
        self._remove_old_entries()

        # if we do not have enough free space, do nothing.
        if not self._check_min_free_space():
            warning('Could not add cache entry, not enough free space on drive, '
                    'free space threshold {} GB'.format(self._min_free_space_gb))
            return False

        # create the new entry for us
        exclude_sub_folders = exclude_sub_folders or []
        source_folder = Path(source_folder).absolute()

        # copy into a unique temp folder first, then atomically rename into place,
        # so concurrent readers never see a half-copied entry
        temp_folder = \
            self._temp_entry_prefix + \
            '{}.{}'.format(str(time()).replace('.', '_'), str(random()).replace('.', '_'))
        temp_folder = self._cache_folder / temp_folder
        temp_folder.mkdir(parents=True, exist_ok=False)

        for f in source_folder.glob('*'):
            if f.name in exclude_sub_folders:
                continue
            if f.is_dir():
                shutil.copytree(src=f.as_posix(), dst=(temp_folder / f.name).as_posix(), symlinks=True)
            else:
                # top-level files (e.g. a venv's pyvenv.cfg) must be copied too;
                # copytree raises on plain files
                shutil.copy2(src=f.as_posix(), dst=(temp_folder / f.name).as_posix())

        # rename the target folder
        target_cache_folder = self._cache_folder / '.'.join(keys)
        # if we failed moving it means someone else created the cached entry before us, we can just leave
        # noinspection PyBroadException
        try:
            shutil.move(src=temp_folder.as_posix(), dst=target_cache_folder.as_posix())
        except BaseException:
            # noinspection PyBroadException
            try:
                shutil.rmtree(path=temp_folder.as_posix())
            except BaseException:
                return False

        return True

    def get_last_copied_entry(self):
        # type: () -> Optional[Path]
        """
        :return: the last copied cached entry folder inside the cache
        """
        return self._last_copied_entry_folder

    def _remove_old_entries(self):
        # type: () -> ()
        """
        Evict the oldest cache entries (by folder mtime).
        Notice we only keep self._max_cache_entries-1, assuming we will be adding a new entry soon
        """
        folder_entries = [(cache_folder, cache_folder.stat().st_mtime)
                          for cache_folder in self._cache_folder.glob('*')
                          if cache_folder.is_dir() and not cache_folder.name.startswith(self._temp_entry_prefix)]
        folder_entries = sorted(folder_entries, key=lambda x: x[1], reverse=True)

        # lock so we make sure no one copies an entry while we delete it
        # noinspection PyBroadException
        try:
            self._lock.acquire(timeout=self._lock_timeout_seconds)
        except BaseException as ex:
            warning('Could not lock cache folder {}: {}'.format(self._cache_folder, ex))
            return

        number_of_entries_to_keep = self._max_cache_entries - 1
        for folder, ts in folder_entries[number_of_entries_to_keep:]:
            try:
                shutil.rmtree(folder.as_posix(), ignore_errors=True)
            except BaseException as ex:
                warning('Could not delete cache entry {}: {}'.format(folder.as_posix(), ex))

        self._lock.release()

    def _check_min_free_space(self):
        # type: () -> bool
        """
        :return: return False if we hit the free space limit.
        If not free space limit provided, always return True
        """
        if not self._min_free_space_gb or not self._cache_folder:
            return True
        free_space = float(psutil.disk_usage(self._cache_folder.as_posix()).free)
        free_space /= 2 ** 30
        return free_space > self._min_free_space_gb

View File

@ -0,0 +1,211 @@
import os
import time
import tempfile
import contextlib
from .portalocker import constants, exceptions, lock, unlock
current_time = getattr(time, "monotonic", time.time)
DEFAULT_TIMEOUT = 10 ** 8
DEFAULT_CHECK_INTERVAL = 0.25
LOCK_METHOD = constants.LOCK_EX | constants.LOCK_NB
__all__ = [
'FileLock',
'open_atomic',
]
@contextlib.contextmanager
def open_atomic(filename, binary=True):
    """Open a file for atomic writing.

    Instead of locking, the caller writes the entire content to a temporary
    file in the same directory, which is then renamed over the final location.
    This relies on `os.rename` being atomic on the platform, which is generally
    (but not universally) true. See:
    http://docs.python.org/library/os.html#os.rename

    >>> filename = 'test_file.txt'
    >>> if os.path.exists(filename):
    ...     os.remove(filename)
    >>> with open_atomic(filename) as fh:
    ...     written = fh.write(b'test')
    >>> assert os.path.exists(filename)
    >>> os.remove(filename)
    """
    assert not os.path.exists(filename), '%r exists' % filename
    directory, _ = os.path.split(filename)

    # Make sure the parent directory exists before creating the temp file in it
    if directory and not os.path.isdir(directory):  # pragma: no cover
        os.makedirs(directory)

    handle = tempfile.NamedTemporaryFile(
        mode='wb' if binary else 'w',
        dir=directory,
        delete=False,
    )
    yield handle

    # Push everything to disk before moving the file into place
    handle.flush()
    os.fsync(handle.fileno())
    handle.close()
    try:
        os.rename(handle.name, filename)
    finally:
        # On failure (or on Windows after a failed rename) drop the temp file
        try:
            os.remove(handle.name)
        except Exception:  # noqa
            pass
class FileLock(object):
    """Cross-process file lock with a built-in acquire timeout.

    Wraps the portalocker `lock`/`unlock` primitives around an open file
    handle; `acquire` polls until the lock is obtained or the timeout expires.
    Usable as a context manager.
    """

    def __init__(
            self, filename, mode='a', timeout=DEFAULT_TIMEOUT,
            check_interval=DEFAULT_CHECK_INTERVAL, fail_when_locked=False,
            flags=LOCK_METHOD, **file_open_kwargs):
        """Lock manager with build-in timeout

        filename -- filename
        mode -- the open mode, 'a' or 'ab' should be used for writing
        truncate -- use truncate to emulate 'w' mode, None is disabled, 0 is
            truncate to 0 bytes
        timeout -- timeout when trying to acquire a lock
        check_interval -- check interval while waiting
        fail_when_locked -- after the initial lock failed, return an error
            or lock the file
        **file_open_kwargs -- The kwargs for the `open(...)` call

        fail_when_locked is useful when multiple threads/processes can race
        when creating a file. If set to true than the system will wait till
        the lock was acquired and then return an AlreadyLocked exception.

        Note that the file is opened first and locked later. So using 'w' as
        mode will result in truncate _BEFORE_ the lock is checked.
        """
        # 'w' would truncate before the lock is held; emulate it with 'a' +
        # an explicit truncate after acquiring the lock (see _prepare_fh)
        if 'w' in mode:
            truncate = True
            mode = mode.replace('w', 'a')
        else:
            truncate = False

        self.fh = None                       # currently locked file handle (None when released)
        self.filename = filename
        self.mode = mode
        self.truncate = truncate
        self.timeout = timeout
        self.check_interval = check_interval
        self.fail_when_locked = fail_when_locked
        self.flags = flags
        self.file_open_kwargs = file_open_kwargs

    def acquire(
            self, timeout=None, check_interval=None, fail_when_locked=None):
        """Acquire the locked filehandle"""
        # Fall back to the instance defaults for any unspecified argument
        if timeout is None:
            timeout = self.timeout
        if timeout is None:
            timeout = 0

        if check_interval is None:
            check_interval = self.check_interval

        if fail_when_locked is None:
            fail_when_locked = self.fail_when_locked

        # If we already have a filehandle, return it
        fh = self.fh
        if fh:
            return fh

        # Get a new filehandler
        fh = self._get_fh()
        try:
            # Try to lock
            fh = self._get_lock(fh)
        except exceptions.LockException as exception:
            # Try till the timeout has passed
            timeoutend = current_time() + timeout
            while timeoutend > current_time():
                # Wait a bit
                time.sleep(check_interval)

                # Try again
                try:
                    # We already tried to the get the lock
                    # If fail_when_locked is true, then stop trying
                    # (AlreadyLocked is NOT a LockException subclass, so it
                    # escapes the `except` below and propagates to the caller)
                    if fail_when_locked:
                        raise exceptions.AlreadyLocked(exception)
                    else:  # pragma: no cover
                        # We've got the lock
                        fh = self._get_lock(fh)
                        break
                except exceptions.LockException:
                    pass
            else:
                # We got a timeout... reraising
                raise exceptions.LockException(exception)

        # Prepare the filehandle (truncate if needed)
        fh = self._prepare_fh(fh)

        self.fh = fh
        return fh

    def release(self):
        """Releases the currently locked file handle"""
        if self.fh:
            # best effort: unlock and close, ignoring errors on either step
            # noinspection PyBroadException
            try:
                unlock(self.fh)
            except Exception:
                pass
            # noinspection PyBroadException
            try:
                self.fh.close()
            except Exception:
                pass
            self.fh = None

    def _get_fh(self):
        """Get a new filehandle"""
        return open(self.filename, self.mode, **self.file_open_kwargs)

    def _get_lock(self, fh):
        """
        Try to lock the given filehandle

        returns LockException if it fails"""
        lock(fh, self.flags)
        return fh

    def _prepare_fh(self, fh):
        """
        Prepare the filehandle for usage

        If truncate is a number, the file will be truncated to that amount of
        bytes
        """
        if self.truncate:
            fh.seek(0)
            fh.truncate(0)

        return fh

    def __enter__(self):
        return self.acquire()

    def __exit__(self, type_, value, tb):
        self.release()

    def __delete__(self, instance):  # pragma: no cover
        instance.release()

View File

@ -0,0 +1,193 @@
import os
import sys
class exceptions:
    """Namespace grouping the lock-related exception types."""

    class BaseLockException(Exception):
        """Base class for all locking errors; optionally carries the file handle."""
        # Error codes:
        LOCK_FAILED = 1

        def __init__(self, *args, **kwargs):
            # Pull the optional file handle out before delegating the rest
            # of the arguments to the standard Exception constructor.
            self.fh = kwargs.pop('fh', None)
            Exception.__init__(self, *args, **kwargs)

    class LockException(BaseLockException):
        """Raised when acquiring the lock fails."""
        pass

    class AlreadyLocked(BaseLockException):
        """Raised when the file is already locked and fail_when_locked is set."""
        pass

    class FileToLarge(BaseLockException):
        """Raised when the file is too large to lock."""
        pass
class constants:
    """Platform specific lock flag constants, resolved at import time."""
    # The actual tests will execute the code anyhow so the following code can
    # safely be ignored from the coverage tests
    if os.name == 'nt':  # pragma: no cover
        import msvcrt

        LOCK_EX = 0x1  #: exclusive lock
        LOCK_SH = 0x2  #: shared lock
        LOCK_NB = 0x4  #: non-blocking
        LOCK_UN = msvcrt.LK_UNLCK  #: unlock

        # Windows LockFileEx mode flags (used by the shared-lock code path)
        LOCKFILE_FAIL_IMMEDIATELY = 1
        LOCKFILE_EXCLUSIVE_LOCK = 2

    elif os.name == 'posix':  # pragma: no cover
        import fcntl

        LOCK_EX = fcntl.LOCK_EX  #: exclusive lock
        LOCK_SH = fcntl.LOCK_SH  #: shared lock
        LOCK_NB = fcntl.LOCK_NB  #: non-blocking
        LOCK_UN = fcntl.LOCK_UN  #: unlock

    else:  # pragma: no cover
        raise RuntimeError('PortaLocker only defined for nt and posix platforms')
# Platform-specific `lock(file_, flags)` / `unlock(file_)` implementations.
if os.name == 'nt':  # pragma: no cover
    import msvcrt

    # msvcrt.locking locks a byte range; use the maximum range so the whole
    # file is effectively covered (py2 accepts -1, py3 needs a positive int)
    if sys.version_info.major == 2:
        lock_length = -1
    else:
        lock_length = int(2**31 - 1)

    def lock(file_, flags):
        # Acquire a shared or exclusive lock on an open file handle.
        if flags & constants.LOCK_SH:
            # shared locks go through the win32 API (pywin32 required)
            import win32file
            import pywintypes
            import winerror
            __overlapped = pywintypes.OVERLAPPED()
            if sys.version_info.major == 2:
                if flags & constants.LOCK_NB:
                    mode = constants.LOCKFILE_FAIL_IMMEDIATELY
                else:
                    mode = 0
            else:
                if flags & constants.LOCK_NB:
                    mode = msvcrt.LK_NBRLCK
                else:
                    mode = msvcrt.LK_RLCK

            # is there any reason not to reuse the following structure?
            hfile = win32file._get_osfhandle(file_.fileno())
            try:
                win32file.LockFileEx(hfile, mode, 0, -0x10000, __overlapped)
            except pywintypes.error as exc_value:
                # error: (33, 'LockFileEx', 'The process cannot access the file
                # because another process has locked a portion of the file.')
                if exc_value.winerror == winerror.ERROR_LOCK_VIOLATION:
                    raise exceptions.LockException(
                        exceptions.LockException.LOCK_FAILED,
                        exc_value.strerror,
                        fh=file_)
                else:
                    # Q: Are there exceptions/codes we should be dealing with
                    # here?
                    raise
        else:
            mode = constants.LOCKFILE_EXCLUSIVE_LOCK
            if flags & constants.LOCK_NB:
                mode |= constants.LOCKFILE_FAIL_IMMEDIATELY

            if flags & constants.LOCK_NB:
                mode = msvcrt.LK_NBLCK
            else:
                mode = msvcrt.LK_LOCK

            # windows locks byte ranges, so make sure to lock from file start
            try:
                savepos = file_.tell()
                if savepos:
                    # [ ] test exclusive lock fails on seek here
                    # [ ] test if shared lock passes this point
                    file_.seek(0)
                    # [x] check if 0 param locks entire file (not documented in
                    #     Python)
                    # [x] fails with "IOError: [Errno 13] Permission denied",
                    #     but -1 seems to do the trick
                try:
                    msvcrt.locking(file_.fileno(), mode, lock_length)
                except IOError as exc_value:
                    # [ ] be more specific here
                    raise exceptions.LockException(
                        exceptions.LockException.LOCK_FAILED,
                        exc_value.strerror,
                        fh=file_)
                finally:
                    # restore the original file position after locking
                    if savepos:
                        file_.seek(savepos)
            except IOError as exc_value:
                raise exceptions.LockException(
                    exceptions.LockException.LOCK_FAILED, exc_value.strerror,
                    fh=file_)

    def unlock(file_):
        # Release the byte-range lock held on an open file handle.
        try:
            savepos = file_.tell()
            if savepos:
                file_.seek(0)

            try:
                msvcrt.locking(file_.fileno(), constants.LOCK_UN, lock_length)
            except IOError as exc_value:
                if exc_value.strerror == 'Permission denied':
                    # the lock was taken via LockFileEx (shared path); undo it
                    # through the win32 API as well
                    import pywintypes
                    import win32file
                    import winerror
                    __overlapped = pywintypes.OVERLAPPED()
                    hfile = win32file._get_osfhandle(file_.fileno())
                    try:
                        win32file.UnlockFileEx(
                            hfile, 0, -0x10000, __overlapped)
                    except pywintypes.error as exc_value:
                        if exc_value.winerror == winerror.ERROR_NOT_LOCKED:
                            # error: (158, 'UnlockFileEx',
                            #         'The segment is already unlocked.')
                            # To match the 'posix' implementation, silently
                            # ignore this error
                            pass
                        else:
                            # Q: Are there exceptions/codes we should be
                            # dealing with here?
                            raise
                else:
                    raise exceptions.LockException(
                        exceptions.LockException.LOCK_FAILED,
                        exc_value.strerror,
                        fh=file_)
            finally:
                if savepos:
                    file_.seek(savepos)
        except IOError as exc_value:
            raise exceptions.LockException(
                exceptions.LockException.LOCK_FAILED, exc_value.strerror,
                fh=file_)

elif os.name == 'posix':  # pragma: no cover
    import fcntl

    def lock(file_, flags):
        # BlockingIOError does not exist on python2; build the tuple of
        # exception types to catch at runtime
        locking_exceptions = IOError,
        try:  # pragma: no cover
            locking_exceptions += BlockingIOError,
        except NameError:  # pragma: no cover
            pass

        try:
            fcntl.flock(file_.fileno(), flags)
        except locking_exceptions as exc_value:
            # The exception code varies on different systems so we'll catch
            # every IO error
            raise exceptions.LockException(exc_value, fh=file_)

    def unlock(file_):
        fcntl.flock(file_.fileno(), constants.LOCK_UN)

else:  # pragma: no cover
    raise RuntimeError('PortaLocker only defined for nt and posix platforms')

View File

@ -1,11 +1,16 @@
from __future__ import unicode_literals
import abc
from collections import OrderedDict
from contextlib import contextmanager
from typing import Text, Iterable, Union
from typing import Text, Iterable, Union, Optional, Dict, List
from pathlib2 import Path
from hashlib import md5
import six
from clearml_agent.helper.base import mkstemp, safe_remove_file, join_lines, select_for_platform
from clearml_agent.helper.console import ensure_binary
from clearml_agent.helper.os.folder_cache import FolderCache
from clearml_agent.helper.process import Executable, Argv, PathLike
@ -18,6 +23,12 @@ class PackageManager(object):
_selected_manager = None
_cwd = None
_pip_version = None
_config_cache_folder = 'agent.venvs_cache.path'
_config_cache_max_entries = 'agent.venvs_cache.max_entries'
_config_cache_free_space_threshold = 'agent.venvs_cache.free_space_threshold_gb'
    def __init__(self):
        # lazily-created FolderCache instance (see _get_cache_manager);
        # stays None while venv caching is disabled in the configuration
        self._cache_manager = None
@abc.abstractproperty
def bin(self):
@ -153,3 +164,91 @@ class PackageManager(object):
@classmethod
def get_pip_version(cls):
return cls._pip_version or ''
def get_cached_venv(self, requirements, docker_cmd, python_version, destination_folder):
# type: (Dict, Optional[Union[dict, str]], Optional[str], Path) -> Optional[Path]
"""
Copy a cached copy of the venv (based on the requirements) into destination_folder.
Return None if failed or cached entry does not exist
"""
if not self._get_cache_manager():
return None
keys = self._generate_reqs_hash_keys(requirements, docker_cmd, python_version)
return self._get_cache_manager().copy_cached_entry(keys, destination_folder)
def add_cached_venv(self, requirements, docker_cmd, python_version, source_folder, exclude_sub_folders=None):
# type: (Union[Dict, List[Dict]], Optional[Union[dict, str]], Optional[str], Path, Optional[List[str]]) -> ()
"""
Copy the local venv folder into the venv cache (keys are based on the requirements+python+docker).
"""
if not self._get_cache_manager():
return
keys = self._generate_reqs_hash_keys(requirements, docker_cmd, python_version)
return self._get_cache_manager().add_entry(
keys=keys, source_folder=source_folder, exclude_sub_folders=exclude_sub_folders)
def get_cache_folder(self):
# type: () -> Optional[Path]
if not self._get_cache_manager():
return
return self._get_cache_manager().get_cache_folder()
def get_last_used_entry_cache(self):
# type: () -> Optional[Path]
"""
:return: the last used cached folder entry
"""
if not self._get_cache_manager():
return
return self._get_cache_manager().get_last_copied_entry()
@classmethod
def _generate_reqs_hash_keys(cls, requirements_list, docker_cmd, python_version):
# type: (Union[Dict, List[Dict]], Optional[Union[dict, str]], Optional[str]) -> List[str]
requirements_list = requirements_list or dict()
if not isinstance(requirements_list, (list, tuple)):
requirements_list = [requirements_list]
docker_cmd = dict(docker_cmd=docker_cmd) if isinstance(docker_cmd, str) else docker_cmd or dict()
docker_cmd = OrderedDict(sorted(docker_cmd.items(), key=lambda t: t[0]))
if 'docker_cmd' in docker_cmd:
# we only take the first part of the docker_cmd which is the docker image name
docker_cmd['docker_cmd'] = docker_cmd['docker_cmd'].strip('\r\n\t ').split(' ')[0]
keys = []
strip_chars = '\n\r\t '
for requirements in requirements_list:
pip, conda = ('pip', 'conda')
pip_reqs = requirements.get(pip, '')
conda_reqs = requirements.get(conda, '')
if isinstance(pip_reqs, str):
pip_reqs = pip_reqs.split('\n')
if isinstance(conda_reqs, str):
conda_reqs = conda_reqs.split('\n')
pip_reqs = sorted([p.strip(strip_chars) for p in pip_reqs
if p.strip(strip_chars) and not p.strip(strip_chars).startswith('#')])
conda_reqs = sorted([p.strip(strip_chars) for p in conda_reqs
if p.strip(strip_chars) and not p.strip(strip_chars).startswith('#')])
if not pip_reqs and not conda_reqs:
continue
hash_text = '{class_type}\n{docker_cmd}\n{python_version}\n{pip_reqs}\n{conda_reqs}'.format(
class_type=str(cls),
docker_cmd=str(docker_cmd or ''),
python_version=str(python_version or ''),
pip_reqs=str(pip_reqs or ''),
conda_reqs=str(conda_reqs or ''),
)
keys.append(md5(ensure_binary(hash_text)).hexdigest())
return sorted(list(set(keys)))
def _get_cache_manager(self):
if not self._cache_manager:
cache_folder = self.session.config.get(self._config_cache_folder, None)
if not cache_folder:
return None
max_entries = int(self.session.config.get(self._config_cache_max_entries, 10))
free_space_threshold = float(self.session.config.get(self._config_cache_free_space_threshold, 0))
self._cache_manager = FolderCache(
cache_folder, max_cache_entries=max_entries, min_free_space_gb=free_space_threshold)
return self._cache_manager

View File

@ -69,6 +69,7 @@ class CondaAPI(PackageManager):
:param python: base python version to use (e.g python3.6)
:param path: path of env
"""
super(CondaAPI, self).__init__()
self.session = session
self.python = python
self.source = None

View File

@ -17,6 +17,7 @@ class SystemPip(PackageManager):
"""
Program interface to the system pip.
"""
super(SystemPip, self).__init__()
self._bin = interpreter or sys.executable
self.session = session