diff --git a/trains/backend_api/utils.py b/trains/backend_api/utils.py index 1e7fede3..e8ce392d 100644 --- a/trains/backend_api/utils.py +++ b/trains/backend_api/utils.py @@ -39,8 +39,10 @@ class _RetryFilter(logging.Filter): if record.args and len(record.args) > 0 and isinstance(record.args[0], Retry): left = (record.args[0].total, record.args[0].connect, record.args[0].read, record.args[0].redirect, record.args[0].status) - retry_left = max(left) - min(left) - return retry_left >= self.display_warning_after + left = [l for l in left if isinstance(l, int)] + if left: + retry_left = max(left) - min(left) + return retry_left >= self.display_warning_after return True diff --git a/trains/backend_interface/metrics/interface.py b/trains/backend_interface/metrics/interface.py index ccafb7c6..09f2d3d7 100644 --- a/trains/backend_interface/metrics/interface.py +++ b/trains/backend_interface/metrics/interface.py @@ -25,6 +25,7 @@ class Metrics(InterfaceBase): """ Metrics manager and batch writer """ _storage_lock = Lock() _file_upload_starvation_warning_sec = config.get('network.metrics.file_upload_starvation_warning_sec', None) + _file_upload_retries = 3 @property def storage_key_prefix(self): @@ -149,7 +150,7 @@ class Metrics(InterfaceBase): try: storage = self._get_storage(upload_uri) - url = storage.upload_from_stream(e.stream, e.url) + url = storage.upload_from_stream(e.stream, e.url, retries=self._file_upload_retries) e.event.update(url=url) except Exception as exp: log.warning("Failed uploading to {} ({})".format( diff --git a/trains/binding/artifacts.py b/trains/binding/artifacts.py index 5be9b90d..09531f71 100644 --- a/trains/binding/artifacts.py +++ b/trains/binding/artifacts.py @@ -7,8 +7,8 @@ from copy import deepcopy from datetime import datetime from multiprocessing.pool import ThreadPool from tempfile import mkdtemp, mkstemp -from threading import Thread, Event -from multiprocessing import RLock +from threading import Thread +from multiprocessing import RLock, Event from time import time import humanfriendly diff --git a/trains/storage/helper.py b/trains/storage/helper.py index bd2c5566..4a828961 100644 --- a/trains/storage/helper.py +++ b/trains/storage/helper.py @@ -148,11 +148,16 @@ class StorageHelper(object): def callback(self): return self._callback - def __init__(self, src_path, dest_path, extra, callback): + @property + def retries(self): + return self._retries + + def __init__(self, src_path, dest_path, extra, callback, retries): self._src_path = src_path self._dest_path = dest_path self._extra = extra self._callback = callback + self._retries = retries def __str__(self): return "src=%s" % self.src_path @@ -472,30 +477,44 @@ class StorageHelper(object): return folder_uri - def upload_from_stream(self, stream, dest_path, extra=None): + def upload_from_stream(self, stream, dest_path, extra=None, retries=1): dest_path = self._canonize_url(dest_path) object_name = self._normalize_object_name(dest_path) extra = extra.copy() if extra else {} extra.update(self._extra) - self._driver.upload_object_via_stream( - iterator=stream, - container=self._container, - object_name=object_name, - extra=extra) - + last_ex = None + for i in range(max(1, retries)): + try: + self._driver.upload_object_via_stream( + iterator=stream, + container=self._container, + object_name=object_name, + extra=extra) + last_ex = None + break + except Exception as ex: + last_ex = ex + # seek to beginning if possible + try: + stream.seek(0) + except: + pass + if last_ex: + raise last_ex + return dest_path - def upload(self, src_path, dest_path=None, extra=None, async_enable=False, cb=None): + def upload(self, src_path, dest_path=None, extra=None, async_enable=False, cb=None, retries=1): if not dest_path: dest_path = os.path.basename(src_path) dest_path = self._canonize_url(dest_path) if async_enable: - data = self._UploadData(src_path=src_path, dest_path=dest_path, extra=extra, callback=cb) + data = self._UploadData(src_path=src_path, dest_path=dest_path, extra=extra, callback=cb, retries=retries) return upload_pool.apply_async(self._do_async_upload, args=(data,)) else: - return self._do_upload(src_path, dest_path, extra, cb, verbose=False) + return self._do_upload(src_path, dest_path, extra, cb, verbose=False, retries=retries) def list(self, prefix=None): """ @@ -643,7 +662,7 @@ class StorageHelper(object): except DownloadError as e: raise except Exception as e: - self._log.error("Could not download %s , err: %s " % (remote_path, str(e))) + self._log.error("Could not download {} , err: {} ".format(remote_path, e)) if delete_on_failure: try: if temp_local_path: @@ -805,7 +824,8 @@ class StorageHelper(object): def _do_async_upload(self, data): assert isinstance(data, self._UploadData) - return self._do_upload(data.src_path, data.dest_path, data.extra, data.callback, verbose=True) + return self._do_upload(data.src_path, data.dest_path, extra=data.extra, cb=data.callback, + verbose=True, retries=data.retries) def _upload_from_file(self, local_path, dest_path, extra=None): if not hasattr(self._driver, 'upload_object'): @@ -822,7 +842,7 @@ class StorageHelper(object): extra=extra) return res - def _do_upload(self, src_path, dest_path, extra=None, cb=None, verbose=False): + def _do_upload(self, src_path, dest_path, extra=None, cb=None, verbose=False, retries=1): object_name = self._normalize_object_name(dest_path) if cb: try: @@ -835,16 +855,23 @@ class StorageHelper(object): self._log.debug(msg) else: self._log.info(msg) - try: - self._upload_from_file(local_path=src_path, dest_path=dest_path, extra=extra) - except Exception as e: - # TODO - exception is xml, need to parse. - self._log.error("Exception encountered while uploading %s" % str(e)) + last_ex = None + for i in range(max(1, retries)): + try: + self._upload_from_file(local_path=src_path, dest_path=dest_path, extra=extra) + last_ex = None + break + except Exception as e: + last_ex = e + + if last_ex: + self._log.error("Exception encountered while uploading %s" % str(last_ex)) try: cb(False) except Exception as e: self._log.warning("Exception on upload callback: %s" % str(e)) - raise + raise last_ex + if verbose: self._log.debug("Finished upload: %s => %s" % (src_path, object_name)) if cb: