mirror of
https://github.com/clearml/clearml
synced 2025-04-02 00:26:05 +00:00
Add Markdown in pipeline jupyter notebooks (#502)
* Add jupyter notebook to pipeline * Add markdown explanations to Jupyter Notebook about pipeline
This commit is contained in:
parent
b8a06c6d9c
commit
f5040b59d7
@ -37,11 +37,15 @@
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"task = Task.init(project_name='Table Example', task_name='tabular preprocessing')\n",
|
||||
"task = Task.init(project_name=\"Table Example\", task_name=\"tabular preprocessing\")\n",
|
||||
"logger = task.get_logger()\n",
|
||||
"configuration_dict = {'test_size': 0.1, 'split_random_state': 0}\n",
|
||||
"configuration_dict = task.connect(configuration_dict) # enabling configuration override by clearml\n",
|
||||
"print(configuration_dict) # printing actual configuration (after override in remote mode)"
|
||||
"configuration_dict = {\"test_size\": 0.1, \"split_random_state\": 0}\n",
|
||||
"configuration_dict = task.connect(\n",
|
||||
" configuration_dict\n",
|
||||
") # enabling configuration override by clearml\n",
|
||||
"print(\n",
|
||||
" configuration_dict\n",
|
||||
") # printing actual configuration (after override in remote mode)"
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -52,8 +56,8 @@
|
||||
"source": [
|
||||
"# Download shelter-animal-outcomes dataset (https://www.kaggle.com/c/shelter-animal-outcomes)\n",
|
||||
"# This dataset aims to improve the understanding of trends in animal outcomes,\n",
|
||||
"# Which could help shelters focus their energy on specific animals who need extra help finding a new home. \n",
|
||||
"path_to_ShelterAnimal = './data'"
|
||||
"# Which could help shelters focus their energy on specific animals who need extra help finding a new home.\n",
|
||||
"path_to_ShelterAnimal = \"./data\""
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -62,8 +66,13 @@
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"train_set = pd.read_csv(Path(path_to_ShelterAnimal) / 'train.csv')\n",
|
||||
"logger.report_table(title='Trainset - raw',series='pandas DataFrame',iteration=0, table_plot=train_set.head())"
|
||||
"train_set = pd.read_csv(Path(path_to_ShelterAnimal) / \"train.csv\")\n",
|
||||
"logger.report_table(\n",
|
||||
" title=\"Trainset - raw\",\n",
|
||||
" series=\"pandas DataFrame\",\n",
|
||||
" iteration=0,\n",
|
||||
" table_plot=train_set.head(),\n",
|
||||
")"
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -80,9 +89,9 @@
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"# Remove hour and year from DateTime data\n",
|
||||
"timestamp = pd.to_datetime(train_set['DateTime'])\n",
|
||||
"timestamp = pd.to_datetime(train_set[\"DateTime\"])\n",
|
||||
"months = [d.month for d in timestamp]\n",
|
||||
"train_set['Month'] = pd.DataFrame(months).astype('object')"
|
||||
"train_set[\"Month\"] = pd.DataFrame(months).astype(\"object\")"
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -91,23 +100,23 @@
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"age = train_set['AgeuponOutcome']\n",
|
||||
"age = train_set[\"AgeuponOutcome\"]\n",
|
||||
"months_age = []\n",
|
||||
"for val in age:\n",
|
||||
" if pd.isnull(val):\n",
|
||||
" months_age.append(val)\n",
|
||||
" else:\n",
|
||||
" amount, time_type = val.split(' ')\n",
|
||||
" if 'day' in time_type:\n",
|
||||
" mult = 1./30\n",
|
||||
" if 'week' in time_type:\n",
|
||||
" mult = 1./4\n",
|
||||
" if 'month' in time_type:\n",
|
||||
" mult = 1.\n",
|
||||
" if 'year' in time_type:\n",
|
||||
" mult = 12.\n",
|
||||
" amount, time_type = val.split(\" \")\n",
|
||||
" if \"day\" in time_type:\n",
|
||||
" mult = 1.0 / 30\n",
|
||||
" if \"week\" in time_type:\n",
|
||||
" mult = 1.0 / 4\n",
|
||||
" if \"month\" in time_type:\n",
|
||||
" mult = 1.0\n",
|
||||
" if \"year\" in time_type:\n",
|
||||
" mult = 12.0\n",
|
||||
" months_age.append(int(amount) * mult)\n",
|
||||
"train_set['Age'] = pd.DataFrame(months_age).astype(np.float32)"
|
||||
"train_set[\"Age\"] = pd.DataFrame(months_age).astype(np.float32)"
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -116,26 +125,26 @@
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"sex_neutered = train_set['SexuponOutcome']\n",
|
||||
"sex_neutered = train_set[\"SexuponOutcome\"]\n",
|
||||
"sex = []\n",
|
||||
"neutered = []\n",
|
||||
"for val in sex_neutered:\n",
|
||||
" if pd.isnull(val):\n",
|
||||
" sex.append(val)\n",
|
||||
" neutered.append(val)\n",
|
||||
" elif 'Unknown' in val:\n",
|
||||
" elif \"Unknown\" in val:\n",
|
||||
" sex.append(np.nan)\n",
|
||||
" neutered.append(np.nan)\n",
|
||||
" else:\n",
|
||||
" n, s = val.split(' ')\n",
|
||||
" if n in ['Neutered', 'Spayed']:\n",
|
||||
" neutered.append('Yes')\n",
|
||||
" n, s = val.split(\" \")\n",
|
||||
" if n in [\"Neutered\", \"Spayed\"]:\n",
|
||||
" neutered.append(\"Yes\")\n",
|
||||
" else:\n",
|
||||
" neutered.append('No')\n",
|
||||
" neutered.append(\"No\")\n",
|
||||
" sex.append(s)\n",
|
||||
"\n",
|
||||
"train_set['Sex'] = pd.DataFrame(sex)\n",
|
||||
"train_set['Neutered'] = pd.DataFrame(neutered)"
|
||||
"train_set[\"Sex\"] = pd.DataFrame(sex)\n",
|
||||
"train_set[\"Neutered\"] = pd.DataFrame(neutered)"
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -145,8 +154,23 @@
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"# Remove irrelevant columns\n",
|
||||
"train_set.drop(columns= ['Name', 'OutcomeSubtype', 'AnimalID', 'DateTime', 'AgeuponOutcome', 'SexuponOutcome'], inplace=True)\n",
|
||||
"logger.report_table(title='Trainset - after preprocessing',series='pandas DataFrame',iteration=0, table_plot=train_set.head())"
|
||||
"train_set.drop(\n",
|
||||
" columns=[\n",
|
||||
" \"Name\",\n",
|
||||
" \"OutcomeSubtype\",\n",
|
||||
" \"AnimalID\",\n",
|
||||
" \"DateTime\",\n",
|
||||
" \"AgeuponOutcome\",\n",
|
||||
" \"SexuponOutcome\",\n",
|
||||
" ],\n",
|
||||
" inplace=True,\n",
|
||||
")\n",
|
||||
"logger.report_table(\n",
|
||||
" title=\"Trainset - after preprocessing\",\n",
|
||||
" series=\"pandas DataFrame\",\n",
|
||||
" iteration=0,\n",
|
||||
" table_plot=train_set.head(),\n",
|
||||
")"
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -162,8 +186,8 @@
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"object_columns = train_set.select_dtypes(include=['object']).copy()\n",
|
||||
"numerical_columns = train_set.select_dtypes(include=['number']).copy()"
|
||||
"object_columns = train_set.select_dtypes(include=[\"object\"]).copy()\n",
|
||||
"numerical_columns = train_set.select_dtypes(include=[\"number\"]).copy()"
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -176,7 +200,7 @@
|
||||
" if object_columns[col].isnull().sum() > 0:\n",
|
||||
" most_common = Counter(object_columns[col]).most_common(1)[0][0]\n",
|
||||
" print('Column \"{}\": replacing null values with \"{}\"'.format(col, most_common))\n",
|
||||
" train_set[col].fillna(most_common, inplace=True)\n"
|
||||
" train_set[col].fillna(most_common, inplace=True)"
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -198,7 +222,12 @@
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"logger.report_table(title='Trainset - after filling missing values',series='pandas DataFrame',iteration=0, table_plot=train_set.head())"
|
||||
"logger.report_table(\n",
|
||||
" title=\"Trainset - after filling missing values\",\n",
|
||||
" series=\"pandas DataFrame\",\n",
|
||||
" iteration=0,\n",
|
||||
" table_plot=train_set.head(),\n",
|
||||
")"
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -214,9 +243,9 @@
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"out_encoding = train_set['OutcomeType'].astype('category').cat.categories\n",
|
||||
"outcome_dict = {key: val for val,key in enumerate(out_encoding)}\n",
|
||||
"task.upload_artifact('Outcome dictionary', outcome_dict)"
|
||||
"out_encoding = train_set[\"OutcomeType\"].astype(\"category\").cat.categories\n",
|
||||
"outcome_dict = {key: val for val, key in enumerate(out_encoding)}\n",
|
||||
"task.upload_artifact(\"Outcome dictionary\", outcome_dict)"
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -226,8 +255,13 @@
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"for col in object_columns.columns:\n",
|
||||
" train_set[col] = train_set[col].astype('category').cat.codes\n",
|
||||
"logger.report_table(title='Trainset - after labels encoding',series='pandas DataFrame',iteration=0, table_plot=train_set.head())"
|
||||
" train_set[col] = train_set[col].astype(\"category\").cat.codes\n",
|
||||
"logger.report_table(\n",
|
||||
" title=\"Trainset - after labels encoding\",\n",
|
||||
" series=\"pandas DataFrame\",\n",
|
||||
" iteration=0,\n",
|
||||
" table_plot=train_set.head(),\n",
|
||||
")"
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -243,10 +277,14 @@
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"X = train_set.drop(columns= ['OutcomeType'])\n",
|
||||
"Y = train_set['OutcomeType']\n",
|
||||
"X_train, X_val, Y_train, Y_val = train_test_split(X, Y, test_size=configuration_dict.get('test_size', 0.1), \n",
|
||||
" random_state=configuration_dict.get('split_random_state', 0))"
|
||||
"X = train_set.drop(columns=[\"OutcomeType\"])\n",
|
||||
"Y = train_set[\"OutcomeType\"]\n",
|
||||
"X_train, X_val, Y_train, Y_val = train_test_split(\n",
|
||||
" X,\n",
|
||||
" Y,\n",
|
||||
" test_size=configuration_dict.get(\"test_size\", 0.1),\n",
|
||||
" random_state=configuration_dict.get(\"split_random_state\", 0),\n",
|
||||
")"
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -256,11 +294,11 @@
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"# making all variables categorical\n",
|
||||
"object_columns_names = object_columns.drop(columns= ['OutcomeType']).columns\n",
|
||||
"object_columns_names = object_columns.drop(columns=[\"OutcomeType\"]).columns\n",
|
||||
"for col in object_columns_names:\n",
|
||||
" X[col] = X[col].astype('category')\n",
|
||||
"columns_categries = {col: len(X[col].cat.categories) for col in object_columns_names}\n",
|
||||
"task.upload_artifact('Categries per column', columns_categries)"
|
||||
" X[col] = X[col].astype(\"category\")\n",
|
||||
"columns_categories = {col: len(X[col].cat.categories) for col in object_columns_names}\n",
|
||||
"task.upload_artifact(\"Categries per column\", columns_categories)"
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -270,9 +308,9 @@
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"train_df = X_train.join(Y_train)\n",
|
||||
"train_df.to_csv(Path(path_to_ShelterAnimal) / 'train_processed.csv', index=False)\n",
|
||||
"train_df.to_csv(Path(path_to_ShelterAnimal) / \"train_processed.csv\", index=False)\n",
|
||||
"val_df = X_val.join(Y_val)\n",
|
||||
"val_df.to_csv(Path(path_to_ShelterAnimal) / 'val_processed.csv', index=False)"
|
||||
"val_df.to_csv(Path(path_to_ShelterAnimal) / \"val_processed.csv\", index=False)"
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -281,8 +319,11 @@
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"paths = {'train_data': str(Path(path_to_ShelterAnimal) / 'train_processed.csv'), 'val_data': str(Path(path_to_ShelterAnimal) / 'val_processed.csv')}\n",
|
||||
"task.upload_artifact('Processed data', paths)"
|
||||
"paths = {\n",
|
||||
" \"train_data\": str(Path(path_to_ShelterAnimal) / \"train_processed.csv\"),\n",
|
||||
" \"val_data\": str(Path(path_to_ShelterAnimal) / \"val_processed.csv\"),\n",
|
||||
"}\n",
|
||||
"task.upload_artifact(\"Processed data\", paths)"
|
||||
]
|
||||
}
|
||||
],
|
||||
@ -302,9 +343,9 @@
|
||||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.7.4"
|
||||
"version": "3.8.10"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
"nbformat_minor": 4
|
||||
}
|
||||
}
|
@ -32,11 +32,17 @@
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"task = Task.init(project_name='Tabular Example', task_name='Download and split tabular dataset')\n",
|
||||
"task = Task.init(\n",
|
||||
" project_name=\"Tabular Example\", task_name=\"Download and split tabular dataset\"\n",
|
||||
")\n",
|
||||
"logger = task.get_logger()\n",
|
||||
"configuration_dict = {'test_size': 0.1, 'split_random_state': 0}\n",
|
||||
"configuration_dict = task.connect(configuration_dict) # enabling configuration override by clearml\n",
|
||||
"print(configuration_dict) # printing actual configuration (after override in remote mode)"
|
||||
"configuration_dict = {\"test_size\": 0.1, \"split_random_state\": 0}\n",
|
||||
"configuration_dict = task.connect(\n",
|
||||
" configuration_dict\n",
|
||||
") # enabling configuration override by clearml\n",
|
||||
"print(\n",
|
||||
" configuration_dict\n",
|
||||
") # printing actual configuration (after override in remote mode)"
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -56,9 +62,9 @@
|
||||
"# and save it to your cloud storage or your mounted local storage\n",
|
||||
"# If the data is on your cloud storage, you can use clearml's storage manager to get a local copy of it:\n",
|
||||
"# from clearml.storage import StorageManager\n",
|
||||
"# path_to_ShelterAnimal = StorageManager.get_local_copy(\"https://allegro-datasets.s3.amazonaws.com/clearml/UrbanSound8K.zip\", \n",
|
||||
"# path_to_ShelterAnimal = StorageManager.get_local_copy(\"https://allegro-datasets.s3.amazonaws.com/clearml/UrbanSound8K.zip\",\n",
|
||||
"# extract_archive=True)\n",
|
||||
"path_to_ShelterAnimal = '/home/sam/Datasets/shelter-animal-outcomes'"
|
||||
"path_to_ShelterAnimal = \"/home/sam/Datasets/shelter-animal-outcomes\""
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -67,8 +73,13 @@
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"train_set = pd.read_csv(Path(path_to_ShelterAnimal) / 'train.csv')\n",
|
||||
"logger.report_table(title='Trainset - raw',series='pandas DataFrame',iteration=0, table_plot=train_set.head())"
|
||||
"train_set = pd.read_csv(Path(path_to_ShelterAnimal) / \"train.csv\")\n",
|
||||
"logger.report_table(\n",
|
||||
" title=\"Trainset - raw\",\n",
|
||||
" series=\"pandas DataFrame\",\n",
|
||||
" iteration=0,\n",
|
||||
" table_plot=train_set.head(),\n",
|
||||
")"
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -84,10 +95,14 @@
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"X = train_set.drop(columns= ['OutcomeType'])\n",
|
||||
"Y = train_set['OutcomeType']\n",
|
||||
"X_train, X_val, Y_train, Y_val = train_test_split(X, Y, test_size=configuration_dict.get('test_size', 0.1), \n",
|
||||
" random_state=configuration_dict.get('split_random_state', 0))"
|
||||
"X = train_set.drop(columns=[\"OutcomeType\"])\n",
|
||||
"Y = train_set[\"OutcomeType\"]\n",
|
||||
"X_train, X_val, Y_train, Y_val = train_test_split(\n",
|
||||
" X,\n",
|
||||
" Y,\n",
|
||||
" test_size=configuration_dict.get(\"test_size\", 0.1),\n",
|
||||
" random_state=configuration_dict.get(\"split_random_state\", 0),\n",
|
||||
")"
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -106,8 +121,8 @@
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"task.upload_artifact('train_data', artifact_object=train_df)\n",
|
||||
"task.upload_artifact('val_data', artifact_object=val_df)"
|
||||
"task.upload_artifact(\"train_data\", artifact_object=train_df)\n",
|
||||
"task.upload_artifact(\"val_data\", artifact_object=val_df)"
|
||||
]
|
||||
}
|
||||
],
|
||||
@ -127,7 +142,7 @@
|
||||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.7.4"
|
||||
"version": "3.8.10"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
|
@ -1,5 +1,12 @@
|
||||
{
|
||||
"cells": [
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"# Pick Best Model Step"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
@ -19,16 +26,45 @@
|
||||
"from clearml import Task, OutputModel"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"## Configure Task\n",
|
||||
"Instantiate a ClearML Task using `Task.init`. \n",
|
||||
"\n",
|
||||
"A Configuration dictionary is connected to the task using `Task.connect`. This will enable the pipeline controller to access this task's configurations and override the values when the pipeline is executed.\n",
|
||||
"\n",
|
||||
"Notice in the [pipeline controller script](tabular_ml_pipeline.ipynb) that when this task is added as a step in the pipeline, the value of `train_tasks_ids` is overridden. "
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"task = Task.init(project_name='Tabular Example', task_name='pick best model')\n",
|
||||
"configuration_dict = {'train_tasks_ids': ['c9bff3d15309487a9e5aaa00358ff091', 'c9bff3d15309487a9e5aaa00358ff091']}\n",
|
||||
"configuration_dict = task.connect(configuration_dict) # enabling configuration override by clearml\n",
|
||||
"print(configuration_dict) # printing actual configuration (after override in remote mode)"
|
||||
"task = Task.init(project_name=\"Tabular Example\", task_name=\"pick best model\")\n",
|
||||
"configuration_dict = {\n",
|
||||
" \"train_tasks_ids\": [\n",
|
||||
" \"c9bff3d15309487a9e5aaa00358ff091\",\n",
|
||||
" \"c9bff3d15309487a9e5aaa00358ff091\",\n",
|
||||
" ]\n",
|
||||
"}\n",
|
||||
"configuration_dict = task.connect(\n",
|
||||
" configuration_dict\n",
|
||||
") # enabling configuration override by clearml\n",
|
||||
"print(\n",
|
||||
" configuration_dict\n",
|
||||
") # printing actual configuration (after override in remote mode)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"## Compare Models\n",
|
||||
"The task retrieves the IDs of the training tasks from the configuration dictionary. Then each training task's last scalar metrics are retrieved and compared."
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -38,9 +74,9 @@
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"results = {}\n",
|
||||
"for task_id in configuration_dict.get('train_tasks_ids'):\n",
|
||||
"for task_id in configuration_dict.get(\"train_tasks_ids\"):\n",
|
||||
" train_task = Task.get_task(task_id)\n",
|
||||
" results[task_id] = train_task.get_last_scalar_metrics()['accuracy']['total']['last']"
|
||||
" results[task_id] = train_task.get_last_scalar_metrics()[\"accuracy\"][\"total\"][\"last\"]"
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -88,7 +124,7 @@
|
||||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.7.4"
|
||||
"version": "3.8.10"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
|
@ -1,5 +1,12 @@
|
||||
{
|
||||
"cells": [
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"# Preprocessing Step\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
@ -26,17 +33,13 @@
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"task = Task.init(project_name='Tabular Example', task_name='tabular preprocessing')\n",
|
||||
"logger = task.get_logger()\n",
|
||||
"configuration_dict = {'data_task_id': '39fbf86fc4a341359ac6df4aa70ff91b',\n",
|
||||
" 'fill_categorical_NA': True, 'fill_numerical_NA': True}\n",
|
||||
"configuration_dict = task.connect(configuration_dict) # enabling configuration override by clearml\n",
|
||||
"print(configuration_dict) # printing actual configuration (after override in remote mode)"
|
||||
"## Configure Task\n",
|
||||
"Instantiate a ClearML Task using `Task.init`. \n",
|
||||
"\n",
|
||||
"A Configuration dictionary is connected to the task using `Task.connect`. This will enable the pipeline controller to access this task's configurations and override the value when the pipeline is executed. "
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -45,17 +48,52 @@
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"data_task = Task.get_task(configuration_dict.get('data_task_id'))\n",
|
||||
"train_set = data_task.artifacts['train_data'].get().drop(columns=['Unnamed: 0'])\n",
|
||||
"val_set = data_task.artifacts['val_data'].get().drop(columns=['Unnamed: 0'])\n",
|
||||
"logger.report_table(title='Trainset - raw',series='pandas DataFrame',iteration=0, table_plot=train_set.head())"
|
||||
"task = Task.init(project_name=\"Tabular Example\", task_name=\"tabular preprocessing\")\n",
|
||||
"logger = task.get_logger()\n",
|
||||
"configuration_dict = {\n",
|
||||
" \"data_task_id\": \"39fbf86fc4a341359ac6df4aa70ff91b\",\n",
|
||||
" \"fill_categorical_NA\": True,\n",
|
||||
" \"fill_numerical_NA\": True,\n",
|
||||
"}\n",
|
||||
"configuration_dict = task.connect(\n",
|
||||
" configuration_dict\n",
|
||||
") # enabling configuration override by clearml\n",
|
||||
"print(\n",
|
||||
" configuration_dict\n",
|
||||
") # printing actual configuration (after override in remote mode)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"# **Pre-processing**"
|
||||
"## Get Data\n",
|
||||
"\n",
|
||||
"ClearML retrieves the data to be processed. First, the data task is fetched using `Task.get_task` with the task ID taken from the configuration dictionary. Then the data task's artifacts are accessed to retrieve the training and validation sets. \n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"data_task = Task.get_task(configuration_dict.get(\"data_task_id\"))\n",
|
||||
"train_set = data_task.artifacts[\"train_data\"].get().drop(columns=[\"Unnamed: 0\"])\n",
|
||||
"val_set = data_task.artifacts[\"val_data\"].get().drop(columns=[\"Unnamed: 0\"])\n",
|
||||
"logger.report_table(\n",
|
||||
" title=\"Trainset - raw\",\n",
|
||||
" series=\"pandas DataFrame\",\n",
|
||||
" iteration=0,\n",
|
||||
" table_plot=train_set.head(),\n",
|
||||
")"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"## Preprocess Data"
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -66,12 +104,13 @@
|
||||
"source": [
|
||||
"# Remove hour and year from DateTime data\n",
|
||||
"def change_time_format(data_frame):\n",
|
||||
" timestamp = pd.to_datetime(data_frame['DateTime'])\n",
|
||||
" timestamp = pd.to_datetime(data_frame[\"DateTime\"])\n",
|
||||
" months = [d.month for d in timestamp]\n",
|
||||
" data_frame['Month'] = pd.DataFrame(months).astype('object')\n",
|
||||
" data_frame.drop(columns= ['DateTime'], inplace=True)\n",
|
||||
" data_frame[\"Month\"] = pd.DataFrame(months).astype(\"object\")\n",
|
||||
" data_frame.drop(columns=[\"DateTime\"], inplace=True)\n",
|
||||
" return data_frame\n",
|
||||
"\n",
|
||||
"\n",
|
||||
"train_set = change_time_format(train_set)\n",
|
||||
"val_set = change_time_format(val_set)"
|
||||
]
|
||||
@ -82,27 +121,28 @@
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"def change_age_format(data_frame): \n",
|
||||
" age = data_frame['AgeuponOutcome']\n",
|
||||
"def change_age_format(data_frame):\n",
|
||||
" age = data_frame[\"AgeuponOutcome\"]\n",
|
||||
" months_age = []\n",
|
||||
" for val in age:\n",
|
||||
" if pd.isnull(val):\n",
|
||||
" months_age.append(val)\n",
|
||||
" else:\n",
|
||||
" amount, time_type = val.split(' ')\n",
|
||||
" if 'day' in time_type:\n",
|
||||
" mult = 1./30\n",
|
||||
" if 'week' in time_type:\n",
|
||||
" mult = 1./4\n",
|
||||
" if 'month' in time_type:\n",
|
||||
" mult = 1.\n",
|
||||
" if 'year' in time_type:\n",
|
||||
" mult = 12.\n",
|
||||
" amount, time_type = val.split(\" \")\n",
|
||||
" if \"day\" in time_type:\n",
|
||||
" mult = 1.0 / 30\n",
|
||||
" if \"week\" in time_type:\n",
|
||||
" mult = 1.0 / 4\n",
|
||||
" if \"month\" in time_type:\n",
|
||||
" mult = 1.0\n",
|
||||
" if \"year\" in time_type:\n",
|
||||
" mult = 12.0\n",
|
||||
" months_age.append(int(amount) * mult)\n",
|
||||
" data_frame['Age'] = pd.DataFrame(months_age).astype(np.float32)\n",
|
||||
" data_frame.drop(columns= ['AgeuponOutcome'], inplace=True)\n",
|
||||
" data_frame[\"Age\"] = pd.DataFrame(months_age).astype(np.float32)\n",
|
||||
" data_frame.drop(columns=[\"AgeuponOutcome\"], inplace=True)\n",
|
||||
" return data_frame\n",
|
||||
" \n",
|
||||
"\n",
|
||||
"\n",
|
||||
"train_set = change_age_format(train_set)\n",
|
||||
"val_set = change_age_format(val_set)"
|
||||
]
|
||||
@ -113,30 +153,31 @@
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"def change_sex_format(data_frame): \n",
|
||||
" sex_neutered = data_frame['SexuponOutcome']\n",
|
||||
"def change_sex_format(data_frame):\n",
|
||||
" sex_neutered = data_frame[\"SexuponOutcome\"]\n",
|
||||
" sex = []\n",
|
||||
" neutered = []\n",
|
||||
" for val in sex_neutered:\n",
|
||||
" if pd.isnull(val):\n",
|
||||
" sex.append(val)\n",
|
||||
" neutered.append(val)\n",
|
||||
" elif 'Unknown' in val:\n",
|
||||
" elif \"Unknown\" in val:\n",
|
||||
" sex.append(np.nan)\n",
|
||||
" neutered.append(np.nan)\n",
|
||||
" else:\n",
|
||||
" n, s = val.split(' ')\n",
|
||||
" if n in ['Neutered', 'Spayed']:\n",
|
||||
" neutered.append('Yes')\n",
|
||||
" n, s = val.split(\" \")\n",
|
||||
" if n in [\"Neutered\", \"Spayed\"]:\n",
|
||||
" neutered.append(\"Yes\")\n",
|
||||
" else:\n",
|
||||
" neutered.append('No')\n",
|
||||
" neutered.append(\"No\")\n",
|
||||
" sex.append(s)\n",
|
||||
"\n",
|
||||
" data_frame['Sex'] = pd.DataFrame(sex)\n",
|
||||
" data_frame['Neutered'] = pd.DataFrame(neutered)\n",
|
||||
" data_frame.drop(columns= ['SexuponOutcome'], inplace=True)\n",
|
||||
" data_frame[\"Sex\"] = pd.DataFrame(sex)\n",
|
||||
" data_frame[\"Neutered\"] = pd.DataFrame(neutered)\n",
|
||||
" data_frame.drop(columns=[\"SexuponOutcome\"], inplace=True)\n",
|
||||
" return data_frame\n",
|
||||
"\n",
|
||||
"\n",
|
||||
"train_set = change_sex_format(train_set)\n",
|
||||
"val_set = change_sex_format(val_set)"
|
||||
]
|
||||
@ -150,13 +191,19 @@
|
||||
"# Remove irrelevant columns\n",
|
||||
"def remove_columns(data_frame, list_columns_names=None):\n",
|
||||
" if list_columns_names is not None:\n",
|
||||
" data_frame.drop(columns= list_columns_names, inplace=True)\n",
|
||||
" data_frame.drop(columns=list_columns_names, inplace=True)\n",
|
||||
" return data_frame\n",
|
||||
"\n",
|
||||
"train_set = remove_columns(train_set, ['Name', 'OutcomeSubtype', 'AnimalID'])\n",
|
||||
"val_set = remove_columns(val_set, ['Name', 'OutcomeSubtype', 'AnimalID'])\n",
|
||||
"\n",
|
||||
"logger.report_table(title='Trainset - after preprocessing',series='pandas DataFrame',iteration=0, table_plot=train_set.head())"
|
||||
"train_set = remove_columns(train_set, [\"Name\", \"OutcomeSubtype\", \"AnimalID\"])\n",
|
||||
"val_set = remove_columns(val_set, [\"Name\", \"OutcomeSubtype\", \"AnimalID\"])\n",
|
||||
"\n",
|
||||
"logger.report_table(\n",
|
||||
" title=\"Trainset - after preprocessing\",\n",
|
||||
" series=\"pandas DataFrame\",\n",
|
||||
" iteration=0,\n",
|
||||
" table_plot=train_set.head(),\n",
|
||||
")"
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -172,8 +219,15 @@
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"object_columns = train_set.select_dtypes(include=['object']).copy()\n",
|
||||
"numerical_columns = train_set.select_dtypes(include=['number']).copy()"
|
||||
"object_columns = train_set.select_dtypes(include=[\"object\"]).copy()\n",
|
||||
"numerical_columns = train_set.select_dtypes(include=[\"number\"]).copy()"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"Notice that the configuration dictionary is read below to get the value of `fill_categorical_NA`. This value can be overridden by the pipeline controller. "
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -182,26 +236,37 @@
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"if configuration_dict.get('fill_categorical_NA', True):\n",
|
||||
"if configuration_dict.get(\"fill_categorical_NA\", True):\n",
|
||||
" for col in object_columns.columns:\n",
|
||||
" if object_columns[col].isnull().sum() > 0:\n",
|
||||
" most_common = Counter(object_columns[col]).most_common(1)[0][0]\n",
|
||||
" print('Column \"{}\": replacing null values with \"{}\"'.format(col, most_common))\n",
|
||||
" print(\n",
|
||||
" 'Column \"{}\": replacing null values with \"{}\"'.format(col, most_common)\n",
|
||||
" )\n",
|
||||
" train_set[col].fillna(most_common, inplace=True)\n",
|
||||
" val_set[col].fillna(most_common, inplace=True)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"Notice that the configuration dictionary is read below to get the value of `fill_numerical_NA`. This value can be overridden by the pipeline controller. "
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"if configuration_dict.get('fill_numerical_NA', True):\n",
|
||||
"if configuration_dict.get(\"fill_numerical_NA\", True):\n",
|
||||
" for col in numerical_columns.columns:\n",
|
||||
" if numerical_columns[col].isnull().sum() > 0:\n",
|
||||
" median_val = numerical_columns[col].median()\n",
|
||||
" print('Column \"{}\": replacing null values with \"{}\"'.format(col, median_val))\n",
|
||||
" print(\n",
|
||||
" 'Column \"{}\": replacing null values with \"{}\"'.format(col, median_val)\n",
|
||||
" )\n",
|
||||
" train_set[col].fillna(median_val, inplace=True)\n",
|
||||
" val_set[col].fillna(median_val, inplace=True)"
|
||||
]
|
||||
@ -215,8 +280,15 @@
|
||||
"# Drop rows that still contain NA values (i.e. values that were chosen not to be filled)\n",
|
||||
"train_set.dropna(inplace=True)\n",
|
||||
"val_set.dropna(inplace=True)\n",
|
||||
"if configuration_dict.get('fill_categorical_NA', True) or configuration_dict.get('fill_numerical_NA', True):\n",
|
||||
" logger.report_table(title='Trainset - after filling missing values',series='pandas DataFrame',iteration=0, table_plot=train_set.head())"
|
||||
"if configuration_dict.get(\"fill_categorical_NA\", True) or configuration_dict.get(\n",
|
||||
" \"fill_numerical_NA\", True\n",
|
||||
"):\n",
|
||||
" logger.report_table(\n",
|
||||
" title=\"Trainset - after filling missing values\",\n",
|
||||
" series=\"pandas DataFrame\",\n",
|
||||
" iteration=0,\n",
|
||||
" table_plot=train_set.head(),\n",
|
||||
" )"
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -233,9 +305,9 @@
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"all_data = pd.concat([train_set, val_set])\n",
|
||||
"outcome_categories = all_data['OutcomeType'].astype('category').cat.categories\n",
|
||||
"outcome_dict = {key: val for val,key in enumerate(outcome_categories)}\n",
|
||||
"task.upload_artifact('Outcome dictionary', outcome_dict)"
|
||||
"outcome_categories = all_data[\"OutcomeType\"].astype(\"category\").cat.categories\n",
|
||||
"outcome_dict = {key: val for val, key in enumerate(outcome_categories)}\n",
|
||||
"task.upload_artifact(\"Outcome dictionary\", outcome_dict)"
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -245,10 +317,15 @@
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"for col in object_columns.columns:\n",
|
||||
" all_data[col] = all_data[col].astype('category').cat.codes\n",
|
||||
"train_set = all_data.iloc[:len(train_set.index), :]\n",
|
||||
"val_set = all_data.iloc[len(train_set.index):, :]\n",
|
||||
"logger.report_table(title='Trainset - after labels encoding',series='pandas DataFrame',iteration=0, table_plot=train_set.head())"
|
||||
" all_data[col] = all_data[col].astype(\"category\").cat.codes\n",
|
||||
"train_set = all_data.iloc[: len(train_set.index), :]\n",
|
||||
"val_set = all_data.iloc[len(train_set.index) :, :]\n",
|
||||
"logger.report_table(\n",
|
||||
" title=\"Trainset - after labels encoding\",\n",
|
||||
" series=\"pandas DataFrame\",\n",
|
||||
" iteration=0,\n",
|
||||
" table_plot=train_set.head(),\n",
|
||||
")"
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -258,11 +335,13 @@
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"# making all variables categorical\n",
|
||||
"object_columns_names = object_columns.drop(columns= ['OutcomeType']).columns\n",
|
||||
"object_columns_names = object_columns.drop(columns=[\"OutcomeType\"]).columns\n",
|
||||
"for col in object_columns_names:\n",
|
||||
" all_data[col] = all_data[col].astype('category')\n",
|
||||
"columns_categries = {col: len(all_data[col].cat.categories) for col in object_columns_names}\n",
|
||||
"task.upload_artifact('Categries per column', columns_categries)"
|
||||
" all_data[col] = all_data[col].astype(\"category\")\n",
|
||||
"columns_categories = {\n",
|
||||
" col: len(all_data[col].cat.categories) for col in object_columns_names\n",
|
||||
"}\n",
|
||||
"task.upload_artifact(\"Categories per column\", columns_categories)"
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -271,8 +350,8 @@
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"task.upload_artifact('train_data', artifact_object=train_set)\n",
|
||||
"task.upload_artifact('val_data', artifact_object=val_set)"
|
||||
"task.upload_artifact(\"train_data\", artifact_object=train_set)\n",
|
||||
"task.upload_artifact(\"val_data\", artifact_object=val_set)"
|
||||
]
|
||||
}
|
||||
],
|
||||
@ -292,9 +371,9 @@
|
||||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.7.4"
|
||||
"version": "3.8.10"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
"nbformat_minor": 4
|
||||
}
|
||||
}
|
@ -1,14 +1,51 @@
|
||||
{
|
||||
"cells": [
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"# Tabular Data Pipeline with Concurrent Steps\n",
|
||||
"\n",
|
||||
"This example demonstrates an ML pipeline which preprocesses data in two concurrent steps, trains two networks, where each network's training depends upon the completion of its own preprocessing step, and picks the best model. It is implemented using the PipelineController class.\n",
|
||||
"\n",
|
||||
"The pipeline uses four tasks (each Task is created using a different notebook):\n",
|
||||
"* The pipeline controller Task (the current task)\n",
|
||||
"* A data preprocessing Task ([preprocessing_and_encoding.ipynb](preprocessing_and_encoding.ipynb))\n",
|
||||
"* A training Task ([train_tabular_predictor.ipynb](train_tabular_predictor.ipynb))\n",
|
||||
"* A comparison Task ([pick_best_model.ipynb](pick_best_model.ipynb))\n",
|
||||
"\n",
|
||||
"In this pipeline example, the data preprocessing Task and training Task are each added to the pipeline twice (each is in two steps). When the pipeline runs, the data preprocessing Task and training Task are cloned twice, and the newly cloned Tasks execute. The Task they are cloned from, called the base Task, does not execute. The pipeline controller passes different data to each cloned Task by overriding parameters. In this way, the same Task can run more than once in the pipeline, but with different data.\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"## Prerequisite\n",
|
||||
"Make sure to download the data needed for this task. See the [download_and_split.ipynb](download_and_split.ipynb) notebook"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"metadata": {
|
||||
"pycharm": {
|
||||
"is_executing": true
|
||||
}
|
||||
},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"Requirement already satisfied: pip in /home/revital/PycharmProjects/venvs/clearml/lib/python3.8/site-packages (21.3.1)\r\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"# pip install with locked versions\n",
|
||||
"! pip install -U pip\n",
|
||||
"! pip install -U clearml==0.16.2rc0"
|
||||
"! pip install -U clearml"
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -17,8 +54,7 @@
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"from clearml import Task\n",
|
||||
"from clearml.automation.controller import PipelineController"
|
||||
"from clearml import Task, PipelineController"
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -27,15 +63,57 @@
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"TABULAR_DATASET_ID = Task.get_task(task_name=\"Download and split tabular dataset\", project_name=\"Tabular Example\").id\n",
|
||||
"TABULAR_DATASET_ID = Task.get_task(\n",
|
||||
" task_name=\"Download and split tabular dataset\", project_name=\"Tabular Example\"\n",
|
||||
").id"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"## Create Pipeline Controller\n",
|
||||
"\n",
|
||||
"pipe = PipelineController( \n",
|
||||
"The PipelineController class includes functionality to create a pipeline controller, add steps to the pipeline, pass data from one step to another, control the dependencies of a step beginning only after other steps complete, run the pipeline, wait for it to complete, and clean up afterwards.\n",
|
||||
"\n",
|
||||
"Input the following parameters:\n",
|
||||
"* `name` - Name of the PipelineController task which will be created\n",
|
||||
"* `project` - Project which the controller will be associated with\n",
|
||||
"* `version` - Pipeline's version number. The version is used to uniquely identify the pipeline template execution.\n",
|
||||
"* `auto_version_bump` (default True) - if the same pipeline version already exists (with any difference from the current one), the current pipeline version will be bumped to a new version (e.g. 1.0.0 -> 1.0.1 , 1.2 -> 1.3, 10 -> 11)\n",
|
||||
" "
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"pipe = PipelineController(\n",
|
||||
" project=\"Tabular Example\",\n",
|
||||
" name=\"tabular training pipeline\", \n",
|
||||
" add_pipeline_tags=True, \n",
|
||||
" version=\"0.1\"\n",
|
||||
")\n",
|
||||
"pipe.set_default_execution_queue(default_execution_queue=\"default\")\n",
|
||||
" name=\"tabular training pipeline\",\n",
|
||||
" add_pipeline_tags=True,\n",
|
||||
" version=\"0.1\",\n",
|
||||
")"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"## Add Preprocessing Step\n",
|
||||
"Two preprocessing nodes are added to the pipeline: `preprocessing_1` and `preprocessing_2`. These two nodes will be cloned from the same base task, created from the [preprocessing_and_encoding.ipynb](preprocessing_and_encoding.ipynb) script. These steps will run concurrently.\n",
|
||||
"\n",
|
||||
"The preprocessing data task fills in values of NaN data based on the values of the parameters named `fill_categorical_NA` and `fill_numerical_NA`. It will connect a parameter dictionary to the task which contains keys with those same names. The pipeline will override the values of those keys when the pipeline executes the cloned tasks of the base Task. In this way, two sets of data are created in the pipeline."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"pipe.add_step(\n",
|
||||
" name=\"preprocessing_1\",\n",
|
||||
" base_task_project=\"Tabular Example\",\n",
|
||||
@ -56,7 +134,33 @@
|
||||
" \"General/fill_categorical_NA\": \"False\",\n",
|
||||
" \"General/fill_numerical_NA\": \"True\",\n",
|
||||
" },\n",
|
||||
")\n",
|
||||
")"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"## Add Training Step"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"Two training nodes are added to the pipeline: `train_1` and `train_2`. These two nodes will be cloned from the same base task, created from the [train_tabular_predictor.ipynb](train_tabular_predictor.ipynb) script.\n",
|
||||
"\n",
|
||||
"Each training node depends upon the completion of one preprocessing node. The `parents` parameter is a list of step names indicating all steps that must complete before the new step starts. In this case, `preprocessing_1` must complete before `train_1` begins, and `preprocessing_2` must complete before `train_2` begins.\n",
|
||||
"\n",
|
||||
"The ID of a task whose artifact contains a set of preprocessed data for training will be overridden using the `data_task_id` key. Its value takes the form `${<stage-name>.<part-of-task>}`. In this case, `${preprocessing_1.id}` is the ID of one of the preprocessing node tasks. In this way, each training task consumes its own set of data."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"pipe.add_step(\n",
|
||||
" name=\"train_1\",\n",
|
||||
" parents=[\"preprocessing_1\"],\n",
|
||||
@ -70,8 +174,23 @@
|
||||
" base_task_project=\"Tabular Example\",\n",
|
||||
" base_task_name=\"tabular prediction\",\n",
|
||||
" parameter_override={\"General/data_task_id\": \"${preprocessing_2.id}\"},\n",
|
||||
")\n",
|
||||
"\n",
|
||||
")"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"## Add Model Comparison Step\n",
|
||||
"The model comparison step depends upon both training nodes completing and takes the two training node task IDs to override the parameters in the base task. The IDs of the training tasks from the steps named `train_1` and `train_2` are passed to the model comparison Task. They take the form `${<stage-name>.<part-of-Task>}`."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"pipe.add_step(\n",
|
||||
" name=\"pick_best\",\n",
|
||||
" parents=[\"train_1\", \"train_2\"],\n",
|
||||
@ -81,6 +200,38 @@
|
||||
")"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"## Set Default Execution Queue\n",
|
||||
"Set the default execution queue for pipeline steps that did not specify an execution queue. The pipeline steps will be enqueued for execution in this queue.\n",
|
||||
"\n",
|
||||
"\n",
|
||||
"> **_Note_** Make sure to assign a ClearML Agent to the queue in which the steps are enqueued, so that they will be executed\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"pipe.set_default_execution_queue(default_execution_queue=\"default\")"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"## Execute the Pipeline\n",
|
||||
"Start the pipeline! The `start` method launches the pipeline controller remotely, by default on the `services` queue (change the queue by passing `queue=<queue_name>`).\n",
|
||||
"\n",
|
||||
"In order to launch the pipeline control logic locally, use the `start_locally` method instead. \n",
|
||||
"\n",
|
||||
"Once the pipeline starts, wait for it to complete. Finally, clean up the pipeline processes."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
@ -112,9 +263,9 @@
|
||||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.8.5"
|
||||
"version": "3.8.10"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
"nbformat_minor": 4
|
||||
}
|
||||
}
|
@ -1,5 +1,12 @@
|
||||
{
|
||||
"cells": [
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"# Training Step\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
@ -32,18 +39,39 @@
|
||||
"from clearml import Task"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"## Configure Task\n",
|
||||
"Instantiate a ClearML Task using `Task.init`. \n",
|
||||
"\n",
|
||||
"A Configuration dictionary is connected to the task using `Task.connect`. This will enable the pipeline controller to access this task's configurations and override the values when the pipeline is executed.\n",
|
||||
"\n",
|
||||
"Notice in the [pipeline controller script](tabular_ml_pipeline.ipynb) that when this task is added as a step in the pipeline, the value of `data_task_id` is overridden with the ID of another task in the pipeline. "
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"task = Task.init(project_name='Tabular Example', task_name='tabular prediction')\n",
|
||||
"task = Task.init(project_name=\"Tabular Example\", task_name=\"tabular prediction\")\n",
|
||||
"logger = task.get_logger()\n",
|
||||
"configuration_dict = {'data_task_id': 'b605d76398f941e69fc91b43420151d2', \n",
|
||||
" 'number_of_epochs': 15, 'batch_size': 100, 'dropout': 0.3, 'base_lr': 0.1}\n",
|
||||
"configuration_dict = task.connect(configuration_dict) # enabling configuration override by clearml\n",
|
||||
"print(configuration_dict) # printing actual configuration (after override in remote mode)"
|
||||
"configuration_dict = {\n",
|
||||
" \"data_task_id\": \"b605d76398f941e69fc91b43420151d2\",\n",
|
||||
" \"number_of_epochs\": 15,\n",
|
||||
" \"batch_size\": 100,\n",
|
||||
" \"dropout\": 0.3,\n",
|
||||
" \"base_lr\": 0.1,\n",
|
||||
"}\n",
|
||||
"configuration_dict = task.connect(\n",
|
||||
" configuration_dict\n",
|
||||
") # enabling configuration override by clearml\n",
|
||||
"print(\n",
|
||||
" configuration_dict\n",
|
||||
") # printing actual configuration (after override in remote mode)"
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -52,7 +80,7 @@
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"data_task = Task.get_task(configuration_dict.get('data_task_id'))"
|
||||
"data_task = Task.get_task(configuration_dict.get(\"data_task_id\"))"
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -61,8 +89,8 @@
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"train_set = data_task.artifacts['train_data'].get().drop(columns=['Unnamed: 0'])\n",
|
||||
"test_set = data_task.artifacts['val_data'].get().drop(columns=['Unnamed: 0'])"
|
||||
"train_set = data_task.artifacts[\"train_data\"].get().drop(columns=[\"Unnamed: 0\"])\n",
|
||||
"test_set = data_task.artifacts[\"val_data\"].get().drop(columns=[\"Unnamed: 0\"])"
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -71,10 +99,22 @@
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"columns_categories = data_task.artifacts['Categries per column'].get()\n",
|
||||
"columns_categories_ordered = {key: columns_categories[key] for key in train_set.columns if key in columns_categories.keys()}\n",
|
||||
"columns_numerical = [key for key in train_set.drop(columns= ['OutcomeType']).drop(columns=columns_categories_ordered).keys()]\n",
|
||||
"embedding_sizes = [(n_categories, min(32, (n_categories+1)//2)) for _,n_categories in columns_categories_ordered.items()]"
|
||||
"columns_categories = data_task.artifacts[\"Categries per column\"].get()\n",
|
||||
"columns_categories_ordered = {\n",
|
||||
" key: columns_categories[key]\n",
|
||||
" for key in train_set.columns\n",
|
||||
" if key in columns_categories.keys()\n",
|
||||
"}\n",
|
||||
"columns_numerical = [\n",
|
||||
" key\n",
|
||||
" for key in train_set.drop(columns=[\"OutcomeType\"])\n",
|
||||
" .drop(columns=columns_categories_ordered)\n",
|
||||
" .keys()\n",
|
||||
"]\n",
|
||||
"embedding_sizes = [\n",
|
||||
" (n_categories, min(32, (n_categories + 1) // 2))\n",
|
||||
" for _, n_categories in columns_categories_ordered.items()\n",
|
||||
"]"
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -83,10 +123,17 @@
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"outcome_dict = data_task.artifacts['Outcome dictionary'].get()\n",
|
||||
"outcome_dict = data_task.artifacts[\"Outcome dictionary\"].get()\n",
|
||||
"reveresed_outcome_dict = {val: key for key, val in outcome_dict.items()}"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"## Train Model"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
@ -96,19 +143,32 @@
|
||||
"class ShelterDataset(Dataset):\n",
|
||||
" def __init__(self, X, Y, embedded_col_names):\n",
|
||||
" X = X.copy()\n",
|
||||
" self.X1 = X.loc[:,embedded_col_names].copy().values.astype(np.int64) #categorical columns\n",
|
||||
" self.X2 = X.drop(columns=embedded_col_names).copy().values.astype(np.float32) #numerical columns\n",
|
||||
" self.X1 = (\n",
|
||||
" X.loc[:, embedded_col_names].copy().values.astype(np.int64)\n",
|
||||
" ) # categorical columns\n",
|
||||
" self.X2 = (\n",
|
||||
" X.drop(columns=embedded_col_names).copy().values.astype(np.float32)\n",
|
||||
" ) # numerical columns\n",
|
||||
" self.y = Y\n",
|
||||
" \n",
|
||||
"\n",
|
||||
" def __len__(self):\n",
|
||||
" return len(self.y)\n",
|
||||
" \n",
|
||||
"\n",
|
||||
" def __getitem__(self, idx):\n",
|
||||
" return self.X1[idx], self.X2[idx], self.y[idx]\n",
|
||||
"\n",
|
||||
"#creating train and valid datasets\n",
|
||||
"train_ds = ShelterDataset(train_set.drop(columns= ['OutcomeType']), train_set['OutcomeType'], columns_categories_ordered.keys())\n",
|
||||
"valid_ds = ShelterDataset(test_set.drop(columns= ['OutcomeType']), test_set['OutcomeType'], columns_categories_ordered.keys())"
|
||||
"\n",
|
||||
"# creating train and valid datasets\n",
|
||||
"train_ds = ShelterDataset(\n",
|
||||
" train_set.drop(columns=[\"OutcomeType\"]),\n",
|
||||
" train_set[\"OutcomeType\"],\n",
|
||||
" columns_categories_ordered.keys(),\n",
|
||||
")\n",
|
||||
"valid_ds = ShelterDataset(\n",
|
||||
" test_set.drop(columns=[\"OutcomeType\"]),\n",
|
||||
" test_set[\"OutcomeType\"],\n",
|
||||
" columns_categories_ordered.keys(),\n",
|
||||
")"
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -120,7 +180,9 @@
|
||||
"class ShelterModel(nn.Module):\n",
|
||||
" def __init__(self, embedding_sizes, n_cont):\n",
|
||||
" super().__init__()\n",
|
||||
" self.embeddings = nn.ModuleList([nn.Embedding(categories, size) for categories,size in embedding_sizes])\n",
|
||||
" self.embeddings = nn.ModuleList(\n",
|
||||
" [nn.Embedding(categories, size) for categories, size in embedding_sizes]\n",
|
||||
" )\n",
|
||||
" n_emb = sum(e.embedding_dim for e in self.embeddings)\n",
|
||||
" self.n_emb, self.n_cont = n_emb, n_cont\n",
|
||||
" self.lin1 = nn.Linear(self.n_emb + self.n_cont, 200)\n",
|
||||
@ -130,10 +192,10 @@
|
||||
" self.bn2 = nn.BatchNorm1d(200)\n",
|
||||
" self.bn3 = nn.BatchNorm1d(70)\n",
|
||||
" self.emb_drop = nn.Dropout(0.6)\n",
|
||||
" self.drops = nn.Dropout(configuration_dict.get('dropout', 0.25))\n",
|
||||
" self.drops = nn.Dropout(configuration_dict.get(\"dropout\", 0.25))\n",
|
||||
"\n",
|
||||
" def forward(self, x_cat, x_cont):\n",
|
||||
" x = [e(x_cat[:,i]) for i,e in enumerate(self.embeddings)]\n",
|
||||
" x = [e(x_cat[:, i]) for i, e in enumerate(self.embeddings)]\n",
|
||||
" x = torch.cat(x, 1)\n",
|
||||
" x = self.emb_drop(x)\n",
|
||||
" x2 = self.bn1(x_cont)\n",
|
||||
@ -147,6 +209,7 @@
|
||||
" x = self.lin3(x)\n",
|
||||
" return x\n",
|
||||
"\n",
|
||||
"\n",
|
||||
"model = ShelterModel(embedding_sizes, 1)"
|
||||
]
|
||||
},
|
||||
@ -156,8 +219,12 @@
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"optimizer = torch.optim.SGD(model.parameters(), lr = configuration_dict.get('base_lr', 0.1), momentum = 0.9)\n",
|
||||
"scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size = configuration_dict.get('number_of_epochs', 15)//3, gamma = 0.1)"
|
||||
"optimizer = torch.optim.SGD(\n",
|
||||
" model.parameters(), lr=configuration_dict.get(\"base_lr\", 0.1), momentum=0.9\n",
|
||||
")\n",
|
||||
"scheduler = torch.optim.lr_scheduler.StepLR(\n",
|
||||
" optimizer, step_size=configuration_dict.get(\"number_of_epochs\", 15) // 3, gamma=0.1\n",
|
||||
")"
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -166,8 +233,10 @@
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"device = torch.cuda.current_device() if torch.cuda.is_available() else torch.device('cpu')\n",
|
||||
"print('Device to use: {}'.format(device))\n",
|
||||
"device = (\n",
|
||||
" torch.cuda.current_device() if torch.cuda.is_available() else torch.device(\"cpu\")\n",
|
||||
")\n",
|
||||
"print(\"Device to use: {}\".format(device))\n",
|
||||
"model.to(device)"
|
||||
]
|
||||
},
|
||||
@ -177,7 +246,7 @@
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"tensorboard_writer = SummaryWriter('./tensorboard_logs')"
|
||||
"tensorboard_writer = SummaryWriter(\"./tensorboard_logs\")"
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -193,13 +262,13 @@
|
||||
" for x1, x2, y in train_dl:\n",
|
||||
" batch = y.shape[0]\n",
|
||||
" output = model(x1.to(device), x2.to(device))\n",
|
||||
" loss = F.cross_entropy(output, y.to(device)) \n",
|
||||
" loss = F.cross_entropy(output, y.to(device))\n",
|
||||
" optim.zero_grad()\n",
|
||||
" loss.backward()\n",
|
||||
" optim.step()\n",
|
||||
" total += batch\n",
|
||||
" sum_loss += batch*(loss.item())\n",
|
||||
" return sum_loss/total"
|
||||
" sum_loss += batch * (loss.item())\n",
|
||||
" return sum_loss / total"
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -218,20 +287,31 @@
|
||||
" current_batch_size = y.shape[0]\n",
|
||||
" out = model(x1.to(device), x2.to(device))\n",
|
||||
" loss = F.cross_entropy(out, y.to(device))\n",
|
||||
" sum_loss += current_batch_size*(loss.item())\n",
|
||||
" sum_loss += current_batch_size * (loss.item())\n",
|
||||
" total += current_batch_size\n",
|
||||
" pred = torch.max(out, 1)[1]\n",
|
||||
" correct += (pred.cpu() == y).float().sum().item()\n",
|
||||
" print(\"\\t valid loss %.3f and accuracy %.3f\" % (sum_loss/total, correct/total))\n",
|
||||
" tensorboard_writer.add_scalar('accuracy/total', correct/total, epoch)\n",
|
||||
" \n",
|
||||
" debug_categories = pd.DataFrame(x1.numpy(), columns=columns_categories_ordered.keys())\n",
|
||||
" print(\"\\t valid loss %.3f and accuracy %.3f\" % (sum_loss / total, correct / total))\n",
|
||||
" tensorboard_writer.add_scalar(\"accuracy/total\", correct / total, epoch)\n",
|
||||
"\n",
|
||||
" debug_categories = pd.DataFrame(\n",
|
||||
" x1.numpy(), columns=columns_categories_ordered.keys()\n",
|
||||
" )\n",
|
||||
" debug_numercal = pd.DataFrame(x2.numpy(), columns=columns_numerical)\n",
|
||||
" debug_gt = pd.DataFrame(np.array([reveresed_outcome_dict[int(e)] for e in y]), columns=['GT'])\n",
|
||||
" debug_pred = pd.DataFrame(np.array([reveresed_outcome_dict[int(e)] for e in pred.cpu()]), columns=['Pred'])\n",
|
||||
" debug_gt = pd.DataFrame(\n",
|
||||
" np.array([reveresed_outcome_dict[int(e)] for e in y]), columns=[\"GT\"]\n",
|
||||
" )\n",
|
||||
" debug_pred = pd.DataFrame(\n",
|
||||
" np.array([reveresed_outcome_dict[int(e)] for e in pred.cpu()]), columns=[\"Pred\"]\n",
|
||||
" )\n",
|
||||
" debug_table = debug_categories.join([debug_numercal, debug_gt, debug_pred])\n",
|
||||
" logger.report_table(title='Trainset - after labels encoding',series='pandas DataFrame',iteration=epoch, table_plot=debug_table.head())\n",
|
||||
" return sum_loss/total, correct/total"
|
||||
" logger.report_table(\n",
|
||||
" title=\"Trainset - after labels encoding\",\n",
|
||||
" series=\"pandas DataFrame\",\n",
|
||||
" iteration=epoch,\n",
|
||||
" table_plot=debug_table.head(),\n",
|
||||
" )\n",
|
||||
" return sum_loss / total, correct / total"
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -241,12 +321,14 @@
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"def train_loop(model, epochs):\n",
|
||||
" for i in range(epochs): \n",
|
||||
" for i in range(epochs):\n",
|
||||
" loss = train_model(model, optimizer, train_dl)\n",
|
||||
" print(\"Epoch {}: training loss {}\".format(i, loss))\n",
|
||||
" tensorboard_writer.add_scalar('training loss/loss', loss, i)\n",
|
||||
" tensorboard_writer.add_scalar('learning rate/lr', optimizer.param_groups[0]['lr'], i)\n",
|
||||
" \n",
|
||||
" tensorboard_writer.add_scalar(\"training loss/loss\", loss, i)\n",
|
||||
" tensorboard_writer.add_scalar(\n",
|
||||
" \"learning rate/lr\", optimizer.param_groups[0][\"lr\"], i\n",
|
||||
" )\n",
|
||||
"\n",
|
||||
" val_loss(model, valid_dl, i)\n",
|
||||
" scheduler.step()"
|
||||
]
|
||||
@ -257,8 +339,20 @@
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"train_dl = torch.utils.data.DataLoader(train_ds, batch_size=configuration_dict.get('batch_size', 100), shuffle=True, pin_memory=True, num_workers=1)\n",
|
||||
"valid_dl = torch.utils.data.DataLoader(valid_ds, batch_size=configuration_dict.get('batch_size', 100), shuffle=False, pin_memory=True, num_workers=1)"
|
||||
"train_dl = torch.utils.data.DataLoader(\n",
|
||||
" train_ds,\n",
|
||||
" batch_size=configuration_dict.get(\"batch_size\", 100),\n",
|
||||
" shuffle=True,\n",
|
||||
" pin_memory=True,\n",
|
||||
" num_workers=1,\n",
|
||||
")\n",
|
||||
"valid_dl = torch.utils.data.DataLoader(\n",
|
||||
" valid_ds,\n",
|
||||
" batch_size=configuration_dict.get(\"batch_size\", 100),\n",
|
||||
" shuffle=False,\n",
|
||||
" pin_memory=True,\n",
|
||||
" num_workers=1,\n",
|
||||
")"
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -267,7 +361,16 @@
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"train_loop(model, epochs=configuration_dict.get('number_of_epochs', 30))"
|
||||
"train_loop(model, epochs=configuration_dict.get(\"number_of_epochs\", 30))"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"## Save Model\n",
|
||||
"\n",
|
||||
"ClearML automatically captures the model logged with Torch, and saves it as an artifact."
|
||||
]
|
||||
},
|
||||
{
|
||||
@ -276,7 +379,7 @@
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"PATH = './model_checkpoint.pth'\n",
|
||||
"PATH = \"./model_checkpoint.pth\"\n",
|
||||
"torch.save(model.state_dict(), PATH)\n",
|
||||
"tensorboard_writer.close()"
|
||||
]
|
||||
@ -298,7 +401,7 @@
|
||||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.7.4"
|
||||
"version": "3.8.10"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
|
Loading…
Reference in New Issue
Block a user