Fix pipeline clone logic

This commit is contained in:
allegroai 2022-10-23 15:21:30 +03:00
parent 9d096d8ab0
commit 2766255e8f

View File

@ -1228,13 +1228,6 @@ class PipelineController(object):
for node in list(self._nodes.values()):
if node.executed is False:
node.executed = None
# Associate any nodes that represent second or subsequent calls to a component function with the function.
for node in list(self._nodes.values()):
if not node.base_task_id and not node.task_factory_func and node.job_code_section:
if node.job_code_section in self._nodes:
func = self._nodes[node.job_code_section].task_factory_func
if func:
node.task_factory_func = func
if not self._verify():
raise ValueError("Failed verifying pipeline execution graph, "
"it has either inaccessible nodes, or contains cycles")
@ -1463,6 +1456,14 @@ class PipelineController(object):
else self._nodes[k]
for k, v in dag_dict.items()}
# set the task_factory_func for each cloned node
for node in list(self._nodes.values()):
if not node.base_task_id and not node.task_factory_func and node.job_code_section:
if node.job_code_section in self._nodes:
func = self._nodes[node.job_code_section].task_factory_func
if func:
node.task_factory_func = func
def _has_stored_configuration(self):
"""
Return True if we are running remotely and we have stored configuration on the Task
@ -1813,7 +1814,7 @@ class PipelineController(object):
for k, v in function_input_artifacts.items()}
)
job_code_section = None
job_code_section = name
task_name = task_name or name or None
if self._mock_execution:
@ -1841,7 +1842,6 @@ class PipelineController(object):
name=name, config_type='json',
config_text=json.dumps(task_definition, indent=1)
)
job_code_section = name
else:
# load task definition from configuration
# noinspection PyProtectedMember
@ -3483,8 +3483,17 @@ class PipelineDecorator(PipelineController):
_node.parents = []
# find a new name
counter = 1
while _node.name in cls._singleton._launched_step_names:
_node.name = '{}_{}'.format(_node_name, counter)
# Use nodes in `_singleton._nodes` that have not been launched.
# First check if we launched the node.
# If it wasn't launched we also need to check that the new name of `_node`
# points to the original code section it was meant to run.
# Note that for the first iteration (when `_node.name == _node_name`)
# we always increment the name, as the name is always in `_launched_step_names`
while _node.name in cls._singleton._launched_step_names or (
_node.name in cls._singleton._nodes
and cls._singleton._nodes[_node.name].job_code_section != cls._singleton._nodes[_node_name].job_code_section
):
_node.name = "{}_{}".format(_node_name, counter)
counter += 1
_node_name = _node.name
if _node.name not in cls._singleton._nodes: