From 237c7241b360b094fb5f0946f29bdb05102e5da7 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Tue, 15 Mar 2022 20:01:41 +0200 Subject: [PATCH] Fix Azure storage upload (#598) --- clearml/storage/helper.py | 41 +++++++++++++++++++------------------- clearml/storage/manager.py | 3 ++- 2 files changed, 23 insertions(+), 21 deletions(-) diff --git a/clearml/storage/helper.py b/clearml/storage/helper.py index a741f1af..c94b320a 100644 --- a/clearml/storage/helper.py +++ b/clearml/storage/helper.py @@ -565,6 +565,7 @@ class StorageHelper(object): dest_path = os.path.basename(src_path) dest_path = self._canonize_url(dest_path) + dest_path = dest_path.replace('\\', '/') if cb and self.scheme in _HttpDriver.schemes: # store original callback @@ -687,7 +688,7 @@ class StorageHelper(object): # try to get file size try: if isinstance(self._driver, _HttpDriver) and obj: - obj = self._driver._get_download_object(obj) + obj = self._driver._get_download_object(obj) # noqa total_size_mb = float(obj.headers.get('Content-Length', 0)) / (1024 * 1024) elif hasattr(obj, 'size'): size = obj.size @@ -722,9 +723,10 @@ class StorageHelper(object): if not skip_zero_size_check and Path(temp_local_path).stat().st_size <= 0: raise Exception('downloaded a 0-sized file') - # if we are on windows, we need to remove the target file before renaming + # if we are on Windows, we need to remove the target file before renaming # otherwise posix rename will overwrite the target if os.name != 'posix': + # noinspection PyBroadException try: os.remove(local_path) except Exception: @@ -757,8 +759,7 @@ class StorageHelper(object): if delete_on_failure: # noinspection PyBroadException try: - if temp_local_path: - os.remove(temp_local_path) + os.remove(temp_local_path) except Exception: pass return None @@ -1710,7 +1711,7 @@ class _AzureBlobServiceStorageDriver(_Driver): class _Container(object): def __init__(self, name, config, account_url): - self.MAX_SINGLE_PUT_SIZE = 16 * 1024 * 1024 + self.MAX_SINGLE_PUT_SIZE = 4 * 1024 * 1024 self.SOCKET_TIMEOUT = (300, 2000) self.name = name self.config = config @@ -1775,17 +1776,9 @@ class _AzureBlobServiceStorageDriver(_Driver): progress_callback=progress_callback, ) else: - client = self.__blob_service.get_blob_client(container_name, blob_name) - with open(path, "rb") as file: - first_chunk = True - for chunk in iter((lambda: file.read(self.MAX_SINGLE_PUT_SIZE)), b""): - if first_chunk: - client.upload_blob(chunk, overwrite=True, max_concurrency=max_connections) - first_chunk = False - else: - from azure.storage.blob import BlockType # noqa - - client.upload_blob(chunk, BlockType.AppendBlob) + self.create_blob_from_data( + container_name, None, blob_name, open(path, "rb"), max_connections=max_connections + ) def delete_blob(self, container_name, blob_name): if self.__legacy: @@ -1839,13 +1832,17 @@ class _AzureBlobServiceStorageDriver(_Driver): progress_callback=progress_callback, ) else: - client = self.__blob_service.get_blob_client(container_name, blob_name, max_concurrency=max_connections) + client = self.__blob_service.get_blob_client(container_name, blob_name) with open(path, "wb") as file: - return client.download_blob().download_to_stream(file) + return client.download_blob(max_concurrency=max_connections).download_to_stream(file) def is_legacy(self): return self.__legacy + @property + def blob_service(self): + return self.__blob_service + @attrs class _Object(object): container = attrib() @@ -1953,7 +1950,10 @@ class _AzureBlobServiceStorageDriver(_Driver): obj.blob_name, progress_callback=cb, ) - return blob.content + if container.is_legacy(): + return blob.content + else: + return blob def download_object(self, obj, local_path, overwrite_existing=True, delete_on_failure=True, callback=None, **_): p = Path(local_path) @@ -1981,7 +1981,8 @@ class _AzureBlobServiceStorageDriver(_Driver): max_connections=10, progress_callback=callback_func, ) - download_done.wait() + if container.is_legacy(): + download_done.wait() def test_upload(self, test_path, config, **_): container = self.get_container(config=config) diff --git a/clearml/storage/manager.py b/clearml/storage/manager.py index a7029c85..9bcda57a 100644 --- a/clearml/storage/manager.py +++ b/clearml/storage/manager.py @@ -232,6 +232,7 @@ class StorageManager(object): if not Path(local_folder).is_dir(): base_logger.error("Local folder '{}' does not exist".format(local_folder)) return + local_folder = str(Path(local_folder)) results = [] helper = StorageHelper.get(remote_url) with ThreadPool() as pool: @@ -299,7 +300,7 @@ class StorageManager(object): continue local_url = os.path.join( str(Path(local_folder)), - str(Path(remote_path[len(remote_url):].lstrip(os.path.sep))) + str(Path(remote_path[len(remote_url):])).lstrip(os.path.sep) ) if not os.path.exists(local_url) or os.path.getsize(local_url) == 0: results.append(