Add Task.init() deferred_init argument as experimental feature (Task.init() called in background thread)

Fix previous wait_for_task_init behavior
Add environment variable CLEARML_DEFERRED_TASK_INIT
allegroai 2022-05-05 12:07:45 +03:00
parent 6ce91e5288
commit 556e9b25fe
5 changed files with 137 additions and 62 deletions
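A minimal usage sketch of the new argument (the project and task names below are illustrative, not part of the commit):

.. code-block:: py

    from clearml import Task

    # experimental: returns immediately, the actual Task.init() runs in a background thread
    task = Task.init(project_name='examples', task_name='deferred init', deferred_init=True)

    # the returned object is a proxy; any attribute access waits for the background init
    print(task.id)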

View File

@@ -17,6 +17,7 @@ ENV_CLEARML_NO_DEFAULT_SERVER = EnvEntry("CLEARML_NO_DEFAULT_SERVER", "TRAINS_NO
 ENV_DISABLE_VAULT_SUPPORT = EnvEntry('CLEARML_DISABLE_VAULT_SUPPORT', type=bool)
 ENV_ENABLE_ENV_CONFIG_SECTION = EnvEntry('CLEARML_ENABLE_ENV_CONFIG_SECTION', type=bool)
 ENV_ENABLE_FILES_CONFIG_SECTION = EnvEntry('CLEARML_ENABLE_FILES_CONFIG_SECTION', type=bool)
+ENV_DEFERRED_TASK_INIT = EnvEntry('CLEARML_DEFERRED_TASK_INIT', type=bool)
 """
 Experimental option to set the request method for all API requests and auth login.
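The same behaviour can also be switched on through the new environment variable. A short sketch (setting it from Python is for illustration only; it would normally be exported in the shell before the script starts):

.. code-block:: py

    import os

    # must be set before Task.init() is called
    os.environ['CLEARML_DEFERRED_TASK_INIT'] = '1'

    from clearml import Task
    task = Task.init(project_name='examples', task_name='deferred via env var')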

View File

@@ -78,10 +78,18 @@ class PatchOsFork(object):
     @staticmethod
     def _patched_fork(*args, **kwargs):
+        from ..task import Task
+
+        # ensure deferred is done, but never try to generate a Task object
+        # noinspection PyProtectedMember
+        task = Task._Task__main_task
+        # this will force the deferred init call to finish
+        # noinspection PyProtectedMember
+        Task._wait_for_deferred(task)
+
         ret = PatchOsFork._original_fork(*args, **kwargs)
         # Make sure the new process stdout is logged
         if not ret:
-            from ..task import Task
             # force creating a Task
             task = Task.current_task()
             if task is None:
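The wait is placed before the fork so a child process never inherits a half-initialized proxy. A rough sketch of the scenario this protects against (illustrative names, not part of the commit):

.. code-block:: py

    from multiprocessing import Process
    from clearml import Task

    task = Task.init(project_name='examples', task_name='fork example', deferred_init=True)

    def worker():
        # the patched os.fork() waited for the deferred init,
        # so the child sees a fully initialized main Task
        print(Task.current_task().id)

    p = Process(target=worker)
    p.start()
    p.join()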

View File

@@ -310,6 +310,9 @@ class WeightsFileHandler(object):
         if task is None:
             return saved_path
 
+        # Make sure that if we have a deferred object it is completed
+        task.id  # noqa
+
         try:
             WeightsFileHandler._model_store_lookup_lock.acquire()
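The bare attribute access is the whole mechanism: reading any property of a possibly-deferred Task proxy blocks until the background `Task.init` has finished. The same idiom, sketched outside the framework callback (illustrative only):

.. code-block:: py

    from clearml import Task

    task = Task.current_task()
    if task is not None:
        # blocks here if Task.init is still running in the background
        task.id  # noqa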

View File

@@ -28,6 +28,7 @@ from .backend_config.defs import get_active_config_file, get_config_file
 from .backend_api.services import tasks, projects
 from .backend_api.session.session import (
     Session, ENV_ACCESS_KEY, ENV_SECRET_KEY, ENV_HOST, ENV_WEB_HOST, ENV_FILES_HOST, )
+from .backend_api.session.defs import ENV_DEFERRED_TASK_INIT
 from .backend_interface.metrics import Metrics
 from .backend_interface.model import Model as BackendModel
 from .backend_interface.task import Task as _Task
@@ -214,7 +215,7 @@ class Task(_Task):
         auto_connect_frameworks=True,  # type: Union[bool, Mapping[str, Union[bool, str, list]]]
         auto_resource_monitoring=True,  # type: bool
         auto_connect_streams=True,  # type: Union[bool, Mapping[str, bool]]
-        wait_for_task_init=True,  # type: bool
+        deferred_init=False,  # type: bool
     ):
         # type: (...) -> Task
         """
@@ -419,35 +420,28 @@ class Task(_Task):
             auto_connect_streams={'stdout': True, 'stderr': True, 'logging': False}
 
-        :param wait_for_task_init: Wait for task to be initialized. If this is set to True, return the task after it was
-            initialized. If set to False, run the initialization in another thread and return a future that contains the task.
-            Wait and retrieve the task by calling result() on the returned future.
-            Note that the task will not capture information until it is initialized.
-            For example:
-
-            .. code-block:: py
-
-                task_future = Task.init(project_name='example', task_name='example', wait_for_task_init=False)
-                # execute some other code
-                task = task_future.result()
-
-        :return: The main execution Task (Task context) or a future to the Task (if wait_for_task_init=False).
+        :param deferred_init: (default: False) Wait for the Task to be fully initialized before returning (regular behaviour).
+            ** BETA feature! use with care **
+
+            If set to True, `Task.init` returns immediately and all initialization / communication
+            with the clearml-server runs in a background thread. The returned object is
+            a full proxy to the regular Task object, so everything works as expected.
+            The default behaviour can be controlled with: `CLEARML_DEFERRED_TASK_INIT=1`
+
+            Notes:
+            - Any access to the returned proxy `Task` object essentially waits for the `Task.init`
+              call to complete. For example: `print(task.name)` waits for `Task.init` to complete in the
+              background and then returns the `name` property of the original Task object
+            - Before `Task.init` completes in the background, auto-magic logging (console/metrics) might be missed
+            - If running via an agent, this argument is ignored and `Task.init` is called synchronously (default)
+
+        :return: The main execution Task (Task context)
         """
-        if not wait_for_task_init:
-            return FutureCaller().call(
-                cls.init,
-                project_name=project_name,
-                task_name=task_name,
-                tags=tags,
-                reuse_last_task_id=reuse_last_task_id,
-                continue_last_task=continue_last_task,
-                output_uri=output_uri,
-                auto_connect_arg_parser=auto_connect_arg_parser,
-                auto_connect_frameworks=auto_connect_frameworks,
-                auto_resource_monitoring=auto_resource_monitoring,
-                auto_connect_streams=auto_connect_streams,
-                wait_for_task_init=True,
-            )
 
         def verify_defaults_match():
             validate = [
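The docstring note about missed auto-magic logging deserves a concrete illustration. A hedged sketch (names and messages are illustrative):

.. code-block:: py

    from clearml import Task

    task = Task.init(project_name='examples', task_name='deferred caveats', deferred_init=True)

    # anything auto-logged before the background init finishes may be missed
    print('this console line might not reach the server')

    # the first explicit access forces the init to complete; from here on reporting is reliable
    logger = task.get_logger()
    logger.report_text('this message is reported after the Task is fully initialized')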
@@ -469,7 +463,8 @@ class Task(_Task):
                 )
             )
 
-        if cls.__main_task is not None:
+        # deferred_init == 0 marks the nested call that actually creates the Task
+        if cls.__main_task is not None and deferred_init != 0:
             # if this is a subprocess, regardless of what the init was called for,
             # we have to fix the main task hooks and stdout bindings
             if cls.__forked_proc_main_pid != os.getpid() and cls.__is_subprocess():
@@ -542,10 +537,38 @@
                                     task_type, Task.TaskTypes.__members__.keys()))
             task_type = Task.TaskTypes.__members__[str(task_type)]
 
+        is_deferred = False
         try:
             if not running_remotely():
+                # only allow deferring when running locally and creating the first Task,
+                # otherwise we ignore it and initialize in order
+                # (deferred_init == 0 is the nested call, never defer it)
+                if deferred_init is False and ENV_DEFERRED_TASK_INIT.get():
+                    deferred_init = True
+                if not is_sub_process_task_id and deferred_init:
+                    def completed_cb(x):
+                        Task.__main_task = x
+
+                    task = FutureCaller(
+                        func=cls.init,
+                        func_cb=completed_cb,
+                        override_cls=cls,
+                        project_name=project_name,
+                        task_name=task_name,
+                        tags=tags,
+                        reuse_last_task_id=reuse_last_task_id,
+                        continue_last_task=continue_last_task,
+                        output_uri=output_uri,
+                        auto_connect_arg_parser=auto_connect_arg_parser,
+                        auto_connect_frameworks=auto_connect_frameworks,
+                        auto_resource_monitoring=auto_resource_monitoring,
+                        auto_connect_streams=auto_connect_streams,
+                        deferred_init=0,  # notice we use it as a flag to mark the nested call
+                    )
+                    is_deferred = True
+                    # mark as temp master
+                    cls.__update_master_pid_task()
                 # if this is the main process, create the task
-                if not is_sub_process_task_id:
+                elif not is_sub_process_task_id:
                     task = cls._create_dev_task(
                         default_project_name=project_name,
                         default_task_name=task_name,
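The `deferred_init=0` trick above (the same argument doubling as a "this is the nested, real call" marker) generalizes. A small self-contained sketch of the pattern, assuming the FutureCaller class from this commit is in scope (`Thing` and `create` are illustrative names, not ClearML API):

.. code-block:: py

    class Thing(object):
        def __init__(self, name):
            self.name = name

    def create(name, deferred=False):
        # `deferred` doubles as a flag: the background call passes 0 (falsy),
        # so the nested call builds the real object instead of deferring again,
        # mirroring `deferred_init=0` above
        if deferred:
            return FutureCaller(create, None, Thing, name, 0)
        return Thing(name)

    obj = create('example', deferred=True)
    print(obj.name)  # attribute access waits for the background create() -> 'example'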
@@ -594,10 +617,15 @@
                 raise
         else:
             Task.__main_task = task
-            # register the main task for at exit hooks (there should only be one)
-            task.__register_at_exit(task._at_exit)
-            # always patch OS forking because of ProcessPool and the alike
-            PatchOsFork.patch_fork()
+
+            # register at-exit hooks only on the real (non-deferred) Task
+            if not is_deferred:
+                # register the main task for at exit hooks (there should only be one)
+                task.__register_at_exit(task._at_exit)
+                # always patch OS forking because of ProcessPool and the alike
+                PatchOsFork.patch_fork()
+
         if auto_connect_frameworks:
             def should_connect(*keys):
                 """
@@ -615,16 +643,16 @@
                         should_bind_framework = should_bind_framework.get(key, True)
                 return bool(should_bind_framework)
 
-            if should_connect("hydra"):
+            if not is_deferred and should_connect("hydra"):
                 PatchHydra.update_current_task(task)
             if should_connect("scikit") and should_connect("joblib"):
                 PatchedJoblib.update_current_task(task)
             if should_connect("matplotlib"):
-                PatchedMatplotlib.update_current_task(Task.__main_task)
+                PatchedMatplotlib.update_current_task(task)
             if should_connect("tensorflow") or should_connect("tensorboard"):
-                # allow to disable tfdefines
-                if should_connect("tfdefines"):
-                    PatchAbsl.update_current_task(Task.__main_task)
+                # allow disabling tfdefines
+                if not is_deferred and should_connect("tfdefines"):
+                    PatchAbsl.update_current_task(task)
                 TensorflowBinding.update_current_task(
                     task,
                     patch_reporting=should_connect("tensorboard"),
@@ -643,6 +671,13 @@
                 PatchFastai.update_current_task(task)
             if should_connect("lightgbm"):
                 PatchLIGHTgbmModelIO.update_current_task(task)
+
+            cls.__add_model_wildcards(auto_connect_frameworks)
+
+        # if we are deferred, stop here (the rest we do in the actual init)
+        if is_deferred:
+            return task  # noqa
+
         if auto_resource_monitoring and not is_sub_process_task_id:
             resource_monitor_cls = auto_resource_monitoring \
                 if isinstance(auto_resource_monitoring, six.class_types) else ResourceMonitor
@@ -650,7 +685,6 @@
                 task, report_mem_used_per_process=not config.get(
                     'development.worker.report_global_mem_used', False))
             task._resource_monitor.start()
-            cls.__add_model_wildcards(auto_connect_frameworks)
 
         # make sure all random generators are initialized with new seed
         make_deterministic(task.get_random_seed())
@@ -3767,6 +3801,21 @@
                 ret_tasks.extend(res.response.tasks)
         return ret_tasks
 
+    @classmethod
+    def _wait_for_deferred(cls, task):
+        # type: (Optional[Task]) -> None
+        """
+        Make sure the deferred `Task.init` call of the task object has completed.
+        Accessing any property of the `task` object also ensures the `Task.init` call is complete.
+        This is an internal utility function.
+
+        :param task: Optional deferred Task object as returned from Task.init
+        """
+        if not task:
+            return
+        # force deferred init to complete
+        task.id  # noqa
+
     @classmethod
     def __get_hash_key(cls, *args):
         def normalize(x):

View File

@@ -1,40 +1,56 @@
-from concurrent.futures import ThreadPoolExecutor
-from typing import Any, Callable, Optional
-
-from ..errors import UsageError
+from threading import Thread
+from typing import Any, Callable, Optional, Type
 
 
-class FutureCaller:
+class FutureCaller(object):
     """
-    FutureCaller is used to call functions async, in another thread.
+    FutureCaller is used to instantiate a class asynchronously, in another thread.
 
     For example:
 
    .. code-block:: py
 
-        future = FutureCaller().call(max, 1, 2)
+        future = FutureCaller(max, None, int, 1, 2)
         print('Running other code')
-        print(future.result())  # will print '2'
+        print(future.__result__())  # will print '2'
     """
+    __slots__ = ('__object', '__object_cls', '__executor')
 
-    def __init__(self):
-        self._executor = None
-        self._future = None
+    @property
+    def __class__(self):
+        return self.__object_cls
 
-    def call(self, fn, *args, **kwargs):
-        # type: (Callable, *Any, **Any) -> FutureCaller
+    def __init__(self, func, func_cb, override_cls, *args, **kwargs):
+        # type: (Callable, Optional[Callable], Type, *Any, **Any) -> None
         """
-        Call fn(*args, **kwargs) in another thread
+        Start calling `func(*args, **kwargs)` in another thread
 
         :return: This FutureCaller instance
         """
-        self._executor = ThreadPoolExecutor(max_workers=1)
-        if self._future:
-            raise UsageError("A function is currently running in this FutureCaller instance")
-        self._future = self._executor.submit(fn, *args, **kwargs)
-        return self
+        self.__object = None
+        self.__object_cls = override_cls
 
-    def result(self, timeout=None):
+        self.__executor = Thread(target=self.__submit__, args=(func, func_cb, args, kwargs))
+        self.__executor.daemon = True
+        self.__executor.start()
+
+    def __submit__(self, fn, fn_cb, args, kwargs):
+        self.__object = fn(*args, **kwargs)
+        if fn_cb is not None:
+            fn_cb(self.__object)
+
+    def __getattr__(self, item):
+        # if we get here, by definition this is not a __slots__ entry, pass it on to the object
+        return getattr(self.__result__(), item)
+
+    def __setattr__(self, item, value):
+        # make sure we can still set the slots
+        if item in ["_FutureCaller__executor", "_FutureCaller__object", "_FutureCaller__object_cls"]:
+            return super(FutureCaller, self).__setattr__(item, value)
+
+        setattr(self.__result__(), item, value)
+
+    def __result__(self, timeout=None):
         # type: (Optional[float]) -> Any
         """
-        Wait and get the result of the function called with self.call()
+        Wait and get the result of the function called in the background thread
@@ -44,9 +60,7 @@ class FutureCaller:
 
         :return: The result of the called function
        """
-        if not self._executor:
-            raise UsageError("No function has been called in this FutureCaller instance")
-        result_ = self._future.result(timeout=timeout)
-        self._future = None
-        self._executor.shutdown(wait=False)
-        return result_
+        if self.__executor:
+            self.__executor.join(timeout=timeout)
+            self.__executor = None
+        return self.__object
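A self-contained sketch of how the rewritten FutureCaller behaves as a transparent proxy, assuming the class above is in scope (`Heavy` is an illustrative class, not part of ClearML):

.. code-block:: py

    import time

    class Heavy(object):
        def __init__(self, x):
            time.sleep(1)  # pretend construction is expensive
            self.x = x

    # build Heavy(5) in a background thread; the proxy reports Heavy as its class
    proxy = FutureCaller(Heavy, None, Heavy, 5)

    print(isinstance(proxy, Heavy))  # True -- __class__ is overridden via override_cls
    print(proxy.x)                   # attribute access joins the thread, prints 5

Because attribute lookups fall through `__getattr__` to the finished object, the first access doubles as the synchronization point, which is exactly what the deferred `Task.init` relies on.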