diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1e12a66 --- /dev/null +++ b/.gitignore @@ -0,0 +1,37 @@ +# python build +dist/ +build/ +*.egg-info/ + + +# Compiled Python bytecode +*.py[cod] + +# Log files +*.log + +# JetBrains IDE +.idea/ + +# Generated by MacOS +.DS_Store + +# Generated by Windows +Thumbs.db + +# Applications +*.app +*.exe +*.war + +# Large media files +*.mp4 +*.tiff +*.avi +*.flv +*.mov +*.wmv + +# models +*.pbtxt +*.h5 diff --git a/README.md b/README.md new file mode 100644 index 0000000..3c9e43f --- /dev/null +++ b/README.md @@ -0,0 +1,144 @@ + +
+ + + +**ClearML Serving - ML-Ops made easy** + +## **`clearml-serving`
Model-Serving Orchestration and Repository Solution** + + +[![GitHub license](https://img.shields.io/github/license/allegroai/clearml-serving.svg)](https://img.shields.io/github/license/allegroai/clearml-serving.svg) +[![PyPI pyversions](https://img.shields.io/pypi/pyversions/clearml-serving.svg)](https://img.shields.io/pypi/pyversions/clearml-serving.svg) +[![PyPI version shields.io](https://img.shields.io/pypi/v/clearml-serving.svg)](https://img.shields.io/pypi/v/clearml-serving.svg) +[![PyPI status](https://img.shields.io/pypi/status/clearml-serving.svg)](https://pypi.python.org/pypi/clearml-serving/) +[![Slack Channel](https://img.shields.io/badge/slack-%23clearml--community-blueviolet?logo=slack)](https://join.slack.com/t/allegroai-trains/shared_invite/zt-c0t13pty-aVUZZW1TSSSg2vyIGVPBhg) + + +
+
+![alt screenshots](docs/webapp_screenshots.gif "screenshots")
+
+
+**`clearml-serving`** is a command-line utility for flexible orchestration of your model deployment.
+**`clearml-serving`** can make use of a variety of serving engines (**Nvidia Triton, OpenVINO Model Server, KFServing**),
+setting them up for serving on any machine you designate via a ClearML Agent, or on your ClearML Kubernetes cluster.
+
+Features:
+* Spin up serving engines on your Kubernetes cluster or ClearML Agent machines from the CLI
+* Full usage & performance metrics integrated with the ClearML UI
+* Multi-model support in a single serving engine container
+* Automatically deploy new model versions
+* Support for canary model releases
+* Integrates with the ClearML Model Repository
+* Deploy & upgrade endpoints directly from the ClearML UI
+* Programmatic interface for endpoint/version/metric control
+
+
+## Installing ClearML Serving
+
+1. Set up your [**ClearML Server**](https://github.com/allegroai/clearml-server) or use the [Free tier Hosting](https://app.community.clear.ml)
+2. Connect your ClearML Worker(s) to your **ClearML Server** (see [**ClearML Agent**](https://github.com/allegroai/clearml-agent) / [Kubernetes integration](https://github.com/allegroai/clearml-agent#kubernetes-integration-optional))
+3. Install `clearml-serving` (note: `clearml-serving` is merely a control utility; it does not require any resources for the actual serving)
+```bash
+pip install clearml-serving
+```
+
+## Using ClearML Serving
+
+`clearml-serving` automatically serves *published* models from your ClearML model repository, so the first step is getting a model into your ClearML model repository.
+Background: when using `clearml` in your training code, any model stored by your Python code is automatically registered (and, optionally, uploaded) to the model repository. This auto-magic logging is key for continuous model deployment.
+To learn more about training models and the ClearML model repository, see the [ClearML documentation](https://allegro.ai/clearml/docs)
+
+### Training a toy model with Keras (about 2 minutes on a laptop)
+
+The main goal of `clearml-serving` is to integrate seamlessly with the development process and the model repository.
+This is achieved by combining ClearML's auto-magic logging, which registers and uploads models directly from
+the Python training code, with the ClearML Server's REST API and Pythonic interface, through which these models become accessible as soon as they are added to the model repository.
+Let's demonstrate this seamless integration by training a toy Keras model to classify images from the MNIST dataset.
+Once we have a trained model in the model repository, we will serve it using `clearml-serving`.
+
+We'll also see how we can retrain another version of the model, and have the model serving engine automatically upgrade to the new model version.
+
+#### Keras MNIST toy training example (single-epoch mock training)
+
+1. Install `tensorflow` (and, of course, `clearml`)
+   ```bash
+   pip install "tensorflow>2" clearml
+   ```
+
+2. Execute the training code
+   ```bash
+   cd examples/keras
+   python keras_mnist.py
+   ```
+   **Notice:** The only `clearml` integration code required is the following two lines:
+   ```python
+   from clearml import Task
+   task = Task.init(project_name="examples", task_name="Keras MNIST serve example", output_uri=True)
+   ```
+   This call makes sure all outputs are automatically logged to the ClearML Server; this includes the console, TensorBoard, command-line arguments, the git repository, etc.
+   It also means that any model stored by the code will be automatically uploaded and registered in the ClearML model repository.
+
+
+3. Review the models in the ClearML web UI:
+   Go to the "Projects" section of your ClearML server ([free hosted](https://app.community.clear.ml) or [self-deployed](https://github.com/allegroai/clearml-server)).
+   In the "examples" project, go to the Models tab (the model repository).
+   You should see a model named "Keras MNIST serve example - serving_model".
+   Once a model-serving service is available, right-clicking the model and selecting "Publish" will trigger an upgrade of the model on the serving engine container.
+
+Next, we will spin up the serving service and the serving engine.
+
+### Serving your models
+
+In order to serve your models, `clearml-serving` spawns a serving service that stores multiple endpoints and their configuration,
+collects metric reports, and updates models when new versions are published in the model repository.
+In addition, a serving engine is launched, which is the container that actually runs the inference engine.
+(Nvidia Triton is currently supported; the Intel OpenVINO serving engine and KFServing are coming soon.)
+
+Now that we have a published model in the ClearML model repository, we can spin up a serving service and a serving engine.
+
+Starting a Serving Service:
+
+1. Create a new serving instance.
+   This is the control-plane Task; we will see all of its configuration, logs, and metrics in the "serving" project. We can have multiple serving services running in the same system.
+   In this example we will make use of the Nvidia Triton engine.
+```bash
+clearml-serving triton --project "serving" --name "serving example"
+```
+2. Add models to the serving engine with specific endpoints.
+Reminder: to view your model repository, log in to your ClearML account,
+   go to the "examples" project, and review the "Models" tab.
+```bash
+clearml-serving triton --endpoint "keras_mnist" --model-project "examples" --model-name "Keras MNIST serve example - serving_model"
+```
+
+3. Launch the serving service.
+   The service will be launched on your "services" queue, which by default runs services on the ClearML server machine.
+   (Read more about the services queue [here](https://allegro.ai/clearml/docs/docs/concepts_fundamentals/concepts_fundamentals_clearml_server.html#clearml-agent-services-container).)
+   We set our serving engine to launch on the "default" queue:
+```bash
+clearml-serving launch --queue default
+```
+
+4. Optional: If you do not have a machine connected to your ClearML cluster, either read more on our Kubernetes integration, or spin up a bare-metal worker and connect it to your ClearML Server.
+   `clearml-serving` leverages the orchestration capabilities of `ClearML` to launch the serving engine on the cluster.
+   Read more about the [ClearML Agent](https://github.com/allegroai/clearml-agent) orchestration module [here](https://allegro.ai/clearml/docs/docs/concepts_fundamentals/concepts_fundamentals_clearml_agent.html).
+   If you have not yet set up a ClearML worker connected to your `clearml` account, you can do so now with:
+   ```bash
+   pip install clearml-agent
+   clearml-agent daemon --docker --queue default --detached
+   ```
+
+
+**We are done!**
+To test the newly served model, you can `curl` the new endpoint:
+```bash
+curl <serving-engine-ip>:8000/v2/models/keras_mnist/versions/1
+```
+
+**Notice**: If we re-run our Keras training example and publish a new model in the repository, the engine will automatically update to the new model.
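+
+The `curl` call above only returns the model's metadata. To send an actual inference request, you can use the
+HTTP/REST (KFServing v2) protocol that Triton exposes on the same port. Below is a minimal sketch, assuming the
+serving engine is reachable on `localhost:8000` and that the model's input tensor is named `dense_input`
+(the actual tensor name depends on your Keras layer names; check the generated `config.pbtxt`):
+```python
+# Minimal inference request against the Triton HTTP endpoint (KFServing v2 protocol).
+# Assumptions: engine on localhost:8000, input tensor "dense_input", input shape [1, 784].
+import numpy as np
+import requests
+
+image = np.random.rand(1, 784).astype("float32")  # stand-in for a flattened MNIST image
+
+payload = {
+    "inputs": [{
+        "name": "dense_input",
+        "shape": list(image.shape),
+        "datatype": "FP32",
+        "data": image.flatten().tolist(),
+    }]
+}
+response = requests.post("http://localhost:8000/v2/models/keras_mnist/infer", json=payload)
+response.raise_for_status()
+
+# the first output holds the per-class scores; argmax gives the predicted digit
+scores = response.json()["outputs"][0]["data"]
+print("predicted digit:", int(np.argmax(scores)))
+```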
+ +Further reading on advanced topics [here](coming-soon) + + diff --git a/clearml_serving/__init__.py b/clearml_serving/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/clearml_serving/__main__.py b/clearml_serving/__main__.py new file mode 100644 index 0000000..b0b32a7 --- /dev/null +++ b/clearml_serving/__main__.py @@ -0,0 +1,164 @@ +import json +import os +from argparse import ArgumentParser, FileType + +from .serving_service import ServingService + + +def restore_state(args): + session_state_file = os.path.expanduser('~/.clearml_serving.json') + # noinspection PyBroadException + try: + with open(session_state_file, 'rt') as f: + state = json.load(f) + except Exception: + state = {} + + args.id = getattr(args, 'id', None) or state.get('id') + return args + + +def store_state(args, clear=False): + session_state_file = os.path.expanduser('~/.clearml_serving.json') + if clear: + state = {} + else: + state = {str(k): str(v) if v is not None else None + for k, v in args.__dict__.items() if not str(k).startswith('_') and k not in ('command', )} + # noinspection PyBroadException + try: + with open(session_state_file, 'wt') as f: + json.dump(state, f, sort_keys=True) + except Exception: + pass + + +def cmd_triton(args): + if not args.id and not args.name: + raise ValueError("Serving service must have a name, use --name ") + + if args.id: + a_serving = ServingService(task_id=args.id) + else: + a_serving = ServingService(task_project=args.project, task_name=args.name, engine_type='triton') + args.id = a_serving.get_id() + + if args.endpoint: + print("Nvidia Triton Engine ID: {} - Adding serving endpoint: \n".format(args.id) + + ("model-project: '{}', model-name: '{}', model-tags: '{}', config-file: '{}'".format( + args.model_project or '', + args.model_name or '', + args.model_tags or '', + args.config or '') if not args.model_id else + "model-id: '{}', config-file: '{}'".format(args.model_id or '', args.config or ''))) + + if not args.endpoint and (args.model_project or args.model_tags or args.model_id or args.model_name): + raise ValueError("Serving endpoint must be provided, add --endpoint ") + + if args.endpoint: + a_serving.add_model_serving( + serving_url=args.endpoint, + model_project=args.model_project, + model_name=args.model_name, + model_tags=args.model_tags, + model_ids=[args.model_id] if args.model_id else None, + config_file=args.config, + max_versions=args.versions, + ) + + a_serving.serialize(force=True) + store_state(args) + + +def cmd_launch(args): + print('Launching Serving Engine: service: {}, queue: {}'.format(args.id, args.queue)) + + if not args.id: + raise ValueError("Serving service must specify serving service ID, use --id ") + + a_serving = ServingService(task_id=args.id) + + if a_serving.get_engine_type() not in ('triton',): + raise ValueError("Error, serving engine type \'{}\' is not supported".format(a_serving.get_engine_type())) + + # launch services queue + a_serving.launch(queue_name=args.service_queue) + # launch engine + a_serving.launch_engine(queue_name=args.queue) + + +def cli(): + title = 'clearml-serving - CLI for launching ClearML serving engine' + print(title) + parser = ArgumentParser(prog='clearml-serving', description=title) + subparsers = parser.add_subparsers(help='Serving engine commands', dest='command') + + # create the launch command + parser_launch = subparsers.add_parser('launch', help='Launch a previously configured serving service') + parser_launch.add_argument( + '--id', default=None, type=str, + help='Specify a 
previously configured service ID, if not provided use the last created service') + parser_launch.add_argument( + '--queue', default=None, type=str, required=True, + help='Specify the clearml queue to be used for the serving engine server') + parser_launch.add_argument( + '--service-queue', default='services', type=str, + help='Specify the service queue to be used for the serving service, default: services queue') + parser_launch.set_defaults(func=cmd_launch) + + # create the parser for the "triton" command + parser_trt = subparsers.add_parser('triton', help='Nvidia Triton Serving Engine') + parser_trt.add_argument( + '--id', default=None, type=str, + help='Add configuration to running serving session, pass serving Task ID, ' + 'if passed ignore --name / --project') + parser_trt.add_argument( + '--name', default=None, type=str, + help='Give serving service a name, should be a unique name') + parser_trt.add_argument( + '--project', default='DevOps', type=str, + help='Serving service project name, default: DevOps') + parser_trt.add_argument( + '--endpoint', required=False, type=str, + help='Serving endpoint, one per model, unique ') + parser_trt.add_argument( + '--versions', type=int, + help='Serving endpoint, support multiple versions, ' + 'max versions to deploy (version number always increase). Default (no versioning).') + parser_trt.add_argument( + '--config', required=False, type=FileType, + help='Model `config.pbtxt` file, one per model, order matching with models') + parser_trt.add_argument( + '--model-id', type=str, + help='(Optional) Model ID to deploy, if passed model-project/model-name/model-tags are ignored') + parser_trt.add_argument( + '--model-project', type=str, help='Automatic model deployment and upgrade, select model project (exact match)') + parser_trt.add_argument( + '--model-name', type=str, help='Automatic model deployment and upgrade, select model name (exact match)') + parser_trt.add_argument( + '--model-tags', nargs='*', type=str, + help='Automatic model deployment and upgrade, select model name tags to include, ' + 'model has to have all tags to be deployed/upgraded') + parser_trt.set_defaults(func=cmd_triton) + + args = parser.parse_args() + args = restore_state(args) + + if args.command: + args.func(args) + else: + parser.print_help() + + +def main(): + try: + cli() + except KeyboardInterrupt: + print('\nUser aborted') + except Exception as ex: + print('\nError: {}'.format(ex)) + exit(1) + + +if __name__ == '__main__': + main() diff --git a/clearml_serving/service.py b/clearml_serving/service.py new file mode 100644 index 0000000..89de65f --- /dev/null +++ b/clearml_serving/service.py @@ -0,0 +1,17 @@ +from time import sleep +from clearml import Task +from clearml_serving.serving_service import ServingService + + +def main(): + # we should only be running in remotely by an agent + task = Task.init() + serving = ServingService(task=task) + while True: + serving.update() + serving.stats() + sleep(60.) 
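+        # every pass: update() re-reads the endpoint configuration and any newly
+        # published models from the backing Task; stats() is currently a no-op
+        # placeholder for aggregated usage reporting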
+ + +if __name__ == '__main__': + main() diff --git a/clearml_serving/serving_service.py b/clearml_serving/serving_service.py new file mode 100644 index 0000000..f036d74 --- /dev/null +++ b/clearml_serving/serving_service.py @@ -0,0 +1,568 @@ +import json +import shutil +from logging import getLogger +from pathlib import Path as Path3 +from time import time +from typing import Optional, Union, Dict, Sequence + +from attr import attrib, attrs, asdict +from pathlib2 import Path + +from clearml import Task, Model, InputModel + + +class ServingService(object): + _config_pbtxt_section = 'config.pbtxt' + _supported_serving_engines = ('triton', 'ovms', 'kfserving') + + @attrs + class EndPoint(object): + serving_url = attrib(type=str) + model_ids = attrib(type=list) + model_project = attrib(type=str) + model_name = attrib(type=str) + model_tags = attrib(type=list) + model_config_blob = attrib(type=str, default=None) + max_num_revisions = attrib(type=int, default=None) + versions = attrib(type=dict, default={}) + + def as_dict(self): + return asdict(self) + + def __init__(self, task_id=None, task_project=None, task_name=None, task=None, engine_type='triton'): + # type: (Optional[str], Optional[str], Optional[str], Optional[Task], Optional[str]) -> None + """ + :param task_id: Optional specify existing Task ID of the ServingService + :param task_project: Select the project where the new ServingService task will be created + :param task_name: Specify the Task name for the newly created ServingService + :param task: Optional pass existing ServingService Task object + :param engine_type: Specify the serving engine Type. Examples: triton, ovms, kfserving + """ + assert engine_type in self._supported_serving_engines + + if task: + self._task = task + elif task_id: + self._task = Task.get_task(task_id=task_id) + else: + # noinspection PyProtectedMember + if Task._query_tasks(project_name=task_project, task_name=task_name): + self._task = Task.get_task(project_name=task_project, task_name=task_name) + else: + self._task = Task.create( + project_name=task_project, task_name=task_name, task_type=Task.TaskTypes.service, + repo="https://github.com/allegroai/clearml-serving.git", + branch="master", + script="clearml_serving/service.py", + working_directory=".", + add_task_init_call=False, + ) + self._task.set_system_tags(list(self._task.get_system_tags()) + ['serving']) + + # self._current_serving_endpoints = {'an_enpoint_url': {1: 'model_id'}} + self._current_serving_endpoints = {} # type: Dict[str, Dict[int, str]] + # self._endpoints = {'an_enpoint_url': ServingService.EndPoint()} + self._endpoints = {} # type: Dict[str, ServingService.EndPoint] + self._engine_type = engine_type + self._dirty = False + self._last_update_step = None + # try to deserialize from Task + # noinspection PyBroadException + try: + self._deserialize() + except Exception: + pass + + def add_model_serving( + self, + serving_url, # type: str + model_ids=None, # type: Optional[Sequence[str]] + model_project=None, # type: Optional[str] + model_name=None, # type: Optional[str] + model_tags=None, # type: Optional[Sequence[str]] + config_file=None, # type: Optional[Union[Path, Path3, str]] + max_versions=1, # type: Optional[int] + ): + """ + Add new model serving endpoint, automatically published + + :param serving_url: + :param model_ids: + :param model_project: + :param model_name: + :param model_tags: + :param config_file: + :param max_versions: + :return: + """ + if not serving_url: + raise ValueError("serving_url is required") + + if 
model_tags and not isinstance(model_tags, (list, tuple)): + raise ValueError("model_tags must be a list of strings") + + # normalize endpoint url + serving_url = str(serving_url).strip('/') + + endpoint = self.EndPoint( + serving_url=serving_url, + model_ids=list(model_ids) if model_ids else None, + model_name=model_name, + model_project=model_project, + model_tags=model_tags, + max_num_revisions=max_versions or None, + versions={}, + model_config_blob='', + ) + # load config file + if config_file: + with open(str(config_file), 'rt') as f: + endpoint.model_config_blob = f.read() + else: + # Look for the config on the Model generated Task + found_models = Model.query_models(project_name=model_project, model_name=model_name, tags=model_tags) or [] + + selected_model = None + # find the first model with config.pbtxt configuration + # prefer published models + found_models = [m for m in found_models if m.published] + [m for m in found_models if not m.published] + for m in found_models: + task_id = m.task + task = Task.get_task(task_id=task_id) + config_pbtxt = task.get_configuration_object(self._config_pbtxt_section) + if config_pbtxt and str(config_pbtxt).strip(): + endpoint.model_config_blob = config_pbtxt + selected_model = m + break + + if not selected_model: + raise ValueError( + "Requested Model project={} name={} tags={} not found. 'config.pbtxt' could not be inferred. " + "please provide specific config.pbtxt definition.".format(model_project, model_name, model_tags)) + elif len(found_models) > 1: + getLogger('clearml-serving').warning( + "Found more than one Model, using model id={}".format(selected_model.id)) + + self._endpoints[serving_url] = endpoint + self._dirty = True + + def launch(self, queue_name='services', queue_id=None, force=False, verbose=True): + # type: (Optional[str], Optional[str], bool, bool) -> None + """ + Launch serving service on a remote machine using the specified queue + + :param queue_name: Queue name to launch the serving service control plane + :param queue_id: specify queue id (unique stand stable) instead of queue_name + :param force: if False check if service Task is already running before enqueuing + :param verbose: If True print progress to console + """ + # check if we are not already running + if not force and ((self._task.data.execution.queue and self._task.status == 'in_progress') + or self._task.status == 'queued'): + if verbose: + print('Serving service already running') + else: + if verbose: + print('Launching Serving service on {} queue'.format(queue_id or queue_name)) + self.update_endpoint_graph(force=True) + self.update_model_endpoint_state() + self.serialize() + self._task.flush(wait_for_uploads=True) + self._task.reset() + self._task.enqueue(task=self._task, queue_name=queue_name, queue_id=queue_id) + + def launch_engine(self, queue_name, queue_id=None, verbose=True): + # type: (Optional[str], Optional[str], bool) -> None + """ + Launch serving engine on a specific queue + + :param queue_name: Queue name to launch the engine service running the inference on. 
+ :param queue_id: specify queue id (unique stand stable) instead of queue_name + :param verbose: If True print progress to console + """ + + # todo: add more engines + if self._engine_type == 'triton': + # create the serving engine Task + engine_task = Task.create( + project_name=self._task.get_project_name(), + task_name="triton serving engine", + task_type=Task.TaskTypes.inference, + repo="https://github.com/allegroai/clearml-serving.git", + branch="master", + script="clearml_serving/triton_helper.py", + working_directory=".", + docker="nvcr.io/nvidia/tritonserver:21.03-py3 --ipc=host -p 8000:8000 -p 8001:8001 -p 8002:8002", + argparse_args=[('serving_id', self._task.id), ], + add_task_init_call=False, + ) + if verbose: + print('Launching engine {} on queue {}'.format(self._engine_type, queue_id or queue_name)) + engine_task.enqueue(task=engine_task, queue_name=queue_name, queue_id=queue_id) + + def update_endpoint_graph(self, force=False): + # type: (bool) -> None + """ + Update the endpoint serving graph + + :param force: If True always update, otherwise skip if service was not changed since lat time + """ + if not force and not self._dirty: + return + + # Generate configuration table and details + table_values = [["Endpoint", "Model ID", "Model Project", "Model Name", "Model Tags", "Max Versions"]] + for endpoint in sorted(self._endpoints.keys()): + n = self._endpoints[endpoint] + table_values.append([ + str(n.serving_url or ''), + str(n.model_ids or ''), + str(n.model_project or ''), + str(n.model_name or ''), + str(n.model_tags or ''), + str(n.max_num_revisions or '') + ]) + self._task.get_logger().report_table( + title='Serving Endpoint Configuration', series='Details', iteration=0, table_plot=table_values, + extra_layout={"title": "Model Endpoints Details"}) + + # generate current endpoint view + sankey_node = dict( + label=[], + color=[], + customdata=[], + hovertemplate='%{customdata}', + hoverlabel={"align": "left"}, + ) + sankey_link = dict( + source=[], + target=[], + value=[], + hovertemplate='', + ) + # root + sankey_node['color'].append("mediumpurple") + sankey_node['label'].append('{}'.format('serving')) + sankey_node['customdata'].append("") + + # Generate table and details + table_values = [["Endpoint", "Version", "Model ID"]] + # noinspection PyProtectedMember + base_url = self._task._get_app_server() + '/projects/*/models/{model_id}/general' + for i, serve_url in enumerate(sorted(self._endpoints.keys())): + ep = self._endpoints[serve_url] + sankey_node['color'].append("blue") + sankey_node['label'].append('{}'.format(serve_url)) + sankey_node['customdata'].append( + "project: {}
<br />name: {}<br />
tags: {}".format( + ep.model_project or '', ep.model_name or '', ep.model_tags or '') + ) + sankey_link['source'].append(0) + sankey_link['target'].append(i + 1) + sankey_link['value'].append(1. / len(self._endpoints)) + + for v in sorted(self._current_serving_endpoints.get(serve_url, [])): + model_id = self._current_serving_endpoints[serve_url][v] + href = ' {} '.format(base_url.format(model_id=model_id), model_id) + table_values.append([str(serve_url), str(v), href]) + sankey_node['color'].append("lightblue") + sankey_node['label'].append('{}'.format(v)) + sankey_node['customdata'].append(model_id) + + sankey_link['source'].append(i + 1) + sankey_link['target'].append(len(sankey_node['color']) - 1) + sankey_link['value'].append(1. / len(self._current_serving_endpoints[serve_url])) + + # create the sankey graph + dag_flow = dict( + link=sankey_link, + node=sankey_node, + textfont=dict(color='rgba(0,0,0,255)', size=10), + type='sankey', + orientation='h' + ) + fig = dict(data=[dag_flow], layout={'xaxis': {'visible': False}, 'yaxis': {'visible': False}}) + + self._task.get_logger().report_plotly( + title='Model Serving Endpoints', series='', iteration=0, figure=fig) + + # report detailed table + self._task.get_logger().report_table( + title='Serving Endpoint', series='Details', iteration=0, table_plot=table_values, + extra_layout={"title": "Model Endpoints Details"}) + + self._dirty = False + + def update_model_endpoint_state(self): + # type: () -> bool + """ + Update model endpoint state from the model repository + + :return: True if endpoints were updated + """ + + for endpoint, node in self._endpoints.items(): + # model ID supersedes everything + if node.model_ids: + model_ids = node.model_ids + else: + # get list of models sorted by descending update time + models = Model.query_models( + project_name=node.model_project, + model_name=node.model_name, + tags=node.model_tags + ) + # prefer published models + model_ids = [m.id for m in models if m.published] + [m.id for m in models if not m.published] + + cur_endpoint = self._current_serving_endpoints.get(node.serving_url, {}) + cur_endpoint = {int(k): v for k, v in cur_endpoint.items() if v in model_ids} + cur_endpoint_m_ids = list(cur_endpoint.values()) + max_v = max(list(cur_endpoint.keys()) or [0]) + for i, m_id in enumerate(model_ids): + # only pick the latest in the history + if node.max_num_revisions and max_v >= node.max_num_revisions: + break + + if m_id in cur_endpoint_m_ids: + continue + max_v += 1 + cur_endpoint[max_v] = m_id + + # check if we need to update, + if self._current_serving_endpoints.get(node.serving_url) != cur_endpoint: + # set dirty flag + self._dirty = True + # store updated results + self._current_serving_endpoints[node.serving_url] = cur_endpoint + + return self._dirty + + def stats(self): + pass + + def get_endpoints(self): + # type: () -> Dict[str, ServingService.EndPoint] + """ + return the internal endpoints configuration + + :return: dict where the keys is the endpoint url and the value is the endpoint configuration + """ + return self._endpoints + + def get_endpoint_version_model_id(self, serving_url): + # type: (str) -> Dict[int, str] + """ + Return dict with model versions and model id for the specific serving url + If serving url is not found, return None + + :param serving_url: sering url string + + :return: dictionary keys are the versions (integers) and values are the model IDs (str) + """ + return self._current_serving_endpoints.get(serving_url) or {} + + def _serialize(self): + configuration = 
dict() + for name, ep in self._endpoints.items(): + # noinspection PyProtectedMember + self._task.set_configuration_object( + name="model.{}".format(name), + description='Model Serving Configuration', + config_type='pbtxt', + config_text=ep.model_config_blob) + ep_conf = ep.as_dict() + ep_conf.pop('model_config_blob', None) + configuration['"{}"'.format(name)] = ep_conf + # noinspection PyProtectedMember + self._task._set_configuration( + config_dict=configuration, name='endpoints', + config_type='hocon', description='Serving Endpoints Configuration') + # set configuration of current served endpoints + # noinspection PyProtectedMember + self._task._set_configuration( + config_dict=self._current_serving_endpoints, name='serving_state', + config_type='hocon', description='Current Serving Endpoints State', + ) + serving = dict(engine=self._engine_type) + self._task.connect(serving, name='serving') + + def _deserialize(self): + # type: () -> bool + """ + deserialize internal state from Task backend + + :return: return True if new state a was updated. + """ + # update if the task was updated + if self._endpoints: + last_update = self._task.data.last_update + try: + # noinspection PyProtectedMember + if last_update == self._task._get_last_update(): + return True + except AttributeError: + # support old clearml packages + pass + + self._task.reload() + + # noinspection PyProtectedMember + configuration = self._task._get_configuration_dict(name='endpoints') + if not configuration: + return False + + self._endpoints = {} + self._current_serving_endpoints = {} + serving = dict(engine='') + task_parameters = self._task.get_parameters_as_dict() + serving.update(task_parameters.get('serving', {})) + self._engine_type = serving['engine'] + + for name, endpoint in configuration.items(): + ep = self.EndPoint(model_config_blob='', **endpoint) + ep.model_config_blob = self._task.get_configuration_object( + name="model.{}".format(ep.serving_url)) + self._endpoints[ep.serving_url] = ep + + # get configuration of current served endpoints + # noinspection PyProtectedMember + self._current_serving_endpoints = self._task._get_configuration_dict(name='serving_state') + + self._dirty = True + return True + + def update(self, force=False): + # type: (bool) -> bool + """ + Update internal endpoint state based on Task configuration and model repository + + :param force: if True force update + + :return: True if internal state updated. + """ + if not self._task: + return False + + # store current internal state + state_hash = self.__state_hash() + + if not self._deserialize(): + return False + + # check if current internal state changed + if not force and state_hash == self.__state_hash(): + print("Skipping update, nothing changed") + return False + + return self.update_model_endpoint_state() + + def get_id(self): + # type: () -> str + """ + Return the Serving Service Task ID + :return: Unique Task ID (str) + """ + return self._task.id + + def get_engine_type(self): + # type: () -> str + """ + return the engine type used ib the serving service + :return: engine type (str). 
example: triton, ovms, kfserving + """ + return self._engine_type + + def serialize(self, force=False): + # type: (bool) -> None + """ + Serialize current service state to the Task + + :param force: If True synchronize an aborted/completed Task + """ + if force and self._task.status not in (Task.TaskStatusEnum.created, Task.TaskStatusEnum.in_progress): + self._task.mark_started(force=True) + + self._serialize() + + def triton_model_service_update_step(self, model_repository_folder=None, verbose=True): + # type: (Optional[str], bool) -> None + + # check if something changed since last time + if not self.update(force=self._last_update_step is None): + return + + self._last_update_step = time() + + if not model_repository_folder: + model_repository_folder = '/models/' + + if verbose: + print('Updating local model folder: {}'.format(model_repository_folder)) + + for url, endpoint in self.get_endpoints().items(): + folder = Path(model_repository_folder) / url + folder.mkdir(parents=True, exist_ok=True) + with open((folder / 'config.pbtxt').as_posix(), 'wt') as f: + f.write(endpoint.model_config_blob) + + # download model versions + for version, model_id in self.get_endpoint_version_model_id(serving_url=url).items(): + model_folder = folder / str(version) + + model_folder.mkdir(parents=True, exist_ok=True) + model = None + # noinspection PyBroadException + try: + model = InputModel(model_id) + local_path = model.get_local_copy() + except Exception: + local_path = None + if not local_path: + print("Error retrieving model ID {} []".format(model_id, model.url if model else '')) + continue + + local_path = Path(local_path) + + if verbose: + print('Update model v{} in {}'.format(version, model_folder)) + + # if this is a folder copy every and delete the temp folder + if local_path.is_dir(): + # we assume we have a `tensorflow.savedmodel` folder + model_folder /= 'model.savedmodel' + model_folder.mkdir(parents=True, exist_ok=True) + # rename to old + old_folder = None + if model_folder.exists(): + old_folder = model_folder.parent / '.old.{}'.format(model_folder.name) + model_folder.replace(old_folder) + if verbose: + print('copy model into {}'.format(model_folder)) + shutil.copytree( + local_path.as_posix(), model_folder.as_posix(), symlinks=False, + ) + if old_folder: + shutil.rmtree(path=old_folder.as_posix()) + # delete temp folder + shutil.rmtree(local_path.as_posix()) + else: + # single file should be moved + target_path = model_folder / local_path.name + old_file = None + if target_path.exists(): + old_file = target_path.parent / '.old.{}'.format(target_path.name) + target_path.replace(old_file) + shutil.move(local_path.as_posix(), target_path.as_posix()) + if old_file: + old_file.unlink() + + def __state_hash(self): + # type: () -> int + """ + Return Hash of the internal state (use only for in process comparison + :return: hash int + """ + return hash(json.dumps( + [self._current_serving_endpoints, {k: v.as_dict() for k, v in self._endpoints.items()}], + sort_keys=True)) diff --git a/clearml_serving/triton_helper.py b/clearml_serving/triton_helper.py new file mode 100644 index 0000000..8428d94 --- /dev/null +++ b/clearml_serving/triton_helper.py @@ -0,0 +1,214 @@ +import re +import subprocess +from argparse import ArgumentParser +from time import time +from typing import Optional + +from pathlib2 import Path + +from clearml import Task, Logger +from clearml.backend_api.utils import get_http_session_with_retry +from clearml_serving.serving_service import ServingService + + +class 
TritonHelper(object): + _metric_line_parsing = r"(\w+){(gpu_uuid=\"[\w\W]*\",)?model=\"(\w+)\",\s*version=\"(\d+)\"}\s*([0-9.]*)" + _default_metrics_port = 8002 + + def __init__( + self, + args, # Any + task, # type: Task + serving_id, # type: str + metric_host=None, # type: Optional[str] + metric_port=None, # type: int + ): + # type: (...) -> None + self._http_session = get_http_session_with_retry() + self.args = dict(**args.__dict__) if args else {} + self.task = task + self.serving_id = serving_id + self.metric_host = metric_host or '0.0.0.0' + self.metric_port = metric_port or self._default_metrics_port + self._parse_metric = re.compile(self._metric_line_parsing) + self._timestamp = time() + print('String Triton Helper service\n{}\n'.format(self.args)) + + def report_metrics(self, remote_logger): + # type: (Optional[Logger]) -> bool + # iterations are seconds from start + iteration = int(time() - self._timestamp) + + report_msg = "reporting metrics: relative time {} sec".format(iteration) + self.task.get_logger().report_text(report_msg) + if remote_logger: + remote_logger.report_text(report_msg) + + # noinspection PyBroadException + try: + request = self._http_session.get('http://{}:{}/metrics'.format( + self.metric_host, self.metric_port)) + if not request.ok: + return False + content = request.content.decode().split('\n') + except Exception: + return False + + for line in content: + line = line.strip() + if not line or line.startswith('#'): + continue + # noinspection PyBroadException + try: + metric, gpu_uuid, variant, version, value = self._parse_metric.match(line).groups() + value = float(value) + except Exception: + continue + self.task.get_logger().report_scalar( + title=metric, + series='{}.v{}'.format(variant, version), + iteration=iteration, + value=value + ) + # on the remote logger we add our own Task ID (unique ID), + # to support multiple servers reporting to the same service controller + if remote_logger: + remote_logger.report_scalar( + title=metric, + series='{}.v{}.{}'.format(variant, version, self.task.id), + iteration=iteration, + value=value + ) + + def maintenance_daemon( + self, + local_model_repo='/models', # type: str + update_frequency_sec=60.0, # type: float + metric_frequency_sec=60.0 # type: float + ): + # type: (...) 
-> None + + Path(local_model_repo).mkdir(parents=True, exist_ok=True) + + a_service = ServingService(task_id=self.serving_id) + a_service.triton_model_service_update_step(model_repository_folder=local_model_repo) + + # noinspection PyProtectedMember + remote_logger = a_service._task.get_logger() + + # todo: log triton server outputs when running locally + + # we assume we can run the triton server + cmd = [ + 'tritonserver', + '--model-control-mode=poll', + '--model-repository={}'.format(local_model_repo), + '--repository-poll-secs={}'.format(update_frequency_sec), + '--metrics-port={}'.format(self._default_metrics_port), + '--allow-metrics=true', + '--allow-gpu-metrics=true', + ] + for k, v in self.args.items(): + if not v or not str(k).startswith('t_'): + continue + cmd.append('--{}={}'.format(k, v)) + + print('Starting server: {}'.format(cmd)) + proc = subprocess.Popen(cmd) + base_freq = min(update_frequency_sec, metric_frequency_sec) + metric_tic = update_tic = time() + while True: + try: + error_code = proc.wait(timeout=base_freq) + if error_code == 0: + print("triton-server process ended with error code {}".format(error_code)) + return + raise ValueError("triton-server process ended with error code {}".format(error_code)) + except subprocess.TimeoutExpired: + pass + pass + + # update models + if time() - update_tic > update_frequency_sec: + a_service.triton_model_service_update_step(model_repository_folder=local_model_repo) + update_tic = time() + + # update stats + if time() - metric_tic > metric_frequency_sec: + metric_tic = time() + self.report_metrics(remote_logger) + + +def main(): + title = 'clearml-serving - Nvidia Triton Engine Helper' + print(title) + parser = ArgumentParser(prog='clearml-serving', description=title) + parser.add_argument( + '--serving-id', default=None, type=str, required=True, + help='Specify main serving service Task ID') + parser.add_argument( + '--project', default='serving', type=str, + help='Optional specify project for the serving engine Task') + parser.add_argument( + '--name', default='nvidia-triton', type=str, + help='Optional specify task name for the serving engine Task') + parser.add_argument( + '--update-frequency', default=10, type=float, + help='Model update frequency in minutes') + parser.add_argument( + '--metric-frequency', default=1, type=float, + help='Metric reporting update frequency in minutes') + parser.add_argument( + '--t-http-port', type=str, help=' The port for the server to listen on for HTTP requests') + parser.add_argument( + '--t-http-thread-count', type=str, help=' Number of threads handling HTTP requests') + parser.add_argument( + '--t-allow-grpc', type=str, help=' Allow the server to listen for GRPC requests') + parser.add_argument( + '--t-grpc-port', type=str, help=' The port for the server to listen on for GRPC requests') + parser.add_argument( + '--t-grpc-infer-allocation-pool-size', type=str, + help=' The maximum number of inference request/response objects that remain ' + 'allocated for reuse. As long as the number of in-flight requests doesn\'t exceed ' + 'this value there will be no allocation/deallocation of request/response objects') + parser.add_argument( + '--t-pinned-memory-pool-byte-size', type=str, + help=' The total byte size that can be allocated as pinned system ' + 'memory. If GPU support is enabled, the server will allocate pinned ' + 'system memory to accelerate data transfer between host and devices ' + 'until it exceeds the specified byte size. 
This option will not affect ' + 'the allocation conducted by the backend frameworks. Default is 256 MB') + parser.add_argument( + '--t-cuda-memory-pool-byte-size', type=str, + help='<:> The total byte size that can be allocated as CUDA memory for ' + 'the GPU device. If GPU support is enabled, the server will allocate ' + 'CUDA memory to minimize data transfer between host and devices ' + 'until it exceeds the specified byte size. This option will not affect ' + 'the allocation conducted by the backend frameworks. The argument ' + 'should be 2 integers separated by colons in the format :. This option can be used multiple times, but only ' + 'once per GPU device. Subsequent uses will overwrite previous uses for ' + 'the same GPU device. Default is 64 MB') + parser.add_argument( + '--t-min-supported-compute-capability', type=str, + help=' The minimum supported CUDA compute capability. GPUs that ' + 'don\'t support this compute capability will not be used by the server') + parser.add_argument( + '--t-buffer-manager-thread-count', type=str, + help=' The number of threads used to accelerate copies and other' + 'operations required to manage input and output tensor contents.' + 'Default is 0') + + args = parser.parse_args() + task = Task.init(project_name=args.project, task_name=args.name, task_type=Task.TaskTypes.inference) + helper = TritonHelper(args, task, serving_id=args.serving_id) + # this function will never end + helper.maintenance_daemon( + local_model_repo='/models', + update_frequency_sec=args.update_frequency*60.0, + metric_frequency_sec=args.metric_frequency*60.0, + ) + + +if __name__ == '__main__': + main() diff --git a/clearml_serving/version.py b/clearml_serving/version.py new file mode 100644 index 0000000..e1424ed --- /dev/null +++ b/clearml_serving/version.py @@ -0,0 +1 @@ +__version__ = '0.3.1' diff --git a/docs/webapp_screenshots.gif b/docs/webapp_screenshots.gif new file mode 100644 index 0000000..c2ceca5 Binary files /dev/null and b/docs/webapp_screenshots.gif differ diff --git a/examples/keras/keras_mnist.py b/examples/keras/keras_mnist.py new file mode 100644 index 0000000..90972ec --- /dev/null +++ b/examples/keras/keras_mnist.py @@ -0,0 +1,169 @@ +# ClearML - Keras with Tensorboard example code, automatic logging model and Tensorboard outputs +# +# Train a simple deep NN on the MNIST dataset. 
+# Then store a model to be served by clearml-serving +import argparse +import os +import tempfile + +import numpy as np +import tensorflow as tf +from pathlib import Path +from tensorflow.keras import utils as np_utils +from tensorflow.keras.callbacks import ModelCheckpoint, TensorBoard +from tensorflow.keras.datasets import mnist +from tensorflow.keras.layers import Activation, Dense +from tensorflow.keras.models import Sequential +from tensorflow.keras.optimizers import RMSprop + +from clearml import Task + + +class TensorBoardImage(TensorBoard): + @staticmethod + def make_image(tensor): + from PIL import Image + import io + tensor = np.stack((tensor, tensor, tensor), axis=2) + height, width, channels = tensor.shape + image = Image.fromarray(tensor) + output = io.BytesIO() + image.save(output, format='PNG') + image_string = output.getvalue() + output.close() + return tf.Summary.Image(height=height, + width=width, + colorspace=channels, + encoded_image_string=image_string) + + def on_epoch_end(self, epoch, logs=None): + if logs is None: + logs = {} + super(TensorBoardImage, self).on_epoch_end(epoch, logs) + images = self.validation_data[0] # 0 - data; 1 - labels + img = (255 * images[0].reshape(28, 28)).astype('uint8') + + image = self.make_image(img) + summary = tf.Summary(value=[tf.Summary.Value(tag='image', image=image)]) + self.writer.add_summary(summary, epoch) + + +def create_config_pbtxt(model, config_pbtxt_file): + platform = "tensorflow_savedmodel" + input_name = model.input_names[0] + output_name = model.output_names[0] + input_data_type = "TYPE_FP32" + output_data_type = "TYPE_FP32" + input_dims = str(model.input.shape.as_list()).replace("None", "-1") + output_dims = str(model.output.shape.as_list()).replace("None", "-1") + + config_pbtxt = """ + platform: "%s" + input [ + { + name: "%s" + data_type: %s + dims: %s + } + ] + output [ + { + name: "%s" + data_type: %s + dims: %s + } + ] + """ % ( + platform, + input_name, input_data_type, input_dims, + output_name, output_data_type, output_dims + ) + + with open(config_pbtxt_file, "w") as config_file: + config_file.write(config_pbtxt) + + +def main(): + parser = argparse.ArgumentParser(description='Keras MNIST Example - training CNN classification model') + parser.add_argument('--batch-size', type=int, default=128, help='input batch size for training (default: 128)') + parser.add_argument('--epochs', type=int, default=1, help='number of epochs to train (default: 6)') + args = parser.parse_args() + + # the data, shuffled and split between train and test sets + nb_classes = 10 + (X_train, y_train), (X_test, y_test) = mnist.load_data() + + X_train = X_train.reshape(60000, 784).astype('float32') / 255. + X_test = X_test.reshape(10000, 784).astype('float32') / 255. 
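+    # each 28x28 image is flattened into a 784-dim float vector scaled to [0, 1],
+    # matching the (784,) input_shape of the Dense network defined below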
+ print(X_train.shape[0], 'train samples') + print(X_test.shape[0], 'test samples') + + # convert class vectors to binary class matrices + Y_train = np_utils.to_categorical(y_train, nb_classes) + Y_test = np_utils.to_categorical(y_test, nb_classes) + + model = Sequential() + model.add(Dense(512, input_shape=(784,))) + model.add(Activation('relu')) + # model.add(Dropout(0.2)) + model.add(Dense(512)) + model.add(Activation('relu')) + # model.add(Dropout(0.2)) + model.add(Dense(10)) + model.add(Activation('softmax')) + + model2 = Sequential() + model2.add(Dense(512, input_shape=(784,))) + model2.add(Activation('relu')) + + model.summary() + + model.compile( + loss='categorical_crossentropy', + optimizer=RMSprop(), + metrics=['accuracy'] + ) + + # Connecting ClearML with the current process, + # from here on everything is logged automatically + task = Task.init(project_name='examples', task_name='Keras MNIST serve example', output_uri=True) + + # Advanced: setting model class enumeration + labels = dict(('digit_%d' % i, i) for i in range(10)) + task.set_model_label_enumeration(labels) + + output_folder = os.path.join(tempfile.gettempdir(), 'keras_example_new_temp_now') + + board = TensorBoard(histogram_freq=1, log_dir=output_folder, write_images=False) + model_store = ModelCheckpoint(filepath=os.path.join(output_folder, 'weight.{epoch}.hdf5')) + + # load previous model, if it is there + # noinspection PyBroadException + try: + model.load_weights(os.path.join(output_folder, 'weight.1.hdf5')) + except Exception: + pass + + model.fit( + X_train, Y_train, + batch_size=args.batch_size, epochs=args.epochs, + callbacks=[board, model_store], + verbose=1, validation_data=(X_test, Y_test) + ) + score = model.evaluate(X_test, Y_test, verbose=0) + + # store the model in a format that can be served + model.save('serving_model', include_optimizer=False) + + # create the config.pbtxt for triton to be able to serve the model + create_config_pbtxt(model=model, config_pbtxt_file='config.pbtxt') + # store the configuration on the creating Task, + # this will allow us to skip over manually setting the config.pbtxt for `clearml-serving` + task.connect_configuration(configuration=Path('config.pbtxt'), name='config.pbtxt') + + print('Test score: {}'.format(score[0])) + print('Test accuracy: {}'.format(score[1])) + + +if __name__ == '__main__': + main() diff --git a/examples/keras/requirements.txt b/examples/keras/requirements.txt new file mode 100644 index 0000000..68a3593 --- /dev/null +++ b/examples/keras/requirements.txt @@ -0,0 +1,2 @@ +tensorflow>=2.0 +clearml diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..7c60ed3 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +clearml >= 0.17.6rc1 diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..5904169 --- /dev/null +++ b/setup.py @@ -0,0 +1,75 @@ +""" +`clearml-serving` - Model-Serving Orchestration and Repository Solution +https://github.com/allegroai/clearml-serving +""" + +import os.path +# Always prefer setuptools over distutils +from setuptools import setup, find_packages + + +def read_text(filepath): + with open(filepath, "r", encoding="utf-8") as f: + return f.read() + + +here = os.path.dirname(__file__) +# Get the long description from the README file +long_description = read_text(os.path.join(here, 'README.md')) + + +def read_version_string(version_file): + for line in read_text(version_file).splitlines(): + if line.startswith('__version__'): + delim = '"' if '"' in line else "'" + return 
line.split(delim)[1] + else: + raise RuntimeError("Unable to find version string.") + + +version = read_version_string("clearml_serving/version.py") + +requirements = read_text(os.path.join(here, 'requirements.txt')).splitlines() + +setup( + name='clearml-serving', + version=version, + description='clearml-serving - Model-Serving Orchestration and Repository Solution', + long_description=long_description, + long_description_content_type='text/markdown', + # The project's main homepage. + url='https://github.com/allegroai/clearml-serving.git', + author='Allegroai', + author_email='clearml@allegro.ai', + license='Apache License 2.0', + classifiers=[ + 'Development Status :: 4 - Beta', + 'Intended Audience :: Developers', + 'Intended Audience :: Science/Research', + 'Operating System :: POSIX :: Linux', + 'Operating System :: MacOS :: MacOS X', + 'Operating System :: Microsoft', + 'Topic :: Scientific/Engineering :: Artificial Intelligence', + 'Topic :: Software Development', + 'Topic :: Software Development :: Version Control', + 'Topic :: System :: Logging', + 'Topic :: System :: Monitoring', + 'Programming Language :: Python :: 3.5', + 'Programming Language :: Python :: 3.6', + 'Programming Language :: Python :: 3.7', + 'Programming Language :: Python :: 3.8', + 'License :: OSI Approved :: Apache Software License', + ], + keywords='clearml mlops devops trains development machine deep learning version control machine-learning ' + 'machinelearning deeplearning deep-learning model-serving', + packages=find_packages(exclude=['contrib', 'docs', 'data', 'examples', 'tests']), + install_requires=requirements, + # To provide executable scripts, use entry points in preference to the + # "scripts" keyword. Entry points provide cross-platform support and allow + # pip to create the appropriate form of executable for the target platform. + entry_points={ + 'console_scripts': [ + 'clearml-serving = clearml_serving.__main__:main', + ], + }, +)