diff --git a/examples/pipeline/pipeline_from_tasks.py b/examples/pipeline/pipeline_from_tasks.py
index df55f061..bf1f590b 100644
--- a/examples/pipeline/pipeline_from_tasks.py
+++ b/examples/pipeline/pipeline_from_tasks.py
@@ -4,7 +4,11 @@ from clearml.automation import PipelineController
 
 def pre_execute_callback_example(a_pipeline, a_node, current_param_override):
     # type (PipelineController, PipelineController.Node, dict) -> bool
-    print('Cloning Task id={} with parameters: {}'.format(a_node.base_task_id, current_param_override))
+    print(
+        "Cloning Task id={} with parameters: {}".format(
+            a_node.base_task_id, current_param_override
+        )
+    )
     # if we want to skip this node (and subtree of this node) we return False
     # return True to continue DAG execution
     return True
@@ -12,7 +16,7 @@ def pre_execute_callback_example(a_pipeline, a_node, current_param_override):
 
 def post_execute_callback_example(a_pipeline, a_node):
     # type (PipelineController, PipelineController.Node) -> None
-    print('Completed Task id={}'.format(a_node.executed))
+    print("Completed Task id={}".format(a_node.executed))
     # if we need the actual executed Task: Task.get_task(task_id=a_node.executed)
     return
 
@@ -20,25 +24,43 @@ def post_execute_callback_example(a_pipeline, a_node):
 # Connecting ClearML with the current pipeline,
 # from here on everything is logged automatically
 pipe = PipelineController(
-    name='Pipeline demo',
-    project='examples',
-    version='0.0.1',
-    add_pipeline_tags=False,
+    name="Pipeline demo", project="examples", version="0.0.1", add_pipeline_tags=False
 )
-pipe.set_default_execution_queue('default')
+pipe.add_parameter(
+    "url",
+    "https://files.community.clear.ml/examples%252F.pipelines%252FPipeline%20demo/stage_data.8f17b6316ce442ce8904f6fccb1763de/artifacts/dataset/f6d08388e9bc44c86cab497ad31403c4.iris_dataset.pkl",
+    "dataset_url",
+)
 
-pipe.add_step(name='stage_data', base_task_project='examples', base_task_name='Pipeline step 1 dataset artifact')
-pipe.add_step(name='stage_process', parents=['stage_data', ],
-              base_task_project='examples', base_task_name='Pipeline step 2 process dataset',
-              parameter_override={'General/dataset_url': '${stage_data.artifacts.dataset.url}',
-                                  'General/test_size': 0.25},
-              pre_execute_callback=pre_execute_callback_example,
-              post_execute_callback=post_execute_callback_example
-              )
-pipe.add_step(name='stage_train', parents=['stage_process', ],
-              base_task_project='examples', base_task_name='Pipeline step 3 train model',
-              parameter_override={'General/dataset_task_id': '${stage_process.id}'})
+pipe.set_default_execution_queue("default")
+
+pipe.add_step(
+    name="stage_data",
+    base_task_project="examples",
+    base_task_name="Pipeline step 1 dataset artifact",
+    parameter_override={"General/dataset_url": "${pipeline.url}"},
+)
+
+pipe.add_step(
+    name="stage_process",
+    parents=["stage_data"],
+    base_task_project="examples",
+    base_task_name="Pipeline step 2 process dataset",
+    parameter_override={
+        "General/dataset_url": "${stage_data.artifacts.dataset.url}",
+        "General/test_size": 0.25,
+    },
+    pre_execute_callback=pre_execute_callback_example,
+    post_execute_callback=post_execute_callback_example,
+)
+pipe.add_step(
+    name="stage_train",
+    parents=["stage_process"],
+    base_task_project="examples",
+    base_task_name="Pipeline step 3 train model",
+    parameter_override={"General/dataset_task_id": "${stage_process.id}"},
+)
 
 # for debugging purposes use local jobs
 # pipe.start_locally()
 
@@ -46,4 +68,4 @@ pipe.add_step(name='stage_train', parents=['stage_process', ],
 # Starting the pipeline (in the background)
 pipe.start()
 
-print('done')
+print("done")
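
Note: the new "General/..." overrides (including the added "${pipeline.url}" substitution) are resolved against hyperparameters that the base tasks connect at runtime. A minimal sketch of what the consuming side of a base task such as "Pipeline step 1 dataset artifact" is assumed to look like — the exact body of the real example task may differ; what matters is that connecting a plain dict registers its keys under the "General" section, which is the section the controller's parameter_override targets:

    from clearml import Task

    task = Task.init(
        project_name="examples", task_name="Pipeline step 1 dataset artifact"
    )

    # connecting a plain dict stores its keys under the "General" hyperparameter
    # section, so the controller's "General/dataset_url" override lands here
    args = {"dataset_url": ""}
    task.connect(args)

    # by the time the step runs under the controller, args["dataset_url"] holds
    # either the pipeline-level "url" parameter or the upstream artifact URL
    print("Fetching dataset from {}".format(args["dataset_url"]))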