Mirror of https://github.com/clearml/clearml-serving (synced 2025-06-26 18:16:00 +00:00)

Commit 64daef23ba ("initial commit"), parent 6859920848.
@@ -4,7 +4,7 @@ FROM python:3.11-bullseye
 ENV LC_ALL=C.UTF-8
 
 # install base package
-RUN pip3 install --no-cache-dir clearml-serving
+# RUN pip3 install --no-cache-dir clearml-serving
 
 # get latest execution code from the git repository
 # RUN cd $HOME && git clone https://github.com/allegroai/clearml-serving.git
@@ -18,7 +18,7 @@ UVICORN_SERVE_LOOP="${UVICORN_SERVE_LOOP:-uvloop}"
 UVICORN_LOG_LEVEL="${UVICORN_LOG_LEVEL:-warning}"
 
 # set default internal serve endpoint (for request pipelining)
-CLEARML_DEFAULT_BASE_SERVE_URL="${CLEARML_DEFAULT_BASE_SERVE_URL:-http://127.0.0.1:$SERVING_PORT/serve}"
+CLEARML_DEFAULT_BASE_SERVE_URL="${CLEARML_DEFAULT_BASE_SERVE_URL:-http://127.0.0.1:$SERVING_PORT/clearml}"
 CLEARML_DEFAULT_TRITON_GRPC_ADDR="${CLEARML_DEFAULT_TRITON_GRPC_ADDR:-127.0.0.1:8001}"
 
 # print configuration
@@ -92,7 +92,7 @@ async def cuda_exception_handler(request, exc):
 
 
 router = APIRouter(
-    prefix="/serve",
+    prefix="/clearml",
     tags=["models"],
     responses={404: {"description": "Model Serving Endpoint Not found"}},
     route_class=GzipRoute,  # mark-out to remove support for GZip content encoding
@@ -100,15 +100,49 @@ router = APIRouter(
 
 
 # cover all routing options for model version `/{model_id}`, `/{model_id}/123`, `/{model_id}?version=123`
-@router.post("/{model_id}/{version}")
-@router.post("/{model_id}/")
-@router.post("/{model_id}")
-async def serve_model(model_id: str, version: Optional[str] = None, request: Union[bytes, Dict[Any, Any]] = None):
+# @router.post("/{model_id}/{version}")
+# @router.post("/{model_id}/")
+# @router.post("/{model_id}")
+# async def serve_model(model_id: str, version: Optional[str] = None, request: Union[bytes, Dict[Any, Any]] = None):
+#     try:
+#         return_value = await processor.process_request(
+#             base_url=model_id,
+#             version=version,
+#             request_body=request
+#         )
+#     except EndpointNotFoundException as ex:
+#         raise HTTPException(status_code=404, detail="Error processing request, endpoint was not found: {}".format(ex))
+#     except (EndpointModelLoadException, EndpointBackendEngineException) as ex:
+#         session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
+#             instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
+#         raise HTTPException(status_code=422, detail="Error [{}] processing request: {}".format(type(ex), ex))
+#     except ServingInitializationException as ex:
+#         session_logger.report_text("[{}] Exception [{}] {} while loading serving inference: {}\n{}".format(
+#             instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
+#         raise HTTPException(status_code=500, detail="Error [{}] processing request: {}".format(type(ex), ex))
+#     except ValueError as ex:
+#         session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
+#             instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
+#         if "CUDA out of memory. " in str(ex) or "NVML_SUCCESS == r INTERNAL ASSERT FAILED" in str(ex):
+#             raise CUDAException(exception=ex)
+#         else:
+#             raise HTTPException(status_code=422, detail="Error [{}] processing request: {}".format(type(ex), ex))
+#     except Exception as ex:
+#         session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
+#             instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
+#         raise HTTPException(status_code=500, detail="Error [{}] processing request: {}".format(type(ex), ex))
+#     return return_value
+
+
+@router.post("/{model_id}/v1/chat/completions")
+@router.post("/{model_id}/v1/chat/completions/")
+async def serve_model(model_id: str, request: Union[bytes, Dict[Any, Any]] = None):
     try:
         return_value = await processor.process_request(
             base_url=model_id,
-            version=version,
-            request_body=request
+            version=None,
+            request_body=request,
+            url_type="chat_completion"
         )
     except EndpointNotFoundException as ex:
         raise HTTPException(status_code=404, detail="Error processing request, endpoint was not found: {}".format(ex))
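Note: with the router prefix changed from "/serve" to "/clearml" and the generic per-model route commented out, inference calls now go through the OpenAI-style chat path. A minimal client sketch, under placeholder assumptions (the serving container reachable on 127.0.0.1:8080, an endpoint named "llm_test", and a preprocess implementation that accepts an OpenAI-style messages payload; none of these names come from the commit):

import requests

# Placeholder host/port and endpoint name; adjust to the actual deployment.
base = "http://127.0.0.1:8080/clearml"
resp = requests.post(
    base + "/llm_test/v1/chat/completions",
    # Payload shape is whatever the endpoint's preprocess/chat_completion code expects.
    json={"messages": [{"role": "user", "content": "Hello!"}]},
    timeout=60,
)
resp.raise_for_status()
print(resp.json())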
@@ -123,7 +157,41 @@ async def serve_model(model_id: str, version: Optional[str] = None, request: Uni
     except ValueError as ex:
         session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
             instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
-        if "CUDA out of memory. " in str(ex):
+        if "CUDA out of memory. " in str(ex) or "NVML_SUCCESS == r INTERNAL ASSERT FAILED" in str(ex):
+            raise CUDAException(exception=ex)
+        else:
+            raise HTTPException(status_code=422, detail="Error [{}] processing request: {}".format(type(ex), ex))
+    except Exception as ex:
+        session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
+            instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
+        raise HTTPException(status_code=500, detail="Error [{}] processing request: {}".format(type(ex), ex))
+    return return_value
+
+
+@router.post("/{model_id}/v1/completions")
+@router.post("/{model_id}/v1/completions/")
+async def serve_model(model_id: str, request: Union[bytes, Dict[Any, Any]] = None):
+    try:
+        return_value = await processor.process_request(
+            base_url=model_id,
+            version=None,
+            request_body=request,
+            url_type="completion"
+        )
+    except EndpointNotFoundException as ex:
+        raise HTTPException(status_code=404, detail="Error processing request, endpoint was not found: {}".format(ex))
+    except (EndpointModelLoadException, EndpointBackendEngineException) as ex:
+        session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
+            instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
+        raise HTTPException(status_code=422, detail="Error [{}] processing request: {}".format(type(ex), ex))
+    except ServingInitializationException as ex:
+        session_logger.report_text("[{}] Exception [{}] {} while loading serving inference: {}\n{}".format(
+            instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
+        raise HTTPException(status_code=500, detail="Error [{}] processing request: {}".format(type(ex), ex))
+    except ValueError as ex:
+        session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
+            instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
+        if "CUDA out of memory. " in str(ex) or "NVML_SUCCESS == r INTERNAL ASSERT FAILED" in str(ex):
             raise CUDAException(exception=ex)
         else:
             raise HTTPException(status_code=422, detail="Error [{}] processing request: {}".format(type(ex), ex))
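The "/v1/completions" handler added here mirrors the chat handler above, differing only in url_type="completion". Since the new paths copy the OpenAI REST layout, an OpenAI-compatible client pointed at the endpoint's /v1 base can reach them as well; a sketch under the same placeholder assumptions (openai Python package installed, endpoint named "llm_test", and a postprocess step that returns an OpenAI-shaped response, which this commit does not itself guarantee):

from openai import OpenAI

# Placeholder base_url; the endpoint name in the path selects the ClearML serving endpoint.
client = OpenAI(base_url="http://127.0.0.1:8080/clearml/llm_test/v1", api_key="unused")

# The "model" field is simply part of the request body passed through to the preprocess code.
completion = client.completions.create(model="llm_test", prompt="Say hello", max_tokens=32)
print(completion)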
@@ -155,7 +155,7 @@ class ModelRequestProcessor(object):
         self._serving_base_url = None
         self._metric_log_freq = None
 
-    async def process_request(self, base_url: str, version: str, request_body: dict) -> dict:
+    async def process_request(self, base_url: str, version: str, request_body: dict, url_type: str) -> dict:
         """
         Process request coming in,
         Raise Value error if url does not match existing endpoints
@@ -188,7 +188,7 @@ class ModelRequestProcessor(object):
                 processor = processor_cls(model_endpoint=ep, task=self._task)
                 self._engine_processor_lookup[url] = processor
 
-            return_value = await self._process_request(processor=processor, url=url, body=request_body)
+            return_value = await self._process_request(processor=processor, url=url, body=request_body, url_type=url_type)
         finally:
             self._request_processing_state.dec()
 
@@ -1188,7 +1188,7 @@ class ModelRequestProcessor(object):
         # update preprocessing classes
         BasePreprocessRequest.set_server_config(self._configuration)
 
-    async def _process_request(self, processor: BasePreprocessRequest, url: str, body: dict) -> dict:
+    async def _process_request(self, processor: BasePreprocessRequest, url: str, body: dict, url_type: str) -> dict:
         # collect statistics for this request
         stats_collect_fn = None
         collect_stats = False
@@ -1211,9 +1211,16 @@ class ModelRequestProcessor(object):
             if processor.is_preprocess_async \
             else processor.preprocess(body, state, stats_collect_fn)
         # noinspection PyUnresolvedReferences
-        processed = await processor.process(preprocessed, state, stats_collect_fn) \
-            if processor.is_process_async \
-            else processor.process(preprocessed, state, stats_collect_fn)
+        if url_type == "completion":
+            processed = await processor.completion(preprocessed, state, stats_collect_fn) \
+                if processor.is_process_async \
+                else processor.completion(preprocessed, state, stats_collect_fn)
+        elif url_type == "chat_completion":
+            processed = await processor.chat_completion(preprocessed, state, stats_collect_fn) \
+                if processor.is_process_async \
+                else processor.chat_completion(preprocessed, state, stats_collect_fn)
+        else:
+            raise ValueError(f"wrong url_type: expected 'completion' and 'chat_completion', got {url_type}")
         # noinspection PyUnresolvedReferences
         return_value = await processor.postprocess(processed, state, stats_collect_fn) \
             if processor.is_postprocess_async \
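The dispatch above means the engine's preprocess wrapper must now expose completion() and chat_completion() methods (awaited only when is_process_async is set), and any other url_type raises ValueError. The same contract, extracted as a standalone sketch for readability (function and argument names here are illustrative, not part of the commit):

async def dispatch(processor, url_type, preprocessed, state, stats_collect_fn):
    # Select the handler the new routing expects the engine wrapper to provide.
    if url_type == "completion":
        handler = processor.completion
    elif url_type == "chat_completion":
        handler = processor.chat_completion
    else:
        raise ValueError(f"unsupported url_type: {url_type!r}")
    result = handler(preprocessed, state, stats_collect_fn)
    # Async engines return a coroutine here; synchronous ones return the value directly.
    return await result if processor.is_process_async else result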
@@ -576,13 +576,22 @@ class CustomAsyncPreprocessRequest(BasePreprocessRequest):
             return await self._preprocess.postprocess(data, state, collect_custom_statistics_fn)
         return data
 
-    async def process(self, data: Any, state: dict, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Any:
+    async def completion(self, data: Any, state: dict, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Any:
         """
         The actual processing function.
         We run the process in this context
         """
-        if self._preprocess is not None and hasattr(self._preprocess, 'process'):
-            return await self._preprocess.process(data, state, collect_custom_statistics_fn)
+        if self._preprocess is not None and hasattr(self._preprocess, 'completion'):
+            return await self._preprocess.completion(data, state, collect_custom_statistics_fn)
+        return None
+
+    async def chat_completion(self, data: Any, state: dict, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Any:
+        """
+        The actual processing function.
+        We run the process in this context
+        """
+        if self._preprocess is not None and hasattr(self._preprocess, 'chat_completion'):
+            return await self._preprocess.chat_completion(data, state, collect_custom_statistics_fn)
         return None
 
     @staticmethod
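CustomAsyncPreprocessRequest now forwards to completion() / chat_completion() on the user's Preprocess object, but only if those attributes exist, returning None otherwise. A sketch of a user-side preprocess.py these hooks would pick up (the class name Preprocess follows the usual ClearML serving custom-engine convention; payload and return shapes are placeholders, and "data" is whatever the endpoint's preprocess step produced):

from typing import Any, Callable, Optional


class Preprocess(object):
    def __init__(self):
        self.model = None  # a real endpoint would load/attach its model here

    async def chat_completion(self, data: Any, state: dict,
                              collect_custom_statistics_fn: Optional[Callable[[dict], None]] = None) -> Any:
        # Reached via POST /clearml/{endpoint}/v1/chat/completions
        messages = data.get("messages", []) if isinstance(data, dict) else []
        return {"choices": [{"message": {"role": "assistant", "content": "stub reply to: {}".format(messages)}}]}

    async def completion(self, data: Any, state: dict,
                         collect_custom_statistics_fn: Optional[Callable[[dict], None]] = None) -> Any:
        # Reached via POST /clearml/{endpoint}/v1/completions
        prompt = data.get("prompt", "") if isinstance(data, dict) else ""
        return {"choices": [{"text": "stub completion for: {}".format(prompt)}]}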
@@ -10,7 +10,7 @@ echo CLEARML_DEFAULT_KAFKA_SERVE_URL="$CLEARML_DEFAULT_KAFKA_SERVE_URL"
 SERVING_PORT="${CLEARML_SERVING_PORT:-9999}"
 
 # set default internal serve endpoint (for request pipelining)
-CLEARML_DEFAULT_BASE_SERVE_URL="${CLEARML_DEFAULT_BASE_SERVE_URL:-http://127.0.0.1:$SERVING_PORT/serve}"
+CLEARML_DEFAULT_BASE_SERVE_URL="${CLEARML_DEFAULT_BASE_SERVE_URL:-http://127.0.0.1:$SERVING_PORT/clearml}"
 CLEARML_DEFAULT_TRITON_GRPC_ADDR="${CLEARML_DEFAULT_TRITON_GRPC_ADDR:-127.0.0.1:8001}"
 
 # print configuration