Add hyperparameter sections support to pipeline decorator ()

Fix pipeline add_function_step breaks in remote execution
allegroai 2022-06-06 14:03:34 +03:00
parent cb36da3ded
commit 2f42fc4830


@@ -53,6 +53,7 @@ class PipelineController(object):
_task_project_lookup = {}
_clearml_job_class = ClearmlJob
_update_execution_plot_interval = 5.*60
_update_progress_interval = 10.
_monitor_node_interval = 5.*60
_report_plot_execution_flow = dict(title='Pipeline', series='Execution Flow')
_report_plot_execution_details = dict(title='Pipeline Details', series='Execution Details')
@@ -152,6 +153,7 @@ class PipelineController(object):
self._pipeline_args = dict()
self._pipeline_args_desc = dict()
self._pipeline_args_type = dict()
self._args_map = dict()
self._stop_event = None
self._experiment_created_cb = None
self._experiment_completed_cb = None
@@ -166,6 +168,7 @@ class PipelineController(object):
self._auto_version_bump = bool(auto_version_bump)
self._mock_execution = False # used for nested pipelines (eager execution)
self._pipeline_as_sub_project = bool(Session.check_min_api_server_version("2.17"))
self._last_progress_update_time = 0
if not self._task:
task_name = name or project or '{}'.format(datetime.now())
if self._pipeline_as_sub_project:
@@ -492,7 +495,7 @@ class PipelineController(object):
create_task_from_function(
mock_func,
function_input_artifacts={'matrix_np': 'aabb1122.previous_matrix'},
function_kwargs={'matrix_np': 'aabb1122.previous_matrix'},
function_return=['square_matrix']
)
@@ -504,7 +507,7 @@ class PipelineController(object):
Example argument named `numpy_matrix` from Task ID `aabbcc` artifact name `answer`:
{'numpy_matrix': 'aabbcc.answer'}
:param function_return: Provide a list of names for all the results.
If not provided no results will be stored as artifacts.
If not provided, no results will be stored as artifacts.
:param project_name: Set the project name for the task. Required if base_task_id is None.
:param task_name: Set the name of the remote task. If not provided, the `name` argument is used.
:param task_type: Optional. The task type to be created. Supported values: 'training', 'testing', 'inference',
@@ -617,8 +620,10 @@ class PipelineController(object):
if step in self._nodes and artifact in self._nodes[step].return_artifacts:
function_input_artifacts[k] = "${{{}.id}}.{}".format(step, artifact)
continue
# verify the reference
self.__verify_step_reference(node=self.Node(name=name), step_ref_string=v)
# verify the reference only if we are running locally (on remote, when we have multiple
# steps from tasks, the _nodes is still empty; only after deserializing do we have the full DAG)
if self._task.running_locally():
self.__verify_step_reference(node=self.Node(name=name), step_ref_string=v)
function_kwargs = {k: v for k, v in function_kwargs.items() if k not in function_input_artifacts}
parameters = {"{}/{}".format(CreateFromFunction.kwargs_section, k): v for k, v in function_kwargs.items()}
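The net effect: keyword arguments that reference another step's artifact are moved into function_input_artifacts as "${step.id}.artifact" references, and only the remaining plain values become task parameters. A standalone sketch of that split, under the assumption that CreateFromFunction.kwargs_section resolves to "kwargs" (step and argument names are made up):
# sketch only -- 'kwargs' stands in for CreateFromFunction.kwargs_section
function_kwargs = {'matrix_np': 'step_one.square_matrix', 'iterations': 10}
# the artifact reference has already been rewritten with the referenced step's task id
function_input_artifacts = {'matrix_np': '${step_one.id}.square_matrix'}
function_kwargs = {k: v for k, v in function_kwargs.items() if k not in function_input_artifacts}
parameters = {'{}/{}'.format('kwargs', k): v for k, v in function_kwargs.items()}
print(parameters)  # {'kwargs/iterations': 10}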
@@ -688,9 +693,6 @@ class PipelineController(object):
job_code_section=job_code_section,
)
if self._task and not self._task.running_locally() and not self._mock_execution:
self.update_execution_plot()
return True
def start(
@@ -899,7 +901,7 @@ class PipelineController(object):
:param Any preview: The artifact preview
:param bool wait_on_upload: Whether or not the upload should be synchronous, forcing the upload to complete
:param bool wait_on_upload: Whether the upload should be synchronous, forcing the upload to complete
before continuing.
:return: The status of the upload.
@@ -1225,10 +1227,17 @@ class PipelineController(object):
self._task._set_configuration(
name=self._config_section, config_type='dictionary',
config_text=json.dumps(pipeline_dag, indent=2))
args_map_inversed = {}
for section, arg_list in self._args_map.items():
for arg in arg_list:
args_map_inversed[arg] = section
pipeline_args = flatten_dictionary(self._pipeline_args)
# noinspection PyProtectedMember
self._task._set_parameters(
{'{}/{}'.format(self._args_section, k): v for k, v in pipeline_args.items()},
{
"{}/{}".format(args_map_inversed.get(k, self._args_section), k): v
for k, v in pipeline_args.items()
},
__parameters_descriptions=self._pipeline_args_desc,
__parameters_types=self._pipeline_args_type,
__update=True,
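This is the serialization half of the new hyperparameter-sections support: _args_map (section -> argument names) is inverted so every pipeline argument can be written under its own section, while unmapped arguments fall back to the default section. A standalone sketch of the lookup (argument names are made up; 'Args' stands in for self._args_section):
args_map = {'sectionA': ['paramA'], 'sectionB': ['paramB', 'paramC']}
args_map_inversed = {arg: section for section, arg_list in args_map.items() for arg in arg_list}
pipeline_args = {'paramA': 1, 'paramB': 2, 'paramD': 3}
parameters = {'{}/{}'.format(args_map_inversed.get(k, 'Args'), k): v for k, v in pipeline_args.items()}
print(parameters)  # {'sectionA/paramA': 1, 'sectionB/paramB': 2, 'Args/paramD': 3}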
@@ -1249,7 +1258,19 @@ class PipelineController(object):
})
else:
self._task.connect_configuration(pipeline_dag, name=self._config_section)
self._task.connect(self._pipeline_args, name=self._args_section)
connected_args = set()
new_pipeline_args = {}
for section, arg_list in self._args_map.items():
mutable_dict = {arg: self._pipeline_args.get(arg) for arg in arg_list}
self._task.connect(mutable_dict, name=section)
new_pipeline_args.update(mutable_dict)
connected_args.update(arg_list)
mutable_dict = {k: v for k, v in self._pipeline_args.items() if k not in connected_args}
self._task.connect(
mutable_dict, name=self._args_section
)
new_pipeline_args.update(mutable_dict)
self._pipeline_args = new_pipeline_args
self._task.connect(params, name=self._pipeline_section)
# noinspection PyProtectedMember
if self._task._get_runtime_properties().get(self._runtime_property_hash):
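The runtime half mirrors the same mapping: each configured section is connected as its own dictionary so values can be overridden per section, and whatever is left over is connected under the default section as before. A standalone sketch of the grouping (names and values are made up; print() stands in for Task.connect(..., name=section), and 'Args' for self._args_section):
pipeline_args = {'paramA': 1, 'paramB': 2, 'paramD': 3}
args_map = {'sectionA': ['paramA'], 'sectionB': ['paramB']}
connected = set()
for section, arg_list in args_map.items():
    section_dict = {arg: pipeline_args.get(arg) for arg in arg_list}
    connected.update(arg_list)
    print(section, section_dict)   # sectionA {'paramA': 1} / sectionB {'paramB': 2}
print('Args', {k: v for k, v in pipeline_args.items() if k not in connected})  # Args {'paramD': 3}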
@@ -1688,6 +1709,10 @@ class PipelineController(object):
sankey_link['target'].append(idx)
sankey_link['value'].append(1)
# if nothing changed, we give up
if nodes == next_nodes:
break
nodes = next_nodes
# make sure we have no independent (unconnected) nodes
@@ -1879,6 +1904,19 @@ class PipelineController(object):
description="pipeline state: {}".format(hash_dict(pipeline_dag)),
config_text=json.dumps(pipeline_dag, indent=2))
def _update_progress(self):
# type: () -> ()
"""
Update progress of the pipeline every PipelineController._update_progress_interval seconds.
Progress is calculated as the mean of the progress of each step in the pipeline.
"""
if time() - self._last_progress_update_time < self._update_progress_interval:
return
job_progress = [(node.job.task.get_progress() or 0) if node.job else 0 for node in self._nodes.values()]
if len(job_progress):
self._task.set_progress(int(sum(job_progress) / len(job_progress)))
self._last_progress_update_time = time()
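Pipeline-level progress is simply the mean of the per-step progress values, throttled to one update per _update_progress_interval seconds. A standalone sketch of the aggregation (the per-step values are made up):
step_progress = [100, 50, 0, 0]   # node.job.task.get_progress() per step, 0 when a step has no job yet
pipeline_progress = int(sum(step_progress) / len(step_progress)) if step_progress else 0
print(pipeline_progress)          # 37, reported through self._task.set_progress()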
def _daemon(self):
# type: () -> ()
"""
@@ -1900,6 +1938,7 @@ class PipelineController(object):
if self._pipeline_time_limit and (time() - self._start_time) > self._pipeline_time_limit:
break
self._update_progress()
# check the state of all current jobs
# if no job ended, continue
completed_jobs = []
@@ -2534,6 +2573,7 @@ class PipelineDecorator(PipelineController):
if self._pipeline_time_limit and (time() - self._start_time) > self._pipeline_time_limit:
break
self._update_progress()
# check the state of all current jobs
# if no job ended, continue
completed_jobs = []
@@ -3088,7 +3128,8 @@ class PipelineDecorator(PipelineController):
abort_on_failure=False, # type: bool
pipeline_execution_queue='services', # type: Optional[str]
multi_instance_support=False, # type: bool
add_run_number=True # type: bool
add_run_number=True, # type: bool
args_map=None # type: dict[str, List[str]]
):
# type: (...) -> Callable
"""
@@ -3122,6 +3163,19 @@ class PipelineDecorator(PipelineController):
Default False, no multi instance pipeline support.
:param add_run_number: If True (default), add the run number of the pipeline to the pipeline name.
For example, the second time we launch the pipeline "best pipeline", we rename it to "best pipeline #2"
:param args_map: Map arguments to their specific configuration section. Arguments not included in this map
will default to the `Args` section. For example, for the following code:
.. code-block:: python
@PipelineDecorator.pipeline(args_map={'sectionA': ['paramA'], 'sectionB': ['paramB', 'paramC']})
def executing_pipeline(paramA, paramB, paramC, paramD):
pass
Parameters would be stored as:
- paramA: sectionA/paramA
- paramB: sectionB/paramB
- paramC: sectionB/paramC
- paramD: Args/paramD
"""
def decorator_wrap(func):
@@ -3201,6 +3255,8 @@ class PipelineDecorator(PipelineController):
add_run_number=add_run_number,
)
a_pipeline._args_map = args_map or {}
if PipelineDecorator._debug_execute_step_process:
a_pipeline._clearml_job_class = LocalClearmlJob
a_pipeline._default_execution_queue = 'mock'
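Putting it together, the decorator only stores the mapping on the controller (a_pipeline._args_map) and the controller handles sectioning the arguments when they are connected. A hypothetical usage sketch (project, section, and parameter names are made up):
from clearml import PipelineDecorator

@PipelineDecorator.pipeline(
    name='example pipeline', project='examples', version='0.1',
    args_map={'General': ['learning_rate'], 'Data': ['dataset_url']},
)
def executing_pipeline(learning_rate=0.01, dataset_url='s3://bucket/data', epochs=10):
    # learning_rate -> General/learning_rate, dataset_url -> Data/dataset_url,
    # epochs -> Args/epochs (not listed in args_map, so it falls back to the default section)
    pass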