Report upload/download with tqdm if installed

This commit is contained in:
allegroai 2024-01-18 16:06:46 +02:00
parent cc4a9912bf
commit 0e0763d566
2 changed files with 145 additions and 38 deletions

View File

@ -4,12 +4,19 @@ from time import time
from typing import Optional, AnyStr, IO
from ..config import config
try:
from tqdm import tqdm # noqa
except ImportError:
tqdm = None
class ProgressReport(object):
report_upload_chunk_size_mb = None
report_download_chunk_size_mb = None
def __init__(self, verbose, total_size, log, report_chunk_size_mb):
def __init__(self, verbose, total_size, log, report_chunk_size_mb,
description_prefix=None, description_suffix=None,
max_time_between_reports_sec=10.0, report_start=None):
self.current_status_mb = 0.
self.last_reported = 0.
self._tic = time()
@ -18,45 +25,117 @@ class ProgressReport(object):
self._log = log
self._log_flag = False
self._total_size = total_size
self._description_prefix = description_prefix
self._description_suffix = description_suffix
self._max_time_between_reports_sec = max_time_between_reports_sec
self._report_start = report_start if report_start is not None else bool(tqdm is not None)
self._tqdm = None
self._tqdm_init = False
def close(self, report_completed=False, report_summary=False, report_prefix=None, report_suffix=None):
# call this one when we are done
if self._tqdm is not None:
# if we created a self._tqdm object we need to close it
if report_completed:
self._tqdm.update(
self._tqdm.total - min(self._tqdm.total, self.last_reported)
)
self._tqdm.close()
self._tqdm = None
if report_summary:
self._log.info(
'{} {:.2f} MB successfully {}'.format(
report_prefix or self._description_prefix, self._total_size,
report_suffix or self._description_suffix).strip()
)
def _get_tqdm(self):
if self._tqdm_init:
return self._tqdm
self._tqdm_init = True
# create the tqdm progress bar
if tqdm:
# noinspection PyBroadException
try:
self._tqdm = tqdm(
total=round(float(self._total_size), 2),
# desc="{} {}".format(description_prefix, description_suffix).strip(),
unit="MB",
unit_scale=False,
ncols=80,
bar_format="{bar} {percentage:3.0f}% | {n_fmt}/{total_fmt} MB "
"[{elapsed}<{remaining}, {rate_fmt}{postfix}]: {desc}",
)
except Exception:
# failed initializing TQDM (maybe interface changed?)
self._tqdm = None
return self._tqdm
def __call__(self, chunk_size, *_, **__):
chunk_size /= 1024. * 1024.
self.current_status_mb += chunk_size
last_part = self.current_status_mb - self.last_reported
if self._verbose or (last_part >= self._report_chunk_size):
if (self._verbose or (last_part >= self._report_chunk_size) or
(self.last_reported and self.current_status_mb >= self._total_size-0.01) or
(time()-self._tic > self._max_time_between_reports_sec)):
time_diff = time() - self._tic
self.speed = (last_part / time_diff) if time_diff != 0 else 0
self._report(self._total_size, self.current_status_mb, self.speed)
self._tic = time()
self.last_reported = self.current_status_mb
self._report(self._total_size, self.current_status_mb, self.speed)
def _report(self, total_mb, current_mb, speed_mbps):
# type: (float, float, float) -> None
pass
if self._report_start and self.last_reported <= 0:
# first time - print before initializing the tqdm bar
self._log.info(
"{}: {:.2f}MB {}".format(
self._description_prefix, total_mb, self._description_suffix).strip(" :")
)
# initialize or reuse the bar
_tqdm = self._get_tqdm()
if _tqdm:
# make sure we do not spill over due to rounding
if round(float(current_mb), 2) >= _tqdm.total:
_tqdm.update(_tqdm.total - self.last_reported)
else:
_tqdm.update(current_mb - self.last_reported)
else:
self._log.info(
"{}: {:.2f}MB / {:.2f}MB @ {:.2f}MBs {}".format(
self._description_prefix,
current_mb,
total_mb,
speed_mbps,
self._description_suffix
).strip(" :")
)
class UploadProgressReport(ProgressReport):
def __init__(self, filename, verbose, total_size, log, report_chunk_size_mb=None):
def __init__(self, filename, verbose, total_size, log, report_chunk_size_mb=None, report_start=None):
report_chunk_size_mb = report_chunk_size_mb if report_chunk_size_mb is not None \
else ProgressReport.report_upload_chunk_size_mb or \
int(config.get("storage.log.report_upload_chunk_size_mb", 5))
super(UploadProgressReport, self).__init__(verbose, total_size, log, report_chunk_size_mb)
self._filename = filename
def _report(self, total_mb, current_mb, speed_mbps):
# type: (float, float, float) -> None
self._log.info(
'Uploading: %.2fMB / %.2fMB @ %.2fMBs from %s' %
(current_mb, total_mb, speed_mbps, self._filename)
super(UploadProgressReport, self).__init__(
verbose, total_size, log, report_chunk_size_mb,
description_prefix="Uploading", description_suffix="to {}".format(filename),
report_start=report_start,
)
self._filename = filename
@classmethod
def from_stream(cls, stream, filename, verbose, log):
# type: (IO[AnyStr], str, bool, logging.Logger) -> Optional[UploadProgressReport]
if hasattr(stream, 'seek'):
total_size = cls._get_stream_length(stream)
return UploadProgressReport(filename, verbose, total_size, log)
total_size_mb = cls._get_stream_length(stream) // (1024 * 1024)
return UploadProgressReport(filename, verbose, total_size_mb, log)
@classmethod
def from_file(cls, filename, verbose, log):
@ -78,14 +157,13 @@ class UploadProgressReport(ProgressReport):
class DownloadProgressReport(ProgressReport):
def __init__(self, total_size, verbose, remote_path, log, report_chunk_size_mb=None):
def __init__(self, total_size, verbose, remote_path, log, report_chunk_size_mb=None, report_start=None):
report_chunk_size_mb = report_chunk_size_mb if report_chunk_size_mb is not None \
else ProgressReport.report_download_chunk_size_mb or \
int(config.get("storage.log.report_download_chunk_size_mb", 5))
super(DownloadProgressReport, self).__init__(verbose, total_size, log, report_chunk_size_mb)
super(DownloadProgressReport, self).__init__(
verbose, total_size, log, report_chunk_size_mb,
description_prefix="Downloading", description_suffix="from {}".format(remote_path),
report_start=report_start,
)
self._remote_path = remote_path
def _report(self, total_mb, current_mb, speed_mbps):
# type: (float, float, float) -> None
self._log.info('Downloading: %.2fMB / %.2fMB @ %.2fMBs from %s' %
(current_mb, total_mb, speed_mbps, self._remote_path))

View File

@ -615,7 +615,11 @@ class _Boto3Driver(_Driver):
def async_download(a_obj, a_stream, cb, cfg):
try:
a_obj.download_fileobj(a_stream, Callback=cb, Config=cfg)
if cb:
cb.close(report_completed=True)
except Exception as ex:
if cb:
cb.close()
(log or self.get_logger()).error('Failed downloading: %s' % ex)
a_stream.close()
@ -780,8 +784,8 @@ class _GoogleCloudStorageDriver(_Driver):
class _Container(object):
def __init__(self, name, cfg):
try:
from google.cloud import storage
from google.oauth2 import service_account
from google.cloud import storage # noqa
from google.oauth2 import service_account # noqa
except ImportError:
raise UsageError(
'Google cloud driver not found. '
@ -862,7 +866,7 @@ class _GoogleCloudStorageDriver(_Driver):
object.delete()
except Exception as ex:
try:
from google.cloud.exceptions import NotFound
from google.cloud.exceptions import NotFound # noqa
if isinstance(ex, NotFound):
return False
except ImportError:
@ -949,7 +953,7 @@ class _AzureBlobServiceStorageDriver(_Driver):
except ImportError:
try:
from azure.storage.blob import BlockBlobService # noqa
from azure.common import AzureHttpError # noqa: F401
from azure.common import AzureHttpError # noqa
self.__legacy = True
except ImportError:
@ -1193,6 +1197,7 @@ class _AzureBlobServiceStorageDriver(_Driver):
obj.blob_name,
progress_callback=cb,
)
cb.close()
if container.is_legacy():
return blob.content
else:
@ -1663,7 +1668,7 @@ class _FileStorageDriver(_Driver):
try:
os.unlink(path)
except Exception:
except Exception: # noqa
return False
# # Check and delete all the empty parent folders
@ -1767,14 +1772,14 @@ class _FileStorageDriver(_Driver):
if six.PY3:
from io import FileIO as file
if isinstance(iterator, (file)):
if isinstance(iterator, file):
get_data = iterator.read
args = (chunk_size,)
else:
get_data = next
args = (iterator,)
data = bytes('')
data = bytes(b'')
empty = False
while not empty or len(data) > 0:
@ -2320,7 +2325,7 @@ class StorageHelper(object):
return self._get_object_size_bytes(obj, silence_errors)
def _get_object_size_bytes(self, obj, silence_errors=False):
# type: (object) -> [int, None]
# type: (object, bool) -> [int, None]
"""
Auxiliary function for `get_object_size_bytes`.
Get size of the remote object in bytes.
@ -2448,6 +2453,10 @@ class StorageHelper(object):
stream.seek(0)
except Exception:
pass
if cb:
cb.close(report_completed=not bool(last_ex))
if last_ex:
raise last_ex
@ -2601,9 +2610,10 @@ class StorageHelper(object):
return direct_access_path
temp_local_path = None
cb = None
try:
if verbose:
self._log.info('Start downloading from %s' % remote_path)
self._log.info("Start downloading from {}".format(remote_path))
if not overwrite_existing and Path(local_path).is_file():
self._log.debug(
'File {} already exists, no need to download, thread id = {}'.format(
@ -2643,8 +2653,9 @@ class StorageHelper(object):
# if driver supports download with callback, use it (it might be faster)
if hasattr(self._driver, 'download_object'):
# callback
cb = DownloadProgressReport(total_size_mb, verbose, remote_path, self._log)
# callback if verbose we already reported download start, no need to do that again
cb = DownloadProgressReport(total_size_mb, verbose, remote_path, self._log,
report_start=True if verbose else None)
self._driver.download_object(obj, temp_local_path, callback=cb)
download_reported = bool(cb.last_reported)
dl_total_mb = cb.current_status_mb
@ -2686,15 +2697,28 @@ class StorageHelper(object):
raise Exception('Failed renaming partial file, downloaded file exists and a 0-sized file')
# report download if we are on the second chunk
if verbose or download_reported:
if cb:
cb.close(
report_completed=True,
report_summary=verbose or download_reported,
report_prefix="Downloaded",
report_suffix="from {} , saved to {}".format(remote_path, local_path)
)
elif verbose or download_reported:
self._log.info(
'Downloaded %.2f MB successfully from %s , saved to %s' % (dl_total_mb, remote_path, local_path))
"Downloaded {:.2f} MB successfully from {} , saved to {}".format(
dl_total_mb, remote_path, local_path)
)
return local_path
except DownloadError:
if cb:
cb.close()
raise
except Exception as e:
if cb:
cb.close()
self._log.error("Could not download {} , err: {} ".format(remote_path, e))
if delete_on_failure:
if delete_on_failure and temp_local_path:
# noinspection PyBroadException
try:
os.remove(temp_local_path)
@ -2880,7 +2904,9 @@ class StorageHelper(object):
def _do_async_upload(self, data):
assert isinstance(data, self._UploadData)
return self._do_upload(data.src_path, data.dest_path, data.canonized_dest_path, extra=data.extra, cb=data.callback, verbose=True, retries=data.retries, return_canonized=data.return_canonized)
return self._do_upload(data.src_path, data.dest_path, data.canonized_dest_path,
extra=data.extra, cb=data.callback, verbose=True,
retries=data.retries, return_canonized=data.return_canonized)
def _upload_from_file(self, local_path, dest_path, extra=None):
if not hasattr(self._driver, 'upload_object'):
@ -2897,9 +2923,12 @@ class StorageHelper(object):
object_name=object_name,
callback=cb,
extra=extra)
if cb:
cb.close()
return res
def _do_upload(self, src_path, dest_path, canonized_dest_path, extra=None, cb=None, verbose=False, retries=1, return_canonized=False):
def _do_upload(self, src_path, dest_path, canonized_dest_path,
extra=None, cb=None, verbose=False, retries=1, return_canonized=False):
object_name = self._normalize_object_name(canonized_dest_path)
if cb:
try: