From 132e76cfb96665ea3b5acb0a7ed684932d9cd0e5 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Sun, 21 Feb 2021 14:59:50 +0200 Subject: [PATCH] Fix multiple Task.init()/close() calls in the same process --- clearml/task.py | 7 ++++++- clearml/utilities/process/mp.py | 16 +++++++++++++++- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/clearml/task.py b/clearml/task.py index 024da38e..03067c9d 100644 --- a/clearml/task.py +++ b/clearml/task.py @@ -1327,6 +1327,7 @@ class Task(_Task): # store is main before we call at_exit, because will will Null it is_main = self.is_main_task() + is_sub_process = self.__is_subprocess() # wait for repository detection (5 minutes should be reasonable time to detect all packages) if self._logger and not self.__is_subprocess(): @@ -1336,6 +1337,11 @@ class Task(_Task): # unregister atexit callbacks and signal hooks, if we are the main task if is_main: self.__register_at_exit(None) + if not is_sub_process: + # make sure we enable multiple Task.init callas with reporting sub-processes + BackgroundMonitor.clear_main_process() + # noinspection PyProtectedMember + Logger._remove_std_logger() def delete(self, delete_artifacts_and_models=True, skip_models_used_by_other_tasks=True, raise_on_error=False): # type: (bool, bool, bool) -> bool @@ -2975,7 +2981,6 @@ class Task(_Task): if self.is_main_task(): Task.__main_task = None except Exception as ex: - import traceback # make sure we do not interrupt the exit process pass diff --git a/clearml/utilities/process/mp.py b/clearml/utilities/process/mp.py index 225c7480..e9decd3b 100644 --- a/clearml/utilities/process/mp.py +++ b/clearml/utilities/process/mp.py @@ -16,7 +16,6 @@ except ImportError: # noqa class SingletonThreadPool(object): - __lock = None __thread_pool = None __thread_pool_pid = None @@ -27,6 +26,13 @@ class SingletonThreadPool(object): cls.__thread_pool_pid = os.getpid() return cls.__thread_pool + @classmethod + def clear(cls): + if cls.__thread_pool: + cls.__thread_pool.close() + cls.__thread_pool = None + cls.__thread_pool_pid = None + class SafeQueue(object): __thread_pool = SingletonThreadPool() @@ -318,3 +324,11 @@ class BackgroundMonitor(object): @classmethod def is_subprocess_enabled(cls): return bool(cls._main_process) + + @classmethod + def clear_main_process(cls): + BackgroundMonitor._main_process = None + BackgroundMonitor._parent_pid = None + BackgroundMonitor._sub_process_started = None + BackgroundMonitor._instances = {} + SingletonThreadPool.clear()