Fix main process's reporting subprocess lost, switch back to thread mode

This commit is contained in:
allegroai 2021-12-22 13:29:03 +02:00
parent d9aee85821
commit 1d652e3eb6
3 changed files with 86 additions and 2 deletions

View File

@ -94,6 +94,26 @@ class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin):
def add_event(self, ev):
if not self._queue:
return
# check that we did not loose the reporter sub-process
if self.is_subprocess_mode() and not self._fast_is_subprocess_alive():
# we lost the reporting subprocess, let's switch to thread mode
# gel all data, work on local queue:
self._write()
# replace queue:
self._queue = TrQueue()
self._queue_size = 0
self._event = TrEvent()
self._done_ev = TrEvent()
self._start_ev = TrEvent()
self._exit_event = TrEvent()
self._empty_state_event = TrEvent()
self._res_waiting = Semaphore()
# set thread mode
self._subprocess = False
# start background thread
self._thread = None
self._start()
self._queue.put(ev)
self._queue_size += 1
if self._queue_size >= self._flush_threshold:
@ -103,12 +123,16 @@ class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin):
while not self._exit_event.wait(0):
self._event.wait(self._wait_timeout)
self._event.clear()
# lock state
self._res_waiting.acquire()
self._write()
# wait for all reports
if self.get_num_results() > 0:
self.wait_for_results()
self._empty_state_event.set()
# set empty flag only if we are not waiting for exit signal
if not self._exit_event.wait(0):
self._empty_state_event.set()
# unlock state
self._res_waiting.release()
# make sure we flushed everything
self._async_enable = False

View File

@ -83,6 +83,23 @@ class BackgroundLogService(BackgroundMonitor):
self._flush = SafeEvent()
def add_to_queue(self, record):
# check that we did not loose the reporter sub-process
if self.is_subprocess_mode() and not self._fast_is_subprocess_alive():
# we lost the reporting subprocess, let's switch to thread mode
# gel all data, work on local queue:
self.send_all_records()
# replace queue:
self._queue = TrQueue()
self._flush = TrEvent()
self._event = TrEvent()
self._done_ev = TrEvent()
self._start_ev = TrEvent()
# set thread mode
self._subprocess = False
# start background thread
self._thread = None
self._start()
self._queue.put(record)
def empty(self):

View File

@ -4,7 +4,7 @@ import struct
import sys
from functools import partial
from multiprocessing import Lock, Event as ProcessEvent
from threading import Thread, Event as TrEvent
from threading import Thread, Event as TrEvent, RLock as ThreadRLock
from time import sleep, time
from typing import List, Dict, Optional
from multiprocessing import Process
@ -33,6 +33,39 @@ except ImportError:
return False
class ForkSafeRLock(object):
def __init__(self):
self._lock = None
self._instance_pid = None
def acquire(self, *args, **kwargs):
self.create()
return self._lock.acquire(*args, **kwargs)
def release(self, *args, **kwargs):
if self._lock is None:
return None
return self._lock.release(*args, **kwargs)
def create(self):
# this part is not atomic, and there is not a lot we can do about it.
if self._instance_pid != os.getpid() or not self._lock:
self._lock = ThreadRLock()
self._instance_pid = os.getpid()
def __enter__(self):
"""Return `self` upon entering the runtime context."""
self.acquire()
return self
def __exit__(self, exc_type, exc_value, traceback):
"""Raise any exception triggered within the runtime context."""
# Do whatever cleanup.
self.release()
if any((exc_type, exc_value, traceback,)):
raise (exc_type, exc_value, traceback)
class ThreadCalls(object):
def __init__(self):
self._queue = TrQueue()
@ -529,6 +562,16 @@ class BackgroundMonitor(object):
else:
return isinstance(self._thread, Thread) and self._thread.is_alive()
@classmethod
def _fast_is_subprocess_alive(cls):
if not cls._main_process:
return False
# noinspection PyBroadException
try:
return psutil.pid_exists(cls._main_process)
except Exception:
return False
@classmethod
def is_subprocess_alive(cls, task=None):
if not cls._main_process or (task and cls._main_process_task_id != task.id):