From 7dc601598b71d80c6c8e4b4b3df08bd668684c26 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Thu, 4 Jul 2024 15:26:45 +0300 Subject: [PATCH] Add support for HTTP file upload progress reporting --- clearml/storage/callbacks.py | 2 +- clearml/storage/helper.py | 36 ++++++++++++++++++++---------------- 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/clearml/storage/callbacks.py b/clearml/storage/callbacks.py index d1b22a68..bb8d5bf4 100644 --- a/clearml/storage/callbacks.py +++ b/clearml/storage/callbacks.py @@ -66,7 +66,7 @@ class ProgressReport(object): unit="MB", unit_scale=False, ncols=80, - bar_format="{bar} {percentage:3.0f}% | {n_fmt}/{total_fmt} MB " + bar_format="{bar} {percentage:3.0f}% | {n:.2f}/{total_fmt} MB " "[{elapsed}<{remaining}, {rate_fmt}{postfix}]: {desc}", ) except Exception: diff --git a/clearml/storage/helper.py b/clearml/storage/helper.py index dd63d275..a2928e03 100644 --- a/clearml/storage/helper.py +++ b/clearml/storage/helper.py @@ -33,7 +33,7 @@ from six import binary_type, StringIO from six.moves.queue import Queue, Empty from six.moves.urllib.parse import urlparse -from clearml.utilities.requests_toolbelt import MultipartEncoder +from clearml.utilities.requests_toolbelt import MultipartEncoderMonitor, MultipartEncoder from .callbacks import UploadProgressReport, DownloadProgressReport from .util import quote_url from ..backend_api.session import Session @@ -180,6 +180,14 @@ class _HttpDriver(_Driver): return self._containers[container_name] def upload_object_via_stream(self, iterator, container, object_name, extra=None, callback=None, **kwargs): + def monitor_callback(monitor): + new_chunk = monitor.bytes_read - monitor.previous_read + monitor.previous_read = monitor.bytes_read + try: + callback(new_chunk) + except Exception as ex: + self.get_logger().debug('Exception raised when running callback function: {}'.format(ex)) + # when sending data in post, there is no connection timeout, just an entire upload timeout timeout = int(self.timeout_total) url = container.name @@ -188,15 +196,7 @@ class _HttpDriver(_Driver): host, _, path = object_name.partition('/') url += host + '/' - m = MultipartEncoder(fields={ - path: (path, iterator, get_file_mimetype(object_name)) - }) - - headers = { - 'Content-Type': m.content_type, - } - headers.update(container.get_headers(url) or {}) - + stream_size = None if hasattr(iterator, 'tell') and hasattr(iterator, 'seek'): pos = iterator.tell() iterator.seek(0, 2) @@ -204,6 +204,16 @@ class _HttpDriver(_Driver): iterator.seek(pos, 0) timeout = max(timeout, (stream_size / 1024) / float(self.min_kbps_speed)) + m = MultipartEncoder(fields={path: (path, iterator, get_file_mimetype(object_name))}) + if callback and stream_size: + m = MultipartEncoderMonitor(m, callback=monitor_callback) + m.previous_read = 0 + + headers = { + 'Content-Type': m.content_type, + } + headers.update(container.get_headers(url) or {}) + res = container.session.post( url, data=m, timeout=timeout, headers=headers ) @@ -211,12 +221,6 @@ class _HttpDriver(_Driver): raise ValueError('Failed uploading object %s (%d): %s' % (object_name, res.status_code, res.text)) # call back is useless because we are not calling it while uploading... - - # if callback and stream_size: - # try: - # callback(stream_size) - # except Exception as ex: - # log.debug('Exception raised when running callback function: %s' % ex) return res def list_container_objects(self, *args, **kwargs):