Add pre and post-callbacks to the Pipeline decorator

This commit is contained in:
allegroai 2023-02-07 17:32:31 +02:00
parent a48f1c4b58
commit b1f17db657

View File

@ -3318,7 +3318,9 @@ class PipelineDecorator(PipelineController):
monitor_metrics=None, # type: Optional[List[Union[Tuple[str, str], Tuple[(str, str), (str, str)]]]]
monitor_artifacts=None, # type: Optional[List[Union[str, Tuple[str, str]]]]
monitor_models=None, # type: Optional[List[Union[str, Tuple[str, str]]]]
retry_on_failure=None # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
pre_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa
post_execute_callback=None # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa
):
# type: (...) -> Callable
"""
@ -3409,6 +3411,38 @@ class PipelineDecorator(PipelineController):
# allow up to 5 retries (total of 6 runs)
return retries < 5
:param pre_execute_callback: Callback function, called when the step (Task) is created
and before it is sent for execution. Allows a user to modify the Task before launch.
Use `node.job` to access the ClearmlJob object, or `node.job.task` to directly access the Task object.
`parameters` are the configuration arguments passed to the ClearmlJob.
If the callback returned value is `False`,
the Node is skipped and so is any node in the DAG that relies on this node.
Notice the `parameters` are already parsed,
e.g. `${step1.parameters.Args/param}` is replaced with relevant value.
.. code-block:: py
def step_created_callback(
pipeline, # type: PipelineController,
node, # type: PipelineController.Node,
parameters, # type: dict
):
pass
:param post_execute_callback: Callback function, called when a step (Task) is completed
and it other jobs are executed. Allows a user to modify the Task status after completion.
.. code-block:: py
def step_completed_callback(
pipeline, # type: PipelineController,
node, # type: PipelineController.Node,
):
pass
:return: function wrapper
"""
def decorator_wrap(func):
@ -3447,6 +3481,8 @@ class PipelineDecorator(PipelineController):
monitor_metrics=monitor_metrics,
monitor_models=monitor_models,
monitor_artifacts=monitor_artifacts,
pre_execute_callback=pre_execute_callback,
post_execute_callback=post_execute_callback
)
if cls._singleton: