Add support for HTTP file upload progress reporting

This commit is contained in:
allegroai 2024-07-04 15:26:45 +03:00
parent 3fa5409283
commit 7dc601598b
2 changed files with 21 additions and 17 deletions

View File

@ -66,7 +66,7 @@ class ProgressReport(object):
unit="MB", unit="MB",
unit_scale=False, unit_scale=False,
ncols=80, ncols=80,
bar_format="{bar} {percentage:3.0f}% | {n_fmt}/{total_fmt} MB " bar_format="{bar} {percentage:3.0f}% | {n:.2f}/{total_fmt} MB "
"[{elapsed}<{remaining}, {rate_fmt}{postfix}]: {desc}", "[{elapsed}<{remaining}, {rate_fmt}{postfix}]: {desc}",
) )
except Exception: except Exception:

View File

@ -33,7 +33,7 @@ from six import binary_type, StringIO
from six.moves.queue import Queue, Empty from six.moves.queue import Queue, Empty
from six.moves.urllib.parse import urlparse from six.moves.urllib.parse import urlparse
from clearml.utilities.requests_toolbelt import MultipartEncoder from clearml.utilities.requests_toolbelt import MultipartEncoderMonitor, MultipartEncoder
from .callbacks import UploadProgressReport, DownloadProgressReport from .callbacks import UploadProgressReport, DownloadProgressReport
from .util import quote_url from .util import quote_url
from ..backend_api.session import Session from ..backend_api.session import Session
@ -180,6 +180,14 @@ class _HttpDriver(_Driver):
return self._containers[container_name] return self._containers[container_name]
def upload_object_via_stream(self, iterator, container, object_name, extra=None, callback=None, **kwargs): def upload_object_via_stream(self, iterator, container, object_name, extra=None, callback=None, **kwargs):
def monitor_callback(monitor):
new_chunk = monitor.bytes_read - monitor.previous_read
monitor.previous_read = monitor.bytes_read
try:
callback(new_chunk)
except Exception as ex:
self.get_logger().debug('Exception raised when running callback function: {}'.format(ex))
# when sending data in post, there is no connection timeout, just an entire upload timeout # when sending data in post, there is no connection timeout, just an entire upload timeout
timeout = int(self.timeout_total) timeout = int(self.timeout_total)
url = container.name url = container.name
@ -188,15 +196,7 @@ class _HttpDriver(_Driver):
host, _, path = object_name.partition('/') host, _, path = object_name.partition('/')
url += host + '/' url += host + '/'
m = MultipartEncoder(fields={ stream_size = None
path: (path, iterator, get_file_mimetype(object_name))
})
headers = {
'Content-Type': m.content_type,
}
headers.update(container.get_headers(url) or {})
if hasattr(iterator, 'tell') and hasattr(iterator, 'seek'): if hasattr(iterator, 'tell') and hasattr(iterator, 'seek'):
pos = iterator.tell() pos = iterator.tell()
iterator.seek(0, 2) iterator.seek(0, 2)
@ -204,6 +204,16 @@ class _HttpDriver(_Driver):
iterator.seek(pos, 0) iterator.seek(pos, 0)
timeout = max(timeout, (stream_size / 1024) / float(self.min_kbps_speed)) timeout = max(timeout, (stream_size / 1024) / float(self.min_kbps_speed))
m = MultipartEncoder(fields={path: (path, iterator, get_file_mimetype(object_name))})
if callback and stream_size:
m = MultipartEncoderMonitor(m, callback=monitor_callback)
m.previous_read = 0
headers = {
'Content-Type': m.content_type,
}
headers.update(container.get_headers(url) or {})
res = container.session.post( res = container.session.post(
url, data=m, timeout=timeout, headers=headers url, data=m, timeout=timeout, headers=headers
) )
@ -211,12 +221,6 @@ class _HttpDriver(_Driver):
raise ValueError('Failed uploading object %s (%d): %s' % (object_name, res.status_code, res.text)) raise ValueError('Failed uploading object %s (%d): %s' % (object_name, res.status_code, res.text))
# call back is useless because we are not calling it while uploading... # call back is useless because we are not calling it while uploading...
# if callback and stream_size:
# try:
# callback(stream_size)
# except Exception as ex:
# log.debug('Exception raised when running callback function: %s' % ex)
return res return res
def list_container_objects(self, *args, **kwargs): def list_container_objects(self, *args, **kwargs):