mirror of
https://github.com/clearml/clearml-agent
synced 2025-01-31 17:16:51 +00:00
269 lines
9.8 KiB
Python
269 lines
9.8 KiB
Python
from __future__ import unicode_literals
|
|
|
|
import abc
|
|
from collections import OrderedDict
|
|
from contextlib import contextmanager
|
|
from hashlib import md5
|
|
from typing import Text, Iterable, Union, Optional, Dict, List
|
|
|
|
import six
|
|
from pathlib2 import Path
|
|
|
|
from clearml_agent.definitions import ENV_VENV_CACHE_PATH
|
|
from clearml_agent.helper.base import mkstemp, safe_remove_file, join_lines, select_for_platform
|
|
from clearml_agent.helper.console import ensure_binary
|
|
from clearml_agent.helper.os.folder_cache import FolderCache
|
|
from clearml_agent.helper.process import Executable, Argv, PathLike
|
|
|
|
|
|
@six.add_metaclass(abc.ABCMeta)
class PackageManager(object):
    """
    ABC for classes providing python package management interface

    Besides the abstract package operations, this base class offers:

    * a process-wide "selected" manager (``_selected_manager``) used by the
      ``out_of_scope_*`` class methods,
    * a process-wide pip version constraint (``_pip_version``),
    * helpers for storing / restoring virtual environments in a folder cache.

    NOTE(review): ``self.session`` and ``self.run_with_env`` are used below but
    are not defined in this class; presumably concrete subclasses provide them.
    """

    # Shared across all managers (class level), see set_selected_package_manager()
    _selected_manager = None
    _cwd = None
    # pip version specifier (e.g. "==20.1" or "<21"), see set_pip_version()
    _pip_version = None
    # Configuration keys read from self.session.config by _get_cache_manager()
    _config_cache_folder = 'agent.venvs_cache.path'
    _config_cache_max_entries = 'agent.venvs_cache.max_entries'
    _config_cache_free_space_threshold = 'agent.venvs_cache.free_space_threshold_gb'

    def __init__(self):
        # Created lazily by _get_cache_manager()
        self._cache_manager = None

    @abc.abstractproperty
    def bin(self):
        # type: () -> PathLike
        """
        Executable path used as the first element of the command built by get_python_command()
        """
        pass

    @abc.abstractmethod
    def create(self):
        """
        Abstract, implemented by subclasses.
        Presumably creates the managed environment - confirm against the implementations.
        """
        pass

    @abc.abstractmethod
    def remove(self):
        """
        Abstract, implemented by subclasses.
        Presumably removes the managed environment - confirm against the implementations.
        """
        pass

    @abc.abstractmethod
    def install_from_file(self, path):
        """
        Abstract, implemented by subclasses.
        Presumably installs the requirements listed in the file at ``path`` - confirm against the implementations.
        """
        pass

    @abc.abstractmethod
    def freeze(self):
        """
        Abstract, implemented by subclasses.
        The result is returned as-is by out_of_scope_freeze()
        """
        pass

    @abc.abstractmethod
    def load_requirements(self, requirements):
        """
        Abstract, implemented by subclasses.
        """
        pass

    @abc.abstractmethod
    def install_packages(self, *packages):
        # type: (Iterable[Text]) -> None
        """
        Install packages, upgrading depends on config
        """
        pass

    @abc.abstractmethod
    def _install(self, *packages):
        # type: (Iterable[Text]) -> None
        """
        Run install command

        The arguments may mix package specifications and command line flags
        (upgrade_pip() passes ``"--upgrade"`` this way).
        """
        pass

    @abc.abstractmethod
    def uninstall_packages(self, *packages):
        # type: (Iterable[Text]) -> None
        """
        Abstract, implemented by subclasses.
        """
        pass

    def upgrade_pip(self):
        """
        Install / upgrade pip (honoring the version set with set_pip_version()),
        then record on ``MarkerRequirement.pip_new_version`` whether the installed pip major version is >= 20.

        :return: whatever the ``_install`` call returned
        """
        result = self._install(
            select_for_platform(windows='pip{}', linux='pip{}').format(self.get_pip_version()), "--upgrade")
        listing = self.run_with_env(('list',), output=True).splitlines()
        # a matching line splits into ('pip', 'x.y.z')
        pip_entries = [
            parts for parts in (line.split() for line in listing)
            if len(parts) == 2 and parts[0] == 'pip'
        ]
        if pip_entries:
            # best effort only: a version we cannot parse must not fail the upgrade
            # noinspection PyBroadException
            try:
                from .requirements import MarkerRequirement
                major_version = int(pip_entries[0][1].split('.')[0])
                MarkerRequirement.pip_new_version = major_version >= 20
            except Exception:
                pass
        return result

    def get_python_command(self, extra=()):
        # type: (...) -> Executable
        """
        Build a command starting with this manager's ``bin`` followed by the ``extra`` arguments
        """
        return Argv(self.bin, *extra)

    @contextmanager
    def temp_file(self, prefix, contents, suffix=".txt"):
        # type: (Text, Union[Text, Iterable[Text]], Text) -> Text
        """
        Write contents to a temporary file, yielding its path. Finally, delete it.
        The file is kept when the session is in debug mode.

        :param prefix: file name prefix
        :param contents: text to write, either a single string or an iterable of text lines
        :param suffix: file name suffix
        """
        f, temp_path = mkstemp(suffix=suffix, prefix=prefix)
        # The write is inside the try block so a failed write does not leave the file behind
        try:
            with f:
                f.write(
                    contents
                    if isinstance(contents, six.text_type)
                    else join_lines(contents)
                )
            yield temp_path
        finally:
            if not self.session.debug_mode:
                safe_remove_file(temp_path)

    def set_selected_package_manager(self):
        """
        Register this instance as the process-wide selected package manager
        """
        # set this instance as the selected package manager
        # this is helpful when we want out of context requirement installations
        PackageManager._selected_manager = self

    @property
    def cwd(self):
        return self._cwd

    @cwd.setter
    def cwd(self, value):
        self._cwd = value

    @classmethod
    def out_of_scope_install_package(cls, package_name, *args):
        """
        Install a package using the selected package manager (see set_selected_package_manager()).

        :return: False if the install raised or returned anything other than 0 / None / True.
            True otherwise, including when no package manager was selected (nothing is installed then).
        """
        manager = PackageManager._selected_manager
        if manager is None:
            return True
        try:
            result = manager._install(package_name, *args)
        except Exception:
            return False
        return result in (0, None, True)

    @classmethod
    def out_of_scope_freeze(cls):
        """
        Return ``freeze()`` of the selected package manager.

        :return: the freeze result, or an empty list when no manager is selected or freeze raised
        """
        manager = PackageManager._selected_manager
        if manager is not None:
            try:
                return manager.freeze()
            except Exception:
                pass
        return []

    @classmethod
    def set_pip_version(cls, version):
        """
        Store the pip version constraint used by upgrade_pip().

        Spaces are removed. A bare version ("20.1") is stored as "==20.1",
        a value already containing one of ``= ~ < >`` is stored as given.
        Empty / None values are ignored.
        """
        if not version:
            return
        version = version.replace(' ', '')
        has_operator = any(symbol in version for symbol in ('=', '~', '<', '>'))
        cls._pip_version = version if has_operator else "==" + version

    @classmethod
    def get_pip_version(cls):
        """
        :return: the stored pip version specifier, or an empty string if none was set
        """
        return cls._pip_version or ''

    def get_cached_venv(self, requirements, docker_cmd, python_version, cuda_version, destination_folder):
        # type: (Dict, Optional[Union[dict, str]], Optional[str], Optional[str], Path) -> Optional[Path]
        """
        Copy a cached copy of the venv (based on the requirements) into destination_folder.
        Return None if failed or cached entry does not exist
        """
        cache_manager = self._get_cache_manager()
        if not cache_manager:
            return None

        keys = self._generate_reqs_hash_keys(requirements, docker_cmd, python_version, cuda_version)
        return cache_manager.copy_cached_entry(keys, destination_folder)

    def add_cached_venv(
        self,
        requirements,  # type: Union[Dict, List[Dict]]
        docker_cmd,  # type: Optional[Union[dict, str]]
        python_version,  # type: Optional[str]
        cuda_version,  # type: Optional[str]
        source_folder,  # type: Path
        exclude_sub_folders=None  # type: Optional[List[str]]
    ):
        # type: (...) -> ()
        """
        Copy the local venv folder into the venv cache (keys are based on the requirements+python+docker).
        Does nothing when the venv cache is not configured.
        """
        cache_manager = self._get_cache_manager()
        if not cache_manager:
            return
        keys = self._generate_reqs_hash_keys(requirements, docker_cmd, python_version, cuda_version)
        return cache_manager.add_entry(
            keys=keys, source_folder=source_folder, exclude_sub_folders=exclude_sub_folders)

    def get_cache_folder(self):
        # type: () -> Optional[Path]
        """
        :return: the cache manager's cache folder, None when the venv cache is not configured
        """
        cache_manager = self._get_cache_manager()
        if not cache_manager:
            return
        return cache_manager.get_cache_folder()

    def get_last_used_entry_cache(self):
        # type: () -> Optional[Path]
        """
        :return: the last used cached folder entry
        """
        cache_manager = self._get_cache_manager()
        if not cache_manager:
            return
        return cache_manager.get_last_copied_entry()

    @staticmethod
    def _clean_requirement_lines(reqs):
        # type: (Optional[Union[Text, Iterable[Text]]]) -> List[Text]
        """
        Return the sorted, stripped, non-empty and non-comment lines of a requirements section.
        Accepts a newline separated string, an iterable of lines, or None.
        """
        strip_chars = '\n\r\t '
        # six.string_types: with unicode_literals, python2 text is not `str`
        if isinstance(reqs, six.string_types):
            reqs = reqs.split('\n')
        stripped = (line.strip(strip_chars) for line in reqs or ())
        return sorted(line for line in stripped if line and not line.startswith('#'))

    @classmethod
    def _generate_reqs_hash_keys(cls, requirements_list, docker_cmd, python_version, cuda_version):
        # type: (Union[Dict, List[Dict]], Optional[Union[dict, str]], Optional[str], Optional[str]) -> List[str]
        """
        Generate the venv cache keys for one or more requirements dictionaries.

        Each requirements dict (read keys: 'pip', 'conda') yields one md5 hex digest of:
        class type, docker command, cuda version, python version, pip and conda requirements.
        Entries with no requirements, or whose pip section references another
        requirements file, yield no key.

        :return: sorted list of unique keys (may be empty)
        """
        requirements_list = requirements_list or dict()
        if not isinstance(requirements_list, (list, tuple)):
            requirements_list = [requirements_list]

        if isinstance(docker_cmd, six.string_types):
            docker_cmd = dict(docker_cmd=docker_cmd)
        # sort the items so the textual representation used for hashing is order independent
        docker_cmd = OrderedDict(sorted((docker_cmd or dict()).items(), key=lambda t: t[0]))
        if isinstance(docker_cmd.get('docker_cmd'), six.string_types):
            # we only take the first part of the docker_cmd which is the docker image name
            docker_cmd['docker_cmd'] = docker_cmd['docker_cmd'].strip('\r\n\t ').split(' ')[0]

        keys = set()
        for requirements in requirements_list:
            pip_reqs = cls._clean_requirement_lines(requirements.get('pip'))
            conda_reqs = cls._clean_requirement_lines(requirements.get('conda'))
            if not pip_reqs and not conda_reqs:
                continue
            # do not process "-r" or "--requirement" because we cannot know what we have in the git repo.
            # the prefix test also covers the "-rfile" and "--requirement=file" spellings
            if any(r.startswith(('-r', '--requirement')) for r in pip_reqs):
                continue
            hash_text = '{class_type}\n{docker_cmd}\n{cuda_ver}\n{python_version}\n{pip_reqs}\n{conda_reqs}'.format(
                class_type=str(cls),
                docker_cmd=str(docker_cmd or ''),
                cuda_ver=str(cuda_version or ''),
                python_version=str(python_version or ''),
                pip_reqs=str(pip_reqs or ''),
                conda_reqs=str(conda_reqs or ''),
            )
            keys.add(md5(ensure_binary(hash_text)).hexdigest())
        return sorted(keys)

    def _get_cache_manager(self):
        """
        Return the FolderCache used for venv caching, creating it on first use.

        The cache folder is taken from ENV_VENV_CACHE_PATH or, if that is empty,
        from the session configuration.

        :return: the cache manager, or None when no cache folder is configured
        """
        if self._cache_manager:
            return self._cache_manager

        cache_folder = ENV_VENV_CACHE_PATH.get() or self.session.config.get(self._config_cache_folder, None)
        if not cache_folder:
            return None

        max_entries = int(self.session.config.get(self._config_cache_max_entries, 10))
        free_space_threshold = float(self.session.config.get(self._config_cache_free_space_threshold, 0))
        self._cache_manager = FolderCache(
            cache_folder, max_cache_entries=max_entries, min_free_space_gb=free_space_threshold)
        return self._cache_manager
|