Limit number of _serialize requests when adding list of links with add_external_files() (#813)

allegroai 2022-11-09 11:37:52 +02:00
parent 8340d4b86c
commit b793f2dfc6
4 changed files with 247 additions and 114 deletions
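For orientation, a minimal usage sketch of what this commit targets (dataset name, project and URLs below are hypothetical): passing a whole list of links to add_external_files() now registers them in a single call, fanning the work out to a thread pool and, per the commit title, limiting the number of _serialize requests.

from clearml import Dataset

# Hypothetical dataset and links, for illustration only
dataset = Dataset.create(dataset_name="external-links-demo", dataset_project="examples")
links = [
    "s3://my-bucket/data/image_1.jpg",
    "s3://my-bucket/data/image_2.jpg",
    "https://example.com/files/labels.csv",
]
# One call for the whole list; links are resolved by a thread pool (max_workers)
num_added = dataset.add_external_files(source_url=links, dataset_path="raw", max_workers=4)
print("added {} external file links".format(num_added))
dataset.upload()
dataset.finalize()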


@@ -116,7 +116,7 @@ class Dataset(object):
__default_dataset_version = "1.0.0"
__dataset_folder_template = CacheManager.set_context_folder_lookup(__cache_context, "{0}_archive_{1}")
__preview_max_file_entries = 15000
__preview_max_size = 5 * 1024 * 1024
__preview_max_size = 32 * 1024
__min_api_version = "2.20"
__hyperparams_section = "Datasets"
__datasets_runtime_prop = "datasets"
@@ -409,11 +409,13 @@ class Dataset(object):
dataset_path=None, # type: Optional[str]
recursive=True, # type: bool
verbose=False, # type: bool
max_workers=None # type: Optional[int]
):
# type: (...) -> ()
# type: (...) -> int
"""
Adds an external file or a folder to the current dataset.
External file links can be from cloud storage (s3://, gs://, azure://) or local / network storage (file://).
Adds external files or folders to the current dataset.
External file links can be from cloud storage (s3://, gs://, azure://), local / network storage (file://)
or http(s):// files.
Calculates file size for each file and compares against parent.
A few examples:
@@ -436,92 +438,32 @@ class Dataset(object):
'image.jpg' will be downloaded to 's3_files/image.jpg' (relative path to the dataset)
:param recursive: If True match all wildcard files recursively
:param verbose: If True print to console files added/modified
:return: number of file links added
:param max_workers: The number of threads used to add the external files. Useful when `source_url` is
a sequence. Defaults to the number of logical cores
:return: Number of file links added
"""
num_added = 0
self._dirty = True
if not isinstance(source_url, str):
for source_url_ in source_url:
num_added += self.add_external_files(
num_added = 0
num_modified = 0
source_url_list = source_url if not isinstance(source_url, str) else [source_url]
max_workers = max_workers or psutil.cpu_count()
futures_ = []
with ThreadPoolExecutor(max_workers=max_workers) as tp:
for source_url_ in source_url_list:
futures_.append(
tp.submit(
self._add_external_files,
source_url_,
wildcard=wildcard,
dataset_path=dataset_path,
recursive=recursive,
verbose=verbose
verbose=verbose,
)
)
return num_added
if dataset_path:
dataset_path = dataset_path.lstrip("/")
# noinspection PyBroadException
try:
if StorageManager.exists_file(source_url):
links = [source_url]
else:
if source_url[-1] != "/":
source_url = source_url + "/"
links = StorageManager.list(source_url, return_full_path=True)
except Exception:
self._task.get_logger().report_text(
"Could not list/find remote file(s) when adding {}".format(source_url)
)
return 0
num_modified = 0
for link in links:
relative_path = link[len(source_url):]
if not relative_path:
relative_path = source_url.split("/")[-1]
if not matches_any_wildcard(relative_path, wildcard, recursive=recursive):
continue
try:
relative_path = Path(os.path.join(dataset_path or ".", relative_path)).as_posix()
size = StorageManager.get_file_size_bytes(link, silence_errors=True)
already_added_file = self._dataset_file_entries.get(relative_path)
if relative_path not in self._dataset_link_entries:
if verbose:
self._task.get_logger().report_text(
"External file {} added".format(link),
print_console=False,
)
self._dataset_link_entries[relative_path] = LinkEntry(
link=link, relative_path=relative_path, parent_dataset_id=self._id, size=size
)
num_added += 1
elif already_added_file and already_added_file.size != size:
if verbose:
self._task.get_logger().report_text(
"External file {} modified".format(link),
print_console=False,
)
del self._dataset_file_entries[relative_path]
self._dataset_link_entries[relative_path] = LinkEntry(
link=link, relative_path=relative_path, parent_dataset_id=self._id, size=size
)
num_modified += 1
elif (
relative_path in self._dataset_link_entries
and self._dataset_link_entries[relative_path].size != size
):
if verbose:
self._task.get_logger().report_text(
"External file {} modified".format(link),
print_console=False,
)
self._dataset_link_entries[relative_path] = LinkEntry(
link=link, relative_path=relative_path, parent_dataset_id=self._id, size=size
)
num_modified += 1
else:
if verbose:
self._task.get_logger().report_text(
"External file {} skipped as it was not modified".format(link),
print_console=False,
)
except Exception as e:
if verbose:
self._task.get_logger().report_text(
"Error '{}' encountered trying to add external file {}".format(e, link),
print_console=False,
)
for future_ in futures_:
num_added_this_call, num_modified_this_call = future_.result()
num_added += num_added_this_call
num_modified += num_modified_this_call
self._task.add_tags([self.__external_files_tag])
self._add_script_call(
"add_external_files",
@@ -712,11 +654,21 @@ class Dataset(object):
)
total_size += zip_.size
chunks_count += 1
truncated_preview = ""
add_truncated_message = False
truncated_message = "...\ntruncated (too many files to preview)"
for preview_entry in zip_.archive_preview[: Dataset.__preview_max_file_entries]:
truncated_preview += preview_entry + "\n"
if len(truncated_preview) > Dataset.__preview_max_size:
add_truncated_message = True
break
if len(zip_.archive_preview) > Dataset.__preview_max_file_entries:
add_truncated_message = True
pool.submit(
self._task.upload_artifact,
name=artifact_name,
artifact_object=Path(zip_path),
preview=zip_.archive_preview,
preview=truncated_preview + (truncated_message if add_truncated_message else ""),
delete_after_upload=True,
wait_on_upload=True,
)
@@ -2972,6 +2924,110 @@ class Dataset(object):
self._dependency_chunk_lookup = self._build_dependency_chunk_lookup()
return self._dependency_chunk_lookup
def _add_external_files(
self,
source_url, # type: str
wildcard=None, # type: Optional[Union[str, Sequence[str]]]
dataset_path=None, # type: Optional[str]
recursive=True, # type: bool
verbose=False, # type: bool
):
# type: (...) -> Tuple[int, int]
"""
Auxiliary function for `add_external_files`
Adds an external file or a folder to the current dataset.
External file links can be from cloud storage (s3://, gs://, azure://) or local / network storage (file://).
Calculates file size for each file and compares against parent.
:param source_url: Source url link (e.g. s3://bucket/folder/path)
:param wildcard: add only specific set of files.
Wildcard matching, can be a single string or a list of wildcards.
:param dataset_path: The location in the dataset where the file will be downloaded into.
e.g: for source_url='s3://bucket/remote_folder/image.jpg' and dataset_path='s3_files',
'image.jpg' will be downloaded to 's3_files/image.jpg' (relative path to the dataset)
:param recursive: If True match all wildcard files recursively
:param verbose: If True print to console files added/modified
:return: Number of file links added and modified
"""
if dataset_path:
dataset_path = dataset_path.lstrip("/")
remote_objects = None
# noinspection PyBroadException
try:
if StorageManager.exists_file(source_url):
remote_objects = [StorageManager.get_metadata(source_url)]
elif not source_url.startswith(("http://", "https://")):
if source_url[-1] != "/":
source_url = source_url + "/"
remote_objects = StorageManager.list(source_url, with_metadata=True, return_full_path=True)
except Exception:
pass
if not remote_objects:
self._task.get_logger().report_text(
"Could not list/find remote file(s) when adding {}".format(source_url)
)
return 0, 0
num_added = 0
num_modified = 0
for remote_object in remote_objects:
link = remote_object.get("name")
relative_path = link[len(source_url):]
if not relative_path:
relative_path = source_url.split("/")[-1]
if not matches_any_wildcard(relative_path, wildcard, recursive=recursive):
continue
try:
relative_path = Path(os.path.join(dataset_path or ".", relative_path)).as_posix()
size = remote_object.get("size")
already_added_file = self._dataset_file_entries.get(relative_path)
if relative_path not in self._dataset_link_entries:
if verbose:
self._task.get_logger().report_text(
"External file {} added".format(link),
print_console=False,
)
self._dataset_link_entries[relative_path] = LinkEntry(
link=link, relative_path=relative_path, parent_dataset_id=self._id, size=size
)
num_added += 1
elif already_added_file and already_added_file.size != size:
if verbose:
self._task.get_logger().report_text(
"External file {} modified".format(link),
print_console=False,
)
del self._dataset_file_entries[relative_path]
self._dataset_link_entries[relative_path] = LinkEntry(
link=link, relative_path=relative_path, parent_dataset_id=self._id, size=size
)
num_modified += 1
elif (
relative_path in self._dataset_link_entries
and self._dataset_link_entries[relative_path].size != size
):
if verbose:
self._task.get_logger().report_text(
"External file {} modified".format(link),
print_console=False,
)
self._dataset_link_entries[relative_path] = LinkEntry(
link=link, relative_path=relative_path, parent_dataset_id=self._id, size=size
)
num_modified += 1
else:
if verbose:
self._task.get_logger().report_text(
"External file {} skipped as it was not modified".format(link),
print_console=False,
)
except Exception as e:
if verbose:
self._task.get_logger().report_text(
"Error '{}' encountered trying to add external file {}".format(e, link),
print_console=False,
)
return num_added, num_modified
def _build_chunk_selection(self, part, num_parts):
# type: (int, int) -> Dict[str, int]
"""


@@ -595,8 +595,20 @@ class StorageHelper(object):
:return: The size of the file in bytes.
None if the file could not be found or an error occurred.
"""
size = None
obj = self.get_object(remote_url, silence_errors=silence_errors)
return self._get_object_size_bytes(obj)
def _get_object_size_bytes(self, obj):
# type: (object) -> Optional[int]
"""
Auxiliary function for `get_object_size_bytes`.
Get size of the remote object in bytes.
:param object obj: The remote object
:return: The size of the object in bytes.
None if an error occurred.
"""
if not obj:
return None
try:
@@ -615,6 +627,21 @@
pass
return size
def get_object_metadata(self, obj):
# type: (object) -> dict
"""
Get the metadata of a remote object.
The metadata is a dict containing the following keys: `name`, `size`.
:param object obj: The remote object
:return: A dict containing the metadata of the remote object
"""
return {
"name": obj.name if hasattr(obj, "name") else obj.url if hasattr(obj, "url") else None,
"size": self._get_object_size_bytes(obj),
}
def verify_upload(self, folder_uri='', raise_on_error=True, log_on_error=True):
"""
Verify that this helper can upload files to a folder.
@@ -716,12 +743,13 @@ class StorageHelper(object):
res = quote_url(res)
return res
def list(self, prefix=None):
def list(self, prefix=None, with_metadata=False):
"""
List entries in the helper base path.
Return a list of names inside this helper base path. The base path is
determined at creation time and is specific for each storage medium.
Return a list of names inside this helper base path or a list of dictionaries containing
the objects' metadata. The base path is determined at creation time and is specific
for each storage medium.
For Google Storage and S3 it is the bucket of the path.
For local files it is the root directory.
@@ -731,11 +759,14 @@ class StorageHelper(object):
must be a string - the path of a sub directory under the base path.
the returned list will include only objects under that subdir.
:return: The paths of all the objects in the storage base
path under prefix. Listed relative to the base path.
:param with_metadata: Instead of returning just the names of the objects, return a list of dictionaries
containing the name and metadata of the remote file. Thus, each dictionary will contain the following
keys: `name`, `size`.
:return: The paths of all the objects in the storage base path under prefix or
a list of dictionaries containing the objects' metadata.
Listed relative to the base path.
"""
if prefix:
if prefix.startswith(self._base_url):
prefix = prefix[len(self.base_url):].lstrip("/")
@@ -746,15 +777,22 @@ class StorageHelper(object):
res = self._driver.list_container_objects(self._container)
result = [
obj.name
obj.name if not with_metadata else self.get_object_metadata(obj)
for obj in res
if (obj.name.startswith(prefix) or self._base_url == "file://") and obj.name != prefix
]
if self._base_url == "file://":
result = [Path(f).as_posix() for f in result]
if not with_metadata:
result = [Path(f).as_posix() for f in result]
else:
for metadata_entry in result:
metadata_entry["name"] = Path(metadata_entry["name"]).as_posix()
return result
else:
return [obj.name for obj in self._driver.list_container_objects(self._container)]
return [
obj.name if not with_metadata else self.get_object_metadata(obj)
for obj in self._driver.list_container_objects(self._container)
]
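As a quick illustration of the metadata-aware listing added above (the bucket URL is hypothetical; StorageHelper.get() is used here the same way StorageManager.list() uses it below):

from clearml.storage.helper import StorageHelper

# Hypothetical URL; with_metadata=True returns dicts with `name` and `size`
helper = StorageHelper.get("s3://my-bucket/dataset/")
for entry in helper.list(prefix="s3://my-bucket/dataset/", with_metadata=True):
    print(entry["name"], entry["size"])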
def download_to_file(
self,


@@ -314,15 +314,21 @@ class StorageManager(object):
:return: True if the remote_url stores a file and False otherwise
"""
if remote_url.startswith("file://"):
return os.path.isfile(remote_url[len("file://"):])
if remote_url.startswith("http://") or remote_url.startswith("https://"):
return requests.head(remote_url).status_code == requests.codes.ok
helper = StorageHelper.get(remote_url)
obj = helper.get_object(remote_url)
if not obj:
# noinspection PyBroadException
try:
if remote_url.endswith("/"):
return False
if remote_url.startswith("file://"):
return os.path.isfile(remote_url[len("file://"):])
if remote_url.startswith(("http://", "https://")):
return requests.head(remote_url).ok
helper = StorageHelper.get(remote_url)
obj = helper.get_object(remote_url)
if not obj:
return False
return True
except Exception:
return False
return len(StorageManager.list(remote_url)) == 0
@classmethod
def get_file_size_bytes(cls, remote_url, silence_errors=False):
@@ -419,10 +425,11 @@ class StorageManager(object):
return local_folder
@classmethod
def list(cls, remote_url, return_full_path=False):
# type: (str, bool) -> Optional[List[str]]
def list(cls, remote_url, return_full_path=False, with_metadata=False):
# type: (str, bool, bool) -> Optional[List[Union[str, dict]]]
"""
Return a list of object names inside the base path
Return a list of object names inside the base path or dictionaries containing the corresponding
objects' metadata (in case `with_metadata` is True)
:param str remote_url: The base path.
For Google Storage, Azure and S3 it is the bucket of the path, for local files it is the root directory.
@@ -431,17 +438,49 @@ class StorageManager(object):
Azure blob storage: `azure://bucket/folder_` and also file system listing: `/mnt/share/folder_`
:param bool return_full_path: If True, return a list of full object paths, otherwise return a list of
relative object paths (default False).
:param with_metadata: Instead of returning just the names of the objects, return a list of dictionaries
containing the name and metadata of the remote file. Thus, each dictionary will contain the following
keys: `name`, `size`.
`return_full_path` will modify the name of each dictionary entry to the full path.
:return: The paths of all the objects in the storage base path under prefix, relative to the base path.
:return: The paths of all the objects in the storage base path under prefix, or the dictionaries containing the objects' metadata, relative to the base path.
None if the list operation is not supported (for example, for http and https protocols)
"""
helper = StorageHelper.get(remote_url)
try:
names_list = helper.list(prefix=remote_url)
helper_list_result = helper.list(prefix=remote_url, with_metadata=with_metadata)
except Exception as ex:
LoggerRoot.get_base_logger().warning("Can not list files for '{}' - {}".format(remote_url, ex))
names_list = None
return None
if helper.base_url == 'file://':
return ["{}/{}".format(remote_url.rstrip('/'), name) for name in names_list] if return_full_path else names_list
return ["{}/{}".format(helper.base_url, name) for name in names_list] if return_full_path else names_list
prefix = remote_url.rstrip("/") if helper.base_url == "file://" else helper.base_url
if not with_metadata:
return (
["{}/{}".format(prefix, name) for name in helper_list_result]
if return_full_path
else helper_list_result
)
else:
if return_full_path:
for obj in helper_list_result:
obj["name"] = "{}/{}".format(prefix, obj.get("name"))
return helper_list_result
@classmethod
def get_metadata(cls, remote_url):
# type: (str) -> Optional[dict]
"""
Get the metadata of a remote object.
The metadata is a dict containing the following keys: `name`, `size`.
:param str remote_url: Source remote storage location of the object. Supports S3/GS/Azure, shared filesystem
and http(s). Example: 's3://bucket/data/'
:return: A dict containing the metadata of the remote object. In case of an error, `None` is returned
"""
helper = StorageHelper.get(remote_url)
obj = helper.get_object(remote_url)
if not obj:
return None
return helper.get_object_metadata(obj)
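A short sketch of the new public StorageManager surface (URLs are hypothetical): listing with metadata and fetching a single object's metadata.

from clearml import StorageManager

# Hypothetical URLs, for illustration only
entries = StorageManager.list("s3://my-bucket/dataset/", return_full_path=True, with_metadata=True)
for entry in entries or []:  # None is returned when listing is not supported
    print(entry["name"], entry["size"])

meta = StorageManager.get_metadata("s3://my-bucket/dataset/image_1.jpg")
if meta:  # None is returned on error
    print(meta["name"], meta["size"])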


@@ -238,7 +238,7 @@ class ParallelZipper(object):
self.fd, self.zip_path = mkstemp(prefix=zip_prefix, suffix=zip_suffix)
self.zip_path = Path(self.zip_path)
self.zip_file = ZipFile(self.zip_path.as_posix(), "w", allowZip64=allow_zip_64, compression=compression)
self.archive_preview = ""
self.archive_preview = []
self.count = 0
self.files_zipped = set()
@@ -259,7 +259,7 @@ class ParallelZipper(object):
preview_path = arcname
if not preview_path:
preview_path = file_path
self.archive_preview += "{} - {}\n".format(preview_path, format_size(self.size))
self.archive_preview.append("{} - {}".format(preview_path, format_size(self.size)))
self.files_zipped.add(Path(file_path).as_posix())
if self._chunk_size <= 0 or self.size < self._chunk_size:
self._zipper_queue.put(self)
@@ -294,7 +294,7 @@ class ParallelZipper(object):
parent_zip.writestr(child_name, child_zip.open(child_name).read())
self.files_zipped |= other.files_zipped
self.count += other.count
self.archive_preview += other.archive_preview
self.archive_preview.extend(other.archive_preview)
def close(self):
# type: () -> ()
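Since archive_preview is now a list of per-file lines rather than one growing string, the dataset upload path shown earlier can cap the preview cheaply before attaching it to the artifact. A standalone sketch of that truncation, assuming constants that mirror Dataset.__preview_max_file_entries and the new Dataset.__preview_max_size:

PREVIEW_MAX_FILE_ENTRIES = 15000  # mirrors Dataset.__preview_max_file_entries
PREVIEW_MAX_SIZE = 32 * 1024      # mirrors the new Dataset.__preview_max_size

def build_truncated_preview(archive_preview):
    # archive_preview: list of "path - size" strings collected by ParallelZipper
    truncated_preview = ""
    add_truncated_message = len(archive_preview) > PREVIEW_MAX_FILE_ENTRIES
    for preview_entry in archive_preview[:PREVIEW_MAX_FILE_ENTRIES]:
        truncated_preview += preview_entry + "\n"
        if len(truncated_preview) > PREVIEW_MAX_SIZE:
            add_truncated_message = True
            break
    return truncated_preview + ("...\ntruncated (too many files to preview)" if add_truncated_message else "")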