Fix crash inside forked subprocess might leave SafeQueue in a locked state, causing task.close() to hang

This commit is contained in:
allegroai 2021-10-24 17:33:21 +03:00
parent 12439b9970
commit 1795344ec3

View File

@ -49,14 +49,18 @@ class ThreadCalls(object):
self._queue.put((func, args))
return True
def close(self):
if not self._thread:
return
def close(self, timeout=5.):
t = self._thread
# push something into queue so it knows this is the end
self._queue.put(None)
# wait fot thread
t.join()
if not t:
return
try:
# push something into queue so it knows this is the end
self._queue.put(None)
# wait fot thread it should not take long, so we have a 5 second timeout
# the background thread itself is doing nothing but push into a queue, so it should not take long
t.join(timeout=timeout)
except BaseException: # noqa
pass
# mark thread is done
self._thread = None
@ -76,6 +80,7 @@ class ThreadCalls(object):
request[0]()
except Exception:
pass
self._thread = None
class SingletonThreadPool(object):
@ -110,8 +115,9 @@ class SafeQueue(object):
def __init__(self, *args, **kwargs):
self._reader_thread = None
self._reader_thread_started = False
# Fix the python Queue and Use SimpleQueue write so it uses a single OS write,
# making it atomic message passing
self._q = SimpleQueue(*args, **kwargs)
# Fix the simple queue write so it uses a single OS write, making it atomic message passing
# noinspection PyBroadException
try:
# noinspection PyUnresolvedReferences,PyProtectedMember
@ -129,17 +135,24 @@ class SafeQueue(object):
# only call from main put process
return self._q_size > 0
def close(self, event, timeout=100.0):
def close(self, event, timeout=3.0):
# wait until all pending requests pushed
tic = time()
prev_q_size = self._q_size
while self.is_pending():
if event:
event.set()
if not self.__thread_pool.is_active():
break
sleep(0.1)
# timeout is for the maximum time to pull a single object from the queue,
# this way if we get stuck we notice quickly and abort
if timeout and (time()-tic) > timeout:
break
if prev_q_size == self._q_size:
break
else:
prev_q_size = self._q_size
tic = time()
def get(self, *args, **kwargs):
return self._get_internal_queue(*args, **kwargs)
@ -169,7 +182,12 @@ class SafeQueue(object):
self.__thread_pool.get().apply_async(self._q_put, args=(obj, ))
def _q_put(self, obj):
self._q.put(obj)
try:
self._q.put(obj)
except BaseException:
# make sure we zero the _q_size of the process dies (i.e. queue put fails)
self._q_size = 0
raise
# GIL will make sure it is atomic
self._q_size -= 1