From c85c05ef6aaca4e07e739ba53d13f16e6a994b05 Mon Sep 17 00:00:00 2001
From: allegroai <>
Date: Sun, 5 Sep 2021 00:30:53 +0300
Subject: [PATCH] Add pipeline v2 examples

---
 examples/pipeline/full_custom_pipeline.py    |  98 ++++++++++++++++++
 examples/pipeline/pipeline_from_functions.py | 103 +++++++++++++++++++
 examples/pipeline/pipeline_from_tasks.py     |  49 +++++++++
 examples/pipeline/pipeline_self_contained.py |  89 ----------------
 4 files changed, 250 insertions(+), 89 deletions(-)
 create mode 100644 examples/pipeline/full_custom_pipeline.py
 create mode 100644 examples/pipeline/pipeline_from_functions.py
 create mode 100644 examples/pipeline/pipeline_from_tasks.py
 delete mode 100644 examples/pipeline/pipeline_self_contained.py

diff --git a/examples/pipeline/full_custom_pipeline.py b/examples/pipeline/full_custom_pipeline.py
new file mode 100644
index 00000000..2adbc75b
--- /dev/null
+++ b/examples/pipeline/full_custom_pipeline.py
@@ -0,0 +1,98 @@
+from clearml.automation.controller import PipelineDecorator
+
+
+# Make the following function an independent pipeline component step
+# notice all package imports inside the function will be automatically logged as
+# required packages for the pipeline execution step
+@PipelineDecorator.component(return_values=['data_frame'], cache=True)
+def step_one(pickle_data_url: str, extra: int = 43):
+    print('step_one')
+    # make sure we have scikit-learn for this step, we need it to unpickle the object
+    import sklearn  # noqa
+    import pickle
+    import pandas as pd
+    from clearml import StorageManager
+    local_iris_pkl = StorageManager.get_local_copy(remote_url=pickle_data_url)
+    with open(local_iris_pkl, 'rb') as f:
+        iris = pickle.load(f)
+    data_frame = pd.DataFrame(iris['data'], columns=iris['feature_names'])
+    data_frame.columns += ['target']
+    data_frame['target'] = iris['target']
+    return data_frame
+
+
+# Make the following function an independent pipeline component step
+# notice all package imports inside the function will be automatically logged as
+# required packages for the pipeline execution step.
+# Specifying `return_values` makes sure the function step can return an object to the pipeline logic
+# In this case, the returned tuple will be stored as an artifact named "processed_data"
+@PipelineDecorator.component(return_values=['processed_data'], cache=True,)
+def step_two(data_frame, test_size=0.2, random_state=42):
+    print('step_two')
+    # make sure we have pandas for this step, we need it to use the data_frame
+    import pandas as pd  # noqa
+    from sklearn.model_selection import train_test_split
+    y = data_frame['target']
+    X = data_frame[(c for c in data_frame.columns if c != 'target')]
+    X_train, X_test, y_train, y_test = train_test_split(
+        X, y, test_size=test_size, random_state=random_state)
+
+    return X_train, X_test, y_train, y_test
+
+
+# Make the following function an independent pipeline component step
+# notice all package imports inside the function will be automatically logged as
+# required packages for the pipeline execution step
+# Specifying `return_values` makes sure the function step can return an object to the pipeline logic
+# In this case, the returned object will be stored as an artifact named "model"
+@PipelineDecorator.component(return_values=['model'], cache=True,)
+def step_three(data):
+    print('step_three')
+    # make sure we have pandas for this step, we need it to use the data_frame
+    import pandas as pd  # noqa
+    from sklearn.linear_model import LogisticRegression
+    X_train, X_test, y_train, y_test = data
+    model = LogisticRegression(solver='liblinear', multi_class='auto')
+    model.fit(X_train, y_train)
+    return model
+
+
+# The actual pipeline execution context
+# notice that all pipeline component function calls are actually executed remotely
+# Only when a return value is used does the pipeline logic wait for the component execution to complete
+@PipelineDecorator.pipeline(name='custom pipeline logic', project='examples', version='0.0.1')
+def executing_pipeline(pickle_url, mock_parameter='mock'):
+    print('pipeline args:', pickle_url, mock_parameter)
+
+    # Use the pipeline argument to start the pipeline and pass it to the first step
+    data_frame = step_one(pickle_url)
+
+    # Use the returned data from the first step (`step_one`), and pass it to the next step (`step_two`)
+    # Notice! Unless we actually access the `data_frame` object,
+    # the pipeline logic does not actually load the artifact itself.
+    # When the `data_frame` object is actually passed into a new step,
+    # the pipeline logic waits for the creating step/function (`step_one`) to complete its execution
+    processed_data = step_two(data_frame)
+
+    # Notice we can actually process/modify the returned values inside the pipeline logic context.
+    # This means the modified object will be stored on the pipeline Task.
+    processed_data = [processed_data[0], processed_data[1]*2, processed_data[2], processed_data[3]]
+    model = step_three(processed_data)
+
+    # Notice since we are "printing" the `model` object,
+    # we actually deserialize the object from the third step, and thus wait for the third step to complete.
+    print('pipeline completed with model: {}'.format(model))
+
+
+if __name__ == '__main__':
+    # set the pipeline steps default execution queue (per specific step we can override it with the decorator)
+    PipelineDecorator.set_default_execution_queue('default')
+    # run the pipeline steps as subprocesses on the current machine, for debugging purposes
+    # PipelineDecorator.debug_pipeline()
+
+    # Start the pipeline execution logic.
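+    # Note: the component steps are enqueued on the 'default' queue set above, so a clearml-agent
+    # should be listening on that queue (or uncomment debug_pipeline() above to run everything
+    # locally as subprocesses). The pickle_url below points at a pickled copy of the Iris dataset.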
+    executing_pipeline(
+        pickle_url='https://github.com/allegroai/events/raw/master/odsc20-east/generic/iris_dataset.pkl',
+    )
+
+    print('process completed')
diff --git a/examples/pipeline/pipeline_from_functions.py b/examples/pipeline/pipeline_from_functions.py
new file mode 100644
index 00000000..f6e24f82
--- /dev/null
+++ b/examples/pipeline/pipeline_from_functions.py
@@ -0,0 +1,103 @@
+from clearml import PipelineController
+
+
+# We will use the following function as an independent pipeline component step
+# notice all package imports inside the function will be automatically logged as
+# required packages for the pipeline execution step
+def step_one(pickle_data_url):
+    # make sure we have scikit-learn for this step, we need it to unpickle the object
+    import sklearn  # noqa
+    import pickle
+    import pandas as pd
+    from clearml import StorageManager
+    pickle_data_url = \
+        pickle_data_url or \
+        'https://github.com/allegroai/events/raw/master/odsc20-east/generic/iris_dataset.pkl'
+    local_iris_pkl = StorageManager.get_local_copy(remote_url=pickle_data_url)
+    with open(local_iris_pkl, 'rb') as f:
+        iris = pickle.load(f)
+    data_frame = pd.DataFrame(iris['data'], columns=iris['feature_names'])
+    data_frame.columns += ['target']
+    data_frame['target'] = iris['target']
+    return data_frame
+
+
+# We will use the following function as an independent pipeline component step
+# notice all package imports inside the function will be automatically logged as
+# required packages for the pipeline execution step
+def step_two(data_frame, test_size=0.2, random_state=42):
+    # make sure we have pandas for this step, we need it to use the data_frame
+    import pandas as pd  # noqa
+    from sklearn.model_selection import train_test_split
+    y = data_frame['target']
+    X = data_frame[(c for c in data_frame.columns if c != 'target')]
+    X_train, X_test, y_train, y_test = train_test_split(
+        X, y, test_size=test_size, random_state=random_state)
+
+    return X_train, X_test, y_train, y_test
+
+
+# We will use the following function as an independent pipeline component step
+# notice all package imports inside the function will be automatically logged as
+# required packages for the pipeline execution step
+def step_three(data):
+    # make sure we have pandas for this step, we need it to use the data_frame
+    import pandas as pd  # noqa
+    from sklearn.linear_model import LogisticRegression
+    X_train, X_test, y_train, y_test = data
+    model = LogisticRegression(solver='liblinear', multi_class='auto')
+    model.fit(X_train, y_train)
+    return model
+
+
+if __name__ == '__main__':
+
+    # create the pipeline controller
+    pipe = PipelineController(
+        project='examples',
+        name='pipeline demo',
+        version='1.1',
+        add_pipeline_tags=False,
+    )
+
+    # set the default execution queue to be used (per step we can override the execution)
+    pipe.set_default_execution_queue('default')
+
+    # add pipeline components
+    pipe.add_parameter(
+        name='url',
+        description='url to pickle file',
+        default='https://github.com/allegroai/events/raw/master/odsc20-east/generic/iris_dataset.pkl'
+    )
+    pipe.add_function_step(
+        name='step_one',
+        function=step_one,
+        function_kwargs=dict(pickle_data_url='${pipeline.url}'),
+        function_return=['data_frame'],
+        cache_executed_step=True,
+    )
+    pipe.add_function_step(
+        name='step_two',
+        # parents=['step_one'],  # the pipeline will automatically detect the dependencies based on the kwargs inputs
+        function=step_two,
+        function_kwargs=dict(data_frame='${step_one.data_frame}'),
+        function_return=['processed_data'],
+        cache_executed_step=True,
+    )
+    pipe.add_function_step(
+        name='step_three',
+        # parents=['step_two'],  # the pipeline will automatically detect the dependencies based on the kwargs inputs
+        function=step_three,
+        function_kwargs=dict(data='${step_two.processed_data}'),
+        function_return=['model'],
+        cache_executed_step=True,
+    )
+
+    # For debugging purposes, run the pipeline on the current machine
+    # Use run_pipeline_steps_locally=True to also execute the pipeline component Tasks as subprocesses.
+    # pipe.start_locally(run_pipeline_steps_locally=False)
+
+    # Start the pipeline on the services queue (remote machine, default on the clearml-server)
+    pipe.start()
+
+    print('pipeline completed')
diff --git a/examples/pipeline/pipeline_from_tasks.py b/examples/pipeline/pipeline_from_tasks.py
new file mode 100644
index 00000000..4bbb5b39
--- /dev/null
+++ b/examples/pipeline/pipeline_from_tasks.py
@@ -0,0 +1,49 @@
+from clearml import Task
+from clearml.automation import PipelineController
+
+
+def pre_execute_callback_example(a_pipeline, a_node, current_param_override):
+    # type: (PipelineController, PipelineController.Node, dict) -> bool
+    print('Cloning Task id={} with parameters: {}'.format(a_node.base_task_id, current_param_override))
+    # if we want to skip this node (and the subtree of this node) we return False
+    # return True to continue DAG execution
+    return True
+
+
+def post_execute_callback_example(a_pipeline, a_node):
+    # type: (PipelineController, PipelineController.Node) -> None
+    print('Completed Task id={}'.format(a_node.executed))
+    # if we need the actual executed Task: Task.get_task(task_id=a_node.executed)
+    return
+
+
+# Connecting ClearML with the current pipeline,
+# from here on everything is logged automatically
+pipe = PipelineController(
+    name='pipeline demo',
+    project='examples',
+    version='0.0.1',
+    add_pipeline_tags=False,
+)
+
+pipe.set_default_execution_queue('default')
+
+pipe.add_step(name='stage_data', base_task_project='examples', base_task_name='pipeline step 1 dataset artifact')
+pipe.add_step(name='stage_process', parents=['stage_data', ],
+              base_task_project='examples', base_task_name='pipeline step 2 process dataset',
+              parameter_override={'General/dataset_url': '${stage_data.artifacts.dataset.url}',
+                                  'General/test_size': 0.25},
+              pre_execute_callback=pre_execute_callback_example,
+              post_execute_callback=post_execute_callback_example
+              )
+pipe.add_step(name='stage_train', parents=['stage_process', ],
+              base_task_project='examples', base_task_name='pipeline step 3 train model',
+              parameter_override={'General/dataset_task_id': '${stage_process.id}'})
+
+# for debugging purposes use local jobs
+# pipe.start_locally()
+
+# Starting the pipeline (in the background)
+pipe.start()
+
+print('done')
diff --git a/examples/pipeline/pipeline_self_contained.py b/examples/pipeline/pipeline_self_contained.py
deleted file mode 100644
index 55e989a5..00000000
--- a/examples/pipeline/pipeline_self_contained.py
+++ /dev/null
@@ -1,89 +0,0 @@
-from clearml import PipelineController
-
-
-def step_one(pickle_data_url):
-    import pickle
-    import pandas as pd
-    from clearml import StorageManager
-    pickle_data_url = \
-        pickle_data_url or \
-        'https://github.com/allegroai/events/raw/master/odsc20-east/generic/iris_dataset.pkl'
-    local_iris_pkl = StorageManager.get_local_copy(remote_url=pickle_data_url)
-    with open(local_iris_pkl, 'rb') as f:
-        iris = pickle.load(f)
-    data_frame = pd.DataFrame(iris['data'], columns=iris['feature_names'])
-    data_frame.columns += ['target']
-    data_frame['target'] = iris['target']
-    return data_frame
-
-
-def step_two(data_frame, test_size=0.2, random_state=42):
-    from sklearn.model_selection import train_test_split
-    y = data_frame['target']
-    X = data_frame[(c for c in data_frame.columns if c != 'target')]
-    X_train, X_test, y_train, y_test = train_test_split(
-        X, y, test_size=test_size, random_state=random_state)
-
-    return X_train, X_test, y_train, y_test
-
-
-def step_three(data):
-    from sklearn.linear_model import LogisticRegression
-    X_train, X_test, y_train, y_test = data
-    model = LogisticRegression(solver='liblinear', multi_class='auto')
-    model.fit(X_train, y_train)
-    return model
-
-
-def debug_testing_our_pipeline(pickle_url):
-    data_frame = step_one(pickle_url)
-    processed_data = step_two(data_frame)
-    model = step_three(processed_data)
-    print(model)
-
-
-pipe = PipelineController(
-    project='examples',
-    name='pipeline demo',
-    version='1.1',
-    add_pipeline_tags=False,
-)
-
-pipe.set_default_execution_queue('default')
-
-pipe.add_parameter(
-    name='url',
-    description='url to pickle file',
-    default='https://github.com/allegroai/events/raw/master/odsc20-east/generic/iris_dataset.pkl'
-)
-pipe.add_function_step(
-    name='step_one',
-    function=step_one,
-    function_kwargs=dict(pickle_data_url='${pipeline.url}'),
-    function_return=['data_frame'],
-    cache_executed_step=True,
-)
-pipe.add_function_step(
-    name='step_two',
-    # parents=['step_one'],  # the pipeline will automatically detect the dependencies based on the kwargs inputs
-    function=step_two,
-    function_kwargs=dict(data_frame='${step_one.data_frame}'),
-    function_return=['processed_data'],
-    cache_executed_step=True,
-)
-pipe.add_function_step(
-    name='step_three',
-    # parents=['step_two'],  # the pipeline will automatically detect the dependencies based on the kwargs inputs
-    function=step_three,
-    function_kwargs=dict(data='${step_two.processed_data}'),
-    function_return=['model'],
-    cache_executed_step=True,
-)
-
-# for debugging purposes use local jobs
-# pipe.start_locally(run_pipeline_steps_locally=False)
-
-# Starting the pipeline on the services queue (remote machine, default on the clearml-server)
-pipe.start()
-
-print('pipeline done')
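
Note on running the task-based example: pipeline_from_tasks.py clones existing Tasks by project and name
('pipeline step 1 dataset artifact', 'pipeline step 2 process dataset', 'pipeline step 3 train model'), so
those base Tasks must already exist in the 'examples' project before the controller is started. A minimal
sketch for checking this up front (illustrative only; it assumes the standard ClearML example step scripts
have already been run once to create those Tasks):

    from clearml import Task

    # look up each base task referenced by pipeline_from_tasks.py;
    # an exception here typically means the base task has not been created yet
    for base_task_name in ('pipeline step 1 dataset artifact',
                           'pipeline step 2 process dataset',
                           'pipeline step 3 train model'):
        Task.get_task(project_name='examples', task_name=base_task_name)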