From 6e6271fb91f2aeb2aa7a13c6d07d4e635baaa670 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Sat, 10 Jul 2021 10:34:17 +0300 Subject: [PATCH] Add reporter.wait_for_events() to ensure flush with wait_for_uploads awaits background processes --- clearml/backend_interface/metrics/reporter.py | 13 +++++++++++++ clearml/task.py | 4 ++-- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/clearml/backend_interface/metrics/reporter.py b/clearml/backend_interface/metrics/reporter.py index 4b200344..bac7c419 100644 --- a/clearml/backend_interface/metrics/reporter.py +++ b/clearml/backend_interface/metrics/reporter.py @@ -36,6 +36,7 @@ class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin): task=task, wait_period=flush_frequency) self._flush_threshold = flush_threshold self._exit_event = TrEvent() + self._empty_state_event = TrEvent() self._queue = TrQueue() self._queue_size = 0 self._res_waiting = Semaphore() @@ -52,6 +53,8 @@ class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin): self._queue = PrQueue() if not isinstance(self._exit_event, SafeEvent): self._exit_event = SafeEvent() + if not isinstance(self._empty_state_event, SafeEvent): + self._empty_state_event = SafeEvent() super(BackgroundReportService, self).set_subprocess_mode() def stop(self): @@ -66,6 +69,10 @@ class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin): if not self.is_subprocess() or self.is_subprocess_alive(): self._event.set() + def wait_for_events(self, timeout=None): + self._empty_state_event.clear() + return self._empty_state_event.wait(timeout) + def add_event(self, ev): if not self._queue: return @@ -83,6 +90,7 @@ class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin): # wait for all reports if self.get_num_results() > 0: self.wait_for_results() + self._empty_state_event.set() self._res_waiting.release() # make sure we flushed everything self._async_enable = False @@ -90,6 +98,7 @@ class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin): self._write() if self.get_num_results() > 0: self.wait_for_results() + self._empty_state_event.set() self._res_waiting.release() def _write(self): @@ -200,6 +209,10 @@ class Reporter(InterfaceBase, AbstractContextManager, SetupUploadMixin, AsyncMan if self._report_service: self._report_service.flush() + def wait_for_events(self, timeout=None): + if self._report_service: + return self._report_service.wait_for_events(timeout=timeout) + def stop(self): if not self._report_service: return diff --git a/clearml/task.py b/clearml/task.py index 7985382d..a9ea88f0 100644 --- a/clearml/task.py +++ b/clearml/task.py @@ -1351,8 +1351,8 @@ class Task(_Task): self._logger._flush_stdout_handler() if self.__reporter: self.__reporter.flush() - # if wait_for_uploads: - # self.__reporter.wait_for_events() + if wait_for_uploads: + self.__reporter.wait_for_events() LoggerRoot.flush()