Improve background reporting speed

allegroai 2021-03-06 00:47:58 +02:00
parent 4106aa9c49
commit e1d92e2419
3 changed files with 101 additions and 7 deletions

View File

@@ -50,11 +50,13 @@ class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin):
         if isinstance(self._queue, TrQueue):
             self._write()
             self._queue = PrQueue()
-        if isinstance(self._exit_event, TrEvent):
+        if not isinstance(self._exit_event, SafeEvent):
             self._exit_event = SafeEvent()
         super(BackgroundReportService, self).set_subprocess_mode()
 
     def stop(self):
+        if isinstance(self._queue, PrQueue):
+            self._queue.close(self._event)
         if not self.is_subprocess() or self.is_subprocess_alive():
             self._exit_event.set()
         super(BackgroundReportService, self).stop()
@@ -65,6 +67,8 @@ class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin):
         self._event.set()
 
     def add_event(self, ev):
+        if not self._queue:
+            return
         self._queue.put(ev)
         self._queue_size += 1
         if self._queue_size >= self._flush_threshold:
@@ -98,7 +102,8 @@ class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin):
                 events.append(self._queue.get())
             except Empty:
                 break
+        if not events:
+            return
         res = self._metrics.write_events(
             events, async_enable=self._async_enable, storage_uri=self._storage_uri)
         if self._async_enable:
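
Note on the stop() path above: before the service shuts down, the process queue is now drained. The close() call blocks until is_pending() reports that every queued report has been pushed, repeatedly setting the event so the consumer keeps emptying the queue (close() and is_pending() are added to SafeQueue further down). Below is a minimal, self-contained sketch of that drain-before-exit handshake; the names (PendingWork, consumer) are illustrative stand-ins, not ClearML's classes.

import threading
import time


class PendingWork:
    """Toy stand-in for the pending-counter plus close() handshake (illustrative only)."""

    def __init__(self):
        self._jobs = []
        self._pending = 0

    def put(self, job):
        self._jobs.append(job)
        self._pending += 1   # the real code relies on the GIL for this counter

    def pop(self):
        job = self._jobs.pop(0)
        self._pending -= 1
        return job

    def is_pending(self):
        return self._pending > 0

    def close(self, wake_event):
        # Block until the consumer has handled everything, repeatedly
        # waking it up so it keeps draining (mirrors SafeQueue.close()).
        while self.is_pending():
            if wake_event:
                wake_event.set()
            time.sleep(0.1)


def consumer(work, wake_event):
    while True:
        wake_event.wait()
        wake_event.clear()
        while work.is_pending():
            print("flushed", work.pop())


if __name__ == "__main__":
    work, wake = PendingWork(), threading.Event()
    threading.Thread(target=consumer, args=(work, wake), daemon=True).start()
    for i in range(5):
        work.put(i)
    work.close(wake)   # returns only once all five items were flushed
    print("safe to stop")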

View File

@@ -30,6 +30,8 @@ class BackgroundLogService(BackgroundMonitor):
         self._last_timestamp = 0
 
     def stop(self):
+        if isinstance(self._queue, PrQueue):
+            self._queue.close(self._event)
         super(BackgroundLogService, self).stop()
         self.flush()

View File

@@ -1,12 +1,17 @@
 import os
-import psutil
+import pickle
+import struct
 import sys
+from functools import partial
 from multiprocessing import Process, Lock, Event as ProcessEvent
 from multiprocessing.pool import ThreadPool
+from threading import Thread, Event as TrEvent
 from time import sleep
 from typing import List, Dict
 
+import psutil
+from six.moves.queue import Empty, Queue as TrQueue
 
 from ..py3_interop import AbstractContextManager
 
 try:
@@ -35,20 +40,102 @@ class SingletonThreadPool(object):
 class SafeQueue(object):
     """
     Many writers Single Reader multiprocessing safe Queue
     """
     __thread_pool = SingletonThreadPool()
 
     def __init__(self, *args, **kwargs):
+        self._reader_thread = None
         self._q = SimpleQueue(*args, **kwargs)
+        # Fix the simple queue write so it uses a single OS write, making it atomic message passing
+        # noinspection PyBroadException
+        try:
+            self._q._writer._send_bytes = partial(SafeQueue._pipe_override_send_bytes, self._q._writer)
+        except Exception:
+            pass
+        self._internal_q = None
+        self._q_size = 0
 
     def empty(self):
-        return self._q.empty()
+        return self._q.empty() and (not self._internal_q or self._internal_q.empty())
 
-    def get(self):
-        return self._q.get()
+    def is_pending(self):
+        # only call from main put process
+        return self._q_size > 0 or not self.empty()
+
+    def close(self, event):
+        # wait until all pending requests pushed
+        while self.is_pending():
+            if event:
+                event.set()
+            sleep(0.1)
+
+    def get(self, *args, **kwargs):
+        return self._get_internal_queue(*args, **kwargs)
+
+    def batch_get(self, max_items=1000, timeout=0.2, throttle_sleep=0.1):
+        buffer = []
+        timeout_count = int(timeout/throttle_sleep)
+        empty_count = timeout_count
+        while len(buffer) < max_items:
+            while not self.empty() and len(buffer) < max_items:
+                try:
+                    buffer.append(self._get_internal_queue(block=False))
+                    empty_count = 0
+                except Empty:
+                    break
+            empty_count += 1
+            if empty_count > timeout_count or len(buffer) >= max_items:
+                break
+            sleep(throttle_sleep)
+        return buffer
 
     def put(self, obj):
+        # GIL will make sure it is atomic
+        self._q_size += 1
         # make sure the block put is done in the thread pool i.e. in the background
-        SafeQueue.__thread_pool.get().apply_async(self._q.put, args=(obj, ))
+        obj = pickle.dumps(obj)
+        self.__thread_pool.get().apply_async(self._q_put, args=(obj, ))
+
+    def _q_put(self, obj):
+        self._q.put(obj)
+        # GIL will make sure it is atomic
+        self._q_size -= 1
+
+    def _get_internal_queue(self, *args, **kwargs):
+        if not self._internal_q:
+            self._internal_q = TrQueue()
+        if not self._reader_thread:
+            self._reader_thread = Thread(target=self._reader_daemon)
+            self._reader_thread.daemon = True
+            self._reader_thread.start()
+        obj = self._internal_q.get(*args, **kwargs)
+        # deserialize
+        return pickle.loads(obj)
+
+    def _reader_daemon(self):
+        # pull from process queue and push into thread queue
+        while True:
+            # noinspection PyBroadException
+            try:
+                obj = self._q.get()
+                if obj is None:
+                    break
+            except Exception:
+                break
+            self._internal_q.put(obj)
+
+    @staticmethod
+    def _pipe_override_send_bytes(self, buf):
+        n = len(buf)
+        # For wire compatibility with 3.2 and lower
+        header = struct.pack("!i", n)
+        # Issue #20540: concatenate before sending, to avoid delays due
+        # to Nagle's algorithm on a TCP socket.
+        # Also note we want to avoid sending a 0-length buffer separately,
+        # to avoid "broken pipe" errors if the other end closed the pipe.
+        self._send(header + buf)
 
 
class SafeEvent(object):
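
Taken together, the SafeQueue changes above form a many-writers, single-reader pipeline: put() pickles the payload and hands the potentially blocking SimpleQueue write to a background thread pool so the caller never waits, a daemon thread on the reader side copies raw items from the process queue into a local thread Queue, and batch_get() drains that local queue in throttled bursts. The sketch below reduces the pattern to a small, self-contained example; MiniSafeQueue and its methods are simplified illustrations under these assumptions, not ClearML's actual API.

import pickle
import time
from multiprocessing import Process, SimpleQueue
from multiprocessing.pool import ThreadPool
from queue import Empty, Queue as ThreadQueue
from threading import Thread


class MiniSafeQueue:
    """Illustrative reduction of the pattern: non-blocking puts, batched gets."""

    def __init__(self):
        self._q = SimpleQueue()   # shared between processes
        self._pool = None         # lazily created per writer process
        self._local = None        # reader-side thread queue
        self._reader = None

    # --- writer side (any process) ---
    def put(self, obj):
        if self._pool is None:
            self._pool = ThreadPool(1)            # one thread serializes the pipe writes
        data = pickle.dumps(obj)                  # serialize up front, like SafeQueue.put()
        self._pool.apply_async(self._q.put, (data,))   # caller never blocks on the pipe

    def flush(self):
        # crude stand-in for the pending counter plus close() handshake
        if self._pool is not None:
            self._pool.close()
            self._pool.join()
            self._pool = None

    # --- reader side (single process) ---
    def _reader_daemon(self):
        while True:
            data = self._q.get()          # blocking read from the process queue
            self._local.put(data)         # hand off to the local thread queue

    def batch_get(self, max_items=100, timeout=0.2):
        if self._reader is None:
            self._local = ThreadQueue()
            self._reader = Thread(target=self._reader_daemon, daemon=True)
            self._reader.start()
        deadline = time.time() + timeout
        batch = []
        while len(batch) < max_items and time.time() < deadline:
            try:
                batch.append(pickle.loads(self._local.get(timeout=0.05)))
            except Empty:
                continue
        return batch


def writer(q, n):
    for i in range(n):
        q.put({"event": i})
    q.flush()   # make sure background writes finish before the process exits


if __name__ == "__main__":
    q = MiniSafeQueue()
    procs = [Process(target=writer, args=(q, 10)) for _ in range(3)]
    for p in procs:
        p.start()
    received = 0
    while received < 30:
        received += len(q.batch_get())
    print("received", received, "events")
    for p in procs:
        p.join()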