From 5b73bdf085b89bb048806ba6a2a529902b94e041 Mon Sep 17 00:00:00 2001 From: IlyaMescheryakov1402 Date: Thu, 27 Feb 2025 22:56:39 +0300 Subject: [PATCH] fix suffix and add router --- clearml_serving/serving/entrypoint.sh | 4 +- clearml_serving/serving/main.py | 78 ++++++++++++------- .../serving/model_request_processor.py | 41 ++++++---- clearml_serving/statistics/entrypoint.sh | 3 +- docker/docker-compose-triton-gpu.yml | 1 + docker/docker-compose-triton.yml | 1 + docker/docker-compose.yml | 1 + 7 files changed, 84 insertions(+), 45 deletions(-) diff --git a/clearml_serving/serving/entrypoint.sh b/clearml_serving/serving/entrypoint.sh index 0e51dda..06cb4be 100755 --- a/clearml_serving/serving/entrypoint.sh +++ b/clearml_serving/serving/entrypoint.sh @@ -18,7 +18,8 @@ UVICORN_SERVE_LOOP="${UVICORN_SERVE_LOOP:-uvloop}" UVICORN_LOG_LEVEL="${UVICORN_LOG_LEVEL:-warning}" # set default internal serve endpoint (for request pipelining) -CLEARML_DEFAULT_BASE_SERVE_URL="${CLEARML_DEFAULT_BASE_SERVE_URL:-http://127.0.0.1:$SERVING_PORT/clearml}" +CLEARML_DEFAULT_SERVE_SUFFIX="${CLEARML_DEFAULT_SERVE_SUFFIX:-serve}" +CLEARML_DEFAULT_BASE_SERVE_URL="${CLEARML_DEFAULT_BASE_SERVE_URL:-http://127.0.0.1:$SERVING_PORT/$CLEARML_DEFAULT_SERVE_SUFFIX}" CLEARML_DEFAULT_TRITON_GRPC_ADDR="${CLEARML_DEFAULT_TRITON_GRPC_ADDR:-127.0.0.1:8001}" # print configuration @@ -31,6 +32,7 @@ echo GUNICORN_EXTRA_ARGS="$GUNICORN_EXTRA_ARGS" echo UVICORN_SERVE_LOOP="$UVICORN_SERVE_LOOP" echo UVICORN_EXTRA_ARGS="$UVICORN_EXTRA_ARGS" echo UVICORN_LOG_LEVEL="$UVICORN_LOG_LEVEL" +echo CLEARML_DEFAULT_SERVE_SUFFIX="$CLEARML_DEFAULT_SERVE_SUFFIX" echo CLEARML_DEFAULT_BASE_SERVE_URL="$CLEARML_DEFAULT_BASE_SERVE_URL" echo CLEARML_DEFAULT_TRITON_GRPC_ADDR="$CLEARML_DEFAULT_TRITON_GRPC_ADDR" diff --git a/clearml_serving/serving/main.py b/clearml_serving/serving/main.py index c1cc698..1950265 100644 --- a/clearml_serving/serving/main.py +++ b/clearml_serving/serving/main.py @@ -60,9 +60,6 @@ try: except 
(ValueError, TypeError): pass -class CUDAException(Exception): - def __init__(self, exception: str): - self.exception = exception grpc_aio_ignore_errors = parse_grpc_errors(shlex.split(os.environ.get("CLEARML_SERVING_AIO_RPC_IGNORE_ERRORS", ""))) grpc_aio_verbose_errors = parse_grpc_errors(shlex.split(os.environ.get("CLEARML_SERVING_AIO_RPC_VERBOSE_ERRORS", ""))) @@ -113,36 +110,18 @@ async def cuda_exception_handler(request, exc): return PlainTextResponse("CUDA out of memory. Restarting service", status_code=500, background=task) -@app.on_event('shutdown') -def shutdown_event(): - print('RESTARTING INFERENCE SERVICE!') - -async def exit_app(): - loop = asyncio.get_running_loop() - loop.stop() - -@app.exception_handler(CUDAException) -async def cuda_exception_handler(request, exc): - task = BackgroundTask(exit_app) - return PlainTextResponse("CUDA out of memory. Restarting service", status_code=500, background=task) - - -router = APIRouter( - prefix="/clearml", - tags=["models"], - responses={404: {"description": "Model Serving Endpoint Not found"}}, - route_class=GzipRoute, # mark-out to remove support for GZip content encoding -) - - -@router.post("/v1/{endpoint_type:path}") -@router.post("/v1/{endpoint_type:path}/") -async def llm_serve_model(endpoint_type: str, request: Union[bytes, Dict[Any, Any]] = None): +async def process_with_exceptions( + base_url: str, + version: Optional[str], + request: Union[bytes, Dict[Any, Any]], + serve_type: str +): try: return_value = await processor.process_request( - base_url=request["model"], + base_url=base_url, + version=version, request_body=request, - url_type=endpoint_type + serve_type=serve_type ) except EndpointNotFoundException as ex: raise HTTPException(status_code=404, detail="Error processing request, endpoint was not found: {}".format(ex)) @@ -190,4 +169,43 @@ async def llm_serve_model(endpoint_type: str, request: Union[bytes, Dict[Any, An return return_value +router = APIRouter( + 
prefix=f"/{os.environ.get('CLEARML_DEFAULT_SERVE_SUFFIX', 'serve')}", + tags=["models"], + responses={404: {"description": "Model Serving Endpoint Not found"}}, + route_class=GzipRoute, # mark-out to remove support for GZip content encoding +) + + +@router.post("/{model_id}/{version}") +@router.post("/{model_id}/") +@router.post("/{model_id}") +async def base_serve_model( + model_id: str, + version: Optional[str] = None, + request: Union[bytes, Dict[Any, Any]] = None +): + return_value = await process_with_exceptions( + base_url=model_id, + version=version, + request=request, + serve_type="process" + ) + return return_value + + +@router.post("/openai/v1/{endpoint_type:path}") +@router.post("/openai/v1/{endpoint_type:path}/") +async def openai_serve_model( + endpoint_type: str, + request: Union[bytes, Dict[Any, Any]] = None +): + return_value = await process_with_exceptions( + base_url=request.get("model", None), + version=None, + request=request, + serve_type=endpoint_type + ) + return return_value + app.include_router(router) diff --git a/clearml_serving/serving/model_request_processor.py b/clearml_serving/serving/model_request_processor.py index bc6ee12..44eaf9e 100644 --- a/clearml_serving/serving/model_request_processor.py +++ b/clearml_serving/serving/model_request_processor.py @@ -159,7 +159,7 @@ class ModelRequestProcessor(object): self._serving_base_url = None self._metric_log_freq = None - async def process_request(self, base_url: str, request_body: dict, url_type: str) -> dict: + async def process_request(self, base_url: str, version: str, request_body: dict, serve_type: str) -> dict: """ Process request coming in, Raise Value error if url does not match existing endpoints @@ -171,11 +171,16 @@ class ModelRequestProcessor(object): while self._update_lock_flag: await asyncio.sleep(0.5+random()) # retry to process - return await self.process_request(base_url=base_url, request_body=request_body, url_type=url_type) + return await self.process_request( + 
base_url=base_url, + version=version, + request_body=request_body, + serve_type=serve_type + ) try: # normalize url and version - url = self._normalize_endpoint_url(base_url) + url = self._normalize_endpoint_url(base_url, version) # check canary canary_url = self._process_canary(base_url=url) @@ -192,7 +197,12 @@ class ModelRequestProcessor(object): processor = processor_cls(model_endpoint=ep, task=self._task) self._engine_processor_lookup[url] = processor - return_value = await self._process_request(processor=processor, url=url, body=request_body, url_type=url_type) + return_value = await self._process_request( + processor=processor, + url=url, + body=request_body, + serve_type=serve_type + ) finally: self._request_processing_state.dec() @@ -1193,7 +1203,7 @@ class ModelRequestProcessor(object): # update preprocessing classes BasePreprocessRequest.set_server_config(self._configuration) - async def _process_request(self, processor: BasePreprocessRequest, url: str, body: dict, url_type: str) -> dict: + async def _process_request(self, processor: BasePreprocessRequest, url: str, body: dict, serve_type: str) -> dict: # collect statistics for this request stats_collect_fn = None collect_stats = False @@ -1215,13 +1225,19 @@ class ModelRequestProcessor(object): preprocessed = await processor.preprocess(body, state, stats_collect_fn) \ if processor.is_preprocess_async \ else processor.preprocess(body, state, stats_collect_fn) - # noinspection PyUnresolvedReferences - if url_type == "completions": - processed = await processor.completion(preprocessed, state, stats_collect_fn) \ + if serve_type == "process": + # noinspection PyUnresolvedReferences + processed = await processor.process(preprocessed, state, stats_collect_fn) \ + if processor.is_process_async \ + else processor.process(preprocessed, state, stats_collect_fn) + elif serve_type == "completions": + # noinspection PyUnresolvedReferences + processed = await processor.completions(preprocessed, state, 
stats_collect_fn) \ if processor.is_process_async \ - else processor.completion(preprocessed, state, stats_collect_fn) + else processor.completions(preprocessed, state, stats_collect_fn) - elif url_type == "chat/completions": - processed = await processor.chat_completion(preprocessed, state, stats_collect_fn) \ + elif serve_type == "chat/completions": + # noinspection PyUnresolvedReferences + processed = await processor.chat_completions(preprocessed, state, stats_collect_fn) \ if processor.is_process_async \ - else processor.chat_completion(preprocessed, state, stats_collect_fn) + else processor.chat_completions(preprocessed, state, stats_collect_fn) else: @@ -1346,9 +1362,8 @@ class ModelRequestProcessor(object): return task @classmethod - def _normalize_endpoint_url(cls, endpoint: str) -> str: - # return "{}/{}".format(endpoint.rstrip("/"), version or "").rstrip("/") - return endpoint + def _normalize_endpoint_url(cls, endpoint: str, version: Optional[str] = None) -> str: + return "{}/{}".format(endpoint.rstrip("/"), version or "").rstrip("/") @classmethod def _validate_model(cls, endpoint: Union[ModelEndpoint, ModelMonitoring]) -> bool: diff --git a/clearml_serving/statistics/entrypoint.sh b/clearml_serving/statistics/entrypoint.sh index f2e7e72..5b4de93 100755 --- a/clearml_serving/statistics/entrypoint.sh +++ b/clearml_serving/statistics/entrypoint.sh @@ -10,7 +10,8 @@ echo CLEARML_DEFAULT_KAFKA_SERVE_URL="$CLEARML_DEFAULT_KAFKA_SERVE_URL" SERVING_PORT="${CLEARML_SERVING_PORT:-9999}" # set default internal serve endpoint (for request pipelining) -CLEARML_DEFAULT_BASE_SERVE_URL="${CLEARML_DEFAULT_BASE_SERVE_URL:-http://127.0.0.1:$SERVING_PORT/clearml}" +CLEARML_DEFAULT_SERVE_SUFFIX="${CLEARML_DEFAULT_SERVE_SUFFIX:-serve}" +CLEARML_DEFAULT_BASE_SERVE_URL="${CLEARML_DEFAULT_BASE_SERVE_URL:-http://127.0.0.1:$SERVING_PORT/$CLEARML_DEFAULT_SERVE_SUFFIX}" CLEARML_DEFAULT_TRITON_GRPC_ADDR="${CLEARML_DEFAULT_TRITON_GRPC_ADDR:-127.0.0.1:8001}" # print configuration diff --git a/docker/docker-compose-triton-gpu.yml b/docker/docker-compose-triton-gpu.yml index 8e54073..a2ec754 100644 ---
a/docker/docker-compose-triton-gpu.yml +++ b/docker/docker-compose-triton-gpu.yml @@ -92,6 +92,7 @@ services: CLEARML_SERVING_TASK_ID: ${CLEARML_SERVING_TASK_ID:-} CLEARML_SERVING_PORT: ${CLEARML_SERVING_PORT:-8080} CLEARML_SERVING_POLL_FREQ: ${CLEARML_SERVING_POLL_FREQ:-1.0} + CLEARML_DEFAULT_SERVE_SUFFIX: ${CLEARML_DEFAULT_SERVE_SUFFIX:-serve} CLEARML_DEFAULT_BASE_SERVE_URL: ${CLEARML_DEFAULT_BASE_SERVE_URL:-http://127.0.0.1:8080/serve} CLEARML_DEFAULT_KAFKA_SERVE_URL: ${CLEARML_DEFAULT_KAFKA_SERVE_URL:-clearml-serving-kafka:9092} CLEARML_DEFAULT_TRITON_GRPC_ADDR: ${CLEARML_DEFAULT_TRITON_GRPC_ADDR:-clearml-serving-triton:8001} diff --git a/docker/docker-compose-triton.yml b/docker/docker-compose-triton.yml index b815583..edf92a4 100644 --- a/docker/docker-compose-triton.yml +++ b/docker/docker-compose-triton.yml @@ -92,6 +92,7 @@ services: CLEARML_SERVING_TASK_ID: ${CLEARML_SERVING_TASK_ID:-} CLEARML_SERVING_PORT: ${CLEARML_SERVING_PORT:-8080} CLEARML_SERVING_POLL_FREQ: ${CLEARML_SERVING_POLL_FREQ:-1.0} + CLEARML_DEFAULT_SERVE_SUFFIX: ${CLEARML_DEFAULT_SERVE_SUFFIX:-serve} CLEARML_DEFAULT_BASE_SERVE_URL: ${CLEARML_DEFAULT_BASE_SERVE_URL:-http://127.0.0.1:8080/serve} CLEARML_DEFAULT_KAFKA_SERVE_URL: ${CLEARML_DEFAULT_KAFKA_SERVE_URL:-clearml-serving-kafka:9092} CLEARML_DEFAULT_TRITON_GRPC_ADDR: ${CLEARML_DEFAULT_TRITON_GRPC_ADDR:-clearml-serving-triton:8001} diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 24e3b95..e73e184 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -92,6 +92,7 @@ services: CLEARML_SERVING_TASK_ID: ${CLEARML_SERVING_TASK_ID:-} CLEARML_SERVING_PORT: ${CLEARML_SERVING_PORT:-8080} CLEARML_SERVING_POLL_FREQ: ${CLEARML_SERVING_POLL_FREQ:-1.0} + CLEARML_DEFAULT_SERVE_SUFFIX: ${CLEARML_DEFAULT_SERVE_SUFFIX:-serve} CLEARML_DEFAULT_BASE_SERVE_URL: ${CLEARML_DEFAULT_BASE_SERVE_URL:-http://127.0.0.1:8080/serve} CLEARML_DEFAULT_KAFKA_SERVE_URL: 
${CLEARML_DEFAULT_KAFKA_SERVE_URL:-clearml-serving-kafka:9092} CLEARML_DEFAULT_TRITON_GRPC_ADDR: ${CLEARML_DEFAULT_TRITON_GRPC_ADDR:-}