Add pipeline parameter to task pipeline example (#664)

pollfly 2022-05-05 12:13:48 +03:00 committed by GitHub
parent bd6c38f8d1
commit 4ccb357de4

@@ -4,7 +4,11 @@ from clearml.automation import PipelineController
 
 def pre_execute_callback_example(a_pipeline, a_node, current_param_override):
     # type (PipelineController, PipelineController.Node, dict) -> bool
-    print('Cloning Task id={} with parameters: {}'.format(a_node.base_task_id, current_param_override))
+    print(
+        "Cloning Task id={} with parameters: {}".format(
+            a_node.base_task_id, current_param_override
+        )
+    )
     # if we want to skip this node (and subtree of this node) we return False
     # return True to continue DAG execution
     return True
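
A side sketch, not from the commit: returning False from a pre-execute callback skips the node and its whole subtree. A minimal hypothetical variant of that veto pattern, assuming the same callback signature and the General/dataset_url override key used elsewhere in this example:

def skip_step_without_url(a_pipeline, a_node, current_param_override):
    # type (PipelineController, PipelineController.Node, dict) -> bool
    # Hypothetical veto: skip the node (and its subtree) when no dataset
    # URL override was supplied for this step
    return bool(current_param_override.get("General/dataset_url"))
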
@@ -12,7 +16,7 @@ def pre_execute_callback_example(a_pipeline, a_node, current_param_override):
 
 def post_execute_callback_example(a_pipeline, a_node):
     # type (PipelineController, PipelineController.Node) -> None
-    print('Completed Task id={}'.format(a_node.executed))
+    print("Completed Task id={}".format(a_node.executed))
     # if we need the actual executed Task: Task.get_task(task_id=a_node.executed)
     return
 
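
As the comment above notes, the executed Task can be fetched with Task.get_task. A short sketch, not from the commit, of a post-execute callback that inspects the finished step (the status print is illustrative):

from clearml import Task

def post_execute_with_status(a_pipeline, a_node):
    # type (PipelineController, PipelineController.Node) -> None
    # Fetch the concrete Task that this pipeline step executed
    executed_task = Task.get_task(task_id=a_node.executed)
    print("Step {} finished with status {}".format(a_node.name, executed_task.get_status()))
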
@@ -20,25 +24,43 @@ def post_execute_callback_example(a_pipeline, a_node):
 # Connecting ClearML with the current pipeline,
 # from here on everything is logged automatically
 pipe = PipelineController(
-    name='Pipeline demo',
-    project='examples',
-    version='0.0.1',
-    add_pipeline_tags=False,
+    name="Pipeline demo", project="examples", version="0.0.1", add_pipeline_tags=False
 )
-pipe.set_default_execution_queue('default')
-pipe.add_step(name='stage_data', base_task_project='examples', base_task_name='Pipeline step 1 dataset artifact')
-pipe.add_step(name='stage_process', parents=['stage_data', ],
-              base_task_project='examples', base_task_name='Pipeline step 2 process dataset',
-              parameter_override={'General/dataset_url': '${stage_data.artifacts.dataset.url}',
-                                  'General/test_size': 0.25},
+pipe.set_default_execution_queue("default")
+pipe.add_parameter(
+    "url",
+    "https://files.community.clear.ml/examples%252F.pipelines%252FPipeline%20demo/stage_data.8f17b6316ce442ce8904f6fccb1763de/artifacts/dataset/f6d08388e9bc44c86cab497ad31403c4.iris_dataset.pkl",
+    "dataset_url",
+)
+pipe.add_step(
+    name="stage_data",
+    base_task_project="examples",
+    base_task_name="Pipeline step 1 dataset artifact",
+    parameter_override={"General/dataset_url": "${pipeline.url}"},
+)
+pipe.add_step(
+    name="stage_process",
+    parents=["stage_data"],
+    base_task_project="examples",
+    base_task_name="Pipeline step 2 process dataset",
+    parameter_override={
+        "General/dataset_url": "${stage_data.artifacts.dataset.url}",
+        "General/test_size": 0.25,
+    },
     pre_execute_callback=pre_execute_callback_example,
-    post_execute_callback=post_execute_callback_example
+    post_execute_callback=post_execute_callback_example,
 )
-pipe.add_step(name='stage_train', parents=['stage_process', ],
-              base_task_project='examples', base_task_name='Pipeline step 3 train model',
-              parameter_override={'General/dataset_task_id': '${stage_process.id}'})
+pipe.add_step(
+    name="stage_train",
+    parents=["stage_process"],
+    base_task_project="examples",
+    base_task_name="Pipeline step 3 train model",
+    parameter_override={"General/dataset_task_id": "${stage_process.id}"},
+)
 
 # for debugging purposes use local jobs
 # pipe.start_locally()
@@ -46,4 +68,4 @@ pipe.add_step(name='stage_train', parents=['stage_process', ],
 # Starting the pipeline (in the background)
 pipe.start()
 
-print('done')
+print("done")