Add artifacts upload retries (default 3)

This commit is contained in:
allegroai 2019-10-05 15:25:48 +03:00
parent 005f445600
commit 99a702d149
4 changed files with 55 additions and 25 deletions

View File

@ -39,8 +39,10 @@ class _RetryFilter(logging.Filter):
if record.args and len(record.args) > 0 and isinstance(record.args[0], Retry): if record.args and len(record.args) > 0 and isinstance(record.args[0], Retry):
left = (record.args[0].total, record.args[0].connect, record.args[0].read, left = (record.args[0].total, record.args[0].connect, record.args[0].read,
record.args[0].redirect, record.args[0].status) record.args[0].redirect, record.args[0].status)
retry_left = max(left) - min(left) left = [l for l in left if isinstance(l, int)]
return retry_left >= self.display_warning_after if left:
retry_left = max(left) - min(left)
return retry_left >= self.display_warning_after
return True return True

View File

@ -25,6 +25,7 @@ class Metrics(InterfaceBase):
""" Metrics manager and batch writer """ """ Metrics manager and batch writer """
_storage_lock = Lock() _storage_lock = Lock()
_file_upload_starvation_warning_sec = config.get('network.metrics.file_upload_starvation_warning_sec', None) _file_upload_starvation_warning_sec = config.get('network.metrics.file_upload_starvation_warning_sec', None)
_file_upload_retries = 3
@property @property
def storage_key_prefix(self): def storage_key_prefix(self):
@ -149,7 +150,7 @@ class Metrics(InterfaceBase):
try: try:
storage = self._get_storage(upload_uri) storage = self._get_storage(upload_uri)
url = storage.upload_from_stream(e.stream, e.url) url = storage.upload_from_stream(e.stream, e.url, retries=self._file_upload_retries)
e.event.update(url=url) e.event.update(url=url)
except Exception as exp: except Exception as exp:
log.warning("Failed uploading to {} ({})".format( log.warning("Failed uploading to {} ({})".format(

View File

@ -7,8 +7,8 @@ from copy import deepcopy
from datetime import datetime from datetime import datetime
from multiprocessing.pool import ThreadPool from multiprocessing.pool import ThreadPool
from tempfile import mkdtemp, mkstemp from tempfile import mkdtemp, mkstemp
from threading import Thread, Event from threading import Thread
from multiprocessing import RLock from multiprocessing import RLock, Event
from time import time from time import time
import humanfriendly import humanfriendly

View File

@ -148,11 +148,16 @@ class StorageHelper(object):
def callback(self): def callback(self):
return self._callback return self._callback
def __init__(self, src_path, dest_path, extra, callback): @property
def retries(self):
return self._retries
def __init__(self, src_path, dest_path, extra, callback, retries):
self._src_path = src_path self._src_path = src_path
self._dest_path = dest_path self._dest_path = dest_path
self._extra = extra self._extra = extra
self._callback = callback self._callback = callback
self._retries = retries
def __str__(self): def __str__(self):
return "src=%s" % self.src_path return "src=%s" % self.src_path
@ -472,30 +477,44 @@ class StorageHelper(object):
return folder_uri return folder_uri
def upload_from_stream(self, stream, dest_path, extra=None): def upload_from_stream(self, stream, dest_path, extra=None, retries=1):
dest_path = self._canonize_url(dest_path) dest_path = self._canonize_url(dest_path)
object_name = self._normalize_object_name(dest_path) object_name = self._normalize_object_name(dest_path)
extra = extra.copy() if extra else {} extra = extra.copy() if extra else {}
extra.update(self._extra) extra.update(self._extra)
self._driver.upload_object_via_stream( last_ex = None
iterator=stream, for i in range(max(1, retries)):
container=self._container, try:
object_name=object_name, self._driver.upload_object_via_stream(
extra=extra) iterator=stream,
container=self._container,
object_name=object_name,
extra=extra)
last_ex = None
break
except Exception as ex:
last_ex = ex
# seek to beginning if possible
try:
stream.seek(0)
except:
pass
if last_ex:
raise last_ex
return dest_path return dest_path
def upload(self, src_path, dest_path=None, extra=None, async_enable=False, cb=None): def upload(self, src_path, dest_path=None, extra=None, async_enable=False, cb=None, retries=1):
if not dest_path: if not dest_path:
dest_path = os.path.basename(src_path) dest_path = os.path.basename(src_path)
dest_path = self._canonize_url(dest_path) dest_path = self._canonize_url(dest_path)
if async_enable: if async_enable:
data = self._UploadData(src_path=src_path, dest_path=dest_path, extra=extra, callback=cb) data = self._UploadData(src_path=src_path, dest_path=dest_path, extra=extra, callback=cb, retries=retries)
return upload_pool.apply_async(self._do_async_upload, args=(data,)) return upload_pool.apply_async(self._do_async_upload, args=(data,))
else: else:
return self._do_upload(src_path, dest_path, extra, cb, verbose=False) return self._do_upload(src_path, dest_path, extra, cb, verbose=False, retries=retries)
def list(self, prefix=None): def list(self, prefix=None):
""" """
@ -643,7 +662,7 @@ class StorageHelper(object):
except DownloadError as e: except DownloadError as e:
raise raise
except Exception as e: except Exception as e:
self._log.error("Could not download %s , err: %s " % (remote_path, str(e))) self._log.error("Could not download {} , err: {} ".format(remote_path, e))
if delete_on_failure: if delete_on_failure:
try: try:
if temp_local_path: if temp_local_path:
@ -805,7 +824,8 @@ class StorageHelper(object):
def _do_async_upload(self, data): def _do_async_upload(self, data):
assert isinstance(data, self._UploadData) assert isinstance(data, self._UploadData)
return self._do_upload(data.src_path, data.dest_path, data.extra, data.callback, verbose=True) return self._do_upload(data.src_path, data.dest_path, extra=data.extra, cb=data.callback,
verbose=True, retries=data.retries)
def _upload_from_file(self, local_path, dest_path, extra=None): def _upload_from_file(self, local_path, dest_path, extra=None):
if not hasattr(self._driver, 'upload_object'): if not hasattr(self._driver, 'upload_object'):
@ -822,7 +842,7 @@ class StorageHelper(object):
extra=extra) extra=extra)
return res return res
def _do_upload(self, src_path, dest_path, extra=None, cb=None, verbose=False): def _do_upload(self, src_path, dest_path, extra=None, cb=None, verbose=False, retries=1):
object_name = self._normalize_object_name(dest_path) object_name = self._normalize_object_name(dest_path)
if cb: if cb:
try: try:
@ -835,16 +855,23 @@ class StorageHelper(object):
self._log.debug(msg) self._log.debug(msg)
else: else:
self._log.info(msg) self._log.info(msg)
try: last_ex = None
self._upload_from_file(local_path=src_path, dest_path=dest_path, extra=extra) for i in range(max(1, retries)):
except Exception as e: try:
# TODO - exception is xml, need to parse. self._upload_from_file(local_path=src_path, dest_path=dest_path, extra=extra)
self._log.error("Exception encountered while uploading %s" % str(e)) last_ex = None
break
except Exception as e:
last_ex = e
if last_ex:
self._log.error("Exception encountered while uploading %s" % str(last_ex))
try: try:
cb(False) cb(False)
except Exception as e: except Exception as e:
self._log.warning("Exception on upload callback: %s" % str(e)) self._log.warning("Exception on upload callback: %s" % str(e))
raise raise last_ex
if verbose: if verbose:
self._log.debug("Finished upload: %s => %s" % (src_path, object_name)) self._log.debug("Finished upload: %s => %s" % (src_path, object_name))
if cb: if cb: