major vllm engine update

IlyaMescheryakov1402 2025-03-09 01:46:05 +03:00
parent 32d72bcd1c
commit 428be76642
11 changed files with 305 additions and 127 deletions

View File

@ -103,6 +103,10 @@ cat docker/example.env
```bash
cd docker && docker-compose --env-file example.env -f docker-compose.yml up
```
If running on a GPU instance, use the GPU docker-compose file
```bash
cd docker && docker-compose --env-file example.env -f docker-compose-gpu.yml up
```
If you need Triton support (Keras/PyTorch/ONNX, etc.), use the Triton docker-compose file
```bash
cd docker && docker-compose --env-file example.env -f docker-compose-triton.yml up

View File

@ -561,7 +561,7 @@ def main():
setattr(args, args_var, type(t)(v) if t is not None else v)
# noinspection PyProtectedMember
serving_task = ModelRequestProcessor._get_control_plane_task(task_id=args.inference_task_id)
serving_task = ModelRequestProcessor._get_control_plane_task(task_id=args.serving_id)
task = Task.init(
project_name=args.project or serving_task.get_project_name() or "serving",

View File

@ -110,17 +110,17 @@ async def cuda_exception_handler(request, exc):
return PlainTextResponse("CUDA out of memory. Restarting service", status_code=500, background=task)
def process_with_exceptions(
async def process_with_exceptions(
base_url: str,
version: Optional[str],
request: Union[bytes, Dict[Any, Any]],
request_body: Union[bytes, Dict[Any, Any]],
serve_type: str
):
try:
return_value = await processor.process_request(
base_url=base_url,
version=version,
request_body=request,
request_body=request_body,
serve_type=serve_type
)
except EndpointNotFoundException as ex:
@ -128,21 +128,21 @@ def process_with_exceptions(
except (EndpointModelLoadException, EndpointBackendEngineException) as ex:
session_logger.report_text(
"[{}] Exception [{}] {} while processing request: {}\n{}".format(
instance_id, type(ex), ex, request, "".join(traceback.format_exc())
instance_id, type(ex), ex, request_body, "".join(traceback.format_exc())
)
)
raise HTTPException(status_code=422, detail="Error [{}] processing request: {}".format(type(ex), ex))
except ServingInitializationException as ex:
session_logger.report_text(
"[{}] Exception [{}] {} while loading serving inference: {}\n{}".format(
instance_id, type(ex), ex, request, "".join(traceback.format_exc())
instance_id, type(ex), ex, request_body, "".join(traceback.format_exc())
)
)
raise HTTPException(status_code=500, detail="Error [{}] processing request: {}".format(type(ex), ex))
except ValueError as ex:
session_logger.report_text(
"[{}] Exception [{}] {} while processing request: {}\n{}".format(
instance_id, type(ex), ex, request, "".join(traceback.format_exc())
instance_id, type(ex), ex, request_body, "".join(traceback.format_exc())
)
)
if "CUDA out of memory. " in str(ex) or "NVML_SUCCESS == r INTERNAL ASSERT FAILED" in str(ex):
@ -152,7 +152,7 @@ def process_with_exceptions(
except AioRpcError as ex:
if grpc_aio_verbose_errors and ex.code() in grpc_aio_verbose_errors:
session_logger.report_text(
"[{}] Exception [AioRpcError] {} while processing request: {}".format(instance_id, ex, request)
"[{}] Exception [AioRpcError] {} while processing request: {}".format(instance_id, ex, request_body)
)
elif not grpc_aio_ignore_errors or ex.code() not in grpc_aio_ignore_errors:
session_logger.report_text("[{}] Exception [AioRpcError] status={} ".format(instance_id, ex.code()))
@ -162,7 +162,7 @@ def process_with_exceptions(
except Exception as ex:
session_logger.report_text(
"[{}] Exception [{}] {} while processing request: {}\n{}".format(
instance_id, type(ex), ex, request, "".join(traceback.format_exc())
instance_id, type(ex), ex, request_body, "".join(traceback.format_exc())
)
)
raise HTTPException(status_code=500, detail="Error [{}] processing request: {}".format(type(ex), ex))
@ -170,7 +170,7 @@ def process_with_exceptions(
router = APIRouter(
prefix=f"/{os.environ.get("CLEARML_DEFAULT_SERVE_SUFFIX", "serve")}",
prefix=f"/{os.environ.get('CLEARML_DEFAULT_SERVE_SUFFIX', 'serve')}",
tags=["models"],
responses={404: {"description": "Model Serving Endpoint Not found"}},
route_class=GzipRoute, # mark-out to remove support for GZip content encoding
@ -185,7 +185,7 @@ async def base_serve_model(
version: Optional[str] = None,
request: Union[bytes, Dict[Any, Any]] = None
):
return_value = process_with_exceptions(
return_value = await process_with_exceptions(
base_url=model_id,
version=version,
request_body=request,
@ -200,7 +200,7 @@ async def openai_serve_model(
endpoint_type: str,
request: Union[bytes, Dict[Any, Any]] = None
):
return_value = process_with_exceptions(
return_value = await process_with_exceptions(
base_url=request.get("model", None),
version=None,
request_body=request,
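For reference, a minimal self-contained sketch of the pattern this hunk applies: the shared dispatcher becomes a coroutine and every route handler must `await` it. The stub names below (`fake_process_request`, the single `except` clause) are placeholders for illustration, not the project's actual implementation.
```python
import asyncio
from typing import Any, Dict, Optional, Union

from fastapi import APIRouter, HTTPException


async def fake_process_request(**kwargs: Any) -> Dict[str, Any]:
    # stand-in for processor.process_request(); simulates async work
    await asyncio.sleep(0)
    return {"echo": kwargs}


async def process_with_exceptions(
    base_url: str,
    version: Optional[str],
    request_body: Union[bytes, Dict[Any, Any]],
    serve_type: str,
) -> Any:
    try:
        return await fake_process_request(
            base_url=base_url,
            version=version,
            request_body=request_body,
            serve_type=serve_type,
        )
    except ValueError as ex:
        raise HTTPException(status_code=422, detail=str(ex))


router = APIRouter(prefix="/serve")


@router.post("/{model_id}")
async def base_serve_model(
    model_id: str,
    version: Optional[str] = None,
    request: Union[bytes, Dict[Any, Any]] = None,
):
    # forgetting `await` here would return an unawaited coroutine, not a response
    return await process_with_exceptions(
        base_url=model_id,
        version=version,
        request_body=request,
        serve_type="process",
    )
```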

View File

@ -14,6 +14,19 @@ from requests import post as request_post
from .endpoints import ModelEndpoint
# try:
# import prometheus_client
# from fastapi.responses import JSONResponse, StreamingResponse
# from vllm.entrypoints.openai.protocol import (
# ChatCompletionRequest,
# ChatCompletionResponse,
# CompletionRequest,
# ErrorResponse
# )
# from vllm.logger import init_logger
# except ImportError:
# print("WARNING: install vllm==0.5.4 and prometheus_client==0.21.1 to serve vllm engine")
class BasePreprocessRequest(object):
__preprocessing_lookup = {}
@ -598,65 +611,11 @@ class CustomAsyncPreprocessRequest(BasePreprocessRequest):
return return_value.json()
@BasePreprocessRequest.register_engine("vllm")
@BasePreprocessRequest.register_engine("vllm", modules=["vllm", "fastapi"])
class VllmPreprocessRequest(BasePreprocessRequest):
import prometheus_client
from typing import Any, Union, Optional, Callable
from fastapi.responses import JSONResponse, StreamingResponse
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.engine.async_llm_engine import AsyncLLMEngine
from vllm.entrypoints.logger import RequestLogger
# yapf conflicts with isort for this block
# yapf: disable
from vllm.entrypoints.openai.protocol import (
ChatCompletionRequest,
ChatCompletionResponse,
CompletionRequest,
ErrorResponse
)
# yapf: enable
from vllm.entrypoints.openai.serving_chat import OpenAIServingChat
from vllm.entrypoints.openai.serving_completion import OpenAIServingCompletion
from vllm.entrypoints.openai.serving_embedding import OpenAIServingEmbedding
from vllm.entrypoints.openai.serving_tokenization import OpenAIServingTokenization
from vllm.logger import init_logger
from vllm.usage.usage_lib import UsageContext
from vllm.entrypoints.openai.serving_engine import LoRAModulePath, PromptAdapterPath
logger = init_logger(__name__)
REMOVE_WEB_ADDITIONAL_PROMPTS = True
if VllmPreprocessRequest.asyncio_to_thread is None:
from asyncio import to_thread as asyncio_to_thread
VllmPreprocessRequest.asyncio_to_thread = asyncio_to_thread
def remove_extra_system_prompts(messages: list) -> list:
"""
Removes all 'system' prompts except the last one.
:param messages: List of message dicts with 'role' and 'content'.
:return: Modified list of messages with only the last 'system' prompt preserved.
"""
# Filter only the system messages
system_messages_indices = []
for i, msg in enumerate(messages):
if msg["role"] == "system":
system_messages_indices.append(i)
else:
break
# If there is more than one system message, remove all but the last one
if len(system_messages_indices) > 1:
last_system_index = system_messages_indices[-1]
# Remove all system messages except the last one
messages = [msg for i, msg in enumerate(messages) if msg["role"] != "system" or i == last_system_index]
return messages
asyncio_to_thread = None
_vllm = None
_fastapi = None
class CustomRequest:
def __init__(self, headers: Optional[dict] = None):
@ -669,12 +628,39 @@ class VllmPreprocessRequest(BasePreprocessRequest):
super(VllmPreprocessRequest, self).__init__(
model_endpoint=model_endpoint, task=task)
def is_port_in_use(port: int) -> bool:
import socket
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
return s.connect_ex(('localhost', port)) == 0
if not is_port_in_use(8000):
prometheus_client.start_http_server(8000)
# load vLLM Modules
if self._vllm is None:
from vllm.entrypoints.openai.protocol import (
ChatCompletionRequest,
ChatCompletionResponse,
CompletionRequest,
ErrorResponse
)
self._vllm = {}
self._vllm["chat_completion_request"] = ChatCompletionRequest
self._vllm["chat_completion_response"] = ChatCompletionResponse
self._vllm["completion_request"] = CompletionRequest
self._vllm["error_response"] = ErrorResponse
if self._fastapi is None:
from fastapi.responses import JSONResponse, StreamingResponse
self._fastapi = {}
self._fastapi["json_response"] = JSONResponse
self._fastapi["streaming_response"] = StreamingResponse
from vllm.logger import init_logger
self.logger = init_logger(__name__)
if VllmPreprocessRequest.asyncio_to_thread is None:
from asyncio import to_thread as asyncio_to_thread
VllmPreprocessRequest.asyncio_to_thread = asyncio_to_thread
import socket
import prometheus_client
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
if not s.connect_ex(('localhost', 8000)) == 0:
prometheus_client.start_http_server(8000)
# override `send_request` method with the async version
self._preprocess.__class__.send_request = VllmPreprocessRequest._preprocess_send_request
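As a side note, here is a standalone sketch of the metrics-server guard used in the constructor above: the Prometheus exporter is started only when nothing is already listening on the metrics port. Port 8000 and `localhost` are taken from the diff; `prometheus_client` is assumed to be installed.
```python
import socket

import prometheus_client

METRICS_PORT = 8000  # value hardcoded in the constructor above


def is_port_in_use(port: int, host: str = "localhost") -> bool:
    # connect_ex() returns 0 when something is already listening on (host, port)
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        return s.connect_ex((host, port)) == 0


if not is_port_in_use(METRICS_PORT):
    prometheus_client.start_http_server(METRICS_PORT)
```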
@ -738,15 +724,11 @@ class VllmPreprocessRequest(BasePreprocessRequest):
return await self._preprocess.postprocess(data, state, collect_custom_statistics_fn)
return data
async def completions(self, data: Any, state: dict, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Any:
"""
The actual processing function.
We run the process in this context
"""
if REMOVE_WEB_ADDITIONAL_PROMPTS:
if "messages" in body:
body["messages"] = remove_extra_system_prompts(body["messages"])
raw_request = CustomRequest(
headers = {
@ -754,18 +736,18 @@ class VllmPreprocessRequest(BasePreprocessRequest):
"tracestate": None
}
)
request = CompletionRequest(**body)
logger.info(f"Received chat completion request: {request}")
request = self._vllm["completion_request"](**data)
self.logger.info(f"Received chat completion request: {request}")
generator = await self._model["openai_serving_completion"].create_completion(
request=request,
raw_request=raw_request
)
if isinstance(generator, ErrorResponse):
return JSONResponse(content=generator.model_dump(), status_code=generator.code)
if isinstance(generator, self._vllm["error_response"]):
return self._fastapi["json_response"](content=generator.model_dump(), status_code=generator.code)
if request.stream:
return StreamingResponse(content=generator, media_type="text/event-stream")
return self._fastapi["streaming_response"](content=generator, media_type="text/event-stream")
else:
return JSONResponse(content=generator.model_dump())
return self._fastapi["json_response"](content=generator.model_dump())
async def chat_completions(self, data: Any, state: dict, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Any:
@ -773,30 +755,27 @@ class VllmPreprocessRequest(BasePreprocessRequest):
The actual processing function.
We run the process in this context
"""
if REMOVE_WEB_ADDITIONAL_PROMPTS:
if "messages" in body:
body["messages"] = remove_extra_system_prompts(body["messages"])
request = ChatCompletionRequest(**body)
logger.info(f"Received chat completion request: {request}")
generator = await self._model["self.openai_serving_chat"].create_chat_completion(
request = self._vllm["chat_completion_request"](**data)
self.logger.info(f"Received chat completion request: {request}")
generator = await self._model["openai_serving_chat"].create_chat_completion(
request=request, raw_request=None
)
if isinstance(generator, ErrorResponse):
return JSONResponse(content=generator.model_dump(), status_code=generator.code)
if isinstance(generator, self._vllm["error_response"]):
return self._fastapi["json_response"](content=generator.model_dump(), status_code=generator.code)
if request.stream:
return StreamingResponse(content=generator, media_type="text/event-stream")
return self._fastapi["streaming_response"](content=generator, media_type="text/event-stream")
else:
assert isinstance(generator, ChatCompletionResponse)
return JSONResponse(content=generator.model_dump())
assert isinstance(generator, self._vllm["chat_completion_response"])
return self._fastapi["json_response"](content=generator.model_dump())
@staticmethod
async def _preprocess_send_request(_, endpoint: str, version: str = None, data: dict = None) -> Optional[dict]:
endpoint = "{}/{}".format(endpoint.strip("/"), version.strip("/")) if version else endpoint.strip("/")
endpoint = "/openai/v1/{}".format(endpoint.strip("/"))
base_url = BasePreprocessRequest.get_server_config().get("base_serving_url")
base_url = (base_url or BasePreprocessRequest._default_serving_base_url).strip("/")
url = "{}/{}".format(base_url, endpoint.strip("/"))
return_value = await CustomAsyncPreprocessRequest.asyncio_to_thread(
return_value = await VllmPreprocessRequest.asyncio_to_thread(
request_post, url, json=data, timeout=BasePreprocessRequest._timeout)
if not return_value.ok:
return None
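For context, a minimal sketch of the `asyncio_to_thread` pattern the overridden `send_request` above relies on: the blocking `requests.post` call runs on a worker thread, so the event loop keeps serving other requests. The helper name and example URL below are illustrative, not part of this repository.
```python
import asyncio
from typing import Optional

from requests import post as request_post


async def send_request(url: str, data: dict, timeout: float = 30.0) -> Optional[dict]:
    # run the blocking HTTP call in a thread instead of blocking the event loop
    response = await asyncio.to_thread(request_post, url, json=data, timeout=timeout)
    return response.json() if response.ok else None


# usage sketch (hypothetical endpoint URL):
# result = asyncio.run(send_request(
#     "http://127.0.0.1:8080/serve/openai/v1/chat/completions",
#     data={"model": "test_vllm", "messages": [{"role": "user", "content": "Hi"}]},
# ))
```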

View File

@ -18,3 +18,5 @@ lightgbm>=3.3.2,<3.4
requests>=2.31.0
kafka-python>=2.0.2,<2.1
lz4>=4.0.0,<5
prometheus_client==0.21.1
vllm==0.5.4

View File

@ -4,7 +4,7 @@ FROM python:3.11-bullseye
ENV LC_ALL=C.UTF-8
# install base package
RUN pip3 install --no-cache-dir clearml-serving
# RUN pip3 install --no-cache-dir clearml-serving
# get latest execution code from the git repository
# RUN cd $HOME && git clone https://github.com/allegroai/clearml-serving.git

View File

@ -1 +1 @@
__version__ = '1.3.2'
__version__ = '1.3.5'

View File

@ -0,0 +1,146 @@
version: "3"
services:
zookeeper:
image: bitnami/zookeeper:3.7.0
container_name: clearml-serving-zookeeper
# ports:
# - "2181:2181"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
networks:
- clearml-serving-backend
kafka:
image: bitnami/kafka:3.1.1
container_name: clearml-serving-kafka
# ports:
# - "9092:9092"
environment:
- KAFKA_BROKER_ID=1
- KAFKA_CFG_LISTENERS=PLAINTEXT://clearml-serving-kafka:9092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://clearml-serving-kafka:9092
- KAFKA_CFG_ZOOKEEPER_CONNECT=clearml-serving-zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CREATE_TOPICS="topic_test:1:1"
depends_on:
- zookeeper
networks:
- clearml-serving-backend
prometheus:
image: prom/prometheus:v2.34.0
container_name: clearml-serving-prometheus
volumes:
- ./prometheus.yml:/prometheus.yml
command:
- '--config.file=/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
- '--web.console.libraries=/etc/prometheus/console_libraries'
- '--web.console.templates=/etc/prometheus/consoles'
- '--storage.tsdb.retention.time=200h'
- '--web.enable-lifecycle'
restart: unless-stopped
# ports:
# - "9090:9090"
depends_on:
- clearml-serving-statistics
networks:
- clearml-serving-backend
alertmanager:
image: prom/alertmanager:v0.23.0
container_name: clearml-serving-alertmanager
restart: unless-stopped
# ports:
# - "9093:9093"
depends_on:
- prometheus
- grafana
networks:
- clearml-serving-backend
grafana:
image: grafana/grafana:8.4.4-ubuntu
container_name: clearml-serving-grafana
volumes:
- './datasource.yml:/etc/grafana/provisioning/datasources/datasource.yaml'
restart: unless-stopped
ports:
- "3000:3000"
depends_on:
- prometheus
networks:
- clearml-serving-backend
clearml-serving-inference:
image: clearml-serving-inference:latest
container_name: clearml-serving-inference
restart: unless-stopped
# optimize performance
security_opt:
- seccomp:unconfined
ports:
- "8080:8080"
environment:
CLEARML_WEB_HOST: ${CLEARML_WEB_HOST:-https://app.clear.ml}
CLEARML_API_HOST: ${CLEARML_API_HOST:-https://api.clear.ml}
CLEARML_FILES_HOST: ${CLEARML_FILES_HOST:-https://files.clear.ml}
CLEARML_API_ACCESS_KEY: ${CLEARML_API_ACCESS_KEY}
CLEARML_API_SECRET_KEY: ${CLEARML_API_SECRET_KEY}
CLEARML_SERVING_TASK_ID: ${CLEARML_SERVING_TASK_ID:-}
CLEARML_SERVING_PORT: ${CLEARML_SERVING_PORT:-8080}
CLEARML_SERVING_POLL_FREQ: ${CLEARML_SERVING_POLL_FREQ:-1.0}
CLEARML_DEFAULT_SERVE_SUFFIX: ${CLEARML_DEFAULT_SERVE_SUFFIX:-serve}
CLEARML_DEFAULT_BASE_SERVE_URL: ${CLEARML_DEFAULT_BASE_SERVE_URL:-http://127.0.0.1:8080/serve}
CLEARML_DEFAULT_KAFKA_SERVE_URL: ${CLEARML_DEFAULT_KAFKA_SERVE_URL:-clearml-serving-kafka:9092}
CLEARML_DEFAULT_TRITON_GRPC_ADDR: ${CLEARML_DEFAULT_TRITON_GRPC_ADDR:-}
CLEARML_USE_GUNICORN: ${CLEARML_USE_GUNICORN:-}
CLEARML_SERVING_NUM_PROCESS: ${CLEARML_SERVING_NUM_PROCESS:-}
CLEARML_EXTRA_PYTHON_PACKAGES: ${CLEARML_EXTRA_PYTHON_PACKAGES:-}
AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID:-}
AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY:-}
AWS_DEFAULT_REGION: ${AWS_DEFAULT_REGION:-}
GOOGLE_APPLICATION_CREDENTIALS: ${GOOGLE_APPLICATION_CREDENTIALS:-}
AZURE_STORAGE_ACCOUNT: ${AZURE_STORAGE_ACCOUNT:-}
AZURE_STORAGE_KEY: ${AZURE_STORAGE_KEY:-}
depends_on:
- kafka
networks:
- clearml-serving-backend
deploy:
resources:
reservations:
devices:
- driver: nvidia
device_ids: ['0']
capabilities: [gpu]
clearml-serving-statistics:
image: allegroai/clearml-serving-statistics:latest
container_name: clearml-serving-statistics
restart: unless-stopped
# optimize performance
security_opt:
- seccomp:unconfined
# ports:
# - "9999:9999"
environment:
CLEARML_WEB_HOST: ${CLEARML_WEB_HOST:-https://app.clear.ml}
CLEARML_API_HOST: ${CLEARML_API_HOST:-https://api.clear.ml}
CLEARML_FILES_HOST: ${CLEARML_FILES_HOST:-https://files.clear.ml}
CLEARML_API_ACCESS_KEY: ${CLEARML_API_ACCESS_KEY}
CLEARML_API_SECRET_KEY: ${CLEARML_API_SECRET_KEY}
CLEARML_SERVING_TASK_ID: ${CLEARML_SERVING_TASK_ID:-}
CLEARML_DEFAULT_KAFKA_SERVE_URL: ${CLEARML_DEFAULT_KAFKA_SERVE_URL:-clearml-serving-kafka:9092}
CLEARML_SERVING_POLL_FREQ: ${CLEARML_SERVING_POLL_FREQ:-1.0}
depends_on:
- kafka
networks:
- clearml-serving-backend
networks:
clearml-serving-backend:
driver: bridge

View File

@ -20,3 +20,10 @@ scrape_configs:
static_configs:
- targets: ['clearml-serving-statistics:9999']
- job_name: 'vllm'
scrape_interval: 5s
static_configs:
- targets: ['clearml-serving-inference:8000']

View File

@ -1,8 +1,16 @@
"""Hugginface preprocessing module for ClearML Serving."""
from typing import Any
from typing import Any, Callable, List, Optional, Union
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.engine.async_llm_engine import AsyncLLMEngine
from vllm.entrypoints.logger import RequestLogger
from vllm.entrypoints.openai.serving_chat import OpenAIServingChat
from vllm.entrypoints.openai.serving_completion import OpenAIServingCompletion
from vllm.entrypoints.openai.serving_embedding import OpenAIServingEmbedding
from vllm.entrypoints.openai.serving_tokenization import OpenAIServingTokenization
from vllm.usage.usage_lib import UsageContext
from vllm.entrypoints.openai.serving_engine import LoRAModulePath, PromptAdapterPath
# Notice Preprocess class Must be named "Preprocess"
class Preprocess:
"""Processing class will be run by the ClearML inference services before and after each request."""
@ -12,14 +20,15 @@ class Preprocess:
def load(self, local_file_name: str) -> Optional[Any]: # noqa
vllm_engine_config = {
"model":f"{local_file_name}/model",
"tokenizer":f"{local_file_name}/tokenizer",
"model": local_file_name,
"tokenizer": local_file_name,
"disable_log_requests": True,
"disable_log_stats": False,
"gpu_memory_utilization": 0.9,
"quantization": None,
"enforce_eager": True,
"served_model_name": "ai_operator_hyp22v4"
"served_model_name": "test_vllm",
"dtype": "float16"
}
vllm_model_config = {
"lora_modules": None, # [LoRAModulePath(name=a, path=b)]
@ -30,16 +39,12 @@ class Preprocess:
"max_log_len": None
}
self._model = {}
self._model["engine_args"] = AsyncEngineArgs(**vllm_engine_config)
self._model["async_engine_client"] = AsyncLLMEngine.from_engine_args(self.engine_args, usage_context=UsageContext.OPENAI_API_SERVER)
self._model["model_config"] = self.async_engine_client.engine.get_model_config()
self._model["request_logger"] = RequestLogger(max_log_len=vllm_model_config["max_log_len"])
self._model["self.openai_serving_chat"] = OpenAIServingChat(
self.async_engine_client,
engine_args = AsyncEngineArgs(**vllm_engine_config)
async_engine_client = AsyncLLMEngine.from_engine_args(engine_args, usage_context=UsageContext.OPENAI_API_SERVER)
model_config = async_engine_client.engine.get_model_config()
request_logger = RequestLogger(max_log_len=vllm_model_config["max_log_len"])
self._model["openai_serving_chat"] = OpenAIServingChat(
async_engine_client,
model_config,
served_model_names=[vllm_engine_config["served_model_name"]],
response_role=vllm_model_config["response_role"],
@ -50,7 +55,7 @@ class Preprocess:
return_tokens_as_token_ids=vllm_model_config["return_tokens_as_token_ids"]
)
self._model["openai_serving_completion"] = OpenAIServingCompletion(
self.async_engine_client,
async_engine_client,
model_config,
served_model_names=[vllm_engine_config["served_model_name"]],
lora_modules=vllm_model_config["lora_modules"],
@ -58,17 +63,40 @@ class Preprocess:
request_logger=request_logger,
return_tokens_as_token_ids=vllm_model_config["return_tokens_as_token_ids"]
)
self._model["self.openai_serving_embedding"] = OpenAIServingEmbedding(
self.async_engine_client,
self._model["openai_serving_embedding"] = OpenAIServingEmbedding(
async_engine_client,
model_config,
served_model_names=[vllm_engine_config["served_model_name"]],
request_logger=request_logger
)
self._model["self.openai_serving_tokenization"] = OpenAIServingTokenization(
self.async_engine_client,
self._model["openai_serving_tokenization"] = OpenAIServingTokenization(
async_engine_client,
model_config,
served_model_names=[vllm_engine_config["served_model_name"]],
lora_modules=vllm_model_config["lora_modules"],
request_logger=request_logger,
chat_template=vllm_model_config["chat_template"]
)
return self._model
def remove_extra_system_prompts(self, messages: List) -> List:
system_messages_indices = []
for i, msg in enumerate(messages):
if msg["role"] == "system":
system_messages_indices.append(i)
else:
break
if len(system_messages_indices) > 1:
last_system_index = system_messages_indices[-1]
messages = [msg for i, msg in enumerate(messages) if msg["role"] != "system" or i == last_system_index]
return messages
def preprocess(
self,
body: Union[bytes, dict],
state: dict,
collect_custom_statistics_fn: Optional[Callable[[dict], None]],
) -> Any: # noqa
if "messages" in body:
body["messages"] = self.remove_extra_system_prompts(body["messages"])
return body
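For reference, a standalone restatement of the system-prompt deduplication above, showing its effect on a sample request body (illustrative only; only the leading run of "system" messages is considered, and all but the last of them are dropped).
```python
from typing import Dict, List


def remove_extra_system_prompts(messages: List[Dict[str, str]]) -> List[Dict[str, str]]:
    # indices of the leading run of "system" messages (stop at the first non-system one)
    system_indices = []
    for i, msg in enumerate(messages):
        if msg["role"] == "system":
            system_indices.append(i)
        else:
            break
    if len(system_indices) > 1:
        last = system_indices[-1]
        # keep only the system message at `last`; every other system message is dropped
        messages = [m for i, m in enumerate(messages) if m["role"] != "system" or i == last]
    return messages


body = {
    "messages": [
        {"role": "system", "content": "web-UI boilerplate prompt"},
        {"role": "system", "content": "You are a helpful assistant"},
        {"role": "user", "content": "Hello"},
    ]
}
body["messages"] = remove_extra_system_prompts(body["messages"])
# -> only "You are a helpful assistant" survives as the system prompt
```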

View File

@ -3,21 +3,22 @@
## setting up the serving service
1. Create serving Service: `clearml-serving create --name "serving example"` (write down the service ID)
2. Make sure to add any additional packages required by your custom model to the [docker-compose.yml](https://github.com/allegroai/clearml-serving/blob/826f503cf4a9b069b89eb053696d218d1ce26f47/docker/docker-compose.yml#L97) (or as an environment variable on the `clearml-serving-inference` container), for example: `CLEARML_EXTRA_PYTHON_PACKAGES="vllm==0.5.4"`
3. Create model endpoint:
`clearml-serving --id <service_id> model add --engine vllm --endpoint "test_vllm" --preprocess "examples/vllm/preprocess.py" --name "test vllm" --project "serving examples"`
`clearml-serving --id <service_id> model add --model-id <model_id> --engine vllm --endpoint "test_vllm" --preprocess "examples/vllm/preprocess.py" --name "test vllm" --project "serving examples"`
Or auto update
`clearml-serving --id <service_id> model auto-update --engine vllm --endpoint "test_vllm" --preprocess "examples/vllm/preprocess.py" --name "test vllm" --project "serving examples" --max-versions 2`
Or add Canary endpoint
`clearml-serving --id <service_id> model canary --endpoint "test_vllm" --weights 0.1 0.9 --input-endpoint-prefix test_vllm`
4. If you already have the `clearml-serving` docker-compose running, it might take a minute or two to sync with the new endpoint.
Or you can run the clearml-serving container independently: `docker run -v ~/clearml.conf:/root/clearml.conf -p 8080:8080 -e CLEARML_SERVING_TASK_ID=<service_id> clearml-serving:latest`
5. Test the new endpoint (note that the first call triggers the model download, so it might take longer; from then on, everything is served from memory):
@ -49,3 +50,14 @@ r1 = await openai.Completion.acreate(
print(f"Completion: \n {r1['choices'][0]['text']}")
```
NOTE!
If you want to use the `send_request` method, keep in mind that you have to pass "completions" or "chat/completions" as the endpoint (and pass the model name as part of the "data" parameter), and that it should only be used with non-streaming models:
```python
prompt = "Hi there, goodman!"
result = self.send_request(endpoint="chat/completions", version=None, data={"model": "test_vllm", "messages": [{"role": "system", "content": "You are a helpful assistant"}, {"role": "user", "content": prompt}]})
answer = result["choices"][0]["message"]["content"]
```
OR
Instead of the `send_request` method, you can use the openai client directly, as sketched below:
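For example, a minimal sketch using the same openai (pre-1.0) client as the `acreate` example above. The base URL assumes the default `CLEARML_DEFAULT_BASE_SERVE_URL` and the `test_vllm` endpoint created in step 3; adjust both for your deployment.
```python
import openai

openai.api_key = "dummy"  # the serving endpoint does not validate the key
openai.api_base = "http://127.0.0.1:8080/serve/openai/v1"  # assumed default base URL

# run inside an async context (e.g. a notebook), like the acreate example above
response = await openai.ChatCompletion.acreate(
    model="test_vllm",
    messages=[
        {"role": "system", "content": "You are a helpful assistant"},
        {"role": "user", "content": "Hi there, goodman!"},
    ],
    stream=False,
)
print(response["choices"][0]["message"]["content"])
```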