Add wait_for_task_creation argument to Task.init() (if False, future object is returned and task creation is done async)

This commit is contained in:
allegroai 2022-04-27 17:02:37 +03:00
parent 24da3e3e08
commit b919874905
2 changed files with 83 additions and 2 deletions

View File

@ -76,6 +76,7 @@ from .utilities.seed import make_deterministic
from .utilities.lowlevel.threads import get_current_thread_id
from .utilities.process.mp import BackgroundMonitor, leave_process
from .utilities.matching import matches_any_wildcard
from .utilities.future_caller import FutureCaller
# noinspection PyProtectedMember
from .backend_interface.task.args import _Arguments
@ -213,8 +214,9 @@ class Task(_Task):
auto_connect_frameworks=True, # type: Union[bool, Mapping[str, bool]]
auto_resource_monitoring=True, # type: bool
auto_connect_streams=True, # type: Union[bool, Mapping[str, bool]]
wait_for_task_init=True, # type: bool
):
# type: (...) -> "Task"
# type: (...) -> Union[Task, FutureCaller[Task]]
"""
Creates a new Task (experiment) if:
@ -416,8 +418,35 @@ class Task(_Task):
auto_connect_streams={'stdout': True, 'stderr': True, 'logging': False}
:return: The main execution Task (Task context).
:param wait_for_task_init: Wait for task to be initialized. If this is set to True, return the task after it was
initialized. If set to False, run the initialization in another thread and return a future that contains the task.
Wait and retrieve the task by calling result() on the returned future.
Note that the task will not capture information until it is initialized.
For example:
.. code-block:: py
task_future = Task.init(project_name='example', task_name='example', wait_for_task_init=False)
# execute some other code
task = task_future.result()
:return: The main execution Task (Task context) or a future to the Task (if wait_for_task_init=False).
"""
if not wait_for_task_init:
return FutureCaller().call(
cls.init,
project_name=project_name,
task_name=task_name,
tags=tags,
reuse_last_task_id=reuse_last_task_id,
continue_last_task=continue_last_task,
output_uri=output_uri,
auto_connect_arg_parser=auto_connect_arg_parser,
auto_connect_frameworks=auto_connect_frameworks,
auto_resource_monitoring=auto_resource_monitoring,
auto_connect_streams=auto_connect_streams,
wait_for_task_init=True,
)
def verify_defaults_match():
validate = [

View File

@ -0,0 +1,52 @@
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Optional
from ..errors import UsageError
class FutureCaller:
"""
FutureCaller is used to call functions async, in another thread.
For example:
.. code-block:: py
future = FutureCaller().call(max, 1, 2)
print('Running other code')
print(future.result()) # will print '2'
"""
def __init__(self):
self._executor = None
self._future = None
def call(self, fn, *args, **kwargs):
# type: (Callable, *Any, **Any) -> FutureCaller
"""
Call fn(*args, **kwargs) in another thread
:return: This FutureCaller instance
"""
self._executor = ThreadPoolExecutor(max_workers=1)
if self._future:
raise UsageError("A function is currently running in this FutureCaller instance")
self._future = self._executor.submit(fn, *args, **kwargs)
return self
def result(self, timeout=None):
# type: (Optional[float]) -> Any
"""
Wait and get the result of the function called with self.call()
:param timeout: The maximum number of seconds to wait for the result. If None,
there is no limit for the wait time.
:return: The result of the called function
"""
if not self._executor:
raise UsageError("No function has been called in this FutureCaller instance")
result_ = self._future.result(timeout=timeout)
self._future = None
self._executor.shutdown(wait=False)
return result_