From ed1356976bb9b5c9205b3689e0e94d8ddb3e0d76 Mon Sep 17 00:00:00 2001
From: Alex Burlacu
Date: Thu, 24 Aug 2023 19:00:36 +0300
Subject: [PATCH] Move extra configurations to Worker init to make sure all
 available configurations can be overridden

---
 clearml_agent/backend_api/session/session.py |  9 +-
 clearml_agent/commands/worker.py             | 97 ++++++++++++--------
 2 files changed, 67 insertions(+), 39 deletions(-)

diff --git a/clearml_agent/backend_api/session/session.py b/clearml_agent/backend_api/session/session.py
index c6b85fe..d7e5f73 100644
--- a/clearml_agent/backend_api/session/session.py
+++ b/clearml_agent/backend_api/session/session.py
@@ -203,6 +203,8 @@ class Session(TokenManager):
             print("Using forced API version {}".format(self.force_max_api_version))
             Session.max_api_version = Session.api_version = str(self.force_max_api_version)
 
+        self.pre_vault_config = None
+
     def _setup_session(self, http_retries_config, initial_session=False, default_initial_connect_override=None):
         # type: (dict, bool, Optional[bool]) -> (dict, requests.Session)
         http_retries_config = http_retries_config or self.config.get(
@@ -254,7 +256,11 @@ class Session(TokenManager):
         def parse(vault):
             # noinspection PyBroadException
             try:
-                d = vault.get('data', None)
+                print("Loaded {} vault: {}".format(
+                    vault.get("scope", ""),
+                    (vault.get("description", None) or "")[:50] or vault.get("id", ""))
+                )
+                d = vault.get("data", None)
                 if d:
                     r = ConfigFactory.parse_string(d)
                     if isinstance(r, (ConfigTree, dict)):
@@ -270,6 +276,7 @@ class Session(TokenManager):
                 vaults = res.json().get("data", {}).get("vaults", [])
                 data = list(filter(None, map(parse, vaults)))
                 if data:
+                    self.pre_vault_config = self.config.copy()
                     self.config.set_overrides(*data)
                 return True
             elif res.status_code != 404:
diff --git a/clearml_agent/commands/worker.py b/clearml_agent/commands/worker.py
index fde69da..c684c12 100644
--- a/clearml_agent/commands/worker.py
+++ b/clearml_agent/commands/worker.py
@@ -665,9 +665,13 @@ class Worker(ServiceCommandSection):
         self.log = self._session.get_logger(__name__)
         self.register_signal_handler()
         self._worker_registered = False
+
+        self._apply_extra_configuration()
+
         self.is_conda = is_conda(self._session.config)  # type: bool
         # Add extra index url - system wide
         extra_url = None
+        # noinspection PyBroadException
         try:
             if self._session.config.get("agent.package_manager.extra_index_url", None):
                 extra_url = self._session.config.get("agent.package_manager.extra_index_url", [])
@@ -1537,8 +1541,6 @@ class Worker(ServiceCommandSection):
         return self._resolve_queue_names(queues=queues, create_if_missing=create_if_missing)
 
     def daemon(self, queues, log_level, foreground=False, docker=False, detached=False, order_fairness=False, **kwargs):
-        self._apply_extra_configuration()
-
         # check that we have docker command if we need it
         if docker not in (False, None) and not check_if_command_exists("docker"):
             raise ValueError("Running in Docker mode, 'docker' command was not found")
@@ -2079,19 +2081,26 @@ class Worker(ServiceCommandSection):
 
     def _apply_extra_configuration(self):
         # store a few things we updated in runtime (TODO: we should list theme somewhere)
-        agent_config = self._session.config["agent"].copy()
+        vault_loaded = False
+        session = self._session
+        agent_config = session.config["agent"].copy()
         agent_config_keys = ["cuda_version", "cudnn_version", "default_python", "worker_id", "worker_name", "debug"]
         try:
-            self._session.load_vaults()
+            vault_loaded = session.load_vaults()
         except Exception as ex:
             print("Error: failed applying extra configuration: {}".format(ex))
 
-        # merge back
-        for restore_key in agent_config_keys:
-            if restore_key in agent_config:
-                self._session.config["agent"][restore_key] = agent_config[restore_key]
+        config = session.config
+
+        # merge back
+        if vault_loaded:
+            for restore_key in agent_config_keys:
+                if restore_key in agent_config and agent_config[restore_key] != config["agent"].get(restore_key, None):
+                    print("Ignoring vault value for '{}' (agent config takes precedence), using '{}'".format(
+                        restore_key, agent_config[restore_key]
+                    ))
+                    config["agent"][restore_key] = agent_config[restore_key]
 
-        config = self._session.config
         default = config.get("agent.apply_environment", False)
         if ENV_ENABLE_ENV_CONFIG_SECTION.get(default=default):
             try:
@@ -2373,8 +2382,10 @@ class Worker(ServiceCommandSection):
                 print("Cloning task id={}".format(task_id))
                 current_task = self._session.api_client.tasks.get_by_id(
                     self._session.send_api(
-                        tasks_api.CloneRequest(task=current_task.id,
-                                               new_task_name='Clone of {}'.format(current_task.name))
+                        tasks_api.CloneRequest(
+                            task=current_task.id,
+                            new_task_name="Clone of {}".format(current_task.name)
+                        )
                     ).id
                 )
                 print("Task cloned, new task id={}".format(current_task.id))
@@ -2384,10 +2395,21 @@ class Worker(ServiceCommandSection):
         # make sure this task is not stuck in an execution queue, it shouldn't have been, but just in case.
         # noinspection PyBroadException
         try:
-            res = self._session.api_client.tasks.dequeue(task=current_task.id)
-            if require_queue and res.meta.result_code != 200:
-                raise ValueError("Execution required enqueued task, "
-                                 "but task id={} is not queued.".format(current_task.id))
+            res = self._session.send_request(
+                service="tasks", action="dequeue", method=Request.def_method,
+                json={"task": current_task.id, "new_status": "in_progress"},
+            )
+            if require_queue and (not res.ok or res.json().get("data", {}).get("updated", 0) < 1):
+                raise ValueError(
+                    "Execution required enqueued task, but task id={} is not queued.".format(current_task.id)
+                )
+            # Set task status to started to prevent any external monitoring from killing it
+            self._session.api_client.tasks.started(
+                task=current_task.id,
+                status_reason="starting execution soon",
+                status_message="",
+                force=True,
+            )
         except Exception:
             if require_queue:
                 raise
@@ -2398,14 +2420,14 @@ class Worker(ServiceCommandSection):
         # We expect the same behaviour in case full_monitoring was set, and in case docker mode is used
         if full_monitoring or docker is not False:
             if full_monitoring:
-                if not (ENV_WORKER_ID.get() or '').strip():
-                    self._session.config["agent"]["worker_id"] = ''
+                if not (ENV_WORKER_ID.get() or "").strip():
+                    self._session.config["agent"]["worker_id"] = ""
                 # make sure we support multiple instances if we need to
                 self._singleton()
                 self.temp_config_path = self.temp_config_path or safe_mkstemp(
                     suffix=".cfg", prefix=".clearml_agent.", text=True, name_only=True
                 )
-                self.dump_config(self.temp_config_path)
+                self.dump_config(filename=self.temp_config_path, config=self._session.pre_vault_config)
                 self._session._config_file = self.temp_config_path
 
                 worker_params = WorkerParams(
@@ -2426,8 +2448,6 @@ class Worker(ServiceCommandSection):
                 Singleton.close_pid_file()
                 return status if ENV_PROPAGATE_EXITCODE.get() else 0
 
-        self._apply_extra_configuration()
-
         self._session.print_configuration()
 
         # now mark the task as started
@@ -3672,34 +3692,35 @@ class Worker(ServiceCommandSection):
 
     def _get_docker_config_cmd(self, temp_config, clean_api_credentials=False, **kwargs):
         self.debug("Setting up docker config command")
-        host_cache = Path(os.path.expandvars(
-            self._session.config["sdk.storage.cache.default_base_dir"])).expanduser().as_posix()
+
+        def load_path(field, default=None):
+            value = self._session.config.get(field, default)
+            return Path(os.path.expandvars(value)).expanduser().as_posix() if value else None
+
+        host_cache = load_path("sdk.storage.cache.default_base_dir")
         self.debug("host_cache: {}".format(host_cache))
-        host_pip_dl = Path(os.path.expandvars(
-            self._session.config["agent.pip_download_cache.path"])).expanduser().as_posix()
+
+        host_pip_dl = load_path("agent.pip_download_cache.path")
         self.debug("host_pip_dl: {}".format(host_pip_dl))
-        host_vcs_cache = Path(os.path.expandvars(
-            self._session.config["agent.vcs_cache.path"])).expanduser().as_posix()
+
+        host_vcs_cache = load_path("agent.vcs_cache.path")
         self.debug("host_vcs_cache: {}".format(host_vcs_cache))
-        host_venvs_cache = Path(os.path.expandvars(
-            self._session.config["agent.venvs_cache.path"])).expanduser().as_posix() \
-            if self._session.config.get("agent.venvs_cache.path", None) else None
+
+        host_venvs_cache = load_path("agent.venvs_cache.path")
         self.debug("host_venvs_cache: {}".format(host_venvs_cache))
+
         host_ssh_cache = self._host_ssh_cache
         self.debug("host_ssh_cache: {}".format(host_ssh_cache))
 
-        host_apt_cache = Path(os.path.expandvars(self._session.config.get(
-            "agent.docker_apt_cache", '~/.clearml/apt-cache'))).expanduser().as_posix()
+        host_apt_cache = load_path("agent.docker_apt_cache", default="~/.clearml/apt-cache")
        self.debug("host_apt_cache: {}".format(host_apt_cache))
-        host_pip_cache = Path(os.path.expandvars(self._session.config.get(
-            "agent.docker_pip_cache", '~/.clearml/pip-cache'))).expanduser().as_posix()
+
+        host_pip_cache = load_path("agent.docker_pip_cache", default="~/.clearml/pip-cache")
         self.debug("host_pip_cache: {}".format(host_pip_cache))
-        if self.poetry.enabled:
-            host_poetry_cache = Path(os.path.expandvars(self._session.config.get(
-                "agent.docker_poetry_cache", '~/.clearml/poetry-cache'))).expanduser().as_posix()
-        else:
-            host_poetry_cache = None
+
+        host_poetry_cache = (
+            load_path("agent.docker_poetry_cache", "~/.clearml/poetry-cache") if self.poetry.enabled else None
+        )
         self.debug("host_poetry_cache: {}".format(host_poetry_cache))
 
         # make sure all folders are valid