Fix support for Task init/close multiple times

This commit is contained in:
allegroai 2020-03-20 10:20:06 +02:00
parent 667ddcab88
commit dc915d0241

View File

@ -15,9 +15,6 @@ from ...storage import StorageHelper
from .events import MetricsEventAdapter from .events import MetricsEventAdapter
upload_pool = ThreadPool(processes=1)
file_upload_pool = ThreadPool(processes=config.get('network.metrics.file_upload_threads', 4))
log = get_logger('metrics') log = get_logger('metrics')
@ -26,6 +23,8 @@ class Metrics(InterfaceBase):
_storage_lock = Lock() _storage_lock = Lock()
_file_upload_starvation_warning_sec = config.get('network.metrics.file_upload_starvation_warning_sec', None) _file_upload_starvation_warning_sec = config.get('network.metrics.file_upload_starvation_warning_sec', None)
_file_upload_retries = 3 _file_upload_retries = 3
_upload_pool = None
_file_upload_pool = None
@property @property
def storage_key_prefix(self): def storage_key_prefix(self):
@ -77,7 +76,8 @@ class Metrics(InterfaceBase):
except Exception as e: except Exception as e:
return e return e
return upload_pool.apply_async( self._initialize_upload_pools()
return self._upload_pool.apply_async(
safe_call, safe_call,
args=(events, storage_uri), args=(events, storage_uri),
callback=partial(self._callback_wrapper, callback)) callback=partial(self._callback_wrapper, callback))
@ -173,7 +173,8 @@ class Metrics(InterfaceBase):
except Exception: except Exception:
pass pass
res = file_upload_pool.map_async(upload, entries) self._initialize_upload_pools()
res = self._file_upload_pool.map_async(upload, entries)
res.wait() res.wait()
# remember the last time we uploaded a file # remember the last time we uploaded a file
@ -205,17 +206,31 @@ class Metrics(InterfaceBase):
return None return None
@staticmethod @staticmethod
def close_async_threads(): def _initialize_upload_pools():
global file_upload_pool if not Metrics._upload_pool:
global upload_pool Metrics._upload_pool = ThreadPool(processes=1)
try: if not Metrics._file_upload_pool:
file_upload_pool.close() Metrics._file_upload_pool = ThreadPool(
file_upload_pool.join() processes=config.get('network.metrics.file_upload_threads', 4))
except:
pass @staticmethod
def close_async_threads():
file_pool = Metrics._file_upload_pool
Metrics._file_upload_pool = None
pool = Metrics._upload_pool
Metrics._upload_pool = None
if file_pool:
try:
file_pool.close()
file_pool.join()
except:
pass
if pool:
try:
pool.close()
pool.join()
except:
pass
try:
upload_pool.close()
upload_pool.join()
except:
pass