Fix multiprocess spawn context using ProcessFork kills subprocess before parent process ends

This commit is contained in:
allegroai 2021-06-12 23:10:26 +03:00
parent d769582332
commit e7de292c1c

View File

@ -7,6 +7,7 @@ from multiprocessing import Lock, Event as ProcessEvent
from threading import Thread, Event as TrEvent from threading import Thread, Event as TrEvent
from time import sleep, time from time import sleep, time
from typing import List, Dict, Optional from typing import List, Dict, Optional
from multiprocessing import Process, get_context
import psutil import psutil
from six.moves.queue import Empty, Queue as TrQueue from six.moves.queue import Empty, Queue as TrQueue
@ -19,9 +20,9 @@ except ImportError:
from multiprocessing.queues import SimpleQueue from multiprocessing.queues import SimpleQueue
try: try:
from multiprocessing.context import ForkProcess as Process # noqa from multiprocessing.context import ForkContext # noqa
except ImportError: except ImportError:
from multiprocessing import Process ForkContext = None
class ThreadCalls(object): class ThreadCalls(object):
@ -374,12 +375,37 @@ class BackgroundMonitor(object):
# setup # setup
for d in BackgroundMonitor._instances.get(id(task.id), []): for d in BackgroundMonitor._instances.get(id(task.id), []):
d.set_subprocess_mode() d.set_subprocess_mode()
# todo: solve for standalone spawn subprocess # todo: solve for standalone spawn subprocess
BackgroundMonitor._main_process = Process( if ForkContext is not None and isinstance(get_context(), ForkContext):
cls.__start_subprocess_forkprocess(task_obj_id=id(task.id))
else:
cls.__start_subprocess_os_fork(task_obj_id=id(task.id))
# wait until subprocess is up
if wait_for_subprocess:
cls._sub_process_started.wait()
@classmethod
def __start_subprocess_os_fork(cls, task_obj_id):
process_args = (task_obj_id, cls._sub_process_started, os.getpid())
BackgroundMonitor._main_process = os.fork()
# check if we are the child process
if BackgroundMonitor._main_process == 0:
# update to the child process pid
BackgroundMonitor._main_process = os.getpid()
cls._background_process_start(*process_args)
# force to leave the subprocess
leave_process(0)
return
@classmethod
def __start_subprocess_forkprocess(cls, task_obj_id):
_main_process = Process(
target=cls._background_process_start, target=cls._background_process_start,
args=(id(task.id), cls._sub_process_started) args=(task_obj_id, cls._sub_process_started, os.getpid())
) )
BackgroundMonitor._main_process.daemon = True _main_process.daemon = True
# Hack allow to create daemon subprocesses (even though python doesn't like it) # Hack allow to create daemon subprocesses (even though python doesn't like it)
un_daemonize = False un_daemonize = False
# noinspection PyBroadException # noinspection PyBroadException
@ -393,13 +419,14 @@ class BackgroundMonitor(object):
# try to start the background process, if we fail retry again, or crash # try to start the background process, if we fail retry again, or crash
for i in range(4): for i in range(4):
try: try:
BackgroundMonitor._main_process.start() _main_process.start()
break break
except BaseException: except BaseException:
if i < 3: if i < 3:
sleep(1) sleep(1)
continue continue
raise raise
BackgroundMonitor._main_process = _main_process.pid
if un_daemonize: if un_daemonize:
# noinspection PyBroadException # noinspection PyBroadException
try: try:
@ -407,14 +434,13 @@ class BackgroundMonitor(object):
current_process()._config['daemon'] = un_daemonize # noqa current_process()._config['daemon'] = un_daemonize # noqa
except BaseException: except BaseException:
pass pass
# wait until subprocess is up
if wait_for_subprocess:
cls._sub_process_started.wait()
@classmethod @classmethod
def _background_process_start(cls, task_obj_id, event_start=None): def _background_process_start(cls, task_obj_id, event_start=None, parent_pid=None):
# type: (int, Optional[SafeEvent]) -> None # type: (int, Optional[SafeEvent], Optional[int]) -> None
is_debugger_running = bool(getattr(sys, 'gettrace', None) and sys.gettrace()) is_debugger_running = bool(getattr(sys, 'gettrace', None) and sys.gettrace())
# make sure we update the pid to our own
cls._main_process = os.getpid()
# restore original signal, this will prevent any deadlocks # restore original signal, this will prevent any deadlocks
# Do not change the exception we need to catch base exception as well # Do not change the exception we need to catch base exception as well
# noinspection PyBroadException # noinspection PyBroadException
@ -445,16 +471,26 @@ class BackgroundMonitor(object):
# wait until we are signaled # wait until we are signaled
for i in instances: for i in instances:
# DO NOT CHANGE, we need to catch base exception, if the process gte's killed
try:
while i._thread and i._thread.is_alive():
# noinspection PyBroadException # noinspection PyBroadException
try: try:
if i._thread and i._thread.is_alive(): p = psutil.Process(parent_pid)
# DO Not change, we need to catch base exception, if the process gte's killed parent_alive = p.is_running() and p.status() != psutil.STATUS_ZOMBIE
except Exception:
parent_alive = False
# if parent process is not here we should just leave!
if not parent_alive:
return
# DO NOT CHANGE, we need to catch base exception, if the process gte's killed
try: try:
i._thread.join() # timeout so we can detect if the parent process got killed.
i._thread.join(timeout=30.)
except: # noqa except: # noqa
break break
else:
pass
except: # noqa except: # noqa
pass pass
# we are done, leave process # we are done, leave process
@ -473,11 +509,10 @@ class BackgroundMonitor(object):
return False return False
# noinspection PyBroadException # noinspection PyBroadException
try: try:
return \ p = psutil.Process(cls._main_process)
cls._main_process.is_alive() and \ return p.is_running() and p.status() != psutil.STATUS_ZOMBIE
psutil.Process(cls._main_process.pid).status() != psutil.STATUS_ZOMBIE
except Exception: except Exception:
current_pid = cls._main_process.pid current_pid = cls._main_process
if not current_pid: if not current_pid:
return False return False
try: try:
@ -488,7 +523,7 @@ class BackgroundMonitor(object):
for child in parent.children(recursive=True): for child in parent.children(recursive=True):
# kill ourselves last (if we need to) # kill ourselves last (if we need to)
if child.pid == current_pid: if child.pid == current_pid:
return child.status() != psutil.STATUS_ZOMBIE return child.is_running() and child.status() != psutil.STATUS_ZOMBIE
return False return False
def is_subprocess(self): def is_subprocess(self):