diff --git a/trains_agent/commands/worker.py b/trains_agent/commands/worker.py
index 0982818..113efee 100644
--- a/trains_agent/commands/worker.py
+++ b/trains_agent/commands/worker.py
@@ -3,6 +3,7 @@ from __future__ import print_function, division, unicode_literals
 import errno
 import json
 import logging
+import os
 import os.path
 import re
 import signal
@@ -16,7 +17,6 @@ from datetime import datetime
 from distutils.spawn import find_executable
 from functools import partial
 from itertools import chain
-from os import environ, getpid
 from tempfile import gettempdir, mkdtemp
 from time import sleep, time
 from typing import Text, Optional, Any, Tuple
@@ -59,7 +59,7 @@ from trains_agent.helper.base import (
     is_conda,
     named_temporary_file,
     ExecutionInfo,
-    HOCONEncoder, error)
+    HOCONEncoder, error, get_python_path)
 from trains_agent.helper.console import ensure_text
 from trains_agent.helper.package.base import PackageManager
 from trains_agent.helper.package.conda_api import CondaAPI
@@ -78,7 +78,7 @@ from trains_agent.helper.process import (
     Argv,
     COMMAND_SUCCESS,
     Executable,
-    get_bash_output, shutdown_docker_process)
+    get_bash_output, shutdown_docker_process, get_docker_id, commit_docker)
 from trains_agent.helper.package.cython_req import CythonRequirement
 from trains_agent.helper.repo import clone_repository_cached, RepoInfo, VCS
 from trains_agent.helper.resource_monitor import ResourceMonitor
@@ -326,7 +326,7 @@ class Worker(ServiceCommandSection):
                 extra_url = [extra_url]
             # put external pip url before default ones, so we first look for packages there
             for e in reversed(extra_url):
-                PIP_EXTRA_INDICES.insert(0, e)
+                self._pip_extra_index_url.insert(0, e)
         except Exception:
             self.log.warning('Failed adding extra-index-url to pip environment: {}'.format(extra_url))
         # update pip install command
@@ -339,7 +339,7 @@ class Worker(ServiceCommandSection):
         )
         self.pip_install_cmd = tuple(pip_install_cmd)
         self.worker_id = self._session.config["agent.worker_id"] or "{}:{}".format(
-            self._session.config["agent.worker_name"], getpid()
+            self._session.config["agent.worker_name"], os.getpid()
         )
         self._last_stats = defaultdict(lambda: 0)
         self._last_report_timestamp = psutil.time.time()
@@ -355,6 +355,7 @@ class Worker(ServiceCommandSection):
         self._docker_image = None
         self._docker_arguments = None
         self._daemon_foreground = None
+        self._standalone_mode = None

     def _get_requirements_manager(self, os_override=None, base_interpreter=None):
         requirements_manager = RequirementsManager(
@@ -425,24 +426,27 @@ class Worker(ServiceCommandSection):
                                lines=['Running Task {} inside docker: {}\n'.format(task_id, task_docker_cmd)],
                                level="INFO")
                 task_docker_cmd = task_docker_cmd.split(' ')
-                full_docker_cmd = self.docker_image_func(docker_image=task_docker_cmd[0],
-                                                         docker_arguments=task_docker_cmd[1:])
+                docker_image = task_docker_cmd[0]
+                docker_arguments = task_docker_cmd[1:]
             else:
                 self.send_logs(task_id=task_id,
                                lines=['running Task {} inside default docker image: {} {}\n'.format(
                                    task_id, self._docker_image, self._docker_arguments or '')],
                                level="INFO")
-                full_docker_cmd = self.docker_image_func(docker_image=self._docker_image,
-                                                         docker_arguments=self._docker_arguments)
-            # Update docker command
-            try:
-                docker_cmd = ' '.join([self._docker_image] + self._docker_arguments)
-                self._session.send_api(
-                    tasks_api.EditRequest(task_id, execution=dict(docker_cmd=docker_cmd), force=True))
-            except Exception:
-                pass
+                docker_image = self._docker_image
+                docker_arguments = self._docker_arguments

-            full_docker_cmd[-1] = full_docker_cmd[-1] + 'execute --disable-monitoring --id ' + task_id
+            # Update docker command
+            full_docker_cmd = self.docker_image_func(docker_image=docker_image, docker_arguments=docker_arguments)
+            try:
+                self._session.send_api(
+                    tasks_api.EditRequest(task_id, force=True, execution=dict(
+                        docker_cmd=' '.join([docker_image] + docker_arguments) if docker_arguments else docker_image)))
+            except Exception:
+                pass
+
+            full_docker_cmd[-1] = full_docker_cmd[-1] + 'execute --disable-monitoring {} --id {}'.format(
+                '--standalone-mode' if self._standalone_mode else '', task_id)
             cmd = Argv(*full_docker_cmd)
         else:
             cmd = worker_args.get_argv_for_command("execute") + (
@@ -489,7 +493,7 @@ class Worker(ServiceCommandSection):
         safe_remove_file(temp_stdout_name)
         safe_remove_file(temp_stderr_name)
         if self.docker_image_func:
-            shutdown_docker_process(docker_cmd_ending='--id {}\'\"'.format(task_id))
+            shutdown_docker_process(docker_cmd_contains='--id {}\'\"'.format(task_id))

     def run_tasks_loop(self, queues, worker_params):
         """
@@ -600,6 +604,8 @@ class Worker(ServiceCommandSection):
         # check if we have the latest version
         start_check_update_daemon()

+        self._standalone_mode = kwargs.get('standalone_mode', False)
+
         self.check(**kwargs)
         self.log.debug("starting resource monitor thread")
         print("Worker \"{}\" - ".format(self.worker_id), end='')
@@ -863,15 +869,21 @@ class Worker(ServiceCommandSection):
     def build(
         self,
         task_id,
-        target_folder=None,
+        target=None,
         python_version=None,
+        docker=None,
         **_
     ):
         if not task_id:
             raise CommandFailedError("Worker build must have valid task id")
         if not check_if_command_exists("virtualenv"):
             raise CommandFailedError("Worker must have virtualenv installed")
+
+        self._session.print_configuration()
+
+        if docker is not False and docker is not None:
+            return self._build_docker(docker, target, task_id)
+
         current_task = self._session.api_client.tasks.get_by_id(task_id)
         execution = self.get_execution_info(current_task)
@@ -882,7 +894,7 @@ class Worker(ServiceCommandSection):
             requirements = None

         # TODO: make sure we pass the correct python_version
-        venv_folder, requirements_manager = self.install_virtualenv(venv_dir=target_folder,
+        venv_folder, requirements_manager = self.install_virtualenv(venv_dir=target,
                                                                     requested_python_version=python_version)

         if self._default_pip:
@@ -913,6 +925,72 @@ class Worker(ServiceCommandSection):

         return 0

+    def _build_docker(self, docker, target, task_id):
+
+        self.temp_config_path = safe_mkstemp(
+            suffix=".cfg", prefix=".trains_agent.", text=True, name_only=True
+        )
+        if not target:
+            ValueError("--target container name must be provided for docker build")
+
+        temp_config, docker_image_func = self.get_docker_config_cmd(docker)
+        self.dump_config(temp_config)
+        self.docker_image_func = docker_image_func
+        try:
+            response = get_task(self._session, task_id, only_fields=["execution.docker_cmd"])
+            task_docker_cmd = response.execution.docker_cmd
+            task_docker_cmd = task_docker_cmd.strip() if task_docker_cmd else None
+        except Exception:
+            task_docker_cmd = None
+        if task_docker_cmd:
+            print('Building Task {} inside docker: {}\n'.format(task_id, task_docker_cmd))
+            task_docker_cmd = task_docker_cmd.split(' ')
+            full_docker_cmd = self.docker_image_func(docker_image=task_docker_cmd[0],
+                                                     docker_arguments=task_docker_cmd[1:])
+        else:
+            print('running Task {} inside default docker image: {} {}\n'.format(
+                task_id, self._docker_image, self._docker_arguments or ''))
+            full_docker_cmd = self.docker_image_func(docker_image=self._docker_image,
+                                                     docker_arguments=self._docker_arguments)
+        end_of_build_marker = "build.done=true"
+        docker_cmd_suffix = ' build --id {} ; ' \
+                            'echo "" >> /root/trains.conf ; ' \
+                            'echo {} >> /root/trains.conf ; ' \
+                            'bash'.format(task_id, end_of_build_marker)
+        full_docker_cmd[-1] = full_docker_cmd[-1] + docker_cmd_suffix
+        cmd = Argv(*full_docker_cmd)
+
+        # we will be checking the configuration file for changes
+        temp_config = Path(self.temp_config_path)
+        base_time_stamp = temp_config.stat().st_mtime
+
+        # start the docker
+        print('Starting docker build')
+        cmd.call_subprocess(subprocess.Popen)
+
+        # now we need to wait until the line shows on our configuration file.
+        while True:
+            while temp_config.stat().st_mtime == base_time_stamp:
+                sleep(5.0)
+            with open(temp_config.as_posix()) as f:
+                lines = [l.strip() for l in f.readlines()]
+            if 'build.done=true' in lines:
+                break
+            base_time_stamp = temp_config.stat().st_mtime
+
+        print('\nDocker build done')
+
+        # get the docker id.
+        docker_id = get_docker_id(docker_cmd_contains='--id {} '.format(task_id))
+        if not docker_id:
+            print("Error: cannot locate docker for storage")
+            return
+
+        print('Committing docker container to: {}'.format(target))
+        print(commit_docker(container_name=target, docker_id=docker_id))
+        shutdown_docker_process(docker_id=docker_id)
+        return
+
     @resolve_names
     def execute(
         self,
@@ -921,13 +999,33 @@ class Worker(ServiceCommandSection):
         optimization=0,
         disable_monitoring=False,
         full_monitoring=False,
+        require_queue=False,
         log_file=None,
+        standalone_mode=None,
         **_
     ):
         if not task_id:
             raise CommandFailedError("Worker execute must have valid task id")
         if not check_if_command_exists("virtualenv"):
             raise CommandFailedError("Worker must have virtualenv installed")
+
+        try:
+            current_task = self._session.api_client.tasks.get_by_id(task_id)
+            if not current_task.id:
+                pass
+        except Exception:
+            raise ValueError("Could not find task id={}".format(task_id))
+
+        # make sure this task is not stuck in an execution queue, it shouldn't have been, but just in case.
+        try:
+            res = self._session.api_client.tasks.dequeue(task=current_task.id)
+            if require_queue and res.meta.result_code != 200:
+                raise ValueError("Execution required enqueued task, "
+                                 "but task id={} is not queued.".format(current_task.id))
+        except Exception:
+            if require_queue:
+                raise
+
         if full_monitoring:
             worker_params = WorkerParams(
                 log_level=log_level,
@@ -942,13 +1040,8 @@ class Worker(ServiceCommandSection):
             return

         self._session.print_configuration()
-        current_task = self._session.api_client.tasks.get_by_id(task_id)
-        try:
-            if not current_task.id:
-                pass
-        except Exception:
-            raise ValueError("Could not find task id={}".format(task_id))

+        # now mark the task as started
         self._session.api_client.tasks.started(
             task=current_task.id,
             status_reason="worker started execution",
@@ -966,12 +1059,13 @@ class Worker(ServiceCommandSection):
         except AttributeError:
             requirements = None

-        venv_folder, requirements_manager = self.install_virtualenv()
+        venv_folder, requirements_manager = self.install_virtualenv(standalone_mode=standalone_mode)

-        if self._default_pip:
-            self.package_api.install_packages(*self._default_pip)
+        if not standalone_mode:
+            if self._default_pip:
+                self.package_api.install_packages(*self._default_pip)

-        print("\n")
+            print("\n")

         directory, vcs, repo_info = self.get_repo_info(
             execution, current_task, venv_folder
@@ -979,12 +1073,14 @@ class Worker(ServiceCommandSection):

         print("\n")

-        self.install_requirements(
-            execution,
-            repo_info,
-            requirements_manager=requirements_manager,
-            cached_requirements=requirements,
-        )
+        if not standalone_mode:
+            self.install_requirements(
+                execution,
+                repo_info,
+                requirements_manager=requirements_manager,
+                cached_requirements=requirements,
+            )
+
         # do not update the task packages if we are using conda,
         # it will most likely make the task environment unreproducible
         freeze = self.freeze_task_environment(current_task.id if not self.is_conda else None)
@@ -1016,7 +1112,7 @@ class Worker(ServiceCommandSection):
             "log_to_backend": "0",
             "config_file": self._session.config_file,  # The config file is the tmp file that trains_agent created
         }
-        environ.update(
+        os.environ.update(
             {
                 sdk_key: str(value)
                 for key, value in sdk_env.items()
@@ -1027,6 +1123,11 @@ class Worker(ServiceCommandSection):
         if repo_info:
             self._update_commit_id(task_id, execution, repo_info)

+        # Add the script CWD to the python path
+        python_path = get_python_path(script_dir, execution.entry_point, self.package_api)
+        if python_path:
+            os.environ['PYTHONPATH'] = python_path
+
         print("Starting Task Execution:\n".format(task_id))
         exit_code = -1
         try:
@@ -1318,8 +1419,8 @@ class Worker(ServiceCommandSection):
                 self.package_api.load_requirements(cached_requirements)
             except Exception as e:
                 self.log_traceback(e)
-                self.error("Could not install task requirements! Trying to install requirements from repository")
                 cached_requirements_failed = True
+                raise ValueError("Could not install task requirements!")
             else:
                 self.log("task requirements installation passed")
                 return
@@ -1485,7 +1586,7 @@ class Worker(ServiceCommandSection):
             )
         )

-    def install_virtualenv(self, venv_dir=None, requested_python_version=None):
+    def install_virtualenv(self, venv_dir=None, requested_python_version=None, standalone_mode=False):
         # type: (str, str) -> Tuple[Path, RequirementsManager]
         """
         Install a new python virtual environment, removing the old one if exists
@@ -1502,7 +1603,7 @@ class Worker(ServiceCommandSection):
             self._session.config.put("agent.default_python", executable_version)
             self._session.config.put("agent.python_binary", executable_name)

-        first_time = (
+        first_time = not standalone_mode and (
             is_windows_platform()
             or self.is_conda
             or not venv_dir.is_dir()
@@ -1532,6 +1633,10 @@ class Worker(ServiceCommandSection):
             if first_time:
                 self.package_api.remove()
                 self.package_api.create()
+        elif standalone_mode:
+            # conda with standalone mode
+            get_conda = partial(CondaAPI, **package_manager_params)
+            self.package_api = get_conda()
         else:
             get_conda = partial(CondaAPI, **package_manager_params)

@@ -1578,7 +1683,8 @@ class Worker(ServiceCommandSection):
             args.update(kwargs)
             return self._get_docker_cmd(**args)

-        docker_image = str(self._session.config.get("agent.default_docker.image", "nvidia/cuda")) \
+        docker_image = str(os.environ.get("TRAINS_DOCKER_IMAGE") or os.environ.get("ALG_DOCKER_IMAGE") or
+                           self._session.config.get("agent.default_docker.image", "nvidia/cuda")) \
             if not docker_args else docker_args[0]
         docker_arguments = docker_image.split(' ') if docker_image else []
         if len(docker_arguments) > 1:
@@ -1591,8 +1697,8 @@ class Worker(ServiceCommandSection):
             python_version = '3'
         if not python_version.startswith('python'):
             python_version = 'python'+python_version
-        print("Running in Docker mode (v19.03 and above) - using default docker image: {} running {}\n".format(
-            docker_image, python_version))
+        print("Running in Docker {} mode (v19.03 and above) - using default docker image: {} running {}\n".format(
+            '*standalone*' if self._standalone_mode else '', docker_image, python_version))
         temp_config = self._session.config.copy()
         mounted_cache_dir = '/root/.trains/cache'
         mounted_pip_dl_dir = '/root/.trains/pip-download-cache'
@@ -1648,7 +1754,8 @@ class Worker(ServiceCommandSection):
                                     host_ssh_cache=host_ssh_cache,
                                     host_cache=host_cache, mounted_cache=mounted_cache_dir,
                                     host_pip_dl=host_pip_dl, mounted_pip_dl=mounted_pip_dl_dir,
-                                    host_vcs_cache=host_vcs_cache, mounted_vcs_cache=mounted_vcs_cache)
+                                    host_vcs_cache=host_vcs_cache, mounted_vcs_cache=mounted_vcs_cache,
+                                    standalone_mode=self._standalone_mode)
         return temp_config, partial(docker_cmd_functor, docker_cmd)

     @staticmethod
@@ -1659,7 +1766,7 @@ class Worker(ServiceCommandSection):
                         host_ssh_cache,
                         host_cache, mounted_cache,
                         host_pip_dl, mounted_pip_dl,
-                        host_vcs_cache, mounted_vcs_cache):
+                        host_vcs_cache, mounted_vcs_cache, standalone_mode=False):
         docker = 'docker'
         base_cmd = [docker, 'run', '-t']

@@ -1681,23 +1788,41 @@ class Worker(ServiceCommandSection):
         if host_ssh_cache:
             base_cmd += ['-v', host_ssh_cache+':/root/.ssh', ]

+        # if we are running a RC version, install the same version in the docker
+        # because the default latest, will be a release version (not RC)
+        specify_version = ''
+        try:
+            from trains_agent.version import __version__
+            _version_parts = __version__.split('.')
+            if 'rc' in _version_parts[-1].lower() or 'rc' in _version_parts[-2].lower():
+                specify_version = '=={}'.format(__version__)
+        except:
+            pass
+
+        if standalone_mode:
+            update_scheme = ""
+        else:
+            update_scheme = \
+                "echo 'Binary::apt::APT::Keep-Downloaded-Packages \"true\";' > /etc/apt/apt.conf.d/docker-clean ; " \
+                "chown -R root /root/.cache/pip ; " \
+                "apt-get update ; " \
+                "apt-get install -y git libsm6 libxext6 libxrender-dev libglib2.0-0 {python_single_digit}-pip ; " \
+                "{python} -m pip install -U pip ; " \
+                "{python} -m pip install -U trains-agent{specify_version} ; ".format(
+                    python_single_digit=python_version.split('.')[0],
+                    python=python_version, specify_version=specify_version)
+
         base_cmd += [
-            '-v', conf_file+':/root/trains.conf',
-            '-v', host_apt_cache+':/var/cache/apt/archives',
-            '-v', host_pip_cache+':/root/.cache/pip',
-            '-v', host_pip_dl+':'+mounted_pip_dl,
-            '-v', host_cache+':'+mounted_cache,
-            '-v', host_vcs_cache+':'+mounted_vcs_cache,
-            '--rm', docker_image, 'bash', '-c',
-            "echo 'Binary::apt::APT::Keep-Downloaded-Packages \"true\";' > /etc/apt/apt.conf.d/docker-clean ; "
-            "chown -R root /root/.cache/pip ; "
-            "apt-get update ; "
-            "apt-get install -y git libsm6 libxext6 libxrender-dev libglib2.0-0 {python_single_digit}-pip ; "
-            "{python} -m pip install -U pip ; "
-            "{python} -m pip install -U trains-agent ; "
-            "NVIDIA_VISIBLE_DEVICES=all CUDA_VISIBLE_DEVICES= {python} -u -m trains_agent ".format(
-                python_single_digit=python_version.split('.')[0],
-                python=python_version)]
+            '-v', conf_file+':/root/trains.conf',
+            '-v', host_apt_cache+':/var/cache/apt/archives',
+            '-v', host_pip_cache+':/root/.cache/pip',
+            '-v', host_pip_dl+':'+mounted_pip_dl,
+            '-v', host_cache+':'+mounted_cache,
+            '-v', host_vcs_cache+':'+mounted_vcs_cache,
+            '--rm', docker_image, 'bash', '-c',
+            update_scheme +
+            "NVIDIA_VISIBLE_DEVICES=all {python} -u -m trains_agent ".format(python=python_version)
+        ]

         return base_cmd

diff --git a/trains_agent/helper/base.py b/trains_agent/helper/base.py
index 96292e1..86faaf8 100644
--- a/trains_agent/helper/base.py
+++ b/trains_agent/helper/base.py
@@ -176,6 +176,25 @@ def safe_remove_file(filename, error_message=None):
             print(error_message)


+def get_python_path(script_dir, entry_point, package_api):
+    try:
+        python_path_sep = ';' if is_windows_platform() else ':'
+        python_path_cmd = package_api.get_python_command(
+            ["-c", "import sys; print('{}'.join(sys.path))".format(python_path_sep)])
+        org_python_path = python_path_cmd.get_output(cwd=script_dir)
+        # Add path of the script directory and executable directory
+        python_path = '{}{python_path_sep}{}{python_path_sep}'.format(
+            Path(script_dir).absolute().as_posix(),
+            (Path(script_dir) / Path(entry_point)).parent.absolute().as_posix(),
+            python_path_sep=python_path_sep)
+        if is_windows_platform():
+            return python_path.replace('/', '\\') + org_python_path
+
+        return python_path + org_python_path
+    except Exception:
+        return None
+
+
 class Singleton(ABCMeta):
     _instances = {}

diff --git a/trains_agent/helper/process.py b/trains_agent/helper/process.py
index 4e80e69..6422ddc 100644
--- a/trains_agent/helper/process.py
+++ b/trains_agent/helper/process.py
@@ -59,17 +59,47 @@ def kill_all_child_processes(pid=None):
         parent.kill()


-def shutdown_docker_process(docker_cmd_ending):
+def get_docker_id(docker_cmd_contains):
     try:
         containers_running = get_bash_output(cmd='docker ps --no-trunc --format \"{{.ID}}: {{.Command}}\"')
         for docker_line in containers_running.split('\n'):
             parts = docker_line.split(':')
-            if parts[-1].endswith(docker_cmd_ending):
-                # we found our docker, stop it
-                get_bash_output(cmd='docker stop -t 1 {}'.format(parts[0]))
-                return
+            if docker_cmd_contains in parts[-1]:
+                # we found our docker, return it
+                return parts[0]
     except Exception:
         pass
+    return None
+
+
+def shutdown_docker_process(docker_cmd_contains=None, docker_id=None):
+    try:
+        if not docker_id:
+            docker_id = get_docker_id(docker_cmd_contains=docker_cmd_contains)
+        if docker_id:
+            # we found our docker, stop it
+            get_bash_output(cmd='docker stop -t 1 {}'.format(docker_id))
+    except Exception:
+        pass
+
+
+def commit_docker(container_name, docker_cmd_contains=None, docker_id=None):
+    try:
+        if not docker_id:
+            docker_id = get_docker_id(docker_cmd_contains=docker_cmd_contains)
+        if not docker_id:
+            print("Failed locating requested docker")
+            return False
+
+        if docker_id:
+            # we found our docker, stop it
+            output = get_bash_output(cmd='docker commit {} {}'.format(docker_id, container_name))
+            return output
+    except Exception:
+        pass
+
+    print("Failed storing requested docker")
+    return False


 def check_if_command_exists(cmd):
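Note (not part of the patch): a minimal sketch of how the new helpers introduced in `trains_agent/helper/process.py` are expected to compose, mirroring the tail of `Worker._build_docker()` in the diff above. The task id and target image name below are hypothetical placeholder values; only `get_docker_id`, `commit_docker` and `shutdown_docker_process` come from this change.

```python
# Illustrative sketch only, assuming a running agent container started for the given task.
from trains_agent.helper.process import get_docker_id, commit_docker, shutdown_docker_process

task_id = "aabb1122"                     # hypothetical task id
target_image = "prebuilt-task-image"     # hypothetical image name (what --target would provide)

# Locate the running container by the command-line fragment it was launched with
docker_id = get_docker_id(docker_cmd_contains='--id {} '.format(task_id))
if not docker_id:
    print("Error: cannot locate docker for storage")
else:
    # Persist the prepared environment as a reusable image, then stop the container by id
    print(commit_docker(container_name=target_image, docker_id=docker_id))
    shutdown_docker_process(docker_id=docker_id)
```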