diff --git a/trains/backend_interface/task/log.py b/trains/backend_interface/task/log.py index 6bdce2dd..8582c03f 100644 --- a/trains/backend_interface/task/log.py +++ b/trains/backend_interface/task/log.py @@ -12,6 +12,7 @@ buffer_capacity = config.get('log.task_log_buffer_capacity', 100) class TaskHandler(BufferingHandler): __flush_max_history_seconds = 30. + __wait_for_flush_timeout = int(10) __once = False @property @@ -130,6 +131,7 @@ class TaskHandler(BufferingHandler): def wait_for_flush(self, shutdown=False): msg = 'Task.log.wait_for_flush: %d' + from ...utilities.os.lowlevel import threadpool_waited_join ll = self.__log_stderr ll(msg % 0) self.acquire() @@ -142,9 +144,14 @@ class TaskHandler(BufferingHandler): ll(msg % 4) try: ll(msg % 5) - t.close() + t.terminate() ll(msg % 6) - t.join() + for i in range(int(self.__wait_for_flush_timeout)): + if self._pending <= 0: + break + self.__log_stderr('INFO: trains.Task - flushing console logs (timeout {}s)'.format( + self.__wait_for_flush_timeout-i)) + threadpool_waited_join(t, timeout=1.0) ll(msg % 7) except Exception: ll(msg % 8) @@ -175,7 +182,7 @@ class TaskHandler(BufferingHandler): try: if self._thread_pool is None: self.__log_stderr('INFO: trains.Task - ' - 'Task.close() flushing remaining logs ({}){}'.format(self._pending, a_request)) + 'Task.close() flushing remaining logs ({})'.format(self._pending)) self._pending -= 1 res = self.session.send(a_request) if not res.ok():