Move extra configuration loading to Worker init to make sure all available configuration options can be overridden

Alex Burlacu 2023-08-24 19:00:36 +03:00
parent 2b815354e0
commit ed1356976b
2 changed files with 67 additions and 39 deletions


@@ -203,6 +203,8 @@ class Session(TokenManager):
             print("Using forced API version {}".format(self.force_max_api_version))
             Session.max_api_version = Session.api_version = str(self.force_max_api_version)
 
+        self.pre_vault_config = None
+
     def _setup_session(self, http_retries_config, initial_session=False, default_initial_connect_override=None):
         # type: (dict, bool, Optional[bool]) -> (dict, requests.Session)
         http_retries_config = http_retries_config or self.config.get(
@@ -254,7 +256,11 @@ class Session(TokenManager):
         def parse(vault):
             # noinspection PyBroadException
             try:
-                d = vault.get('data', None)
+                print("Loaded {} vault: {}".format(
+                    vault.get("scope", ""),
+                    (vault.get("description", None) or "")[:50] or vault.get("id", ""))
+                )
+                d = vault.get("data", None)
                 if d:
                     r = ConfigFactory.parse_string(d)
                     if isinstance(r, (ConfigTree, dict)):
@@ -270,6 +276,7 @@ class Session(TokenManager):
                 vaults = res.json().get("data", {}).get("vaults", [])
                 data = list(filter(None, map(parse, vaults)))
                 if data:
+                    self.pre_vault_config = self.config.copy()
                     self.config.set_overrides(*data)
                     return True
             elif res.status_code != 404:
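
Taking the `self.config.copy()` snapshot immediately before `set_overrides(*data)` is what preserves a vault-free view of the configuration; the Worker later dumps this snapshot instead of the merged config. A minimal sketch of the same copy-then-override idea in plain pyhocon (`set_overrides` is clearml's own helper, so `with_fallback` stands in for it here):

    from pyhocon import ConfigFactory

    # base configuration, before any vault content is applied
    base = ConfigFactory.parse_string("agent { worker_id: my-worker, debug: false }")

    # keep a pristine copy so it can be dumped or restored later
    pre_vault = ConfigFactory.from_dict(base.as_plain_ordered_dict())

    # a vault payload arrives as a HOCON string and is parsed the same way
    vault_overlay = ConfigFactory.parse_string("agent { debug: true }")

    # the overlay wins on conflicts; base values survive as fallbacks
    merged = vault_overlay.with_fallback(base)

    print(merged.get("agent.debug"))      # True  (vault value)
    print(merged.get("agent.worker_id"))  # my-worker (kept from base)
    print(pre_vault.get("agent.debug"))   # False (pristine snapshot)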


@@ -665,9 +665,13 @@ class Worker(ServiceCommandSection):
         self.log = self._session.get_logger(__name__)
         self.register_signal_handler()
         self._worker_registered = False
+
+        self._apply_extra_configuration()
+
         self.is_conda = is_conda(self._session.config)  # type: bool
         # Add extra index url - system wide
         extra_url = None
         # noinspection PyBroadException
         try:
             if self._session.config.get("agent.package_manager.extra_index_url", None):
                 extra_url = self._session.config.get("agent.package_manager.extra_index_url", [])
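
Calling `_apply_extra_configuration()` this early is the core of the commit: every config read that follows in `__init__` (the conda check, `agent.package_manager.extra_index_url`, and so on) now sees vault and environment overrides. A toy illustration of the ordering concern (all names here are illustrative stand-ins):

    class TinySession:
        # a flat dict stands in for the real HOCON-backed config object
        def __init__(self):
            self.config = {}

    class TinyWorker:
        def __init__(self, session):
            self._session = session
            self._apply_extra_configuration()  # overrides land before any read...
            # ...so this sees the vault value, not a stale local default
            self.extra_index_url = session.config.get("agent.package_manager.extra_index_url")

        def _apply_extra_configuration(self):
            # stand-in for load_vaults() plus environment-section handling
            self._session.config["agent.package_manager.extra_index_url"] = ["https://pypi.example.com"]

    print(TinyWorker(TinySession()).extra_index_url)  # ['https://pypi.example.com']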
@@ -1537,8 +1541,6 @@ class Worker(ServiceCommandSection):
         return self._resolve_queue_names(queues=queues, create_if_missing=create_if_missing)
 
     def daemon(self, queues, log_level, foreground=False, docker=False, detached=False, order_fairness=False, **kwargs):
-        self._apply_extra_configuration()
-
         # check that we have docker command if we need it
         if docker not in (False, None) and not check_if_command_exists("docker"):
             raise ValueError("Running in Docker mode, 'docker' command was not found")
@@ -2079,19 +2081,26 @@ class Worker(ServiceCommandSection):
     def _apply_extra_configuration(self):
         # store a few things we updated at runtime (TODO: we should list them somewhere)
-        agent_config = self._session.config["agent"].copy()
+        vault_loaded = False
+        session = self._session
+        agent_config = session.config["agent"].copy()
         agent_config_keys = ["cuda_version", "cudnn_version", "default_python", "worker_id", "worker_name", "debug"]
         try:
-            self._session.load_vaults()
+            vault_loaded = session.load_vaults()
         except Exception as ex:
             print("Error: failed applying extra configuration: {}".format(ex))
 
-        # merge back
-        for restore_key in agent_config_keys:
-            if restore_key in agent_config:
-                self._session.config["agent"][restore_key] = agent_config[restore_key]
+        config = session.config
+        # merge back
+        if vault_loaded:
+            for restore_key in agent_config_keys:
+                if restore_key in agent_config and agent_config[restore_key] != config["agent"].get(restore_key, None):
+                    print("Ignoring vault value for '{}' (agent config takes precedence), using '{}'".format(
+                        restore_key, agent_config[restore_key]
+                    ))
+                    config["agent"][restore_key] = agent_config[restore_key]
 
-        config = self._session.config
         default = config.get("agent.apply_environment", False)
         if ENV_ENABLE_ENV_CONFIG_SECTION.get(default=default):
             try:
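
The merge-back now runs only when a vault was actually loaded, and it logs every vault value it discards. The precedence rule in isolation, as a standalone sketch over plain dicts (names are illustrative):

    PROTECTED_KEYS = ["cuda_version", "cudnn_version", "default_python", "worker_id", "worker_name", "debug"]

    def merge_back(agent_before, agent_after):
        # restore runtime-set agent keys that a vault override may have clobbered
        for key in PROTECTED_KEYS:
            if key in agent_before and agent_before[key] != agent_after.get(key):
                print("Ignoring vault value for '{}', keeping '{}'".format(key, agent_before[key]))
                agent_after[key] = agent_before[key]
        return agent_after

    # the runtime worker_id wins over whatever the vault pushed
    print(merge_back({"worker_id": "gpu-0"}, {"worker_id": "from-vault", "debug": True}))
    # Ignoring vault value for 'worker_id', keeping 'gpu-0'
    # {'worker_id': 'gpu-0', 'debug': True}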
@@ -2373,8 +2382,10 @@ class Worker(ServiceCommandSection):
             print("Cloning task id={}".format(task_id))
             current_task = self._session.api_client.tasks.get_by_id(
                 self._session.send_api(
-                    tasks_api.CloneRequest(task=current_task.id,
-                                           new_task_name='Clone of {}'.format(current_task.name))
+                    tasks_api.CloneRequest(
+                        task=current_task.id,
+                        new_task_name="Clone of {}".format(current_task.name)
+                    )
                 ).id
             )
             print("Task cloned, new task id={}".format(current_task.id))
@@ -2384,10 +2395,21 @@ class Worker(ServiceCommandSection):
         # make sure this task is not stuck in an execution queue, it shouldn't have been, but just in case.
         # noinspection PyBroadException
         try:
-            res = self._session.api_client.tasks.dequeue(task=current_task.id)
-            if require_queue and res.meta.result_code != 200:
-                raise ValueError("Execution required enqueued task, "
-                                 "but task id={} is not queued.".format(current_task.id))
+            res = self._session.send_request(
+                service="tasks", action="dequeue", method=Request.def_method,
+                json={"task": current_task.id, "new_status": "in_progress"},
+            )
+            if require_queue and (not res.ok or res.json().get("data", {}).get("updated", 0) < 1):
+                raise ValueError(
+                    "Execution required enqueued task, but task id={} is not queued.".format(current_task.id)
+                )
+            # Set task status to started to prevent any external monitoring from killing it
+            self._session.api_client.tasks.started(
+                task=current_task.id,
+                status_reason="starting execution soon",
+                status_message="",
+                force=True,
+            )
         except Exception:
             if require_queue:
                 raise
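
Switching from `api_client.tasks.dequeue` to a raw `send_request` means success must now be checked at two levels: the HTTP response (`res.ok`) and the payload (`data.updated`). The same pattern against a hypothetical endpoint using plain `requests` (the endpoint and field names mirror the call above but are illustrative):

    import requests

    def dequeue_task(base_url, token, task_id):
        # hypothetical stand-in for Session.send_request(service="tasks", action="dequeue", ...)
        res = requests.post(
            "{}/tasks.dequeue".format(base_url),
            headers={"Authorization": "Bearer {}".format(token)},
            json={"task": task_id, "new_status": "in_progress"},
        )
        # a transport failure (auth, routing) and an API no-op (nothing dequeued)
        # are distinct conditions; both must be treated as "not queued"
        if not res.ok or res.json().get("data", {}).get("updated", 0) < 1:
            raise ValueError("task id={} is not queued".format(task_id))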
@@ -2398,14 +2420,14 @@ class Worker(ServiceCommandSection):
         # We expect the same behaviour in case full_monitoring was set, and in case docker mode is used
         if full_monitoring or docker is not False:
             if full_monitoring:
-                if not (ENV_WORKER_ID.get() or '').strip():
-                    self._session.config["agent"]["worker_id"] = ''
+                if not (ENV_WORKER_ID.get() or "").strip():
+                    self._session.config["agent"]["worker_id"] = ""
                 # make sure we support multiple instances if we need to
                 self._singleton()
                 self.temp_config_path = self.temp_config_path or safe_mkstemp(
                     suffix=".cfg", prefix=".clearml_agent.", text=True, name_only=True
                 )
-                self.dump_config(self.temp_config_path)
+                self.dump_config(filename=self.temp_config_path, config=self._session.pre_vault_config)
                 self._session._config_file = self.temp_config_path
 
             worker_params = WorkerParams(
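
Dumping `self._session.pre_vault_config` rather than the live config is deliberate: the temp file handed to the spawned monitored process must not contain vault-injected secrets. A hypothetical sketch of that sanitized write (`safe_mkstemp` above is clearml-agent's own helper; the standard `tempfile` module stands in for it here):

    import os
    import tempfile

    def dump_sanitized_config(config_text):
        # write the PRE-vault configuration, so secrets merged in at runtime
        # never reach the on-disk copy used by the child process
        fd, path = tempfile.mkstemp(suffix=".cfg", prefix=".clearml_agent.", text=True)
        with os.fdopen(fd, "w") as f:
            f.write(config_text)
        return path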
@@ -2426,8 +2448,6 @@ class Worker(ServiceCommandSection):
                 Singleton.close_pid_file()
             return status if ENV_PROPAGATE_EXITCODE.get() else 0
 
-        self._apply_extra_configuration()
-
         self._session.print_configuration()
 
         # now mark the task as started
@@ -3672,34 +3692,35 @@ class Worker(ServiceCommandSection):
     def _get_docker_config_cmd(self, temp_config, clean_api_credentials=False, **kwargs):
         self.debug("Setting up docker config command")
-        host_cache = Path(os.path.expandvars(
-            self._session.config["sdk.storage.cache.default_base_dir"])).expanduser().as_posix()
+
+        def load_path(field, default=None):
+            value = self._session.config.get(field, default)
+            return Path(os.path.expandvars(value)).expanduser().as_posix() if value else None
+
+        host_cache = load_path("sdk.storage.cache.default_base_dir")
         self.debug("host_cache: {}".format(host_cache))
-        host_pip_dl = Path(os.path.expandvars(
-            self._session.config["agent.pip_download_cache.path"])).expanduser().as_posix()
+        host_pip_dl = load_path("agent.pip_download_cache.path")
         self.debug("host_pip_dl: {}".format(host_pip_dl))
-        host_vcs_cache = Path(os.path.expandvars(
-            self._session.config["agent.vcs_cache.path"])).expanduser().as_posix()
+        host_vcs_cache = load_path("agent.vcs_cache.path")
         self.debug("host_vcs_cache: {}".format(host_vcs_cache))
-        host_venvs_cache = Path(os.path.expandvars(
-            self._session.config["agent.venvs_cache.path"])).expanduser().as_posix() \
-            if self._session.config.get("agent.venvs_cache.path", None) else None
+        host_venvs_cache = load_path("agent.venvs_cache.path")
         self.debug("host_venvs_cache: {}".format(host_venvs_cache))
         host_ssh_cache = self._host_ssh_cache
         self.debug("host_ssh_cache: {}".format(host_ssh_cache))
-        host_apt_cache = Path(os.path.expandvars(self._session.config.get(
-            "agent.docker_apt_cache", '~/.clearml/apt-cache'))).expanduser().as_posix()
+        host_apt_cache = load_path("agent.docker_apt_cache", default="~/.clearml/apt-cache")
         self.debug("host_apt_cache: {}".format(host_apt_cache))
-        host_pip_cache = Path(os.path.expandvars(self._session.config.get(
-            "agent.docker_pip_cache", '~/.clearml/pip-cache'))).expanduser().as_posix()
+        host_pip_cache = load_path("agent.docker_pip_cache", default="~/.clearml/pip-cache")
         self.debug("host_pip_cache: {}".format(host_pip_cache))
-        if self.poetry.enabled:
-            host_poetry_cache = Path(os.path.expandvars(self._session.config.get(
-                "agent.docker_poetry_cache", '~/.clearml/poetry-cache'))).expanduser().as_posix()
-        else:
-            host_poetry_cache = None
+        host_poetry_cache = (
+            load_path("agent.docker_poetry_cache", "~/.clearml/poetry-cache") if self.poetry.enabled else None
+        )
        self.debug("host_poetry_cache: {}".format(host_poetry_cache))
 
         # make sure all folders are valid
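
The repeated expandvars/expanduser/as_posix dance collapses into the single local `load_path` helper. A standalone version with example output (the original reads values through `self._session.config`; this adaptation takes the raw value directly):

    import os
    from pathlib import Path

    def load_path(value, default=None):
        # expand $VARS and ~, normalize to a POSIX string, or None when unset
        value = value if value is not None else default
        return Path(os.path.expandvars(value)).expanduser().as_posix() if value else None

    os.environ["CLEARML_HOME"] = "/opt/clearml"
    print(load_path("$CLEARML_HOME/cache"))                 # /opt/clearml/cache
    print(load_path(None, default="~/.clearml/apt-cache"))  # e.g. /home/user/.clearml/apt-cache
    print(load_path(None))                                  # None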