From 09ed480bc21b909fbc8ae261efca456d95ca2f7d Mon Sep 17 00:00:00 2001
From: Aleksandar Ivanovski
Date: Thu, 6 Oct 2022 13:31:54 +0200
Subject: [PATCH 1/4] [DEV] feature/bytes-payload | Add bytes as payload

---
 clearml_serving/serving/main.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/clearml_serving/serving/main.py b/clearml_serving/serving/main.py
index 369aa4b..88b9ddc 100644
--- a/clearml_serving/serving/main.py
+++ b/clearml_serving/serving/main.py
@@ -5,7 +5,7 @@ import gzip
 
 from fastapi import FastAPI, Request, Response, APIRouter, HTTPException
 from fastapi.routing import APIRoute
-from typing import Optional, Dict, Any, Callable
+from typing import Optional, Dict, Any, Callable, Union
 
 from clearml import Task
 from clearml_serving.version import __version__
@@ -87,7 +87,7 @@ router = APIRouter(
 @router.post("/{model_id}/{version}")
 @router.post("/{model_id}/")
 @router.post("/{model_id}")
-async def serve_model(model_id: str, version: Optional[str] = None, request: Dict[Any, Any] = None):
+async def serve_model(model_id: str, version: Optional[str] = None, request: Union[bytes, Dict[Any, Any]] = None):
     try:
         return_value = processor.process_request(
             base_url=model_id,

From 2aa91a3d438e4c9ba68271e3b11addec0f41254e Mon Sep 17 00:00:00 2001
From: Aleksandar Ivanovski
Date: Thu, 6 Oct 2022 15:13:36 +0200
Subject: [PATCH 2/4] [DEV] feature/bytes-payload | Handle keys when req is bytes

---
 clearml_serving/serving/model_request_processor.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/clearml_serving/serving/model_request_processor.py b/clearml_serving/serving/model_request_processor.py
index 084ee1a..67238fc 100644
--- a/clearml_serving/serving/model_request_processor.py
+++ b/clearml_serving/serving/model_request_processor.py
@@ -1167,7 +1167,7 @@ class ModelRequestProcessor(object):
         if metric_endpoint:
             metric_keys = set(metric_endpoint.metrics.keys())
             # collect inputs
-            if body:
+            if body and isinstance(body, dict):
                 keys = set(body.keys()) & metric_keys
                 stats.update({k: body[k] for k in keys})
             # collect outputs

From d89d1370d831333f39ec506772269c7ea4a0a1df Mon Sep 17 00:00:00 2001
From: Aleksandar Ivanovski
Date: Thu, 6 Oct 2022 16:01:31 +0200
Subject: [PATCH 3/4] [DEV] feature/bytes-payload | Add typing

---
 clearml_serving/preprocess/preprocess_template.py  | 10 +++++-----
 clearml_serving/serving/model_request_processor.py |  4 ++--
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/clearml_serving/preprocess/preprocess_template.py b/clearml_serving/preprocess/preprocess_template.py
index 6d07ae7..1b7029b 100644
--- a/clearml_serving/preprocess/preprocess_template.py
+++ b/clearml_serving/preprocess/preprocess_template.py
@@ -1,4 +1,4 @@
-from typing import Any, Optional, Callable
+from typing import Any, Optional, Callable, Union
 
 
 # Preprocess class Must be named "Preprocess"
@@ -12,8 +12,8 @@ class Preprocess(object):
 
     Notice the execution flows is synchronous as follows:
 
-    1. RestAPI(...) -> body: dict
-    2. preprocess(body: dict, ...) -> data: Any
+    1. RestAPI(...) -> body: Union[bytes, dict]
+    2. preprocess(body: Union[bytes, dict], ...) -> data: Any
     3. process(data: Any, ...) -> data: Any
     4. postprocess(data: Any, ...) -> result: dict
     5. RestAPI(result: dict) -> returned request
@@ -35,7 +35,7 @@ class Preprocess(object):
 
     def preprocess(
         self,
-        body: dict,
+        body: Union[bytes, dict],
         state: dict,
         collect_custom_statistics_fn: Optional[Callable[[dict], None]],
     ) -> Any:  # noqa
@@ -43,7 +43,7 @@ class Preprocess(object):
         Optional: do something with the request data, return any type of object.
         The returned object will be passed as is to the inference engine
 
-        :param body: dictionary as recieved from the RestAPI
+        :param body: dictionary or bytes as received from the RestAPI
         :param state: Use state dict to store data passed to the post-processing function call.
             This is a per-request state dict (meaning a new empty dict will be passed per request)
            Usage example:
diff --git a/clearml_serving/serving/model_request_processor.py b/clearml_serving/serving/model_request_processor.py
index 67238fc..3e932ec 100644
--- a/clearml_serving/serving/model_request_processor.py
+++ b/clearml_serving/serving/model_request_processor.py
@@ -124,7 +124,7 @@ class ModelRequestProcessor(object):
         self._serving_base_url = None
         self._metric_log_freq = None
 
-    def process_request(self, base_url: str, version: str, request_body: dict) -> dict:
+    def process_request(self, base_url: str, version: str, request_body: Union[dict, bytes]) -> dict:
         """
         Process request coming in,
         Raise Value error if url does not match existing endpoints
@@ -1133,7 +1133,7 @@ class ModelRequestProcessor(object):
         # update preprocessing classes
         BasePreprocessRequest.set_server_config(self._configuration)
 
-    def _process_request(self, processor: BasePreprocessRequest, url: str, body: dict) -> dict:
+    def _process_request(self, processor: BasePreprocessRequest, url: str, body: Union[bytes, dict]) -> dict:
         # collect statistics for this request
         stats_collect_fn = None
         collect_stats = False

From 29c9732e8e94064d44b29bd750a439c04516554f Mon Sep 17 00:00:00 2001
From: Aleksandar Ivanovski
Date: Fri, 7 Oct 2022 09:19:17 +0200
Subject: [PATCH 4/4] [DEV] feature/bytes-payload | Add examples

---
 examples/keras/preprocess.py   | 22 +++++++++++++++-------
 examples/pytorch/preprocess.py | 24 ++++++++++++++++--------
 2 files changed, 31 insertions(+), 15 deletions(-)

diff --git a/examples/keras/preprocess.py b/examples/keras/preprocess.py
index b10b693..1738ea8 100644
--- a/examples/keras/preprocess.py
+++ b/examples/keras/preprocess.py
@@ -1,4 +1,5 @@
-from typing import Any
+import io
+from typing import Any, Union
 
 import numpy as np
 from PIL import Image, ImageOps
@@ -13,16 +14,23 @@ class Preprocess(object):
         # set internal state, this will be called only once. (i.e. not per request)
         pass
 
-    def preprocess(self, body: dict, state: dict, collect_custom_statistics_fn=None) -> Any:
+    def preprocess(self, body: Union[bytes, dict], state: dict, collect_custom_statistics_fn=None) -> Any:
         # we expect to get two valid on the dict x0, and x1
-        url = body.get("url")
-        if not url:
-            raise ValueError("'url' entry not provided, expected http/s link to image")
+        if isinstance(body, bytes):
+            # we expect to get a stream of encoded image bytes
+            try:
+                image = Image.open(io.BytesIO(body)).convert("RGB")
+            except Exception:
+                raise ValueError("Image could not be decoded")
 
-        local_file = StorageManager.get_local_copy(remote_url=url)
-        image = Image.open(local_file)
+        if isinstance(body, dict) and "url" in body.keys():
+            # image is given as url, and is fetched
+            url = body.get("url")
+            local_file = StorageManager.get_local_copy(remote_url=url)
+            image = Image.open(local_file)
+
         image = ImageOps.grayscale(image).resize((28, 28))
         return np.array([np.array(image).flatten()])
 
     def postprocess(self, data: Any, state: dict, collect_custom_statistics_fn=None) -> dict:
         # post process the data returned from the model inference engine

diff --git a/examples/pytorch/preprocess.py b/examples/pytorch/preprocess.py
index 2fd1581..1738ea8 100644
--- a/examples/pytorch/preprocess.py
+++ b/examples/pytorch/preprocess.py
@@ -1,4 +1,5 @@
-from typing import Any
+import io
+from typing import Any, Union
 
 import numpy as np
 from PIL import Image, ImageOps
@@ -13,16 +14,23 @@ class Preprocess(object):
         # set internal state, this will be called only once. (i.e. not per request)
         pass
 
-    def preprocess(self, body: dict, state: dict, collect_custom_statistics_fn=None) -> Any:
+    def preprocess(self, body: Union[bytes, dict], state: dict, collect_custom_statistics_fn=None) -> Any:
         # we expect to get two valid on the dict x0, and x1
-        url = body.get("url")
-        if not url:
-            raise ValueError("'url' entry not provided, expected http/s link to image")
+        if isinstance(body, bytes):
+            # we expect to get a stream of encoded image bytes
+            try:
+                image = Image.open(io.BytesIO(body)).convert("RGB")
+            except Exception:
+                raise ValueError("Image could not be decoded")
 
-        local_file = StorageManager.get_local_copy(remote_url=url)
-        image = Image.open(local_file)
+        if isinstance(body, dict) and "url" in body.keys():
+            # image is given as url, and is fetched
+            url = body.get("url")
+            local_file = StorageManager.get_local_copy(remote_url=url)
+            image = Image.open(local_file)
+
         image = ImageOps.grayscale(image).resize((28, 28))
-        return np.array([np.array(image)])
+        return np.array([np.array(image).flatten()])
 
     def postprocess(self, data: Any, state: dict, collect_custom_statistics_fn=None) -> dict:
         # post process the data returned from the model inference engine
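
Usage sketch (illustrative, not part of the patches above): a minimal client showing how the new bytes payload path could be exercised alongside the existing JSON path. The serving address, port, and the endpoint name "test_model_keras" are assumptions for illustration; substitute the values of your own deployment.

import requests

# Assumed serving endpoint URL; adjust host, port, and endpoint name to your setup.
SERVE_URL = "http://127.0.0.1:8080/serve/test_model_keras"

# Existing path: a JSON body reaches Preprocess.preprocess() as a dict.
resp = requests.post(SERVE_URL, json={"url": "https://example.com/digit.png"})
print(resp.json())

# Path enabled by these patches: raw encoded image bytes reach preprocess() as bytes.
with open("digit.png", "rb") as f:
    resp = requests.post(
        SERVE_URL,
        data=f.read(),
        headers={"Content-Type": "application/octet-stream"},
    )
print(resp.json())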