From 4ad4939febfbf0066394add08d835f3a23688a14 Mon Sep 17 00:00:00 2001 From: Alex Burlacu Date: Mon, 3 Apr 2023 18:35:11 +0300 Subject: [PATCH] Add tags to pipeline steps and controller --- clearml/automation/controller.py | 48 +++++++++++++++++++++++++++----- 1 file changed, 41 insertions(+), 7 deletions(-) diff --git a/clearml/automation/controller.py b/clearml/automation/controller.py index 383fd8f1..06ecfbd8 100644 --- a/clearml/automation/controller.py +++ b/clearml/automation/controller.py @@ -607,7 +607,7 @@ class PipelineController(object): docker=None, # type: Optional[str] docker_args=None, # type: Optional[str] docker_bash_setup_script=None, # type: Optional[str] - parents=None, # type: Optional[Sequence[str]], + parents=None, # type: Optional[Sequence[str]] execution_queue=None, # type: Optional[str] monitor_metrics=None, # type: Optional[List[Union[Tuple[str, str], Tuple[(str, str), (str, str)]]]] monitor_artifacts=None, # type: Optional[List[Union[str, Tuple[str, str]]]] @@ -618,7 +618,8 @@ class PipelineController(object): post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa cache_executed_step=False, # type: bool retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa - status_change_callback=None # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa + status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa + tags=None # type: Optional[Union[str, Sequence[str]]] ): # type: (...) -> bool """ @@ -782,6 +783,9 @@ class PipelineController(object): ): pass + :param tags: A list of tags for the specific pipeline step. + When executing a Pipeline remotely (i.e. launching the pipeline from the UI/enqueuing it), this method has no effect. + :return: True if successful """ function_kwargs = function_kwargs or {} @@ -819,7 +823,8 @@ class PipelineController(object): post_execute_callback=post_execute_callback, cache_executed_step=cache_executed_step, retry_on_failure=retry_on_failure, - status_change_callback=status_change_callback + status_change_callback=status_change_callback, + tags=tags ) def start( @@ -1286,6 +1291,23 @@ class PipelineController(object): """ return self._pipeline_args + @property + def tags(self): + # type: () -> List[str] + return self._task.get_tags() or [] + + def add_tags(self, tags): + # type: (Union[Sequence[str], str]) -> None + """ + Add tags to this pipeline. Old tags are not deleted. + When executing a Pipeline remotely (i.e. launching the pipeline from the UI/enqueuing it), this method has no effect. + + :param tags: A list of tags for this pipeline. + """ + if not self._task: + return # should not actually happen + self._task.add_tags(tags) + def _create_task_from_function( self, docker, docker_args, docker_bash_setup_script, function, function_input_artifacts, function_kwargs, function_return, @@ -1742,7 +1764,7 @@ class PipelineController(object): docker=None, # type: Optional[str] docker_args=None, # type: Optional[str] docker_bash_setup_script=None, # type: Optional[str] - parents=None, # type: Optional[Sequence[str]], + parents=None, # type: Optional[Sequence[str]] execution_queue=None, # type: Optional[str] monitor_metrics=None, # type: Optional[List[Union[Tuple[str, str], Tuple[(str, str), (str, str)]]]] monitor_artifacts=None, # type: Optional[List[Union[str, Tuple[str, str]]]] @@ -1753,7 +1775,8 @@ class PipelineController(object): post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa cache_executed_step=False, # type: bool retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa - status_change_callback=None # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa + status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa + tags=None # type: Optional[Union[str, Sequence[str]]] ): # type: (...) -> bool """ @@ -1917,6 +1940,9 @@ class PipelineController(object): ): pass + :param tags: A list of tags for the specific pipeline step. + When executing a Pipeline remotely (i.e. launching the pipeline from the UI/enqueuing it), this method has no effect. + :return: True if successful """ # always store callback functions (even when running remotely) @@ -1998,6 +2024,10 @@ class PipelineController(object): ) # replace reference a_task.update_task(task_definition) + + if tags: + a_task.add_tags(tags) + return a_task self._nodes[name] = self.Node( @@ -3493,7 +3523,8 @@ class PipelineDecorator(PipelineController): retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa pre_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa - status_change_callback=None # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa + status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa + tags=None # type: Optional[Union[str, Sequence[str]]] ): # type: (...) -> Callable """ @@ -3628,6 +3659,8 @@ class PipelineDecorator(PipelineController): ): pass + :param tags: A list of tags for the specific pipeline step. + When executing a Pipeline remotely (i.e. launching the pipeline from the UI/enqueuing it), this method has no effect. :return: function wrapper """ @@ -3669,7 +3702,8 @@ class PipelineDecorator(PipelineController): monitor_artifacts=monitor_artifacts, pre_execute_callback=pre_execute_callback, post_execute_callback=post_execute_callback, - status_change_callback=status_change_callback + status_change_callback=status_change_callback, + tags=tags ) if cls._singleton: