From 844c01e15b635fc5ad308c0c486b9146947eaa0b Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Sat, 25 Sep 2021 23:07:49 +0300 Subject: [PATCH] Add clearml-Data (Datasets) multi-chunk support --- clearml/backend_interface/logger.py | 4 +- clearml/backend_interface/task/task.py | 28 + clearml/backend_interface/util.py | 10 +- clearml/cli/data/__main__.py | 46 +- clearml/datasets/dataset.py | 765 +++++++++++++++++++------ clearml/storage/cache.py | 142 ++++- clearml/storage/manager.py | 59 +- clearml/utilities/locks/utils.py | 16 + 8 files changed, 848 insertions(+), 222 deletions(-) diff --git a/clearml/backend_interface/logger.py b/clearml/backend_interface/logger.py index d9a5be2b..c4e8de2a 100644 --- a/clearml/backend_interface/logger.py +++ b/clearml/backend_interface/logger.py @@ -179,8 +179,8 @@ class PrintPatchLogger(object): cr_flush_period = None def __init__(self, stream, logger=None, level=logging.INFO): - if self.__class__.cr_flush_period is None: - self.__class__.cr_flush_period = config.get("development.worker.console_cr_flush_period", 0) + if PrintPatchLogger.cr_flush_period is None: + PrintPatchLogger.cr_flush_period = config.get("development.worker.console_cr_flush_period", 0) PrintPatchLogger.patched = True self._terminal = stream self._log = logger diff --git a/clearml/backend_interface/task/task.py b/clearml/backend_interface/task/task.py index 7016eb40..e1edf216 100644 --- a/clearml/backend_interface/task/task.py +++ b/clearml/backend_interface/task/task.py @@ -1266,6 +1266,34 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin): self._edit(execution=execution) return self.data.execution.artifacts or [] + def _delete_artifacts(self, artifact_names): + # type: (Sequence[str]) -> bool + """ + Delete a list of artifacts, by artifact name, from the Task. + + :param list artifact_names: list of artifact names + :return: True if successful + """ + if not Session.check_min_api_version('2.3'): + return False + if not isinstance(artifact_names, (list, tuple)): + raise ValueError('Expected artifact names as List[str]') + + with self._edit_lock: + if Session.check_min_api_version("2.13") and not self._offline_mode: + req = tasks.DeleteArtifactsRequest( + task=self.task_id, artifacts=[{"key": n, "mode": "output"} for n in artifact_names], force=True) + res = self.send(req, raise_on_errors=False) + if not res or not res.response or not res.response.deleted: + return False + self.reload() + else: + self.reload() + execution = self.data.execution + execution.artifacts = [a for a in execution.artifacts or [] if a.key not in artifact_names] + self._edit(execution=execution) + return self.data.execution.artifacts or [] + def _set_model_design(self, design=None): # type: (str) -> () with self._edit_lock: diff --git a/clearml/backend_interface/util.py b/clearml/backend_interface/util.py index 489bffeb..2dc96f56 100644 --- a/clearml/backend_interface/util.py +++ b/clearml/backend_interface/util.py @@ -85,7 +85,7 @@ def get_epoch_beginning_of_time(timezone_info=None): return datetime(1970, 1, 1).replace(tzinfo=timezone_info if timezone_info else utc_timezone) -def get_single_result(entity, query, results, log=None, show_results=10, raise_on_error=True, sort_by_date=True): +def get_single_result(entity, query, results, log=None, show_results=1, raise_on_error=True, sort_by_date=True): if not results: if not raise_on_error: return None @@ -96,8 +96,12 @@ def get_single_result(entity, query, results, log=None, show_results=10, raise_o if show_results: if not log: log = get_logger() - log.warning('More than one {entity} found when searching for `{query}`' - ' (showing first {show_results} {entity}s follow)'.format(**locals())) + if show_results > 1: + log.warning('{num} {entity} found when searching for `{query}`' + ' (showing first {show_results} {entity}s follow)'.format(num=len(results), **locals())) + else: + log.warning('{num} {entity} found when searching for `{query}`'.format(num=len(results), **locals())) + if sort_by_date: relative_time = get_epoch_beginning_of_time() # sort results based on timestamp and return the newest one diff --git a/clearml/cli/data/__main__.py b/clearml/cli/data/__main__.py index 791226cf..f95d1ae3 100644 --- a/clearml/cli/data/__main__.py +++ b/clearml/cli/data/__main__.py @@ -70,9 +70,7 @@ def cli(): subparsers = parser.add_subparsers(help='Dataset actions', dest='command') create = subparsers.add_parser('create', help='Create a new dataset') - create.add_argument('--parents', type=str, nargs='*', - help='[Optional] Specify dataset parents IDs (i.e. merge all parents). ' - 'Example: a17b4fID1 f0ee5ID2 a17b4f09eID3') + create.add_argument('--parents', type=str, nargs='*', help='Specify dataset parents IDs (i.e. merge all parents)') create.add_argument('--project', type=str, required=False, default=None, help='Dataset project name') create.add_argument('--name', type=str, required=True, default=None, help='Dataset name') create.add_argument('--tags', type=str, nargs='*', help='Dataset user Tags') @@ -100,20 +98,22 @@ def cli(): help='Local folder to sync (support for wildcard selection). ' 'Example: ~/data/*.jpg') sync.add_argument('--parents', type=str, nargs='*', - help='[Optional] Specify dataset parents IDs (i.e. merge all parents). ' - 'Example: a17b4fID1 f0ee5ID2 a17b4f09eID3') + help='[Optional - Create new dataset] Specify dataset parents IDs (i.e. merge all parents)') sync.add_argument('--project', type=str, required=False, default=None, - help='[Optional] Dataset project name') + help='[Optional - Create new dataset] Dataset project name') sync.add_argument('--name', type=str, required=False, default=None, - help='[Optional] Dataset project name') + help='[Optional - Create new dataset] Dataset project name') sync.add_argument('--tags', type=str, nargs='*', - help='[Optional] Dataset user Tags') + help='[Optional - Create new dataset] Dataset user Tags') sync.add_argument('--storage', type=str, default=None, help='Remote storage to use for the dataset files (default: files_server). ' 'Examples: \'s3://bucket/data\', \'gs://bucket/data\', \'azure://bucket/data\', ' '\'/mnt/shared/folder/data\'') sync.add_argument('--skip-close', action='store_true', default=False, help='Do not auto close dataset after syncing folders') + sync.add_argument('--chunk-size', default=-1, type=int, + help='Set dataset artifact chunk size in MB. Default -1, unlimited size. ' + 'Example: 512, dataset will be split and uploaded in 512mb chunks.') sync.add_argument('--verbose', action='store_true', default=False, help='Verbose reporting') sync.set_defaults(func=ds_sync) @@ -136,6 +136,9 @@ def cli(): help='Remote storage to use for the dataset files (default: files_server). ' 'Examples: \'s3://bucket/data\', \'gs://bucket/data\', \'azure://bucket/data\', ' '\'/mnt/shared/folder/data\'') + upload.add_argument('--chunk-size', default=-1, type=int, + help='Set dataset artifact chunk size in MB. Default -1, unlimited size. ' + 'Example: 512, dataset will be split and uploaded in 512mb chunks.') upload.add_argument('--verbose', default=False, action='store_true', help='Verbose reporting') upload.set_defaults(func=ds_upload) @@ -148,6 +151,9 @@ def cli(): '\'/mnt/shared/folder/data\'') finalize.add_argument('--disable-upload', action='store_true', default=False, help='Disable automatic upload when closing the dataset') + finalize.add_argument('--chunk-size', default=-1, type=int, + help='Set dataset artifact chunk size in MB. Default -1, unlimited size. ' + 'Example: 512, dataset will be split and uploaded in 512mb chunks.') finalize.add_argument('--verbose', action='store_true', default=False, help='Verbose reporting') finalize.set_defaults(func=ds_close) @@ -216,6 +222,14 @@ def cli(): get.add_argument('--link', type=str, default=None, help='Create a soft link (not supported on Windows) to a ' 'read-only cached folder containing the dataset') + get.add_argument('--part', type=int, default=None, + help='Retrieve a partial copy of the dataset. ' + 'Part number (0 to `num-parts`-1) of total parts --num-parts.') + get.add_argument('--num-parts', type=int, default=None, + help='Total number of parts to divide the dataset to. ' + 'Notice minimum retrieved part is a single chunk in a dataset (or its parents).' + 'Example: Dataset gen4, with 3 parents, each with a single chunk, ' + 'can be divided into 4 parts') get.add_argument('--overwrite', action='store_true', default=False, help='If True, overwrite the target folder') get.add_argument('--verbose', action='store_true', default=False, help='Verbose reporting') get.set_defaults(func=ds_get) @@ -274,7 +288,7 @@ def ds_get(args): pass if args.copy: ds_folder = args.copy - ds.get_mutable_local_copy(target_folder=ds_folder) + ds.get_mutable_local_copy(target_folder=ds_folder, part=args.part, num_parts=args.num_parts) else: if args.link: Path(args.link).mkdir(parents=True, exist_ok=True) @@ -286,7 +300,7 @@ def ds_get(args): Path(args.link).unlink() except Exception: raise ValueError("Target directory {} is not empty. Use --overwrite.".format(args.link)) - ds_folder = ds.get_local_copy() + ds_folder = ds.get_local_copy(part=args.part, num_parts=args.num_parts) if args.link: os.symlink(ds_folder, args.link) ds_folder = args.link @@ -372,7 +386,10 @@ def ds_close(args): raise ValueError("Pending uploads, cannot finalize dataset. run `clearml-data upload`") # upload the files print("Pending uploads, starting dataset upload to {}".format(args.storage or ds.get_default_storage())) - ds.upload(show_progress=True, verbose=args.verbose, output_url=args.storage or None) + ds.upload(show_progress=True, + verbose=args.verbose, + output_url=args.storage or None, + chunk_size=args.chunk_size or -1,) ds.finalize() print('Dataset closed and finalized') @@ -399,7 +416,7 @@ def ds_upload(args): check_null_id(args) print_args(args) ds = Dataset.get(dataset_id=args.id) - ds.upload(verbose=args.verbose, output_url=args.storage or None) + ds.upload(verbose=args.verbose, output_url=args.storage or None, chunk_size=args.chunk_size or -1) print('Dataset upload completed') return 0 @@ -443,7 +460,10 @@ def ds_sync(args): if ds.is_dirty(): # upload the files print("Pending uploads, starting dataset upload to {}".format(args.storage or ds.get_default_storage())) - ds.upload(show_progress=True, verbose=args.verbose, output_url=args.storage or None) + ds.upload(show_progress=True, + verbose=args.verbose, + output_url=args.storage or None, + chunk_size=args.chunk_size or -1, ) ds.finalize() print('Dataset closed and finalized') diff --git a/clearml/datasets/dataset.py b/clearml/datasets/dataset.py index fd2cd2e6..12fc345d 100644 --- a/clearml/datasets/dataset.py +++ b/clearml/datasets/dataset.py @@ -16,6 +16,7 @@ from .. import Task, StorageManager, Logger from ..backend_api.session.client import APIClient from ..backend_interface.task.development.worker import DevWorker from ..backend_interface.util import mutually_exclusive, exact_match_regex +from ..config import deferred_config from ..debugging.log import LoggerRoot from ..storage.helper import StorageHelper from ..storage.cache import CacheManager @@ -33,6 +34,8 @@ class FileEntry(object): hash = attrib(default=None, type=str) parent_dataset_id = attrib(default=None, type=str) size = attrib(default=None, type=int) + # support multi part artifact storage + artifact_name = attrib(default=None, type=str) # cleared when file is uploaded. local_path = attrib(default=None, type=str) @@ -40,6 +43,7 @@ class FileEntry(object): # type: () -> Dict state = dict(relative_path=self.relative_path, hash=self.hash, parent_dataset_id=self.parent_dataset_id, size=self.size, + artifact_name=self.artifact_name, **dict([('local_path', self.local_path)] if self.local_path else ())) return state @@ -47,13 +51,15 @@ class FileEntry(object): class Dataset(object): __private_magic = 42 * 1337 __state_entry_name = 'state' - __data_entry_name = 'data' + __default_data_entry_name = 'data' + __data_entry_name_prefix = 'data_' __cache_context = 'datasets' __tag = 'dataset' __cache_folder_prefix = 'ds_' __dataset_folder_template = CacheManager.set_context_folder_lookup(__cache_context, "{0}_archive_{1}") __preview_max_file_entries = 15000 __preview_max_size = 5 * 1024 * 1024 + _dataset_chunk_size_mb = deferred_config("storage.dataset_chunk_size_mb", 512, transform=int) def __init__(self, _private, task=None, dataset_project=None, dataset_name=None, dataset_tags=None): # type: (int, Optional[Task], Optional[str], Optional[str], Optional[Sequence[str]]) -> () @@ -68,8 +74,14 @@ class Dataset(object): if task: self._task_pinger = None self._created_task = False + task_status = task.data.status + # if we are continuing aborted Task, force the state + if str(task_status) == 'stopped': + task.mark_started(force=True) + task_status = 'in_progress' + # If we are reusing the main current Task, make sure we set its type to data_processing - if str(task.data.status) in ('created', 'in_progress'): + if str(task_status) in ('created', 'in_progress'): if str(task.task_type) != str(Task.TaskTypes.data_processing): task.set_task_type(task_type=Task.TaskTypes.data_processing) task.set_system_tags((task.get_system_tags() or []) + [self.__tag]) @@ -114,6 +126,11 @@ class Dataset(object): # dirty flag, set True by any function call changing the dataset (regardless of weather it did anything) self._dirty = False self._using_current_task = False + # set current artifact name to be used (support for multiple upload sessions) + self._data_artifact_name = self._get_next_data_artifact_name() + # store a cached lookup of the number of chunks each parent dataset has. + # this will help with verifying we have n up-to-date partial local copy + self._dependency_chunk_lookup = None # type: Optional[Dict[str, int]] @property def id(self): @@ -247,8 +264,8 @@ class Dataset(object): def sync_folder(self, local_path, dataset_path=None, verbose=False): # type: (Union[Path, _Path, str], Union[Path, _Path, str], bool) -> (int, int) """ - Synchronize the dataset with a local folder. The dataset is synchronized recursively from the `local_path` into - the `dataset_path` (default: dataset root). + Synchronize the dataset with a local folder. The dataset is synchronized from the + relative_base_folder (default: dataset root) and deeper with the specified local path. :param local_path: Local folder to sync (assumes all files and recursive) :param dataset_path: Target dataset path to sync with (default the root of the dataset) @@ -294,8 +311,8 @@ class Dataset(object): self._serialize() return removed_files, added_files - def upload(self, show_progress=True, verbose=False, output_url=None, compression=None): - # type: (bool, bool, Optional[str], Optional[str]) -> () + def upload(self, show_progress=True, verbose=False, output_url=None, compression=None, chunk_size=None): + # type: (bool, bool, Optional[str], Optional[str], int) -> () """ Start file uploading, the function returns when all files are uploaded. @@ -304,6 +321,9 @@ class Dataset(object): :param output_url: Target storage for the compressed dataset (default: file server) Examples: `s3://bucket/data`, `gs://bucket/data` , `azure://bucket/data` , `/mnt/share/data` :param compression: Compression algorithm for the Zipped dataset file (default: ZIP_DEFLATED) + :param chunk_size: Artifact chunk size (MB) for the compressed dataset, + if not provided (None) use the default chunk size (512mb). + If -1 is provided, use a single zip artifact for the entire dataset change-set (old behaviour) """ # set output_url if output_url: @@ -314,72 +334,123 @@ class Dataset(object): dict(show_progress=show_progress, verbose=verbose, output_url=output_url, compression=compression)), print_console=False) - fd, zip_file = mkstemp( - prefix='dataset.{}.'.format(self._id), suffix='.zip' - ) - archive_preview = '' - count = 0 + list_zipped_artifacts = [] # List[Tuple[Path, int, str, str]] + list_file_entries = list(self._dataset_file_entries.values()) + total_size = 0 + chunk_size = int(self._dataset_chunk_size_mb if not chunk_size else chunk_size) try: - with ZipFile(zip_file, 'w', allowZip64=True, compression=compression or ZIP_DEFLATED) as zf: - for file_entry in self._dataset_file_entries.values(): - if not file_entry.local_path: - # file is located in a different version - continue - filename = Path(file_entry.local_path) - if not filename.is_file(): - LoggerRoot.get_base_logger().warning( - "Could not store dataset file {}. File skipped".format(file_entry.local_path)) - # mark for removal - file_entry.relative_path = None - continue - if verbose: - self._task.get_logger().report_text('Compressing {}'.format(filename.as_posix())) + from tqdm import tqdm # noqa + a_tqdm = tqdm(total=len(list_file_entries)) + except ImportError: + a_tqdm = None - relative_file_name = file_entry.relative_path - zf.write(filename.as_posix(), arcname=relative_file_name) - archive_preview += '{} - {}\n'.format( - relative_file_name, format_size(filename.stat().st_size)) - file_entry.local_path = None - count += 1 - except Exception as e: - # failed uploading folder: - LoggerRoot.get_base_logger().warning( - 'Exception {}\nFailed zipping dataset.'.format(e)) - return False - finally: - os.close(fd) + while list_file_entries: + fd, zip_file = mkstemp( + prefix='dataset.{}.'.format(self._id), suffix='.zip' + ) + archive_preview = '' + count = 0 + processed = 0 + zip_file = Path(zip_file) + print('{}Compressing local files, chunk {} [remaining {} files]'.format( + '\n' if a_tqdm else '', 1+len(list_zipped_artifacts), len(list_file_entries))) + try: + with ZipFile(zip_file.as_posix(), 'w', allowZip64=True, compression=compression or ZIP_DEFLATED) as zf: + for file_entry in list_file_entries: + processed += 1 + if a_tqdm: + a_tqdm.update() + if not file_entry.local_path: + # file is already in an uploaded artifact + continue + filename = Path(file_entry.local_path) + if not filename.is_file(): + LoggerRoot.get_base_logger().warning( + "Could not store dataset file {}. File skipped".format(file_entry.local_path)) + # mark for removal + file_entry.relative_path = None + continue + if verbose: + self._task.get_logger().report_text('Compressing {}'.format(filename.as_posix())) - zip_file = Path(zip_file) + relative_file_name = file_entry.relative_path + zf.write(filename.as_posix(), arcname=relative_file_name) + archive_preview += '{} - {}\n'.format( + relative_file_name, format_size(filename.stat().st_size)) + file_entry.artifact_name = self._data_artifact_name + count += 1 - if not count: - zip_file.unlink() + # limit the size of a single artifact + if chunk_size > 0 and zip_file.stat().st_size >= chunk_size * (1024**2): + break + except Exception as e: + # failed uploading folder: + LoggerRoot.get_base_logger().warning( + 'Exception {}\nFailed zipping dataset.'.format(e)) + return False + finally: + os.close(fd) + + if not count: + zip_file.unlink() + + total_size += zip_file.stat().st_size + # let's see what's left + list_file_entries = list_file_entries[processed:] + # update the artifact preview + archive_preview = 'Dataset archive content [{} files]:\n'.format(count) + archive_preview + # add into the list + list_zipped_artifacts += [(zip_file, count, archive_preview, self._data_artifact_name)] + # next artifact name to use + self._data_artifact_name = self._get_next_data_artifact_name(self._data_artifact_name) + + if a_tqdm: + a_tqdm.close() + + self._task.get_logger().report_text( + 'File compression completed: total size {}, {} chunked stored (average size {})'.format( + format_size(total_size), + len(list_zipped_artifacts), + format_size(total_size / len(list_zipped_artifacts)))) + + if not list_zipped_artifacts: LoggerRoot.get_base_logger().info('No pending files, skipping upload.') self._dirty = False self._serialize() return True - archive_preview = 'Dataset archive content [{} files]:\n'.format(count) + archive_preview + for i, (zip_file, count, archive_preview, artifact_name) in enumerate(list_zipped_artifacts): + # noinspection PyBroadException + try: + # let's try to rename it + new_zip_file = zip_file.parent / 'dataset.{}.zip'.format(self._id) + zip_file.rename(new_zip_file) + zip_file = new_zip_file + except Exception: + pass + + # start upload + zip_file_size = format_size(Path(zip_file).stat().st_size) + self._task.get_logger().report_text( + 'Uploading compressed dataset changes {}/{} ({} files {}) to {}'.format( + i+1, len(list_zipped_artifacts), count, zip_file_size, self.get_default_storage())) + + self._task.upload_artifact( + name=artifact_name, artifact_object=Path(zip_file), preview=archive_preview, + delete_after_upload=True, wait_on_upload=True) + + # mark as upload completed and serialize + for file_entry in self._dataset_file_entries.values(): + if file_entry.parent_dataset_id == self._id and file_entry.artifact_name == artifact_name: + file_entry.local_path = None + # serialize current state + self._serialize() - # noinspection PyBroadException - try: - # let's try to rename it - new_zip_file = zip_file.parent / 'dataset.{}.zip'.format(self._id) - zip_file.rename(new_zip_file) - zip_file = new_zip_file - except Exception: - pass # remove files that could not be zipped, containing Null relative Path - self._dataset_file_entries = {k: v for k, v in self._dataset_file_entries.items() - if v.relative_path is not None} - # start upload - zip_file_size = format_size(Path(zip_file).stat().st_size) - self._task.get_logger().report_text( - 'Uploading compressed dataset changes ({} files, total {}) to {}'.format( - count, zip_file_size, self.get_default_storage())) - self._task.upload_artifact( - name=self.__data_entry_name, artifact_object=Path(zip_file), preview=archive_preview, - delete_after_upload=True, wait_on_upload=True) - self._task.get_logger().report_text('Upload completed ({})'.format(zip_file_size)) + self._dataset_file_entries = { + k: v for k, v in self._dataset_file_entries.items() if v.relative_path is not None} + # report upload completed + self._task.get_logger().report_text('Upload completed ({})'.format(format_size(total_size))) self._add_script_call( 'upload', show_progress=show_progress, verbose=verbose, output_url=output_url, compression=compression) @@ -409,7 +480,7 @@ class Dataset(object): self._task.get_logger().report_text('Finalizing dataset', print_console=False) # make sure we have no redundant parent versions - self._serialize() + self._serialize(update_dependency_chunk_lookup=True) self._add_script_call('finalize') if verbose: print('Updating statistics and genealogy') @@ -453,14 +524,26 @@ class Dataset(object): return self._task.get_status() not in ( Task.TaskStatusEnum.in_progress, Task.TaskStatusEnum.created, Task.TaskStatusEnum.failed) - def get_local_copy(self, use_soft_links=None, raise_on_error=True): - # type: (bool, bool) -> str + def get_local_copy(self, use_soft_links=None, part=None, num_parts=None, raise_on_error=True): + # type: (bool, Optional[int], Optional[int], bool) -> str """ return a base folder with a read-only (immutable) local copy of the entire dataset download and copy / soft-link, files from all the parent dataset versions :param use_soft_links: If True use soft links, default False on windows True on Posix systems + :param part: Optional, if provided only download the selected part (index) of the Dataset. + First part number is `0` and last part is `num_parts-1` + Notice, if `num_parts` is not provided, number of parts will be equal to the total number of chunks + (i.e. sum over all chunks from the specified Dataset including all parent Datasets). + This argument is passed to parent datasets, as well as the implicit `num_parts`, + allowing users to get a partial copy of the entire dataset, for multi node/step processing. + :param num_parts: Optional, If specified normalize the number of chunks stored to the + requested number of parts. Notice that the actual chunks used per part are rounded down. + Example: Assuming total 8 chunks for this dataset (including parent datasets), + and `num_parts=5`, the chunk index used per parts would be: + part=0 -> chunks[0,5], part=1 -> chunks[1,6], part=2 -> chunks[2,7], part=3 -> chunks[3, ] :param raise_on_error: If True raise exception if dataset merging failed on any file + :return: A base folder for the entire dataset """ assert self._id @@ -468,11 +551,12 @@ class Dataset(object): self._task = Task.get_task(task_id=self._id) # now let's merge the parents - target_folder = self._merge_datasets(use_soft_links=use_soft_links, raise_on_error=raise_on_error) + target_folder = self._merge_datasets( + use_soft_links=use_soft_links, raise_on_error=raise_on_error, part=part, num_parts=num_parts) return target_folder - def get_mutable_local_copy(self, target_folder, overwrite=False, raise_on_error=True): - # type: (Union[Path, _Path, str], bool, bool) -> Optional[str] + def get_mutable_local_copy(self, target_folder, overwrite=False, part=None, num_parts=None, raise_on_error=True): + # type: (Union[Path, _Path, str], bool, Optional[int], Optional[int], bool) -> Optional[str] """ return a base folder with a writable (mutable) local copy of the entire dataset download and copy / soft-link, files from all the parent dataset versions @@ -480,7 +564,19 @@ class Dataset(object): :param target_folder: Target folder for the writable copy :param overwrite: If True, recursively delete the target folder before creating a copy. If False (default) and target folder contains files, raise exception or return None + :param part: Optional, if provided only download the selected part (index) of the Dataset. + First part number is `0` and last part is `num_parts-1` + Notice, if `num_parts` is not provided, number of parts will be equal to the total number of chunks + (i.e. sum over all chunks from the specified Dataset including all parent Datasets). + This argument is passed to parent datasets, as well as the implicit `num_parts`, + allowing users to get a partial copy of the entire dataset, for multi node/step processing. + :param num_parts: Optional, If specified normalize the number of chunks stored to the + requested number of parts. Notice that the actual chunks used per part are rounded down. + Example: Assuming total 8 chunks for this dataset (including parent datasets), + and `num_parts=5`, the chunk index used per parts would be: + part=0 -> chunks[0,5], part=1 -> chunks[1,6], part=2 -> chunks[2,7], part=3 -> chunks[3, ] :param raise_on_error: If True raise exception if dataset merging failed on any file + :return: A the target folder containing the entire dataset """ assert self._id @@ -497,8 +593,8 @@ class Dataset(object): return None shutil.rmtree(target_folder.as_posix()) - ro_folder = self.get_local_copy(raise_on_error=raise_on_error) - shutil.copytree(ro_folder, target_folder.as_posix()) + ro_folder = self.get_local_copy(part=part, num_parts=num_parts, raise_on_error=raise_on_error) + shutil.copytree(ro_folder, target_folder.as_posix(), symlinks=False) return target_folder.as_posix() def list_files(self, dataset_path=None, recursive=True, dataset_id=None): @@ -680,7 +776,7 @@ class Dataset(object): If True, the dataset is created on the current Task. :return: Newly created Dataset object """ - parent_datasets = [cls.get(dataset_id=p) if isinstance(p, str) else p for p in (parent_datasets or [])] + parent_datasets = [cls.get(dataset_id=p) if not isinstance(p, Dataset) else p for p in (parent_datasets or [])] if any(not p.is_final() for p in parent_datasets): raise ValueError("Cannot inherit from a parent that was not finalized/closed") @@ -831,15 +927,19 @@ class Dataset(object): if task.status == 'created': raise ValueError('Dataset id={} is in draft mode, delete and recreate it'.format(task.id)) force_download = False if task.status in ('stopped', 'published', 'closed', 'completed') else True - local_state_file = StorageManager.get_local_copy( - remote_url=task.artifacts[cls.__state_entry_name].url, cache_context=cls.__cache_context, - extract_archive=False, name=task.id, force_download=force_download) - if not local_state_file: - raise ValueError('Could not load Dataset id={} state'.format(task.id)) + if cls.__state_entry_name in task.artifacts: + local_state_file = StorageManager.get_local_copy( + remote_url=task.artifacts[cls.__state_entry_name].url, cache_context=cls.__cache_context, + extract_archive=False, name=task.id, force_download=force_download) + if not local_state_file: + raise ValueError('Could not load Dataset id={} state'.format(task.id)) + else: + # we could not find the serialized state, start empty + local_state_file = {} instance = cls._deserialize(local_state_file, task) # remove the artifact, just in case - if force_download: + if force_download and local_state_file: os.unlink(local_state_file) return instance @@ -853,6 +953,23 @@ class Dataset(object): """ return self._task.get_logger() + def get_num_chunks(self, include_parents=True): + # type: (bool) -> int + """ + Return the number of chunks stored on this dataset + (it does not imply on the number of chunks parent versions store) + + :param include_parents: If True (default), + return the total number of chunks from this version and all parent versions. + If False, only return the number of chunks we stored on this specific version. + + :return: Number of chunks stored on the dataset. + """ + if not include_parents: + return len(self._get_data_artifact_names()) + + return sum(self._get_dependency_chunk_lookup().values()) + @classmethod def squash(cls, dataset_name, dataset_ids=None, dataset_project_name_pairs=None, output_url=None): # type: (str, Optional[Sequence[Union[str, Dataset]]],Optional[Sequence[(str, str)]], Optional[str]) -> Dataset @@ -987,7 +1104,7 @@ class Dataset(object): self._task.get_logger().report_text('Generating SHA2 hash for {} files'.format(len(file_entries))) pool = ThreadPool(cpu_count() * 2) try: - import tqdm + import tqdm # noqa for _ in tqdm.tqdm(pool.imap_unordered(self._calc_file_hash, file_entries), total=len(file_entries)): pass except ImportError: @@ -1010,10 +1127,17 @@ class Dataset(object): self._dataset_file_entries[f.relative_path] = f count += 1 elif f.parent_dataset_id == self._id and ds_cur_f.parent_dataset_id == self._id: - if verbose: - self._task.get_logger().report_text('Re-Added {}'.format(f.relative_path)) - self._dataset_file_entries[f.relative_path] = f - count += 1 + # check if we have the file in an already uploaded chunk + if ds_cur_f.local_path is None: + # skipping, already uploaded. + if verbose: + self._task.get_logger().report_text('Skipping {}'.format(f.relative_path)) + else: + # if we never uploaded it, mark for upload + if verbose: + self._task.get_logger().report_text('Re-Added {}'.format(f.relative_path)) + self._dataset_file_entries[f.relative_path] = f + count += 1 else: if verbose: self._task.get_logger().report_text('Unchanged {}'.format(f.relative_path)) @@ -1028,17 +1152,22 @@ class Dataset(object): # collect all dataset versions used_dataset_versions = set(f.parent_dataset_id for f in self._dataset_file_entries.values()) used_dataset_versions.add(self._id) - current_parents = self._dependency_graph.get(self._id) + current_parents = self._dependency_graph.get(self._id) or [] # remove parent versions we no longer need from the main version list # per version, remove unnecessary parent versions, if we do not need them - self._dependency_graph = {k: [p for p in parents if p in used_dataset_versions] - for k, parents in self._dependency_graph.items() if k in used_dataset_versions} + self._dependency_graph = { + k: [p for p in parents or [] if p in used_dataset_versions] + for k, parents in self._dependency_graph.items() if k in used_dataset_versions} # make sure we do not remove our parents, for geology sake self._dependency_graph[self._id] = current_parents - def _serialize(self): + def _serialize(self, update_dependency_chunk_lookup=False): + # type: (bool) -> () """ store current state of the Dataset for later use + + :param update_dependency_chunk_lookup: If True, update the parent versions number of chunks + :return: object to be used for later deserialization """ self._update_dependency_graph() @@ -1049,6 +1178,9 @@ class Dataset(object): id=self._id, dirty=self._dirty, ) + if update_dependency_chunk_lookup: + state['dependency_chunk_lookup'] = self._build_dependency_chunk_lookup() + modified_files = [f['size'] for f in state['dataset_file_entries'] if f.get('parent_dataset_id') == self._id] preview = \ 'Dataset state\n' \ @@ -1060,130 +1192,210 @@ class Dataset(object): self._task.upload_artifact( name=self.__state_entry_name, artifact_object=state, preview=preview, wait_on_upload=True) - def _download_dataset_archive(self): + def _download_dataset_archives(self): """ Download the dataset archive, return a link to locally stored zip file - :return: Path to locally stored zip file + :return: List of paths to locally stored zip files """ pass # TODO: implement - def _extract_dataset_archive(self): + def _extract_dataset_archive( + self, + force=False, + selected_chunks=None, + lock_target_folder=False, + cleanup_target_folder=True, + target_folder=None, + ): + # type: (bool, Optional[List[int]], bool, bool, Optional[Path]) -> str """ Download the dataset archive, and extract the zip content to a cached folder. Notice no merging is done. + :param force: If True extract dataset content even if target folder exists and is not empty + :param selected_chunks: Optional, if provided only download the selected chunks (index) of the Dataset. + Example: Assuming 8 chunks on this version + selected_chunks=[0,1,2] + :param lock_target_folder: If True, local the target folder so the next cleanup will not delete + Notice you should unlock it manually, or wait fro the process to fnish for auto unlocking. + :param cleanup_target_folder: If True remove target folder recursively + :param target_folder: If provided use the specified target folder, default, auto generate from Dataset ID. + :return: Path to a local storage extracted archive """ + assert selected_chunks is None or isinstance(selected_chunks, (list, tuple)) + if not self._task: self._task = Task.get_task(task_id=self._id) + + data_artifact_entries = self._get_data_artifact_names() + + if selected_chunks is not None and data_artifact_entries: + data_artifact_entries = [ + d for d in data_artifact_entries + if self._get_chunk_idx_from_artifact_name(d) in selected_chunks] + + # get cache manager + local_folder = Path(target_folder) if target_folder else \ + self._create_ds_target_folder(lock_target_folder=lock_target_folder) + # check if we have a dataset with empty change set - if not self._task.artifacts.get(self.__data_entry_name): - cache = CacheManager.get_cache_manager(cache_context=self.__cache_context) - local_folder = Path(cache.get_cache_folder()) / self._get_cache_folder_name() - local_folder.mkdir(parents=True, exist_ok=True) + if not data_artifact_entries: + return local_folder.as_posix() + + # check if target folder is not empty + if not force and next(local_folder.glob('*'), None): return local_folder.as_posix() - # download the dataset zip - local_zip = StorageManager.get_local_copy( - remote_url=self._task.artifacts[self.__data_entry_name].url, cache_context=self.__cache_context, - extract_archive=False, name=self._id) - if not local_zip: - raise ValueError("Could not download dataset id={}".format(self._id)) - local_folder = (Path(local_zip).parent / self._get_cache_folder_name()).as_posix() # if we got here, we need to clear the target folder - shutil.rmtree(local_folder, ignore_errors=True) - # noinspection PyProtectedMember - local_folder = StorageManager._extract_to_cache( - cached_file=local_zip, name=self._id, - cache_context=self.__cache_context, target_folder=local_folder) + local_folder = local_folder.as_posix() + if cleanup_target_folder: + shutil.rmtree(local_folder, ignore_errors=True) + # verify target folder exists + Path(local_folder).mkdir(parents=True, exist_ok=True) + + def _download_part(data_artifact_name): + # download the dataset zip + local_zip = StorageManager.get_local_copy( + remote_url=self._task.artifacts[data_artifact_name].url, cache_context=self.__cache_context, + extract_archive=False, name=self._id) + if not local_zip: + raise ValueError("Could not download dataset id={} entry={}".format(self._id, data_artifact_name)) + # noinspection PyProtectedMember + StorageManager._extract_to_cache( + cached_file=local_zip, name=self._id, + cache_context=self.__cache_context, target_folder=local_folder, force=True) + + # download al parts in parallel + # if len(data_artifact_entries) > 1: + # pool = ThreadPool() + # pool.map(_download_part, data_artifact_entries) + # pool.close() + # else: + # _download_part(data_artifact_entries[0]) + for d in data_artifact_entries: + _download_part(d) + return local_folder - def _merge_datasets(self, use_soft_links=None, raise_on_error=True): - # type: (bool, bool) -> str + def _create_ds_target_folder(self, part=None, num_parts=None, lock_target_folder=True): + # type: (Optional[int], Optional[int], bool) -> Path + cache = CacheManager.get_cache_manager(cache_context=self.__cache_context) + local_folder = Path(cache.get_cache_folder()) / self._get_cache_folder_name(part=part, num_parts=num_parts) + if lock_target_folder: + cache.lock_cache_folder(local_folder) + local_folder.mkdir(parents=True, exist_ok=True) + return local_folder + + def _get_data_artifact_names(self): + # type: () -> List[str] + data_artifact_entries = [ + a for a in self._task.artifacts + if a and (a == self.__default_data_entry_name or str(a).startswith(self.__data_entry_name_prefix))] + return data_artifact_entries + + def _get_next_data_artifact_name(self, last_artifact_name=None): + # type: (Optional[str]) -> str + if not last_artifact_name: + data_artifact_entries = self._get_data_artifact_names() + if len(data_artifact_entries) < 1: + return self.__default_data_entry_name + else: + data_artifact_entries = [last_artifact_name] + prefix = self.__data_entry_name_prefix + prefix_len = len(prefix) + numbers = sorted([int(a[prefix_len:]) for a in data_artifact_entries if a.startswith(prefix)]) + return '{}{:03d}'.format(prefix, numbers[-1]+1 if numbers else 1) + + def _merge_datasets(self, use_soft_links=None, raise_on_error=True, part=None, num_parts=None): + # type: (bool, bool, Optional[int], Optional[int]) -> str """ download and copy / soft-link, files from all the parent dataset versions :param use_soft_links: If True use soft links, default False on windows True on Posix systems :param raise_on_error: If True raise exception if dataset merging failed on any file + :param part: Optional, if provided only download the selected part (index) of the Dataset. + Notice, if `num_parts` is not provided, number of parts will be equal to the number of chunks. + This argument is passed to parent versions, as well as the implicit `num_parts`, + allowing users to get a partial copy of the entire dataset, for multi node/step processing. + :param num_parts: Optional, If specified normalize the number of chunks stored to the + requested number of parts. Notice that the actual chunks used per part are rounded down. + Example: Assuming 8 chunks on this version, and `num_parts=5`, the chunk index used per parts would be: + part=0 -> chunks[0,5], part=1 -> chunks[1,6], part=2 -> chunks[2,7], part=3 -> chunks[3, ] + :return: the target folder """ + assert part is None or (isinstance(part, int) and part >= 0) + assert num_parts is None or (isinstance(num_parts, int) and num_parts >= 1) + if use_soft_links is None: use_soft_links = False if is_windows() else True - # check if we already have everything - target_base_folder, target_base_size = CacheManager.get_cache_manager( - cache_context=self.__cache_context).get_cache_file(local_filename=self._get_cache_folder_name()) - if target_base_folder and target_base_size is not None: - target_base_folder = Path(target_base_folder) - # check dataset file size, if we have a full match no need for parent dataset download / merge - verified = True - # noinspection PyBroadException - try: - for f in self._dataset_file_entries.values(): - if (target_base_folder / f.relative_path).stat().st_size != f.size: - verified = False - break - except Exception: - verified = False + if part is not None and not num_parts: + num_parts = self.get_num_chunks() - if verified: + # just create the dataset target folder + target_base_folder = self._create_ds_target_folder( + part=part, num_parts=num_parts, lock_target_folder=True) + + # selected specific chunks if `part` was passed + chunk_selection = None if part is None else self._build_chunk_selection(part=part, num_parts=num_parts) + + # check if target folder is not empty, see if it contains everything we need + if target_base_folder and next(target_base_folder.iterdir(), None): + if self._verify_dataset_folder(target_base_folder, part, chunk_selection): + target_base_folder.touch() return target_base_folder.as_posix() else: LoggerRoot.get_base_logger().info('Dataset needs refreshing, fetching all parent datasets') + # we should delete the entire cache folder + shutil.rmtree(target_base_folder.as_posix()) + # make sure we recreate the dataset target folder + target_base_folder.mkdir(parents=True, exist_ok=True) + + # get the dataset dependencies (if `part` was passed, only selected the ones in the selected part) + dependencies_by_order = self._get_dependencies_by_order(include_unused=False, include_current=True) \ + if chunk_selection is None else list(chunk_selection.keys()) # first get our dataset - target_base_folder = Path(self._extract_dataset_archive()) + if self._id in dependencies_by_order: + self._extract_dataset_archive( + force=True, + selected_chunks=chunk_selection.get(self._id) if chunk_selection else None, + cleanup_target_folder=True, + target_folder=target_base_folder, + ) + dependencies_by_order.remove(self._id) + + # update target folder timestamp target_base_folder.touch() - # create thread pool - pool = ThreadPool(cpu_count() * 2) - for dataset_version_id in self._get_dependencies_by_order(): - # make sure we skip over empty dependencies - if dataset_version_id not in self._dependency_graph: - continue + # if we have no dependencies, we can just return now + if not dependencies_by_order: + return target_base_folder.absolute().as_posix() - ds = Dataset.get(dataset_id=dataset_version_id) - ds_base_folder = Path(ds._extract_dataset_archive()) - ds_base_folder.touch() + # extract parent datasets + self._extract_parent_datasets( + target_base_folder=target_base_folder, dependencies_by_order=dependencies_by_order, + chunk_selection=chunk_selection, use_soft_links=use_soft_links, + raise_on_error=False, force=False) - def copy_file(file_entry): - if file_entry.parent_dataset_id != dataset_version_id: - return - source = (ds_base_folder / file_entry.relative_path).as_posix() - target = (target_base_folder / file_entry.relative_path).as_posix() - try: - # make sure we have can overwrite the target file - # noinspection PyBroadException - try: - os.unlink(target) - except Exception: - Path(target).parent.mkdir(parents=True, exist_ok=True) + # verify entire dataset (if failed, force downloading parent datasets) + if not self._verify_dataset_folder(target_base_folder, part, chunk_selection): + LoggerRoot.get_base_logger().info('Dataset parents need refreshing, re-fetching all parent datasets') + # we should delete the entire cache folder + self._extract_parent_datasets( + target_base_folder=target_base_folder, dependencies_by_order=dependencies_by_order, + chunk_selection=chunk_selection, use_soft_links=use_soft_links, + raise_on_error=raise_on_error, force=True) - # copy / link - if use_soft_links: - if not os.path.isfile(source): - raise ValueError("Extracted file missing {}".format(source)) - os.symlink(source, target) - else: - shutil.copy2(source, target, follow_symlinks=True) - except Exception as ex: - LoggerRoot.get_base_logger().warning('{}\nFailed {} file {} to {}'.format( - ex, 'linking' if use_soft_links else 'copying', source, target)) - return ex - - return None - - errors = pool.map(copy_file, self._dataset_file_entries.values()) - if raise_on_error and any(errors): - raise ValueError("Dataset merging failed: {}".format([e for e in errors if e is not None])) - - pool.close() return target_base_folder.absolute().as_posix() - def _get_dependencies_by_order(self, include_unused=False): - # type: (bool) -> List[str] + def _get_dependencies_by_order(self, include_unused=False, include_current=True): + # type: (bool, bool) -> List[str] """ Return the dataset dependencies by order of application (from the last to the current) - :param bool include_unused: If True include unused datasets in the dependencies + :param include_unused: If True include unused datasets in the dependencies + :param include_current: If True include the current dataset ID as the last ID in the list :return: list of str representing the datasets id """ roots = [self._id] @@ -1196,10 +1408,10 @@ class Dataset(object): # add the parents of the current node, only if the parents are in the general graph node list if include_unused and r not in self._dependency_graph: roots.extend(list(reversed( - [p for p in self.get(dataset_id=r)._get_parents() if p not in roots]))) + [p for p in (self.get(dataset_id=r)._get_parents() or []) if p not in roots]))) else: roots.extend(list(reversed( - [p for p in self._dependency_graph.get(r, []) + [p for p in (self._dependency_graph.get(r) or []) if p not in roots and (include_unused or (p in self._dependency_graph))]))) # make sure we cover leftovers @@ -1214,14 +1426,15 @@ class Dataset(object): # add the parents of the current node, only if the parents are in the general graph node list if include_unused and r not in self._dependency_graph: roots.extend(list(reversed( - [p for p in self.get(dataset_id=r)._get_parents() if p not in roots]))) + [p for p in (self.get(dataset_id=r)._get_parents() or []) if p not in roots]))) else: roots.extend(list(reversed( - [p for p in self._dependency_graph.get(r, []) + [p for p in (self._dependency_graph.get(r) or []) if p not in roots and (include_unused or (p in self._dependency_graph))]))) # skip our id - return list(reversed(dependencies[1:])) + dependencies = list(reversed(dependencies[1:])) + return (dependencies + [self._id]) if include_current else dependencies def _get_parents(self): # type: () -> Sequence[str] @@ -1248,10 +1461,24 @@ class Dataset(object): instance = cls(_private=cls.__private_magic, task=task) # assert instance._id == stored_state['id'] # They should match - instance._dependency_graph = stored_state['dependency_graph'] + instance._dependency_graph = stored_state.get('dependency_graph', {}) instance._dirty = stored_state.get('dirty', False) instance._dataset_file_entries = { - s['relative_path']: FileEntry(**s) for s in stored_state['dataset_file_entries']} + s['relative_path']: FileEntry(**s) for s in stored_state.get('dataset_file_entries', [])} + if stored_state.get('dependency_chunk_lookup') is not None: + instance._dependency_chunk_lookup = stored_state.get('dependency_chunk_lookup') + + # update the last used artifact (remove the one we never serialized, they rae considered broken) + if task.status in ('in_progress', 'created', 'stopped'): + artifact_names = set([ + a.artifact_name for a in instance._dataset_file_entries.values() + if a.artifact_name and a.parent_dataset_id == instance._id]) + missing_artifact_name = set(instance._get_data_artifact_names()) - artifact_names + if missing_artifact_name: + instance._task._delete_artifacts(list(missing_artifact_name)) + # if we removed any data artifact, update the next data artifact name + instance._data_artifact_name = instance._get_next_data_artifact_name() + return instance @staticmethod @@ -1272,8 +1499,27 @@ class Dataset(object): """ return 'dsh{}'.format(md5text(dataset_id)) - def _get_cache_folder_name(self): - return '{}{}'.format(self.__cache_folder_prefix, self._id) + def _build_dependency_chunk_lookup(self): + # type: () -> Dict[str, int] + """ + Build the dependency dataset id to number-of-chunks, lookup table + :return: lookup dictionary from dataset-id to number of chunks + """ + # with ThreadPool() as pool: + # chunks_lookup = pool.map( + # lambda d: (d, Dataset.get(dataset_id=d).get_num_chunks()), + # self._dependency_graph.keys()) + # return dict(chunks_lookup) + chunks_lookup = map( + lambda d: (d, Dataset.get(dataset_id=d).get_num_chunks()), + self._dependency_graph.keys()) + return dict(chunks_lookup) + + def _get_cache_folder_name(self, part=None, num_parts=None): + # type: (Optional[int], Optional[int]) -> str + if part is None: + return '{}{}'.format(self.__cache_folder_prefix, self._id) + return '{}{}_{}_{}'.format(self.__cache_folder_prefix, self._id, part, num_parts) def _add_script_call(self, func_name, **kwargs): # type: (str, **Any) -> () @@ -1306,7 +1552,7 @@ class Dataset(object): hovertemplate='', ) # get DAG nodes - nodes = self._get_dependencies_by_order(include_unused=True) + [self.id] + nodes = self._get_dependencies_by_order(include_unused=True, include_current=True) # dataset name lookup # noinspection PyProtectedMember node_names = {t.id: t.name for t in Task._query_tasks(task_ids=nodes, only_fields=['id', 'name'])} @@ -1393,14 +1639,24 @@ class Dataset(object): # report the detailed content of the dataset as configuration, # this allows for easy version comparison in the UI dataset_details = None + dataset_details_header = None + dataset_details_header_template = 'File Name ({} files) - File Size (total {}) - Hash (SHA2)\n' if len(self._dataset_file_entries) < self.__preview_max_file_entries: file_entries = sorted(self._dataset_file_entries.values(), key=lambda x: x.relative_path) - dataset_details = \ - 'File Name - File Size - Hash (SHA2)\n' +\ + dataset_details_header = dataset_details_header_template.format( + len(file_entries), format_size(sum(f.size for f in file_entries)) + ) + dataset_details = dataset_details_header + \ '\n'.join('{} - {} - {}'.format(f.relative_path, f.size, f.hash) for f in file_entries) + # too large to store if not dataset_details or len(dataset_details) > self.__preview_max_size: - dataset_details = 'Dataset content is too large to preview' + if not dataset_details_header: + dataset_details_header = dataset_details_header_template.format( + len(self._dataset_file_entries), + format_size(sum(f.size for f in self._dataset_file_entries.values())) + ) + dataset_details = dataset_details_header + 'Dataset content is too large to preview' # noinspection PyProtectedMember self._task._set_configuration( @@ -1428,3 +1684,154 @@ class Dataset(object): :return: Return True means dataset has pending uploads, call 'upload' to start an upload process. """ return self._dirty + + def _extract_parent_datasets( + self, + target_base_folder, + dependencies_by_order, + chunk_selection, + use_soft_links, + raise_on_error, + force + ): + # type: (Path, List[str], Optional[dict], bool, bool, bool) -> () + # create thread pool, for creating soft-links / copying + # todo: parallelize by parent datasets + pool = ThreadPool(cpu_count() * 2) + for dataset_version_id in dependencies_by_order: + # make sure we skip over empty dependencies + if dataset_version_id not in self._dependency_graph: + continue + selected_chunks = chunk_selection.get(dataset_version_id) if chunk_selection else None + + ds = Dataset.get(dataset_id=dataset_version_id) + ds_base_folder = Path(ds._extract_dataset_archive( + selected_chunks=selected_chunks, + force=force, + lock_target_folder=True, + cleanup_target_folder=False, + )) + ds_base_folder.touch() + + def copy_file(file_entry): + if file_entry.parent_dataset_id != dataset_version_id or \ + (selected_chunks is not None and + self._get_chunk_idx_from_artifact_name(file_entry.artifact_name) not in selected_chunks): + return + source = (ds_base_folder / file_entry.relative_path).as_posix() + target = (target_base_folder / file_entry.relative_path).as_posix() + try: + # make sure we have can overwrite the target file + # noinspection PyBroadException + try: + os.unlink(target) + except Exception: + Path(target).parent.mkdir(parents=True, exist_ok=True) + + # copy / link + if use_soft_links: + if not os.path.isfile(source): + raise ValueError("Extracted file missing {}".format(source)) + os.symlink(source, target) + else: + shutil.copy2(source, target, follow_symlinks=True) + except Exception as ex: + LoggerRoot.get_base_logger().warning('{}\nFailed {} file {} to {}'.format( + ex, 'linking' if use_soft_links else 'copying', source, target)) + return ex + + return None + + errors = pool.map(copy_file, self._dataset_file_entries.values()) + + CacheManager.get_cache_manager(cache_context=self.__cache_context).unlock_cache_folder( + ds_base_folder.as_posix()) + + if raise_on_error and any(errors): + raise ValueError("Dataset merging failed: {}".format([e for e in errors if e is not None])) + pool.close() + + def _verify_dataset_folder(self, target_base_folder, part, chunk_selection): + # type: (Path, Optional[int], Optional[dict]) -> bool + target_base_folder = Path(target_base_folder) + # check dataset file size, if we have a full match no need for parent dataset download / merge + verified = True + # noinspection PyBroadException + try: + for f in self._dataset_file_entries.values(): + # check if we need it for the current part + if part is not None: + f_parts = chunk_selection.get(f.parent_dataset_id, []) + # this is not in our current part, no need to check it. + if self._get_chunk_idx_from_artifact_name(f.artifact_name) not in f_parts: + continue + + # check if the local size and the stored size match (faster than comparing hash) + if (target_base_folder / f.relative_path).stat().st_size != f.size: + verified = False + break + + except Exception: + verified = False + + return verified + + def _get_dependency_chunk_lookup(self): + # type: () -> Dict[str, int] + """ + Return The parent dataset ID to number of chunks lookup table + :return: Dict key is dataset ID, value is total number of chunks for the specific dataset version. + """ + if self._dependency_chunk_lookup is None: + self._dependency_chunk_lookup = self._build_dependency_chunk_lookup() + return self._dependency_chunk_lookup + + def _build_chunk_selection(self, part, num_parts): + # type: (int, int) -> Dict[str, int] + """ + Build the selected chunks from each parent version, based on the current selection. + Notice that for a specific part, one can only get the chunks from parent versions (not including this one) + :param part: Current part index (between 0 and num_parts-1) + :param num_parts: Total number of parts to divide the dataset into + :return: Dict of Dataset ID and their respected chunks used for this part number + """ + # get the chunk dependencies + dependency_chunk_lookup = self._get_dependency_chunk_lookup() + + # first collect the total number of chunks + total_chunks = sum(dependency_chunk_lookup.values()) + + avg_chunk_per_part = total_chunks // num_parts + leftover_chunks = total_chunks % num_parts + + dependencies = self._get_dependencies_by_order(include_unused=False, include_current=True) + # create the part look up + ds_id_chunk_list = [(d, i) for d in dependencies for i in range(dependency_chunk_lookup.get(d, 1))] + + # select the chunks for this part + if part < leftover_chunks: + indexes = ds_id_chunk_list[part*(avg_chunk_per_part+1):(part+1)*(avg_chunk_per_part+1)] + else: + ds_id_chunk_list = ds_id_chunk_list[leftover_chunks*(avg_chunk_per_part+1):] + indexes = ds_id_chunk_list[(part-leftover_chunks)*avg_chunk_per_part: + (part-leftover_chunks+1)*avg_chunk_per_part] + + # convert to lookup + chunk_selection = {} + for d, i in indexes: + chunk_selection[d] = chunk_selection.get(d, []) + [i] + + return chunk_selection + + @classmethod + def _get_chunk_idx_from_artifact_name(cls, artifact_name): + # type: (str) -> int + if not artifact_name: + return -1 + artifact_name = str(artifact_name) + + if artifact_name == cls.__default_data_entry_name: + return 0 + if artifact_name.startswith(cls.__data_entry_name_prefix): + return int(artifact_name[len(cls.__data_entry_name_prefix):]) + return -1 diff --git a/clearml/storage/cache.py b/clearml/storage/cache.py index 6a76047e..d348b18c 100644 --- a/clearml/storage/cache.py +++ b/clearml/storage/cache.py @@ -1,13 +1,20 @@ +import atexit import hashlib +import os import shutil from collections import OrderedDict +from threading import RLock +from typing import Union, Optional, Tuple, Dict + from pathlib2 import Path from .helper import StorageHelper from .util import quote_url from ..config import get_cache_dir, deferred_config from ..debugging.log import LoggerRoot +from ..utilities.locks.utils import Lock as FileLock +from ..utilities.locks.exceptions import LockException class CacheManager(object): @@ -19,17 +26,26 @@ class CacheManager(object): __local_to_remote_url_lookup_max_size = 1024 _context_to_folder_lookup = dict() _default_context_folder_template = "{0}_artifacts_archive_{1}" + _lockfile_prefix = '.lock.' + _lockfile_suffix = '.clearml' class CacheContext(object): + _folder_locks = dict() # type: Dict[str, FileLock] + _lockfile_at_exit_cb = None + def __init__(self, cache_context, default_cache_file_limit=10): + # type: (str, int) -> None self._context = str(cache_context) self._file_limit = int(default_cache_file_limit) + self._rlock = RLock() def set_cache_limit(self, cache_file_limit): + # type: (int) -> int self._file_limit = max(self._file_limit, int(cache_file_limit)) return self._file_limit def get_local_copy(self, remote_url, force_download): + # type: (str, bool) -> Optional[str] helper = StorageHelper.get(remote_url) if not helper: raise ValueError("Storage access failed: {}".format(remote_url)) @@ -59,6 +75,7 @@ class CacheManager(object): @staticmethod def upload_file(local_file, remote_url, wait_for_upload=True, retries=1): + # type: (str, str, bool, int) -> Optional[str] helper = StorageHelper.get(remote_url) result = helper.upload( local_file, remote_url, async_enable=not wait_for_upload, retries=retries, @@ -68,11 +85,13 @@ class CacheManager(object): @classmethod def get_hashed_url_file(cls, url): + # type: (str) -> str str_hash = hashlib.md5(url.encode()).hexdigest() filename = url.split("/")[-1] return "{}.{}".format(str_hash, quote_url(filename)) def get_cache_folder(self): + # type: () -> str """ :return: full path to current contexts cache folder """ @@ -82,6 +101,7 @@ class CacheManager(object): return folder.as_posix() def get_cache_file(self, remote_url=None, local_filename=None): + # type: (Optional[str], Optional[str]) -> Tuple[str, Optional[int]] """ :param remote_url: check if we have the remote url in our cache :param local_filename: if local_file is given, search for the local file/directory in the cache folder @@ -123,10 +143,52 @@ class CacheManager(object): except Exception: pass + # first exclude lock files + lock_files = dict() + files = [] + for f in sorted(folder.iterdir(), reverse=True, key=sort_max_access_time): + if f.name.startswith(CacheManager._lockfile_prefix) and f.name.endswith(CacheManager._lockfile_suffix): + # parse the lock filename + name = f.name[len(CacheManager._lockfile_prefix):-len(CacheManager._lockfile_suffix)] + num, _, name = name.partition('.') + lock_files[name] = lock_files.get(name, []) + [f.as_posix()] + else: + files.append(f) + + # remove new lock files from the list (we will delete them when time comes) + for f in files[:self._file_limit]: + lock_files.pop(f.name, None) + # delete old files - files = sorted(folder.iterdir(), reverse=True, key=sort_max_access_time) files = files[self._file_limit:] - for f in files: + for i, f in enumerate(files): + if i < self._file_limit: + continue + + # check if the file is in the lock folder list: + folder_lock = self._folder_locks.get(f.absolute().as_posix()) + if folder_lock: + # pop from lock files + lock_files.pop(f.name, None) + continue + + # check if someone else holds the lock file + locks = lock_files.get(f.name, []) + for l in locks: + try: + a_lock = FileLock(filename=l) + a_lock.acquire(timeout=0) + a_lock.release() + a_lock.delete_lock_file() + del a_lock + except LockException: + # someone have the lock skip the file + continue + + # if we got here we need to pop from the lock_files, later we will delete the leftover entries + lock_files.pop(f.name, None) + + # if we are here we can delete the file if not f.is_dir(): # noinspection PyBroadException try: @@ -135,23 +197,93 @@ class CacheManager(object): pass else: try: - shutil.rmtree(f) + shutil.rmtree(f.as_posix()) except Exception as e: # failed deleting folder LoggerRoot.get_base_logger().debug( "Exception {}\nFailed deleting folder {}".format(e, f) ) + # cleanup old lock files + for lock_files in lock_files.values(): + for f in lock_files: + # noinspection PyBroadException + try: + os.unlink(f) + except BaseException: + pass + # if file doesn't exist, return file size None # noinspection PyBroadException try: size = new_file.stat().st_size if new_file_exists else None except Exception: size = None + return new_file.as_posix(), size + def lock_cache_folder(self, local_path): + # type: (Union[str, Path]) -> () + """ + Lock a specific cache folder, making sure it will not be deleted in the next + cache cleanup round + :param local_path: Path (str/Path) to a sub-folder inside the instance cache folder + """ + local_path = Path(local_path).absolute() + self._rlock.acquire() + if self._lockfile_at_exit_cb is None: + self._lockfile_at_exit_cb = True + atexit.register(self._lock_file_cleanup_callback) + + lock = self._folder_locks.get(local_path.as_posix()) + i = 0 + # try to create a lock if we do not already have one (if we do, we assume it is locked) + while not lock: + lock_path = local_path.parent / '{}{:03d}.{}{}'.format( + CacheManager._lockfile_prefix, i, local_path.name, CacheManager._lockfile_suffix) + lock = FileLock(filename=lock_path) + + # try to lock folder (if we failed to create lock, try nex number) + try: + lock.acquire(timeout=0) + break + except LockException: + # failed locking, maybe someone else already locked it. + del lock + lock = None + i += 1 + + # store lock + self._folder_locks[local_path.as_posix()] = lock + self._rlock.release() + + def unlock_cache_folder(self, local_path): + # type: (Union[str, Path]) -> () + """ + Lock a specific cache folder, making sure it will not be deleted in the next + cache cleanup round + :param local_path: Path (str/Path) to a sub-folder inside the instance cache folder + """ + local_path = Path(local_path).absolute() + self._rlock.acquire() + # pop lock + lock = self._folder_locks.pop(local_path.as_posix(), None) + if lock: + lock.release() + lock.delete_lock_file() + del lock + + self._rlock.release() + + @classmethod + def _lock_file_cleanup_callback(cls): + for lock in cls._folder_locks.values(): + lock.release() + lock.delete_lock_file() + @classmethod def get_cache_manager(cls, cache_context=None, cache_file_limit=None): + # type: (Optional[str], Optional[int]) -> CacheManager.CacheContext cache_context = cache_context or cls._default_context if cache_context not in cls.__cache_managers: cls.__cache_managers[cache_context] = cls.CacheContext( @@ -165,6 +297,7 @@ class CacheManager(object): @staticmethod def get_remote_url(local_copy_path): + # type: (str) -> str if not CacheManager._local_to_remote_url_lookup: return local_copy_path @@ -178,6 +311,7 @@ class CacheManager(object): @staticmethod def _add_remote_url(remote_url, local_copy_path): + # type: (str, str) -> () # so that we can disable the cache lookup altogether if CacheManager._local_to_remote_url_lookup is None: return @@ -206,11 +340,13 @@ class CacheManager(object): @classmethod def set_context_folder_lookup(cls, context, name_template): + # type: (str, str) -> str cls._context_to_folder_lookup[str(context)] = str(name_template) return str(name_template) @classmethod def get_context_folder_lookup(cls, context): + # type: (Optional[str]) -> str if not context: return cls._default_context_folder_template return cls._context_to_folder_lookup.get(str(context), cls._default_context_folder_template) diff --git a/clearml/storage/manager.py b/clearml/storage/manager.py index 22c46174..b1c48888 100644 --- a/clearml/storage/manager.py +++ b/clearml/storage/manager.py @@ -97,8 +97,16 @@ class StorageManager(object): ).set_cache_limit(cache_file_limit) @classmethod - def _extract_to_cache(cls, cached_file, name, cache_context=None, target_folder=None, cache_path_encoding=None): - # type: (str, str, Optional[str], Optional[str], Optional[str]) -> str + def _extract_to_cache( + cls, + cached_file, # type: str + name, # type: str + cache_context=None, # type: Optional[str] + target_folder=None, # type: Optional[str] + cache_path_encoding=None, # type: Optional[str] + force=False, # type: bool + ): + # type: (...) -> str """ Extract cached file to cache folder :param str cached_file: local copy of archive file @@ -108,6 +116,7 @@ class StorageManager(object): :param str cache_path_encoding: specify representation of the local path of the cached files, this will always point to local cache folder, even if we have direct access file. Used for extracting the cached archived based on cache_path_encoding + :param bool force: Force archive extraction even if target folder exists :return: cached folder containing the extracted archive content """ if not cached_file: @@ -133,7 +142,7 @@ class StorageManager(object): target_folder = cache_folder / CacheManager.get_context_folder_lookup( cache_context).format(archive_suffix, name) - if target_folder.is_dir(): + if target_folder.is_dir() and not force: # noinspection PyBroadException try: target_folder.touch(exist_ok=True) @@ -143,9 +152,14 @@ class StorageManager(object): base_logger = LoggerRoot.get_base_logger() try: - temp_target_folder = cache_folder / "{0}_{1}_{2}".format( - target_folder.name, time() * 1000, str(random()).replace('.', '')) - temp_target_folder.mkdir(parents=True, exist_ok=True) + # if target folder exists, meaning this is forced ao we extract directly into target folder + if target_folder.is_dir(): + temp_target_folder = target_folder + else: + temp_target_folder = cache_folder / "{0}_{1}_{2}".format( + target_folder.name, time() * 1000, str(random()).replace('.', '')) + temp_target_folder.mkdir(parents=True, exist_ok=True) + if suffix == ".zip": ZipFile(cached_file.as_posix()).extractall(path=temp_target_folder.as_posix()) elif suffix == ".tar.gz": @@ -155,23 +169,24 @@ class StorageManager(object): with tarfile.open(cached_file.as_posix(), mode='r:gz') as file: file.extractall(temp_target_folder.as_posix()) - # we assume we will have such folder if we already extract the file - # noinspection PyBroadException - try: - # if rename fails, it means that someone else already manged to extract the file, delete the current - # folder and return the already existing cached zip folder - shutil.move(temp_target_folder.as_posix(), target_folder.as_posix()) - except Exception: - if target_folder.exists(): - target_folder.touch(exist_ok=True) - else: - base_logger.warning( - "Failed renaming {0} to {1}".format(temp_target_folder.as_posix(), target_folder.as_posix())) + if temp_target_folder != target_folder: + # we assume we will have such folder if we already extract the file + # noinspection PyBroadException try: - shutil.rmtree(temp_target_folder.as_posix()) - except Exception as ex: - base_logger.warning( - "Exception {}\nFailed deleting folder {}".format(ex, temp_target_folder.as_posix())) + # if rename fails, it means that someone else already manged to extract the file, delete the current + # folder and return the already existing cached zip folder + shutil.move(temp_target_folder.as_posix(), target_folder.as_posix()) + except Exception: + if target_folder.exists(): + target_folder.touch(exist_ok=True) + else: + base_logger.warning( + "Failed renaming {0} to {1}".format(temp_target_folder.as_posix(), target_folder.as_posix())) + try: + shutil.rmtree(temp_target_folder.as_posix()) + except Exception as ex: + base_logger.warning( + "Exception {}\nFailed deleting folder {}".format(ex, temp_target_folder.as_posix())) except Exception as ex: # failed extracting the file: base_logger.warning( diff --git a/clearml/utilities/locks/utils.py b/clearml/utilities/locks/utils.py index 6d390380..01f2f22d 100644 --- a/clearml/utilities/locks/utils.py +++ b/clearml/utilities/locks/utils.py @@ -179,6 +179,22 @@ class Lock(object): pass self.fh = None + def delete_lock_file(self): + # type: () -> bool + """ + Remove the local file used for locking (fail if file is locked) + + :return: True is successful + """ + if self.fh: + return False + # noinspection PyBroadException + try: + os.unlink(path=self.filename) + except BaseException: + return False + return True + def _get_fh(self): '''Get a new filehandle''' return open(self.filename, self.mode, **self.file_open_kwargs)