From 1d652e3eb67b5685bc66d863db62139345deecc7 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Wed, 22 Dec 2021 13:29:03 +0200 Subject: [PATCH] Fix main process's reporting subprocess lost, switch back to thread mode --- clearml/backend_interface/metrics/reporter.py | 26 ++++++++++- clearml/backend_interface/task/log.py | 17 +++++++ clearml/utilities/process/mp.py | 45 ++++++++++++++++++- 3 files changed, 86 insertions(+), 2 deletions(-) diff --git a/clearml/backend_interface/metrics/reporter.py b/clearml/backend_interface/metrics/reporter.py index 4d0f367d..2f30a81c 100644 --- a/clearml/backend_interface/metrics/reporter.py +++ b/clearml/backend_interface/metrics/reporter.py @@ -94,6 +94,26 @@ class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin): def add_event(self, ev): if not self._queue: return + # check that we did not loose the reporter sub-process + if self.is_subprocess_mode() and not self._fast_is_subprocess_alive(): + # we lost the reporting subprocess, let's switch to thread mode + # gel all data, work on local queue: + self._write() + # replace queue: + self._queue = TrQueue() + self._queue_size = 0 + self._event = TrEvent() + self._done_ev = TrEvent() + self._start_ev = TrEvent() + self._exit_event = TrEvent() + self._empty_state_event = TrEvent() + self._res_waiting = Semaphore() + # set thread mode + self._subprocess = False + # start background thread + self._thread = None + self._start() + self._queue.put(ev) self._queue_size += 1 if self._queue_size >= self._flush_threshold: @@ -103,12 +123,16 @@ class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin): while not self._exit_event.wait(0): self._event.wait(self._wait_timeout) self._event.clear() + # lock state self._res_waiting.acquire() self._write() # wait for all reports if self.get_num_results() > 0: self.wait_for_results() - self._empty_state_event.set() + # set empty flag only if we are not waiting for exit signal + if not self._exit_event.wait(0): + self._empty_state_event.set() + # unlock state self._res_waiting.release() # make sure we flushed everything self._async_enable = False diff --git a/clearml/backend_interface/task/log.py b/clearml/backend_interface/task/log.py index 5483a127..94b28c3c 100644 --- a/clearml/backend_interface/task/log.py +++ b/clearml/backend_interface/task/log.py @@ -83,6 +83,23 @@ class BackgroundLogService(BackgroundMonitor): self._flush = SafeEvent() def add_to_queue(self, record): + # check that we did not loose the reporter sub-process + if self.is_subprocess_mode() and not self._fast_is_subprocess_alive(): + # we lost the reporting subprocess, let's switch to thread mode + # gel all data, work on local queue: + self.send_all_records() + # replace queue: + self._queue = TrQueue() + self._flush = TrEvent() + self._event = TrEvent() + self._done_ev = TrEvent() + self._start_ev = TrEvent() + # set thread mode + self._subprocess = False + # start background thread + self._thread = None + self._start() + self._queue.put(record) def empty(self): diff --git a/clearml/utilities/process/mp.py b/clearml/utilities/process/mp.py index 933e9755..eca564ca 100644 --- a/clearml/utilities/process/mp.py +++ b/clearml/utilities/process/mp.py @@ -4,7 +4,7 @@ import struct import sys from functools import partial from multiprocessing import Lock, Event as ProcessEvent -from threading import Thread, Event as TrEvent +from threading import Thread, Event as TrEvent, RLock as ThreadRLock from time import sleep, time from typing import List, Dict, Optional from multiprocessing import Process @@ -33,6 +33,39 @@ except ImportError: return False +class ForkSafeRLock(object): + def __init__(self): + self._lock = None + self._instance_pid = None + + def acquire(self, *args, **kwargs): + self.create() + return self._lock.acquire(*args, **kwargs) + + def release(self, *args, **kwargs): + if self._lock is None: + return None + return self._lock.release(*args, **kwargs) + + def create(self): + # this part is not atomic, and there is not a lot we can do about it. + if self._instance_pid != os.getpid() or not self._lock: + self._lock = ThreadRLock() + self._instance_pid = os.getpid() + + def __enter__(self): + """Return `self` upon entering the runtime context.""" + self.acquire() + return self + + def __exit__(self, exc_type, exc_value, traceback): + """Raise any exception triggered within the runtime context.""" + # Do whatever cleanup. + self.release() + if any((exc_type, exc_value, traceback,)): + raise (exc_type, exc_value, traceback) + + class ThreadCalls(object): def __init__(self): self._queue = TrQueue() @@ -529,6 +562,16 @@ class BackgroundMonitor(object): else: return isinstance(self._thread, Thread) and self._thread.is_alive() + @classmethod + def _fast_is_subprocess_alive(cls): + if not cls._main_process: + return False + # noinspection PyBroadException + try: + return psutil.pid_exists(cls._main_process) + except Exception: + return False + @classmethod def is_subprocess_alive(cls, task=None): if not cls._main_process or (task and cls._main_process_task_id != task.id):