mirror of
https://github.com/clearml/clearml
synced 2025-01-31 09:07:00 +00:00
Improve parallel behavior
This commit is contained in:
parent
c26efb83af
commit
f964783046
@ -3215,17 +3215,17 @@ class Dataset(object):
|
|||||||
pool.close()
|
pool.close()
|
||||||
|
|
||||||
def _verify_dataset_folder(self, target_base_folder, part, chunk_selection, max_workers):
|
def _verify_dataset_folder(self, target_base_folder, part, chunk_selection, max_workers):
|
||||||
# type: (Path, Optional[int], Optional[dict], Optional[int]) -> bool
|
# type: (Path, int, dict, int) -> bool
|
||||||
|
|
||||||
def verify_file_or_link(base_folder, ds_part, ds_chunk_selection, file_entry):
|
def __verify_file_or_link(target_base_folder, file_entry, part=None, chunk_selection=None):
|
||||||
# type: (Path, Optional[int], Optional[dict], FileEntry) -> Optional[bool]
|
# type: (Path, Union[FileEntry, LinkEntry], Optional[int], Optional[dict]) -> bool
|
||||||
|
|
||||||
# check if we need the file for the requested dataset part
|
# check if we need the file for the requested dataset part
|
||||||
if ds_part is not None:
|
if ds_part is not None:
|
||||||
f_parts = ds_chunk_selection.get(file_entry.parent_dataset_id, [])
|
f_parts = ds_chunk_selection.get(file_entry.parent_dataset_id, [])
|
||||||
# file is not in requested dataset part, no need to check it.
|
# file is not in requested dataset part, no need to check it.
|
||||||
if self._get_chunk_idx_from_artifact_name(file_entry.artifact_name) not in f_parts:
|
if self._get_chunk_idx_from_artifact_name(file_entry.artifact_name) not in f_parts:
|
||||||
return None
|
return True
|
||||||
|
|
||||||
# check if the local size and the stored size match (faster than comparing hash)
|
# check if the local size and the stored size match (faster than comparing hash)
|
||||||
if (base_folder / file_entry.relative_path).stat().st_size != file_entry.size:
|
if (base_folder / file_entry.relative_path).stat().st_size != file_entry.size:
|
||||||
@ -3237,21 +3237,26 @@ class Dataset(object):
|
|||||||
# check dataset file size, if we have a full match no need for parent dataset download / merge
|
# check dataset file size, if we have a full match no need for parent dataset download / merge
|
||||||
verified = True
|
verified = True
|
||||||
# noinspection PyBroadException
|
# noinspection PyBroadException
|
||||||
|
tp = None
|
||||||
try:
|
try:
|
||||||
futures_ = []
|
futures_ = []
|
||||||
with ThreadPoolExecutor(max_workers=max_workers) as tp:
|
tp = ThreadPoolExecutor(max_workers=max_workers)
|
||||||
for f in self._dataset_file_entries.values():
|
for f in self._dataset_file_entries.values():
|
||||||
future = tp.submit(verify_file_or_link, target_base_folder, part, chunk_selection, f)
|
future = tp.submit(__verify_file_or_link, target_base_folder, f, part, chunk_selection)
|
||||||
futures_.append(future)
|
futures_.append(future)
|
||||||
|
|
||||||
for f in self._dataset_link_entries.values():
|
for f in self._dataset_link_entries.values():
|
||||||
# don't check whether link is in dataset part, hence None for part and chunk_selection
|
# don't check whether link is in dataset part, hence None for part and chunk_selection
|
||||||
future = tp.submit(verify_file_or_link, target_base_folder, None, None, f)
|
future = tp.submit(__verify_file_or_link, target_base_folder, f, None, None)
|
||||||
futures_.append(future)
|
futures_.append(future)
|
||||||
|
|
||||||
verified = all(f.result() is not False for f in futures_)
|
verified = all(f.result() for f in futures_)
|
||||||
except Exception:
|
except Exception:
|
||||||
verified = False
|
verified = False
|
||||||
|
finally:
|
||||||
|
if tp is not None:
|
||||||
|
# we already have our result, close all pending checks (improves performance when verified==False)
|
||||||
|
tp.shutdown(cancel_futures=True)
|
||||||
|
|
||||||
return verified
|
return verified
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user