From 272fa07c29a7f0881ed0d8e141373998007865f5 Mon Sep 17 00:00:00 2001
From: allegroai <>
Date: Sat, 9 May 2020 19:57:25 +0300
Subject: [PATCH] Fix and enhance "build --docker"

- Fix standalone docker execution
- Add --install-globally option to install required packages in the docker's system python
- Add --entry-point option to allow automatic task cloning when running the docker

---
 trains_agent/commands/worker.py  | 108 ++++++++++++++++++++++---------
 trains_agent/helper/process.py   |  13 +++-
 trains_agent/helper/repo.py      |  47 ++++++++------
 trains_agent/interface/worker.py |  12 ++++
 4 files changed, 127 insertions(+), 53 deletions(-)

diff --git a/trains_agent/commands/worker.py b/trains_agent/commands/worker.py
index e281764..1d3ef52 100644
--- a/trains_agent/commands/worker.py
+++ b/trains_agent/commands/worker.py
@@ -71,6 +71,7 @@ from trains_agent.helper.package.base import PackageManager
 from trains_agent.helper.package.conda_api import CondaAPI
 from trains_agent.helper.package.horovod_req import HorovodRequirement
 from trains_agent.helper.package.external_req import ExternalRequirements
+from trains_agent.helper.package.pip_api.system import SystemPip
 from trains_agent.helper.package.pip_api.venv import VirtualenvPip
 from trains_agent.helper.package.poetry_api import PoetryConfig, PoetryAPI
 from trains_agent.helper.package.pytorch import PytorchRequirement
@@ -98,6 +99,8 @@ from .events import Events
 
 log = logging.getLogger(__name__)
 
+DOCKER_ROOT_CONF_FILE = "/root/trains.conf"
+DOCKER_DEFAULT_CONF_FILE = "/root/default_trains.conf"
 
 @attr.s
 class LiteralScriptManager(object):
@@ -365,6 +368,7 @@ class Worker(ServiceCommandSection):
         self.queues = ()
         self.venv_folder = None  # type: Optional[Text]
         self.package_api = None  # type: PackageManager
+        self.global_package_api = None
         self.is_venv_update = self._session.config.agent.venv_update.enabled
         self.poetry = PoetryConfig(self._session)
 
@@ -997,6 +1001,8 @@ class Worker(ServiceCommandSection):
         target=None,
         python_version=None,
         docker=None,
+        entry_point=None,
+        install_globally=False,
         **_
     ):
         if not task_id:
@@ -1005,7 +1011,7 @@ class Worker(ServiceCommandSection):
         self._session.print_configuration()
 
         if docker is not False and docker is not None:
-            return self._build_docker(docker, target, task_id)
+            return self._build_docker(docker, target, task_id, entry_point)
 
         current_task = self._session.api_client.tasks.get_by_id(task_id)
 
@@ -1029,7 +1035,10 @@ class Worker(ServiceCommandSection):
             requested_python_version=python_version)
 
         if self._default_pip:
-            self.package_api.install_packages(*self._default_pip)
+            if install_globally and self.global_package_api:
+                self.global_package_api.install_packages(*self._default_pip)
+            else:
+                self.package_api.install_packages(*self._default_pip)
 
         directory, vcs, repo_info = self.get_repo_info(execution, current_task, venv_folder.as_posix())
 
@@ -1039,6 +1048,7 @@ class Worker(ServiceCommandSection):
             requirements_manager=requirements_manager,
             cached_requirements=requirements,
             cwd=vcs.location if vcs and vcs.location else directory,
+            package_api=self.global_package_api if install_globally else None,
         )
         freeze = self.freeze_task_environment(requirements_manager=requirements_manager)
         script_dir = directory
@@ -1057,13 +1067,13 @@ class Worker(ServiceCommandSection):
 
         return 0
 
-    def _build_docker(self, docker, target, task_id):
+    def _build_docker(self, docker, target, task_id, entry_point=None, standalone_mode=True):
         self.temp_config_path = safe_mkstemp(
             suffix=".cfg", prefix=".trains_agent.", text=True,
             name_only=True
         )
         if not target:
-            ValueError("--target container name must be provided for docker build")
+            target = "task_id_{}".format(task_id)
 
         temp_config, docker_image_func = self.get_docker_config_cmd(docker)
         self.dump_config(temp_config)
@@ -1080,15 +1090,19 @@ class Worker(ServiceCommandSection):
             full_docker_cmd = self.docker_image_func(docker_image=task_docker_cmd[0],
                                                      docker_arguments=task_docker_cmd[1:])
         else:
-            print('running Task {} inside default docker image: {} {}\n'.format(
-                task_id, self._docker_image, self._docker_arguments or ''))
+            print('Building Task {} inside default docker image: {} {}\n'.format(
+                task_id, self._docker_image, self._docker_arguments or ''))
             full_docker_cmd = self.docker_image_func(docker_image=self._docker_image,
                                                      docker_arguments=self._docker_arguments)
         end_of_build_marker = "build.done=true"
-        docker_cmd_suffix = ' build --id {} ; ' \
-                            'echo "" >> /root/trains.conf ; ' \
-                            'echo {} >> /root/trains.conf ; ' \
-                            'bash'.format(task_id, end_of_build_marker)
+        docker_cmd_suffix = ' build --id {task_id} --install-globally; ' \
+                            'echo "" >> {conf_file} ; ' \
+                            'echo {end_of_build_marker} >> {conf_file} ; ' \
+                            'bash'.format(
+                                task_id=task_id,
+                                end_of_build_marker=end_of_build_marker,
+                                conf_file=DOCKER_ROOT_CONF_FILE
+                            )
         full_docker_cmd[-1] = full_docker_cmd[-1] + docker_cmd_suffix
 
         cmd = Argv(*full_docker_cmd)
@@ -1118,9 +1132,22 @@ class Worker(ServiceCommandSection):
             print("Error: cannot locate docker for storage")
             return
 
+        if entry_point == "clone_task" or entry_point == "reuse_task":
+            change = 'ENTRYPOINT if [ ! -s "{trains_conf}" ] ; then ' \
+                     'cp {default_trains_conf} {trains_conf} ; ' \
+                     ' fi ; trains-agent execute --id {task_id} --standalone-mode {clone}'.format(
+                default_trains_conf=DOCKER_DEFAULT_CONF_FILE,
+                trains_conf=DOCKER_ROOT_CONF_FILE,
+                task_id=task_id,
+                clone=("--clone" if entry_point == "clone_task" else ""),
+            )
+        else:
+            change = None
+
         print('Committing docker container to: {}'.format(target))
-        print(commit_docker(container_name=target, docker_id=docker_id))
+        print(commit_docker(container_name=target, docker_id=docker_id, apply_change=change))
         shutdown_docker_process(docker_id=docker_id)
+        return
 
     @resolve_names
     def execute(
@@ -1138,6 +1165,9 @@ class Worker(ServiceCommandSection):
         clone=False,
         **_
     ):
+
+        self._standalone_mode = standalone_mode
+
         if not task_id:
             raise CommandFailedError("Worker execute must have valid task id")
 
@@ -1433,6 +1463,7 @@ class Worker(ServiceCommandSection):
 
     def _get_repo_info(self, execution, task, venv_folder):
         try:
+            self._session.config.put("agent.standalone_mode", self._standalone_mode)
             vcs, repo_info = clone_repository_cached(
                 session=self._session,
                 execution=execution,
@@ -1578,7 +1609,14 @@ class Worker(ServiceCommandSection):
         return None
 
     def install_requirements(
-        self, execution, repo_info, requirements_manager, cached_requirements=None, cwd=None,
+        self, execution, repo_info, requirements_manager, cached_requirements=None, cwd=None, package_api=None
+    ):
+        return self.install_requirements_for_package_api(execution, repo_info, requirements_manager,
+                                                         cached_requirements=cached_requirements, cwd=cwd,
+                                                         package_api=package_api if package_api else self.package_api)
+
+    def install_requirements_for_package_api(
+        self, execution, repo_info, requirements_manager, cached_requirements=None, cwd=None, package_api=None,
     ):
         # type: (ExecutionInfo, RepoInfo, RequirementsManager, Optional[dict]) -> None
         """
@@ -1590,27 +1628,28 @@ class Worker(ServiceCommandSection):
         :param repo_info: repository information
         :param requirements_manager: requirements manager for task
         :param cached_requirements: cached requirements from previous run
+        :param package_api: package manager API to use when installing requirements
         """
-        if self.package_api:
-            self.package_api.cwd = cwd
+        if package_api:
+            package_api.cwd = cwd
         api = self._install_poetry_requirements(repo_info)
         if api:
-            self.package_api = api
+            package_api = api
             return
 
-        self.package_api.upgrade_pip()
-        self.package_api.set_selected_package_manager()
+        package_api.upgrade_pip()
+        package_api.set_selected_package_manager()
         # always install cython,
         # if we have a specific version in the requirements,
         # the CythonRequirement(SimpleSubstitution) will reinstall cython with the specific version
         if not self.is_conda:
-            self.package_api.out_of_scope_install_package('Cython')
+            package_api.out_of_scope_install_package('Cython')
 
         cached_requirements_failed = False
         if cached_requirements and ('pip' in cached_requirements or 'conda' in cached_requirements):
             self.log("Found task requirements section, trying to install")
             try:
-                self.package_api.load_requirements(cached_requirements)
+                package_api.load_requirements(cached_requirements)
             except Exception as e:
                 self.log_traceback(e)
                 cached_requirements_failed = True
@@ -1646,7 +1685,7 @@ class Worker(ServiceCommandSection):
                 temp_file.write(new_requirements)
                 temp_file.flush()
                 # close the file before reading in install_from_file for Windows compatibility
-            self.package_api.install_from_file(temp_file.name)
+            package_api.install_from_file(temp_file.name)
         except Exception as e:
             print('ERROR: Failed installing requirements.txt:\n{}'.format(requirements_text))
             raise e
@@ -1818,7 +1857,8 @@ class Worker(ServiceCommandSection):
             base_interpreter=executable_name
         )
 
-        rm_tree(normalize_path(venv_dir, WORKING_REPOSITORY_DIR))
+        if not standalone_mode:
+            rm_tree(normalize_path(venv_dir, WORKING_REPOSITORY_DIR))
         package_manager_params = dict(
             session=self._session,
             python=executable_version_suffix if self.is_conda else executable_name,
@@ -1826,7 +1866,16 @@ class Worker(ServiceCommandSection):
             requirements_manager=requirements_manager,
         )
 
-        if not self.is_conda:
+        global_package_manager_params = dict(
+            interpreter=executable_name,
+        )
+
+        if not self.is_conda and standalone_mode:
+            # pip with standalone mode
+            get_pip = partial(VirtualenvPip, **package_manager_params)
+            self.package_api = get_pip()
+            self.global_package_api = SystemPip(**global_package_manager_params)
+        elif not self.is_conda:
             if self.is_venv_update:
                 self.package_api = VenvUpdateAPI(
                     url=self._session.config["agent.venv_update.url"] or DEFAULT_VENV_UPDATE_URL,
@@ -1837,6 +1886,7 @@ class Worker(ServiceCommandSection):
             if first_time:
                 self.package_api.remove()
                 self.package_api.create()
+            self.global_package_api = SystemPip(**global_package_manager_params)
         elif standalone_mode:
             # conda with standalone mode
             get_conda = partial(CondaAPI, **package_manager_params)
@@ -2099,7 +2149,7 @@ class Worker(ServiceCommandSection):
                 specify_version=specify_version)
 
         base_cmd += (
-            ['-v', conf_file+':/root/trains.conf'] +
+            ['-v', conf_file+':'+DOCKER_ROOT_CONF_FILE] +
             (['-v', host_git_credentials+':/root/.git-credentials'] if host_git_credentials else []) +
             (['-v', host_ssh_cache+':/root/.ssh'] if host_ssh_cache else []) +
             (['-v', host_apt_cache+':/var/cache/apt/archives'] if host_apt_cache else []) +
@@ -2110,6 +2160,7 @@ class Worker(ServiceCommandSection):
             ['--rm', docker_image, 'bash', '-c',
              update_scheme + extra_shell_script +
+             "cp {} {} ; ".format(DOCKER_ROOT_CONF_FILE, DOCKER_DEFAULT_CONF_FILE) +
              "NVIDIA_VISIBLE_DEVICES={nv_visible} {python} -u -m trains_agent ".format(
                  nv_visible=dockers_nvidia_visible_devices, python=python_version)
              ])
 
@@ -2140,18 +2191,13 @@ class Worker(ServiceCommandSection):
         # create a home folder for our user
         trains_agent_home = 'trains_agent_home{}'.format('.'+str(Singleton.get_slot()) if Singleton.get_slot() else '')
         try:
-            home_folder = (Path('/') / trains_agent_home).absolute().as_posix()
+            home_folder = '/trains_agent_home'
             rm_tree(home_folder)
             Path(home_folder).mkdir(parents=True, exist_ok=True)
         except:
-            try:
-                home_folder = (Path.home().parent / trains_agent_home).absolute().as_posix()
-                rm_tree(home_folder)
-                Path(home_folder).mkdir(parents=True, exist_ok=True)
-            except:
-                home_folder = (Path(gettempdir()) / trains_agent_home).absolute().as_posix()
-                rm_tree(home_folder)
-                Path(home_folder).mkdir(parents=True, exist_ok=True)
+            home_folder = '/home/trains_agent_home'
+            rm_tree(home_folder)
+            Path(home_folder).mkdir(parents=True, exist_ok=True)
 
         # move our entire venv into the new home
         venv_folder = venv_folder.as_posix()
diff --git a/trains_agent/helper/process.py b/trains_agent/helper/process.py
index 6422ddc..9676819 100644
--- a/trains_agent/helper/process.py
+++ b/trains_agent/helper/process.py
@@ -83,7 +83,15 @@ def shutdown_docker_process(docker_cmd_contains=None, docker_id=None):
         pass
 
 
-def commit_docker(container_name, docker_cmd_contains=None, docker_id=None):
+def commit_docker(container_name, docker_cmd_contains=None, docker_id=None, apply_change=None):
+    """
+    Commit a docker container into a new image
+    :param str container_name: Name for the new image
+    :param docker_cmd_contains: partial docker command line used to locate the container to be committed
+    :param str docker_id: Id of the container to be committed
+    :param str apply_change: apply Dockerfile instructions to the image that is created
+        (see docker commit documentation for '--change').
+    """
     try:
         if not docker_id:
             docker_id = get_docker_id(docker_cmd_contains=docker_cmd_contains)
@@ -93,7 +101,8 @@ def commit_docker(container_name, docker_cmd_contains=None, docker_id=None):
 
         if docker_id:
             # we found our docker, stop it
-            output = get_bash_output(cmd='docker commit {} {}'.format(docker_id, container_name))
+            apply_change = '--change=\'{}\''.format(apply_change) if apply_change else ''
+            output = get_bash_output(cmd='docker commit {} {} {}'.format(apply_change, docker_id, container_name))
         return output
     except Exception:
         pass
diff --git a/trains_agent/helper/repo.py b/trains_agent/helper/repo.py
index 9d9dd67..2d5c812 100644
--- a/trains_agent/helper/repo.py
+++ b/trains_agent/helper/repo.py
@@ -533,11 +533,16 @@ def clone_repository_cached(session, execution, destination):
     clone_folder_name = Path(str(furl(repo_url).path)).name  # type: str
     clone_folder = Path(destination) / clone_folder_name
 
-    cached_repo_path = (
-        Path(session.config["agent.vcs_cache.path"]).expanduser()
-        / "{}.{}".format(clone_folder_name, md5(ensure_binary(repo_url)).hexdigest())
-        / clone_folder_name
-    )  # type: Path
+
+    standalone_mode = session.config.get("agent.standalone_mode", False)
+    if standalone_mode:
+        cached_repo_path = clone_folder
+    else:
+        cached_repo_path = (
+            Path(session.config["agent.vcs_cache.path"]).expanduser()
+            / "{}.{}".format(clone_folder_name, md5(ensure_binary(repo_url)).hexdigest())
+            / clone_folder_name
+        )  # type: Path
 
     vcs = VcsFactory.create(
         session, execution_info=execution, location=cached_repo_path
@@ -545,23 +550,25 @@ def clone_repository_cached(session, execution, destination):
     if not find_executable(vcs.executable_name):
         raise CommandFailedError(vcs.executable_not_found_error_help())
 
-    if session.config["agent.vcs_cache.enabled"] and cached_repo_path.exists():
-        print('Using cached repository in "{}"'.format(cached_repo_path))
-    else:
-        print("cloning: {}".format(no_password_url))
-        rm_tree(cached_repo_path)
-        # We clone the entire repository, not a specific branch
-        vcs.clone()  # branch=execution.branch)
+    if not standalone_mode:
+        if session.config["agent.vcs_cache.enabled"] and cached_repo_path.exists():
+            print('Using cached repository in "{}"'.format(cached_repo_path))
 
-    vcs.pull()
-    rm_tree(destination)
-    shutil.copytree(Text(cached_repo_path), Text(clone_folder))
-    if not clone_folder.is_dir():
-        raise CommandFailedError(
-            "copying of repository failed: from {} to {}".format(
-                cached_repo_path, clone_folder
+        else:
+            print("cloning: {}".format(no_password_url))
+            rm_tree(cached_repo_path)
+            # We clone the entire repository, not a specific branch
+            vcs.clone()  # branch=execution.branch)
+
+        vcs.pull()
+        rm_tree(destination)
+        shutil.copytree(Text(cached_repo_path), Text(clone_folder))
+        if not clone_folder.is_dir():
+            raise CommandFailedError(
+                "copying of repository failed: from {} to {}".format(
+                    cached_repo_path, clone_folder
+                )
             )
-        )
 
     # checkout in the newly copy destination
     vcs.location = Text(clone_folder)
diff --git a/trains_agent/interface/worker.py b/trains_agent/interface/worker.py
index 2137adc..7884f95 100644
--- a/trains_agent/interface/worker.py
+++ b/trains_agent/interface/worker.py
@@ -146,6 +146,12 @@ COMMANDS = {
                 'help': 'Where to build the task\'s virtual environment and source code. '
                         'When used with --docker, target docker image name to create',
             },
+            '--install-globally': {
+                'help': 'Install required python packages before creating the virtual environment used to execute an '
+                        'experiment, and use the \'agent.package_manager.system_site_packages\' virtual env option. '
+                        'Note: when --docker is used, install-globally is always true',
+                'action': 'store_true',
+            },
             '--docker': {
                 'help': 'Build the experiment inside a docker (v19.03 and above). Optional args or '
                         'specify default docker image in agent.default_docker.image / agent.default_docker.arguments'
@@ -156,6 +162,12 @@ COMMANDS = {
             '--python-version': {
                 'help': 'Virtual environment python version to use',
             },
+            '--entry-point': {
+                'help': 'Run the task in the new docker. Pass "reuse_task" to run the given task in the docker, '
+                        'or "clone_task" to first clone the given task and then run it in the docker',
+                'default': False,
+                'choices': ['reuse_task', 'clone_task'],
+            }
         }, **WORKER_ARGS),
     },
     'list': {