add getattr for process methods

IlyaMescheryakov1402 2025-03-11 22:42:59 +03:00
parent 9bb0dbb182
commit fedfcdadeb
3 changed files with 89 additions and 54 deletions

View File

@@ -1225,23 +1225,28 @@ class ModelRequestProcessor(object):
         preprocessed = await processor.preprocess(body, state, stats_collect_fn) \
             if processor.is_preprocess_async \
             else processor.preprocess(body, state, stats_collect_fn)
-        if serve_type == "process":
-            # noinspection PyUnresolvedReferences
-            processed = await processor.process(preprocessed, state, stats_collect_fn) \
-                if processor.is_process_async \
-                else processor.process(preprocessed, state, stats_collect_fn)
-        elif serve_type == "completions":
-            # noinspection PyUnresolvedReferences
-            processed = await processor.completions(preprocessed, state, stats_collect_fn) \
-                if processor.is_process_async \
-                else processor.completions(preprocessed, state, stats_collect_fn)
-        elif serve_type == "chat/completions":
-            # noinspection PyUnresolvedReferences
-            processed = await processor.chat_completions(preprocessed, state, stats_collect_fn) \
-                if processor.is_process_async \
-                else processor.chat_completions(preprocessed, state, stats_collect_fn)
-        else:
-            raise ValueError(f"wrong url_type: expected 'process', 'completions' or 'chat/completions', got {serve_type}")
+        processed_func = getattr(processor, serve_type.replace("/", "_"))
+        # noinspection PyUnresolvedReferences
+        processed = await processed_func(preprocessed, state, stats_collect_fn) \
+            if processor.is_process_async \
+            else processed_func(preprocessed, state, stats_collect_fn)
+        # if serve_type == "process":
+        #     # noinspection PyUnresolvedReferences
+        #     processed = await processor.process(preprocessed, state, stats_collect_fn) \
+        #         if processor.is_process_async \
+        #         else processor.process(preprocessed, state, stats_collect_fn)
+        # elif serve_type == "completions":
+        #     # noinspection PyUnresolvedReferences
+        #     processed = await processor.completions(preprocessed, state, stats_collect_fn) \
+        #         if processor.is_process_async \
+        #         else processor.completions(preprocessed, state, stats_collect_fn)
+        # elif serve_type == "chat/completions":
+        #     # noinspection PyUnresolvedReferences
+        #     processed = await processor.chat_completions(preprocessed, state, stats_collect_fn) \
+        #         if processor.is_process_async \
+        #         else processor.chat_completions(preprocessed, state, stats_collect_fn)
+        # else:
+        #     raise ValueError(f"wrong url_type: expected 'process', 'completions' or 'chat/completions', got {serve_type}")
         # noinspection PyUnresolvedReferences
         return_value = await processor.postprocess(processed, state, stats_collect_fn) \
             if processor.is_postprocess_async \
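The hunk above replaces the serve_type if/elif chain with a single getattr lookup: "chat/completions" becomes the method name "chat_completions", and an unknown serve type now surfaces as an AttributeError instead of the old ValueError. It also means new serve types (such as the models wrapper added in the next file) can be routed without touching this dispatcher. A minimal, self-contained sketch of the dispatch idea (DummyProcessor and the sample payload are illustrative stand-ins, not clearml-serving classes):

import asyncio


class DummyProcessor:
    # stand-in for a preprocess class that exposes one method per serve type
    is_process_async = True

    async def process(self, data, state, stats_collect_fn=None):
        return {"route": "process", "data": data}

    async def completions(self, data, state, stats_collect_fn=None):
        return {"route": "completions", "data": data}

    async def chat_completions(self, data, state, stats_collect_fn=None):
        return {"route": "chat/completions", "data": data}


async def dispatch(processor, serve_type, preprocessed, state, stats_collect_fn=None):
    # "chat/completions" -> "chat_completions"; a missing method raises AttributeError
    processed_func = getattr(processor, serve_type.replace("/", "_"))
    if processor.is_process_async:
        return await processed_func(preprocessed, state, stats_collect_fn)
    return processed_func(preprocessed, state, stats_collect_fn)


if __name__ == "__main__":
    result = asyncio.run(dispatch(DummyProcessor(), "chat/completions", {"prompt": "hi"}, {}))
    print(result)  # {'route': 'chat/completions', 'data': {'prompt': 'hi'}}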

View File

@@ -606,26 +606,32 @@ class CustomAsyncPreprocessRequest(BasePreprocessRequest):


 class VllmEngine(Singleton):
-    _model = None
     _vllm = None
     _fastapi = None
     is_already_loaded = False

-    def __init__(self):
+    def __init__(self) -> None:
         # load vLLM Modules
         if self._vllm is None:
-            # from vllm import entrypoints, engine, usage
             from vllm.engine.arg_utils import AsyncEngineArgs
             from vllm.engine.async_llm_engine import AsyncLLMEngine
             from vllm.entrypoints.logger import RequestLogger
             from vllm.entrypoints.openai.serving_engine import OpenAIServing
-            from vllm.entrypoints.openai.serving_models import OpenAIServingModels, LoRAModulePath, PromptAdapterPath, BaseModelPath
+            from vllm.entrypoints.openai.serving_models import (
+                OpenAIServingModels,
+                LoRAModulePath,
+                PromptAdapterPath,
+                BaseModelPath
+            )
             from vllm.entrypoints.openai.serving_chat import OpenAIServingChat
             from vllm.entrypoints.openai.serving_completion import OpenAIServingCompletion
             from vllm.entrypoints.openai.serving_embedding import OpenAIServingEmbedding
             from vllm.entrypoints.openai.serving_tokenization import OpenAIServingTokenization
-            from vllm.entrypoints.openai.protocol import ChatCompletionResponse, CompletionResponse, ErrorResponse
+            from vllm.entrypoints.openai.protocol import (
+                ChatCompletionResponse,
+                CompletionResponse,
+                ErrorResponse
+            )
             from vllm.entrypoints.chat_utils import ChatTemplateContentFormatOption
             from vllm.usage.usage_lib import UsageContext
             self._vllm = {
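VllmEngine only imports the vLLM modules the first time it is constructed and keeps the imported symbols in a dict, so later instantiations of the Singleton skip the import cost. A minimal sketch of that lazy-import-and-cache pattern, with standard-library modules standing in for the heavy vLLM imports (LazyEngine is illustrative, not the real class):

class LazyEngine:
    # class-level cache: the imports run once per process, later instances reuse it
    _modules = None

    def __init__(self) -> None:
        if self.__class__._modules is None:
            import json  # stand-in for e.g. AsyncEngineArgs / AsyncLLMEngine
            import gzip  # stand-in for another heavy dependency
            self.__class__._modules = {"json": json, "gzip": gzip}

    def dumps(self, obj) -> str:
        return self._modules["json"].dumps(obj)


if __name__ == "__main__":
    first = LazyEngine()   # triggers the imports and fills the cache
    second = LazyEngine()  # reuses the cached modules
    print(second.dumps({"loaded_once": True}))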
@@ -669,7 +675,7 @@ class VllmEngine(Singleton):
         model_path: str,
         vllm_model_config: dict,
         chat_settings: dict
-    ):
+    ) -> None:
         if self.is_already_loaded:
             self.add_models(name=name, model_path=model_path)
             return None
@@ -686,7 +692,7 @@ class VllmEngine(Singleton):
         request_logger = self._vllm["RequestLogger"](
             max_log_len=vllm_model_config["max_log_len"]
         )
-        self._model["openai_serving_models"] = self._vllm["OpenAIServingModels"](
+        self.openai_serving_models = self._vllm["OpenAIServingModels"](
             async_engine_client,
             model_config,
             [
@@ -698,18 +704,18 @@ class VllmEngine(Singleton):
             lora_modules=vllm_model_config["lora_modules"],
             prompt_adapters=vllm_model_config["prompt_adapters"],
         )
-        # await self._model["openai_serving_models"].init_static_loras()
-        self._model["openai_serving"] = self._vllm["OpenAIServing"](
+        # await self.openai_serving_models.init_static_loras()
+        self.openai_serving = self._vllm["OpenAIServing"](
             async_engine_client,
             model_config,
-            self._model["openai_serving_models"],
+            self.openai_serving_models,
             request_logger=request_logger,
             return_tokens_as_token_ids=vllm_model_config["return_tokens_as_token_ids"]
         )
-        self._model["openai_serving_chat"] = self._vllm["OpenAIServingChat"](
+        self.openai_serving_chat = self._vllm["OpenAIServingChat"](
             async_engine_client,
             model_config,
-            self._model["openai_serving_models"],
+            self.openai_serving_models,
             response_role=vllm_model_config["response_role"],
             request_logger=request_logger,
             chat_template=vllm_model_config["chat_template"],
@@ -721,25 +727,25 @@ class VllmEngine(Singleton):
             tool_parser=chat_settings["tool_parser"],
             enable_prompt_tokens_details=chat_settings["enable_prompt_tokens_details"]
         ) if model_config.runner_type == "generate" else None
-        self._model["openai_serving_completion"] = self._vllm["OpenAIServingCompletion"](
+        self.openai_serving_completion = self._vllm["OpenAIServingCompletion"](
             async_engine_client,
             model_config,
-            self._model["openai_serving_models"],
+            self.openai_serving_models,
             request_logger=request_logger,
             return_tokens_as_token_ids=vllm_model_config["return_tokens_as_token_ids"]
         ) if model_config.runner_type == "generate" else None
-        self._model["openai_serving_embedding"] = self._vllm["OpenAIServingEmbedding"](
+        self.openai_serving_embedding = self._vllm["OpenAIServingEmbedding"](
             async_engine_client,
             model_config,
-            self._model["openai_serving_models"],
+            self.openai_serving_models,
             request_logger=request_logger,
             chat_template=vllm_model_config["chat_template"],
             chat_template_content_format=chat_settings["chat_template_content_format"]
         ) if model_config.task == "embed" else None
-        self._model["openai_serving_tokenization"] = self._vllm["OpenAIServingTokenization"](
+        self.openai_serving_tokenization = self._vllm["OpenAIServingTokenization"](
             async_engine_client,
             model_config,
-            self._model["openai_serving_models"],
+            self.openai_serving_models,
             request_logger=request_logger,
             chat_template=vllm_model_config["chat_template"],
             chat_template_content_format=chat_settings["chat_template_content_format"]
@@ -748,8 +754,8 @@ class VllmEngine(Singleton):
         self.is_already_loaded = True
         return None

-    def add_models(self, name: str, model_path: str):
-        self._model["openai_serving_models"].base_model_paths.append(
+    def add_models(self, name: str, model_path: str) -> None:
+        self.openai_serving_models.base_model_paths.append(
             self._vllm["BaseModelPath"](
                 name=name,
                 model_path=model_path
@@ -769,10 +775,9 @@ class VllmEngine(Singleton):
         We run the process in this context
         """
         request, raw_request = data["request"], data["raw_request"]
-        # analog of completion(raw_request) in https://github.com/vllm-project/vllm/blob/v0.7.3/vllm/entrypoints/openai/api_server.py#L405
-        handler = self._model["openai_serving_completion"]
+        handler = self.openai_serving_completion
         if handler is None:
-            return self._model["openai_serving"].create_error_response(
+            return self.openai_serving.create_error_response(
                 message="The model does not support Completions API"
             )
         generator = await handler.create_completion(request=request, raw_request=raw_request)
@@ -793,10 +798,9 @@ class VllmEngine(Singleton):
         We run the process in this context
         """
         request, raw_request = data["request"], data["raw_request"]
-        # analog of chat(raw_request) in https://github.com/vllm-project/vllm/blob/v0.7.3/vllm/entrypoints/openai/api_server.py#L405
-        handler = self._model["openai_serving_chat"]
+        handler = self.openai_serving_chat
         if handler is None:
-            return self._model["openai_serving"].create_error_response(
+            return self.openai_serving.create_error_response(
                 message="The model does not support Chat Completions API"
             )
         generator = await handler.create_chat_completion(request=request, raw_request=raw_request)
@@ -812,7 +816,9 @@ class VllmEngine(Singleton):
         state: dict,
         collect_custom_statistics_fn: Callable[[dict], None] = None
     ) -> Any:
-        pass
+        request, raw_request = data["request"], data["raw_request"]
+        models_ = await self.openai_serving_models.show_available_models()
+        return JSONResponse(content=models_.model_dump())


 @BasePreprocessRequest.register_engine("vllm", modules=["vllm", "fastapi"])
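The new models() body above asks the serving-models registry for the available models and serializes the result through model_dump() into a FastAPI JSONResponse. A minimal sketch of that serialization pattern (assumes FastAPI and pydantic v2 are installed; ModelCard/ModelList here are simplified stand-ins, not vLLM's protocol classes):

from fastapi import FastAPI
from fastapi.responses import JSONResponse
from pydantic import BaseModel


class ModelCard(BaseModel):
    id: str
    object: str = "model"


class ModelList(BaseModel):
    object: str = "list"
    data: list[ModelCard] = []


app = FastAPI()


@app.get("/v1/models")
async def list_models() -> JSONResponse:
    # pydantic v2 model_dump() yields plain dicts/lists that JSONResponse can encode
    models_ = ModelList(data=[ModelCard(id="my-endpoint")])
    return JSONResponse(content=models_.model_dump())

Serving this app (for example with uvicorn) and querying GET /v1/models would return the serialized model list, which is the same shape of round trip the models() method performs.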
@@ -900,28 +906,53 @@ class VllmPreprocessRequest(BasePreprocessRequest):
             return await self._preprocess.postprocess(data, state, collect_custom_statistics_fn)
         return data

-    async def completions(self, data: Any, state: dict, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Any:
+    async def completions(
+        self,
+        data: Any,
+        state: dict,
+        collect_custom_statistics_fn: Callable[[dict], None] = None
+    ) -> Any:
         """
         The actual processing function.
         We run the process in this context
         """
-        return await self._vllm_engine.completions(data=data, state=state, collect_custom_statistics_fn=collect_custom_statistics_fn)
+        return await self._vllm_engine.completions(
+            data=data,
+            state=state,
+            collect_custom_statistics_fn=collect_custom_statistics_fn
+        )

-    async def chat_completions(self, data: Any, state: dict, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Any:
+    async def chat_completions(
+        self,
+        data: Any,
+        state: dict,
+        collect_custom_statistics_fn: Callable[[dict], None] = None
+    ) -> Any:
         """
         The actual processing function.
         We run the process in this context
         """
-        return await self._vllm_engine.chat_completions(data=data, state=state, collect_custom_statistics_fn=collect_custom_statistics_fn)
+        return await self._vllm_engine.chat_completions(
+            data=data,
+            state=state,
+            collect_custom_statistics_fn=collect_custom_statistics_fn
+        )

-    async def models(self, data: Any, state: dict, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Any:
+    async def models(
+        self,
+        data: Any,
+        state: dict,
+        collect_custom_statistics_fn: Callable[[dict], None] = None
+    ) -> Any:
         """
         The actual processing function.
         We run the process in this context
         """
-        return self._vllm_engine.models(data=data, state=state, collect_custom_statistics_fn=collect_custom_statistics_fn)
+        return await self._vllm_engine.models(
+            data=data,
+            state=state,
+            collect_custom_statistics_fn=collect_custom_statistics_fn
+        )

     @staticmethod
     async def _preprocess_send_request(_, endpoint: str, version: str = None, data: dict = None) -> Optional[dict]:
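One behavioral fix sits inside the last wrapper above: besides the signature reflow, models() previously returned self._vllm_engine.models(...) without awaiting it, so callers received a coroutine object rather than the model list; the new code adds the missing await. A small stand-alone illustration of that difference (stub Engine/Wrapper classes, not the real clearml-serving objects):

import asyncio


class Engine:
    async def models(self) -> dict:
        return {"object": "list", "data": []}


class Wrapper:
    def __init__(self) -> None:
        self._engine = Engine()

    async def models_old(self):
        # old behaviour: hands back an un-awaited coroutine object
        return self._engine.models()

    async def models_new(self):
        # new behaviour: awaits the engine call and returns the payload
        return await self._engine.models()


async def main() -> None:
    w = Wrapper()
    coro = await w.models_old()
    print(type(coro))            # <class 'coroutine'>
    coro.close()                 # silence the "never awaited" warning
    print(await w.models_new())  # {'object': 'list', 'data': []}


asyncio.run(main())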

View File

@@ -10,7 +10,6 @@ class Preprocess:
         self.model_endpoint = None

     def load(self, local_file_name: str) -> Optional[Any]:  # noqa
         vllm_model_config = {
             "lora_modules": None,  # [LoRAModulePath(name=a, path=b)]
             "prompt_adapters": None,  # [PromptAdapterPath(name=a, path=b)]