From d3e90f31e45036001d297c8aa42a4e18780e691b Mon Sep 17 00:00:00 2001
From: pollfly <75068813+pollfly@users.noreply.github.com>
Date: Wed, 27 Oct 2021 14:26:01 +0300
Subject: [PATCH] Update pipeline example (#99)

---
 docs/guides/pipeline/pipeline_controller.md | 63 ++++++++++++++-------
 1 file changed, 42 insertions(+), 21 deletions(-)

diff --git a/docs/guides/pipeline/pipeline_controller.md b/docs/guides/pipeline/pipeline_controller.md
index d350c5c2..839c7d70 100644
--- a/docs/guides/pipeline/pipeline_controller.md
+++ b/docs/guides/pipeline/pipeline_controller.md
@@ -9,12 +9,13 @@ This pipeline is composed of three steps:
 1. Process data
 1. Train a network.
 
-It is implemented using the [automation.controller.PipelineController](../../references/sdk/automation_controller_pipelinecontroller.md)
+It is implemented using the [PipelineController](../../references/sdk/automation_controller_pipelinecontroller.md)
 class. This class includes functionality to:
 * Create a pipeline controller
 * Add steps to the pipeline
 * Pass data from one step to another
 * Control the dependencies of a step beginning only after other steps complete
+* Add callback functions to be executed pre- and post-step execution
 * Run the pipeline
 * Wait for the pipeline to complete
 * Cleanup after pipeline completes execution
@@ -29,7 +30,7 @@ This example implements the pipeline with four Tasks (each Task is created using
 * **Step 3 Task** ([step3_train_model.py](https://github.com/allegroai/clearml/blob/master/examples/pipeline/step3_train_model.py)) -
   Loads the processed data (from Step 2) and trains a network.
 
-When the pipeline runs, the Step 1, Step 2, and Step 3 Tasks are cloned, and the newly cloned Tasks execute. The Tasks
+When the pipeline runs, the Step 1, Step 2, and Step 3 Tasks are cloned, and the newly cloned Tasks are executed. The Tasks
 they are cloned from, called the base Tasks, do not execute. This way, the pipeline can run multiple times. These base
 Tasks must have already run at least once for them to be in **ClearML Server** and to be cloned. The controller Task
 itself can be run from a development environment (by running the script), or cloned, and the cloned Task executed remotely (if the
@@ -42,10 +43,20 @@ The sections below describe in more detail what happens in the controller Task a
 1. Create the pipeline controller object.
 
    ```python
-   pipe = PipelineController(default_execution_queue='default', add_pipeline_tags=False)
+   pipe = PipelineController(
+       name='pipeline demo',
+       project='examples',
+       version='0.0.1',
+       add_pipeline_tags=False,
+   )
    ```
+
+   * `name` - Name the pipeline controller Task
+   * `project` - Project where the pipeline controller and tasks will be stored
+   * `version` - Provide a pipeline version. If `auto_version_bump` is set to `True`, then the version number will be
+     automatically bumped if the same version already exists.
+   * `add_pipeline_tags` - If `True`, then all pipeline steps are tagged with `pipe: <pipeline_task_id>`
 
-1. Add Step 1. Call the [automation.controller.PipelineController.add_step](../../references/sdk/automation_controller_pipelinecontroller.md#add_step)
+1. Add Step 1. Call the [PipelineController.add_step](../../references/sdk/automation_controller_pipelinecontroller.md#add_step)
    method.
 
    ```python
@@ -57,11 +68,19 @@ The sections below describe in more detail what happens in the controller Task a
 1. Add Step 2.
 
-   ```python
-   pipe.add_step(name='stage_process', parents=['stage_data', ],
-       base_task_project='examples', base_task_name='pipeline step 2 process dataset',
-       parameter_override={'General/dataset_url': '${stage_data.artifacts.dataset.url}',
-                           'General/test_size': 0.25})
+   ```python
+   pipe.add_step(
+       name='stage_process',
+       parents=['stage_data', ],
+       base_task_project='examples',
+       base_task_name='pipeline step 2 process dataset',
+       parameter_override={
+           'General/dataset_url': '${stage_data.artifacts.dataset.url}',
+           'General/test_size': 0.25
+       },
+       pre_execute_callback=pre_execute_callback_example,
+       post_execute_callback=post_execute_callback_example
+   )
    ```
 
@@ -71,31 +90,33 @@ The sections below describe in more detail what happens in the controller Task a
 * `parameter_override` - Pass the URL of the data artifact from Step 1 to Step 2. Override the value of the parameter
   whose key is `dataset_url` (in the parameter group named `General`). Override it with the URL of the artifact named
   `dataset`. Also override the test size.
 
-   :::important
-   The syntax of the ``parameter_override`` value.
-   For other examples of ``parameter_override`` syntax, see the [automation.controller.PipelineController.add_step](../../references/sdk/automation_controller_pipelinecontroller.md#add_step).
+   :::important Syntax of the parameter_override Value
+   For other examples of ``parameter_override`` syntax, see [PipelineController.add_step](../../references/sdk/automation_controller_pipelinecontroller.md#add_step).
    :::
-
+
+* `pre_execute_callback` - The pipeline controller will execute the input callback function before the pipeline step is
+  executed. If the callback function returns `False`, the pipeline step will be skipped.
+* `post_execute_callback` - The pipeline controller will execute the input callback function after the pipeline step is
+  executed.
 
 1. Add Step 3.
 
    ```python
-   pipe.add_step(name='stage_train', parents=['stage_process', ],
-       base_task_project='examples', base_task_name='pipeline step 3 train model',
-       parameter_override={'General/dataset_task_id': '${stage_process.id}'})
+   pipe.add_step(
+       name='stage_train',
+       parents=['stage_process', ],
+       base_task_project='examples',
+       base_task_name='pipeline step 3 train model',
+       parameter_override={'General/dataset_task_id': '${stage_process.id}'})
    ```
 
    * `name` - The name of Step 3 (`stage_train`).
    * `parents` - The start of Step 3 (`stage_train`) depends upon the completion of Step 2 (`stage_process`).
    * `parameter_override` - Pass the ID of the Step 2 Task to the Step 3 Task. This is the ID of the cloned Task, not
     the base Task.
 
-1. Run the pipeline, wait for it to complete, and cleanup.
+1. Run the pipeline.
 
    ```python
    # Starting the pipeline (in the background)
    pipe.start()
-   # Wait until pipeline terminates
-   pipe.wait()
-   # cleanup everything
-   pipe.stop()
    ```
 
 ## Step 1 - Downloading the Data
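
The updated Step 2 snippet passes `pre_execute_callback_example` and `post_execute_callback_example`, which the patch never defines; they live in the example script the page documents. Below is a minimal sketch of what such callbacks might look like, assuming the argument order used by the clearml pipeline example (the pipeline controller, the step's node, and, for the pre-execution callback, the step's parameter overrides). The function bodies are illustrative, not part of this commit.

```python
def pre_execute_callback_example(a_pipeline, a_node, current_param_override):
    # Called before the step's base Task is cloned and enqueued.
    # current_param_override holds the resolved parameter_override values.
    print('Cloning Task id={} with parameters: {}'.format(
        a_node.base_task_id, current_param_override))
    # Returning False would skip this step (per the bullet added above).
    return True


def post_execute_callback_example(a_pipeline, a_node):
    # Called after the step completes; a_node.executed holds the ID of the
    # cloned Task that actually ran (assumption based on the clearml example).
    print('Completed Task id={}'.format(a_node.executed))
```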
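Assembled from the snippets in this patch, the updated controller script reads roughly as follows. This is a sketch, not the file from the commit: the import path follows the SDK reference linked in the page, the Step 1 `add_step` call is elided from the patch (so its `base_task_name` below is an assumption patterned on Steps 2 and 3), and the callback helpers are the sketches above.

```python
from clearml.automation.controller import PipelineController

pipe = PipelineController(
    name='pipeline demo',
    project='examples',
    version='0.0.1',
    add_pipeline_tags=False,
)

# Step 1: elided from the patch; base_task_name is an assumption
# following the naming pattern of Steps 2 and 3.
pipe.add_step(
    name='stage_data',
    base_task_project='examples',
    base_task_name='pipeline step 1 dataset artifact',
)

# Step 2: processes the dataset artifact produced by Step 1.
pipe.add_step(
    name='stage_process',
    parents=['stage_data', ],
    base_task_project='examples',
    base_task_name='pipeline step 2 process dataset',
    parameter_override={
        'General/dataset_url': '${stage_data.artifacts.dataset.url}',
        'General/test_size': 0.25
    },
    pre_execute_callback=pre_execute_callback_example,
    post_execute_callback=post_execute_callback_example,
)

# Step 3: trains a network on the output of Step 2.
pipe.add_step(
    name='stage_train',
    parents=['stage_process', ],
    base_task_project='examples',
    base_task_name='pipeline step 3 train model',
    parameter_override={'General/dataset_task_id': '${stage_process.id}'},
)

# Starting the pipeline (in the background)
pipe.start()
```

Note that the patch drops the explicit `pipe.wait()` and `pipe.stop()` calls from the final step, consistent with the snippet's own comment that `start()` launches the pipeline in the background.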