fix suffix and add router

This commit is contained in:
IlyaMescheryakov1402 2025-02-27 22:56:39 +03:00
parent 2685d2a0e5
commit 5b73bdf085
7 changed files with 84 additions and 45 deletions

View File

@ -18,7 +18,8 @@ UVICORN_SERVE_LOOP="${UVICORN_SERVE_LOOP:-uvloop}"
UVICORN_LOG_LEVEL="${UVICORN_LOG_LEVEL:-warning}"
# set default internal serve endpoint (for request pipelining)
CLEARML_DEFAULT_BASE_SERVE_URL="${CLEARML_DEFAULT_BASE_SERVE_URL:-http://127.0.0.1:$SERVING_PORT/clearml}"
CLEARML_DEFAULT_SERVE_SUFFIX="${CLEARML_DEFAULT_SERVE_SUFFIX:-serve}"
CLEARML_DEFAULT_BASE_SERVE_URL="${CLEARML_DEFAULT_BASE_SERVE_URL:-http://127.0.0.1:$SERVING_PORT/$CLEARML_DEFAULT_SERVE_SUFFIX}"
CLEARML_DEFAULT_TRITON_GRPC_ADDR="${CLEARML_DEFAULT_TRITON_GRPC_ADDR:-127.0.0.1:8001}"
# print configuration
@ -31,6 +32,7 @@ echo GUNICORN_EXTRA_ARGS="$GUNICORN_EXTRA_ARGS"
echo UVICORN_SERVE_LOOP="$UVICORN_SERVE_LOOP"
echo UVICORN_EXTRA_ARGS="$UVICORN_EXTRA_ARGS"
echo UVICORN_LOG_LEVEL="$UVICORN_LOG_LEVEL"
echo CLEARML_DEFAULT_SERVE_SUFFIX="$CLEARML_DEFAULT_SERVE_SUFFIX"
echo CLEARML_DEFAULT_BASE_SERVE_URL="$CLEARML_DEFAULT_BASE_SERVE_URL"
echo CLEARML_DEFAULT_TRITON_GRPC_ADDR="$CLEARML_DEFAULT_TRITON_GRPC_ADDR"

View File

@ -60,9 +60,6 @@ try:
except (ValueError, TypeError):
pass
class CUDAException(Exception):
def __init__(self, exception: str):
self.exception = exception
grpc_aio_ignore_errors = parse_grpc_errors(shlex.split(os.environ.get("CLEARML_SERVING_AIO_RPC_IGNORE_ERRORS", "")))
grpc_aio_verbose_errors = parse_grpc_errors(shlex.split(os.environ.get("CLEARML_SERVING_AIO_RPC_VERBOSE_ERRORS", "")))
@ -113,36 +110,18 @@ async def cuda_exception_handler(request, exc):
return PlainTextResponse("CUDA out of memory. Restarting service", status_code=500, background=task)
@app.on_event('shutdown')
def shutdown_event():
print('RESTARTING INFERENCE SERVICE!')
async def exit_app():
loop = asyncio.get_running_loop()
loop.stop()
@app.exception_handler(CUDAException)
async def cuda_exception_handler(request, exc):
task = BackgroundTask(exit_app)
return PlainTextResponse("CUDA out of memory. Restarting service", status_code=500, background=task)
router = APIRouter(
prefix="/clearml",
tags=["models"],
responses={404: {"description": "Model Serving Endpoint Not found"}},
route_class=GzipRoute, # mark-out to remove support for GZip content encoding
)
@router.post("/v1/{endpoint_type:path}")
@router.post("/v1/{endpoint_type:path}/")
async def llm_serve_model(endpoint_type: str, request: Union[bytes, Dict[Any, Any]] = None):
def process_with_exceptions(
base_url: str,
version: Optional[str],
request: Union[bytes, Dict[Any, Any]],
serve_type: str
):
try:
return_value = await processor.process_request(
base_url=request["model"],
base_url=base_url,
version=version,
request_body=request,
url_type=endpoint_type
serve_type=serve_type
)
except EndpointNotFoundException as ex:
raise HTTPException(status_code=404, detail="Error processing request, endpoint was not found: {}".format(ex))
@ -190,4 +169,43 @@ async def llm_serve_model(endpoint_type: str, request: Union[bytes, Dict[Any, An
return return_value
router = APIRouter(
prefix=f"/{os.environ.get('CLEARML_DEFAULT_SERVE_SUFFIX', 'serve')}",
tags=["models"],
responses={404: {"description": "Model Serving Endpoint Not found"}},
route_class=GzipRoute, # mark-out to remove support for GZip content encoding
)
@router.post("/{model_id}/{version}")
@router.post("/{model_id}/")
@router.post("/{model_id}")
async def base_serve_model(
model_id: str,
version: Optional[str] = None,
request: Union[bytes, Dict[Any, Any]] = None
):
return_value = process_with_exceptions(
base_url=model_id,
version=version,
request_body=request,
serve_type="process"
)
return return_value
@router.post("/openai/v1/{endpoint_type:path}")
@router.post("/openai/v1/{endpoint_type:path}/")
async def openai_serve_model(
endpoint_type: str,
request: Union[bytes, Dict[Any, Any]] = None
):
return_value = process_with_exceptions(
base_url=request.get("model", None),
version=None,
request_body=request,
serve_type=endpoint_type
)
return return_value
app.include_router(router)

View File

@ -159,7 +159,7 @@ class ModelRequestProcessor(object):
self._serving_base_url = None
self._metric_log_freq = None
async def process_request(self, base_url: str, request_body: dict, url_type: str) -> dict:
async def process_request(self, base_url: str, version: str, request_body: dict, serve_type: str) -> dict:
"""
Process request coming in,
Raise Value error if url does not match existing endpoints
@ -171,11 +171,16 @@ class ModelRequestProcessor(object):
while self._update_lock_flag:
await asyncio.sleep(0.5+random())
# retry to process
return await self.process_request(base_url=base_url, request_body=request_body, url_type=url_type)
return await self.process_request(
base_url=base_url,
version=version,
request_body=request_body,
serve_type=serve_type
)
try:
# normalize url and version
url = self._normalize_endpoint_url(base_url)
url = self._normalize_endpoint_url(base_url, version)
# check canary
canary_url = self._process_canary(base_url=url)
@ -192,7 +197,12 @@ class ModelRequestProcessor(object):
processor = processor_cls(model_endpoint=ep, task=self._task)
self._engine_processor_lookup[url] = processor
return_value = await self._process_request(processor=processor, url=url, body=request_body, url_type=url_type)
return_value = await self._process_request(
processor=processor,
url=url,
body=request_body,
serve_type=serve_type
)
finally:
self._request_processing_state.dec()
@ -1193,7 +1203,7 @@ class ModelRequestProcessor(object):
# update preprocessing classes
BasePreprocessRequest.set_server_config(self._configuration)
async def _process_request(self, processor: BasePreprocessRequest, url: str, body: dict, url_type: str) -> dict:
async def _process_request(self, processor: BasePreprocessRequest, url: str, body: dict, serve_type: str) -> dict:
# collect statistics for this request
stats_collect_fn = None
collect_stats = False
@ -1215,13 +1225,19 @@ class ModelRequestProcessor(object):
preprocessed = await processor.preprocess(body, state, stats_collect_fn) \
if processor.is_preprocess_async \
else processor.preprocess(body, state, stats_collect_fn)
# noinspection PyUnresolvedReferences
if url_type == "completions":
processed = await processor.completion(preprocessed, state, stats_collect_fn) \
if serve_type == "process":
# noinspection PyUnresolvedReferences
processed = await processor.process(preprocessed, state, stats_collect_fn) \
if processor.is_process_async \
else processor.process(preprocessed, state, stats_collect_fn)
elif serve_type == "completions":
# noinspection PyUnresolvedReferences
processed = await processor.completions(preprocessed, state, stats_collect_fn) \
if processor.is_process_async \
else processor.completions(preprocessed, state, stats_collect_fn)
elif url_type == "chat/completions":
processed = await processor.chat_completion(preprocessed, state, stats_collect_fn) \
elif serve_type == "chat/completions":
# noinspection PyUnresolvedReferences
processed = await processor.chat_completions(preprocessed, state, stats_collect_fn) \
if processor.is_process_async \
else processor.chat_completions(preprocessed, state, stats_collect_fn)
else:
@ -1346,9 +1362,8 @@ class ModelRequestProcessor(object):
return task
@classmethod
def _normalize_endpoint_url(cls, endpoint: str) -> str:
# return "{}/{}".format(endpoint.rstrip("/"), version or "").rstrip("/")
return endpoint
def _normalize_endpoint_url(cls, endpoint: str, version: Optional[str] = None) -> str:
return "{}/{}".format(endpoint.rstrip("/"), version or "").rstrip("/")
@classmethod
def _validate_model(cls, endpoint: Union[ModelEndpoint, ModelMonitoring]) -> bool:

View File

@ -10,7 +10,8 @@ echo CLEARML_DEFAULT_KAFKA_SERVE_URL="$CLEARML_DEFAULT_KAFKA_SERVE_URL"
SERVING_PORT="${CLEARML_SERVING_PORT:-9999}"
# set default internal serve endpoint (for request pipelining)
CLEARML_DEFAULT_BASE_SERVE_URL="${CLEARML_DEFAULT_BASE_SERVE_URL:-http://127.0.0.1:$SERVING_PORT/clearml}"
CLEARML_DEFAULT_SERVE_SUFFIX="${CLEARML_DEFAULT_SERVE_SUFFIX:-serve}"
CLEARML_DEFAULT_BASE_SERVE_URL="${CLEARML_DEFAULT_BASE_SERVE_URL:-http://127.0.0.1:$SERVING_PORT/$CLEARML_DEFAULT_SERVE_SUFFIX}"
CLEARML_DEFAULT_TRITON_GRPC_ADDR="${CLEARML_DEFAULT_TRITON_GRPC_ADDR:-127.0.0.1:8001}"
# print configuration

View File

@ -92,6 +92,7 @@ services:
CLEARML_SERVING_TASK_ID: ${CLEARML_SERVING_TASK_ID:-}
CLEARML_SERVING_PORT: ${CLEARML_SERVING_PORT:-8080}
CLEARML_SERVING_POLL_FREQ: ${CLEARML_SERVING_POLL_FREQ:-1.0}
CLEARML_DEFAULT_SERVE_SUFFIX: ${CLEARML_DEFAULT_SERVE_SUFFIX:-serve}
CLEARML_DEFAULT_BASE_SERVE_URL: ${CLEARML_DEFAULT_BASE_SERVE_URL:-http://127.0.0.1:8080/serve}
CLEARML_DEFAULT_KAFKA_SERVE_URL: ${CLEARML_DEFAULT_KAFKA_SERVE_URL:-clearml-serving-kafka:9092}
CLEARML_DEFAULT_TRITON_GRPC_ADDR: ${CLEARML_DEFAULT_TRITON_GRPC_ADDR:-clearml-serving-triton:8001}

View File

@ -92,6 +92,7 @@ services:
CLEARML_SERVING_TASK_ID: ${CLEARML_SERVING_TASK_ID:-}
CLEARML_SERVING_PORT: ${CLEARML_SERVING_PORT:-8080}
CLEARML_SERVING_POLL_FREQ: ${CLEARML_SERVING_POLL_FREQ:-1.0}
CLEARML_DEFAULT_SERVE_SUFFIX: ${CLEARML_DEFAULT_SERVE_SUFFIX:-serve}
CLEARML_DEFAULT_BASE_SERVE_URL: ${CLEARML_DEFAULT_BASE_SERVE_URL:-http://127.0.0.1:8080/serve}
CLEARML_DEFAULT_KAFKA_SERVE_URL: ${CLEARML_DEFAULT_KAFKA_SERVE_URL:-clearml-serving-kafka:9092}
CLEARML_DEFAULT_TRITON_GRPC_ADDR: ${CLEARML_DEFAULT_TRITON_GRPC_ADDR:-clearml-serving-triton:8001}

View File

@ -92,6 +92,7 @@ services:
CLEARML_SERVING_TASK_ID: ${CLEARML_SERVING_TASK_ID:-}
CLEARML_SERVING_PORT: ${CLEARML_SERVING_PORT:-8080}
CLEARML_SERVING_POLL_FREQ: ${CLEARML_SERVING_POLL_FREQ:-1.0}
CLEARML_DEFAULT_SERVE_SUFFIX: ${CLEARML_DEFAULT_SERVE_SUFFIX:-serve}
CLEARML_DEFAULT_BASE_SERVE_URL: ${CLEARML_DEFAULT_BASE_SERVE_URL:-http://127.0.0.1:8080/serve}
CLEARML_DEFAULT_KAFKA_SERVE_URL: ${CLEARML_DEFAULT_KAFKA_SERVE_URL:-clearml-serving-kafka:9092}
CLEARML_DEFAULT_TRITON_GRPC_ADDR: ${CLEARML_DEFAULT_TRITON_GRPC_ADDR:-}