Fix deadlock

This commit is contained in:
allegroai 2022-07-28 18:51:48 +03:00
parent 301d68d2d3
commit 969e095478
3 changed files with 24 additions and 12 deletions

View File

@ -1,10 +1,10 @@
 import logging
 import sys
-import threading
 from time import time

 from ..binding.frameworks import _patched_call  # noqa
 from ..config import running_remotely, config, DEBUG_SIMULATE_REMOTE_TASK
+from ..utilities.process.mp import ForkSafeRLock


 class StdStreamPatch(object):
@ -176,8 +176,8 @@ class PrintPatchLogger(object):
     Used for capturing and logging stdin and stderr when running in development mode pseudo worker.
     """
     patched = False
-    lock = threading.Lock()
-    recursion_protect_lock = threading.RLock()
+    lock = ForkSafeRLock()
+    recursion_protect_lock = ForkSafeRLock()
     cr_flush_period = None

     def __init__(self, stream, logger=None, level=logging.INFO, load_config_defaults=True):

View File

@ -76,8 +76,8 @@ class SimpleQueueWrapper(object):
             def _patched_put(*a_args, **a_kwargs):
                 # make sure we flush everything, because after we push the result we will get terminated
                 try:
-                    task = self.__current_task
-                    task.flush(wait_for_uploads=True)
+                    if self.__current_task and self.__current_task.is_main_task():
+                        self.__current_task.flush(wait_for_uploads=True)
                 except:  # noqa
                     pass
                 return getattr(self.__simple_queue, "put")(*a_args, **a_kwargs)
@ -152,6 +152,7 @@ class PatchOsFork(object):
         task = None

         # check if this is Process Pool function
+        patched_worker = False
         if hasattr(self, "_target"):
             # Now we have to patch Pool, because pool terminates subprocess directly after
             # the return value of the pool worker function is pushed into the queue,
@ -159,19 +160,26 @@ class PatchOsFork(object):
             try:
                 if self._target == pool.worker:  # noqa
                     self._target = partial(PatchOsFork._patched_pool_worker, pool.worker)  # noqa
+                    patched_worker = True
             except:  # noqa
                 pass

         try:
             return PatchOsFork._original_process_run(self, *args, **kwargs)
         finally:
-            # force creating a Task
-            try:
-                if task:
-                    # noinspection PyProtectedMember
-                    task._at_exit()
-            except:  # noqa
-                pass
+            if task:
+                try:
+                    if patched_worker:
+                        # remove at exit hooks, we will deadlock when the
+                        # main Pool manager will terminate this process, and it will...
+                        # noinspection PyProtectedMember
+                        task._at_exit_called = True
+                    else:
+                        # terminate the current Task
+                        # noinspection PyProtectedMember
+                        task._at_exit()
+                except:  # noqa
+                    pass

     @staticmethod
     def _patched_fork(*args, **kwargs):

View File

@ -142,6 +142,10 @@ class ForkSafeRLock(_ForkSafeThreadSyncObject):
             # Do whatever cleanup.
             self.release()

+    def _is_owned(self):
+        self._create()
+        return self._sync._is_owned()  # noqa
+

 class ForkSemaphore(_ForkSafeThreadSyncObject):
     def __init__(self, value=1):