Wait for subprocess when terminating

This commit is contained in:
allegroai 2021-04-20 18:08:22 +03:00
parent 1c6685e273
commit 4832de5a65
2 changed files with 117 additions and 108 deletions

View File

@ -2886,122 +2886,121 @@ class Task(_Task):
is_sub_process = self.__is_subprocess() is_sub_process = self.__is_subprocess()
if True: # not is_sub_process: # todo: remove IF # noinspection PyBroadException
# noinspection PyBroadException try:
try: wait_for_uploads = True
wait_for_uploads = True # first thing mark task as stopped, so we will not end up with "running" on lost tasks
# first thing mark task as stopped, so we will not end up with "running" on lost tasks # if we are running remotely, the daemon will take care of it
# if we are running remotely, the daemon will take care of it task_status = None
task_status = None wait_for_std_log = True
wait_for_std_log = True if (not running_remotely() or DEBUG_SIMULATE_REMOTE_TASK.get()) \
if (not running_remotely() or DEBUG_SIMULATE_REMOTE_TASK.get()) \ and self.is_main_task() and not is_sub_process:
and self.is_main_task() and not is_sub_process: # check if we crashed, ot the signal is not interrupt (manual break)
# check if we crashed, ot the signal is not interrupt (manual break) task_status = ('stopped', )
task_status = ('stopped', ) if self.__exit_hook:
if self.__exit_hook: is_exception = self.__exit_hook.exception
is_exception = self.__exit_hook.exception # check if we are running inside a debugger
# check if we are running inside a debugger if not is_exception and sys.modules.get('pydevd'):
if not is_exception and sys.modules.get('pydevd'):
# noinspection PyBroadException
try:
is_exception = sys.last_type
except Exception:
pass
# only if we have an exception (and not ctrl-break) or signal is not SIGTERM / SIGINT
if (is_exception and not isinstance(self.__exit_hook.exception, KeyboardInterrupt)) \
or (not self.__exit_hook.remote_user_aborted and
self.__exit_hook.signal not in (None, 2, 15)):
task_status = (
'failed',
'Exception {}'.format(is_exception) if is_exception else
'Signal {}'.format(self.__exit_hook.signal))
wait_for_uploads = False
else:
wait_for_uploads = (self.__exit_hook.remote_user_aborted or self.__exit_hook.signal is None)
if not self.__exit_hook.remote_user_aborted and self.__exit_hook.signal is None and \
not is_exception:
task_status = ('completed', )
else:
task_status = ('stopped', )
# user aborted. do not bother flushing the stdout logs
wait_for_std_log = self.__exit_hook.signal is not None
# wait for repository detection (if we didn't crash)
if wait_for_uploads and self._logger:
# we should print summary here
self._summary_artifacts()
# make sure that if we crashed the thread we are not waiting forever
if not is_sub_process:
self._wait_for_repo_detection(timeout=10.)
# kill the repo thread (negative timeout, do not wait), if it hasn't finished yet.
if not is_sub_process:
self._wait_for_repo_detection(timeout=-1)
# wait for uploads
print_done_waiting = False
if wait_for_uploads and (BackendModel.get_num_results() > 0 or
(self.__reporter and self.__reporter.events_waiting())):
self.log.info('Waiting to finish uploads')
print_done_waiting = True
# from here, do not send log in background thread
if wait_for_uploads:
self.flush(wait_for_uploads=True)
# wait until the reporter flush everything
if self.__reporter:
self.__reporter.stop()
if self.is_main_task():
# notice: this will close the reporting for all the Tasks in the system
Metrics.close_async_threads()
# notice: this will close the jupyter monitoring
ScriptInfo.close()
if self.is_main_task():
# noinspection PyBroadException # noinspection PyBroadException
try: try:
from .storage.helper import StorageHelper is_exception = sys.last_type
StorageHelper.close_async_threads()
except Exception: except Exception:
pass pass
if print_done_waiting: # only if we have an exception (and not ctrl-break) or signal is not SIGTERM / SIGINT
self.log.info('Finished uploading') if (is_exception and not isinstance(self.__exit_hook.exception, KeyboardInterrupt)) \
# elif self._logger: or (not self.__exit_hook.remote_user_aborted and
# # noinspection PyProtectedMember self.__exit_hook.signal not in (None, 2, 15)):
# self._logger._flush_stdout_handler() task_status = (
'failed',
# from here, do not check worker status 'Exception {}'.format(is_exception) if is_exception else
if self._dev_worker: 'Signal {}'.format(self.__exit_hook.signal))
self._dev_worker.unregister() wait_for_uploads = False
self._dev_worker = None else:
wait_for_uploads = (self.__exit_hook.remote_user_aborted or self.__exit_hook.signal is None)
# stop resource monitoring if not self.__exit_hook.remote_user_aborted and self.__exit_hook.signal is None and \
if self._resource_monitor: not is_exception:
self._resource_monitor.stop() task_status = ('completed', )
self._resource_monitor = None else:
task_status = ('stopped', )
if self._logger: # user aborted. do not bother flushing the stdout logs
self._logger.set_flush_period(None) wait_for_std_log = self.__exit_hook.signal is not None
# noinspection PyProtectedMember
self._logger._close_stdout_handler(wait=wait_for_uploads or wait_for_std_log)
# wait for repository detection (if we didn't crash)
if wait_for_uploads and self._logger:
# we should print summary here
self._summary_artifacts()
# make sure that if we crashed the thread we are not waiting forever
if not is_sub_process: if not is_sub_process:
# change task status self._wait_for_repo_detection(timeout=10.)
if not task_status:
pass
elif task_status[0] == 'failed':
self.mark_failed(status_reason=task_status[1])
elif task_status[0] == 'completed':
self.completed()
elif task_status[0] == 'stopped':
self.stopped()
# this is so in theory we can close a main task and start a new one # kill the repo thread (negative timeout, do not wait), if it hasn't finished yet.
if not is_sub_process:
self._wait_for_repo_detection(timeout=-1)
# wait for uploads
print_done_waiting = False
if wait_for_uploads and (BackendModel.get_num_results() > 0 or
(self.__reporter and self.__reporter.events_waiting())):
self.log.info('Waiting to finish uploads')
print_done_waiting = True
# from here, do not send log in background thread
if wait_for_uploads:
self.flush(wait_for_uploads=True)
# wait until the reporter flush everything
if self.__reporter:
self.__reporter.stop()
if self.is_main_task():
# notice: this will close the reporting for all the Tasks in the system
Metrics.close_async_threads()
# notice: this will close the jupyter monitoring
ScriptInfo.close()
if self.is_main_task(): if self.is_main_task():
Task.__main_task = None # noinspection PyBroadException
except Exception: try:
# make sure we do not interrupt the exit process from .storage.helper import StorageHelper
pass StorageHelper.close_async_threads()
except Exception:
pass
if print_done_waiting:
self.log.info('Finished uploading')
# elif self._logger:
# # noinspection PyProtectedMember
# self._logger._flush_stdout_handler()
# from here, do not check worker status
if self._dev_worker:
self._dev_worker.unregister()
self._dev_worker = None
# stop resource monitoring
if self._resource_monitor:
self._resource_monitor.stop()
self._resource_monitor = None
if self._logger:
self._logger.set_flush_period(None)
# noinspection PyProtectedMember
self._logger._close_stdout_handler(wait=wait_for_uploads or wait_for_std_log)
if not is_sub_process:
# change task status
if not task_status:
pass
elif task_status[0] == 'failed':
self.mark_failed(status_reason=task_status[1])
elif task_status[0] == 'completed':
self.completed()
elif task_status[0] == 'stopped':
self.stopped()
# this is so in theory we can close a main task and start a new one
if self.is_main_task():
Task.__main_task = None
except Exception:
# make sure we do not interrupt the exit process
pass
# make sure we store last task state # make sure we store last task state
if self._offline_mode and not is_sub_process: if self._offline_mode and not is_sub_process:
@ -3030,6 +3029,7 @@ class Task(_Task):
# make sure no one will re-enter the shutdown method # make sure no one will re-enter the shutdown method
self._at_exit_called = True self._at_exit_called = True
BackgroundMonitor.wait_for_sub_process()
@classmethod @classmethod
def __register_at_exit(cls, exit_callback, only_remove_signal_and_exception_hooks=False): def __register_at_exit(cls, exit_callback, only_remove_signal_and_exception_hooks=False):

View File

@ -6,7 +6,7 @@ from functools import partial
from multiprocessing import Process, Lock, Event as ProcessEvent from multiprocessing import Process, Lock, Event as ProcessEvent
from multiprocessing.pool import ThreadPool from multiprocessing.pool import ThreadPool
from threading import Thread, Event as TrEvent from threading import Thread, Event as TrEvent
from time import sleep from time import sleep, time
from typing import List, Dict from typing import List, Dict
import psutil import psutil
@ -415,8 +415,17 @@ class BackgroundMonitor(object):
@classmethod @classmethod
def clear_main_process(cls): def clear_main_process(cls):
cls.wait_for_sub_process()
BackgroundMonitor._main_process = None BackgroundMonitor._main_process = None
BackgroundMonitor._parent_pid = None BackgroundMonitor._parent_pid = None
BackgroundMonitor._sub_process_started = None BackgroundMonitor._sub_process_started = None
BackgroundMonitor._instances = {} BackgroundMonitor._instances = {}
SingletonThreadPool.clear() SingletonThreadPool.clear()
@classmethod
def wait_for_sub_process(cls, timeout=None):
if not cls.is_subprocess_enabled():
return
tic = time()
while cls.is_subprocess_alive() and (not timeout or time()-tic < timeout):
sleep(0.03)