Update pipeline example (#99)

pollfly 2021-10-27 14:26:01 +03:00 committed by GitHub
parent c53b393726
commit d3e90f31e4

@@ -9,12 +9,13 @@ This pipeline is composed of three steps:
2. Process data.
3. Train a network.
- It is implemented using the [automation.controller.PipelineController](../../references/sdk/automation_controller_pipelinecontroller.md)
+ It is implemented using the [PipelineController](../../references/sdk/automation_controller_pipelinecontroller.md)
class. This class includes functionality to:
* Create a pipeline controller
* Add steps to the pipeline
* Pass data from one step to another
* Control step dependencies, so that a step begins only after the steps it depends on complete
+ * Add callback functions to be executed pre- and post-step execution
* Run the pipeline
* Wait for the pipeline to complete
* Cleanup after pipeline completes execution
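
Taken together, those operations map onto a short controller script. A minimal sketch, assuming the base Task already exists in the `examples` project and that the import path of the example script applies to your SDK version:

```python
from clearml.automation.controller import PipelineController

# Create the pipeline controller (its Task is stored in the `examples` project)
pipe = PipelineController(name='pipeline demo', project='examples', version='0.0.1')

# Add one step, cloned from an existing base Task
pipe.add_step(
    name='stage_data',
    base_task_project='examples',
    base_task_name='pipeline step 1 dataset artifact',
)

# Launch the pipeline
pipe.start()
```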
@@ -29,7 +30,7 @@ This example implements the pipeline with four Tasks (each Task is created using a different script):
* **Step 3 Task** ([step3_train_model.py](https://github.com/allegroai/clearml/blob/master/examples/pipeline/step3_train_model.py)) -
Loads the processed data (from Step 2) and trains a network.
- When the pipeline runs, the Step 1, Step 2, and Step 3 Tasks are cloned, and the newly cloned Tasks execute. The Tasks
+ When the pipeline runs, the Step 1, Step 2, and Step 3 Tasks are cloned, and the newly cloned Tasks are executed. The Tasks
they are cloned from, called the base Tasks, do not execute. This way, the pipeline can run multiple times. These
base Tasks must have already run at least once for them to be in **ClearML Server** and to be cloned. The controller Task
itself can be run from a development environment (by running the script), or cloned, and the cloned Task executed remotely (if the
@@ -42,10 +43,20 @@ The sections below describe in more detail what happens in the controller Task and in each step.
1. Create the pipeline controller object.
```python
- pipe = PipelineController(default_execution_queue='default', add_pipeline_tags=False)
+ pipe = PipelineController(
+     name='pipeline demo',
+     project='examples',
+     version='0.0.1',
+     add_pipeline_tags=False,
+ )
```
+ * `name` - The name of the pipeline controller task
+ * `project` - The project where the pipeline controller and its tasks are stored
+ * `version` - The pipeline version. If `auto_version_bump` is set to `True`, the version number is
+   automatically bumped if the same version already exists.
+ * `add_pipeline_tags` - If `True`, all pipeline steps are tagged with `pipe: <pipeline_task_id>`
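
Since `auto_version_bump` is mentioned above but not shown in the snippet, here is a hedged sketch of passing it explicitly (assuming the parameter is available in your SDK version):

```python
pipe = PipelineController(
    name='pipeline demo',
    project='examples',
    version='0.0.1',
    add_pipeline_tags=False,
    auto_version_bump=True,  # e.g. if '0.0.1' already exists, bump to '0.0.2'
)
```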
- 1. Add Step 1. Call the [automation.controller.PipelineController.add_step](../../references/sdk/automation_controller_pipelinecontroller.md#add_step)
+ 1. Add Step 1. Call the [PipelineController.add_step](../../references/sdk/automation_controller_pipelinecontroller.md#add_step)
method.
```python
@@ -58,10 +69,18 @@ The sections below describe in more detail what happens in the controller Task and in each step.
1. Add Step 2.
```python
- pipe.add_step(name='stage_process', parents=['stage_data', ],
-     base_task_project='examples', base_task_name='pipeline step 2 process dataset',
-     parameter_override={'General/dataset_url': '${stage_data.artifacts.dataset.url}',
-         'General/test_size': 0.25})
+ pipe.add_step(
+     name='stage_process',
+     parents=['stage_data', ],
+     base_task_project='examples',
+     base_task_name='pipeline step 2 process dataset',
+     parameter_override={
+         'General/dataset_url': '${stage_data.artifacts.dataset.url}',
+         'General/test_size': 0.25
+     },
+     pre_execute_callback=pre_execute_callback_example,
+     post_execute_callback=post_execute_callback_example
+ )
```
@@ -71,16 +90,22 @@ The sections below describe in more detail what happens in the controller Task and in each step.
* `parameter_override` - Pass the URL of the data artifact from Step 1 to Step 2. Override the value of the parameter
whose key is `dataset_url` (in the parameter group named `General`). Override it with the URL of the artifact named `dataset`. Also override the test size.
- :::important
- The syntax of the ``parameter_override`` value.
- For other examples of ``parameter_override`` syntax, see the [automation.controller.PipelineController.add_step](../../references/sdk/automation_controller_pipelinecontrollermd#add_step).
+ :::important Syntax of the parameter_override Value
+ For other examples of ``parameter_override`` syntax, see [PipelineController.add_step](../../references/sdk/automation_controller_pipelinecontroller.md#add_step).
:::
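
As an illustration of that syntax, a few hedged `parameter_override` forms; the `${...}` reference patterns follow the ones used in this example, and the exact keys are illustrative:

```python
parameter_override={
    'General/dataset_url': '${stage_data.artifacts.dataset.url}',       # a previous step's artifact URL
    'General/dataset_task_id': '${stage_data.id}',                      # a previous (cloned) step Task's ID
    'General/test_size': '${stage_data.parameters.General/test_size}',  # copy a parameter from a previous step
}
```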
+ * `pre_execute_callback` - The pipeline controller executes the input callback function before the pipeline step is
+   executed. If the callback function returns `False`, the pipeline step is skipped.
+ * `post_execute_callback` - The pipeline controller executes the input callback function after the pipeline step is
+   executed.
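
The two callbacks referenced in the Step 2 snippet above could be defined like the following sketch; the signatures follow the `add_step` callback interface, and the bodies are illustrative:

```python
def pre_execute_callback_example(a_pipeline, a_node, current_param_override):
    # Called right before the step's base Task is cloned and launched.
    # Returning False skips this step (and the steps that depend on it).
    print('Cloning Task id={} with parameters: {}'.format(
        a_node.base_task_id, current_param_override))
    return True


def post_execute_callback_example(a_pipeline, a_node):
    # Called after the step completes; a_node.executed holds the executed Task's ID
    print('Completed Task id={}'.format(a_node.executed))
```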
1. Add Step 3.
```python
- pipe.add_step(name='stage_train', parents=['stage_process', ],
-     base_task_project='examples', base_task_name='pipeline step 3 train model',
+ pipe.add_step(
+     name='stage_train',
+     parents=['stage_process', ],
+     base_task_project='examples',
+     base_task_name='pipeline step 3 train model',
      parameter_override={'General/dataset_task_id': '${stage_process.id}'})
```
@@ -88,14 +113,10 @@ The sections below describe in more detail what happens in the controller Task and in each step.
* `parents` - The start of Step 3 (`stage_train`) depends upon the completion of Step 2 (`stage_process`).
* `parameter_override` - Pass the ID of the Step 2 Task to the Step 3 Task. This is the ID of the cloned Task, not the base Task.
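   For context, a condensed sketch of how the Step 3 script might consume that overridden parameter; the artifact name `X_train` is illustrative:
   ```python
   from clearml import Task

   # Inside step3_train_model.py: connect the parameter the controller overrides,
   # then fetch the processed data from the Step 2 Task it points to
   task = Task.init(project_name='examples', task_name='pipeline step 3 train model')
   args = {'dataset_task_id': ''}
   task.connect(args)

   dataset_task = Task.get_task(task_id=args['dataset_task_id'])
   X_train = dataset_task.artifacts['X_train'].get()
   ```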
- 1. Run the pipeline, wait for it to complete, and cleanup.
+ 1. Run the pipeline.
```python
# Starting the pipeline (in the background)
pipe.start()
- # Wait until pipeline terminates
- pipe.wait()
- # cleanup everything
- pipe.stop()
```
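
For debugging, some SDK versions also expose a local mode on the controller; a hedged sketch, assuming `start_locally` is available in your ClearML version:

```python
# Debug run: execute the controller (and optionally its steps) in the local process
pipe.start_locally(run_pipeline_steps_locally=True)
```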
## Step 1 - Downloading the Data