mirror of
https://github.com/clearml/clearml
synced 2025-03-04 02:57:24 +00:00
Add nested pipeline components missing pipeline tags
Fix nested pipeline component point parent point to pipeline Task
This commit is contained in:
parent
c8340d4d3c
commit
8e1ff6eed9
@ -31,6 +31,7 @@ class PipelineController(object):
|
|||||||
Notice: The pipeline controller lives as long as the pipeline itself is being executed.
|
Notice: The pipeline controller lives as long as the pipeline itself is being executed.
|
||||||
"""
|
"""
|
||||||
_tag = 'pipeline'
|
_tag = 'pipeline'
|
||||||
|
_node_tag_prefix = 'pipe:'
|
||||||
_step_pattern = r"\${[^}]*}"
|
_step_pattern = r"\${[^}]*}"
|
||||||
_config_section = 'Pipeline'
|
_config_section = 'Pipeline'
|
||||||
_args_section = 'Args'
|
_args_section = 'Args'
|
||||||
@ -1435,7 +1436,8 @@ class PipelineController(object):
|
|||||||
base_task_id=task_id,
|
base_task_id=task_id,
|
||||||
parameter_override=updated_hyper_parameters,
|
parameter_override=updated_hyper_parameters,
|
||||||
configuration_overrides=node.configurations,
|
configuration_overrides=node.configurations,
|
||||||
tags=['pipe: {}'.format(self._task.id)] if self._add_pipeline_tags and self._task else None,
|
tags=['{} {}'.format(self._node_tag_prefix, self._task.id)]
|
||||||
|
if self._add_pipeline_tags and self._task else None,
|
||||||
parent=self._task.id if self._task else None,
|
parent=self._task.id if self._task else None,
|
||||||
disable_clone_task=disable_clone_task,
|
disable_clone_task=disable_clone_task,
|
||||||
task_overrides=task_overrides,
|
task_overrides=task_overrides,
|
||||||
@ -2720,6 +2722,14 @@ class PipelineDecorator(PipelineController):
|
|||||||
cls._singleton._launch_node(_node)
|
cls._singleton._launch_node(_node)
|
||||||
# check if we generated the pipeline we need to update the new eager step
|
# check if we generated the pipeline we need to update the new eager step
|
||||||
if PipelineDecorator._eager_execution_instance and _node.job:
|
if PipelineDecorator._eager_execution_instance and _node.job:
|
||||||
|
# check if we need to add the pipeline tag on the new node
|
||||||
|
pipeline_tags = [t for t in Task.current_task().get_tags() or []
|
||||||
|
if str(t).startswith(cls._node_tag_prefix)]
|
||||||
|
if pipeline_tags and _node.job and _node.job.task:
|
||||||
|
pipeline_tags = list(set((_node.job.task.get_tags() or []) + pipeline_tags))
|
||||||
|
_node.job.task.set_tags(pipeline_tags)
|
||||||
|
# force parent task as pipeline
|
||||||
|
_node.job.task._edit(parent=Task.current_task().parent)
|
||||||
# store the new generated node, so we can later serialize it
|
# store the new generated node, so we can later serialize it
|
||||||
pipeline_dag = cls._singleton._serialize()
|
pipeline_dag = cls._singleton._serialize()
|
||||||
# check if node is cached
|
# check if node is cached
|
||||||
|
Loading…
Reference in New Issue
Block a user