Add draft option to pipeline steps (#1226)

This commit is contained in:
Charles Frankum 2024-03-10 09:25:16 +00:00 committed by GitHub
parent 2022d036a8
commit 2ac637bfc6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -103,6 +103,7 @@ class PipelineController(object):
recursively_parse_parameters = attrib(type=bool, default=False) # if True, recursively parse parameters in
# lists, dicts, or tuples
output_uri = attrib(type=Union[bool, str], default=None) # The default location for output models and other artifacts
draft = attrib(type=bool, default=False) # Specify whether to create the Task as a draft
def __attrs_post_init__(self):
if self.parents is None:
@ -670,7 +671,8 @@ class PipelineController(object):
retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa
tags=None, # type: Optional[Union[str, Sequence[str]]]
output_uri=None # type: Optional[Union[str, bool]]
output_uri=None, # type: Optional[Union[str, bool]]
draft=False, # type: Optional[bool]
):
# type: (...) -> bool
"""
@ -839,7 +841,7 @@ class PipelineController(object):
(i.e. launching the pipeline from the UI/enqueuing it), this method has no effect.
:param output_uri: The storage / output url for this step. This is the default location for output
models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).
: param: (default False). If True, the Task will be created as a draft.
:return: True if successful
"""
function_kwargs = function_kwargs or {}
@ -879,7 +881,8 @@ class PipelineController(object):
retry_on_failure=retry_on_failure,
status_change_callback=status_change_callback,
tags=tags,
output_uri=output_uri
output_uri=output_uri,
draft=draft,
)
def start(
@ -1981,7 +1984,8 @@ class PipelineController(object):
retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa
tags=None, # type: Optional[Union[str, Sequence[str]]]
output_uri=None # type: Optional[Union[str, bool]]
output_uri=None, # type: Optional[Union[str, bool]]
draft=False, # type: Optional[bool]
):
# type: (...) -> bool
"""
@ -2150,6 +2154,7 @@ class PipelineController(object):
(i.e. launching the pipeline from the UI/enqueuing it), this method has no effect.
:param output_uri: The storage / output url for this step. This is the default location for output
models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).
: param: (default False). If True, the Task will be created as a draft.
:return: True if successful
"""
@ -2258,7 +2263,8 @@ class PipelineController(object):
monitor_models=monitor_models,
job_code_section=job_code_section,
explicit_docker_image=docker,
output_uri=output_uri
output_uri=output_uri,
draft=draft,
)
self._retries[name] = 0
self._retries_callbacks[name] = retry_on_failure if callable(retry_on_failure) else \
@ -2373,6 +2379,8 @@ class PipelineController(object):
if task_factory_func_task:
task_factory_func_task.delete(raise_on_error=False)
self._running_nodes.append(node.name)
elif node.draft:
self._running_nodes.append(node.name)
else:
self._running_nodes.append(node.name)
@ -3781,7 +3789,8 @@ class PipelineDecorator(PipelineController):
post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa
status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa
tags=None, # type: Optional[Union[str, Sequence[str]]]
output_uri=None # type: Optional[Union[str, bool]]
output_uri=None, # type: Optional[Union[str, bool]]
draft=False, # type: Optional[bool]
):
# type: (...) -> Callable
"""
@ -3921,6 +3930,7 @@ class PipelineDecorator(PipelineController):
(i.e. launching the pipeline from the UI/enqueuing it), this method has no effect.
:param output_uri: The storage / output url for this step. This is the default location for output
models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).
: param: (default False). If True, the Task will be created as a draft.
:return: function wrapper
"""
@ -3966,7 +3976,8 @@ class PipelineDecorator(PipelineController):
post_execute_callback=post_execute_callback,
status_change_callback=status_change_callback,
tags=tags,
output_uri=output_uri
output_uri=output_uri,
draft=draft,
)
if cls._singleton: