mirror of https://github.com/clearml/clearml (synced 2025-06-16 11:28:31 +00:00)

Support parallel uploads and downloads

This commit is contained in:
parent 3474d70afb
commit a4b24b18fb
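The commit adds an optional max_workers argument (defaulting to the number of logical cores via psutil.cpu_count()) to Dataset.upload(), Dataset.get_local_copy() and Dataset.get_mutable_local_copy(). A minimal usage sketch, assuming the usual Dataset.create()/Dataset.get() entry points; the dataset name, project and local folder are hypothetical:

    from clearml import Dataset

    # create a dataset version and upload it with 8 zip/upload worker threads
    ds = Dataset.create(dataset_name="my_dataset", dataset_project="examples")
    ds.add_files(path="/data/my_dataset")      # hypothetical local folder
    ds.upload(chunk_size=512, max_workers=8)   # chunks are compressed and uploaded in parallel
    ds.finalize()

    # fetch a read-only copy, downloading and extracting chunks with 8 worker threads
    local_path = Dataset.get(dataset_id=ds.id).get_local_copy(max_workers=8)
    print(local_path)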
@@ -1,12 +1,13 @@
import json
import os
import shutil
import psutil
from copy import deepcopy, copy
from multiprocessing import cpu_count
from multiprocessing.pool import ThreadPool
from tempfile import mkstemp, mkdtemp
from concurrent.futures import ThreadPoolExecutor
from tempfile import mkdtemp
from typing import Union, Optional, Sequence, List, Dict, Any, Mapping
from zipfile import ZipFile, ZIP_DEFLATED
from zipfile import ZIP_DEFLATED

from attr import attrs, attrib
from pathlib2 import Path
@@ -21,6 +22,7 @@ from ..storage.helper import StorageHelper
from ..storage.cache import CacheManager
from ..storage.util import sha256sum, is_windows, md5text, format_size
from ..utilities.matching import matches_any_wildcard
from ..utilities.parallel import ParallelZipper

try:
from pathlib import Path as _Path # noqa
@@ -222,6 +224,16 @@ class Dataset(object):
# type: (List[str]) -> ()
self._task.set_tags(values or [])

def add_tags(self, tags):
# type: (Union[Sequence[str], str]) -> None
"""
Add Tags to this dataset. Old tags are not deleted. When executing a Task (experiment) remotely,
this method has no effect.

:param tags: A list of tags which describe the Task to add.
"""
self._task.add_tags(tags)

def add_files(
self,
path, # type: Union[str, Path, _Path]
@@ -282,13 +294,13 @@ class Dataset(object):
A few examples:
- Add file.jpg to the dataset. When retrieving a copy of the entire dataset (see dataset.get_local_copy()).
This file will be located in "./my_dataset/new_folder/file.jpg".
add_external_files(source_url="s3://my_bucket/stuff/file.jpg", dataset_path="/my_dataset/new_folder/")
add_external_files(source_url="s3://my_bucket/stuff/file.jpg", target_dataset_folder="/my_dataset/new_folder/")
- Add all jpg files located in s3 bucket called "my_bucket" to the dataset.
add_external_files(source_url="s3://my/bucket/", wildcard = "*.jpg", dataset_path="/my_dataset/new_folder/")
add_external_files(source_url="s3://my/bucket/", wildcard = "*.jpg",target_dataset_folder="/my_dataset/new_folder/")
- Add the entire content of "remote_folder" to the dataset.
add_external_files(source_url="s3://bucket/remote_folder/", dataset_path="/my_dataset/new_folder/")
add_external_files(source_url="s3://bucket/remote_folder/", target_dataset_folder="/my_dataset/new_folder/")
- Add the local file "/folder/local_file.jpg" to the dataset.
add_external_files(source_url="file:///folder/local_file.jpg", dataset_path="/my_dataset/new_folder/")
add_external_files(source_url="file:///folder/local_file.jpg", target_dataset_folder="/my_dataset/new_folder/")

:param source_url: Source url link to add to the dataset,
e.g. s3://bucket/folder/path, s3://bucket/folder/file.csv
@@ -485,8 +497,10 @@ class Dataset(object):

return num_removed, num_added, num_modified

def upload(self, show_progress=True, verbose=False, output_url=None, compression=None, chunk_size=None):
# type: (bool, bool, Optional[str], Optional[str], int) -> ()
def upload(
self, show_progress=True, verbose=False, output_url=None, compression=None, chunk_size=None, max_workers=None
):
# type: (bool, bool, Optional[str], Optional[str], int, Optional[int]) -> ()
"""
Start file uploading, the function returns when all files are uploaded.

@@ -498,138 +512,98 @@ class Dataset(object):
:param chunk_size: Artifact chunk size (MB) for the compressed dataset,
if not provided (None) use the default chunk size (512mb).
If -1 is provided, use a single zip artifact for the entire dataset change-set (old behaviour)
:param max_workers: Numbers of threads to be spawned when zipping and uploading the files.
Defaults to the number of logical cores.
"""
if not max_workers:
max_workers = psutil.cpu_count()

# set output_url
if output_url:
self._task.output_uri = output_url

self._task.get_logger().report_text(
'Uploading dataset files: {}'.format(
dict(show_progress=show_progress, verbose=verbose, output_url=output_url, compression=compression)),
print_console=False)
"Uploading dataset files: {}".format(
dict(show_progress=show_progress, verbose=verbose, output_url=output_url, compression=compression)
),
print_console=False,
)

list_zipped_artifacts = [] # List[Tuple[Path, int, str, str]]
list_file_entries = list(self._dataset_file_entries.values())
total_size = 0
chunks_count = 0
keep_as_file_entry = set()
chunk_size = int(self._dataset_chunk_size_mb if not chunk_size else chunk_size)
try:
from tqdm import tqdm # noqa
a_tqdm = tqdm(total=len(list_file_entries))
except ImportError:
a_tqdm = None

while list_file_entries:
fd, zip_file = mkstemp(
prefix='dataset.{}.'.format(self._id), suffix='.zip'
with ThreadPoolExecutor(max_workers=max_workers) as pool:
parallel_zipper = ParallelZipper(
chunk_size,
max_workers,
allow_zip_64=True,
compression=compression or ZIP_DEFLATED,
zip_prefix="dataset.{}.".format(self._id),
zip_suffix=".zip",
verbose=verbose,
task=self._task,
pool=pool,
)
archive_preview = ''
count = 0
processed = 0
zip_file = Path(zip_file)
print('{}Compressing local files, chunk {} [remaining {} files]'.format(
'\n' if a_tqdm else '', 1+len(list_zipped_artifacts), len(list_file_entries)))
try:
with ZipFile(zip_file.as_posix(), 'w', allowZip64=True, compression=compression or ZIP_DEFLATED) as zf:
for file_entry in list_file_entries:
processed += 1
if a_tqdm:
a_tqdm.update()
if not file_entry.local_path:
# file is already in an uploaded artifact
continue
filename = Path(file_entry.local_path)
if not filename.is_file():
LoggerRoot.get_base_logger().warning(
"Could not store dataset file {}. File skipped".format(file_entry.local_path))
# mark for removal
file_entry.relative_path = None
continue
if verbose:
self._task.get_logger().report_text('Compressing {}'.format(filename.as_posix()))

relative_file_name = file_entry.relative_path
zf.write(filename.as_posix(), arcname=relative_file_name)
archive_preview += '{} - {}\n'.format(
relative_file_name, format_size(filename.stat().st_size))
file_entry.artifact_name = self._data_artifact_name
count += 1

# limit the size of a single artifact
if chunk_size > 0 and zip_file.stat().st_size >= chunk_size * (1024**2):
break
except Exception as e:
# failed uploading folder:
LoggerRoot.get_base_logger().warning(
'Exception {}\nFailed zipping dataset.'.format(e))
return False
finally:
os.close(fd)

if not count:
zip_file.unlink()
else:
total_size += zip_file.stat().st_size
# update the artifact preview
archive_preview = "Dataset archive content [{} files]:\n".format(count) + archive_preview
# add into the list
list_zipped_artifacts += [(zip_file, count, archive_preview, self._data_artifact_name)]
# let's see what's left
list_file_entries = list_file_entries[processed:]
# next artifact name to use
self._data_artifact_name = self._get_next_data_artifact_name(self._data_artifact_name)

if a_tqdm:
a_tqdm.close()
file_paths = []
arcnames = {}
for f in self._dataset_file_entries.values():
if not f.local_path:
keep_as_file_entry.add(f.relative_path)
continue
file_paths.append(f.local_path)
arcnames[f.local_path] = f.relative_path
for zip_ in parallel_zipper.zip_iter(file_paths, arcnames=arcnames):
zip_path = Path(zip_.zip_path)
artifact_name = self._data_artifact_name
self._data_artifact_name = self._get_next_data_artifact_name(self._data_artifact_name)
self._task.get_logger().report_text(
"Uploading dataset changes ({} files compressed to {}) to {}".format(
zip_.count, format_size(zip_.size), self.get_default_storage()
)
)
total_size += zip_.size
chunks_count += 1
pool.submit(
self._task.upload_artifact,
name=artifact_name,
artifact_object=Path(zip_path),
preview=zip_.archive_preview,
delete_after_upload=True,
wait_on_upload=True,
)
for file_entry in self._dataset_file_entries.values():
if file_entry.local_path is not None and Path(file_entry.local_path).as_posix() in zip_.files_zipped:
keep_as_file_entry.add(file_entry.relative_path)
file_entry.artifact_name = artifact_name
if file_entry.parent_dataset_id == self._id:
file_entry.local_path = None
self._serialize()

self._task.get_logger().report_text(
"File compression completed: total size {}, {} chunked stored (average size {})".format(
"File compression and upload completed: total size {}, {} chunked stored (average size {})".format(
format_size(total_size),
len(list_zipped_artifacts),
format_size(0 if len(list_zipped_artifacts) == 0 else total_size / len(list_zipped_artifacts)),
chunks_count,
format_size(0 if chunks_count == 0 else total_size / chunks_count),
)
)

if not list_zipped_artifacts:
LoggerRoot.get_base_logger().info('No pending files, skipping upload.')
if chunks_count == 0:
LoggerRoot.get_base_logger().info("No pending files, skipping upload.")
self._dirty = False
self._serialize()
return True

for i, (zip_file, count, archive_preview, artifact_name) in enumerate(list_zipped_artifacts):
# noinspection PyBroadException
try:
# let's try to rename it
new_zip_file = zip_file.parent / 'dataset.{}.zip'.format(self._id)
zip_file.rename(new_zip_file)
zip_file = new_zip_file
except Exception:
pass

# start upload
zip_file_size = format_size(Path(zip_file).stat().st_size)
self._task.get_logger().report_text(
'Uploading compressed dataset changes {}/{} ({} files {}) to {}'.format(
i+1, len(list_zipped_artifacts), count, zip_file_size, self.get_default_storage()))

self._task.upload_artifact(
name=artifact_name, artifact_object=Path(zip_file), preview=archive_preview,
delete_after_upload=True, wait_on_upload=True)

# mark as upload completed and serialize
for file_entry in self._dataset_file_entries.values():
if file_entry.parent_dataset_id == self._id and file_entry.artifact_name == artifact_name:
file_entry.local_path = None
# serialize current state
self._serialize()

# remove files that could not be zipped, containing Null relative Path
# remove files that could not be zipped
self._dataset_file_entries = {
k: v for k, v in self._dataset_file_entries.items() if v.relative_path is not None}
# report upload completed
self._task.get_logger().report_text('Upload completed ({})'.format(format_size(total_size)))
k: v for k, v in self._dataset_file_entries.items() if v.relative_path in keep_as_file_entry
}

# report upload completed
self._add_script_call(
'upload', show_progress=show_progress, verbose=verbose, output_url=output_url, compression=compression)
"upload", show_progress=show_progress, verbose=verbose, output_url=output_url, compression=compression
)

self._dirty = False
self._serialize()
@@ -706,8 +680,8 @@ class Dataset(object):
return self._task.get_status() not in (
Task.TaskStatusEnum.in_progress, Task.TaskStatusEnum.created, Task.TaskStatusEnum.failed)

def get_local_copy(self, use_soft_links=None, part=None, num_parts=None, raise_on_error=True):
# type: (bool, Optional[int], Optional[int], bool) -> str
def get_local_copy(self, use_soft_links=None, part=None, num_parts=None, raise_on_error=True, max_workers=None):
# type: (bool, Optional[int], Optional[int], bool, Optional[int]) -> str
"""
return a base folder with a read-only (immutable) local copy of the entire dataset
download and copy / soft-link, files from all the parent dataset versions
@@ -725,20 +699,31 @@ class Dataset(object):
and `num_parts=5`, the chunk index used per parts would be:
part=0 -> chunks[0,5], part=1 -> chunks[1,6], part=2 -> chunks[2,7], part=3 -> chunks[3, ]
:param raise_on_error: If True raise exception if dataset merging failed on any file
:param max_workers: Number of threads to be spawned when getting the dataset copy. Defaults
to the number of logical cores.

:return: A base folder for the entire dataset
"""
assert self._id
if not self._task:
self._task = Task.get_task(task_id=self._id)
if not max_workers:
max_workers = psutil.cpu_count()

# now let's merge the parents
target_folder = self._merge_datasets(
use_soft_links=use_soft_links, raise_on_error=raise_on_error, part=part, num_parts=num_parts)
use_soft_links=use_soft_links,
raise_on_error=raise_on_error,
part=part,
num_parts=num_parts,
max_workers=max_workers,
)
return target_folder

def get_mutable_local_copy(self, target_folder, overwrite=False, part=None, num_parts=None, raise_on_error=True):
# type: (Union[Path, _Path, str], bool, Optional[int], Optional[int], bool) -> Optional[str]
def get_mutable_local_copy(
self, target_folder, overwrite=False, part=None, num_parts=None, raise_on_error=True, max_workers=None
):
# type: (Union[Path, _Path, str], bool, Optional[int], Optional[int], bool, Optional[int]) -> Optional[str]
"""
return a base folder with a writable (mutable) local copy of the entire dataset
download and copy / soft-link, files from all the parent dataset versions
@@ -758,10 +743,14 @@ class Dataset(object):
and `num_parts=5`, the chunk index used per parts would be:
part=0 -> chunks[0,5], part=1 -> chunks[1,6], part=2 -> chunks[2,7], part=3 -> chunks[3, ]
:param raise_on_error: If True raise exception if dataset merging failed on any file
:param max_workers: Number of threads to be spawned when getting the dataset copy. Defaults
to the number of logical cores.

:return: The target folder containing the entire dataset
"""
assert self._id
if not max_workers:
max_workers = psutil.cpu_count()
target_folder = Path(target_folder).absolute()
target_folder.mkdir(parents=True, exist_ok=True)
# noinspection PyBroadException
@@ -775,7 +764,9 @@ class Dataset(object):
return None
shutil.rmtree(target_folder.as_posix())

ro_folder = self.get_local_copy(part=part, num_parts=num_parts, raise_on_error=raise_on_error)
ro_folder = self.get_local_copy(
part=part, num_parts=num_parts, raise_on_error=raise_on_error, max_workers=max_workers
)
shutil.copytree(ro_folder, target_folder.as_posix(), symlinks=False)
return target_folder.as_posix()

@@ -957,7 +948,7 @@ class Dataset(object):

return None

pool = ThreadPool(cpu_count() * 2)
pool = ThreadPool(psutil.cpu_count())
matching_errors = pool.map(compare, self._dataset_file_entries.values())
pool.close()
return [f.relative_path for f in matching_errors if f is not None]
@@ -1377,7 +1368,7 @@ class Dataset(object):
for f in file_entries
]
self._task.get_logger().report_text('Generating SHA2 hash for {} files'.format(len(file_entries)))
pool = ThreadPool(cpu_count() * 2)
pool = ThreadPool(psutil.cpu_count())
try:
import tqdm # noqa
for _ in tqdm.tqdm(pool.imap_unordered(self._calc_file_hash, file_entries), total=len(file_entries)):
@@ -1514,8 +1505,9 @@ class Dataset(object):
lock_target_folder=False,
cleanup_target_folder=True,
target_folder=None,
max_workers=None
):
# type: (bool, Optional[List[int]], bool, bool, Optional[Path]) -> str
# type: (bool, Optional[List[int]], bool, bool, Optional[Path], Optional[int]) -> str
"""
First, extracts the archive present on the ClearML server containing this dataset's files.
Then, download the remote files. Note that if a remote file was added to the ClearML server, then
@@ -1530,15 +1522,20 @@ class Dataset(object):
Notice you should unlock it manually, or wait for the process to finish for auto unlocking.
:param cleanup_target_folder: If True remove target folder recursively
:param target_folder: If provided use the specified target folder, default, auto generate from Dataset ID.
:param max_workers: Number of threads to be spawned when getting dataset files. Defaults
to the number of virtual cores.

:return: Path to the local storage where the data was downloaded
"""
if not max_workers:
max_workers = psutil.cpu_count()
local_folder = self._extract_dataset_archive(
force=force,
selected_chunks=selected_chunks,
lock_target_folder=lock_target_folder,
cleanup_target_folder=cleanup_target_folder,
target_folder=target_folder,
max_workers=max_workers
)
self._download_external_files(
target_folder=target_folder, lock_target_folder=lock_target_folder
@@ -1612,8 +1609,9 @@ class Dataset(object):
lock_target_folder=False,
cleanup_target_folder=True,
target_folder=None,
max_workers=None
):
# type: (bool, Optional[List[int]], bool, bool, Optional[Path]) -> str
# type: (bool, Optional[List[int]], bool, bool, Optional[Path], Optional[int]) -> str
"""
Download the dataset archive, and extract the zip content to a cached folder.
Notice no merging is done.
@@ -1626,6 +1624,7 @@ class Dataset(object):
Notice you should unlock it manually, or wait for the process to finish for auto unlocking.
:param cleanup_target_folder: If True remove target folder recursively
:param target_folder: If provided use the specified target folder, default, auto generate from Dataset ID.
:param max_workers: Number of threads to be spawned when downloading and extracting the archives

:return: Path to a local storage extracted archive
"""
@@ -1634,6 +1633,9 @@ class Dataset(object):
if not self._task:
self._task = Task.get_task(task_id=self._id)

if not max_workers:
max_workers = psutil.cpu_count()

data_artifact_entries = self._get_data_artifact_names()

if selected_chunks is not None and data_artifact_entries:
@@ -1671,9 +1673,15 @@ class Dataset(object):
StorageManager._extract_to_cache(
cached_file=local_zip, name=self._id,
cache_context=self.__cache_context, target_folder=local_folder, force=True)
# noinspection PyBroadException
try:
Path(local_zip).unlink()
except Exception:
pass

for d in data_artifact_entries:
_download_part(d)
with ThreadPoolExecutor(max_workers=max_workers) as pool:
for d in data_artifact_entries:
pool.submit(_download_part, d)

return local_folder

@@ -1706,8 +1714,8 @@ class Dataset(object):
numbers = sorted([int(a[prefix_len:]) for a in data_artifact_entries if a.startswith(prefix)])
return '{}{:03d}'.format(prefix, numbers[-1]+1 if numbers else 1)

def _merge_datasets(self, use_soft_links=None, raise_on_error=True, part=None, num_parts=None):
# type: (bool, bool, Optional[int], Optional[int]) -> str
def _merge_datasets(self, use_soft_links=None, raise_on_error=True, part=None, num_parts=None, max_workers=None):
# type: (bool, bool, Optional[int], Optional[int], Optional[int]) -> str
"""
download and copy / soft-link, files from all the parent dataset versions
:param use_soft_links: If True use soft links, default False on windows True on Posix systems
@@ -1720,12 +1728,17 @@ class Dataset(object):
requested number of parts. Notice that the actual chunks used per part are rounded down.
Example: Assuming 8 chunks on this version, and `num_parts=5`, the chunk index used per parts would be:
part=0 -> chunks[0,5], part=1 -> chunks[1,6], part=2 -> chunks[2,7], part=3 -> chunks[3, ]
:param max_workers: Number of threads to be spawned when merging datasets. Defaults to the number
of logical cores.

:return: the target folder
"""
assert part is None or (isinstance(part, int) and part >= 0)
assert num_parts is None or (isinstance(num_parts, int) and num_parts >= 1)

if max_workers is None:
max_workers = psutil.cpu_count()

if use_soft_links is None:
use_soft_links = False if is_windows() else True

@@ -1762,6 +1775,7 @@ class Dataset(object):
selected_chunks=chunk_selection.get(self._id) if chunk_selection else None,
cleanup_target_folder=True,
target_folder=target_base_folder,
max_workers=max_workers
)
dependencies_by_order.remove(self._id)

@@ -2108,12 +2122,14 @@ class Dataset(object):
chunk_selection,
use_soft_links,
raise_on_error,
force
force,
max_workers=None
):
# type: (Path, List[str], Optional[dict], bool, bool, bool) -> ()
# type: (Path, List[str], dict, bool, bool, bool, Optional[int]) -> ()
# create thread pool, for creating soft-links / copying
# todo: parallelize by parent datasets
pool = ThreadPool(cpu_count() * 2)
if not max_workers:
max_workers = psutil.cpu_count()
pool = ThreadPool(max_workers)
for dataset_version_id in dependencies_by_order:
# make sure we skip over empty dependencies
if dataset_version_id not in self._dependency_graph:
@@ -2126,6 +2142,7 @@ class Dataset(object):
force=force,
lock_target_folder=True,
cleanup_target_folder=False,
max_workers=max_workers
))
ds_base_folder.touch()

@@ -77,7 +77,7 @@ from .utilities.seed import make_deterministic
from .utilities.lowlevel.threads import get_current_thread_id
from .utilities.process.mp import BackgroundMonitor, leave_process
from .utilities.matching import matches_any_wildcard
from .utilities.future_caller import FutureTaskCaller
from .utilities.parallel import FutureTaskCaller
# noinspection PyProtectedMember
from .backend_interface.task.args import _Arguments

@@ -1,176 +0,0 @@
from copy import deepcopy
from time import sleep

from six.moves.queue import Queue, Empty
from threading import Thread
from typing import Any, Callable, Optional, Type

[the deleted _DeferredClass and FutureTaskCaller implementations are identical, line for line, to the ones added in clearml/utilities/parallel.py below]
clearml/utilities/parallel.py (new file, 469 lines)
@@ -0,0 +1,469 @@
import os

from copy import deepcopy
from time import sleep
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Optional, Union, Generator, List, Type
from zipfile import ZipFile, ZIP_DEFLATED
from six.moves.queue import PriorityQueue, Queue, Empty
from pathlib2 import Path
from tempfile import mkstemp
from collections import deque
from threading import Thread

from ..debugging.log import LoggerRoot
from ..storage.util import format_size


class _DeferredClass(object):
__slots__ = ('__queue', '__future_caller', '__future_func')

def __init__(self, a_future_caller, future_func):
self.__queue = Queue()
self.__future_caller = a_future_caller
self.__future_func = future_func

def __nested_caller(self, item, args, kwargs):
# wait until object is constructed
getattr(self.__future_caller, "id") # noqa

future_func = getattr(self.__future_caller, self.__future_func)
the_object = future_func()
the_object_func = getattr(the_object, item)
return the_object_func(*args, **kwargs)

def _flush_into_logger(self, a_future_object=None, a_future_func=None):
self.__close_queue(a_future_object=a_future_object, a_future_func=a_future_func)

def __close_queue(self, a_future_object=None, a_future_func=None):
# call this function when we Know the object is initialization is completed
if self.__queue is None:
return

_queue = self.__queue
self.__queue = None
while True:
# noinspection PyBroadException
try:
item, args, kwargs = _queue.get(block=False)
if a_future_object:
future_func = getattr(a_future_object, self.__future_func)
the_object = future_func()
the_object_func = getattr(the_object, item)
the_object_func(*args, **kwargs)
elif a_future_func:
the_object_func = getattr(a_future_func, item)
the_object_func(*args, **kwargs)
else:
self.__nested_caller(item, args, kwargs)
except Empty:
break
except Exception:
# import traceback
# stdout_print(''.join(traceback.format_exc()))
pass

def __getattr__(self, item):
def _caller(*args, **kwargs):
# if we already completed the background initialization, call functions immediately
# noinspection PyProtectedMember
if not self.__queue or self.__future_caller._FutureTaskCaller__executor is None:
return self.__nested_caller(item, args, kwargs)

# noinspection PyBroadException
try:
# if pool is still active call async
self.__queue.put((item, deepcopy(args) if args else args, deepcopy(kwargs) if kwargs else kwargs))
except Exception:
# assume we wait only if self.__pool was nulled between the if and now, so just call directly
return self.__nested_caller(item, args, kwargs)

# let's hope it is the right one
return True

return _caller


class FutureTaskCaller(object):
"""
FutureTaskCaller is used to create a class via a functions async, in another thread.

For example:

.. code-block:: py

future = FutureTaskCaller().call(func=max, func_cb=None, override_cls=None, 1, 2)
print('Running other code')
print(future.result()) # will print '2'
"""
__slots__ = ('__object', '__object_cls', '__executor', '__deferred_bkg_class')

@property
def __class__(self):
return self.__object_cls

def __init__(self, func, func_cb, override_cls, *args, **kwargs):
# type: (Callable, Optional[Callable], Type, *Any, **Any) -> None
"""
__init__(*args, **kwargs) in another thread

:return: This FutureTaskCaller instance
"""
self.__object = None
self.__object_cls = override_cls
self.__deferred_bkg_class = _DeferredClass(self, "get_logger")

self.__executor = Thread(target=self.__submit__, args=(func, func_cb, args, kwargs))
self.__executor.daemon = True
self.__executor.start()

def __submit__(self, fn, fn_cb, args, kwargs):
# background initialization call
_object = fn(*args, **kwargs)

# push all background calls (now that the initialization is complete)
if self.__deferred_bkg_class:
_deferred_bkg_class = self.__deferred_bkg_class
self.__deferred_bkg_class = None
# noinspection PyProtectedMember
_deferred_bkg_class._flush_into_logger(a_future_object=_object)

# store the initialized object
self.__object = _object
# callback function
if fn_cb is not None:
fn_cb(self.__object)

def __getattr__(self, item):
# if we get here, by definition this is not a __slot__ entry, pass to the object
return getattr(self.__result__(), item)

def __setattr__(self, item, value):
# make sure we can set the slots
if item in ["_FutureTaskCaller__executor", "_FutureTaskCaller__object",
"_FutureTaskCaller__object_cls", "_FutureTaskCaller__deferred_bkg_class"]:
return super(FutureTaskCaller, self).__setattr__(item, value)

setattr(self.__result__(), item, value)

def __result__(self, timeout=None):
# type: (Optional[float]) -> Any
"""
Wait and get the result of the function called with self.call()

:param timeout: The maximum number of seconds to wait for the result. If None,
there is no limit for the wait time.

:return: The result of the called function
"""
if self.__executor:
# since the test is not atomic, we assume that if we failed joining
# it is because someone else joined before us
# noinspection PyBroadException
try:
self.__executor.join(timeout=timeout)
except RuntimeError:
# this is probably calling ourselves from the same thread
raise
except Exception:
# wait until that someone else updated the __object
while self.__object is None:
sleep(1)
self.__executor = None
return self.__object

# This is the part where we are no longer generic, but __slots__
# inheritance is too cumbersome to actually inherit and make sure it works optimally
def get_logger(self):
if self.__object is not None:
return self.__object.get_logger()

if self.__deferred_bkg_class is None:
# we are shutting down, wait until object is available
return self.__result__().get_logger()

return self.__deferred_bkg_class


class ParallelZipper(object):
"""
Used to zip multiple files in zip chunks of a particular size, all in parallel
"""
class ZipperObject(object):
def __init__(
self,
chunk_size, # int
zipper_queue, # PriorityQueue[ParallelZipper.ZipperObject]
zipper_results, # Queue[ParallelZipper.ZipperObject]
allow_zip_64, # bool
compression, # Any
zip_prefix, # str
zip_suffix, # str
verbose, # bool
task, # Any
):
# (...) -> ParallelZipper.ZipperObject
"""
Initialize a ParallelZipper.ZipperObject instance that holds its corresponding zip
file, as well as other relevant data

:param chunk_size: Chunk size, in MB. The ParallelZipper will try its best not to exceed this size
when bulding up this zipper object, but that is not guaranteed
:param zipper_queue: PriorityQueue that holds ParallelZipper.ZipperObject instances.
When this ParallelZipper.ZipperObject can hold more data (i.e. chunk_size was not exceeded),
this object will reinsert itself in this queue to be reused by the ParallelZipper.
Else, a fresh ParallelZipper.ZipperObject will be inserted
:param zipper_results: Queue that holds ParallelZipper.ZipperObject instances. These instances
are added to this queue when chunk_size is exceeded
:param allow_zip_64: if True ZipFile will create files with ZIP64 extensions when
needed, otherwise it will raise an exception when this would be necessary
:param compression: ZipFile.ZIP_STORED (no compression), ZipFile.ZIP_DEFLATED (requires zlib),
ZipFile.ZIP_BZIP2 (requires bz2) or ZipFile.ZIP_LZMA (requires lzma).
:param zip_prefix: The zip file created by this object will have its name prefixed by this
:param zip_suffix: The zip file created by this object will have its name suffixed by this
:param verbose: If True, print data relevant to the file compression
:param task: ClearML Task, used for logging

:return: ParallelZipper.ZipperObject instance
"""
self._chunk_size = chunk_size
self._zipper_queue = zipper_queue
self._zipper_results = zipper_results
self._allow_zip_64 = allow_zip_64
self._compression = compression
self._zip_prefix = zip_prefix
self._zip_suffix = zip_suffix
self._verbose = verbose
self._task = task
self.fd, self.zip_path = mkstemp(prefix=zip_prefix, suffix=zip_suffix)
self.zip_path = Path(self.zip_path)
self.zip_file = ZipFile(self.zip_path.as_posix(), "w", allowZip64=allow_zip_64, compression=compression)
self.archive_preview = ""
self.count = 0
self.files_zipped = set()

def zip(self, file_path, arcname=None):
# type: (Union[str, Path], str) -> ()
"""
Zips a file into the ZipFile created by this instance. This instance will either add
itself back to the PriorityQueue used to select the best zipping candidate or add itself
to the result Queue after exceeding self.chunk_size.

:param file_path: Path to the file to be zipped
:param arcname: Name of the file in the archive
"""
if self._verbose and self._task:
self._task.get_logger().report_text("Compressing {}".format(Path(file_path).as_posix()))
self.zip_file.write(file_path, arcname=arcname)
self.count += 1
preview_path = arcname
if not preview_path:
preview_path = file_path
self.archive_preview += "{} - {}\n".format(preview_path, format_size(self.size))
self.files_zipped.add(Path(file_path).as_posix())
if self._chunk_size <= 0 or self.size < self._chunk_size:
self._zipper_queue.put(self)
else:
self._zipper_queue.put(
ParallelZipper.ZipperObject(
self._chunk_size,
self._zipper_queue,
self._zipper_results,
self._allow_zip_64,
self._compression,
self._zip_prefix,
self._zip_suffix,
self._verbose,
self._task,
)
)
self._zipper_results.put(self)

def merge(self, other):
# type: (ParallelZipper.ZipperObject) -> ()
"""
Merges one ParallelZipper.ZipperObject instance into the current one.
All the files zipped by the other instance will be added to this instance,
as well as any other useful additional data

:param other: ParallelZipper.ZipperObject instance to merge into this one
"""
with ZipFile(self.zip_path.as_posix(), "a") as parent_zip:
with ZipFile(other.zip_path.as_posix(), "r") as child_zip:
for child_name in child_zip.namelist():
parent_zip.writestr(child_name, child_zip.open(child_name).read())
self.files_zipped |= other.files_zipped
self.count += other.count
self.archive_preview += other.archive_preview

def close(self):
# type: () -> ()
"""
Attempts to close file descriptors associated to the ZipFile
"""
# noinspection PyBroadException
try:
self.zip_file.close()
os.close(self.fd)
except Exception:
pass

def delete(self):
# type: () -> ()
"""
Attempts to delete the ZipFile from the disk
"""
# noinspection PyBroadException
try:
self.close()
self.zip_path.unlink()
except Exception:
pass

@property
def size(self):
# type: () -> ()
"""
:return: Size of the ZipFile, in bytes
"""
return self.zip_path.stat().st_size

def __lt__(self, other):
# we want to completely "fill" as many zip files as possible, hence the ">" comparison
return self.size > other.size

def __init__(
self,
chunk_size, # type: int
max_workers, # type: int
allow_zip_64=True, # type: Optional[bool]
compression=ZIP_DEFLATED, # type: Optional[Any]
zip_prefix="", # type: Optional[str]
zip_suffix="", # type: Optional[str]
verbose=False, # type: Optional[bool]
task=None, # type: Optional[Any]
pool=None, # type: Optional[ThreadPoolExecutor]
):
# type: (...) -> ParallelZipper
"""
Initialize the ParallelZipper. Each zip created by this object will have the following naming
format: [zip_prefix]<random_string>[zip_suffix]

:param chunk_size: Chunk size, in MB. The ParallelZipper will try its best not to exceed this size,
but that is not guaranteed
:param max_workers: The maximum number of workers spawned when zipping the files
:param allow_zip_64: if True ZipFile will create files with ZIP64 extensions when
needed, otherwise it will raise an exception when this would be necessary
:param compression: ZipFile.ZIP_STORED (no compression), ZipFile.ZIP_DEFLATED (requires zlib),
ZipFile.ZIP_BZIP2 (requires bz2) or ZipFile.ZIP_LZMA (requires lzma).
:param zip_prefix: Zip file names will be prefixed by this
:param zip_suffix: Zip file names will pe suffixed by this
:param verbose: If True, print data relevant to the file compression
:param task: ClearML Task, used for logging
:param pool: Use this ThreadPoolExecutor instead of creating one. Note that this pool will not be
closed after zipping is finished.

:return: ParallelZipper instance
"""
self._chunk_size = chunk_size * (1024 ** 2)
self._max_workers = max_workers
self._allow_zip_64 = allow_zip_64
self._compression = compression
self._zip_prefix = zip_prefix
self._zip_suffix = zip_suffix
self._verbose = verbose
self._task = task
self._pool = pool
self._zipper_queue = PriorityQueue()
self._zipper_results = Queue()

def zip_iter(self, file_paths, arcnames={}):
# type: (List[Union(str, Path)], Optional[dict[Union(str, Path), str]]) -> Generator[ParallelZipper.ZipperObject]
"""
Generator function that returns zip files as soon as they are available.
The zipping is done in parallel

:param file_paths: List of paths to the files to zip
:param arcnames: Dictionary that maps the file path to what should be its name in the archive.

:return: Generator of ParallelZipper.ZipperObjects
"""
while not self._zipper_queue.empty():
self._zipper_queue.get_nowait()
for _ in range(self._max_workers):
self._zipper_queue.put(
ParallelZipper.ZipperObject(
self._chunk_size,
self._zipper_queue,
self._zipper_results,
self._allow_zip_64,
self._compression,
self._zip_prefix,
self._zip_suffix,
self._verbose,
self._task,
)
)
filtered_file_paths = []
for file_path in file_paths:
if not Path(file_path).is_file():
LoggerRoot.get_base_logger().warning("Could not store dataset file {}. File skipped".format(file_path))
else:
filtered_file_paths.append(file_path)
file_paths = filtered_file_paths

file_paths = sorted(file_paths, key=lambda k: Path(k).stat().st_size, reverse=True)
# zip in parallel
pooled = []
if not self._pool:
pool = ThreadPoolExecutor(max_workers=self._max_workers)
else:
pool = self._pool
for f in file_paths:
zipper = self._zipper_queue.get()
pooled.append(pool.submit(zipper.zip, Path(f).as_posix(), arcname=arcnames.get(f)))
for result in self._yield_zipper_results():
yield result
for task in pooled:
task.result()
if not self._pool:
pool.close()

for result in self._yield_zipper_results():
yield result

zipper_results_leftover = []

# extract remaining results
while not self._zipper_queue.empty():
result = self._zipper_queue.get()
if result.count != 0:
zipper_results_leftover.append(result)
else:
result.delete()
zipper_results_leftover = deque(sorted(zipper_results_leftover, reverse=True))

# merge zip files greedily if possible and get the paths as results
while len(zipper_results_leftover) > 0:
zip_ = zipper_results_leftover.pop()
zip_.close()
if zip_.size >= self._chunk_size > 0:
yield zip_
continue
while len(zipper_results_leftover) > 0 and (
self._chunk_size <= 0 or zipper_results_leftover[0].size + zip_.size < self._chunk_size
):
child_zip = zipper_results_leftover.popleft()
child_zip.close()
zip_.merge(child_zip)
child_zip.delete()
yield zip_

def _yield_zipper_results(self):
while True:
try:
result = self._zipper_results.get_nowait()
result.close()
yield result
except Empty:
break
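For reference, a short sketch of how the new ParallelZipper is meant to be driven, mirroring its use in Dataset.upload() above. The file paths and pool size are hypothetical; the constructor arguments and the zip_path/count/size/archive_preview attributes come from the class as added in this commit:

    from concurrent.futures import ThreadPoolExecutor
    from clearml.utilities.parallel import ParallelZipper

    file_paths = ["data/a.bin", "data/b.bin", "data/c.bin"]   # hypothetical input files
    arcnames = {p: p.split("/")[-1] for p in file_paths}      # name of each file inside the archive

    with ThreadPoolExecutor(max_workers=4) as pool:
        zipper = ParallelZipper(
            512,                   # target chunk size in MB (best effort, not a hard limit)
            4,                     # max zip workers
            zip_prefix="example.",
            zip_suffix=".zip",
            pool=pool,             # reuse the caller's pool, as Dataset.upload() does
        )
        # chunks are yielded as soon as they are ready; undersized leftovers are merged at the end
        for chunk in zipper.zip_iter(file_paths, arcnames=arcnames):
            print(chunk.zip_path, chunk.count, chunk.size)
            print(chunk.archive_preview)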