Add exception prints to serving session Task and inference Task, for better debugging capabilities

Report the instance ID when reporting back to the main serving session task
allegroai 2024-03-01 13:12:56 +02:00
parent c658780d97
commit 71c104c9df
3 changed files with 73 additions and 21 deletions

clearml_serving/serving/init.py

@@ -26,8 +26,12 @@ def setup_task(force_threaded_logging=None):
         task_type="inference",  # noqa
     )
     instance_task.set_system_tags(["service"])
+    # make sure we start logging thread/process
+    instance_logger = instance_task.get_logger()  # noqa
+    # this will use the main thread/process
+    session_logger = serving_task.get_logger()
 
     # preload modules into memory before forking
     BasePreprocessRequest.load_modules()
 
-    return serving_service_task_id
+    return serving_service_task_id, session_logger, instance_task.id
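For context, a minimal sketch of how the new three-value return might be consumed (hypothetical caller; assumes a configured ClearML environment with a running serving service):

from clearml_serving.serving.init import setup_task

# setup_task() now returns the controller task ID, a logger bound to the
# main serving session Task, and this inference instance's own Task ID.
serving_service_task_id, session_logger, instance_id = setup_task()

# Text reported through session_logger lands in the serving session Task's
# console, so prefixing it with instance_id distinguishes concurrent instances.
session_logger.report_text("[{}] inference instance is up".format(instance_id))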

clearml_serving/serving/main.py

@@ -1,5 +1,5 @@
 import os
-from multiprocessing import Lock
+import traceback
 import gzip
 
 from fastapi import FastAPI, Request, Response, APIRouter, HTTPException
@@ -9,7 +9,8 @@ from typing import Optional, Dict, Any, Callable, Union
 
 from clearml_serving.version import __version__
 from clearml_serving.serving.init import setup_task
-from clearml_serving.serving.model_request_processor import ModelRequestProcessor
+from clearml_serving.serving.model_request_processor import ModelRequestProcessor, EndpointNotFoundException, \
+    EndpointBackendEngineException, EndpointModelLoadException, ServingInitializationException
 
 
 class GzipRequest(Request):
@@ -39,7 +40,7 @@ singleton_sync_lock = None  # Lock()
 processor = None  # type: Optional[ModelRequestProcessor]
 
 # create clearml Task and load models
-serving_service_task_id = setup_task()
+serving_service_task_id, session_logger, instance_id = setup_task()
 # polling frequency
 model_sync_frequency_secs = 5
 try:
@@ -88,10 +89,24 @@ async def serve_model(model_id: str, version: Optional[str] = None, request: Uni
             version=version,
             request_body=request
         )
+    except EndpointNotFoundException as ex:
+        raise HTTPException(status_code=404, detail="Error processing request, endpoint was not found: {}".format(ex))
+    except (EndpointModelLoadException, EndpointBackendEngineException) as ex:
+        session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
+            instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
+        raise HTTPException(status_code=422, detail="Error [{}] processing request: {}".format(type(ex), ex))
+    except ServingInitializationException as ex:
+        session_logger.report_text("[{}] Exception [{}] {} while loading serving inference: {}\n{}".format(
+            instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
+        raise HTTPException(status_code=500, detail="Error [{}] processing request: {}".format(type(ex), ex))
     except ValueError as ex:
-        raise HTTPException(status_code=422, detail="Error processing request: {}".format(ex))
+        session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
+            instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
+        raise HTTPException(status_code=422, detail="Error [{}] processing request: {}".format(type(ex), ex))
     except Exception as ex:
-        raise HTTPException(status_code=500, detail="Error processing request: {}".format(ex))
+        session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
+            instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
+        raise HTTPException(status_code=500, detail="Error [{}] processing request: {}".format(type(ex), ex))
     return return_value
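To illustrate the new status-code mapping from a client's point of view, a hedged sketch (host, port, and endpoint path are hypothetical; adjust to your deployment):

import requests

# Calling an endpoint that was never registered should now return 404
# (EndpointNotFoundException) instead of a generic 500; model-load and
# backend-engine failures map to 422, initialization and unexpected errors
# to 500, and each of the latter is also reported to the serving session
# Task console together with the instance ID and a full traceback.
resp = requests.post("http://127.0.0.1:8080/serve/nonexistent_model", json={"x": [1, 2, 3]})
print(resp.status_code, resp.json().get("detail"))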

clearml_serving/serving/model_request_processor.py

@@ -18,6 +18,36 @@ from .preprocess_service import BasePreprocessRequest
 from .endpoints import ModelEndpoint, ModelMonitoring, CanaryEP, EndpointMetricLogging
 
 
+class ModelRequestProcessorException(Exception):
+    def __init__(self, message):
+        super().__init__(message)
+
+
+class EndpointNotFoundException(ModelRequestProcessorException):
+    def __init__(self, message):
+        super().__init__(message)
+
+
+class EndpointModelLoadException(ModelRequestProcessorException):
+    def __init__(self, message):
+        super().__init__(message)
+
+
+class EndpointBackendEngineException(ModelRequestProcessorException):
+    def __init__(self, message):
+        super().__init__(message)
+
+
+class ServingInitializationException(Exception):
+    def __init__(self, message):
+        super().__init__(message)
+
+
+class MetricLoggingException(Exception):
+    def __init__(self, message):
+        super().__init__(message)
+
+
 class FastWriteCounter(object):
     def __init__(self):
         self._counter_inc = itertools.count()
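The endpoint-related errors share a common base, so callers can catch them collectively or individually. A self-contained illustration (two of the classes copied from the hunk above):

class ModelRequestProcessorException(Exception):
    def __init__(self, message):
        super().__init__(message)

class EndpointNotFoundException(ModelRequestProcessorException):
    def __init__(self, message):
        super().__init__(message)

try:
    raise EndpointNotFoundException("Model inference endpoint '/test/1' not found")
except ModelRequestProcessorException as ex:
    # The base class matches any of its subclasses.
    print(type(ex).__name__, ex)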
@@ -148,7 +178,7 @@ class ModelRequestProcessor(object):
         ep = self._endpoints.get(url, None) or self._model_monitoring_endpoints.get(url, None)
         if not ep:
-            raise ValueError("Model inference endpoint '{}' not found".format(url))
+            raise EndpointNotFoundException("Model inference endpoint '{}' not found".format(url))
 
         processor = self._engine_processor_lookup.get(url)
         if not processor:
@@ -270,7 +300,7 @@ class ModelRequestProcessor(object):
         )
         models = Model.query_models(max_results=2, **model_query)
         if not models:
-            raise ValueError("Could not find any Model to serve {}".format(model_query))
+            raise EndpointModelLoadException("Could not find any Model to serve {}".format(model_query))
         if len(models) > 1:
             print("Warning: Found multiple Models for '{}', selecting id={}".format(model_query, models[0].id))
         endpoint.model_id = models[0].id
@@ -281,7 +311,7 @@ class ModelRequestProcessor(object):
         # upload as new artifact
         if preprocess_code:
             if not Path(preprocess_code).exists():
-                raise ValueError("Preprocessing code '{}' could not be found".format(preprocess_code))
+                raise EndpointModelLoadException("Preprocessing code '{}' could not be found".format(preprocess_code))
             preprocess_artifact_name = "py_code_{}".format(url.replace("/", "_"))
             self._task.upload_artifact(
                 name=preprocess_artifact_name, artifact_object=Path(preprocess_code), wait_on_upload=True)
@@ -311,8 +341,8 @@ class ModelRequestProcessor(object):
         # make sure we actually have something to monitor
         if not any([monitoring.monitor_project, monitoring.monitor_name, monitoring.monitor_tags]):
-            raise ValueError("Model monitoring requires at least a "
-                             "project / name / tag to monitor, none were provided.")
+            raise EndpointModelLoadException(
+                "Model monitoring requires at least a project / name / tag to monitor, none were provided.")
 
         # make sure we have everything configured
         self._validate_model(monitoring)
@@ -324,7 +354,8 @@ class ModelRequestProcessor(object):
         # upload as new artifact
         if preprocess_code:
             if not Path(preprocess_code).exists():
-                raise ValueError("Preprocessing code '{}' could not be found".format(preprocess_code))
+                raise EndpointModelLoadException(
+                    "Preprocessing code '{}' could not be found".format(preprocess_code))
             preprocess_artifact_name = "py_code_{}".format(name.replace("/", "_"))
             self._task.upload_artifact(
                 name=preprocess_artifact_name, artifact_object=Path(preprocess_code), wait_on_upload=True)
@@ -367,7 +398,7 @@ class ModelRequestProcessor(object):
         if not isinstance(canary, CanaryEP):
             canary = CanaryEP(**canary)
         if canary.load_endpoints and canary.load_endpoint_prefix:
-            raise ValueError(
+            raise EndpointModelLoadException(
                 "Could not add canary endpoint with both "
                 "prefix ({}) and fixed set of endpoints ({})".format(
                     canary.load_endpoints, canary.load_endpoint_prefix))
@@ -406,7 +437,7 @@ class ModelRequestProcessor(object):
         metric.endpoint = name
         if name not in self._endpoints and not name.endswith('*'):
-            raise ValueError("Metric logging '{}' references a nonexistent endpoint".format(name))
+            raise MetricLoggingException("Metric logging '{}' references a nonexistent endpoint".format(name))
 
         if name in self._metric_logging:
             print("Warning: Metric logging '{}' {}".format(name, "updated" if update else "overwritten"))
@@ -1250,11 +1281,11 @@ class ModelRequestProcessor(object):
         if task_id:
             task = Task.get_task(task_id=task_id)
             if not task:
-                raise ValueError("Could not find Control Task ID={}".format(task_id))
+                raise ServingInitializationException("Could not find Control Task ID={}".format(task_id))
             task_status = task.status
             if task_status not in ("created", "in_progress",):
                 if disable_change_state:
-                    raise ValueError(
+                    raise ServingInitializationException(
                         "Could Control Task ID={} status [{}] "
                         "is not valid (only 'draft', 'running' are supported)".format(task_id, task_status))
                 else:
@@ -1271,7 +1302,7 @@ class ModelRequestProcessor(object):
                 'system_tags': [cls._system_tag]}
             )
             if not tasks:
-                raise ValueError("Could not find any valid Control Tasks")
+                raise ServingInitializationException("Could not find any valid Control Tasks")
             if len(tasks) > 1:
                 print("Warning: more than one valid Controller Tasks found, using Task ID={}".format(tasks[0]))
@@ -1352,15 +1383,16 @@ class ModelRequestProcessor(object):
             except Exception:
                 suggested_cli = "?"
-            raise ValueError(
+            raise EndpointBackendEngineException(
                 "Triton engine requires *manual* input/output specification, "
                 "You input/output in your pbtxt, please remove them and specify manually.\n"
                 "{}".format(suggested_cli)
             )
 
         if aux_config_dict.get("default_model_filename", None):
-            raise ValueError("ERROR: You have `default_model_filename` in your config pbtxt, "
-                             "please remove it. It will be added automatically by the system.")
+            raise EndpointBackendEngineException(
+                "ERROR: You have `default_model_filename` in your config pbtxt, "
+                "please remove it. It will be added automatically by the system.")
 
         # verify we have all the info we need
         d = endpoint.as_dict()
@@ -1372,7 +1404,8 @@ class ModelRequestProcessor(object):
             ]
             if missing:
-                raise ValueError("Triton engine requires input description - missing values in {}".format(missing))
+                raise EndpointBackendEngineException(
+                    "Triton engine requires input description - missing values in {}".format(missing))
         return True
 
     def _add_registered_input_model(self, endpoint_url: str, model_id: str) -> bool:
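One behavioral consequence worth noting: the new exception types derive from Exception rather than ValueError, so external code that previously relied on `except ValueError` around these calls will no longer catch them. A quick self-contained check:

class ServingInitializationException(Exception):
    pass

try:
    raise ServingInitializationException("Could not find any valid Control Tasks")
except ValueError:
    print("old-style handler: never reached for the new types")
except ServingInitializationException as ex:
    print("new-style handler:", ex)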