Change dump configuration and ssh on every docker run

allegroai 2021-01-24 08:48:10 +02:00
parent 8ea062c0bd
commit c578b37c6d


@@ -416,6 +416,7 @@ class Worker(ServiceCommandSection):
self._uptime_config = self._session.config.get("agent.uptime", None)
self._downtime_config = self._session.config.get("agent.downtime", None)
self._suppress_cr = self._session.config.get("agent.suppress_carriage_return", True)
self._host_ssh_cache = None
# True - supported
# None - not initialized
@@ -782,7 +783,7 @@ class Worker(ServiceCommandSection):
self
)
)
self.dump_config()
self.dump_config(self.temp_config_path)
def check(self, **_):
try:
@@ -878,7 +879,7 @@ class Worker(ServiceCommandSection):
self._force_current_version = kwargs.get('force_current_version', False)
self.set_docker_variables(docker)
else:
self.dump_config()
self.dump_config(self.temp_config_path)
# only in none docker we have to make sure we have CUDA setup
# make sure we have CUDA set if we have --gpus
@@ -984,12 +985,23 @@ class Worker(ServiceCommandSection):
self.monitor.start()
return self.monitor
def dump_config(self, config=None):
def dump_config(self, filename, config=None):
def to_json(config):
return json.dumps(config.as_plain_ordered_dict(), cls=HOCONEncoder, indent=4)
# noinspection PyBroadException
try:
Path(self.temp_config_path).write_text(
six.text_type(self._session.to_json() if config is None else to_json(config)))
current_content = Path(filename).read_text()
except Exception:
current_content = None
# noinspection PyBroadException
try:
new_content = six.text_type(self._session.to_json() if config is None else to_json(config))
# Overwrite file only if the content is different, because we are mounting the same file into
# multiple containers in services mode, and we don't want to change it if we do not have to.
if new_content != current_content:
Path(filename).write_text(new_content)
except Exception:
return False
return True
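
The inline comment in this hunk explains the motivation: in services mode the same configuration file is bind-mounted into several containers, so rewriting it with identical content would needlessly touch a shared file. Below is a minimal standalone sketch of the same write-only-if-changed pattern; the function name and the example path are illustrative and not part of the commit.

    from pathlib import Path


    def write_if_changed(filename, new_content):
        """Write new_content to filename only when it differs from what is already there.

        Returns True on success (including the no-op case), False on write failure.
        """
        try:
            current_content = Path(filename).read_text()
        except Exception:
            # File missing or unreadable - treat it as "different" so we write it.
            current_content = None
        try:
            if new_content != current_content:
                Path(filename).write_text(new_content)
        except Exception:
            return False
        return True


    # The second call is a no-op because the content did not change.
    write_if_changed("/tmp/example.cfg", "agent { queue: default }\n")
    write_if_changed("/tmp/example.cfg", "agent { queue: default }\n")
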
@@ -1279,7 +1291,7 @@ class Worker(ServiceCommandSection):
target = "task_id_{}".format(task_id)
temp_config, docker_image_func = self.get_docker_config_cmd(docker)
self.dump_config(temp_config)
self.dump_config(self.temp_config_path, config=temp_config)
self.docker_image_func = docker_image_func
try:
response = get_task(self._session, task_id, only_fields=["execution.docker_cmd"])
@@ -1378,6 +1390,12 @@ class Worker(ServiceCommandSection):
current_task = self._session.api_client.tasks.get_by_id(task_id)
if not current_task.id:
pass
except AttributeError:
raise ValueError(
"Could not find task id={} (for host: {})".format(
task_id, self._session.config.get("api.host", "")
)
)
except Exception as ex:
raise ValueError(
"Could not find task id={} (for host: {})\nException: {}".format(
@@ -1620,7 +1638,7 @@ class Worker(ServiceCommandSection):
def set_docker_variables(self, docker):
temp_config, docker_image_func = self.get_docker_config_cmd(docker)
self.dump_config(temp_config)
self.dump_config(self.temp_config_path, config=temp_config)
self.docker_image_func = docker_image_func
def get_execution_info(self, current_task):
@@ -2159,14 +2177,6 @@ class Worker(ServiceCommandSection):
print(requirements_manager.replace(contents))
def get_docker_config_cmd(self, docker_args):
def docker_cmd_functor(default_kwargs, temp_config, **kwargs):
# Make sure we have created the configuration file for the executor
if not self.dump_config(temp_config):
self.log.warning('Could not update docker configuration file {}'.format(self.temp_config_path))
args = deepcopy(default_kwargs)
args.update(kwargs)
return self._get_docker_cmd(**args)
docker_image = str(ENV_DOCKER_IMAGE.get() or
self._session.config.get("agent.default_docker.image", "nvidia/cuda")) \
if not docker_args else docker_args[0]
@@ -2178,22 +2188,19 @@ class Worker(ServiceCommandSection):
docker_arguments = self._session.config.get("agent.default_docker.arguments", None) or []
if isinstance(docker_arguments, six.string_types):
docker_arguments = [docker_arguments]
python_version = '3'
if not python_version.startswith('python'):
python_version = 'python'+python_version
print("Running in Docker {} mode (v19.03 and above) - using default docker image: {} running {}\n".format(
'*standalone*' if self._standalone_mode else '', docker_image, python_version))
# store docker arguments
self._docker_image = docker_image
self._docker_arguments = docker_arguments
print("Running in Docker {} mode (v19.03 and above) - using default docker image: {} {}\n".format(
'*standalone*' if self._standalone_mode else '', self._docker_image, self._docker_arguments or ''))
temp_config = deepcopy(self._session.config)
mounted_cache_dir = self._docker_fixed_user_cache # '/root/.clearml/cache'
mounted_pip_dl_dir = '/root/.clearml/pip-download-cache'
mounted_vcs_cache = '/root/.clearml/vcs-cache'
mounted_venv_dir = '/root/.clearml/venvs-builds'
host_cache = Path(os.path.expandvars(
self._session.config["sdk.storage.cache.default_base_dir"])).expanduser().as_posix()
host_pip_dl = Path(os.path.expandvars(
self._session.config["agent.pip_download_cache.path"])).expanduser().as_posix()
host_vcs_cache = Path(os.path.expandvars(
self._session.config["agent.vcs_cache.path"])).expanduser().as_posix()
temp_config.put("sdk.storage.cache.default_base_dir", mounted_cache_dir)
temp_config.put("agent.pip_download_cache.path", mounted_pip_dl_dir)
temp_config.put("agent.vcs_cache.path", mounted_vcs_cache)
@@ -2209,12 +2216,24 @@ class Worker(ServiceCommandSection):
temp_config.put("agent.git_pass", (ENV_AGENT_GIT_PASS.get() or
self._session.config.get("agent.git_pass", None)))
self._host_ssh_cache = mkdtemp(prefix='clearml_agent.ssh.')
self._temp_cleanup_list.append(self._host_ssh_cache)
return temp_config, partial(self._get_docker_config_cmd, temp_config=temp_config)
def _get_docker_config_cmd(self, temp_config, **kwargs):
host_cache = Path(os.path.expandvars(
self._session.config["sdk.storage.cache.default_base_dir"])).expanduser().as_posix()
host_pip_dl = Path(os.path.expandvars(
self._session.config["agent.pip_download_cache.path"])).expanduser().as_posix()
host_vcs_cache = Path(os.path.expandvars(
self._session.config["agent.vcs_cache.path"])).expanduser().as_posix()
host_ssh_cache = self._host_ssh_cache
host_apt_cache = Path(os.path.expandvars(self._session.config.get(
"agent.docker_apt_cache", '~/.clearml/apt-cache'))).expanduser().as_posix()
host_pip_cache = Path(os.path.expandvars(self._session.config.get(
"agent.docker_pip_cache", '~/.clearml/pip-cache'))).expanduser().as_posix()
host_ssh_cache = mkdtemp(prefix='clearml_agent.ssh.')
self._temp_cleanup_list.append(host_ssh_cache)
# make sure all folders are valid
Path(host_apt_cache).mkdir(parents=True, exist_ok=True)
@@ -2225,15 +2244,14 @@ class Worker(ServiceCommandSection):
Path(host_ssh_cache).mkdir(parents=True, exist_ok=True)
# copy the .ssh folder to a temp folder, to be mapped into docker
# noinspection PyBroadException
try:
src = Path(host_ssh_cache)
if src.is_dir():
src.rmdir()
if Path(host_ssh_cache).is_dir():
shutil.rmtree(host_ssh_cache, ignore_errors=True)
shutil.copytree(Path('~/.ssh').expanduser().as_posix(), host_ssh_cache)
except Exception:
host_ssh_cache = None
self.log.warning('Failed creating temporary copy of ~/.ssh for git credential')
pass
# check if the .git credentials exist:
try:
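
These hunks are the second half of the commit title: instead of creating one SSH cache directory when the docker configuration is first built, a fresh temporary copy of ~/.ssh is made for every docker run (and registered for cleanup), so credential changes on the host are picked up by the next container. A rough standalone sketch of that pattern follows; the temp-dir prefix is taken from the diff, while the helper name and the mount target in the usage line are illustrative assumptions.

    import shutil
    from pathlib import Path
    from tempfile import mkdtemp


    def make_ssh_cache_copy():
        """Create a temporary copy of ~/.ssh to be bind-mounted into a container."""
        host_ssh_cache = mkdtemp(prefix='clearml_agent.ssh.')
        try:
            # mkdtemp creates the directory, but copytree needs the target to not exist.
            if Path(host_ssh_cache).is_dir():
                shutil.rmtree(host_ssh_cache, ignore_errors=True)
            shutil.copytree(Path('~/.ssh').expanduser().as_posix(), host_ssh_cache)
        except Exception:
            # No usable ~/.ssh folder - fall back to running without git SSH credentials.
            return None
        return host_ssh_cache


    # Example mount argument for docker run (illustrative target path):
    cache = make_ssh_cache_copy()
    if cache:
        print('-v {}:/root/.ssh'.format(cache))
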
@@ -2243,10 +2261,6 @@ class Worker(ServiceCommandSection):
except Exception:
host_git_credentials = None
# store docker arguments
self._docker_image = docker_image
self._docker_arguments = docker_arguments
extra_shell_script_str = ""
if self._extra_shell_script:
cmds = self._extra_shell_script
@@ -2261,24 +2275,36 @@ class Worker(ServiceCommandSection):
suffix=".cfg", prefix=".clearml_agent.", text=True, name_only=True
)
docker_cmd = dict(worker_id=self.worker_id,
# docker_image=docker_image,
# docker_arguments=docker_arguments,
extra_docker_arguments=self._extra_docker_arguments,
extra_shell_script=extra_shell_script_str,
python_version=python_version, conf_file=self.temp_config_path,
host_apt_cache=host_apt_cache,
host_pip_cache=host_pip_cache,
host_ssh_cache=host_ssh_cache, host_git_credentials=host_git_credentials,
host_cache=host_cache, mounted_cache=mounted_cache_dir,
host_pip_dl=host_pip_dl, mounted_pip_dl=mounted_pip_dl_dir,
host_vcs_cache=host_vcs_cache, mounted_vcs_cache=mounted_vcs_cache,
standalone_mode=self._standalone_mode,
force_current_version=self._force_current_version,
bash_script=bash_script,
preprocess_bash_script=preprocess_bash_script,
)
return temp_config, partial(docker_cmd_functor, docker_cmd, temp_config)
mounted_cache_dir = temp_config.get("sdk.storage.cache.default_base_dir")
mounted_pip_dl_dir = temp_config.get("agent.pip_download_cache.path")
mounted_vcs_cache = temp_config.get("agent.vcs_cache.path")
# Make sure we have created the configuration file for the executor
if not self.dump_config(self.temp_config_path, config=temp_config):
self.log.warning('Could not update docker configuration file {}'.format(self.temp_config_path))
docker_cmd = dict(
worker_id=self.worker_id,
# docker_image=docker_image,
# docker_arguments=docker_arguments,
extra_docker_arguments=self._extra_docker_arguments,
extra_shell_script=extra_shell_script_str,
python_version='python3',
conf_file=self.temp_config_path,
host_apt_cache=host_apt_cache,
host_pip_cache=host_pip_cache,
host_ssh_cache=host_ssh_cache, host_git_credentials=host_git_credentials,
host_cache=host_cache, mounted_cache=mounted_cache_dir,
host_pip_dl=host_pip_dl, mounted_pip_dl=mounted_pip_dl_dir,
host_vcs_cache=host_vcs_cache, mounted_vcs_cache=mounted_vcs_cache,
standalone_mode=self._standalone_mode,
force_current_version=self._force_current_version,
bash_script=bash_script,
preprocess_bash_script=preprocess_bash_script,
)
docker_cmd.update(kwargs)
return self._get_docker_cmd(**docker_cmd)
@staticmethod
def _get_docker_cmd(worker_id, docker_image, docker_arguments,
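
The structural change in the last hunks is that get_docker_config_cmd no longer bakes the host cache paths and docker keyword arguments into a small closure; it returns partial(self._get_docker_config_cmd, temp_config=temp_config), so the host paths, the temporary ~/.ssh copy, and the configuration dump are all re-evaluated at the moment each container is launched. A simplified sketch of that deferral pattern, under the assumption that the returned callable is invoked once per docker run; the class and field names here are illustrative, not the commit's code.

    from functools import partial
    from tempfile import mkdtemp


    class Launcher:
        def get_docker_config_cmd(self):
            temp_config = {"agent.queue": "default"}  # stand-in for the mutated session config
            # Defer the per-run work: nothing in _get_docker_config_cmd runs until the
            # returned callable is invoked for an actual docker run.
            return temp_config, partial(self._get_docker_config_cmd, temp_config=temp_config)

        def _get_docker_config_cmd(self, temp_config, **kwargs):
            # Re-evaluated on every call: fresh ssh cache, fresh config dump, etc.
            host_ssh_cache = mkdtemp(prefix='clearml_agent.ssh.')
            docker_cmd = dict(conf=temp_config, host_ssh_cache=host_ssh_cache)
            docker_cmd.update(kwargs)
            return docker_cmd


    launcher = Launcher()
    _, docker_image_func = launcher.get_docker_config_cmd()
    # Each invocation rebuilds the command (and the ssh cache copy) from scratch:
    print(docker_image_func(docker_image='nvidia/cuda'))
    print(docker_image_func(docker_image='nvidia/cuda'))
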