Fix pipeline proxy object is always resolved in main pipeline logic

This commit is contained in:
allegroai 2022-07-21 17:19:06 +03:00
parent beb1bd447b
commit 2d7dde93e2
2 changed files with 22 additions and 14 deletions

View File

@ -3029,8 +3029,7 @@ class PipelineDecorator(PipelineController):
{ {
k: walk_nested_dict_tuple_list( k: walk_nested_dict_tuple_list(
v, v,
lambda x: x._remoteref() if isinstance(x, LazyEvalWrapper) else x, lambda x: x._remoteref() if isinstance(x, LazyEvalWrapper) else x
stop_condition=lambda x: isinstance(x, LazyEvalWrapper) and x._remoteref(),
) )
for k, v in kwargs.items() for k, v in kwargs.items()
if isinstance(v, LazyEvalWrapper) if isinstance(v, LazyEvalWrapper)
@ -3129,11 +3128,15 @@ class PipelineDecorator(PipelineController):
'Pipeline step "{}", Task ID={} failed'.format(_node.name, _node.job.task_id())) 'Pipeline step "{}", Task ID={} failed'.format(_node.name, _node.job.task_id()))
_node.executed = _node.job.task_id() _node.executed = _node.job.task_id()
tid = current_thread().ident
if cls._add_to_evaluated_return_values.get(tid, True): # make sure we mark the current state of the DAG execution tree
if tid not in cls._evaluated_return_values: # so that later we can find the "parents" to the current node
cls._evaluated_return_values[tid] = [] _tid = current_thread().ident
cls._evaluated_return_values[tid].append(_node.name) if cls._add_to_evaluated_return_values.get(_tid, True):
if _tid not in cls._evaluated_return_values:
cls._evaluated_return_values[_tid] = []
cls._evaluated_return_values[_tid].append(_node.name)
return Task.get_task(_node.job.task_id()).artifacts[return_name].get() return Task.get_task(_node.job.task_id()).artifacts[return_name].get()
return_w = [LazyEvalWrapper( return_w = [LazyEvalWrapper(
@ -3453,7 +3456,8 @@ class PipelineDecorator(PipelineController):
break break
if kwargs: if kwargs:
leaves = cls._singleton._find_executed_node_leaves() leaves = cls._singleton._find_executed_node_leaves()
_node.parents = (_node.parents or []) + [x for x in cls._evaluated_return_values.get(tid, []) if x in leaves] _node.parents = (_node.parents or []) + \
[x for x in cls._evaluated_return_values.get(tid, []) if x in leaves]
for k, v in kwargs.items(): for k, v in kwargs.items():
if v is None or isinstance(v, (bool, int, float, str)): if v is None or isinstance(v, (bool, int, float, str)):
_node.parameters["{}/{}".format(CreateFromFunction.kwargs_section, k)] = v _node.parameters["{}/{}".format(CreateFromFunction.kwargs_section, k)] = v

View File

@ -221,20 +221,24 @@ def naive_nested_from_flat_dictionary(flat_dict, sep='/'):
} }
def walk_nested_dict_tuple_list(dict_list_tuple, callback, stop_condition=None): def walk_nested_dict_tuple_list(dict_list_tuple, callback):
# Do Not Change, type call will not trigger the auto resolving / download of the Lazy evaluator
nested = (dict, tuple, list) nested = (dict, tuple, list)
if not isinstance(dict_list_tuple, nested) or (stop_condition and stop_condition(dict_list_tuple)): type_dict_list_tuple = type(dict_list_tuple)
if type_dict_list_tuple not in nested:
return callback(dict_list_tuple) return callback(dict_list_tuple)
if isinstance(dict_list_tuple, dict): if type_dict_list_tuple == dict:
ret = {} ret = {}
for k, v in dict_list_tuple.items(): for k, v in dict_list_tuple.items():
ret[k] = walk_nested_dict_tuple_list(v, callback=callback) if isinstance(v, nested) else callback(v) ret[k] = walk_nested_dict_tuple_list(v, callback=callback) if type(v) in nested else callback(v)
else: else:
ret = [] ret = []
for v in dict_list_tuple: for v in dict_list_tuple:
ret.append(walk_nested_dict_tuple_list(v, callback=callback) if isinstance(v, nested) else callback(v)) ret.append(walk_nested_dict_tuple_list(v, callback=callback) if type(v) in nested else callback(v))
if isinstance(dict_list_tuple, tuple):
if type_dict_list_tuple == tuple:
ret = tuple(dict_list_tuple) ret = tuple(dict_list_tuple)
return ret return ret