Use os.register_at_fork instead of monkey patching fork for python > 3.6

This commit is contained in:
Alex Burlacu 2023-08-21 14:06:59 +03:00
parent f6ad5e6c06
commit 0b521b00a6

View File

@ -89,6 +89,7 @@ class SimpleQueueWrapper(object):
class PatchOsFork(object):
_original_fork = None
_registered_fork_callbacks = False
_current_task = None
_original_process_run = None
@ -104,13 +105,20 @@ class PatchOsFork(object):
# noinspection PyBroadException
try:
# only once
if cls._original_fork:
if cls._registered_fork_callbacks or cls._original_fork:
return
if six.PY2:
cls._original_fork = staticmethod(os.fork)
else:
cls._original_fork = os.fork
os.fork = cls._patched_fork
try:
os.register_at_fork(before=PatchOsFork._fork_callback_before,
after_in_child=PatchOsFork._fork_callback_after_child)
cls._registered_fork_callbacks = True
except Exception:
# python <3.6
if six.PY2:
cls._original_fork = staticmethod(os.fork)
else:
cls._original_fork = os.fork
os.fork = cls._patched_fork
except Exception:
pass
@ -182,10 +190,9 @@ class PatchOsFork(object):
pass
@staticmethod
def _patched_fork(*args, **kwargs):
def _fork_callback_before():
if not PatchOsFork._current_task:
return PatchOsFork._original_fork(*args, **kwargs)
return
from ..task import Task
# ensure deferred is done, but never try to generate a Task object
@ -195,46 +202,63 @@ class PatchOsFork(object):
# noinspection PyProtectedMember
Task._wait_for_deferred(task)
@staticmethod
def _fork_callback_after_child():
if not PatchOsFork._current_task:
return
from ..task import Task
# force creating a Task
task = Task.current_task()
if not task:
return
PatchOsFork._current_task = task
# # Hack: now make sure we setup the reporter threads (Log+Reporter)
# noinspection PyProtectedMember
if not bool(task._report_subprocess_enabled):
BackgroundMonitor.start_all(task=task)
# The signal handler method is Not enough, for the time being, we have both
# even though it makes little sense
# # if we got here patch the os._exit of our instance to call us
def _at_exit_callback(*a_args, **a_kwargs):
# just make sure we flush the internal state (the at exist caught by the external signal does the rest
# in theory we should not have to do any of that, but for some reason if we do not
# the signal is never caught by the signal call backs, not sure why....
sleep(0.1)
# Since at_exist handlers do not work on forked processes, we have to manually call them here
if task:
try:
# not to worry there is a double _at_exit protection implemented inside task._at_exit()
# noinspection PyProtectedMember
task._at_exit()
except: # noqa
pass
# noinspection PyProtectedMember, PyUnresolvedReferences
return os._org_exit(*a_args, **a_kwargs)
if not hasattr(os, '_org_exit'):
# noinspection PyProtectedMember, PyUnresolvedReferences
os._org_exit = os._exit
os._exit = _at_exit_callback
@staticmethod
def _patched_fork(*args, **kwargs):
if not PatchOsFork._current_task:
return PatchOsFork._original_fork(*args, **kwargs)
PatchOsFork._fork_callback_before()
ret = PatchOsFork._original_fork(*args, **kwargs)
if not PatchOsFork._current_task:
return ret
# Make sure the new process stdout is logged
if not ret:
# force creating a Task
task = Task.current_task()
if not task:
return ret
PatchOsFork._current_task = task
# # Hack: now make sure we setup the reporter threads (Log+Reporter)
# noinspection PyProtectedMember
if not bool(task._report_subprocess_enabled):
BackgroundMonitor.start_all(task=task)
# The signal handler method is Not enough, for the time being, we have both
# even though it makes little sense
# # if we got here patch the os._exit of our instance to call us
def _at_exit_callback(*a_args, **a_kwargs):
# just make sure we flush the internal state (the at exist caught by the external signal does the rest
# in theory we should not have to do any of that, but for some reason if we do not
# the signal is never caught by the signal call backs, not sure why....
sleep(0.1)
# Since at_exist handlers do not work on forked processes, we have to manually call them here
if task:
try:
# not to worry there is a double _at_exit protection implemented inside task._at_exit()
# noinspection PyProtectedMember
task._at_exit()
except: # noqa
pass
# noinspection PyProtectedMember, PyUnresolvedReferences
return os._org_exit(*a_args, **a_kwargs)
if not hasattr(os, '_org_exit'):
# noinspection PyProtectedMember, PyUnresolvedReferences
os._org_exit = os._exit
os._exit = _at_exit_callback
PatchOsFork._fork_callback_after_child()
return ret