Add pipeline v2 examples (#137)

pollfly 2021-12-23 13:54:02 +02:00 committed by GitHub
parent 59e52bbeeb
commit e1a8df7388
12 changed files with 284 additions and 53 deletions


@ -1,46 +1,36 @@
---
title: Pipeline from Tasks
---
The [pipeline_from_tasks.py](https://github.com/allegroai/clearml/blob/master/examples/pipeline/pipeline_from_tasks.py)
example demonstrates a simple pipeline, where each step is a [ClearML Task](../../fundamentals/task.md).
The pipeline is implemented using the [PipelineController](../../references/sdk/automation_controller_pipelinecontroller.md)
class. Steps are added to a PipelineController object, which launches and monitors the steps when executed.
This example incorporates four tasks, each of which is created using a different script:
* **Controller Task** ([pipeline_from_tasks.py](https://github.com/allegroai/clearml/blob/master/examples/pipeline/pipeline_from_tasks.py)) -
Implements the pipeline controller, adds the steps (tasks) to the pipeline, and runs the pipeline.
* **Step 1** ([step1_dataset_artifact.py](https://github.com/allegroai/clearml/blob/master/examples/pipeline/step1_dataset_artifact.py)) -
Downloads data and stores the data as an artifact.
* **Step 2** ([step2_data_processing.py](https://github.com/allegroai/clearml/blob/master/examples/pipeline/step2_data_processing.py)) -
Loads the stored data (from Step 1), processes it, and stores the processed data as artifacts.
* **Step 3** ([step3_train_model.py](https://github.com/allegroai/clearml/blob/master/examples/pipeline/step3_train_model.py)) -
Loads the processed data (from Step 2) and trains a network.
When the controller task is executed, it clones the step tasks and enqueues the newly cloned tasks for execution. Note
that the base tasks from which the steps are cloned are only used as templates and are not executed themselves. Also note
that, for the controller to clone them, these base tasks need to exist in the system (as a result of a previous run or using
[clearml-task](../../apps/clearml_task.md)).
The controller task itself can be run locally, or, if the controller task has already run at least once and is in the
ClearML Server, the controller can be cloned, and the cloned task can be executed remotely.
The sections below describe in more detail what happens in the controller task and in each step task.
## The Pipeline Controller
1. Create the [pipeline controller](../../references/sdk/automation_controller_pipelinecontroller.md) object.
```python
pipe = PipelineController(
@ -56,7 +46,7 @@ The sections below describe in more detail what happens in the controller Task a
automatically bumped if the same version already exists.
* `add_pipeline_tags` - If `True`, then all pipeline steps are tagged with `pipe: <pipeline_task_id>`
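   Put together, the constructor call might look like the following sketch (the parameter values shown here are illustrative):
   ```python
   from clearml.automation.controller import PipelineController

   # create the pipeline controller object
   pipe = PipelineController(
       name='pipeline demo',
       project='examples',
       version='0.0.1',
       add_pipeline_tags=False,
   )
   ```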
1. Add Step 1 using the [PipelineController.add_step](../../references/sdk/automation_controller_pipelinecontroller.md#add_step)
method.
```python
@ -64,7 +54,7 @@ The sections below describe in more detail what happens in the controller Task a
```
* `name` - The name of Step 1 (`stage_data`).
* `base_task_project` and `base_task_name` - Step 1's base Task to clone (the cloned Task will be executed when the pipeline runs).
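   Put together, the Step 1 definition might look like the following sketch (the base Task project and name are
   illustrative placeholders for a Task that already exists in the system):
   ```python
   # add Step 1: clone the existing base Task and run it as the 'stage_data' step
   pipe.add_step(
       name='stage_data',
       base_task_project='examples',
       base_task_name='pipeline step 1 dataset artifact',
   )
   ```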
1. Add Step 2.
@ -82,13 +72,12 @@ The sections below describe in more detail what happens in the controller Task a
post_execute_callback=post_execute_callback_example
)
```
In addition to the parameters included in Step 1, input the following:
* `parents` - The names of the steps whose completion the current step depends upon before it begins execution. In this
instance, the execution of Step 2 (`stage_process`) depends upon the completion of Step 1 (`stage_data`).
* `parameter_override` - Pass the URL of the data artifact from Step 1 to Step 2. Override the value of the parameter
whose key is `dataset_url` (in the parameter group named `General`). Override it with the URL of the artifact named
`dataset`. Also override the test size.
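  One possible form of this override is sketched below; the exact reference strings are an assumption here, so check the
  `add_step` reference linked in the note below for the authoritative syntax:
  ```python
  # an assumed sketch of the override passed to add_step for this step;
  # '${stage_data.artifact.dataset.url}' would resolve to the URL of Step 1's 'dataset' artifact
  parameter_override = {
      'General/dataset_url': '${stage_data.artifact.dataset.url}',
      'General/test_size': 0.25,  # also override the test size used when processing the data
  }
  ```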
:::important Syntax of the parameter_override Value
For other examples of ``parameter_override`` syntax, see [PipelineController.add_step](../../references/sdk/automation_controller_pipelinecontroller.md#add_step).
@ -97,7 +86,8 @@ The sections below describe in more detail what happens in the controller Task a
* `pre_execute_callback` - The pipeline controller will execute the input callback function before the pipeline step is
executed. If the callback function returns `False`, the pipeline step will be skipped.
* `post_execute_callback` - The pipeline controller will execute the input callback function after the pipeline step is
executed.
1. Add Step 3.
```python
@ -114,26 +104,27 @@ The sections below describe in more detail what happens in the controller Task a
* `parameter_override` - Pass the ID of the Step 2 Task to the Step 3 Task. This is the ID of the cloned Task, not the base Task.
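  One possible form of this override is sketched below (assumed reference syntax; `'${stage_process.id}'` would resolve
  to the ID of the cloned Step 2 task at runtime):
  ```python
  # an assumed sketch of the override passed to add_step for Step 3
  parameter_override = {'General/dataset_task_id': '${stage_process.id}'}
  ```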
1. Run the pipeline.
```python
# Starting the pipeline (in the background)
pipe.start()
```
## Step 1 - Downloading the Data
In the Step 1 Task ([step1_dataset_artifact.py](https://github.com/allegroai/clearml/blob/master/examples/pipeline/step1_dataset_artifact.py)):
1. Clone base Task and enqueue it for execution using [`Task.execute_remotely`](../../references/sdk/task.md#execute_remotely).
```python
task.execute_remotely()
```
1. Download data and store it as an artifact named `dataset`. This is the same artifact name used in `parameter_override`
when the [`add_step`](../../references/sdk/automation_controller_pipelinecontroller.md#add_step) method is called in the pipeline controller.
```python
# simulate local dataset, download one, so we have something local
local_iris_pkl = StorageManager.get_local_copy(
remote_url='https://github.com/allegroai/events/raw/master/odsc20-east/generic/iris_dataset.pkl'
)
# add and upload local file containing our toy dataset
task.upload_artifact('dataset', artifact_object=local_iris_pkl)
@ -156,9 +147,10 @@ In the Step 2 Task ([step2_data_processing.py](https://github.com/allegroai/clea
task.connect(args)
```
The parameter `dataset_url` is the same parameter name used by `parameter_override` when the [`add_step`](../../references/sdk/automation_controller_pipelinecontroller.md#add_step)
method is called in the pipeline controller.
1. Clone base Task and enqueue it for execution using [`Task.execute_remotely`](../../references/sdk/task.md#execute_remotely).
```python
task.execute_remotely()
@ -194,7 +186,7 @@ In the Step 3 Task ([step3_train_model.py](https://github.com/allegroai/clearml/
The parameter `dataset_task_id` is later overridden by the ID of the Step 2 Task (cloned Task, not base Task).
1. Clone the Step 3 base Task and enqueue it using [`Task.execute_remotely`](../../references/sdk/task.md#execute_remotely).
```python
task.execute_remotely()
@ -210,7 +202,7 @@ In the Step 3 Task ([step3_train_model.py](https://github.com/allegroai/clearml/
y_test = dataset_task.artifacts['y_test'].get()
```
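   The Step 2 task whose artifacts are read above can be fetched by its ID, along the lines of this sketch (the variable
   and artifact names here are assumptions; the script's exact code may differ):
   ```python
   from clearml import Task

   # fetch the cloned Step 2 task by the ID passed in through parameter_override,
   # then read the data artifacts it stored
   dataset_task = Task.get_task(task_id=args['dataset_task_id'])
   X_train = dataset_task.artifacts['X_train'].get()
   y_train = dataset_task.artifacts['y_train'].get()
   ```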
1. Train the network and log plots, along with ClearML automatic logging.
## Running the Pipeline
@ -226,12 +218,13 @@ In the Step 3 Task ([step3_train_model.py](https://github.com/allegroai/clearml/
* Run the script.
python pipeline_from_tasks.py
* Remotely execute the Task - If the Task `pipeline demo` in the project `examples` already exists in ClearML Server, clone it and enqueue it to execute.
:::note
If you enqueue a Task, make sure an [agent](../../clearml_agent.md) is assigned to the queue, so
it will execute the Task.
:::


@ -0,0 +1,101 @@
---
title: Pipeline from Decorators
---
The [pipeline_from_decorator.py](https://github.com/allegroai/clearml/blob/master/examples/pipeline/pipeline_from_decorator.py)
example demonstrates the creation of a pipeline in ClearML using the [`PipelineDecorator`](../../references/sdk/automation_controller_pipelinecontroller.md#class-automationcontrollerpipelinedecorator)
class.
This example creates a pipeline incorporating four tasks, each of which is created from a python function using a custom decorator:
* `executing_pipeline` - Implements the pipeline controller, which defines the pipeline structure and execution logic.
* `step_one` - Downloads and processes data.
* `step_two` - Further processes the data from `step_one`.
* `step_three` - Uses the processed data from `step_two` to train a model.
The pipeline steps, defined in the `step_one`, `step_two`, and `step_three` functions, are each wrapped with the
[`@PipelineDecorator.component`](../../references/sdk/automation_controller_pipelinecontroller.md#pipelinedecoratorcomponent)
decorator, which creates a ClearML pipeline step for each one when the pipeline is executed.
The logic that executes these steps and controls the interaction between them is implemented in the `executing_pipeline`
function. This function is wrapped with the [`@PipelineDecorator.pipeline`](../../references/sdk/automation_controller_pipelinecontroller.md#pipelinedecoratorpipeline)
decorator which creates the ClearML pipeline task when it is executed.
The sections below describe in more detail what happens in the pipeline controller and steps.
## Pipeline Controller
In this example, the pipeline controller is implemented by the `executing_pipeline` function.
Using the `@PipelineDecorator.pipeline` decorator creates a ClearML Controller Task from the function when it is executed.
For detailed information, see [`@PipelineDecorator.pipeline`](../../references/sdk/automation_controller_pipelinecontroller.md#pipelinedecoratorpipeline).
In the example script, the controller defines the interactions between the pipeline steps in the following way:
1. The controller function passes its argument, `pickle_url`, to the pipeline's first step (`step_one`)
1. The returned data from the first step, `data_frame`, is passed to `step_two`
1. The second step's output, `preprocessed_data`, is modified within the pipeline execution logic
1. The modified data is passed to the third step, `step_three`.
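A condensed sketch of what such a controller function might look like is shown below (names and values here are
illustrative, not the exact code of the example script):
```python
from clearml.automation.controller import PipelineDecorator

@PipelineDecorator.pipeline(name='custom pipeline logic', project='examples', version='0.0.1')
def executing_pipeline(pickle_url):
    # calling the decorated step functions here defines the pipeline's structure and flow
    data_frame = step_one(pickle_url)
    processed_data = step_two(data_frame)
    # the pipeline logic may inspect or modify step outputs before passing them on
    model = step_three(processed_data)
```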
## Pipeline Steps
Using the `@PipelineDecorator.component` decorator will make the function a pipeline component that can be called from the
pipeline controller, which implements the pipeline's execution logic. For detailed information, see [`@PipelineDecorator.component`](../../references/sdk/automation_controller_pipelinecontroller.md#pipelinedecoratorcomponent).
When the pipeline controller calls a pipeline step, a corresponding ClearML task will be created. For this reason, each
function which makes up a pipeline step needs to be self-contained. Notice that all package imports inside the function
will be automatically logged as required packages for the pipeline execution step.
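For illustration, a self-contained step might look like the following sketch (a condensed version; the example script's
actual code differs in its details):
```python
from clearml.automation.controller import PipelineDecorator

@PipelineDecorator.component(return_values=['data_frame'], cache=True)
def step_one(pickle_data_url: str):
    # imports are inside the function so the step is self-contained;
    # they are logged as the step's required packages
    import pickle
    import pandas as pd
    from clearml import StorageManager

    local_path = StorageManager.get_local_copy(remote_url=pickle_data_url)
    with open(local_path, 'rb') as f:
        iris = pickle.load(f)
    data_frame = pd.DataFrame(iris['data'], columns=iris['feature_names'])
    data_frame['target'] = iris['target']
    return data_frame
```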
## Pipeline Execution
```python
PipelineDecorator.set_default_execution_queue('default')
# PipelineDecorator.debug_pipeline()
executing_pipeline(
pickle_url='https://github.com/allegroai/events/raw/master/odsc20-east/generic/iris_dataset.pkl',
)
```
By default, the pipeline controller and the pipeline steps are launched through ClearML [queues](../../fundamentals/agents_and_queues.md#what-is-a-queue).
Use the [`PipelineDecorator.set_default_execution_queue`](../../references/sdk/automation_controller_pipelinecontroller.md#pipelinedecoratorset_default_execution_queue)
method to specify the execution queue of all pipeline steps. The `execution_queue` parameter of the `PipelineDecorator.component`
decorator overrides the default queue value for the specific step for which it was specified.
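For example, a single step could be pinned to its own queue while the rest use the default (the queue name here is
illustrative):
```python
# this step runs on its own queue instead of the pipeline's default queue
@PipelineDecorator.component(return_values=['model'], execution_queue='gpu_queue')
def step_three(data):
    ...
```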
:::note Execution Modes
ClearML provides different pipeline execution modes to accommodate development and production use cases. For additional
details, see [Execution Modes](../../fundamentals/pipelines.md#pipeline-controller-execution-options).
:::
To run the pipeline, call the pipeline controller function.
## WebApp
### Pipeline Controller
The pipeline controller's **CONFIGURATION** page contains the pipeline structure and step definitions in its **Configuration Objects**
section.
The **Pipeline** configuration object contains the pipeline structure and execution parameters.
![Pipeline configuration](../../img/pipeline_decorator_configurations.png)
An additional configuration object per pipeline step contains the step's definitions and execution parameters.
The pipeline controller's **RESULTS > PLOTS** page provides summary details for the pipeline execution.
The **Execution Flow** graphically summarizes the pipeline's execution. Hover over each step to view its details.
![Pipeline execution flow plot](../../img/pipeline_decorator_plot_1.png)
The **Execution Details** table provides the pipeline execution details in table format.
![Pipeline execution details plot](../../img/pipeline_decorator_plot_2.png)
### Pipeline Steps
Each function step's arguments are stored in its respective task's **CONFIGURATION > HYPER PARAMETERS > kwargs** section.
![Pipeline step configuration](../../img/pipeline_decorator_step_configuration.png)
Values that were listed in the `return_values` parameter of the `PipelineDecorator.component` decorator are stored as
artifacts in the relevant step's task. These artifacts can be viewed in the step task's **ARTIFACTS** tab.
![Pipeline step artifacts](../../img/pipeline_decorator_step_artifacts.png)


@ -0,0 +1,137 @@
---
title: Pipeline from Functions
---
The [pipeline_from_functions.py](https://github.com/allegroai/clearml/blob/master/examples/pipeline/pipeline_from_functions.py)
example script demonstrates the creation of a pipeline using the [PipelineController](../../references/sdk/automation_controller_pipelinecontroller.md)
class.
This example creates a pipeline incorporating four tasks, each of which is created from a function:
* `executing_pipeline` - Implements the pipeline controller, which defines the pipeline structure and execution logic.
* `step_one` - Downloads and processes data.
* `step_two` - Further processes the data from `step_one`.
* `step_three` - Uses the processed data from `step_two` to train a model.
The step functions will be registered as pipeline steps when they are added to the pipeline controller. The pipeline
execution logic is defined in the pipeline controller function.
When the pipeline steps are executed, corresponding ClearML Tasks are created. For this reason, each function which makes
up a pipeline step needs to be self-contained. Notice that all package imports inside the function will be automatically
logged as required packages for the pipeline execution step.
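For illustration, a self-contained step function might look like this sketch (condensed; the script's actual `step_two`
differs in its details):
```python
def step_two(data_frame, test_size=0.2, random_state=42):
    # imports are inside the function so the step is self-contained;
    # they are logged as the step's required packages
    from sklearn.model_selection import train_test_split

    y = data_frame['target']
    X = data_frame.drop(columns=['target'])
    # the returned object is stored as the step's output artifact
    return train_test_split(X, y, test_size=test_size, random_state=random_state)
```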
## Pipeline Controller
1. Create the [PipelineController](../../references/sdk/automation_controller_pipelinecontroller.md) object.
```python
pipe = PipelineController(
name='pipeline demo',
project='examples',
version='0.0.1',
add_pipeline_tags=False,
)
```
1. Set the default execution queue to be used. All the pipeline steps will be enqueued for execution in this queue
(unless overridden by the `execution_queue` parameter of the `add_function_step` method).
```python
pipe.set_default_execution_queue('default')
```
1. Add a pipeline level parameter that can be referenced from any step in the pipeline (see `step_one` below).
```python
pipe.add_parameter(
name='url',
description='url to pickle file',
default='https://github.com/allegroai/events/raw/master/odsc20-east/generic/iris_dataset.pkl'
)
```
1. Build the pipeline (see [`PipelineController.add_function_step`](../../references/sdk/automation_controller_pipelinecontroller.md#add_function_step)
for complete reference).
The first step in the pipeline uses the `step_one` function and uses as its input the pipeline level argument defined
above. Its return object will be stored as an artifact under the name `data_frame`.
```python
pipe.add_function_step(
name='step_one',
function=step_one,
function_kwargs=dict(pickle_data_url='${pipeline.url}'),
function_return=['data_frame'],
cache_executed_step=True,
)
```
The second step in the pipeline uses the `step_two` function and takes as its input the first step's output. This reference
implicitly defines the pipeline structure, making `step_one` the parent step of `step_two`.
Its return object will be stored as an artifact under the name `processed_data`.
```python
pipe.add_function_step(
name='step_two',
# parents=['step_one'], # the pipeline will automatically detect the dependencies based on the kwargs inputs
function=step_two,
function_kwargs=dict(data_frame='${step_one.data_frame}'),
function_return=['processed_data'],
cache_executed_step=True,
)
```
The third step in the pipeline uses the `step_three` function and takes as its input the second step's output. This
reference implicitly defines the pipeline structure, making `step_two` the parent step of `step_three`.
Its return object will be stored as an artifact under the name `model`:
```python
pipe.add_function_step(
name='step_three',
# parents=['step_two'], # the pipeline will automatically detect the dependencies based on the kwargs inputs
function=step_three,
function_kwargs=dict(data='${step_two.processed_data}'),
function_return=['model'],
cache_executed_step=True,
)
```
1. Run the pipeline.
```python
pipe.start()
```
The pipeline will be launched remotely, through the `services` queue, unless otherwise specified.
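To launch through a different queue, the queue can be passed when starting the controller (a sketch; `'services'` is the
default value of the `queue` parameter):
```python
# override the queue through which the pipeline controller logic itself is launched
pipe.start(queue='services')
```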
## WebApp
### Pipeline Controller
The pipeline controller's **CONFIGURATION** page contains the pipeline structure and step definitions in its **Configuration Objects**
section.
The **Pipeline** configuration object contains the pipeline structure and execution parameters.
![Pipeline configuration](../../img/pipeline_function_config.png)
An additional configuration object per pipeline step contains the step's definitions and execution parameters.
The pipeline controller's **RESULTS > PLOTS** page provides summary details for the pipeline execution.
The **Execution Flow** graphically summarizes the pipeline's execution. Hover over each step to view its details.
![Pipeline execution flow plot](../../img/pipeline_decorator_plot_1.png)
The **Execution Details** table provides the pipeline execution details in table format.
![pipeline execution details plot](../../img/pipeline_function_plot.png)
### Pipeline Steps
Each function step's arguments are stored in its respective task's **CONFIGURATION > HYPER PARAMETERS > kwargs** section.
![Pipeline step configurations](../../img/pipeline_function_step_configuration.png)
Values that were listed in the `function_return` parameter of the [`add_function_step`](../../references/sdk/automation_controller_pipelinecontroller.md#add_function_step)
method are stored as artifacts in the relevant step's task. These artifacts can be viewed in the step task's **ARTIFACTS** tab.
![Pipeline step artifacts](../../img/pipeline_decorator_step_artifacts.png)

8 binary image files added (pipeline WebApp screenshots; not shown)


@ -97,7 +97,7 @@ module.exports = {
{'IDEs': ['guides/ide/remote_jupyter_tutorial', 'guides/ide/integration_pycharm', 'guides/ide/google_colab']},
{'Offline Mode':['guides/set_offline']},
{'Optimization': ['guides/optimization/hyper-parameter-optimization/examples_hyperparam_opt']},
{'Pipelines': ['guides/pipeline/pipeline_controller', 'guides/pipeline/pipeline_decorator', 'guides/pipeline/pipeline_functions']},
{'Reporting': ['guides/reporting/explicit_reporting','guides/reporting/3d_plots_reporting', 'guides/reporting/artifacts', 'guides/reporting/using_artifacts', 'guides/reporting/clearml_logging_example', 'guides/reporting/html_reporting',
'guides/reporting/hyper_parameters', 'guides/reporting/image_reporting', 'guides/reporting/manual_matplotlib_reporting', 'guides/reporting/media_reporting',