diff --git a/clearml/task.py b/clearml/task.py index 8c67a6dc..52f93651 100644 --- a/clearml/task.py +++ b/clearml/task.py @@ -2886,122 +2886,121 @@ class Task(_Task): is_sub_process = self.__is_subprocess() - if True: # not is_sub_process: # todo: remove IF - # noinspection PyBroadException - try: - wait_for_uploads = True - # first thing mark task as stopped, so we will not end up with "running" on lost tasks - # if we are running remotely, the daemon will take care of it - task_status = None - wait_for_std_log = True - if (not running_remotely() or DEBUG_SIMULATE_REMOTE_TASK.get()) \ - and self.is_main_task() and not is_sub_process: - # check if we crashed, ot the signal is not interrupt (manual break) - task_status = ('stopped', ) - if self.__exit_hook: - is_exception = self.__exit_hook.exception - # check if we are running inside a debugger - if not is_exception and sys.modules.get('pydevd'): - # noinspection PyBroadException - try: - is_exception = sys.last_type - except Exception: - pass - - # only if we have an exception (and not ctrl-break) or signal is not SIGTERM / SIGINT - if (is_exception and not isinstance(self.__exit_hook.exception, KeyboardInterrupt)) \ - or (not self.__exit_hook.remote_user_aborted and - self.__exit_hook.signal not in (None, 2, 15)): - task_status = ( - 'failed', - 'Exception {}'.format(is_exception) if is_exception else - 'Signal {}'.format(self.__exit_hook.signal)) - wait_for_uploads = False - else: - wait_for_uploads = (self.__exit_hook.remote_user_aborted or self.__exit_hook.signal is None) - if not self.__exit_hook.remote_user_aborted and self.__exit_hook.signal is None and \ - not is_exception: - task_status = ('completed', ) - else: - task_status = ('stopped', ) - # user aborted. do not bother flushing the stdout logs - wait_for_std_log = self.__exit_hook.signal is not None - - # wait for repository detection (if we didn't crash) - if wait_for_uploads and self._logger: - # we should print summary here - self._summary_artifacts() - # make sure that if we crashed the thread we are not waiting forever - if not is_sub_process: - self._wait_for_repo_detection(timeout=10.) - - # kill the repo thread (negative timeout, do not wait), if it hasn't finished yet. - if not is_sub_process: - self._wait_for_repo_detection(timeout=-1) - - # wait for uploads - print_done_waiting = False - if wait_for_uploads and (BackendModel.get_num_results() > 0 or - (self.__reporter and self.__reporter.events_waiting())): - self.log.info('Waiting to finish uploads') - print_done_waiting = True - # from here, do not send log in background thread - if wait_for_uploads: - self.flush(wait_for_uploads=True) - # wait until the reporter flush everything - if self.__reporter: - self.__reporter.stop() - if self.is_main_task(): - # notice: this will close the reporting for all the Tasks in the system - Metrics.close_async_threads() - # notice: this will close the jupyter monitoring - ScriptInfo.close() - if self.is_main_task(): + # noinspection PyBroadException + try: + wait_for_uploads = True + # first thing mark task as stopped, so we will not end up with "running" on lost tasks + # if we are running remotely, the daemon will take care of it + task_status = None + wait_for_std_log = True + if (not running_remotely() or DEBUG_SIMULATE_REMOTE_TASK.get()) \ + and self.is_main_task() and not is_sub_process: + # check if we crashed, ot the signal is not interrupt (manual break) + task_status = ('stopped', ) + if self.__exit_hook: + is_exception = self.__exit_hook.exception + # check if we are running inside a debugger + if not is_exception and sys.modules.get('pydevd'): # noinspection PyBroadException try: - from .storage.helper import StorageHelper - StorageHelper.close_async_threads() + is_exception = sys.last_type except Exception: pass - if print_done_waiting: - self.log.info('Finished uploading') - # elif self._logger: - # # noinspection PyProtectedMember - # self._logger._flush_stdout_handler() - - # from here, do not check worker status - if self._dev_worker: - self._dev_worker.unregister() - self._dev_worker = None - - # stop resource monitoring - if self._resource_monitor: - self._resource_monitor.stop() - self._resource_monitor = None - - if self._logger: - self._logger.set_flush_period(None) - # noinspection PyProtectedMember - self._logger._close_stdout_handler(wait=wait_for_uploads or wait_for_std_log) + # only if we have an exception (and not ctrl-break) or signal is not SIGTERM / SIGINT + if (is_exception and not isinstance(self.__exit_hook.exception, KeyboardInterrupt)) \ + or (not self.__exit_hook.remote_user_aborted and + self.__exit_hook.signal not in (None, 2, 15)): + task_status = ( + 'failed', + 'Exception {}'.format(is_exception) if is_exception else + 'Signal {}'.format(self.__exit_hook.signal)) + wait_for_uploads = False + else: + wait_for_uploads = (self.__exit_hook.remote_user_aborted or self.__exit_hook.signal is None) + if not self.__exit_hook.remote_user_aborted and self.__exit_hook.signal is None and \ + not is_exception: + task_status = ('completed', ) + else: + task_status = ('stopped', ) + # user aborted. do not bother flushing the stdout logs + wait_for_std_log = self.__exit_hook.signal is not None + # wait for repository detection (if we didn't crash) + if wait_for_uploads and self._logger: + # we should print summary here + self._summary_artifacts() + # make sure that if we crashed the thread we are not waiting forever if not is_sub_process: - # change task status - if not task_status: - pass - elif task_status[0] == 'failed': - self.mark_failed(status_reason=task_status[1]) - elif task_status[0] == 'completed': - self.completed() - elif task_status[0] == 'stopped': - self.stopped() + self._wait_for_repo_detection(timeout=10.) - # this is so in theory we can close a main task and start a new one + # kill the repo thread (negative timeout, do not wait), if it hasn't finished yet. + if not is_sub_process: + self._wait_for_repo_detection(timeout=-1) + + # wait for uploads + print_done_waiting = False + if wait_for_uploads and (BackendModel.get_num_results() > 0 or + (self.__reporter and self.__reporter.events_waiting())): + self.log.info('Waiting to finish uploads') + print_done_waiting = True + # from here, do not send log in background thread + if wait_for_uploads: + self.flush(wait_for_uploads=True) + # wait until the reporter flush everything + if self.__reporter: + self.__reporter.stop() + if self.is_main_task(): + # notice: this will close the reporting for all the Tasks in the system + Metrics.close_async_threads() + # notice: this will close the jupyter monitoring + ScriptInfo.close() if self.is_main_task(): - Task.__main_task = None - except Exception: - # make sure we do not interrupt the exit process - pass + # noinspection PyBroadException + try: + from .storage.helper import StorageHelper + StorageHelper.close_async_threads() + except Exception: + pass + + if print_done_waiting: + self.log.info('Finished uploading') + # elif self._logger: + # # noinspection PyProtectedMember + # self._logger._flush_stdout_handler() + + # from here, do not check worker status + if self._dev_worker: + self._dev_worker.unregister() + self._dev_worker = None + + # stop resource monitoring + if self._resource_monitor: + self._resource_monitor.stop() + self._resource_monitor = None + + if self._logger: + self._logger.set_flush_period(None) + # noinspection PyProtectedMember + self._logger._close_stdout_handler(wait=wait_for_uploads or wait_for_std_log) + + if not is_sub_process: + # change task status + if not task_status: + pass + elif task_status[0] == 'failed': + self.mark_failed(status_reason=task_status[1]) + elif task_status[0] == 'completed': + self.completed() + elif task_status[0] == 'stopped': + self.stopped() + + # this is so in theory we can close a main task and start a new one + if self.is_main_task(): + Task.__main_task = None + except Exception: + # make sure we do not interrupt the exit process + pass # make sure we store last task state if self._offline_mode and not is_sub_process: @@ -3030,6 +3029,7 @@ class Task(_Task): # make sure no one will re-enter the shutdown method self._at_exit_called = True + BackgroundMonitor.wait_for_sub_process() @classmethod def __register_at_exit(cls, exit_callback, only_remove_signal_and_exception_hooks=False): diff --git a/clearml/utilities/process/mp.py b/clearml/utilities/process/mp.py index e414da2e..dd88f4df 100644 --- a/clearml/utilities/process/mp.py +++ b/clearml/utilities/process/mp.py @@ -6,7 +6,7 @@ from functools import partial from multiprocessing import Process, Lock, Event as ProcessEvent from multiprocessing.pool import ThreadPool from threading import Thread, Event as TrEvent -from time import sleep +from time import sleep, time from typing import List, Dict import psutil @@ -415,8 +415,17 @@ class BackgroundMonitor(object): @classmethod def clear_main_process(cls): + cls.wait_for_sub_process() BackgroundMonitor._main_process = None BackgroundMonitor._parent_pid = None BackgroundMonitor._sub_process_started = None BackgroundMonitor._instances = {} SingletonThreadPool.clear() + + @classmethod + def wait_for_sub_process(cls, timeout=None): + if not cls.is_subprocess_enabled(): + return + tic = time() + while cls.is_subprocess_alive() and (not timeout or time()-tic < timeout): + sleep(0.03)