diff --git a/clearml_serving/serving/main.py b/clearml_serving/serving/main.py
index c3c5f7c..8ac27d4 100644
--- a/clearml_serving/serving/main.py
+++ b/clearml_serving/serving/main.py
@@ -99,84 +99,14 @@ router = APIRouter(
 )


-# cover all routing options for model version `/{model_id}`, `/{model_id}/123`, `/{model_id}?version=123`
-# @router.post("/{model_id}/{version}")
-# @router.post("/{model_id}/")
-# @router.post("/{model_id}")
-# async def serve_model(model_id: str, version: Optional[str] = None, request: Union[bytes, Dict[Any, Any]] = None):
-#     try:
-#         return_value = await processor.process_request(
-#             base_url=model_id,
-#             version=version,
-#             request_body=request
-#         )
-#     except EndpointNotFoundException as ex:
-#         raise HTTPException(status_code=404, detail="Error processing request, endpoint was not found: {}".format(ex))
-#     except (EndpointModelLoadException, EndpointBackendEngineException) as ex:
-#         session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
-#             instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
-#         raise HTTPException(status_code=422, detail="Error [{}] processing request: {}".format(type(ex), ex))
-#     except ServingInitializationException as ex:
-#         session_logger.report_text("[{}] Exception [{}] {} while loading serving inference: {}\n{}".format(
-#             instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
-#         raise HTTPException(status_code=500, detail="Error [{}] processing request: {}".format(type(ex), ex))
-#     except ValueError as ex:
-#         session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
-#             instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
-#         if "CUDA out of memory. " in str(ex) or "NVML_SUCCESS == r INTERNAL ASSERT FAILED" in str(ex):
-#             raise CUDAException(exception=ex)
-#         else:
-#             raise HTTPException(status_code=422, detail="Error [{}] processing request: {}".format(type(ex), ex))
-#     except Exception as ex:
-#         session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
-#             instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
-#         raise HTTPException(status_code=500, detail="Error [{}] processing request: {}".format(type(ex), ex))
-#     return return_value
-
-
-@router.post("/{model_id}/v1/chat/completions")
-@router.post("/{model_id}/v1/chat/completions/")
-async def serve_model(model_id: str, request: Union[bytes, Dict[Any, Any]] = None):
+@router.post("/v1/{endpoint_type}")
+@router.post("/v1/{endpoint_type}/")
+async def llm_serve_model(endpoint_type: str, request: Union[bytes, Dict[Any, Any]] = None):
     try:
         return_value = await processor.process_request(
-            base_url=model_id,
-            version=None,
+            base_url=request["model"],
             request_body=request,
-            url_type="chat_completion"
-        )
-    except EndpointNotFoundException as ex:
-        raise HTTPException(status_code=404, detail="Error processing request, endpoint was not found: {}".format(ex))
-    except (EndpointModelLoadException, EndpointBackendEngineException) as ex:
-        session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
-            instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
-        raise HTTPException(status_code=422, detail="Error [{}] processing request: {}".format(type(ex), ex))
-    except ServingInitializationException as ex:
-        session_logger.report_text("[{}] Exception [{}] {} while loading serving inference: {}\n{}".format(
-            instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
-        raise HTTPException(status_code=500, detail="Error [{}] processing request: {}".format(type(ex), ex))
-    except ValueError as ex:
-        session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
-            instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
-        if "CUDA out of memory. " in str(ex) or "NVML_SUCCESS == r INTERNAL ASSERT FAILED" in str(ex):
-            raise CUDAException(exception=ex)
-        else:
-            raise HTTPException(status_code=422, detail="Error [{}] processing request: {}".format(type(ex), ex))
-    except Exception as ex:
-        session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
-            instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
-        raise HTTPException(status_code=500, detail="Error [{}] processing request: {}".format(type(ex), ex))
-    return return_value
-
-
-@router.post("/{model_id}/v1/completions")
-@router.post("/{model_id}/v1/completions/")
-async def serve_model(model_id: str, request: Union[bytes, Dict[Any, Any]] = None):
-    try:
-        return_value = await processor.process_request(
-            base_url=model_id,
-            version=None,
-            request_body=request,
-            url_type="completion"
+            url_type=endpoint_type
         )
     except EndpointNotFoundException as ex:
         raise HTTPException(status_code=404, detail="Error processing request, endpoint was not found: {}".format(ex))
diff --git a/clearml_serving/serving/model_request_processor.py b/clearml_serving/serving/model_request_processor.py
index 93198f8..04c79e6 100644
--- a/clearml_serving/serving/model_request_processor.py
+++ b/clearml_serving/serving/model_request_processor.py
@@ -155,7 +155,7 @@ class ModelRequestProcessor(object):
         self._serving_base_url = None
         self._metric_log_freq = None

-    async def process_request(self, base_url: str, version: str, request_body: dict, url_type: str) -> dict:
+    async def process_request(self, base_url: str, request_body: dict, url_type: str) -> dict:
         """
         Process request coming in,
         Raise Value error if url does not match existing endpoints
@@ -167,11 +167,11 @@ class ModelRequestProcessor(object):
             while self._update_lock_flag:
                 await asyncio.sleep(0.5+random())
             # retry to process
-            return await self.process_request(base_url=base_url, version=version, request_body=request_body)
+            return await self.process_request(base_url=base_url, request_body=request_body, url_type=url_type)

         try:
             # normalize url and version
-            url = self._normalize_endpoint_url(base_url, version)
+            url = self._normalize_endpoint_url(base_url)

             # check canary
             canary_url = self._process_canary(base_url=url)
@@ -1211,16 +1211,16 @@ class ModelRequestProcessor(object):
                 if processor.is_preprocess_async \
                 else processor.preprocess(body, state, stats_collect_fn)
             # noinspection PyUnresolvedReferences
-            if url_type == "completion":
+            if url_type == "completions":
                 processed = await processor.completion(preprocessed, state, stats_collect_fn) \
                     if processor.is_process_async \
                     else processor.completion(preprocessed, state, stats_collect_fn)
-            elif url_type == "chat_completion":
+            elif url_type == "chat/completions":
                 processed = await processor.chat_completion(preprocessed, state, stats_collect_fn) \
                     if processor.is_process_async \
                     else processor.chat_completion(preprocessed, state, stats_collect_fn)
             else:
-                raise ValueError(f"wrong url_type: expected 'completion' and 'chat_completion', got {url_type}")
+                raise ValueError(f"wrong url_type: expected 'completions' and 'chat/completions', got {url_type}")
             # noinspection PyUnresolvedReferences
             return_value = await processor.postprocess(processed, state, stats_collect_fn) \
                 if processor.is_postprocess_async \
@@ -1341,8 +1341,9 @@ class ModelRequestProcessor(object):
         return task

     @classmethod
-    def _normalize_endpoint_url(cls, endpoint: str, version: Optional[str] = None) -> str:
-        return "{}/{}".format(endpoint.rstrip("/"), version or "").rstrip("/")
+    def _normalize_endpoint_url(cls, endpoint: str) -> str:
+        # return "{}/{}".format(endpoint.rstrip("/"), version or "").rstrip("/")
+        return endpoint

     @classmethod
     def _validate_model(cls, endpoint: Union[ModelEndpoint, ModelMonitoring]) -> bool:
diff --git a/clearml_serving/serving/preprocess_service.py b/clearml_serving/serving/preprocess_service.py
index 0618a0a..e065144 100644
--- a/clearml_serving/serving/preprocess_service.py
+++ b/clearml_serving/serving/preprocess_service.py
@@ -19,7 +19,7 @@ class BasePreprocessRequest(object):
     __preprocessing_lookup = {}
     __preprocessing_modules = set()
     _grpc_env_conf_prefix = "CLEARML_GRPC_"
-    _default_serving_base_url = "http://127.0.0.1:8080/serve/"
+    _default_serving_base_url = "http://127.0.0.1:8080/clearml/"
     _server_config = {}  # externally configured by the serving inference service
     _timeout = None  # timeout in seconds for the entire request, set in __init__
     is_preprocess_async = False
@@ -292,7 +292,7 @@ class TritonPreprocessRequest(BasePreprocessRequest):

         self._grpc_stub = {}

-    async def process(
+    async def chat_completion(
            self,
            data: Any,
            state: dict,
@@ -428,74 +428,28 @@ class TritonPreprocessRequest(BasePreprocessRequest):
         return output_results[0] if index == 1 else output_results


-@BasePreprocessRequest.register_engine("sklearn", modules=["joblib", "sklearn"])
-class SKLearnPreprocessRequest(BasePreprocessRequest):
-    def __init__(self, model_endpoint: ModelEndpoint, task: Task = None):
-        super(SKLearnPreprocessRequest, self).__init__(
-            model_endpoint=model_endpoint, task=task)
-        if self._model is None:
-            # get model
-            import joblib  # noqa
-            self._model = joblib.load(filename=self._get_local_model_file())
-
-    def process(self, data: Any, state: dict, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Any:
-        """
-        The actual processing function.
-        We run the model in this context
-        """
-        return self._model.predict(data)
-
-
-@BasePreprocessRequest.register_engine("xgboost", modules=["xgboost"])
-class XGBoostPreprocessRequest(BasePreprocessRequest):
-    def __init__(self, model_endpoint: ModelEndpoint, task: Task = None):
-        super(XGBoostPreprocessRequest, self).__init__(
-            model_endpoint=model_endpoint, task=task)
-        if self._model is None:
-            # get model
-            import xgboost  # noqa
-            self._model = xgboost.Booster()
-            self._model.load_model(self._get_local_model_file())
-
-    def process(self, data: Any, state: dict, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Any:
-        """
-        The actual processing function.
-        We run the model in this context
-        """
-        return self._model.predict(data)
-
-
-@BasePreprocessRequest.register_engine("lightgbm", modules=["lightgbm"])
-class LightGBMPreprocessRequest(BasePreprocessRequest):
-    def __init__(self, model_endpoint: ModelEndpoint, task: Task = None):
-        super(LightGBMPreprocessRequest, self).__init__(
-            model_endpoint=model_endpoint, task=task)
-        if self._model is None:
-            # get model
-            import lightgbm  # noqa
-            self._model = lightgbm.Booster(model_file=self._get_local_model_file())
-
-    def process(self, data: Any, state: dict, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Any:
-        """
-        The actual processing function.
-        We run the model in this context
-        """
-        return self._model.predict(data)
-
-
 @BasePreprocessRequest.register_engine("custom")
 class CustomPreprocessRequest(BasePreprocessRequest):
     def __init__(self, model_endpoint: ModelEndpoint, task: Task = None):
         super(CustomPreprocessRequest, self).__init__(
             model_endpoint=model_endpoint, task=task)

-    def process(self, data: Any, state: dict, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Any:
+    def completion(self, data: Any, state: dict, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Any:
         """
         The actual processing function.
         We run the process in this context
         """
-        if self._preprocess is not None and hasattr(self._preprocess, 'process'):
-            return self._preprocess.process(data, state, collect_custom_statistics_fn)
+        if self._preprocess is not None and hasattr(self._preprocess, 'completion'):
+            return self._preprocess.completion(data, state, collect_custom_statistics_fn)
+        return None
+
+    def chat_completion(self, data: Any, state: dict, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Any:
+        """
+        The actual processing function.
+        We run the process in this context
+        """
+        if self._preprocess is not None and hasattr(self._preprocess, 'chat_completion'):
+            return self._preprocess.chat_completion(data, state, collect_custom_statistics_fn)
         return None
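Note for `custom`-engine users: since `CustomPreprocessRequest` now dispatches to `completion` / `chat_completion` (looked up with `hasattr`) instead of `process`, a user preprocess module needs to expose those entry points. The sketch below is a minimal, hypothetical example of such a module under the usual `Preprocess` class convention; it is not part of this diff, and the stub return values are placeholders rather than the library's API.

```python
# example_preprocess.py -- hypothetical custom-engine module (a sketch, not part of this PR)
from typing import Any, Callable, Optional


class Preprocess(object):
    """Entry points that CustomPreprocessRequest resolves via hasattr()."""

    def preprocess(self, body: dict, state: dict,
                   collect_custom_statistics_fn: Optional[Callable[[dict], None]] = None) -> Any:
        # the OpenAI-style request body arrives as a dict; pass it through unchanged
        return body

    def completion(self, data: Any, state: dict,
                   collect_custom_statistics_fn: Optional[Callable[[dict], None]] = None) -> Any:
        # reached when url_type == "completions" (POST .../v1/completions)
        # a real module would forward `data` to the model backend here
        return {"choices": [{"text": "stub completion for: {}".format(data.get("prompt"))}]}

    def chat_completion(self, data: Any, state: dict,
                        collect_custom_statistics_fn: Optional[Callable[[dict], None]] = None) -> Any:
        # reached when url_type == "chat/completions" (POST .../v1/chat/completions)
        return {"choices": [{"message": {"role": "assistant", "content": "stub reply"}}]}

    def postprocess(self, data: Any, state: dict,
                    collect_custom_statistics_fn: Optional[Callable[[dict], None]] = None) -> dict:
        return data
```

Because `llm_serve_model` resolves the endpoint from `request["model"]`, a client would send the endpoint name in the body, e.g. `POST .../clearml/v1/chat/completions` with `{"model": "<endpoint-name>", "messages": [...]}` (assuming the serving app keeps the base prefix shown in the new `_default_serving_base_url`).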