From 52d88357105dcb03bc30926618cb434816174c21 Mon Sep 17 00:00:00 2001
From: allegroai <>
Date: Wed, 20 Dec 2023 13:14:14 +0200
Subject: [PATCH] Add tabular-data multi stage pipeline process example

---
 ...l_tabular_data_process_pipeline_example.py | 179 ++++++++++++++++++
 examples/pipeline/requirements.txt            |   1 +
 2 files changed, 180 insertions(+)
 create mode 100644 examples/pipeline/full_tabular_data_process_pipeline_example.py

diff --git a/examples/pipeline/full_tabular_data_process_pipeline_example.py b/examples/pipeline/full_tabular_data_process_pipeline_example.py
new file mode 100644
index 00000000..e8d63791
--- /dev/null
+++ b/examples/pipeline/full_tabular_data_process_pipeline_example.py
@@ -0,0 +1,179 @@
+from clearml import PipelineDecorator, Task
+
+
+@PipelineDecorator.component(cache=True)
+def create_dataset(source_url: str, project: str, dataset_name: str) -> str:
+    print("starting create_dataset")
+    from clearml import StorageManager, Dataset
+    import pandas as pd
+    local_file = StorageManager.get_local_copy(source_url)
+    df = pd.read_csv(local_file, header=None)
+    df.to_csv(path_or_buf="./dataset.csv", index=False, header=False)
+    dataset = Dataset.create(dataset_project=project, dataset_name=dataset_name)
+    dataset.add_files("./dataset.csv")
+    dataset.get_logger().report_table(title="sample", series="head", table_plot=df.head())
+    dataset.finalize(auto_upload=True)
+
+    print("done create_dataset")
+    return dataset.id
+
+
+@PipelineDecorator.component(cache=True)
+def preprocess_dataset(dataset_id: str):
+    print("starting preprocess_dataset")
+    from clearml import Dataset
+    from pathlib import Path
+    import pandas as pd
+    dataset = Dataset.get(dataset_id=dataset_id)
+    local_folder = dataset.get_local_copy()
+    df = pd.read_csv(Path(local_folder) / "dataset.csv", header=None)
+    # "preprocessing" - adding columns
+    df.columns = [
+        'age', 'workclass', 'fnlwgt', 'degree', 'education-yrs', 'marital-status',
+        'occupation', 'relationship', 'ethnicity', 'gender', 'capital-gain',
+        'capital-loss', 'hours-per-week', 'native-country', 'income-cls',
+    ]
+    df.to_csv(path_or_buf="./dataset.csv", index=False)
+
+    # store in a new dataset
+    new_dataset = Dataset.create(
+        dataset_project=dataset.project, dataset_name="{} v2".format(dataset.name),
+        parent_datasets=[dataset]
+    )
+    new_dataset.add_files("./dataset.csv")
+    new_dataset.get_logger().report_table(title="sample", series="head", table_plot=df.head())
+    new_dataset.finalize(auto_upload=True)
+
+    print("done preprocess_dataset")
+    return new_dataset.id
+
+
+@PipelineDecorator.component(cache=True)
+def verify_dataset_integrity(dataset_id: str, expected_num_columns: int):
+    print("starting verify_dataset_integrity")
+    from clearml import Dataset, Logger
+    from pathlib import Path
+    import numpy as np
+    import pandas as pd
+    dataset = Dataset.get(dataset_id=dataset_id)
+    local_folder = dataset.get_local_copy()
+    df = pd.read_csv(Path(local_folder) / "dataset.csv")
+    print("Verifying dataset")
+    assert len(df.columns) == expected_num_columns
+    print("PASSED")
+    # log some stats on the age column
+    Logger.current_logger().report_histogram(
+        title="histogram", series="age", values=np.histogram(df["age"])
+    )
+
+    print("done verify_dataset_integrity")
+    return True
+
+
+@PipelineDecorator.component(output_uri=True)
+def train_model(dataset_id: str, training_args: dict):
+    print("starting train_model")
+    from clearml import Dataset, Task
+    from pathlib import Path
+    import pandas as pd
+    import numpy as np
+    import xgboost as xgb
+    from sklearn.model_selection import train_test_split
+    import matplotlib.pyplot as plt
+
+    dataset = Dataset.get(dataset_id=dataset_id)
+    local_folder = dataset.get_local_copy()
+    df = pd.read_csv(Path(local_folder) / "dataset.csv")
+
+    # prepare data (i.e. select specific columns)
+    columns = ["age", "fnlwgt", "education-yrs", "capital-gain", "capital-loss", "hours-per-week"]
+    X = df[columns].drop("age", axis=1)
+    y = df["age"]
+    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
+    # create matrix
+    dtrain = xgb.DMatrix(X_train, label=y_train)
+    dtest = xgb.DMatrix(X_test, label=y_test)
+
+    # train with XGBoost
+    params = {"objective": "reg:squarederror", "eval_metric": "rmse"}
+    bst = xgb.train(
+        params,
+        dtrain,
+        num_boost_round=training_args.get("num_boost_round", 100),
+        evals=[(dtrain, "train"), (dtest, "test")],
+        verbose_eval=0,
+    )
+    # evaluate
+    y_pred = bst.predict(dtest)
+    plt.plot(y_test, 'r')
+    plt.plot(y_pred, 'b')
+
+    # let's store the eval score
+    error = np.linalg.norm(y_test-y_pred)
+    bst.save_model("a_model.xgb")
+
+    Task.current_task().reload()
+    model_id = Task.current_task().models['output'][-1].id
+    print("done train_model")
+    return dict(error=error, model_id=model_id)
+
+
+@PipelineDecorator.component(monitor_models=["best"])
+def select_best_model(models_score: list):
+    print("starting select_best_model:", models_score)
+    from clearml import OutputModel, Task
+    best_model = None
+    for m in models_score:
+        if not best_model or m["error"] < best_model["error"]:
+            best_model = m
+
+    print("The best model is {}".format(best_model))
+    # lets store it on the pipeline
+    best_model = OutputModel(base_model_id=best_model["model_id"])
+    # let's make sure we have it
+    best_model.connect(task=Task.current_task(), name="best")
+
+    print("done select_best_model")
+    return best_model.id
+
+
+@PipelineDecorator.pipeline(
+    name='xgboost_pipeline',
+    project='xgboost_pipe_demo',
+    version='0.1'
+)
+def pipeline(data_url: str, project: str):
+
+    dataset_id = create_dataset(source_url=data_url, project=project, dataset_name="mock")
+
+    preprocessed_dataset_id = preprocess_dataset(dataset_id=dataset_id)
+
+    if not bool(verify_dataset_integrity(
+            dataset_id=preprocessed_dataset_id,
+            expected_num_columns=15)
+    ):
+        print("Verification Failed!")
+        return False
+
+    print("start training models")
+    models_score = []
+    for i in [100, 150]:
+        model_score = train_model(
+            dataset_id=preprocessed_dataset_id, training_args=dict(num_boost_round=i)
+        )
+        models_score.append(model_score)
+
+    model_id = select_best_model(models_score=models_score)
+    print("selected model_id = {}".format(model_id))
+
+
+if __name__ == '__main__':
+    url = "https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data"
+
+    # comment to run the entire pipeline remotely
+    if Task.running_locally():
+        # this is for demonstration purpose only,
+        # it will run the entire pipeline logic and components locally
+        PipelineDecorator.run_locally()
+
+    pipeline(data_url=url, project="xgboost_pipe_demo")
diff --git a/examples/pipeline/requirements.txt b/examples/pipeline/requirements.txt
index 07819fbf..788f94d0 100644
--- a/examples/pipeline/requirements.txt
+++ b/examples/pipeline/requirements.txt
@@ -2,4 +2,5 @@ joblib>=0.13.2
 matplotlib >= 3.1.1 ; python_version >= '3.6'
 matplotlib >= 2.2.4 ; python_version < '3.6'
 scikit-learn
+pandas
 clearml
\ No newline at end of file