Fix Dataset.finalize() can hang in extreme scenario

This commit is contained in:
allegroai 2021-05-20 11:36:53 +03:00
parent 07a22a38ac
commit d531b508cb

View File

@ -201,6 +201,7 @@ class SingletonLock(AbstractContextManager):
class BackgroundMonitor(object): class BackgroundMonitor(object):
# If we will need multiple monitoring contexts (i.e. subprocesses) this will become a dict # If we will need multiple monitoring contexts (i.e. subprocesses) this will become a dict
_main_process = None _main_process = None
_main_process_task_id = None
_parent_pid = None _parent_pid = None
_sub_process_started = None _sub_process_started = None
_instances = {} # type: Dict[int, List[BackgroundMonitor]] _instances = {} # type: Dict[int, List[BackgroundMonitor]]
@ -213,6 +214,7 @@ class BackgroundMonitor(object):
self._thread = None self._thread = None
self._wait_timeout = wait_period self._wait_timeout = wait_period
self._subprocess = None if task.is_main_task() else False self._subprocess = None if task.is_main_task() else False
self._task_id = task.id
self._task_obj_id = id(task.id) self._task_obj_id = id(task.id)
def start(self): def start(self):
@ -296,6 +298,7 @@ class BackgroundMonitor(object):
cls._parent_pid = os.getpid() cls._parent_pid = os.getpid()
cls._sub_process_started = SafeEvent() cls._sub_process_started = SafeEvent()
cls._sub_process_started.clear() cls._sub_process_started.clear()
cls._main_process_task_id = task.id
# setup # setup
for d in BackgroundMonitor._instances.get(id(task.id), []): for d in BackgroundMonitor._instances.get(id(task.id), []):
d.set_subprocess_mode() d.set_subprocess_mode()
@ -381,8 +384,8 @@ class BackgroundMonitor(object):
return isinstance(self._thread, Thread) and self._thread.is_alive() return isinstance(self._thread, Thread) and self._thread.is_alive()
@classmethod @classmethod
def is_subprocess_alive(cls): def is_subprocess_alive(cls, task=None):
if not cls._main_process: if not cls._main_process or (task and cls._main_process_task_id != task.id):
return False return False
# noinspection PyBroadException # noinspection PyBroadException
try: try:
@ -405,19 +408,23 @@ class BackgroundMonitor(object):
return False return False
def is_subprocess(self): def is_subprocess(self):
return self._subprocess is not False and bool(self._main_process) return self._subprocess is not False and \
bool(self._main_process) and self._task_id == self._main_process_task_id
def _get_instances(self): def _get_instances(self):
return self._instances.setdefault(self._task_obj_id, []) return self._instances.setdefault(self._task_obj_id, [])
@classmethod @classmethod
def is_subprocess_enabled(cls): def is_subprocess_enabled(cls, task=None):
return bool(cls._main_process) return bool(cls._main_process) and (not task or task.id == cls._main_process_task_id)
@classmethod @classmethod
def clear_main_process(cls, task): def clear_main_process(cls, task):
if BackgroundMonitor._main_process_task_id != task.id:
return
cls.wait_for_sub_process(task) cls.wait_for_sub_process(task)
BackgroundMonitor._main_process = None BackgroundMonitor._main_process = None
BackgroundMonitor._main_process_task_id = None
BackgroundMonitor._parent_pid = None BackgroundMonitor._parent_pid = None
BackgroundMonitor._sub_process_started = None BackgroundMonitor._sub_process_started = None
BackgroundMonitor._instances = {} BackgroundMonitor._instances = {}
@ -425,14 +432,14 @@ class BackgroundMonitor(object):
@classmethod @classmethod
def wait_for_sub_process(cls, task, timeout=None): def wait_for_sub_process(cls, task, timeout=None):
if not cls.is_subprocess_enabled(): if not cls.is_subprocess_enabled(task=task):
return return
for d in BackgroundMonitor._instances.get(id(task.id), []): for d in BackgroundMonitor._instances.get(id(task.id), []):
d.stop() d.stop()
tic = time() tic = time()
while cls.is_subprocess_alive() and (not timeout or time()-tic < timeout): while cls.is_subprocess_alive(task=task) and (not timeout or time()-tic < timeout):
sleep(0.03) sleep(0.03)