Fix Windows sub process might end up waiting forever for uploads to finish if subprocess is very shot lived

This commit is contained in:
allegroai 2022-04-13 14:20:00 +03:00
parent 15574aa7e2
commit c8c824815e
2 changed files with 23 additions and 1 deletions

View File

@ -31,6 +31,8 @@ except ImportError:
class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin):
__daemon_live_check_timeout = 10.0
def __init__(self, task, async_enable, metrics, flush_frequency, flush_threshold):
super(BackgroundReportService, self).__init__(
task=task, wait_period=flush_frequency)
@ -65,6 +67,15 @@ class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin):
self._flush_event.set()
super(BackgroundReportService, self).stop()
def wait(self, timeout=None):
if not self._done_ev:
return
if not self.is_subprocess_mode() or self.is_subprocess_mode_and_parent_process():
tic = time()
while self.is_alive() and (not timeout or time()-tic < timeout):
if self._done_ev.wait(timeout=1.0):
break
def flush(self):
while isinstance(self._queue, PrQueue) and self._queue.is_pending():
sleep(0.1)
@ -85,10 +96,17 @@ class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin):
self._empty_state_event.clear()
if isinstance(self._empty_state_event, ForkEvent):
self._flush_event.set()
tic = time()
while self._thread and self._thread.is_alive() and (not timeout or time()-tic < timeout):
if self._empty_state_event.wait(timeout=1.0):
break
if self._event.wait(0) or self._done_ev.wait(0):
break
# if enough time passed and the flush event was not cleared,
# there is no daemon thread running, we should leave
if time()-tic > self.__daemon_live_check_timeout and self._flush_event.wait(0):
break
elif isinstance(self._empty_state_event, SafeEvent):
tic = time()
while self.is_subprocess_alive() and (not timeout or time()-tic < timeout):
@ -290,6 +308,10 @@ class Reporter(InterfaceBase, AbstractContextManager, SetupUploadMixin, AsyncMan
def is_alive(self):
return self._report_service and self._report_service.is_alive()
def is_constructed(self):
# noinspection PyProtectedMember
return self._report_service and (self._report_service.is_alive() or self._report_service._thread is True)
def get_num_results(self):
return self._report_service.get_num_results()

View File

@ -663,7 +663,7 @@ class Task(_Task):
# something to the log.
task._dev_mode_setup_worker()
if (not task._reporter or not task._reporter.is_alive()) and \
if (not task._reporter or not task._reporter.is_constructed()) and \
is_sub_process_task_id and not cls._report_subprocess_enabled:
task._setup_reporter()