move engine init in separate class

This commit is contained in:
IlyaMescheryakov1402
2025-03-10 23:52:14 +03:00
parent 1c591f2d15
commit 9441ae8473
4 changed files with 273 additions and 151 deletions

View File

@@ -1,4 +1,5 @@
import os
import json
import sys
import threading
import traceback
@@ -14,19 +15,12 @@ from requests import post as request_post
from .endpoints import ModelEndpoint
# try:
# import prometheus_client
# from fastapi.responses import JSONResponse, StreamingResponse
# from vllm.entrypoints.openai.protocol import (
# ChatCompletionRequest,
# ChatCompletionResponse,
# CompletionRequest,
# ErrorResponse
# )
# from vllm.logger import init_logger
# except ImportError:
# print("WARNING: install vllm==0.5.4 and prometheus_client==0.21.1 to serve vllm engine")
class Singleton(object):
_instance = None
def __new__(class_, *args, **kwargs):
if not isinstance(class_._instance, class_):
class_._instance = object.__new__(class_, *args, **kwargs)
return class_._instance
class BasePreprocessRequest(object):
__preprocessing_lookup = {}
@@ -611,41 +605,21 @@ class CustomAsyncPreprocessRequest(BasePreprocessRequest):
return return_value.json()
@BasePreprocessRequest.register_engine("vllm", modules=["vllm", "fastapi"])
class VllmPreprocessRequest(BasePreprocessRequest):
is_preprocess_async = True
is_process_async = True
is_postprocess_async = True
asyncio_to_thread = None
class VllmEngine(Singleton):
_model = None
_vllm = None
_fastapi = None
is_already_loaded = False
class CustomRequest:
def __init__(self, headers: Optional[dict] = None):
self.headers = headers
async def is_disconnected(self):
return False
def __init__(self, model_endpoint: ModelEndpoint, task: Task = None):
super(VllmPreprocessRequest, self).__init__(
model_endpoint=model_endpoint, task=task)
def __init__(self):
# load vLLM Modules
if self._vllm is None:
from vllm.entrypoints.openai.protocol import (
ChatCompletionRequest,
ChatCompletionResponse,
CompletionRequest,
CompletionResponse,
ErrorResponse
)
from vllm import entrypoints, engine, usage
self._vllm = {}
self._vllm["chat_completion_request"] = ChatCompletionRequest
self._vllm["chat_completion_response"] = ChatCompletionResponse
self._vllm["completion_request"] = CompletionRequest
self._vllm["completion_response"] = CompletionResponse
self._vllm["error_response"] = ErrorResponse
self._vllm["entrypoints"] = entrypoints
self._vllm["engine"] = engine
self._vllm["usage"] = usage
if self._fastapi is None:
from fastapi.responses import JSONResponse, StreamingResponse
@@ -656,16 +630,189 @@ class VllmPreprocessRequest(BasePreprocessRequest):
from vllm.logger import init_logger
self.logger = init_logger(__name__)
if VllmPreprocessRequest.asyncio_to_thread is None:
from asyncio import to_thread as asyncio_to_thread
VllmPreprocessRequest.asyncio_to_thread = asyncio_to_thread
import socket
import prometheus_client
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
if not s.connect_ex(('localhost', 8000)) == 0:
prometheus_client.start_http_server(8000)
def load_engine(
self,
name: str,
model_path: str,
vllm_model_config: dict,
chat_settings: dict
):
if self.is_already_loaded:
self.add_models(name=name, model_path=model_path)
return None
vllm_engine_config = json.loads(os.environ.get("VLLM_ENGINE_ARGS"))
engine_args = self._vllm["engine"].arg_utils.AsyncEngineArgs(**vllm_engine_config)
async_engine_client = self._vllm["engine"].async_llm_engine.AsyncLLMEngine.from_engine_args(
engine_args,
usage_context=self._vllm["usage"].usage_lib.UsageContext.OPENAI_API_SERVER
)
model_config = async_engine_client.engine.get_model_config()
request_logger = self._vllm["entrypoints"].logger.RequestLogger(
max_log_len=vllm_model_config["max_log_len"]
)
self._model["openai_serving_models"] = self._vllm[
"entrypoints"
].openai.serving_models.OpenAIServingModels(
async_engine_client,
model_config,
[
self._vllm["entrypoints"].openai.serving_models.BaseModelPath(
name=name,
model_path=model_path
)
],
lora_modules=svllm_model_config["lora_modules"],
prompt_adapters=vllm_model_config["prompt_adapters"],
)
await self._model["openai_serving_models"].init_static_loras()
self._model["openai_serving"] = self._vllm[
"entrypoints"
].openai.serving_engine.OpenAIServing(
async_engine_client,
model_config,
self._model["openai_serving_models"],
request_logger=request_logger,
return_tokens_as_token_ids=vllm_model_config["return_tokens_as_token_ids"]
)
self._model["openai_serving_chat"] = self._vllm[
"entrypoints"
].openai.serving_chat.OpenAIServingChat(
async_engine_client,
model_config,
self._model["openai_serving_models"],
response_role=vllm_model_config["response_role"],
request_logger=request_logger,
chat_template=vllm_model_config["chat_template"],
chat_template_content_format=chat_settings["chat_template_content_format"],
return_tokens_as_token_ids=vllm_model_config["return_tokens_as_token_ids"],
enable_reasoning=chat_settings["enable_reasoning"],
reasoning_parser=chat_settings["reasoning_parser"],
enable_auto_tools=chat_settings["enable_auto_tools"],
tool_parser=chat_settings["tool_parser"],
enable_prompt_tokens_details=chat_settings["enable_prompt_tokens_details"]
) if model_config.runner_type == "generate" else None
self._model["openai_serving_completion"] = self._vllm[
"entrypoints"
].openai.serving_completion.OpenAIServingCompletion(
async_engine_client,
model_config,
self._model["openai_serving_models"],
request_logger=request_logger,
return_tokens_as_token_ids=vllm_model_config["return_tokens_as_token_ids"]
) if model_config.runner_type == "generate" else None
self._model["openai_serving_embedding"] = self._vllm[
"entrypoints"
].openai.serving_embedding.OpenAIServingEmbedding(
async_engine_client,
model_config,
self._model["openai_serving_models"],
request_logger=request_logger,
chat_template=vllm_model_config["chat_template"],
chat_template_content_format=chat_settings["chat_template_content_format"]
) if model_config.task == "embed" else None
self._model["openai_serving_tokenization"] = self._vllm[
"entrypoints"
].openai.serving_tokenization.OpenAIServingTokenization(
async_engine_client,
model_config,
self._model["openai_serving_models"],
request_logger=request_logger,
chat_template=vllm_model_config["chat_template"],
chat_template_content_format=chat_settings["chat_template_content_format"]
)
self.logger.info("vLLM Engine was successfully initialized")
self.is_already_loaded = True
return None
def add_models(self, name: str, model_path: str):
self._model["openai_serving_models"].base_model_paths.append(
self._vllm["entrypoints"].openai.serving_models.BaseModelPath(
name=name,
model_path=model_path
)
)
self.logger.info("Model {} was added to vllm engine".format(name))
return None
async def completions(
self,
data: Any,
state: dict,
collect_custom_statistics_fn: Callable[[dict], None] = None
) -> Any:
"""
The actual processing function.
We run the process in this context
"""
request, raw_request = data["request"], data["raw_request"]
# analog of completion(raw_request) in https://github.com/vllm-project/vllm/blob/v0.7.3/vllm/entrypoints/openai/api_server.py#L405
handler = self._model["openai_serving_completion"]
if handler is None:
return self._model["openai_serving"].create_error_response(
message="The model does not support Completions API"
)
generator = await handler.create_completion(request=request, raw_request=raw_request)
if isinstance(generator, self._vllm["entrypoints"].openai.protocol.ErrorResponse):
return self._fastapi["json_response"](content=generator.model_dump(), status_code=generator.code)
elif isinstance(generator, self._vllm["entrypoints"].openai.protocol.CompletionResponse):
return self._fastapi["json_response"](content=generator.model_dump())
return self._fastapi["streaming_response"](content=generator, media_type="text/event-stream")
async def chat_completions(
self,
data: Any,
state: dict,
collect_custom_statistics_fn: Callable[[dict], None] = None
) -> Any:
"""
The actual processing function.
We run the process in this context
"""
request, raw_request = data["request"], data["raw_request"]
# analog of chat(raw_request) in https://github.com/vllm-project/vllm/blob/v0.7.3/vllm/entrypoints/openai/api_server.py#L405
handler = self._model["openai_serving_chat"]
if handler is None:
return self._model["openai_serving"].create_error_response(
message="The model does not support Chat Completions API"
)
generator = await handler.create_chat_completion(request=request, raw_request=raw_request)
if isinstance(generator, self._vllm["entrypoints"].openai.protocol.ErrorResponse):
return self._fastapi["json_response"](content=generator.model_dump(), status_code=generator.code)
elif isinstance(generator, self._vllm["entrypoints"].openai.protocol.ChatCompletionResponse):
return self._fastapi["json_response"](content=generator.model_dump())
return self._fastapi["streaming_response"](content=generator, media_type="text/event-stream")
@BasePreprocessRequest.register_engine("vllm", modules=["vllm", "fastapi"])
class VllmPreprocessRequest(BasePreprocessRequest):
is_preprocess_async = True
is_process_async = True
is_postprocess_async = True
asyncio_to_thread = None
_vllm_engine = None
def __init__(self, model_endpoint: ModelEndpoint, task: Task = None):
super(VllmPreprocessRequest, self).__init__(
model_endpoint=model_endpoint, task=task)
self._vllm_engine = VllmEngine()
self._vllm_engine.load_engine(
name=model_endpoint.serving_url,
model_path=self._get_local_model_file(),
**self._model
)
if VllmPreprocessRequest.asyncio_to_thread is None:
from asyncio import to_thread as asyncio_to_thread
VllmPreprocessRequest.asyncio_to_thread = asyncio_to_thread
# override `send_request` method with the async version
self._preprocess.__class__.send_request = VllmPreprocessRequest._preprocess_send_request
@@ -734,17 +881,7 @@ class VllmPreprocessRequest(BasePreprocessRequest):
The actual processing function.
We run the process in this context
"""
request, raw_request = data["request"], data["raw_request"]
# analog of completion(raw_request) in https://github.com/vllm-project/vllm/blob/v0.7.3/vllm/entrypoints/openai/api_server.py#L405
handler = self._model["openai_serving_completion"]
if handler is None:
return self._model["openai_serving"].create_error_response(message="The model does not support Completions API")
generator = await handler.create_completion(request=request, raw_request=raw_request)
if isinstance(generator, self._vllm["error_response"]):
return self._fastapi["json_response"](content=generator.model_dump(), status_code=generator.code)
elif isinstance(generator, self._vllm["completion_response"]):
return self._fastapi["json_response"](content=generator.model_dump())
return self._fastapi["streaming_response"](content=generator, media_type="text/event-stream")
return self._vllm_engine.completions(data=data, state=state, collect_custom_statistics_fn=collect_custom_statistics_fn)
async def chat_completions(self, data: Any, state: dict, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Any:
@@ -752,17 +889,7 @@ class VllmPreprocessRequest(BasePreprocessRequest):
The actual processing function.
We run the process in this context
"""
request, raw_request = data["request"], data["raw_request"]
# analog of chat(raw_request) in https://github.com/vllm-project/vllm/blob/v0.7.3/vllm/entrypoints/openai/api_server.py#L405
handler = self._model["openai_serving_chat"]
if handler is None:
return self._model["openai_serving"].create_error_response(message="The model does not support Chat Completions API")
generator = await handler.create_chat_completion(request=request, raw_request=raw_request)
if isinstance(generator, self._vllm["error_response"]):
return self._fastapi["json_response"](content=generator.model_dump(), status_code=generator.code)
elif isinstance(generator, self._vllm["chat_completion_response"]):
return self._fastapi["json_response"](content=generator.model_dump())
return self._fastapi["streaming_response"](content=generator, media_type="text/event-stream")
return self._vllm_engine.chat_completions(data=data, state=state, collect_custom_statistics_fn=collect_custom_statistics_fn)
@staticmethod

View File

@@ -105,6 +105,7 @@ services:
GOOGLE_APPLICATION_CREDENTIALS: ${GOOGLE_APPLICATION_CREDENTIALS:-}
AZURE_STORAGE_ACCOUNT: ${AZURE_STORAGE_ACCOUNT:-}
AZURE_STORAGE_KEY: ${AZURE_STORAGE_KEY:-}
VLLM_ENGINE_ARGS: ${VLLM_ENGINE_ARGS:-'{"disable_log_requests":true,"disable_log_stats":false,"gpu_memory_utilization":0.95,"quantization":null,"enforce_eager":true}'}
depends_on:
- kafka
networks:

View File

@@ -1,16 +1,5 @@
"""Hugginface preprocessing module for ClearML Serving."""
from typing import Any, Optional, List, Callable, Union
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.engine.async_llm_engine import AsyncLLMEngine
from vllm.entrypoints.logger import RequestLogger
from vllm.entrypoints.openai.serving_engine import OpenAIServing
from vllm.entrypoints.openai.serving_models import OpenAIServingModels, LoRAModulePath, PromptAdapterPath, BaseModelPath
from vllm.entrypoints.openai.serving_chat import OpenAIServingChat
from vllm.entrypoints.openai.serving_completion import OpenAIServingCompletion
from vllm.entrypoints.openai.serving_embedding import OpenAIServingEmbedding
from vllm.entrypoints.openai.serving_tokenization import OpenAIServingTokenization
from vllm.entrypoints.chat_utils import ChatTemplateContentFormatOption
from vllm.usage.usage_lib import UsageContext
class Preprocess:
@@ -22,19 +11,19 @@ class Preprocess:
def load(self, local_file_name: str) -> Optional[Any]: # noqa
self.vllm_engine_config = {
"model": local_file_name,
"tokenizer": local_file_name,
"disable_log_requests": True,
"disable_log_stats": False,
"gpu_memory_utilization": 0.9,
"quantization": None,
"enforce_eager": True,
"served_model_name": "test_vllm",
"dtype": "float16",
"max_model_len": 8192
}
self.vllm_model_config = {
# vllm_engine_config = {
# "model": local_file_name,
# "tokenizer": local_file_name,
# "disable_log_requests": True,
# "disable_log_stats": False,
# "gpu_memory_utilization": 0.9,
# "quantization": None,
# "enforce_eager": True,
# "served_model_name": "test_vllm",
# "dtype": "float16",
# "max_model_len": 8192
# }
vllm_model_config = {
"lora_modules": None, # [LoRAModulePath(name=a, path=b)]
"prompt_adapters": None, # [PromptAdapterPath(name=a, path=b)]
"response_role": "assistant",
@@ -42,7 +31,7 @@ class Preprocess:
"return_tokens_as_token_ids": False,
"max_log_len": None
}
self.chat_settings = {
chat_settings = {
"enable_reasoning": False,
"reasoning_parser": None,
"enable_auto_tools": False,
@@ -50,64 +39,69 @@ class Preprocess:
"enable_prompt_tokens_details": False,
"chat_template_content_format": "auto"
}
self._model = {}
self.engine_args = AsyncEngineArgs(**self.vllm_engine_config)
async_engine_client = AsyncLLMEngine.from_engine_args(self.engine_args, usage_context=UsageContext.OPENAI_API_SERVER)
self.model_config = async_engine_client.engine.get_model_config()
request_logger = RequestLogger(max_log_len=self.vllm_model_config["max_log_len"])
self._model["openai_serving_models"] = OpenAIServingModels(
async_engine_client,
self.model_config,
[BaseModelPath(name=self.vllm_engine_config["served_model_name"], model_path=self.vllm_engine_config["model"])],
lora_modules=self.vllm_model_config["lora_modules"],
prompt_adapters=self.vllm_model_config["prompt_adapters"],
)
self._model["openai_serving"] = OpenAIServing(
async_engine_client,
self.model_config,
self._model["openai_serving_models"],
request_logger=request_logger,
return_tokens_as_token_ids=self.vllm_model_config["return_tokens_as_token_ids"]
)
self._model["openai_serving_chat"] = OpenAIServingChat(
async_engine_client,
self.model_config,
self._model["openai_serving_models"],
response_role=self.vllm_model_config["response_role"],
request_logger=request_logger,
chat_template=self.vllm_model_config["chat_template"],
chat_template_content_format=self.chat_settings["chat_template_content_format"],
return_tokens_as_token_ids=self.vllm_model_config["return_tokens_as_token_ids"],
enable_reasoning=self.chat_settings["enable_reasoning"],
reasoning_parser=self.chat_settings["reasoning_parser"],
enable_auto_tools=self.chat_settings["enable_auto_tools"],
tool_parser=self.chat_settings["tool_parser"],
enable_prompt_tokens_details=self.chat_settings["enable_prompt_tokens_details"]
) if self.model_config.runner_type == "generate" else None
self._model["openai_serving_completion"] = OpenAIServingCompletion(
async_engine_client,
self.model_config,
self._model["openai_serving_models"],
request_logger=request_logger,
return_tokens_as_token_ids=self.vllm_model_config["return_tokens_as_token_ids"]
) if self.model_config.runner_type == "generate" else None
self._model["openai_serving_embedding"] = OpenAIServingEmbedding(
async_engine_client,
self.model_config,
self._model["openai_serving_models"],
request_logger=request_logger,
chat_template=self.vllm_model_config["chat_template"],
chat_template_content_format=self.chat_settings["chat_template_content_format"]
) if self.model_config.task == "embed" else None
self._model["openai_serving_tokenization"] = OpenAIServingTokenization(
async_engine_client,
self.model_config,
self._model["openai_serving_models"],
request_logger=request_logger,
chat_template=self.vllm_model_config["chat_template"],
chat_template_content_format=self.chat_settings["chat_template_content_format"]
)
return self._model
# self._model = {}
# engine_args = AsyncEngineArgs(**self.vllm_engine_config)
# async_engine_client = AsyncLLMEngine.from_engine_args(self.engine_args, usage_context=UsageContext.OPENAI_API_SERVER)
# model_config = async_engine_client.engine.get_model_config()
# request_logger = RequestLogger(max_log_len=self.vllm_model_config["max_log_len"])
# self._model["openai_serving_models"] = OpenAIServingModels(
# async_engine_client,
# self.model_config,
# [BaseModelPath(name=self.vllm_engine_config["served_model_name"], model_path=self.vllm_engine_config["model"])],
# lora_modules=self.vllm_model_config["lora_modules"],
# prompt_adapters=self.vllm_model_config["prompt_adapters"],
# )
# self._model["openai_serving"] = OpenAIServing(
# async_engine_client,
# self.model_config,
# self._model["openai_serving_models"],
# request_logger=request_logger,
# return_tokens_as_token_ids=self.vllm_model_config["return_tokens_as_token_ids"]
# )
# self._model["openai_serving_chat"] = OpenAIServingChat(
# async_engine_client,
# self.model_config,
# self._model["openai_serving_models"],
# response_role=self.vllm_model_config["response_role"],
# request_logger=request_logger,
# chat_template=self.vllm_model_config["chat_template"],
# chat_template_content_format=self.chat_settings["chat_template_content_format"],
# return_tokens_as_token_ids=self.vllm_model_config["return_tokens_as_token_ids"],
# enable_reasoning=self.chat_settings["enable_reasoning"],
# reasoning_parser=self.chat_settings["reasoning_parser"],
# enable_auto_tools=self.chat_settings["enable_auto_tools"],
# tool_parser=self.chat_settings["tool_parser"],
# enable_prompt_tokens_details=self.chat_settings["enable_prompt_tokens_details"]
# ) if self.model_config.runner_type == "generate" else None
# self._model["openai_serving_completion"] = OpenAIServingCompletion(
# async_engine_client,
# self.model_config,
# self._model["openai_serving_models"],
# request_logger=request_logger,
# return_tokens_as_token_ids=self.vllm_model_config["return_tokens_as_token_ids"]
# ) if self.model_config.runner_type == "generate" else None
# self._model["openai_serving_embedding"] = OpenAIServingEmbedding(
# async_engine_client,
# self.model_config,
# self._model["openai_serving_models"],
# request_logger=request_logger,
# chat_template=self.vllm_model_config["chat_template"],
# chat_template_content_format=self.chat_settings["chat_template_content_format"]
# ) if self.model_config.task == "embed" else None
# self._model["openai_serving_tokenization"] = OpenAIServingTokenization(
# async_engine_client,
# self.model_config,
# self._model["openai_serving_models"],
# request_logger=request_logger,
# chat_template=self.vllm_model_config["chat_template"],
# chat_template_content_format=self.chat_settings["chat_template_content_format"]
# )
# return self._model
return {
# "vllm_engine_config": vllm_engine_config,
"vllm_model_config": vllm_model_config,
"chat_settings": chat_settings
}
def remove_extra_system_prompts(self, messages: List) -> List:
system_messages_indices = []

View File

@@ -15,7 +15,7 @@ def main(model_name: str = "test_vllm"):
top_p=1.0
)
print(f"ChatCompletion: {chat_response.choices[0].message}")
print(f"ChatCompletion: {chat_response.choices[0].message.content}")
comp_response = client.completions.create(
model=model_name,