diff --git a/clearml_agent/backend_api/config/default/agent.conf b/clearml_agent/backend_api/config/default/agent.conf
index 9238e73..aa8863e 100644
--- a/clearml_agent/backend_api/config/default/agent.conf
+++ b/clearml_agent/backend_api/config/default/agent.conf
@@ -77,6 +77,15 @@
     # target folder for virtual environments builds, created when executing experiment
    venvs_dir = ~/.clearml/venvs-builds

+    # cached virtual environments folder
+    venvs_cache: {
+        # uncomment to enable venv caching
+        # path: ~/.clearml/venvs-cache
+        max_entries: 10
+        # minimum required free space to allow for a cache entry; disable by passing 0 or a negative value
+        free_space_threshold_gb: 2.0
+    },
+
     # cached git clone folder
     vcs_cache: {
         enabled: true,
diff --git a/clearml_agent/commands/worker.py b/clearml_agent/commands/worker.py
index 202d676..afd3354 100644
--- a/clearml_agent/commands/worker.py
+++ b/clearml_agent/commands/worker.py
@@ -1251,10 +1251,11 @@ class Worker(ServiceCommandSection):
         except:
             python_version = None

-        venv_folder, requirements_manager = self.install_virtualenv(
-            venv_dir=target, requested_python_version=python_version, execution_info=execution)
+        venv_folder, requirements_manager, is_cached = self.install_virtualenv(
+            venv_dir=target, requested_python_version=python_version, execution_info=execution,
+            cached_requirements=requirements)

-        if self._default_pip:
+        if not is_cached and self._default_pip:
             if install_globally and self.global_package_api:
                 self.global_package_api.install_packages(*self._default_pip)
             else:
@@ -1262,14 +1263,15 @@ class Worker(ServiceCommandSection):

         directory, vcs, repo_info = self.get_repo_info(execution, current_task, venv_folder.as_posix())

-        self.install_requirements(
-            execution,
-            repo_info,
-            requirements_manager=requirements_manager,
-            cached_requirements=requirements,
-            cwd=vcs.location if vcs and vcs.location else directory,
-            package_api=self.global_package_api if install_globally else None,
-        )
+        if not is_cached:
+            self.install_requirements(
+                execution,
+                repo_info,
+                requirements_manager=requirements_manager,
+                cached_requirements=requirements,
+                cwd=vcs.location if vcs and vcs.location else directory,
+                package_api=self.global_package_api if install_globally else None,
+            )

         freeze = self.freeze_task_environment(requirements_manager=requirements_manager)
         script_dir = directory
@@ -1482,10 +1484,11 @@ class Worker(ServiceCommandSection):
         except:
             python_ver = None

-        venv_folder, requirements_manager = self.install_virtualenv(
-            standalone_mode=standalone_mode, requested_python_version=python_ver, execution_info=execution)
+        venv_folder, requirements_manager, is_cached = self.install_virtualenv(
+            standalone_mode=standalone_mode, requested_python_version=python_ver,
+            execution_info=execution, cached_requirements=requirements)

-        if not standalone_mode:
+        if not is_cached and not standalone_mode:
             if self._default_pip:
                 self.package_api.install_packages(*self._default_pip)
@@ -1497,7 +1500,7 @@ class Worker(ServiceCommandSection):

         print("\n")

-        if not standalone_mode:
+        if not is_cached and not standalone_mode:
             self.install_requirements(
                 execution,
                 repo_info,
@@ -1511,8 +1514,12 @@ class Worker(ServiceCommandSection):
         skip_freeze_update = self.is_conda and not self._session.config.get(
             "agent.package_manager.conda_full_env_update", False)

-        freeze = self.freeze_task_environment(current_task.id if not skip_freeze_update else None,
-                                              requirements_manager=requirements_manager)
+        freeze = self.freeze_task_environment(
+            current_task.id if not skip_freeze_update else None,
+            requirements_manager=requirements_manager,
+            add_venv_folder_cache=venv_folder,
+            execution_info=execution,
+        )

         script_dir = (directory if isinstance(directory, Path) else Path(directory)).absolute().as_posix()
         # run code
@@ -1806,7 +1813,8 @@ class Worker(ServiceCommandSection):
             status_message=self._task_status_change_message,
         )

-    def freeze_task_environment(self, task_id=None, requirements_manager=None):
+    def freeze_task_environment(self, task_id=None, requirements_manager=None,
+                                add_venv_folder_cache=None, execution_info=None):
         try:
             freeze = self.package_api.freeze()
         except Exception as e:
@@ -1821,13 +1829,31 @@ class Worker(ServiceCommandSection):
             return freeze

         # get original requirements and update with the new frozen requirements
+        previous_reqs = {}
+        # noinspection PyBroadException
         try:
             current_task = get_task(self._session, task_id, only_fields=["script.requirements"])
             requirements = current_task.script.requirements
+            previous_reqs = dict(**requirements)
+            # replace only once.
+            if requirements.get('pip') and not requirements.get('org_pip'):
+                requirements['org_pip'] = requirements.pop('pip')
+            if requirements.get('conda') and not requirements.get('org_conda'):
+                requirements['org_conda'] = requirements.pop('conda')
             requirements.update(freeze)
         except Exception:
             requirements = freeze

+        # add to cache
+        if add_venv_folder_cache:
+            print('Adding venv into cache: {}'.format(add_venv_folder_cache))
+            self.package_api.add_cached_venv(
+                requirements=[freeze, previous_reqs],
+                docker_cmd=execution_info.docker_cmd if execution_info else None,
+                python_version=getattr(self.package_api, 'python', ''),
+                source_folder=add_venv_folder_cache,
+                exclude_sub_folders=['task_repository', 'code'])
+
         request = tasks_api.SetRequirementsRequest(task=task_id, requirements=requirements)
         try:
             self._session.send_api(request)
@@ -2064,11 +2090,12 @@ class Worker(ServiceCommandSection):
         )

     def install_virtualenv(
-            self, venv_dir=None, requested_python_version=None, standalone_mode=False, execution_info=None):
-        # type: (str, str, bool, ExecutionInfo) -> Tuple[Path, RequirementsManager]
+            self, venv_dir=None, requested_python_version=None, standalone_mode=False,
+            execution_info=None, cached_requirements=None):
+        # type: (str, str, bool, ExecutionInfo, dict) -> Tuple[Path, RequirementsManager, bool]
         """
         Install a new python virtual environment, removing the old one if exists
-        :return: virtualenv directory and requirements manager to use with task
+        :return: virtualenv directory, the requirements manager to use with the task, and True if a cached venv entry was restored
         """
         requested_python_version = requested_python_version or \
             Text(self._session.config.get("agent.python_binary", None)) or \
@@ -2120,6 +2147,8 @@ class Worker(ServiceCommandSection):
             session=self._session,
         )

+        call_package_manager_create = False
+
         if not self.is_conda and standalone_mode:
             # pip with standalone mode
             get_pip = partial(VirtualenvPip, **package_manager_params)
@@ -2135,7 +2164,7 @@ class Worker(ServiceCommandSection):
             self.package_api = VirtualenvPip(**package_manager_params)
             if first_time:
                 self.package_api.remove()
-                self.package_api.create()
+                call_package_manager_create = True
             self.global_package_api = SystemPip(**global_package_manager_params)
         elif standalone_mode:
             # conda with standalone mode
@@ -2147,6 +2176,7 @@ class Worker(ServiceCommandSection):
             self.package_api = get_conda()
             # no support for reusing Conda environments
             self.package_api.remove()
+            call_package_manager_create = True
             if venv_dir.exists():
                 timestamp = datetime.utcnow().strftime("%Y-%m-%d-%H-%M-%S")
@@ -2161,9 +2191,21 @@ class Worker(ServiceCommandSection):
                 venv_dir = new_venv_folder
                 self.package_api = get_conda(path=venv_dir)

+        # check if we have a cached folder
+        if cached_requirements and self.package_api.get_cached_venv(
+                requirements=cached_requirements,
+                docker_cmd=execution_info.docker_cmd if execution_info else None,
+                python_version=package_manager_params['python'],
+                destination_folder=Path(venv_dir)
+        ):
+            print('::: Using Cached environment {} :::'.format(self.package_api.get_last_used_entry_cache()))
+            return venv_dir, requirements_manager, True
+
+        # create the initial venv
+        if call_package_manager_create:
             self.package_api.create()

-        return venv_dir, requirements_manager
+        return venv_dir, requirements_manager, False

     def parse_requirements(self, reqs_file=None, overrides=None):
         os = None
@@ -2221,6 +2263,9 @@ class Worker(ServiceCommandSection):
         temp_config.put("agent.git_pass",
                         (ENV_AGENT_GIT_PASS.get() or self._session.config.get("agent.git_pass", None)))

+        if temp_config.get("agent.venvs_cache.path"):
+            temp_config.put("agent.venvs_cache.path", '/root/.clearml/venvs-cache')
+
         self._host_ssh_cache = mkdtemp(prefix='clearml_agent.ssh.')
         self._temp_cleanup_list.append(self._host_ssh_cache)
@@ -2233,6 +2278,9 @@ class Worker(ServiceCommandSection):
             self._session.config["agent.pip_download_cache.path"])).expanduser().as_posix()
         host_vcs_cache = Path(os.path.expandvars(
             self._session.config["agent.vcs_cache.path"])).expanduser().as_posix()
+        host_venvs_cache = Path(os.path.expandvars(
+            self._session.config["agent.venvs_cache.path"])).expanduser().as_posix() \
+            if self._session.config.get("agent.venvs_cache.path") else None
         host_ssh_cache = self._host_ssh_cache

         host_apt_cache = Path(os.path.expandvars(self._session.config.get(
@@ -2247,6 +2295,8 @@ class Worker(ServiceCommandSection):
         Path(host_pip_dl).mkdir(parents=True, exist_ok=True)
         Path(host_vcs_cache).mkdir(parents=True, exist_ok=True)
         Path(host_ssh_cache).mkdir(parents=True, exist_ok=True)
+        if host_venvs_cache:
+            Path(host_venvs_cache).mkdir(parents=True, exist_ok=True)

         # copy the .ssh folder to a temp folder, to be mapped into docker
         # noinspection PyBroadException
@@ -2283,6 +2333,7 @@ class Worker(ServiceCommandSection):
         mounted_cache_dir = temp_config.get("sdk.storage.cache.default_base_dir")
         mounted_pip_dl_dir = temp_config.get("agent.pip_download_cache.path")
         mounted_vcs_cache = temp_config.get("agent.vcs_cache.path")
+        mounted_venvs_cache = temp_config.get("agent.venvs_cache.path")

         # Make sure we have created the configuration file for the executor
         if not self.dump_config(self.temp_config_path, config=temp_config):
@@ -2302,6 +2353,7 @@ class Worker(ServiceCommandSection):
             host_cache=host_cache, mounted_cache=mounted_cache_dir,
             host_pip_dl=host_pip_dl, mounted_pip_dl=mounted_pip_dl_dir,
             host_vcs_cache=host_vcs_cache, mounted_vcs_cache=mounted_vcs_cache,
+            host_venvs_cache=host_venvs_cache, mounted_venvs_cache=mounted_venvs_cache,
             standalone_mode=self._standalone_mode,
             force_current_version=self._force_current_version,
             bash_script=bash_script,
@@ -2320,6 +2372,7 @@ class Worker(ServiceCommandSection):
                            host_cache, mounted_cache,
                            host_pip_dl, mounted_pip_dl,
                            host_vcs_cache, mounted_vcs_cache,
+                           host_venvs_cache, mounted_venvs_cache,
                            standalone_mode=False, extra_docker_arguments=None, extra_shell_script=None,
                            force_current_version=None, host_git_credentials=None,
                            bash_script=None, preprocess_bash_script=None):
@@ -2375,14 +2428,16 @@ class Worker(ServiceCommandSection):
         # expect CLEARML_AGENT_K8S_HOST_MOUNT = '/mnt/host/data:/root/.clearml'
         k8s_node_mnt, _, k8s_pod_mnt = ENV_DOCKER_HOST_MOUNT.get().partition(':')
         # search and replace all the host folders with the k8s
-        host_mounts = [host_apt_cache, host_pip_cache, host_pip_dl, host_cache, host_vcs_cache]
+        host_mounts = [host_apt_cache, host_pip_cache, host_pip_dl, host_cache, host_vcs_cache, host_venvs_cache]
         for i, m in enumerate(host_mounts):
+            if not m:
+                continue
             if k8s_pod_mnt not in m:
                 print('Warning: K8S mount missing, ignoring cached folder {}'.format(m))
                 host_mounts[i] = None
             else:
                 host_mounts[i] = m.replace(k8s_pod_mnt, k8s_node_mnt, 1)
-        host_apt_cache, host_pip_cache, host_pip_dl, host_cache, host_vcs_cache = host_mounts
+        host_apt_cache, host_pip_cache, host_pip_dl, host_cache, host_vcs_cache, host_venvs_cache = host_mounts

         # copy the configuration file into the mounted folder
         new_conf_file = os.path.join(k8s_pod_mnt, '.clearml_agent.{}.cfg'.format(quote(worker_id, safe="")))
@@ -2472,6 +2527,7 @@ class Worker(ServiceCommandSection):
                 (['-v', host_pip_dl+':'+mounted_pip_dl] if host_pip_dl else []) +
                 (['-v', host_cache+':'+mounted_cache] if host_cache else []) +
                 (['-v', host_vcs_cache+':'+mounted_vcs_cache] if host_vcs_cache else []) +
+                (['-v', host_venvs_cache + ':' + mounted_venvs_cache] if host_venvs_cache else []) +
                 ['--rm', docker_image, 'bash', '-c',
                  update_scheme +
                  extra_shell_script +
diff --git a/clearml_agent/helper/os/folder_cache.py b/clearml_agent/helper/os/folder_cache.py
new file mode 100644
index 0000000..509169b
--- /dev/null
+++ b/clearml_agent/helper/os/folder_cache.py
@@ -0,0 +1,215 @@
+import os
+import shutil
+from logging import warning
+from random import random
+from time import time
+from typing import List, Optional, Sequence
+
+import psutil
+from pathlib2 import Path
+
+from .locks import FileLock
+
+
+class FolderCache(object):
+    _lock_filename = '.clearml.lock'
+    _lock_timeout_seconds = 30
+    _temp_entry_prefix = '_temp.'
+
+    def __init__(self, cache_folder, max_cache_entries=5, min_free_space_gb=None):
+        self._cache_folder = Path(os.path.expandvars(cache_folder)).expanduser().absolute()
+        self._cache_folder.mkdir(parents=True, exist_ok=True)
+        self._max_cache_entries = max_cache_entries
+        self._last_copied_entry_folder = None
+        self._min_free_space_gb = min_free_space_gb if min_free_space_gb and min_free_space_gb > 0 else None
+        self._lock = FileLock((self._cache_folder / self._lock_filename).as_posix())
+
+    def get_cache_folder(self):
+        # type: () -> Path
+        """
+        :return: Return the base cache folder
+        """
+        return self._cache_folder
+
+    def copy_cached_entry(self, keys, destination):
+        # type: (List[str], Path) -> Optional[Path]
+        """
+        Copy a cached entry into a destination directory; if the cached entry does not exist, return None
+        :param keys: list of entry keys to look up
+        :param destination: target folder to copy the cached entry into
+        :return: Target path, None if cached entry does not exist
+        """
+        self._last_copied_entry_folder = None
+        if not keys:
+            return None
+
+        # lock so we make sure no one deletes it before we copy it
+        # noinspection PyBroadException
+        try:
+            self._lock.acquire(timeout=self._lock_timeout_seconds)
+        except BaseException as ex:
+            warning('Could not lock cache folder {}: {}'.format(self._cache_folder, ex))
+            return None
+
+        src = None
+        try:
+            src = self.get_entry(keys)
+            if src:
+                destination = Path(destination).absolute()
+                destination.mkdir(parents=True, exist_ok=True)
+                shutil.rmtree(destination.as_posix())
+                shutil.copytree(src.as_posix(), dst=destination.as_posix(), symlinks=True)
+        except BaseException as ex:
+            warning('Could not copy cache folder {} to {}: {}'.format(src, destination, ex))
+            self._lock.release()
+            return None

+        # release lock
+        self._lock.release()
+
+        self._last_copied_entry_folder = src
+        return destination if src else None
+
+    def get_entry(self, keys):
+        # type: (List[str]) -> Optional[Path]
+        """
+        Return a folder (a sub-folder inside the cache_folder) matching one of the keys
+        :param keys: List of keys, return the first match to one of the keys; notice keys cannot contain '.'
+        :return: Path to the sub-folder or None if none was found
+        """
+        if not keys:
+            return None
+        # conform keys
+        keys = [keys] if isinstance(keys, str) else keys
+        keys = sorted([k.replace('.', '_') for k in keys])
+        for cache_folder in self._cache_folder.glob('*'):
+            if cache_folder.is_dir() and any(True for k in cache_folder.name.split('.') if k in keys):
+                cache_folder.touch()
+                return cache_folder
+        return None
+
+    def add_entry(self, keys, source_folder, exclude_sub_folders=None):
+        # type: (List[str], Path, Optional[Sequence[str]]) -> bool
+        """
+        Add a local folder into the cache, copy all sub-folders inside `source_folder`
+        excluding folders matching the `exclude_sub_folders` list
+        :param keys: Cache entry keys list (str)
+        :param source_folder: Folder to copy into the cache
+        :param exclude_sub_folders: List of sub-folders to exclude from the copy operation
+        :return: True if a new entry was added to the cache
+        """
+        if not keys:
+            return False
+
+        keys = [keys] if isinstance(keys, str) else keys
+        keys = sorted([k.replace('.', '_') for k in keys])
+
+        # If the entry already exists, skip it
+        cached_entry = self.get_entry(keys)
+        if cached_entry:
+            # make sure the entry contains all keys
+            cached_keys = cached_entry.name.split('.')
+            if set(keys) - set(cached_keys):
+                # noinspection PyBroadException
+                try:
+                    self._lock.acquire(timeout=self._lock_timeout_seconds)
+                except BaseException as ex:
+                    warning('Could not lock cache folder {}: {}'.format(self._cache_folder, ex))
+                    # failed locking, do nothing
+                    return True
+                keys = sorted(list(set(keys) | set(cached_keys)))
+                dst = cached_entry.parent / '.'.join(keys)
+                # rename
+                try:
+                    shutil.move(src=cached_entry.as_posix(), dst=dst.as_posix())
+                except BaseException as ex:
+                    warning('Could not rename cache entry {} to {}: {}'.format(
+                        cached_entry.as_posix(), dst.as_posix(), ex))
+                # release lock
+                self._lock.release()
+            return True
+
+        # make sure we remove old entries
+        self._remove_old_entries()
+
+        # if we do not have enough free space, do nothing.
+        if not self._check_min_free_space():
+            warning('Could not add cache entry, not enough free space on drive, '
+                    'free space threshold {} GB'.format(self._min_free_space_gb))
+            return False
+
+        # create the new entry for us
+        exclude_sub_folders = exclude_sub_folders or []
+        source_folder = Path(source_folder).absolute()
+        # create temp folder
+        temp_folder = \
+            self._temp_entry_prefix + \
+            '{}.{}'.format(str(time()).replace('.', '_'), str(random()).replace('.', '_'))
+        temp_folder = self._cache_folder / temp_folder
+        temp_folder.mkdir(parents=True, exist_ok=False)
+
+        for f in source_folder.glob('*'):
+            if f.name in exclude_sub_folders:
+                continue
+            shutil.copytree(src=f.as_posix(), dst=(temp_folder / f.name).as_posix(), symlinks=True)
+
+        # rename the target folder
+        target_cache_folder = self._cache_folder / '.'.join(keys)
+        # if we failed moving it means someone else created the cached entry before us, we can just leave
+        # noinspection PyBroadException
+        try:
+            shutil.move(src=temp_folder.as_posix(), dst=target_cache_folder.as_posix())
+        except BaseException:
+            # noinspection PyBroadException
+            try:
+                shutil.rmtree(path=temp_folder.as_posix())
+            except BaseException:
+                return False
+
+        return True
+
+    def get_last_copied_entry(self):
+        # type: () -> Optional[Path]
+        """
+        :return: the last copied cached entry folder inside the cache
+        """
+        return self._last_copied_entry_folder
+
+    def _remove_old_entries(self):
+        # type: () -> ()
+        """
+        Notice we only keep self._max_cache_entries-1, assuming we will be adding a new entry soon
+        """
+        folder_entries = [(cache_folder, cache_folder.stat().st_mtime)
+                          for cache_folder in self._cache_folder.glob('*')
+                          if cache_folder.is_dir() and not cache_folder.name.startswith(self._temp_entry_prefix)]
+        folder_entries = sorted(folder_entries, key=lambda x: x[1], reverse=True)
+
+        # lock so no one adds or copies entries while we prune
+        # noinspection PyBroadException
+        try:
+            self._lock.acquire(timeout=self._lock_timeout_seconds)
+        except BaseException as ex:
+            warning('Could not lock cache folder {}: {}'.format(self._cache_folder, ex))
+            return
+
+        number_of_entries_to_keep = self._max_cache_entries - 1
+        for folder, ts in folder_entries[number_of_entries_to_keep:]:
+            try:
+                shutil.rmtree(folder.as_posix(), ignore_errors=True)
+            except BaseException as ex:
+                warning('Could not delete cache entry {}: {}'.format(folder.as_posix(), ex))
+
+        self._lock.release()
+
+    def _check_min_free_space(self):
+        # type: () -> bool
+        """
+        :return: False if we hit the free-space limit.
+        If no free-space limit was provided, always return True
+        """
+        if not self._min_free_space_gb or not self._cache_folder:
+            return True
+        free_space = float(psutil.disk_usage(self._cache_folder.as_posix()).free)
+        free_space /= 2**30
+        return free_space > self._min_free_space_gb
diff --git a/clearml_agent/helper/os/locks.py b/clearml_agent/helper/os/locks.py
new file mode 100644
index 0000000..e414893
--- /dev/null
+++ b/clearml_agent/helper/os/locks.py
@@ -0,0 +1,211 @@
+import os
+import time
+import tempfile
+import contextlib
+
+from .portalocker import constants, exceptions, lock, unlock
+
+
+current_time = getattr(time, "monotonic", time.time)
+
+DEFAULT_TIMEOUT = 10 ** 8
+DEFAULT_CHECK_INTERVAL = 0.25
+LOCK_METHOD = constants.LOCK_EX | constants.LOCK_NB
+
+__all__ = [
+    'FileLock',
+    'open_atomic',
+]
+
+
+@contextlib.contextmanager
+def open_atomic(filename, binary=True):
+    """Open a file for atomic writing. Instead of locking, this method allows
+    you to write the entire file and move it to the actual location. Note that
+    this makes the assumption that a rename is atomic on your platform, which
+    is generally the case but not a guarantee.
+
+    http://docs.python.org/library/os.html#os.rename
+
+    >>> filename = 'test_file.txt'
+    >>> if os.path.exists(filename):
+    ...     os.remove(filename)
+
+    >>> with open_atomic(filename) as fh:
+    ...     written = fh.write(b'test')
+    >>> assert os.path.exists(filename)
+    >>> os.remove(filename)
+
+    """
+    assert not os.path.exists(filename), '%r exists' % filename
+    path, name = os.path.split(filename)
+
+    # Create the parent directory if it doesn't exist
+    if path and not os.path.isdir(path):  # pragma: no cover
+        os.makedirs(path)
+
+    temp_fh = tempfile.NamedTemporaryFile(
+        mode=binary and 'wb' or 'w',
+        dir=path,
+        delete=False,
+    )
+    yield temp_fh
+    temp_fh.flush()
+    os.fsync(temp_fh.fileno())
+    temp_fh.close()
+    try:
+        os.rename(temp_fh.name, filename)
+    finally:
+        try:
+            os.remove(temp_fh.name)
+        except Exception:  # noqa
+            pass
+
+
+class FileLock(object):
+
+    def __init__(
+            self, filename, mode='a', timeout=DEFAULT_TIMEOUT,
+            check_interval=DEFAULT_CHECK_INTERVAL, fail_when_locked=False,
+            flags=LOCK_METHOD, **file_open_kwargs):
+        """Lock manager with built-in timeout
+
+        filename -- filename
+        mode -- the open mode, 'a' or 'ab' should be used for writing
+        truncate -- use truncate to emulate 'w' mode, None is disabled, 0 is
+            truncate to 0 bytes
+        timeout -- timeout when trying to acquire a lock
+        check_interval -- check interval while waiting
+        fail_when_locked -- after the initial lock attempt failed, raise an
+            error instead of waiting for the lock to be released
+        **file_open_kwargs -- The kwargs for the `open(...)` call
+
+        fail_when_locked is useful when multiple threads/processes can race
+        when creating a file. If set to true, the call raises an AlreadyLocked
+        exception instead of waiting for the lock to be released.
+
+        Note that the file is opened first and locked later. So using 'w' as
+        mode will result in truncate _BEFORE_ the lock is checked.
+        """
+
+        if 'w' in mode:
+            truncate = True
+            mode = mode.replace('w', 'a')
+        else:
+            truncate = False
+
+        self.fh = None
+        self.filename = filename
+        self.mode = mode
+        self.truncate = truncate
+        self.timeout = timeout
+        self.check_interval = check_interval
+        self.fail_when_locked = fail_when_locked
+        self.flags = flags
+        self.file_open_kwargs = file_open_kwargs
+
+    def acquire(
+            self, timeout=None, check_interval=None, fail_when_locked=None):
+        """Acquire the locked filehandle"""
+        if timeout is None:
+            timeout = self.timeout
+        if timeout is None:
+            timeout = 0
+
+        if check_interval is None:
+            check_interval = self.check_interval
+
+        if fail_when_locked is None:
+            fail_when_locked = self.fail_when_locked
+
+        # If we already have a filehandle, return it
+        fh = self.fh
+        if fh:
+            return fh
+
+        # Get a new filehandle
+        fh = self._get_fh()
+        try:
+            # Try to lock
+            fh = self._get_lock(fh)
+        except exceptions.LockException as exception:
+            # Try till the timeout has passed
+            timeoutend = current_time() + timeout
+            while timeoutend > current_time():
+                # Wait a bit
+                time.sleep(check_interval)
+
+                # Try again
+                try:
+
+                    # We already tried to get the lock once
+                    # If fail_when_locked is true, then stop trying
+                    if fail_when_locked:
+                        raise exceptions.AlreadyLocked(exception)
+
+                    else:  # pragma: no cover
+                        # We've got the lock
+                        fh = self._get_lock(fh)
+                        break
+
+                except exceptions.LockException:
+                    pass
+
+            else:
+                # We got a timeout... reraising
+                raise exceptions.LockException(exception)
+
+        # Prepare the filehandle (truncate if needed)
+        fh = self._prepare_fh(fh)
+
+        self.fh = fh
+        return fh
+
+    def release(self):
+        """Releases the currently locked file handle"""
+        if self.fh:
+            # noinspection PyBroadException
+            try:
+                unlock(self.fh)
+            except Exception:
+                pass
+            # noinspection PyBroadException
+            try:
+                self.fh.close()
+            except Exception:
+                pass
+            self.fh = None
+
+    def _get_fh(self):
+        """Get a new filehandle"""
+        return open(self.filename, self.mode, **self.file_open_kwargs)
+
+    def _get_lock(self, fh):
+        """
+        Try to lock the given filehandle
+
+        raises LockException if it fails"""
+        lock(fh, self.flags)
+        return fh
+
+    def _prepare_fh(self, fh):
+        """
+        Prepare the filehandle for usage
+
+        If truncate is a number, the file will be truncated to that amount of
+        bytes
+        """
+        if self.truncate:
+            fh.seek(0)
+            fh.truncate(0)
+
+        return fh
+
+    def __enter__(self):
+        return self.acquire()
+
+    def __exit__(self, type_, value, tb):
+        self.release()
+
+    def __delete__(self, instance):  # pragma: no cover
+        instance.release()
diff --git a/clearml_agent/helper/os/portalocker.py b/clearml_agent/helper/os/portalocker.py
new file mode 100644
index 0000000..634bf58
--- /dev/null
+++ b/clearml_agent/helper/os/portalocker.py
@@ -0,0 +1,193 @@
+import os
+import sys
+
+
+class exceptions:
+    class BaseLockException(Exception):
+        # Error codes:
+        LOCK_FAILED = 1
+
+        def __init__(self, *args, **kwargs):
+            self.fh = kwargs.pop('fh', None)
+            Exception.__init__(self, *args, **kwargs)
+
+    class LockException(BaseLockException):
+        pass
+
+    class AlreadyLocked(BaseLockException):
+        pass
+
+    class FileToLarge(BaseLockException):
+        pass
+
+
+class constants:
+    # The actual tests will execute the code anyhow so the following code can
+    # safely be ignored from the coverage tests
+    if os.name == 'nt':  # pragma: no cover
+        import msvcrt
+
+        LOCK_EX = 0x1  #: exclusive lock
+        LOCK_SH = 0x2  #: shared lock
+        LOCK_NB = 0x4  #: non-blocking
+        LOCK_UN = msvcrt.LK_UNLCK  #: unlock
+
+        LOCKFILE_FAIL_IMMEDIATELY = 1
+        LOCKFILE_EXCLUSIVE_LOCK = 2
+
+    elif os.name == 'posix':  # pragma: no cover
+        import fcntl
+
+        LOCK_EX = fcntl.LOCK_EX  #: exclusive lock
+        LOCK_SH = fcntl.LOCK_SH  #: shared lock
+        LOCK_NB = fcntl.LOCK_NB  #: non-blocking
+        LOCK_UN = fcntl.LOCK_UN  #: unlock
+
+    else:  # pragma: no cover
+        raise RuntimeError('PortaLocker only defined for nt and posix platforms')
+
+
+if os.name == 'nt':  # pragma: no cover
+    import msvcrt
+
+    if sys.version_info.major == 2:
+        lock_length = -1
+    else:
+        lock_length = int(2**31 - 1)
+
+    def lock(file_, flags):
+        if flags & constants.LOCK_SH:
+            import win32file
+            import pywintypes
+            import winerror
+            __overlapped = pywintypes.OVERLAPPED()
+            if sys.version_info.major == 2:
+                if flags & constants.LOCK_NB:
+                    mode = constants.LOCKFILE_FAIL_IMMEDIATELY
+                else:
+                    mode = 0
+
+            else:
+                if flags & constants.LOCK_NB:
+                    mode = msvcrt.LK_NBRLCK
+                else:
+                    mode = msvcrt.LK_RLCK
+
+            # is there any reason not to reuse the following structure?
+            hfile = win32file._get_osfhandle(file_.fileno())
+            try:
+                win32file.LockFileEx(hfile, mode, 0, -0x10000, __overlapped)
+            except pywintypes.error as exc_value:
+                # error: (33, 'LockFileEx', 'The process cannot access the file
+                # because another process has locked a portion of the file.')
+                if exc_value.winerror == winerror.ERROR_LOCK_VIOLATION:
+                    raise exceptions.LockException(
+                        exceptions.LockException.LOCK_FAILED,
+                        exc_value.strerror,
+                        fh=file_)
+                else:
+                    # Q: Are there exceptions/codes we should be dealing with
+                    # here?
+                    raise
+        else:
+            mode = constants.LOCKFILE_EXCLUSIVE_LOCK
+            if flags & constants.LOCK_NB:
+                mode |= constants.LOCKFILE_FAIL_IMMEDIATELY
+
+            if flags & constants.LOCK_NB:
+                mode = msvcrt.LK_NBLCK
+            else:
+                mode = msvcrt.LK_LOCK
+
+            # windows locks byte ranges, so make sure to lock from file start
+            try:
+                savepos = file_.tell()
+                if savepos:
+                    # [ ] test exclusive lock fails on seek here
+                    # [ ] test if shared lock passes this point
+                    file_.seek(0)
+                    # [x] check if 0 param locks entire file (not documented in
+                    #     Python)
+                    # [x] fails with "IOError: [Errno 13] Permission denied",
+                    #     but -1 seems to do the trick
+
+                try:
+                    msvcrt.locking(file_.fileno(), mode, lock_length)
+                except IOError as exc_value:
+                    # [ ] be more specific here
+                    raise exceptions.LockException(
+                        exceptions.LockException.LOCK_FAILED,
+                        exc_value.strerror,
+                        fh=file_)
+                finally:
+                    if savepos:
+                        file_.seek(savepos)
+            except IOError as exc_value:
+                raise exceptions.LockException(
+                    exceptions.LockException.LOCK_FAILED, exc_value.strerror,
+                    fh=file_)
+
+    def unlock(file_):
+        try:
+            savepos = file_.tell()
+            if savepos:
+                file_.seek(0)
+
+            try:
+                msvcrt.locking(file_.fileno(), constants.LOCK_UN, lock_length)
+            except IOError as exc_value:
+                if exc_value.strerror == 'Permission denied':
+                    import pywintypes
+                    import win32file
+                    import winerror
+                    __overlapped = pywintypes.OVERLAPPED()
+                    hfile = win32file._get_osfhandle(file_.fileno())
+                    try:
+                        win32file.UnlockFileEx(
+                            hfile, 0, -0x10000, __overlapped)
+                    except pywintypes.error as exc_value:
+                        if exc_value.winerror == winerror.ERROR_NOT_LOCKED:
+                            # error: (158, 'UnlockFileEx',
+                            #         'The segment is already unlocked.')
+                            # To match the 'posix' implementation, silently
+                            # ignore this error
+                            pass
+                        else:
+                            # Q: Are there exceptions/codes we should be
+                            # dealing with here?
+                            raise
+                else:
+                    raise exceptions.LockException(
+                        exceptions.LockException.LOCK_FAILED,
+                        exc_value.strerror,
+                        fh=file_)
+            finally:
+                if savepos:
+                    file_.seek(savepos)
+        except IOError as exc_value:
+            raise exceptions.LockException(
+                exceptions.LockException.LOCK_FAILED, exc_value.strerror,
+                fh=file_)
+
+elif os.name == 'posix':  # pragma: no cover
+    import fcntl
+
+    def lock(file_, flags):
+        locking_exceptions = IOError,
+        try:  # pragma: no cover
+            locking_exceptions += BlockingIOError,
+        except NameError:  # pragma: no cover
+            pass
+
+        try:
+            fcntl.flock(file_.fileno(), flags)
+        except locking_exceptions as exc_value:
+            # The exception code varies on different systems so we'll catch
+            # every IO error
+            raise exceptions.LockException(exc_value, fh=file_)
+
+    def unlock(file_):
+        fcntl.flock(file_.fileno(), constants.LOCK_UN)
+
+else:  # pragma: no cover
+    raise RuntimeError('PortaLocker only defined for nt and posix platforms')
diff --git a/clearml_agent/helper/package/base.py b/clearml_agent/helper/package/base.py
index feec39f..737465f 100644
--- a/clearml_agent/helper/package/base.py
+++ b/clearml_agent/helper/package/base.py
@@ -1,11 +1,16 @@
 from __future__ import unicode_literals

 import abc
+from collections import OrderedDict
 from contextlib import contextmanager
-from typing import Text, Iterable, Union
+from typing import Text, Iterable, Union, Optional, Dict, List

+from pathlib2 import Path
+from hashlib import md5
 import six

 from clearml_agent.helper.base import mkstemp, safe_remove_file, join_lines, select_for_platform
+from clearml_agent.helper.console import ensure_binary
+from clearml_agent.helper.os.folder_cache import FolderCache
 from clearml_agent.helper.process import Executable, Argv, PathLike
@@ -18,6 +23,12 @@ class PackageManager(object):
     _selected_manager = None
     _cwd = None
     _pip_version = None
+    _config_cache_folder = 'agent.venvs_cache.path'
+    _config_cache_max_entries = 'agent.venvs_cache.max_entries'
+    _config_cache_free_space_threshold = 'agent.venvs_cache.free_space_threshold_gb'
+
+    def __init__(self):
+        self._cache_manager = None

     @abc.abstractproperty
     def bin(self):
@@ -153,3 +164,91 @@ class PackageManager(object):
     @classmethod
     def get_pip_version(cls):
         return cls._pip_version or ''
+
+    def get_cached_venv(self, requirements, docker_cmd, python_version, destination_folder):
+        # type: (Dict, Optional[Union[dict, str]], Optional[str], Path) -> Optional[Path]
+        """
+        Copy a cached copy of the venv (based on the requirements) into destination_folder.
+        Return None if failed or cached entry does not exist
+        """
+        if not self._get_cache_manager():
+            return None
+
+        keys = self._generate_reqs_hash_keys(requirements, docker_cmd, python_version)
+        return self._get_cache_manager().copy_cached_entry(keys, destination_folder)
+
+    def add_cached_venv(self, requirements, docker_cmd, python_version, source_folder, exclude_sub_folders=None):
+        # type: (Union[Dict, List[Dict]], Optional[Union[dict, str]], Optional[str], Path, Optional[List[str]]) -> ()
+        """
+        Copy the local venv folder into the venv cache (keys are based on the requirements+python+docker).
+ """ + if not self._get_cache_manager(): + return + keys = self._generate_reqs_hash_keys(requirements, docker_cmd, python_version) + return self._get_cache_manager().add_entry( + keys=keys, source_folder=source_folder, exclude_sub_folders=exclude_sub_folders) + + def get_cache_folder(self): + # type: () -> Optional[Path] + if not self._get_cache_manager(): + return + return self._get_cache_manager().get_cache_folder() + + def get_last_used_entry_cache(self): + # type: () -> Optional[Path] + """ + :return: the last used cached folder entry + """ + if not self._get_cache_manager(): + return + return self._get_cache_manager().get_last_copied_entry() + + @classmethod + def _generate_reqs_hash_keys(cls, requirements_list, docker_cmd, python_version): + # type: (Union[Dict, List[Dict]], Optional[Union[dict, str]], Optional[str]) -> List[str] + requirements_list = requirements_list or dict() + if not isinstance(requirements_list, (list, tuple)): + requirements_list = [requirements_list] + docker_cmd = dict(docker_cmd=docker_cmd) if isinstance(docker_cmd, str) else docker_cmd or dict() + docker_cmd = OrderedDict(sorted(docker_cmd.items(), key=lambda t: t[0])) + if 'docker_cmd' in docker_cmd: + # we only take the first part of the docker_cmd which is the docker image name + docker_cmd['docker_cmd'] = docker_cmd['docker_cmd'].strip('\r\n\t ').split(' ')[0] + + keys = [] + strip_chars = '\n\r\t ' + for requirements in requirements_list: + pip, conda = ('pip', 'conda') + pip_reqs = requirements.get(pip, '') + conda_reqs = requirements.get(conda, '') + if isinstance(pip_reqs, str): + pip_reqs = pip_reqs.split('\n') + if isinstance(conda_reqs, str): + conda_reqs = conda_reqs.split('\n') + pip_reqs = sorted([p.strip(strip_chars) for p in pip_reqs + if p.strip(strip_chars) and not p.strip(strip_chars).startswith('#')]) + conda_reqs = sorted([p.strip(strip_chars) for p in conda_reqs + if p.strip(strip_chars) and not p.strip(strip_chars).startswith('#')]) + if not pip_reqs and not conda_reqs: + continue + hash_text = '{class_type}\n{docker_cmd}\n{python_version}\n{pip_reqs}\n{conda_reqs}'.format( + class_type=str(cls), + docker_cmd=str(docker_cmd or ''), + python_version=str(python_version or ''), + pip_reqs=str(pip_reqs or ''), + conda_reqs=str(conda_reqs or ''), + ) + keys.append(md5(ensure_binary(hash_text)).hexdigest()) + return sorted(list(set(keys))) + + def _get_cache_manager(self): + if not self._cache_manager: + cache_folder = self.session.config.get(self._config_cache_folder, None) + if not cache_folder: + return None + + max_entries = int(self.session.config.get(self._config_cache_max_entries, 10)) + free_space_threshold = float(self.session.config.get(self._config_cache_free_space_threshold, 0)) + self._cache_manager = FolderCache( + cache_folder, max_cache_entries=max_entries, min_free_space_gb=free_space_threshold) + return self._cache_manager diff --git a/clearml_agent/helper/package/conda_api.py b/clearml_agent/helper/package/conda_api.py index 64656b4..a9fe45c 100644 --- a/clearml_agent/helper/package/conda_api.py +++ b/clearml_agent/helper/package/conda_api.py @@ -69,6 +69,7 @@ class CondaAPI(PackageManager): :param python: base python version to use (e.g python3.6) :param path: path of env """ + super(CondaAPI, self).__init__() self.session = session self.python = python self.source = None diff --git a/clearml_agent/helper/package/pip_api/system.py b/clearml_agent/helper/package/pip_api/system.py index e1f3db7..7a7b2e9 100644 --- a/clearml_agent/helper/package/pip_api/system.py +++ 
+++ b/clearml_agent/helper/package/pip_api/system.py
@@ -17,6 +17,7 @@ class SystemPip(PackageManager):
         """
         Program interface to the system pip.
         """
+        super(SystemPip, self).__init__()
         self._bin = interpreter or sys.executable
         self.session = session
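
For reviewers, a minimal sketch of how the new `FolderCache` is driven on its own. The cache path, venv folder and key below are made up; the API (`add_entry` / `copy_cached_entry` / `get_last_copied_entry`) is the one introduced in `clearml_agent/helper/os/folder_cache.py` above:

```python
from pathlib2 import Path

from clearml_agent.helper.os.folder_cache import FolderCache

# hypothetical cache location; '.' separates keys in an entry folder name,
# so dots inside keys are silently replaced with '_'
cache = FolderCache('~/.clearml/venvs-cache', max_cache_entries=10, min_free_space_gb=2.0)
venv = Path('~/.clearml/venvs-builds/3.6').expanduser()  # assumed to exist

# store a built venv under a key, skipping the task code checkout sub-folders
cache.add_entry(keys=['9f86d081884c7d65'], source_folder=venv,
                exclude_sub_folders=['task_repository', 'code'])

# later, possibly from another agent process: restore it into a fresh build folder
if cache.copy_cached_entry(['9f86d081884c7d65'], venv):
    print('restored venv from cache entry {}'.format(cache.get_last_copied_entry()))
```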
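How a cache key is derived (`PackageManager._generate_reqs_hash_keys`): an md5 over the package-manager class, the docker image (only the first token of `docker_cmd`), the python version, and the sorted, comment-stripped requirement lines, so line order and comments do not affect the key. A rough stand-alone illustration only; it ignores the conda list and the exact `OrderedDict` formatting, so it will not reproduce the real digests:

```python
from hashlib import md5


def approx_venv_cache_key(pip_requirements, docker_cmd=None, python_version=None):
    # illustrative only; see PackageManager._generate_reqs_hash_keys for the real logic
    strip_chars = '\n\r\t '
    lines = pip_requirements.split('\n') if isinstance(pip_requirements, str) else pip_requirements
    pip_reqs = sorted(p.strip(strip_chars) for p in lines
                      if p.strip(strip_chars) and not p.strip(strip_chars).startswith('#'))
    # only the image name matters, not the rest of the docker command line
    docker_image = docker_cmd.strip(strip_chars).split(' ')[0] if docker_cmd else ''
    hash_text = '\n'.join(('pip', docker_image, str(python_version or ''), str(pip_reqs)))
    return md5(hash_text.encode('utf-8')).hexdigest()


# identical requirement sets hash identically, regardless of line order or comments
a = approx_venv_cache_key('numpy==1.19.0\ntorch==1.6.0\n# comment\n', 'nvidia/cuda:10.1-runtime', '3.6')
b = approx_venv_cache_key('torch==1.6.0\nnumpy==1.19.0', 'nvidia/cuda:10.1-runtime', '3.6')
assert a == b
```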
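Cross-process safety of the cache rests on the vendored `FileLock`/`portalocker` pair; `FileLock` also works as a context manager (`__enter__` calls `acquire()`, `__exit__` calls `release()`). A hypothetical stand-alone use (the lock-file path is made up):

```python
from clearml_agent.helper.os.locks import FileLock
from clearml_agent.helper.os.portalocker import exceptions

lock = FileLock('/tmp/example.lock')  # hypothetical lock-file path
try:
    lock.acquire(timeout=30)  # same timeout FolderCache uses; raises LockException on failure
    # ... critical section: mutate the shared cache folder here ...
except exceptions.LockException:
    print('lock is busy, skipping')
finally:
    lock.release()  # no-op if the lock was never acquired

# equivalently:
with FileLock('/tmp/example.lock'):
    pass  # critical section
```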