From 9441ae8473a078627e8b8acb073416565d4b1911 Mon Sep 17 00:00:00 2001 From: IlyaMescheryakov1402 Date: Mon, 10 Mar 2025 23:52:14 +0300 Subject: [PATCH] move engine init in separate class --- clearml_serving/serving/preprocess_service.py | 261 +++++++++++++----- docker/docker-compose-gpu.yml | 1 + examples/vllm/preprocess.py | 160 ++++++----- examples/vllm/test_openai_api.py | 2 +- 4 files changed, 273 insertions(+), 151 deletions(-) diff --git a/clearml_serving/serving/preprocess_service.py b/clearml_serving/serving/preprocess_service.py index 0e7d15a..09e5ded 100644 --- a/clearml_serving/serving/preprocess_service.py +++ b/clearml_serving/serving/preprocess_service.py @@ -1,4 +1,5 @@ import os +import json import sys import threading import traceback @@ -14,19 +15,12 @@ from requests import post as request_post from .endpoints import ModelEndpoint -# try: -# import prometheus_client -# from fastapi.responses import JSONResponse, StreamingResponse -# from vllm.entrypoints.openai.protocol import ( -# ChatCompletionRequest, -# ChatCompletionResponse, -# CompletionRequest, -# ErrorResponse -# ) -# from vllm.logger import init_logger -# except ImportError: -# print("WARNING: install vllm==0.5.4 and prometheus_client==0.21.1 to serve vllm engine") - +class Singleton(object): + _instance = None + def __new__(class_, *args, **kwargs): + if not isinstance(class_._instance, class_): + class_._instance = object.__new__(class_, *args, **kwargs) + return class_._instance class BasePreprocessRequest(object): __preprocessing_lookup = {} @@ -611,41 +605,21 @@ class CustomAsyncPreprocessRequest(BasePreprocessRequest): return return_value.json() -@BasePreprocessRequest.register_engine("vllm", modules=["vllm", "fastapi"]) -class VllmPreprocessRequest(BasePreprocessRequest): - is_preprocess_async = True - is_process_async = True - is_postprocess_async = True - asyncio_to_thread = None +class VllmEngine(Singleton): + _model = None _vllm = None _fastapi = None + is_already_loaded = False - class CustomRequest: - def __init__(self, headers: Optional[dict] = None): - self.headers = headers - - async def is_disconnected(self): - return False - - def __init__(self, model_endpoint: ModelEndpoint, task: Task = None): - super(VllmPreprocessRequest, self).__init__( - model_endpoint=model_endpoint, task=task) + def __init__(self): # load vLLM Modules if self._vllm is None: - from vllm.entrypoints.openai.protocol import ( - ChatCompletionRequest, - ChatCompletionResponse, - CompletionRequest, - CompletionResponse, - ErrorResponse - ) + from vllm import entrypoints, engine, usage self._vllm = {} - self._vllm["chat_completion_request"] = ChatCompletionRequest - self._vllm["chat_completion_response"] = ChatCompletionResponse - self._vllm["completion_request"] = CompletionRequest - self._vllm["completion_response"] = CompletionResponse - self._vllm["error_response"] = ErrorResponse + self._vllm["entrypoints"] = entrypoints + self._vllm["engine"] = engine + self._vllm["usage"] = usage if self._fastapi is None: from fastapi.responses import JSONResponse, StreamingResponse @@ -656,16 +630,189 @@ class VllmPreprocessRequest(BasePreprocessRequest): from vllm.logger import init_logger self.logger = init_logger(__name__) - if VllmPreprocessRequest.asyncio_to_thread is None: - from asyncio import to_thread as asyncio_to_thread - VllmPreprocessRequest.asyncio_to_thread = asyncio_to_thread - import socket import prometheus_client with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: if not s.connect_ex(('localhost', 8000)) == 0: prometheus_client.start_http_server(8000) + def load_engine( + self, + name: str, + model_path: str, + vllm_model_config: dict, + chat_settings: dict + ): + if self.is_already_loaded: + self.add_models(name=name, model_path=model_path) + return None + + vllm_engine_config = json.loads(os.environ.get("VLLM_ENGINE_ARGS")) + engine_args = self._vllm["engine"].arg_utils.AsyncEngineArgs(**vllm_engine_config) + async_engine_client = self._vllm["engine"].async_llm_engine.AsyncLLMEngine.from_engine_args( + engine_args, + usage_context=self._vllm["usage"].usage_lib.UsageContext.OPENAI_API_SERVER + ) + model_config = async_engine_client.engine.get_model_config() + request_logger = self._vllm["entrypoints"].logger.RequestLogger( + max_log_len=vllm_model_config["max_log_len"] + ) + self._model["openai_serving_models"] = self._vllm[ + "entrypoints" + ].openai.serving_models.OpenAIServingModels( + async_engine_client, + model_config, + [ + self._vllm["entrypoints"].openai.serving_models.BaseModelPath( + name=name, + model_path=model_path + ) + ], + lora_modules=svllm_model_config["lora_modules"], + prompt_adapters=vllm_model_config["prompt_adapters"], + ) + await self._model["openai_serving_models"].init_static_loras() + self._model["openai_serving"] = self._vllm[ + "entrypoints" + ].openai.serving_engine.OpenAIServing( + async_engine_client, + model_config, + self._model["openai_serving_models"], + request_logger=request_logger, + return_tokens_as_token_ids=vllm_model_config["return_tokens_as_token_ids"] + ) + self._model["openai_serving_chat"] = self._vllm[ + "entrypoints" + ].openai.serving_chat.OpenAIServingChat( + async_engine_client, + model_config, + self._model["openai_serving_models"], + response_role=vllm_model_config["response_role"], + request_logger=request_logger, + chat_template=vllm_model_config["chat_template"], + chat_template_content_format=chat_settings["chat_template_content_format"], + return_tokens_as_token_ids=vllm_model_config["return_tokens_as_token_ids"], + enable_reasoning=chat_settings["enable_reasoning"], + reasoning_parser=chat_settings["reasoning_parser"], + enable_auto_tools=chat_settings["enable_auto_tools"], + tool_parser=chat_settings["tool_parser"], + enable_prompt_tokens_details=chat_settings["enable_prompt_tokens_details"] + ) if model_config.runner_type == "generate" else None + self._model["openai_serving_completion"] = self._vllm[ + "entrypoints" + ].openai.serving_completion.OpenAIServingCompletion( + async_engine_client, + model_config, + self._model["openai_serving_models"], + request_logger=request_logger, + return_tokens_as_token_ids=vllm_model_config["return_tokens_as_token_ids"] + ) if model_config.runner_type == "generate" else None + self._model["openai_serving_embedding"] = self._vllm[ + "entrypoints" + ].openai.serving_embedding.OpenAIServingEmbedding( + async_engine_client, + model_config, + self._model["openai_serving_models"], + request_logger=request_logger, + chat_template=vllm_model_config["chat_template"], + chat_template_content_format=chat_settings["chat_template_content_format"] + ) if model_config.task == "embed" else None + self._model["openai_serving_tokenization"] = self._vllm[ + "entrypoints" + ].openai.serving_tokenization.OpenAIServingTokenization( + async_engine_client, + model_config, + self._model["openai_serving_models"], + request_logger=request_logger, + chat_template=vllm_model_config["chat_template"], + chat_template_content_format=chat_settings["chat_template_content_format"] + ) + self.logger.info("vLLM Engine was successfully initialized") + self.is_already_loaded = True + return None + + def add_models(self, name: str, model_path: str): + self._model["openai_serving_models"].base_model_paths.append( + self._vllm["entrypoints"].openai.serving_models.BaseModelPath( + name=name, + model_path=model_path + ) + ) + self.logger.info("Model {} was added to vllm engine".format(name)) + return None + + async def completions( + self, + data: Any, + state: dict, + collect_custom_statistics_fn: Callable[[dict], None] = None + ) -> Any: + """ + The actual processing function. + We run the process in this context + """ + request, raw_request = data["request"], data["raw_request"] + # analog of completion(raw_request) in https://github.com/vllm-project/vllm/blob/v0.7.3/vllm/entrypoints/openai/api_server.py#L405 + handler = self._model["openai_serving_completion"] + if handler is None: + return self._model["openai_serving"].create_error_response( + message="The model does not support Completions API" + ) + generator = await handler.create_completion(request=request, raw_request=raw_request) + if isinstance(generator, self._vllm["entrypoints"].openai.protocol.ErrorResponse): + return self._fastapi["json_response"](content=generator.model_dump(), status_code=generator.code) + elif isinstance(generator, self._vllm["entrypoints"].openai.protocol.CompletionResponse): + return self._fastapi["json_response"](content=generator.model_dump()) + return self._fastapi["streaming_response"](content=generator, media_type="text/event-stream") + + + async def chat_completions( + self, + data: Any, + state: dict, + collect_custom_statistics_fn: Callable[[dict], None] = None + ) -> Any: + """ + The actual processing function. + We run the process in this context + """ + request, raw_request = data["request"], data["raw_request"] + # analog of chat(raw_request) in https://github.com/vllm-project/vllm/blob/v0.7.3/vllm/entrypoints/openai/api_server.py#L405 + handler = self._model["openai_serving_chat"] + if handler is None: + return self._model["openai_serving"].create_error_response( + message="The model does not support Chat Completions API" + ) + generator = await handler.create_chat_completion(request=request, raw_request=raw_request) + if isinstance(generator, self._vllm["entrypoints"].openai.protocol.ErrorResponse): + return self._fastapi["json_response"](content=generator.model_dump(), status_code=generator.code) + elif isinstance(generator, self._vllm["entrypoints"].openai.protocol.ChatCompletionResponse): + return self._fastapi["json_response"](content=generator.model_dump()) + return self._fastapi["streaming_response"](content=generator, media_type="text/event-stream") + + +@BasePreprocessRequest.register_engine("vllm", modules=["vllm", "fastapi"]) +class VllmPreprocessRequest(BasePreprocessRequest): + is_preprocess_async = True + is_process_async = True + is_postprocess_async = True + asyncio_to_thread = None + _vllm_engine = None + + def __init__(self, model_endpoint: ModelEndpoint, task: Task = None): + super(VllmPreprocessRequest, self).__init__( + model_endpoint=model_endpoint, task=task) + self._vllm_engine = VllmEngine() + self._vllm_engine.load_engine( + name=model_endpoint.serving_url, + model_path=self._get_local_model_file(), + **self._model + ) + + if VllmPreprocessRequest.asyncio_to_thread is None: + from asyncio import to_thread as asyncio_to_thread + VllmPreprocessRequest.asyncio_to_thread = asyncio_to_thread + # override `send_request` method with the async version self._preprocess.__class__.send_request = VllmPreprocessRequest._preprocess_send_request @@ -734,17 +881,7 @@ class VllmPreprocessRequest(BasePreprocessRequest): The actual processing function. We run the process in this context """ - request, raw_request = data["request"], data["raw_request"] - # analog of completion(raw_request) in https://github.com/vllm-project/vllm/blob/v0.7.3/vllm/entrypoints/openai/api_server.py#L405 - handler = self._model["openai_serving_completion"] - if handler is None: - return self._model["openai_serving"].create_error_response(message="The model does not support Completions API") - generator = await handler.create_completion(request=request, raw_request=raw_request) - if isinstance(generator, self._vllm["error_response"]): - return self._fastapi["json_response"](content=generator.model_dump(), status_code=generator.code) - elif isinstance(generator, self._vllm["completion_response"]): - return self._fastapi["json_response"](content=generator.model_dump()) - return self._fastapi["streaming_response"](content=generator, media_type="text/event-stream") + return self._vllm_engine.completions(data=data, state=state, collect_custom_statistics_fn=collect_custom_statistics_fn) async def chat_completions(self, data: Any, state: dict, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Any: @@ -752,17 +889,7 @@ class VllmPreprocessRequest(BasePreprocessRequest): The actual processing function. We run the process in this context """ - request, raw_request = data["request"], data["raw_request"] - # analog of chat(raw_request) in https://github.com/vllm-project/vllm/blob/v0.7.3/vllm/entrypoints/openai/api_server.py#L405 - handler = self._model["openai_serving_chat"] - if handler is None: - return self._model["openai_serving"].create_error_response(message="The model does not support Chat Completions API") - generator = await handler.create_chat_completion(request=request, raw_request=raw_request) - if isinstance(generator, self._vllm["error_response"]): - return self._fastapi["json_response"](content=generator.model_dump(), status_code=generator.code) - elif isinstance(generator, self._vllm["chat_completion_response"]): - return self._fastapi["json_response"](content=generator.model_dump()) - return self._fastapi["streaming_response"](content=generator, media_type="text/event-stream") + return self._vllm_engine.chat_completions(data=data, state=state, collect_custom_statistics_fn=collect_custom_statistics_fn) @staticmethod diff --git a/docker/docker-compose-gpu.yml b/docker/docker-compose-gpu.yml index 0009fa8..221f6d0 100644 --- a/docker/docker-compose-gpu.yml +++ b/docker/docker-compose-gpu.yml @@ -105,6 +105,7 @@ services: GOOGLE_APPLICATION_CREDENTIALS: ${GOOGLE_APPLICATION_CREDENTIALS:-} AZURE_STORAGE_ACCOUNT: ${AZURE_STORAGE_ACCOUNT:-} AZURE_STORAGE_KEY: ${AZURE_STORAGE_KEY:-} + VLLM_ENGINE_ARGS: ${VLLM_ENGINE_ARGS:-'{"disable_log_requests":true,"disable_log_stats":false,"gpu_memory_utilization":0.95,"quantization":null,"enforce_eager":true}'} depends_on: - kafka networks: diff --git a/examples/vllm/preprocess.py b/examples/vllm/preprocess.py index b2c6ce0..aa0f13a 100644 --- a/examples/vllm/preprocess.py +++ b/examples/vllm/preprocess.py @@ -1,16 +1,5 @@ """Hugginface preprocessing module for ClearML Serving.""" from typing import Any, Optional, List, Callable, Union -from vllm.engine.arg_utils import AsyncEngineArgs -from vllm.engine.async_llm_engine import AsyncLLMEngine -from vllm.entrypoints.logger import RequestLogger -from vllm.entrypoints.openai.serving_engine import OpenAIServing -from vllm.entrypoints.openai.serving_models import OpenAIServingModels, LoRAModulePath, PromptAdapterPath, BaseModelPath -from vllm.entrypoints.openai.serving_chat import OpenAIServingChat -from vllm.entrypoints.openai.serving_completion import OpenAIServingCompletion -from vllm.entrypoints.openai.serving_embedding import OpenAIServingEmbedding -from vllm.entrypoints.openai.serving_tokenization import OpenAIServingTokenization -from vllm.entrypoints.chat_utils import ChatTemplateContentFormatOption -from vllm.usage.usage_lib import UsageContext class Preprocess: @@ -22,19 +11,19 @@ class Preprocess: def load(self, local_file_name: str) -> Optional[Any]: # noqa - self.vllm_engine_config = { - "model": local_file_name, - "tokenizer": local_file_name, - "disable_log_requests": True, - "disable_log_stats": False, - "gpu_memory_utilization": 0.9, - "quantization": None, - "enforce_eager": True, - "served_model_name": "test_vllm", - "dtype": "float16", - "max_model_len": 8192 - } - self.vllm_model_config = { + # vllm_engine_config = { + # "model": local_file_name, + # "tokenizer": local_file_name, + # "disable_log_requests": True, + # "disable_log_stats": False, + # "gpu_memory_utilization": 0.9, + # "quantization": None, + # "enforce_eager": True, + # "served_model_name": "test_vllm", + # "dtype": "float16", + # "max_model_len": 8192 + # } + vllm_model_config = { "lora_modules": None, # [LoRAModulePath(name=a, path=b)] "prompt_adapters": None, # [PromptAdapterPath(name=a, path=b)] "response_role": "assistant", @@ -42,7 +31,7 @@ class Preprocess: "return_tokens_as_token_ids": False, "max_log_len": None } - self.chat_settings = { + chat_settings = { "enable_reasoning": False, "reasoning_parser": None, "enable_auto_tools": False, @@ -50,64 +39,69 @@ class Preprocess: "enable_prompt_tokens_details": False, "chat_template_content_format": "auto" } - self._model = {} - self.engine_args = AsyncEngineArgs(**self.vllm_engine_config) - async_engine_client = AsyncLLMEngine.from_engine_args(self.engine_args, usage_context=UsageContext.OPENAI_API_SERVER) - self.model_config = async_engine_client.engine.get_model_config() - request_logger = RequestLogger(max_log_len=self.vllm_model_config["max_log_len"]) - self._model["openai_serving_models"] = OpenAIServingModels( - async_engine_client, - self.model_config, - [BaseModelPath(name=self.vllm_engine_config["served_model_name"], model_path=self.vllm_engine_config["model"])], - lora_modules=self.vllm_model_config["lora_modules"], - prompt_adapters=self.vllm_model_config["prompt_adapters"], - ) - self._model["openai_serving"] = OpenAIServing( - async_engine_client, - self.model_config, - self._model["openai_serving_models"], - request_logger=request_logger, - return_tokens_as_token_ids=self.vllm_model_config["return_tokens_as_token_ids"] - ) - self._model["openai_serving_chat"] = OpenAIServingChat( - async_engine_client, - self.model_config, - self._model["openai_serving_models"], - response_role=self.vllm_model_config["response_role"], - request_logger=request_logger, - chat_template=self.vllm_model_config["chat_template"], - chat_template_content_format=self.chat_settings["chat_template_content_format"], - return_tokens_as_token_ids=self.vllm_model_config["return_tokens_as_token_ids"], - enable_reasoning=self.chat_settings["enable_reasoning"], - reasoning_parser=self.chat_settings["reasoning_parser"], - enable_auto_tools=self.chat_settings["enable_auto_tools"], - tool_parser=self.chat_settings["tool_parser"], - enable_prompt_tokens_details=self.chat_settings["enable_prompt_tokens_details"] - ) if self.model_config.runner_type == "generate" else None - self._model["openai_serving_completion"] = OpenAIServingCompletion( - async_engine_client, - self.model_config, - self._model["openai_serving_models"], - request_logger=request_logger, - return_tokens_as_token_ids=self.vllm_model_config["return_tokens_as_token_ids"] - ) if self.model_config.runner_type == "generate" else None - self._model["openai_serving_embedding"] = OpenAIServingEmbedding( - async_engine_client, - self.model_config, - self._model["openai_serving_models"], - request_logger=request_logger, - chat_template=self.vllm_model_config["chat_template"], - chat_template_content_format=self.chat_settings["chat_template_content_format"] - ) if self.model_config.task == "embed" else None - self._model["openai_serving_tokenization"] = OpenAIServingTokenization( - async_engine_client, - self.model_config, - self._model["openai_serving_models"], - request_logger=request_logger, - chat_template=self.vllm_model_config["chat_template"], - chat_template_content_format=self.chat_settings["chat_template_content_format"] - ) - return self._model + # self._model = {} + # engine_args = AsyncEngineArgs(**self.vllm_engine_config) + # async_engine_client = AsyncLLMEngine.from_engine_args(self.engine_args, usage_context=UsageContext.OPENAI_API_SERVER) + # model_config = async_engine_client.engine.get_model_config() + # request_logger = RequestLogger(max_log_len=self.vllm_model_config["max_log_len"]) + # self._model["openai_serving_models"] = OpenAIServingModels( + # async_engine_client, + # self.model_config, + # [BaseModelPath(name=self.vllm_engine_config["served_model_name"], model_path=self.vllm_engine_config["model"])], + # lora_modules=self.vllm_model_config["lora_modules"], + # prompt_adapters=self.vllm_model_config["prompt_adapters"], + # ) + # self._model["openai_serving"] = OpenAIServing( + # async_engine_client, + # self.model_config, + # self._model["openai_serving_models"], + # request_logger=request_logger, + # return_tokens_as_token_ids=self.vllm_model_config["return_tokens_as_token_ids"] + # ) + # self._model["openai_serving_chat"] = OpenAIServingChat( + # async_engine_client, + # self.model_config, + # self._model["openai_serving_models"], + # response_role=self.vllm_model_config["response_role"], + # request_logger=request_logger, + # chat_template=self.vllm_model_config["chat_template"], + # chat_template_content_format=self.chat_settings["chat_template_content_format"], + # return_tokens_as_token_ids=self.vllm_model_config["return_tokens_as_token_ids"], + # enable_reasoning=self.chat_settings["enable_reasoning"], + # reasoning_parser=self.chat_settings["reasoning_parser"], + # enable_auto_tools=self.chat_settings["enable_auto_tools"], + # tool_parser=self.chat_settings["tool_parser"], + # enable_prompt_tokens_details=self.chat_settings["enable_prompt_tokens_details"] + # ) if self.model_config.runner_type == "generate" else None + # self._model["openai_serving_completion"] = OpenAIServingCompletion( + # async_engine_client, + # self.model_config, + # self._model["openai_serving_models"], + # request_logger=request_logger, + # return_tokens_as_token_ids=self.vllm_model_config["return_tokens_as_token_ids"] + # ) if self.model_config.runner_type == "generate" else None + # self._model["openai_serving_embedding"] = OpenAIServingEmbedding( + # async_engine_client, + # self.model_config, + # self._model["openai_serving_models"], + # request_logger=request_logger, + # chat_template=self.vllm_model_config["chat_template"], + # chat_template_content_format=self.chat_settings["chat_template_content_format"] + # ) if self.model_config.task == "embed" else None + # self._model["openai_serving_tokenization"] = OpenAIServingTokenization( + # async_engine_client, + # self.model_config, + # self._model["openai_serving_models"], + # request_logger=request_logger, + # chat_template=self.vllm_model_config["chat_template"], + # chat_template_content_format=self.chat_settings["chat_template_content_format"] + # ) + # return self._model + return { + # "vllm_engine_config": vllm_engine_config, + "vllm_model_config": vllm_model_config, + "chat_settings": chat_settings + } def remove_extra_system_prompts(self, messages: List) -> List: system_messages_indices = [] diff --git a/examples/vllm/test_openai_api.py b/examples/vllm/test_openai_api.py index 4290427..55d7b27 100644 --- a/examples/vllm/test_openai_api.py +++ b/examples/vllm/test_openai_api.py @@ -15,7 +15,7 @@ def main(model_name: str = "test_vllm"): top_p=1.0 ) - print(f"ChatCompletion: {chat_response.choices[0].message}") + print(f"ChatCompletion: {chat_response.choices[0].message.content}") comp_response = client.completions.create( model=model_name,