diff --git a/clearml/automation/controller.py b/clearml/automation/controller.py
index 1bec5442..6ec2664a 100644
--- a/clearml/automation/controller.py
+++ b/clearml/automation/controller.py
@@ -10,7 +10,7 @@ from datetime import datetime
 from logging import getLogger
 from multiprocessing import Process, Queue
 from multiprocessing.pool import ThreadPool
-from threading import Thread, Event, RLock
+from threading import Thread, Event, RLock, current_thread
 from time import time
 from typing import Sequence, Optional, Mapping, Callable, Any, List, Dict, Union, Tuple

@@ -57,6 +57,8 @@ class PipelineController(object):
     _monitor_node_interval = 5.*60
     _report_plot_execution_flow = dict(title='Pipeline', series='Execution Flow')
     _report_plot_execution_details = dict(title='Pipeline Details', series='Execution Details')
+    _evaluated_return_values = {}  # TID: pipeline_name
+    _add_to_evaluated_return_values = {}  # TID: bool

     valid_job_status = ["failed", "cached", "completed", "aborted", "queued", "running", "skipped", "pending"]

@@ -66,11 +68,11 @@ class PipelineController(object):
         base_task_id = attrib(type=str, default=None)  # base Task ID to be cloned and launched
         task_factory_func = attrib(type=Callable, default=None)  # alternative to base_task_id, function creating a Task
         queue = attrib(type=str, default=None)  # execution queue name to use
-        parents = attrib(type=list, default=[])  # list of parent DAG steps
+        parents = attrib(type=list, default=None)  # list of parent DAG steps
         timeout = attrib(type=float, default=None)  # execution timeout limit
-        parameters = attrib(type=dict, default={})  # Task hyper parameters to change
-        configurations = attrib(type=dict, default={})  # Task configuration objects to change
-        task_overrides = attrib(type=dict, default={})  # Task overrides to change
+        parameters = attrib(type=dict, default=None)  # Task hyper parameters to change
+        configurations = attrib(type=dict, default=None)  # Task configuration objects to change
+        task_overrides = attrib(type=dict, default=None)  # Task overrides to change
         executed = attrib(type=str, default=None)  # The actual executed Task ID (None if not executed yet)
         status = attrib(type=str, default="pending")  # The Node Task status (cached, aborted, etc.)
         clone_task = attrib(type=bool, default=True)  # If True, clone the base_task_id, then execute the cloned Task
@@ -82,10 +84,28 @@ class PipelineController(object):
         skip_job = attrib(type=bool, default=False)  # if True, this step should be skipped
         continue_on_fail = attrib(type=bool, default=False)  # if True, the pipeline continues even if the step failed
         cache_executed_step = attrib(type=bool, default=False)  # if True this pipeline step should be cached
-        return_artifacts = attrib(type=list, default=[])  # List of artifact names returned by the step
-        monitor_metrics = attrib(type=list, default=[])  # List of metric title/series to monitor
-        monitor_artifacts = attrib(type=list, default=[])  # List of artifact names to monitor
-        monitor_models = attrib(type=list, default=[])  # List of models to monitor
+        return_artifacts = attrib(type=list, default=None)  # List of artifact names returned by the step
+        monitor_metrics = attrib(type=list, default=None)  # List of metric title/series to monitor
+        monitor_artifacts = attrib(type=list, default=None)  # List of artifact names to monitor
+        monitor_models = attrib(type=list, default=None)  # List of models to monitor
+
+        def __attrs_post_init__(self):
+            if self.parents is None:
+                self.parents = []
+            if self.parameters is None:
+                self.parameters = {}
+            if self.configurations is None:
+                self.configurations = {}
+            if self.task_overrides is None:
+                self.task_overrides = {}
+            if self.return_artifacts is None:
+                self.return_artifacts = []
+            if self.monitor_metrics is None:
+                self.monitor_metrics = []
+            if self.monitor_artifacts is None:
+                self.monitor_artifacts = []
+            if self.monitor_models is None:
+                self.monitor_models = []

         def copy(self):
             # type: () -> PipelineController.Node
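The attrs defaults change above fixes a shared-mutable-default bug: attrib(default=[]) evaluates the default once at class-definition time, so every Node instance holds the very same list and dict objects, and one node's parents leak into all the others. A minimal sketch of the failure mode and the default=None plus __attrs_post_init__ remedy (the class names here are illustrative, not ClearML code):

from attr import attrib, attrs

@attrs
class BuggyNode(object):
    parents = attrib(type=list, default=[])  # one list shared by every instance

a, b = BuggyNode(), BuggyNode()
a.parents.append('step_one')
print(b.parents)  # ['step_one'] -- b observes a's mutation

@attrs
class FixedNode(object):
    parents = attrib(type=list, default=None)

    def __attrs_post_init__(self):
        if self.parents is None:
            self.parents = []  # fresh list per instance

c, d = FixedNode(), FixedNode()
c.parents.append('step_one')
print(d.parents)  # [] -- instances are now isolated

attr.Factory(list) would give the same isolation; the patch presumably opts for None sentinels to keep the declared defaults serializable and the attribute signatures unchanged.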
@@ -3000,6 +3020,11 @@ class PipelineDecorator(PipelineController):
                 for i, v in enumerate(args):
                     kwargs[inspect_func.args[i]] = v

+                # We need to remember when a pipeline step's return value is evaluated by the pipeline
+                # controller, but not when it's done here (as we would remember the step every time).
+                # _add_to_evaluated_return_values protects that
+                tid = current_thread().ident
+                cls._add_to_evaluated_return_values[tid] = False
                 kwargs_artifacts.update(
                     {
                         k: walk_nested_dict_tuple_list(
@@ -3011,6 +3036,7 @@ class PipelineDecorator(PipelineController):
                         if isinstance(v, LazyEvalWrapper)
                     }
                 )
+                cls._add_to_evaluated_return_values[tid] = True
                 kwargs = {k: deepcopy(v) for k, v in kwargs.items() if not isinstance(v, LazyEvalWrapper)}

                 # check if we have the singleton
@@ -3050,6 +3076,7 @@ class PipelineDecorator(PipelineController):
                     # if we already launched a JOB on the node, this means we are calling the same function/task
                     # twice inside the pipeline, this means we need to replicate the node.
                     _node = cls._singleton._nodes[_node_name].copy()
+                    _node.parents = []
                     # find a new name
                     counter = 1
                     while _node.name in cls._singleton._nodes:
@@ -3065,7 +3092,7 @@ class PipelineDecorator(PipelineController):
                 # The actual launch is a bit slow, we run it in the background
                 launch_thread = Thread(
                     target=cls._component_launch,
-                    args=(_node_name, _node, kwargs_artifacts, kwargs))
+                    args=(_node_name, _node, kwargs_artifacts, kwargs, current_thread().ident))

                 def results_reference(return_name):
                     # wait until launch is completed
@@ -3102,6 +3129,11 @@ class PipelineDecorator(PipelineController):
                             'Pipeline step "{}", Task ID={} failed'.format(_node.name, _node.job.task_id()))

                     _node.executed = _node.job.task_id()
+                    tid = current_thread().ident
+                    if cls._add_to_evaluated_return_values.get(tid, True):
+                        if tid not in cls._evaluated_return_values:
+                            cls._evaluated_return_values[tid] = []
+                        cls._evaluated_return_values[tid].append(_node.name)
                     return Task.get_task(_node.job.task_id()).artifacts[return_name].get()

                 return_w = [LazyEvalWrapper(
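The two flag assignments bracket the kwargs_artifacts unwrapping: when the component wrapper itself dereferences LazyEvalWrapper arguments, _add_to_evaluated_return_values is False and results_reference records nothing; when the pipeline body dereferences a return value directly, the flag is True and the producing step's name lands in _evaluated_return_values for the current thread, ready for DAG parent deduction. A hedged sketch of what this enables at the user level (step names and values are illustrative, and a configured clearml setup is assumed):

from clearml.automation.controller import PipelineDecorator

@PipelineDecorator.component(return_values=['number'])
def step_one():
    return 42

@PipelineDecorator.component(return_values=['doubled'])
def step_two(number):
    return number * 2

@PipelineDecorator.pipeline(name='demo', project='examples', version='0.1')
def logic():
    number = step_one()
    # Using the lazy return value here, inside the controller logic, triggers
    # results_reference() on this thread, so step_one is recorded in
    # _evaluated_return_values and can become a parent of later steps.
    if number > 40:
        step_two(number)

if __name__ == '__main__':
    PipelineDecorator.run_locally()
    logic()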
@@ -3402,7 +3434,7 @@ class PipelineDecorator(PipelineController):
             return cls._wait_for_multi_pipelines()

     @classmethod
-    def _component_launch(cls, node_name, node, kwargs_artifacts, kwargs):
+    def _component_launch(cls, node_name, node, kwargs_artifacts, kwargs, tid):
         _node_name = node_name
         _node = node
         # update artifacts kwargs
@@ -3414,37 +3446,26 @@ class PipelineDecorator(PipelineController):
             if v and '.' in str(v):
                 parent_id, _ = str(v).split('.', 1)
                 # find parent and push it into the _node.parents
-                for n, node in list(cls._singleton._nodes.items()):
+                for n, node in sorted(list(cls._singleton._nodes.items()), reverse=True):
                     if n != _node.name and node.executed and node.executed == parent_id:
                         if n not in _node.parents:
                             _node.parents.append(n)
                         break
+        if kwargs:
+            leaves = cls._singleton._find_executed_node_leaves()
+            _node.parents = (_node.parents or []) + [x for x in cls._evaluated_return_values.get(tid, []) if x in leaves]
         for k, v in kwargs.items():
             if v is None or isinstance(v, (bool, int, float, str)):
                 _node.parameters["{}/{}".format(CreateFromFunction.kwargs_section, k)] = v
             elif isinstance(v, (list, tuple)) and all(isinstance(i, (bool, int, float, str)) for i in v):
                 _node.parameters["{}/{}".format(CreateFromFunction.kwargs_section, k)] = v
             else:
-                # find parents if we have any
-                arg_parents = []
-                if isinstance(v, (list, tuple, dict)):
-                    walk_nested_dict_tuple_list(
-                        v,
-                        callback=lambda x:
-                        not cls._ref_lazy_loader_id_to_node_name.get(id(x)) or
-                        arg_parents.append(cls._ref_lazy_loader_id_to_node_name[id(x)])
-                    )
                 # we need to create an artifact
                 artifact_name = 'result_{}_{}'.format(re.sub(r'\W+', '', _node.name), k)
                 cls._singleton._task.upload_artifact(
                     name=artifact_name, artifact_object=v, wait_on_upload=True)
                 _node.parameters["{}/{}".format(CreateFromFunction.input_artifact_section, k)] = \
                     "{}.{}".format(cls._singleton._task.id, artifact_name)

-        # now add all the executed nodes as parents (only the leaves of the DAG, no need for parents)
-        _node.parents = list(
-            set((_node.parents or []) + cls._singleton._find_executed_node_leaves() + arg_parents)
-            - set(list(_node.name)))
-
         # verify the new step
         cls._singleton._verify_node(_node)
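Parent deduction in _component_launch changes substantially. Previously every executed DAG leaf (plus any lazy references found inside container arguments) became a parent, and the final de-duplication carried a subtle bug of its own: set(list(_node.name)) builds a set of the name's characters rather than the singleton set containing the name. The new rule only draws an edge from steps whose return values the controller thread actually evaluated, intersected with the executed leaves. A toy restatement of the new rule (the function and variable names are mine, not ClearML's):

def deduce_parents(explicit_parents, evaluated_for_thread, executed_leaves):
    # evaluated_for_thread: step names recorded in _evaluated_return_values[tid]
    # executed_leaves:      the result of _find_executed_node_leaves()
    return (explicit_parents or []) + [
        name for name in evaluated_for_thread if name in executed_leaves
    ]

# Only step_one's result was consumed by the controller, so only step_one
# becomes a parent -- step_two no longer gets an edge merely for having run.
print(deduce_parents([], ['step_one'], ['step_one', 'step_two']))  # ['step_one']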
diff --git a/clearml/backend_interface/task/repo/scriptinfo.py b/clearml/backend_interface/task/repo/scriptinfo.py
index d58b58db..bb6ab02a 100644
--- a/clearml/backend_interface/task/repo/scriptinfo.py
+++ b/clearml/backend_interface/task/repo/scriptinfo.py
@@ -708,7 +708,7 @@ class ScriptInfo(object):
         try:
             # Use os.path.relpath as it calculates up dir movements (../)
             entry_point = os.path.relpath(
-                str(script_path), str(cls._get_working_dir(repo_root, return_abs=True)))
+                str(os.path.realpath(script_path)), str(cls._get_working_dir(repo_root, return_abs=True)))
         except ValueError:
             # Working directory not under repository root
             entry_point = script_path.relative_to(repo_root)
diff --git a/examples/pipeline/pipeline_from_decorator.py b/examples/pipeline/pipeline_from_decorator.py
index be606fd3..12926996 100644
--- a/examples/pipeline/pipeline_from_decorator.py
+++ b/examples/pipeline/pipeline_from_decorator.py
@@ -77,9 +77,6 @@ def executing_pipeline(pickle_url, mock_parameter='mock'):
     print('launch step two')
     processed_data = step_two(data_frame)

-    # Notice we can actually process/modify the returned values inside the pipeline logic context.
-    # This means the modified object will be stored on the pipeline Task.
-    processed_data = [processed_data[0], processed_data[1]*2, processed_data[2], processed_data[3]]
     print('launch step three')
     model = step_three(processed_data)
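The scriptinfo.py hunk above resolves symlinks before computing the entry point: when the running script is reached through a symlinked directory, os.path.relpath against the real working directory climbs out of the repository. A minimal POSIX sketch of the difference (temp paths are illustrative):

import os
import tempfile

repo = os.path.realpath(tempfile.mkdtemp())   # stands in for the repo working dir
open(os.path.join(repo, 'train.py'), 'w').close()
link = os.path.join(tempfile.mkdtemp(), 'repo_link')
os.symlink(repo, link)                        # reach the repo through a symlink
via_link = os.path.join(link, 'train.py')

print(os.path.relpath(via_link, repo))        # climbs out, e.g. '../tmpXYZ/repo_link/train.py'
print(os.path.relpath(os.path.realpath(via_link), repo))  # 'train.py'

Finally, the decorator example drops the in-controller mutation of step_two's return value, presumably because dereferencing and reshaping returned objects inside the pipeline body is exactly the implicit dependency that the new evaluated-return-values tracking now records; the example now simply passes results through unchanged.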