from clearml import PipelineController


# We will use the following function as an independent pipeline component step
# notice that all package imports inside the function will be automatically logged as
# required packages for the pipeline execution step
def step_one(pickle_data_url):
    # make sure we have scikit-learn for this step, we need it to unpickle the object
    import sklearn  # noqa
    import pickle
    import pandas as pd
    from clearml import StorageManager

    pickle_data_url = \
        pickle_data_url or \
        'https://github.com/allegroai/events/raw/master/odsc20-east/generic/iris_dataset.pkl'
    local_iris_pkl = StorageManager.get_local_copy(remote_url=pickle_data_url)
    with open(local_iris_pkl, 'rb') as f:
        iris = pickle.load(f)
    data_frame = pd.DataFrame(iris['data'], columns=iris['feature_names'])
    # append the labels as an additional 'target' column
    data_frame['target'] = iris['target']
    return data_frame


# We will use the following function as an independent pipeline component step
# notice that all package imports inside the function will be automatically logged as
# required packages for the pipeline execution step
def step_two(data_frame, test_size=0.2, random_state=42):
    # make sure we have pandas for this step, we need it to use the data_frame
    import pandas as pd  # noqa
    from sklearn.model_selection import train_test_split
    y = data_frame['target']
    X = data_frame[[c for c in data_frame.columns if c != 'target']]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state)

    return X_train, X_test, y_train, y_test


# We will use the following function as an independent pipeline component step
# notice that all package imports inside the function will be automatically logged as
# required packages for the pipeline execution step
def step_three(data):
    # make sure we have pandas for this step, we need it to use the data_frame
    import pandas as pd  # noqa
    from sklearn.linear_model import LogisticRegression

    X_train, X_test, y_train, y_test = data
    model = LogisticRegression(solver='liblinear', multi_class='auto')
    model.fit(X_train, y_train)
    return model


if __name__ == '__main__':

    # create the pipeline controller
    pipe = PipelineController(
        project='examples',
        name='pipeline demo',
        version='1.1',
        add_pipeline_tags=False,
    )
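    # the controller is itself backed by a ClearML task, so the pipeline run
    # will be tracked on the clearml-server under the 'examples' project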

    # set the default execution queue to be used (per step we can override the execution queue)
    pipe.set_default_execution_queue('default')

    # add pipeline components
    pipe.add_parameter(
        name='url',
        description='url to pickle file',
        default='https://github.com/allegroai/events/raw/master/odsc20-east/generic/iris_dataset.pkl'
    )
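    # pipeline parameters can be referenced from step arguments as '${pipeline.<name>}',
    # e.g. the 'url' parameter above is consumed by step_one as '${pipeline.url}' below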
    pipe.add_function_step(
        name='step_one',
        function=step_one,
        function_kwargs=dict(pickle_data_url='${pipeline.url}'),
        function_return=['data_frame'],
        cache_executed_step=True,
    )
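    # every name listed in function_return is stored as an artifact on the step's task,
    # so downstream steps can reference it, e.g. '${step_one.data_frame}' below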
    pipe.add_function_step(
        name='step_two',
        # parents=['step_one'],  # the pipeline will automatically detect the dependencies based on the kwargs inputs
        function=step_two,
        function_kwargs=dict(data_frame='${step_one.data_frame}'),
        function_return=['processed_data'],
        cache_executed_step=True,
    )
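    # note: step_two returns four values, but function_return names a single artifact,
    # so they are stored together under 'processed_data' and step_three unpacks them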
    pipe.add_function_step(
        name='step_three',
        # parents=['step_two'],  # the pipeline will automatically detect the dependencies based on the kwargs inputs
        function=step_three,
        function_kwargs=dict(data='${step_two.processed_data}'),
        function_return=['model'],
        cache_executed_step=True,
    )
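    # a step can also override the default queue set above, e.g. (assuming a
    # 'gpu' queue has been created on the server):
    # pipe.add_function_step(..., execution_queue='gpu')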

    # For debugging purposes run the pipeline on the current machine.
    # Use run_pipeline_steps_locally=True to also execute the pipeline component Tasks as subprocesses.
    # pipe.start_locally(run_pipeline_steps_locally=False)

    # Start the pipeline on the services queue (remote machine, default on the clearml-server)
    pipe.start()
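    # note: start() enqueues the controller task itself ('services' queue by default);
    # a different controller queue can be passed explicitly, e.g.:
    # pipe.start(queue='services')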

    print('pipeline completed')