diff --git a/clearml/backend_interface/metrics/reporter.py b/clearml/backend_interface/metrics/reporter.py
index b9f844ce..f0ee066e 100644
--- a/clearml/backend_interface/metrics/reporter.py
+++ b/clearml/backend_interface/metrics/reporter.py
@@ -31,12 +31,12 @@ except ImportError:
 
 
 class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin):
-    def __init__(self, use_subprocess, async_enable, metrics, flush_frequency, flush_threshold):
-        super(BackgroundReportService, self).__init__(wait_period=flush_frequency)
-        self._subprocess = use_subprocess
+    def __init__(self, task, async_enable, metrics, flush_frequency, flush_threshold):
+        super(BackgroundReportService, self).__init__(
+            task=task, wait_period=flush_frequency)
         self._flush_threshold = flush_threshold
-        self._exit_event = SafeEvent() if self._subprocess else TrEvent()
-        self._queue = PrQueue() if self._subprocess else TrQueue()
+        self._exit_event = TrEvent()
+        self._queue = TrQueue()
         self._queue_size = 0
         self._res_waiting = Semaphore()
         self._metrics = metrics
@@ -50,6 +50,8 @@ class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin):
         if isinstance(self._queue, TrQueue):
             self._write()
             self._queue = PrQueue()
+        if isinstance(self._exit_event, TrEvent):
+            self._exit_event = SafeEvent()
         super(BackgroundReportService, self).set_subprocess_mode()
 
     def stop(self):
@@ -114,9 +116,6 @@ class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin):
             return False
         return not self._res_waiting.get_value()
 
-    def post_execution(self):
-        super(BackgroundReportService, self).post_execution()
-
 
 class Reporter(InterfaceBase, AbstractContextManager, SetupUploadMixin, AsyncManagerMixin):
     """
@@ -133,11 +132,12 @@ class Reporter(InterfaceBase, AbstractContextManager, SetupUploadMixin, AsyncMan
         reporter.flush()
     """
 
-    def __init__(self, metrics, flush_threshold=10, async_enable=False, use_subprocess=False):
+    def __init__(self, metrics, task, flush_threshold=10, async_enable=False, use_subprocess=False):
         """
         Create a reporter
         :param metrics: A Metrics manager instance that handles actual reporting, uploads etc.
         :type metrics: .backend_interface.metrics.Metrics
+        :param task: Task object
         :param flush_threshold: Events flush threshold. This determines the threshold over which cached reported events
             are flushed and sent to the backend.
         :type flush_threshold: int
@@ -152,7 +152,7 @@ class Reporter(InterfaceBase, AbstractContextManager, SetupUploadMixin, AsyncMan
         self._flush_frequency = 30.0
         self._max_iteration = 0
         self._report_service = BackgroundReportService(
-            use_subprocess=use_subprocess, async_enable=async_enable, metrics=metrics,
+            task=task, async_enable=async_enable, metrics=metrics,
             flush_frequency=self._flush_frequency, flush_threshold=flush_threshold)
         self._report_service.start()
 
diff --git a/clearml/backend_interface/task/log.py b/clearml/backend_interface/task/log.py
index c9549560..31e5d344 100644
--- a/clearml/backend_interface/task/log.py
+++ b/clearml/backend_interface/task/log.py
@@ -17,10 +17,10 @@ from ...utilities.process.mp import SafeQueue as PrQueue, SafeEvent
 class BackgroundLogService(BackgroundMonitor):
     __max_event_size = 1024 * 1024
 
-    def __init__(self, session, wait_period, worker=None, task_id=None, offline_log_filename=None):
-        super(BackgroundLogService, self).__init__(wait_period=wait_period)
+    def __init__(self, session, wait_period, worker=None, task=None, offline_log_filename=None):
+        super(BackgroundLogService, self).__init__(task=task, wait_period=wait_period)
         self._worker = worker
-        self._task_id = task_id
+        self._task_id = task.id
         self._queue = TrQueue()
         self._flush = TrEvent()
         self._last_event = None
@@ -192,7 +192,7 @@ class TaskHandler(BufferingHandler):
             offline_folder.mkdir(parents=True, exist_ok=True)
             self._offline_log_filename = offline_folder / self.__offline_filename
         self._background_log = BackgroundLogService(
-            worker=task.session.worker, task_id=task.id,
+            worker=task.session.worker, task=task,
             session=task.session, wait_period=DevWorker.report_period,
             offline_log_filename=self._offline_log_filename)
         self._background_log_size = 0
diff --git a/clearml/backend_interface/task/task.py b/clearml/backend_interface/task/task.py
index bce9203a..5e6bedd8 100644
--- a/clearml/backend_interface/task/task.py
+++ b/clearml/backend_interface/task/task.py
@@ -70,7 +70,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
 
     _store_diff = config.get('development.store_uncommitted_code_diff', False)
     _store_remote_diff = config.get('development.store_code_diff_from_remote', False)
-    _report_use_subprocess = bool(config.get('development.report_use_subprocess', True))
+    _report_subprocess_enabled = config.get('development.report_use_subprocess', True)
     _offline_filename = 'task.json'
 
     class TaskTypes(Enum):
@@ -505,7 +505,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
             except ValueError:
                 storage_uri = None
             self.__reporter = Reporter(
-                self._get_metrics_manager(storage_uri=storage_uri), use_subprocess=self._report_use_subprocess)
+                metrics=self._get_metrics_manager(storage_uri=storage_uri), task=self)
         return self.__reporter
 
     def _get_output_destination_suffix(self, extra_path=None):
diff --git a/clearml/task.py b/clearml/task.py
index faad32f6..f63a0d23 100644
--- a/clearml/task.py
+++ b/clearml/task.py
@@ -521,7 +521,7 @@ class Task(_Task):
             # register the main task for at exit hooks (there should only be one)
             task.__register_at_exit(task._at_exit)
             # patch OS forking if we are not logging with a subprocess
-            if not cls._report_use_subprocess:
+            if not cls._report_subprocess_enabled:
                 PatchOsFork.patch_fork()
             if auto_connect_frameworks:
                 is_auto_connect_frameworks_bool = not isinstance(auto_connect_frameworks, dict)
@@ -590,7 +590,7 @@ class Task(_Task):
 
         # start monitoring in background process or background threads
         # monitoring are: Resource monitoring and Dev Worker monitoring classes
-        BackgroundMonitor.start_all(execute_in_subprocess=task._report_use_subprocess)
+        BackgroundMonitor.start_all(task=task)
 
         return task
 
diff --git a/clearml/utilities/process/mp.py b/clearml/utilities/process/mp.py
index cde33d08..c3e67fe6 100644
--- a/clearml/utilities/process/mp.py
+++ b/clearml/utilities/process/mp.py
@@ -5,7 +5,7 @@ from multiprocessing import Process, Lock, Event as ProcessEvent
 from multiprocessing.pool import ThreadPool
 from threading import Thread, Event as TrEvent
 from time import sleep
-from typing import List
+from typing import List, Dict
 
 from ..py3_interop import AbstractContextManager
 
@@ -55,7 +55,7 @@ class SafeEvent(object):
         return self._event.is_set()
 
     def set(self):
-        if not BackgroundMonitor.is_subprocess() or BackgroundMonitor.is_subprocess_alive():
+        if not BackgroundMonitor.is_subprocess_enabled() or BackgroundMonitor.is_subprocess_alive():
             self._event.set()
         # SafeEvent.__thread_pool.get().apply_async(func=self._event.set, args=())
 
@@ -109,25 +109,30 @@ class BackgroundMonitor(object):
     _main_process = None
     _parent_pid = None
     _sub_process_started = None
-    _instances = []  # type: List[BackgroundMonitor]
+    _instances = {}  # type: Dict[int, List[BackgroundMonitor]]
 
-    def __init__(self, wait_period):
+    def __init__(self, task, wait_period):
         self._event = TrEvent()
         self._done_ev = TrEvent()
         self._start_ev = TrEvent()
         self._task_pid = os.getpid()
         self._thread = None
         self._wait_timeout = wait_period
-        self._subprocess = None
+        self._subprocess = None if task.is_main_task() else False
+        self._task_obj_id = id(task)
 
     def start(self):
         if not self._thread:
             self._thread = True
         self._event.clear()
         self._done_ev.clear()
-        # append to instances
-        if self not in BackgroundMonitor._instances:
-            BackgroundMonitor._instances.append(self)
+        if self._subprocess is False:
+            # start the thread we are in threading mode.
+            self._start()
+        else:
+            # append to instances
+            if self not in self._get_instances():
+                self._get_instances().append(self)
 
     def wait(self, timeout=None):
         if not self._thread:
@@ -135,6 +140,9 @@
             return
         self._done_ev.wait(timeout=timeout)
 
     def _start(self):
+        # if we already started do nothing
+        if isinstance(self._thread, Thread):
+            return
         self._thread = Thread(target=self._daemon)
         self._thread.daemon = True
         self._thread.start()
@@ -148,7 +156,7 @@
 
         if isinstance(self._thread, Thread):
             try:
-                self._instances.remove(self)
+                self._get_instances().remove(self)
             except ValueError:
                 pass
             self._thread = None
@@ -169,27 +177,34 @@
 
     def set_subprocess_mode(self):
         # called just before launching the daemon in a subprocess
-        self._subprocess = True
-        self._done_ev = SafeEvent()
-        self._start_ev = SafeEvent()
-        self._event = SafeEvent()
+        if not self._subprocess:
+            self._subprocess = True
+        if not isinstance(self._done_ev, SafeEvent):
+            self._done_ev = SafeEvent()
+        if not isinstance(self._start_ev, SafeEvent):
+            self._start_ev = SafeEvent()
+        if not isinstance(self._event, SafeEvent):
+            self._event = SafeEvent()
 
     def _daemon_step(self):
         pass
 
     @classmethod
-    def start_all(cls, execute_in_subprocess, wait_for_subprocess=False):
+    def start_all(cls, task, wait_for_subprocess=False):
+        # noinspection PyProtectedMember
+        execute_in_subprocess = task._report_subprocess_enabled
+
         if not execute_in_subprocess:
-            for d in BackgroundMonitor._instances:
+            for d in BackgroundMonitor._instances.get(id(task), []):
                 d._start()
         elif not BackgroundMonitor._main_process:
             cls._parent_pid = os.getpid()
             cls._sub_process_started = SafeEvent()
             cls._sub_process_started.clear()
             # setup
-            for d in BackgroundMonitor._instances:
+            for d in BackgroundMonitor._instances.get(id(task), []):
                 d.set_subprocess_mode()
-            BackgroundMonitor._main_process = Process(target=cls._background_process_start)
+            BackgroundMonitor._main_process = Process(target=cls._background_process_start, args=(id(task), ))
             BackgroundMonitor._main_process.daemon = True
             BackgroundMonitor._main_process.start()
             # wait until subprocess is up
@@ -197,7 +212,7 @@
                 cls._sub_process_started.wait()
 
     @classmethod
-    def _background_process_start(cls):
+    def _background_process_start(cls, task_obj_id):
         is_debugger_running = bool(getattr(sys, 'gettrace', None) and sys.gettrace())
         # restore original signal, this will prevent any deadlocks
         # Do not change the exception we need to catch base exception as well
@@ -214,14 +229,14 @@
             sleep(3)
 
         # launch all the threads
-        for d in cls._instances:
+        for d in cls._instances.get(task_obj_id, []):
             d._start()
 
         if cls._sub_process_started:
             cls._sub_process_started.set()
 
         # wait until we are signaled
-        for i in BackgroundMonitor._instances:
+        for i in BackgroundMonitor._instances.get(task_obj_id, []):
             # noinspection PyBroadException
             try:
                 if i._thread and i._thread.is_alive():
@@ -268,6 +283,12 @@
                     return child.status() != psutil.STATUS_ZOMBIE
         return False
 
+    def is_subprocess(self):
+        return self._subprocess is not False and bool(self._main_process)
+
+    def _get_instances(self):
+        return self._instances.setdefault(self._task_obj_id, [])
+
     @classmethod
-    def is_subprocess(cls):
+    def is_subprocess_enabled(cls):
         return bool(cls._main_process)
diff --git a/clearml/utilities/resource_monitor.py b/clearml/utilities/resource_monitor.py
index c1424346..d97bbea6 100644
--- a/clearml/utilities/resource_monitor.py
+++ b/clearml/utilities/resource_monitor.py
@@ -23,7 +23,7 @@ class ResourceMonitor(BackgroundMonitor):
 
     def __init__(self, task, sample_frequency_per_sec=2., report_frequency_sec=30.,
                  first_report_sec=None, wait_for_first_iteration_to_start_sec=180.0,
                  max_wait_for_first_iteration_to_start_sec=1800., report_mem_used_per_process=True):
-        super(ResourceMonitor, self).__init__(sample_frequency_per_sec)
+        super(ResourceMonitor, self).__init__(task=task, wait_period=sample_frequency_per_sec)
         self._task = task
         self._sample_frequency = sample_frequency_per_sec
         self._report_frequency = report_frequency_sec