From e7de292c1cf2d86f27bc1b76bd5e6e65c810d4aa Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Sat, 12 Jun 2021 23:10:26 +0300 Subject: [PATCH] Fix multiprocess spawn context using ProcessFork kills subprocess before parent process ends --- clearml/utilities/process/mp.py | 129 ++++++++++++++++++++------------ 1 file changed, 82 insertions(+), 47 deletions(-) diff --git a/clearml/utilities/process/mp.py b/clearml/utilities/process/mp.py index f26a4315..3f2af17e 100644 --- a/clearml/utilities/process/mp.py +++ b/clearml/utilities/process/mp.py @@ -7,6 +7,7 @@ from multiprocessing import Lock, Event as ProcessEvent from threading import Thread, Event as TrEvent from time import sleep, time from typing import List, Dict, Optional +from multiprocessing import Process, get_context import psutil from six.moves.queue import Empty, Queue as TrQueue @@ -19,9 +20,9 @@ except ImportError: from multiprocessing.queues import SimpleQueue try: - from multiprocessing.context import ForkProcess as Process # noqa + from multiprocessing.context import ForkContext # noqa except ImportError: - from multiprocessing import Process + ForkContext = None class ThreadCalls(object): @@ -374,47 +375,72 @@ class BackgroundMonitor(object): # setup for d in BackgroundMonitor._instances.get(id(task.id), []): d.set_subprocess_mode() + # todo: solve for standalone spawn subprocess - BackgroundMonitor._main_process = Process( - target=cls._background_process_start, - args=(id(task.id), cls._sub_process_started) - ) - BackgroundMonitor._main_process.daemon = True - # Hack allow to create daemon subprocesses (even though python doesn't like it) - un_daemonize = False - # noinspection PyBroadException - try: - from multiprocessing import current_process - if current_process()._config.get('daemon'): # noqa - un_daemonize = current_process()._config.get('daemon') # noqa - current_process()._config['daemon'] = False # noqa - except BaseException: - pass - # try to start the background process, if we fail 
retry again, or crash - for i in range(4): - try: - BackgroundMonitor._main_process.start() - break - except BaseException: - if i < 3: - sleep(1) - continue - raise - if un_daemonize: - # noinspection PyBroadException - try: - from multiprocessing import current_process - current_process()._config['daemon'] = un_daemonize # noqa - except BaseException: - pass + if ForkContext is not None and isinstance(get_context(), ForkContext): + cls.__start_subprocess_forkprocess(task_obj_id=id(task.id)) + else: + cls.__start_subprocess_os_fork(task_obj_id=id(task.id)) + # wait until subprocess is up if wait_for_subprocess: cls._sub_process_started.wait() @classmethod - def _background_process_start(cls, task_obj_id, event_start=None): - # type: (int, Optional[SafeEvent]) -> None + def __start_subprocess_os_fork(cls, task_obj_id): + process_args = (task_obj_id, cls._sub_process_started, os.getpid()) + BackgroundMonitor._main_process = os.fork() + # check if we are the child process + if BackgroundMonitor._main_process == 0: + # update to the child process pid + BackgroundMonitor._main_process = os.getpid() + cls._background_process_start(*process_args) + # force to leave the subprocess + leave_process(0) + return + + @classmethod + def __start_subprocess_forkprocess(cls, task_obj_id): + _main_process = Process( + target=cls._background_process_start, + args=(task_obj_id, cls._sub_process_started, os.getpid()) + ) + _main_process.daemon = True + # Hack allow to create daemon subprocesses (even though python doesn't like it) + un_daemonize = False + # noinspection PyBroadException + try: + from multiprocessing import current_process + if current_process()._config.get('daemon'): # noqa + un_daemonize = current_process()._config.get('daemon') # noqa + current_process()._config['daemon'] = False # noqa + except BaseException: + pass + # try to start the background process, if we fail retry again, or crash + for i in range(4): + try: + _main_process.start() + break + except 
BaseException: + if i < 3: + sleep(1) + continue + raise + BackgroundMonitor._main_process = _main_process.pid + if un_daemonize: + # noinspection PyBroadException + try: + from multiprocessing import current_process + current_process()._config['daemon'] = un_daemonize # noqa + except BaseException: + pass + + @classmethod + def _background_process_start(cls, task_obj_id, event_start=None, parent_pid=None): + # type: (int, Optional[SafeEvent], Optional[int]) -> None is_debugger_running = bool(getattr(sys, 'gettrace', None) and sys.gettrace()) + # make sure we update the pid to our own + cls._main_process = os.getpid() # restore original signal, this will prevent any deadlocks # Do not change the exception we need to catch base exception as well # noinspection PyBroadException @@ -445,16 +471,26 @@ class BackgroundMonitor(object): # wait until we are signaled for i in instances: - # noinspection PyBroadException + # DO NOT CHANGE, we need to catch base exception, if the process gets killed try: - if i._thread and i._thread.is_alive(): - # DO Not change, we need to catch base exception, if the process gte's killed + while i._thread and i._thread.is_alive(): + # noinspection PyBroadException try: - i._thread.join() + p = psutil.Process(parent_pid) + parent_alive = p.is_running() and p.status() != psutil.STATUS_ZOMBIE + except Exception: + parent_alive = False + + # if parent process is not here we should just leave! + if not parent_alive: + return + + # DO NOT CHANGE, we need to catch base exception, if the process gets killed + try: + # timeout so we can detect if the parent process got killed. + i._thread.join(timeout=30.) 
except: # noqa break - else: - pass except: # noqa pass # we are done, leave process @@ -473,11 +509,10 @@ class BackgroundMonitor(object): return False # noinspection PyBroadException try: - return \ - cls._main_process.is_alive() and \ - psutil.Process(cls._main_process.pid).status() != psutil.STATUS_ZOMBIE + p = psutil.Process(cls._main_process) + return p.is_running() and p.status() != psutil.STATUS_ZOMBIE except Exception: - current_pid = cls._main_process.pid + current_pid = cls._main_process if not current_pid: return False try: @@ -488,7 +523,7 @@ class BackgroundMonitor(object): for child in parent.children(recursive=True): # kill ourselves last (if we need to) if child.pid == current_pid: - return child.status() != psutil.STATUS_ZOMBIE + return child.is_running() and child.status() != psutil.STATUS_ZOMBIE return False def is_subprocess(self):