Fix reported images might not all be reported when waiting to complete the task

This commit is contained in:
allegroai 2022-11-09 11:39:26 +02:00
parent b793f2dfc6
commit 9164a38708

View File

@ -47,6 +47,16 @@ class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin):
self._async_enable = async_enable
self._is_thread_mode_in_subprocess_flag = None
# We need this list because on close, the daemon thread might call _write.
# _write will pop everything from queue and add the events to a list,
# then attempt to send the list of events to the backend.
# But its possible on close for the daemon thread to die in the middle of all that.
# So we have to preserve the list the daemon thread attempted to send to the backend
# such that we can retry this.
# Is is possible that we send the same events twice or that we are missing exactly one event.
# Both of these cases should be very rare and I dont really see how we can do better.
self._processing_events = []
def set_storage_uri(self, uri):
self._storage_uri = uri
@ -106,6 +116,7 @@ class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin):
# if enough time passed and the flush event was not cleared,
# there is no daemon thread running, we should leave
if time()-tic > self.__daemon_live_check_timeout and self._flush_event.wait(0):
self._write()
break
elif isinstance(self._empty_state_event, SafeEvent):
tic = time()
@ -176,8 +187,11 @@ class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin):
if self._queue.empty():
return
# print('reporting %d events' % len(self._events))
if self._async_enable:
events = []
else:
events = self._processing_events
while not self._queue.empty():
try:
events.append(self._queue.get(block=False))
@ -196,6 +210,8 @@ class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin):
if self._async_enable:
self._add_async_result(res)
else:
self._processing_events.clear()
def send_all_events(self, wait=True):
self._write()
@ -245,7 +261,7 @@ class Reporter(InterfaceBase, AbstractContextManager, SetupUploadMixin, AsyncMan
self._bucket_config = None
self._storage_uri = None
self._async_enable = async_enable
self._flush_frequency = config.get("development.worker.report_period_sec", 2)
self._flush_frequency = 5.0
self._max_iteration = 0
flush_threshold = config.get("development.worker.report_event_flush_threshold", 50)
self._report_service = BackgroundReportService(