diff --git a/clearml/automation/controller.py b/clearml/automation/controller.py
index d9cf0c71..c6037e42 100644
--- a/clearml/automation/controller.py
+++ b/clearml/automation/controller.py
@@ -53,6 +53,7 @@ class PipelineController(object):
     _task_project_lookup = {}
     _clearml_job_class = ClearmlJob
     _update_execution_plot_interval = 5.*60
+    _update_progress_interval = 10.
     _monitor_node_interval = 5.*60
     _report_plot_execution_flow = dict(title='Pipeline', series='Execution Flow')
     _report_plot_execution_details = dict(title='Pipeline Details', series='Execution Details')
@@ -152,6 +153,7 @@ class PipelineController(object):
         self._pipeline_args = dict()
         self._pipeline_args_desc = dict()
         self._pipeline_args_type = dict()
+        self._args_map = dict()
         self._stop_event = None
         self._experiment_created_cb = None
         self._experiment_completed_cb = None
@@ -166,6 +168,7 @@ class PipelineController(object):
         self._auto_version_bump = bool(auto_version_bump)
         self._mock_execution = False  # used for nested pipelines (eager execution)
         self._pipeline_as_sub_project = bool(Session.check_min_api_server_version("2.17"))
+        self._last_progress_update_time = 0
         if not self._task:
             task_name = name or project or '{}'.format(datetime.now())
             if self._pipeline_as_sub_project:
@@ -492,7 +495,7 @@ class PipelineController(object):
 
             create_task_from_function(
                 mock_func,
-                function_input_artifacts={'matrix_np': 'aabb1122.previous_matrix'},
+                function_kwargs={'matrix_np': 'aabb1122.previous_matrix'},
                 function_return=['square_matrix']
             )
 
@@ -504,7 +507,7 @@ class PipelineController(object):
             Example argument named `numpy_matrix` from Task ID `aabbcc` artifact name `answer`:
             {'numpy_matrix': 'aabbcc.answer'}
         :param function_return: Provide a list of names for all the results.
-            If not provided no results will be stored as artifacts.
+            If not provided, no results will be stored as artifacts.
         :param project_name: Set the project name for the task. Required if base_task_id is None.
         :param task_name: Set the name of the remote task, if not provided use `name` argument.
         :param task_type: Optional, The task type to be created. Supported values: 'training', 'testing', 'inference',
@@ -617,8 +620,10 @@ class PipelineController(object):
                 if step in self._nodes and artifact in self._nodes[step].return_artifacts:
                     function_input_artifacts[k] = "${{{}.id}}.{}".format(step, artifact)
                     continue
-                # verify the reference
-                self.__verify_step_reference(node=self.Node(name=name), step_ref_string=v)
+                # verify the reference only if we are running locally (in a remote run with multiple
+                # steps created from tasks, _nodes is still empty; only after deserializing do we have the full DAG)
+                if self._task.running_locally():
+                    self.__verify_step_reference(node=self.Node(name=name), step_ref_string=v)
 
         function_kwargs = {k: v for k, v in function_kwargs.items() if k not in function_input_artifacts}
         parameters = {"{}/{}".format(CreateFromFunction.kwargs_section, k): v for k, v in function_kwargs.items()}
@@ -688,9 +693,6 @@ class PipelineController(object):
                 job_code_section=job_code_section,
             )
 
-        if self._task and not self._task.running_locally() and not self._mock_execution:
-            self.update_execution_plot()
-
         return True
 
     def start(
@@ -899,7 +901,7 @@ class PipelineController(object):
 
         :param Any preview: The artifact preview
 
-        :param bool wait_on_upload: Whether or not the upload should be synchronous, forcing the upload to complete
+        :param bool wait_on_upload: Whether the upload should be synchronous, forcing the upload to complete
             before continuing.
 
         :return: The status of the upload.
@@ -1225,10 +1227,17 @@ class PipelineController(object):
                 self._task._set_configuration(
                     name=self._config_section, config_type='dictionary',
                     config_text=json.dumps(pipeline_dag, indent=2))
+                args_map_inversed = {}
+                for section, arg_list in self._args_map.items():
+                    for arg in arg_list:
+                        args_map_inversed[arg] = section
                 pipeline_args = flatten_dictionary(self._pipeline_args)
                 # noinspection PyProtectedMember
                 self._task._set_parameters(
-                    {'{}/{}'.format(self._args_section, k): v for k, v in pipeline_args.items()},
+                    {
+                        "{}/{}".format(args_map_inversed.get(k, self._args_section), k): v
+                        for k, v in pipeline_args.items()
+                    },
                     __parameters_descriptions=self._pipeline_args_desc,
                     __parameters_types=self._pipeline_args_type,
                     __update=True,
@@ -1249,7 +1258,19 @@ class PipelineController(object):
                 })
             else:
                 self._task.connect_configuration(pipeline_dag, name=self._config_section)
-                self._task.connect(self._pipeline_args, name=self._args_section)
+                connected_args = set()
+                new_pipeline_args = {}
+                for section, arg_list in self._args_map.items():
+                    mutable_dict = {arg: self._pipeline_args.get(arg) for arg in arg_list}
+                    self._task.connect(mutable_dict, name=section)
+                    new_pipeline_args.update(mutable_dict)
+                    connected_args.update(arg_list)
+                mutable_dict = {k: v for k, v in self._pipeline_args.items() if k not in connected_args}
+                self._task.connect(
+                    mutable_dict, name=self._args_section
+                )
+                new_pipeline_args.update(mutable_dict)
+                self._pipeline_args = new_pipeline_args
                 self._task.connect(params, name=self._pipeline_section)
                 # noinspection PyProtectedMember
                 if self._task._get_runtime_properties().get(self._runtime_property_hash):
@@ -1688,6 +1709,10 @@ class PipelineController(object):
                 sankey_link['target'].append(idx)
                 sankey_link['value'].append(1)
 
+            # if nothing changed, we give up
+            if nodes == next_nodes:
+                break
+
             nodes = next_nodes
 
         # make sure we have no independent (unconnected) nodes
@@ -1879,6 +1904,19 @@ class PipelineController(object):
             description="pipeline state: {}".format(hash_dict(pipeline_dag)),
             config_text=json.dumps(pipeline_dag, indent=2))
 
+    def _update_progress(self):
+        # type: () -> ()
+        """
+        Update progress of the pipeline every PipelineController._update_progress_interval seconds.
+        Progress is calculated as the mean of the progress of each step in the pipeline.
+ """ + if time() - self._last_progress_update_time < self._update_progress_interval: + return + job_progress = [(node.job.task.get_progress() or 0) if node.job else 0 for node in self._nodes.values()] + if len(job_progress): + self._task.set_progress(int(sum(job_progress) / len(job_progress))) + self._last_progress_update_time = time() + def _daemon(self): # type: () -> () """ @@ -1900,6 +1938,7 @@ class PipelineController(object): if self._pipeline_time_limit and (time() - self._start_time) > self._pipeline_time_limit: break + self._update_progress() # check the state of all current jobs # if no a job ended, continue completed_jobs = [] @@ -2534,6 +2573,7 @@ class PipelineDecorator(PipelineController): if self._pipeline_time_limit and (time() - self._start_time) > self._pipeline_time_limit: break + self._update_progress() # check the state of all current jobs # if no a job ended, continue completed_jobs = [] @@ -3088,7 +3128,8 @@ class PipelineDecorator(PipelineController): abort_on_failure=False, # type: bool pipeline_execution_queue='services', # type: Optional[str] multi_instance_support=False, # type: bool - add_run_number=True # type: bool + add_run_number=True, # type: bool + args_map=None # type: dict[str, List[str]] ): # type: (...) -> Callable """ @@ -3122,6 +3163,19 @@ class PipelineDecorator(PipelineController): Default False, no multi instance pipeline support. :param add_run_number: If True (default), add the run number of the pipeline to the pipeline name. Example, the second time we launch the pipeline "best pipeline", we rename it to "best pipeline #2" + :param args_map: Map arguments to their specific configuration section. Arguments not included in this map + will default to `Args` section. For example, for the following code: + + .. code-block:: python + @PipelineDecorator.pipeline(args_map={'sectionA':['paramA'], 'sectionB:['paramB','paramC'] + def executing_pipeline(paramA, paramB, paramC, paramD): + pass + + Parameters would be stored as: + - paramA: sectionA/paramA + - paramB: sectionB/paramB + - paramC: sectionB/paramC + - paramD: Args/paramD """ def decorator_wrap(func): @@ -3201,6 +3255,8 @@ class PipelineDecorator(PipelineController): add_run_number=add_run_number, ) + a_pipeline._args_map = args_map or {} + if PipelineDecorator._debug_execute_step_process: a_pipeline._clearml_job_class = LocalClearmlJob a_pipeline._default_execution_queue = 'mock'