From a4b24b18fb7d45900834f70d15b9709fd8b8dec4 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Thu, 12 May 2022 23:44:20 +0300 Subject: [PATCH] Support parallel uploads and downloads --- clearml/datasets/dataset.py | 287 +++++++++--------- clearml/task.py | 2 +- clearml/utilities/future_caller.py | 176 ----------- clearml/utilities/parallel.py | 469 +++++++++++++++++++++++++++++ 4 files changed, 622 insertions(+), 312 deletions(-) delete mode 100644 clearml/utilities/future_caller.py create mode 100644 clearml/utilities/parallel.py diff --git a/clearml/datasets/dataset.py b/clearml/datasets/dataset.py index 84c7f39f..8e654841 100644 --- a/clearml/datasets/dataset.py +++ b/clearml/datasets/dataset.py @@ -1,12 +1,13 @@ import json import os import shutil +import psutil from copy import deepcopy, copy -from multiprocessing import cpu_count from multiprocessing.pool import ThreadPool -from tempfile import mkstemp, mkdtemp +from concurrent.futures import ThreadPoolExecutor +from tempfile import mkdtemp from typing import Union, Optional, Sequence, List, Dict, Any, Mapping -from zipfile import ZipFile, ZIP_DEFLATED +from zipfile import ZIP_DEFLATED from attr import attrs, attrib from pathlib2 import Path @@ -21,6 +22,7 @@ from ..storage.helper import StorageHelper from ..storage.cache import CacheManager from ..storage.util import sha256sum, is_windows, md5text, format_size from ..utilities.matching import matches_any_wildcard +from ..utilities.parallel import ParallelZipper try: from pathlib import Path as _Path # noqa @@ -222,6 +224,16 @@ class Dataset(object): # type: (List[str]) -> () self._task.set_tags(values or []) + def add_tags(self, tags): + # type: (Union[Sequence[str], str]) -> None + """ + Add Tags to this dataset. Old tags are not deleted. When executing a Task (experiment) remotely, + this method has no effect. + + :param tags: A list of tags which describe the Task to add. + """ + self._task.add_tags(tags) + def add_files( self, path, # type: Union[str, Path, _Path] @@ -282,13 +294,13 @@ class Dataset(object): A few examples: - Add file.jpg to the dataset. When retrieving a copy of the entire dataset (see dataset.get_local_copy()). This file will be located in "./my_dataset/new_folder/file.jpg". - add_external_files(source_url="s3://my_bucket/stuff/file.jpg", dataset_path="/my_dataset/new_folder/") + add_external_files(source_url="s3://my_bucket/stuff/file.jpg", target_dataset_folder="/my_dataset/new_folder/") - Add all jpg files located in s3 bucket called "my_bucket" to the dataset. - add_external_files(source_url="s3://my/bucket/", wildcard = "*.jpg", dataset_path="/my_dataset/new_folder/") + add_external_files(source_url="s3://my/bucket/", wildcard = "*.jpg",target_dataset_folder="/my_dataset/new_folder/") - Add the entire content of "remote_folder" to the dataset. - add_external_files(source_url="s3://bucket/remote_folder/", dataset_path="/my_dataset/new_folder/") + add_external_files(source_url="s3://bucket/remote_folder/", target_dataset_folder="/my_dataset/new_folder/") - Add the local file "/folder/local_file.jpg" to the dataset. - add_external_files(source_url="file:///folder/local_file.jpg", dataset_path="/my_dataset/new_folder/") + add_external_files(source_url="file:///folder/local_file.jpg", target_dataset_folder="/my_dataset/new_folder/") :param source_url: Source url link to add to the dataset, e.g. 
s3://bucket/folder/path, s3://bucket/folder/file.csv @@ -485,8 +497,10 @@ class Dataset(object): return num_removed, num_added, num_modified - def upload(self, show_progress=True, verbose=False, output_url=None, compression=None, chunk_size=None): - # type: (bool, bool, Optional[str], Optional[str], int) -> () + def upload( + self, show_progress=True, verbose=False, output_url=None, compression=None, chunk_size=None, max_workers=None + ): + # type: (bool, bool, Optional[str], Optional[str], int, Optional[int]) -> () """ Start file uploading, the function returns when all files are uploaded. @@ -498,138 +512,98 @@ class Dataset(object): :param chunk_size: Artifact chunk size (MB) for the compressed dataset, if not provided (None) use the default chunk size (512mb). If -1 is provided, use a single zip artifact for the entire dataset change-set (old behaviour) + :param max_workers: Numbers of threads to be spawned when zipping and uploading the files. + Defaults to the number of logical cores. """ + if not max_workers: + max_workers = psutil.cpu_count() + # set output_url if output_url: self._task.output_uri = output_url self._task.get_logger().report_text( - 'Uploading dataset files: {}'.format( - dict(show_progress=show_progress, verbose=verbose, output_url=output_url, compression=compression)), - print_console=False) + "Uploading dataset files: {}".format( + dict(show_progress=show_progress, verbose=verbose, output_url=output_url, compression=compression) + ), + print_console=False, + ) - list_zipped_artifacts = [] # List[Tuple[Path, int, str, str]] - list_file_entries = list(self._dataset_file_entries.values()) total_size = 0 + chunks_count = 0 + keep_as_file_entry = set() chunk_size = int(self._dataset_chunk_size_mb if not chunk_size else chunk_size) - try: - from tqdm import tqdm # noqa - a_tqdm = tqdm(total=len(list_file_entries)) - except ImportError: - a_tqdm = None - while list_file_entries: - fd, zip_file = mkstemp( - prefix='dataset.{}.'.format(self._id), suffix='.zip' + with ThreadPoolExecutor(max_workers=max_workers) as pool: + parallel_zipper = ParallelZipper( + chunk_size, + max_workers, + allow_zip_64=True, + compression=compression or ZIP_DEFLATED, + zip_prefix="dataset.{}.".format(self._id), + zip_suffix=".zip", + verbose=verbose, + task=self._task, + pool=pool, ) - archive_preview = '' - count = 0 - processed = 0 - zip_file = Path(zip_file) - print('{}Compressing local files, chunk {} [remaining {} files]'.format( - '\n' if a_tqdm else '', 1+len(list_zipped_artifacts), len(list_file_entries))) - try: - with ZipFile(zip_file.as_posix(), 'w', allowZip64=True, compression=compression or ZIP_DEFLATED) as zf: - for file_entry in list_file_entries: - processed += 1 - if a_tqdm: - a_tqdm.update() - if not file_entry.local_path: - # file is already in an uploaded artifact - continue - filename = Path(file_entry.local_path) - if not filename.is_file(): - LoggerRoot.get_base_logger().warning( - "Could not store dataset file {}. 
File skipped".format(file_entry.local_path)) - # mark for removal - file_entry.relative_path = None - continue - if verbose: - self._task.get_logger().report_text('Compressing {}'.format(filename.as_posix())) - - relative_file_name = file_entry.relative_path - zf.write(filename.as_posix(), arcname=relative_file_name) - archive_preview += '{} - {}\n'.format( - relative_file_name, format_size(filename.stat().st_size)) - file_entry.artifact_name = self._data_artifact_name - count += 1 - - # limit the size of a single artifact - if chunk_size > 0 and zip_file.stat().st_size >= chunk_size * (1024**2): - break - except Exception as e: - # failed uploading folder: - LoggerRoot.get_base_logger().warning( - 'Exception {}\nFailed zipping dataset.'.format(e)) - return False - finally: - os.close(fd) - - if not count: - zip_file.unlink() - else: - total_size += zip_file.stat().st_size - # update the artifact preview - archive_preview = "Dataset archive content [{} files]:\n".format(count) + archive_preview - # add into the list - list_zipped_artifacts += [(zip_file, count, archive_preview, self._data_artifact_name)] - # let's see what's left - list_file_entries = list_file_entries[processed:] - # next artifact name to use - self._data_artifact_name = self._get_next_data_artifact_name(self._data_artifact_name) - - if a_tqdm: - a_tqdm.close() + file_paths = [] + arcnames = {} + for f in self._dataset_file_entries.values(): + if not f.local_path: + keep_as_file_entry.add(f.relative_path) + continue + file_paths.append(f.local_path) + arcnames[f.local_path] = f.relative_path + for zip_ in parallel_zipper.zip_iter(file_paths, arcnames=arcnames): + zip_path = Path(zip_.zip_path) + artifact_name = self._data_artifact_name + self._data_artifact_name = self._get_next_data_artifact_name(self._data_artifact_name) + self._task.get_logger().report_text( + "Uploading dataset changes ({} files compressed to {}) to {}".format( + zip_.count, format_size(zip_.size), self.get_default_storage() + ) + ) + total_size += zip_.size + chunks_count += 1 + pool.submit( + self._task.upload_artifact, + name=artifact_name, + artifact_object=Path(zip_path), + preview=zip_.archive_preview, + delete_after_upload=True, + wait_on_upload=True, + ) + for file_entry in self._dataset_file_entries.values(): + if file_entry.local_path is not None and Path(file_entry.local_path).as_posix() in zip_.files_zipped: + keep_as_file_entry.add(file_entry.relative_path) + file_entry.artifact_name = artifact_name + if file_entry.parent_dataset_id == self._id: + file_entry.local_path = None + self._serialize() self._task.get_logger().report_text( - "File compression completed: total size {}, {} chunked stored (average size {})".format( + "File compression and upload completed: total size {}, {} chunked stored (average size {})".format( format_size(total_size), - len(list_zipped_artifacts), - format_size(0 if len(list_zipped_artifacts) == 0 else total_size / len(list_zipped_artifacts)), + chunks_count, + format_size(0 if chunks_count == 0 else total_size / chunks_count), ) ) - if not list_zipped_artifacts: - LoggerRoot.get_base_logger().info('No pending files, skipping upload.') + if chunks_count == 0: + LoggerRoot.get_base_logger().info("No pending files, skipping upload.") self._dirty = False self._serialize() return True - for i, (zip_file, count, archive_preview, artifact_name) in enumerate(list_zipped_artifacts): - # noinspection PyBroadException - try: - # let's try to rename it - new_zip_file = zip_file.parent / 'dataset.{}.zip'.format(self._id) - 
zip_file.rename(new_zip_file) - zip_file = new_zip_file - except Exception: - pass - - # start upload - zip_file_size = format_size(Path(zip_file).stat().st_size) - self._task.get_logger().report_text( - 'Uploading compressed dataset changes {}/{} ({} files {}) to {}'.format( - i+1, len(list_zipped_artifacts), count, zip_file_size, self.get_default_storage())) - - self._task.upload_artifact( - name=artifact_name, artifact_object=Path(zip_file), preview=archive_preview, - delete_after_upload=True, wait_on_upload=True) - - # mark as upload completed and serialize - for file_entry in self._dataset_file_entries.values(): - if file_entry.parent_dataset_id == self._id and file_entry.artifact_name == artifact_name: - file_entry.local_path = None - # serialize current state - self._serialize() - - # remove files that could not be zipped, containing Null relative Path + # remove files that could not be zipped self._dataset_file_entries = { - k: v for k, v in self._dataset_file_entries.items() if v.relative_path is not None} - # report upload completed - self._task.get_logger().report_text('Upload completed ({})'.format(format_size(total_size))) + k: v for k, v in self._dataset_file_entries.items() if v.relative_path in keep_as_file_entry + } + # report upload completed self._add_script_call( - 'upload', show_progress=show_progress, verbose=verbose, output_url=output_url, compression=compression) + "upload", show_progress=show_progress, verbose=verbose, output_url=output_url, compression=compression + ) self._dirty = False self._serialize() @@ -706,8 +680,8 @@ class Dataset(object): return self._task.get_status() not in ( Task.TaskStatusEnum.in_progress, Task.TaskStatusEnum.created, Task.TaskStatusEnum.failed) - def get_local_copy(self, use_soft_links=None, part=None, num_parts=None, raise_on_error=True): - # type: (bool, Optional[int], Optional[int], bool) -> str + def get_local_copy(self, use_soft_links=None, part=None, num_parts=None, raise_on_error=True, max_workers=None): + # type: (bool, Optional[int], Optional[int], bool, Optional[int]) -> str """ return a base folder with a read-only (immutable) local copy of the entire dataset download and copy / soft-link, files from all the parent dataset versions @@ -725,20 +699,31 @@ class Dataset(object): and `num_parts=5`, the chunk index used per parts would be: part=0 -> chunks[0,5], part=1 -> chunks[1,6], part=2 -> chunks[2,7], part=3 -> chunks[3, ] :param raise_on_error: If True raise exception if dataset merging failed on any file + :param max_workers: Number of threads to be spawned when getting the dataset copy. Defaults + to the number of logical cores. 
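# Illustrative sketch (not part of the patch): minimal usage of the new max_workers
# arguments added to Dataset.upload() and Dataset.get_local_copy(). The dataset name,
# project name and local folder below are hypothetical.
from clearml import Dataset

ds = Dataset.create(dataset_name="demo", dataset_project="examples")
ds.add_files("local_data/")
ds.add_tags(["parallel-upload-demo"])      # add_tags() is introduced by this patch
ds.upload(chunk_size=512, max_workers=8)   # compress and upload chunks concurrently
ds.finalize()

# downloading and extracting the dataset chunks is parallelized as well
local_copy = Dataset.get(dataset_id=ds.id).get_local_copy(max_workers=8)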
:return: A base folder for the entire dataset """ assert self._id if not self._task: self._task = Task.get_task(task_id=self._id) + if not max_workers: + max_workers = psutil.cpu_count() # now let's merge the parents target_folder = self._merge_datasets( - use_soft_links=use_soft_links, raise_on_error=raise_on_error, part=part, num_parts=num_parts) + use_soft_links=use_soft_links, + raise_on_error=raise_on_error, + part=part, + num_parts=num_parts, + max_workers=max_workers, + ) return target_folder - def get_mutable_local_copy(self, target_folder, overwrite=False, part=None, num_parts=None, raise_on_error=True): - # type: (Union[Path, _Path, str], bool, Optional[int], Optional[int], bool) -> Optional[str] + def get_mutable_local_copy( + self, target_folder, overwrite=False, part=None, num_parts=None, raise_on_error=True, max_workers=None + ): + # type: (Union[Path, _Path, str], bool, Optional[int], Optional[int], bool, Optional[int]) -> Optional[str] """ return a base folder with a writable (mutable) local copy of the entire dataset download and copy / soft-link, files from all the parent dataset versions @@ -758,10 +743,14 @@ class Dataset(object): and `num_parts=5`, the chunk index used per parts would be: part=0 -> chunks[0,5], part=1 -> chunks[1,6], part=2 -> chunks[2,7], part=3 -> chunks[3, ] :param raise_on_error: If True raise exception if dataset merging failed on any file + :param max_workers: Number of threads to be spawned when getting the dataset copy. Defaults + to the number of logical cores. :return: The target folder containing the entire dataset """ assert self._id + if not max_workers: + max_workers = psutil.cpu_count() target_folder = Path(target_folder).absolute() target_folder.mkdir(parents=True, exist_ok=True) # noinspection PyBroadException @@ -775,7 +764,9 @@ class Dataset(object): return None shutil.rmtree(target_folder.as_posix()) - ro_folder = self.get_local_copy(part=part, num_parts=num_parts, raise_on_error=raise_on_error) + ro_folder = self.get_local_copy( + part=part, num_parts=num_parts, raise_on_error=raise_on_error, max_workers=max_workers + ) shutil.copytree(ro_folder, target_folder.as_posix(), symlinks=False) return target_folder.as_posix() @@ -957,7 +948,7 @@ class Dataset(object): return None - pool = ThreadPool(cpu_count() * 2) + pool = ThreadPool(psutil.cpu_count()) matching_errors = pool.map(compare, self._dataset_file_entries.values()) pool.close() return [f.relative_path for f in matching_errors if f is not None] @@ -1377,7 +1368,7 @@ class Dataset(object): for f in file_entries ] self._task.get_logger().report_text('Generating SHA2 hash for {} files'.format(len(file_entries))) - pool = ThreadPool(cpu_count() * 2) + pool = ThreadPool(psutil.cpu_count()) try: import tqdm # noqa for _ in tqdm.tqdm(pool.imap_unordered(self._calc_file_hash, file_entries), total=len(file_entries)): @@ -1514,8 +1505,9 @@ class Dataset(object): lock_target_folder=False, cleanup_target_folder=True, target_folder=None, + max_workers=None ): - # type: (bool, Optional[List[int]], bool, bool, Optional[Path]) -> str + # type: (bool, Optional[List[int]], bool, bool, Optional[Path], Optional[int]) -> str """ First, extracts the archive present on the ClearML server containing this dataset's files. Then, download the remote files. Note that if a remote file was added to the ClearML server, then @@ -1530,15 +1522,20 @@ class Dataset(object): Notice you should unlock it manually, or wait for the process to finish for auto unlocking. 
:param cleanup_target_folder: If True remove target folder recursively :param target_folder: If provided use the specified target folder, default, auto generate from Dataset ID. + :param max_workers: Number of threads to be spawned when getting dataset files. Defaults + to the number of virtual cores. :return: Path to the local storage where the data was downloaded """ + if not max_workers: + max_workers = psutil.cpu_count() local_folder = self._extract_dataset_archive( force=force, selected_chunks=selected_chunks, lock_target_folder=lock_target_folder, cleanup_target_folder=cleanup_target_folder, target_folder=target_folder, + max_workers=max_workers ) self._download_external_files( target_folder=target_folder, lock_target_folder=lock_target_folder @@ -1612,8 +1609,9 @@ class Dataset(object): lock_target_folder=False, cleanup_target_folder=True, target_folder=None, + max_workers=None ): - # type: (bool, Optional[List[int]], bool, bool, Optional[Path]) -> str + # type: (bool, Optional[List[int]], bool, bool, Optional[Path], Optional[int]) -> str """ Download the dataset archive, and extract the zip content to a cached folder. Notice no merging is done. @@ -1626,6 +1624,7 @@ class Dataset(object): Notice you should unlock it manually, or wait for the process to finish for auto unlocking. :param cleanup_target_folder: If True remove target folder recursively :param target_folder: If provided use the specified target folder, default, auto generate from Dataset ID. + :param max_workers: Number of threads to be spawned when downloading and extracting the archives :return: Path to a local storage extracted archive """ @@ -1634,6 +1633,9 @@ class Dataset(object): if not self._task: self._task = Task.get_task(task_id=self._id) + if not max_workers: + max_workers = psutil.cpu_count() + data_artifact_entries = self._get_data_artifact_names() if selected_chunks is not None and data_artifact_entries: @@ -1671,9 +1673,15 @@ class Dataset(object): StorageManager._extract_to_cache( cached_file=local_zip, name=self._id, cache_context=self.__cache_context, target_folder=local_folder, force=True) + # noinspection PyBroadException + try: + Path(local_zip).unlink() + except Exception: + pass - for d in data_artifact_entries: - _download_part(d) + with ThreadPoolExecutor(max_workers=max_workers) as pool: + for d in data_artifact_entries: + pool.submit(_download_part, d) return local_folder @@ -1706,8 +1714,8 @@ class Dataset(object): numbers = sorted([int(a[prefix_len:]) for a in data_artifact_entries if a.startswith(prefix)]) return '{}{:03d}'.format(prefix, numbers[-1]+1 if numbers else 1) - def _merge_datasets(self, use_soft_links=None, raise_on_error=True, part=None, num_parts=None): - # type: (bool, bool, Optional[int], Optional[int]) -> str + def _merge_datasets(self, use_soft_links=None, raise_on_error=True, part=None, num_parts=None, max_workers=None): + # type: (bool, bool, Optional[int], Optional[int], Optional[int]) -> str """ download and copy / soft-link, files from all the parent dataset versions :param use_soft_links: If True use soft links, default False on windows True on Posix systems @@ -1720,12 +1728,17 @@ class Dataset(object): requested number of parts. Notice that the actual chunks used per part are rounded down. Example: Assuming 8 chunks on this version, and `num_parts=5`, the chunk index used per parts would be: part=0 -> chunks[0,5], part=1 -> chunks[1,6], part=2 -> chunks[2,7], part=3 -> chunks[3, ] + :param max_workers: Number of threads to be spawned when merging datasets. 
Defaults to the number + of logical cores. :return: the target folder """ assert part is None or (isinstance(part, int) and part >= 0) assert num_parts is None or (isinstance(num_parts, int) and num_parts >= 1) + if max_workers is None: + max_workers = psutil.cpu_count() + if use_soft_links is None: use_soft_links = False if is_windows() else True @@ -1762,6 +1775,7 @@ class Dataset(object): selected_chunks=chunk_selection.get(self._id) if chunk_selection else None, cleanup_target_folder=True, target_folder=target_base_folder, + max_workers=max_workers ) dependencies_by_order.remove(self._id) @@ -2108,12 +2122,14 @@ class Dataset(object): chunk_selection, use_soft_links, raise_on_error, - force + force, + max_workers=None ): - # type: (Path, List[str], Optional[dict], bool, bool, bool) -> () + # type: (Path, List[str], dict, bool, bool, bool, Optional[int]) -> () # create thread pool, for creating soft-links / copying - # todo: parallelize by parent datasets - pool = ThreadPool(cpu_count() * 2) + if not max_workers: + max_workers = psutil.cpu_count() + pool = ThreadPool(max_workers) for dataset_version_id in dependencies_by_order: # make sure we skip over empty dependencies if dataset_version_id not in self._dependency_graph: @@ -2126,6 +2142,7 @@ class Dataset(object): force=force, lock_target_folder=True, cleanup_target_folder=False, + max_workers=max_workers )) ds_base_folder.touch() diff --git a/clearml/task.py b/clearml/task.py index cf32f1d5..c76429ed 100644 --- a/clearml/task.py +++ b/clearml/task.py @@ -77,7 +77,7 @@ from .utilities.seed import make_deterministic from .utilities.lowlevel.threads import get_current_thread_id from .utilities.process.mp import BackgroundMonitor, leave_process from .utilities.matching import matches_any_wildcard -from .utilities.future_caller import FutureTaskCaller +from .utilities.parallel import FutureTaskCaller # noinspection PyProtectedMember from .backend_interface.task.args import _Arguments diff --git a/clearml/utilities/future_caller.py b/clearml/utilities/future_caller.py deleted file mode 100644 index 6d24058b..00000000 --- a/clearml/utilities/future_caller.py +++ /dev/null @@ -1,176 +0,0 @@ -from copy import deepcopy -from time import sleep - -from six.moves.queue import Queue, Empty -from threading import Thread -from typing import Any, Callable, Optional, Type - - -class _DeferredClass(object): - __slots__ = ('__queue', '__future_caller', '__future_func') - - def __init__(self, a_future_caller, future_func): - self.__queue = Queue() - self.__future_caller = a_future_caller - self.__future_func = future_func - - def __nested_caller(self, item, args, kwargs): - # wait until object is constructed - getattr(self.__future_caller, "id") # noqa - - future_func = getattr(self.__future_caller, self.__future_func) - the_object = future_func() - the_object_func = getattr(the_object, item) - return the_object_func(*args, **kwargs) - - def _flush_into_logger(self, a_future_object=None, a_future_func=None): - self.__close_queue(a_future_object=a_future_object, a_future_func=a_future_func) - - def __close_queue(self, a_future_object=None, a_future_func=None): - # call this function when we Know the object is initialization is completed - if self.__queue is None: - return - - _queue = self.__queue - self.__queue = None - while True: - # noinspection PyBroadException - try: - item, args, kwargs = _queue.get(block=False) - if a_future_object: - future_func = getattr(a_future_object, self.__future_func) - the_object = future_func() - the_object_func = 
getattr(the_object, item) - the_object_func(*args, **kwargs) - elif a_future_func: - the_object_func = getattr(a_future_func, item) - the_object_func(*args, **kwargs) - else: - self.__nested_caller(item, args, kwargs) - except Empty: - break - except Exception: - # import traceback - # stdout_print(''.join(traceback.format_exc())) - pass - - def __getattr__(self, item): - def _caller(*args, **kwargs): - # if we already completed the background initialization, call functions immediately - # noinspection PyProtectedMember - if not self.__queue or self.__future_caller._FutureTaskCaller__executor is None: - return self.__nested_caller(item, args, kwargs) - - # noinspection PyBroadException - try: - # if pool is still active call async - self.__queue.put((item, deepcopy(args) if args else args, deepcopy(kwargs) if kwargs else kwargs)) - except Exception: - # assume we wait only if self.__pool was nulled between the if and now, so just call directly - return self.__nested_caller(item, args, kwargs) - - # let's hope it is the right one - return True - - return _caller - - -class FutureTaskCaller(object): - """ - FutureTaskCaller is used to create a class via a functions async, in another thread. - - For example: - - .. code-block:: py - - future = FutureTaskCaller().call(func=max, func_cb=None, override_cls=None, 1, 2) - print('Running other code') - print(future.result()) # will print '2' - """ - __slots__ = ('__object', '__object_cls', '__executor', '__deferred_bkg_class') - - @property - def __class__(self): - return self.__object_cls - - def __init__(self, func, func_cb, override_cls, *args, **kwargs): - # type: (Callable, Optional[Callable], Type, *Any, **Any) -> None - """ - __init__(*args, **kwargs) in another thread - - :return: This FutureTaskCaller instance - """ - self.__object = None - self.__object_cls = override_cls - self.__deferred_bkg_class = _DeferredClass(self, "get_logger") - - self.__executor = Thread(target=self.__submit__, args=(func, func_cb, args, kwargs)) - self.__executor.daemon = True - self.__executor.start() - - def __submit__(self, fn, fn_cb, args, kwargs): - # background initialization call - _object = fn(*args, **kwargs) - - # push all background calls (now that the initialization is complete) - if self.__deferred_bkg_class: - _deferred_bkg_class = self.__deferred_bkg_class - self.__deferred_bkg_class = None - # noinspection PyProtectedMember - _deferred_bkg_class._flush_into_logger(a_future_object=_object) - - # store the initialized object - self.__object = _object - # callback function - if fn_cb is not None: - fn_cb(self.__object) - - def __getattr__(self, item): - # if we get here, by definition this is not a __slot__ entry, pass to the object - return getattr(self.__result__(), item) - - def __setattr__(self, item, value): - # make sure we can set the slots - if item in ["_FutureTaskCaller__executor", "_FutureTaskCaller__object", - "_FutureTaskCaller__object_cls", "_FutureTaskCaller__deferred_bkg_class"]: - return super(FutureTaskCaller, self).__setattr__(item, value) - - setattr(self.__result__(), item, value) - - def __result__(self, timeout=None): - # type: (Optional[float]) -> Any - """ - Wait and get the result of the function called with self.call() - - :param timeout: The maximum number of seconds to wait for the result. If None, - there is no limit for the wait time. 
- - :return: The result of the called function - """ - if self.__executor: - # since the test is not atomic, we assume that if we failed joining - # it is because someone else joined before us - # noinspection PyBroadException - try: - self.__executor.join(timeout=timeout) - except RuntimeError: - # this is probably calling ourselves from the same thread - raise - except Exception: - # wait until that someone else updated the __object - while self.__object is None: - sleep(1) - self.__executor = None - return self.__object - - # This is the part where we are no longer generic, but __slots__ - # inheritance is too cumbersome to actually inherit and make sure it works optimally - def get_logger(self): - if self.__object is not None: - return self.__object.get_logger() - - if self.__deferred_bkg_class is None: - # we are shutting down, wait until object is available - return self.__result__().get_logger() - - return self.__deferred_bkg_class diff --git a/clearml/utilities/parallel.py b/clearml/utilities/parallel.py new file mode 100644 index 00000000..ecf79356 --- /dev/null +++ b/clearml/utilities/parallel.py @@ -0,0 +1,469 @@ +import os + +from copy import deepcopy +from time import sleep +from concurrent.futures import ThreadPoolExecutor +from typing import Any, Callable, Optional, Union, Generator, List, Type +from zipfile import ZipFile, ZIP_DEFLATED +from six.moves.queue import PriorityQueue, Queue, Empty +from pathlib2 import Path +from tempfile import mkstemp +from collections import deque +from threading import Thread + +from ..debugging.log import LoggerRoot +from ..storage.util import format_size + + +class _DeferredClass(object): + __slots__ = ('__queue', '__future_caller', '__future_func') + + def __init__(self, a_future_caller, future_func): + self.__queue = Queue() + self.__future_caller = a_future_caller + self.__future_func = future_func + + def __nested_caller(self, item, args, kwargs): + # wait until object is constructed + getattr(self.__future_caller, "id") # noqa + + future_func = getattr(self.__future_caller, self.__future_func) + the_object = future_func() + the_object_func = getattr(the_object, item) + return the_object_func(*args, **kwargs) + + def _flush_into_logger(self, a_future_object=None, a_future_func=None): + self.__close_queue(a_future_object=a_future_object, a_future_func=a_future_func) + + def __close_queue(self, a_future_object=None, a_future_func=None): + # call this function when we Know the object is initialization is completed + if self.__queue is None: + return + + _queue = self.__queue + self.__queue = None + while True: + # noinspection PyBroadException + try: + item, args, kwargs = _queue.get(block=False) + if a_future_object: + future_func = getattr(a_future_object, self.__future_func) + the_object = future_func() + the_object_func = getattr(the_object, item) + the_object_func(*args, **kwargs) + elif a_future_func: + the_object_func = getattr(a_future_func, item) + the_object_func(*args, **kwargs) + else: + self.__nested_caller(item, args, kwargs) + except Empty: + break + except Exception: + # import traceback + # stdout_print(''.join(traceback.format_exc())) + pass + + def __getattr__(self, item): + def _caller(*args, **kwargs): + # if we already completed the background initialization, call functions immediately + # noinspection PyProtectedMember + if not self.__queue or self.__future_caller._FutureTaskCaller__executor is None: + return self.__nested_caller(item, args, kwargs) + + # noinspection PyBroadException + try: + # if pool is still 
active call async + self.__queue.put((item, deepcopy(args) if args else args, deepcopy(kwargs) if kwargs else kwargs)) + except Exception: + # assume we wait only if self.__pool was nulled between the if and now, so just call directly + return self.__nested_caller(item, args, kwargs) + + # let's hope it is the right one + return True + + return _caller + + +class FutureTaskCaller(object): + """ + FutureTaskCaller is used to create a class via a functions async, in another thread. + + For example: + + .. code-block:: py + + future = FutureTaskCaller().call(func=max, func_cb=None, override_cls=None, 1, 2) + print('Running other code') + print(future.result()) # will print '2' + """ + __slots__ = ('__object', '__object_cls', '__executor', '__deferred_bkg_class') + + @property + def __class__(self): + return self.__object_cls + + def __init__(self, func, func_cb, override_cls, *args, **kwargs): + # type: (Callable, Optional[Callable], Type, *Any, **Any) -> None + """ + __init__(*args, **kwargs) in another thread + + :return: This FutureTaskCaller instance + """ + self.__object = None + self.__object_cls = override_cls + self.__deferred_bkg_class = _DeferredClass(self, "get_logger") + + self.__executor = Thread(target=self.__submit__, args=(func, func_cb, args, kwargs)) + self.__executor.daemon = True + self.__executor.start() + + def __submit__(self, fn, fn_cb, args, kwargs): + # background initialization call + _object = fn(*args, **kwargs) + + # push all background calls (now that the initialization is complete) + if self.__deferred_bkg_class: + _deferred_bkg_class = self.__deferred_bkg_class + self.__deferred_bkg_class = None + # noinspection PyProtectedMember + _deferred_bkg_class._flush_into_logger(a_future_object=_object) + + # store the initialized object + self.__object = _object + # callback function + if fn_cb is not None: + fn_cb(self.__object) + + def __getattr__(self, item): + # if we get here, by definition this is not a __slot__ entry, pass to the object + return getattr(self.__result__(), item) + + def __setattr__(self, item, value): + # make sure we can set the slots + if item in ["_FutureTaskCaller__executor", "_FutureTaskCaller__object", + "_FutureTaskCaller__object_cls", "_FutureTaskCaller__deferred_bkg_class"]: + return super(FutureTaskCaller, self).__setattr__(item, value) + + setattr(self.__result__(), item, value) + + def __result__(self, timeout=None): + # type: (Optional[float]) -> Any + """ + Wait and get the result of the function called with self.call() + + :param timeout: The maximum number of seconds to wait for the result. If None, + there is no limit for the wait time. 
+ + :return: The result of the called function + """ + if self.__executor: + # since the test is not atomic, we assume that if we failed joining + # it is because someone else joined before us + # noinspection PyBroadException + try: + self.__executor.join(timeout=timeout) + except RuntimeError: + # this is probably calling ourselves from the same thread + raise + except Exception: + # wait until that someone else updated the __object + while self.__object is None: + sleep(1) + self.__executor = None + return self.__object + + # This is the part where we are no longer generic, but __slots__ + # inheritance is too cumbersome to actually inherit and make sure it works optimally + def get_logger(self): + if self.__object is not None: + return self.__object.get_logger() + + if self.__deferred_bkg_class is None: + # we are shutting down, wait until object is available + return self.__result__().get_logger() + + return self.__deferred_bkg_class + + +class ParallelZipper(object): + """ + Used to zip multiple files in zip chunks of a particular size, all in parallel + """ + class ZipperObject(object): + def __init__( + self, + chunk_size, # int + zipper_queue, # PriorityQueue[ParallelZipper.ZipperObject] + zipper_results, # Queue[ParallelZipper.ZipperObject] + allow_zip_64, # bool + compression, # Any + zip_prefix, # str + zip_suffix, # str + verbose, # bool + task, # Any + ): + # (...) -> ParallelZipper.ZipperObject + """ + Initialize a ParallelZipper.ZipperObject instance that holds its corresponding zip + file, as well as other relevant data + + :param chunk_size: Chunk size, in MB. The ParallelZipper will try its best not to exceed this size + when bulding up this zipper object, but that is not guaranteed + :param zipper_queue: PriorityQueue that holds ParallelZipper.ZipperObject instances. + When this ParallelZipper.ZipperObject can hold more data (i.e. chunk_size was not exceeded), + this object will reinsert itself in this queue to be reused by the ParallelZipper. + Else, a fresh ParallelZipper.ZipperObject will be inserted + :param zipper_results: Queue that holds ParallelZipper.ZipperObject instances. These instances + are added to this queue when chunk_size is exceeded + :param allow_zip_64: if True ZipFile will create files with ZIP64 extensions when + needed, otherwise it will raise an exception when this would be necessary + :param compression: ZipFile.ZIP_STORED (no compression), ZipFile.ZIP_DEFLATED (requires zlib), + ZipFile.ZIP_BZIP2 (requires bz2) or ZipFile.ZIP_LZMA (requires lzma). 
+ :param zip_prefix: The zip file created by this object will have its name prefixed by this + :param zip_suffix: The zip file created by this object will have its name suffixed by this + :param verbose: If True, print data relevant to the file compression + :param task: ClearML Task, used for logging + + :return: ParallelZipper.ZipperObject instance + """ + self._chunk_size = chunk_size + self._zipper_queue = zipper_queue + self._zipper_results = zipper_results + self._allow_zip_64 = allow_zip_64 + self._compression = compression + self._zip_prefix = zip_prefix + self._zip_suffix = zip_suffix + self._verbose = verbose + self._task = task + self.fd, self.zip_path = mkstemp(prefix=zip_prefix, suffix=zip_suffix) + self.zip_path = Path(self.zip_path) + self.zip_file = ZipFile(self.zip_path.as_posix(), "w", allowZip64=allow_zip_64, compression=compression) + self.archive_preview = "" + self.count = 0 + self.files_zipped = set() + + def zip(self, file_path, arcname=None): + # type: (Union[str, Path], str) -> () + """ + Zips a file into the ZipFile created by this instance. This instance will either add + itself back to the PriorityQueue used to select the best zipping candidate or add itself + to the result Queue after exceeding self.chunk_size. + + :param file_path: Path to the file to be zipped + :param arcname: Name of the file in the archive + """ + if self._verbose and self._task: + self._task.get_logger().report_text("Compressing {}".format(Path(file_path).as_posix())) + self.zip_file.write(file_path, arcname=arcname) + self.count += 1 + preview_path = arcname + if not preview_path: + preview_path = file_path + self.archive_preview += "{} - {}\n".format(preview_path, format_size(self.size)) + self.files_zipped.add(Path(file_path).as_posix()) + if self._chunk_size <= 0 or self.size < self._chunk_size: + self._zipper_queue.put(self) + else: + self._zipper_queue.put( + ParallelZipper.ZipperObject( + self._chunk_size, + self._zipper_queue, + self._zipper_results, + self._allow_zip_64, + self._compression, + self._zip_prefix, + self._zip_suffix, + self._verbose, + self._task, + ) + ) + self._zipper_results.put(self) + + def merge(self, other): + # type: (ParallelZipper.ZipperObject) -> () + """ + Merges one ParallelZipper.ZipperObject instance into the current one. 
+ All the files zipped by the other instance will be added to this instance, + as well as any other useful additional data + + :param other: ParallelZipper.ZipperObject instance to merge into this one + """ + with ZipFile(self.zip_path.as_posix(), "a") as parent_zip: + with ZipFile(other.zip_path.as_posix(), "r") as child_zip: + for child_name in child_zip.namelist(): + parent_zip.writestr(child_name, child_zip.open(child_name).read()) + self.files_zipped |= other.files_zipped + self.count += other.count + self.archive_preview += other.archive_preview + + def close(self): + # type: () -> () + """ + Attempts to close file descriptors associated to the ZipFile + """ + # noinspection PyBroadException + try: + self.zip_file.close() + os.close(self.fd) + except Exception: + pass + + def delete(self): + # type: () -> () + """ + Attempts to delete the ZipFile from the disk + """ + # noinspection PyBroadException + try: + self.close() + self.zip_path.unlink() + except Exception: + pass + + @property + def size(self): + # type: () -> () + """ + :return: Size of the ZipFile, in bytes + """ + return self.zip_path.stat().st_size + + def __lt__(self, other): + # we want to completely "fill" as many zip files as possible, hence the ">" comparison + return self.size > other.size + + def __init__( + self, + chunk_size, # type: int + max_workers, # type: int + allow_zip_64=True, # type: Optional[bool] + compression=ZIP_DEFLATED, # type: Optional[Any] + zip_prefix="", # type: Optional[str] + zip_suffix="", # type: Optional[str] + verbose=False, # type: Optional[bool] + task=None, # type: Optional[Any] + pool=None, # type: Optional[ThreadPoolExecutor] + ): + # type: (...) -> ParallelZipper + """ + Initialize the ParallelZipper. Each zip created by this object will have the following naming + format: [zip_prefix][zip_suffix] + + :param chunk_size: Chunk size, in MB. The ParallelZipper will try its best not to exceed this size, + but that is not guaranteed + :param max_workers: The maximum number of workers spawned when zipping the files + :param allow_zip_64: if True ZipFile will create files with ZIP64 extensions when + needed, otherwise it will raise an exception when this would be necessary + :param compression: ZipFile.ZIP_STORED (no compression), ZipFile.ZIP_DEFLATED (requires zlib), + ZipFile.ZIP_BZIP2 (requires bz2) or ZipFile.ZIP_LZMA (requires lzma). + :param zip_prefix: Zip file names will be prefixed by this + :param zip_suffix: Zip file names will pe suffixed by this + :param verbose: If True, print data relevant to the file compression + :param task: ClearML Task, used for logging + :param pool: Use this ThreadPoolExecutor instead of creating one. Note that this pool will not be + closed after zipping is finished. + + :return: ParallelZipper instance + """ + self._chunk_size = chunk_size * (1024 ** 2) + self._max_workers = max_workers + self._allow_zip_64 = allow_zip_64 + self._compression = compression + self._zip_prefix = zip_prefix + self._zip_suffix = zip_suffix + self._verbose = verbose + self._task = task + self._pool = pool + self._zipper_queue = PriorityQueue() + self._zipper_results = Queue() + + def zip_iter(self, file_paths, arcnames={}): + # type: (List[Union(str, Path)], Optional[dict[Union(str, Path), str]]) -> Generator[ParallelZipper.ZipperObject] + """ + Generator function that returns zip files as soon as they are available. 
+ The zipping is done in parallel + + :param file_paths: List of paths to the files to zip + :param arcnames: Dictionary that maps the file path to what should be its name in the archive. + + :return: Generator of ParallelZipper.ZipperObjects + """ + while not self._zipper_queue.empty(): + self._zipper_queue.get_nowait() + for _ in range(self._max_workers): + self._zipper_queue.put( + ParallelZipper.ZipperObject( + self._chunk_size, + self._zipper_queue, + self._zipper_results, + self._allow_zip_64, + self._compression, + self._zip_prefix, + self._zip_suffix, + self._verbose, + self._task, + ) + ) + filtered_file_paths = [] + for file_path in file_paths: + if not Path(file_path).is_file(): + LoggerRoot.get_base_logger().warning("Could not store dataset file {}. File skipped".format(file_path)) + else: + filtered_file_paths.append(file_path) + file_paths = filtered_file_paths + + file_paths = sorted(file_paths, key=lambda k: Path(k).stat().st_size, reverse=True) + # zip in parallel + pooled = [] + if not self._pool: + pool = ThreadPoolExecutor(max_workers=self._max_workers) + else: + pool = self._pool + for f in file_paths: + zipper = self._zipper_queue.get() + pooled.append(pool.submit(zipper.zip, Path(f).as_posix(), arcname=arcnames.get(f))) + for result in self._yield_zipper_results(): + yield result + for task in pooled: + task.result() + if not self._pool: + pool.close() + + for result in self._yield_zipper_results(): + yield result + + zipper_results_leftover = [] + + # extract remaining results + while not self._zipper_queue.empty(): + result = self._zipper_queue.get() + if result.count != 0: + zipper_results_leftover.append(result) + else: + result.delete() + zipper_results_leftover = deque(sorted(zipper_results_leftover, reverse=True)) + + # merge zip files greedily if possible and get the paths as results + while len(zipper_results_leftover) > 0: + zip_ = zipper_results_leftover.pop() + zip_.close() + if zip_.size >= self._chunk_size > 0: + yield zip_ + continue + while len(zipper_results_leftover) > 0 and ( + self._chunk_size <= 0 or zipper_results_leftover[0].size + zip_.size < self._chunk_size + ): + child_zip = zipper_results_leftover.popleft() + child_zip.close() + zip_.merge(child_zip) + child_zip.delete() + yield zip_ + + def _yield_zipper_results(self): + while True: + try: + result = self._zipper_results.get_nowait() + result.close() + yield result + except Empty: + break
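# Illustrative sketch (not part of the patch): driving ParallelZipper the same way the
# new Dataset.upload() above does, with a shared ThreadPoolExecutor and chunks consumed
# from zip_iter() as soon as they fill up. File paths, prefix and worker count are made up.
from concurrent.futures import ThreadPoolExecutor
from zipfile import ZIP_DEFLATED

from clearml.utilities.parallel import ParallelZipper

file_paths = ["data/a.csv", "data/b.csv"]   # hypothetical local files
arcnames = {p: p for p in file_paths}       # archive name for each file path

with ThreadPoolExecutor(max_workers=4) as pool:
    zipper = ParallelZipper(
        chunk_size=512,                 # target chunk size in MB; -1 yields a single zip
        max_workers=4,
        allow_zip_64=True,
        compression=ZIP_DEFLATED,
        zip_prefix="dataset.example.",  # hypothetical prefix
        zip_suffix=".zip",
        pool=pool,                      # reuse the caller's pool, as upload() does
    )
    # chunks are yielded while the remaining files are still being compressed, so the
    # caller can start uploading immediately (upload() submits Task.upload_artifact
    # to the same pool at this point)
    for chunk in zipper.zip_iter(file_paths, arcnames=arcnames):
        print(chunk.zip_path, chunk.count, chunk.size, chunk.archive_preview)
        # upload or move chunk.zip_path here, then remove it; upload() passes
        # delete_after_upload=True to Task.upload_artifact for the same reason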
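# Illustrative sketch (not part of the patch): constructing FutureTaskCaller directly,
# following its __init__(func, func_cb, override_cls, *args, **kwargs) signature above;
# note the ".. code-block::" example in its docstring does not match that signature.
# build_heavy_object and its arguments are hypothetical stand-ins for a slow constructor.
from clearml.utilities.parallel import FutureTaskCaller

def build_heavy_object(x, y):
    # stands in for an expensive constructor that should run in the background
    return {"sum": x + y}

future = FutureTaskCaller(build_heavy_object, None, dict, 1, 2)
print("other work runs while the object is built in a background thread")
print(future.get("sum"))  # attribute access blocks until the object is ready, prints 3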