Fix parallel dataset verification failing on older versions of Python (#1144)

This commit is contained in:
allegroai 2023-11-05 21:01:11 +02:00
parent d6d8aa9318
commit 4303664d5b

View File

@ -3221,14 +3221,14 @@ class Dataset(object):
# type: (Path, Union[FileEntry, LinkEntry], Optional[int], Optional[dict]) -> bool # type: (Path, Union[FileEntry, LinkEntry], Optional[int], Optional[dict]) -> bool
# check if we need the file for the requested dataset part # check if we need the file for the requested dataset part
if ds_part is not None: if part is not None:
f_parts = ds_chunk_selection.get(file_entry.parent_dataset_id, []) f_parts = chunk_selection.get(file_entry.parent_dataset_id, [])
# file is not in requested dataset part, no need to check it. # file is not in requested dataset part, no need to check it.
if self._get_chunk_idx_from_artifact_name(file_entry.artifact_name) not in f_parts: if self._get_chunk_idx_from_artifact_name(file_entry.artifact_name) not in f_parts:
return True return True
# check if the local size and the stored size match (faster than comparing hash) # check if the local size and the stored size match (faster than comparing hash)
if (base_folder / file_entry.relative_path).stat().st_size != file_entry.size: if (target_base_folder / file_entry.relative_path).stat().st_size != file_entry.size:
return False return False
return True return True
@ -3240,23 +3240,19 @@ class Dataset(object):
tp = None tp = None
try: try:
futures_ = [] futures_ = []
tp = ThreadPoolExecutor(max_workers=max_workers) with ThreadPoolExecutor(max_workers=max_workers) as tp:
for f in self._dataset_file_entries.values(): for f in self._dataset_file_entries.values():
future = tp.submit(__verify_file_or_link, target_base_folder, f, part, chunk_selection) future = tp.submit(__verify_file_or_link, target_base_folder, f, part, chunk_selection)
futures_.append(future) futures_.append(future)
for f in self._dataset_link_entries.values(): for f in self._dataset_link_entries.values():
# don't check whether link is in dataset part, hence None for part and chunk_selection # don't check whether link is in dataset part, hence None for part and chunk_selection
future = tp.submit(__verify_file_or_link, target_base_folder, f, None, None) future = tp.submit(__verify_file_or_link, target_base_folder, f, None, None)
futures_.append(future) futures_.append(future)
verified = all(f.result() for f in futures_) verified = all(f.result() for f in futures_)
except Exception: except Exception:
verified = False verified = False
finally:
if tp is not None:
# we already have our result, close all pending checks (improves performance when verified==False)
tp.shutdown(cancel_futures=True)
return verified return verified