diff --git a/clearml/automation/controller.py b/clearml/automation/controller.py index 6ec2664a..464fc7f4 100644 --- a/clearml/automation/controller.py +++ b/clearml/automation/controller.py @@ -3029,8 +3029,7 @@ class PipelineDecorator(PipelineController): { k: walk_nested_dict_tuple_list( v, - lambda x: x._remoteref() if isinstance(x, LazyEvalWrapper) else x, - stop_condition=lambda x: isinstance(x, LazyEvalWrapper) and x._remoteref(), + lambda x: x._remoteref() if isinstance(x, LazyEvalWrapper) else x ) for k, v in kwargs.items() if isinstance(v, LazyEvalWrapper) @@ -3129,11 +3128,15 @@ class PipelineDecorator(PipelineController): 'Pipeline step "{}", Task ID={} failed'.format(_node.name, _node.job.task_id())) _node.executed = _node.job.task_id() - tid = current_thread().ident - if cls._add_to_evaluated_return_values.get(tid, True): - if tid not in cls._evaluated_return_values: - cls._evaluated_return_values[tid] = [] - cls._evaluated_return_values[tid].append(_node.name) + + # make sure we mark the current state of the DAG execution tree + # so that later we can find the "parents" to the current node + _tid = current_thread().ident + if cls._add_to_evaluated_return_values.get(_tid, True): + if _tid not in cls._evaluated_return_values: + cls._evaluated_return_values[_tid] = [] + cls._evaluated_return_values[_tid].append(_node.name) + return Task.get_task(_node.job.task_id()).artifacts[return_name].get() return_w = [LazyEvalWrapper( @@ -3453,7 +3456,8 @@ class PipelineDecorator(PipelineController): break if kwargs: leaves = cls._singleton._find_executed_node_leaves() - _node.parents = (_node.parents or []) + [x for x in cls._evaluated_return_values.get(tid, []) if x in leaves] + _node.parents = (_node.parents or []) + \ + [x for x in cls._evaluated_return_values.get(tid, []) if x in leaves] for k, v in kwargs.items(): if v is None or isinstance(v, (bool, int, float, str)): _node.parameters["{}/{}".format(CreateFromFunction.kwargs_section, k)] = v diff --git a/clearml/utilities/proxy_object.py b/clearml/utilities/proxy_object.py index 97161c7d..72c6491f 100644 --- a/clearml/utilities/proxy_object.py +++ b/clearml/utilities/proxy_object.py @@ -221,20 +221,24 @@ def naive_nested_from_flat_dictionary(flat_dict, sep='/'): } -def walk_nested_dict_tuple_list(dict_list_tuple, callback, stop_condition=None): +def walk_nested_dict_tuple_list(dict_list_tuple, callback): + # Do Not Change, type call will not trigger the auto resolving / download of the Lazy evaluator nested = (dict, tuple, list) - if not isinstance(dict_list_tuple, nested) or (stop_condition and stop_condition(dict_list_tuple)): + type_dict_list_tuple = type(dict_list_tuple) + if type_dict_list_tuple not in nested: return callback(dict_list_tuple) - if isinstance(dict_list_tuple, dict): + if type_dict_list_tuple == dict: ret = {} for k, v in dict_list_tuple.items(): - ret[k] = walk_nested_dict_tuple_list(v, callback=callback) if isinstance(v, nested) else callback(v) + ret[k] = walk_nested_dict_tuple_list(v, callback=callback) if type(v) in nested else callback(v) + else: ret = [] for v in dict_list_tuple: - ret.append(walk_nested_dict_tuple_list(v, callback=callback) if isinstance(v, nested) else callback(v)) - if isinstance(dict_list_tuple, tuple): + ret.append(walk_nested_dict_tuple_list(v, callback=callback) if type(v) in nested else callback(v)) + + if type_dict_list_tuple == tuple: ret = tuple(dict_list_tuple) return ret