---
title: Pipeline from Tasks
---

The [pipeline_from_tasks.py](https://github.com/allegroai/clearml/blob/master/examples/pipeline/pipeline_from_tasks.py)
example demonstrates a simple pipeline, where each step is a [ClearML Task](../../fundamentals/task.md).

The pipeline is implemented using the [PipelineController](../../references/sdk/automation_controller_pipelinecontroller.md)
class. Steps are added to a PipelineController object, which launches and monitors the steps when executed.

This example incorporates four tasks, each of which is created using a different script:
* **Controller Task** ([pipeline_from_tasks.py](https://github.com/allegroai/clearml/blob/master/examples/pipeline/pipeline_from_tasks.py)) -
  Implements the pipeline controller, adds the steps (tasks) to the pipeline, and runs the pipeline.
* **Step 1** ([step1_dataset_artifact.py](https://github.com/allegroai/clearml/blob/master/examples/pipeline/step1_dataset_artifact.py)) -
  Downloads data and stores it as an artifact.
* **Step 2** ([step2_data_processing.py](https://github.com/allegroai/clearml/blob/master/examples/pipeline/step2_data_processing.py)) -
  Loads the stored data (from Step 1), processes it, and stores the processed data as artifacts.
* **Step 3** ([step3_train_model.py](https://github.com/allegroai/clearml/blob/master/examples/pipeline/step3_train_model.py)) -
  Loads the processed data (from Step 2) and trains a network.

When the controller task is executed, it clones the step tasks and enqueues the cloned tasks for execution. Note
that the base tasks from which the steps are cloned are only used as templates and are not executed themselves. Also note
that for the controller to clone them, these base tasks must already exist in the system (as a result of a previous run,
or by using [clearml-task](../../apps/clearml_task.md)).
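
For reference, a step's base task could presumably be registered without running its script locally by using the
`clearml-task` CLI along these lines (a sketch; the project and task names below mirror the ones used in this example):

```bash
# register step 1's script as a draft task in the `examples` project,
# so the pipeline controller can later clone it
clearml-task --project examples --name "pipeline step 1 dataset artifact" --script step1_dataset_artifact.py
```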

The controller task itself can be run locally, or, if the controller task has already run at least once and is in the
ClearML Server, the controller can be cloned, and the cloned task can be executed remotely.

The sections below describe in more detail what happens in the controller task and in each step task.

## The Pipeline Controller

1. Create the [pipeline controller](../../references/sdk/automation_controller_pipelinecontroller.md) object.

   ```python
   pipe = PipelineController(
       name='pipeline demo',
       project='examples',
       version='0.0.1',
       add_pipeline_tags=False,
   )
   ```

1. Set the default execution queue to be used. All the pipeline steps will be enqueued for execution in this queue.

   ```python
   pipe.set_default_execution_queue('default')
   ```

1. Build the pipeline (see the [PipelineController.add_step](../../references/sdk/automation_controller_pipelinecontroller.md#add_step)
   method for a complete reference):

   The pipeline’s [first step](#step-1---downloading-the-data) uses the pre-existing task
   `pipeline step 1 dataset artifact` in the `examples` project. The step uploads local data and stores it as an artifact.

   ```python
   pipe.add_step(
       name='stage_data',
       base_task_project='examples',
       base_task_name='pipeline step 1 dataset artifact'
   )
   ```

   The [second step](#step-2---processing-the-data) uses the pre-existing task `pipeline step 2 process dataset` in
   the `examples` project. The second step’s dependency upon the first step’s completion is designated by setting the
   first step as its parent.

   Custom configuration values specific to this step’s execution are defined through the `parameter_override` parameter,
   where the first step’s artifact is fed into the second step.

   Special pre-execution and post-execution logic is added for this step through the `pre_execute_callback` and
   `post_execute_callback` parameters respectively, as sketched after the code below.

   ```python
   pipe.add_step(
       name='stage_process',
       parents=['stage_data', ],
       base_task_project='examples',
       base_task_name='pipeline step 2 process dataset',
       parameter_override={
           'General/dataset_url': '${stage_data.artifacts.dataset.url}',
           'General/test_size': 0.25
       },
       pre_execute_callback=pre_execute_callback_example,
       post_execute_callback=post_execute_callback_example
   )
   ```
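
   The two callbacks referenced above could be defined along these lines (a minimal sketch: the printed fields and the
   function bodies are illustrative, but a `pre_execute_callback` that returns `False` causes the step to be skipped):

   ```python
   def pre_execute_callback_example(a_pipeline, a_node, current_param_override):
       # called right before the step is launched; return False to skip the step
       print('Launching step {} with parameters: {}'.format(a_node.name, current_param_override))
       return True


   def post_execute_callback_example(a_pipeline, a_node):
       # called once the step completes
       print('Completed step {}'.format(a_node.name))
   ```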

   The [third step](#step-3---training-the-network) uses the pre-existing task `pipeline step 3 train model` in the
   `examples` project. The step uses Step 2’s artifacts, as sketched below.
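
   A sketch of what this step’s `add_step` call could look like (the step name and the `dataset_task_id` override
   below follow the pattern of the previous steps and are assumptions, not copied from the example script):

   ```python
   pipe.add_step(
       name='stage_train',
       parents=['stage_process', ],
       base_task_project='examples',
       base_task_name='pipeline step 3 train model',
       parameter_override={
           # hand the second step's task ID to the training task's `dataset_task_id` parameter
           'General/dataset_task_id': '${stage_process.id}'
       }
   )
   ```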

1. Run the pipeline.

   ```python
   pipe.start()
   ```

   The pipeline launches remotely, through the services queue, unless otherwise specified.
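
   For example, the control logic can be pointed at a specific queue, or run on the local machine for debugging
   (a sketch; the two calls are alternatives to the plain `pipe.start()` above):

   ```python
   # enqueue the pipeline control logic on an explicitly named queue
   pipe.start(queue='services')

   # or, alternatively, run the control logic locally while the steps still execute remotely
   pipe.start_locally(run_pipeline_steps_locally=False)
   ```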

## Step 1 - Downloading the Data

The pipeline’s first step ([step1_dataset_artifact.py](https://github.com/allegroai/clearml/blob/master/examples/pipeline/step1_dataset_artifact.py))
does the following:

1. Download data using [`StorageManager.get_local_copy`](../../references/sdk/storage.md#storagemanagerget_local_copy)

   ```python
   # simulate local dataset, download one, so we have something local
   local_iris_pkl = StorageManager.get_local_copy(
       remote_url='https://github.com/allegroai/events/raw/master/odsc20-east/generic/iris_dataset.pkl'
   )
   ```
1. Store the data as an artifact named `dataset` using [`Task.upload_artifact`](../../references/sdk/task.md#upload_artifact)
   ```python
   # add and upload local file containing our toy dataset
   task.upload_artifact('dataset', artifact_object=local_iris_pkl)
   ```

## Step 2 - Processing the Data

The pipeline's second step ([step2_data_processing.py](https://github.com/allegroai/clearml/blob/master/examples/pipeline/step2_data_processing.py))
does the following:

1. Connect its configuration parameters with the ClearML task:

   ```python
   args = {
       'dataset_task_id': '',
       'dataset_url': '',
       'random_state': 42,
       'test_size': 0.2,
   }

   # store arguments, later we will be able to change them from outside the code
   task.connect(args)
   ```

1. Download the data created in the previous step (specified through the `dataset_url` parameter) using
   [`StorageManager.get_local_copy`](../../references/sdk/storage.md#storagemanagerget_local_copy)

   ```python
   iris_pickle = StorageManager.get_local_copy(remote_url=args['dataset_url'])
   ```

1. Generate testing and training sets from the data and store them as artifacts.

   ```python
   task.upload_artifact('X_train', X_train)
   task.upload_artifact('X_test', X_test)
   task.upload_artifact('y_train', y_train)
   task.upload_artifact('y_test', y_test)
   ```
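
   The split itself is presumably produced along these lines (a sketch, not copied from the example script; it assumes
   the downloaded pickle holds a scikit-learn-style bunch with `data` and `target` entries, while `test_size` and
   `random_state` come from the connected `args`):

   ```python
   import pickle

   from sklearn.model_selection import train_test_split

   # load the pickled iris dataset downloaded above
   with open(iris_pickle, 'rb') as f:
       iris = pickle.load(f)

   # split features and labels into train and test sets using the task parameters
   X_train, X_test, y_train, y_test = train_test_split(
       iris['data'], iris['target'],
       test_size=args['test_size'],
       random_state=args['random_state'],
   )
   ```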

## Step 3 - Training the Network

The pipeline's third step ([step3_train_model.py](https://github.com/allegroai/clearml/blob/master/examples/pipeline/step3_train_model.py))
does the following:
1. Connect its configuration parameters with the ClearML task. This allows the [pipeline controller](#the-pipeline-controller)
   to override the `dataset_task_id` value as the pipeline is run.

   ```python
   # Arguments
   args = {
       'dataset_task_id': 'REPLACE_WITH_DATASET_TASK_ID',
   }
   task.connect(args)
   ```

1. Clone the base task and enqueue it using [`Task.execute_remotely`](../../references/sdk/task.md#execute_remotely).

   ```python
   task.execute_remotely()
   ```

1. Access the data created in the previous task.

   ```python
   dataset_task = Task.get_task(task_id=args['dataset_task_id'])
   X_train = dataset_task.artifacts['X_train'].get()
   X_test = dataset_task.artifacts['X_test'].get()
   y_train = dataset_task.artifacts['y_train'].get()
   y_test = dataset_task.artifacts['y_test'].get()
   ```

1. Train the network and log plots, along the lines sketched below.
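
   A minimal sketch of this step (the model choice and plot are illustrative, not a copy of the example script;
   ClearML's automatic framework logging is assumed to capture the matplotlib figure):

   ```python
   import numpy as np
   import matplotlib.pyplot as plt
   from sklearn.linear_model import LogisticRegression

   # fit a simple classifier on the training set prepared in step 2
   model = LogisticRegression(max_iter=1000)
   model.fit(X_train, y_train)
   print('test accuracy: {:.3f}'.format(model.score(X_test, y_test)))

   # plot the first two features of the test set, colored by predicted class;
   # automatic matplotlib logging is assumed to report this under the task's plots
   X_plot = np.asarray(X_test)
   plt.scatter(X_plot[:, 0], X_plot[:, 1], c=model.predict(X_test))
   plt.title('Predicted classes on the test set')
   plt.show()
   ```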

## Running the Pipeline

**To run the pipeline:**

1. If the pipeline step tasks do not yet exist, run their code to create the ClearML tasks.
   ```bash
   python step1_dataset_artifact.py
   python step2_data_processing.py
   python step3_train_model.py
   ```

1. Run the pipeline controller.

   ```bash
   python pipeline_from_tasks.py
   ```

:::note
If you enqueue a Task, make sure an [agent](../../clearml_agent.md) is assigned to the queue, so
it will execute the Task.
:::
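
For example, an agent can be launched to service the `default` queue that the pipeline steps are enqueued on:

```bash
# start a ClearML agent that pulls and executes tasks from the `default` queue
clearml-agent daemon --queue default
```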

## WebApp

When the experiment is executed, the terminal returns the task ID, and links to the pipeline controller task page and
pipeline page.

```
ClearML Task: created new task id=bc93610688f242ecbbe70f413ff2cf5f
ClearML results page: https://app.clear.ml/projects/462f48dba7b441ffb34bddb783711da7/experiments/bc93610688f242ecbbe70f413ff2cf5f/output/log
ClearML pipeline page: https://app.clear.ml/pipelines/462f48dba7b441ffb34bddb783711da7/experiments/bc93610688f242ecbbe70f413ff2cf5f
```

The pipeline run’s page contains the pipeline’s structure, the execution status of every step, as well as the run’s
configuration parameters and output.

![Pipeline DAG](../../img/examples_pipeline_from_tasks_DAG.png)

To view a run’s complete information, click **Full details** at the bottom of the **Run Info** panel, which will open
the pipeline’s [controller task page](../../webapp/webapp_exp_track_visual.md).

Click a step to see its summary information.

![Pipeline step info](../../img/examples_pipeline_from_tasks_step_info.png)

### Console

Click **DETAILS** to view a log of the pipeline controller’s console output.

![Pipeline console](../../img/examples_pipeline_from_tasks_console.png)

Click on a step to view its console output.