mirror of
https://github.com/clearml/clearml
synced 2025-03-03 10:42:00 +00:00
Fix Azure storage upload (#598)
This commit is contained in:
parent
835d95177e
commit
237c7241b3
@ -565,6 +565,7 @@ class StorageHelper(object):
|
|||||||
dest_path = os.path.basename(src_path)
|
dest_path = os.path.basename(src_path)
|
||||||
|
|
||||||
dest_path = self._canonize_url(dest_path)
|
dest_path = self._canonize_url(dest_path)
|
||||||
|
dest_path = dest_path.replace('\\', '/')
|
||||||
|
|
||||||
if cb and self.scheme in _HttpDriver.schemes:
|
if cb and self.scheme in _HttpDriver.schemes:
|
||||||
# store original callback
|
# store original callback
|
||||||
@ -687,7 +688,7 @@ class StorageHelper(object):
|
|||||||
# try to get file size
|
# try to get file size
|
||||||
try:
|
try:
|
||||||
if isinstance(self._driver, _HttpDriver) and obj:
|
if isinstance(self._driver, _HttpDriver) and obj:
|
||||||
obj = self._driver._get_download_object(obj)
|
obj = self._driver._get_download_object(obj) # noqa
|
||||||
total_size_mb = float(obj.headers.get('Content-Length', 0)) / (1024 * 1024)
|
total_size_mb = float(obj.headers.get('Content-Length', 0)) / (1024 * 1024)
|
||||||
elif hasattr(obj, 'size'):
|
elif hasattr(obj, 'size'):
|
||||||
size = obj.size
|
size = obj.size
|
||||||
@ -722,9 +723,10 @@ class StorageHelper(object):
|
|||||||
if not skip_zero_size_check and Path(temp_local_path).stat().st_size <= 0:
|
if not skip_zero_size_check and Path(temp_local_path).stat().st_size <= 0:
|
||||||
raise Exception('downloaded a 0-sized file')
|
raise Exception('downloaded a 0-sized file')
|
||||||
|
|
||||||
# if we are on windows, we need to remove the target file before renaming
|
# if we are on Windows, we need to remove the target file before renaming
|
||||||
# otherwise posix rename will overwrite the target
|
# otherwise posix rename will overwrite the target
|
||||||
if os.name != 'posix':
|
if os.name != 'posix':
|
||||||
|
# noinspection PyBroadException
|
||||||
try:
|
try:
|
||||||
os.remove(local_path)
|
os.remove(local_path)
|
||||||
except Exception:
|
except Exception:
|
||||||
@ -757,8 +759,7 @@ class StorageHelper(object):
|
|||||||
if delete_on_failure:
|
if delete_on_failure:
|
||||||
# noinspection PyBroadException
|
# noinspection PyBroadException
|
||||||
try:
|
try:
|
||||||
if temp_local_path:
|
os.remove(temp_local_path)
|
||||||
os.remove(temp_local_path)
|
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
return None
|
return None
|
||||||
@ -1710,7 +1711,7 @@ class _AzureBlobServiceStorageDriver(_Driver):
|
|||||||
|
|
||||||
class _Container(object):
|
class _Container(object):
|
||||||
def __init__(self, name, config, account_url):
|
def __init__(self, name, config, account_url):
|
||||||
self.MAX_SINGLE_PUT_SIZE = 16 * 1024 * 1024
|
self.MAX_SINGLE_PUT_SIZE = 4 * 1024 * 1024
|
||||||
self.SOCKET_TIMEOUT = (300, 2000)
|
self.SOCKET_TIMEOUT = (300, 2000)
|
||||||
self.name = name
|
self.name = name
|
||||||
self.config = config
|
self.config = config
|
||||||
@ -1775,17 +1776,9 @@ class _AzureBlobServiceStorageDriver(_Driver):
|
|||||||
progress_callback=progress_callback,
|
progress_callback=progress_callback,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
client = self.__blob_service.get_blob_client(container_name, blob_name)
|
self.create_blob_from_data(
|
||||||
with open(path, "rb") as file:
|
container_name, None, blob_name, open(path, "rb"), max_connections=max_connections
|
||||||
first_chunk = True
|
)
|
||||||
for chunk in iter((lambda: file.read(self.MAX_SINGLE_PUT_SIZE)), b""):
|
|
||||||
if first_chunk:
|
|
||||||
client.upload_blob(chunk, overwrite=True, max_concurrency=max_connections)
|
|
||||||
first_chunk = False
|
|
||||||
else:
|
|
||||||
from azure.storage.blob import BlockType # noqa
|
|
||||||
|
|
||||||
client.upload_blob(chunk, BlockType.AppendBlob)
|
|
||||||
|
|
||||||
def delete_blob(self, container_name, blob_name):
|
def delete_blob(self, container_name, blob_name):
|
||||||
if self.__legacy:
|
if self.__legacy:
|
||||||
@ -1839,13 +1832,17 @@ class _AzureBlobServiceStorageDriver(_Driver):
|
|||||||
progress_callback=progress_callback,
|
progress_callback=progress_callback,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
client = self.__blob_service.get_blob_client(container_name, blob_name, max_concurrency=max_connections)
|
client = self.__blob_service.get_blob_client(container_name, blob_name)
|
||||||
with open(path, "wb") as file:
|
with open(path, "wb") as file:
|
||||||
return client.download_blob().download_to_stream(file)
|
return client.download_blob(max_concurrency=max_connections).download_to_stream(file)
|
||||||
|
|
||||||
def is_legacy(self):
|
def is_legacy(self):
|
||||||
return self.__legacy
|
return self.__legacy
|
||||||
|
|
||||||
|
@property
|
||||||
|
def blob_service(self):
|
||||||
|
return self.__blob_service
|
||||||
|
|
||||||
@attrs
|
@attrs
|
||||||
class _Object(object):
|
class _Object(object):
|
||||||
container = attrib()
|
container = attrib()
|
||||||
@ -1953,7 +1950,10 @@ class _AzureBlobServiceStorageDriver(_Driver):
|
|||||||
obj.blob_name,
|
obj.blob_name,
|
||||||
progress_callback=cb,
|
progress_callback=cb,
|
||||||
)
|
)
|
||||||
return blob.content
|
if container.is_legacy():
|
||||||
|
return blob.content
|
||||||
|
else:
|
||||||
|
return blob
|
||||||
|
|
||||||
def download_object(self, obj, local_path, overwrite_existing=True, delete_on_failure=True, callback=None, **_):
|
def download_object(self, obj, local_path, overwrite_existing=True, delete_on_failure=True, callback=None, **_):
|
||||||
p = Path(local_path)
|
p = Path(local_path)
|
||||||
@ -1981,7 +1981,8 @@ class _AzureBlobServiceStorageDriver(_Driver):
|
|||||||
max_connections=10,
|
max_connections=10,
|
||||||
progress_callback=callback_func,
|
progress_callback=callback_func,
|
||||||
)
|
)
|
||||||
download_done.wait()
|
if container.is_legacy():
|
||||||
|
download_done.wait()
|
||||||
|
|
||||||
def test_upload(self, test_path, config, **_):
|
def test_upload(self, test_path, config, **_):
|
||||||
container = self.get_container(config=config)
|
container = self.get_container(config=config)
|
||||||
|
@ -232,6 +232,7 @@ class StorageManager(object):
|
|||||||
if not Path(local_folder).is_dir():
|
if not Path(local_folder).is_dir():
|
||||||
base_logger.error("Local folder '{}' does not exist".format(local_folder))
|
base_logger.error("Local folder '{}' does not exist".format(local_folder))
|
||||||
return
|
return
|
||||||
|
local_folder = str(Path(local_folder))
|
||||||
results = []
|
results = []
|
||||||
helper = StorageHelper.get(remote_url)
|
helper = StorageHelper.get(remote_url)
|
||||||
with ThreadPool() as pool:
|
with ThreadPool() as pool:
|
||||||
@ -299,7 +300,7 @@ class StorageManager(object):
|
|||||||
continue
|
continue
|
||||||
local_url = os.path.join(
|
local_url = os.path.join(
|
||||||
str(Path(local_folder)),
|
str(Path(local_folder)),
|
||||||
str(Path(remote_path[len(remote_url):].lstrip(os.path.sep)))
|
str(Path(remote_path[len(remote_url):])).lstrip(os.path.sep)
|
||||||
)
|
)
|
||||||
if not os.path.exists(local_url) or os.path.getsize(local_url) == 0:
|
if not os.path.exists(local_url) or os.path.getsize(local_url) == 0:
|
||||||
results.append(
|
results.append(
|
||||||
|
Loading…
Reference in New Issue
Block a user