Initial commit

This commit is contained in:
allegroai 2021-04-12 03:17:25 +03:00
parent 350bb057a0
commit 44c66c3848
13 changed files with 1392 additions and 0 deletions

37
.gitignore vendored Normal file
View File

@ -0,0 +1,37 @@
# python build
dist/
build/
*.egg-info/
# Compiled Python bytecode
*.py[cod]
# Log files
*.log
# JetBrains IDE
.idea/
# Generated by MacOS
.DS_Store
# Generated by Windows
Thumbs.db
# Applications
*.app
*.exe
*.war
# Large media files
*.mp4
*.tiff
*.avi
*.flv
*.mov
*.wmv
# models
*.pbtxt
*.h5

144
README.md Normal file
View File

@ -0,0 +1,144 @@
<div align="center">
<a href="https://app.community.clear.ml"><img src="https://github.com/allegroai/clearml/blob/master/docs/clearml-logo.svg?raw=true" width="250px"></a>
**ClearML Serving - ML-Ops made easy**
## **`clearml-serving` </br> Model-Serving Orchestration and Repository Solution**
[![GitHub license](https://img.shields.io/github/license/allegroai/clearml-serving.svg)](https://img.shields.io/github/license/allegroai/clearml-serving.svg)
[![PyPI pyversions](https://img.shields.io/pypi/pyversions/clearml-serving.svg)](https://img.shields.io/pypi/pyversions/clearml-serving.svg)
[![PyPI version shields.io](https://img.shields.io/pypi/v/clearml-serving.svg)](https://img.shields.io/pypi/v/clearml-serving.svg)
[![PyPI status](https://img.shields.io/pypi/status/clearml-serving.svg)](https://pypi.python.org/pypi/clearml-serving/)
[![Slack Channel](https://img.shields.io/badge/slack-%23clearml--community-blueviolet?logo=slack)](https://join.slack.com/t/allegroai-trains/shared_invite/zt-c0t13pty-aVUZZW1TSSSg2vyIGVPBhg)
</div>
![alt screenshots](docs/webapp_screenshots.gif "screenshots")
**`clearml-serving`** is a command-line utility for flexible orchestration of your model deployment.
**`clearml-serving`** can make use of a variety of serving engines (**Nvidia Triton, OpenVINO Model Server, KFServing**),
setting them up for serving wherever you designate a ClearML Agent, or on your ClearML Kubernetes cluster.
Features:
* Spin serving engines on your Kubernetes cluster or ClearML Agent machine from CLI
* Full usage & performance metrics integrated with ClearML UI
* Multi-model support in a single serving engine container
* Automatically deploy new model versions
* Support Canary model releases
* Integrates with the ClearML Model Repository
* Deploy & upgrade endpoints directly from ClearML UI
* Programmatic interface for endpoint/versions/metric control
## Installing ClearML Serving
1. Set up your [**ClearML Server**](https://github.com/allegroai/clearml-server) or use the [Free tier Hosting](https://app.community.clear.ml)
2. Connect your ClearML Worker(s) to your **ClearML Server** (see [**ClearML Agent**](https://github.com/allegroai/clearml-agent) / [Kubernetes integration](https://github.com/allegroai/clearml-agent#kubernetes-integration-optional))
3. Install `clearml-serving` (Note: `clearml-serving` is merely a control utility; it does not require any resources for the actual serving)
```bash
pip install clearml-serving
```
## Using ClearML Serving
Clearml-Serving will automatically serve *published* models from your ClearML model repository, so the first step is getting a model into your ClearML model repository.
Background: When using `clearml` in your training code, any model stored by your python code is automatically registered (and, optionally, uploaded) to the model repository. This auto-magic logging is key for continuous model deployment.
To learn more on training models and the ClearML model repository, see the [ClearML documentation](https://allegro.ai/clearml/docs)
### Training a toy model with Keras (about 2 minutes on a laptop)
The main goal of `clearml-serving` is to seamlessly integrate with the development process and the model repository.
This is achieved by combining ClearML's auto-magic logging, which creates and uploads models directly from
the Python training code, with the ClearML Server's REST API and pythonic interface, which provide access to these models as they are automatically added to the model repository.
Let's demonstrate this seamless integration by training a toy Keras model to classify images based on the MNIST dataset.
Once we have a trained model in the model repository we will serve it using `clearml-serving`.
We'll also see how we can retrain another version of the model, and have the model serving engine automatically upgrade to the new model version.
#### Keras MNIST toy training example (single-epoch mock training):
1. Install `tensorflow` (and of course `clearml`)
```bash
pip install "tensorflow>2" clearml
```
2. Execute the training code
```bash
cd examples/keras
python keras_mnist.py
```
**Notice:** The only integration code required for `clearml` is the following two lines:
```python
from clearml import Task
task = Task.init(project_name="examples", task_name="Keras MNIST serve example", output_uri=True)
```
This call makes sure all outputs are automatically logged to the ClearML Server; this includes console output, TensorBoard, command-line arguments, the git repository, etc.
It also means any model stored by the code will be automatically uploaded and logged in the ClearML model repository.
3. Review the models in the ClearML web UI:
Go to the "Projects" section of your ClearML server ([free hosted](https://app.community.clear.ml) or [self-deployed](https://github.com/allegroai/clearml-server)).
In the "examples" project, go to the Models tab (the model repository).
We should have a model named "Keras MNIST serve example - serving_model".
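The same check can also be done from code; a minimal sketch using the `clearml` model query interface (the project and model names are the ones used in this example):
```python
from clearml import Model

# list the models registered under the "examples" project by the toy training script
models = Model.query_models(
    project_name="examples",
    model_name="Keras MNIST serve example - serving_model",
)
for m in models:
    print(m.id, "published" if m.published else "draft")
```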
Once a model-serving service is available, right-clicking on the model and selecting "Publish" will trigger upgrading the model on the serving engine container.
Next, we will spin up the Serving Service and the serving engine.
### Serving your models
In order to serve your models, `clearml-serving` will spawn a serving service which stores multiple endpoints and their configuration,
collects metric reports, and updates models when new versions are published in the model repository.
In addition, a serving engine is launched, which is the container actually running the inference engine.
(The currently supported engine is Nvidia Triton; Intel OpenVINO Model Server and KFServing support is coming soon.)
Now that we have a published model in the ClearML model repository, we can spin a serving service and a serving engine.
Starting a Serving Service:
1. Create a new serving instance.
This is the control-plane Task; we will see all of its configuration, logs, and metrics in the "serving" project. We can have multiple serving services running in the same system.
In this example we will make use of Nvidia-Triton engines.
```bash
clearml-serving triton --project "serving" --name "serving example"
```
2. Add models to the serving engine with specific endpoints.
Reminder: to view your model repository, log in to your ClearML account,
go to the "examples" project and review the "Models" tab.
```bash
clearml-serving triton --endpoint "keras_mnist" --model-project "examples" --model-name "Keras MNIST serve example - serving_model"
```
3. Launch the serving service.
The service will be launched on your "services" queue, which by default runs services on the ClearML server machine.
(Read more on services queue [here](https://allegro.ai/clearml/docs/docs/concepts_fundamentals/concepts_fundamentals_clearml_server.html#clearml-agent-services-container))
We set our serving engine to launch on the "default" queue:
```bash
clearml-serving launch --queue default
```
4. Optional: If you do not have a machine connected to your ClearML cluster, either read more on our Kubernetes integration, or spin up a bare-metal worker and connect it to your ClearML Server.
`clearml-serving` is leveraging the orchestration capabilities of `ClearML` to launch the serving engine on the cluster.
Read more on the [ClearML Agent](https://github.com/allegroai/clearml-agent) orchestration module [here](https://allegro.ai/clearml/docs/docs/concepts_fundamentals/concepts_fundamentals_clearml_agent.html)
If you have not yet set up a ClearML worker connected to your `clearml` account, you can do so now by running:
```bash
pip install clearml-agent
clearml-agent daemon --docker --queue default --detached
```
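The CLI steps above can also be scripted against the `ServingService` class that `clearml-serving` itself uses; a minimal sketch, assuming the same project, model, and queue names as in this example:
```python
from clearml_serving.serving_service import ServingService

# create (or reuse) the serving service control-plane Task in the "serving" project
service = ServingService(task_project="serving", task_name="serving example", engine_type="triton")

# add an endpoint that automatically tracks the example model from the "examples" project
service.add_model_serving(
    serving_url="keras_mnist",
    model_project="examples",
    model_name="Keras MNIST serve example - serving_model",
)
service.serialize(force=True)

# enqueue the control plane on the "services" queue and the Triton engine on the "default" queue
service.launch(queue_name="services")
service.launch_engine(queue_name="default")
```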
**We are done!**
To test the newly served model, you can `curl` the new endpoint:
```bash
curl <serving-engine-ip>:8000/v2/models/keras_mnist/versions/1
```
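For an actual prediction, POST to the Triton HTTP/REST inference endpoint. A rough sketch using `requests` is below; note that the input tensor name must match the one written into the generated `config.pbtxt` by the training script, so treat `dense_input` as a placeholder:
```python
import numpy as np
import requests

# a single 28x28 grayscale digit flattened to 784 floats, as in the training data
image = np.random.rand(1, 784).astype("float32")

payload = {
    "inputs": [{
        "name": "dense_input",  # placeholder - use the input name from your config.pbtxt
        "shape": [1, 784],
        "datatype": "FP32",
        "data": image.flatten().tolist(),
    }]
}
response = requests.post(
    "http://<serving-engine-ip>:8000/v2/models/keras_mnist/versions/1/infer",
    json=payload,
)
print(response.json())
```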
**Notice**: If we re-run our keras training example and publish a new model in the repository, the engine will automatically update to the new model.
Further reading on advanced topics [here](coming-soon)


164
clearml_serving/__main__.py Normal file
View File

@ -0,0 +1,164 @@
import json
import os
from argparse import ArgumentParser, FileType
from .serving_service import ServingService
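# the CLI persists the last configured serving service state (including its Task ID) in ~/.clearml_serving.json,
# so follow-up commands such as `clearml-serving launch` can omit --id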
def restore_state(args):
session_state_file = os.path.expanduser('~/.clearml_serving.json')
# noinspection PyBroadException
try:
with open(session_state_file, 'rt') as f:
state = json.load(f)
except Exception:
state = {}
args.id = getattr(args, 'id', None) or state.get('id')
return args
def store_state(args, clear=False):
session_state_file = os.path.expanduser('~/.clearml_serving.json')
if clear:
state = {}
else:
state = {str(k): str(v) if v is not None else None
for k, v in args.__dict__.items() if not str(k).startswith('_') and k not in ('command', )}
# noinspection PyBroadException
try:
with open(session_state_file, 'wt') as f:
json.dump(state, f, sort_keys=True)
except Exception:
pass
def cmd_triton(args):
if not args.id and not args.name:
raise ValueError("Serving service must have a name, use --name <service_name>")
if args.id:
a_serving = ServingService(task_id=args.id)
else:
a_serving = ServingService(task_project=args.project, task_name=args.name, engine_type='triton')
args.id = a_serving.get_id()
if args.endpoint:
print("Nvidia Triton Engine ID: {} - Adding serving endpoint: \n".format(args.id) +
("model-project: '{}', model-name: '{}', model-tags: '{}', config-file: '{}'".format(
args.model_project or '',
args.model_name or '',
args.model_tags or '',
args.config or '') if not args.model_id else
"model-id: '{}', config-file: '{}'".format(args.model_id or '', args.config or '')))
if not args.endpoint and (args.model_project or args.model_tags or args.model_id or args.model_name):
raise ValueError("Serving endpoint must be provided, add --endpoint <endpoint_name>")
if args.endpoint:
a_serving.add_model_serving(
serving_url=args.endpoint,
model_project=args.model_project,
model_name=args.model_name,
model_tags=args.model_tags,
model_ids=[args.model_id] if args.model_id else None,
config_file=args.config,
max_versions=args.versions,
)
a_serving.serialize(force=True)
store_state(args)
def cmd_launch(args):
print('Launching Serving Engine: service: {}, queue: {}'.format(args.id, args.queue))
if not args.id:
raise ValueError("Serving service must specify serving service ID, use --id <service_id>")
a_serving = ServingService(task_id=args.id)
if a_serving.get_engine_type() not in ('triton',):
raise ValueError("Error, serving engine type \'{}\' is not supported".format(a_serving.get_engine_type()))
# launch services queue
a_serving.launch(queue_name=args.service_queue)
# launch engine
a_serving.launch_engine(queue_name=args.queue)
def cli():
title = 'clearml-serving - CLI for launching ClearML serving engine'
print(title)
parser = ArgumentParser(prog='clearml-serving', description=title)
subparsers = parser.add_subparsers(help='Serving engine commands', dest='command')
# create the launch command
parser_launch = subparsers.add_parser('launch', help='Launch a previously configured serving service')
parser_launch.add_argument(
'--id', default=None, type=str,
help='Specify a previously configured service ID, if not provided use the last created service')
parser_launch.add_argument(
'--queue', default=None, type=str, required=True,
help='Specify the clearml queue to be used for the serving engine server')
parser_launch.add_argument(
'--service-queue', default='services', type=str,
help='Specify the service queue to be used for the serving service, default: services queue')
parser_launch.set_defaults(func=cmd_launch)
# create the parser for the "triton" command
parser_trt = subparsers.add_parser('triton', help='Nvidia Triton Serving Engine')
parser_trt.add_argument(
'--id', default=None, type=str,
help='Add configuration to running serving session, pass serving Task ID, '
'if passed ignore --name / --project')
parser_trt.add_argument(
'--name', default=None, type=str,
help='Give serving service a name, should be a unique name')
parser_trt.add_argument(
'--project', default='DevOps', type=str,
help='Serving service project name, default: DevOps')
parser_trt.add_argument(
'--endpoint', required=False, type=str,
help='Serving endpoint, one per model, unique ')
parser_trt.add_argument(
'--versions', type=int,
help='Serving endpoint supports multiple versions; '
'maximum number of versions to deploy (version numbers always increase). Default: no versioning.')
parser_trt.add_argument(
'--config', required=False, type=str,
help='Model `config.pbtxt` file, one per model, order matching with models')
parser_trt.add_argument(
'--model-id', type=str,
help='(Optional) Model ID to deploy, if passed model-project/model-name/model-tags are ignored')
parser_trt.add_argument(
'--model-project', type=str, help='Automatic model deployment and upgrade, select model project (exact match)')
parser_trt.add_argument(
'--model-name', type=str, help='Automatic model deployment and upgrade, select model name (exact match)')
parser_trt.add_argument(
'--model-tags', nargs='*', type=str,
help='Automatic model deployment and upgrade, select model name tags to include, '
'model has to have all tags to be deployed/upgraded')
parser_trt.set_defaults(func=cmd_triton)
args = parser.parse_args()
args = restore_state(args)
if args.command:
args.func(args)
else:
parser.print_help()
def main():
try:
cli()
except KeyboardInterrupt:
print('\nUser aborted')
except Exception as ex:
print('\nError: {}'.format(ex))
exit(1)
if __name__ == '__main__':
main()

17
clearml_serving/service.py Normal file
View File

@ -0,0 +1,17 @@
from time import sleep
from clearml import Task
from clearml_serving.serving_service import ServingService
def main():
# this should only be running remotely, executed by an agent
task = Task.init()
serving = ServingService(task=task)
while True:
serving.update()
serving.stats()
sleep(60.)
if __name__ == '__main__':
main()

568
clearml_serving/serving_service.py Normal file
View File

@ -0,0 +1,568 @@
import json
import shutil
from logging import getLogger
from pathlib import Path as Path3
from time import time
from typing import Optional, Union, Dict, Sequence
from attr import attrib, attrs, asdict
from pathlib2 import Path
from clearml import Task, Model, InputModel
class ServingService(object):
_config_pbtxt_section = 'config.pbtxt'
_supported_serving_engines = ('triton', 'ovms', 'kfserving')
@attrs
class EndPoint(object):
serving_url = attrib(type=str)
model_ids = attrib(type=list)
model_project = attrib(type=str)
model_name = attrib(type=str)
model_tags = attrib(type=list)
model_config_blob = attrib(type=str, default=None)
max_num_revisions = attrib(type=int, default=None)
versions = attrib(type=dict, default={})
def as_dict(self):
return asdict(self)
def __init__(self, task_id=None, task_project=None, task_name=None, task=None, engine_type='triton'):
# type: (Optional[str], Optional[str], Optional[str], Optional[Task], Optional[str]) -> None
"""
:param task_id: Optionally specify an existing Task ID of the ServingService
:param task_project: Select the project where the new ServingService Task will be created
:param task_name: Specify the Task name for the newly created ServingService
:param task: Optionally pass an existing ServingService Task object
:param engine_type: Specify the serving engine type. Examples: triton, ovms, kfserving
"""
assert engine_type in self._supported_serving_engines
if task:
self._task = task
elif task_id:
self._task = Task.get_task(task_id=task_id)
else:
# noinspection PyProtectedMember
if Task._query_tasks(project_name=task_project, task_name=task_name):
self._task = Task.get_task(project_name=task_project, task_name=task_name)
else:
self._task = Task.create(
project_name=task_project, task_name=task_name, task_type=Task.TaskTypes.service,
repo="https://github.com/allegroai/clearml-serving.git",
branch="master",
script="clearml_serving/service.py",
working_directory=".",
add_task_init_call=False,
)
self._task.set_system_tags(list(self._task.get_system_tags()) + ['serving'])
# self._current_serving_endpoints = {'an_enpoint_url': {1: 'model_id'}}
self._current_serving_endpoints = {} # type: Dict[str, Dict[int, str]]
# self._endpoints = {'an_enpoint_url': ServingService.EndPoint()}
self._endpoints = {} # type: Dict[str, ServingService.EndPoint]
self._engine_type = engine_type
self._dirty = False
self._last_update_step = None
# try to deserialize from Task
# noinspection PyBroadException
try:
self._deserialize()
except Exception:
pass
def add_model_serving(
self,
serving_url, # type: str
model_ids=None, # type: Optional[Sequence[str]]
model_project=None, # type: Optional[str]
model_name=None, # type: Optional[str]
model_tags=None, # type: Optional[Sequence[str]]
config_file=None, # type: Optional[Union[Path, Path3, str]]
max_versions=1, # type: Optional[int]
):
"""
Add a new model serving endpoint, automatically published
:param serving_url: serving endpoint URL (unique, one per model)
:param model_ids: optional list of specific Model IDs to serve on the endpoint
:param model_project: select models for automatic deployment/upgrade by project name (exact match)
:param model_name: select models for automatic deployment/upgrade by model name (exact match)
:param model_tags: select models for automatic deployment/upgrade by tags (a model must have all the tags)
:param config_file: optional path to a Triton `config.pbtxt` file for the endpoint
:param max_versions: maximum number of model versions to deploy on the endpoint
:return:
"""
if not serving_url:
raise ValueError("serving_url is required")
if model_tags and not isinstance(model_tags, (list, tuple)):
raise ValueError("model_tags must be a list of strings")
# normalize endpoint url
serving_url = str(serving_url).strip('/')
endpoint = self.EndPoint(
serving_url=serving_url,
model_ids=list(model_ids) if model_ids else None,
model_name=model_name,
model_project=model_project,
model_tags=model_tags,
max_num_revisions=max_versions or None,
versions={},
model_config_blob='',
)
# load config file
if config_file:
with open(str(config_file), 'rt') as f:
endpoint.model_config_blob = f.read()
else:
# Look for the config on the Model generated Task
found_models = Model.query_models(project_name=model_project, model_name=model_name, tags=model_tags) or []
selected_model = None
# find the first model with config.pbtxt configuration
# prefer published models
found_models = [m for m in found_models if m.published] + [m for m in found_models if not m.published]
for m in found_models:
task_id = m.task
task = Task.get_task(task_id=task_id)
config_pbtxt = task.get_configuration_object(self._config_pbtxt_section)
if config_pbtxt and str(config_pbtxt).strip():
endpoint.model_config_blob = config_pbtxt
selected_model = m
break
if not selected_model:
raise ValueError(
"Requested Model project={} name={} tags={} not found. 'config.pbtxt' could not be inferred. "
"please provide specific config.pbtxt definition.".format(model_project, model_name, model_tags))
elif len(found_models) > 1:
getLogger('clearml-serving').warning(
"Found more than one Model, using model id={}".format(selected_model.id))
self._endpoints[serving_url] = endpoint
self._dirty = True
def launch(self, queue_name='services', queue_id=None, force=False, verbose=True):
# type: (Optional[str], Optional[str], bool, bool) -> None
"""
Launch serving service on a remote machine using the specified queue
:param queue_name: Queue name to launch the serving service control plane
:param queue_id: specify queue ID (unique and stable) instead of queue_name
:param force: if False check if service Task is already running before enqueuing
:param verbose: If True print progress to console
"""
# check if we are not already running
if not force and ((self._task.data.execution.queue and self._task.status == 'in_progress')
or self._task.status == 'queued'):
if verbose:
print('Serving service already running')
else:
if verbose:
print('Launching Serving service on {} queue'.format(queue_id or queue_name))
self.update_endpoint_graph(force=True)
self.update_model_endpoint_state()
self.serialize()
self._task.flush(wait_for_uploads=True)
self._task.reset()
self._task.enqueue(task=self._task, queue_name=queue_name, queue_id=queue_id)
def launch_engine(self, queue_name, queue_id=None, verbose=True):
# type: (Optional[str], Optional[str], bool) -> None
"""
Launch serving engine on a specific queue
:param queue_name: Queue name to launch the engine service running the inference on.
:param queue_id: specify queue ID (unique and stable) instead of queue_name
:param verbose: If True print progress to console
"""
# todo: add more engines
if self._engine_type == 'triton':
# create the serving engine Task
engine_task = Task.create(
project_name=self._task.get_project_name(),
task_name="triton serving engine",
task_type=Task.TaskTypes.inference,
repo="https://github.com/allegroai/clearml-serving.git",
branch="master",
script="clearml_serving/triton_helper.py",
working_directory=".",
docker="nvcr.io/nvidia/tritonserver:21.03-py3 --ipc=host -p 8000:8000 -p 8001:8001 -p 8002:8002",
argparse_args=[('serving_id', self._task.id), ],
add_task_init_call=False,
)
if verbose:
print('Launching engine {} on queue {}'.format(self._engine_type, queue_id or queue_name))
engine_task.enqueue(task=engine_task, queue_name=queue_name, queue_id=queue_id)
def update_endpoint_graph(self, force=False):
# type: (bool) -> None
"""
Update the endpoint serving graph
:param force: If True always update, otherwise skip if the service was not changed since last time
"""
if not force and not self._dirty:
return
# Generate configuration table and details
table_values = [["Endpoint", "Model ID", "Model Project", "Model Name", "Model Tags", "Max Versions"]]
for endpoint in sorted(self._endpoints.keys()):
n = self._endpoints[endpoint]
table_values.append([
str(n.serving_url or ''),
str(n.model_ids or ''),
str(n.model_project or ''),
str(n.model_name or ''),
str(n.model_tags or ''),
str(n.max_num_revisions or '')
])
self._task.get_logger().report_table(
title='Serving Endpoint Configuration', series='Details', iteration=0, table_plot=table_values,
extra_layout={"title": "Model Endpoints Details"})
# generate current endpoint view
sankey_node = dict(
label=[],
color=[],
customdata=[],
hovertemplate='%{customdata}<extra></extra>',
hoverlabel={"align": "left"},
)
sankey_link = dict(
source=[],
target=[],
value=[],
hovertemplate='<extra></extra>',
)
# root
sankey_node['color'].append("mediumpurple")
sankey_node['label'].append('{}'.format('serving'))
sankey_node['customdata'].append("")
# Generate table and details
table_values = [["Endpoint", "Version", "Model ID"]]
# noinspection PyProtectedMember
base_url = self._task._get_app_server() + '/projects/*/models/{model_id}/general'
for i, serve_url in enumerate(sorted(self._endpoints.keys())):
ep = self._endpoints[serve_url]
sankey_node['color'].append("blue")
sankey_node['label'].append('{}'.format(serve_url))
sankey_node['customdata'].append(
"project: {}<br />name: {}<br />tags: {}".format(
ep.model_project or '', ep.model_name or '', ep.model_tags or '')
)
sankey_link['source'].append(0)
sankey_link['target'].append(i + 1)
sankey_link['value'].append(1. / len(self._endpoints))
for v in sorted(self._current_serving_endpoints.get(serve_url, [])):
model_id = self._current_serving_endpoints[serve_url][v]
href = '<a href="{}"> {} </a>'.format(base_url.format(model_id=model_id), model_id)
table_values.append([str(serve_url), str(v), href])
sankey_node['color'].append("lightblue")
sankey_node['label'].append('{}'.format(v))
sankey_node['customdata'].append(model_id)
sankey_link['source'].append(i + 1)
sankey_link['target'].append(len(sankey_node['color']) - 1)
sankey_link['value'].append(1. / len(self._current_serving_endpoints[serve_url]))
# create the sankey graph
dag_flow = dict(
link=sankey_link,
node=sankey_node,
textfont=dict(color='rgba(0,0,0,255)', size=10),
type='sankey',
orientation='h'
)
fig = dict(data=[dag_flow], layout={'xaxis': {'visible': False}, 'yaxis': {'visible': False}})
self._task.get_logger().report_plotly(
title='Model Serving Endpoints', series='', iteration=0, figure=fig)
# report detailed table
self._task.get_logger().report_table(
title='Serving Endpoint', series='Details', iteration=0, table_plot=table_values,
extra_layout={"title": "Model Endpoints Details"})
self._dirty = False
def update_model_endpoint_state(self):
# type: () -> bool
"""
Update model endpoint state from the model repository
:return: True if endpoints were updated
"""
for endpoint, node in self._endpoints.items():
# model ID supersedes everything
if node.model_ids:
model_ids = node.model_ids
else:
# get list of models sorted by descending update time
models = Model.query_models(
project_name=node.model_project,
model_name=node.model_name,
tags=node.model_tags
)
# prefer published models
model_ids = [m.id for m in models if m.published] + [m.id for m in models if not m.published]
cur_endpoint = self._current_serving_endpoints.get(node.serving_url, {})
cur_endpoint = {int(k): v for k, v in cur_endpoint.items() if v in model_ids}
cur_endpoint_m_ids = list(cur_endpoint.values())
max_v = max(list(cur_endpoint.keys()) or [0])
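# assign new, increasing version numbers to any model IDs that are not already being served,
# stopping once max_num_revisions versions are deployed for this endpoint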
for i, m_id in enumerate(model_ids):
# only pick the latest in the history
if node.max_num_revisions and max_v >= node.max_num_revisions:
break
if m_id in cur_endpoint_m_ids:
continue
max_v += 1
cur_endpoint[max_v] = m_id
# check if we need to update,
if self._current_serving_endpoints.get(node.serving_url) != cur_endpoint:
# set dirty flag
self._dirty = True
# store updated results
self._current_serving_endpoints[node.serving_url] = cur_endpoint
return self._dirty
def stats(self):
pass
def get_endpoints(self):
# type: () -> Dict[str, ServingService.EndPoint]
"""
return the internal endpoints configuration
:return: dict where the keys are the endpoint URLs and the values are the endpoint configurations
"""
return self._endpoints
def get_endpoint_version_model_id(self, serving_url):
# type: (str) -> Dict[int, str]
"""
Return dict with model versions and model id for the specific serving url
If the serving url is not found, return an empty dict
:param serving_url: serving url string
:return: dictionary keys are the versions (integers) and values are the model IDs (str)
"""
return self._current_serving_endpoints.get(serving_url) or {}
def _serialize(self):
configuration = dict()
for name, ep in self._endpoints.items():
# noinspection PyProtectedMember
self._task.set_configuration_object(
name="model.{}".format(name),
description='Model Serving Configuration',
config_type='pbtxt',
config_text=ep.model_config_blob)
ep_conf = ep.as_dict()
ep_conf.pop('model_config_blob', None)
configuration['"{}"'.format(name)] = ep_conf
# noinspection PyProtectedMember
self._task._set_configuration(
config_dict=configuration, name='endpoints',
config_type='hocon', description='Serving Endpoints Configuration')
# set configuration of current served endpoints
# noinspection PyProtectedMember
self._task._set_configuration(
config_dict=self._current_serving_endpoints, name='serving_state',
config_type='hocon', description='Current Serving Endpoints State',
)
serving = dict(engine=self._engine_type)
self._task.connect(serving, name='serving')
def _deserialize(self):
# type: () -> bool
"""
deserialize internal state from Task backend
:return: True if the service state was deserialized (or is already up to date)
"""
# update if the task was updated
if self._endpoints:
last_update = self._task.data.last_update
try:
# noinspection PyProtectedMember
if last_update == self._task._get_last_update():
return True
except AttributeError:
# support old clearml packages
pass
self._task.reload()
# noinspection PyProtectedMember
configuration = self._task._get_configuration_dict(name='endpoints')
if not configuration:
return False
self._endpoints = {}
self._current_serving_endpoints = {}
serving = dict(engine='')
task_parameters = self._task.get_parameters_as_dict()
serving.update(task_parameters.get('serving', {}))
self._engine_type = serving['engine']
for name, endpoint in configuration.items():
ep = self.EndPoint(model_config_blob='', **endpoint)
ep.model_config_blob = self._task.get_configuration_object(
name="model.{}".format(ep.serving_url))
self._endpoints[ep.serving_url] = ep
# get configuration of current served endpoints
# noinspection PyProtectedMember
self._current_serving_endpoints = self._task._get_configuration_dict(name='serving_state')
self._dirty = True
return True
def update(self, force=False):
# type: (bool) -> bool
"""
Update internal endpoint state based on Task configuration and model repository
:param force: if True force update
:return: True if internal state updated.
"""
if not self._task:
return False
# store current internal state
state_hash = self.__state_hash()
if not self._deserialize():
return False
# check if current internal state changed
if not force and state_hash == self.__state_hash():
print("Skipping update, nothing changed")
return False
return self.update_model_endpoint_state()
def get_id(self):
# type: () -> str
"""
Return the Serving Service Task ID
:return: Unique Task ID (str)
"""
return self._task.id
def get_engine_type(self):
# type: () -> str
"""
return the engine type used in the serving service
:return: engine type (str). example: triton, ovms, kfserving
"""
return self._engine_type
def serialize(self, force=False):
# type: (bool) -> None
"""
Serialize current service state to the Task
:param force: If True synchronize an aborted/completed Task
"""
if force and self._task.status not in (Task.TaskStatusEnum.created, Task.TaskStatusEnum.in_progress):
self._task.mark_started(force=True)
self._serialize()
def triton_model_service_update_step(self, model_repository_folder=None, verbose=True):
# type: (Optional[str], bool) -> None
# check if something changed since last time
if not self.update(force=self._last_update_step is None):
return
self._last_update_step = time()
if not model_repository_folder:
model_repository_folder = '/models/'
if verbose:
print('Updating local model folder: {}'.format(model_repository_folder))
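# build the standard Triton model repository layout:
# <repo>/<endpoint>/config.pbtxt and <repo>/<endpoint>/<version>/model.savedmodel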
for url, endpoint in self.get_endpoints().items():
folder = Path(model_repository_folder) / url
folder.mkdir(parents=True, exist_ok=True)
with open((folder / 'config.pbtxt').as_posix(), 'wt') as f:
f.write(endpoint.model_config_blob)
# download model versions
for version, model_id in self.get_endpoint_version_model_id(serving_url=url).items():
model_folder = folder / str(version)
model_folder.mkdir(parents=True, exist_ok=True)
model = None
# noinspection PyBroadException
try:
model = InputModel(model_id)
local_path = model.get_local_copy()
except Exception:
local_path = None
if not local_path:
print("Error retrieving model ID {} [{}]".format(model_id, model.url if model else ''))
continue
local_path = Path(local_path)
if verbose:
print('Update model v{} in {}'.format(version, model_folder))
# if this is a folder, copy everything and delete the temp folder
if local_path.is_dir():
# we assume we have a `tensorflow.savedmodel` folder
model_folder /= 'model.savedmodel'
model_folder.mkdir(parents=True, exist_ok=True)
# rename to old
old_folder = None
if model_folder.exists():
old_folder = model_folder.parent / '.old.{}'.format(model_folder.name)
model_folder.replace(old_folder)
if verbose:
print('copy model into {}'.format(model_folder))
shutil.copytree(
local_path.as_posix(), model_folder.as_posix(), symlinks=False,
)
if old_folder:
shutil.rmtree(path=old_folder.as_posix())
# delete temp folder
shutil.rmtree(local_path.as_posix())
else:
# single file should be moved
target_path = model_folder / local_path.name
old_file = None
if target_path.exists():
old_file = target_path.parent / '.old.{}'.format(target_path.name)
target_path.replace(old_file)
shutil.move(local_path.as_posix(), target_path.as_posix())
if old_file:
old_file.unlink()
def __state_hash(self):
# type: () -> int
"""
Return hash of the internal state (use only for in-process comparison)
:return: hash int
"""
return hash(json.dumps(
[self._current_serving_endpoints, {k: v.as_dict() for k, v in self._endpoints.items()}],
sort_keys=True))

214
clearml_serving/triton_helper.py Normal file
View File

@ -0,0 +1,214 @@
import re
import subprocess
from argparse import ArgumentParser
from time import time
from typing import Optional
from pathlib2 import Path
from clearml import Task, Logger
from clearml.backend_api.utils import get_http_session_with_retry
from clearml_serving.serving_service import ServingService
class TritonHelper(object):
_metric_line_parsing = r"(\w+){(gpu_uuid=\"[\w\W]*\",)?model=\"(\w+)\",\s*version=\"(\d+)\"}\s*([0-9.]*)"
_default_metrics_port = 8002
def __init__(
self,
args, # Any
task, # type: Task
serving_id, # type: str
metric_host=None, # type: Optional[str]
metric_port=None, # type: int
):
# type: (...) -> None
self._http_session = get_http_session_with_retry()
self.args = dict(**args.__dict__) if args else {}
self.task = task
self.serving_id = serving_id
self.metric_host = metric_host or '0.0.0.0'
self.metric_port = metric_port or self._default_metrics_port
self._parse_metric = re.compile(self._metric_line_parsing)
self._timestamp = time()
print('Starting Triton Helper service\n{}\n'.format(self.args))
def report_metrics(self, remote_logger):
# type: (Optional[Logger]) -> bool
# iterations are seconds from start
iteration = int(time() - self._timestamp)
report_msg = "reporting metrics: relative time {} sec".format(iteration)
self.task.get_logger().report_text(report_msg)
if remote_logger:
remote_logger.report_text(report_msg)
# noinspection PyBroadException
try:
request = self._http_session.get('http://{}:{}/metrics'.format(
self.metric_host, self.metric_port))
if not request.ok:
return False
content = request.content.decode().split('\n')
except Exception:
return False
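# parse the Prometheus-style metric lines exposed by tritonserver, e.g.
# nv_inference_request_success{model="keras_mnist",version="1"} 42.0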
for line in content:
line = line.strip()
if not line or line.startswith('#'):
continue
# noinspection PyBroadException
try:
metric, gpu_uuid, variant, version, value = self._parse_metric.match(line).groups()
value = float(value)
except Exception:
continue
self.task.get_logger().report_scalar(
title=metric,
series='{}.v{}'.format(variant, version),
iteration=iteration,
value=value
)
# on the remote logger we add our own Task ID (unique ID),
# to support multiple servers reporting to the same service controller
if remote_logger:
remote_logger.report_scalar(
title=metric,
series='{}.v{}.{}'.format(variant, version, self.task.id),
iteration=iteration,
value=value
)
def maintenance_daemon(
self,
local_model_repo='/models', # type: str
update_frequency_sec=60.0, # type: float
metric_frequency_sec=60.0 # type: float
):
# type: (...) -> None
Path(local_model_repo).mkdir(parents=True, exist_ok=True)
a_service = ServingService(task_id=self.serving_id)
a_service.triton_model_service_update_step(model_repository_folder=local_model_repo)
# noinspection PyProtectedMember
remote_logger = a_service._task.get_logger()
# todo: log triton server outputs when running locally
# we assume we can run the triton server
cmd = [
'tritonserver',
'--model-control-mode=poll',
'--model-repository={}'.format(local_model_repo),
'--repository-poll-secs={}'.format(update_frequency_sec),
'--metrics-port={}'.format(self._default_metrics_port),
'--allow-metrics=true',
'--allow-gpu-metrics=true',
]
for k, v in self.args.items():
if not v or not str(k).startswith('t_'):
continue
# convert stored arguments such as `t_http_port` into native tritonserver flags, e.g. `--http-port=<value>`
cmd.append('--{}={}'.format(str(k)[2:].replace('_', '-'), v))
print('Starting server: {}'.format(cmd))
proc = subprocess.Popen(cmd)
base_freq = min(update_frequency_sec, metric_frequency_sec)
metric_tic = update_tic = time()
while True:
try:
error_code = proc.wait(timeout=base_freq)
if error_code == 0:
print("triton-server process ended with error code {}".format(error_code))
return
raise ValueError("triton-server process ended with error code {}".format(error_code))
except subprocess.TimeoutExpired:
pass
pass
# update models
if time() - update_tic > update_frequency_sec:
a_service.triton_model_service_update_step(model_repository_folder=local_model_repo)
update_tic = time()
# update stats
if time() - metric_tic > metric_frequency_sec:
metric_tic = time()
self.report_metrics(remote_logger)
def main():
title = 'clearml-serving - Nvidia Triton Engine Helper'
print(title)
parser = ArgumentParser(prog='clearml-serving', description=title)
parser.add_argument(
'--serving-id', default=None, type=str, required=True,
help='Specify main serving service Task ID')
parser.add_argument(
'--project', default='serving', type=str,
help='Optional specify project for the serving engine Task')
parser.add_argument(
'--name', default='nvidia-triton', type=str,
help='Optional specify task name for the serving engine Task')
parser.add_argument(
'--update-frequency', default=10, type=float,
help='Model update frequency in minutes')
parser.add_argument(
'--metric-frequency', default=1, type=float,
help='Metric reporting update frequency in minutes')
parser.add_argument(
'--t-http-port', type=str, help='<integer> The port for the server to listen on for HTTP requests')
parser.add_argument(
'--t-http-thread-count', type=str, help='<integer> Number of threads handling HTTP requests')
parser.add_argument(
'--t-allow-grpc', type=str, help='<integer> Allow the server to listen for GRPC requests')
parser.add_argument(
'--t-grpc-port', type=str, help='<integer> The port for the server to listen on for GRPC requests')
parser.add_argument(
'--t-grpc-infer-allocation-pool-size', type=str,
help='<integer> The maximum number of inference request/response objects that remain '
'allocated for reuse. As long as the number of in-flight requests doesn\'t exceed '
'this value there will be no allocation/deallocation of request/response objects')
parser.add_argument(
'--t-pinned-memory-pool-byte-size', type=str,
help='<integer> The total byte size that can be allocated as pinned system '
'memory. If GPU support is enabled, the server will allocate pinned '
'system memory to accelerate data transfer between host and devices '
'until it exceeds the specified byte size. This option will not affect '
'the allocation conducted by the backend frameworks. Default is 256 MB')
parser.add_argument(
'--t-cuda-memory-pool-byte-size', type=str,
help='<<integer>:<integer>> The total byte size that can be allocated as CUDA memory for '
'the GPU device. If GPU support is enabled, the server will allocate '
'CUDA memory to minimize data transfer between host and devices '
'until it exceeds the specified byte size. This option will not affect '
'the allocation conducted by the backend frameworks. The argument '
'should be 2 integers separated by colons in the format <GPU device'
'ID>:<pool byte size>. This option can be used multiple times, but only '
'once per GPU device. Subsequent uses will overwrite previous uses for '
'the same GPU device. Default is 64 MB')
parser.add_argument(
'--t-min-supported-compute-capability', type=str,
help='<float> The minimum supported CUDA compute capability. GPUs that '
'don\'t support this compute capability will not be used by the server')
parser.add_argument(
'--t-buffer-manager-thread-count', type=str,
help='<integer> The number of threads used to accelerate copies and other'
'operations required to manage input and output tensor contents.'
'Default is 0')
args = parser.parse_args()
task = Task.init(project_name=args.project, task_name=args.name, task_type=Task.TaskTypes.inference)
helper = TritonHelper(args, task, serving_id=args.serving_id)
# this function will never end
helper.maintenance_daemon(
local_model_repo='/models',
update_frequency_sec=args.update_frequency*60.0,
metric_frequency_sec=args.metric_frequency*60.0,
)
if __name__ == '__main__':
main()

1
clearml_serving/version.py Normal file
View File

@ -0,0 +1 @@
__version__ = '0.3.1'

BIN
docs/webapp_screenshots.gif Normal file

Binary file not shown.


169
examples/keras/keras_mnist.py Normal file
View File

@ -0,0 +1,169 @@
# ClearML - Keras with Tensorboard example code, automatic logging model and Tensorboard outputs
#
# Train a simple deep NN on the MNIST dataset.
# Then store a model to be served by clearml-serving
import argparse
import os
import tempfile
import numpy as np
import tensorflow as tf
from pathlib import Path
from tensorflow.keras import utils as np_utils
from tensorflow.keras.callbacks import ModelCheckpoint, TensorBoard
from tensorflow.keras.datasets import mnist
from tensorflow.keras.layers import Activation, Dense
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import RMSprop
from clearml import Task
class TensorBoardImage(TensorBoard):
@staticmethod
def make_image(tensor):
from PIL import Image
import io
tensor = np.stack((tensor, tensor, tensor), axis=2)
height, width, channels = tensor.shape
image = Image.fromarray(tensor)
output = io.BytesIO()
image.save(output, format='PNG')
image_string = output.getvalue()
output.close()
return tf.Summary.Image(height=height,
width=width,
colorspace=channels,
encoded_image_string=image_string)
def on_epoch_end(self, epoch, logs=None):
if logs is None:
logs = {}
super(TensorBoardImage, self).on_epoch_end(epoch, logs)
images = self.validation_data[0] # 0 - data; 1 - labels
img = (255 * images[0].reshape(28, 28)).astype('uint8')
image = self.make_image(img)
summary = tf.Summary(value=[tf.Summary.Value(tag='image', image=image)])
self.writer.add_summary(summary, epoch)
def create_config_pbtxt(model, config_pbtxt_file):
platform = "tensorflow_savedmodel"
input_name = model.input_names[0]
output_name = model.output_names[0]
input_data_type = "TYPE_FP32"
output_data_type = "TYPE_FP32"
input_dims = str(model.input.shape.as_list()).replace("None", "-1")
output_dims = str(model.output.shape.as_list()).replace("None", "-1")
config_pbtxt = """
platform: "%s"
input [
{
name: "%s"
data_type: %s
dims: %s
}
]
output [
{
name: "%s"
data_type: %s
dims: %s
}
]
""" % (
platform,
input_name, input_data_type, input_dims,
output_name, output_data_type, output_dims
)
with open(config_pbtxt_file, "w") as config_file:
config_file.write(config_pbtxt)
def main():
parser = argparse.ArgumentParser(description='Keras MNIST Example - training CNN classification model')
parser.add_argument('--batch-size', type=int, default=128, help='input batch size for training (default: 128)')
parser.add_argument('--epochs', type=int, default=1, help='number of epochs to train (default: 1)')
args = parser.parse_args()
# the data, shuffled and split between train and test sets
nb_classes = 10
(X_train, y_train), (X_test, y_test) = mnist.load_data()
X_train = X_train.reshape(60000, 784).astype('float32') / 255.
X_test = X_test.reshape(10000, 784).astype('float32') / 255.
print(X_train.shape[0], 'train samples')
print(X_test.shape[0], 'test samples')
# convert class vectors to binary class matrices
Y_train = np_utils.to_categorical(y_train, nb_classes)
Y_test = np_utils.to_categorical(y_test, nb_classes)
model = Sequential()
model.add(Dense(512, input_shape=(784,)))
model.add(Activation('relu'))
# model.add(Dropout(0.2))
model.add(Dense(512))
model.add(Activation('relu'))
# model.add(Dropout(0.2))
model.add(Dense(10))
model.add(Activation('softmax'))
model2 = Sequential()
model2.add(Dense(512, input_shape=(784,)))
model2.add(Activation('relu'))
model.summary()
model.compile(
loss='categorical_crossentropy',
optimizer=RMSprop(),
metrics=['accuracy']
)
# Connecting ClearML with the current process,
# from here on everything is logged automatically
task = Task.init(project_name='examples', task_name='Keras MNIST serve example', output_uri=True)
# Advanced: setting model class enumeration
labels = dict(('digit_%d' % i, i) for i in range(10))
task.set_model_label_enumeration(labels)
output_folder = os.path.join(tempfile.gettempdir(), 'keras_example_new_temp_now')
board = TensorBoard(histogram_freq=1, log_dir=output_folder, write_images=False)
model_store = ModelCheckpoint(filepath=os.path.join(output_folder, 'weight.{epoch}.hdf5'))
# load previous model, if it is there
# noinspection PyBroadException
try:
model.load_weights(os.path.join(output_folder, 'weight.1.hdf5'))
except Exception:
pass
model.fit(
X_train, Y_train,
batch_size=args.batch_size, epochs=args.epochs,
callbacks=[board, model_store],
verbose=1, validation_data=(X_test, Y_test)
)
score = model.evaluate(X_test, Y_test, verbose=0)
# store the model in a format that can be served
model.save('serving_model', include_optimizer=False)
# create the config.pbtxt for triton to be able to serve the model
create_config_pbtxt(model=model, config_pbtxt_file='config.pbtxt')
# store the configuration on the creating Task,
# this will allow us to skip over manually setting the config.pbtxt for `clearml-serving`
task.connect_configuration(configuration=Path('config.pbtxt'), name='config.pbtxt')
print('Test score: {}'.format(score[0]))
print('Test accuracy: {}'.format(score[1]))
if __name__ == '__main__':
main()

2
examples/keras/requirements.txt Normal file
View File

@ -0,0 +1,2 @@
tensorflow>=2.0
clearml

1
requirements.txt Normal file
View File

@ -0,0 +1 @@
clearml >= 0.17.6rc1

75
setup.py Normal file
View File

@ -0,0 +1,75 @@
"""
`clearml-serving` - Model-Serving Orchestration and Repository Solution
https://github.com/allegroai/clearml-serving
"""
import os.path
# Always prefer setuptools over distutils
from setuptools import setup, find_packages
def read_text(filepath):
with open(filepath, "r", encoding="utf-8") as f:
return f.read()
here = os.path.dirname(__file__)
# Get the long description from the README file
long_description = read_text(os.path.join(here, 'README.md'))
def read_version_string(version_file):
for line in read_text(version_file).splitlines():
if line.startswith('__version__'):
delim = '"' if '"' in line else "'"
return line.split(delim)[1]
else:
raise RuntimeError("Unable to find version string.")
version = read_version_string("clearml_serving/version.py")
requirements = read_text(os.path.join(here, 'requirements.txt')).splitlines()
setup(
name='clearml-serving',
version=version,
description='clearml-serving - Model-Serving Orchestration and Repository Solution',
long_description=long_description,
long_description_content_type='text/markdown',
# The project's main homepage.
url='https://github.com/allegroai/clearml-serving.git',
author='Allegroai',
author_email='clearml@allegro.ai',
license='Apache License 2.0',
classifiers=[
'Development Status :: 4 - Beta',
'Intended Audience :: Developers',
'Intended Audience :: Science/Research',
'Operating System :: POSIX :: Linux',
'Operating System :: MacOS :: MacOS X',
'Operating System :: Microsoft',
'Topic :: Scientific/Engineering :: Artificial Intelligence',
'Topic :: Software Development',
'Topic :: Software Development :: Version Control',
'Topic :: System :: Logging',
'Topic :: System :: Monitoring',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'License :: OSI Approved :: Apache Software License',
],
keywords='clearml mlops devops trains development machine deep learning version control machine-learning '
'machinelearning deeplearning deep-learning model-serving',
packages=find_packages(exclude=['contrib', 'docs', 'data', 'examples', 'tests']),
install_requires=requirements,
# To provide executable scripts, use entry points in preference to the
# "scripts" keyword. Entry points provide cross-platform support and allow
# pip to create the appropriate form of executable for the target platform.
entry_points={
'console_scripts': [
'clearml-serving = clearml_serving.__main__:main',
],
},
)