Add support for new container base setup script feature

This commit is contained in:
allegroai 2021-04-07 18:46:14 +03:00
parent e71e6865d2
commit 3774fa6abd

View File

@ -220,6 +220,35 @@ def get_task(session, task_id, *args, **kwargs):
return session.api_client.tasks.get_all(id=[task_id], *args, **kwargs)[0]
def get_task_container(session, task_id):
    """
    Return the docker container setup stored on a Task.

    On servers supporting API version 2.13 the Task's ``container`` field is
    requested directly; on older servers the container is derived from the
    legacy ``execution.docker_cmd`` string ("<image> [arguments...]").

    :param session: API session, must provide ``check_min_api_version`` and ``send_request``
    :param task_id: ID of the Task to query
    :return: dict describing the container. The ``arguments`` entry is always a
        list of strings (possibly empty). Callers in this file read the keys
        ``image``, ``arguments`` and ``setup_shell_script``.
        An empty dict is returned when the Task has no container configured or
        when the request failed, so that ``if task_container:`` reliably selects
        between the Task container and the default docker image.
    """
    if session.check_min_api_version("2.13"):
        result = session.send_request(
            service='tasks',
            action='get_all',
            version='2.13',
            json={'id': [task_id], 'only_fields': ['container']},
            method='get',
            async_enable=False,
        )
        try:
            container = result.json()['data']['tasks'][0]['container'] if result.ok else {}
            # work on a copy, and tolerate a null container entry
            container = dict(container or {})
            if any(container.values()):
                # split() (no separator) drops empty tokens, so an empty arguments
                # string becomes [] and not [''] (which would be passed to docker as
                # an empty argument)
                container['arguments'] = str(container.get('arguments') or '').split()
            else:
                # nothing configured on the Task (or request failed): report "no container"
                container = {}
        except (ValueError, TypeError, KeyError, IndexError, AttributeError):
            # malformed / partial response (e.g. no tasks returned, no container field)
            container = {}
    else:
        response = get_task(session, task_id, only_fields=["execution.docker_cmd"])
        task_docker_cmd_parts = str(response.execution.docker_cmd or '').split()
        if task_docker_cmd_parts:
            docker_image = task_docker_cmd_parts[0]
            container = dict(
                # 'image' is the key the callers read
                image=docker_image,
                # legacy key, kept for backwards compatibility
                container=docker_image,
                arguments=task_docker_cmd_parts[1:],
            )
        else:
            # empty docker_cmd means the Task did not request a specific container
            container = {}

    return container
class TaskStopSignal(object):
"""
Follow task status and signals when it should be stopped
@ -506,20 +535,19 @@ class Worker(ServiceCommandSection):
if self._services_mode and not self._dynamic_gpus else self.worker_id
if self.docker_image_func:
# noinspection PyBroadException
try:
response = get_task(self._session, task_id, only_fields=["execution.docker_cmd"])
task_docker_cmd = docker or response.execution.docker_cmd
task_docker_cmd = task_docker_cmd.strip() if task_docker_cmd else None
task_container = get_task_container(self._session, task_id)
except Exception:
task_docker_cmd = None
task_container = {}
if task_docker_cmd:
if task_container:
self.send_logs(task_id=task_id,
lines=['Running Task {} inside docker: {}\n'.format(task_id, task_docker_cmd)],
lines=['Running Task {} inside docker: {}\n'.format(task_id, str(task_container))],
level="INFO")
task_docker_cmd = task_docker_cmd.split(' ')
docker_image = task_docker_cmd[0]
docker_arguments = task_docker_cmd[1:]
docker_image = task_container.get('image')
docker_arguments = task_container.get('arguments')
docker_setup_script = task_container.get('setup_shell_script')
else:
self.send_logs(task_id=task_id,
lines=['running Task {} inside default docker image: {} {}\n'.format(
@ -527,21 +555,32 @@ class Worker(ServiceCommandSection):
level="INFO")
docker_image = self._docker_image
docker_arguments = self._docker_arguments
docker_setup_script = None
# Update docker command
if self._services_mode:
# if this is services mode, give the docker a unique worker id, as it will register itself.
full_docker_cmd = self.docker_image_func(
worker_id=worker_id,
docker_image=docker_image, docker_arguments=docker_arguments)
docker_image=docker_image,
docker_arguments=docker_arguments,
docker_bash_setup_script=docker_setup_script)
else:
full_docker_cmd = self.docker_image_func(docker_image=docker_image, docker_arguments=docker_arguments)
try:
self._session.send_api(
tasks_api.EditRequest(task_id, force=True, execution=dict( # noqa
docker_cmd=' '.join([docker_image] + docker_arguments) if docker_arguments else docker_image)))
except Exception:
pass
full_docker_cmd = self.docker_image_func(
docker_image=docker_image,
docker_arguments=docker_arguments,
docker_bash_setup_script=docker_setup_script)
# if we are using the default docker, update back the Task:
if not task_container:
# noinspection PyBroadException
try:
self._session.send_api(
tasks_api.EditRequest(task_id, force=True, execution=dict( # noqa
docker_cmd=' '.join([docker_image] + docker_arguments)
if docker_arguments else str(docker_image))))
except Exception:
pass
# if this is services_mode, change the worker_id to a unique name
# and use full-monitoring, so it registers itself as a worker for this specific service.
@ -1562,21 +1601,24 @@ class Worker(ServiceCommandSection):
self.dump_config(self.temp_config_path, config=temp_config)
self.docker_image_func = docker_image_func
try:
response = get_task(self._session, task_id, only_fields=["execution.docker_cmd"])
task_docker_cmd = response.execution.docker_cmd
task_docker_cmd = task_docker_cmd.strip() if task_docker_cmd else None
task_container = get_task_container(self._session, task_id)
except Exception:
task_docker_cmd = None
if task_docker_cmd:
print('Building Task {} inside docker: {}\n'.format(task_id, task_docker_cmd))
task_docker_cmd = task_docker_cmd.split(' ')
full_docker_cmd = self.docker_image_func(docker_image=task_docker_cmd[0],
docker_arguments=task_docker_cmd[1:])
task_container = {}
if task_container:
docker_image = task_container.get('image')
docker_arguments = task_container.get('arguments')
docker_setup_script = task_container.get('setup_shell_script')
else:
print('Building Task {} inside default docker image: {} {}\n'.format(
task_id, self._docker_image, self._docker_arguments or ''))
full_docker_cmd = self.docker_image_func(docker_image=self._docker_image,
docker_arguments=self._docker_arguments)
docker_image = self._docker_image
docker_arguments = self._docker_arguments
docker_setup_script = None
print('Building Task {} inside docker image: {} {} setup_script={}\n'.format(
task_id, docker_image, docker_arguments or '', docker_setup_script or ''))
full_docker_cmd = self.docker_image_func(
docker_image=docker_image, docker_arguments=docker_arguments, docker_bash_setup_script=docker_setup_script)
end_of_build_marker = "build.done=true"
docker_cmd_suffix = ' build --id {task_id} --install-globally; ' \
'echo "" >> {conf_file} ; ' \
@ -2875,6 +2917,12 @@ class Worker(ServiceCommandSection):
for git_credentials in host_git_credentials:
base_cmd += ['-v', '{}:/root/{}'.format(git_credentials, Path(git_credentials).name)]
if docker_bash_setup_script and docker_bash_setup_script.strip('\n '):
extra_shell_script = (extra_shell_script or '') + \
' ; '.join(line.strip().replace('\"', '\\\"')
for line in docker_bash_setup_script.split('\n') if line.strip()) + \
' ; '
base_cmd += (
['-v', conf_file+':'+DOCKER_ROOT_CONF_FILE] +
(['-v', host_ssh_cache+':/root/.ssh'] if host_ssh_cache else []) +