Add pipeline v2 examples
This commit is contained in:
parent
23fc9260f8
commit
c85c05ef6a
98
examples/pipeline/full_custom_pipeline.py
Normal file
@@ -0,0 +1,98 @@
from clearml.automation.controller import PipelineDecorator


# Make the following function an independent pipeline component step
# notice all package imports inside the function will be automatically logged as
# required packages for the pipeline execution step
@PipelineDecorator.component(return_values=['data_frame'], cache=True)
def step_one(pickle_data_url: str, extra: int = 43):
    print('step_one')
    # make sure we have scikit-learn for this step, we need it to unpickle the object
    import sklearn  # noqa
    import pickle
    import pandas as pd
    from clearml import StorageManager
    local_iris_pkl = StorageManager.get_local_copy(remote_url=pickle_data_url)
    with open(local_iris_pkl, 'rb') as f:
        iris = pickle.load(f)
    data_frame = pd.DataFrame(iris['data'], columns=iris['feature_names'])
    data_frame.columns += ['target']
    data_frame['target'] = iris['target']
    return data_frame


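# Note: depending on the installed clearml version, `PipelineDecorator.component` may also
# accept an explicit `packages` list instead of relying on automatic import detection,
# e.g. (sketch only, not used below):
# @PipelineDecorator.component(return_values=['data_frame'], cache=True,
#                              packages=['pandas', 'scikit-learn'])

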
# Make the following function an independent pipeline component step
# notice all package imports inside the function will be automatically logged as
# required packages for the pipeline execution step.
# Specifying `return_values` makes sure the function step can return an object to the pipeline logic
# In this case, the returned tuple will be stored as an artifact named "processed_data"
@PipelineDecorator.component(return_values=['processed_data'], cache=True)
def step_two(data_frame, test_size=0.2, random_state=42):
    print('step_two')
    # make sure we have pandas for this step, we need it to use the data_frame
    import pandas as pd  # noqa
    from sklearn.model_selection import train_test_split
    y = data_frame['target']
    X = data_frame[[c for c in data_frame.columns if c != 'target']]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state)

    return X_train, X_test, y_train, y_test


# Make the following function an independent pipeline component step
# notice all package imports inside the function will be automatically logged as
# required packages for the pipeline execution step.
# Specifying `return_values` makes sure the function step can return an object to the pipeline logic
# In this case, the returned object will be stored as an artifact named "model"
@PipelineDecorator.component(return_values=['model'], cache=True)
def step_three(data):
    print('step_three')
    # make sure we have pandas for this step, we need it to use the data_frame
    import pandas as pd  # noqa
    from sklearn.linear_model import LogisticRegression
    X_train, X_test, y_train, y_test = data
    model = LogisticRegression(solver='liblinear', multi_class='auto')
    model.fit(X_train, y_train)
    return model


# The actual pipeline execution context
# notice that all pipeline component function calls are actually executed remotely
# Only when a return value is used does the pipeline logic wait for the component execution to complete
@PipelineDecorator.pipeline(name='custom pipeline logic', project='examples', version='0.0.1')
def executing_pipeline(pickle_url, mock_parameter='mock'):
    print('pipeline args:', pickle_url, mock_parameter)

    # Use the pipeline argument to start the pipeline and pass it to the first step
    data_frame = step_one(pickle_url)

    # Use the returned data from the first step (`step_one`), and pass it to the next step (`step_two`)
    # Notice! Unless we actually access the `data_frame` object, the pipeline logic
    # does not load the artifact itself.
    # Only when the `data_frame` object is passed into a new step does the pipeline
    # wait for the creating step/function (`step_one`) to complete its execution
    processed_data = step_two(data_frame)

    # Notice we can actually process/modify the returned values inside the pipeline logic context.
    # This means the modified object will be stored on the pipeline Task.
    processed_data = [processed_data[0], processed_data[1] * 2, processed_data[2], processed_data[3]]
    model = step_three(processed_data)

    # Notice that since we are "printing" the `model` object, we actually deserialize
    # the object returned from the third step, and thus wait for the third step to complete.
    print('pipeline completed with model: {}'.format(model))


if __name__ == '__main__':
    # set the pipeline steps default execution queue (per specific step we can override it with the decorator)
    PipelineDecorator.set_default_execution_queue('default')
    # run the pipeline steps as subprocesses on the current machine, for debugging purposes
    # PipelineDecorator.debug_pipeline()
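    # Note: if your clearml version provides it, `PipelineDecorator.run_locally()` is another
    # option; it runs the pipeline logic and its steps on the local machine without enqueuing
    # anything:
    # PipelineDecorator.run_locally()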

    # Start the pipeline execution logic.
    executing_pipeline(
        pickle_url='https://github.com/allegroai/events/raw/master/odsc20-east/generic/iris_dataset.pkl',
    )

    print('process completed')
103
examples/pipeline/pipeline_from_functions.py
Normal file
@@ -0,0 +1,103 @@
from clearml import PipelineController


# We will use the following function as an independent pipeline component step
# notice all package imports inside the function will be automatically logged as
# required packages for the pipeline execution step
def step_one(pickle_data_url):
    # make sure we have scikit-learn for this step, we need it to unpickle the object
    import sklearn  # noqa
    import pickle
    import pandas as pd
    from clearml import StorageManager
    pickle_data_url = \
        pickle_data_url or \
        'https://github.com/allegroai/events/raw/master/odsc20-east/generic/iris_dataset.pkl'
    local_iris_pkl = StorageManager.get_local_copy(remote_url=pickle_data_url)
    with open(local_iris_pkl, 'rb') as f:
        iris = pickle.load(f)
    data_frame = pd.DataFrame(iris['data'], columns=iris['feature_names'])
    data_frame.columns += ['target']
    data_frame['target'] = iris['target']
    return data_frame


# We will use the following function as an independent pipeline component step
# notice all package imports inside the function will be automatically logged as
# required packages for the pipeline execution step
def step_two(data_frame, test_size=0.2, random_state=42):
    # make sure we have pandas for this step, we need it to use the data_frame
    import pandas as pd  # noqa
    from sklearn.model_selection import train_test_split
    y = data_frame['target']
    X = data_frame[[c for c in data_frame.columns if c != 'target']]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state)

    return X_train, X_test, y_train, y_test


# We will use the following function as an independent pipeline component step
# notice all package imports inside the function will be automatically logged as
# required packages for the pipeline execution step
def step_three(data):
    # make sure we have pandas for this step, we need it to use the data_frame
    import pandas as pd  # noqa
    from sklearn.linear_model import LogisticRegression
    X_train, X_test, y_train, y_test = data
    model = LogisticRegression(solver='liblinear', multi_class='auto')
    model.fit(X_train, y_train)
    return model


if __name__ == '__main__':

    # create the pipeline controller
    pipe = PipelineController(
        project='examples',
        name='pipeline demo',
        version='1.1',
        add_pipeline_tags=False,
    )

    # set the default execution queue to be used (per step we can override the execution)
    pipe.set_default_execution_queue('default')

    # add a pipeline-level parameter and the pipeline components
    pipe.add_parameter(
        name='url',
        description='url to pickle file',
        default='https://github.com/allegroai/events/raw/master/odsc20-east/generic/iris_dataset.pkl'
    )
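
    # The '${pipeline.url}' string used below is resolved at runtime to the value of the
    # 'url' pipeline parameter defined above; likewise '${step_one.data_frame}' and
    # '${step_two.processed_data}' resolve to the artifacts returned by the previous steps,
    # which is also how the step dependencies are inferred.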
    pipe.add_function_step(
        name='step_one',
        function=step_one,
        function_kwargs=dict(pickle_data_url='${pipeline.url}'),
        function_return=['data_frame'],
        cache_executed_step=True,
    )
    pipe.add_function_step(
        name='step_two',
        # parents=['step_one'],  # the pipeline will automatically detect the dependencies based on the kwargs inputs
        function=step_two,
        function_kwargs=dict(data_frame='${step_one.data_frame}'),
        function_return=['processed_data'],
        cache_executed_step=True,
    )
    pipe.add_function_step(
        name='step_three',
        # parents=['step_two'],  # the pipeline will automatically detect the dependencies based on the kwargs inputs
        function=step_three,
        function_kwargs=dict(data='${step_two.processed_data}'),
        function_return=['model'],
        cache_executed_step=True,
    )
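
    # The resulting DAG is step_one -> step_two -> step_three, derived entirely from the
    # '${...}' kwargs above (no explicit `parents` needed).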

    # For debugging purposes, run the pipeline on the current machine
    # Use run_pipeline_steps_locally=True to also execute the pipeline component Tasks as subprocesses.
    # pipe.start_locally(run_pipeline_steps_locally=False)

    # Start the pipeline on the services queue (remote machine, default on the clearml-server)
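    # Depending on the clearml version, start() may also accept a target queue argument,
    # e.g. pipe.start(queue='services')  # sketch; 'services' is the assumed default queue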
    pipe.start()

    print('pipeline completed')
49
examples/pipeline/pipeline_from_tasks.py
Normal file
@@ -0,0 +1,49 @@
from clearml import Task
from clearml.automation import PipelineController


def pre_execute_callback_example(a_pipeline, a_node, current_param_override):
    # type: (PipelineController, PipelineController.Node, dict) -> bool
    print('Cloning Task id={} with parameters: {}'.format(a_node.base_task_id, current_param_override))
    # if we want to skip this node (and the subtree of this node) we return False
    # return True to continue DAG execution
    return True


def post_execute_callback_example(a_pipeline, a_node):
    # type: (PipelineController, PipelineController.Node) -> None
    print('Completed Task id={}'.format(a_node.executed))
    # if we need the actual executed Task: Task.get_task(task_id=a_node.executed)
    return


# Connecting ClearML with the current pipeline,
# from here on everything is logged automatically
pipe = PipelineController(
    name='pipeline demo',
    project='examples',
    version='0.0.1',
    add_pipeline_tags=False,
)

pipe.set_default_execution_queue('default')

pipe.add_step(name='stage_data', base_task_project='examples', base_task_name='pipeline step 1 dataset artifact')
pipe.add_step(name='stage_process', parents=['stage_data', ],
              base_task_project='examples', base_task_name='pipeline step 2 process dataset',
              parameter_override={'General/dataset_url': '${stage_data.artifacts.dataset.url}',
                                  'General/test_size': 0.25},
              pre_execute_callback=pre_execute_callback_example,
              post_execute_callback=post_execute_callback_example
              )
pipe.add_step(name='stage_train', parents=['stage_process', ],
              base_task_project='examples', base_task_name='pipeline step 3 train model',
              parameter_override={'General/dataset_task_id': '${stage_process.id}'})
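
# Note: '${stage_data.artifacts.dataset.url}' is resolved at runtime to the URL of the
# 'dataset' artifact produced by the executed 'stage_data' step, and '${stage_process.id}'
# to the Task id of the executed 'stage_process' step.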

# for debugging purposes use local jobs
# pipe.start_locally()

# Starting the pipeline (in the background)
pipe.start()

print('done')
@@ -1,89 +0,0 @@
from clearml import PipelineController


def step_one(pickle_data_url):
    import pickle
    import pandas as pd
    from clearml import StorageManager
    pickle_data_url = \
        pickle_data_url or \
        'https://github.com/allegroai/events/raw/master/odsc20-east/generic/iris_dataset.pkl'
    local_iris_pkl = StorageManager.get_local_copy(remote_url=pickle_data_url)
    with open(local_iris_pkl, 'rb') as f:
        iris = pickle.load(f)
    data_frame = pd.DataFrame(iris['data'], columns=iris['feature_names'])
    data_frame.columns += ['target']
    data_frame['target'] = iris['target']
    return data_frame


def step_two(data_frame, test_size=0.2, random_state=42):
    from sklearn.model_selection import train_test_split
    y = data_frame['target']
    X = data_frame[(c for c in data_frame.columns if c != 'target')]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state)

    return X_train, X_test, y_train, y_test


def step_three(data):
    from sklearn.linear_model import LogisticRegression
    X_train, X_test, y_train, y_test = data
    model = LogisticRegression(solver='liblinear', multi_class='auto')
    model.fit(X_train, y_train)
    return model


def debug_testing_our_pipeline(pickle_url):
    data_frame = step_one(pickle_url)
    processed_data = step_two(data_frame)
    model = step_three(processed_data)
    print(model)


pipe = PipelineController(
    project='examples',
    name='pipeline demo',
    version='1.1',
    add_pipeline_tags=False,
)

pipe.set_default_execution_queue('default')

pipe.add_parameter(
    name='url',
    description='url to pickle file',
    default='https://github.com/allegroai/events/raw/master/odsc20-east/generic/iris_dataset.pkl'
)
pipe.add_function_step(
    name='step_one',
    function=step_one,
    function_kwargs=dict(pickle_data_url='${pipeline.url}'),
    function_return=['data_frame'],
    cache_executed_step=True,
)
pipe.add_function_step(
    name='step_two',
    # parents=['step_one'],  # the pipeline will automatically detect the dependencies based on the kwargs inputs
    function=step_two,
    function_kwargs=dict(data_frame='${step_one.data_frame}'),
    function_return=['processed_data'],
    cache_executed_step=True,
)
pipe.add_function_step(
    name='step_three',
    # parents=['step_two'],  # the pipeline will automatically detect the dependencies based on the kwargs inputs
    function=step_three,
    function_kwargs=dict(data='${step_two.processed_data}'),
    function_return=['model'],
    cache_executed_step=True,
)

# for debugging purposes use local jobs
# pipe.start_locally(run_pipeline_steps_locally=False)

# Starting the pipeline on the services queue (remote machine, default on the clearml-server)
pipe.start()

print('pipeline done')