2022-03-05 23:25:56 +00:00
|
|
|
import os
|
2024-03-01 11:12:56 +00:00
|
|
|
import traceback
|
2022-03-05 23:25:56 +00:00
|
|
|
import gzip
|
2024-07-07 12:54:08 +00:00
|
|
|
import asyncio
|
2022-03-05 23:25:56 +00:00
|
|
|
|
|
|
|
from fastapi import FastAPI, Request, Response, APIRouter, HTTPException
|
|
|
|
from fastapi.routing import APIRoute
|
2024-07-07 12:54:08 +00:00
|
|
|
from fastapi.responses import PlainTextResponse
|
|
|
|
|
|
|
|
from starlette.background import BackgroundTask
|
2022-03-05 23:25:56 +00:00
|
|
|
|
2022-10-06 11:31:54 +00:00
|
|
|
from typing import Optional, Dict, Any, Callable, Union
|
2022-03-05 23:25:56 +00:00
|
|
|
|
|
|
|
from clearml_serving.version import __version__
|
2022-10-07 23:11:57 +00:00
|
|
|
from clearml_serving.serving.init import setup_task
|
2024-03-01 11:12:56 +00:00
|
|
|
from clearml_serving.serving.model_request_processor import ModelRequestProcessor, EndpointNotFoundException, \
|
|
|
|
EndpointBackendEngineException, EndpointModelLoadException, ServingInitializationException
|
2022-03-05 23:25:56 +00:00
|
|
|
|
|
|
|
|
|
|
|
class GzipRequest(Request):
    """Request whose body is gunzipped when "gzip" is listed in Content-Encoding."""

    async def body(self) -> bytes:
        """Return the request payload, decompressing it on first access if gzip-encoded."""
        # the payload is cached on the instance, later calls reuse it
        if hasattr(self, "_body"):
            return self._body

        payload = await super().body()
        encodings = self.headers.getlist("Content-Encoding")
        if "gzip" in encodings:
            payload = gzip.decompress(payload)
        self._body = payload  # noqa
        return self._body
|
|
|
|
|
|
|
|
|
|
|
|
class GzipRoute(APIRoute):
    """Route class that hands a GzipRequest to the regular route handler."""

    def get_route_handler(self) -> Callable:
        """Wrap the default route handler so it receives a GzipRequest."""
        default_handler = super().get_route_handler()

        async def custom_route_handler(request: Request) -> Response:
            # rebuild the request from the same scope/receive pair as a GzipRequest
            gzip_request = GzipRequest(request.scope, request.receive)
            return await default_handler(gzip_request)

        return custom_route_handler
|
|
|
|
|
|
|
|
|
|
|
|
# process Lock, so that only a single process performs the model reloading at any given time
# (currently disabled, no lock object is created)
singleton_sync_lock = None  # Lock()

# shared Model processor object, created by the application startup hook
processor: Optional[ModelRequestProcessor] = None

# create clearml Task and load models
serving_service_task_id, session_logger, instance_id = setup_task()
|
2022-10-07 23:11:57 +00:00
|
|
|
# polling frequency, default is 5, can be overridden with the CLEARML_SERVING_POLL_FREQ environment variable
try:
    model_sync_frequency_secs = float(os.environ.get("CLEARML_SERVING_POLL_FREQ", 5))
except (ValueError, TypeError):
    # the override could not be parsed as a number, fall back to the default
    model_sync_frequency_secs = 5
|
|
|
|
|
2024-07-07 12:54:08 +00:00
|
|
|
class CUDAException(Exception):
    """Signals a CUDA failure detected while processing an inference request.

    :param exception: the underlying error (an exception instance or its message),
        kept on ``self.exception`` and also forwarded to ``Exception`` so that
        ``str()``, ``repr()`` and ``args`` reflect it.
    """

    def __init__(self, exception: Union[str, BaseException]):
        # Without this call, constructing with the keyword form
        # `CUDAException(exception=...)` leaves `args` empty and `str(exc)` blank
        super().__init__(exception)
        self.exception = exception
|
2022-10-07 23:11:57 +00:00
|
|
|
|
2022-03-05 23:25:56 +00:00
|
|
|
# start FastAPI app (title, version and description are published in the generated API docs)
app = FastAPI(
    title="ClearML Serving Service",
    version=__version__,
    description="ClearML Service Service router",
)
|
|
|
|
|
|
|
|
|
|
|
|
@app.on_event("startup")
async def startup_event():
    """Create and launch the shared ModelRequestProcessor, unless one already exists."""
    global processor

    pid = os.getpid()
    if processor:
        print("ModelRequestProcessor already initialized [pid={}] [service_id={}]".format(
            pid, serving_service_task_id))
        return

    print("Starting up ModelRequestProcessor [pid={}] [service_id={}]".format(
        pid, serving_service_task_id))
    processor = ModelRequestProcessor(
        task_id=serving_service_task_id,
        update_lock_guard=singleton_sync_lock,
    )
    print("ModelRequestProcessor [id={}] loaded".format(processor.get_id()))
    # the configured frequency is multiplied by 60 before being passed as seconds
    # NOTE(review): this suggests CLEARML_SERVING_POLL_FREQ is given in minutes - confirm
    processor.launch(poll_frequency_sec=model_sync_frequency_secs * 60)
|
2022-03-05 23:25:56 +00:00
|
|
|
|
|
|
|
|
2024-07-07 12:54:08 +00:00
|
|
|
@app.on_event("shutdown")
def shutdown_event():
    """Print a notice when the application shutdown hook fires."""
    print("RESTARTING INFERENCE SERVICE!")
|
|
|
|
|
|
|
|
async def exit_app():
    """Stop the event loop that is currently running this coroutine."""
    asyncio.get_running_loop().stop()
|
|
|
|
|
|
|
|
@app.exception_handler(CUDAException)
async def cuda_exception_handler(request, exc):
    """Answer a CUDAException with a plain-text 500 and attach exit_app as a background task."""
    return PlainTextResponse(
        "CUDA out of memory. Restarting service",
        status_code=500,
        # exit_app is attached as the response's background task
        background=BackgroundTask(exit_app),
    )
|
|
|
|
|
|
|
|
|
2022-03-05 23:25:56 +00:00
|
|
|
# router for the model serving endpoints, everything is mounted under "/serve"
# (drop the `route_class` argument to remove support for GZip content encoding)
router = APIRouter(
    prefix="/serve", tags=["models"],
    responses={404: {"description": "Model Serving Endpoint Not found"}},
    route_class=GzipRoute,
)
|
|
|
|
|
|
|
|
|
2022-03-06 01:14:06 +00:00
|
|
|
def _report_request_exception(template, ex, request):
    # Format `template` with the instance id, the exception type and value, the request
    # and the traceback of the exception currently being handled, then send it to the
    # session logger. Must be called from inside an `except` block.
    session_logger.report_text(
        template.format(instance_id, type(ex), ex, request, traceback.format_exc())
    )


# cover all routing options for model version `/{model_id}`, `/{model_id}/123`, `/{model_id}?version=123`
@router.post("/{model_id}/{version}")
@router.post("/{model_id}/")
@router.post("/{model_id}")
async def serve_model(model_id: str, version: Optional[str] = None, request: Union[bytes, Dict[Any, Any]] = None):
    # Forward the request to the shared processor and translate failures into HTTP errors:
    #   endpoint not found -> 404, model load / backend engine / value errors -> 422,
    #   serving initialization and any other error -> 500.
    # A ValueError carrying a known CUDA failure message is re-raised as CUDAException.
    # (kept as comments on purpose, so no docstring is attached to the route function)
    processing_report = "[{}] Exception [{}] {} while processing request: {}\n{}"
    loading_report = "[{}] Exception [{}] {} while loading serving inference: {}\n{}"
    try:
        return await processor.process_request(base_url=model_id, version=version, request_body=request)
    except EndpointNotFoundException as ex:
        raise HTTPException(
            status_code=404,
            detail="Error processing request, endpoint was not found: {}".format(ex),
        )
    except (EndpointModelLoadException, EndpointBackendEngineException) as ex:
        _report_request_exception(processing_report, ex, request)
        raise HTTPException(
            status_code=422,
            detail="Error [{}] processing request: {}".format(type(ex), ex),
        )
    except ServingInitializationException as ex:
        _report_request_exception(loading_report, ex, request)
        raise HTTPException(
            status_code=500,
            detail="Error [{}] processing request: {}".format(type(ex), ex),
        )
    except ValueError as ex:
        _report_request_exception(processing_report, ex, request)
        message = str(ex)
        cuda_failure = (
            "CUDA out of memory. " in message
            or "NVML_SUCCESS == r INTERNAL ASSERT FAILED" in message
        )
        if cuda_failure:
            raise CUDAException(exception=ex)
        raise HTTPException(
            status_code=422,
            detail="Error [{}] processing request: {}".format(type(ex), ex),
        )
    except Exception as ex:
        _report_request_exception(processing_report, ex, request)
        raise HTTPException(
            status_code=500,
            detail="Error [{}] processing request: {}".format(type(ex), ex),
        )
|
|
|
|
|
|
|
|
|
|
|
|
# register the "/serve" routes on the application
app.include_router(router=router)
|