From 8e1ff6eed941345adcdc36965cb2f04690587d2d Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Sat, 16 Oct 2021 23:17:14 +0300 Subject: [PATCH] Add nested pipeline components missing pipeline tags Fix nested pipeline component point parent point to pipeline Task --- clearml/automation/controller.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/clearml/automation/controller.py b/clearml/automation/controller.py index 44ade492..d7e2492a 100644 --- a/clearml/automation/controller.py +++ b/clearml/automation/controller.py @@ -31,6 +31,7 @@ class PipelineController(object): Notice: The pipeline controller lives as long as the pipeline itself is being executed. """ _tag = 'pipeline' + _node_tag_prefix = 'pipe:' _step_pattern = r"\${[^}]*}" _config_section = 'Pipeline' _args_section = 'Args' @@ -1435,7 +1436,8 @@ class PipelineController(object): base_task_id=task_id, parameter_override=updated_hyper_parameters, configuration_overrides=node.configurations, - tags=['pipe: {}'.format(self._task.id)] if self._add_pipeline_tags and self._task else None, + tags=['{} {}'.format(self._node_tag_prefix, self._task.id)] + if self._add_pipeline_tags and self._task else None, parent=self._task.id if self._task else None, disable_clone_task=disable_clone_task, task_overrides=task_overrides, @@ -2720,6 +2722,14 @@ class PipelineDecorator(PipelineController): cls._singleton._launch_node(_node) # check if we generated the pipeline we need to update the new eager step if PipelineDecorator._eager_execution_instance and _node.job: + # check if we need to add the pipeline tag on the new node + pipeline_tags = [t for t in Task.current_task().get_tags() or [] + if str(t).startswith(cls._node_tag_prefix)] + if pipeline_tags and _node.job and _node.job.task: + pipeline_tags = list(set((_node.job.task.get_tags() or []) + pipeline_tags)) + _node.job.task.set_tags(pipeline_tags) + # force parent task as pipeline + _node.job.task._edit(parent=Task.current_task().parent) # store the new generated node, so we can later serialize it pipeline_dag = cls._singleton._serialize() # check if node is cached