From f964783046b07b9edeee02d13a360d7c8dec56e8 Mon Sep 17 00:00:00 2001
From: allegroai <>
Date: Tue, 24 Oct 2023 18:39:10 +0300
Subject: [PATCH] Improve parallel behavior

---
 clearml/datasets/dataset.py | 31 ++++++++++++++++++-------------
 1 file changed, 18 insertions(+), 13 deletions(-)

diff --git a/clearml/datasets/dataset.py b/clearml/datasets/dataset.py
index bcae9440..a56cdaa0 100644
--- a/clearml/datasets/dataset.py
+++ b/clearml/datasets/dataset.py
@@ -3215,17 +3215,17 @@ class Dataset(object):
         pool.close()
 
     def _verify_dataset_folder(self, target_base_folder, part, chunk_selection, max_workers):
-        # type: (Path, Optional[int], Optional[dict], Optional[int]) -> bool
+        # type: (Path, int, dict, int) -> bool
 
-        def verify_file_or_link(base_folder, ds_part, ds_chunk_selection, file_entry):
-            # type: (Path, Optional[int], Optional[dict], FileEntry) -> Optional[bool]
+        def __verify_file_or_link(base_folder, file_entry, ds_part=None, ds_chunk_selection=None):
+            # type: (Path, Union[FileEntry, LinkEntry], Optional[int], Optional[dict]) -> bool
 
             # check if we need the file for the requested dataset part
             if ds_part is not None:
                 f_parts = ds_chunk_selection.get(file_entry.parent_dataset_id, [])
                 # file is not in requested dataset part, no need to check it.
                 if self._get_chunk_idx_from_artifact_name(file_entry.artifact_name) not in f_parts:
-                    return None
+                    return True
 
             # check if the local size and the stored size match (faster than comparing hash)
             if (base_folder / file_entry.relative_path).stat().st_size != file_entry.size:
@@ -3237,21 +3237,26 @@ class Dataset(object):
         # check dataset file size, if we have a full match no need for parent dataset download / merge
         verified = True
         # noinspection PyBroadException
+        tp = None
         try:
             futures_ = []
-            with ThreadPoolExecutor(max_workers=max_workers) as tp:
-                for f in self._dataset_file_entries.values():
-                    future = tp.submit(verify_file_or_link, target_base_folder, part, chunk_selection, f)
-                    futures_.append(future)
+            tp = ThreadPoolExecutor(max_workers=max_workers)
+            for f in self._dataset_file_entries.values():
+                future = tp.submit(__verify_file_or_link, target_base_folder, f, part, chunk_selection)
+                futures_.append(future)
 
-                for f in self._dataset_link_entries.values():
-                    # don't check whether link is in dataset part, hence None for part and chunk_selection
-                    future = tp.submit(verify_file_or_link, target_base_folder, None, None, f)
-                    futures_.append(future)
+            for f in self._dataset_link_entries.values():
+                # don't check whether link is in dataset part, hence None for part and chunk_selection
+                future = tp.submit(__verify_file_or_link, target_base_folder, f, None, None)
+                futures_.append(future)
 
-            verified = all(f.result() is not False for f in futures_)
+            verified = all(f.result() for f in futures_)
         except Exception:
             verified = False
+        finally:
+            if tp is not None:
+                # we already have our result, close all pending checks (improves performance when verified==False)
+                tp.shutdown(cancel_futures=True)
 
         return verified
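
Note (reviewer sketch, not part of the patch): the previous `with ThreadPoolExecutor(...)`
block implicitly calls `shutdown(wait=True)` on exit, so once one file failed the size
check, every remaining queued verification still ran to completion. The patch instead lets
`all(f.result() for f in futures_)` short-circuit on the first falsy result, then cancels
whatever is still queued in the `finally` block. A minimal standalone sketch of that
pattern, with a hypothetical `check_size` stand-in for the real per-file verification
(`shutdown(cancel_futures=True)` requires Python 3.9+):

    from concurrent.futures import ThreadPoolExecutor
    import time

    def check_size(entry):
        # hypothetical stand-in for the real check (stat() + size comparison)
        time.sleep(0.1)
        return entry % 7 != 3  # pretend every 7th entry fails verification

    def verify_all(entries, max_workers=4):
        verified = True
        tp = None
        try:
            tp = ThreadPoolExecutor(max_workers=max_workers)
            futures_ = [tp.submit(check_size, e) for e in entries]
            # all() consumes the generator lazily, so it stops at the first
            # False result and leaves the later futures pending
            verified = all(f.result() for f in futures_)
        except Exception:
            verified = False
        finally:
            if tp is not None:
                # cancel the still-queued checks instead of draining them
                tp.shutdown(cancel_futures=True)
        return verified

    print(verify_all(range(100)))  # False, without waiting for all 100 checks

`shutdown(cancel_futures=True)` only cancels futures that have not started yet;
already-running checks finish normally, which is why the executor is created outside a
`with` statement and shut down explicitly in `finally`.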