diff --git a/clearml/backend_interface/logger.py b/clearml/backend_interface/logger.py index 46ad2489..0fe69304 100644 --- a/clearml/backend_interface/logger.py +++ b/clearml/backend_interface/logger.py @@ -1,10 +1,10 @@ import logging import sys -import threading from time import time from ..binding.frameworks import _patched_call # noqa from ..config import running_remotely, config, DEBUG_SIMULATE_REMOTE_TASK +from ..utilities.process.mp import ForkSafeRLock class StdStreamPatch(object): @@ -176,8 +176,8 @@ class PrintPatchLogger(object): Used for capturing and logging stdin and stderr when running in development mode pseudo worker. """ patched = False - lock = threading.Lock() - recursion_protect_lock = threading.RLock() + lock = ForkSafeRLock() + recursion_protect_lock = ForkSafeRLock() cr_flush_period = None def __init__(self, stream, logger=None, level=logging.INFO, load_config_defaults=True): diff --git a/clearml/binding/environ_bind.py b/clearml/binding/environ_bind.py index acb429c2..605c2949 100644 --- a/clearml/binding/environ_bind.py +++ b/clearml/binding/environ_bind.py @@ -76,8 +76,8 @@ class SimpleQueueWrapper(object): def _patched_put(*a_args, **a_kwargs): # make sure we flush everything, because after we push the result we will get terminated try: - task = self.__current_task - task.flush(wait_for_uploads=True) + if self.__current_task and self.__current_task.is_main_task(): + self.__current_task.flush(wait_for_uploads=True) except: # noqa pass return getattr(self.__simple_queue, "put")(*a_args, **a_kwargs) @@ -152,6 +152,7 @@ class PatchOsFork(object): task = None # check if this is Process Pool function + patched_worker = False if hasattr(self, "_target"): # Now we have to patch Pool, because pool terminates subprocess directly after # the return value of the pool worker function is pushed into the queue, @@ -159,19 +160,26 @@ class PatchOsFork(object): try: if self._target == pool.worker: # noqa self._target = partial(PatchOsFork._patched_pool_worker, pool.worker) # noqa + patched_worker = True except: # noqa pass try: return PatchOsFork._original_process_run(self, *args, **kwargs) finally: - # force creating a Task - try: - if task: - # noinspection PyProtectedMember - task._at_exit() - except: # noqa - pass + if task: + try: + if patched_worker: + # remove at exit hooks, we will deadlock when the + # main Pool manager will terminate this process, and it will... + # noinspection PyProtectedMember + task._at_exit_called = True + else: + # terminate the current Task + # noinspection PyProtectedMember + task._at_exit() + except: # noqa + pass @staticmethod def _patched_fork(*args, **kwargs): diff --git a/clearml/utilities/process/mp.py b/clearml/utilities/process/mp.py index af21b755..35180550 100644 --- a/clearml/utilities/process/mp.py +++ b/clearml/utilities/process/mp.py @@ -142,6 +142,10 @@ class ForkSafeRLock(_ForkSafeThreadSyncObject): # Do whatever cleanup. self.release() + def _is_owned(self): + self._create() + return self._sync._is_owned() # noqa + class ForkSemaphore(_ForkSafeThreadSyncObject): def __init__(self, value=1):