Add Pipeline Controller automation and example

This commit is contained in:
allegroai
2020-09-01 18:00:19 +03:00
parent 4f06ab1c59
commit 5beecbb078
7 changed files with 884 additions and 8 deletions

View File

@@ -0,0 +1,24 @@
from trains import Task
from trains.automation.controller import PipelineController
task = Task.init(project_name='examples', task_name='pipeline demo', task_type=Task.TaskTypes.controller)
pipe = PipelineController(default_execution_queue='default')
pipe.add_step(name='stage_data', base_task_project='examples', base_task_name='pipeline step 1 dataset artifact')
pipe.add_step(name='stage_process', parents=['stage_data', ],
base_task_project='examples', base_task_name='pipeline step 2 process dataset',
parameter_override={'General/dataset_url': '${stage_data.artifacts.dataset.url}',
'General/test_size': '0.25'})
pipe.add_step(name='stage_train', parents=['stage_process', ],
base_task_project='examples', base_task_name='pipeline step 3 train model',
parameter_override={'General/dataset_task_id': '${stage_process.id}'})
# Starting the pipeline (in the background)
pipe.start()
# Wait until pipeline terminates
pipe.wait()
# cleanup everything
pipe.stop()
print('done')

View File

@@ -0,0 +1,19 @@
from trains import Task, StorageManager
# create an dataset experiment
task = Task.init(project_name="examples", task_name="pipeline step 1 dataset artifact")
# only create the task, we will actually execute it later
task.execute_remotely()
# simulate local dataset, download one, so we have something local
local_iris_pkl = StorageManager.get_local_copy(
remote_url='https://github.com/allegroai/events/raw/master/odsc20-east/generic/iris_dataset.pkl')
# add and upload local file containing our toy dataset
task.upload_artifact('dataset', artifact_object=local_iris_pkl)
print('uploading artifacts in the background')
# we are done
print('Done')

View File

@@ -0,0 +1,55 @@
import pickle
from trains import Task, StorageManager
from sklearn.model_selection import train_test_split
# Connecting TRAINS
task = Task.init(project_name="examples", task_name="pipeline step 2 process dataset")
# program arguments
# Use either dataset_task_id to point to a tasks artifact or
# use a direct url with dataset_url
args = {
'dataset_task_id': '',
'dataset_url': '',
'random_state': 42,
'test_size': 0.2,
}
# store arguments, later we will be able to change them from outside the code
task.connect(args)
print('Arguments: {}'.format(args))
# only create the task, we will actually execute it later
task.execute_remotely()
# get dataset from task's artifact
if args['dataset_task_id']:
dataset_upload_task = Task.get_task(task_id=args['dataset_task_id'])
print('Input task id={} artifacts {}'.format(args['dataset_task_id'], list(dataset_upload_task.artifacts.keys())))
# download the artifact
iris_pickle = dataset_upload_task.artifacts['dataset'].get_local_copy()
# get the dataset from a direct url
elif args['dataset_url']:
iris_pickle = StorageManager.get_local_copy(remote_url=args['dataset_url'])
else:
raise ValueError("Missing dataset link")
# open the local copy
iris = pickle.load(open(iris_pickle, 'rb'))
# "process" data
X = iris.data
y = iris.target
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=args['test_size'], random_state=args['random_state'])
# upload processed data
print('Uploading process dataset')
task.upload_artifact('X_train', X_train)
task.upload_artifact('X_test', X_test)
task.upload_artifact('y_train', y_train)
task.upload_artifact('y_test', y_test)
print('Notice, artifacts are uploaded in the background')
print('Done')

View File

@@ -0,0 +1,56 @@
import joblib
import matplotlib.pyplot as plt
import numpy as np
from sklearn.linear_model import LogisticRegression
from trains import Task
# Connecting TRAINS
task = Task.init(project_name="examples", task_name="pipeline step 3 train model")
# Arguments
args = {
'dataset_task_id': 'REPLACE_WITH_DATASET_TASK_ID',
}
task.connect(args)
# only create the task, we will actually execute it later
task.execute_remotely()
print('Retrieving Iris dataset')
dataset_task = Task.get_task(task_id=args['dataset_task_id'])
X_train = dataset_task.artifacts['X_train'].get()
X_test = dataset_task.artifacts['X_test'].get()
y_train = dataset_task.artifacts['y_train'].get()
y_test = dataset_task.artifacts['y_test'].get()
print('Iris dataset loaded')
model = LogisticRegression(solver='liblinear', multi_class='auto')
model.fit(X_train, y_train)
joblib.dump(model, 'model.pkl', compress=True)
loaded_model = joblib.load('model.pkl')
result = loaded_model.score(X_test, y_test)
print('model trained & stored')
x_min, x_max = X_test[:, 0].min() - .5, X_test[:, 0].max() + .5
y_min, y_max = X_test[:, 1].min() - .5, X_test[:, 1].max() + .5
h = .02 # step size in the mesh
xx, yy = np.meshgrid(np.arange(x_min, x_max, h), np.arange(y_min, y_max, h))
plt.figure(1, figsize=(4, 3))
plt.scatter(X_test[:, 0], X_test[:, 1], c=y_test, edgecolors='k', cmap=plt.cm.Paired)
plt.xlabel('Sepal length')
plt.ylabel('Sepal width')
plt.xlim(xx.min(), xx.max())
plt.ylim(yy.min(), yy.max())
plt.xticks(())
plt.yticks(())
plt.title('Iris Types')
plt.show()
print('Done')