diff --git a/clearml_agent/__main__.py b/clearml_agent/__main__.py index 93b30c9..0bcc318 100644 --- a/clearml_agent/__main__.py +++ b/clearml_agent/__main__.py @@ -12,7 +12,7 @@ from clearml_agent.definitions import FileBuffering, CONFIG_FILE from clearml_agent.helper.base import reverse_home_folder_expansion, chain_map, named_temporary_file from clearml_agent.helper.process import ExitStatus from . import interface, session, definitions, commands -from .errors import ConfigFileNotFound, Sigterm, APIError +from .errors import ConfigFileNotFound, Sigterm, APIError, CustomBuildScriptFailed from .helper.trace import PackageTrace from .interface import get_parser @@ -44,6 +44,8 @@ def run_command(parser, args, command_name): debug = command._session.debug_mode func = getattr(command, command_name) return func(**args_dict) + except CustomBuildScriptFailed as e: + command_class.exit(e.message, e.errno) except ConfigFileNotFound: message = 'Cannot find configuration file in "{}".\n' \ 'To create a configuration file, run:\n' \ diff --git a/clearml_agent/backend_api/config/default/agent.conf b/clearml_agent/backend_api/config/default/agent.conf index ba6087e..d7a2aa5 100644 --- a/clearml_agent/backend_api/config/default/agent.conf +++ b/clearml_agent/backend_api/config/default/agent.conf @@ -35,6 +35,11 @@ # default false, only the working directory will be added to the PYHTONPATH # force_git_root_python_path: false + # in docker mode, if container's entrypoint automatically activated a virtual environment + # use the activated virtual environment and install everything there + # set to False to disable, and always create a new venv inheriting from the system_site_packages + # docker_use_activated_venv: true + # select python package manager: # currently supported: pip, conda and poetry # if "pip" or "conda" are used, the agent installs the required packages @@ -269,4 +274,34 @@ # target_format: json # } # } + + # Specifies a custom environment setup script to be executed 
instead of installing a virtual environment. + # If provided, this script is executed following Git cloning. The script command may include environment variables, which + # will be expanded before execution (e.g. "$CLEARML_GIT_ROOT/script.sh"). + # The script can also be specified using the CLEARML_AGENT_CUSTOM_BUILD_SCRIPT environment variable. + # + # When running the script, the following environment variables will be set: + # - CLEARML_CUSTOM_BUILD_TASK_CONFIG_JSON: specifies a path to a temporary file containing the complete task + # contents in JSON format + # - CLEARML_TASK_SCRIPT_ENTRY: task entrypoint script as defined in the task's script section + # - CLEARML_TASK_WORKING_DIR: task working directory as defined in the task's script section + # - CLEARML_VENV_PATH: path to the agent's default virtual environment path (as defined in the configuration) + # - CLEARML_GIT_ROOT: path to the cloned Git repository + # - CLEARML_CUSTOM_BUILD_OUTPUT: a path to a non-existing file that may be created by the script. If created, + # this file must be in the following JSON format: + # ```json + # { + # "binary": "/absolute/path/to/python-executable", + # "entry_point": "/absolute/path/to/task-entrypoint-script", + # "working_dir": "/absolute/path/to/task-working/dir" + # } + # ``` + # If provided, the agent will use these instead of the predefined task script section to execute the task and will + # skip virtual environment creation. + # + # In case the custom script returns with a non-zero exit code, the agent will fail with the same exit code. + # In case the custom script is specified but does not exist, or if the custom script does not write valid content + # into the file specified in CLEARML_CUSTOM_BUILD_OUTPUT, the agent will emit a warning and continue with the + # standard flow. 
+ custom_build_script: "" } diff --git a/clearml_agent/backend_api/session/defs.py b/clearml_agent/backend_api/session/defs.py index 2964bcd..cf266b4 100644 --- a/clearml_agent/backend_api/session/defs.py +++ b/clearml_agent/backend_api/session/defs.py @@ -15,6 +15,7 @@ ENV_NO_DEFAULT_SERVER = EnvEntry("CLEARML_NO_DEFAULT_SERVER", "TRAINS_NO_DEFAULT ENV_DISABLE_VAULT_SUPPORT = EnvEntry('CLEARML_AGENT_DISABLE_VAULT_SUPPORT', type=bool) ENV_ENABLE_ENV_CONFIG_SECTION = EnvEntry('CLEARML_AGENT_ENABLE_ENV_CONFIG_SECTION', type=bool) ENV_ENABLE_FILES_CONFIG_SECTION = EnvEntry('CLEARML_AGENT_ENABLE_FILES_CONFIG_SECTION', type=bool) +ENV_VENV_CONFIGURED = EnvEntry('VIRTUAL_ENV', type=str) ENV_INITIAL_CONNECT_RETRY_OVERRIDE = EnvEntry( 'CLEARML_AGENT_INITIAL_CONNECT_RETRY_OVERRIDE', default=True, converter=safe_text_to_bool ) diff --git a/clearml_agent/backend_api/session/session.py b/clearml_agent/backend_api/session/session.py index e03c683..ace788e 100644 --- a/clearml_agent/backend_api/session/session.py +++ b/clearml_agent/backend_api/session/session.py @@ -206,7 +206,7 @@ class Session(TokenManager): http_retries_config = dict(**http_retries_config) http_retries_config['connect'] = connect_retries - return http_retries_config, get_http_session_with_retry(**http_retries_config) + return http_retries_config, get_http_session_with_retry(config=self.config or None, **http_retries_config) def load_vaults(self): if not self.check_min_api_version("2.15") or self.feature_set == "basic": diff --git a/clearml_agent/commands/worker.py b/clearml_agent/commands/worker.py index 5855dad..42e53c2 100644 --- a/clearml_agent/commands/worker.py +++ b/clearml_agent/commands/worker.py @@ -39,7 +39,9 @@ from clearml_agent.backend_api.services import queues as queues_api from clearml_agent.backend_api.services import tasks as tasks_api from clearml_agent.backend_api.services import workers as workers_api from clearml_agent.backend_api.session import CallResult -from 
clearml_agent.backend_api.session.defs import ENV_ENABLE_ENV_CONFIG_SECTION, ENV_ENABLE_FILES_CONFIG_SECTION +from clearml_agent.backend_api.session.defs import ( + ENV_ENABLE_ENV_CONFIG_SECTION, ENV_ENABLE_FILES_CONFIG_SECTION, + ENV_VENV_CONFIGURED, ) from clearml_agent.backend_config.defs import UptimeConf from clearml_agent.backend_config.utils import apply_environment, apply_files from clearml_agent.commands.base import resolve_names, ServiceCommandSection @@ -65,10 +67,17 @@ from clearml_agent.definitions import ( ENV_SSH_AUTH_SOCK, ENV_AGENT_SKIP_PIP_VENV_INSTALL, ENV_EXTRA_DOCKER_ARGS, + ENV_CUSTOM_BUILD_SCRIPT, ENV_AGENT_SKIP_PYTHON_ENV_INSTALL, WORKING_STANDALONE_DIR, ) from clearml_agent.definitions import WORKING_REPOSITORY_DIR, PIP_EXTRA_INDICES -from clearml_agent.errors import APIError, CommandFailedError, Sigterm +from clearml_agent.errors import ( + APIError, + CommandFailedError, + Sigterm, + SkippedCustomBuildScript, + CustomBuildScriptFailed, +) from clearml_agent.helper.base import ( return_list, print_parameters, @@ -218,7 +227,7 @@ class LiteralScriptManager(object): location = None location = location or (repo_info and repo_info.root) if not location: - location = Path(self.venv_folder, "code") + location = Path(self.venv_folder, WORKING_STANDALONE_DIR) location.mkdir(exist_ok=True, parents=True) log.debug("selected execution directory: %s", location) return Text(location), self.write(task, location, execution.entry_point) @@ -698,6 +707,9 @@ class Worker(ServiceCommandSection): ) if self._impersonate_as_task_owner: docker_params["auth_token"] = task_session.token + elif self._session.access_key is None or self._session.secret_key is None: + # We're using a token right now + docker_params["auth_token"] = self._session.token if self._worker_tags: docker_params["worker_tags"] = self._worker_tags if self._services_mode: @@ -720,7 +732,7 @@ class Worker(ServiceCommandSection): else: print("Warning: generated docker container name is invalid: 
{}".format(name)) - full_docker_cmd = self.docker_image_func(**docker_params) + full_docker_cmd = self.docker_image_func(env_task_id=task_id, **docker_params) # if we are using the default docker, update back the Task: if default_docker: @@ -1258,6 +1270,7 @@ class Worker(ServiceCommandSection): self._session.print_configuration() def daemon(self, queues, log_level, foreground=False, docker=False, detached=False, order_fairness=False, **kwargs): + self._apply_extra_configuration() # check that we have docker command if we need it if docker not in (False, None) and not check_if_command_exists("docker"): @@ -1292,8 +1305,12 @@ class Worker(ServiceCommandSection): # We are not running a daemon we are killing one. # find the pid send termination signal and leave - if kwargs.get('stop', False): - return 1 if not self._kill_daemon(dynamic_gpus=dynamic_gpus) else 0 + if kwargs.get('stop', False) is not False: + return_code = 0 + for worker_id in kwargs.get('stop') or [None]: + if not self._kill_daemon(dynamic_gpus=dynamic_gpus, worker_id=worker_id): + return_code = 1 + return return_code # if we do not need to create queues, make sure they are valid # match previous behaviour when we validated queue names before everything else @@ -1772,11 +1789,19 @@ class Worker(ServiceCommandSection): "ERROR! 
Failed applying git diff, see diff above.".format(diff)) def _apply_extra_configuration(self): + # store a few things we updated in runtime (TODO: we should list theme somewhere) + agent_config = self._session.config["agent"].copy() + agent_config_keys = ["cuda_version", "cudnn_version", "default_python", "worker_id", "debug"] try: self._session.load_vaults() except Exception as ex: print("Error: failed applying extra configuration: {}".format(ex)) + # merge back + for restore_key in agent_config_keys: + if restore_key in agent_config: + self._session.config["agent"][restore_key] = agent_config[restore_key] + config = self._session.config default = config.get("agent.apply_environment", False) if ENV_ENABLE_ENV_CONFIG_SECTION.get(default=default): @@ -1829,13 +1854,7 @@ class Worker(ServiceCommandSection): requirements = None if not python_version: - try: - python_version = current_task.script.binary - python_version = python_version.split('/')[-1].replace('python', '') - # if we can cast it, we are good - python_version = '{:.1f}'.format(float(python_version)) - except: - python_version = None + python_version = self._get_task_python_version(current_task) venv_folder, requirements_manager, is_cached = self.install_virtualenv( venv_dir=target, requested_python_version=python_version, execution_info=execution, @@ -1985,6 +2004,16 @@ class Worker(ServiceCommandSection): return + def _get_task_python_version(self, task): + # noinspection PyBroadException + try: + python_ver = task.script.binary + python_ver = python_ver.split('/')[-1].replace('python', '') + # if we can cast it, we are good + return '{:.1f}'.format(float(python_ver)) + except Exception: + pass + @resolve_names def execute( self, @@ -2097,85 +2126,140 @@ class Worker(ServiceCommandSection): execution = self.get_execution_info(current_task) - if self._session.config.get("agent.package_manager.force_repo_requirements_txt", False): - requirements = None - 
print("[package_manager.force_repo_requirements_txt=true] " - "Skipping requirements, using repository \"requirements.txt\" ") - else: + python_ver = self._get_task_python_version(current_task) + + freeze = None + repo_info = None + script_dir = "" + venv_folder = "" + + custom_build_script = self._session.config.get("agent.custom_build_script", "") or ENV_CUSTOM_BUILD_SCRIPT.get() + if custom_build_script: try: - requirements = current_task.script.requirements - except AttributeError: + venv_folder = Path(self._session.config["agent.venvs_dir"], python_ver or "3") + venv_folder = Path(os.path.expanduser(os.path.expandvars(venv_folder.as_posix()))) + directory, vcs, repo_info = self.get_repo_info( + execution, current_task, str(venv_folder) + ) + binary, entry_point, working_dir = self.run_custom_build_script( + custom_build_script, + current_task, + execution, + venv_folder=venv_folder, + git_root=vcs.location, + ) + + execution.entry_point = str(entry_point) + execution.working_dir = str(working_dir) + script_dir = str(working_dir) + + self.package_api = VirtualenvPip( + session=self._session, + interpreter=str(binary), + python=str(binary), + requirements_manager=RequirementsManager(self._session), + execution_info=execution, + path=venv_folder, + ) + + self.global_package_api = SystemPip( + session=self._session, + interpreter=str(binary), + ) + + except SkippedCustomBuildScript as ex: + print("Warning: {}".format(str(ex))) + custom_build_script = None + + if not custom_build_script: + if self._session.config.get("agent.package_manager.force_repo_requirements_txt", False): requirements = None + print("[package_manager.force_repo_requirements_txt=true] " + "Skipping requirements, using repository \"requirements.txt\" ") + else: + try: + requirements = current_task.script.requirements + except AttributeError: + requirements = None - try: - python_ver = current_task.script.binary - python_ver = python_ver.split('/')[-1].replace('python', '') - # if we can cast it, 
we are good - python_ver = '{:.1f}'.format(float(python_ver)) - except: - python_ver = None + alternative_code_folder = None + if ENV_AGENT_SKIP_PYTHON_ENV_INSTALL.get(): + venv_folder, requirements_manager, is_cached = None, None, False + # we need to create a folder for the code to be dumped into + code_folder = self._session.config.get("agent.venvs_dir") + code_folder = Path(os.path.expanduser(os.path.expandvars(code_folder))) + # let's make sure it is clear from previous runs + rm_tree(normalize_path(code_folder, WORKING_REPOSITORY_DIR)) + rm_tree(normalize_path(code_folder, WORKING_STANDALONE_DIR)) + if not code_folder.exists(): + code_folder.mkdir(parents=True, exist_ok=True) + alternative_code_folder = code_folder.as_posix() + else: + venv_folder, requirements_manager, is_cached = self.install_virtualenv( + standalone_mode=standalone_mode, + requested_python_version=python_ver, + execution_info=execution, + cached_requirements=requirements, + ) - venv_folder, requirements_manager, is_cached = self.install_virtualenv( - standalone_mode=standalone_mode, - requested_python_version=python_ver, - execution_info=execution, - cached_requirements=requirements, - ) + if not is_cached and not standalone_mode: + if self._default_pip: + self.package_api.install_packages(*self._default_pip) - if not is_cached and not standalone_mode: - if self._default_pip: - self.package_api.install_packages(*self._default_pip) + print("\n") + + # either use the venvs base folder for code or the cwd + directory, vcs, repo_info = self.get_repo_info( + execution, current_task, str(venv_folder or alternative_code_folder) + ) print("\n") - directory, vcs, repo_info = self.get_repo_info( - execution, current_task, venv_folder - ) + cwd = vcs.location if vcs and vcs.location else directory - print("\n") + if not standalone_mode: + if is_cached: + # reinstalling git / local packages + package_api = copy(self.package_api) + OnlyExternalRequirements.cwd = package_api.cwd = cwd + 
package_api.requirements_manager = self._get_requirements_manager( + base_interpreter=package_api.requirements_manager.get_interpreter(), + requirement_substitutions=[OnlyExternalRequirements] + ) + # make sure we run the handlers + cached_requirements = \ + {k: package_api.requirements_manager.replace(requirements[k] or '') + for k in requirements} + if str(cached_requirements.get('pip', '')).strip() \ + or str(cached_requirements.get('conda', '')).strip(): + package_api.load_requirements(cached_requirements) + # make sure we call the correct freeze + requirements_manager = package_api.requirements_manager + elif requirements_manager: + self.install_requirements( + execution, + repo_info, + requirements_manager=requirements_manager, + cached_requirements=requirements, + cwd=cwd, + ) + elif not self.package_api: + # check if we have to manually configure package API, it will be readonly + self.package_api = SystemPip(session=self._session) - cwd = vcs.location if vcs and vcs.location else directory + # do not update the task packages if we are using conda, + # it will most likely make the task environment unreproducible + skip_freeze_update = self.is_conda and not self._session.config.get( + "agent.package_manager.conda_full_env_update", False) - if not standalone_mode: - if is_cached: - # reinstalling git / local packages - package_api = copy(self.package_api) - OnlyExternalRequirements.cwd = package_api.cwd = cwd - package_api.requirements_manager = self._get_requirements_manager( - base_interpreter=package_api.requirements_manager.get_interpreter(), - requirement_substitutions=[OnlyExternalRequirements] - ) - # make sure we run the handlers - cached_requirements = \ - {k: package_api.requirements_manager.replace(requirements[k] or '') - for k in requirements} - if str(cached_requirements.get('pip', '')).strip() \ - or str(cached_requirements.get('conda', '')).strip(): - package_api.load_requirements(cached_requirements) - # make sure we call the correct freeze - 
requirements_manager = package_api.requirements_manager - else: - self.install_requirements( - execution, - repo_info, - requirements_manager=requirements_manager, - cached_requirements=requirements, - cwd=cwd, - ) - - # do not update the task packages if we are using conda, - # it will most likely make the task environment unreproducible - skip_freeze_update = self.is_conda and not self._session.config.get( - "agent.package_manager.conda_full_env_update", False) - - freeze = self.freeze_task_environment( - task_id=current_task.id, - requirements_manager=requirements_manager, - add_venv_folder_cache=venv_folder, - execution_info=execution, - update_requirements=not skip_freeze_update, - ) - script_dir = (directory if isinstance(directory, Path) else Path(directory)).absolute().as_posix() + freeze = self.freeze_task_environment( + task_id=current_task.id, + requirements_manager=requirements_manager, + add_venv_folder_cache=venv_folder, + execution_info=execution, + update_requirements=not skip_freeze_update, + ) + script_dir = (directory if isinstance(directory, Path) else Path(directory)).absolute().as_posix() # run code # print("Running task id [%s]:" % current_task.id) @@ -2185,7 +2269,9 @@ class Worker(ServiceCommandSection): extra.append( WorkerParams(optimization=optimization).get_optimization_flag() ) + # check if this is a module load, then load it. 
+ # noinspection PyBroadException try: if current_task.script.binary and current_task.script.binary.startswith('python') and \ execution.entry_point and execution.entry_point.split()[0].strip() == '-m': @@ -2193,7 +2279,7 @@ class Worker(ServiceCommandSection): extra.extend(shlex.split(execution.entry_point)) else: extra.append(execution.entry_point) - except: + except Exception: extra.append(execution.entry_point) command = self.package_api.get_python_command(extra) @@ -2577,7 +2663,7 @@ class Worker(ServiceCommandSection): python_version=getattr(self.package_api, 'python', ''), cuda_version=self._session.config.get("agent.cuda_version"), source_folder=add_venv_folder_cache, - exclude_sub_folders=['task_repository', 'code']) + exclude_sub_folders=[WORKING_REPOSITORY_DIR, WORKING_STANDALONE_DIR]) # If do not update back requirements if not update_requirements: @@ -2852,28 +2938,122 @@ class Worker(ServiceCommandSection): ) ) - def install_virtualenv( - self, - venv_dir=None, - requested_python_version=None, - standalone_mode=False, - execution_info=None, - cached_requirements=None, - ): - # type: (str, str, bool, ExecutionInfo, dict) -> Tuple[Path, RequirementsManager, bool] + def run_custom_build_script(self, script, task, execution, venv_folder, git_root): + # type: (str, tasks_api.Task, ExecutionInfo, Path, str)-> Tuple[Path, Path, Path] """ - Install a new python virtual environment, removing the old one if exists - If CLEARML_SKIP_PIP_VENV_INSTALL is set then an emtpy virtual env folder is created - and package manager is configured to work with the global python interpreter (the interpreter - path itself can be passed in this variable) - :return: virtualenv directory, requirements manager to use with task, True if there is a cached venv entry + Run a custom env build script + :param script: + :return: A tuple containing: + - a full path to a python executable + - a new task entry_point (replacing the entry_point in the task's script section) + - a new working 
directory (replacing the working_dir in the task's script section) + - a requirements manager instance """ - skip_pip_venv_install = ENV_AGENT_SKIP_PIP_VENV_INSTALL.get() + script = os.path.expanduser(os.path.expandvars(script)) + + try: + if not os.path.isfile(script): + raise SkippedCustomBuildScript("Build script {} is not found".format(script)) + except OSError as ex: + raise SkippedCustomBuildScript(str(ex)) + + print("Running custom build script {}".format(script)) + + script_output_file = NamedTemporaryFile(prefix="custom_build_script", suffix=".json", mode="wt", delete=False) + + os.environ["CLEARML_AGENT_CUSTOM_BUILD_SCRIPT"] = script + os.environ["CLEARML_CUSTOM_BUILD_TASK_CONFIG_JSON"] = json.dumps( + task.to_dict(), separators=(',', ':'), default=str + ) + os.environ["CLEARML_CUSTOM_BUILD_OUTPUT"] = script_output_file.name + os.environ["CLEARML_TASK_SCRIPT_ENTRY"] = execution.entry_point + os.environ["CLEARML_TASK_WORKING_DIR"] = execution.working_dir + os.environ["CLEARML_VENV_PATH"] = str(venv_folder) + os.environ["CLEARML_GIT_ROOT"] = git_root + + try: + subprocess.check_call([script]) + except subprocess.CalledProcessError as ex: + raise CustomBuildScriptFailed( + message="Custom build script failed with return code {}".format(ex.returncode), + errno=ex.returncode + ) + + output = Path(script_output_file.name).read_text() + if not output: + raise SkippedCustomBuildScript("Build script {} is not found".format(script)) + + try: + output = json.loads(output) + binary = Path(output["binary"]) + entry_point = Path(output["entry_point"]) + working_dir = Path(output["working_dir"]) + except ValueError as ex: + raise SkippedCustomBuildScript( + "Failed parsing build script output JSON ({}): {}".format(script_output_file.name, ex) + ) + except KeyError as ex: + raise SkippedCustomBuildScript("Build script output missing {} field".format(ex.args[0])) + + try: + if not binary.is_file(): + raise SkippedCustomBuildScript( + "Invalid binary path returned from 
custom build script: {}".format(binary) + ) + if not entry_point.is_file(): + raise SkippedCustomBuildScript( + "Invalid entrypoint path returned from custom build script: {}".format(entry_point) + ) + if not working_dir.is_dir(): + raise SkippedCustomBuildScript( + "Invalid working dir returned from custom build script: {}".format(working_dir) + ) + except OSError as ex: + raise SkippedCustomBuildScript(str(ex)) + + return binary, entry_point, working_dir + + def _get_skip_pip_venv_install(self, skip_pip_venv_install=None): + if skip_pip_venv_install is None: + skip_pip_venv_install = ENV_AGENT_SKIP_PIP_VENV_INSTALL.get() + if skip_pip_venv_install: try: skip_pip_venv_install = bool(strtobool(skip_pip_venv_install)) except ValueError: pass + elif ENV_VENV_CONFIGURED.get() and ENV_DOCKER_IMAGE.get() and \ + self._session.config.get("agent.docker_use_activated_venv", True) and \ + self._session.config.get("agent.package_manager.system_site_packages", False): + # if we are running inside a container, and virtual environment is already installed, + # we should install directly into it, because we cannot inherit from the system packages + skip_pip_venv_install = find_executable("python") or True + + # check if we are running inside a container: + print( + "Warning! 
Found python virtual environment [{}] already activated inside the container, " + "installing packages into venv (pip does not support inherit/nested venv)".format( + skip_pip_venv_install if isinstance(skip_pip_venv_install, str) else ENV_VENV_CONFIGURED.get()) + ) + return skip_pip_venv_install + + def install_virtualenv( + self, + venv_dir=None, + requested_python_version=None, + standalone_mode=False, + execution_info=None, + cached_requirements=None, + ): + # type: (str, str, bool, ExecutionInfo, dict) -> Tuple[Path, RequirementsManager, bool] + """ + Install a new python virtual environment, removing the old one if exists + If skip_pip_venv_install is True or contains a string (or if CLEARML_SKIP_PIP_VENV_INSTALL is set) + then an emtpy virtual env folder is created and package manager is configured to work with the global python + interpreter (or using a custom interpreter if an interpreter path is passed in this variable) + :return: virtualenv directory, requirements manager to use with task, True if there is a cached venv entry + """ + skip_pip_venv_install = self._get_skip_pip_venv_install() if self._session.config.get("agent.ignore_requested_python_version", None): requested_python_version = '' @@ -2930,13 +3110,50 @@ class Worker(ServiceCommandSection): or not self.is_venv_update ) + if not standalone_mode: + rm_tree(normalize_path(venv_dir, WORKING_REPOSITORY_DIR)) + rm_tree(normalize_path(venv_dir, WORKING_STANDALONE_DIR)) + + call_package_manager_create, requirements_manager = self._setup_package_api( + executable_name=executable_name, + executable_version_suffix=executable_version_suffix, + venv_dir=venv_dir, + execution_info=execution_info, + standalone_mode=standalone_mode, + skip_pip_venv_install=skip_pip_venv_install, + first_time=first_time, + ) + + # check if we have a cached folder + if cached_requirements and not skip_pip_venv_install and self.package_api.get_cached_venv( + requirements=cached_requirements, + 
docker_cmd=execution_info.docker_cmd if execution_info else None, + python_version=self.package_api.python, + cuda_version=self._session.config.get("agent.cuda_version"), + destination_folder=Path(venv_dir) + ): + print('::: Using Cached environment {} :::'.format(self.package_api.get_last_used_entry_cache())) + return venv_dir, requirements_manager, True + + # create the initial venv + if not skip_pip_venv_install: + if call_package_manager_create: + self.package_api.create() + else: + if not venv_dir.exists(): + venv_dir.mkdir(parents=True, exist_ok=True) + + return venv_dir, requirements_manager, False + + def _setup_package_api( + self, executable_name, executable_version_suffix, venv_dir, execution_info, + standalone_mode, skip_pip_venv_install=False, first_time=False + ): + # type: (str, str, Path, ExecutionInfo, bool, bool, bool) -> Tuple[bool, RequirementsManager] requirements_manager = self._get_requirements_manager( base_interpreter=executable_name ) - if not standalone_mode: - rm_tree(normalize_path(venv_dir, WORKING_REPOSITORY_DIR)) - package_manager_params = dict( session=self._session, python=executable_version_suffix if self.is_conda else executable_name, @@ -2951,7 +3168,6 @@ class Worker(ServiceCommandSection): ) call_package_manager_create = False - if not self.is_conda: if standalone_mode or skip_pip_venv_install: # pip with standalone mode @@ -2959,7 +3175,10 @@ class Worker(ServiceCommandSection): if standalone_mode: self.package_api = VirtualenvPip(**package_manager_params) else: - self.package_api = self.global_package_api + # we can change it, no one is going to use it anyhow + package_manager_params['path'] = None + package_manager_params['interpreter'] = executable_name + self.package_api = VirtualenvPip(**package_manager_params) else: if self.is_venv_update: self.package_api = VenvUpdateAPI( @@ -2997,26 +3216,7 @@ class Worker(ServiceCommandSection): venv_dir = new_venv_folder self.package_api = get_conda(path=venv_dir) - # check if we 
have a cached folder - if cached_requirements and not skip_pip_venv_install and self.package_api.get_cached_venv( - requirements=cached_requirements, - docker_cmd=execution_info.docker_cmd if execution_info else None, - python_version=package_manager_params['python'], - cuda_version=self._session.config.get("agent.cuda_version"), - destination_folder=Path(venv_dir) - ): - print('::: Using Cached environment {} :::'.format(self.package_api.get_last_used_entry_cache())) - return venv_dir, requirements_manager, True - - # create the initial venv - if not skip_pip_venv_install: - if call_package_manager_create: - self.package_api.create() - else: - if not venv_dir.exists(): - venv_dir.mkdir(parents=True, exist_ok=True) - - return venv_dir, requirements_manager, False + return call_package_manager_create, requirements_manager def parse_requirements(self, reqs_file=None, overrides=None): os = None @@ -3266,6 +3466,7 @@ class Worker(ServiceCommandSection): worker_tags=None, name=None, mount_ssh=None, mount_apt_cache=None, mount_pip_cache=None, mount_poetry_cache=None, + env_task_id=None, ): docker = 'docker' @@ -3359,6 +3560,9 @@ class Worker(ServiceCommandSection): # update the docker image, so the system knows where it runs base_cmd += ['-e', 'CLEARML_DOCKER_IMAGE={} {}'.format(docker_image, ' '.join(docker_arguments or [])).strip()] + if env_task_id: + base_cmd += ['-e', 'CLEARML_TASK_ID={}'.format(env_task_id), ] + if auth_token: # if auth token is passed then put it in the env var base_cmd += ['-e', '{}={}'.format(ENV_AGENT_AUTH_TOKEN.vars[0], auth_token)] @@ -3550,8 +3754,11 @@ class Worker(ServiceCommandSection): return command, script_dir - def _kill_daemon(self, dynamic_gpus=False): - worker_id, worker_name = self._generate_worker_id_name(dynamic_gpus=dynamic_gpus) + def _kill_daemon(self, dynamic_gpus=False, worker_id=None): + if not worker_id: + worker_id, worker_name = self._generate_worker_id_name(dynamic_gpus=dynamic_gpus) + else: + worker_name = worker_id # 
Iterate over all running process for pid, uid, slot, file in sorted(Singleton.get_running_pids(), key=lambda x: x[1] or ''): diff --git a/clearml_agent/definitions.py b/clearml_agent/definitions.py index b20b357..00a502a 100644 --- a/clearml_agent/definitions.py +++ b/clearml_agent/definitions.py @@ -126,6 +126,7 @@ DEFAULT_VENV_UPDATE_URL = ( "https://raw.githubusercontent.com/Yelp/venv-update/v3.2.4/venv_update.py" ) WORKING_REPOSITORY_DIR = "task_repository" +WORKING_STANDALONE_DIR = "code" DEFAULT_VCS_CACHE = normalize_path(CONFIG_DIR, "vcs-cache") PIP_EXTRA_INDICES = [ ] @@ -134,6 +135,7 @@ ENV_DOCKER_IMAGE = EnvironmentConfig('CLEARML_DOCKER_IMAGE', 'TRAINS_DOCKER_IMAG ENV_WORKER_ID = EnvironmentConfig('CLEARML_WORKER_ID', 'TRAINS_WORKER_ID') ENV_WORKER_TAGS = EnvironmentConfig('CLEARML_WORKER_TAGS') ENV_AGENT_SKIP_PIP_VENV_INSTALL = EnvironmentConfig('CLEARML_AGENT_SKIP_PIP_VENV_INSTALL') +ENV_AGENT_SKIP_PYTHON_ENV_INSTALL = EnvironmentConfig('CLEARML_AGENT_SKIP_PYTHON_ENV_INSTALL', type=bool) ENV_DOCKER_SKIP_GPUS_FLAG = EnvironmentConfig('CLEARML_DOCKER_SKIP_GPUS_FLAG', 'TRAINS_DOCKER_SKIP_GPUS_FLAG') ENV_AGENT_GIT_USER = EnvironmentConfig('CLEARML_AGENT_GIT_USER', 'TRAINS_AGENT_GIT_USER') ENV_AGENT_GIT_PASS = EnvironmentConfig('CLEARML_AGENT_GIT_PASS', 'TRAINS_AGENT_GIT_PASS') @@ -147,6 +149,38 @@ ENV_DOCKER_HOST_MOUNT = EnvironmentConfig('CLEARML_AGENT_K8S_HOST_MOUNT', 'CLEAR ENV_VENV_CACHE_PATH = EnvironmentConfig('CLEARML_AGENT_VENV_CACHE_PATH') ENV_EXTRA_DOCKER_ARGS = EnvironmentConfig('CLEARML_AGENT_EXTRA_DOCKER_ARGS', type=list) +ENV_CUSTOM_BUILD_SCRIPT = EnvironmentConfig('CLEARML_AGENT_CUSTOM_BUILD_SCRIPT') +""" + Specifies a custom environment setup script to be executed instead of installing a virtual environment. + If provided, this script is executed following Git cloning. Script command may include environment variable and + will be expanded before execution (e.g. "$CLEARML_GIT_ROOT/script.sh"). 
+ The script can also be specified using the `agent.custom_build_script` configuration setting. + + When running the script, the following environment variables will be set: + - CLEARML_CUSTOM_BUILD_TASK_CONFIG_JSON: specifies a path to a temporary files containing the complete task + contents in JSON format + - CLEARML_TASK_SCRIPT_ENTRY: task entrypoint script as defined in the task's script section + - CLEARML_TASK_WORKING_DIR: task working directory as defined in the task's script section + - CLEARML_VENV_PATH: path to the agent's default virtual environment path (as defined in the configuration) + - CLEARML_GIT_ROOT: path to the cloned Git repository + - CLEARML_CUSTOM_BUILD_OUTPUT: a path to a non-existing file that may be created by the script. If created, + this file must be in the following JSON format: + ```json + { + "binary": "/absolute/path/to/python-executable", + "entry_point": "/absolute/path/to/task-entrypoint-script", + "working_dir": "/absolute/path/to/task-working/dir" + } + ``` + If provided, the agent will use these instead of the predefined task script section to execute the task and will + skip virtual environment creation. + + In case the custom script returns with a non-zero exit code, the agent will fail with the same exit code. + In case the custom script is specified but does not exist, or if the custom script does not write valid content + into the file specified in CLEARML_CUSTOM_BUILD_OUTPUT, the agent will emit a warning and continue with the + standard flow. +""" + class FileBuffering(IntEnum): """ diff --git a/clearml_agent/errors.py b/clearml_agent/errors.py index b4d6bc8..00f7694 100644 --- a/clearml_agent/errors.py +++ b/clearml_agent/errors.py @@ -84,3 +84,13 @@ class MissingPackageError(CommandFailedError): def __str__(self): return '{self.__class__.__name__}: ' \ '"{self.name}" package is required. 
Please run "pip install {self.name}"'.format(self=self) + + +class CustomBuildScriptFailed(CommandFailedError): + def __init__(self, errno, *args, **kwargs): + super(CustomBuildScriptFailed, self).__init__(*args, **kwargs) + self.errno = errno + + +class SkippedCustomBuildScript(CommandFailedError): + pass diff --git a/clearml_agent/helper/base.py b/clearml_agent/helper/base.py index aa881a1..8a07832 100644 --- a/clearml_agent/helper/base.py +++ b/clearml_agent/helper/base.py @@ -506,6 +506,38 @@ def is_conda(config): return config['agent.package_manager.type'].lower() == 'conda' +def convert_cuda_version_to_float_single_digit_str(cuda_version): + """ + Convert a cuda_version (string/float/int) into a float representation, e.g. 11.4 + Notice returns String Single digit only! + :return str: + """ + cuda_version = str(cuda_version or 0) + # if we have patch version we parse it here + cuda_version_parts = [int(v) for v in cuda_version.split('.')] + if len(cuda_version_parts) > 1 or cuda_version_parts[0] < 60: + cuda_version = 10 * cuda_version_parts[0] + if len(cuda_version_parts) > 1: + cuda_version += float(".{:d}".format(cuda_version_parts[1]))*10 + + cuda_version_full = "{:.1f}".format(float(cuda_version) / 10.) + else: + cuda_version = cuda_version_parts[0] + cuda_version_full = "{:.1f}".format(float(cuda_version) / 10.) + + return cuda_version_full + + +def convert_cuda_version_to_int_10_base_str(cuda_version): + """ + Convert a cuda_version (string/float/int) into an integer version, e.g. 
112 for cuda 11.2 + Return string + :return str: + """ + cuda_version = convert_cuda_version_to_float_single_digit_str(cuda_version) + return str(int(float(cuda_version)*10)) + + class NonStrictAttrs(object): @classmethod diff --git a/clearml_agent/helper/package/conda_api.py b/clearml_agent/helper/package/conda_api.py index 0ffd506..2d94f5f 100644 --- a/clearml_agent/helper/package/conda_api.py +++ b/clearml_agent/helper/package/conda_api.py @@ -19,7 +19,9 @@ from clearml_agent.external.requirements_parser import parse from clearml_agent.external.requirements_parser.requirement import Requirement from clearml_agent.errors import CommandFailedError -from clearml_agent.helper.base import rm_tree, NonStrictAttrs, select_for_platform, is_windows_platform, ExecutionInfo +from clearml_agent.helper.base import ( + rm_tree, NonStrictAttrs, select_for_platform, is_windows_platform, ExecutionInfo, + convert_cuda_version_to_float_single_digit_str, convert_cuda_version_to_int_10_base_str, ) from clearml_agent.helper.process import Argv, Executable, DEVNULL, CommandSequence, PathLike from clearml_agent.helper.package.requirements import SimpleVersion from clearml_agent.session import Session @@ -167,7 +169,7 @@ class CondaAPI(PackageManager): raise ValueError("Could not restore Conda environment, cannot find {}".format( self.conda_pre_build_env_path)) - output = Argv( + command = Argv( self.conda, "create", "--yes", @@ -175,7 +177,9 @@ class CondaAPI(PackageManager): "--prefix", self.path, "python={}".format(self.python), - ).get_output(stderr=DEVNULL) + ) + print('Executing Conda: {}'.format(command.serialize())) + output = command.get_output(stderr=DEVNULL) match = re.search( r"\W*(.*activate) ({})".format(re.escape(str(self.path))), output ) @@ -457,16 +461,8 @@ class CondaAPI(PackageManager): if not cuda_version: cuda_version = 0 else: - cuda_version_full = str(cuda_version) - # if we have patch version we parse it here - cuda_version_parts = [int(v) for v in 
cuda_version.split('.')] - if len(cuda_version_parts) > 1 or cuda_version_parts[0] < 60: - cuda_version = 10*cuda_version_parts[0] - if len(cuda_version_parts) > 1: - cuda_version += cuda_version_parts[1] - else: - cuda_version = cuda_version_parts[0] - cuda_version_full = "{:.1f}".format(float(cuda_version)/10.) + cuda_version_full = convert_cuda_version_to_float_single_digit_str(cuda_version) + cuda_version = int(convert_cuda_version_to_int_10_base_str(cuda_version)) except Exception: cuda_version = 0 diff --git a/clearml_agent/helper/package/pip_api/venv.py b/clearml_agent/helper/package/pip_api/venv.py index 868d7f6..3a41287 100644 --- a/clearml_agent/helper/package/pip_api/venv.py +++ b/clearml_agent/helper/package/pip_api/venv.py @@ -12,7 +12,7 @@ from ..requirements import RequirementsManager class VirtualenvPip(SystemPip, PackageManager): def __init__(self, session, python, requirements_manager, path, interpreter=None, execution_info=None, **kwargs): - # type: (Session, float, RequirementsManager, PathLike, PathLike, ExecutionInfo, Any) -> () + # type: (Session, str, RequirementsManager, PathLike, PathLike, ExecutionInfo, Any) -> () """ Program interface to virtualenv pip. Must be given either path to virtualenv or source command. @@ -48,7 +48,7 @@ class VirtualenvPip(SystemPip, PackageManager): return Argv.conditional_flag( self.session.config["agent.package_manager.system_site_packages"], "--system-site-packages", - ) + ("--python", self._bin) + ) def install_flags(self): """ @@ -64,10 +64,6 @@ class VirtualenvPip(SystemPip, PackageManager): Only valid if instantiated with path. Use self.python as self.bin does not exist. 
""" - # Log virtualenv information to stdout - self.session.command( - self.python, "-m", "virtualenv", "--version" - ) self.session.command( self.python, "-m", "virtualenv", self.path, *self.create_flags() ).check_call() diff --git a/clearml_agent/helper/package/pytorch.py b/clearml_agent/helper/package/pytorch.py index 6b8b03a..5b2ef15 100644 --- a/clearml_agent/helper/package/pytorch.py +++ b/clearml_agent/helper/package/pytorch.py @@ -174,36 +174,42 @@ class PytorchRequirement(SimpleSubstitution): self.log = self._session.get_logger(__name__) self.package_manager = self.config["agent.package_manager.type"].lower() self.os = os_name or self.get_platform() - self.cuda = "cuda{}".format(self.cuda_version).lower() - self.python_version_string = str(self.config["agent.default_python"]) - self.python_major_minor_str = '.'.join(self.python_version_string.split('.')[:2]) - if '.' not in self.python_major_minor_str: - raise PytorchResolutionError( - "invalid python version {!r} defined in configuration file, key 'agent.default_python': " - "must have both major and minor parts of the version (for example: '3.7')".format( - self.python_version_string - ) - ) - self.python = "python{}".format(self.python_major_minor_str) - - self.exceptions = [ - PytorchResolutionError(message) - for message in ( - None, - 'cuda version "{}" is not supported'.format(self.cuda), - 'python version "{}" is not supported'.format( - self.python_version_string - ), - ) - ] - - try: - self.validate_python_version() - except PytorchResolutionError as e: - self.log.warn("will not be able to install pytorch wheels: %s", e.args[0]) - + self.cuda = None + self.python_version_string = None + self.python_major_minor_str = None + self.python = None + self.exceptions = [] self._original_req = [] + def _init_python_ver_cuda_ver(self): + if self.cuda is None: + self.cuda = "cuda{}".format(self.cuda_version).lower() + if self.python_version_string is None: + self.python_version_string = 
str(self.config["agent.default_python"]) + if self.python_major_minor_str is None: + self.python_major_minor_str = '.'.join(self.python_version_string.split('.')[:2]) + if '.' not in self.python_major_minor_str: + raise PytorchResolutionError( + "invalid python version {!r} defined in configuration file, key 'agent.default_python': " + "must have both major and minor parts of the version (for example: '3.7')".format( + self.python_version_string + ) + ) + if self.python is None: + self.python = "python{}".format(self.python_major_minor_str) + + if not self.exceptions: + self.exceptions = [ + PytorchResolutionError(message) + for message in ( + None, + 'cuda version "{}" is not supported'.format(self.cuda), + 'python version "{}" is not supported'.format( + self.python_version_string + ), + ) + ] + @property def is_conda(self): return self.package_manager == "conda" @@ -216,6 +222,8 @@ class PytorchRequirement(SimpleSubstitution): """ Make sure python version has both major and minor versions as required for choosing pytorch wheel """ + self._init_python_ver_cuda_ver() + if self.is_pip and not self.python_major_minor_str: raise PytorchResolutionError( "invalid python version {!r} defined in configuration file, key 'agent.default_python': " @@ -294,6 +302,7 @@ class PytorchRequirement(SimpleSubstitution): def get_url_for_platform(self, req): # check if package is already installed with system packages + self.validate_python_version() # noinspection PyBroadException try: if self.config.get("agent.package_manager.system_site_packages", None): diff --git a/clearml_agent/helper/package/requirements.py b/clearml_agent/helper/package/requirements.py index 501c893..b19f8a8 100644 --- a/clearml_agent/helper/package/requirements.py +++ b/clearml_agent/helper/package/requirements.py @@ -16,7 +16,9 @@ from pyhocon import ConfigTree import six import logging from clearml_agent.definitions import PIP_EXTRA_INDICES -from clearml_agent.helper.base import warning, is_conda, which, 
join_lines, is_windows_platform +from clearml_agent.helper.base import ( + warning, is_conda, which, join_lines, is_windows_platform, + convert_cuda_version_to_int_10_base_str, ) from clearml_agent.helper.process import Argv, PathLike from clearml_agent.helper.gpu.gpustat import get_driver_cuda_version from clearml_agent.session import Session, normalize_cuda_version @@ -474,7 +476,7 @@ class RequirementSubstitution(object): @property def cuda_version(self): - return self.config['agent.cuda_version'] + return convert_cuda_version_to_int_10_base_str(self.config['agent.cuda_version']) @property def cudnn_version(self): diff --git a/clearml_agent/interface/worker.py b/clearml_agent/interface/worker.py index bc4b23c..47ee0c2 100644 --- a/clearml_agent/interface/worker.py +++ b/clearml_agent/interface/worker.py @@ -99,8 +99,10 @@ DAEMON_ARGS = dict({ 'aliases': ['-d'], }, '--stop': { - 'help': 'Stop the running agent (based on the same set of arguments)', - 'action': 'store_true', + 'help': 'Stop the running agent (based on the same set of arguments). ' + 'Optional: provide a list of specific local worker IDs to stop', + 'nargs': '*', + 'default': False, }, '--dynamic-gpus': { 'help': 'Allow to dynamically allocate gpus based on queue properties, '