Fix pipeline DAG

allegroai 2022-07-15 16:19:15 +03:00
parent 0c4555fcef
commit 7ec0691910
3 changed files with 48 additions and 30 deletions

View File

@@ -10,7 +10,7 @@ from datetime import datetime
from logging import getLogger
from multiprocessing import Process, Queue
from multiprocessing.pool import ThreadPool
from threading import Thread, Event, RLock
from threading import Thread, Event, RLock, current_thread
from time import time
from typing import Sequence, Optional, Mapping, Callable, Any, List, Dict, Union, Tuple
@@ -57,6 +57,8 @@ class PipelineController(object):
_monitor_node_interval = 5.*60
_report_plot_execution_flow = dict(title='Pipeline', series='Execution Flow')
_report_plot_execution_details = dict(title='Pipeline Details', series='Execution Details')
_evaluated_return_values = {} # TID: list of step (node) names whose return values were evaluated
_add_to_evaluated_return_values = {} # TID: bool
valid_job_status = ["failed", "cached", "completed", "aborted", "queued", "running", "skipped", "pending"]
@@ -66,11 +68,11 @@ class PipelineController(object):
base_task_id = attrib(type=str, default=None) # base Task ID to be cloned and launched
task_factory_func = attrib(type=Callable, default=None) # alternative to base_task_id, function creating a Task
queue = attrib(type=str, default=None) # execution queue name to use
parents = attrib(type=list, default=[]) # list of parent DAG steps
parents = attrib(type=list, default=None) # list of parent DAG steps
timeout = attrib(type=float, default=None) # execution timeout limit
parameters = attrib(type=dict, default={}) # Task hyper parameters to change
configurations = attrib(type=dict, default={}) # Task configuration objects to change
task_overrides = attrib(type=dict, default={}) # Task overrides to change
parameters = attrib(type=dict, default=None) # Task hyper parameters to change
configurations = attrib(type=dict, default=None) # Task configuration objects to change
task_overrides = attrib(type=dict, default=None) # Task overrides to change
executed = attrib(type=str, default=None) # The actual executed Task ID (None if not executed yet)
status = attrib(type=str, default="pending") # The Node Task status (cached, aborted, etc.)
clone_task = attrib(type=bool, default=True) # If True, clone the base_task_id, then execute the cloned Task
@@ -82,10 +84,28 @@ class PipelineController(object):
skip_job = attrib(type=bool, default=False) # if True, this step should be skipped
continue_on_fail = attrib(type=bool, default=False) # if True, the pipeline continues even if the step failed
cache_executed_step = attrib(type=bool, default=False) # if True this pipeline step should be cached
return_artifacts = attrib(type=list, default=[]) # List of artifact names returned by the step
monitor_metrics = attrib(type=list, default=[]) # List of metric title/series to monitor
monitor_artifacts = attrib(type=list, default=[]) # List of artifact names to monitor
monitor_models = attrib(type=list, default=[]) # List of models to monitor
return_artifacts = attrib(type=list, default=None) # List of artifact names returned by the step
monitor_metrics = attrib(type=list, default=None) # List of metric title/series to monitor
monitor_artifacts = attrib(type=list, default=None) # List of artifact names to monitor
monitor_models = attrib(type=list, default=None) # List of models to monitor
def __attrs_post_init__(self):
if self.parents is None:
self.parents = []
if self.parameters is None:
self.parameters = {}
if self.configurations is None:
self.configurations = {}
if self.task_overrides is None:
self.task_overrides = {}
if self.return_artifacts is None:
self.return_artifacts = []
if self.monitor_metrics is None:
self.monitor_metrics = []
if self.monitor_artifacts is None:
self.monitor_artifacts = []
if self.monitor_models is None:
self.monitor_models = []
def copy(self):
# type: () -> PipelineController.Node
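
The block above replaces every mutable attrs default (default=[] / default={}) with default=None plus an __attrs_post_init__ that allocates a fresh container per Node. With attrs, a literal default such as [] is evaluated once at class-definition time and the very same object is shared by every instance, so one node's parent list can silently leak into every other node and corrupt the DAG. A minimal standalone sketch of the pitfall and of the fix pattern (class names here are illustrative, not from the repo):

from attr import attrib, attrs

@attrs
class SharedDefaultNode(object):
    # the single [] created at class-definition time is reused by every instance
    parents = attrib(type=list, default=[])

@attrs
class FreshDefaultNode(object):
    parents = attrib(type=list, default=None)

    def __attrs_post_init__(self):
        # mirror the fix above: give each instance its own list
        if self.parents is None:
            self.parents = []

a, b = SharedDefaultNode(), SharedDefaultNode()
a.parents.append("step_one")
print(b.parents)   # ['step_one'] - the edge leaked into an unrelated node

c, d = FreshDefaultNode(), FreshDefaultNode()
c.parents.append("step_one")
print(d.parents)   # [] - each node keeps its own parent list

attr.Factory(list) would avoid the shared object as well; the None-plus-post-init form used here keeps the attribute signatures closest to the original.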
@@ -3000,6 +3020,11 @@ class PipelineDecorator(PipelineController):
for i, v in enumerate(args):
kwargs[inspect_func.args[i]] = v
# We need to remember when a pipeline step's return value is evaluated by the pipeline
# controller's logic, but not when it is evaluated internally here (otherwise the step would be
# recorded every time its output is passed along). _add_to_evaluated_return_values guards against that
tid = current_thread().ident
cls._add_to_evaluated_return_values[tid] = False
kwargs_artifacts.update(
{
k: walk_nested_dict_tuple_list(
@@ -3011,6 +3036,7 @@ class PipelineDecorator(PipelineController):
if isinstance(v, LazyEvalWrapper)
}
)
cls._add_to_evaluated_return_values[tid] = True
kwargs = {k: deepcopy(v) for k, v in kwargs.items() if not isinstance(v, LazyEvalWrapper)}
# check if we have the singleton
@@ -3050,6 +3076,7 @@ class PipelineDecorator(PipelineController):
# if we already launched a JOB on the node, we are calling the same function/task
# twice inside the pipeline, so we need to replicate the node.
_node = cls._singleton._nodes[_node_name].copy()
_node.parents = []
# find a new name
counter = 1
while _node.name in cls._singleton._nodes:
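
When the same decorated component is called a second time, the code above clones the existing node, resets its parents (the new _node.parents = [] line, so the copy does not inherit edges from the first invocation), and then probes for an unused name with a counter. A toy version of that probing loop, assuming a simple "<name>_<counter>" suffix scheme (the exact naming format used by the controller may differ):

def find_free_node_name(existing_nodes, base_name):
    # existing_nodes stands in for the controller's registry of node names
    counter = 1
    candidate = "{}_{}".format(base_name, counter)
    while candidate in existing_nodes:
        counter += 1
        candidate = "{}_{}".format(base_name, counter)
    return candidate

print(find_free_node_name({"step_two", "step_two_1"}, "step_two"))  # step_two_2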
@@ -3065,7 +3092,7 @@ class PipelineDecorator(PipelineController):
# The actual launch is a bit slow, we run it in the background
launch_thread = Thread(
target=cls._component_launch,
args=(_node_name, _node, kwargs_artifacts, kwargs))
args=(_node_name, _node, kwargs_artifacts, kwargs, current_thread().ident))
def results_reference(return_name):
# wait until launch is completed
@@ -3102,6 +3129,11 @@ class PipelineDecorator(PipelineController):
'Pipeline step "{}", Task ID={} failed'.format(_node.name, _node.job.task_id()))
_node.executed = _node.job.task_id()
tid = current_thread().ident
if cls._add_to_evaluated_return_values.get(tid, True):
if tid not in cls._evaluated_return_values:
cls._evaluated_return_values[tid] = []
cls._evaluated_return_values[tid].append(_node.name)
return Task.get_task(_node.job.task_id()).artifacts[return_name].get()
return_w = [LazyEvalWrapper(
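
The two new class-level dictionaries (_evaluated_return_values and _add_to_evaluated_return_values) are keyed by the ident of the thread running the pipeline logic: results evaluated by the user's pipeline function are recorded, while evaluations triggered internally while wiring arguments for the next step are temporarily suppressed, so a step is not registered as consumed every time its output is merely forwarded. A rough, self-contained imitation of that bookkeeping (class and method names are illustrative, not the controller's API):

from threading import Thread, current_thread

class ReturnValueTracker(object):
    _evaluated_return_values = {}         # thread ident -> step names whose results were consumed
    _add_to_evaluated_return_values = {}  # thread ident -> whether consumption should be recorded

    @classmethod
    def consume(cls, step_name):
        # called whenever a step's return value is actually evaluated
        tid = current_thread().ident
        if cls._add_to_evaluated_return_values.get(tid, True):
            cls._evaluated_return_values.setdefault(tid, []).append(step_name)
        return "result of {}".format(step_name)

    @classmethod
    def forward(cls, step_name):
        # internal forwarding: evaluate without recording, then restore the flag
        tid = current_thread().ident
        cls._add_to_evaluated_return_values[tid] = False
        try:
            return cls.consume(step_name)
        finally:
            cls._add_to_evaluated_return_values[tid] = True

def pipeline_logic():
    ReturnValueTracker.consume("step_one")   # recorded for this thread
    ReturnValueTracker.forward("step_two")   # evaluated but not recorded
    print(ReturnValueTracker._evaluated_return_values[current_thread().ident])  # ['step_one']

t = Thread(target=pipeline_logic)
t.start()
t.join()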
@@ -3402,7 +3434,7 @@ class PipelineDecorator(PipelineController):
return cls._wait_for_multi_pipelines()
@classmethod
def _component_launch(cls, node_name, node, kwargs_artifacts, kwargs):
def _component_launch(cls, node_name, node, kwargs_artifacts, kwargs, tid):
_node_name = node_name
_node = node
# update artifacts kwargs
@@ -3414,37 +3446,26 @@ class PipelineDecorator(PipelineController):
if v and '.' in str(v):
parent_id, _ = str(v).split('.', 1)
# find parent and push it into the _node.parents
for n, node in list(cls._singleton._nodes.items()):
for n, node in sorted(list(cls._singleton._nodes.items()), reverse=True):
if n != _node.name and node.executed and node.executed == parent_id:
if n not in _node.parents:
_node.parents.append(n)
break
if kwargs:
leaves = cls._singleton._find_executed_node_leaves()
_node.parents = (_node.parents or []) + [x for x in cls._evaluated_return_values.get(tid, []) if x in leaves]
for k, v in kwargs.items():
if v is None or isinstance(v, (bool, int, float, str)):
_node.parameters["{}/{}".format(CreateFromFunction.kwargs_section, k)] = v
elif isinstance(v, (list, tuple)) and all(isinstance(i, (bool, int, float, str)) for i in v):
_node.parameters["{}/{}".format(CreateFromFunction.kwargs_section, k)] = v
else:
# find parents if we have any
arg_parents = []
if isinstance(v, (list, tuple, dict)):
walk_nested_dict_tuple_list(
v,
callback=lambda x:
not cls._ref_lazy_loader_id_to_node_name.get(id(x)) or
arg_parents.append(cls._ref_lazy_loader_id_to_node_name[id(x)])
)
# we need to create an artifact
artifact_name = 'result_{}_{}'.format(re.sub(r'\W+', '', _node.name), k)
cls._singleton._task.upload_artifact(
name=artifact_name, artifact_object=v, wait_on_upload=True)
_node.parameters["{}/{}".format(CreateFromFunction.input_artifact_section, k)] = \
"{}.{}".format(cls._singleton._task.id, artifact_name)
# now add all the executed nodes as parents (only the leaves of the DAG, no need for parents)
_node.parents = list(
set((_node.parents or []) + cls._singleton._find_executed_node_leaves() + arg_parents)
- set(list(_node.name)))
# verify the new step
cls._singleton._verify_node(_node)
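
Two mechanisms now supply a step's implicit parents: artifact references of the form "<task_id>.<artifact_name>" are split and matched against the executed node that produced them (the newly sorted, reverse-ordered scan above), and plain kwargs pull in the executed DAG leaves that this thread's pipeline logic actually evaluated. A rough sketch of the second rule, using plain dicts instead of the controller's Node objects and assuming a "leaf" is an executed node that no other executed node lists as a parent (the real _find_executed_node_leaves may differ in detail):

def find_executed_leaves(nodes):
    # nodes: {name: {"executed": bool, "parents": [parent names]}}
    executed = {n for n, d in nodes.items() if d["executed"]}
    referenced_as_parent = {p for n in executed for p in nodes[n]["parents"]}
    return [n for n in executed if n not in referenced_as_parent]

def infer_parents(nodes, evaluated_in_this_thread, explicit_parents=None):
    # only results that were evaluated by this thread AND are current DAG leaves
    # become implicit parents of the step about to be launched
    leaves = find_executed_leaves(nodes)
    parents = list(explicit_parents or [])
    parents += [n for n in evaluated_in_this_thread if n in leaves and n not in parents]
    return parents

nodes = {
    "step_one": {"executed": True, "parents": []},
    "step_two": {"executed": True, "parents": ["step_one"]},
}
# the pipeline logic evaluated step_two's result before calling the next step
print(infer_parents(nodes, ["step_two"]))  # ['step_two']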

View File

@@ -708,7 +708,7 @@ class ScriptInfo(object):
try:
# Use os.path.relpath as it calculates up dir movements (../)
entry_point = os.path.relpath(
str(script_path), str(cls._get_working_dir(repo_root, return_abs=True)))
str(os.path.realpath(script_path)), str(cls._get_working_dir(repo_root, return_abs=True)))
except ValueError:
# Working directory not under repository root
entry_point = script_path.relative_to(repo_root)
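
The scriptinfo change resolves the script path with os.path.realpath before computing the relative entry point, presumably so a script reached through a symlink (or a symlinked directory) no longer produces an entry point that climbs out of the repository with "../" segments. A small illustration with made-up paths (the second relpath only comes out clean on a machine where that symlink really points into the repo):

import os

repo_root = "/srv/repos/my_repo"
script_path = "/home/user/train.py"   # imagine this is a symlink to /srv/repos/my_repo/src/train.py

# without realpath, relpath is computed from the symlink's location
print(os.path.relpath(script_path, repo_root))   # ../../../home/user/train.py

# resolving the link first keeps the entry point inside the repository
resolved = os.path.realpath(script_path)         # /srv/repos/my_repo/src/train.py if the link exists
print(os.path.relpath(resolved, repo_root))      # src/train.py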

View File

@@ -77,9 +77,6 @@ def executing_pipeline(pickle_url, mock_parameter='mock'):
print('launch step two')
processed_data = step_two(data_frame)
# Notice we can actually process/modify the returned values inside the pipeline logic context.
# This means the modified object will be stored on the pipeline Task.
processed_data = [processed_data[0], processed_data[1]*2, processed_data[2], processed_data[3]]
print('launch step three')
model = step_three(processed_data)