From 64daef23ba9ffaf111dd223126d4db28d1c942ab Mon Sep 17 00:00:00 2001
From: Meshcheryakov Ilya
Date: Wed, 29 May 2024 21:18:39 +0300
Subject: [PATCH] initial commit

---
 clearml_serving/serving/Dockerfile            |  2 +-
 clearml_serving/serving/entrypoint.sh         |  2 +-
 clearml_serving/serving/main.py               | 84 +++++++++++++++++--
 .../serving/model_request_processor.py        | 19 +++--
 clearml_serving/serving/preprocess_service.py | 15 +++-
 clearml_serving/statistics/entrypoint.sh      |  2 +-
 6 files changed, 104 insertions(+), 20 deletions(-)

diff --git a/clearml_serving/serving/Dockerfile b/clearml_serving/serving/Dockerfile
index bd817ea..a2d6a47 100644
--- a/clearml_serving/serving/Dockerfile
+++ b/clearml_serving/serving/Dockerfile
@@ -4,7 +4,7 @@ FROM python:3.11-bullseye
 ENV LC_ALL=C.UTF-8
 
 # install base package
-RUN pip3 install --no-cache-dir clearml-serving
+# RUN pip3 install --no-cache-dir clearml-serving
 
 # get latest execution code from the git repository
 # RUN cd $HOME && git clone https://github.com/allegroai/clearml-serving.git
diff --git a/clearml_serving/serving/entrypoint.sh b/clearml_serving/serving/entrypoint.sh
index a5efea1..0e51dda 100755
--- a/clearml_serving/serving/entrypoint.sh
+++ b/clearml_serving/serving/entrypoint.sh
@@ -18,7 +18,7 @@ UVICORN_SERVE_LOOP="${UVICORN_SERVE_LOOP:-uvloop}"
 UVICORN_LOG_LEVEL="${UVICORN_LOG_LEVEL:-warning}"
 
 # set default internal serve endpoint (for request pipelining)
-CLEARML_DEFAULT_BASE_SERVE_URL="${CLEARML_DEFAULT_BASE_SERVE_URL:-http://127.0.0.1:$SERVING_PORT/serve}"
+CLEARML_DEFAULT_BASE_SERVE_URL="${CLEARML_DEFAULT_BASE_SERVE_URL:-http://127.0.0.1:$SERVING_PORT/clearml}"
 CLEARML_DEFAULT_TRITON_GRPC_ADDR="${CLEARML_DEFAULT_TRITON_GRPC_ADDR:-127.0.0.1:8001}"
 
 # print configuration
diff --git a/clearml_serving/serving/main.py b/clearml_serving/serving/main.py
index 98c9f21..c3c5f7c 100644
--- a/clearml_serving/serving/main.py
+++ b/clearml_serving/serving/main.py
@@ -92,7 +92,7 @@ async def cuda_exception_handler(request, exc):
 
 
 router = APIRouter(
-    prefix="/serve",
+    prefix="/clearml",
     tags=["models"],
     responses={404: {"description": "Model Serving Endpoint Not found"}},
     route_class=GzipRoute,  # mark-out to remove support for GZip content encoding
@@ -100,15 +100,49 @@ router = APIRouter(
 
 
 # cover all routing options for model version `/{model_id}`, `/{model_id}/123`, `/{model_id}?version=123`
-@router.post("/{model_id}/{version}")
-@router.post("/{model_id}/")
-@router.post("/{model_id}")
-async def serve_model(model_id: str, version: Optional[str] = None, request: Union[bytes, Dict[Any, Any]] = None):
+# @router.post("/{model_id}/{version}")
+# @router.post("/{model_id}/")
+# @router.post("/{model_id}")
+# async def serve_model(model_id: str, version: Optional[str] = None, request: Union[bytes, Dict[Any, Any]] = None):
+#     try:
+#         return_value = await processor.process_request(
+#             base_url=model_id,
+#             version=version,
+#             request_body=request
+#         )
+#     except EndpointNotFoundException as ex:
+#         raise HTTPException(status_code=404, detail="Error processing request, endpoint was not found: {}".format(ex))
+#     except (EndpointModelLoadException, EndpointBackendEngineException) as ex:
+#         session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
+#             instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
+#         raise HTTPException(status_code=422, detail="Error [{}] processing request: {}".format(type(ex), ex))
+#     except ServingInitializationException as ex:
+#         session_logger.report_text("[{}] Exception [{}] {} while loading serving inference: {}\n{}".format(
+#             instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
+#         raise HTTPException(status_code=500, detail="Error [{}] processing request: {}".format(type(ex), ex))
+#     except ValueError as ex:
+#         session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
+#             instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
+#         if "CUDA out of memory. " in str(ex) or "NVML_SUCCESS == r INTERNAL ASSERT FAILED" in str(ex):
+#             raise CUDAException(exception=ex)
+#         else:
+#             raise HTTPException(status_code=422, detail="Error [{}] processing request: {}".format(type(ex), ex))
+#     except Exception as ex:
+#         session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
+#             instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
+#         raise HTTPException(status_code=500, detail="Error [{}] processing request: {}".format(type(ex), ex))
+#     return return_value
+
+
+@router.post("/{model_id}/v1/chat/completions")
+@router.post("/{model_id}/v1/chat/completions/")
+async def serve_model_chat_completions(model_id: str, request: Union[bytes, Dict[Any, Any]] = None):
     try:
         return_value = await processor.process_request(
             base_url=model_id,
-            version=version,
-            request_body=request
+            version=None,
+            request_body=request,
+            url_type="chat_completion"
         )
     except EndpointNotFoundException as ex:
         raise HTTPException(status_code=404, detail="Error processing request, endpoint was not found: {}".format(ex))
@@ -123,7 +157,41 @@ async def serve_model(model_id: str, version: Optional[str] = None, request: Uni
     except ValueError as ex:
         session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
             instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
-        if "CUDA out of memory. " in str(ex):
+        if "CUDA out of memory. " in str(ex) or "NVML_SUCCESS == r INTERNAL ASSERT FAILED" in str(ex):
             raise CUDAException(exception=ex)
         else:
             raise HTTPException(status_code=422, detail="Error [{}] processing request: {}".format(type(ex), ex))
+    except Exception as ex:
+        session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
+            instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
+        raise HTTPException(status_code=500, detail="Error [{}] processing request: {}".format(type(ex), ex))
+    return return_value
+
+
+@router.post("/{model_id}/v1/completions")
+@router.post("/{model_id}/v1/completions/")
+async def serve_model_completions(model_id: str, request: Union[bytes, Dict[Any, Any]] = None):
+    try:
+        return_value = await processor.process_request(
+            base_url=model_id,
+            version=None,
+            request_body=request,
+            url_type="completion"
+        )
+    except EndpointNotFoundException as ex:
+        raise HTTPException(status_code=404, detail="Error processing request, endpoint was not found: {}".format(ex))
+    except (EndpointModelLoadException, EndpointBackendEngineException) as ex:
+        session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
+            instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
+        raise HTTPException(status_code=422, detail="Error [{}] processing request: {}".format(type(ex), ex))
+    except ServingInitializationException as ex:
+        session_logger.report_text("[{}] Exception [{}] {} while loading serving inference: {}\n{}".format(
+            instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
+        raise HTTPException(status_code=500, detail="Error [{}] processing request: {}".format(type(ex), ex))
+    except ValueError as ex:
+        session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
+            instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
+        if "CUDA out of memory. " in str(ex) or "NVML_SUCCESS == r INTERNAL ASSERT FAILED" in str(ex):
+            raise CUDAException(exception=ex)
+        else:
+            raise HTTPException(status_code=422, detail="Error [{}] processing request: {}".format(type(ex), ex))
diff --git a/clearml_serving/serving/model_request_processor.py b/clearml_serving/serving/model_request_processor.py
index 35f5120..93198f8 100644
--- a/clearml_serving/serving/model_request_processor.py
+++ b/clearml_serving/serving/model_request_processor.py
@@ -155,7 +155,7 @@ class ModelRequestProcessor(object):
         self._serving_base_url = None
         self._metric_log_freq = None
 
-    async def process_request(self, base_url: str, version: str, request_body: dict) -> dict:
+    async def process_request(self, base_url: str, version: str, request_body: dict, url_type: str) -> dict:
         """
         Process request coming in,
         Raise Value error if url does not match existing endpoints
@@ -188,7 +188,7 @@ class ModelRequestProcessor(object):
                 processor = processor_cls(model_endpoint=ep, task=self._task)
                 self._engine_processor_lookup[url] = processor
 
-            return_value = await self._process_request(processor=processor, url=url, body=request_body)
+            return_value = await self._process_request(processor=processor, url=url, body=request_body, url_type=url_type)
         finally:
             self._request_processing_state.dec()
 
@@ -1188,7 +1188,7 @@ class ModelRequestProcessor(object):
         # update preprocessing classes
         BasePreprocessRequest.set_server_config(self._configuration)
 
-    async def _process_request(self, processor: BasePreprocessRequest, url: str, body: dict) -> dict:
+    async def _process_request(self, processor: BasePreprocessRequest, url: str, body: dict, url_type: str) -> dict:
         # collect statistics for this request
         stats_collect_fn = None
         collect_stats = False
@@ -1211,9 +1211,16 @@
             if processor.is_preprocess_async \
             else processor.preprocess(body, state, stats_collect_fn)
         # noinspection PyUnresolvedReferences
-        processed = await processor.process(preprocessed, state, stats_collect_fn) \
-            if processor.is_process_async \
-            else processor.process(preprocessed, state, stats_collect_fn)
+        if url_type == "completion":
+            processed = await processor.completion(preprocessed, state, stats_collect_fn) \
+                if processor.is_process_async \
+                else processor.completion(preprocessed, state, stats_collect_fn)
+        elif url_type == "chat_completion":
+            processed = await processor.chat_completion(preprocessed, state, stats_collect_fn) \
+                if processor.is_process_async \
+                else processor.chat_completion(preprocessed, state, stats_collect_fn)
+        else:
+            raise ValueError(f"wrong url_type: expected 'completion' or 'chat_completion', got {url_type}")
         # noinspection PyUnresolvedReferences
         return_value = await processor.postprocess(processed, state, stats_collect_fn) \
             if processor.is_postprocess_async \
diff --git a/clearml_serving/serving/preprocess_service.py b/clearml_serving/serving/preprocess_service.py
index a5c069c..0618a0a 100644
--- a/clearml_serving/serving/preprocess_service.py
+++ b/clearml_serving/serving/preprocess_service.py
@@ -576,13 +576,22 @@ class CustomAsyncPreprocessRequest(BasePreprocessRequest):
             return await self._preprocess.postprocess(data, state, collect_custom_statistics_fn)
         return data
 
-    async def process(self, data: Any, state: dict, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Any:
+    async def completion(self, data: Any, state: dict, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Any:
         """
         The actual processing function.
         We run the process in this context
         """
-        if self._preprocess is not None and hasattr(self._preprocess, 'process'):
-            return await self._preprocess.process(data, state, collect_custom_statistics_fn)
+        if self._preprocess is not None and hasattr(self._preprocess, 'completion'):
+            return await self._preprocess.completion(data, state, collect_custom_statistics_fn)
+        return None
+
+    async def chat_completion(self, data: Any, state: dict, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Any:
+        """
+        The actual processing function.
+        We run the process in this context
+        """
+        if self._preprocess is not None and hasattr(self._preprocess, 'chat_completion'):
+            return await self._preprocess.chat_completion(data, state, collect_custom_statistics_fn)
         return None
 
     @staticmethod
diff --git a/clearml_serving/statistics/entrypoint.sh b/clearml_serving/statistics/entrypoint.sh
index 1af8bef..f2e7e72 100755
--- a/clearml_serving/statistics/entrypoint.sh
+++ b/clearml_serving/statistics/entrypoint.sh
@@ -10,7 +10,7 @@ echo CLEARML_DEFAULT_KAFKA_SERVE_URL="$CLEARML_DEFAULT_KAFKA_SERVE_URL"
 SERVING_PORT="${CLEARML_SERVING_PORT:-9999}"
 
 # set default internal serve endpoint (for request pipelining)
-CLEARML_DEFAULT_BASE_SERVE_URL="${CLEARML_DEFAULT_BASE_SERVE_URL:-http://127.0.0.1:$SERVING_PORT/serve}"
+CLEARML_DEFAULT_BASE_SERVE_URL="${CLEARML_DEFAULT_BASE_SERVE_URL:-http://127.0.0.1:$SERVING_PORT/clearml}"
 CLEARML_DEFAULT_TRITON_GRPC_ADDR="${CLEARML_DEFAULT_TRITON_GRPC_ADDR:-127.0.0.1:8001}"
 
 # print configuration
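Note: the new url_type dispatch in CustomAsyncPreprocessRequest forwards requests to completion() / chat_completion() methods on the user-supplied preprocess object, so an endpoint served through the custom async engine must expose those methods. Below is a minimal sketch of such an endpoint class; the class and method shape follow the usual clearml-serving Preprocess convention, while the echo-style bodies and field names are illustrative placeholders, not part of this patch.

# hypothetical preprocess.py for an endpoint using the custom async engine
from typing import Any, Callable, Optional


class Preprocess(object):
    """Sketch of a user-side preprocess class for the new completions / chat completions routes."""

    def __init__(self):
        self._model = None  # placeholder for whatever inference engine the endpoint actually wraps

    async def preprocess(self, body: dict, state: dict, collect_custom_statistics_fn: Optional[Callable[[dict], None]] = None) -> Any:
        # the OpenAI-style JSON payload arrives as-is; pass it through unchanged
        return body

    async def completion(self, data: Any, state: dict, collect_custom_statistics_fn: Optional[Callable[[dict], None]] = None) -> Any:
        # called for POST /clearml/{model_id}/v1/completions
        prompt = data.get("prompt", "")
        # replace this echo with a real engine call
        return {"choices": [{"text": "echo: {}".format(prompt)}]}

    async def chat_completion(self, data: Any, state: dict, collect_custom_statistics_fn: Optional[Callable[[dict], None]] = None) -> Any:
        # called for POST /clearml/{model_id}/v1/chat/completions
        messages = data.get("messages", [])
        last = messages[-1].get("content", "") if messages else ""
        return {"choices": [{"message": {"role": "assistant", "content": "echo: {}".format(last)}}]}

    async def postprocess(self, data: Any, state: dict, collect_custom_statistics_fn: Optional[Callable[[dict], None]] = None) -> dict:
        # return the response body exactly as produced by completion/chat_completion
        return data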
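Note: once an endpoint is registered, the new routes can be exercised with any plain HTTP client. A minimal sketch, assuming the inference container listens on 127.0.0.1:8080 and the endpoint is named "test_model" (both are assumptions; adjust to the actual deployment):

# hypothetical smoke test against a locally running clearml-serving inference container
import requests

# router prefix changed from /serve to /clearml in this patch
base_url = "http://127.0.0.1:8080/clearml"

payload = {
    "model": "test_model",
    "messages": [{"role": "user", "content": "Hello!"}],
}

resp = requests.post("{}/test_model/v1/chat/completions".format(base_url), json=payload, timeout=60)
resp.raise_for_status()
print(resp.json())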