Improve pipeline concurrency

This commit is contained in:
allegroai 2022-04-18 23:24:04 +03:00
parent 8314074edf
commit 49e5acba53

View File

@ -1,3 +1,4 @@
from concurrent.futures import ThreadPoolExecutor
from typing import Any, List from typing import Any, List
@ -5,7 +6,7 @@ from typing import Any, List
class Preprocess(object): class Preprocess(object):
def __init__(self): def __init__(self):
# set internal state, this will be called only once. (i.e. not per request) # set internal state, this will be called only once. (i.e. not per request)
pass self.executor = ThreadPoolExecutor(max_workers=32)
def postprocess(self, data: List[dict], collect_custom_statistics_fn=None) -> dict: def postprocess(self, data: List[dict], collect_custom_statistics_fn=None) -> dict:
# we will here average the results and return the new value # we will here average the results and return the new value
@ -19,8 +20,12 @@ class Preprocess(object):
do something with the actual data, return any type of object. do something with the actual data, return any type of object.
The returned object will be passed as is to the postprocess function engine The returned object will be passed as is to the postprocess function engine
""" """
predict_a = self.send_request(endpoint="/test_model_sklearn_a/", version=None, data=data) predict_a = self.executor.submit(self.send_request, endpoint="/test_model_sklearn_a/", version=None, data=data)
predict_b = self.send_request(endpoint="/test_model_sklearn_b/", version=None, data=data) predict_b = self.executor.submit(self.send_request, endpoint="/test_model_sklearn_b/", version=None, data=data)
predict_a = predict_a.result()
predict_b = predict_b.result()
if not predict_b or not predict_a: if not predict_b or not predict_a:
raise ValueError("Error requesting inference endpoint test_model_sklearn a/b") raise ValueError("Error requesting inference endpoint test_model_sklearn a/b")