Improve docker support

This commit is contained in:
allegroai 2019-10-27 00:43:41 +03:00
parent cefdea78ac
commit 96cdcd5b69

View File

@ -411,16 +411,22 @@ class Worker(ServiceCommandSection):
try: try:
response = get_task(self._session, task_id, only_fields=["execution.docker_cmd"]) response = get_task(self._session, task_id, only_fields=["execution.docker_cmd"])
task_docker_cmd = response.execution.docker_cmd task_docker_cmd = response.execution.docker_cmd
task_docker_cmd = task_docker_cmd.strip() if task_docker_cmd else None
except Exception: except Exception:
warning('No docker image specified, using default docker image: {} {}'.format(
self._docker_image, self._docker_arguments or ''))
task_docker_cmd = None task_docker_cmd = None
if task_docker_cmd: if task_docker_cmd:
self.send_logs(task_id=task_id,
lines=['Running Task {} inside docker: {}\n'.format(task_id, task_docker_cmd)],
level="INFO")
task_docker_cmd = task_docker_cmd.split(' ') task_docker_cmd = task_docker_cmd.split(' ')
full_docker_cmd = self.docker_image_func(docker_image=task_docker_cmd[0], full_docker_cmd = self.docker_image_func(docker_image=task_docker_cmd[0],
docker_arguments=task_docker_cmd[1:]) docker_arguments=task_docker_cmd[1:])
else: else:
self.send_logs(task_id=task_id,
lines=['No docker image specified, running Task {} inside docker: {} {}\n'.format(
task_id, self._docker_image, self._docker_arguments or '')],
level="INFO")
full_docker_cmd = self.docker_image_func(docker_image=self._docker_image, full_docker_cmd = self.docker_image_func(docker_image=self._docker_image,
docker_arguments=self._docker_arguments) docker_arguments=self._docker_arguments)
# Update docker command # Update docker command
@ -489,7 +495,6 @@ class Worker(ServiceCommandSection):
:param worker_params: Worker command line arguments :param worker_params: Worker command line arguments
:type worker_params: ``trains_agent.helper.process.WorkerParams`` :type worker_params: ``trains_agent.helper.process.WorkerParams``
""" """
events_service = self.get_service(Events)
if not self._daemon_foreground: if not self._daemon_foreground:
print('Starting infinite task polling loop...') print('Starting infinite task polling loop...')
@ -519,13 +524,9 @@ class Worker(ServiceCommandSection):
print("No tasks in queue {}".format(queue)) print("No tasks in queue {}".format(queue))
continue continue
print("task {} pulled from {} by worker {}".format(task_id, queue, self.worker_id)) self.send_logs(
events_service.send_log_events(
self.worker_id,
task_id=task_id, task_id=task_id,
lines="task {} pulled from {} by worker {}".format( lines=["task {} pulled from {} by worker {}\n".format(task_id, queue, self.worker_id)],
task_id, queue, self.worker_id
),
level="INFO", level="INFO",
) )
self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id)) self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id))
@ -730,8 +731,6 @@ class Worker(ServiceCommandSection):
# skip the previously printed lines, # skip the previously printed lines,
return f.readlines()[prev_line_count:] return f.readlines()[prev_line_count:]
events_service = self.get_service(Events)
stdout = open(stdout_path, "wt") stdout = open(stdout_path, "wt")
stderr = open(stderr_path, "wt") if stderr_path else stdout stderr = open(stderr_path, "wt") if stderr_path else stdout
try: try:
@ -757,10 +756,9 @@ class Worker(ServiceCommandSection):
# mark quit loop # mark quit loop
stopping = True stopping = True
if daemon: if daemon:
events_service.send_log_events( self.send_logs(
self.worker_id,
task_id=task_id, task_id=task_id,
lines=["User aborted: stopping task"], lines=["User aborted: stopping task\n"],
level="ERROR", level="ERROR",
) )
kill_all_child_processes(process.pid) kill_all_child_processes(process.pid)
@ -801,13 +799,14 @@ class Worker(ServiceCommandSection):
return status, stop_reason return status, stop_reason
def send_logs(self, task_id, lines): def send_logs(self, task_id, lines, level="DEBUG"):
""" """
Send output lines as log events to backend Send output lines as log events to backend
:param task_id: ID of task to send logs for :param task_id: ID of task to send logs for
:type task_id: Text :type task_id: Text
:param lines: lines to send :param lines: lines to send
:type lines: [Text] :type lines: [Text]
:param str level: log level, default DEBUG
:return: number of lines sent :return: number of lines sent
:rtype: int :rtype: int
""" """
@ -821,7 +820,7 @@ class Worker(ServiceCommandSection):
events_service = self.get_service(Events) events_service = self.get_service(Events)
try: try:
events_service.send_log_events( events_service.send_log_events(
self.worker_id, task_id=task_id, lines=lines, level="DEBUG" self.worker_id, task_id=task_id, lines=lines, level=level
) )
return len(lines) return len(lines)
except Exception as e: except Exception as e:
@ -1564,6 +1563,7 @@ class Worker(ServiceCommandSection):
mounted_cache_dir = '/root/.trains/cache' mounted_cache_dir = '/root/.trains/cache'
mounted_pip_dl_dir = '/root/.trains/pip-download-cache' mounted_pip_dl_dir = '/root/.trains/pip-download-cache'
mounted_vcs_cache = '/root/.trains/vcs-cache' mounted_vcs_cache = '/root/.trains/vcs-cache'
mounted_venv_dir = '/root/.trains/venvs-builds'
host_cache = Path(os.path.expandvars(self._session.config["sdk.storage.cache.default_base_dir"])).expanduser().as_posix() host_cache = Path(os.path.expandvars(self._session.config["sdk.storage.cache.default_base_dir"])).expanduser().as_posix()
host_pip_dl = Path(os.path.expandvars(self._session.config["agent.pip_download_cache.path"])).expanduser().as_posix() host_pip_dl = Path(os.path.expandvars(self._session.config["agent.pip_download_cache.path"])).expanduser().as_posix()
host_vcs_cache = Path(os.path.expandvars(self._session.config["agent.vcs_cache.path"])).expanduser().as_posix() host_vcs_cache = Path(os.path.expandvars(self._session.config["agent.vcs_cache.path"])).expanduser().as_posix()
@ -1572,6 +1572,7 @@ class Worker(ServiceCommandSection):
temp_config.put("agent.vcs_cache.path", mounted_vcs_cache) temp_config.put("agent.vcs_cache.path", mounted_vcs_cache)
temp_config.put("agent.package_manager.system_site_packages", True) temp_config.put("agent.package_manager.system_site_packages", True)
temp_config.put("agent.default_python", "") temp_config.put("agent.default_python", "")
temp_config.put("agent.venvs_dir", mounted_venv_dir)
host_apt_cache = Path(os.path.expandvars(self._session.config.get( host_apt_cache = Path(os.path.expandvars(self._session.config.get(
"agent.docker_apt_cache", '~/.trains/apt-cache'))).expanduser().as_posix() "agent.docker_apt_cache", '~/.trains/apt-cache'))).expanduser().as_posix()