eajechiloae 2024-08-15 16:46:35 +03:00 committed by GitHub
commit b67eba1093
9 changed files with 71 additions and 54 deletions

View File

@@ -41,6 +41,17 @@ class Preprocess(object):
         """
         pass
 
+    def unload(self) -> None:
+        """
+        OPTIONAL: provide unloading method for the model
+        For example:
+        ```py
+        import torch
+        torch.cuda.empty_cache()
+        ```
+        """
+        pass
+
     def preprocess(
         self,
         body: Union[bytes, dict],
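The new unload() hook gives user code a place to free resources when its endpoint is torn down. As a sketch only, a user-side Preprocess adopting the hook might look like this, assuming a PyTorch-backed model (the _model attribute is illustrative, not part of the template):

```python
class Preprocess(object):
    """Illustrative user code adopting the new optional unload() hook."""

    def __init__(self):
        # placeholder; a real implementation would set this in load()
        self._model = None

    def unload(self) -> None:
        # Drop the model reference and return cached GPU memory to the driver.
        self._model = None
        try:
            import torch  # assumes a torch-based model; optional here
            torch.cuda.empty_cache()
        except ImportError:
            pass
```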

View File

@@ -9,6 +9,7 @@ echo CLEARML_EXTRA_PYTHON_PACKAGES="$CLEARML_EXTRA_PYTHON_PACKAGES"
 echo CLEARML_SERVING_NUM_PROCESS="$CLEARML_SERVING_NUM_PROCESS"
 echo CLEARML_SERVING_POLL_FREQ="$CLEARML_SERVING_POLL_FREQ"
 echo CLEARML_DEFAULT_KAFKA_SERVE_URL="$CLEARML_DEFAULT_KAFKA_SERVE_URL"
+echo CLEARML_SERVING_RESTART_ON_FAILURE="$CLEARML_SERVING_RESTART_ON_FAILURE"
 
 SERVING_PORT="${CLEARML_SERVING_PORT:-8080}"
 GUNICORN_NUM_PROCESS="${CLEARML_SERVING_NUM_PROCESS:-4}"
@@ -40,29 +41,36 @@ then
     python3 -m pip install $CLEARML_EXTRA_PYTHON_PACKAGES
 fi
 
-if [ -z "$CLEARML_USE_GUNICORN" ]
-then
-    if [ -z "$CLEARML_SERVING_NUM_PROCESS" ]
-    then
-        echo "Starting Uvicorn server - single worker"
-        PYTHONPATH=$(pwd) python3 -m uvicorn \
-            clearml_serving.serving.main:app --log-level $UVICORN_LOG_LEVEL --host 0.0.0.0 --port $SERVING_PORT --loop $UVICORN_SERVE_LOOP \
-            $UVICORN_EXTRA_ARGS
-    else
-        echo "Starting Uvicorn server - multi worker"
-        PYTHONPATH=$(pwd) python3 clearml_serving/serving/uvicorn_mp_entrypoint.py \
-            clearml_serving.serving.main:app --log-level $UVICORN_LOG_LEVEL --host 0.0.0.0 --port $SERVING_PORT --loop $UVICORN_SERVE_LOOP \
-            --workers $CLEARML_SERVING_NUM_PROCESS $UVICORN_EXTRA_ARGS
-    fi
-else
-    echo "Starting Gunicorn server"
-    # start service
-    PYTHONPATH=$(pwd) python3 -m gunicorn \
-        --preload clearml_serving.serving.main:app \
-        --workers $GUNICORN_NUM_PROCESS \
-        --worker-class uvicorn.workers.UvicornWorker \
-        --max-requests $GUNICORN_MAX_REQUESTS \
-        --timeout $GUNICORN_SERVING_TIMEOUT \
-        --bind 0.0.0.0:$SERVING_PORT \
-        $GUNICORN_EXTRA_ARGS
-fi
+while : ; do
+    if [ -z "$CLEARML_USE_GUNICORN" ]
+    then
+        if [ -z "$CLEARML_SERVING_NUM_PROCESS" ]
+        then
+            echo "Starting Uvicorn server - single worker"
+            PYTHONPATH=$(pwd) python3 -m uvicorn \
+                clearml_serving.serving.main:app --log-level $UVICORN_LOG_LEVEL --host 0.0.0.0 --port $SERVING_PORT --loop $UVICORN_SERVE_LOOP \
+                $UVICORN_EXTRA_ARGS
+        else
+            echo "Starting Uvicorn server - multi worker"
+            PYTHONPATH=$(pwd) python3 clearml_serving/serving/uvicorn_mp_entrypoint.py \
+                clearml_serving.serving.main:app --log-level $UVICORN_LOG_LEVEL --host 0.0.0.0 --port $SERVING_PORT --loop $UVICORN_SERVE_LOOP \
+                --workers $CLEARML_SERVING_NUM_PROCESS $UVICORN_EXTRA_ARGS
+        fi
+    else
+        echo "Starting Gunicorn server"
+        # start service
+        PYTHONPATH=$(pwd) python3 -m gunicorn \
+            --preload clearml_serving.serving.main:app \
+            --workers $GUNICORN_NUM_PROCESS \
+            --worker-class uvicorn.workers.UvicornWorker \
+            --max-requests $GUNICORN_MAX_REQUESTS \
+            --timeout $GUNICORN_SERVING_TIMEOUT \
+            --bind 0.0.0.0:$SERVING_PORT \
+            $GUNICORN_EXTRA_ARGS
+    fi
+
+    if [ -z "$CLEARML_SERVING_RESTART_ON_FAILURE" ]
+    then
+        break
+    fi
+done
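The entrypoint change wraps server startup in a retry loop: when CLEARML_SERVING_RESTART_ON_FAILURE is set, a server process that dies (for instance via the hard os._exit(1) added further down for CUDA errors) is relaunched instead of taking the container down. A minimal Python sketch of the same supervision pattern, for illustration only (the command below is a placeholder, not the real entrypoint invocation):

```python
import subprocess
import sys
import time

# Placeholder command; the real entrypoint assembles the uvicorn/gunicorn
# invocation from environment variables.
SERVER_CMD = [sys.executable, "-m", "uvicorn", "clearml_serving.serving.main:app"]

def supervise(restart_on_failure: bool) -> None:
    while True:
        # Run the server in the foreground; a non-zero return code means it
        # crashed (for example via os._exit(1) on an unrecoverable CUDA error).
        result = subprocess.run(SERVER_CMD)
        if not restart_on_failure:
            break
        print("server exited with code {}, restarting...".format(result.returncode))
        time.sleep(1)  # brief backoff before relaunching

if __name__ == "__main__":
    supervise(restart_on_failure=True)
```

Like the shell loop, this restarts on any exit (not only failures) once the flag is set.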

View File

@@ -1,13 +1,9 @@
 import os
 import traceback
 import gzip
-import asyncio
 
 from fastapi import FastAPI, Request, Response, APIRouter, HTTPException
 from fastapi.routing import APIRoute
-from fastapi.responses import PlainTextResponse
-from starlette.background import BackgroundTask
 from typing import Optional, Dict, Any, Callable, Union
@@ -52,9 +48,6 @@ try:
 except (ValueError, TypeError):
     pass
 
-class CUDAException(Exception):
-    def __init__(self, exception: str):
-        self.exception = exception
 
 # start FastAPI app
 app = FastAPI(title="ClearML Serving Service", version=__version__, description="ClearML Service Service router")
@@ -77,20 +70,6 @@ async def startup_event():
     processor.launch(poll_frequency_sec=model_sync_frequency_secs*60)
 
-@app.on_event('shutdown')
-def shutdown_event():
-    print('RESTARTING INFERENCE SERVICE!')
-
-async def exit_app():
-    loop = asyncio.get_running_loop()
-    loop.stop()
-
-@app.exception_handler(CUDAException)
-async def cuda_exception_handler(request, exc):
-    task = BackgroundTask(exit_app)
-    return PlainTextResponse("CUDA out of memory. Restarting service", status_code=500, background=task)
 
 router = APIRouter(
     prefix="/serve",
     tags=["models"],
@@ -124,9 +103,9 @@ async def serve_model(model_id: str, version: Optional[str] = None, request: Uni
         session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
             instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
         if "CUDA out of memory. " in str(ex) or "NVML_SUCCESS == r INTERNAL ASSERT FAILED" in str(ex):
-            raise CUDAException(exception=ex)
-        else:
-            raise HTTPException(status_code=422, detail="Error [{}] processing request: {}".format(type(ex), ex))
+            # can't always recover from this - prefer to exit the program such that it can be restarted
+            os._exit(1)
+        raise HTTPException(status_code=422, detail="Error [{}] processing request: {}".format(type(ex), ex))
     except Exception as ex:
         session_logger.report_text("[{}] Exception [{}] {} while processing request: {}\n{}".format(
             instance_id, type(ex), ex, request, "".join(traceback.format_exc())))
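Previously a CUDAException was raised and handled in-process, returning a 500 response and stopping the event loop; the new code exits the process outright and relies on the entrypoint's restart loop. A minimal sketch of the idea, with a hypothetical helper name:

```python
import os

# Hypothetical helper mirroring the check above: substrings that indicate the
# CUDA context is likely unrecoverable in-process.
_FATAL_CUDA_MARKERS = (
    "CUDA out of memory. ",
    "NVML_SUCCESS == r INTERNAL ASSERT FAILED",
)

def exit_if_fatal_cuda_error(ex: BaseException) -> None:
    """Hard-exit so an external supervisor can relaunch a clean process.

    os._exit(1) skips Python-level cleanup and returns a non-zero exit code,
    which the entrypoint's restart loop treats as a signal to start a fresh
    worker with a new CUDA context.
    """
    if any(marker in str(ex) for marker in _FATAL_CUDA_MARKERS):
        os._exit(1)
```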

View File

@@ -1,7 +1,6 @@
 import json
 import os
 import gc
-import torch
 from collections import deque
 from pathlib import Path
 from random import random
@@ -169,6 +168,8 @@ class ModelRequestProcessor(object):
             # retry to process
             return await self.process_request(base_url=base_url, version=version, request_body=request_body)
 
+        processor = None
+        url = None
         try:
             # normalize url and version
             url = self._normalize_endpoint_url(base_url, version)
@@ -190,6 +191,8 @@ class ModelRequestProcessor(object):
             return_value = await self._process_request(processor=processor, url=url, body=request_body)
         finally:
+            if url and processor is not None and processor is not self._engine_processor_lookup.get(url):
+                gc.collect()
             self._request_processing_state.dec()
 
         return return_value
@@ -907,22 +910,22 @@
                 if cleanup or model_monitor_update:
                     self._update_serving_plot()
                 if cleanup:
+                    gc.collect()
                     self._engine_processor_lookup = dict()
             except Exception as ex:
                 print("Exception occurred in monitoring thread: {}".format(ex))
             sleep(poll_frequency_sec)
             try:
                 # we assume that by now all old deleted endpoints requests already returned
+                call_gc_collect = False
                 if model_monitor_update and not cleanup:
                     for k in list(self._engine_processor_lookup.keys()):
                         if k not in self._endpoints:
                             # atomic
-                            self._engine_processor_lookup[k]._model = None
-                            self._engine_processor_lookup[k]._preprocess = None
-                            del self._engine_processor_lookup[k]
                             self._engine_processor_lookup.pop(k, None)
-                            gc.collect()
-                            torch.cuda.empty_cache()
+                            call_gc_collect = True
+                if call_gc_collect:
+                    gc.collect()
                 cleanup = False
                 model_monitor_update = False
             except Exception as ex:
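The monitoring thread no longer pokes at processor internals or calls torch.cuda.empty_cache() per stale endpoint; it simply drops the reference and runs a single gc.collect(), letting the processor's own cleanup (see PreprocessDelWrapper below) release GPU state. A toy sketch of the reference-dropping pattern, with illustrative class and variable names:

```python
import gc

class EngineProcessor:
    """Toy stand-in for a per-endpoint processor holding heavy state."""

    def __init__(self, name: str):
        self.name = name
        self.model = object()  # placeholder for a loaded model

    def __del__(self):
        # In clearml-serving this is where PreprocessDelWrapper.__del__
        # calls the user-provided unload() to free GPU memory.
        print("releasing", self.name)

lookup = {"endpoint-a": EngineProcessor("endpoint-a")}
stale = lookup.pop("endpoint-a", None)  # atomic removal, as in the diff
del stale     # drop the last strong reference; CPython finalizes immediately
gc.collect()  # also break any leftover reference cycles so __del__ runs promptly
```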

View File

@@ -90,7 +90,18 @@ class BasePreprocessRequest(object):
         sys.modules[spec.name] = _preprocess
         spec.loader.exec_module(_preprocess)
 
-        Preprocess = _preprocess.Preprocess  # noqa
+        class PreprocessDelWrapper(_preprocess.Preprocess):
+            def __del__(self):
+                super_ = super(PreprocessDelWrapper, self)
+                if callable(getattr(super_, "unload", None)):
+                    try:
+                        super_.unload()
+                    except Exception as ex:
+                        print("Failed unloading model: {}".format(ex))
+                if callable(getattr(super_, "__del__", None)):
+                    super_.__del__()
+
+        Preprocess = PreprocessDelWrapper  # noqa
         # override `send_request` method
         Preprocess.send_request = BasePreprocessRequest._preprocess_send_request
         # create preprocess class
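The wrapper subclasses the dynamically loaded user Preprocess so that unload() runs when the instance is garbage-collected. A self-contained sketch of the same behavior (the stub Preprocess below stands in for user code):

```python
class Preprocess:
    """Stub standing in for the dynamically loaded user class."""

    def unload(self):
        print("freeing model resources")

class PreprocessDelWrapper(Preprocess):
    def __del__(self):
        super_ = super(PreprocessDelWrapper, self)
        # Call the optional unload() hook if the user class defines one.
        if callable(getattr(super_, "unload", None)):
            try:
                super_.unload()
            except Exception as ex:
                print("Failed unloading model: {}".format(ex))
        # Chain to the user's own __del__, if any.
        if callable(getattr(super_, "__del__", None)):
            super_.__del__()

p = PreprocessDelWrapper()
del p  # prints "freeing model resources" once the instance is collected
```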

View File

@@ -96,6 +96,7 @@ services:
         CLEARML_DEFAULT_KAFKA_SERVE_URL: ${CLEARML_DEFAULT_KAFKA_SERVE_URL:-clearml-serving-kafka:9092}
         CLEARML_DEFAULT_TRITON_GRPC_ADDR: ${CLEARML_DEFAULT_TRITON_GRPC_ADDR:-clearml-serving-triton:8001}
         CLEARML_USE_GUNICORN: ${CLEARML_USE_GUNICORN:-}
+        CLEARML_SERVING_RESTART_ON_FAILURE: ${CLEARML_SERVING_RESTART_ON_FAILURE:-}
         CLEARML_SERVING_NUM_PROCESS: ${CLEARML_SERVING_NUM_PROCESS:-}
         CLEARML_EXTRA_PYTHON_PACKAGES: ${CLEARML_EXTRA_PYTHON_PACKAGES:-}
         AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID:-}

View File

@@ -96,6 +96,7 @@ services:
         CLEARML_DEFAULT_KAFKA_SERVE_URL: ${CLEARML_DEFAULT_KAFKA_SERVE_URL:-clearml-serving-kafka:9092}
         CLEARML_DEFAULT_TRITON_GRPC_ADDR: ${CLEARML_DEFAULT_TRITON_GRPC_ADDR:-clearml-serving-triton:8001}
         CLEARML_USE_GUNICORN: ${CLEARML_USE_GUNICORN:-}
+        CLEARML_SERVING_RESTART_ON_FAILURE: ${CLEARML_SERVING_RESTART_ON_FAILURE:-}
         CLEARML_SERVING_NUM_PROCESS: ${CLEARML_SERVING_NUM_PROCESS:-}
         CLEARML_EXTRA_PYTHON_PACKAGES: ${CLEARML_EXTRA_PYTHON_PACKAGES:-}
         AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID:-}

View File

@@ -96,6 +96,7 @@ services:
         CLEARML_DEFAULT_KAFKA_SERVE_URL: ${CLEARML_DEFAULT_KAFKA_SERVE_URL:-clearml-serving-kafka:9092}
         CLEARML_DEFAULT_TRITON_GRPC_ADDR: ${CLEARML_DEFAULT_TRITON_GRPC_ADDR:-}
         CLEARML_USE_GUNICORN: ${CLEARML_USE_GUNICORN:-}
+        CLEARML_SERVING_RESTART_ON_FAILURE: ${CLEARML_SERVING_RESTART_ON_FAILURE:-}
         CLEARML_SERVING_NUM_PROCESS: ${CLEARML_SERVING_NUM_PROCESS:-}
         CLEARML_EXTRA_PYTHON_PACKAGES: ${CLEARML_EXTRA_PYTHON_PACKAGES:-}
         AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID:-}

View File

@@ -73,6 +73,8 @@ CLEARML_EXTRA_PYTHON_PACKAGES=transformers
 # Change this depending on your machine and performance needs
 CLEARML_USE_GUNICORN=1
 CLEARML_SERVING_NUM_PROCESS=8
+# Restarts if the serving process crashes
+CLEARML_SERVING_RESTART_ON_FAILURE=1
 ```
 
 Huggingface models require Triton engine support, please use `docker-compose-triton.yml` / `docker-compose-triton-gpu.yml` or if running on Kubernetes, the matching helm chart to set things up. Check the repository main readme documentation if you need help.