diff --git a/clearml/backend_interface/metrics/reporter.py b/clearml/backend_interface/metrics/reporter.py index 9a7c42df..da61baf9 100644 --- a/clearml/backend_interface/metrics/reporter.py +++ b/clearml/backend_interface/metrics/reporter.py @@ -4,6 +4,7 @@ import logging import math from multiprocessing import Semaphore from threading import Event as TrEvent +from time import sleep import numpy as np import six @@ -60,16 +61,21 @@ class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin): def stop(self): if isinstance(self._queue, PrQueue): self._queue.close(self._event) - if not self.is_subprocess() or self.is_subprocess_alive(): + if not self.is_subprocess_mode() or self.is_subprocess_alive(): self._exit_event.set() super(BackgroundReportService, self).stop() def flush(self): self._queue_size = 0 - if not self.is_subprocess() or self.is_subprocess_alive(): + if not self.is_subprocess_mode() or self.is_subprocess_alive(): self._event.set() def wait_for_events(self, timeout=None): + # noinspection PyProtectedMember + if self._is_subprocess_mode_and_not_parent_process(): + while self._queue and not self._queue.empty(): + sleep(0.1) + return self._empty_state_event.clear() return self._empty_state_event.wait(timeout) @@ -218,7 +224,7 @@ class Reporter(InterfaceBase, AbstractContextManager, SetupUploadMixin, AsyncMan return report_service = self._report_service self._report_service = None - if not report_service.is_subprocess() or report_service.is_alive(): + if not report_service.is_subprocess_mode() or report_service.is_alive(): report_service.stop() report_service.wait() else: diff --git a/clearml/backend_interface/task/log.py b/clearml/backend_interface/task/log.py index e90f8310..5483a127 100644 --- a/clearml/backend_interface/task/log.py +++ b/clearml/backend_interface/task/log.py @@ -254,7 +254,7 @@ class TaskHandler(BufferingHandler): _background_log = self._background_log self._background_log = None if _background_log: - if not _background_log.is_subprocess() or _background_log.is_alive(): + if not _background_log.is_subprocess_mode() or _background_log.is_alive(): _background_log.stop() if wait: # noinspection PyBroadException diff --git a/clearml/utilities/process/mp.py b/clearml/utilities/process/mp.py index 8391fbc8..da60410e 100644 --- a/clearml/utilities/process/mp.py +++ b/clearml/utilities/process/mp.py @@ -328,7 +328,7 @@ class BackgroundMonitor(object): if not self._thread: return - if not self.is_subprocess() or self.is_subprocess_alive(): + if not self.is_subprocess_mode() or self.is_subprocess_alive(): self._event.set() if isinstance(self._thread, Thread): @@ -505,7 +505,7 @@ class BackgroundMonitor(object): return def is_alive(self): - if self.is_subprocess(): + if self.is_subprocess_mode(): return self.is_subprocess_alive() and self._thread \ and self._start_ev.is_set() and not self._done_ev.is_set() else: @@ -534,13 +534,16 @@ class BackgroundMonitor(object): return child.is_running() and child.status() != psutil.STATUS_ZOMBIE return False - def is_subprocess(self): + def is_subprocess_mode(self): return self._subprocess is not False and \ bool(self._main_process) and self._task_id == self._main_process_task_id def _get_instances(self): return self._instances.setdefault(self._task_obj_id, []) + def _is_subprocess_mode_and_not_parent_process(self): + return self.is_subprocess_mode() and self._main_process != os.getpid() + @classmethod def is_subprocess_enabled(cls, task=None): return bool(cls._main_process) and (not task or task.id == cls._main_process_task_id)