From c8c824815ef25ed0501701224e8bd96db2600b16 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Wed, 13 Apr 2022 14:20:00 +0300 Subject: [PATCH] Fix Windows sub process might end up waiting forever for uploads to finish if subprocess is very shot lived --- clearml/backend_interface/metrics/reporter.py | 22 +++++++++++++++++++ clearml/task.py | 2 +- 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/clearml/backend_interface/metrics/reporter.py b/clearml/backend_interface/metrics/reporter.py index e90413d1..6aa5c5b3 100644 --- a/clearml/backend_interface/metrics/reporter.py +++ b/clearml/backend_interface/metrics/reporter.py @@ -31,6 +31,8 @@ except ImportError: class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin): + __daemon_live_check_timeout = 10.0 + def __init__(self, task, async_enable, metrics, flush_frequency, flush_threshold): super(BackgroundReportService, self).__init__( task=task, wait_period=flush_frequency) @@ -65,6 +67,15 @@ class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin): self._flush_event.set() super(BackgroundReportService, self).stop() + def wait(self, timeout=None): + if not self._done_ev: + return + if not self.is_subprocess_mode() or self.is_subprocess_mode_and_parent_process(): + tic = time() + while self.is_alive() and (not timeout or time()-tic < timeout): + if self._done_ev.wait(timeout=1.0): + break + def flush(self): while isinstance(self._queue, PrQueue) and self._queue.is_pending(): sleep(0.1) @@ -85,10 +96,17 @@ class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin): self._empty_state_event.clear() if isinstance(self._empty_state_event, ForkEvent): + self._flush_event.set() tic = time() while self._thread and self._thread.is_alive() and (not timeout or time()-tic < timeout): if self._empty_state_event.wait(timeout=1.0): break + if self._event.wait(0) or self._done_ev.wait(0): + break + # if enough time passed and the flush event was not cleared, + # there is no daemon thread running, we should leave + if time()-tic > self.__daemon_live_check_timeout and self._flush_event.wait(0): + break elif isinstance(self._empty_state_event, SafeEvent): tic = time() while self.is_subprocess_alive() and (not timeout or time()-tic < timeout): @@ -290,6 +308,10 @@ class Reporter(InterfaceBase, AbstractContextManager, SetupUploadMixin, AsyncMan def is_alive(self): return self._report_service and self._report_service.is_alive() + def is_constructed(self): + # noinspection PyProtectedMember + return self._report_service and (self._report_service.is_alive() or self._report_service._thread is True) + def get_num_results(self): return self._report_service.get_num_results() diff --git a/clearml/task.py b/clearml/task.py index 9152fa07..b7b21885 100644 --- a/clearml/task.py +++ b/clearml/task.py @@ -663,7 +663,7 @@ class Task(_Task): # something to the log. task._dev_mode_setup_worker() - if (not task._reporter or not task._reporter.is_alive()) and \ + if (not task._reporter or not task._reporter.is_constructed()) and \ is_sub_process_task_id and not cls._report_subprocess_enabled: task._setup_reporter()