diff --git a/clearml/utilities/process/mp.py b/clearml/utilities/process/mp.py
index 39b590ae..257d8d23 100644
--- a/clearml/utilities/process/mp.py
+++ b/clearml/utilities/process/mp.py
@@ -201,6 +201,7 @@ class SingletonLock(AbstractContextManager):
 class BackgroundMonitor(object):
     # If we will need multiple monitoring contexts (i.e. subprocesses) this will become a dict
     _main_process = None
+    _main_process_task_id = None
     _parent_pid = None
     _sub_process_started = None
     _instances = {}  # type: Dict[int, List[BackgroundMonitor]]
@@ -213,6 +214,7 @@ class BackgroundMonitor(object):
         self._thread = None
         self._wait_timeout = wait_period
         self._subprocess = None if task.is_main_task() else False
+        self._task_id = task.id
         self._task_obj_id = id(task.id)
 
     def start(self):
@@ -296,6 +298,7 @@ class BackgroundMonitor(object):
         cls._parent_pid = os.getpid()
         cls._sub_process_started = SafeEvent()
         cls._sub_process_started.clear()
+        cls._main_process_task_id = task.id
         # setup
         for d in BackgroundMonitor._instances.get(id(task.id), []):
             d.set_subprocess_mode()
@@ -381,8 +384,8 @@ class BackgroundMonitor(object):
         return isinstance(self._thread, Thread) and self._thread.is_alive()
 
     @classmethod
-    def is_subprocess_alive(cls):
-        if not cls._main_process:
+    def is_subprocess_alive(cls, task=None):
+        if not cls._main_process or (task and cls._main_process_task_id != task.id):
             return False
         # noinspection PyBroadException
         try:
@@ -405,19 +408,23 @@ class BackgroundMonitor(object):
             return False
 
     def is_subprocess(self):
-        return self._subprocess is not False and bool(self._main_process)
+        return self._subprocess is not False and \
+               bool(self._main_process) and self._task_id == self._main_process_task_id
 
     def _get_instances(self):
         return self._instances.setdefault(self._task_obj_id, [])
 
     @classmethod
-    def is_subprocess_enabled(cls):
-        return bool(cls._main_process)
+    def is_subprocess_enabled(cls, task=None):
+        return bool(cls._main_process) and (not task or task.id == cls._main_process_task_id)
 
     @classmethod
     def clear_main_process(cls, task):
+        if BackgroundMonitor._main_process_task_id != task.id:
+            return
         cls.wait_for_sub_process(task)
         BackgroundMonitor._main_process = None
+        BackgroundMonitor._main_process_task_id = None
         BackgroundMonitor._parent_pid = None
         BackgroundMonitor._sub_process_started = None
         BackgroundMonitor._instances = {}
@@ -425,14 +432,14 @@ class BackgroundMonitor(object):
 
     @classmethod
     def wait_for_sub_process(cls, task, timeout=None):
-        if not cls.is_subprocess_enabled():
+        if not cls.is_subprocess_enabled(task=task):
             return
 
         for d in BackgroundMonitor._instances.get(id(task.id), []):
             d.stop()
 
         tic = time()
-        while cls.is_subprocess_alive() and (not timeout or time()-tic < timeout):
+        while cls.is_subprocess_alive(task=task) and (not timeout or time()-tic < timeout):
             sleep(0.03)