Fix can't add S3 external links (#845)

This commit is contained in:
allegroai 2022-12-13 15:48:18 +02:00
parent 62a5ef102a
commit 16c8a037cc
3 changed files with 75 additions and 33 deletions

View File

@ -3016,7 +3016,7 @@ class Dataset(object):
# noinspection PyBroadException
try:
if StorageManager.exists_file(source_url):
remote_objects = [StorageManager.get_metadata(source_url)]
remote_objects = [StorageManager.get_metadata(source_url, return_full_path=True)]
elif not source_url.startswith(("http://", "https://")):
if source_url[-1] != "/":
source_url = source_url + "/"

View File

@ -73,7 +73,7 @@ class _Driver(object):
pass
@abstractmethod
def list_container_objects(self, container, ex_prefix, **kwargs):
def list_container_objects(self, container, ex_prefix=None, **kwargs):
pass
@abstractmethod
@ -100,6 +100,10 @@ class _Driver(object):
def get_object(self, container_name, object_name, **kwargs):
pass
@abstractmethod
def exists_file(self, container_name, object_name):
pass
@classmethod
def get_file_server_hosts(cls):
if cls._file_server_hosts is None:
@ -230,7 +234,6 @@ class StorageHelper(object):
:return: A StorageHelper instance.
"""
# Handle URL substitution etc before locating the correct storage driver
url = cls._canonize_url(url)
@ -374,7 +377,7 @@ class StorageHelper(object):
# if this is not a known scheme assume local file
# url2pathname is specifically intended to operate on (urlparse result).path
# and returns a cross-platform compatible result
new_url = normalize_local_path(url)
new_url = normalize_local_path(url[len("file://"):] if url.startswith("file://") else url)
self._driver = _FileStorageDriver(new_url)
# noinspection PyBroadException
try:
@ -638,10 +641,12 @@ class StorageHelper(object):
:return: A dict containing the metadata of the remote object
"""
return {
"name": obj.name if hasattr(obj, "name") else obj.url if hasattr(obj, "url") else None,
name_fields = ("name", "url", "key", "blob_name")
metadata = {
"size": self._get_object_size_bytes(obj),
"name": next(filter(None, (getattr(obj, f, None) for f in name_fields)), None),
}
return metadata
def verify_upload(self, folder_uri='', raise_on_error=True, log_on_error=True):
"""
@ -770,18 +775,19 @@ class StorageHelper(object):
"""
if prefix:
if prefix.startswith(self._base_url):
prefix = prefix[len(self.base_url):].lstrip("/")
try:
res = self._driver.list_container_objects(self._container, ex_prefix=prefix)
except TypeError:
res = self._driver.list_container_objects(self._container)
prefix = prefix[len(self._base_url):]
if self._base_url != "file://":
prefix = prefix.lstrip("/")
if self._base_url == "file://":
prefix = prefix.rstrip("/")
if prefix.startswith(str(self._driver.base_path)):
prefix = prefix[len(str(self._driver.base_path)):]
res = self._driver.list_container_objects(self._container, ex_prefix=prefix)
result = [
obj.name if not with_metadata else self.get_object_metadata(obj)
for obj in res
if (obj.name.startswith(prefix) or self._base_url == "file://") and obj.name != prefix
]
if self._base_url == "file://":
if not with_metadata:
result = [Path(f).as_posix() for f in result]
@ -1223,6 +1229,12 @@ class StorageHelper(object):
except Exception:
pass
def exists_file(self, remote_url):
object_name = self._normalize_object_name(remote_url)
return self._driver.exists_file(
container_name=self._container.name if self._container else "", object_name=object_name
)
class _HttpDriver(_Driver):
""" LibCloud http/https adapter (simple, enough for now) """
@ -1395,6 +1407,13 @@ class _HttpDriver(_Driver):
return self.upload_object_via_stream(iterator=stream, container=container,
object_name=object_name, extra=extra, callback=callback, **kwargs)
def exists_file(self, container_name, object_name):
# noinspection PyBroadException
try:
return requests.head(container_name + object_name, allow_redirects=True).ok
except Exception:
return False
class _Stream(object):
encoding = None
@ -1812,6 +1831,15 @@ class _Boto3Driver(_Driver):
def test_upload(self, test_path, config, **_):
return True
def exists_file(self, container_name, object_name):
obj = self.get_object(container_name, object_name)
# noinspection PyBroadException
try:
obj.load()
except Exception:
return False
return bool(obj)
class _GoogleCloudStorageDriver(_Driver):
"""Storage driver for google cloud storage"""
@ -1886,8 +1914,13 @@ class _GoogleCloudStorageDriver(_Driver):
return False
return True
def list_container_objects(self, container, **kwargs):
return list(container.bucket.list_blobs())
def list_container_objects(self, container, ex_prefix=None, **kwargs):
# noinspection PyBroadException
try:
return list(container.bucket.list_blobs(prefix=ex_prefix))
except TypeError:
# google-cloud-storage < 1.17
return [blob for blob in container.bucket.list_blobs() if blob.name.startswith(ex_prefix)]
def delete_object(self, object, **kwargs):
try:
@ -1957,6 +1990,9 @@ class _GoogleCloudStorageDriver(_Driver):
def get_direct_access(self, remote_path, **_):
return None
def exists_file(self, container_name, object_name):
return self.get_object(container_name, object_name).exists()
class _AzureBlobServiceStorageDriver(_Driver):
scheme = "azure"
@ -2302,6 +2338,10 @@ class _AzureBlobServiceStorageDriver(_Driver):
def get_direct_access(self, remote_path, **_):
return None
def exists_file(self, container_name, object_name):
container = self.get_container(container_name)
return container.exists(container_name, blob_name=object_name)
class _FileStorageDriver(_Driver):
"""
@ -2364,8 +2404,6 @@ class _FileStorageDriver(_Driver):
try:
stat = os.stat(full_path)
if not os.path.isdir(full_path):
raise OSError("Target path \"{}\" is not a directory".format(full_path))
except OSError:
raise OSError("Target path \"{}\" is not accessible or does not exist".format(full_path))
@ -2422,12 +2460,14 @@ class _FileStorageDriver(_Driver):
continue
yield self._make_container(container_name)
def _get_objects(self, container):
def _get_objects(self, container, prefix=None):
"""
Recursively iterate through the file-system and return the object names
"""
cpath = self.get_container_cdn_url(container, check=True)
if prefix:
cpath += "/" + prefix
for folder, subfolders, files in os.walk(cpath, topdown=True):
# Remove unwanted subfolders
@ -2440,17 +2480,20 @@ class _FileStorageDriver(_Driver):
object_name = os.path.relpath(full_path, start=cpath)
yield self._make_object(container, object_name)
def iterate_container_objects(self, container):
def iterate_container_objects(self, container, prefix=None):
"""
Returns a generator of objects for the given container.
:param container: Container instance
:type container: :class:`Container`
:param prefix: The path of a sub directory under the base container path.
The iterator will only include paths under that subdir.
:type prefix: Optional[str]
:return: A generator of Object instances.
"""
return self._get_objects(container)
return self._get_objects(container, prefix=prefix)
def get_container(self, container_name, **_):
"""
@ -2750,8 +2793,8 @@ class _FileStorageDriver(_Driver):
return True
def list_container_objects(self, container, **kwargs):
return list(self.iterate_container_objects(container))
def list_container_objects(self, container, ex_prefix=None, **kwargs):
return list(self.iterate_container_objects(container, prefix=ex_prefix))
@staticmethod
def _read_in_chunks(iterator, chunk_size=None, fill_size=False, yield_empty=False):
@ -2827,6 +2870,9 @@ class _FileStorageDriver(_Driver):
def test_upload(self, test_path, config, **kwargs):
return True
def exists_file(self, container_name, object_name):
return os.path.isfile(object_name)
def normalize_local_path(local_path):
"""

View File

@ -320,15 +320,8 @@ class StorageManager(object):
try:
if remote_url.endswith("/"):
return False
if remote_url.startswith("file://"):
return os.path.isfile(remote_url[len("file://"):])
if remote_url.startswith(("http://", "https://")):
return requests.head(remote_url).ok
helper = StorageHelper.get(remote_url)
obj = helper.get_object(remote_url)
if not obj:
return False
return True
return helper.exists_file(remote_url)
except Exception:
return False
@ -469,7 +462,7 @@ class StorageManager(object):
return helper_list_result
@classmethod
def get_metadata(cls, remote_url):
def get_metadata(cls, remote_url, return_full_path=False):
# type: (str) -> Optional[dict]
"""
Get the metadata of the a remote object.
@ -485,4 +478,7 @@ class StorageManager(object):
obj = helper.get_object(remote_url)
if not obj:
return None
return helper.get_object_metadata(obj)
metadata = helper.get_object_metadata(obj)
if return_full_path and not metadata["name"].startswith(helper.base_url):
metadata["name"] = helper.base_url + ("/" if not helper.base_url.endswith("/") else "") + metadata["name"]
return metadata