From e1d92e2419a1ea4e387115ae133c2170555cc377 Mon Sep 17 00:00:00 2001
From: allegroai <>
Date: Sat, 6 Mar 2021 00:47:58 +0200
Subject: [PATCH] Improve background reporting speed

---
 clearml/backend_interface/metrics/reporter.py |  9 +-
 clearml/backend_interface/task/log.py         |  2 +
 clearml/utilities/process/mp.py               | 97 ++++++++++++++++++-
 3 files changed, 101 insertions(+), 7 deletions(-)

diff --git a/clearml/backend_interface/metrics/reporter.py b/clearml/backend_interface/metrics/reporter.py
index f0ee066e..01a76973 100644
--- a/clearml/backend_interface/metrics/reporter.py
+++ b/clearml/backend_interface/metrics/reporter.py
@@ -50,11 +50,13 @@ class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin):
         if isinstance(self._queue, TrQueue):
             self._write()
             self._queue = PrQueue()
-        if isinstance(self._exit_event, TrEvent):
+        if not isinstance(self._exit_event, SafeEvent):
             self._exit_event = SafeEvent()
         super(BackgroundReportService, self).set_subprocess_mode()
 
     def stop(self):
+        if isinstance(self._queue, PrQueue):
+            self._queue.close(self._event)
         if not self.is_subprocess() or self.is_subprocess_alive():
             self._exit_event.set()
         super(BackgroundReportService, self).stop()
@@ -65,6 +67,8 @@ class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin):
         self._event.set()
 
     def add_event(self, ev):
+        if not self._queue:
+            return
         self._queue.put(ev)
         self._queue_size += 1
         if self._queue_size >= self._flush_threshold:
@@ -98,7 +102,8 @@ class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin):
                 events.append(self._queue.get())
             except Empty:
                 break
-
+        if not events:
+            return
         res = self._metrics.write_events(
             events, async_enable=self._async_enable, storage_uri=self._storage_uri)
         if self._async_enable:
diff --git a/clearml/backend_interface/task/log.py b/clearml/backend_interface/task/log.py
index 3969f5e4..7f153e63 100644
--- a/clearml/backend_interface/task/log.py
+++ b/clearml/backend_interface/task/log.py
@@ -30,6 +30,8 @@ class BackgroundLogService(BackgroundMonitor):
         self._last_timestamp = 0
 
     def stop(self):
+        if isinstance(self._queue, PrQueue):
+            self._queue.close(self._event)
         super(BackgroundLogService, self).stop()
         self.flush()
 
diff --git a/clearml/utilities/process/mp.py b/clearml/utilities/process/mp.py
index e9decd3b..f44237d1 100644
--- a/clearml/utilities/process/mp.py
+++ b/clearml/utilities/process/mp.py
@@ -1,12 +1,17 @@
 import os
-import psutil
+import pickle
+import struct
 import sys
+from functools import partial
 from multiprocessing import Process, Lock, Event as ProcessEvent
 from multiprocessing.pool import ThreadPool
 from threading import Thread, Event as TrEvent
 from time import sleep
 from typing import List, Dict
 
+import psutil
+from six.moves.queue import Empty, Queue as TrQueue
+
 from ..py3_interop import AbstractContextManager
 
 try:
@@ -35,20 +40,102 @@ class SingletonThreadPool(object):
 
 
 class SafeQueue(object):
+    """
+    Many writers Single Reader multiprocessing safe Queue
+    """
     __thread_pool = SingletonThreadPool()
 
     def __init__(self, *args, **kwargs):
+        self._reader_thread = None
         self._q = SimpleQueue(*args, **kwargs)
+        # Fix the simple queue write so it uses a single OS write, making it atomic message passing
+        # noinspection PyBroadException
+        try:
+            self._q._writer._send_bytes = partial(SafeQueue._pipe_override_send_bytes, self._q._writer)
+        except Exception:
+            pass
+        self._internal_q = None
+        self._q_size = 0
 
     def empty(self):
-        return self._q.empty()
+        return self._q.empty() and (not self._internal_q or self._internal_q.empty())
 
-    def get(self):
-        return self._q.get()
+    def is_pending(self):
+        # only call from main put process
+        return self._q_size > 0 or not self.empty()
+
+    def close(self, event):
+        # wait until all pending requests pushed
+        while self.is_pending():
+            if event:
+                event.set()
+            sleep(0.1)
+
+    def get(self, *args, **kwargs):
+        return self._get_internal_queue(*args, **kwargs)
+
+    def batch_get(self, max_items=1000, timeout=0.2, throttle_sleep=0.1):
+        buffer = []
+        timeout_count = int(timeout/throttle_sleep)
+        empty_count = timeout_count
+        while len(buffer) < max_items:
+            while not self.empty() and len(buffer) < max_items:
+                try:
+                    buffer.append(self._get_internal_queue(block=False))
+                    empty_count = 0
+                except Empty:
+                    break
+            empty_count += 1
+            if empty_count > timeout_count or len(buffer) >= max_items:
+                break
+            sleep(throttle_sleep)
+        return buffer
 
     def put(self, obj):
+        # GIL will make sure it is atomic
+        self._q_size += 1
         # make sure the block put is done in the thread pool i.e. in the background
-        SafeQueue.__thread_pool.get().apply_async(self._q.put, args=(obj, ))
+        obj = pickle.dumps(obj)
+        self.__thread_pool.get().apply_async(self._q_put, args=(obj, ))
+
+    def _q_put(self, obj):
+        self._q.put(obj)
+        # GIL will make sure it is atomic
+        self._q_size -= 1
+
+    def _get_internal_queue(self, *args, **kwargs):
+        if not self._internal_q:
+            self._internal_q = TrQueue()
+        if not self._reader_thread:
+            self._reader_thread = Thread(target=self._reader_daemon)
+            self._reader_thread.daemon = True
+            self._reader_thread.start()
+        obj = self._internal_q.get(*args, **kwargs)
+        # deserialize
+        return pickle.loads(obj)
+
+    def _reader_daemon(self):
+        # pull from process queue and push into thread queue
+        while True:
+            # noinspection PyBroadException
+            try:
+                obj = self._q.get()
+                if obj is None:
+                    break
+            except Exception:
+                break
+            self._internal_q.put(obj)
+
+    @staticmethod
+    def _pipe_override_send_bytes(self, buf):
+        n = len(buf)
+        # For wire compatibility with 3.2 and lower
+        header = struct.pack("!i", n)
+        # Issue #20540: concatenate before sending, to avoid delays due
+        # to Nagle's algorithm on a TCP socket.
+        # Also note we want to avoid sending a 0-length buffer separately,
+        # to avoid "broken pipe" errors if the other end closed the pipe.
+        self._send(header + buf)
 
 
 class SafeEvent(object):
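
Note (illustration, not part of the patch): the reworked SafeQueue above is a many-writers / single-reader queue. put() pickles the payload and hands the blocking pipe write to a background thread pool, the pipe's _send_bytes is overridden so each message is pushed in a single OS write, and a reader daemon drains the process pipe into an in-process queue that get()/batch_get() consume. Below is a minimal standalone sketch of the producer/reader part of that pattern, using only the standard library; the class and method names are illustrative, not ClearML APIs, and the atomic-write override and pending-count bookkeeping are omitted.

import pickle
from multiprocessing import SimpleQueue
from queue import Empty, Queue
from threading import Thread
from time import sleep


class MiniSafeQueue(object):
    # Illustrative many-writers / single-reader queue; a simplified sketch,
    # not the ClearML implementation.

    def __init__(self):
        self._q = SimpleQueue()   # cross-process pipe
        self._internal_q = None   # created lazily, on the reader side only
        self._reader = None

    def put(self, obj):
        # serialize up front so the pipe carries one self-contained message
        self._q.put(pickle.dumps(obj))

    def _reader_daemon(self):
        # move raw messages from the process pipe into the in-process buffer
        while True:
            data = self._q.get()
            if data is None:
                break
            self._internal_q.put(data)

    def _ensure_reader(self):
        if self._internal_q is None:
            self._internal_q = Queue()
        if self._reader is None:
            self._reader = Thread(target=self._reader_daemon, daemon=True)
            self._reader.start()

    def batch_get(self, max_items=100, timeout=0.2, throttle=0.05):
        # drain whatever is buffered, waiting up to `timeout` for the first item
        self._ensure_reader()
        buffer, waited = [], 0.0
        while len(buffer) < max_items:
            try:
                buffer.append(pickle.loads(self._internal_q.get(block=False)))
            except Empty:
                if buffer or waited >= timeout:
                    break
                sleep(throttle)
                waited += throttle
        return buffer

Consuming events in batches like this is what lets a background reporter issue one write_events() call for many queued events instead of one call per event, which is where the reporting speedup comes from.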