diff --git a/clearml/datasets/dataset.py b/clearml/datasets/dataset.py
index 03ad3f02..0c94f10d 100644
--- a/clearml/datasets/dataset.py
+++ b/clearml/datasets/dataset.py
@@ -8,7 +8,7 @@ import re
 import logging
 from copy import deepcopy, copy
 from multiprocessing.pool import ThreadPool
-from concurrent.futures import ThreadPoolExecutor
+from concurrent.futures import ThreadPoolExecutor, as_completed
 from tempfile import mkdtemp
 from typing import Union, Optional, Sequence, List, Dict, Any, Mapping, Tuple
 from zipfile import ZIP_DEFLATED
@@ -627,6 +627,7 @@ class Dataset(object):
         chunk_size = int(self._dataset_chunk_size_mb if not chunk_size else chunk_size)
 
         with ThreadPoolExecutor(max_workers=max_workers) as pool:
+            futures = []
             parallel_zipper = ParallelZipper(
                 chunk_size,
                 max_workers,
@@ -674,14 +675,14 @@ class Dataset(object):
                 preview = truncated_preview + (truncated_message if add_truncated_message else "")
                 total_preview_size += len(preview)
 
-                pool.submit(
+                futures.append(pool.submit(
                     self._task.upload_artifact,
                     name=artifact_name,
                     artifact_object=Path(zip_path),
                     preview=preview,
                     delete_after_upload=True,
                     wait_on_upload=True,
-                )
+                ))
                 for file_entry in self._dataset_file_entries.values():
                     if file_entry.local_path is not None and \
                             Path(file_entry.local_path).as_posix() in zip_.files_zipped:
@@ -690,6 +691,12 @@ class Dataset(object):
                         if file_entry.parent_dataset_id == self._id:
                             file_entry.local_path = None
                 self._serialize()
+        num_threads_with_errors = 0
+        for future in as_completed(futures):
+            if future.exception():
+                num_threads_with_errors += 1
+        if num_threads_with_errors > 0:
+            raise ValueError(f"errors reported uploading {num_threads_with_errors} chunks")
 
         self._task.get_logger().report_text(
             "File compression and upload completed: total size {}, {} chunk(s) stored (average size {})".format(
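
The change above follows a standard concurrent.futures pattern: keep the Future returned by every submit() call and inspect those futures after the pool has drained, because an exception raised inside a worker thread is stored on its Future and is otherwise silently discarded. The following is a minimal, self-contained sketch of that pattern only; upload_chunk and the chunk file names are hypothetical stand-ins, not ClearML code.

from concurrent.futures import ThreadPoolExecutor, as_completed

def upload_chunk(path):
    # Hypothetical stand-in for the real upload call; raise to simulate a failed upload.
    if path.endswith("bad.zip"):
        raise IOError("upload failed for {}".format(path))
    return path

chunk_paths = ["chunk_0.zip", "chunk_1_bad.zip", "chunk_2.zip"]

with ThreadPoolExecutor(max_workers=4) as pool:
    # Keep every Future instead of discarding the result of submit();
    # otherwise exceptions raised in worker threads are silently lost.
    futures = [pool.submit(upload_chunk, p) for p in chunk_paths]

# The executor's __exit__ has already joined all workers here, so
# as_completed() yields immediately; exception() returns None on success.
failed = sum(1 for f in as_completed(futures) if f.exception())
if failed:
    # The diff raises here so callers see the failure instead of a silent partial upload.
    raise ValueError("errors reported uploading {} chunks".format(failed))

Raising a single summary error, rather than re-raising the first worker exception, mirrors the diff's choice to report how many chunks failed instead of aborting on the first failure.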