Add progress report logging for StorageHelper.upload_from_stream, StorageHelper.upload and StorageHelper.upload_from_stream

This commit is contained in:
allegroai 2021-02-21 14:55:04 +02:00
parent 7dbf5471cc
commit 41230ac2c7
2 changed files with 134 additions and 57 deletions

View File

@ -0,0 +1,80 @@
import os
from time import time
from typing import Optional, AnyStr, IO
class ProgressReport(object):
def __init__(self, verbose, total_size, log, report_chunk_size_mb):
self.current_status_mb = 0.
self.last_reported = 0.
self._tic = time()
self._verbose = verbose
self._report_chunk_size = report_chunk_size_mb
self._log = log
self._log_flag = False
self._total_size = total_size
def __call__(self, chunk_size, *_, **__):
chunk_size /= 1024. * 1024.
self.current_status_mb += chunk_size
last_part = self.current_status_mb - self.last_reported
if self._verbose or (last_part >= self._report_chunk_size):
time_diff = time() - self._tic
self.speed = (last_part / time_diff) if time_diff != 0 else 0
self._tic = time()
self.last_reported = self.current_status_mb
self._report(self._total_size, self.current_status_mb, self.speed)
def _report(self, total_mb, current_mb, speed_mbps):
# type: (float, float, float) -> None
pass
class UploadProgressReport(ProgressReport):
def __init__(self, filename, verbose, total_size, log, report_chunk_size_mb=5):
super(UploadProgressReport, self).__init__(verbose, total_size, log, report_chunk_size_mb)
self._filename = filename
def _report(self, total_mb, current_mb, speed_mbps):
# type: (float, float, float) -> None
self._log.info(
'Uploading: %.2fMB / %.2fMB @ %.2fMBs from %s' %
(current_mb, total_mb, speed_mbps, self._filename)
)
@classmethod
def from_stream(cls, stream, filename, verbose, log):
# type: (IO[AnyStr], str, bool, logging.Logger) -> Optional[UploadProgressReport]
if hasattr(stream, 'seek'):
total_size = cls._get_stream_length(stream)
return UploadProgressReport(filename, verbose, total_size, log)
@classmethod
def from_file(cls, filename, verbose, log):
# type: (str, bool, logging.Logger) -> UploadProgressReport
total_size_mb = float(os.path.getsize(filename)) / (1024. * 1024.)
return UploadProgressReport(filename, verbose, total_size_mb, log)
@staticmethod
def _get_stream_length(stream):
# type: (IO[AnyStr]) -> int
current_position = stream.tell()
# seek to end of file
stream.seek(0, 2)
total_length = stream.tell()
# seek back to current position to support
# partially read file-like objects
stream.seek(current_position or 0)
return total_length
class DownloadProgressReport(ProgressReport):
def __init__(self, total_size, verbose, remote_path, log, report_chunk_size_mb=5):
super(DownloadProgressReport, self).__init__(verbose, total_size, log, report_chunk_size_mb)
self._remote_path = remote_path
def _report(self, total_mb, current_mb, speed_mbps):
# type: (float, float, float) -> None
self._log.info('Downloading: %.2fMB / %.2fMB @ %.2fMBs from %s' %
(current_mb, total_mb, speed_mbps, self._remote_path))

View File

@ -30,6 +30,7 @@ from six.moves.queue import Queue, Empty
from six.moves.urllib.parse import urlparse
from six.moves.urllib.request import url2pathname
from .callbacks import UploadProgressReport, DownloadProgressReport
from .util import quote_url
from ..backend_api.utils import get_http_session_with_retry
from ..backend_config.bucket_config import S3BucketConfigurations, GSBucketConfigurations, AzureContainerConfigurations
@ -55,31 +56,6 @@ class DownloadError(Exception):
pass
class _DownloadProgressReport(object):
def __init__(self, total_size, verbose, remote_path, report_chunk_size_mb, log):
self._total_size = total_size
self._verbose = verbose
self.downloaded_mb = 0.
self._report_chunk_size = report_chunk_size_mb
self._log = log
self.last_reported = 0.
self._tic = time()
self._remote_path = remote_path
def __call__(self, chunk_size):
chunk_size /= 1024. * 1024.
self.downloaded_mb += chunk_size
last_part = self.downloaded_mb - self.last_reported
if self._verbose or (last_part >= self._report_chunk_size):
time_diff = time() - self._tic
speed = (last_part / time_diff) if time_diff != 0 else 0
self._tic = time()
self.last_reported = self.downloaded_mb
self._log.info('Downloading: %.0fMB / %.2fMb @ %.2fMbs from %s' %
(self.downloaded_mb, self._total_size, speed, self._remote_path))
@six.add_metaclass(ABCMeta)
class _Driver(object):
@ -534,12 +510,14 @@ class StorageHelper(object):
extra = extra.copy() if extra else {}
extra.update(self._extra)
last_ex = None
cb = UploadProgressReport.from_stream(stream, object_name, self._verbose, self._log)
for i in range(max(1, retries)):
try:
self._driver.upload_object_via_stream(
iterator=stream,
container=self._container,
object_name=object_name,
callback=cb,
extra=extra)
last_ex = None
break
@ -625,7 +603,6 @@ class StorageHelper(object):
def download_to_file(self, remote_path, local_path, overwrite_existing=False, delete_on_failure=True, verbose=None):
def next_chunk(astream):
_tic = time()
if isinstance(astream, binary_type):
chunk = astream
astream = None
@ -636,8 +613,7 @@ class StorageHelper(object):
chunk = None
else:
chunk = None
_tic = time() - _tic
return chunk, astream, _tic
return chunk, astream
remote_path = self._canonize_url(remote_path)
verbose = self._verbose if verbose is None else verbose
@ -699,27 +675,20 @@ class StorageHelper(object):
# if driver supports download with callback, use it (it might be faster)
if hasattr(self._driver, 'download_object'):
# callback
cb = _DownloadProgressReport(total_size_mb, verbose,
remote_path, chunk_size_mb, self._log)
cb = DownloadProgressReport(total_size_mb, verbose,
remote_path, self._log, chunk_size_mb)
self._driver.download_object(obj, temp_local_path, callback=cb)
download_reported = bool(cb.last_reported)
dl_total_mb = cb.downloaded_mb
dl_total_mb = cb.current_status_mb
else:
stream = self._driver.download_object_as_stream(obj, chunk_size_mb * 1024 * 1024)
if stream is None:
raise ValueError('Could not download %s' % remote_path)
with open(temp_local_path, 'wb') as fd:
data, stream, tic = next_chunk(stream)
data, stream = next_chunk(stream)
while data:
fd.write(data)
dl_rate = len(data) / float(1024 * 1024 * tic + 0.000001)
dl_total_mb += len(data) / float(1024 * 1024)
# report download if we are on the second chunk
if verbose or (dl_total_mb * 0.9 > chunk_size_mb):
download_reported = True
self._log.info('Downloading: %.0fMB / %.2fMb @ %.2fMbs from %s' %
(dl_total_mb, total_size_mb, dl_rate, remote_path))
data, stream, tic = next_chunk(stream)
data, stream = next_chunk(stream)
if Path(temp_local_path).stat().st_size <= 0:
raise Exception('downloaded a 0-sized file')
@ -756,7 +725,9 @@ class StorageHelper(object):
remote_path = self._canonize_url(remote_path)
try:
obj = self._get_object(remote_path)
return self._driver.download_object_as_stream(obj, chunk_size=chunk_size)
return self._driver.download_object_as_stream(
obj, chunk_size=chunk_size, verbose=self._verbose, log=self.log
)
except DownloadError:
raise
except Exception as e:
@ -918,10 +889,12 @@ class StorageHelper(object):
object_name = self._normalize_object_name(dest_path)
extra = extra.copy() if extra else {}
extra.update(self._extra)
cb = UploadProgressReport.from_file(local_path, self._verbose, self._log)
res = self._driver.upload_object(
file_path=local_path,
container=self._container,
object_name=object_name,
callback=cb,
extra=extra)
return res
@ -1039,12 +1012,13 @@ class _HttpDriver(_Driver):
self._containers[container_name] = self._Container(name=container_name, retries=self._retries, **kwargs)
return self._containers[container_name]
def upload_object_via_stream(self, iterator, container, object_name, extra=None, **kwargs):
def upload_object_via_stream(self, iterator, container, object_name, extra=None, callback=None, **kwargs):
url = object_name[:object_name.index('/')]
url_path = object_name[len(url) + 1:]
full_url = container.name + url
# when sending data in post, there is no connection timeout, just an entire upload timeout
timeout = self.timeout[-1]
stream_size = 0
if hasattr(iterator, 'tell') and hasattr(iterator, 'seek'):
pos = iterator.tell()
iterator.seek(0, 2)
@ -1056,6 +1030,12 @@ class _HttpDriver(_Driver):
headers=container.get_headers(full_url))
if res.status_code != requests.codes.ok:
raise ValueError('Failed uploading object %s (%d): %s' % (object_name, res.status_code, res.text))
if callback and stream_size:
try:
callback(stream_size)
except Exception as ex:
log.debug('Exception raised when running callback function: %s' % ex)
return res
def list_container_objects(self, *args, **kwargs):
@ -1118,10 +1098,10 @@ class _HttpDriver(_Driver):
def test_upload(self, test_path, config, **kwargs):
return True
def upload_object(self, file_path, container, object_name, extra, **kwargs):
def upload_object(self, file_path, container, object_name, extra, callback=None, **kwargs):
with open(file_path, 'rb') as stream:
return self.upload_object_via_stream(iterator=stream, container=container,
object_name=object_name, extra=extra, **kwargs)
object_name=object_name, extra=extra, callback=callback, **kwargs)
class _Stream(object):
@ -1291,26 +1271,29 @@ class _Boto3Driver(_Driver):
self._containers[container_name].config.retries = kwargs.get('retries', 5)
return self._containers[container_name]
def upload_object_via_stream(self, iterator, container, object_name, extra=None, **kwargs):
def upload_object_via_stream(self, iterator, container, object_name, callback=None, extra=None, **kwargs):
import boto3.s3.transfer
stream = _Stream(iterator)
try:
container.bucket.upload_fileobj(stream, object_name, Config=boto3.s3.transfer.TransferConfig(
use_threads=container.config.multipart,
max_concurrency=self._max_multipart_concurrency if container.config.multipart else 1,
num_download_attempts=container.config.retries))
num_download_attempts=container.config.retries),
Callback=callback,
)
except Exception as ex:
log.error('Failed uploading: %s' % ex)
return False
return True
def upload_object(self, file_path, container, object_name, extra=None, **kwargs):
def upload_object(self, file_path, container, object_name, callback=None, extra=None, **kwargs):
import boto3.s3.transfer
try:
container.bucket.upload_file(file_path, object_name, Config=boto3.s3.transfer.TransferConfig(
use_threads=container.config.multipart,
max_concurrency=self._max_multipart_concurrency if container.config.multipart else 1,
num_download_attempts=container.config.retries))
num_download_attempts=container.config.retries),
Callback=callback)
except Exception as ex:
log.error('Failed uploading: %s' % ex)
return False
@ -1341,10 +1324,10 @@ class _Boto3Driver(_Driver):
obj.container_name = full_container_name
return obj
def download_object_as_stream(self, obj, chunk_size=64 * 1024, **_):
def async_download(a_obj, a_stream, cfg):
def download_object_as_stream(self, obj, chunk_size=64 * 1024, verbose=None, log=None, **_):
def async_download(a_obj, a_stream, cb, cfg):
try:
a_obj.download_fileobj(a_stream, Config=cfg)
a_obj.download_fileobj(a_stream, Callback=cb, Config=cfg)
except Exception as ex:
log.error('Failed downloading: %s' % ex)
a_stream.close()
@ -1357,7 +1340,10 @@ class _Boto3Driver(_Driver):
use_threads=container.config.multipart,
max_concurrency=self._max_multipart_concurrency if container.config.multipart else 1,
num_download_attempts=container.config.retries)
self._get_stream_download_pool().submit(async_download, obj, stream, config)
total_size_mb = obj.content_length / (1024. * 1024.)
remote_path = os.path.join(obj.container_name, obj.key)
cb = DownloadProgressReport(total_size_mb, verbose, remote_path, log)
self._get_stream_download_pool().submit(async_download, obj, stream, cb, config)
return stream
@ -1634,7 +1620,7 @@ class _AzureBlobServiceStorageDriver(_Driver):
# self._containers[container_name].config.retries = kwargs.get('retries', 5)
return self._containers[container_name]
def upload_object_via_stream(self, iterator, container, object_name, extra=None, **kwargs):
def upload_object_via_stream(self, iterator, container, object_name, callback=None, extra=None, **kwargs):
from azure.common import AzureHttpError
blob_name = self._blob_name_from_object_path(object_name, container.name) # noqa: F841
@ -1647,6 +1633,7 @@ class _AzureBlobServiceStorageDriver(_Driver):
iterator.read() if hasattr(iterator, "read") else bytes(iterator),
# timeout=300,
max_connections=2,
progress_callback=callback,
)
return True
except AzureHttpError as ex:
@ -1655,7 +1642,7 @@ class _AzureBlobServiceStorageDriver(_Driver):
log.error('Failed uploading: %s' % ex)
return False
def upload_object(self, file_path, container, object_name, extra=None, **kwargs):
def upload_object(self, file_path, container, object_name, callback=None, extra=None, **kwargs):
from azure.common import AzureHttpError
blob_name = self._blob_name_from_object_path(object_name, container.name)
@ -1671,7 +1658,8 @@ class _AzureBlobServiceStorageDriver(_Driver):
file_path,
# timeout=300,
max_connections=2,
content_settings=ContentSettings(content_type=guess_type(file_path))
content_settings=ContentSettings(content_type=guess_type(file_path)),
progress_callback=callback,
)
return True
except AzureHttpError as ex:
@ -1703,11 +1691,20 @@ class _AzureBlobServiceStorageDriver(_Driver):
return self._Object(container=container, blob_name=blob.name, content_length=blob.properties.content_length)
def download_object_as_stream(self, obj, *_, **__):
def download_object_as_stream(self, obj, verbose, *_, **__):
container = obj.container
total_size_mb = obj.content_length / (1024. * 1024.)
remote_path = os.path.join(
"{}://".format(self.scheme),
container.config.account_name,
container.name,
obj.blob_name
)
cb = DownloadProgressReport(total_size_mb, verbose, remote_path, log)
blob = container.blob_service.get_blob_to_bytes(
container.name,
container.blob_name,
obj.blob_name,
progress_callback=cb,
)
return blob.content