From 71c104c9df4352229624b005cfb0d456e2294e2b Mon Sep 17 00:00:00 2001
From: allegroai <>
Date: Fri, 1 Mar 2024 13:12:56 +0200
Subject: [PATCH] Add exception prints to serving session Task and inference
 Task, for better debugging capabilities

Add report instance ID when reporting back to the main serving session task
---
 clearml_serving/serving/init.py               |  6 +-
 clearml_serving/serving/main.py               | 25 ++++++--
 .../serving/model_request_processor.py        | 63 ++++++++++++++-----
 3 files changed, 73 insertions(+), 21 deletions(-)

diff --git a/clearml_serving/serving/init.py b/clearml_serving/serving/init.py
index 34e3da9..2ae54a8 100644
--- a/clearml_serving/serving/init.py
+++ b/clearml_serving/serving/init.py
@@ -26,8 +26,12 @@ def setup_task(force_threaded_logging=None):
         task_type="inference",  # noqa
     )
     instance_task.set_system_tags(["service"])
+    # make sure we start logging thread/process
+    instance_logger = instance_task.get_logger()  # noqa
+    # this will use the main thread/process
+    session_logger = serving_task.get_logger()

     # preload modules into memory before forking
     BasePreprocessRequest.load_modules()

-    return serving_service_task_id
+    return serving_service_task_id, session_logger, instance_task.id

diff --git a/clearml_serving/serving/main.py b/clearml_serving/serving/main.py
index 845accf..6865c93 100644
--- a/clearml_serving/serving/main.py
+++ b/clearml_serving/serving/main.py
@@ -1,5 +1,5 @@
 import os
-from multiprocessing import Lock
+import traceback
 import gzip

 from fastapi import FastAPI, Request, Response, APIRouter, HTTPException
@@ -9,7 +9,8 @@ from typing import Optional, Dict, Any, Callable, Union

 from clearml_serving.version import __version__
 from clearml_serving.serving.init import setup_task
-from clearml_serving.serving.model_request_processor import ModelRequestProcessor
+from clearml_serving.serving.model_request_processor import ModelRequestProcessor, EndpointNotFoundException, \
+    EndpointBackendEngineException, EndpointModelLoadException, ServingInitializationException


 class GzipRequest(Request):
@@ -39,7 +40,7 @@ singleton_sync_lock = None  # Lock()
 processor = None  # type: Optional[ModelRequestProcessor]

 # create clearml Task and load models
-serving_service_task_id = setup_task()
+serving_service_task_id, session_logger, instance_id = setup_task()
 # polling frequency
 model_sync_frequency_secs = 5
 try:
@@ -88,10 +89,24 @@ async def serve_model(model_id: str, version: Optional[str] = None, request: Uni
             version=version,
             request_body=request
         )
+    except EndpointNotFoundException as ex:
+        raise HTTPException(status_code=404, detail="Error processing request, endpoint was not found: {}".format(ex))
+    except (EndpointModelLoadException, EndpointBackendEngineException) as ex:
+        session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
+            instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
+        raise HTTPException(status_code=422, detail="Error [{}] processing request: {}".format(type(ex), ex))
+    except ServingInitializationException as ex:
+        session_logger.report_text("[{}] Exception [{}] {} while loading serving inference: {}\n{}".format(
+            instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
+        raise HTTPException(status_code=500, detail="Error [{}] processing request: {}".format(type(ex), ex))
     except ValueError as ex:
-        raise HTTPException(status_code=422, detail="Error processing request: {}".format(ex))
+        session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
+            instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
+        raise HTTPException(status_code=422, detail="Error [{}] processing request: {}".format(type(ex), ex))
     except Exception as ex:
-        raise HTTPException(status_code=500, detail="Error processing request: {}".format(ex))
+        session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
+            instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
+        raise HTTPException(status_code=500, detail="Error [{}] processing request: {}".format(type(ex), ex))

     return return_value
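Note on the main.py change above: for callers, the net effect is a fixed mapping from the new exception types to HTTP status codes (404 for unknown endpoints, 422 for model load / backend / value errors, 500 for initialization and unexpected failures), with the exception type echoed in the response detail. A minimal client-side sketch of handling that mapping; the host, port, and endpoint name "test_model" are illustrative placeholders, not part of this patch:

import requests

# placeholder endpoint URL, following the serve_model route shape above
url = "http://127.0.0.1:8080/serve/test_model"

resp = requests.post(url, json={"x": [1, 2, 3]})
if resp.status_code == 404:
    # EndpointNotFoundException: the endpoint name is not registered
    print("unknown endpoint:", resp.json()["detail"])
elif resp.status_code == 422:
    # EndpointModelLoadException / EndpointBackendEngineException / ValueError
    print("request or model error:", resp.json()["detail"])
elif resp.status_code == 500:
    # ServingInitializationException or any unexpected exception
    print("server-side failure:", resp.json()["detail"])
else:
    print("prediction:", resp.json())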
diff --git a/clearml_serving/serving/model_request_processor.py b/clearml_serving/serving/model_request_processor.py
index 9c854df..ba9242d 100644
--- a/clearml_serving/serving/model_request_processor.py
+++ b/clearml_serving/serving/model_request_processor.py
@@ -18,6 +18,36 @@ from .preprocess_service import BasePreprocessRequest
 from .endpoints import ModelEndpoint, ModelMonitoring, CanaryEP, EndpointMetricLogging


+class ModelRequestProcessorException(Exception):
+    def __init__(self, message):
+        super().__init__(message)
+
+
+class EndpointNotFoundException(ModelRequestProcessorException):
+    def __init__(self, message):
+        super().__init__(message)
+
+
+class EndpointModelLoadException(ModelRequestProcessorException):
+    def __init__(self, message):
+        super().__init__(message)
+
+
+class EndpointBackendEngineException(ModelRequestProcessorException):
+    def __init__(self, message):
+        super().__init__(message)
+
+
+class ServingInitializationException(Exception):
+    def __init__(self, message):
+        super().__init__(message)
+
+
+class MetricLoggingException(Exception):
+    def __init__(self, message):
+        super().__init__(message)
+
+
 class FastWriteCounter(object):
     def __init__(self):
         self._counter_inc = itertools.count()
@@ -148,7 +178,7 @@ class ModelRequestProcessor(object):

         ep = self._endpoints.get(url, None) or self._model_monitoring_endpoints.get(url, None)
         if not ep:
-            raise ValueError("Model inference endpoint '{}' not found".format(url))
+            raise EndpointNotFoundException("Model inference endpoint '{}' not found".format(url))

         processor = self._engine_processor_lookup.get(url)
         if not processor:
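The hierarchy above groups all endpoint-level failures under a common ModelRequestProcessorException base, while ServingInitializationException and MetricLoggingException deliberately stay outside it. A short sketch, derived only from the except clauses in main.py above, of how the types map to status codes; the helper name status_code_for is hypothetical:

from clearml_serving.serving.model_request_processor import (
    EndpointNotFoundException,
    EndpointModelLoadException,
    EndpointBackendEngineException,
    ServingInitializationException,
)


def status_code_for(ex):
    # mirrors the except-clause ordering in serve_model()
    if isinstance(ex, EndpointNotFoundException):
        return 404  # unknown endpoint
    if isinstance(ex, (EndpointModelLoadException, EndpointBackendEngineException, ValueError)):
        return 422  # bad input, missing model, or backend configuration problem
    return 500      # ServingInitializationException and anything unexpected


assert status_code_for(EndpointNotFoundException("missing")) == 404
assert status_code_for(EndpointModelLoadException("no model")) == 422
assert status_code_for(ServingInitializationException("no control task")) == 500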
@@ -270,7 +300,7 @@ class ModelRequestProcessor(object):
             )
             models = Model.query_models(max_results=2, **model_query)
             if not models:
-                raise ValueError("Could not find any Model to serve {}".format(model_query))
+                raise EndpointModelLoadException("Could not find any Model to serve {}".format(model_query))
             if len(models) > 1:
                 print("Warning: Found multiple Models for \'{}\', selecting id={}".format(model_query, models[0].id))
             endpoint.model_id = models[0].id
@@ -281,7 +311,7 @@ class ModelRequestProcessor(object):
         # upload as new artifact
         if preprocess_code:
             if not Path(preprocess_code).exists():
-                raise ValueError("Preprocessing code \'{}\' could not be found".format(preprocess_code))
+                raise EndpointModelLoadException("Preprocessing code \'{}\' could not be found".format(preprocess_code))
             preprocess_artifact_name = "py_code_{}".format(url.replace("/", "_"))
             self._task.upload_artifact(
                 name=preprocess_artifact_name, artifact_object=Path(preprocess_code), wait_on_upload=True)
@@ -311,8 +341,8 @@ class ModelRequestProcessor(object):

         # make sure we actually have something to monitor
         if not any([monitoring.monitor_project, monitoring.monitor_name, monitoring.monitor_tags]):
-            raise ValueError("Model monitoring requires at least a "
-                             "project / name / tag to monitor, none were provided.")
+            raise EndpointModelLoadException(
+                "Model monitoring requires at least a project / name / tag to monitor, none were provided.")

         # make sure we have everything configured
         self._validate_model(monitoring)
@@ -324,7 +354,8 @@ class ModelRequestProcessor(object):
         # upload as new artifact
         if preprocess_code:
             if not Path(preprocess_code).exists():
-                raise ValueError("Preprocessing code \'{}\' could not be found".format(preprocess_code))
+                raise EndpointModelLoadException(
+                    "Preprocessing code \'{}\' could not be found".format(preprocess_code))
             preprocess_artifact_name = "py_code_{}".format(name.replace("/", "_"))
             self._task.upload_artifact(
                 name=preprocess_artifact_name, artifact_object=Path(preprocess_code), wait_on_upload=True)
@@ -367,7 +398,7 @@ class ModelRequestProcessor(object):
         if not isinstance(canary, CanaryEP):
             canary = CanaryEP(**canary)
         if canary.load_endpoints and canary.load_endpoint_prefix:
-            raise ValueError(
+            raise EndpointModelLoadException(
                 "Could not add canary endpoint with both "
                 "prefix ({}) and fixed set of endpoints ({})".format(
                     canary.load_endpoints, canary.load_endpoint_prefix))
@@ -406,7 +437,7 @@ class ModelRequestProcessor(object):
             metric.endpoint = name

         if name not in self._endpoints and not name.endswith('*'):
-            raise ValueError("Metric logging \'{}\' references a nonexistent endpoint".format(name))
+            raise MetricLoggingException("Metric logging \'{}\' references a nonexistent endpoint".format(name))

         if name in self._metric_logging:
             print("Warning: Metric logging \'{}\' {}".format(name, "updated" if update else "overwritten"))
@@ -1250,11 +1281,11 @@ class ModelRequestProcessor(object):
         if task_id:
             task = Task.get_task(task_id=task_id)
             if not task:
-                raise ValueError("Could not find Control Task ID={}".format(task_id))
+                raise ServingInitializationException("Could not find Control Task ID={}".format(task_id))
             task_status = task.status
             if task_status not in ("created", "in_progress",):
                 if disable_change_state:
-                    raise ValueError(
+                    raise ServingInitializationException(
                         "Control Task ID={} status [{}] "
                         "is not valid (only 'draft', 'running' are supported)".format(task_id, task_status))
                 else:
@@ -1271,7 +1302,7 @@ class ModelRequestProcessor(object):
                 'system_tags': [cls._system_tag]}
             )
             if not tasks:
-                raise ValueError("Could not find any valid Control Tasks")
+                raise ServingInitializationException("Could not find any valid Control Tasks")
             if len(tasks) > 1:
                 print("Warning: more than one valid Controller Task found, using Task ID={}".format(tasks[0]))
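Because a missing or stopped control Task now fails fast with ServingInitializationException, deployment scripts can pre-check the Task with the public clearml API before launching inference containers. A hedged sketch of such a pre-check; the Task ID is a placeholder and the check simply mirrors the status test in the hunk above:

from clearml import Task

serving_task_id = "<your-serving-control-task-id>"  # placeholder

task = Task.get_task(task_id=serving_task_id)
# mirror the status check above: only "created" / "in_progress" are accepted
if task is None or task.status not in ("created", "in_progress"):
    raise SystemExit(
        "serving control Task is missing or stopped; the inference container "
        "would abort with ServingInitializationException")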
@@ -1352,15 +1383,16 @@ class ModelRequestProcessor(object):
         except Exception:
             suggested_cli = "?"

-        raise ValueError(
+        raise EndpointBackendEngineException(
             "Triton engine requires *manual* input/output specification, "
             "You have input/output in your pbtxt, please remove them and specify manually.\n"
             "{}".format(suggested_cli)
         )

     if aux_config_dict.get("default_model_filename", None):
-        raise ValueError("ERROR: You have `default_model_filename` in your config pbtxt, "
-                         "please remove it. It will be added automatically by the system.")
+        raise EndpointBackendEngineException(
+            "ERROR: You have `default_model_filename` in your config pbtxt, "
+            "please remove it. It will be added automatically by the system.")

     # verify we have all the info we need
     d = endpoint.as_dict()
@@ -1372,7 +1404,8 @@ class ModelRequestProcessor(object):
         ]

         if missing:
-            raise ValueError("Triton engine requires input description - missing values in {}".format(missing))
+            raise EndpointBackendEngineException(
+                "Triton engine requires input description - missing values in {}".format(missing))

         return True

     def _add_registered_input_model(self, endpoint_url: str, model_id: str) -> bool:
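Overall, the instance ID returned by setup_task() is what keeps the shared control Task console readable once several inference containers report into it. The reporting pattern the patch repeats in main.py, extracted here as a sketch; report_text is the real clearml Logger method, while the helper name report_exception is hypothetical:

import traceback


def report_exception(session_logger, instance_id, ex, request):
    # prefix each report with the inference container's own Task ID so
    # messages from multiple containers can be told apart in the single
    # control Task console
    session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
        instance_id, type(ex), ex, request, "".join(traceback.format_exc())))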