Fix multi-process spawning wait-for-uploads can create a deadlock in very rare cases

This commit is contained in:
allegroai 2021-09-03 03:25:58 +03:00
parent 0217dea75d
commit dbfe45d005
3 changed files with 16 additions and 7 deletions

View File

@ -4,6 +4,7 @@ import logging
import math import math
from multiprocessing import Semaphore from multiprocessing import Semaphore
from threading import Event as TrEvent from threading import Event as TrEvent
from time import sleep
import numpy as np import numpy as np
import six import six
@ -60,16 +61,21 @@ class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin):
def stop(self): def stop(self):
if isinstance(self._queue, PrQueue): if isinstance(self._queue, PrQueue):
self._queue.close(self._event) self._queue.close(self._event)
if not self.is_subprocess() or self.is_subprocess_alive(): if not self.is_subprocess_mode() or self.is_subprocess_alive():
self._exit_event.set() self._exit_event.set()
super(BackgroundReportService, self).stop() super(BackgroundReportService, self).stop()
def flush(self): def flush(self):
self._queue_size = 0 self._queue_size = 0
if not self.is_subprocess() or self.is_subprocess_alive(): if not self.is_subprocess_mode() or self.is_subprocess_alive():
self._event.set() self._event.set()
def wait_for_events(self, timeout=None): def wait_for_events(self, timeout=None):
# noinspection PyProtectedMember
if self._is_subprocess_mode_and_not_parent_process():
while self._queue and not self._queue.empty():
sleep(0.1)
return
self._empty_state_event.clear() self._empty_state_event.clear()
return self._empty_state_event.wait(timeout) return self._empty_state_event.wait(timeout)
@ -218,7 +224,7 @@ class Reporter(InterfaceBase, AbstractContextManager, SetupUploadMixin, AsyncMan
return return
report_service = self._report_service report_service = self._report_service
self._report_service = None self._report_service = None
if not report_service.is_subprocess() or report_service.is_alive(): if not report_service.is_subprocess_mode() or report_service.is_alive():
report_service.stop() report_service.stop()
report_service.wait() report_service.wait()
else: else:

View File

@ -254,7 +254,7 @@ class TaskHandler(BufferingHandler):
_background_log = self._background_log _background_log = self._background_log
self._background_log = None self._background_log = None
if _background_log: if _background_log:
if not _background_log.is_subprocess() or _background_log.is_alive(): if not _background_log.is_subprocess_mode() or _background_log.is_alive():
_background_log.stop() _background_log.stop()
if wait: if wait:
# noinspection PyBroadException # noinspection PyBroadException

View File

@ -328,7 +328,7 @@ class BackgroundMonitor(object):
if not self._thread: if not self._thread:
return return
if not self.is_subprocess() or self.is_subprocess_alive(): if not self.is_subprocess_mode() or self.is_subprocess_alive():
self._event.set() self._event.set()
if isinstance(self._thread, Thread): if isinstance(self._thread, Thread):
@ -505,7 +505,7 @@ class BackgroundMonitor(object):
return return
def is_alive(self): def is_alive(self):
if self.is_subprocess(): if self.is_subprocess_mode():
return self.is_subprocess_alive() and self._thread \ return self.is_subprocess_alive() and self._thread \
and self._start_ev.is_set() and not self._done_ev.is_set() and self._start_ev.is_set() and not self._done_ev.is_set()
else: else:
@ -534,13 +534,16 @@ class BackgroundMonitor(object):
return child.is_running() and child.status() != psutil.STATUS_ZOMBIE return child.is_running() and child.status() != psutil.STATUS_ZOMBIE
return False return False
def is_subprocess(self): def is_subprocess_mode(self):
return self._subprocess is not False and \ return self._subprocess is not False and \
bool(self._main_process) and self._task_id == self._main_process_task_id bool(self._main_process) and self._task_id == self._main_process_task_id
def _get_instances(self): def _get_instances(self):
return self._instances.setdefault(self._task_obj_id, []) return self._instances.setdefault(self._task_obj_id, [])
def _is_subprocess_mode_and_not_parent_process(self):
return self.is_subprocess_mode() and self._main_process != os.getpid()
@classmethod @classmethod
def is_subprocess_enabled(cls, task=None): def is_subprocess_enabled(cls, task=None):
return bool(cls._main_process) and (not task or task.id == cls._main_process_task_id) return bool(cls._main_process) and (not task or task.id == cls._main_process_task_id)