diff --git a/trains/storage/helper.py b/trains/storage/helper.py index 9cc9d965..7f9ebc64 100644 --- a/trains/storage/helper.py +++ b/trains/storage/helper.py @@ -46,8 +46,6 @@ if level: except (TypeError, ValueError): log.error('invalid storage log level in configuration: %s' % level) -upload_pool = ThreadPool(processes=1) - class StorageError(Exception): pass @@ -213,6 +211,7 @@ class StorageHelper(object): # global terminate event for async upload threads _terminate = threading.Event() _async_upload_threads = set() + _upload_pool = None # collect all bucket credentials that aren't empty (ignore entries with an empty key or secret) _s3_configurations = S3BucketConfigurations.from_config(config.get('aws.s3', {})) @@ -565,7 +564,8 @@ class StorageHelper(object): if async_enable: data = self._UploadData(src_path=src_path, dest_path=dest_path, extra=extra, callback=cb, retries=retries) - return upload_pool.apply_async(self._do_async_upload, args=(data,)) + StorageHelper._initialize_upload_pool() + return StorageHelper._upload_pool.apply_async(self._do_async_upload, args=(data,)) else: return self._do_upload(src_path, dest_path, extra, cb, verbose=False, retries=retries) @@ -952,6 +952,22 @@ class StorageHelper(object): self.log.warning('Storage helper problem for {}: {}'.format(str(object_name), str(e))) return None + @staticmethod + def _initialize_upload_pool(): + if not StorageHelper._upload_pool: + StorageHelper._upload_pool = ThreadPool(processes=1) + + @staticmethod + def close_async_threads(): + if StorageHelper._upload_pool: + pool = StorageHelper._upload_pool + StorageHelper._upload_pool = None + try: + pool.close() + pool.join() + except: + pass + class _HttpDriver(_Driver): """ LibCloud http/https adapter (simple, enough for now) """ diff --git a/trains/task.py b/trains/task.py index 79fee37c..1f906b60 100644 --- a/trains/task.py +++ b/trains/task.py @@ -1436,6 +1436,13 @@ class Task(_Task): Metrics.close_async_threads() # notice: this will close the jupyter monitoring ScriptInfo.close() + if self.is_main_task(): + try: + from .storage.helper import StorageHelper + StorageHelper.close_async_threads() + except: + pass + if print_done_waiting: self.log.info('Finished uploading') elif self._logger: