Add tags to pipeline steps and controller

This commit is contained in:
Alex Burlacu 2023-04-03 18:35:11 +03:00
parent 03fdf0807f
commit 4ad4939feb

View File

@ -607,7 +607,7 @@ class PipelineController(object):
docker=None, # type: Optional[str] docker=None, # type: Optional[str]
docker_args=None, # type: Optional[str] docker_args=None, # type: Optional[str]
docker_bash_setup_script=None, # type: Optional[str] docker_bash_setup_script=None, # type: Optional[str]
parents=None, # type: Optional[Sequence[str]], parents=None, # type: Optional[Sequence[str]]
execution_queue=None, # type: Optional[str] execution_queue=None, # type: Optional[str]
monitor_metrics=None, # type: Optional[List[Union[Tuple[str, str], Tuple[(str, str), (str, str)]]]] monitor_metrics=None, # type: Optional[List[Union[Tuple[str, str], Tuple[(str, str), (str, str)]]]]
monitor_artifacts=None, # type: Optional[List[Union[str, Tuple[str, str]]]] monitor_artifacts=None, # type: Optional[List[Union[str, Tuple[str, str]]]]
@ -618,7 +618,8 @@ class PipelineController(object):
post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa
cache_executed_step=False, # type: bool cache_executed_step=False, # type: bool
retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
status_change_callback=None # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa
tags=None # type: Optional[Union[str, Sequence[str]]]
): ):
# type: (...) -> bool # type: (...) -> bool
""" """
@ -782,6 +783,9 @@ class PipelineController(object):
): ):
pass pass
:param tags: A list of tags for the specific pipeline step.
When executing a Pipeline remotely (i.e. launching the pipeline from the UI/enqueuing it), this method has no effect.
:return: True if successful :return: True if successful
""" """
function_kwargs = function_kwargs or {} function_kwargs = function_kwargs or {}
@ -819,7 +823,8 @@ class PipelineController(object):
post_execute_callback=post_execute_callback, post_execute_callback=post_execute_callback,
cache_executed_step=cache_executed_step, cache_executed_step=cache_executed_step,
retry_on_failure=retry_on_failure, retry_on_failure=retry_on_failure,
status_change_callback=status_change_callback status_change_callback=status_change_callback,
tags=tags
) )
def start( def start(
@ -1286,6 +1291,23 @@ class PipelineController(object):
""" """
return self._pipeline_args return self._pipeline_args
@property
def tags(self):
# type: () -> List[str]
return self._task.get_tags() or []
def add_tags(self, tags):
# type: (Union[Sequence[str], str]) -> None
"""
Add tags to this pipeline. Old tags are not deleted.
When executing a Pipeline remotely (i.e. launching the pipeline from the UI/enqueuing it), this method has no effect.
:param tags: A list of tags for this pipeline.
"""
if not self._task:
return # should not actually happen
self._task.add_tags(tags)
def _create_task_from_function( def _create_task_from_function(
self, docker, docker_args, docker_bash_setup_script, self, docker, docker_args, docker_bash_setup_script,
function, function_input_artifacts, function_kwargs, function_return, function, function_input_artifacts, function_kwargs, function_return,
@ -1742,7 +1764,7 @@ class PipelineController(object):
docker=None, # type: Optional[str] docker=None, # type: Optional[str]
docker_args=None, # type: Optional[str] docker_args=None, # type: Optional[str]
docker_bash_setup_script=None, # type: Optional[str] docker_bash_setup_script=None, # type: Optional[str]
parents=None, # type: Optional[Sequence[str]], parents=None, # type: Optional[Sequence[str]]
execution_queue=None, # type: Optional[str] execution_queue=None, # type: Optional[str]
monitor_metrics=None, # type: Optional[List[Union[Tuple[str, str], Tuple[(str, str), (str, str)]]]] monitor_metrics=None, # type: Optional[List[Union[Tuple[str, str], Tuple[(str, str), (str, str)]]]]
monitor_artifacts=None, # type: Optional[List[Union[str, Tuple[str, str]]]] monitor_artifacts=None, # type: Optional[List[Union[str, Tuple[str, str]]]]
@ -1753,7 +1775,8 @@ class PipelineController(object):
post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa
cache_executed_step=False, # type: bool cache_executed_step=False, # type: bool
retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
status_change_callback=None # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa
tags=None # type: Optional[Union[str, Sequence[str]]]
): ):
# type: (...) -> bool # type: (...) -> bool
""" """
@ -1917,6 +1940,9 @@ class PipelineController(object):
): ):
pass pass
:param tags: A list of tags for the specific pipeline step.
When executing a Pipeline remotely (i.e. launching the pipeline from the UI/enqueuing it), this method has no effect.
:return: True if successful :return: True if successful
""" """
# always store callback functions (even when running remotely) # always store callback functions (even when running remotely)
@ -1998,6 +2024,10 @@ class PipelineController(object):
) )
# replace reference # replace reference
a_task.update_task(task_definition) a_task.update_task(task_definition)
if tags:
a_task.add_tags(tags)
return a_task return a_task
self._nodes[name] = self.Node( self._nodes[name] = self.Node(
@ -3493,7 +3523,8 @@ class PipelineDecorator(PipelineController):
retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
pre_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa pre_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa
post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa
status_change_callback=None # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa
tags=None # type: Optional[Union[str, Sequence[str]]]
): ):
# type: (...) -> Callable # type: (...) -> Callable
""" """
@ -3628,6 +3659,8 @@ class PipelineDecorator(PipelineController):
): ):
pass pass
:param tags: A list of tags for the specific pipeline step.
When executing a Pipeline remotely (i.e. launching the pipeline from the UI/enqueuing it), this method has no effect.
:return: function wrapper :return: function wrapper
""" """
@ -3669,7 +3702,8 @@ class PipelineDecorator(PipelineController):
monitor_artifacts=monitor_artifacts, monitor_artifacts=monitor_artifacts,
pre_execute_callback=pre_execute_callback, pre_execute_callback=pre_execute_callback,
post_execute_callback=post_execute_callback, post_execute_callback=post_execute_callback,
status_change_callback=status_change_callback status_change_callback=status_change_callback,
tags=tags
) )
if cls._singleton: if cls._singleton: