Add Pipeline/Optimization can be attached to any Task (not just the current task)

This commit is contained in:
allegroai 2020-10-30 09:57:38 +02:00
parent 820fa6ac7a
commit 53a889f869
2 changed files with 12 additions and 11 deletions

View File

@ -43,7 +43,7 @@ class PipelineController(object):
pool_frequency=0.2, # type: float
default_execution_queue=None, # type: Optional[str]
pipeline_time_limit=None, # type: Optional[float]
auto_connect_task=True, # type: bool
auto_connect_task=True, # type: Union[bool, Task]
always_create_task=False, # type: bool
add_pipeline_tags=False, # type: bool
):
@ -56,11 +56,12 @@ class PipelineController(object):
:param float pipeline_time_limit: The maximum time (minutes) for the entire pipeline process. The
default is ``None``, indicating no time limit.
:param bool auto_connect_task: Store pipeline arguments and configuration in the Task
- ``True`` - The pipeline argument and configuration will be stored in the Task. All arguments will
be under the hyper-parameter section as ``opt/<arg>``, and the hyper_parameters will stored in the
Task ``connect_configuration`` (see artifacts/hyper-parameter).
- ``True`` - The pipeline argument and configuration will be stored in the current Task. All arguments will
be under the hyper-parameter section ``Pipeline``, and the pipeline DAG will be stored as a
Task configuration object named ``Pipeline``.
- ``False`` - Do not store with Task.
- ``Task`` - A specific Task object to connect the pipeline with.
:param bool always_create_task: Always create a new Task
- ``True`` - No current Task initialized. Create a new task named ``Pipeline`` in the ``base_task_id``
project.
@ -79,7 +80,7 @@ class PipelineController(object):
self._stop_event = None
self._experiment_created_cb = None
self._add_pipeline_tags = add_pipeline_tags
self._task = Task.current_task()
self._task = auto_connect_task if isinstance(auto_connect_task, Task) else Task.current_task()
self._step_ref_pattern = re.compile(self._step_pattern)
if not self._task and always_create_task:
self._task = Task.init(
@ -91,7 +92,7 @@ class PipelineController(object):
# make sure all the created tasks are our children, as we are creating them
if self._task:
self._task.add_tags([self._tag])
self._auto_connect_task = auto_connect_task
self._auto_connect_task = bool(auto_connect_task)
def add_step(
self,

View File

@ -819,7 +819,7 @@ class HyperParameterOptimizer(object):
execution_queue='default', # type: str
optimization_time_limit=None, # type: Optional[float]
compute_time_limit=None, # type: Optional[float]
auto_connect_task=True, # type: bool
auto_connect_task=True, # type: Union[bool, Task]
always_create_task=False, # type: bool
**optimizer_kwargs # type: Any
):
@ -854,11 +854,11 @@ class HyperParameterOptimizer(object):
The values are:
- ``True`` - The optimization argument and configuration will be stored in the Task. All arguments will
be under the hyper-parameter section as ``opt/<arg>``, and the hyper_parameters will stored in the
Task ``connect_configuration`` (see artifacts/hyper-parameter).
be under the hyper-parameter section ``opt``, and the optimization hyper_parameters space will
stored in the Task configuration object section.
- ``False`` - Do not store with Task.
- ``Task`` - A specific Task object to connect the optimization process with.
:param bool always_create_task: Always create a new Task
The values are:
@ -910,7 +910,7 @@ class HyperParameterOptimizer(object):
"""
# create a new Task, if we do not have one already
self._task = Task.current_task()
self._task = auto_connect_task if isinstance(auto_connect_task, Task) else Task.current_task()
if not self._task and always_create_task:
base_task = Task.get_task(task_id=base_task_id)
self._task = Task.init(