diff --git a/examples/pipeline/pipeline_from_decorator.py b/examples/pipeline/pipeline_from_decorator.py index 0d7a065e..a37baeb4 100644 --- a/examples/pipeline/pipeline_from_decorator.py +++ b/examples/pipeline/pipeline_from_decorator.py @@ -26,8 +26,10 @@ def step_one(pickle_data_url: str, extra: int = 43): # notice all package imports inside the function will be automatically logged as # required packages for the pipeline execution step. # Specifying `return_values` makes sure the function step can return an object to the pipeline logic -# In this case, the returned tuple will be stored as an artifact named "processed_data" -@PipelineDecorator.component(return_values=['processed_data'], cache=True, task_type=TaskTypes.data_processing) +# In this case, the returned tuple will be stored as an artifact named "X_train, X_test, y_train, y_test" +@PipelineDecorator.component( + return_values=['X_train, X_test, y_train, y_test'], cache=True, task_type=TaskTypes.data_processing +) def step_two(data_frame, test_size=0.2, random_state=42): print('step_two') # make sure we have pandas for this step, we need it to use the data_frame @@ -36,7 +38,8 @@ def step_two(data_frame, test_size=0.2, random_state=42): y = data_frame['target'] X = data_frame[(c for c in data_frame.columns if c != 'target')] X_train, X_test, y_train, y_test = train_test_split( - X, y, test_size=test_size, random_state=random_state) + X, y, test_size=test_size, random_state=random_state + ) return X_train, X_test, y_train, y_test @@ -47,21 +50,26 @@ def step_two(data_frame, test_size=0.2, random_state=42): # Specifying `return_values` makes sure the function step can return an object to the pipeline logic # In this case, the returned object will be stored as an artifact named "model" @PipelineDecorator.component(return_values=['model'], cache=True, task_type=TaskTypes.training) -def step_three(data): +def step_three(X_train, y_train): print('step_three') # make sure we have pandas for this step, we need it to use the data_frame import pandas as pd # noqa from sklearn.linear_model import LogisticRegression - from clearml import Task - X_train, X_test, y_train, y_test = data model = LogisticRegression(solver='liblinear', multi_class='auto') model.fit(X_train, y_train) - score = model.score(X_test,y_test) - # Get current step's Task - task = Task.current_task() - task.get_logger().report_single_value(name='accuracy',value=score) return model +# Make the following function an independent pipeline component step +# notice all package imports inside the function will be automatically logged as +# required packages for the pipeline execution step +# Specifying `return_values` makes sure the function step can return an object to the pipeline logic +# In this case, the returned object will be stored as an artifact named "accuracy" +@PipelineDecorator.component(return_values=['accuracy'], cache=True, task_type=TaskTypes.qc) +def step_four(model, X_data, Y_data): + from sklearn.linear_model import LogisticRegression # noqa + from sklearn.metrics import accuracy_score + Y_pred = model.predict(X_data) + return accuracy_score(Y_data, Y_pred, normalize=True) # The actual pipeline execution context # notice that all pipeline component function calls are actually executed remotely @@ -80,14 +88,21 @@ def executing_pipeline(pickle_url, mock_parameter='mock'): # When actually passing the `data_frame` object into a new step, # It waits for the creating step/function (`step_one`) to complete the execution print('launch step two') - processed_data = step_two(data_frame) + X_train, X_test, y_train, y_test = step_two(data_frame) print('launch step three') - model = step_three(processed_data) + model = step_three(X_train, y_train) # Notice since we are "printing" the `model` object, # we actually deserialize the object from the third step, and thus wait for the third step to complete. - print('pipeline completed with model: {}'.format(model)) + print('returned model: {}'.format(model)) + + print('launch step four') + accuracy = 100 * step_four(model, X_data=X_test, Y_data=y_test) + + # Notice since we are "printing" the `accuracy` object, + # we actually deserialize the object from the fourth step, and thus wait for the fourth step to complete. + print(f"Accuracy={accuracy}%") if __name__ == '__main__':