From 2766255e8ff5cf7b105b4b5f719e484328d34c8a Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Sun, 23 Oct 2022 15:21:30 +0300 Subject: [PATCH] Fix pipeline clone logic --- clearml/automation/controller.py | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/clearml/automation/controller.py b/clearml/automation/controller.py index 0768aa4b..745c15aa 100644 --- a/clearml/automation/controller.py +++ b/clearml/automation/controller.py @@ -1228,13 +1228,6 @@ class PipelineController(object): for node in list(self._nodes.values()): if node.executed is False: node.executed = None - # Associate any nodes that represent second or subsequent calls to a component function with the function. - for node in list(self._nodes.values()): - if not node.base_task_id and not node.task_factory_func and node.job_code_section: - if node.job_code_section in self._nodes: - func = self._nodes[node.job_code_section].task_factory_func - if func: - node.task_factory_func = func if not self._verify(): raise ValueError("Failed verifying pipeline execution graph, " "it has either inaccessible nodes, or contains cycles") @@ -1463,6 +1456,14 @@ class PipelineController(object): else self._nodes[k] for k, v in dag_dict.items()} + # set the task_factory_func for each cloned node + for node in list(self._nodes.values()): + if not node.base_task_id and not node.task_factory_func and node.job_code_section: + if node.job_code_section in self._nodes: + func = self._nodes[node.job_code_section].task_factory_func + if func: + node.task_factory_func = func + def _has_stored_configuration(self): """ Return True if we are running remotely and we have stored configuration on the Task @@ -1813,7 +1814,7 @@ class PipelineController(object): for k, v in function_input_artifacts.items()} ) - job_code_section = None + job_code_section = name task_name = task_name or name or None if self._mock_execution: @@ -1841,7 +1842,6 @@ class PipelineController(object): name=name, config_type='json', config_text=json.dumps(task_definition, indent=1) ) - job_code_section = name else: # load task definition from configuration # noinspection PyProtectedMember @@ -3483,8 +3483,17 @@ class PipelineDecorator(PipelineController): _node.parents = [] # find a new name counter = 1 - while _node.name in cls._singleton._launched_step_names: - _node.name = '{}_{}'.format(_node_name, counter) + # Use nodes in `_singleton._nodes` that have not been launched. + # First check if we launched the node. + # If it wasn't launched we also need to check that the new name of `_node` + # points to the original code section it was meant to run. + # Note that for the first iteration (when `_node.name == _node_name`) + # we always increment the name, as the name is always in `_launched_step_names` + while _node.name in cls._singleton._launched_step_names or ( + _node.name in cls._singleton._nodes + and cls._singleton._nodes[_node.name].job_code_section != cls._singleton._nodes[_node_name].job_code_section + ): + _node.name = "{}_{}".format(_node_name, counter) counter += 1 _node_name = _node.name if _node.name not in cls._singleton._nodes: