Create dev task manually when constructing the Task

This commit is contained in:
allegroai 2020-04-09 12:27:13 +03:00
parent 11420adce7
commit b2c2002c40
3 changed files with 14 additions and 10 deletions

View File

@ -168,15 +168,15 @@ class TaskHandler(BufferingHandler):
def _send_events(self, a_request): def _send_events(self, a_request):
try: try:
if self._thread_pool is None: if self._thread_pool is None:
self.__log_stderr('Warning: trains.Task - ' self.__log_stderr('WARNING: trains.Task - '
'Task.close() flushing remaining logs ({})'.format(self._pending)) 'Task.close() flushing remaining logs ({})'.format(self._pending))
self._pending -= 1 self._pending -= 1
res = self.session.send(a_request) res = self.session.send(a_request)
if not res.ok(): if not res.ok():
self.__log_stderr("Warning: trains.log._send_events: failed logging task to backend " self.__log_stderr("WARNING: trains.log._send_events: failed logging task to backend "
"({:d} lines, {})".format(len(a_request.requests), str(res.meta))) "({:d} lines, {})".format(len(a_request.requests), str(res.meta)))
except Exception as ex: except Exception as ex:
self.__log_stderr("Warning: trains.log._send_events: Retrying, " self.__log_stderr("WARNING: trains.log._send_events: Retrying, "
"failed logging task to backend ({:d} lines): {}".format(len(a_request.requests), ex)) "failed logging task to backend ({:d} lines): {}".format(len(a_request.requests), ex))
# we should push ourselves back into the thread pool # we should push ourselves back into the thread pool
if self._thread_pool: if self._thread_pool:

View File

@ -724,10 +724,11 @@ class Logger(object):
def _start_task_if_needed(self): def _start_task_if_needed(self):
# do not refresh the task status read from cached variable _status # do not refresh the task status read from cached variable _status
if str(self._task._status) == str(tasks.TaskStatusEnum.created): # if str(self._task._status) == str(tasks.TaskStatusEnum.created):
self._task.mark_started() # self._task.mark_started()
#
self._task._dev_mode_task_start() # self._task._dev_mode_task_start()
pass
def _flush_stdout_handler(self): def _flush_stdout_handler(self):
if self._task_handler and DevWorker.report_stdout: if self._task_handler and DevWorker.report_stdout:

View File

@ -767,10 +767,13 @@ class Task(_Task):
Close the current Task. Enables to manually shutdown the task. Close the current Task. Enables to manually shutdown the task.
Should only be called if you are absolutely sure there is no need for the Task. Should only be called if you are absolutely sure there is no need for the Task.
""" """
# store is main before we call at_exit, because will will Null it
is_main = self.is_main_task()
self._at_exit() self._at_exit()
self._at_exit_called = False # leave _at_exit_called set to True (I think)
## self._at_exit_called = False
# unregister atexit callbacks and signal hooks, if we are the main task # unregister atexit callbacks and signal hooks, if we are the main task
if self.is_main_task(): if is_main:
self.__register_at_exit(None) self.__register_at_exit(None)
def register_artifact(self, name, artifact, metadata=None, uniqueness_columns=True): def register_artifact(self, name, artifact, metadata=None, uniqueness_columns=True):
@ -1419,7 +1422,7 @@ class Task(_Task):
parent.terminate() parent.terminate()
def _dev_mode_setup_worker(self, model_updated=False): def _dev_mode_setup_worker(self, model_updated=False):
if running_remotely() or not self.is_main_task(): if running_remotely() or not self.is_main_task() or self._at_exit_called:
return return
if self._dev_worker: if self._dev_worker: