diff --git a/clearml_agent/commands/worker.py b/clearml_agent/commands/worker.py index c854d38..ead54cd 100644 --- a/clearml_agent/commands/worker.py +++ b/clearml_agent/commands/worker.py @@ -220,6 +220,35 @@ def get_task(session, task_id, *args, **kwargs): return session.api_client.tasks.get_all(id=[task_id], *args, **kwargs)[0] +def get_task_container(session, task_id): + if session.check_min_api_version("2.13"): + result = session.send_request( + service='tasks', + action='get_all', + version='2.13', + json={'id': [task_id], 'only_fields': ['container']}, + method='get', + async_enable=False, + ) + try: + container = result.json()['data']['tasks'][0]['container'] if result.ok else {} + container['arguments'] = str(container.get('arguments') or '').split(' ') + except (ValueError, TypeError): + container = {} + else: + response = get_task(session, task_id, only_fields=["execution.docker_cmd"]) + task_docker_cmd_parts = str(response.execution.docker_cmd or '').strip().split(' ') + try: + container = dict( + container=task_docker_cmd_parts[0], + arguments=task_docker_cmd_parts[1:] if len (task_docker_cmd_parts[0])>1 else '' + ) + except (ValueError, TypeError): + container = {} + + return container + + class TaskStopSignal(object): """ Follow task status and signals when it should be stopped @@ -506,20 +535,19 @@ class Worker(ServiceCommandSection): if self._services_mode and not self._dynamic_gpus else self.worker_id if self.docker_image_func: + # noinspection PyBroadException try: - response = get_task(self._session, task_id, only_fields=["execution.docker_cmd"]) - task_docker_cmd = docker or response.execution.docker_cmd - task_docker_cmd = task_docker_cmd.strip() if task_docker_cmd else None + task_container = get_task_container(self._session, task_id) except Exception: - task_docker_cmd = None + task_container = {} - if task_docker_cmd: + if task_container: self.send_logs(task_id=task_id, - lines=['Running Task {} inside docker: {}\n'.format(task_id, task_docker_cmd)], + lines=['Running Task {} inside docker: {}\n'.format(task_id, str(task_container))], level="INFO") - task_docker_cmd = task_docker_cmd.split(' ') - docker_image = task_docker_cmd[0] - docker_arguments = task_docker_cmd[1:] + docker_image = task_container.get('image') + docker_arguments = task_container.get('arguments') + docker_setup_script = task_container.get('setup_shell_script') else: self.send_logs(task_id=task_id, lines=['running Task {} inside default docker image: {} {}\n'.format( @@ -527,21 +555,32 @@ class Worker(ServiceCommandSection): level="INFO") docker_image = self._docker_image docker_arguments = self._docker_arguments + docker_setup_script = None # Update docker command if self._services_mode: # if this is services mode, give the docker a unique worker id, as it will register itself. full_docker_cmd = self.docker_image_func( worker_id=worker_id, - docker_image=docker_image, docker_arguments=docker_arguments) + docker_image=docker_image, + docker_arguments=docker_arguments, + docker_bash_setup_script=docker_setup_script) else: - full_docker_cmd = self.docker_image_func(docker_image=docker_image, docker_arguments=docker_arguments) - try: - self._session.send_api( - tasks_api.EditRequest(task_id, force=True, execution=dict( # noqa - docker_cmd=' '.join([docker_image] + docker_arguments) if docker_arguments else docker_image))) - except Exception: - pass + full_docker_cmd = self.docker_image_func( + docker_image=docker_image, + docker_arguments=docker_arguments, + docker_bash_setup_script=docker_setup_script) + + # if we are using the default docker, update back the Task: + if not task_container: + # noinspection PyBroadException + try: + self._session.send_api( + tasks_api.EditRequest(task_id, force=True, execution=dict( # noqa + docker_cmd=' '.join([docker_image] + docker_arguments) + if docker_arguments else str(docker_image)))) + except Exception: + pass # if this is services_mode, change the worker_id to a unique name # abd use full-monitoring, ot it registers itself as a worker for this specific service. @@ -1562,21 +1601,24 @@ class Worker(ServiceCommandSection): self.dump_config(self.temp_config_path, config=temp_config) self.docker_image_func = docker_image_func try: - response = get_task(self._session, task_id, only_fields=["execution.docker_cmd"]) - task_docker_cmd = response.execution.docker_cmd - task_docker_cmd = task_docker_cmd.strip() if task_docker_cmd else None + task_container = get_task_container(self._session, task_id) except Exception: - task_docker_cmd = None - if task_docker_cmd: - print('Building Task {} inside docker: {}\n'.format(task_id, task_docker_cmd)) - task_docker_cmd = task_docker_cmd.split(' ') - full_docker_cmd = self.docker_image_func(docker_image=task_docker_cmd[0], - docker_arguments=task_docker_cmd[1:]) + task_container = {} + + if task_container: + docker_image = task_container.get('image') + docker_arguments = task_container.get('arguments') + docker_setup_script = task_container.get('setup_shell_script') else: - print('Building Task {} inside default docker image: {} {}\n'.format( - task_id, self._docker_image, self._docker_arguments or '')) - full_docker_cmd = self.docker_image_func(docker_image=self._docker_image, - docker_arguments=self._docker_arguments) + docker_image = self._docker_image + docker_arguments = self._docker_arguments + docker_setup_script = None + + print('Building Task {} inside docker image: {} {} setup_script={}\n'.format( + task_id, docker_image, docker_arguments or '', docker_setup_script or '')) + full_docker_cmd = self.docker_image_func( + docker_image=docker_image, docker_arguments=docker_arguments, docker_bash_setup_script=docker_setup_script) + end_of_build_marker = "build.done=true" docker_cmd_suffix = ' build --id {task_id} --install-globally; ' \ 'echo "" >> {conf_file} ; ' \ @@ -2875,6 +2917,12 @@ class Worker(ServiceCommandSection): for git_credentials in host_git_credentials: base_cmd += ['-v', '{}:/root/{}'.format(git_credentials, Path(git_credentials).name)] + if docker_bash_setup_script and docker_bash_setup_script.strip('\n '): + extra_shell_script = (extra_shell_script or '') + \ + ' ; '.join(line.strip().replace('\"', '\\\"') + for line in docker_bash_setup_script.split('\n') if line.strip()) + \ + ' ; ' + base_cmd += ( ['-v', conf_file+':'+DOCKER_ROOT_CONF_FILE] + (['-v', host_ssh_cache+':/root/.ssh'] if host_ssh_cache else []) +