mirror of https://github.com/clearml/clearml (synced 2025-04-03 20:41:07 +00:00)
Add tabular-data multi stage pipeline process example
This commit is contained in:
parent 8968753467
commit 52d8835710
179 examples/pipeline/full_tabular_data_process_pipeline_example.py Normal file
@@ -0,0 +1,179 @@
from clearml import PipelineDecorator, Task


@PipelineDecorator.component(cache=True)
def create_dataset(source_url: str, project: str, dataset_name: str) -> str:
    print("starting create_dataset")
    from clearml import StorageManager, Dataset
    import pandas as pd

    # fetch the raw csv and register it as a versioned ClearML Dataset
    local_file = StorageManager.get_local_copy(source_url)
    df = pd.read_csv(local_file, header=None)
    df.to_csv(path_or_buf="./dataset.csv", index=False)
    dataset = Dataset.create(dataset_project=project, dataset_name=dataset_name)
    dataset.add_files("./dataset.csv")
    dataset.get_logger().report_table(title="sample", series="head", table_plot=df.head())
    dataset.finalize(auto_upload=True)

    print("done create_dataset")
    return dataset.id


@PipelineDecorator.component(cache=True)
def preprocess_dataset(dataset_id: str):
    print("starting preprocess_dataset")
    from clearml import Dataset
    from pathlib import Path
    import pandas as pd

    dataset = Dataset.get(dataset_id=dataset_id)
    local_folder = dataset.get_local_copy()
    # the csv was written with a header row (the original integer column
    # labels), so consume it as a header (header=0) instead of reading it as data
    df = pd.read_csv(Path(local_folder) / "dataset.csv", header=0)
    # "preprocessing" - name the Adult Census columns
    df.columns = [
        'age', 'workclass', 'fnlwgt', 'degree', 'education-yrs', 'marital-status',
        'occupation', 'relationship', 'ethnicity', 'gender', 'capital-gain',
        'capital-loss', 'hours-per-week', 'native-country', 'income-cls',
    ]
    df.to_csv(path_or_buf="./dataset.csv", index=False)

    # store the result in a new dataset version, child of the original
    new_dataset = Dataset.create(
        dataset_project=dataset.project, dataset_name="{} v2".format(dataset.name),
        parent_datasets=[dataset]
    )
    new_dataset.add_files("./dataset.csv")
    new_dataset.get_logger().report_table(title="sample", series="head", table_plot=df.head())
    new_dataset.finalize(auto_upload=True)

    print("done preprocess_dataset")
    return new_dataset.id


@PipelineDecorator.component(cache=True)
def verify_dataset_integrity(dataset_id: str, expected_num_columns: int):
    print("starting verify_dataset_integrity")
    from clearml import Dataset, Logger
    from pathlib import Path
    import numpy as np
    import pandas as pd

    dataset = Dataset.get(dataset_id=dataset_id)
    local_folder = dataset.get_local_copy()
    df = pd.read_csv(Path(local_folder) / "dataset.csv")
    print("Verifying dataset")
    assert len(df.columns) == expected_num_columns
    print("PASSED")

    # log some stats on the age column
    # (np.histogram returns (counts, bin_edges); report the bin counts)
    counts, bin_edges = np.histogram(df["age"])
    Logger.current_logger().report_histogram(
        title="histogram", series="age", values=counts,
        xlabels=[round(edge, 1) for edge in bin_edges[:-1]]
    )

    print("done verify_dataset_integrity")
    return True


@PipelineDecorator.component(output_uri=True)
def train_model(dataset_id: str, training_args: dict):
    print("starting train_model")
    from clearml import Dataset, Task
    from pathlib import Path
    import pandas as pd
    import numpy as np
    import xgboost as xgb
    from sklearn.model_selection import train_test_split
    import matplotlib.pyplot as plt

    dataset = Dataset.get(dataset_id=dataset_id)
    local_folder = dataset.get_local_copy()
    df = pd.read_csv(Path(local_folder) / "dataset.csv")

    # prepare data (i.e. select specific numeric columns, predicting age)
    columns = ["age", "fnlwgt", "education-yrs", "capital-gain", "capital-loss", "hours-per-week"]
    X = df[columns].drop("age", axis=1)
    y = df["age"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    # create the xgboost data matrices
    dtrain = xgb.DMatrix(X_train, label=y_train)
    dtest = xgb.DMatrix(X_test, label=y_test)

    # train with XGBoost
    params = {"objective": "reg:squarederror", "eval_metric": "rmse"}
    bst = xgb.train(
        params,
        dtrain,
        num_boost_round=training_args.get("num_boost_round", 100),
        evals=[(dtrain, "train"), (dtest, "test")],
        verbose_eval=0,
    )
    # evaluate: plot ground truth vs. predictions by position
    # (use .values so both series share the same 0..n x-axis)
    y_pred = bst.predict(dtest)
    plt.plot(y_test.values, 'r')
    plt.plot(y_pred, 'b')
    plt.show()  # ClearML's matplotlib binding captures the figure on show()

    # store the eval score
    error = np.linalg.norm(y_test.values - y_pred)
    # saving the model triggers ClearML's xgboost binding, which registers
    # it as an output model on this component's task
    bst.save_model("a_model.xgb")

    Task.current_task().reload()
    model_id = Task.current_task().models['output'][-1].id
    print("done train_model")
    return dict(error=error, model_id=model_id)


@PipelineDecorator.component(monitor_models=["best"])
def select_best_model(models_score: list):
    print("starting select_best_model:", models_score)
    from clearml import OutputModel, Task
    best_model = None
    for m in models_score:
        if not best_model or m["error"] < best_model["error"]:
            best_model = m

    print("The best model is {}".format(best_model))
    # let's store it on the pipeline, under the "best" name monitored above
    best_model = OutputModel(base_model_id=best_model["model_id"])
    # let's make sure we have it
    best_model.connect(task=Task.current_task(), name="best")

    print("done select_best_model")
    return best_model.id


@PipelineDecorator.pipeline(
    name='xgboost_pipeline',
    project='xgboost_pipe_demo',
    version='0.1'
)
def pipeline(data_url: str, project: str):
    # each call below runs as its own ClearML task; passing return values
    # between the calls is what chains the components together
    dataset_id = create_dataset(source_url=data_url, project=project, dataset_name="mock")

    preprocessed_dataset_id = preprocess_dataset(dataset_id=dataset_id)

    # bool() forces the lazy component result to be resolved before continuing
    if not bool(verify_dataset_integrity(
            dataset_id=preprocessed_dataset_id,
            expected_num_columns=15
    )):
        print("Verification Failed!")
        return False

    print("start training models")
    models_score = []
    for i in [100, 150]:
        model_score = train_model(
            dataset_id=preprocessed_dataset_id, training_args=dict(num_boost_round=i)
        )
        models_score.append(model_score)

    model_id = select_best_model(models_score=models_score)
    print("selected model_id = {}".format(model_id))


if __name__ == '__main__':
    url = "https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data"

    # comment out this block to run the entire pipeline remotely
    if Task.running_locally():
        # for demonstration purposes only:
        # run the entire pipeline logic and all components locally
        PipelineDecorator.run_locally()

    pipeline(data_url=url, project="xgboost_pipe_demo")
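A note on execution mode (an aside, not part of this commit): PipelineDecorator.run_locally() above keeps the controller and every component in the local process, which is convenient for debugging. A minimal sketch of the remote variant, assuming a ClearML agent is serving a queue named "default" (the queue name is an assumption, nothing in this commit configures it):

    # send each component to agents listening on the "default" queue;
    # the pipeline logic itself still starts from this process
    PipelineDecorator.set_default_execution_queue("default")
    pipeline(data_url=url, project="xgboost_pipe_demo")

PipelineDecorator.debug_pipeline() is the fully in-process alternative, useful for stepping through component code with a debugger.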
@@ -2,4 +2,5 @@ joblib>=0.13.2
matplotlib >= 3.1.1 ; python_version >= '3.6'
matplotlib >= 2.2.4 ; python_version < '3.6'
scikit-learn
pandas
clearml
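One observation on dependencies (not part of the diff): the new example also imports xgboost and numpy at runtime, which this requirements snippet does not list; running it standalone would additionally need something like the following (unpinned, as an assumption):

    xgboost
    numpy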