From fedfcdadeb7da3e4b76d691a150b02f624993618 Mon Sep 17 00:00:00 2001
From: IlyaMescheryakov1402
Date: Tue, 11 Mar 2025 22:42:59 +0300
Subject: [PATCH] add getattr for process methods

---
 .../serving/model_request_processor.py        |  39 ++++---
 clearml_serving/serving/preprocess_service.py | 103 ++++++++++++------
 examples/vllm/preprocess.py                   |   1 -
 3 files changed, 89 insertions(+), 54 deletions(-)

diff --git a/clearml_serving/serving/model_request_processor.py b/clearml_serving/serving/model_request_processor.py
index eaa4b49..0cf9084 100644
--- a/clearml_serving/serving/model_request_processor.py
+++ b/clearml_serving/serving/model_request_processor.py
@@ -1225,23 +1225,28 @@ class ModelRequestProcessor(object):
         preprocessed = await processor.preprocess(body, state, stats_collect_fn) \
             if processor.is_preprocess_async \
             else processor.preprocess(body, state, stats_collect_fn)
-        if serve_type == "process":
-            # noinspection PyUnresolvedReferences
-            processed = await processor.process(preprocessed, state, stats_collect_fn) \
-                if processor.is_process_async \
-                else processor.process(preprocessed, state, stats_collect_fn)
-        elif serve_type == "completions":
-            # noinspection PyUnresolvedReferences
-            processed = await processor.completions(preprocessed, state, stats_collect_fn) \
-                if processor.is_process_async \
-                else processor.completions(preprocessed, state, stats_collect_fn)
-        elif serve_type == "chat/completions":
-            # noinspection PyUnresolvedReferences
-            processed = await processor.chat_completions(preprocessed, state, stats_collect_fn) \
-                if processor.is_process_async \
-                else processor.chat_completions(preprocessed, state, stats_collect_fn)
-        else:
-            raise ValueError(f"wrong url_type: expected 'process', 'completions' or 'chat/completions', got {serve_type}")
+        processed_func = getattr(processor, serve_type.replace("/", "_"))
+        # noinspection PyUnresolvedReferences
+        processed = await processed_func(preprocessed, state, stats_collect_fn) \
+            if processor.is_process_async \
+            else processed_func(preprocessed, state, stats_collect_fn)
+        # if serve_type == "process":
+        #     # noinspection PyUnresolvedReferences
+        #     processed = await processor.process(preprocessed, state, stats_collect_fn) \
+        #         if processor.is_process_async \
+        #         else processor.process(preprocessed, state, stats_collect_fn)
+        # elif serve_type == "completions":
+        #     # noinspection PyUnresolvedReferences
+        #     processed = await processor.completions(preprocessed, state, stats_collect_fn) \
+        #         if processor.is_process_async \
+        #         else processor.completions(preprocessed, state, stats_collect_fn)
+        # elif serve_type == "chat/completions":
+        #     # noinspection PyUnresolvedReferences
+        #     processed = await processor.chat_completions(preprocessed, state, stats_collect_fn) \
+        #         if processor.is_process_async \
+        #         else processor.chat_completions(preprocessed, state, stats_collect_fn)
+        # else:
+        #     raise ValueError(f"wrong url_type: expected 'process', 'completions' or 'chat/completions', got {serve_type}")
         # noinspection PyUnresolvedReferences
         return_value = await processor.postprocess(processed, state, stats_collect_fn) \
             if processor.is_postprocess_async \
diff --git a/clearml_serving/serving/preprocess_service.py b/clearml_serving/serving/preprocess_service.py
index 2d584c9..ed9f270 100644
--- a/clearml_serving/serving/preprocess_service.py
+++ b/clearml_serving/serving/preprocess_service.py
@@ -606,26 +606,32 @@ class CustomAsyncPreprocessRequest(BasePreprocessRequest):
 
 
 class VllmEngine(Singleton):
-    _model = None
     _vllm = None
     _fastapi = None
     is_already_loaded = False
 
-    def __init__(self):
-
+    def __init__(self) -> None:
         # load vLLM Modules
         if self._vllm is None:
-            # from vllm import entrypoints, engine, usage
             from vllm.engine.arg_utils import AsyncEngineArgs
             from vllm.engine.async_llm_engine import AsyncLLMEngine
             from vllm.entrypoints.logger import RequestLogger
             from vllm.entrypoints.openai.serving_engine import OpenAIServing
-            from vllm.entrypoints.openai.serving_models import OpenAIServingModels, LoRAModulePath, PromptAdapterPath, BaseModelPath
+            from vllm.entrypoints.openai.serving_models import (
+                OpenAIServingModels,
+                LoRAModulePath,
+                PromptAdapterPath,
+                BaseModelPath
+            )
             from vllm.entrypoints.openai.serving_chat import OpenAIServingChat
             from vllm.entrypoints.openai.serving_completion import OpenAIServingCompletion
             from vllm.entrypoints.openai.serving_embedding import OpenAIServingEmbedding
             from vllm.entrypoints.openai.serving_tokenization import OpenAIServingTokenization
-            from vllm.entrypoints.openai.protocol import ChatCompletionResponse, CompletionResponse, ErrorResponse
+            from vllm.entrypoints.openai.protocol import (
+                ChatCompletionResponse,
+                CompletionResponse,
+                ErrorResponse
+            )
             from vllm.entrypoints.chat_utils import ChatTemplateContentFormatOption
             from vllm.usage.usage_lib import UsageContext
             self._vllm = {
@@ -669,7 +675,7 @@ class VllmEngine(Singleton):
         model_path: str,
         vllm_model_config: dict,
         chat_settings: dict
-    ):
+    ) -> None:
         if self.is_already_loaded:
             self.add_models(name=name, model_path=model_path)
             return None
@@ -686,7 +692,7 @@ class VllmEngine(Singleton):
         request_logger = self._vllm["RequestLogger"](
             max_log_len=vllm_model_config["max_log_len"]
         )
-        self._model["openai_serving_models"] = self._vllm["OpenAIServingModels"](
+        self.openai_serving_models = self._vllm["OpenAIServingModels"](
             async_engine_client,
             model_config,
             [
@@ -698,18 +704,18 @@ class VllmEngine(Singleton):
             lora_modules=vllm_model_config["lora_modules"],
             prompt_adapters=vllm_model_config["prompt_adapters"],
         )
-        # await self._model["openai_serving_models"].init_static_loras()
-        self._model["openai_serving"] = self._vllm["OpenAIServing"](
+        # await self.openai_serving_models.init_static_loras()
+        self.openai_serving = self._vllm["OpenAIServing"](
             async_engine_client,
             model_config,
-            self._model["openai_serving_models"],
+            self.openai_serving_models,
             request_logger=request_logger,
             return_tokens_as_token_ids=vllm_model_config["return_tokens_as_token_ids"]
         )
-        self._model["openai_serving_chat"] = self._vllm["OpenAIServingChat"](
+        self.openai_serving_chat = self._vllm["OpenAIServingChat"](
             async_engine_client,
             model_config,
-            self._model["openai_serving_models"],
+            self.openai_serving_models,
             response_role=vllm_model_config["response_role"],
             request_logger=request_logger,
             chat_template=vllm_model_config["chat_template"],
@@ -721,25 +727,25 @@ class VllmEngine(Singleton):
             tool_parser=chat_settings["tool_parser"],
             enable_prompt_tokens_details=chat_settings["enable_prompt_tokens_details"]
         ) if model_config.runner_type == "generate" else None
-        self._model["openai_serving_completion"] = self._vllm["OpenAIServingCompletion"](
+        self.openai_serving_completion = self._vllm["OpenAIServingCompletion"](
             async_engine_client,
             model_config,
-            self._model["openai_serving_models"],
+            self.openai_serving_models,
             request_logger=request_logger,
             return_tokens_as_token_ids=vllm_model_config["return_tokens_as_token_ids"]
         ) if model_config.runner_type == "generate" else None
-        self._model["openai_serving_embedding"] = self._vllm["OpenAIServingEmbedding"](
+        self.openai_serving_embedding = self._vllm["OpenAIServingEmbedding"](
             async_engine_client,
             model_config,
-            self._model["openai_serving_models"],
+            self.openai_serving_models,
             request_logger=request_logger,
             chat_template=vllm_model_config["chat_template"],
             chat_template_content_format=chat_settings["chat_template_content_format"]
         ) if model_config.task == "embed" else None
-        self._model["openai_serving_tokenization"] = self._vllm["OpenAIServingTokenization"](
+        self.openai_serving_tokenization = self._vllm["OpenAIServingTokenization"](
             async_engine_client,
             model_config,
-            self._model["openai_serving_models"],
+            self.openai_serving_models,
             request_logger=request_logger,
             chat_template=vllm_model_config["chat_template"],
             chat_template_content_format=chat_settings["chat_template_content_format"]
@@ -748,8 +754,8 @@ class VllmEngine(Singleton):
         self.is_already_loaded = True
         return None
 
-    def add_models(self, name: str, model_path: str):
-        self._model["openai_serving_models"].base_model_paths.append(
+    def add_models(self, name: str, model_path: str) -> None:
+        self.openai_serving_models.base_model_paths.append(
             self._vllm["BaseModelPath"](
                 name=name,
                 model_path=model_path
@@ -769,10 +775,9 @@ class VllmEngine(Singleton):
         We run the process in this context
         """
         request, raw_request = data["request"], data["raw_request"]
-        # analog of completion(raw_request) in https://github.com/vllm-project/vllm/blob/v0.7.3/vllm/entrypoints/openai/api_server.py#L405
-        handler = self._model["openai_serving_completion"]
+        handler = self.openai_serving_completion
         if handler is None:
-            return self._model["openai_serving"].create_error_response(
+            return self.openai_serving.create_error_response(
                 message="The model does not support Completions API"
             )
         generator = await handler.create_completion(request=request, raw_request=raw_request)
@@ -793,10 +798,9 @@ class VllmEngine(Singleton):
         We run the process in this context
         """
         request, raw_request = data["request"], data["raw_request"]
-        # analog of chat(raw_request) in https://github.com/vllm-project/vllm/blob/v0.7.3/vllm/entrypoints/openai/api_server.py#L405
-        handler = self._model["openai_serving_chat"]
+        handler = self.openai_serving_chat
         if handler is None:
-            return self._model["openai_serving"].create_error_response(
+            return self.openai_serving.create_error_response(
                 message="The model does not support Chat Completions API"
             )
         generator = await handler.create_chat_completion(request=request, raw_request=raw_request)
@@ -812,7 +816,9 @@ class VllmEngine(Singleton):
         state: dict,
         collect_custom_statistics_fn: Callable[[dict], None] = None
     ) -> Any:
-        pass
+        request, raw_request = data["request"], data["raw_request"]
+        models_ = await self.openai_serving_models.show_available_models()
+        return JSONResponse(content=models_.model_dump())
 
 
 @BasePreprocessRequest.register_engine("vllm", modules=["vllm", "fastapi"])
@@ -900,28 +906,53 @@ class VllmPreprocessRequest(BasePreprocessRequest):
             return await self._preprocess.postprocess(data, state, collect_custom_statistics_fn)
         return data
 
-    async def completions(self, data: Any, state: dict, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Any:
+    async def completions(
+        self,
+        data: Any,
+        state: dict,
+        collect_custom_statistics_fn: Callable[[dict], None] = None
+    ) -> Any:
         """
         The actual processing function.
         We run the process in this context
         """
-        return await self._vllm_engine.completions(data=data, state=state, collect_custom_statistics_fn=collect_custom_statistics_fn)
+        return await self._vllm_engine.completions(
+            data=data,
+            state=state,
+            collect_custom_statistics_fn=collect_custom_statistics_fn
+        )
 
-
-    async def chat_completions(self, data: Any, state: dict, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Any:
+    async def chat_completions(
+        self,
+        data: Any,
+        state: dict,
+        collect_custom_statistics_fn: Callable[[dict], None] = None
+    ) -> Any:
         """
         The actual processing function.
         We run the process in this context
         """
-        return await self._vllm_engine.chat_completions(data=data, state=state, collect_custom_statistics_fn=collect_custom_statistics_fn)
+        return await self._vllm_engine.chat_completions(
+            data=data,
+            state=state,
+            collect_custom_statistics_fn=collect_custom_statistics_fn
+        )
 
-
-    async def models(self, data: Any, state: dict, collect_custom_statistics_fn: Callable[[dict], None] = None) -> Any:
+    async def models(
+        self,
+        data: Any,
+        state: dict,
+        collect_custom_statistics_fn: Callable[[dict], None] = None
+    ) -> Any:
         """
         The actual processing function.
         We run the process in this context
         """
-        return self._vllm_engine.models(data=data, state=state, collect_custom_statistics_fn=collect_custom_statistics_fn)
+        return await self._vllm_engine.models(
+            data=data,
+            state=state,
+            collect_custom_statistics_fn=collect_custom_statistics_fn
+        )
 
     @staticmethod
     async def _preprocess_send_request(_, endpoint: str, version: str = None, data: dict = None) -> Optional[dict]:
diff --git a/examples/vllm/preprocess.py b/examples/vllm/preprocess.py
index 001dd58..87ca1be 100644
--- a/examples/vllm/preprocess.py
+++ b/examples/vllm/preprocess.py
@@ -10,7 +10,6 @@ class Preprocess:
         self.model_endpoint = None
 
     def load(self, local_file_name: str) -> Optional[Any]:  # noqa
-
         vllm_model_config = {
             "lora_modules": None,  # [LoRAModulePath(name=a, path=b)]
             "prompt_adapters": None,  # [PromptAdapterPath(name=a, path=b)]
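
The core of the model_request_processor.py hunk is the getattr-based dispatch: serve_type strings such as "chat/completions" are resolved to processor method names by replacing "/" with "_". Below is a minimal, hypothetical sketch of that pattern; the DemoProcessor class and its data shapes are illustrative only and are not part of clearml-serving or this patch.

    # Hypothetical illustration of the dispatch used in the patch above:
    # serve_type -> method name via replace("/", "_") -> getattr lookup.
    class DemoProcessor:
        def process(self, data):
            return ("process", data)

        def completions(self, data):
            return ("completions", data)

        def chat_completions(self, data):
            return ("chat/completions", data)


    processor = DemoProcessor()
    for serve_type in ("process", "completions", "chat/completions"):
        handler = getattr(processor, serve_type.replace("/", "_"))
        print(serve_type, "->", handler({"prompt": "hi"}))
    # Note on the design choice: with plain getattr, an unsupported serve_type
    # raises AttributeError, whereas the previous if/elif chain raised ValueError.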