From 1d0be8c8e183d74fa7a84659f72c50b1107e1bc8 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Wed, 25 Aug 2021 16:30:52 +0300 Subject: [PATCH] Add pipeline controller callback example --- examples/pipeline/pipeline_controller.py | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/examples/pipeline/pipeline_controller.py b/examples/pipeline/pipeline_controller.py index 47883b91..d2839315 100644 --- a/examples/pipeline/pipeline_controller.py +++ b/examples/pipeline/pipeline_controller.py @@ -2,6 +2,21 @@ from clearml import Task from clearml.automation.controller import PipelineController +def pre_execute_callback_example(a_pipeline, a_node, current_param_override): + # type: (PipelineController, PipelineController.Node, dict) -> bool + print('Cloning Task id={} with parameters: {}'.format(a_node.base_task_id, current_param_override)) + # if we want to skip this node (and subtree of this node) we return False + # return True to continue DAG execution + return True + + +def post_execute_callback_example(a_pipeline, a_node): + # type: (PipelineController, PipelineController.Node) -> None + print('Completed Task id={}'.format(a_node.executed)) + # if we need the actual executed Task object, use Task.get_task(task_id=a_node.executed) + return + + # Connecting ClearML with the current process, # from here on everything is logged automatically task = Task.init(project_name='examples', task_name='pipeline demo', @@ -12,7 +27,10 @@ pipe.add_step(name='stage_data', base_task_project='examples', base_task_name='p pipe.add_step(name='stage_process', parents=['stage_data', ], base_task_project='examples', base_task_name='pipeline step 2 process dataset', parameter_override={'General/dataset_url': '${stage_data.artifacts.dataset.url}', - 'General/test_size': 0.25}) + 'General/test_size': 0.25}, + pre_execute_callback=pre_execute_callback_example, + post_execute_callback=post_execute_callback_example + ) pipe.add_step(name='stage_train', 
parents=['stage_process', ], base_task_project='examples', base_task_name='pipeline step 3 train model', parameter_override={'General/dataset_task_id': '${stage_process.id}'})