Raise error on failed uploads (#820)

Lavi 2022-11-21 08:31:49 -05:00 committed by GitHub
parent f6b9efe54e
commit 1d77b55dde

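The behavioral change in the diff below is that exceptions raised inside the upload worker threads are no longer lost: ThreadPoolExecutor.submit() stores a worker's exception on the returned Future instead of raising it at the call site, so the caller has to keep the futures and inspect them once the work is done. A minimal standalone sketch of that pattern (not ClearML code; upload_chunk() is a hypothetical stand-in for Task.upload_artifact):

    from concurrent.futures import ThreadPoolExecutor, as_completed

    def upload_chunk(chunk_id):
        # hypothetical worker; one chunk fails to simulate a failed upload
        if chunk_id == 2:
            raise IOError("simulated failed upload for chunk {}".format(chunk_id))
        return chunk_id

    futures = []
    with ThreadPoolExecutor(max_workers=4) as pool:
        for chunk_id in range(5):
            # submit() does not raise for errors inside the worker;
            # the exception is stored on the returned future
            futures.append(pool.submit(upload_chunk, chunk_id))

    # drain the futures and count failures, then surface them as one error
    num_threads_with_errors = 0
    for future in as_completed(futures):
        if future.exception():
            num_threads_with_errors += 1
    if num_threads_with_errors > 0:
        raise ValueError(f"errors reported uploading {num_threads_with_errors} chunks")
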
@@ -8,7 +8,7 @@ import re
 import logging
 from copy import deepcopy, copy
 from multiprocessing.pool import ThreadPool
-from concurrent.futures import ThreadPoolExecutor
+from concurrent.futures import ThreadPoolExecutor, as_completed
 from tempfile import mkdtemp
 from typing import Union, Optional, Sequence, List, Dict, Any, Mapping, Tuple
 from zipfile import ZIP_DEFLATED
@@ -627,6 +627,7 @@ class Dataset(object):
         chunk_size = int(self._dataset_chunk_size_mb if not chunk_size else chunk_size)
         with ThreadPoolExecutor(max_workers=max_workers) as pool:
+            futures = []
             parallel_zipper = ParallelZipper(
                 chunk_size,
                 max_workers,
@@ -674,14 +675,14 @@ class Dataset(object):
                 preview = truncated_preview + (truncated_message if add_truncated_message else "")
                 total_preview_size += len(preview)
-                pool.submit(
+                futures.append(pool.submit(
                     self._task.upload_artifact,
                     name=artifact_name,
                     artifact_object=Path(zip_path),
                     preview=preview,
                     delete_after_upload=True,
                     wait_on_upload=True,
-                )
+                ))
                 for file_entry in self._dataset_file_entries.values():
                     if file_entry.local_path is not None and \
                             Path(file_entry.local_path).as_posix() in zip_.files_zipped:
@@ -690,6 +691,12 @@ class Dataset(object):
                         if file_entry.parent_dataset_id == self._id:
                             file_entry.local_path = None
             self._serialize()
+        num_threads_with_errors = 0
+        for future in as_completed(futures):
+            if future.exception():
+                num_threads_with_errors += 1
+        if num_threads_with_errors > 0:
+            raise ValueError(f"errors reported uploading {num_threads_with_errors} chunks")
         self._task.get_logger().report_text(
             "File compression and upload completed: total size {}, {} chunk(s) stored (average size {})".format(