diff --git a/clearml_serving/serving/model_request_processor.py b/clearml_serving/serving/model_request_processor.py index 98f8b0d..49de499 100644 --- a/clearml_serving/serving/model_request_processor.py +++ b/clearml_serving/serving/model_request_processor.py @@ -168,6 +168,8 @@ class ModelRequestProcessor(object): # retry to process return await self.process_request(base_url=base_url, version=version, request_body=request_body) + processor = None + url = None try: # normalize url and version url = self._normalize_endpoint_url(base_url, version) @@ -189,6 +191,9 @@ class ModelRequestProcessor(object): return_value = await self._process_request(processor=processor, url=url, body=request_body) finally: + if url and processor is not None and processor is not self._engine_processor_lookup.get(url): + self._report_text("calling gc collect in request processing") + gc.collect() self._request_processing_state.dec() return return_value @@ -906,6 +911,8 @@ class ModelRequestProcessor(object): if cleanup or model_monitor_update: self._update_serving_plot() if cleanup: + self._report_text("calling gc collect in cleanup") + gc.collect() self._engine_processor_lookup = dict() except Exception as ex: print("Exception occurred in monitoring thread: {}".format(ex)) @@ -914,23 +921,16 @@ class ModelRequestProcessor(object): self._report_text("after sleep") try: # we assume that by now all old deleted endpoints requests already returned - self._report_text("model_monitor_update and not cleanup") + call_gc_collect = False if model_monitor_update and not cleanup: - self._report_text("for k in list(self._engine_processor_lookup") for k in list(self._engine_processor_lookup.keys()): - self._report_text(f"if k now in self._endpoints {self._endpoints} {k}") if k not in self._endpoints: # atomic - self._engine_processor_lookup[k]._model = None - gc.collect() - self._report_text(str(self._engine_processor_lookup[k]._preprocess)) - self._report_text(dir(self._engine_processor_lookup[k]._preprocess)) - if hasattr(self._engine_processor_lookup[k]._preprocess, "unload"): - try: - self._engine_processor_lookup[k]._preprocess.unload() - except Exception as ex: - print("Exception occurred unloading model: {}".format(ex)) self._engine_processor_lookup.pop(k, None) + call_gc_collect = True + if call_gc_collect: + self._report_text("calling gc collect in try") + gc.collect() cleanup = False model_monitor_update = False except Exception as ex: diff --git a/clearml_serving/serving/preprocess_service.py b/clearml_serving/serving/preprocess_service.py index a5c069c..81d4926 100644 --- a/clearml_serving/serving/preprocess_service.py +++ b/clearml_serving/serving/preprocess_service.py @@ -90,7 +90,15 @@ class BasePreprocessRequest(object): sys.modules[spec.name] = _preprocess spec.loader.exec_module(_preprocess) - Preprocess = _preprocess.Preprocess # noqa + class PreprocessDelWrapper(_preprocess.Preprocess): + def __del__(self): + super_ = super(PreprocessDelWrapper, self) + if callable(getattr(super_, "unload", None)): + super_.unload() + if callable(getattr(super_, "__del__", None)): + super_.__del__() + + Preprocess = PreprocessDelWrapper # noqa # override `send_request` method Preprocess.send_request = BasePreprocessRequest._preprocess_send_request # create preprocess class