From d37aa23fbf577513be999bcf966b4d771e1056d8 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Thu, 11 Mar 2021 09:42:35 +0200 Subject: [PATCH] Fix spawn logger/reporting --- clearml/backend_interface/metrics/reporter.py | 8 ++++++- clearml/backend_interface/task/log.py | 3 +-- clearml/backend_interface/task/task.py | 8 +++---- clearml/binding/environ_bind.py | 7 +++--- clearml/task.py | 22 ++++++++++++++----- clearml/utilities/process/mp.py | 3 ++- 6 files changed, 33 insertions(+), 18 deletions(-) diff --git a/clearml/backend_interface/metrics/reporter.py b/clearml/backend_interface/metrics/reporter.py index 01a76973..b9d0dbd4 100644 --- a/clearml/backend_interface/metrics/reporter.py +++ b/clearml/backend_interface/metrics/reporter.py @@ -119,7 +119,10 @@ class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin): return True if not self.is_alive(): return False - return not self._res_waiting.get_value() + try: + return not self._res_waiting.get_value() + except NotImplementedError: + return self.get_num_results() > 0 class Reporter(InterfaceBase, AbstractContextManager, SetupUploadMixin, AsyncManagerMixin): @@ -207,6 +210,9 @@ class Reporter(InterfaceBase, AbstractContextManager, SetupUploadMixin, AsyncMan else: report_service.send_all_events() + def is_alive(self): + return self._report_service and self._report_service.is_alive() + def get_num_results(self): return self._report_service.get_num_results() diff --git a/clearml/backend_interface/task/log.py b/clearml/backend_interface/task/log.py index 7f153e63..9a2d3184 100644 --- a/clearml/backend_interface/task/log.py +++ b/clearml/backend_interface/task/log.py @@ -92,8 +92,7 @@ class BackgroundLogService(BackgroundMonitor): while self._queue and not self._queue.empty(): # noinspection PyBroadException try: - # request = self._queue.get(block=False) - request = self._queue.get() + request = self._queue.get(block=False) if request: buffer.append(request) except Exception: diff --git a/clearml/backend_interface/task/task.py b/clearml/backend_interface/task/task.py index 62a85bf6..be793d4a 100644 --- a/clearml/backend_interface/task/task.py +++ b/clearml/backend_interface/task/task.py @@ -70,7 +70,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin): _store_diff = config.get('development.store_uncommitted_code_diff', False) _store_remote_diff = config.get('development.store_code_diff_from_remote', False) - _report_subprocess_enabled = config.get('development.report_use_subprocess', True) + _report_subprocess_enabled = config.get('development.report_use_subprocess', sys.platform == 'linux') _offline_filename = 'task.json' class TaskTypes(Enum): @@ -1607,7 +1607,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin): @classmethod def add_requirements(cls, package_name, package_version=None): - # type: (str, Optional[str]) -> () + # type: (str, Optional[str]) -> None """ Force the adding of a package to the requirements list. If ``package_version`` is None, use the installed package version, if found. @@ -2091,7 +2091,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin): @classmethod def __update_master_pid_task(cls, pid=None, task=None): - # type: (Optional[int], Union[str, Task]) -> () + # type: (Optional[int], Union[str, Task]) -> None pid = pid or os.getpid() if not task: PROC_MASTER_ID_ENV_VAR.set(str(pid) + ':') @@ -2122,7 +2122,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin): @classmethod def set_offline(cls, offline_mode=False): - # type: (bool) -> () + # type: (bool) -> None """ Set offline mode, where all data and logs are stored into local folder, for later transmission diff --git a/clearml/binding/environ_bind.py b/clearml/binding/environ_bind.py index af52343c..a4a8b863 100644 --- a/clearml/binding/environ_bind.py +++ b/clearml/binding/environ_bind.py @@ -3,6 +3,7 @@ import os import six from ..config import TASK_LOG_ENVIRONMENT, running_remotely, config +from ..utilities.process.mp import BackgroundMonitor class EnvironmentBind(object): @@ -83,10 +84,8 @@ class PatchOsFork(object): task = Task.init(project_name=None, task_name=None, task_type=None) task.get_logger().flush() - # Hack: now make sure we setup the reporter thread - - # noinspection PyProtectedMember - task._setup_reporter() + # Hack: now make sure we setup the reporter threads (Log+Reporter) + BackgroundMonitor.start_all(task=task) # TODO: Check if the signal handler method is enough, for the time being, we have both # # if we got here patch the os._exit of our instance to call us diff --git a/clearml/task.py b/clearml/task.py index 57aa050f..c2de0491 100644 --- a/clearml/task.py +++ b/clearml/task.py @@ -45,7 +45,7 @@ from .binding.frameworks.xgboost_bind import PatchXGBoostModelIO from .binding.joblib_bind import PatchedJoblib from .binding.matplotlib_bind import PatchedMatplotlib from .binding.hydra_bind import PatchHydra -from .config import config, DEV_TASK_NO_REUSE, get_is_master_node, DEBUG_SIMULATE_REMOTE_TASK +from .config import config, DEV_TASK_NO_REUSE, get_is_master_node, DEBUG_SIMULATE_REMOTE_TASK, PROC_MASTER_ID_ENV_VAR from .config import running_remotely, get_remote_task_id from .config.cache import SessionCache from .debugging.log import LoggerRoot @@ -179,6 +179,11 @@ class Task(_Task): :return: The current running Task (experiment). """ + # check if we have no main Task, but the main process created one. + if not cls.__main_task and PROC_MASTER_ID_ENV_VAR.get(): + # initialize the Task, connect to stdout + Task.init() + # return main Task return cls.__main_task @classmethod @@ -433,7 +438,7 @@ class Task(_Task): is_sub_process_task_id = None # check that we are not a child process, in that case do nothing. - # we should not get here unless this is Windows platform, all others support fork + # we should not get here unless this is Windows/macOS platform, linux support fork if cls.__is_subprocess(): class _TaskStub(object): def __call__(self, *args, **kwargs): @@ -588,6 +593,10 @@ class Task(_Task): # something to the log. task._dev_mode_task_start() + if (not task._reporter or not task._reporter.is_alive()) and \ + is_sub_process_task_id and not cls._report_subprocess_enabled: + task._setup_reporter() + # start monitoring in background process or background threads # monitoring are: Resource monitoring and Dev Worker monitoring classes BackgroundMonitor.start_all(task=task) @@ -2159,7 +2168,7 @@ class Task(_Task): secret=None, store_conf_file=False ): - # type: (Optional[str], Optional[str], Optional[str], Optional[str], Optional[str], bool) -> () + # type: (Optional[str], Optional[str], Optional[str], Optional[str], Optional[str], bool) -> None """ Set new default **ClearML Server** (backend) host and credentials. @@ -2606,7 +2615,7 @@ class Task(_Task): try: if 'IPython' in sys.modules: # noinspection PyPackageRequirements - from IPython import get_ipython + from IPython import get_ipython # noqa ip = get_ipython() if ip is not None and 'IPKernelApp' in ip.config: return parser @@ -2868,7 +2877,7 @@ class Task(_Task): is_sub_process = self.__is_subprocess() - if not is_sub_process: + if True:##not is_sub_process: # noinspection PyBroadException try: wait_for_uploads = True @@ -2918,7 +2927,8 @@ class Task(_Task): self._wait_for_repo_detection(timeout=10.) # kill the repo thread (negative timeout, do not wait), if it hasn't finished yet. - self._wait_for_repo_detection(timeout=-1) + if not is_sub_process: + self._wait_for_repo_detection(timeout=-1) # wait for uploads print_done_waiting = False diff --git a/clearml/utilities/process/mp.py b/clearml/utilities/process/mp.py index f44237d1..e414da2e 100644 --- a/clearml/utilities/process/mp.py +++ b/clearml/utilities/process/mp.py @@ -228,7 +228,7 @@ class BackgroundMonitor(object): self._get_instances().append(self) def wait(self, timeout=None): - if not self._thread: + if not self._done_ev: return self._done_ev.wait(timeout=timeout) @@ -264,6 +264,7 @@ class BackgroundMonitor(object): self._start_ev.set() self.daemon() self.post_execution() + self._thread = None def post_execution(self): self._done_ev.set()