Fix StorageHelper upload on shutdown

This commit is contained in:
allegroai 2020-03-20 10:20:44 +02:00
parent dc915d0241
commit 0adbd79975
2 changed files with 26 additions and 3 deletions

View File

@ -46,8 +46,6 @@ if level:
except (TypeError, ValueError):
log.error('invalid storage log level in configuration: %s' % level)
upload_pool = ThreadPool(processes=1)
class StorageError(Exception):
pass
@ -213,6 +211,7 @@ class StorageHelper(object):
# global terminate event for async upload threads
_terminate = threading.Event()
_async_upload_threads = set()
_upload_pool = None
# collect all bucket credentials that aren't empty (ignore entries with an empty key or secret)
_s3_configurations = S3BucketConfigurations.from_config(config.get('aws.s3', {}))
@ -565,7 +564,8 @@ class StorageHelper(object):
if async_enable:
data = self._UploadData(src_path=src_path, dest_path=dest_path, extra=extra, callback=cb, retries=retries)
return upload_pool.apply_async(self._do_async_upload, args=(data,))
StorageHelper._initialize_upload_pool()
return StorageHelper._upload_pool.apply_async(self._do_async_upload, args=(data,))
else:
return self._do_upload(src_path, dest_path, extra, cb, verbose=False, retries=retries)
@ -952,6 +952,22 @@ class StorageHelper(object):
self.log.warning('Storage helper problem for {}: {}'.format(str(object_name), str(e)))
return None
@staticmethod
def _initialize_upload_pool():
if not StorageHelper._upload_pool:
StorageHelper._upload_pool = ThreadPool(processes=1)
@staticmethod
def close_async_threads():
if StorageHelper._upload_pool:
pool = StorageHelper._upload_pool
StorageHelper._upload_pool = None
try:
pool.close()
pool.join()
except:
pass
class _HttpDriver(_Driver):
""" LibCloud http/https adapter (simple, enough for now) """

View File

@ -1436,6 +1436,13 @@ class Task(_Task):
Metrics.close_async_threads()
# notice: this will close the jupyter monitoring
ScriptInfo.close()
if self.is_main_task():
try:
from .storage.helper import StorageHelper
StorageHelper.close_async_threads()
except:
pass
if print_done_waiting:
self.log.info('Finished uploading')
elif self._logger: