diff --git a/trains_agent/commands/worker.py b/trains_agent/commands/worker.py index a3a663e..ba1f2ed 100644 --- a/trains_agent/commands/worker.py +++ b/trains_agent/commands/worker.py @@ -411,16 +411,22 @@ class Worker(ServiceCommandSection): try: response = get_task(self._session, task_id, only_fields=["execution.docker_cmd"]) task_docker_cmd = response.execution.docker_cmd + task_docker_cmd = task_docker_cmd.strip() if task_docker_cmd else None except Exception: - warning('No docker image specified, using default docker image: {} {}'.format( - self._docker_image, self._docker_arguments or '')) task_docker_cmd = None if task_docker_cmd: + self.send_logs(task_id=task_id, + lines=['Running Task {} inside docker: {}\n'.format(task_id, task_docker_cmd)], + level="INFO") task_docker_cmd = task_docker_cmd.split(' ') full_docker_cmd = self.docker_image_func(docker_image=task_docker_cmd[0], docker_arguments=task_docker_cmd[1:]) else: + self.send_logs(task_id=task_id, + lines=['No docker image specified, running Task {} inside docker: {} {}\n'.format( + task_id, self._docker_image, self._docker_arguments or '')], + level="INFO") full_docker_cmd = self.docker_image_func(docker_image=self._docker_image, docker_arguments=self._docker_arguments) # Update docker command @@ -489,7 +495,6 @@ class Worker(ServiceCommandSection): :param worker_params: Worker command line arguments :type worker_params: ``trains_agent.helper.process.WorkerParams`` """ - events_service = self.get_service(Events) if not self._daemon_foreground: print('Starting infinite task polling loop...') @@ -519,13 +524,9 @@ class Worker(ServiceCommandSection): print("No tasks in queue {}".format(queue)) continue - print("task {} pulled from {} by worker {}".format(task_id, queue, self.worker_id)) - events_service.send_log_events( - self.worker_id, + self.send_logs( task_id=task_id, - lines="task {} pulled from {} by worker {}".format( - task_id, queue, self.worker_id - ), + lines=["task {} pulled from {} by worker {}\n".format(task_id, queue, self.worker_id)], level="INFO", ) self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id)) @@ -730,8 +731,6 @@ class Worker(ServiceCommandSection): # skip the previously printed lines, return f.readlines()[prev_line_count:] - events_service = self.get_service(Events) - stdout = open(stdout_path, "wt") stderr = open(stderr_path, "wt") if stderr_path else stdout try: @@ -757,10 +756,9 @@ class Worker(ServiceCommandSection): # mark quit loop stopping = True if daemon: - events_service.send_log_events( - self.worker_id, + self.send_logs( task_id=task_id, - lines=["User aborted: stopping task"], + lines=["User aborted: stopping task\n"], level="ERROR", ) kill_all_child_processes(process.pid) @@ -801,13 +799,14 @@ class Worker(ServiceCommandSection): return status, stop_reason - def send_logs(self, task_id, lines): + def send_logs(self, task_id, lines, level="DEBUG"): """ Send output lines as log events to backend :param task_id: ID of task to send logs for :type task_id: Text :param lines: lines to send :type lines: [Text] + :param str level: log level, default DEBUG :return: number of lines sent :rtype: int """ @@ -821,7 +820,7 @@ class Worker(ServiceCommandSection): events_service = self.get_service(Events) try: events_service.send_log_events( - self.worker_id, task_id=task_id, lines=lines, level="DEBUG" + self.worker_id, task_id=task_id, lines=lines, level=level ) return len(lines) except Exception as e: @@ -1564,6 +1563,7 @@ class Worker(ServiceCommandSection): mounted_cache_dir = '/root/.trains/cache' mounted_pip_dl_dir = '/root/.trains/pip-download-cache' mounted_vcs_cache = '/root/.trains/vcs-cache' + mounted_venv_dir = '/root/.trains/venvs-builds' host_cache = Path(os.path.expandvars(self._session.config["sdk.storage.cache.default_base_dir"])).expanduser().as_posix() host_pip_dl = Path(os.path.expandvars(self._session.config["agent.pip_download_cache.path"])).expanduser().as_posix() host_vcs_cache = Path(os.path.expandvars(self._session.config["agent.vcs_cache.path"])).expanduser().as_posix() @@ -1572,6 +1572,7 @@ class Worker(ServiceCommandSection): temp_config.put("agent.vcs_cache.path", mounted_vcs_cache) temp_config.put("agent.package_manager.system_site_packages", True) temp_config.put("agent.default_python", "") + temp_config.put("agent.venvs_dir", mounted_venv_dir) host_apt_cache = Path(os.path.expandvars(self._session.config.get( "agent.docker_apt_cache", '~/.trains/apt-cache'))).expanduser().as_posix()