mirror of
https://github.com/clearml/clearml-serving
synced 2025-06-26 18:16:00 +00:00
Add model metric logging
This commit is contained in:
parent
d684169367
commit
4355c1b1f4
7
.gitignore
vendored
7
.gitignore
vendored
@ -2,6 +2,7 @@
|
||||
dist/
|
||||
build/
|
||||
*.egg-info/
|
||||
.tmp/
|
||||
|
||||
|
||||
# Compiled Python bytecode
|
||||
@ -23,6 +24,12 @@ Thumbs.db
|
||||
*.app
|
||||
*.exe
|
||||
*.war
|
||||
*.pkl
|
||||
*.pt
|
||||
*.pb
|
||||
data/
|
||||
runs/
|
||||
variables/
|
||||
|
||||
# Large media files
|
||||
*.mp4
|
||||
|
191
README.md
191
README.md
@ -61,6 +61,73 @@ Features:
|
||||
|
||||
## Installation
|
||||
|
||||
### prerequisites
|
||||
|
||||
* ClearML-Server : Model repository, Service Health, Control plane
|
||||
* Kubernetes / Single-instance Machine : Deploying containers
|
||||
* CLI : Configuration & model deployment interface
|
||||
|
||||
### :nail_care: Initial Setup
|
||||
|
||||
1. Setup your [**ClearML Server**](https://github.com/allegroai/clearml-server) or use the [Free tier Hosting](https://app.clear.ml)
|
||||
2. Setup local access (if you haven't already), see introductions [here](https://clear.ml/docs/latest/docs/getting_started/ds/ds_first_steps#install-clearml)
|
||||
3. Install clearml-serving CLI:
|
||||
```bash
|
||||
pip3 istall clearml-serving
|
||||
```
|
||||
4. Create the Serving Service Controller
|
||||
- `clearml-serving create --name "serving example"`
|
||||
- The new serving service UID should be printed `"New Serving Service created: id=aa11bb22aa11bb22`
|
||||
5. Write down the Serving Service UID
|
||||
6. Clone clearml-serving repository
|
||||
```bash
|
||||
git clone https://github.com/allegroai/clearml-serving.git
|
||||
```
|
||||
7. Edit the environment variables file (`docker/example.env`) with your clearml-server credentials and Serving Service UID. For example, you should have something like
|
||||
```bash
|
||||
cat docker/example.env
|
||||
```
|
||||
```bash
|
||||
CLEARML_WEB_HOST="https://app.clear.ml"
|
||||
CLEARML_API_HOST="https://api.clear.ml"
|
||||
CLEARML_FILES_HOST="https://files.clear.ml"
|
||||
CLEARML_API_ACCESS_KEY="<access_key_here>"
|
||||
CLEARML_API_SECRET_KEY="<secret_key_here>"
|
||||
CLEARML_SERVING_TASK_ID="<serving_service_id_here>"
|
||||
```
|
||||
8. Spin the clearml-serving containers with docker-compose (or if running on Kubernetes use the helm chart)
|
||||
```bash
|
||||
cd docker && docker-compose --env-file example.env -f docker-compose.yml up
|
||||
```
|
||||
If you need Triton support (keras/pytorch/onnx etc.), use the triton docker-compose file
|
||||
```bash
|
||||
cd docker && docker-compose --env-file example.env -f docker-compose-triton.yml up
|
||||
```
|
||||
:muscle: If running on a GPU instance w/ Triton support (keras/pytorch/onnx etc.), use the triton gpu docker-compose file
|
||||
```bash
|
||||
cd docker && docker-compose --env-file example.env -f docker-compose-triton-gpu.yml up
|
||||
```
|
||||
|
||||
> **Notice**: Any model that registers with "Triton" engine, will run the pre/post processing code on the Inference service container, and the model inference itself will be executed on the Triton Engine container.
|
||||
|
||||
|
||||
### :ocean: Optional: advanced setup - S3/GS/Azure access
|
||||
|
||||
To add access credentials and allow the inference containers to download models from your S3/GS/Azure object-storage,
|
||||
add the respected environment variables to your env files (`example.env`)
|
||||
See further details on configuring the storage access [here](https://clear.ml/docs/latest/docs/integrations/storage#configuring-storage)
|
||||
|
||||
```bash
|
||||
AWS_ACCESS_KEY_ID
|
||||
AWS_SECRET_ACCESS_KEY
|
||||
AWS_DEFAULT_REGION
|
||||
|
||||
GOOGLE_APPLICATION_CREDENTIALS
|
||||
|
||||
AZURE_STORAGE_ACCOUNT
|
||||
AZURE_STORAGE_KEY
|
||||
```
|
||||
|
||||
### :information_desk_person: Concepts
|
||||
|
||||
**CLI** - Secure configuration interface for on-line model upgrade/deployment on running Serving Services
|
||||
@ -77,24 +144,6 @@ Features:
|
||||
|
||||
**Dashboards** - Customizable dashboard-ing solution on top of the collected statistics, e.g. Grafana
|
||||
|
||||
### prerequisites
|
||||
|
||||
* ClearML-Server : Model repository, Service Health, Control plane
|
||||
* Kubernetes / Single-instance VM : Deploying containers
|
||||
* CLI : Configuration & model deployment interface
|
||||
|
||||
|
||||
### :nail_care: Initial Setup
|
||||
|
||||
1. Setup your [**ClearML Server**](https://github.com/allegroai/clearml-server) or use the [Free tier Hosting](https://app.community.clear.ml)
|
||||
2. Install the CLI on your laptop `clearml` and `clearml-serving`
|
||||
- `pip3 install https://github.com/allegroai/clearml-serving.git@dev`
|
||||
- Make sure to configure your machine to connect to your `clearml-server` see [clearml-init](https://clear.ml/docs/latest/docs/getting_started/ds/ds_first_steps#install-clearml) for details
|
||||
3. Create the Serving Service Controller
|
||||
- `clearml-serving create --name "serving example"`
|
||||
- The new serving service UID should be printed `"New Serving Service created: id=aa11bb22aa11bb22`
|
||||
4. Write down the Serving Service UID
|
||||
|
||||
### :point_right: Toy model (scikit learn) deployment example
|
||||
|
||||
1. Train toy scikit-learn model
|
||||
@ -123,61 +172,6 @@ Features:
|
||||
|
||||
> To learn more on training models and the ClearML model repository, see the [ClearML documentation](https://clear.ml/docs)
|
||||
|
||||
|
||||
### :muscle: Nvidia Triton serving engine setup
|
||||
|
||||
Nvidia Triton Serving Engine is used by clearml-serving to do the heavy lifting of deep-learning models on both GPU & CPU nodes.
|
||||
Inside the Triton container a clearml controller is spinning and monitoring the Triton server.
|
||||
All the triton models are automatically downloaded into the triton container in real-time, configured, and served.
|
||||
A single Triton serving container is serving multiple models, based on the registered models on the Serving Service
|
||||
Communication from the Inference container to the Triton container is done transparently over compressed gRPC channel.
|
||||
|
||||
#### setup
|
||||
|
||||
Optional: build the Triton container
|
||||
- Customize container [Dockerfile](clearml_serving/engines/triton/Dockerfile)
|
||||
- Build container `docker build --tag clearml-serving-triton:latest -f clearml_serving/engines/triton/Dockerfile .`
|
||||
|
||||
Spin the triton engine container: `docker run -v ~/clearml.conf:/root/clearml.conf -p 8001:8001 -e CLEARML_SERVING_TASK_ID=<service_id> -e CLEARML_TRITON_POLL_FREQ=5 -e CLEARML_TRITON_METRIC_FREQ=1 clearml-serving-triton:latest`
|
||||
|
||||
Configure the "Serving Service" with the new Triton Engine gRPC IP:Port. Notice that when deploying on a Kubernetes cluster this should be a TCP ingest endpoint, to allow for transparent auto-scaling of the Triton Engine Containers
|
||||
|
||||
`clearml-serving --id <service_id> config --triton-grpc-server <local_ip_here>:8001`
|
||||
|
||||
Spin the inference service (this is the external RestAPI interface)
|
||||
`docker run -v ~/clearml.conf:/root/clearml.conf -p 8080:8080 -e CLEARML_SERVING_TASK_ID=<service_id> -e CLEARML_SERVING_POLL_FREQ=5 clearml-serving-inference:latest`
|
||||
|
||||
Now eny model that will register with "Triton" engine, will run the pre/post processing code on the Inference service container, and the model inference itself will be executed on the Triton Engine container.
|
||||
See Tensorflow [example](examples/keras/readme.md) and Pytorch [example](examples/pytorch/readme.md) for further details.
|
||||
|
||||
|
||||
### :ocean: Container Configuration Variables
|
||||
|
||||
When spinning the Inference container or the Triton Engine container,
|
||||
we need to specify the `clearml-server` address and access credentials
|
||||
One way of achieving that is by mounting the `clearml.conf` file into the container's HOME folder (i.e. `-v ~/clearml.conf:/root/clearml.conf`)
|
||||
We can also pass environment variables instead (see [details](https://clear.ml/docs/latest/docs/configs/env_vars#server-connection):
|
||||
```bash
|
||||
CLEARML_API_HOST="https://api.clear.ml"
|
||||
CLEARML_WEB_HOST="https://app.clear.ml"
|
||||
CLEARML_FILES_HOST="https://files.clear.ml"
|
||||
CLEARML_API_ACCESS_KEY="access_key_here"
|
||||
CLEARML_API_SECRET_KEY="secret_key_here"
|
||||
```
|
||||
|
||||
To access models stored on an S3 buckets, Google Storage or Azure blob storage (notice that with GS you also need to make sure the access json is available inside the containers). See further details on configuring the storage access [here](https://clear.ml/docs/latest/docs/integrations/storage#configuring-storage)
|
||||
|
||||
```bash
|
||||
AWS_ACCESS_KEY_ID
|
||||
AWS_SECRET_ACCESS_KEY
|
||||
AWS_DEFAULT_REGION
|
||||
|
||||
GOOGLE_APPLICATION_CREDENTIALS
|
||||
|
||||
AZURE_STORAGE_ACCOUNT
|
||||
AZURE_STORAGE_KEY
|
||||
```
|
||||
|
||||
### :turtle: Registering & Deploying new models manually
|
||||
|
||||
Uploading an existing model file into the model repository can be done via the `clearml` RestAPI, the python interface, or with the `clearml-serving` CLI
|
||||
@ -200,7 +194,7 @@ Uploading an existing model file into the model repository can be done via the `
|
||||
|
||||
The clearml Serving Service support automatic model deployment and upgrades, directly connected with the model repository and API. When the model auto-deploy is configured, a new model versions will be automatically deployed when you "publish" or "tag" a new model in the `clearml` model repository. This automation interface allows for simpler CI/CD model deployment process, as a single API automatically deploy (or remove) a model from the Serving Service.
|
||||
|
||||
#### automatic model deployment example
|
||||
#### Automatic model deployment example
|
||||
|
||||
1. Configure the model auto-update on the Serving Service
|
||||
- `clearml-serving --id <service_id> model auto-update --engine sklearn --endpoint "test_model_sklearn_auto" --preprocess "preprocess.py" --name "train sklearn model" --project "serving examples" --max-versions 2`
|
||||
@ -248,6 +242,42 @@ Example:
|
||||
- `curl -X POST "http://127.0.0.1:8080/serve/test_model" -H "accept: application/json" -H "Content-Type: application/json" -d '{"x0": 1, "x1": 2}'`
|
||||
|
||||
|
||||
### Model monitoring and performance metrics
|
||||
|
||||
ClearML serving instances send serving statistics (count/latency) automatically to Prometheus and Grafana can be used
|
||||
to visualize and create live dashboards.
|
||||
|
||||
The default docker-compose installation is preconfigured with Prometheus and Grafana, do notice that by default data/ate of both containers is *not* persistent. To add persistence we do recommend adding a volume mount.
|
||||
|
||||
You can also add many custom metrics on the input/predictions of your models.
|
||||
Once a model endpoint is registered, adding custom metric can be done using the CLI.
|
||||
For example, assume we have our mock scikit-learn model deployed on endpoint `test_model_sklearn`,
|
||||
we can log the requests inputs and outputs (see examples/sklearn/preprocess.py example):
|
||||
```bash
|
||||
clearml-serving --id <serving_service_id_here> metrics add --endpoint test_model_sklearn --variable-scalar
|
||||
x0=0,0.1,0.5,1,10 x1=0,0.1,0.5,1,10 y=0,0.1,0.5,0.75,1
|
||||
```
|
||||
|
||||
This will create a distribution histogram (buckets specified via a list of less-equal values after `=` sign),
|
||||
that we will be able to visualize on Grafana.
|
||||
Notice we can also log time-series values with `--variable-value x2` or discrete results (e.g. classifications strings) with `--variable-enum animal=cat,dog,sheep`.
|
||||
Additional custom variables can be in the preprocess and postprocess with a call to `collect_custom_statistics_fn({'new_var': 1.337})` see clearml_serving/preprocess/preprocess_template.py
|
||||
|
||||
With the new metrics logged we can create a visualization dashboard over the latency of the calls, and the output distribution.
|
||||
|
||||
Grafana model performance example:
|
||||
|
||||
- browse to http://localhost:3000
|
||||
- login with: admin/admin
|
||||
- create a new dashboard
|
||||
- select Prometheus as data source
|
||||
- Add a query: `100 * delta(test_model_sklearn:_latency_bucket[1m]) / delta(test_model_sklearn:_latency_sum[1m])`
|
||||
- Change type to heatmap, and select on the right hand-side under "Data Format" select "Time series buckets"
|
||||
- You now have the latency distribution, over time.
|
||||
- Repeat the same process for x0, the query would be `100 * delta(test_model_sklearn:x0_bucket[1m]) / delta(test_model_sklearn:x0_sum[1m])`
|
||||
|
||||
> **Notice**: If not specified all serving requests will be logged, to change the default configure "CLEARML_DEFAULT_METRIC_LOG_FREQ", for example CLEARML_DEFAULT_METRIC_LOG_FREQ=0.2 means only 20% of all requests will be logged. You can also specify per endpoint log frequency with the `clearml-serving` CLI. Check the CLI documentation with `cleamrl-serving metrics --help`
|
||||
|
||||
### :fire: Model Serving Examples
|
||||
|
||||
- Scikit-Learn [example](examples/sklearn/readme.md) - random data
|
||||
@ -274,8 +304,9 @@ Example:
|
||||
- [x] CLI configuration tool
|
||||
- [x] Nvidia Triton integration
|
||||
- [x] GZip request compression
|
||||
- [ ] TorchServe engine integration
|
||||
- [ ] Prebuilt Docker containers (dockerhub)
|
||||
- [x] TorchServe engine integration
|
||||
- [x] Prebuilt Docker containers (dockerhub)
|
||||
- [x] Docker-compose deployment (CPU/GPU)
|
||||
- [x] Scikit-Learn example
|
||||
- [x] XGBoost example
|
||||
- [x] LightGBM example
|
||||
@ -283,10 +314,10 @@ Example:
|
||||
- [x] TensorFlow/Keras example
|
||||
- [x] Model ensemble example
|
||||
- [x] Model pipeline example
|
||||
- [ ] Statistics Service
|
||||
- [ ] Kafka install instructions
|
||||
- [ ] Prometheus install instructions
|
||||
- [ ] Grafana install instructions
|
||||
- [x] Statistics Service
|
||||
- [x] Kafka install instructions
|
||||
- [x] Prometheus install instructions
|
||||
- [x] Grafana install instructions
|
||||
- [ ] Kubernetes Helm Chart
|
||||
|
||||
## Contributing
|
||||
|
@ -4,11 +4,78 @@ from argparse import ArgumentParser
|
||||
from pathlib import Path
|
||||
|
||||
from clearml_serving.serving.model_request_processor import ModelRequestProcessor, CanaryEP
|
||||
from clearml_serving.serving.endpoints import ModelMonitoring, ModelEndpoint
|
||||
from clearml_serving.serving.endpoints import ModelMonitoring, ModelEndpoint, EndpointMetricLogging
|
||||
|
||||
verbosity = False
|
||||
|
||||
|
||||
def func_metric_ls(args):
|
||||
request_processor = ModelRequestProcessor(task_id=args.id)
|
||||
print("List endpoint metrics, control task id={}".format(request_processor.get_id()))
|
||||
request_processor.deserialize(skip_sync=True)
|
||||
print("Logged Metrics:\n{}".format(
|
||||
json.dumps({k: v.as_dict() for k, v in request_processor.list_metric_logging().items()}, indent=2)))
|
||||
|
||||
|
||||
def func_metric_rm(args):
|
||||
request_processor = ModelRequestProcessor(task_id=args.id)
|
||||
print("Serving service Task {}, Removing metrics from endpoint={}".format(
|
||||
request_processor.get_id(), args.endpoint))
|
||||
request_processor.deserialize(skip_sync=True)
|
||||
for v in (args.variable or []):
|
||||
if request_processor.remove_metric_logging(endpoint=args.endpoint, variable_name=v):
|
||||
print("Removing static endpoint: {}".format(args.endpoint))
|
||||
else:
|
||||
raise ValueError("Could not remove {} from endpoin {}".format(v, args.endpoint))
|
||||
print("Updating serving service")
|
||||
request_processor.serialize()
|
||||
|
||||
|
||||
def func_metric_add(args):
|
||||
request_processor = ModelRequestProcessor(task_id=args.id)
|
||||
print("Serving service Task {}, Adding metric logging endpoint \'/{}/\'".format(
|
||||
request_processor.get_id(), args.endpoint))
|
||||
request_processor.deserialize(skip_sync=True)
|
||||
metric = EndpointMetricLogging(endpoint=args.endpoint)
|
||||
if args.log_freq is not None:
|
||||
metric.log_frequency = float(args.log_freq)
|
||||
for v in (args.variable_scalar or []):
|
||||
if '=' not in v:
|
||||
raise ValueError("Variable '{}' should be in the form of <name>=<buckets> "
|
||||
"example: x1=0,1,2,3,4,5".format(v))
|
||||
name, buckets = v.split('=', 1)
|
||||
if name in metric.metrics:
|
||||
print("Warning: {} defined twice".format(name))
|
||||
if '/' in buckets:
|
||||
b_min, b_max, b_step = [float(b.strip()) for b in buckets.split('/', 2)]
|
||||
buckets = list(range(b_min, b_max, b_step))
|
||||
else:
|
||||
buckets = [float(b.strip()) for b in buckets.split(',')]
|
||||
metric.metrics[name] = dict(type="scalar", buckets=buckets)
|
||||
|
||||
for v in (args.variable_enum or []):
|
||||
if '=' not in v:
|
||||
raise ValueError("Variable '{}' should be in the form of <name>=<buckets> "
|
||||
"example: x1=cat,dog,sheep".format(v))
|
||||
name, buckets = v.split('=', 1)
|
||||
if name in metric.metrics:
|
||||
print("Warning: {} defined twice".format(name))
|
||||
buckets = [str(b.strip()) for b in buckets.split(',')]
|
||||
metric.metrics[name] = dict(type="enum", buckets=buckets)
|
||||
|
||||
for v in (args.variable_value or []):
|
||||
name = v.strip()
|
||||
if name in metric.metrics:
|
||||
print("Warning: {} defined twice".format(name))
|
||||
metric.metrics[name] = dict(type="variable", buckets=None)
|
||||
|
||||
if not request_processor.add_metric_logging(metric=metric):
|
||||
raise ValueError("Could not add metric logging endpoint {}".format(args.endpoint))
|
||||
|
||||
print("Updating serving service")
|
||||
request_processor.serialize()
|
||||
|
||||
|
||||
def func_model_upload(args):
|
||||
if not args.path and not args.url:
|
||||
raise ValueError("Either --path or --url must be specified")
|
||||
@ -46,9 +113,12 @@ def func_model_ls(args):
|
||||
request_processor = ModelRequestProcessor(task_id=args.id)
|
||||
print("List model serving and endpoints, control task id={}".format(request_processor.get_id()))
|
||||
request_processor.deserialize(skip_sync=True)
|
||||
print("Endpoints:\n{}".format(json.dumps(request_processor.get_endpoints(), indent=2)))
|
||||
print("Model Monitoring:\n{}".format(json.dumps(request_processor.get_model_monitoring(), indent=2)))
|
||||
print("Canary:\n{}".format(json.dumps(request_processor.get_canary_endpoints(), indent=2)))
|
||||
print("Endpoints:\n{}".format(
|
||||
json.dumps({k: v.as_dict() for k, v in request_processor.get_endpoints().items()}, indent=2)))
|
||||
print("Model Monitoring:\n{}".format(
|
||||
json.dumps({k: v.as_dict() for k, v in request_processor.get_model_monitoring().items()}, indent=2)))
|
||||
print("Canary:\n{}".format(
|
||||
json.dumps({k: v.as_dict() for k, v in request_processor.get_canary_endpoints().items()}, indent=2)))
|
||||
|
||||
|
||||
def func_create_service(args):
|
||||
@ -69,6 +139,10 @@ def func_config_service(args):
|
||||
print("Configuring serving service [id={}] triton_grpc_server={}".format(
|
||||
request_processor.get_id(), args.triton_grpc_server))
|
||||
request_processor.configure(external_triton_grpc_server=args.triton_grpc_server)
|
||||
if args.kafka_metric_server:
|
||||
request_processor.configure(external_kafka_service_server=args.kafka_metric_server)
|
||||
if args.metric_log_freq is not None:
|
||||
pass
|
||||
|
||||
|
||||
def func_list_services(_):
|
||||
@ -224,6 +298,47 @@ def cli():
|
||||
help='[Optional] Specify project for the serving service. Default: DevOps')
|
||||
parser_create.set_defaults(func=func_create_service)
|
||||
|
||||
parser_metrics = subparsers.add_parser('metrics', help='Configure inference metrics Service')
|
||||
parser_metrics.set_defaults(func=parser_metrics.print_help)
|
||||
|
||||
metric_cmd = parser_metrics.add_subparsers(help='model metric command help')
|
||||
|
||||
parser_metrics_add = metric_cmd.add_parser('add', help='Add/modify metric for a specific endpoint')
|
||||
parser_metrics_add.add_argument(
|
||||
'--endpoint', type=str, required=True,
|
||||
help='metric endpoint name including version, e.g. "model/1" or a prefix "model/*" '
|
||||
'Notice: it will override any previous endpoint logged metrics')
|
||||
parser_metrics_add.add_argument(
|
||||
'--log-freq', type=float,
|
||||
help='Optional: logging request frequency, between 0.0 to 1.0 '
|
||||
'example: 1.0 means all requests are logged, 0.5 means half of the requests are logged '
|
||||
'if not specified, use global logging frequency, see `config --metric-log-freq`')
|
||||
parser_metrics_add.add_argument(
|
||||
'--variable-scalar', type=str, nargs='+',
|
||||
help='Add float (scalar) argument to the metric logger, '
|
||||
'<name>=<histogram> example with specific buckets: "x1=0,0.2,0.4,0.6,0.8,1" or '
|
||||
'with min/max/num_buckets "x1=0.0/1.0/5"')
|
||||
parser_metrics_add.add_argument(
|
||||
'--variable-enum', type=str, nargs='+',
|
||||
help='Add enum (string) argument to the metric logger, '
|
||||
'<name>=<optional_values> example: "detect=cat,dog,sheep"')
|
||||
parser_metrics_add.add_argument(
|
||||
'--variable-value', type=str, nargs='+',
|
||||
help='Add non-samples scalar argument to the metric logger, '
|
||||
'<name> example: "latency"')
|
||||
parser_metrics_add.set_defaults(func=func_metric_add)
|
||||
|
||||
parser_metrics_rm = metric_cmd.add_parser('remove', help='Remove metric from a specific endpoint')
|
||||
parser_metrics_rm.add_argument(
|
||||
'--endpoint', type=str, help='metric endpoint name including version, e.g. "model/1" or a prefix "model/*"')
|
||||
parser_metrics_rm.add_argument(
|
||||
'--variable', type=str, nargs='+',
|
||||
help='Remove (scalar/enum) argument from the metric logger, <name> example: "x1"')
|
||||
parser_metrics_rm.set_defaults(func=func_metric_rm)
|
||||
|
||||
parser_metrics_ls = metric_cmd.add_parser('list', help='list metrics logged on all endpoints')
|
||||
parser_metrics_ls.set_defaults(func=func_metric_ls)
|
||||
|
||||
parser_config = subparsers.add_parser('config', help='Configure a new Serving Service')
|
||||
parser_config.add_argument(
|
||||
'--base-serving-url', type=str,
|
||||
@ -231,6 +346,12 @@ def cli():
|
||||
parser_config.add_argument(
|
||||
'--triton-grpc-server', type=str,
|
||||
help='External ClearML-Triton serving container gRPC address. example: 127.0.0.1:9001')
|
||||
parser_config.add_argument(
|
||||
'--kafka-metric-server', type=str,
|
||||
help='External Kafka service url. example: 127.0.0.1:9092')
|
||||
parser_config.add_argument(
|
||||
'--metric-log-freq', type=float,
|
||||
help='Set default metric logging frequency. 1.0 is 100% of all requests are logged')
|
||||
parser_config.set_defaults(func=func_config_service)
|
||||
|
||||
parser_model = subparsers.add_parser('model', help='Configure Model endpoints for an already running Service')
|
||||
@ -273,7 +394,7 @@ def cli():
|
||||
'https://domain/model.bin)')
|
||||
parser_model_upload.add_argument(
|
||||
'--destination', type=str,
|
||||
help='Optional, Specifying the target destination for the model to be uploaded'
|
||||
help='Optional, Specifying the target destination for the model to be uploaded '
|
||||
'(e.g. s3://bucket/folder/, gs://bucket/folder/, azure://bucket/folder/)')
|
||||
parser_model_upload.set_defaults(func=func_model_upload)
|
||||
|
||||
|
@ -1,6 +1,6 @@
|
||||
clearml >= 1.1.6
|
||||
clearml >= 1.3.1
|
||||
clearml-serving
|
||||
tritonclient[grpc]
|
||||
tritonclient[grpc]>=2.18.0,<2.19
|
||||
grpcio
|
||||
Pillow
|
||||
Pillow>=9.0.1,<10
|
||||
pathlib2
|
@ -10,7 +10,7 @@ import numpy as np
|
||||
from clearml import Task, Logger, InputModel
|
||||
from clearml.backend_api.utils import get_http_session_with_retry
|
||||
from clearml.utilities.pyhocon import ConfigFactory, ConfigTree, HOCONConverter
|
||||
from pathlib2 import Path
|
||||
from pathlib import Path
|
||||
|
||||
from clearml_serving.serving.endpoints import ModelEndpoint
|
||||
from clearml_serving.serving.model_request_processor import ModelRequestProcessor
|
||||
@ -413,10 +413,10 @@ def main():
|
||||
'--serving-id', default=os.environ.get('CLEARML_SERVING_TASK_ID'), type=str,
|
||||
help='Specify main serving service Task ID')
|
||||
parser.add_argument(
|
||||
'--project', default='serving', type=str,
|
||||
'--project', default=None, type=str,
|
||||
help='Optional specify project for the serving engine Task')
|
||||
parser.add_argument(
|
||||
'--name', default='nvidia-triton', type=str,
|
||||
'--name', default='triton engine', type=str,
|
||||
help='Optional specify task name for the serving engine Task')
|
||||
parser.add_argument(
|
||||
'--update-frequency', default=os.environ.get('CLEARML_TRITON_POLL_FREQ') or 10., type=float,
|
||||
@ -481,8 +481,13 @@ def main():
|
||||
t = type(getattr(args, args_var, None))
|
||||
setattr(args, args_var, type(t)(v) if t is not None else v)
|
||||
|
||||
# noinspection PyProtectedMember
|
||||
serving_task = ModelRequestProcessor._get_control_plane_task(task_id=args.inference_task_id)
|
||||
|
||||
task = Task.init(
|
||||
project_name=args.project, task_name=args.name, task_type=Task.TaskTypes.inference,
|
||||
project_name=args.project or serving_task.get_project_name() or "serving",
|
||||
task_name="{} - {}".format(serving_task.name, args.name),
|
||||
task_type=Task.TaskTypes.inference,
|
||||
continue_last_task=args.inference_task_id or None
|
||||
)
|
||||
print("configuration args: {}".format(args))
|
||||
|
@ -1,14 +1,14 @@
|
||||
from typing import Any, Optional
|
||||
from typing import Any, Optional, List, Callable
|
||||
|
||||
|
||||
# Notice Preprocess class Must be named "Preprocess"
|
||||
# Otherwise there are No limitations, No need to inherit or to implement all methods
|
||||
# Preprocess class Must be named "Preprocess"
|
||||
# No need to inherit or to implement all methods
|
||||
class Preprocess(object):
|
||||
serving_config = None
|
||||
# example: {
|
||||
# 'base_serving_url': 'http://127.0.0.1:8080/serve/',
|
||||
# 'triton_grpc_server': '127.0.0.1:9001',
|
||||
# }"
|
||||
"""
|
||||
Preprocess class Must be named "Preprocess"
|
||||
Otherwise there are No limitations, No need to inherit or to implement all methods
|
||||
Notice! This is not thread safe! the same instance may be accessed from multiple threads simultaneously
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
# set internal state, this will be called only once. (i.e. not per request)
|
||||
@ -16,31 +16,66 @@ class Preprocess(object):
|
||||
|
||||
def load(self, local_file_name: str) -> Optional[Any]: # noqa
|
||||
"""
|
||||
Optional, provide loading method for the model
|
||||
Optional: provide loading method for the model
|
||||
useful if we need to load a model in a specific way for the prediction engine to work
|
||||
:param local_file_name: file name / path to read load the model from
|
||||
:return: Object that will be called with .predict() method for inference
|
||||
"""
|
||||
pass
|
||||
|
||||
def preprocess(self, body: dict) -> Any: # noqa
|
||||
def preprocess(self, body: dict, collect_custom_statistics_fn: Optional[Callable[[dict], None]]) -> Any: # noqa
|
||||
"""
|
||||
do something with the request data, return any type of object.
|
||||
Optional: do something with the request data, return any type of object.
|
||||
The returned object will be passed as is to the inference engine
|
||||
|
||||
:param body: dictionary as recieved from the RestAPI
|
||||
:param collect_custom_statistics_fn: Optional, if provided allows to send a custom set of key/values
|
||||
to the statictics collector servicd.
|
||||
None is passed if statiscs collector is not configured, or if the current request should not be collected
|
||||
|
||||
Usage example:
|
||||
>>> print(body)
|
||||
{"x0": 1, "x1": 2}
|
||||
>>> if collect_custom_statistics_fn:
|
||||
>>> collect_custom_statistics_fn({"x0": 1, "x1": 2})
|
||||
|
||||
:return: Object to be passed directly to the model inference
|
||||
"""
|
||||
return body
|
||||
|
||||
def postprocess(self, data: Any) -> dict: # noqa
|
||||
def postprocess(self, data: Any, collect_custom_statistics_fn: Optional[Callable[[dict], None]]) -> dict: # noqa
|
||||
"""
|
||||
post process the data returned from the model inference engine
|
||||
Optional: post process the data returned from the model inference engine
|
||||
returned dict will be passed back as the request result as is.
|
||||
|
||||
:param data: object as recieved from the inference model function
|
||||
:param collect_custom_statistics_fn: Optional, if provided allows to send a custom set of key/values
|
||||
to the statictics collector servicd.
|
||||
None is passed if statiscs collector is not configured, or if the current request should not be collected
|
||||
|
||||
Usage example:
|
||||
>>> if collect_custom_statistics_fn:
|
||||
>>> collect_custom_statistics_fn({"y": 1})
|
||||
|
||||
:return: Dictionary passed directly as the returned result of the RestAPI
|
||||
"""
|
||||
return data
|
||||
|
||||
def process(self, data: Any) -> Any: # noqa
|
||||
def process(self, data: Any, collect_custom_statistics_fn: Optional[Callable[[dict], None]]) -> Any: # noqa
|
||||
"""
|
||||
do something with the actual data, return any type of object.
|
||||
Optional: do something with the actual data, return any type of object.
|
||||
The returned object will be passed as is to the postprocess function engine
|
||||
|
||||
:param data: object as recieved from the preprocessing function
|
||||
:param collect_custom_statistics_fn: Optional, if provided allows to send a custom set of key/values
|
||||
to the statictics collector servicd.
|
||||
None is passed if statiscs collector is not configured, or if the current request should not be collected
|
||||
|
||||
Usage example:
|
||||
>>> if collect_custom_statistics_fn:
|
||||
>>> collect_custom_statistics_fn({"type": "classification"})
|
||||
|
||||
:return: Object to be passed tp the post-processing function
|
||||
"""
|
||||
return data
|
||||
|
||||
@ -63,4 +98,4 @@ class Preprocess(object):
|
||||
>>> result = self.send_request(endpoint="test_model_sklearn", version="1", data={"x0": x0, "x1": x1})
|
||||
>>> y = result["y"]
|
||||
"""
|
||||
return None
|
||||
pass
|
||||
|
@ -1,5 +1,5 @@
|
||||
import numpy as np
|
||||
from attr import attrib, attrs, asdict
|
||||
from attr import attrib, attrs, asdict, validators
|
||||
|
||||
|
||||
def _engine_validator(inst, attr, value): # noqa
|
||||
@ -14,7 +14,15 @@ def _matrix_type_validator(inst, attr, value): # noqa
|
||||
|
||||
|
||||
@attrs
|
||||
class ModelMonitoring(object):
|
||||
class BaseStruct(object):
|
||||
def as_dict(self, remove_null_entries=False):
|
||||
if not remove_null_entries:
|
||||
return asdict(self)
|
||||
return {k: v for k, v in asdict(self).items() if v is not None}
|
||||
|
||||
|
||||
@attrs
|
||||
class ModelMonitoring(BaseStruct):
|
||||
base_serving_url = attrib(type=str) # serving point url prefix (example: "detect_cat")
|
||||
engine_type = attrib(type=str, validator=_engine_validator) # engine type
|
||||
monitor_project = attrib(type=str, default=None) # monitor model project (for model auto update)
|
||||
@ -32,14 +40,9 @@ class ModelMonitoring(object):
|
||||
type=str, default=None) # optional artifact name storing the model preprocessing code
|
||||
auxiliary_cfg = attrib(type=dict, default=None) # Auxiliary configuration (e.g. triton conf), Union[str, dict]
|
||||
|
||||
def as_dict(self, remove_null_entries=False):
|
||||
if not remove_null_entries:
|
||||
return asdict(self)
|
||||
return {k: v for k, v in asdict(self).items() if v is not None}
|
||||
|
||||
|
||||
@attrs
|
||||
class ModelEndpoint(object):
|
||||
class ModelEndpoint(BaseStruct):
|
||||
engine_type = attrib(type=str, validator=_engine_validator) # engine type
|
||||
serving_url = attrib(type=str) # full serving point url (including version) example: "detect_cat/v1"
|
||||
model_id = attrib(type=str, default=None) # model ID to serve (and download)
|
||||
@ -54,14 +57,9 @@ class ModelEndpoint(object):
|
||||
output_name = attrib(type=str, default=None) # optional, layer name to pull the results from
|
||||
auxiliary_cfg = attrib(type=dict, default=None) # Optional: Auxiliary configuration (e.g. triton conf), [str, dict]
|
||||
|
||||
def as_dict(self, remove_null_entries=False):
|
||||
if not remove_null_entries:
|
||||
return asdict(self)
|
||||
return {k: v for k, v in asdict(self).items() if v is not None}
|
||||
|
||||
|
||||
@attrs
|
||||
class CanaryEP(object):
|
||||
class CanaryEP(BaseStruct):
|
||||
endpoint = attrib(type=str) # load balancer endpoint
|
||||
weights = attrib(type=list) # list of weights (order should be matching fixed_endpoints or prefix)
|
||||
load_endpoints = attrib(type=list, default=[]) # list of endpoints to balance and route
|
||||
@ -69,7 +67,34 @@ class CanaryEP(object):
|
||||
type=str, default=None) # endpoint prefix to list
|
||||
# (any endpoint starting with this prefix will be listed, sorted lexicographically, or broken into /<int>)
|
||||
|
||||
|
||||
@attrs
|
||||
class EndpointMetricLogging(BaseStruct):
|
||||
@attrs
|
||||
class MetricType(BaseStruct):
|
||||
type = attrib(type=str, validator=validators.in_(("scalar", "enum", "value", "counter")))
|
||||
buckets = attrib(type=list, default=None)
|
||||
|
||||
endpoint = attrib(type=str) # Specific endpoint to log metrics w/ version (example: "model/1")
|
||||
# If endpoint name ends with a "*" any endpoint with a matching prefix will be selected
|
||||
|
||||
log_frequency = attrib(type=float, default=None) # Specific endpoint to log frequency
|
||||
# (0.0 to 1.0, where 1.0 is 100% of all requests are logged)
|
||||
|
||||
metrics = attrib(
|
||||
type=dict, default={},
|
||||
converter=lambda x: {k: v if isinstance(v, EndpointMetricLogging.MetricType)
|
||||
else EndpointMetricLogging.MetricType(**v) for k, v in x.items()}) # key=variable, value=MetricType)
|
||||
# example:
|
||||
# {"x1": dict(type="scalar", buckets=[0,1,2,3]),
|
||||
# "y": dict(type="enum", buckets=["cat", "dog"]).
|
||||
# "latency": dict(type="value", buckets=[]).
|
||||
# }
|
||||
|
||||
def as_dict(self, remove_null_entries=False):
|
||||
if not remove_null_entries:
|
||||
return asdict(self)
|
||||
return {k: v for k, v in asdict(self).items() if v is not None}
|
||||
return {k: v.as_dict(remove_null_entries) if isinstance(v, BaseStruct) else v
|
||||
for k, v in asdict(self).items()}
|
||||
|
||||
return {k: v.as_dict(remove_null_entries) if isinstance(v, BaseStruct) else v
|
||||
for k, v in asdict(self).items() if v is not None}
|
||||
|
@ -3,26 +3,53 @@
|
||||
# print configuration
|
||||
echo CLEARML_SERVING_TASK_ID="$CLEARML_SERVING_TASK_ID"
|
||||
echo CLEARML_SERVING_PORT="$CLEARML_SERVING_PORT"
|
||||
echo CLEARML_USE_GUNICORN="$CLEARML_USE_GUNICORN"
|
||||
echo EXTRA_PYTHON_PACKAGES="$EXTRA_PYTHON_PACKAGES"
|
||||
echo CLEARML_SERVING_NUM_PROCESS="$CLEARML_SERVING_NUM_PROCESS"
|
||||
echo CLEARML_SERVING_POLL_FREQ="$CLEARML_SERVING_POLL_FREQ"
|
||||
echo CLEARML_DEFAULT_KAFKA_SERVE_URL="$CLEARML_DEFAULT_KAFKA_SERVE_URL"
|
||||
echo CLEARML_DEFAULT_KAFKA_SERVE_URL="$CLEARML_DEFAULT_KAFKA_SERVE_URL"
|
||||
|
||||
SERVING_PORT="${CLEARML_SERVING_PORT:-8080}"
|
||||
GUNICORN_NUM_PROCESS="${CLEARML_SERVING_NUM_PROCESS:-4}"
|
||||
GUNICORN_SERVING_PORT="${CLEARML_SERVING_PORT:-8080}"
|
||||
GUNICORN_SERVING_TIMEOUT="${GUNICORN_SERVING_TIMEOUT:-600}"
|
||||
UVICORN_SERVE_LOOP="${UVICORN_SERVE_LOOP:-asyncio}"
|
||||
|
||||
# set default internal serve endpoint (for request pipelining)
|
||||
CLEARML_DEFAULT_BASE_SERVE_URL="${CLEARML_DEFAULT_BASE_SERVE_URL:-http://127.0.0.1:$SERVING_PORT/serve}"
|
||||
CLEARML_DEFAULT_TRITON_GRPC_ADDR="${CLEARML_DEFAULT_TRITON_GRPC_ADDR:-127.0.0.1:8001}"
|
||||
|
||||
# print configuration
|
||||
echo WEB_CONCURRENCY="$WEB_CONCURRENCY"
|
||||
echo SERVING_PORT="$SERVING_PORT"
|
||||
echo GUNICORN_NUM_PROCESS="$GUNICORN_NUM_PROCESS"
|
||||
echo GUNICORN_SERVING_PORT="$GUNICORN_SERVING_PORT"
|
||||
|
||||
# we should also have clearml-server configurations
|
||||
echo GUNICORN_SERVING_TIMEOUT="$GUNICORN_SERVING_PORT"
|
||||
echo GUNICORN_EXTRA_ARGS="$GUNICORN_EXTRA_ARGS"
|
||||
echo UVICORN_SERVE_LOOP="$UVICORN_SERVE_LOOP"
|
||||
echo UVICORN_EXTRA_ARGS="$UVICORN_EXTRA_ARGS"
|
||||
echo CLEARML_DEFAULT_BASE_SERVE_URL="$CLEARML_DEFAULT_BASE_SERVE_URL"
|
||||
echo CLEARML_DEFAULT_TRITON_GRPC_ADDR="$CLEARML_DEFAULT_TRITON_GRPC_ADDR"
|
||||
|
||||
# runtime add extra python packages
|
||||
if [ ! -z "$EXTRA_PYTHON_PACKAGES" ]
|
||||
then
|
||||
python3 -m pip install $EXTRA_PYTHON_PACKAGES
|
||||
fi
|
||||
|
||||
# start service
|
||||
PYTHONPATH=$(pwd) python3 -m gunicorn \
|
||||
--preload clearml_serving.serving.main:app \
|
||||
--workers $GUNICORN_NUM_PROCESS \
|
||||
--worker-class uvicorn.workers.UvicornWorker \
|
||||
--bind 0.0.0.0:$GUNICORN_SERVING_PORT
|
||||
if [ -z "$CLEARML_USE_GUNICORN" ]
|
||||
then
|
||||
echo "Starting Uvicorn server"
|
||||
PYTHONPATH=$(pwd) python3 -m uvicorn \
|
||||
clearml_serving.serving.main:app --host 0.0.0.0 --port $SERVING_PORT --loop $UVICORN_SERVE_LOOP \
|
||||
$UVICORN_EXTRA_ARGS
|
||||
else
|
||||
echo "Starting Gunicorn server"
|
||||
# start service
|
||||
PYTHONPATH=$(pwd) python3 -m gunicorn \
|
||||
--preload clearml_serving.serving.main:app \
|
||||
--workers $GUNICORN_NUM_PROCESS \
|
||||
--worker-class uvicorn.workers.UvicornWorker \
|
||||
--timeout $GUNICORN_SERVING_TIMEOUT \
|
||||
--bind 0.0.0.0:$SERVING_PORT \
|
||||
$GUNICORN_EXTRA_ARGS
|
||||
fi
|
||||
|
@ -47,12 +47,16 @@ except (ValueError, TypeError):
|
||||
# get the serving controller task
|
||||
# noinspection PyProtectedMember
|
||||
serving_task = ModelRequestProcessor._get_control_plane_task(task_id=serving_service_task_id)
|
||||
# set to running (because we are here)
|
||||
if serving_task.status != "in_progress":
|
||||
serving_task.started(force=True)
|
||||
# create a new serving instance (for visibility and monitoring)
|
||||
instance_task = Task.init(
|
||||
project_name=serving_task.get_project_name(),
|
||||
task_name="{} - serve instance".format(serving_task.name),
|
||||
task_type="inference",
|
||||
)
|
||||
instance_task.set_system_tags(["service"])
|
||||
processor = None # type: Optional[ModelRequestProcessor]
|
||||
# preload modules into memory before forking
|
||||
BasePreprocessRequest.load_modules()
|
||||
|
@ -1,7 +1,9 @@
|
||||
import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
from time import sleep
|
||||
from queue import Queue
|
||||
from random import random
|
||||
from time import sleep, time
|
||||
from typing import Optional, Union, Dict, List
|
||||
import itertools
|
||||
import threading
|
||||
@ -11,7 +13,7 @@ from numpy.random import choice
|
||||
from clearml import Task, Model
|
||||
from clearml.storage.util import hash_dict
|
||||
from .preprocess_service import BasePreprocessRequest
|
||||
from .endpoints import ModelEndpoint, ModelMonitoring, CanaryEP
|
||||
from .endpoints import ModelEndpoint, ModelMonitoring, CanaryEP, EndpointMetricLogging
|
||||
|
||||
|
||||
class FastWriteCounter(object):
|
||||
@ -30,7 +32,12 @@ class FastWriteCounter(object):
|
||||
|
||||
|
||||
class ModelRequestProcessor(object):
|
||||
_system_tag = 'serving-control-plane'
|
||||
_system_tag = "serving-control-plane"
|
||||
_kafka_topic = "clearml_inference_stats"
|
||||
_config_key_serving_base_url = "serving_base_url"
|
||||
_config_key_triton_grpc = "triton_grpc_server"
|
||||
_config_key_kafka_stats = "kafka_service_server"
|
||||
_config_key_def_metric_freq = "metric_logging_freq"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@ -60,15 +67,24 @@ class ModelRequestProcessor(object):
|
||||
self._canary_endpoints = dict() # type: Dict[str, CanaryEP]
|
||||
self._canary_route = dict() # type: Dict[str, dict]
|
||||
self._engine_processor_lookup = dict() # type: Dict[str, BasePreprocessRequest]
|
||||
self._metric_logging = dict() # type: Dict[str, EndpointMetricLogging]
|
||||
self._endpoint_metric_logging = dict() # type: Dict[str, EndpointMetricLogging]
|
||||
self._last_update_hash = None
|
||||
self._sync_daemon_thread = None
|
||||
self._stats_sending_thread = None
|
||||
self._stats_queue = Queue()
|
||||
# this is used for Fast locking mechanisms (so we do not actually need to use Locks)
|
||||
self._update_lock_flag = False
|
||||
self._request_processing_state = FastWriteCounter()
|
||||
self._update_lock_guard = update_lock_guard or threading.Lock()
|
||||
self._instance_task = None
|
||||
# serving server config
|
||||
self._configuration = {}
|
||||
self._instance_task = None
|
||||
# deserialized values go here
|
||||
self._kafka_stats_url = None
|
||||
self._triton_grpc = None
|
||||
self._serving_base_url = None
|
||||
self._metric_log_freq = None
|
||||
|
||||
def process_request(self, base_url: str, version: str, request_body: dict) -> dict:
|
||||
"""
|
||||
@ -100,9 +116,7 @@ class ModelRequestProcessor(object):
|
||||
processor = self._engine_processor_lookup.get(url)
|
||||
if not processor:
|
||||
processor_cls = BasePreprocessRequest.get_engine_cls(ep.engine_type)
|
||||
processor = processor_cls(
|
||||
model_endpoint=ep, task=self._task, server_config=dict(**self._configuration)
|
||||
)
|
||||
processor = processor_cls(model_endpoint=ep, task=self._task)
|
||||
self._engine_processor_lookup[url] = processor
|
||||
|
||||
return_value = self._process_request(processor=processor, url=url, body=request_body)
|
||||
@ -124,6 +138,8 @@ class ModelRequestProcessor(object):
|
||||
self,
|
||||
external_serving_base_url: Optional[str] = None,
|
||||
external_triton_grpc_server: Optional[str] = None,
|
||||
external_kafka_service_server: Optional[str] = None,
|
||||
default_metric_log_freq: Optional[float] = None,
|
||||
):
|
||||
"""
|
||||
Set ModelRequestProcessor configuration arguments.
|
||||
@ -133,21 +149,40 @@ class ModelRequestProcessor(object):
|
||||
allowing it to concatenate and combine multiple model requests into one
|
||||
:param external_triton_grpc_server: set the external grpc tcp port of the Nvidia Triton clearml container.
|
||||
Used by the clearml triton engine class to send inference requests
|
||||
:param external_kafka_service_server: Optional, Kafka endpoint for the statistics controller collection.
|
||||
:param default_metric_log_freq: Default request metric logging (0 to 1.0, 1. means 100% of requests are logged)
|
||||
"""
|
||||
if external_serving_base_url is not None:
|
||||
self._task.set_parameter(
|
||||
name="General/serving_base_url",
|
||||
name="General/{}".format(self._config_key_serving_base_url),
|
||||
value=str(external_serving_base_url),
|
||||
value_type="str",
|
||||
description="external base http endpoint for the serving service"
|
||||
)
|
||||
if external_triton_grpc_server is not None:
|
||||
self._task.set_parameter(
|
||||
name="General/triton_grpc_server",
|
||||
name="General/{}".format(self._config_key_triton_grpc),
|
||||
value=str(external_triton_grpc_server),
|
||||
value_type="str",
|
||||
description="external grpc tcp port of the Nvidia Triton ClearML container running"
|
||||
)
|
||||
if external_kafka_service_server is not None:
|
||||
self._task.set_parameter(
|
||||
name="General/{}".format(self._config_key_kafka_stats),
|
||||
value=str(external_kafka_service_server),
|
||||
value_type="str",
|
||||
description="external Kafka service url for the statistics controller server"
|
||||
)
|
||||
if default_metric_log_freq is not None:
|
||||
self._task.set_parameter(
|
||||
name="General/{}".format(self._config_key_def_metric_freq),
|
||||
value=str(default_metric_log_freq),
|
||||
value_type="float",
|
||||
description="Request metric logging frequency"
|
||||
)
|
||||
|
||||
def get_configuration(self) -> dict:
|
||||
return dict(**self._configuration)
|
||||
|
||||
def add_endpoint(
|
||||
self,
|
||||
@ -301,13 +336,92 @@ class ModelRequestProcessor(object):
|
||||
self._canary_endpoints.pop(endpoint_url, None)
|
||||
return True
|
||||
|
||||
def deserialize(self, task: Task = None, prefetch_artifacts=False, skip_sync=False) -> bool:
|
||||
def add_metric_logging(self, metric: Union[EndpointMetricLogging, dict]) -> bool:
|
||||
"""
|
||||
Add metric logging to a specific endpoint
|
||||
Valid metric variable are any variables on the request or response dictionary,
|
||||
or a custom preprocess reported variable
|
||||
|
||||
When overwriting and existing monitored variable, output a warning.
|
||||
|
||||
:param metric: Metric variable to monitor
|
||||
:return: True if successful
|
||||
"""
|
||||
if not isinstance(metric, EndpointMetricLogging):
|
||||
metric = EndpointMetricLogging(**metric)
|
||||
|
||||
name = str(metric.endpoint).strip("/")
|
||||
metric.endpoint = name
|
||||
|
||||
if name not in self._endpoints and not name.endswith('*'):
|
||||
raise ValueError("Metric logging \'{}\' references a nonexistent endpoint".format(name))
|
||||
|
||||
if name in self._metric_logging:
|
||||
print("Warning: Metric logging \'{}\' overwritten".format(name))
|
||||
|
||||
self._metric_logging[name] = metric
|
||||
return True
|
||||
|
||||
def remove_metric_logging(
|
||||
self,
|
||||
endpoint: str,
|
||||
variable_name: str = None,
|
||||
) -> bool:
|
||||
"""
|
||||
Remove existing logged metric variable. Use variable name and endpoint as unique identifier
|
||||
|
||||
:param endpoint: Endpoint name (including version, e.g. "model/1" or "model/*")
|
||||
:param variable_name: Variable name (str), pass None to remove the entire endpoint logging
|
||||
|
||||
:return: True if successful
|
||||
"""
|
||||
|
||||
name = str(endpoint).strip("/")
|
||||
|
||||
if name not in self._metric_logging or \
|
||||
(variable_name and variable_name not in self._metric_logging[name].metrics):
|
||||
return False
|
||||
|
||||
if not variable_name:
|
||||
self._metric_logging.pop(name, None)
|
||||
else:
|
||||
self._metric_logging[name].metrics.pop(variable_name, None)
|
||||
|
||||
return True
|
||||
|
||||
def list_metric_logging(self) -> Dict[str, EndpointMetricLogging]:
|
||||
"""
|
||||
List existing logged metric variables.
|
||||
|
||||
:return: Dictionary, key='endpoint/version' value=EndpointMetricLogging
|
||||
"""
|
||||
|
||||
return dict(**self._metric_logging)
|
||||
|
||||
def list_endpoint_logging(self) -> Dict[str, EndpointMetricLogging]:
|
||||
"""
|
||||
List endpoints (fully synced) current metric logging state.
|
||||
|
||||
:return: Dictionary, key='endpoint/version' value=EndpointMetricLogging
|
||||
"""
|
||||
|
||||
return dict(**self._endpoint_metric_logging)
|
||||
|
||||
def deserialize(
|
||||
self,
|
||||
task: Task = None,
|
||||
prefetch_artifacts: bool = False,
|
||||
skip_sync: bool = False,
|
||||
update_current_task: bool = True
|
||||
) -> bool:
|
||||
"""
|
||||
Restore ModelRequestProcessor state from Task
|
||||
return True if actually needed serialization, False nothing changed
|
||||
:param task: Load data from Task
|
||||
:param prefetch_artifacts: If True prefetch artifacts requested by the endpoints
|
||||
:param skip_sync: If True do not update the canary/monitoring state
|
||||
:param update_current_task: is not skip_sync, and is True,
|
||||
update the current Task with the configuration synced from the serving service Task
|
||||
"""
|
||||
if not task:
|
||||
task = self._task
|
||||
@ -315,7 +429,15 @@ class ModelRequestProcessor(object):
|
||||
endpoints = task.get_configuration_object_as_dict(name='endpoints') or {}
|
||||
canary_ep = task.get_configuration_object_as_dict(name='canary') or {}
|
||||
model_monitoring = task.get_configuration_object_as_dict(name='model_monitoring') or {}
|
||||
hashed_conf = hash_dict(dict(endpoints=endpoints, canary_ep=canary_ep, model_monitoring=model_monitoring))
|
||||
metric_logging = task.get_configuration_object_as_dict(name='metric_logging') or {}
|
||||
|
||||
hashed_conf = hash_dict(
|
||||
dict(endpoints=endpoints,
|
||||
canary_ep=canary_ep,
|
||||
model_monitoring=model_monitoring,
|
||||
metric_logging=metric_logging,
|
||||
configuration=configuration)
|
||||
)
|
||||
if self._last_update_hash == hashed_conf and not self._model_monitoring_update_request:
|
||||
return False
|
||||
print("Info: syncing model endpoint configuration, state hash={}".format(hashed_conf))
|
||||
@ -333,13 +455,18 @@ class ModelRequestProcessor(object):
|
||||
k: CanaryEP(**{i: j for i, j in v.items() if hasattr(CanaryEP.__attrs_attrs__, i)})
|
||||
for k, v in canary_ep.items()
|
||||
}
|
||||
metric_logging = {
|
||||
k: EndpointMetricLogging(**{i: j for i, j in v.items() if hasattr(EndpointMetricLogging.__attrs_attrs__, i)})
|
||||
for k, v in metric_logging.items()
|
||||
}
|
||||
|
||||
# if there is no need to sync Canary and Models we can just leave
|
||||
if skip_sync:
|
||||
self._endpoints = endpoints
|
||||
self._model_monitoring = model_monitoring
|
||||
self._canary_endpoints = canary_endpoints
|
||||
self._configuration = configuration
|
||||
self._metric_logging = metric_logging
|
||||
self._deserialize_conf_dict(configuration)
|
||||
return True
|
||||
|
||||
# make sure we only have one stall request at any given moment
|
||||
@ -366,18 +493,21 @@ class ModelRequestProcessor(object):
|
||||
self._endpoints = endpoints
|
||||
self._model_monitoring = model_monitoring
|
||||
self._canary_endpoints = canary_endpoints
|
||||
self._configuration = configuration
|
||||
self._metric_logging = metric_logging
|
||||
self._deserialize_conf_dict(configuration)
|
||||
|
||||
# if we have models we need to sync, now is the time
|
||||
self._sync_monitored_models()
|
||||
|
||||
self._update_canary_lookup()
|
||||
|
||||
self._sync_metric_logging()
|
||||
|
||||
# release stall lock
|
||||
self._update_lock_flag = False
|
||||
|
||||
# update the state on the inference task
|
||||
if Task.current_task() and Task.current_task().id != self._task.id:
|
||||
if update_current_task and Task.current_task() and Task.current_task().id != self._task.id:
|
||||
self.serialize(task=Task.current_task())
|
||||
|
||||
return True
|
||||
@ -394,6 +524,8 @@ class ModelRequestProcessor(object):
|
||||
task.set_configuration_object(name='canary', config_dict=config_dict)
|
||||
config_dict = {k: v.as_dict(remove_null_entries=True) for k, v in self._model_monitoring.items()}
|
||||
task.set_configuration_object(name='model_monitoring', config_dict=config_dict)
|
||||
config_dict = {k: v.as_dict(remove_null_entries=True) for k, v in self._metric_logging.items()}
|
||||
task.set_configuration_object(name='metric_logging', config_dict=config_dict)
|
||||
|
||||
def _update_canary_lookup(self):
|
||||
canary_route = {}
|
||||
@ -547,6 +679,32 @@ class ModelRequestProcessor(object):
|
||||
)
|
||||
return True
|
||||
|
||||
def _sync_metric_logging(self, force: bool = False) -> bool:
|
||||
if not force and not self._metric_logging:
|
||||
return False
|
||||
|
||||
fixed_metric_endpoint = {
|
||||
k: v for k, v in self._metric_logging.items() if "*/" not in k
|
||||
}
|
||||
prefix_metric_endpoint = {k.split("*/")[0]: v for k, v in self._metric_logging.items() if "*/" in k}
|
||||
|
||||
endpoint_metric_logging = {}
|
||||
for k, ep in list(self._endpoints.items()) + list(self._model_monitoring_endpoints.items()):
|
||||
if k in fixed_metric_endpoint:
|
||||
if k not in endpoint_metric_logging:
|
||||
endpoint_metric_logging[k] = fixed_metric_endpoint[k]
|
||||
|
||||
continue
|
||||
for p, v in prefix_metric_endpoint.items():
|
||||
if k.startswith(p):
|
||||
if k not in endpoint_metric_logging:
|
||||
endpoint_metric_logging[k] = v
|
||||
|
||||
break
|
||||
|
||||
self._endpoint_metric_logging = endpoint_metric_logging
|
||||
return True
|
||||
|
||||
def launch(self, poll_frequency_sec=300):
|
||||
"""
|
||||
Launch the background synchronization thread and monitoring thread
|
||||
@ -572,10 +730,15 @@ class ModelRequestProcessor(object):
|
||||
return
|
||||
self._sync_daemon_thread = threading.Thread(
|
||||
target=self._sync_daemon, args=(poll_frequency_sec, ), daemon=True)
|
||||
self._stats_sending_thread = threading.Thread(
|
||||
target=self._stats_send_loop, daemon=True)
|
||||
|
||||
self._sync_daemon_thread.start()
|
||||
self._stats_sending_thread.start()
|
||||
|
||||
# we return immediately
|
||||
|
||||
def _sync_daemon(self, poll_frequency_sec=300):
|
||||
def _sync_daemon(self, poll_frequency_sec: float = 300) -> None:
|
||||
"""
|
||||
Background thread, syncing model changes into request service.
|
||||
"""
|
||||
@ -617,6 +780,44 @@ class ModelRequestProcessor(object):
|
||||
except Exception as ex:
|
||||
print("Exception occurred in monitoring thread: {}".format(ex))
|
||||
|
||||
def _stats_send_loop(self) -> None:
|
||||
"""
|
||||
Background thread for sending stats to Kafka service
|
||||
"""
|
||||
if not self._kafka_stats_url:
|
||||
print("No Kafka Statistics service configured, shutting down statistics report")
|
||||
return
|
||||
|
||||
print("Starting Kafka Statistics reporting: {}".format(self._kafka_stats_url))
|
||||
|
||||
from kafka import KafkaProducer # noqa
|
||||
|
||||
while True:
|
||||
try:
|
||||
producer = KafkaProducer(
|
||||
bootstrap_servers=self._kafka_stats_url, # ['localhost:9092'],
|
||||
value_serializer=lambda x: json.dumps(x).encode('utf-8'),
|
||||
compression_type='lz4', # requires python lz4 package
|
||||
)
|
||||
break
|
||||
except Exception as ex:
|
||||
print("Error: failed opening Kafka consumer [{}]: {}".format(self._kafka_stats_url, ex))
|
||||
print("Retrying in 30 seconds")
|
||||
sleep(30)
|
||||
|
||||
while True:
|
||||
try:
|
||||
stats_dict = self._stats_queue.get(block=True)
|
||||
except Exception as ex:
|
||||
print("Warning: Statistics thread exception: {}".format(ex))
|
||||
break
|
||||
# send into kafka service
|
||||
try:
|
||||
producer.send(self._kafka_topic, value=stats_dict).get()
|
||||
except Exception as ex:
|
||||
print("Warning: Failed to send statistics packet to Kafka service: {}".format(ex))
|
||||
pass
|
||||
|
||||
def get_id(self) -> str:
|
||||
return self._task.id
|
||||
|
||||
@ -781,12 +982,76 @@ class ModelRequestProcessor(object):
|
||||
self._instance_task.get_logger().report_plotly(
|
||||
title='Serving Endpoints Layout', series='', iteration=0, figure=fig)
|
||||
|
||||
@staticmethod
|
||||
def _process_request(processor: BasePreprocessRequest, url: str, body: dict) -> dict:
|
||||
# todo: add some statistics
|
||||
preprocessed = processor.preprocess(body)
|
||||
processed = processor.process(data=preprocessed)
|
||||
return processor.postprocess(data=processed)
|
||||
def _deserialize_conf_dict(self, configuration: dict) -> None:
|
||||
self._configuration = configuration
|
||||
|
||||
# deserialized values go here
|
||||
self._kafka_stats_url = \
|
||||
configuration.get(self._config_key_kafka_stats) or \
|
||||
os.environ.get("CLEARML_DEFAULT_KAFKA_SERVE_URL")
|
||||
self._triton_grpc = \
|
||||
configuration.get(self._config_key_triton_grpc) or \
|
||||
os.environ.get("CLEARML_DEFAULT_TRITON_GRPC_ADDR")
|
||||
self._serving_base_url = \
|
||||
configuration.get(self._config_key_serving_base_url) or \
|
||||
os.environ.get("CLEARML_DEFAULT_BASE_SERVE_URL")
|
||||
self._metric_log_freq = \
|
||||
float(configuration.get(self._config_key_def_metric_freq,
|
||||
os.environ.get("CLEARML_DEFAULT_METRIC_LOG_FREQ", 1.0)))
|
||||
# update back configuration
|
||||
self._configuration[self._config_key_kafka_stats] = self._kafka_stats_url
|
||||
self._configuration[self._config_key_triton_grpc] = self._triton_grpc
|
||||
self._configuration[self._config_key_serving_base_url] = self._serving_base_url
|
||||
self._configuration[self._config_key_def_metric_freq] = self._metric_log_freq
|
||||
# update preprocessing classes
|
||||
BasePreprocessRequest.set_server_config(self._configuration)
|
||||
|
||||
def _process_request(self, processor: BasePreprocessRequest, url: str, body: dict) -> dict:
|
||||
# collect statistics for this request
|
||||
stats = {}
|
||||
stats_collect_fn = None
|
||||
collect_stats = False
|
||||
freq = 1
|
||||
# decide if we are collecting the stats
|
||||
metric_endpoint = self._metric_logging.get(url)
|
||||
if self._kafka_stats_url:
|
||||
freq = metric_endpoint.log_frequency if metric_endpoint and metric_endpoint.log_frequency is not None \
|
||||
else self._metric_log_freq
|
||||
|
||||
if freq and random() <= freq:
|
||||
stats_collect_fn = stats.update
|
||||
collect_stats = True
|
||||
|
||||
tic = time()
|
||||
preprocessed = processor.preprocess(body, stats_collect_fn)
|
||||
processed = processor.process(preprocessed, stats_collect_fn)
|
||||
return_value = processor.postprocess(processed, stats_collect_fn)
|
||||
tic = time() - tic
|
||||
if collect_stats:
|
||||
# 10th of a millisecond should be enough
|
||||
stats['_latency'] = round(tic, 4)
|
||||
stats['_count'] = int(1.0/freq)
|
||||
stats['_url'] = url
|
||||
|
||||
# collect inputs
|
||||
if metric_endpoint and body:
|
||||
for k, v in body.items():
|
||||
if k in metric_endpoint.metrics:
|
||||
stats[k] = v
|
||||
# collect outputs
|
||||
if metric_endpoint and return_value:
|
||||
for k, v in return_value.items():
|
||||
if k in metric_endpoint.metrics:
|
||||
stats[k] = v
|
||||
|
||||
# send stats in background, push it into a thread queue
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
self._stats_queue.put(stats, block=False)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return return_value
|
||||
|
||||
@classmethod
|
||||
def list_control_plane_tasks(
|
||||
@ -894,4 +1159,3 @@ class ModelRequestProcessor(object):
|
||||
if not endpoint.auxiliary_cfg and missing:
|
||||
raise ValueError("Triton engine requires input description - missing values in {}".format(missing))
|
||||
return True
|
||||
|
||||
|
@ -14,13 +14,13 @@ class BasePreprocessRequest(object):
|
||||
__preprocessing_lookup = {}
|
||||
__preprocessing_modules = set()
|
||||
_default_serving_base_url = "http://127.0.0.1:8080/serve/"
|
||||
_server_config = {} # externally configured by the serving inference service
|
||||
_timeout = None # timeout in seconds for the entire request, set in __init__
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
model_endpoint: ModelEndpoint,
|
||||
task: Task = None,
|
||||
server_config: dict = None,
|
||||
):
|
||||
"""
|
||||
Notice this object is not be created per request, but once per Process
|
||||
@ -29,71 +29,102 @@ class BasePreprocessRequest(object):
|
||||
self.model_endpoint = model_endpoint
|
||||
self._preprocess = None
|
||||
self._model = None
|
||||
self._server_config = server_config or {}
|
||||
if self._timeout is None:
|
||||
self._timeout = int(float(os.environ.get('GUNICORN_SERVING_TIMEOUT', 600)) * 0.8)
|
||||
|
||||
# load preprocessing code here
|
||||
if self.model_endpoint.preprocess_artifact:
|
||||
if not task or self.model_endpoint.preprocess_artifact not in task.artifacts:
|
||||
print("Warning: could not find preprocessing artifact \'{}\' on Task id={}".format(
|
||||
raise ValueError("Error: could not find preprocessing artifact \'{}\' on Task id={}".format(
|
||||
self.model_endpoint.preprocess_artifact, task.id))
|
||||
else:
|
||||
try:
|
||||
path = task.artifacts[self.model_endpoint.preprocess_artifact].get_local_copy()
|
||||
# check file content hash, should only happens once?!
|
||||
# noinspection PyProtectedMember
|
||||
file_hash, _ = sha256sum(path, block_size=Artifacts._hash_block_size)
|
||||
if file_hash != task.artifacts[self.model_endpoint.preprocess_artifact].hash:
|
||||
print("INFO: re-downloading artifact '{}' hash changed".format(
|
||||
self.model_endpoint.preprocess_artifact))
|
||||
path = task.artifacts[self.model_endpoint.preprocess_artifact].get_local_copy(
|
||||
extract_archive=True,
|
||||
force_download=True,
|
||||
)
|
||||
else:
|
||||
# extract zip if we need to, otherwise it will be the same
|
||||
path = task.artifacts[self.model_endpoint.preprocess_artifact].get_local_copy(
|
||||
extract_archive=True,
|
||||
)
|
||||
|
||||
import importlib.util
|
||||
spec = importlib.util.spec_from_file_location("Preprocess", path)
|
||||
_preprocess = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(_preprocess)
|
||||
Preprocess = _preprocess.Preprocess # noqa
|
||||
# override `send_request` method
|
||||
Preprocess.send_request = BasePreprocessRequest._preprocess_send_request
|
||||
self._preprocess = Preprocess()
|
||||
self._preprocess.serving_config = server_config or {}
|
||||
if callable(getattr(self._preprocess, 'load', None)):
|
||||
self._model = self._preprocess.load(self._get_local_model_file())
|
||||
self._instantiate_custom_preprocess_cls(task)
|
||||
except Exception as ex:
|
||||
print("Warning: Failed loading preprocess code for \'{}\': {}".format(
|
||||
raise ValueError("Error: Failed loading preprocess code for \'{}\': {}".format(
|
||||
self.model_endpoint.preprocess_artifact, ex))
|
||||
|
||||
def preprocess(self, request):
|
||||
# type: (dict) -> Optional[Any]
|
||||
def _instantiate_custom_preprocess_cls(self, task: Task) -> None:
|
||||
path = task.artifacts[self.model_endpoint.preprocess_artifact].get_local_copy()
|
||||
# check file content hash, should only happens once?!
|
||||
# noinspection PyProtectedMember
|
||||
file_hash, _ = sha256sum(path, block_size=Artifacts._hash_block_size)
|
||||
if file_hash != task.artifacts[self.model_endpoint.preprocess_artifact].hash:
|
||||
print("INFO: re-downloading artifact '{}' hash changed".format(
|
||||
self.model_endpoint.preprocess_artifact))
|
||||
path = task.artifacts[self.model_endpoint.preprocess_artifact].get_local_copy(
|
||||
extract_archive=True,
|
||||
force_download=True,
|
||||
)
|
||||
else:
|
||||
# extract zip if we need to, otherwise it will be the same
|
||||
path = task.artifacts[self.model_endpoint.preprocess_artifact].get_local_copy(
|
||||
extract_archive=True,
|
||||
)
|
||||
|
||||
import importlib.util
|
||||
spec = importlib.util.spec_from_file_location("Preprocess", path)
|
||||
_preprocess = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(_preprocess)
|
||||
Preprocess = _preprocess.Preprocess # noqa
|
||||
# override `send_request` method
|
||||
Preprocess.send_request = BasePreprocessRequest._preprocess_send_request
|
||||
# create preprocess class
|
||||
self._preprocess = Preprocess()
|
||||
# custom model load callback function
|
||||
if callable(getattr(self._preprocess, 'load', None)):
|
||||
self._model = self._preprocess.load(self._get_local_model_file())
|
||||
|
||||
def preprocess(self, request: dict, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Optional[Any]:
|
||||
"""
|
||||
Raise exception to report an error
|
||||
Return value will be passed to serving engine
|
||||
|
||||
:param request: dictionary as recieved from the RestAPI
|
||||
:param collect_custom_statistics_fn: Optional, allows to send a custom set of key/values
|
||||
to the statictics collector servicd
|
||||
|
||||
Usage example:
|
||||
>>> print(request)
|
||||
{"x0": 1, "x1": 2}
|
||||
>>> collect_custom_statistics_fn({"x0": 1, "x1": 2})
|
||||
|
||||
:return: Object to be passed directly to the model inference
|
||||
"""
|
||||
if self._preprocess is not None and hasattr(self._preprocess, 'preprocess'):
|
||||
return self._preprocess.preprocess(request)
|
||||
return self._preprocess.preprocess(request, collect_custom_statistics_fn)
|
||||
return request
|
||||
|
||||
def postprocess(self, data):
|
||||
# type: (Any) -> Optional[dict]
|
||||
def postprocess(self, data: Any, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Optional[dict]:
|
||||
"""
|
||||
Raise exception to report an error
|
||||
Return value will be passed to serving engine
|
||||
|
||||
:param data: object as recieved from the inference model function
|
||||
:param collect_custom_statistics_fn: Optional, allows to send a custom set of key/values
|
||||
to the statictics collector servicd
|
||||
|
||||
Usage example:
|
||||
>>> collect_custom_statistics_fn({"y": 1})
|
||||
|
||||
:return: Dictionary passed directly as the returned result of the RestAPI
|
||||
"""
|
||||
if self._preprocess is not None and hasattr(self._preprocess, 'postprocess'):
|
||||
return self._preprocess.postprocess(data)
|
||||
return self._preprocess.postprocess(data, collect_custom_statistics_fn)
|
||||
return data
|
||||
|
||||
def process(self, data: Any) -> Any:
|
||||
def process(self, data: Any, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Any:
|
||||
"""
|
||||
The actual processing function. Can be send to external service
|
||||
|
||||
:param data: object as recieved from the preprocessing function
|
||||
:param collect_custom_statistics_fn: Optional, allows to send a custom set of key/values
|
||||
to the statictics collector servicd
|
||||
|
||||
Usage example:
|
||||
>>> collect_custom_statistics_fn({"type": "classification"})
|
||||
|
||||
:return: Object to be passed tp the post-processing function
|
||||
"""
|
||||
pass
|
||||
|
||||
@ -101,6 +132,14 @@ class BasePreprocessRequest(object):
|
||||
model_repo_object = Model(model_id=self.model_endpoint.model_id)
|
||||
return model_repo_object.get_local_copy()
|
||||
|
||||
@classmethod
|
||||
def set_server_config(cls, server_config: dict) -> None:
|
||||
cls._server_config = server_config
|
||||
|
||||
@classmethod
|
||||
def get_server_config(cls) -> dict:
|
||||
return cls._server_config
|
||||
|
||||
@classmethod
|
||||
def validate_engine_type(cls, engine: str) -> bool:
|
||||
return engine in cls.__preprocessing_lookup
|
||||
@ -137,7 +176,7 @@ class BasePreprocessRequest(object):
|
||||
@staticmethod
|
||||
def _preprocess_send_request(self, endpoint: str, version: str = None, data: dict = None) -> Optional[dict]:
|
||||
endpoint = "{}/{}".format(endpoint.strip("/"), version.strip("/")) if version else endpoint.strip("/")
|
||||
base_url = self.serving_config.get("base_serving_url") if self.serving_config else None
|
||||
base_url = BasePreprocessRequest.get_server_config().get("base_serving_url")
|
||||
base_url = (base_url or BasePreprocessRequest._default_serving_base_url).strip("/")
|
||||
url = "{}/{}".format(base_url, endpoint.strip("/"))
|
||||
return_value = request_post(url, json=data, timeout=BasePreprocessRequest._timeout)
|
||||
@ -159,40 +198,50 @@ class TritonPreprocessRequest(BasePreprocessRequest):
|
||||
np.float32: 'fp32_contents',
|
||||
np.float64: 'fp64_contents',
|
||||
}
|
||||
_default_grpc_address = "127.0.0.1:8001"
|
||||
_ext_grpc = None
|
||||
_ext_np_to_triton_dtype = None
|
||||
_ext_service_pb2 = None
|
||||
_ext_service_pb2_grpc = None
|
||||
|
||||
def __init__(self, model_endpoint: ModelEndpoint, task: Task = None, server_config: dict = None):
|
||||
def __init__(self, model_endpoint: ModelEndpoint, task: Task = None):
|
||||
super(TritonPreprocessRequest, self).__init__(
|
||||
model_endpoint=model_endpoint, task=task, server_config=server_config)
|
||||
model_endpoint=model_endpoint, task=task)
|
||||
|
||||
# load Triton Module
|
||||
if self._ext_grpc is None:
|
||||
import grpc
|
||||
import grpc # noqa
|
||||
self._ext_grpc = grpc
|
||||
|
||||
if self._ext_np_to_triton_dtype is None:
|
||||
from tritonclient.utils import np_to_triton_dtype
|
||||
from tritonclient.utils import np_to_triton_dtype # noqa
|
||||
self._ext_np_to_triton_dtype = np_to_triton_dtype
|
||||
|
||||
if self._ext_service_pb2 is None:
|
||||
from tritonclient.grpc import service_pb2, service_pb2_grpc
|
||||
from tritonclient.grpc import service_pb2, service_pb2_grpc # noqa
|
||||
self._ext_service_pb2 = service_pb2
|
||||
self._ext_service_pb2_grpc = service_pb2_grpc
|
||||
|
||||
def process(self, data: Any) -> Any:
|
||||
def process(self, data: Any, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Any:
|
||||
"""
|
||||
The actual processing function.
|
||||
Detect gRPC server and send the request to it
|
||||
|
||||
:param data: object as recieved from the preprocessing function
|
||||
:param collect_custom_statistics_fn: Optional, allows to send a custom set of key/values
|
||||
to the statictics collector servicd
|
||||
|
||||
Usage example:
|
||||
>>> collect_custom_statistics_fn({"type": "classification"})
|
||||
|
||||
:return: Object to be passed tp the post-processing function
|
||||
"""
|
||||
# allow to override bt preprocessing class
|
||||
if self._preprocess is not None and hasattr(self._preprocess, "process"):
|
||||
return self._preprocess.process(data)
|
||||
return self._preprocess.process(data, collect_custom_statistics_fn)
|
||||
|
||||
# Create gRPC stub for communicating with the server
|
||||
triton_server_address = self._server_config.get("triton_grpc_server")
|
||||
triton_server_address = self._server_config.get("triton_grpc_server") or self._default_grpc_address
|
||||
if not triton_server_address:
|
||||
raise ValueError("External Triton gRPC server is not configured!")
|
||||
try:
|
||||
@ -255,15 +304,15 @@ class TritonPreprocessRequest(BasePreprocessRequest):
|
||||
|
||||
@BasePreprocessRequest.register_engine("sklearn", modules=["joblib", "sklearn"])
|
||||
class SKLearnPreprocessRequest(BasePreprocessRequest):
|
||||
def __init__(self, model_endpoint: ModelEndpoint, task: Task = None, server_config: dict = None):
|
||||
def __init__(self, model_endpoint: ModelEndpoint, task: Task = None):
|
||||
super(SKLearnPreprocessRequest, self).__init__(
|
||||
model_endpoint=model_endpoint, task=task, server_config=server_config)
|
||||
model_endpoint=model_endpoint, task=task)
|
||||
if self._model is None:
|
||||
# get model
|
||||
import joblib
|
||||
import joblib # noqa
|
||||
self._model = joblib.load(filename=self._get_local_model_file())
|
||||
|
||||
def process(self, data: Any) -> Any:
|
||||
def process(self, data: Any, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Any:
|
||||
"""
|
||||
The actual processing function.
|
||||
We run the model in this context
|
||||
@ -273,16 +322,16 @@ class SKLearnPreprocessRequest(BasePreprocessRequest):
|
||||
|
||||
@BasePreprocessRequest.register_engine("xgboost", modules=["xgboost"])
|
||||
class XGBoostPreprocessRequest(BasePreprocessRequest):
|
||||
def __init__(self, model_endpoint: ModelEndpoint, task: Task = None, server_config: dict = None):
|
||||
def __init__(self, model_endpoint: ModelEndpoint, task: Task = None):
|
||||
super(XGBoostPreprocessRequest, self).__init__(
|
||||
model_endpoint=model_endpoint, task=task, server_config=server_config)
|
||||
model_endpoint=model_endpoint, task=task)
|
||||
if self._model is None:
|
||||
# get model
|
||||
import xgboost
|
||||
import xgboost # noqa
|
||||
self._model = xgboost.Booster()
|
||||
self._model.load_model(self._get_local_model_file())
|
||||
|
||||
def process(self, data: Any) -> Any:
|
||||
def process(self, data: Any, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Any:
|
||||
"""
|
||||
The actual processing function.
|
||||
We run the model in this context
|
||||
@ -292,15 +341,15 @@ class XGBoostPreprocessRequest(BasePreprocessRequest):
|
||||
|
||||
@BasePreprocessRequest.register_engine("lightgbm", modules=["lightgbm"])
|
||||
class LightGBMPreprocessRequest(BasePreprocessRequest):
|
||||
def __init__(self, model_endpoint: ModelEndpoint, task: Task = None, server_config: dict = None):
|
||||
def __init__(self, model_endpoint: ModelEndpoint, task: Task = None):
|
||||
super(LightGBMPreprocessRequest, self).__init__(
|
||||
model_endpoint=model_endpoint, task=task, server_config=server_config)
|
||||
model_endpoint=model_endpoint, task=task)
|
||||
if self._model is None:
|
||||
# get model
|
||||
import lightgbm
|
||||
import lightgbm # noqa
|
||||
self._model = lightgbm.Booster(model_file=self._get_local_model_file())
|
||||
|
||||
def process(self, data: Any) -> Any:
|
||||
def process(self, data: Any, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Any:
|
||||
"""
|
||||
The actual processing function.
|
||||
We run the model in this context
|
||||
@ -310,15 +359,15 @@ class LightGBMPreprocessRequest(BasePreprocessRequest):
|
||||
|
||||
@BasePreprocessRequest.register_engine("custom")
|
||||
class CustomPreprocessRequest(BasePreprocessRequest):
|
||||
def __init__(self, model_endpoint: ModelEndpoint, task: Task = None, server_config: dict = None):
|
||||
def __init__(self, model_endpoint: ModelEndpoint, task: Task = None):
|
||||
super(CustomPreprocessRequest, self).__init__(
|
||||
model_endpoint=model_endpoint, task=task, server_config=server_config)
|
||||
model_endpoint=model_endpoint, task=task)
|
||||
|
||||
def process(self, data: Any) -> Any:
|
||||
def process(self, data: Any, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Any:
|
||||
"""
|
||||
The actual processing function.
|
||||
We run the process in this context
|
||||
"""
|
||||
if self._preprocess is not None and hasattr(self._preprocess, 'process'):
|
||||
return self._preprocess.process(data)
|
||||
return self._preprocess.process(data, collect_custom_statistics_fn)
|
||||
return None
|
||||
|
@ -1,17 +1,17 @@
|
||||
clearml >= 1.1.6
|
||||
attrs
|
||||
fastapi[all]
|
||||
clearml>=1.3.1
|
||||
attrs>=20.3.0,<21
|
||||
fastapi[all]>=0.75.0,<0.76
|
||||
uvicorn[standard]
|
||||
gunicorn
|
||||
pyzmq
|
||||
asyncio
|
||||
aiocache
|
||||
tritonclient[grpc]
|
||||
numpy
|
||||
pandas
|
||||
scikit-learn
|
||||
gunicorn>=20.1.0,<20.2
|
||||
asyncio>=3.4.3,<3.5
|
||||
aiocache>=0.11.1,<0.12
|
||||
tritonclient[grpc]>=2.18.0,<2.19
|
||||
numpy>=1.20,<1.24
|
||||
scikit-learn>=1.0.2,<1.1
|
||||
grpcio
|
||||
Pillow
|
||||
xgboost
|
||||
lightgbm
|
||||
requests
|
||||
Pillow>=9.0.1,<10
|
||||
xgboost>=1.5.2,<1.6
|
||||
lightgbm>=3.3.2,<3.4
|
||||
requests>=2.25.1,<2.26
|
||||
kafka-python>=2.0.2,<2.1
|
||||
lz4>=4.0.0,<5
|
21
clearml_serving/statistics/Dockerfile
Normal file
21
clearml_serving/statistics/Dockerfile
Normal file
@ -0,0 +1,21 @@
|
||||
FROM python:3.9-bullseye
|
||||
|
||||
|
||||
ENV LC_ALL=C.UTF-8
|
||||
|
||||
# install base package
|
||||
RUN pip3 install clearml-serving
|
||||
|
||||
# get latest execution code from the git repository
|
||||
# RUN cd $HOME && git clone https://github.com/allegroai/clearml-serving.git
|
||||
COPY clearml_serving /root/clearml/clearml_serving
|
||||
|
||||
RUN pip3 install -r /root/clearml/clearml_serving/statistics/requirements.txt
|
||||
|
||||
# default serving port
|
||||
EXPOSE 9999
|
||||
|
||||
# environement variable to load Task from CLEARML_SERVING_TASK_ID, CLEARML_SERVING_PORT
|
||||
|
||||
WORKDIR /root/clearml/
|
||||
ENTRYPOINT ["clearml_serving/statistics/entrypoint.sh"]
|
0
clearml_serving/statistics/__init__.py
Normal file
0
clearml_serving/statistics/__init__.py
Normal file
26
clearml_serving/statistics/entrypoint.sh
Executable file
26
clearml_serving/statistics/entrypoint.sh
Executable file
@ -0,0 +1,26 @@
|
||||
#!/bin/bash
|
||||
|
||||
# print configuration
|
||||
echo CLEARML_SERVING_TASK_ID="$CLEARML_SERVING_TASK_ID"
|
||||
echo CLEARML_SERVING_PORT="$CLEARML_SERVING_PORT"
|
||||
echo EXTRA_PYTHON_PACKAGES="$EXTRA_PYTHON_PACKAGES"
|
||||
echo CLEARML_SERVING_POLL_FREQ="$CLEARML_SERVING_POLL_FREQ"
|
||||
echo CLEARML_DEFAULT_KAFKA_SERVE_URL="$CLEARML_DEFAULT_KAFKA_SERVE_URL"
|
||||
|
||||
SERVING_PORT="${CLEARML_SERVING_PORT:-9999}"
|
||||
|
||||
# set default internal serve endpoint (for request pipelining)
|
||||
CLEARML_DEFAULT_BASE_SERVE_URL="${CLEARML_DEFAULT_BASE_SERVE_URL:-http://127.0.0.1:$SERVING_PORT/serve}"
|
||||
CLEARML_DEFAULT_TRITON_GRPC_ADDR="${CLEARML_DEFAULT_TRITON_GRPC_ADDR:-127.0.0.1:8001}"
|
||||
|
||||
# print configuration
|
||||
echo SERVING_PORT="$SERVING_PORT"
|
||||
|
||||
# runtime add extra python packages
|
||||
if [ ! -z "$EXTRA_PYTHON_PACKAGES" ]
|
||||
then
|
||||
python3 -m pip install $EXTRA_PYTHON_PACKAGES
|
||||
fi
|
||||
|
||||
echo "Starting Statistics Controller server"
|
||||
PYTHONPATH=$(pwd) python3 clearml_serving/statistics/main.py
|
41
clearml_serving/statistics/main.py
Normal file
41
clearml_serving/statistics/main.py
Normal file
@ -0,0 +1,41 @@
|
||||
import os
|
||||
|
||||
import prometheus_client
|
||||
from clearml import Task
|
||||
|
||||
from clearml_serving.serving.model_request_processor import ModelRequestProcessor
|
||||
from clearml_serving.statistics.metrics import StatisticsController
|
||||
|
||||
|
||||
def main():
|
||||
serving_service_task_id = os.environ.get("CLEARML_SERVING_TASK_ID", None)
|
||||
model_sync_frequency_secs = 5
|
||||
try:
|
||||
model_sync_frequency_secs = float(os.environ.get("CLEARML_SERVING_POLL_FREQ", model_sync_frequency_secs))
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
|
||||
# noinspection PyProtectedMember
|
||||
serving_task = ModelRequestProcessor._get_control_plane_task(task_id=serving_service_task_id)
|
||||
# create a new serving instance (for visibility and monitoring)
|
||||
instance_task = Task.init(
|
||||
project_name=serving_task.get_project_name(),
|
||||
task_name="{} - statistics controller".format(serving_task.name),
|
||||
task_type="monitor",
|
||||
)
|
||||
instance_task.set_system_tags(["service"])
|
||||
# noinspection PyProtectedMember
|
||||
kafka_server_url = os.environ.get("CLEARML_DEFAULT_KAFKA_SERVE_URL", "localhost:9092")
|
||||
stats_controller = StatisticsController(
|
||||
task=instance_task,
|
||||
kafka_server_url=kafka_server_url,
|
||||
serving_id=serving_service_task_id,
|
||||
poll_frequency_min=model_sync_frequency_secs
|
||||
)
|
||||
prometheus_client.start_http_server(int(os.environ.get("CLEARML_SERVING_PORT", 9999)))
|
||||
# we will never leave here
|
||||
stats_controller.start()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
352
clearml_serving/statistics/metrics.py
Normal file
352
clearml_serving/statistics/metrics.py
Normal file
@ -0,0 +1,352 @@
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
from copy import deepcopy
|
||||
from functools import partial
|
||||
from threading import Event, Thread
|
||||
from time import time, sleep
|
||||
|
||||
from clearml import Task
|
||||
from typing import Optional, Dict, Any, Iterable
|
||||
|
||||
from prometheus_client import Histogram, Enum, Gauge, Counter, values
|
||||
from kafka import KafkaConsumer
|
||||
from prometheus_client.metrics import MetricWrapperBase, _validate_exemplar
|
||||
from prometheus_client.registry import REGISTRY
|
||||
from prometheus_client.samples import Exemplar, Sample
|
||||
from prometheus_client.context_managers import Timer
|
||||
from prometheus_client.utils import floatToGoString
|
||||
|
||||
from ..serving.endpoints import EndpointMetricLogging
|
||||
from ..serving.model_request_processor import ModelRequestProcessor
|
||||
|
||||
|
||||
class ScalarHistogram(Histogram):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def observe(self, amount, exemplar=None):
|
||||
"""Observe the given amount.
|
||||
|
||||
The amount is usually positive or zero. Negative values are
|
||||
accepted but prevent current versions of Prometheus from
|
||||
properly detecting counter resets in the sum of
|
||||
observations. See
|
||||
https://prometheus.io/docs/practices/histograms/#count-and-sum-of-observations
|
||||
for details.
|
||||
"""
|
||||
self._raise_if_not_observable()
|
||||
self._sum.inc(1)
|
||||
for i, bound in enumerate(self._upper_bounds):
|
||||
if amount <= bound:
|
||||
self._buckets[i].inc(1)
|
||||
if exemplar:
|
||||
_validate_exemplar(exemplar)
|
||||
self._buckets[i].set_exemplar(Exemplar(exemplar, amount, time()))
|
||||
break
|
||||
|
||||
def _child_samples(self) -> Iterable[Sample]:
|
||||
samples = []
|
||||
for i, bound in enumerate(self._upper_bounds):
|
||||
acc = self._buckets[i].get()
|
||||
samples.append(
|
||||
Sample('_bucket', {'le': floatToGoString(bound)}, acc, None, self._buckets[i].get_exemplar())
|
||||
)
|
||||
samples.append(Sample('_sum', {'le': floatToGoString(bound)}, self._sum.get(), None, None))
|
||||
|
||||
return tuple(samples)
|
||||
|
||||
|
||||
class EnumHistogram(MetricWrapperBase):
|
||||
"""A Histogram tracks the size and number of events in buckets.
|
||||
|
||||
You can use Histograms for aggregatable calculation of quantiles.
|
||||
|
||||
Example use cases:
|
||||
- Response latency
|
||||
- Request size
|
||||
|
||||
Example for a Histogram:
|
||||
|
||||
from prometheus_client import Histogram
|
||||
|
||||
h = Histogram('request_size_bytes', 'Request size (bytes)')
|
||||
h.observe(512) # Observe 512 (bytes)
|
||||
|
||||
Example for a Histogram using time:
|
||||
|
||||
from prometheus_client import Histogram
|
||||
|
||||
REQUEST_TIME = Histogram('response_latency_seconds', 'Response latency (seconds)')
|
||||
|
||||
@REQUEST_TIME.time()
|
||||
def create_response(request):
|
||||
'''A dummy function'''
|
||||
time.sleep(1)
|
||||
|
||||
Example of using the same Histogram object as a context manager:
|
||||
|
||||
with REQUEST_TIME.time():
|
||||
pass # Logic to be timed
|
||||
|
||||
The default buckets are intended to cover a typical web/rpc request from milliseconds to seconds.
|
||||
They can be overridden by passing `buckets` keyword argument to `Histogram`.
|
||||
"""
|
||||
_type = 'histogram'
|
||||
|
||||
def __init__(self,
|
||||
name,
|
||||
documentation,
|
||||
buckets,
|
||||
labelnames=(),
|
||||
namespace='',
|
||||
subsystem='',
|
||||
unit='',
|
||||
registry=REGISTRY,
|
||||
_labelvalues=None,
|
||||
):
|
||||
self._prepare_buckets(buckets)
|
||||
super().__init__(
|
||||
name=name,
|
||||
documentation=documentation,
|
||||
labelnames=labelnames,
|
||||
namespace=namespace,
|
||||
subsystem=subsystem,
|
||||
unit=unit,
|
||||
registry=registry,
|
||||
_labelvalues=_labelvalues,
|
||||
)
|
||||
self._kwargs['buckets'] = buckets
|
||||
|
||||
def _prepare_buckets(self, buckets):
|
||||
buckets = [str(b) for b in buckets]
|
||||
if buckets != sorted(buckets):
|
||||
# This is probably an error on the part of the user,
|
||||
# so raise rather than sorting for them.
|
||||
raise ValueError('Buckets not in sorted order')
|
||||
|
||||
if len(buckets) < 2:
|
||||
raise ValueError('Must have at least two buckets')
|
||||
self._upper_bounds = buckets
|
||||
|
||||
def _metric_init(self):
|
||||
self._buckets = {}
|
||||
self._created = time()
|
||||
bucket_labelnames = self._upper_bounds
|
||||
self._sum = values.ValueClass(
|
||||
self._type, self._name, self._name + '_sum', self._labelnames, self._labelvalues)
|
||||
for b in self._upper_bounds:
|
||||
self._buckets[b] = values.ValueClass(
|
||||
self._type,
|
||||
self._name,
|
||||
self._name + '_bucket',
|
||||
bucket_labelnames,
|
||||
self._labelvalues + (b,))
|
||||
|
||||
def observe(self, amount, exemplar=None):
|
||||
"""Observe the given amount.
|
||||
|
||||
The amount is usually positive or zero. Negative values are
|
||||
accepted but prevent current versions of Prometheus from
|
||||
properly detecting counter resets in the sum of
|
||||
observations. See
|
||||
https://prometheus.io/docs/practices/histograms/#count-and-sum-of-observations
|
||||
for details.
|
||||
"""
|
||||
self._raise_if_not_observable()
|
||||
if not isinstance(amount, (list, tuple)):
|
||||
amount = [amount]
|
||||
self._sum.inc(len(amount))
|
||||
for v in amount:
|
||||
self._buckets[v].inc(1)
|
||||
if exemplar:
|
||||
_validate_exemplar(exemplar)
|
||||
self._buckets[v].set_exemplar(Exemplar(exemplar, 1, time()))
|
||||
|
||||
def time(self):
|
||||
"""Time a block of code or function, and observe the duration in seconds.
|
||||
|
||||
Can be used as a function decorator or context manager.
|
||||
"""
|
||||
return Timer(self, 'observe')
|
||||
|
||||
def _child_samples(self) -> Iterable[Sample]:
|
||||
samples = []
|
||||
for i in self._buckets:
|
||||
acc = self._buckets[i].get()
|
||||
samples.append(Sample(
|
||||
'_bucket', {'enum': i}, acc, None, self._buckets[i].get_exemplar()))
|
||||
samples.append(Sample('_sum', {'enum': i}, self._sum.get(), None, None))
|
||||
|
||||
return tuple(samples)
|
||||
|
||||
|
||||
class StatisticsController(object):
|
||||
_reserved = {
|
||||
'_latency': partial(ScalarHistogram, buckets=(.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0)),
|
||||
'_count': Counter
|
||||
}
|
||||
_metric_type_class = {"scalar": ScalarHistogram, "enum": EnumHistogram, "value": Gauge, "counter": Counter}
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
task: Task,
|
||||
kafka_server_url: str,
|
||||
serving_id: Optional[str],
|
||||
poll_frequency_min: float = 5
|
||||
):
|
||||
self.task = task
|
||||
self._serving_service_task_id = serving_id
|
||||
self._poll_frequency_min = float(poll_frequency_min)
|
||||
self._serving_service = None # type: Optional[ModelRequestProcessor]
|
||||
self._current_endpoints = {} # type: Optional[Dict[str, EndpointMetricLogging]]
|
||||
self._prometheus_metrics = {} # type: Optional[Dict[str, Dict[str, MetricWrapperBase]]]
|
||||
self._timestamp = time()
|
||||
self._sync_thread = None
|
||||
self._last_sync_time = time()
|
||||
self._dirty = False
|
||||
self._sync_event = Event()
|
||||
self._sync_threshold_sec = 30
|
||||
self._kafka_server = kafka_server_url
|
||||
# noinspection PyProtectedMember
|
||||
self._kafka_topic = ModelRequestProcessor._kafka_topic
|
||||
|
||||
def start(self):
|
||||
self._serving_service = ModelRequestProcessor(task_id=self._serving_service_task_id)
|
||||
|
||||
if not self._sync_thread:
|
||||
self._sync_thread = Thread(target=self._sync_daemon, daemon=True)
|
||||
self._sync_thread.start()
|
||||
|
||||
# noinspection PyProtectedMember
|
||||
kafka_server = \
|
||||
self._serving_service.get_configuration().get(ModelRequestProcessor._config_key_kafka_stats) or \
|
||||
self._kafka_server
|
||||
|
||||
print("Starting Kafka Statistics processing: {}".format(kafka_server))
|
||||
|
||||
while True:
|
||||
try:
|
||||
consumer = KafkaConsumer(self._kafka_topic, bootstrap_servers=kafka_server)
|
||||
break
|
||||
except Exception as ex:
|
||||
print("Error: failed opening Kafka consumer [{}]: {}".format(kafka_server, ex))
|
||||
print("Retrying in 30 seconds")
|
||||
sleep(30)
|
||||
|
||||
# we will never leave this loop
|
||||
for message in consumer:
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
data = json.loads(message.value.decode("utf-8"))
|
||||
except Exception:
|
||||
print("Warning: failed to decode kafka stats message")
|
||||
continue
|
||||
try:
|
||||
url = data.pop("_url", None)
|
||||
if not url:
|
||||
# should not happen
|
||||
continue
|
||||
endpoint_metric = self._current_endpoints.get(url)
|
||||
if not endpoint_metric:
|
||||
# add default one, we will just log the reserved valued:
|
||||
endpoint_metric = dict()
|
||||
self._current_endpoints[url] = EndpointMetricLogging(endpoint=url)
|
||||
# we should sync,
|
||||
if time()-self._last_sync_time > self._sync_threshold_sec:
|
||||
self._last_sync_time = time()
|
||||
self._sync_event.set()
|
||||
|
||||
metric_url_log = self._prometheus_metrics.get(url)
|
||||
if not metric_url_log:
|
||||
# create a new one
|
||||
metric_url_log = dict()
|
||||
self._prometheus_metrics[url] = metric_url_log
|
||||
|
||||
# check if we have the prometheus_logger
|
||||
for k, v in data.items():
|
||||
prometheus_logger = metric_url_log.get(k)
|
||||
if not prometheus_logger:
|
||||
prometheus_logger = self._create_prometheus_logger_class(url, k, endpoint_metric)
|
||||
if not prometheus_logger:
|
||||
continue
|
||||
metric_url_log[k] = prometheus_logger
|
||||
|
||||
self._report_value(prometheus_logger, v)
|
||||
|
||||
except Exception as ex:
|
||||
print("Warning: failed to report stat to Prometheus: {}".format(ex))
|
||||
continue
|
||||
|
||||
@staticmethod
|
||||
def _report_value(prometheus_logger: Optional[MetricWrapperBase], v: Any) -> bool:
|
||||
if not prometheus_logger:
|
||||
# this means no one configured the variable to log
|
||||
return False
|
||||
elif isinstance(prometheus_logger, (Histogram, EnumHistogram)):
|
||||
prometheus_logger.observe(amount=v)
|
||||
elif isinstance(prometheus_logger, Gauge):
|
||||
prometheus_logger.set(value=v)
|
||||
elif isinstance(prometheus_logger, Counter):
|
||||
prometheus_logger.inc(amount=v)
|
||||
elif isinstance(prometheus_logger, Enum):
|
||||
prometheus_logger.state(state=v)
|
||||
else:
|
||||
# we should not get here
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def _create_prometheus_logger_class(
|
||||
self,
|
||||
url: str,
|
||||
variable_name: str,
|
||||
endpoint_config: EndpointMetricLogging
|
||||
) -> Optional[MetricWrapperBase]:
|
||||
reserved_cls = self._reserved.get(variable_name)
|
||||
name = "{}:{}".format(url, variable_name)
|
||||
name = re.sub(r"[^(a-zA-Z0-9_:)]", "_", name)
|
||||
if reserved_cls:
|
||||
return reserved_cls(name=name, documentation="Built in {}".format(variable_name))
|
||||
|
||||
if not endpoint_config:
|
||||
# we should not end up here
|
||||
return None
|
||||
|
||||
metric_ = endpoint_config.metrics.get(variable_name)
|
||||
if not metric_:
|
||||
return None
|
||||
metric_cls = self._metric_type_class.get(metric_.type)
|
||||
if not metric_cls:
|
||||
return None
|
||||
if metric_cls in (Histogram, EnumHistogram):
|
||||
return metric_cls(
|
||||
name=name,
|
||||
documentation="User defined metric {}".format(metric_.type),
|
||||
buckets=metric_.buckets
|
||||
)
|
||||
return metric_cls(name=name, documentation="User defined metric {}".format(metric_.type))
|
||||
|
||||
def _sync_daemon(self):
|
||||
self._last_sync_time = time()
|
||||
poll_freq_sec = self._poll_frequency_min*60
|
||||
print("Instance [{}, pid={}]: Launching - configuration sync every {} sec".format(
|
||||
self.task.id, os.getpid(), poll_freq_sec))
|
||||
while True:
|
||||
try:
|
||||
self._serving_service.deserialize()
|
||||
endpoint_metrics = self._serving_service.list_endpoint_logging()
|
||||
self._last_sync_time = time()
|
||||
if self._current_endpoints == endpoint_metrics:
|
||||
self._sync_event.wait(timeout=poll_freq_sec)
|
||||
self._sync_event.clear()
|
||||
continue
|
||||
|
||||
# update metrics:
|
||||
self._dirty = True
|
||||
self._current_endpoints = deepcopy(endpoint_metrics)
|
||||
print("New configuration synced")
|
||||
except Exception as ex:
|
||||
print("Warning: failed to sync state from serving service Task: {}".format(ex))
|
||||
continue
|
6
clearml_serving/statistics/requirements.txt
Normal file
6
clearml_serving/statistics/requirements.txt
Normal file
@ -0,0 +1,6 @@
|
||||
clearml>=1.3.1
|
||||
numpy>=1.20,<1.24
|
||||
requests>=2.25.1,<2.26
|
||||
kafka-python>=2.0.2,<2.1
|
||||
prometheus_client>=0.13.1,<0.14
|
||||
lz4>=4.0.0,<5
|
8
docker/datasource.yml
Normal file
8
docker/datasource.yml
Normal file
@ -0,0 +1,8 @@
|
||||
apiVersion: 1
|
||||
|
||||
datasources:
|
||||
- name: Prometheus
|
||||
type: prometheus
|
||||
# Access mode - proxy (server in the UI) or direct (browser in the UI).
|
||||
access: proxy
|
||||
url: http://clearml-serving-prometheus:9090
|
151
docker/docker-compose-triton-gpu.yml
Normal file
151
docker/docker-compose-triton-gpu.yml
Normal file
@ -0,0 +1,151 @@
|
||||
version: "3"
|
||||
|
||||
services:
|
||||
zookeeper:
|
||||
image: bitnami/zookeeper:3.7.0
|
||||
container_name: clearml-serving-zookeeper
|
||||
# ports:
|
||||
# - "2181:2181"
|
||||
environment:
|
||||
- ALLOW_ANONYMOUS_LOGIN=yes
|
||||
networks:
|
||||
- clearml-serving-backend
|
||||
|
||||
kafka:
|
||||
image: bitnami/kafka:3.1.0
|
||||
container_name: clearml-serving-kafka
|
||||
# ports:
|
||||
# - "9092:9092"
|
||||
environment:
|
||||
- KAFKA_BROKER_ID=1
|
||||
- KAFKA_CFG_LISTENERS=PLAINTEXT://clearml-serving-kafka:9092
|
||||
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://clearml-serving-kafka:9092
|
||||
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
|
||||
- ALLOW_PLAINTEXT_LISTENER=yes
|
||||
- KAFKA_CREATE_TOPICS="topic_test:1:1"
|
||||
depends_on:
|
||||
- zookeeper
|
||||
networks:
|
||||
- clearml-serving-backend
|
||||
|
||||
prometheus:
|
||||
image: prom/prometheus:v2.34.0
|
||||
container_name: clearml-serving-prometheus
|
||||
volumes:
|
||||
- ./prometheus.yml:/prometheus.yml
|
||||
command:
|
||||
- '--config.file=/prometheus.yml'
|
||||
- '--storage.tsdb.path=/prometheus'
|
||||
- '--web.console.libraries=/etc/prometheus/console_libraries'
|
||||
- '--web.console.templates=/etc/prometheus/consoles'
|
||||
- '--storage.tsdb.retention.time=200h'
|
||||
- '--web.enable-lifecycle'
|
||||
restart: unless-stopped
|
||||
# ports:
|
||||
# - "9090:9090"
|
||||
depends_on:
|
||||
- clearml-serving-statistics
|
||||
networks:
|
||||
- clearml-serving-backend
|
||||
|
||||
alertmanager:
|
||||
image: prom/alertmanager:v0.23.0
|
||||
container_name: clearml-serving-alertmanager
|
||||
restart: unless-stopped
|
||||
# ports:
|
||||
# - "9093:9093"
|
||||
depends_on:
|
||||
- prometheus
|
||||
- grafana
|
||||
networks:
|
||||
- clearml-serving-backend
|
||||
|
||||
grafana:
|
||||
image: grafana/grafana:8.4.4-ubuntu
|
||||
container_name: clearml-serving-grafana
|
||||
volumes:
|
||||
- './datasource.yml:/etc/grafana/provisioning/datasources/datasource.yaml'
|
||||
restart: unless-stopped
|
||||
ports:
|
||||
- "3000:3000"
|
||||
depends_on:
|
||||
- prometheus
|
||||
networks:
|
||||
- clearml-serving-backend
|
||||
|
||||
|
||||
clearml-serving-inference:
|
||||
image: allegroai/clearml-serving-inference:latest
|
||||
container_name: clearml-serving-inference
|
||||
restart: unless-stopped
|
||||
ports:
|
||||
- "8080:8080"
|
||||
environment:
|
||||
CLEARML_WEB_HOST: ${CLEARML_WEB_HOST:-https://app.clear.ml}
|
||||
CLEARML_API_HOST: ${CLEARML_API_HOST:-"https://api.clear.ml"}
|
||||
CLEARML_FILES_HOST: ${CLEARML_FILES_HOST:-"https://files.clear.ml"}
|
||||
CLEARML_API_ACCESS_KEY: ${CLEARML_API_ACCESS_KEY:-}
|
||||
CLEARML_API_SECRET_KEY: ${CLEARML_API_SECRET_KEY:-}
|
||||
CLEARML_SERVING_TASK_ID: ${CLEARML_SERVING_TASK_ID:-}
|
||||
CLEARML_SERVING_PORT: ${CLEARML_SERVING_PORT:-8080}
|
||||
CLEARML_SERVING_POLL_FREQ: ${CLEARML_SERVING_POLL_FREQ:-1}
|
||||
CLEARML_DEFAULT_BASE_SERVE_URL: ${CLEARML_DEFAULT_BASE_SERVE_URL:-"http://127.0.0.1:8080/serve"}
|
||||
CLEARML_DEFAULT_KAFKA_SERVE_URL: ${CLEARML_DEFAULT_KAFKA_SERVE_URL:-"clearml-serving-kafka:9092"}
|
||||
CLEARML_DEFAULT_TRITON_GRPC_ADDR: ${CLEARML_DEFAULT_TRITON_GRPC_ADDR:-"clearml-serving-triton:8001"}
|
||||
CLEARML_USE_GUNICORN: ${CLEARML_USE_GUNICORN}
|
||||
CLEARML_SERVING_NUM_PROCESS: ${CLEARML_SERVING_NUM_PROCESS}
|
||||
depends_on:
|
||||
- kafka
|
||||
- clearml-serving-triton
|
||||
networks:
|
||||
- clearml-serving-backend
|
||||
|
||||
clearml-serving-triton:
|
||||
image: allegroai/clearml-serving-triton:latest
|
||||
container_name: clearml-serving-triton
|
||||
restart: unless-stopped
|
||||
# ports:
|
||||
# - "8001:8001"
|
||||
environment:
|
||||
CLEARML_WEB_HOST: ${CLEARML_WEB_HOST:-"https://app.clear.ml"}
|
||||
CLEARML_API_HOST: ${CLEARML_API_HOST:-"https://api.clear.ml"}
|
||||
CLEARML_FILES_HOST: ${CLEARML_FILES_HOST:-"https://files.clear.ml"}
|
||||
CLEARML_API_ACCESS_KEY: ${CLEARML_API_ACCESS_KEY:-}
|
||||
CLEARML_API_SECRET_KEY: ${CLEARML_API_SECRET_KEY:-}
|
||||
CLEARML_SERVING_TASK_ID: ${CLEARML_SERVING_TASK_ID:-}
|
||||
CLEARML_SERVING_POLL_FREQ: ${CLEARML_SERVING_POLL_FREQ:-1}
|
||||
CLEARML_TRITON_METRIC_FREQ: $CLEARML_TRITON_METRIC_FREQ:-1}
|
||||
depends_on:
|
||||
- kafka
|
||||
networks:
|
||||
- clearml-serving-backend
|
||||
deploy:
|
||||
resources:
|
||||
reservations:
|
||||
devices:
|
||||
- capabilities: [gpu]
|
||||
|
||||
clearml-serving-statistics:
|
||||
image: allegroai/clearml-serving-statistics:latest
|
||||
container_name: clearml-serving-statistics
|
||||
restart: unless-stopped
|
||||
# ports:
|
||||
# - "9999:9999"
|
||||
environment:
|
||||
CLEARML_WEB_HOST: ${CLEARML_WEB_HOST:-"https://app.clear.ml"}
|
||||
CLEARML_API_HOST: ${CLEARML_API_HOST:-"https://api.clear.ml"}
|
||||
CLEARML_FILES_HOST: ${CLEARML_FILES_HOST:-"https://files.clear.ml"}
|
||||
CLEARML_API_ACCESS_KEY: ${CLEARML_API_ACCESS_KEY:-}
|
||||
CLEARML_API_SECRET_KEY: ${CLEARML_API_SECRET_KEY:-}
|
||||
CLEARML_SERVING_TASK_ID: ${CLEARML_SERVING_TASK_ID:-}
|
||||
CLEARML_DEFAULT_KAFKA_SERVE_URL: ${CLEARML_DEFAULT_KAFKA_SERVE_URL:-"clearml-serving-kafka:9092"}
|
||||
CLEARML_SERVING_POLL_FREQ: ${CLEARML_SERVING_POLL_FREQ:-1}
|
||||
depends_on:
|
||||
- kafka
|
||||
networks:
|
||||
- clearml-serving-backend
|
||||
|
||||
|
||||
networks:
|
||||
clearml-serving-backend:
|
||||
driver: bridge
|
146
docker/docker-compose-triton.yml
Normal file
146
docker/docker-compose-triton.yml
Normal file
@ -0,0 +1,146 @@
|
||||
version: "3"
|
||||
|
||||
services:
|
||||
zookeeper:
|
||||
image: bitnami/zookeeper:3.7.0
|
||||
container_name: clearml-serving-zookeeper
|
||||
# ports:
|
||||
# - "2181:2181"
|
||||
environment:
|
||||
- ALLOW_ANONYMOUS_LOGIN=yes
|
||||
networks:
|
||||
- clearml-serving-backend
|
||||
|
||||
kafka:
|
||||
image: bitnami/kafka:3.1.0
|
||||
container_name: clearml-serving-kafka
|
||||
# ports:
|
||||
# - "9092:9092"
|
||||
environment:
|
||||
- KAFKA_BROKER_ID=1
|
||||
- KAFKA_CFG_LISTENERS=PLAINTEXT://clearml-serving-kafka:9092
|
||||
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://clearml-serving-kafka:9092
|
||||
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
|
||||
- ALLOW_PLAINTEXT_LISTENER=yes
|
||||
- KAFKA_CREATE_TOPICS="topic_test:1:1"
|
||||
depends_on:
|
||||
- zookeeper
|
||||
networks:
|
||||
- clearml-serving-backend
|
||||
|
||||
prometheus:
|
||||
image: prom/prometheus:v2.34.0
|
||||
container_name: clearml-serving-prometheus
|
||||
volumes:
|
||||
- ./prometheus.yml:/prometheus.yml
|
||||
command:
|
||||
- '--config.file=/prometheus.yml'
|
||||
- '--storage.tsdb.path=/prometheus'
|
||||
- '--web.console.libraries=/etc/prometheus/console_libraries'
|
||||
- '--web.console.templates=/etc/prometheus/consoles'
|
||||
- '--storage.tsdb.retention.time=200h'
|
||||
- '--web.enable-lifecycle'
|
||||
restart: unless-stopped
|
||||
# ports:
|
||||
# - "9090:9090"
|
||||
depends_on:
|
||||
- clearml-serving-statistics
|
||||
networks:
|
||||
- clearml-serving-backend
|
||||
|
||||
alertmanager:
|
||||
image: prom/alertmanager:v0.23.0
|
||||
container_name: clearml-serving-alertmanager
|
||||
restart: unless-stopped
|
||||
# ports:
|
||||
# - "9093:9093"
|
||||
depends_on:
|
||||
- prometheus
|
||||
- grafana
|
||||
networks:
|
||||
- clearml-serving-backend
|
||||
|
||||
grafana:
|
||||
image: grafana/grafana:8.4.4-ubuntu
|
||||
container_name: clearml-serving-grafana
|
||||
volumes:
|
||||
- './datasource.yml:/etc/grafana/provisioning/datasources/datasource.yaml'
|
||||
restart: unless-stopped
|
||||
ports:
|
||||
- "3000:3000"
|
||||
depends_on:
|
||||
- prometheus
|
||||
networks:
|
||||
- clearml-serving-backend
|
||||
|
||||
|
||||
clearml-serving-inference:
|
||||
image: allegroai/clearml-serving-inference:latest
|
||||
container_name: clearml-serving-inference
|
||||
restart: unless-stopped
|
||||
ports:
|
||||
- "8080:8080"
|
||||
environment:
|
||||
CLEARML_WEB_HOST: ${CLEARML_WEB_HOST:-https://app.clear.ml}
|
||||
CLEARML_API_HOST: ${CLEARML_API_HOST:-"https://api.clear.ml"}
|
||||
CLEARML_FILES_HOST: ${CLEARML_FILES_HOST:-"https://files.clear.ml"}
|
||||
CLEARML_API_ACCESS_KEY: ${CLEARML_API_ACCESS_KEY:-}
|
||||
CLEARML_API_SECRET_KEY: ${CLEARML_API_SECRET_KEY:-}
|
||||
CLEARML_SERVING_TASK_ID: ${CLEARML_SERVING_TASK_ID:-}
|
||||
CLEARML_SERVING_PORT: ${CLEARML_SERVING_PORT:-8080}
|
||||
CLEARML_SERVING_POLL_FREQ: ${CLEARML_SERVING_POLL_FREQ:-1}
|
||||
CLEARML_DEFAULT_BASE_SERVE_URL: ${CLEARML_DEFAULT_BASE_SERVE_URL:-"http://127.0.0.1:8080/serve"}
|
||||
CLEARML_DEFAULT_KAFKA_SERVE_URL: ${CLEARML_DEFAULT_KAFKA_SERVE_URL:-"clearml-serving-kafka:9092"}
|
||||
CLEARML_DEFAULT_TRITON_GRPC_ADDR: ${CLEARML_DEFAULT_TRITON_GRPC_ADDR:-"clearml-serving-triton:8001"}
|
||||
CLEARML_USE_GUNICORN: ${CLEARML_USE_GUNICORN}
|
||||
CLEARML_SERVING_NUM_PROCESS: ${CLEARML_SERVING_NUM_PROCESS}
|
||||
depends_on:
|
||||
- kafka
|
||||
- clearml-serving-triton
|
||||
networks:
|
||||
- clearml-serving-backend
|
||||
|
||||
clearml-serving-triton:
|
||||
image: allegroai/clearml-serving-triton:latest
|
||||
container_name: clearml-serving-triton
|
||||
restart: unless-stopped
|
||||
# ports:
|
||||
# - "8001:8001"
|
||||
environment:
|
||||
CLEARML_WEB_HOST: ${CLEARML_WEB_HOST:-"https://app.clear.ml"}
|
||||
CLEARML_API_HOST: ${CLEARML_API_HOST:-"https://api.clear.ml"}
|
||||
CLEARML_FILES_HOST: ${CLEARML_FILES_HOST:-"https://files.clear.ml"}
|
||||
CLEARML_API_ACCESS_KEY: ${CLEARML_API_ACCESS_KEY:-}
|
||||
CLEARML_API_SECRET_KEY: ${CLEARML_API_SECRET_KEY:-}
|
||||
CLEARML_SERVING_TASK_ID: ${CLEARML_SERVING_TASK_ID:-}
|
||||
CLEARML_SERVING_POLL_FREQ: ${CLEARML_SERVING_POLL_FREQ:-1}
|
||||
CLEARML_TRITON_METRIC_FREQ: $CLEARML_TRITON_METRIC_FREQ:-1}
|
||||
depends_on:
|
||||
- kafka
|
||||
networks:
|
||||
- clearml-serving-backend
|
||||
|
||||
clearml-serving-statistics:
|
||||
image: allegroai/clearml-serving-statistics:latest
|
||||
container_name: clearml-serving-statistics
|
||||
restart: unless-stopped
|
||||
# ports:
|
||||
# - "9999:9999"
|
||||
environment:
|
||||
CLEARML_WEB_HOST: ${CLEARML_WEB_HOST:-"https://app.clear.ml"}
|
||||
CLEARML_API_HOST: ${CLEARML_API_HOST:-"https://api.clear.ml"}
|
||||
CLEARML_FILES_HOST: ${CLEARML_FILES_HOST:-"https://files.clear.ml"}
|
||||
CLEARML_API_ACCESS_KEY: ${CLEARML_API_ACCESS_KEY:-}
|
||||
CLEARML_API_SECRET_KEY: ${CLEARML_API_SECRET_KEY:-}
|
||||
CLEARML_SERVING_TASK_ID: ${CLEARML_SERVING_TASK_ID:-}
|
||||
CLEARML_DEFAULT_KAFKA_SERVE_URL: ${CLEARML_DEFAULT_KAFKA_SERVE_URL:-"clearml-serving-kafka:9092"}
|
||||
CLEARML_SERVING_POLL_FREQ: ${CLEARML_SERVING_POLL_FREQ:-1}
|
||||
depends_on:
|
||||
- kafka
|
||||
networks:
|
||||
- clearml-serving-backend
|
||||
|
||||
|
||||
networks:
|
||||
clearml-serving-backend:
|
||||
driver: bridge
|
125
docker/docker-compose.yml
Normal file
125
docker/docker-compose.yml
Normal file
@ -0,0 +1,125 @@
|
||||
version: "3"
|
||||
|
||||
services:
|
||||
zookeeper:
|
||||
image: bitnami/zookeeper:3.7.0
|
||||
container_name: clearml-serving-zookeeper
|
||||
# ports:
|
||||
# - "2181:2181"
|
||||
environment:
|
||||
- ALLOW_ANONYMOUS_LOGIN=yes
|
||||
networks:
|
||||
- clearml-serving-backend
|
||||
|
||||
kafka:
|
||||
image: bitnami/kafka:3.1.0
|
||||
container_name: clearml-serving-kafka
|
||||
# ports:
|
||||
# - "9092:9092"
|
||||
environment:
|
||||
- KAFKA_BROKER_ID=1
|
||||
- KAFKA_CFG_LISTENERS=PLAINTEXT://clearml-serving-kafka:9092
|
||||
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://clearml-serving-kafka:9092
|
||||
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
|
||||
- ALLOW_PLAINTEXT_LISTENER=yes
|
||||
- KAFKA_CREATE_TOPICS="topic_test:1:1"
|
||||
depends_on:
|
||||
- zookeeper
|
||||
networks:
|
||||
- clearml-serving-backend
|
||||
|
||||
prometheus:
|
||||
image: prom/prometheus:v2.34.0
|
||||
container_name: clearml-serving-prometheus
|
||||
volumes:
|
||||
- ./prometheus.yml:/prometheus.yml
|
||||
command:
|
||||
- '--config.file=/prometheus.yml'
|
||||
- '--storage.tsdb.path=/prometheus'
|
||||
- '--web.console.libraries=/etc/prometheus/console_libraries'
|
||||
- '--web.console.templates=/etc/prometheus/consoles'
|
||||
- '--storage.tsdb.retention.time=200h'
|
||||
- '--web.enable-lifecycle'
|
||||
restart: unless-stopped
|
||||
# ports:
|
||||
# - "9090:9090"
|
||||
depends_on:
|
||||
- clearml-serving-statistics
|
||||
networks:
|
||||
- clearml-serving-backend
|
||||
|
||||
alertmanager:
|
||||
image: prom/alertmanager:v0.23.0
|
||||
container_name: clearml-serving-alertmanager
|
||||
restart: unless-stopped
|
||||
# ports:
|
||||
# - "9093:9093"
|
||||
depends_on:
|
||||
- prometheus
|
||||
- grafana
|
||||
networks:
|
||||
- clearml-serving-backend
|
||||
|
||||
grafana:
|
||||
image: grafana/grafana:8.4.4-ubuntu
|
||||
container_name: clearml-serving-grafana
|
||||
volumes:
|
||||
- './datasource.yml:/etc/grafana/provisioning/datasources/datasource.yaml'
|
||||
restart: unless-stopped
|
||||
ports:
|
||||
- "3000:3000"
|
||||
depends_on:
|
||||
- prometheus
|
||||
networks:
|
||||
- clearml-serving-backend
|
||||
|
||||
|
||||
clearml-serving-inference:
|
||||
image: allegroai/clearml-serving-inference:latest
|
||||
container_name: clearml-serving-inference
|
||||
restart: unless-stopped
|
||||
ports:
|
||||
- "8080:8080"
|
||||
environment:
|
||||
CLEARML_WEB_HOST: ${CLEARML_WEB_HOST:-https://app.clear.ml}
|
||||
CLEARML_API_HOST: ${CLEARML_API_HOST:-"https://api.clear.ml"}
|
||||
CLEARML_FILES_HOST: ${CLEARML_FILES_HOST:-"https://files.clear.ml"}
|
||||
CLEARML_API_ACCESS_KEY: ${CLEARML_API_ACCESS_KEY:-}
|
||||
CLEARML_API_SECRET_KEY: ${CLEARML_API_SECRET_KEY:-}
|
||||
CLEARML_SERVING_TASK_ID: ${CLEARML_SERVING_TASK_ID:-}
|
||||
CLEARML_SERVING_PORT: ${CLEARML_SERVING_PORT:-8080}
|
||||
CLEARML_SERVING_POLL_FREQ: ${CLEARML_SERVING_POLL_FREQ:-1}
|
||||
CLEARML_DEFAULT_BASE_SERVE_URL: ${CLEARML_DEFAULT_BASE_SERVE_URL:-"http://127.0.0.1:8080/serve"}
|
||||
CLEARML_DEFAULT_KAFKA_SERVE_URL: ${CLEARML_DEFAULT_KAFKA_SERVE_URL:-"clearml-serving-kafka:9092"}
|
||||
CLEARML_DEFAULT_TRITON_GRPC_ADDR: ${CLEARML_DEFAULT_TRITON_GRPC_ADDR}
|
||||
CLEARML_USE_GUNICORN: ${CLEARML_USE_GUNICORN}
|
||||
CLEARML_SERVING_NUM_PROCESS: ${CLEARML_SERVING_NUM_PROCESS}
|
||||
depends_on:
|
||||
- kafka
|
||||
networks:
|
||||
- clearml-serving-backend
|
||||
|
||||
clearml-serving-statistics:
|
||||
image: allegroai/clearml-serving-statistics:latest
|
||||
container_name: clearml-serving-statistics
|
||||
restart: unless-stopped
|
||||
# ports:
|
||||
# - "9999:9999"
|
||||
environment:
|
||||
CLEARML_WEB_HOST: ${CLEARML_WEB_HOST:-"https://app.clear.ml"}
|
||||
CLEARML_API_HOST: ${CLEARML_API_HOST:-"https://api.clear.ml"}
|
||||
CLEARML_FILES_HOST: ${CLEARML_FILES_HOST:-"https://files.clear.ml"}
|
||||
CLEARML_API_ACCESS_KEY: ${CLEARML_API_ACCESS_KEY:-}
|
||||
CLEARML_API_SECRET_KEY: ${CLEARML_API_SECRET_KEY:-}
|
||||
CLEARML_SERVING_TASK_ID: ${CLEARML_SERVING_TASK_ID:-}
|
||||
CLEARML_DEFAULT_KAFKA_SERVE_URL: ${CLEARML_DEFAULT_KAFKA_SERVE_URL:-"clearml-serving-kafka:9092"}
|
||||
CLEARML_SERVING_POLL_FREQ: ${CLEARML_SERVING_POLL_FREQ:-1}
|
||||
depends_on:
|
||||
- kafka
|
||||
networks:
|
||||
- clearml-serving-backend
|
||||
|
||||
|
||||
networks:
|
||||
clearml-serving-backend:
|
||||
driver: bridge
|
6
docker/example.env
Normal file
6
docker/example.env
Normal file
@ -0,0 +1,6 @@
|
||||
CLEARML_WEB_HOST="https://app.clear.ml"
|
||||
CLEARML_API_HOST="https://api.clear.ml"
|
||||
CLEARML_FILES_HOST="https://files.clear.ml"
|
||||
CLEARML_API_ACCESS_KEY="<access_key_here>"
|
||||
CLEARML_API_SECRET_KEY="<secret_key_here>"
|
||||
CLEARML_SERVING_TASK_ID="<serving_service_id_here>"
|
22
docker/prometheus.yml
Normal file
22
docker/prometheus.yml
Normal file
@ -0,0 +1,22 @@
|
||||
global:
|
||||
scrape_interval: 15s # By default, scrape targets every 15 seconds.
|
||||
evaluation_interval: 15s # By default, scrape targets every 15 seconds.
|
||||
external_labels:
|
||||
monitor: 'clearml-serving'
|
||||
|
||||
scrape_configs:
|
||||
# The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
|
||||
- job_name: 'prometheus'
|
||||
|
||||
scrape_interval: 5s
|
||||
|
||||
static_configs:
|
||||
- targets: ['localhost:9090']
|
||||
|
||||
# The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
|
||||
- job_name: 'clearml-inference-stats'
|
||||
|
||||
scrape_interval: 5s
|
||||
|
||||
static_configs:
|
||||
- targets: ['clearml-serving-statistics:9999']
|
@ -9,11 +9,11 @@ class Preprocess(object):
|
||||
# set internal state, this will be called only once. (i.e. not per request)
|
||||
pass
|
||||
|
||||
def preprocess(self, body: dict) -> Any:
|
||||
def preprocess(self, body: dict, collect_custom_statistics_fn=None) -> Any:
|
||||
# we expect to get two valid on the dict x0, and x1
|
||||
return [[body.get("x0", None), body.get("x1", None)], ]
|
||||
|
||||
def postprocess(self, data: Any) -> dict:
|
||||
def postprocess(self, data: Any, collect_custom_statistics_fn=None) -> dict:
|
||||
# post process the data returned from the model inference engine
|
||||
# data is the return value from model.predict we will put is inside a return value as Y
|
||||
return dict(y=data.tolist() if isinstance(data, np.ndarray) else data)
|
||||
|
@ -29,3 +29,4 @@ Or add Canary endpoint
|
||||
|
||||
> **_Notice:_** You can also change the serving service while it is already running!
|
||||
This includes adding/removing endpoints, adding canary model routing etc.
|
||||
by default new endpoints/models will be automatically updated after 1 minute
|
||||
|
@ -13,7 +13,7 @@ class Preprocess(object):
|
||||
# set internal state, this will be called only once. (i.e. not per request)
|
||||
pass
|
||||
|
||||
def preprocess(self, body: dict) -> Any:
|
||||
def preprocess(self, body: dict, collect_custom_statistics_fn=None) -> Any:
|
||||
# we expect to get two valid on the dict x0, and x1
|
||||
url = body.get("url")
|
||||
if not url:
|
||||
@ -25,7 +25,7 @@ class Preprocess(object):
|
||||
|
||||
return np.array(image).flatten()
|
||||
|
||||
def postprocess(self, data: Any) -> dict:
|
||||
def postprocess(self, data: Any, collect_custom_statistics_fn=None) -> dict:
|
||||
# post process the data returned from the model inference engine
|
||||
# data is the return value from model.predict we will put is inside a return value as Y
|
||||
if not isinstance(data, np.ndarray):
|
||||
|
@ -12,6 +12,8 @@ The output will be a model created on the project "serving examples", by the nam
|
||||
|
||||
## setting up the serving service
|
||||
|
||||
Prerequisites, Keras/Tensorflow models require Triton engine support, please use `docker-compose-triton.yml` / `docker-compose-triton-gpu.yml` or if running on Kubernetes, the matching helm chart.
|
||||
|
||||
1. Create serving Service: `clearml-serving create --name "serving example"` (write down the service ID)
|
||||
2. Create model endpoint:
|
||||
|
||||
@ -36,3 +38,4 @@ Or add Canary endpoint
|
||||
|
||||
> **_Notice:_** You can also change the serving service while it is already running!
|
||||
This includes adding/removing endpoints, adding canary model routing etc.
|
||||
by default new endpoints/models will be automatically updated after 1 minute
|
@ -9,14 +9,14 @@ class Preprocess(object):
|
||||
# set internal state, this will be called only once. (i.e. not per request)
|
||||
pass
|
||||
|
||||
def preprocess(self, body: dict) -> Any:
|
||||
def preprocess(self, body: dict, collect_custom_statistics_fn=None) -> Any:
|
||||
# we expect to get four valid numbers on the dict: x0, x1, x2, x3
|
||||
return np.array(
|
||||
[[body.get("x0", None), body.get("x1", None), body.get("x2", None), body.get("x3", None)], ],
|
||||
dtype=np.float32
|
||||
)
|
||||
|
||||
def postprocess(self, data: Any) -> dict:
|
||||
def postprocess(self, data: Any, collect_custom_statistics_fn=None) -> dict:
|
||||
# post process the data returned from the model inference engine
|
||||
# data is the return value from model.predict we will put is inside a return value as Y
|
||||
# we pick the most probably class and return the class index (argmax)
|
||||
|
@ -7,14 +7,14 @@ class Preprocess(object):
|
||||
# set internal state, this will be called only once. (i.e. not per request)
|
||||
pass
|
||||
|
||||
def postprocess(self, data: List[dict]) -> dict:
|
||||
def postprocess(self, data: List[dict], collect_custom_statistics_fn=None) -> dict:
|
||||
# we will here average the results and return the new value
|
||||
# assume data is a list of dicts greater than 1
|
||||
|
||||
# average result
|
||||
return dict(y=0.5 * data[0]['y'][0] + 0.5 * data[1]['y'][0])
|
||||
|
||||
def process(self, data: Any) -> Any:
|
||||
def process(self, data: Any, collect_custom_statistics_fn=None) -> Any:
|
||||
"""
|
||||
do something with the actual data, return any type of object.
|
||||
The returned object will be passed as is to the postprocess function engine
|
||||
|
@ -24,3 +24,4 @@ Training a scikit-learn model (see example/sklearn)
|
||||
|
||||
> **_Notice:_** You can also change the serving service while it is already running!
|
||||
This includes adding/removing endpoints, adding canary model routing etc.
|
||||
by default new endpoints/models will be automatically updated after 1 minute
|
||||
|
@ -13,7 +13,7 @@ class Preprocess(object):
|
||||
# set internal state, this will be called only once. (i.e. not per request)
|
||||
pass
|
||||
|
||||
def preprocess(self, body: dict) -> Any:
|
||||
def preprocess(self, body: dict, collect_custom_statistics_fn=None) -> Any:
|
||||
# we expect to get two valid on the dict x0, and x1
|
||||
url = body.get("url")
|
||||
if not url:
|
||||
@ -24,7 +24,7 @@ class Preprocess(object):
|
||||
image = ImageOps.grayscale(image).resize((28, 28))
|
||||
return np.array(image).flatten()
|
||||
|
||||
def postprocess(self, data: Any) -> dict:
|
||||
def postprocess(self, data: Any, collect_custom_statistics_fn=None) -> dict:
|
||||
# post process the data returned from the model inference engine
|
||||
# data is the return value from model.predict we will put is inside a return value as Y
|
||||
if not isinstance(data, np.ndarray):
|
||||
|
@ -13,6 +13,9 @@ The output will be a model created on the project "serving examples", by the nam
|
||||
|
||||
## setting up the serving service
|
||||
|
||||
|
||||
Prerequisites, PyTorch models require Triton engine support, please use `docker-compose-triton.yml` / `docker-compose-triton-gpu.yml` or if running on Kubernetes, the matching helm chart.
|
||||
|
||||
1. Create serving Service: `clearml-serving create --name "serving example"` (write down the service ID)
|
||||
2. Create model endpoint:
|
||||
|
||||
@ -39,4 +42,4 @@ Or add Canary endpoint
|
||||
|
||||
> **_Notice:_** You can also change the serving service while it is already running!
|
||||
This includes adding/removing endpoints, adding canary model routing etc.
|
||||
|
||||
by default new endpoints/models will be automatically updated after 1 minute
|
||||
|
@ -9,11 +9,11 @@ class Preprocess(object):
|
||||
# set internal state, this will be called only once. (i.e. not per request)
|
||||
pass
|
||||
|
||||
def preprocess(self, body: dict) -> Any:
|
||||
def preprocess(self, body: dict, collect_custom_statistics_fn=None) -> Any:
|
||||
# we expect to get two valid on the dict x0, and x1
|
||||
return [[body.get("x0", None), body.get("x1", None)], ]
|
||||
|
||||
def postprocess(self, data: Any) -> dict:
|
||||
def postprocess(self, data: Any, collect_custom_statistics_fn=None) -> dict:
|
||||
# post process the data returned from the model inference engine
|
||||
# data is the return value from model.predict we will put is inside a return value as Y
|
||||
return dict(y=data.tolist() if isinstance(data, np.ndarray) else data)
|
||||
|
@ -29,3 +29,4 @@ Or add Canary endpoint
|
||||
|
||||
> **_Notice:_** You can also change the serving service while it is already running!
|
||||
This includes adding/removing endpoints, adding canary model routing etc.
|
||||
by default new endpoints/models will be automatically updated after 1 minute
|
||||
|
@ -10,12 +10,12 @@ class Preprocess(object):
|
||||
# set internal state, this will be called only once. (i.e. not per request)
|
||||
pass
|
||||
|
||||
def preprocess(self, body: dict) -> Any:
|
||||
def preprocess(self, body: dict, collect_custom_statistics_fn=None) -> Any:
|
||||
# we expect to get four valid numbers on the dict: x0, x1, x2, x3
|
||||
return xgb.DMatrix(
|
||||
[[body.get("x0", None), body.get("x1", None), body.get("x2", None), body.get("x3", None)]])
|
||||
|
||||
def postprocess(self, data: Any) -> dict:
|
||||
def postprocess(self, data: Any, collect_custom_statistics_fn=None) -> dict:
|
||||
# post process the data returned from the model inference engine
|
||||
# data is the return value from model.predict we will put is inside a return value as Y
|
||||
return dict(y=data.tolist() if isinstance(data, np.ndarray) else data)
|
||||
|
5
setup.py
5
setup.py
@ -39,8 +39,8 @@ setup(
|
||||
long_description_content_type='text/markdown',
|
||||
# The project's main homepage.
|
||||
url='https://github.com/allegroai/clearml-serving.git',
|
||||
author='Allegroai',
|
||||
author_email='clearml@allegro.ai',
|
||||
author='ClearML',
|
||||
author_email='support@clear.ml',
|
||||
license='Apache License 2.0',
|
||||
classifiers=[
|
||||
'Development Status :: 4 - Beta',
|
||||
@ -54,7 +54,6 @@ setup(
|
||||
'Topic :: Software Development :: Version Control',
|
||||
'Topic :: System :: Logging',
|
||||
'Topic :: System :: Monitoring',
|
||||
'Programming Language :: Python :: 3.5',
|
||||
'Programming Language :: Python :: 3.6',
|
||||
'Programming Language :: Python :: 3.7',
|
||||
'Programming Language :: Python :: 3.8',
|
||||
|
Loading…
Reference in New Issue
Block a user