From 9687ca3db199c780cde6354e6efb2a26a2d785cc Mon Sep 17 00:00:00 2001
From: charlienewey-odin <145554036+charlienewey-odin@users.noreply.github.com>
Date: Sat, 7 Oct 2023 14:39:08 +0100
Subject: [PATCH] Verify dataset in parallel (#1131)

---
 clearml/datasets/dataset.py | 51 ++++++++++++++++++++++---------------
 1 file changed, 31 insertions(+), 20 deletions(-)

diff --git a/clearml/datasets/dataset.py b/clearml/datasets/dataset.py
index c0658f73..41783345 100644
--- a/clearml/datasets/dataset.py
+++ b/clearml/datasets/dataset.py
@@ -2497,7 +2497,7 @@ class Dataset(object):

         # check if target folder is not empty, see if it contains everything we need
         if target_base_folder and next(target_base_folder.iterdir(), None):
-            if self._verify_dataset_folder(target_base_folder, part, chunk_selection):
+            if self._verify_dataset_folder(target_base_folder, part, chunk_selection, max_workers):
                 target_base_folder.touch()
                 self._release_lock_ds_target_folder(target_base_folder)
                 return target_base_folder.as_posix()
@@ -2538,7 +2538,7 @@ class Dataset(object):
                 raise_on_error=False, force=False)

         # verify entire dataset (if failed, force downloading parent datasets)
-        if not self._verify_dataset_folder(target_base_folder, part, chunk_selection):
+        if not self._verify_dataset_folder(target_base_folder, part, chunk_selection, max_workers):
             LoggerRoot.get_base_logger().info('Dataset parents need refreshing, re-fetching all parent datasets')
             # we should delete the entire cache folder
             self._extract_parent_datasets(
@@ -3214,31 +3214,42 @@ class Dataset(object):
                 raise ValueError("Dataset merging failed: {}".format([e for e in errors if e is not None]))
         pool.close()

-    def _verify_dataset_folder(self, target_base_folder, part, chunk_selection):
-        # type: (Path, Optional[int], Optional[dict]) -> bool
+    def _verify_dataset_folder(self, target_base_folder, part, chunk_selection, max_workers):
+        # type: (Path, Optional[int], Optional[dict], Optional[int]) -> bool
+
+        def __verify_file_or_link(target_base_folder, part, chunk_selection, file_entry):
+            # type: (Path, Optional[int], Optional[dict], DatasetFileEntry) -> Optional[bool]
+
+            # check if we need the file for the requested dataset part
+            if part is not None:
+                f_parts = chunk_selection.get(file_entry.parent_dataset_id, [])
+                # file is not in requested dataset part, no need to check it.
+                if self._get_chunk_idx_from_artifact_name(file_entry.artifact_name) not in f_parts:
+                    return None
+
+            # check if the local size and the stored size match (faster than comparing hash)
+            if (target_base_folder / file_entry.relative_path).stat().st_size != file_entry.size:
+                return False
+
+            return True
+
         target_base_folder = Path(target_base_folder)
         # check dataset file size, if we have a full match no need for parent dataset download / merge
         verified = True
         # noinspection PyBroadException
         try:
-            for f in self._dataset_file_entries.values():
-                # check if we need it for the current part
-                if part is not None:
-                    f_parts = chunk_selection.get(f.parent_dataset_id, [])
-                    # this is not in our current part, no need to check it.
-                    if self._get_chunk_idx_from_artifact_name(f.artifact_name) not in f_parts:
-                        continue
+            futures_ = []
+            with ThreadPoolExecutor(max_workers=max_workers) as tp:
+                for f in self._dataset_file_entries.values():
+                    future = tp.submit(__verify_file_or_link, target_base_folder, part, chunk_selection, f)
+                    futures_.append(future)

-                # check if the local size and the stored size match (faster than comparing hash)
-                if (target_base_folder / f.relative_path).stat().st_size != f.size:
-                    verified = False
-                    break
-
-            for f in self._dataset_link_entries.values():
-                if (target_base_folder / f.relative_path).stat().st_size != f.size:
-                    verified = False
-                    break
+                for f in self._dataset_link_entries.values():
+                    # don't check whether link is in dataset part, hence None for part and chunk_selection
+                    future = tp.submit(__verify_file_or_link, target_base_folder, None, None, f)
+                    futures_.append(future)

+            verified = all(f.result() != False for f in futures_)
         except Exception:
             verified = False
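For reference, a minimal, self-contained sketch of the pattern the patch introduces: submit one cheap size check per file to a `ThreadPoolExecutor` and treat the folder as verified only if every check passes. The names `FileRecord` and `verify_folder` are illustrative stand-ins, not the ClearML classes or API; this only shows the concurrency shape, not the part/chunk-selection handling.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, Optional


@dataclass
class FileRecord:
    # illustrative stand-in for a dataset file entry (not the ClearML class)
    relative_path: str
    size: int


def verify_folder(base_folder, files, max_workers=None):
    # type: (Path, Iterable[FileRecord], Optional[int]) -> bool
    def check_one(entry):
        # type: (FileRecord) -> bool
        try:
            # comparing on-disk size to the recorded size is cheaper than hashing
            return (base_folder / entry.relative_path).stat().st_size == entry.size
        except OSError:
            # a missing or unreadable file fails verification
            return False

    # stat() calls are I/O-bound, so a thread pool gives real concurrency here
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(check_one, f) for f in files]

    return all(f.result() for f in futures)
```

As in the patch, passing `max_workers=None` simply falls back to `ThreadPoolExecutor`'s default pool size.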