From 0e0763d566c9979827cd130c883e70b0b1c809ff Mon Sep 17 00:00:00 2001
From: allegroai <>
Date: Thu, 18 Jan 2024 16:06:46 +0200
Subject: [PATCH] Report upload/download with tqdm if installed

---
 clearml/storage/callbacks.py | 122 ++++++++++++++++++++++++++++-------
 clearml/storage/helper.py    |  61 +++++++++++++-----
 2 files changed, 145 insertions(+), 38 deletions(-)

diff --git a/clearml/storage/callbacks.py b/clearml/storage/callbacks.py
index 20d94f2f..d1b22a68 100644
--- a/clearml/storage/callbacks.py
+++ b/clearml/storage/callbacks.py
@@ -4,12 +4,19 @@ from time import time
 from typing import Optional, AnyStr, IO
 from ..config import config
 
+try:
+    from tqdm import tqdm  # noqa
+except ImportError:
+    tqdm = None
+
 
 class ProgressReport(object):
     report_upload_chunk_size_mb = None
     report_download_chunk_size_mb = None
 
-    def __init__(self, verbose, total_size, log, report_chunk_size_mb):
+    def __init__(self, verbose, total_size, log, report_chunk_size_mb,
+                 description_prefix=None, description_suffix=None,
+                 max_time_between_reports_sec=10.0, report_start=None):
         self.current_status_mb = 0.
         self.last_reported = 0.
         self._tic = time()
@@ -18,45 +25,117 @@ class ProgressReport(object):
         self._log = log
         self._log_flag = False
         self._total_size = total_size
+        self._description_prefix = description_prefix
+        self._description_suffix = description_suffix
+        self._max_time_between_reports_sec = max_time_between_reports_sec
+        self._report_start = report_start if report_start is not None else bool(tqdm is not None)
+        self._tqdm = None
+        self._tqdm_init = False
+
+    def close(self, report_completed=False, report_summary=False, report_prefix=None, report_suffix=None):
+        # call this one when we are done
+        if self._tqdm is not None:
+            # if we created a self._tqdm object we need to close it
+            if report_completed:
+                self._tqdm.update(
+                    self._tqdm.total - min(self._tqdm.total, self.last_reported)
+                )
+            self._tqdm.close()
+            self._tqdm = None
+
+        if report_summary:
+            self._log.info(
+                '{} {:.2f} MB successfully {}'.format(
+                    report_prefix or self._description_prefix, self._total_size,
+                    report_suffix or self._description_suffix).strip()
+            )
+
+    def _get_tqdm(self):
+        if self._tqdm_init:
+            return self._tqdm
+
+        self._tqdm_init = True
+
+        # create the tqdm progress bar
+        if tqdm:
+            # noinspection PyBroadException
+            try:
+                self._tqdm = tqdm(
+                    total=round(float(self._total_size), 2),
+                    # desc="{} {}".format(description_prefix, description_suffix).strip(),
+                    unit="MB",
+                    unit_scale=False,
+                    ncols=80,
+                    bar_format="{bar} {percentage:3.0f}% | {n_fmt}/{total_fmt} MB "
+                               "[{elapsed}<{remaining}, {rate_fmt}{postfix}]: {desc}",
+                )
+            except Exception:
+                # failed initializing TQDM (maybe interface changed?)
+                self._tqdm = None
+
+        return self._tqdm
 
     def __call__(self, chunk_size, *_, **__):
         chunk_size /= 1024. * 1024.
         self.current_status_mb += chunk_size
         last_part = self.current_status_mb - self.last_reported
 
-        if self._verbose or (last_part >= self._report_chunk_size):
+        if (self._verbose or (last_part >= self._report_chunk_size) or
+                (self.last_reported and self.current_status_mb >= self._total_size-0.01) or
+                (time()-self._tic > self._max_time_between_reports_sec)):
             time_diff = time() - self._tic
             self.speed = (last_part / time_diff) if time_diff != 0 else 0
+            self._report(self._total_size, self.current_status_mb, self.speed)
             self._tic = time()
             self.last_reported = self.current_status_mb
-            self._report(self._total_size, self.current_status_mb, self.speed)
 
     def _report(self, total_mb, current_mb, speed_mbps):
         # type: (float, float, float) -> None
-        pass
+        if self._report_start and self.last_reported <= 0:
+            # first time - print before initializing the tqdm bar
+            self._log.info(
+                "{}: {:.2f}MB {}".format(
+                    self._description_prefix, total_mb, self._description_suffix).strip(" :")
+            )
+
+        # initialize or reuse the bar
+        _tqdm = self._get_tqdm()
+        if _tqdm:
+            # make sure we do not spill over due to rounding
+            if round(float(current_mb), 2) >= _tqdm.total:
+                _tqdm.update(_tqdm.total - self.last_reported)
+            else:
+                _tqdm.update(current_mb - self.last_reported)
+        else:
+            self._log.info(
+                "{}: {:.2f}MB / {:.2f}MB @ {:.2f}MBs {}".format(
+                    self._description_prefix,
+                    current_mb,
+                    total_mb,
+                    speed_mbps,
+                    self._description_suffix
+                ).strip(" :")
+            )
 
 
 class UploadProgressReport(ProgressReport):
-    def __init__(self, filename, verbose, total_size, log, report_chunk_size_mb=None):
+    def __init__(self, filename, verbose, total_size, log, report_chunk_size_mb=None, report_start=None):
         report_chunk_size_mb = report_chunk_size_mb if report_chunk_size_mb is not None \
             else ProgressReport.report_upload_chunk_size_mb or \
             int(config.get("storage.log.report_upload_chunk_size_mb", 5))
-        super(UploadProgressReport, self).__init__(verbose, total_size, log, report_chunk_size_mb)
-        self._filename = filename
-
-    def _report(self, total_mb, current_mb, speed_mbps):
-        # type: (float, float, float) -> None
-        self._log.info(
-            'Uploading: %.2fMB / %.2fMB @ %.2fMBs from %s' %
-            (current_mb, total_mb, speed_mbps, self._filename)
+        super(UploadProgressReport, self).__init__(
+            verbose, total_size, log, report_chunk_size_mb,
+            description_prefix="Uploading", description_suffix="to {}".format(filename),
+            report_start=report_start,
         )
+        self._filename = filename
 
     @classmethod
     def from_stream(cls, stream, filename, verbose, log):
         # type: (IO[AnyStr], str, bool, logging.Logger) -> Optional[UploadProgressReport]
         if hasattr(stream, 'seek'):
-            total_size = cls._get_stream_length(stream)
-            return UploadProgressReport(filename, verbose, total_size, log)
+            total_size_mb = cls._get_stream_length(stream) // (1024 * 1024)
+            return UploadProgressReport(filename, verbose, total_size_mb, log)
 
     @classmethod
     def from_file(cls, filename, verbose, log):
@@ -78,14 +157,13 @@ class UploadProgressReport(ProgressReport):
 
 
 class DownloadProgressReport(ProgressReport):
-    def __init__(self, total_size, verbose, remote_path, log, report_chunk_size_mb=None):
+    def __init__(self, total_size, verbose, remote_path, log, report_chunk_size_mb=None, report_start=None):
         report_chunk_size_mb = report_chunk_size_mb if report_chunk_size_mb is not None \
             else ProgressReport.report_download_chunk_size_mb or \
             int(config.get("storage.log.report_download_chunk_size_mb", 5))
-        super(DownloadProgressReport, self).__init__(verbose, total_size, log, report_chunk_size_mb)
+        super(DownloadProgressReport, self).__init__(
+            verbose, total_size, log, report_chunk_size_mb,
+            description_prefix="Downloading", description_suffix="from {}".format(remote_path),
+            report_start=report_start,
+        )
         self._remote_path = remote_path
-
-    def _report(self, total_mb, current_mb, speed_mbps):
-        # type: (float, float, float) -> None
-        self._log.info('Downloading: %.2fMB / %.2fMB @ %.2fMBs from %s' %
-                       (current_mb, total_mb, speed_mbps, self._remote_path))
diff --git a/clearml/storage/helper.py b/clearml/storage/helper.py
index 8fca9c85..5da873db 100644
--- a/clearml/storage/helper.py
+++ b/clearml/storage/helper.py
@@ -615,7 +615,11 @@ class _Boto3Driver(_Driver):
         def async_download(a_obj, a_stream, cb, cfg):
             try:
                 a_obj.download_fileobj(a_stream, Callback=cb, Config=cfg)
+                if cb:
+                    cb.close(report_completed=True)
             except Exception as ex:
+                if cb:
+                    cb.close()
                 (log or self.get_logger()).error('Failed downloading: %s' % ex)
             a_stream.close()
 
@@ -780,8 +784,8 @@ class _GoogleCloudStorageDriver(_Driver):
     class _Container(object):
         def __init__(self, name, cfg):
             try:
-                from google.cloud import storage
-                from google.oauth2 import service_account
+                from google.cloud import storage  # noqa
+                from google.oauth2 import service_account  # noqa
             except ImportError:
                 raise UsageError(
                     'Google cloud driver not found. '
@@ -862,7 +866,7 @@ class _GoogleCloudStorageDriver(_Driver):
             object.delete()
         except Exception as ex:
             try:
-                from google.cloud.exceptions import NotFound
+                from google.cloud.exceptions import NotFound  # noqa
                 if isinstance(ex, NotFound):
                     return False
             except ImportError:
@@ -949,7 +953,7 @@ class _AzureBlobServiceStorageDriver(_Driver):
         except ImportError:
             try:
                 from azure.storage.blob import BlockBlobService  # noqa
-                from azure.common import AzureHttpError  # noqa: F401
+                from azure.common import AzureHttpError  # noqa
 
                 self.__legacy = True
             except ImportError:
@@ -1193,6 +1197,7 @@ class _AzureBlobServiceStorageDriver(_Driver):
                 obj.blob_name,
                 progress_callback=cb,
             )
+            cb.close()
             if container.is_legacy():
                 return blob.content
             else:
@@ -1663,7 +1668,7 @@ class _FileStorageDriver(_Driver):
 
         try:
             os.unlink(path)
-        except Exception:
+        except Exception:  # noqa
            return False
 
         # # Check and delete all the empty parent folders
@@ -1767,14 +1772,14 @@ class _FileStorageDriver(_Driver):
         if six.PY3:
             from io import FileIO as file
 
-        if isinstance(iterator, (file)):
+        if isinstance(iterator, file):
             get_data = iterator.read
             args = (chunk_size,)
         else:
             get_data = next
             args = (iterator,)
 
-        data = bytes('')
+        data = bytes(b'')
         empty = False
 
         while not empty or len(data) > 0:
@@ -2320,7 +2325,7 @@ class StorageHelper(object):
         return self._get_object_size_bytes(obj, silence_errors)
 
     def _get_object_size_bytes(self, obj, silence_errors=False):
-        # type: (object) -> [int, None]
+        # type: (object, bool) -> [int, None]
         """
         Auxiliary function for `get_object_size_bytes`.
         Get size of the remote object in bytes.
@@ -2448,6 +2453,10 @@ class StorageHelper(object):
                     stream.seek(0)
                 except Exception:
                     pass
+
+        if cb:
+            cb.close(report_completed=not bool(last_ex))
+
         if last_ex:
             raise last_ex
 
@@ -2601,9 +2610,10 @@ class StorageHelper(object):
             return direct_access_path
 
         temp_local_path = None
+        cb = None
         try:
             if verbose:
-                self._log.info('Start downloading from %s' % remote_path)
+                self._log.info("Start downloading from {}".format(remote_path))
             if not overwrite_existing and Path(local_path).is_file():
                 self._log.debug(
                     'File {} already exists, no need to download, thread id = {}'.format(
@@ -2643,8 +2653,9 @@ class StorageHelper(object):
 
             # if driver supports download with callback, use it (it might be faster)
             if hasattr(self._driver, 'download_object'):
-                # callback
-                cb = DownloadProgressReport(total_size_mb, verbose, remote_path, self._log)
+                # callback; if verbose, the download start was already reported, so do not report it again
+                cb = DownloadProgressReport(total_size_mb, verbose, remote_path, self._log,
+                                            report_start=True if verbose else None)
                 self._driver.download_object(obj, temp_local_path, callback=cb)
                 download_reported = bool(cb.last_reported)
                 dl_total_mb = cb.current_status_mb
@@ -2686,15 +2697,28 @@ class StorageHelper(object):
                     raise Exception('Failed renaming partial file, downloaded file exists and a 0-sized file')
 
             # report download if we are on the second chunk
-            if verbose or download_reported:
+            if cb:
+                cb.close(
+                    report_completed=True,
+                    report_summary=verbose or download_reported,
+                    report_prefix="Downloaded",
+                    report_suffix="from {} , saved to {}".format(remote_path, local_path)
+                )
+            elif verbose or download_reported:
                 self._log.info(
-                    'Downloaded %.2f MB successfully from %s , saved to %s' % (dl_total_mb, remote_path, local_path))
+                    "Downloaded {:.2f} MB successfully from {} , saved to {}".format(
+                        dl_total_mb, remote_path, local_path)
+                )
             return local_path
         except DownloadError:
+            if cb:
+                cb.close()
             raise
         except Exception as e:
+            if cb:
+                cb.close()
             self._log.error("Could not download {} , err: {} ".format(remote_path, e))
-            if delete_on_failure:
+            if delete_on_failure and temp_local_path:
                 # noinspection PyBroadException
                 try:
                     os.remove(temp_local_path)
@@ -2880,7 +2904,9 @@ class StorageHelper(object):
 
     def _do_async_upload(self, data):
         assert isinstance(data, self._UploadData)
-        return self._do_upload(data.src_path, data.dest_path, data.canonized_dest_path, extra=data.extra, cb=data.callback, verbose=True, retries=data.retries, return_canonized=data.return_canonized)
+        return self._do_upload(data.src_path, data.dest_path, data.canonized_dest_path,
+                               extra=data.extra, cb=data.callback, verbose=True,
+                               retries=data.retries, return_canonized=data.return_canonized)
 
     def _upload_from_file(self, local_path, dest_path, extra=None):
         if not hasattr(self._driver, 'upload_object'):
@@ -2897,9 +2923,12 @@ class StorageHelper(object):
             object_name=object_name,
             callback=cb,
             extra=extra)
+        if cb:
+            cb.close()
         return res
 
-    def _do_upload(self, src_path, dest_path, canonized_dest_path, extra=None, cb=None, verbose=False, retries=1, return_canonized=False):
+    def _do_upload(self, src_path, dest_path, canonized_dest_path,
+                   extra=None, cb=None, verbose=False, retries=1, return_canonized=False):
         object_name = self._normalize_object_name(canonized_dest_path)
         if cb:
             try:
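
Note on the pattern the patch applies: tqdm is imported as an optional dependency, the storage callback drives a progress bar when tqdm is importable and falls back to periodic log lines otherwise, and the bar is always closed on both the success and failure paths so a stale bar cannot garble later log output. The sketch below is a minimal, self-contained illustration of that pattern only; it is not the ClearML API, and the names it uses (SimpleProgress, total_mb, chunk_bytes) are illustrative.

import logging
import time

try:
    from tqdm import tqdm  # optional dependency, same import pattern as the patch
except ImportError:
    tqdm = None

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("progress-demo")


class SimpleProgress(object):
    """Illustrative progress callback: tqdm bar if available, log lines otherwise."""

    def __init__(self, total_mb, description="Downloading"):
        self.total_mb = float(total_mb)
        self.done_mb = 0.0
        self.description = description
        # only create a bar when tqdm could be imported
        self._bar = tqdm(total=round(self.total_mb, 2), unit="MB", ncols=80) if tqdm else None

    def __call__(self, chunk_bytes):
        # storage callbacks receive raw byte counts; convert to MB before reporting
        chunk_mb = chunk_bytes / (1024.0 * 1024.0)
        self.done_mb += chunk_mb
        if self._bar:
            self._bar.update(chunk_mb)
        else:
            log.info("%s: %.2fMB / %.2fMB", self.description, self.done_mb, self.total_mb)

    def close(self):
        # close the bar (if any) and emit a one-line summary
        if self._bar:
            self._bar.close()
        log.info("%s %.2f MB completed", self.description, self.total_mb)


if __name__ == "__main__":
    cb = SimpleProgress(total_mb=5, description="Downloading demo.bin")
    for _ in range(5):
        cb(1024 * 1024)  # report one 1 MB chunk at a time
        time.sleep(0.05)
    cb.close()

Run with tqdm installed this shows a bar; without tqdm it degrades to log lines, mirroring the fallback behavior the patch adds to ProgressReport.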