From b1f17db65712dc4c8ac70b95cee9e9d7fd1c4eff Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Tue, 7 Feb 2023 17:32:31 +0200 Subject: [PATCH] Add pre and post-callbacks to the Pipeline decorator --- clearml/automation/controller.py | 38 +++++++++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/clearml/automation/controller.py b/clearml/automation/controller.py index 6d8e4f68..01beccbf 100644 --- a/clearml/automation/controller.py +++ b/clearml/automation/controller.py @@ -3318,7 +3318,9 @@ class PipelineDecorator(PipelineController): monitor_metrics=None, # type: Optional[List[Union[Tuple[str, str], Tuple[(str, str), (str, str)]]]] monitor_artifacts=None, # type: Optional[List[Union[str, Tuple[str, str]]]] monitor_models=None, # type: Optional[List[Union[str, Tuple[str, str]]]] - retry_on_failure=None # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa + retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa + pre_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa + post_execute_callback=None # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa ): # type: (...) -> Callable """ @@ -3409,6 +3411,38 @@ class PipelineDecorator(PipelineController): # allow up to 5 retries (total of 6 runs) return retries < 5 + :param pre_execute_callback: Callback function, called when the step (Task) is created + and before it is sent for execution. Allows a user to modify the Task before launch. + Use `node.job` to access the ClearmlJob object, or `node.job.task` to directly access the Task object. + `parameters` are the configuration arguments passed to the ClearmlJob. + + If the callback returned value is `False`, + the Node is skipped and so is any node in the DAG that relies on this node. + + Notice the `parameters` are already parsed, + e.g. `${step1.parameters.Args/param}` is replaced with relevant value. + + .. code-block:: py + + def step_created_callback( + pipeline, # type: PipelineController, + node, # type: PipelineController.Node, + parameters, # type: dict + ): + pass + + :param post_execute_callback: Callback function, called when a step (Task) is completed + and it other jobs are executed. Allows a user to modify the Task status after completion. + + .. code-block:: py + + def step_completed_callback( + pipeline, # type: PipelineController, + node, # type: PipelineController.Node, + ): + pass + + :return: function wrapper """ def decorator_wrap(func): @@ -3447,6 +3481,8 @@ class PipelineDecorator(PipelineController): monitor_metrics=monitor_metrics, monitor_models=monitor_models, monitor_artifacts=monitor_artifacts, + pre_execute_callback=pre_execute_callback, + post_execute_callback=post_execute_callback ) if cls._singleton: