diff --git a/trains_agent/commands/worker.py b/trains_agent/commands/worker.py index afa88ff..9403894 100644 --- a/trains_agent/commands/worker.py +++ b/trains_agent/commands/worker.py @@ -82,7 +82,11 @@ from trains_agent.helper.process import ( Argv, COMMAND_SUCCESS, Executable, - get_bash_output, shutdown_docker_process, get_docker_id, commit_docker) + get_bash_output, + shutdown_docker_process, + get_docker_id, + commit_docker +) from trains_agent.helper.package.cython_req import CythonRequirement from trains_agent.helper.repo import clone_repository_cached, RepoInfo, VCS from trains_agent.helper.resource_monitor import ResourceMonitor @@ -1096,8 +1100,9 @@ class Worker(ServiceCommandSection): debug=self._session.debug_mode, trace=self._session.trace, ) - self.report_monitor(ResourceMonitor.StatusReport(task=task_id)) - self.run_one_task(queue='', task_id=task_id, worker_args=worker_params, docker=docker) + self.report_monitor(ResourceMonitor.StatusReport(task=current_task.id)) + self.run_one_task(queue='', task_id=current_task.id, worker_args=worker_params, docker=docker) + self.stop_monitor() self._unregister() return @@ -1113,7 +1118,7 @@ class Worker(ServiceCommandSection): if not disable_monitoring: self.log.debug("starting resource monitor") - self.report_monitor(ResourceMonitor.StatusReport(task=task_id)) + self.report_monitor(ResourceMonitor.StatusReport(task=current_task.id)) execution = self.get_execution_info(current_task) @@ -1161,7 +1166,7 @@ class Worker(ServiceCommandSection): script_dir = (directory if isinstance(directory, Path) else Path(directory)).absolute().as_posix() # run code - print("Running task id [%s]:" % task_id) + print("Running task id [%s]:" % current_task.id) extra = ['-u', ] if optimization: extra.append( @@ -1195,7 +1200,7 @@ class Worker(ServiceCommandSection): ) if repo_info: - self._update_commit_id(task_id, execution, repo_info) + self._update_commit_id(current_task.id, execution, repo_info) # Add the script CWD to the python path python_path = get_python_path(script_dir, execution.entry_point, self.package_api) \ @@ -1213,7 +1218,7 @@ class Worker(ServiceCommandSection): else: use_execv = is_linux_platform() and not isinstance(self.package_api, (PoetryAPI, CondaAPI)) - print("Starting Task Execution:\n".format(task_id)) + print("Starting Task Execution:\n".format(current_task.id)) exit_code = -1 try: if disable_monitoring: @@ -1242,13 +1247,13 @@ class Worker(ServiceCommandSection): ) print("Storing stdout and stderr log into [%s]" % temp_stdout_fname) exit_code, _ = self._log_command_output( - task_id=task_id, + task_id=current_task.id, cmd=command, stdout_path=temp_stdout_fname, cwd=script_dir, ) except KeyboardInterrupt: - self.handle_user_abort(task_id) + self.handle_user_abort(current_task.id) raise except Exception as e: self.log.warning(str(e)) @@ -1265,7 +1270,7 @@ class Worker(ServiceCommandSection): if not disable_monitoring: # we need to change task status according to exit code - self.handle_task_termination(task_id, exit_code, TaskStopReason.no_stop) + self.handle_task_termination(current_task.id, exit_code, TaskStopReason.no_stop) self.stop_monitor() # unregister the worker self._unregister() @@ -1863,6 +1868,10 @@ class Worker(ServiceCommandSection): cmds = [cmds] extra_shell_script_str = " ; ".join(map(str, cmds)) + " ; " + self.temp_config_path = self.temp_config_path or safe_mkstemp( + suffix=".cfg", prefix=".trains_agent.", text=True, name_only=True + ) + docker_cmd = dict(worker_id=self.worker_id, # docker_image=docker_image, # docker_arguments=docker_arguments,