From aff27c62b89f9d0f2fe2e55cc75ffdbc272a6f78 Mon Sep 17 00:00:00 2001
From: clearml <>
Date: Thu, 12 Dec 2024 23:57:21 +0200
Subject: [PATCH] Fix gRPC errors printing stack traces and full verbose
 details.

Add support for controlling error printouts using the
`CLEARML_SERVING_AIO_RPC_IGNORE_ERRORS` and
`CLEARML_SERVING_AIO_RPC_VERBOSE_ERRORS` environment variables (each takes a
whitespace-separated list of gRPC error codes or error names).
---
 clearml_serving/serving/main.py  | 87 +++++++++++++++++++++++---------
 clearml_serving/serving/utils.py | 17 +++++++
 clearml_serving/version.py       |  2 +-
 3 files changed, 80 insertions(+), 26 deletions(-)
 create mode 100644 clearml_serving/serving/utils.py

diff --git a/clearml_serving/serving/main.py b/clearml_serving/serving/main.py
index 10ce9c9..2536760 100644
--- a/clearml_serving/serving/main.py
+++ b/clearml_serving/serving/main.py
@@ -1,4 +1,5 @@
 import os
+import shlex
 import traceback
 import gzip
 import asyncio
@@ -6,6 +7,7 @@ import asyncio
 from fastapi import FastAPI, Request, Response, APIRouter, HTTPException
 from fastapi.routing import APIRoute
 from fastapi.responses import PlainTextResponse
+from grpc.aio import AioRpcError
 
 from starlette.background import BackgroundTask
 
@@ -13,8 +15,14 @@ from typing import Optional, Dict, Any, Callable, Union
 
 from clearml_serving.version import __version__
 from clearml_serving.serving.init import setup_task
-from clearml_serving.serving.model_request_processor import ModelRequestProcessor, EndpointNotFoundException, \
-    EndpointBackendEngineException, EndpointModelLoadException, ServingInitializationException
+from clearml_serving.serving.model_request_processor import (
+    ModelRequestProcessor,
+    EndpointNotFoundException,
+    EndpointBackendEngineException,
+    EndpointModelLoadException,
+    ServingInitializationException,
+)
+from clearml_serving.serving.utils import parse_grpc_errors
 
 
 class GzipRequest(Request):
@@ -52,10 +60,16 @@ try:
 except (ValueError, TypeError):
     pass
 
+
+grpc_aio_ignore_errors = parse_grpc_errors(shlex.split(os.environ.get("CLEARML_SERVING_AIO_RPC_IGNORE_ERRORS", "")))
+grpc_aio_verbose_errors = parse_grpc_errors(shlex.split(os.environ.get("CLEARML_SERVING_AIO_RPC_VERBOSE_ERRORS", "")))
+
+
 class CUDAException(Exception):
     def __init__(self, exception: str):
         self.exception = exception
 
+
 # start FastAPI app
 app = FastAPI(title="ClearML Serving Service", version=__version__, description="ClearML Service Service router")
 
@@ -65,26 +79,31 @@ async def startup_event():
     global processor
 
     if processor:
-        print("ModelRequestProcessor already initialized [pid={}] [service_id={}]".format(
-            os.getpid(), serving_service_task_id))
+        print(
+            "ModelRequestProcessor already initialized [pid={}] [service_id={}]".format(
+                os.getpid(), serving_service_task_id
+            )
+        )
     else:
-        print("Starting up ModelRequestProcessor [pid={}] [service_id={}]".format(
-            os.getpid(), serving_service_task_id))
+        print("Starting up ModelRequestProcessor [pid={}] [service_id={}]".format(os.getpid(), serving_service_task_id))
         processor = ModelRequestProcessor(
-            task_id=serving_service_task_id, update_lock_guard=singleton_sync_lock,
+            task_id=serving_service_task_id,
+            update_lock_guard=singleton_sync_lock,
         )
         print("ModelRequestProcessor [id={}] loaded".format(processor.get_id()))
-        processor.launch(poll_frequency_sec=model_sync_frequency_secs*60)
+        processor.launch(poll_frequency_sec=model_sync_frequency_secs * 60)
 
 
-@app.on_event('shutdown')
+@app.on_event("shutdown")
 def shutdown_event():
-    print('RESTARTING INFERENCE SERVICE!')
-
+    print("RESTARTING INFERENCE SERVICE!")
+
+
 async def exit_app():
     loop = asyncio.get_running_loop()
     loop.stop()
-
+
+
 @app.exception_handler(CUDAException)
 async def cuda_exception_handler(request, exc):
     task = BackgroundTask(exit_app)
@@ -105,31 +124,49 @@ router = APIRouter(
 @router.post("/{model_id}")
 async def serve_model(model_id: str, version: Optional[str] = None, request: Union[bytes, Dict[Any, Any]] = None):
     try:
-        return_value = await processor.process_request(
-            base_url=model_id,
-            version=version,
-            request_body=request
-        )
+        return_value = await processor.process_request(base_url=model_id, version=version, request_body=request)
     except EndpointNotFoundException as ex:
         raise HTTPException(status_code=404, detail="Error processing request, endpoint was not found: {}".format(ex))
     except (EndpointModelLoadException, EndpointBackendEngineException) as ex:
-        session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
-            instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
+        session_logger.report_text(
+            "[{}] Exception [{}] {} while processing request: {}\n{}".format(
+                instance_id, type(ex), ex, request, "".join(traceback.format_exc())
+            )
+        )
         raise HTTPException(status_code=422, detail="Error [{}] processing request: {}".format(type(ex), ex))
     except ServingInitializationException as ex:
-        session_logger.report_text("[{}] Exception [{}] {} while loading serving inference: {}\n{}".format(
-            instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
+        session_logger.report_text(
+            "[{}] Exception [{}] {} while loading serving inference: {}\n{}".format(
+                instance_id, type(ex), ex, request, "".join(traceback.format_exc())
+            )
+        )
         raise HTTPException(status_code=500, detail="Error [{}] processing request: {}".format(type(ex), ex))
     except ValueError as ex:
-        session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
-            instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
+        session_logger.report_text(
+            "[{}] Exception [{}] {} while processing request: {}\n{}".format(
+                instance_id, type(ex), ex, request, "".join(traceback.format_exc())
+            )
+        )
         if "CUDA out of memory. " in str(ex) or "NVML_SUCCESS == r INTERNAL ASSERT FAILED" in str(ex):
             raise CUDAException(exception=ex)
         else:
             raise HTTPException(status_code=422, detail="Error [{}] processing request: {}".format(type(ex), ex))
+    except AioRpcError as ex:
+        if grpc_aio_verbose_errors and ex.code() in grpc_aio_verbose_errors:
+            session_logger.report_text(
+                "[{}] Exception [AioRpcError] {} while processing request: {}".format(instance_id, ex, request)
+            )
+        elif not grpc_aio_ignore_errors or ex.code() not in grpc_aio_ignore_errors:
+            session_logger.report_text("[{}] Exception [AioRpcError] status={} ".format(instance_id, ex.code()))
+        raise HTTPException(
+            status_code=500, detail="Error [AioRpcError] processing request: status={}".format(ex.code())
+        )
     except Exception as ex:
-        session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
-            instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
+        session_logger.report_text(
+            "[{}] Exception [{}] {} while processing request: {}\n{}".format(
+                instance_id, type(ex), ex, request, "".join(traceback.format_exc())
+            )
+        )
         raise HTTPException(status_code=500, detail="Error [{}] processing request: {}".format(type(ex), ex))
     return return_value
 
diff --git a/clearml_serving/serving/utils.py b/clearml_serving/serving/utils.py
new file mode 100644
index 0000000..da18e45
--- /dev/null
+++ b/clearml_serving/serving/utils.py
@@ -0,0 +1,17 @@
+from typing import List, Set
+
+import grpc
+
+
+def parse_grpc_errors(errors: List[str]) -> Set[grpc.StatusCode]:
+    try:
+        typed_errors = {
+            int(e) if e.isdigit() else e.lower().replace("-", " ").replace("_", " ")
+            for e in errors
+        }
+        if len(typed_errors) == 1 and next(iter(typed_errors)) in ("true", "false"):
+            return set(grpc.StatusCode if next(iter(typed_errors)) == "true" else [])
+        return {e for e in grpc.StatusCode if typed_errors.intersection(e.value)}
+    except (ValueError, TypeError):
+        pass
+    return set()
diff --git a/clearml_serving/version.py b/clearml_serving/version.py
index 19b4f1d..72837bd 100644
--- a/clearml_serving/version.py
+++ b/clearml_serving/version.py
@@ -1 +1 @@
-__version__ = '1.3.0'
+__version__ = '1.3.1'