Add reporter.wait_for_events() to ensure flush with wait_for_uploads awaits background processes

This commit is contained in:
allegroai 2021-07-10 10:34:17 +03:00
parent 6b9297660e
commit 6e6271fb91
2 changed files with 15 additions and 2 deletions

View File

@ -36,6 +36,7 @@ class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin):
task=task, wait_period=flush_frequency)
self._flush_threshold = flush_threshold
self._exit_event = TrEvent()
self._empty_state_event = TrEvent()
self._queue = TrQueue()
self._queue_size = 0
self._res_waiting = Semaphore()
@ -52,6 +53,8 @@ class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin):
self._queue = PrQueue()
if not isinstance(self._exit_event, SafeEvent):
self._exit_event = SafeEvent()
if not isinstance(self._empty_state_event, SafeEvent):
self._empty_state_event = SafeEvent()
super(BackgroundReportService, self).set_subprocess_mode()
def stop(self):
@ -66,6 +69,10 @@ class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin):
if not self.is_subprocess() or self.is_subprocess_alive():
self._event.set()
def wait_for_events(self, timeout=None):
self._empty_state_event.clear()
return self._empty_state_event.wait(timeout)
def add_event(self, ev):
if not self._queue:
return
@ -83,6 +90,7 @@ class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin):
# wait for all reports
if self.get_num_results() > 0:
self.wait_for_results()
self._empty_state_event.set()
self._res_waiting.release()
# make sure we flushed everything
self._async_enable = False
@ -90,6 +98,7 @@ class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin):
self._write()
if self.get_num_results() > 0:
self.wait_for_results()
self._empty_state_event.set()
self._res_waiting.release()
def _write(self):
@ -200,6 +209,10 @@ class Reporter(InterfaceBase, AbstractContextManager, SetupUploadMixin, AsyncMan
if self._report_service:
self._report_service.flush()
def wait_for_events(self, timeout=None):
if self._report_service:
return self._report_service.wait_for_events(timeout=timeout)
def stop(self):
if not self._report_service:
return

View File

@ -1351,8 +1351,8 @@ class Task(_Task):
self._logger._flush_stdout_handler()
if self.__reporter:
self.__reporter.flush()
# if wait_for_uploads:
# self.__reporter.wait_for_events()
if wait_for_uploads:
self.__reporter.wait_for_events()
LoggerRoot.flush()