diff --git a/README.md b/README.md
index 7147f40..63f9016 100644
--- a/README.md
+++ b/README.md
@@ -103,6 +103,10 @@ cat docker/example.env
 ```bash
 cd docker && docker-compose --env-file example.env -f docker-compose.yml up
 ```
+If running on a GPU instance, use the GPU docker-compose file
+```bash
+cd docker && docker-compose --env-file example.env -f docker-compose-gpu.yml up
+```
 If you need Triton support (keras/pytorch/onnx etc.), use the triton docker-compose file
 ```bash
 cd docker && docker-compose --env-file example.env -f docker-compose-triton.yml up
diff --git a/clearml_serving/engines/triton/triton_helper.py b/clearml_serving/engines/triton/triton_helper.py
index 19fd241..b4921c5 100644
--- a/clearml_serving/engines/triton/triton_helper.py
+++ b/clearml_serving/engines/triton/triton_helper.py
@@ -561,7 +561,7 @@ def main():
         setattr(args, args_var, type(t)(v) if t is not None else v)

     # noinspection PyProtectedMember
-    serving_task = ModelRequestProcessor._get_control_plane_task(task_id=args.inference_task_id)
+    serving_task = ModelRequestProcessor._get_control_plane_task(task_id=args.serving_id)

     task = Task.init(
         project_name=args.project or serving_task.get_project_name() or "serving",
diff --git a/clearml_serving/serving/main.py b/clearml_serving/serving/main.py
index 1950265..f158181 100644
--- a/clearml_serving/serving/main.py
+++ b/clearml_serving/serving/main.py
@@ -110,17 +110,17 @@ async def cuda_exception_handler(request, exc):
     return PlainTextResponse("CUDA out of memory. Restarting service", status_code=500, background=task)


-def process_with_exceptions(
+async def process_with_exceptions(
     base_url: str,
     version: Optional[str],
-    request: Union[bytes, Dict[Any, Any]],
+    request_body: Union[bytes, Dict[Any, Any]],
     serve_type: str
 ):
     try:
         return_value = await processor.process_request(
             base_url=base_url,
             version=version,
-            request_body=request,
+            request_body=request_body,
             serve_type=serve_type
         )
     except EndpointNotFoundException as ex:
@@ -128,21 +128,21 @@
     except (EndpointModelLoadException, EndpointBackendEngineException) as ex:
         session_logger.report_text(
             "[{}] Exception [{}] {} while processing request: {}\n{}".format(
-                instance_id, type(ex), ex, request, "".join(traceback.format_exc())
+                instance_id, type(ex), ex, request_body, "".join(traceback.format_exc())
             )
         )
         raise HTTPException(status_code=422, detail="Error [{}] processing request: {}".format(type(ex), ex))
     except ServingInitializationException as ex:
         session_logger.report_text(
             "[{}] Exception [{}] {} while loading serving inference: {}\n{}".format(
-                instance_id, type(ex), ex, request, "".join(traceback.format_exc())
+                instance_id, type(ex), ex, request_body, "".join(traceback.format_exc())
             )
         )
         raise HTTPException(status_code=500, detail="Error [{}] processing request: {}".format(type(ex), ex))
     except ValueError as ex:
         session_logger.report_text(
             "[{}] Exception [{}] {} while processing request: {}\n{}".format(
-                instance_id, type(ex), ex, request, "".join(traceback.format_exc())
+                instance_id, type(ex), ex, request_body, "".join(traceback.format_exc())
             )
         )
         if "CUDA out of memory. " in str(ex) or "NVML_SUCCESS == r INTERNAL ASSERT FAILED" in str(ex):
@@ -152,7 +152,7 @@
     except AioRpcError as ex:
         if grpc_aio_verbose_errors and ex.code() in grpc_aio_verbose_errors:
             session_logger.report_text(
-                "[{}] Exception [AioRpcError] {} while processing request: {}".format(instance_id, ex, request)
+                "[{}] Exception [AioRpcError] {} while processing request: {}".format(instance_id, ex, request_body)
             )
         elif not grpc_aio_ignore_errors or ex.code() not in grpc_aio_ignore_errors:
             session_logger.report_text("[{}] Exception [AioRpcError] status={} ".format(instance_id, ex.code()))
@@ -162,7 +162,7 @@
     except Exception as ex:
         session_logger.report_text(
             "[{}] Exception [{}] {} while processing request: {}\n{}".format(
-                instance_id, type(ex), ex, request, "".join(traceback.format_exc())
+                instance_id, type(ex), ex, request_body, "".join(traceback.format_exc())
            )
         )
         raise HTTPException(status_code=500, detail="Error [{}] processing request: {}".format(type(ex), ex))
@@ -170,7 +170,7 @@


 router = APIRouter(
-    prefix=f"/{os.environ.get("CLEARML_DEFAULT_SERVE_SUFFIX", "serve")}",
+    prefix=f"/{os.environ.get('CLEARML_DEFAULT_SERVE_SUFFIX', 'serve')}",
     tags=["models"],
     responses={404: {"description": "Model Serving Endpoint Not found"}},
     route_class=GzipRoute,  # mark-out to remove support for GZip content encoding
@@ -185,7 +185,7 @@ async def base_serve_model(
     version: Optional[str] = None,
     request: Union[bytes, Dict[Any, Any]] = None
 ):
-    return_value = process_with_exceptions(
+    return_value = await process_with_exceptions(
         base_url=model_id,
         version=version,
         request_body=request,
@@ -200,7 +200,7 @@ async def openai_serve_model(
     endpoint_type: str,
     request: Union[bytes, Dict[Any, Any]] = None
 ):
-    return_value = process_with_exceptions(
+    return_value = await process_with_exceptions(
         base_url=request.get("model", None),
         version=None,
         request_body=request,
diff --git a/clearml_serving/serving/preprocess_service.py b/clearml_serving/serving/preprocess_service.py
index b9b6cda..45946c7 100644
--- a/clearml_serving/serving/preprocess_service.py
+++ b/clearml_serving/serving/preprocess_service.py
@@ -14,6 +14,19 @@ from requests import post as request_post

 from .endpoints import ModelEndpoint

+# try:
+#     import prometheus_client
+#     from fastapi.responses import JSONResponse, StreamingResponse
+#     from vllm.entrypoints.openai.protocol import (
+#         ChatCompletionRequest,
+#         ChatCompletionResponse,
+#         CompletionRequest,
+#         ErrorResponse
+#     )
+#     from vllm.logger import init_logger
+# except ImportError:
+#     print("WARNING: install vllm==0.5.4 and prometheus_client==0.21.1 to serve vllm engine")
+

 class BasePreprocessRequest(object):
     __preprocessing_lookup = {}
@@ -598,65 +611,11 @@ class CustomAsyncPreprocessRequest(BasePreprocessRequest):
         return return_value.json()


-@BasePreprocessRequest.register_engine("vllm")
+@BasePreprocessRequest.register_engine("vllm", modules=["vllm", "fastapi"])
 class VllmPreprocessRequest(BasePreprocessRequest):
-    import prometheus_client
-
-    from typing import Any, Union, Optional, Callable
-
-    from fastapi.responses import JSONResponse, StreamingResponse
-
-    from vllm.engine.arg_utils import AsyncEngineArgs
-    from vllm.engine.async_llm_engine import AsyncLLMEngine
-    from vllm.entrypoints.logger import RequestLogger
-    # yapf conflicts with isort for this block
-    # yapf: disable
-    from vllm.entrypoints.openai.protocol import (
-        ChatCompletionRequest,
-        ChatCompletionResponse,
-        CompletionRequest,
-        ErrorResponse
-    )
-
-    # yapf: enable
-    from vllm.entrypoints.openai.serving_chat import OpenAIServingChat
-    from vllm.entrypoints.openai.serving_completion import OpenAIServingCompletion
-    from vllm.entrypoints.openai.serving_embedding import OpenAIServingEmbedding
-    from vllm.entrypoints.openai.serving_tokenization import OpenAIServingTokenization
-    from vllm.logger import init_logger
-    from vllm.usage.usage_lib import UsageContext
-    from vllm.entrypoints.openai.serving_engine import LoRAModulePath, PromptAdapterPath
-
-    logger = init_logger(__name__)
-
-    REMOVE_WEB_ADDITIONAL_PROMPTS = True
-
-    if VllmPreprocessRequest.asyncio_to_thread is None:
-        from asyncio import to_thread as asyncio_to_thread
-        VllmPreprocessRequest.asyncio_to_thread = asyncio_to_thread
-
-    def remove_extra_system_prompts(messages: list) -> list:
-        """
-        Removes all 'system' prompts except the last one.
-
-        :param messages: List of message dicts with 'role' and 'content'.
-        :return: Modified list of messages with only the last 'system' prompt preserved.
-        """
-        # Фильтруем только системные сообщения
-        system_messages_indices = []
-        for i, msg in enumerate(messages):
-            if msg["role"] == "system":
-                system_messages_indices.append(i)
-            else:
-                break
-
-        # Если есть больше одного системного сообщения, удалим все, кроме последнего
-        if len(system_messages_indices) > 1:
-            last_system_index = system_messages_indices[-1]
-            # Удаляем все системные сообщения, кроме последнего
-            messages = [msg for i, msg in enumerate(messages) if msg["role"] != "system" or i == last_system_index]
-
-        return messages
+    asyncio_to_thread = None
+    _vllm = None
+    _fastapi = None

     class CustomRequest:
         def __init__(self, headers: Optional[dict] = None):
@@ -669,12 +628,39 @@ class VllmPreprocessRequest(BasePreprocessRequest):
         super(VllmPreprocessRequest, self).__init__(
             model_endpoint=model_endpoint, task=task)

-        def is_port_in_use(port: int) -> bool:
-            import socket
-            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
-                return s.connect_ex(('localhost', port)) == 0
-        if not is_port_in_use(8000):
-            prometheus_client.start_http_server(8000)
+        # load vLLM Modules
+        if self._vllm is None:
+            from vllm.entrypoints.openai.protocol import (
+                ChatCompletionRequest,
+                ChatCompletionResponse,
+                CompletionRequest,
+                ErrorResponse
+            )
+            self._vllm = {}
+            self._vllm["chat_completion_request"] = ChatCompletionRequest
+            self._vllm["chat_completion_response"] = ChatCompletionResponse
+            self._vllm["completion_request"] = CompletionRequest
+            self._vllm["error_response"] = ErrorResponse
+
+        if self._fastapi is None:
+            from fastapi.responses import JSONResponse, StreamingResponse
+            self._fastapi = {}
+            self._fastapi["json_response"] = JSONResponse
+            self._fastapi["streaming_response"] = StreamingResponse
+
+        from vllm.logger import init_logger
+        self.logger = init_logger(__name__)
+
+        if VllmPreprocessRequest.asyncio_to_thread is None:
+            from asyncio import to_thread as asyncio_to_thread
+            VllmPreprocessRequest.asyncio_to_thread = asyncio_to_thread
+
+        import socket
+        import prometheus_client
+        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+            if not s.connect_ex(('localhost', 8000)) == 0:
+                prometheus_client.start_http_server(8000)
+

         # override `send_request` method with the async version
         self._preprocess.__class__.send_request = VllmPreprocessRequest._preprocess_send_request
@@ -738,15 +724,11 @@
             return await self._preprocess.postprocess(data, state, collect_custom_statistics_fn)
         return data
-

     async def completions(self, data: Any, state: dict, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Any:
         """
        The actual processing function.
         We run the process in this context
         """
-        if REMOVE_WEB_ADDITIONAL_PROMPTS:
-            if "messages" in body:
-                body["messages"] = remove_extra_system_prompts(body["messages"])

         raw_request = CustomRequest(
             headers = {
@@ -754,18 +736,18 @@
                 "traceparent": None,
                 "tracestate": None
             }
         )
-        request = CompletionRequest(**body)
-        logger.info(f"Received chat completion request: {request}")
+        request = self._vllm["completion_request"](**data)
+        self.logger.info(f"Received chat completion request: {request}")
         generator = await self._model["openai_serving_completion"].create_completion(
             request=request,
             raw_request=raw_request
         )
-        if isinstance(generator, ErrorResponse):
-            return JSONResponse(content=generator.model_dump(), status_code=generator.code)
+        if isinstance(generator, self._vllm["error_response"]):
+            return self._fastapi["json_response"](content=generator.model_dump(), status_code=generator.code)
         if request.stream:
-            return StreamingResponse(content=generator, media_type="text/event-stream")
+            return self._fastapi["streaming_response"](content=generator, media_type="text/event-stream")
         else:
-            return JSONResponse(content=generator.model_dump())
+            return self._fastapi["json_response"](content=generator.model_dump())

     async def chat_completions(self, data: Any, state: dict, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Any:
@@ -773,30 +755,27 @@
         The actual processing function.
         We run the process in this context
         """
-        if REMOVE_WEB_ADDITIONAL_PROMPTS:
-            if "messages" in body:
-                body["messages"] = remove_extra_system_prompts(body["messages"])
-        request = ChatCompletionRequest(**body)
-        logger.info(f"Received chat completion request: {request}")
-        generator = await self._model["self.openai_serving_chat"].create_chat_completion(
+        request = self._vllm["chat_completion_request"](**data)
+        self.logger.info(f"Received chat completion request: {request}")
+        generator = await self._model["openai_serving_chat"].create_chat_completion(
             request=request,
             raw_request=None
         )
-        if isinstance(generator, ErrorResponse):
-            return JSONResponse(content=generator.model_dump(), status_code=generator.code)
+        if isinstance(generator, self._vllm["error_response"]):
+            return self._fastapi["json_response"](content=generator.model_dump(), status_code=generator.code)
         if request.stream:
-            return StreamingResponse(content=generator, media_type="text/event-stream")
+            return self._fastapi["streaming_response"](content=generator, media_type="text/event-stream")
         else:
-            assert isinstance(generator, ChatCompletionResponse)
-            return JSONResponse(content=generator.model_dump())
+            assert isinstance(generator, self._vllm["chat_completion_response"])
+            return self._fastapi["json_response"](content=generator.model_dump())

     @staticmethod
     async def _preprocess_send_request(_, endpoint: str, version: str = None, data: dict = None) -> Optional[dict]:
-        endpoint = "{}/{}".format(endpoint.strip("/"), version.strip("/")) if version else endpoint.strip("/")
+        endpoint = "/openai/v1/{}".format(endpoint.strip("/"))
         base_url = BasePreprocessRequest.get_server_config().get("base_serving_url")
         base_url = (base_url or BasePreprocessRequest._default_serving_base_url).strip("/")
         url = "{}/{}".format(base_url, endpoint.strip("/"))
-        return_value = await CustomAsyncPreprocessRequest.asyncio_to_thread(
+        return_value = await VllmPreprocessRequest.asyncio_to_thread(
             request_post, url, json=data, timeout=BasePreprocessRequest._timeout)
         if not return_value.ok:
             return None
diff --git a/clearml_serving/serving/requirements.txt b/clearml_serving/serving/requirements.txt
index da12834..366b19c 100644
--- a/clearml_serving/serving/requirements.txt
+++ b/clearml_serving/serving/requirements.txt
@@ -18,3 +18,5 @@ lightgbm>=3.3.2,<3.4
 requests>=2.31.0
 kafka-python>=2.0.2,<2.1
 lz4>=4.0.0,<5
+prometheus_client==0.21.1
+vllm==0.5.4
diff --git a/clearml_serving/statistics/Dockerfile b/clearml_serving/statistics/Dockerfile
index a55ccc2..fa01199 100644
--- a/clearml_serving/statistics/Dockerfile
+++ b/clearml_serving/statistics/Dockerfile
@@ -4,7 +4,7 @@ FROM python:3.11-bullseye
 ENV LC_ALL=C.UTF-8

 # install base package
-RUN pip3 install --no-cache-dir clearml-serving
+# RUN pip3 install --no-cache-dir clearml-serving

 # get latest execution code from the git repository
 # RUN cd $HOME && git clone https://github.com/allegroai/clearml-serving.git
diff --git a/clearml_serving/version.py b/clearml_serving/version.py
index e398332..5b8f37a 100644
--- a/clearml_serving/version.py
+++ b/clearml_serving/version.py
@@ -1 +1 @@
-__version__ = '1.3.2'
+__version__ = '1.3.5'
diff --git a/docker/docker-compose-gpu.yml b/docker/docker-compose-gpu.yml
new file mode 100644
index 0000000..0009fa8
--- /dev/null
+++ b/docker/docker-compose-gpu.yml
@@ -0,0 +1,146 @@
+version: "3"
+
+services:
+  zookeeper:
+    image: bitnami/zookeeper:3.7.0
+    container_name: clearml-serving-zookeeper
+    # ports:
+    #   - "2181:2181"
+    environment:
+      - ALLOW_ANONYMOUS_LOGIN=yes
+    networks:
+      - clearml-serving-backend
+
+  kafka:
+    image: bitnami/kafka:3.1.1
+    container_name: clearml-serving-kafka
+    # ports:
+    #   - "9092:9092"
+    environment:
+      - KAFKA_BROKER_ID=1
+      - KAFKA_CFG_LISTENERS=PLAINTEXT://clearml-serving-kafka:9092
+      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://clearml-serving-kafka:9092
+      - KAFKA_CFG_ZOOKEEPER_CONNECT=clearml-serving-zookeeper:2181
+      - ALLOW_PLAINTEXT_LISTENER=yes
+      - KAFKA_CREATE_TOPICS="topic_test:1:1"
+    depends_on:
+      - zookeeper
+    networks:
+      - clearml-serving-backend
+
+  prometheus:
+    image: prom/prometheus:v2.34.0
+    container_name: clearml-serving-prometheus
+    volumes:
+      - ./prometheus.yml:/prometheus.yml
+    command:
+      - '--config.file=/prometheus.yml'
+      - '--storage.tsdb.path=/prometheus'
+      - '--web.console.libraries=/etc/prometheus/console_libraries'
+      - '--web.console.templates=/etc/prometheus/consoles'
+      - '--storage.tsdb.retention.time=200h'
+      - '--web.enable-lifecycle'
+    restart: unless-stopped
+    # ports:
+    #   - "9090:9090"
+    depends_on:
+      - clearml-serving-statistics
+    networks:
+      - clearml-serving-backend
+
+  alertmanager:
+    image: prom/alertmanager:v0.23.0
+    container_name: clearml-serving-alertmanager
+    restart: unless-stopped
+    # ports:
+    #   - "9093:9093"
+    depends_on:
+      - prometheus
+      - grafana
+    networks:
+      - clearml-serving-backend
+
+  grafana:
+    image: grafana/grafana:8.4.4-ubuntu
+    container_name: clearml-serving-grafana
+    volumes:
+      - './datasource.yml:/etc/grafana/provisioning/datasources/datasource.yaml'
+    restart: unless-stopped
+    ports:
+      - "3000:3000"
+    depends_on:
+      - prometheus
+    networks:
+      - clearml-serving-backend
+
+
+  clearml-serving-inference:
+    image: clearml-serving-inference:latest
+    container_name: clearml-serving-inference
+    restart: unless-stopped
+    # optimize performance
+    security_opt:
+      - seccomp:unconfined
+    ports:
+      - "8080:8080"
+    environment:
+      CLEARML_WEB_HOST: ${CLEARML_WEB_HOST:-https://app.clear.ml}
+      CLEARML_API_HOST: ${CLEARML_API_HOST:-https://api.clear.ml}
+      CLEARML_FILES_HOST: ${CLEARML_FILES_HOST:-https://files.clear.ml}
+      CLEARML_API_ACCESS_KEY: ${CLEARML_API_ACCESS_KEY}
+      CLEARML_API_SECRET_KEY: ${CLEARML_API_SECRET_KEY}
+      CLEARML_SERVING_TASK_ID: ${CLEARML_SERVING_TASK_ID:-}
+      CLEARML_SERVING_PORT: ${CLEARML_SERVING_PORT:-8080}
+      CLEARML_SERVING_POLL_FREQ: ${CLEARML_SERVING_POLL_FREQ:-1.0}
+      CLEARML_DEFAULT_SERVE_SUFFIX: ${CLEARML_DEFAULT_SERVE_SUFFIX:-serve}
+      CLEARML_DEFAULT_BASE_SERVE_URL: ${CLEARML_DEFAULT_BASE_SERVE_URL:-http://127.0.0.1:8080/serve}
+      CLEARML_DEFAULT_KAFKA_SERVE_URL: ${CLEARML_DEFAULT_KAFKA_SERVE_URL:-clearml-serving-kafka:9092}
+      CLEARML_DEFAULT_TRITON_GRPC_ADDR: ${CLEARML_DEFAULT_TRITON_GRPC_ADDR:-}
+      CLEARML_USE_GUNICORN: ${CLEARML_USE_GUNICORN:-}
+      CLEARML_SERVING_NUM_PROCESS: ${CLEARML_SERVING_NUM_PROCESS:-}
+      CLEARML_EXTRA_PYTHON_PACKAGES: ${CLEARML_EXTRA_PYTHON_PACKAGES:-}
+      AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID:-}
+      AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY:-}
+      AWS_DEFAULT_REGION: ${AWS_DEFAULT_REGION:-}
+      GOOGLE_APPLICATION_CREDENTIALS: ${GOOGLE_APPLICATION_CREDENTIALS:-}
+      AZURE_STORAGE_ACCOUNT: ${AZURE_STORAGE_ACCOUNT:-}
+      AZURE_STORAGE_KEY: ${AZURE_STORAGE_KEY:-}
+    depends_on:
+      - kafka
+    networks:
+      - clearml-serving-backend
+    deploy:
+      resources:
+        reservations:
+          devices:
+            - driver: nvidia
+              device_ids: ['0']
+              capabilities: [gpu]
+
+  clearml-serving-statistics:
+    image: allegroai/clearml-serving-statistics:latest
+    container_name: clearml-serving-statistics
+    restart: unless-stopped
+    # optimize performance
+    security_opt:
+      - seccomp:unconfined
+    # ports:
+    #   - "9999:9999"
+    environment:
+      CLEARML_WEB_HOST: ${CLEARML_WEB_HOST:-https://app.clear.ml}
+      CLEARML_API_HOST: ${CLEARML_API_HOST:-https://api.clear.ml}
+      CLEARML_FILES_HOST: ${CLEARML_FILES_HOST:-https://files.clear.ml}
+      CLEARML_API_ACCESS_KEY: ${CLEARML_API_ACCESS_KEY}
+      CLEARML_API_SECRET_KEY: ${CLEARML_API_SECRET_KEY}
+      CLEARML_SERVING_TASK_ID: ${CLEARML_SERVING_TASK_ID:-}
+      CLEARML_DEFAULT_KAFKA_SERVE_URL: ${CLEARML_DEFAULT_KAFKA_SERVE_URL:-clearml-serving-kafka:9092}
+      CLEARML_SERVING_POLL_FREQ: ${CLEARML_SERVING_POLL_FREQ:-1.0}
+    depends_on:
+      - kafka
+    networks:
+      - clearml-serving-backend
+
+
+networks:
+  clearml-serving-backend:
+    driver: bridge
diff --git a/docker/prometheus.yml b/docker/prometheus.yml
index 469e220..b7aa51e 100644
--- a/docker/prometheus.yml
+++ b/docker/prometheus.yml
@@ -20,3 +20,10 @@ scrape_configs:

     static_configs:
       - targets: ['clearml-serving-statistics:9999']
+
+  - job_name: 'vllm'
+
+    scrape_interval: 5s
+
+    static_configs:
+      - targets: ['clearml-serving-inference']
\ No newline at end of file
diff --git a/examples/vllm/preprocess.py b/examples/vllm/preprocess.py
index f54b390..a4191d6 100644
--- a/examples/vllm/preprocess.py
+++ b/examples/vllm/preprocess.py
@@ -1,8 +1,16 @@
 """Hugginface preprocessing module for ClearML Serving."""
-from typing import Any
+from typing import Any, Callable, List, Optional, Union
+from vllm.engine.arg_utils import AsyncEngineArgs
+from vllm.engine.async_llm_engine import AsyncLLMEngine
+from vllm.entrypoints.logger import RequestLogger
+from vllm.entrypoints.openai.serving_chat import OpenAIServingChat
+from vllm.entrypoints.openai.serving_completion import OpenAIServingCompletion
+from vllm.entrypoints.openai.serving_embedding import OpenAIServingEmbedding
+from vllm.entrypoints.openai.serving_tokenization import OpenAIServingTokenization
+from vllm.usage.usage_lib import UsageContext
+from vllm.entrypoints.openai.serving_engine import LoRAModulePath, PromptAdapterPath

-# Notice Preprocess class Must be named "Preprocess"
 class Preprocess:
     """Processing class will be run by the ClearML inference services before and after each request."""
@@ -12,14 +20,15 @@

     def load(self, local_file_name: str) -> Optional[Any]:  # noqa
         vllm_engine_config = {
-            "model":f"{local_file_name}/model",
-            "tokenizer":f"{local_file_name}/tokenizer",
+            "model": local_file_name,
+            "tokenizer": local_file_name,
             "disable_log_requests": True,
             "disable_log_stats": False,
             "gpu_memory_utilization": 0.9,
             "quantization": None,
             "enforce_eager": True,
-            "served_model_name": "ai_operator_hyp22v4"
+            "served_model_name": "test_vllm",
+            "dtype": "float16"
         }
         vllm_model_config = {
             "lora_modules": None,  # [LoRAModulePath(name=a, path=b)]
@@ -30,16 +39,12 @@
             "chat_template": None,
             "return_tokens_as_token_ids": False,
             "max_log_len": None
         }
         self._model = {}
-        self._model["engine_args"] = AsyncEngineArgs(**vllm_engine_config)
-        self._model["async_engine_client"] = AsyncLLMEngine.from_engine_args(self.engine_args, usage_context=UsageContext.OPENAI_API_SERVER)
-
-
-        self._model["model_config"] = self.async_engine_client.engine.get_model_config()
-
-        self._model["request_logger"] = RequestLogger(max_log_len=vllm_model_config["max_log_len"])
-
-        self._model["self.openai_serving_chat"] = OpenAIServingChat(
-            self.async_engine_client,
+        engine_args = AsyncEngineArgs(**vllm_engine_config)
+        async_engine_client = AsyncLLMEngine.from_engine_args(engine_args, usage_context=UsageContext.OPENAI_API_SERVER)
+        model_config = async_engine_client.engine.get_model_config()
+        request_logger = RequestLogger(max_log_len=vllm_model_config["max_log_len"])
+        self._model["openai_serving_chat"] = OpenAIServingChat(
+            async_engine_client,
             model_config,
             served_model_names=[vllm_engine_config["served_model_name"]],
             response_role=vllm_model_config["response_role"],
@@ -50,7 +55,7 @@
             return_tokens_as_token_ids=vllm_model_config["return_tokens_as_token_ids"]
         )
         self._model["openai_serving_completion"] = OpenAIServingCompletion(
-            self.async_engine_client,
+            async_engine_client,
             model_config,
             served_model_names=[vllm_engine_config["served_model_name"]],
             lora_modules=vllm_model_config["lora_modules"],
@@ -58,17 +63,40 @@
             request_logger=request_logger,
             return_tokens_as_token_ids=vllm_model_config["return_tokens_as_token_ids"]
         )
-        self._model["self.openai_serving_embedding"] = OpenAIServingEmbedding(
-            self.async_engine_client,
+        self._model["openai_serving_embedding"] = OpenAIServingEmbedding(
+            async_engine_client,
             model_config,
             served_model_names=[vllm_engine_config["served_model_name"]],
             request_logger=request_logger
         )
-        self._model["self.openai_serving_tokenization"] = OpenAIServingTokenization(
-            self.async_engine_client,
+        self._model["openai_serving_tokenization"] = OpenAIServingTokenization(
+            async_engine_client,
             model_config,
             served_model_names=[vllm_engine_config["served_model_name"]],
             lora_modules=vllm_model_config["lora_modules"],
             request_logger=request_logger,
             chat_template=vllm_model_config["chat_template"]
         )
+        return self._model
+
+    def remove_extra_system_prompts(self, messages: List) -> List:
+        system_messages_indices = []
+        for i, msg in enumerate(messages):
+            if msg["role"] == "system":
+                system_messages_indices.append(i)
+            else:
+                break
+        if len(system_messages_indices) > 1:
+            last_system_index = system_messages_indices[-1]
+            messages = [msg for i, msg in enumerate(messages) if msg["role"] != "system" or i == last_system_index]
+        return messages
+
+    def preprocess(
+        self,
+        body: Union[bytes, dict],
+        state: dict,
+        collect_custom_statistics_fn: Optional[Callable[[dict], None]],
+    ) -> Any:  # noqa
+        if "messages" in body:
+            body["messages"] = self.remove_extra_system_prompts(body["messages"])
+        return body
diff --git a/examples/vllm/readme.md b/examples/vllm/readme.md
index fd03eb9..8abb6d3 100644
--- a/examples/vllm/readme.md
+++ b/examples/vllm/readme.md
@@ -3,21 +3,22 @@ ## setting up the serving service

 1. Create serving Service: `clearml-serving create --name "serving example"` (write down the service ID)
+
 2. Make sure to add any required additional packages (for your custom model) to the [docker-compose.yml](https://github.com/allegroai/clearml-serving/blob/826f503cf4a9b069b89eb053696d218d1ce26f47/docker/docker-compose.yml#L97) (or as environment variable to the `clearml-serving-inference` container), by defining for example: `CLEARML_EXTRA_PYTHON_PACKAGES="vllm==0.5.4"`

 3. Create model endpoint:

-`clearml-serving --id model add --engine vllm --endpoint "test_vllm" --preprocess "examples/vllm/preprocess.py" --name "test vllm" --project "serving examples"`
+`clearml-serving --id model add --model-id --engine vllm --endpoint "test_vllm" --preprocess "examples/vllm/preprocess.py" --name "test vllm" --project "serving examples"`

-Or auto update
+   Or auto update

-`clearml-serving --id model auto-update --engine vllm --endpoint "test_vllm" --preprocess "examples/vllm/preprocess.py" --name "test vllm" --project "serving examples" --max-versions 2`
+   `clearml-serving --id model auto-update --engine vllm --endpoint "test_vllm" --preprocess "examples/vllm/preprocess.py" --name "test vllm" --project "serving examples" --max-versions 2`

-Or add Canary endpoint
+   Or add Canary endpoint

-`clearml-serving --id model canary --endpoint "test_vllm" --weights 0.1 0.9 --input-endpoint-prefix test_vllm`
+   `clearml-serving --id model canary --endpoint "test_vllm" --weights 0.1 0.9 --input-endpoint-prefix test_vllm`

 4. If you already have the `clearml-serving` docker-compose running, it might take it a minute or two to sync with the new endpoint.

-Or you can run the clearml-serving container independently `docker run -v ~/clearml.conf:/root/clearml.conf -p 8080:8080 -e CLEARML_SERVING_TASK_ID= clearml-serving:latest`
+   Or you can run the clearml-serving container independently `docker run -v ~/clearml.conf:/root/clearml.conf -p 8080:8080 -e CLEARML_SERVING_TASK_ID= clearml-serving:latest`

 5. Test new endpoint (do notice the first call will trigger the model pulling, so it might take longer, from here on, it's all in memory):
@@ -49,3 +50,14 @@ r1 = await openai.Completion.acreate(

 print(f"Completion: \n {r1['choices'][0]['text']}")
 ```
+NOTE!
+
+If you want to use the `send_request` method, keep in mind that you have to pass "completions" or "chat/completions" as the endpoint (and pass the model as part of the "data" parameter), and that it only works for non-streaming models:
+
+```python
+prompt = "Hi there, goodman!"
+result = self.send_request(endpoint="chat/completions", version=None, data={"model": "test_vllm", "messages": [{"role": "system", "content": "You are a helpful assistant"}, {"role": "user", "content": prompt}]})
+answer = result["choices"][0]["message"]["content"]
+```
+OR
+simply use the openai client (as shown above) instead of `send_request`.
\ No newline at end of file