mirror of
https://github.com/clearml/clearml-serving
synced 2025-06-26 18:16:00 +00:00
let gc handle unload
This commit is contained in:
parent
5d5188de40
commit
1f4e1599e3
@ -168,6 +168,8 @@ class ModelRequestProcessor(object):
|
|||||||
# retry to process
|
# retry to process
|
||||||
return await self.process_request(base_url=base_url, version=version, request_body=request_body)
|
return await self.process_request(base_url=base_url, version=version, request_body=request_body)
|
||||||
|
|
||||||
|
processor = None
|
||||||
|
url = None
|
||||||
try:
|
try:
|
||||||
# normalize url and version
|
# normalize url and version
|
||||||
url = self._normalize_endpoint_url(base_url, version)
|
url = self._normalize_endpoint_url(base_url, version)
|
||||||
@ -189,6 +191,9 @@ class ModelRequestProcessor(object):
|
|||||||
|
|
||||||
return_value = await self._process_request(processor=processor, url=url, body=request_body)
|
return_value = await self._process_request(processor=processor, url=url, body=request_body)
|
||||||
finally:
|
finally:
|
||||||
|
if url and processor is not None and processor is not self._engine_processor_lookup.get(url):
|
||||||
|
self._report_text("calling gc collect in request processing")
|
||||||
|
gc.collect()
|
||||||
self._request_processing_state.dec()
|
self._request_processing_state.dec()
|
||||||
|
|
||||||
return return_value
|
return return_value
|
||||||
@ -906,6 +911,8 @@ class ModelRequestProcessor(object):
|
|||||||
if cleanup or model_monitor_update:
|
if cleanup or model_monitor_update:
|
||||||
self._update_serving_plot()
|
self._update_serving_plot()
|
||||||
if cleanup:
|
if cleanup:
|
||||||
|
self._report_text("calling gc collect in cleanup")
|
||||||
|
gc.collect()
|
||||||
self._engine_processor_lookup = dict()
|
self._engine_processor_lookup = dict()
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
print("Exception occurred in monitoring thread: {}".format(ex))
|
print("Exception occurred in monitoring thread: {}".format(ex))
|
||||||
@ -914,23 +921,16 @@ class ModelRequestProcessor(object):
|
|||||||
self._report_text("after sleep")
|
self._report_text("after sleep")
|
||||||
try:
|
try:
|
||||||
# we assume that by now all old deleted endpoints requests already returned
|
# we assume that by now all old deleted endpoints requests already returned
|
||||||
self._report_text("model_monitor_update and not cleanup")
|
call_gc_collect = False
|
||||||
if model_monitor_update and not cleanup:
|
if model_monitor_update and not cleanup:
|
||||||
self._report_text("for k in list(self._engine_processor_lookup")
|
|
||||||
for k in list(self._engine_processor_lookup.keys()):
|
for k in list(self._engine_processor_lookup.keys()):
|
||||||
self._report_text(f"if k now in self._endpoints {self._endpoints} {k}")
|
|
||||||
if k not in self._endpoints:
|
if k not in self._endpoints:
|
||||||
# atomic
|
# atomic
|
||||||
self._engine_processor_lookup[k]._model = None
|
|
||||||
gc.collect()
|
|
||||||
self._report_text(str(self._engine_processor_lookup[k]._preprocess))
|
|
||||||
self._report_text(dir(self._engine_processor_lookup[k]._preprocess))
|
|
||||||
if hasattr(self._engine_processor_lookup[k]._preprocess, "unload"):
|
|
||||||
try:
|
|
||||||
self._engine_processor_lookup[k]._preprocess.unload()
|
|
||||||
except Exception as ex:
|
|
||||||
print("Exception occurred unloading model: {}".format(ex))
|
|
||||||
self._engine_processor_lookup.pop(k, None)
|
self._engine_processor_lookup.pop(k, None)
|
||||||
|
call_gc_collect = True
|
||||||
|
if call_gc_collect:
|
||||||
|
self._report_text("calling gc collect in try")
|
||||||
|
gc.collect()
|
||||||
cleanup = False
|
cleanup = False
|
||||||
model_monitor_update = False
|
model_monitor_update = False
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
|
@ -90,7 +90,15 @@ class BasePreprocessRequest(object):
|
|||||||
sys.modules[spec.name] = _preprocess
|
sys.modules[spec.name] = _preprocess
|
||||||
spec.loader.exec_module(_preprocess)
|
spec.loader.exec_module(_preprocess)
|
||||||
|
|
||||||
Preprocess = _preprocess.Preprocess # noqa
|
class PreprocessDelWrapper(_preprocess.Preprocess):
|
||||||
|
def __del__(self):
|
||||||
|
super_ = super(PreprocessDelWrapper, self)
|
||||||
|
if callable(getattr(super_, "unload", None)):
|
||||||
|
super_.unload()
|
||||||
|
if callable(getattr(super_, "__del__", None)):
|
||||||
|
super_.__del__()
|
||||||
|
|
||||||
|
Preprocess = PreprocessDelWrapper # noqa
|
||||||
# override `send_request` method
|
# override `send_request` method
|
||||||
Preprocess.send_request = BasePreprocessRequest._preprocess_send_request
|
Preprocess.send_request = BasePreprocessRequest._preprocess_send_request
|
||||||
# create preprocess class
|
# create preprocess class
|
||||||
|
Loading…
Reference in New Issue
Block a user