Fix: reporters created from non-main tasks should always use threads (not subprocesses)

This commit is contained in:
allegroai 2021-01-24 09:20:42 +02:00
parent 2676e14d4d
commit 698977d05e
6 changed files with 61 additions and 40 deletions

View File

@ -31,12 +31,12 @@ except ImportError:
class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin): class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin):
def __init__(self, use_subprocess, async_enable, metrics, flush_frequency, flush_threshold): def __init__(self, task, async_enable, metrics, flush_frequency, flush_threshold):
super(BackgroundReportService, self).__init__(wait_period=flush_frequency) super(BackgroundReportService, self).__init__(
self._subprocess = use_subprocess task=task, wait_period=flush_frequency)
self._flush_threshold = flush_threshold self._flush_threshold = flush_threshold
self._exit_event = SafeEvent() if self._subprocess else TrEvent() self._exit_event = TrEvent()
self._queue = PrQueue() if self._subprocess else TrQueue() self._queue = TrQueue()
self._queue_size = 0 self._queue_size = 0
self._res_waiting = Semaphore() self._res_waiting = Semaphore()
self._metrics = metrics self._metrics = metrics
@ -50,6 +50,8 @@ class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin):
if isinstance(self._queue, TrQueue): if isinstance(self._queue, TrQueue):
self._write() self._write()
self._queue = PrQueue() self._queue = PrQueue()
if isinstance(self._exit_event, TrEvent):
self._exit_event = SafeEvent()
super(BackgroundReportService, self).set_subprocess_mode() super(BackgroundReportService, self).set_subprocess_mode()
def stop(self): def stop(self):
@ -114,9 +116,6 @@ class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin):
return False return False
return not self._res_waiting.get_value() return not self._res_waiting.get_value()
def post_execution(self):
super(BackgroundReportService, self).post_execution()
class Reporter(InterfaceBase, AbstractContextManager, SetupUploadMixin, AsyncManagerMixin): class Reporter(InterfaceBase, AbstractContextManager, SetupUploadMixin, AsyncManagerMixin):
""" """
@ -133,11 +132,12 @@ class Reporter(InterfaceBase, AbstractContextManager, SetupUploadMixin, AsyncMan
reporter.flush() reporter.flush()
""" """
def __init__(self, metrics, flush_threshold=10, async_enable=False, use_subprocess=False): def __init__(self, metrics, task, flush_threshold=10, async_enable=False, use_subprocess=False):
""" """
Create a reporter Create a reporter
:param metrics: A Metrics manager instance that handles actual reporting, uploads etc. :param metrics: A Metrics manager instance that handles actual reporting, uploads etc.
:type metrics: .backend_interface.metrics.Metrics :type metrics: .backend_interface.metrics.Metrics
:param task: Task object
:param flush_threshold: Events flush threshold. This determines the threshold over which cached reported events :param flush_threshold: Events flush threshold. This determines the threshold over which cached reported events
are flushed and sent to the backend. are flushed and sent to the backend.
:type flush_threshold: int :type flush_threshold: int
@ -152,7 +152,7 @@ class Reporter(InterfaceBase, AbstractContextManager, SetupUploadMixin, AsyncMan
self._flush_frequency = 30.0 self._flush_frequency = 30.0
self._max_iteration = 0 self._max_iteration = 0
self._report_service = BackgroundReportService( self._report_service = BackgroundReportService(
use_subprocess=use_subprocess, async_enable=async_enable, metrics=metrics, task=task, async_enable=async_enable, metrics=metrics,
flush_frequency=self._flush_frequency, flush_threshold=flush_threshold) flush_frequency=self._flush_frequency, flush_threshold=flush_threshold)
self._report_service.start() self._report_service.start()

View File

@ -17,10 +17,10 @@ from ...utilities.process.mp import SafeQueue as PrQueue, SafeEvent
class BackgroundLogService(BackgroundMonitor): class BackgroundLogService(BackgroundMonitor):
__max_event_size = 1024 * 1024 __max_event_size = 1024 * 1024
def __init__(self, session, wait_period, worker=None, task_id=None, offline_log_filename=None): def __init__(self, session, wait_period, worker=None, task=None, offline_log_filename=None):
super(BackgroundLogService, self).__init__(wait_period=wait_period) super(BackgroundLogService, self).__init__(task=task, wait_period=wait_period)
self._worker = worker self._worker = worker
self._task_id = task_id self._task_id = task.id
self._queue = TrQueue() self._queue = TrQueue()
self._flush = TrEvent() self._flush = TrEvent()
self._last_event = None self._last_event = None
@ -192,7 +192,7 @@ class TaskHandler(BufferingHandler):
offline_folder.mkdir(parents=True, exist_ok=True) offline_folder.mkdir(parents=True, exist_ok=True)
self._offline_log_filename = offline_folder / self.__offline_filename self._offline_log_filename = offline_folder / self.__offline_filename
self._background_log = BackgroundLogService( self._background_log = BackgroundLogService(
worker=task.session.worker, task_id=task.id, worker=task.session.worker, task=task,
session=task.session, wait_period=DevWorker.report_period, session=task.session, wait_period=DevWorker.report_period,
offline_log_filename=self._offline_log_filename) offline_log_filename=self._offline_log_filename)
self._background_log_size = 0 self._background_log_size = 0

View File

@ -70,7 +70,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
_store_diff = config.get('development.store_uncommitted_code_diff', False) _store_diff = config.get('development.store_uncommitted_code_diff', False)
_store_remote_diff = config.get('development.store_code_diff_from_remote', False) _store_remote_diff = config.get('development.store_code_diff_from_remote', False)
_report_use_subprocess = bool(config.get('development.report_use_subprocess', True)) _report_subprocess_enabled = config.get('development.report_use_subprocess', True)
_offline_filename = 'task.json' _offline_filename = 'task.json'
class TaskTypes(Enum): class TaskTypes(Enum):
@ -505,7 +505,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
except ValueError: except ValueError:
storage_uri = None storage_uri = None
self.__reporter = Reporter( self.__reporter = Reporter(
self._get_metrics_manager(storage_uri=storage_uri), use_subprocess=self._report_use_subprocess) metrics=self._get_metrics_manager(storage_uri=storage_uri), task=self)
return self.__reporter return self.__reporter
def _get_output_destination_suffix(self, extra_path=None): def _get_output_destination_suffix(self, extra_path=None):

View File

@ -521,7 +521,7 @@ class Task(_Task):
# register the main task for at exit hooks (there should only be one) # register the main task for at exit hooks (there should only be one)
task.__register_at_exit(task._at_exit) task.__register_at_exit(task._at_exit)
# patch OS forking if we are not logging with a subprocess # patch OS forking if we are not logging with a subprocess
if not cls._report_use_subprocess: if not cls._report_subprocess_enabled:
PatchOsFork.patch_fork() PatchOsFork.patch_fork()
if auto_connect_frameworks: if auto_connect_frameworks:
is_auto_connect_frameworks_bool = not isinstance(auto_connect_frameworks, dict) is_auto_connect_frameworks_bool = not isinstance(auto_connect_frameworks, dict)
@ -590,7 +590,7 @@ class Task(_Task):
# start monitoring in background process or background threads # start monitoring in background process or background threads
# monitoring are: Resource monitoring and Dev Worker monitoring classes # monitoring are: Resource monitoring and Dev Worker monitoring classes
BackgroundMonitor.start_all(execute_in_subprocess=task._report_use_subprocess) BackgroundMonitor.start_all(task=task)
return task return task

View File

@ -5,7 +5,7 @@ from multiprocessing import Process, Lock, Event as ProcessEvent
from multiprocessing.pool import ThreadPool from multiprocessing.pool import ThreadPool
from threading import Thread, Event as TrEvent from threading import Thread, Event as TrEvent
from time import sleep from time import sleep
from typing import List from typing import List, Dict
from ..py3_interop import AbstractContextManager from ..py3_interop import AbstractContextManager
@ -55,7 +55,7 @@ class SafeEvent(object):
return self._event.is_set() return self._event.is_set()
def set(self): def set(self):
if not BackgroundMonitor.is_subprocess() or BackgroundMonitor.is_subprocess_alive(): if not BackgroundMonitor.is_subprocess_enabled() or BackgroundMonitor.is_subprocess_alive():
self._event.set() self._event.set()
# SafeEvent.__thread_pool.get().apply_async(func=self._event.set, args=()) # SafeEvent.__thread_pool.get().apply_async(func=self._event.set, args=())
@ -109,25 +109,30 @@ class BackgroundMonitor(object):
_main_process = None _main_process = None
_parent_pid = None _parent_pid = None
_sub_process_started = None _sub_process_started = None
_instances = [] # type: List[BackgroundMonitor] _instances = {} # type: Dict[int, List[BackgroundMonitor]]
def __init__(self, wait_period): def __init__(self, task, wait_period):
self._event = TrEvent() self._event = TrEvent()
self._done_ev = TrEvent() self._done_ev = TrEvent()
self._start_ev = TrEvent() self._start_ev = TrEvent()
self._task_pid = os.getpid() self._task_pid = os.getpid()
self._thread = None self._thread = None
self._wait_timeout = wait_period self._wait_timeout = wait_period
self._subprocess = None self._subprocess = None if task.is_main_task() else False
self._task_obj_id = id(task)
def start(self): def start(self):
if not self._thread: if not self._thread:
self._thread = True self._thread = True
self._event.clear() self._event.clear()
self._done_ev.clear() self._done_ev.clear()
if self._subprocess is False:
# start the thread we are in threading mode.
self._start()
else:
# append to instances # append to instances
if self not in BackgroundMonitor._instances: if self not in self._get_instances():
BackgroundMonitor._instances.append(self) self._get_instances().append(self)
def wait(self, timeout=None): def wait(self, timeout=None):
if not self._thread: if not self._thread:
@ -135,6 +140,9 @@ class BackgroundMonitor(object):
self._done_ev.wait(timeout=timeout) self._done_ev.wait(timeout=timeout)
def _start(self): def _start(self):
# if we already started do nothing
if isinstance(self._thread, Thread):
return
self._thread = Thread(target=self._daemon) self._thread = Thread(target=self._daemon)
self._thread.daemon = True self._thread.daemon = True
self._thread.start() self._thread.start()
@ -148,7 +156,7 @@ class BackgroundMonitor(object):
if isinstance(self._thread, Thread): if isinstance(self._thread, Thread):
try: try:
self._instances.remove(self) self._get_instances().remove(self)
except ValueError: except ValueError:
pass pass
self._thread = None self._thread = None
@ -169,27 +177,34 @@ class BackgroundMonitor(object):
def set_subprocess_mode(self): def set_subprocess_mode(self):
# called just before launching the daemon in a subprocess # called just before launching the daemon in a subprocess
if not self._subprocess:
self._subprocess = True self._subprocess = True
if not isinstance(self._done_ev, SafeEvent):
self._done_ev = SafeEvent() self._done_ev = SafeEvent()
if not isinstance(self._start_ev, SafeEvent):
self._start_ev = SafeEvent() self._start_ev = SafeEvent()
if not isinstance(self._event, SafeEvent):
self._event = SafeEvent() self._event = SafeEvent()
def _daemon_step(self): def _daemon_step(self):
pass pass
@classmethod @classmethod
def start_all(cls, execute_in_subprocess, wait_for_subprocess=False): def start_all(cls, task, wait_for_subprocess=False):
# noinspection PyProtectedMember
execute_in_subprocess = task._report_subprocess_enabled
if not execute_in_subprocess: if not execute_in_subprocess:
for d in BackgroundMonitor._instances: for d in BackgroundMonitor._instances.get(id(task), []):
d._start() d._start()
elif not BackgroundMonitor._main_process: elif not BackgroundMonitor._main_process:
cls._parent_pid = os.getpid() cls._parent_pid = os.getpid()
cls._sub_process_started = SafeEvent() cls._sub_process_started = SafeEvent()
cls._sub_process_started.clear() cls._sub_process_started.clear()
# setup # setup
for d in BackgroundMonitor._instances: for d in BackgroundMonitor._instances.get(id(task), []):
d.set_subprocess_mode() d.set_subprocess_mode()
BackgroundMonitor._main_process = Process(target=cls._background_process_start) BackgroundMonitor._main_process = Process(target=cls._background_process_start, args=(id(task), ))
BackgroundMonitor._main_process.daemon = True BackgroundMonitor._main_process.daemon = True
BackgroundMonitor._main_process.start() BackgroundMonitor._main_process.start()
# wait until subprocess is up # wait until subprocess is up
@ -197,7 +212,7 @@ class BackgroundMonitor(object):
cls._sub_process_started.wait() cls._sub_process_started.wait()
@classmethod @classmethod
def _background_process_start(cls): def _background_process_start(cls, task_obj_id):
is_debugger_running = bool(getattr(sys, 'gettrace', None) and sys.gettrace()) is_debugger_running = bool(getattr(sys, 'gettrace', None) and sys.gettrace())
# restore original signal, this will prevent any deadlocks # restore original signal, this will prevent any deadlocks
# Do not change the exception we need to catch base exception as well # Do not change the exception we need to catch base exception as well
@ -214,14 +229,14 @@ class BackgroundMonitor(object):
sleep(3) sleep(3)
# launch all the threads # launch all the threads
for d in cls._instances: for d in cls._instances.get(task_obj_id, []):
d._start() d._start()
if cls._sub_process_started: if cls._sub_process_started:
cls._sub_process_started.set() cls._sub_process_started.set()
# wait until we are signaled # wait until we are signaled
for i in BackgroundMonitor._instances: for i in BackgroundMonitor._instances.get(task_obj_id, []):
# noinspection PyBroadException # noinspection PyBroadException
try: try:
if i._thread and i._thread.is_alive(): if i._thread and i._thread.is_alive():
@ -268,6 +283,12 @@ class BackgroundMonitor(object):
return child.status() != psutil.STATUS_ZOMBIE return child.status() != psutil.STATUS_ZOMBIE
return False return False
def is_subprocess(self):
return self._subprocess is not False and bool(self._main_process)
def _get_instances(self):
return self._instances.setdefault(self._task_obj_id, [])
@classmethod @classmethod
def is_subprocess(cls): def is_subprocess_enabled(cls):
return bool(cls._main_process) return bool(cls._main_process)

View File

@ -23,7 +23,7 @@ class ResourceMonitor(BackgroundMonitor):
def __init__(self, task, sample_frequency_per_sec=2., report_frequency_sec=30., def __init__(self, task, sample_frequency_per_sec=2., report_frequency_sec=30.,
first_report_sec=None, wait_for_first_iteration_to_start_sec=180.0, first_report_sec=None, wait_for_first_iteration_to_start_sec=180.0,
max_wait_for_first_iteration_to_start_sec=1800., report_mem_used_per_process=True): max_wait_for_first_iteration_to_start_sec=1800., report_mem_used_per_process=True):
super(ResourceMonitor, self).__init__(sample_frequency_per_sec) super(ResourceMonitor, self).__init__(task=task, wait_period=sample_frequency_per_sec)
self._task = task self._task = task
self._sample_frequency = sample_frequency_per_sec self._sample_frequency = sample_frequency_per_sec
self._report_frequency = report_frequency_sec self._report_frequency = report_frequency_sec