From 16c8a037cc6520ac37ed4160c2dd09b792b20f92 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Tue, 13 Dec 2022 15:48:18 +0200 Subject: [PATCH] Fix can't add S3 external links (#845) --- clearml/datasets/dataset.py | 2 +- clearml/storage/helper.py | 90 ++++++++++++++++++++++++++++--------- clearml/storage/manager.py | 16 +++---- 3 files changed, 75 insertions(+), 33 deletions(-) diff --git a/clearml/datasets/dataset.py b/clearml/datasets/dataset.py index e0af68e4..da27734b 100644 --- a/clearml/datasets/dataset.py +++ b/clearml/datasets/dataset.py @@ -3016,7 +3016,7 @@ class Dataset(object): # noinspection PyBroadException try: if StorageManager.exists_file(source_url): - remote_objects = [StorageManager.get_metadata(source_url)] + remote_objects = [StorageManager.get_metadata(source_url, return_full_path=True)] elif not source_url.startswith(("http://", "https://")): if source_url[-1] != "/": source_url = source_url + "/" diff --git a/clearml/storage/helper.py b/clearml/storage/helper.py index 19d22d56..ea9d84c6 100644 --- a/clearml/storage/helper.py +++ b/clearml/storage/helper.py @@ -73,7 +73,7 @@ class _Driver(object): pass @abstractmethod - def list_container_objects(self, container, ex_prefix, **kwargs): + def list_container_objects(self, container, ex_prefix=None, **kwargs): pass @abstractmethod @@ -100,6 +100,10 @@ class _Driver(object): def get_object(self, container_name, object_name, **kwargs): pass + @abstractmethod + def exists_file(self, container_name, object_name): + pass + @classmethod def get_file_server_hosts(cls): if cls._file_server_hosts is None: @@ -230,7 +234,6 @@ class StorageHelper(object): :return: A StorageHelper instance. """ - # Handle URL substitution etc before locating the correct storage driver url = cls._canonize_url(url) @@ -374,7 +377,7 @@ class StorageHelper(object): # if this is not a known scheme assume local file # url2pathname is specifically intended to operate on (urlparse result).path # and returns a cross-platform compatible result - new_url = normalize_local_path(url) + new_url = normalize_local_path(url[len("file://"):] if url.startswith("file://") else url) self._driver = _FileStorageDriver(new_url) # noinspection PyBroadException try: @@ -638,10 +641,12 @@ class StorageHelper(object): :return: A dict containing the metadata of the remote object """ - return { - "name": obj.name if hasattr(obj, "name") else obj.url if hasattr(obj, "url") else None, + name_fields = ("name", "url", "key", "blob_name") + metadata = { "size": self._get_object_size_bytes(obj), + "name": next(filter(None, (getattr(obj, f, None) for f in name_fields)), None), } + return metadata def verify_upload(self, folder_uri='', raise_on_error=True, log_on_error=True): """ @@ -770,18 +775,19 @@ class StorageHelper(object): """ if prefix: if prefix.startswith(self._base_url): - prefix = prefix[len(self.base_url):].lstrip("/") - - try: - res = self._driver.list_container_objects(self._container, ex_prefix=prefix) - except TypeError: - res = self._driver.list_container_objects(self._container) - + prefix = prefix[len(self._base_url):] + if self._base_url != "file://": + prefix = prefix.lstrip("/") + if self._base_url == "file://": + prefix = prefix.rstrip("/") + if prefix.startswith(str(self._driver.base_path)): + prefix = prefix[len(str(self._driver.base_path)):] + res = self._driver.list_container_objects(self._container, ex_prefix=prefix) result = [ obj.name if not with_metadata else self.get_object_metadata(obj) for obj in res - if (obj.name.startswith(prefix) or self._base_url == "file://") and obj.name != prefix ] + if self._base_url == "file://": if not with_metadata: result = [Path(f).as_posix() for f in result] @@ -1223,6 +1229,12 @@ class StorageHelper(object): except Exception: pass + def exists_file(self, remote_url): + object_name = self._normalize_object_name(remote_url) + return self._driver.exists_file( + container_name=self._container.name if self._container else "", object_name=object_name + ) + class _HttpDriver(_Driver): """ LibCloud http/https adapter (simple, enough for now) """ @@ -1395,6 +1407,13 @@ class _HttpDriver(_Driver): return self.upload_object_via_stream(iterator=stream, container=container, object_name=object_name, extra=extra, callback=callback, **kwargs) + def exists_file(self, container_name, object_name): + # noinspection PyBroadException + try: + return requests.head(container_name + object_name, allow_redirects=True).ok + except Exception: + return False + class _Stream(object): encoding = None @@ -1812,6 +1831,15 @@ class _Boto3Driver(_Driver): def test_upload(self, test_path, config, **_): return True + def exists_file(self, container_name, object_name): + obj = self.get_object(container_name, object_name) + # noinspection PyBroadException + try: + obj.load() + except Exception: + return False + return bool(obj) + class _GoogleCloudStorageDriver(_Driver): """Storage driver for google cloud storage""" @@ -1886,8 +1914,13 @@ class _GoogleCloudStorageDriver(_Driver): return False return True - def list_container_objects(self, container, **kwargs): - return list(container.bucket.list_blobs()) + def list_container_objects(self, container, ex_prefix=None, **kwargs): + # noinspection PyBroadException + try: + return list(container.bucket.list_blobs(prefix=ex_prefix)) + except TypeError: + # google-cloud-storage < 1.17 + return [blob for blob in container.bucket.list_blobs() if blob.name.startswith(ex_prefix)] def delete_object(self, object, **kwargs): try: @@ -1957,6 +1990,9 @@ class _GoogleCloudStorageDriver(_Driver): def get_direct_access(self, remote_path, **_): return None + def exists_file(self, container_name, object_name): + return self.get_object(container_name, object_name).exists() + class _AzureBlobServiceStorageDriver(_Driver): scheme = "azure" @@ -2302,6 +2338,10 @@ class _AzureBlobServiceStorageDriver(_Driver): def get_direct_access(self, remote_path, **_): return None + def exists_file(self, container_name, object_name): + container = self.get_container(container_name) + return container.exists(container_name, blob_name=object_name) + class _FileStorageDriver(_Driver): """ @@ -2364,8 +2404,6 @@ class _FileStorageDriver(_Driver): try: stat = os.stat(full_path) - if not os.path.isdir(full_path): - raise OSError("Target path \"{}\" is not a directory".format(full_path)) except OSError: raise OSError("Target path \"{}\" is not accessible or does not exist".format(full_path)) @@ -2422,12 +2460,14 @@ class _FileStorageDriver(_Driver): continue yield self._make_container(container_name) - def _get_objects(self, container): + def _get_objects(self, container, prefix=None): """ Recursively iterate through the file-system and return the object names """ cpath = self.get_container_cdn_url(container, check=True) + if prefix: + cpath += "/" + prefix for folder, subfolders, files in os.walk(cpath, topdown=True): # Remove unwanted subfolders @@ -2440,17 +2480,20 @@ class _FileStorageDriver(_Driver): object_name = os.path.relpath(full_path, start=cpath) yield self._make_object(container, object_name) - def iterate_container_objects(self, container): + def iterate_container_objects(self, container, prefix=None): """ Returns a generator of objects for the given container. :param container: Container instance :type container: :class:`Container` + :param prefix: The path of a sub directory under the base container path. + The iterator will only include paths under that subdir. + :type prefix: Optional[str] :return: A generator of Object instances. """ - return self._get_objects(container) + return self._get_objects(container, prefix=prefix) def get_container(self, container_name, **_): """ @@ -2750,8 +2793,8 @@ class _FileStorageDriver(_Driver): return True - def list_container_objects(self, container, **kwargs): - return list(self.iterate_container_objects(container)) + def list_container_objects(self, container, ex_prefix=None, **kwargs): + return list(self.iterate_container_objects(container, prefix=ex_prefix)) @staticmethod def _read_in_chunks(iterator, chunk_size=None, fill_size=False, yield_empty=False): @@ -2827,6 +2870,9 @@ class _FileStorageDriver(_Driver): def test_upload(self, test_path, config, **kwargs): return True + def exists_file(self, container_name, object_name): + return os.path.isfile(object_name) + def normalize_local_path(local_path): """ diff --git a/clearml/storage/manager.py b/clearml/storage/manager.py index 7025c243..5fb4f2da 100644 --- a/clearml/storage/manager.py +++ b/clearml/storage/manager.py @@ -320,15 +320,8 @@ class StorageManager(object): try: if remote_url.endswith("/"): return False - if remote_url.startswith("file://"): - return os.path.isfile(remote_url[len("file://"):]) - if remote_url.startswith(("http://", "https://")): - return requests.head(remote_url).ok helper = StorageHelper.get(remote_url) - obj = helper.get_object(remote_url) - if not obj: - return False - return True + return helper.exists_file(remote_url) except Exception: return False @@ -469,7 +462,7 @@ class StorageManager(object): return helper_list_result @classmethod - def get_metadata(cls, remote_url): + def get_metadata(cls, remote_url, return_full_path=False): # type: (str) -> Optional[dict] """ Get the metadata of the a remote object. @@ -485,4 +478,7 @@ class StorageManager(object): obj = helper.get_object(remote_url) if not obj: return None - return helper.get_object_metadata(obj) + metadata = helper.get_object_metadata(obj) + if return_full_path and not metadata["name"].startswith(helper.base_url): + metadata["name"] = helper.base_url + ("/" if not helper.base_url.endswith("/") else "") + metadata["name"] + return metadata