From 0b521b00a68a298e0cbe0adb52e3908c03e3aa93 Mon Sep 17 00:00:00 2001 From: Alex Burlacu Date: Mon, 21 Aug 2023 14:06:59 +0300 Subject: [PATCH] Use os.register_at_fork instead of monkey patching fork for python > 3.6 --- clearml/binding/environ_bind.py | 114 +++++++++++++++++++------------- 1 file changed, 69 insertions(+), 45 deletions(-) diff --git a/clearml/binding/environ_bind.py b/clearml/binding/environ_bind.py index 605c2949..e5167368 100644 --- a/clearml/binding/environ_bind.py +++ b/clearml/binding/environ_bind.py @@ -89,6 +89,7 @@ class SimpleQueueWrapper(object): class PatchOsFork(object): _original_fork = None + _registered_fork_callbacks = False _current_task = None _original_process_run = None @@ -104,13 +105,20 @@ class PatchOsFork(object): # noinspection PyBroadException try: # only once - if cls._original_fork: + if cls._registered_fork_callbacks or cls._original_fork: return - if six.PY2: - cls._original_fork = staticmethod(os.fork) - else: - cls._original_fork = os.fork - os.fork = cls._patched_fork + try: + os.register_at_fork(before=PatchOsFork._fork_callback_before, + after_in_child=PatchOsFork._fork_callback_after_child) + cls._registered_fork_callbacks = True + except Exception: + # python <3.6 + if six.PY2: + cls._original_fork = staticmethod(os.fork) + else: + cls._original_fork = os.fork + os.fork = cls._patched_fork + except Exception: pass @@ -182,10 +190,9 @@ class PatchOsFork(object): pass @staticmethod - def _patched_fork(*args, **kwargs): + def _fork_callback_before(): if not PatchOsFork._current_task: - return PatchOsFork._original_fork(*args, **kwargs) - + return from ..task import Task # ensure deferred is done, but never try to generate a Task object @@ -195,46 +202,63 @@ class PatchOsFork(object): # noinspection PyProtectedMember Task._wait_for_deferred(task) + @staticmethod + def _fork_callback_after_child(): + if not PatchOsFork._current_task: + return + + from ..task import Task + + # force creating a Task + task = Task.current_task() + if not task: + return + + PatchOsFork._current_task = task + # # Hack: now make sure we setup the reporter threads (Log+Reporter) + # noinspection PyProtectedMember + if not bool(task._report_subprocess_enabled): + BackgroundMonitor.start_all(task=task) + + # The signal handler method is Not enough, for the time being, we have both + # even though it makes little sense + # # if we got here patch the os._exit of our instance to call us + def _at_exit_callback(*a_args, **a_kwargs): + # just make sure we flush the internal state (the at exist caught by the external signal does the rest + # in theory we should not have to do any of that, but for some reason if we do not + # the signal is never caught by the signal call backs, not sure why.... + sleep(0.1) + # Since at_exist handlers do not work on forked processes, we have to manually call them here + if task: + try: + # not to worry there is a double _at_exit protection implemented inside task._at_exit() + # noinspection PyProtectedMember + task._at_exit() + except: # noqa + pass + + # noinspection PyProtectedMember, PyUnresolvedReferences + return os._org_exit(*a_args, **a_kwargs) + + if not hasattr(os, '_org_exit'): + # noinspection PyProtectedMember, PyUnresolvedReferences + os._org_exit = os._exit + + os._exit = _at_exit_callback + + + @staticmethod + def _patched_fork(*args, **kwargs): + if not PatchOsFork._current_task: + return PatchOsFork._original_fork(*args, **kwargs) + + PatchOsFork._fork_callback_before() + ret = PatchOsFork._original_fork(*args, **kwargs) if not PatchOsFork._current_task: return ret # Make sure the new process stdout is logged if not ret: - # force creating a Task - task = Task.current_task() - if not task: - return ret - - PatchOsFork._current_task = task - # # Hack: now make sure we setup the reporter threads (Log+Reporter) - # noinspection PyProtectedMember - if not bool(task._report_subprocess_enabled): - BackgroundMonitor.start_all(task=task) - - # The signal handler method is Not enough, for the time being, we have both - # even though it makes little sense - # # if we got here patch the os._exit of our instance to call us - def _at_exit_callback(*a_args, **a_kwargs): - # just make sure we flush the internal state (the at exist caught by the external signal does the rest - # in theory we should not have to do any of that, but for some reason if we do not - # the signal is never caught by the signal call backs, not sure why.... - sleep(0.1) - # Since at_exist handlers do not work on forked processes, we have to manually call them here - if task: - try: - # not to worry there is a double _at_exit protection implemented inside task._at_exit() - # noinspection PyProtectedMember - task._at_exit() - except: # noqa - pass - - # noinspection PyProtectedMember, PyUnresolvedReferences - return os._org_exit(*a_args, **a_kwargs) - - if not hasattr(os, '_org_exit'): - # noinspection PyProtectedMember, PyUnresolvedReferences - os._org_exit = os._exit - - os._exit = _at_exit_callback + PatchOsFork._fork_callback_after_child() return ret