Verify dataset in parallel (#1131)

This commit is contained in:
charlienewey-odin 2023-10-07 14:39:08 +01:00 committed by GitHub
parent f02b1fc190
commit 9687ca3db1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -2497,7 +2497,7 @@ class Dataset(object):
# check if target folder is not empty, see if it contains everything we need
if target_base_folder and next(target_base_folder.iterdir(), None):
if self._verify_dataset_folder(target_base_folder, part, chunk_selection):
if self._verify_dataset_folder(target_base_folder, part, chunk_selection, max_workers):
target_base_folder.touch()
self._release_lock_ds_target_folder(target_base_folder)
return target_base_folder.as_posix()
@ -2538,7 +2538,7 @@ class Dataset(object):
raise_on_error=False, force=False)
# verify entire dataset (if failed, force downloading parent datasets)
if not self._verify_dataset_folder(target_base_folder, part, chunk_selection):
if not self._verify_dataset_folder(target_base_folder, part, chunk_selection, max_workers):
LoggerRoot.get_base_logger().info('Dataset parents need refreshing, re-fetching all parent datasets')
# we should delete the entire cache folder
self._extract_parent_datasets(
@ -3214,31 +3214,42 @@ class Dataset(object):
raise ValueError("Dataset merging failed: {}".format([e for e in errors if e is not None]))
pool.close()
def _verify_dataset_folder(self, target_base_folder, part, chunk_selection):
# type: (Path, Optional[int], Optional[dict]) -> bool
def _verify_dataset_folder(self, target_base_folder, part, chunk_selection, max_workers):
# type: (Path, Optional[int], Optional[dict], Optional[int]) -> bool
def __verify_file_or_link(target_base_folder, part, chunk_selection, file_entry):
# type: (Path, Optional[int], Optional[dict], DatasetFileEntry) -> Optional[bool]
# check if we need the file for the requested dataset part
if part is not None:
f_parts = chunk_selection.get(file_entry.parent_dataset_id, [])
# file is not in requested dataset part, no need to check it.
if self._get_chunk_idx_from_artifact_name(file_entry.artifact_name) not in f_parts:
return None
# check if the local size and the stored size match (faster than comparing hash)
if (target_base_folder / file_entry.relative_path).stat().st_size != file_entry.size:
return False
return True
target_base_folder = Path(target_base_folder)
# check dataset file size, if we have a full match no need for parent dataset download / merge
verified = True
# noinspection PyBroadException
try:
for f in self._dataset_file_entries.values():
# check if we need it for the current part
if part is not None:
f_parts = chunk_selection.get(f.parent_dataset_id, [])
# this is not in our current part, no need to check it.
if self._get_chunk_idx_from_artifact_name(f.artifact_name) not in f_parts:
continue
futures_ = []
with ThreadPoolExecutor(max_workers=max_workers) as tp:
for f in self._dataset_file_entries.values():
future = tp.submit(__verify_file_or_link, target_base_folder, part, chunk_selection, f)
futures_.append(future)
# check if the local size and the stored size match (faster than comparing hash)
if (target_base_folder / f.relative_path).stat().st_size != f.size:
verified = False
break
for f in self._dataset_link_entries.values():
if (target_base_folder / f.relative_path).stat().st_size != f.size:
verified = False
break
for f in self._dataset_link_entries.values():
# don't check whether link is in dataset part, hence None for part and chunk_selection
future = tp.submit(__verify_file_or_link, target_base_folder, None, None, f)
futures_.append(future)
verified = all(f.result() != False for f in futures_)
except Exception:
verified = False