Mirror of https://github.com/clearml/clearml-serving (synced 2025-06-26 18:16:00 +00:00)

Commit b8f5d81636 ("initial commit"), parent 64daef23ba.
@@ -99,84 +99,14 @@ router = APIRouter(
 )


-# cover all routing options for model version `/{model_id}`, `/{model_id}/123`, `/{model_id}?version=123`
-# @router.post("/{model_id}/{version}")
-# @router.post("/{model_id}/")
-# @router.post("/{model_id}")
-# async def serve_model(model_id: str, version: Optional[str] = None, request: Union[bytes, Dict[Any, Any]] = None):
-#     try:
-#         return_value = await processor.process_request(
-#             base_url=model_id,
-#             version=version,
-#             request_body=request
-#         )
-#     except EndpointNotFoundException as ex:
-#         raise HTTPException(status_code=404, detail="Error processing request, endpoint was not found: {}".format(ex))
-#     except (EndpointModelLoadException, EndpointBackendEngineException) as ex:
-#         session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
-#             instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
-#         raise HTTPException(status_code=422, detail="Error [{}] processing request: {}".format(type(ex), ex))
-#     except ServingInitializationException as ex:
-#         session_logger.report_text("[{}] Exception [{}] {} while loading serving inference: {}\n{}".format(
-#             instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
-#         raise HTTPException(status_code=500, detail="Error [{}] processing request: {}".format(type(ex), ex))
-#     except ValueError as ex:
-#         session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
-#             instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
-#         if "CUDA out of memory. " in str(ex) or "NVML_SUCCESS == r INTERNAL ASSERT FAILED" in str(ex):
-#             raise CUDAException(exception=ex)
-#         else:
-#             raise HTTPException(status_code=422, detail="Error [{}] processing request: {}".format(type(ex), ex))
-#     except Exception as ex:
-#         session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
-#             instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
-#         raise HTTPException(status_code=500, detail="Error [{}] processing request: {}".format(type(ex), ex))
-#     return return_value
-
-
-@router.post("/{model_id}/v1/chat/completions")
-@router.post("/{model_id}/v1/chat/completions/")
-async def serve_model(model_id: str, request: Union[bytes, Dict[Any, Any]] = None):
+@router.post("/v1/{endpoint_type}")
+@router.post("/v1/{endpoint_type}/")
+async def llm_serve_model(endpoint_type: str, request: Union[bytes, Dict[Any, Any]] = None):
     try:
         return_value = await processor.process_request(
-            base_url=model_id,
-            version=None,
+            base_url=request["model"],
             request_body=request,
-            url_type="chat_completion"
+            url_type=endpoint_type
-        )
-    except EndpointNotFoundException as ex:
-        raise HTTPException(status_code=404, detail="Error processing request, endpoint was not found: {}".format(ex))
-    except (EndpointModelLoadException, EndpointBackendEngineException) as ex:
-        session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
-            instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
-        raise HTTPException(status_code=422, detail="Error [{}] processing request: {}".format(type(ex), ex))
-    except ServingInitializationException as ex:
-        session_logger.report_text("[{}] Exception [{}] {} while loading serving inference: {}\n{}".format(
-            instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
-        raise HTTPException(status_code=500, detail="Error [{}] processing request: {}".format(type(ex), ex))
-    except ValueError as ex:
-        session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
-            instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
-        if "CUDA out of memory. " in str(ex) or "NVML_SUCCESS == r INTERNAL ASSERT FAILED" in str(ex):
-            raise CUDAException(exception=ex)
-        else:
-            raise HTTPException(status_code=422, detail="Error [{}] processing request: {}".format(type(ex), ex))
-    except Exception as ex:
-        session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
-            instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
-        raise HTTPException(status_code=500, detail="Error [{}] processing request: {}".format(type(ex), ex))
-    return return_value
-
-
-@router.post("/{model_id}/v1/completions")
-@router.post("/{model_id}/v1/completions/")
-async def serve_model(model_id: str, request: Union[bytes, Dict[Any, Any]] = None):
-    try:
-        return_value = await processor.process_request(
-            base_url=model_id,
-            version=None,
-            request_body=request,
-            url_type="completion"
-        )
     except EndpointNotFoundException as ex:
         raise HTTPException(status_code=404, detail="Error processing request, endpoint was not found: {}".format(ex))
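The handler above now serves OpenAI-style LLM routes and resolves the target endpoint from the request body's "model" field instead of the URL path. A minimal client sketch, assuming the router keeps its usual /serve prefix on port 8080 and that an endpoint named "my_llm" has been registered (both the host and the endpoint name are illustrative):

import requests

# Hypothetical host and endpoint name, for illustration only.
url = "http://127.0.0.1:8080/serve/v1/chat/completions"
payload = {
    "model": "my_llm",  # picked up as base_url via request["model"] in llm_serve_model()
    "messages": [{"role": "user", "content": "Hello!"}],
}

resp = requests.post(url, json=payload, timeout=60)
resp.raise_for_status()
print(resp.json())

The same route shape also covers POST /serve/v1/completions, since the path suffix is captured as endpoint_type and passed through as url_type.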
@@ -155,7 +155,7 @@ class ModelRequestProcessor(object):
         self._serving_base_url = None
         self._metric_log_freq = None

-    async def process_request(self, base_url: str, version: str, request_body: dict, url_type: str) -> dict:
+    async def process_request(self, base_url: str, request_body: dict, url_type: str) -> dict:
         """
         Process request coming in,
         Raise Value error if url does not match existing endpoints
@@ -167,11 +167,11 @@ class ModelRequestProcessor(object):
         while self._update_lock_flag:
             await asyncio.sleep(0.5+random())
             # retry to process
-            return await self.process_request(base_url=base_url, version=version, request_body=request_body)
+            return await self.process_request(base_url=base_url, request_body=request_body, url_type=url_type)

         try:
             # normalize url and version
-            url = self._normalize_endpoint_url(base_url, version)
+            url = self._normalize_endpoint_url(base_url)

             # check canary
             canary_url = self._process_canary(base_url=url)
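With the version argument removed, callers of ModelRequestProcessor.process_request() pass only the endpoint name, the raw body, and the URL type. A hedged sketch of a direct invocation (in the service this happens inside the FastAPI handler; processor stands for the shared ModelRequestProcessor instance and the body is illustrative):

import asyncio

async def query(processor, body: dict) -> dict:
    # No separate `version` argument anymore; url_type must match the strings
    # the processor dispatches on ("completions" or "chat/completions").
    return await processor.process_request(
        base_url=body["model"],
        request_body=body,
        url_type="chat/completions",
    )

# Illustrative only:
# result = asyncio.run(query(processor, {"model": "my_llm", "messages": []}))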
@@ -1211,16 +1211,16 @@ class ModelRequestProcessor(object):
                 if processor.is_preprocess_async \
                 else processor.preprocess(body, state, stats_collect_fn)
             # noinspection PyUnresolvedReferences
-            if url_type == "completion":
+            if url_type == "completions":
                 processed = await processor.completion(preprocessed, state, stats_collect_fn) \
                     if processor.is_process_async \
                     else processor.completion(preprocessed, state, stats_collect_fn)
-            elif url_type == "chat_completion":
+            elif url_type == "chat/completions":
                 processed = await processor.chat_completion(preprocessed, state, stats_collect_fn) \
                     if processor.is_process_async \
                     else processor.chat_completion(preprocessed, state, stats_collect_fn)
             else:
-                raise ValueError(f"wrong url_type: expected 'completion' and 'chat_completion', got {url_type}")
+                raise ValueError(f"wrong url_type: expected 'completions' and 'chat/completions', got {url_type}")
             # noinspection PyUnresolvedReferences
             return_value = await processor.postprocess(processed, state, stats_collect_fn) \
                 if processor.is_postprocess_async \
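The url_type values now mirror the OpenAI path suffixes. A condensed sketch of the branching above, with the async/sync split folded away for clarity (processor here is the per-endpoint preprocess handler, as in the surrounding code):

async def dispatch(processor, url_type: str, preprocessed, state, stats_collect_fn):
    # Route the preprocessed payload to the matching LLM entry point.
    if url_type == "completions":
        return await processor.completion(preprocessed, state, stats_collect_fn)
    if url_type == "chat/completions":
        return await processor.chat_completion(preprocessed, state, stats_collect_fn)
    raise ValueError(f"wrong url_type: expected 'completions' or 'chat/completions', got {url_type}")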
@@ -1341,8 +1341,9 @@ class ModelRequestProcessor(object):
         return task

     @classmethod
-    def _normalize_endpoint_url(cls, endpoint: str, version: Optional[str] = None) -> str:
-        return "{}/{}".format(endpoint.rstrip("/"), version or "").rstrip("/")
+    def _normalize_endpoint_url(cls, endpoint: str) -> str:
+        # return "{}/{}".format(endpoint.rstrip("/"), version or "").rstrip("/")
+        return endpoint

     @classmethod
     def _validate_model(cls, endpoint: Union[ModelEndpoint, ModelMonitoring]) -> bool:
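Since versioned endpoint URLs are gone, normalization reduces to the identity. A small self-contained illustration of the old versus new behaviour (function names are mine; the formatting expression is taken from the removed line):

def normalize_old(endpoint: str, version: str = None) -> str:
    # Pre-commit behaviour: fold the optional version into the endpoint URL.
    return "{}/{}".format(endpoint.rstrip("/"), version or "").rstrip("/")

def normalize_new(endpoint: str) -> str:
    # Post-commit behaviour: the endpoint name is returned unchanged.
    return endpoint

assert normalize_old("my_llm/", "2") == "my_llm/2"
assert normalize_old("my_llm", None) == "my_llm"
assert normalize_new("my_llm") == "my_llm"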
@@ -19,7 +19,7 @@ class BasePreprocessRequest(object):
     __preprocessing_lookup = {}
     __preprocessing_modules = set()
     _grpc_env_conf_prefix = "CLEARML_GRPC_"
-    _default_serving_base_url = "http://127.0.0.1:8080/serve/"
+    _default_serving_base_url = "http://127.0.0.1:8080/clearml/"
     _server_config = {}  # externally configured by the serving inference service
     _timeout = None  # timeout in seconds for the entire request, set in __init__
     is_preprocess_async = False
@@ -292,7 +292,7 @@ class TritonPreprocessRequest(BasePreprocessRequest):

         self._grpc_stub = {}

-    async def process(
+    async def chat_completion(
             self,
             data: Any,
             state: dict,
@@ -428,74 +428,28 @@ class TritonPreprocessRequest(BasePreprocessRequest):
         return output_results[0] if index == 1 else output_results


-@BasePreprocessRequest.register_engine("sklearn", modules=["joblib", "sklearn"])
-class SKLearnPreprocessRequest(BasePreprocessRequest):
-    def __init__(self, model_endpoint: ModelEndpoint, task: Task = None):
-        super(SKLearnPreprocessRequest, self).__init__(
-            model_endpoint=model_endpoint, task=task)
-        if self._model is None:
-            # get model
-            import joblib  # noqa
-            self._model = joblib.load(filename=self._get_local_model_file())
-
-    def process(self, data: Any, state: dict, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Any:
-        """
-        The actual processing function.
-        We run the model in this context
-        """
-        return self._model.predict(data)
-
-
-@BasePreprocessRequest.register_engine("xgboost", modules=["xgboost"])
-class XGBoostPreprocessRequest(BasePreprocessRequest):
-    def __init__(self, model_endpoint: ModelEndpoint, task: Task = None):
-        super(XGBoostPreprocessRequest, self).__init__(
-            model_endpoint=model_endpoint, task=task)
-        if self._model is None:
-            # get model
-            import xgboost  # noqa
-            self._model = xgboost.Booster()
-            self._model.load_model(self._get_local_model_file())
-
-    def process(self, data: Any, state: dict, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Any:
-        """
-        The actual processing function.
-        We run the model in this context
-        """
-        return self._model.predict(data)
-
-
-@BasePreprocessRequest.register_engine("lightgbm", modules=["lightgbm"])
-class LightGBMPreprocessRequest(BasePreprocessRequest):
-    def __init__(self, model_endpoint: ModelEndpoint, task: Task = None):
-        super(LightGBMPreprocessRequest, self).__init__(
-            model_endpoint=model_endpoint, task=task)
-        if self._model is None:
-            # get model
-            import lightgbm  # noqa
-            self._model = lightgbm.Booster(model_file=self._get_local_model_file())
-
-    def process(self, data: Any, state: dict, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Any:
-        """
-        The actual processing function.
-        We run the model in this context
-        """
-        return self._model.predict(data)
-
-
 @BasePreprocessRequest.register_engine("custom")
 class CustomPreprocessRequest(BasePreprocessRequest):
     def __init__(self, model_endpoint: ModelEndpoint, task: Task = None):
         super(CustomPreprocessRequest, self).__init__(
             model_endpoint=model_endpoint, task=task)

-    def process(self, data: Any, state: dict, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Any:
+    def completion(self, data: Any, state: dict, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Any:
         """
         The actual processing function.
         We run the process in this context
         """
-        if self._preprocess is not None and hasattr(self._preprocess, 'process'):
-            return self._preprocess.process(data, state, collect_custom_statistics_fn)
+        if self._preprocess is not None and hasattr(self._preprocess, 'completion'):
+            return self._preprocess.completion(data, state, collect_custom_statistics_fn)
+        return None
+
+    def chat_completion(self, data: Any, state: dict, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Any:
+        """
+        The actual processing function.
+        We run the process in this context
+        """
+        if self._preprocess is not None and hasattr(self._preprocess, 'chat_completion'):
+            return self._preprocess.chat_completion(data, state, collect_custom_statistics_fn)
         return None


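CustomPreprocessRequest now forwards requests to completion() and chat_completion() hooks on the user-supplied preprocess object rather than a single process() hook. A hedged sketch of a user preprocess class that satisfies the hasattr checks above (the class name Preprocess follows the usual clearml-serving custom-engine convention; the echo responses are illustrative only):

from typing import Any, Callable, Optional


class Preprocess(object):
    # Sketch of a user-provided module for a "custom" engine endpoint.

    def __init__(self):
        # Instantiated once per endpoint by the serving process.
        pass

    def completion(self, data: Any, state: dict,
                   collect_custom_statistics_fn: Optional[Callable[[dict], None]] = None) -> Any:
        # Invoked for url_type "completions"; echo the prompt back (illustrative).
        return {"choices": [{"text": data.get("prompt", "")}]}

    def chat_completion(self, data: Any, state: dict,
                        collect_custom_statistics_fn: Optional[Callable[[dict], None]] = None) -> Any:
        # Invoked for url_type "chat/completions"; echo the last user message (illustrative).
        last = (data.get("messages") or [{}])[-1].get("content", "")
        return {"choices": [{"message": {"role": "assistant", "content": last}}]}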