Add clearml-Data (Datasets) multi-chunk support

This commit is contained in:
allegroai 2021-09-25 23:07:49 +03:00
parent 0dd9ba8adc
commit 844c01e15b
8 changed files with 848 additions and 222 deletions

View File

@ -179,8 +179,8 @@ class PrintPatchLogger(object):
cr_flush_period = None
def __init__(self, stream, logger=None, level=logging.INFO):
if self.__class__.cr_flush_period is None:
self.__class__.cr_flush_period = config.get("development.worker.console_cr_flush_period", 0)
if PrintPatchLogger.cr_flush_period is None:
PrintPatchLogger.cr_flush_period = config.get("development.worker.console_cr_flush_period", 0)
PrintPatchLogger.patched = True
self._terminal = stream
self._log = logger

View File

@ -1266,6 +1266,34 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
self._edit(execution=execution)
return self.data.execution.artifacts or []
def _delete_artifacts(self, artifact_names):
# type: (Sequence[str]) -> bool
"""
Delete a list of artifacts, by artifact name, from the Task.
:param list artifact_names: list of artifact names
:return: True if successful
"""
if not Session.check_min_api_version('2.3'):
return False
if not isinstance(artifact_names, (list, tuple)):
raise ValueError('Expected artifact names as List[str]')
with self._edit_lock:
if Session.check_min_api_version("2.13") and not self._offline_mode:
req = tasks.DeleteArtifactsRequest(
task=self.task_id, artifacts=[{"key": n, "mode": "output"} for n in artifact_names], force=True)
res = self.send(req, raise_on_errors=False)
if not res or not res.response or not res.response.deleted:
return False
self.reload()
else:
self.reload()
execution = self.data.execution
execution.artifacts = [a for a in execution.artifacts or [] if a.key not in artifact_names]
self._edit(execution=execution)
return self.data.execution.artifacts or []
def _set_model_design(self, design=None):
# type: (str) -> ()
with self._edit_lock:

View File

@ -85,7 +85,7 @@ def get_epoch_beginning_of_time(timezone_info=None):
return datetime(1970, 1, 1).replace(tzinfo=timezone_info if timezone_info else utc_timezone)
def get_single_result(entity, query, results, log=None, show_results=10, raise_on_error=True, sort_by_date=True):
def get_single_result(entity, query, results, log=None, show_results=1, raise_on_error=True, sort_by_date=True):
if not results:
if not raise_on_error:
return None
@ -96,8 +96,12 @@ def get_single_result(entity, query, results, log=None, show_results=10, raise_o
if show_results:
if not log:
log = get_logger()
log.warning('More than one {entity} found when searching for `{query}`'
' (showing first {show_results} {entity}s follow)'.format(**locals()))
if show_results > 1:
log.warning('{num} {entity} found when searching for `{query}`'
' (showing first {show_results} {entity}s follow)'.format(num=len(results), **locals()))
else:
log.warning('{num} {entity} found when searching for `{query}`'.format(num=len(results), **locals()))
if sort_by_date:
relative_time = get_epoch_beginning_of_time()
# sort results based on timestamp and return the newest one

View File

@ -70,9 +70,7 @@ def cli():
subparsers = parser.add_subparsers(help='Dataset actions', dest='command')
create = subparsers.add_parser('create', help='Create a new dataset')
create.add_argument('--parents', type=str, nargs='*',
help='[Optional] Specify dataset parents IDs (i.e. merge all parents). '
'Example: a17b4fID1 f0ee5ID2 a17b4f09eID3')
create.add_argument('--parents', type=str, nargs='*', help='Specify dataset parents IDs (i.e. merge all parents)')
create.add_argument('--project', type=str, required=False, default=None, help='Dataset project name')
create.add_argument('--name', type=str, required=True, default=None, help='Dataset name')
create.add_argument('--tags', type=str, nargs='*', help='Dataset user Tags')
@ -100,20 +98,22 @@ def cli():
help='Local folder to sync (support for wildcard selection). '
'Example: ~/data/*.jpg')
sync.add_argument('--parents', type=str, nargs='*',
help='[Optional] Specify dataset parents IDs (i.e. merge all parents). '
'Example: a17b4fID1 f0ee5ID2 a17b4f09eID3')
help='[Optional - Create new dataset] Specify dataset parents IDs (i.e. merge all parents)')
sync.add_argument('--project', type=str, required=False, default=None,
help='[Optional] Dataset project name')
help='[Optional - Create new dataset] Dataset project name')
sync.add_argument('--name', type=str, required=False, default=None,
help='[Optional] Dataset project name')
help='[Optional - Create new dataset] Dataset project name')
sync.add_argument('--tags', type=str, nargs='*',
help='[Optional] Dataset user Tags')
help='[Optional - Create new dataset] Dataset user Tags')
sync.add_argument('--storage', type=str, default=None,
help='Remote storage to use for the dataset files (default: files_server). '
'Examples: \'s3://bucket/data\', \'gs://bucket/data\', \'azure://bucket/data\', '
'\'/mnt/shared/folder/data\'')
sync.add_argument('--skip-close', action='store_true', default=False,
help='Do not auto close dataset after syncing folders')
sync.add_argument('--chunk-size', default=-1, type=int,
help='Set dataset artifact chunk size in MB. Default -1, unlimited size. '
'Example: 512, dataset will be split and uploaded in 512mb chunks.')
sync.add_argument('--verbose', action='store_true', default=False, help='Verbose reporting')
sync.set_defaults(func=ds_sync)
@ -136,6 +136,9 @@ def cli():
help='Remote storage to use for the dataset files (default: files_server). '
'Examples: \'s3://bucket/data\', \'gs://bucket/data\', \'azure://bucket/data\', '
'\'/mnt/shared/folder/data\'')
upload.add_argument('--chunk-size', default=-1, type=int,
help='Set dataset artifact chunk size in MB. Default -1, unlimited size. '
'Example: 512, dataset will be split and uploaded in 512mb chunks.')
upload.add_argument('--verbose', default=False, action='store_true', help='Verbose reporting')
upload.set_defaults(func=ds_upload)
@ -148,6 +151,9 @@ def cli():
'\'/mnt/shared/folder/data\'')
finalize.add_argument('--disable-upload', action='store_true', default=False,
help='Disable automatic upload when closing the dataset')
finalize.add_argument('--chunk-size', default=-1, type=int,
help='Set dataset artifact chunk size in MB. Default -1, unlimited size. '
'Example: 512, dataset will be split and uploaded in 512mb chunks.')
finalize.add_argument('--verbose', action='store_true', default=False, help='Verbose reporting')
finalize.set_defaults(func=ds_close)
@ -216,6 +222,14 @@ def cli():
get.add_argument('--link', type=str, default=None,
help='Create a soft link (not supported on Windows) to a '
'read-only cached folder containing the dataset')
get.add_argument('--part', type=int, default=None,
help='Retrieve a partial copy of the dataset. '
'Part number (0 to `num-parts`-1) of total parts --num-parts.')
get.add_argument('--num-parts', type=int, default=None,
help='Total number of parts to divide the dataset to. '
'Notice minimum retrieved part is a single chunk in a dataset (or its parents).'
'Example: Dataset gen4, with 3 parents, each with a single chunk, '
'can be divided into 4 parts')
get.add_argument('--overwrite', action='store_true', default=False, help='If True, overwrite the target folder')
get.add_argument('--verbose', action='store_true', default=False, help='Verbose reporting')
get.set_defaults(func=ds_get)
@ -274,7 +288,7 @@ def ds_get(args):
pass
if args.copy:
ds_folder = args.copy
ds.get_mutable_local_copy(target_folder=ds_folder)
ds.get_mutable_local_copy(target_folder=ds_folder, part=args.part, num_parts=args.num_parts)
else:
if args.link:
Path(args.link).mkdir(parents=True, exist_ok=True)
@ -286,7 +300,7 @@ def ds_get(args):
Path(args.link).unlink()
except Exception:
raise ValueError("Target directory {} is not empty. Use --overwrite.".format(args.link))
ds_folder = ds.get_local_copy()
ds_folder = ds.get_local_copy(part=args.part, num_parts=args.num_parts)
if args.link:
os.symlink(ds_folder, args.link)
ds_folder = args.link
@ -372,7 +386,10 @@ def ds_close(args):
raise ValueError("Pending uploads, cannot finalize dataset. run `clearml-data upload`")
# upload the files
print("Pending uploads, starting dataset upload to {}".format(args.storage or ds.get_default_storage()))
ds.upload(show_progress=True, verbose=args.verbose, output_url=args.storage or None)
ds.upload(show_progress=True,
verbose=args.verbose,
output_url=args.storage or None,
chunk_size=args.chunk_size or -1,)
ds.finalize()
print('Dataset closed and finalized')
@ -399,7 +416,7 @@ def ds_upload(args):
check_null_id(args)
print_args(args)
ds = Dataset.get(dataset_id=args.id)
ds.upload(verbose=args.verbose, output_url=args.storage or None)
ds.upload(verbose=args.verbose, output_url=args.storage or None, chunk_size=args.chunk_size or -1)
print('Dataset upload completed')
return 0
@ -443,7 +460,10 @@ def ds_sync(args):
if ds.is_dirty():
# upload the files
print("Pending uploads, starting dataset upload to {}".format(args.storage or ds.get_default_storage()))
ds.upload(show_progress=True, verbose=args.verbose, output_url=args.storage or None)
ds.upload(show_progress=True,
verbose=args.verbose,
output_url=args.storage or None,
chunk_size=args.chunk_size or -1, )
ds.finalize()
print('Dataset closed and finalized')

View File

@ -16,6 +16,7 @@ from .. import Task, StorageManager, Logger
from ..backend_api.session.client import APIClient
from ..backend_interface.task.development.worker import DevWorker
from ..backend_interface.util import mutually_exclusive, exact_match_regex
from ..config import deferred_config
from ..debugging.log import LoggerRoot
from ..storage.helper import StorageHelper
from ..storage.cache import CacheManager
@ -33,6 +34,8 @@ class FileEntry(object):
hash = attrib(default=None, type=str)
parent_dataset_id = attrib(default=None, type=str)
size = attrib(default=None, type=int)
# support multi part artifact storage
artifact_name = attrib(default=None, type=str)
# cleared when file is uploaded.
local_path = attrib(default=None, type=str)
@ -40,6 +43,7 @@ class FileEntry(object):
# type: () -> Dict
state = dict(relative_path=self.relative_path, hash=self.hash,
parent_dataset_id=self.parent_dataset_id, size=self.size,
artifact_name=self.artifact_name,
**dict([('local_path', self.local_path)] if self.local_path else ()))
return state
@ -47,13 +51,15 @@ class FileEntry(object):
class Dataset(object):
__private_magic = 42 * 1337
__state_entry_name = 'state'
__data_entry_name = 'data'
__default_data_entry_name = 'data'
__data_entry_name_prefix = 'data_'
__cache_context = 'datasets'
__tag = 'dataset'
__cache_folder_prefix = 'ds_'
__dataset_folder_template = CacheManager.set_context_folder_lookup(__cache_context, "{0}_archive_{1}")
__preview_max_file_entries = 15000
__preview_max_size = 5 * 1024 * 1024
_dataset_chunk_size_mb = deferred_config("storage.dataset_chunk_size_mb", 512, transform=int)
def __init__(self, _private, task=None, dataset_project=None, dataset_name=None, dataset_tags=None):
# type: (int, Optional[Task], Optional[str], Optional[str], Optional[Sequence[str]]) -> ()
@ -68,8 +74,14 @@ class Dataset(object):
if task:
self._task_pinger = None
self._created_task = False
task_status = task.data.status
# if we are continuing aborted Task, force the state
if str(task_status) == 'stopped':
task.mark_started(force=True)
task_status = 'in_progress'
# If we are reusing the main current Task, make sure we set its type to data_processing
if str(task.data.status) in ('created', 'in_progress'):
if str(task_status) in ('created', 'in_progress'):
if str(task.task_type) != str(Task.TaskTypes.data_processing):
task.set_task_type(task_type=Task.TaskTypes.data_processing)
task.set_system_tags((task.get_system_tags() or []) + [self.__tag])
@ -114,6 +126,11 @@ class Dataset(object):
# dirty flag, set True by any function call changing the dataset (regardless of weather it did anything)
self._dirty = False
self._using_current_task = False
# set current artifact name to be used (support for multiple upload sessions)
self._data_artifact_name = self._get_next_data_artifact_name()
# store a cached lookup of the number of chunks each parent dataset has.
# this will help with verifying we have n up-to-date partial local copy
self._dependency_chunk_lookup = None # type: Optional[Dict[str, int]]
@property
def id(self):
@ -247,8 +264,8 @@ class Dataset(object):
def sync_folder(self, local_path, dataset_path=None, verbose=False):
# type: (Union[Path, _Path, str], Union[Path, _Path, str], bool) -> (int, int)
"""
Synchronize the dataset with a local folder. The dataset is synchronized recursively from the `local_path` into
the `dataset_path` (default: dataset root).
Synchronize the dataset with a local folder. The dataset is synchronized from the
relative_base_folder (default: dataset root) and deeper with the specified local path.
:param local_path: Local folder to sync (assumes all files and recursive)
:param dataset_path: Target dataset path to sync with (default the root of the dataset)
@ -294,8 +311,8 @@ class Dataset(object):
self._serialize()
return removed_files, added_files
def upload(self, show_progress=True, verbose=False, output_url=None, compression=None):
# type: (bool, bool, Optional[str], Optional[str]) -> ()
def upload(self, show_progress=True, verbose=False, output_url=None, compression=None, chunk_size=None):
# type: (bool, bool, Optional[str], Optional[str], int) -> ()
"""
Start file uploading, the function returns when all files are uploaded.
@ -304,6 +321,9 @@ class Dataset(object):
:param output_url: Target storage for the compressed dataset (default: file server)
Examples: `s3://bucket/data`, `gs://bucket/data` , `azure://bucket/data` , `/mnt/share/data`
:param compression: Compression algorithm for the Zipped dataset file (default: ZIP_DEFLATED)
:param chunk_size: Artifact chunk size (MB) for the compressed dataset,
if not provided (None) use the default chunk size (512mb).
If -1 is provided, use a single zip artifact for the entire dataset change-set (old behaviour)
"""
# set output_url
if output_url:
@ -314,16 +334,34 @@ class Dataset(object):
dict(show_progress=show_progress, verbose=verbose, output_url=output_url, compression=compression)),
print_console=False)
list_zipped_artifacts = [] # List[Tuple[Path, int, str, str]]
list_file_entries = list(self._dataset_file_entries.values())
total_size = 0
chunk_size = int(self._dataset_chunk_size_mb if not chunk_size else chunk_size)
try:
from tqdm import tqdm # noqa
a_tqdm = tqdm(total=len(list_file_entries))
except ImportError:
a_tqdm = None
while list_file_entries:
fd, zip_file = mkstemp(
prefix='dataset.{}.'.format(self._id), suffix='.zip'
)
archive_preview = ''
count = 0
processed = 0
zip_file = Path(zip_file)
print('{}Compressing local files, chunk {} [remaining {} files]'.format(
'\n' if a_tqdm else '', 1+len(list_zipped_artifacts), len(list_file_entries)))
try:
with ZipFile(zip_file, 'w', allowZip64=True, compression=compression or ZIP_DEFLATED) as zf:
for file_entry in self._dataset_file_entries.values():
with ZipFile(zip_file.as_posix(), 'w', allowZip64=True, compression=compression or ZIP_DEFLATED) as zf:
for file_entry in list_file_entries:
processed += 1
if a_tqdm:
a_tqdm.update()
if not file_entry.local_path:
# file is located in a different version
# file is already in an uploaded artifact
continue
filename = Path(file_entry.local_path)
if not filename.is_file():
@ -339,8 +377,12 @@ class Dataset(object):
zf.write(filename.as_posix(), arcname=relative_file_name)
archive_preview += '{} - {}\n'.format(
relative_file_name, format_size(filename.stat().st_size))
file_entry.local_path = None
file_entry.artifact_name = self._data_artifact_name
count += 1
# limit the size of a single artifact
if chunk_size > 0 and zip_file.stat().st_size >= chunk_size * (1024**2):
break
except Exception as e:
# failed uploading folder:
LoggerRoot.get_base_logger().warning(
@ -349,17 +391,35 @@ class Dataset(object):
finally:
os.close(fd)
zip_file = Path(zip_file)
if not count:
zip_file.unlink()
total_size += zip_file.stat().st_size
# let's see what's left
list_file_entries = list_file_entries[processed:]
# update the artifact preview
archive_preview = 'Dataset archive content [{} files]:\n'.format(count) + archive_preview
# add into the list
list_zipped_artifacts += [(zip_file, count, archive_preview, self._data_artifact_name)]
# next artifact name to use
self._data_artifact_name = self._get_next_data_artifact_name(self._data_artifact_name)
if a_tqdm:
a_tqdm.close()
self._task.get_logger().report_text(
'File compression completed: total size {}, {} chunked stored (average size {})'.format(
format_size(total_size),
len(list_zipped_artifacts),
format_size(total_size / len(list_zipped_artifacts))))
if not list_zipped_artifacts:
LoggerRoot.get_base_logger().info('No pending files, skipping upload.')
self._dirty = False
self._serialize()
return True
archive_preview = 'Dataset archive content [{} files]:\n'.format(count) + archive_preview
for i, (zip_file, count, archive_preview, artifact_name) in enumerate(list_zipped_artifacts):
# noinspection PyBroadException
try:
# let's try to rename it
@ -368,18 +428,29 @@ class Dataset(object):
zip_file = new_zip_file
except Exception:
pass
# remove files that could not be zipped, containing Null relative Path
self._dataset_file_entries = {k: v for k, v in self._dataset_file_entries.items()
if v.relative_path is not None}
# start upload
zip_file_size = format_size(Path(zip_file).stat().st_size)
self._task.get_logger().report_text(
'Uploading compressed dataset changes ({} files, total {}) to {}'.format(
count, zip_file_size, self.get_default_storage()))
'Uploading compressed dataset changes {}/{} ({} files {}) to {}'.format(
i+1, len(list_zipped_artifacts), count, zip_file_size, self.get_default_storage()))
self._task.upload_artifact(
name=self.__data_entry_name, artifact_object=Path(zip_file), preview=archive_preview,
name=artifact_name, artifact_object=Path(zip_file), preview=archive_preview,
delete_after_upload=True, wait_on_upload=True)
self._task.get_logger().report_text('Upload completed ({})'.format(zip_file_size))
# mark as upload completed and serialize
for file_entry in self._dataset_file_entries.values():
if file_entry.parent_dataset_id == self._id and file_entry.artifact_name == artifact_name:
file_entry.local_path = None
# serialize current state
self._serialize()
# remove files that could not be zipped, containing Null relative Path
self._dataset_file_entries = {
k: v for k, v in self._dataset_file_entries.items() if v.relative_path is not None}
# report upload completed
self._task.get_logger().report_text('Upload completed ({})'.format(format_size(total_size)))
self._add_script_call(
'upload', show_progress=show_progress, verbose=verbose, output_url=output_url, compression=compression)
@ -409,7 +480,7 @@ class Dataset(object):
self._task.get_logger().report_text('Finalizing dataset', print_console=False)
# make sure we have no redundant parent versions
self._serialize()
self._serialize(update_dependency_chunk_lookup=True)
self._add_script_call('finalize')
if verbose:
print('Updating statistics and genealogy')
@ -453,14 +524,26 @@ class Dataset(object):
return self._task.get_status() not in (
Task.TaskStatusEnum.in_progress, Task.TaskStatusEnum.created, Task.TaskStatusEnum.failed)
def get_local_copy(self, use_soft_links=None, raise_on_error=True):
# type: (bool, bool) -> str
def get_local_copy(self, use_soft_links=None, part=None, num_parts=None, raise_on_error=True):
# type: (bool, Optional[int], Optional[int], bool) -> str
"""
return a base folder with a read-only (immutable) local copy of the entire dataset
download and copy / soft-link, files from all the parent dataset versions
:param use_soft_links: If True use soft links, default False on windows True on Posix systems
:param part: Optional, if provided only download the selected part (index) of the Dataset.
First part number is `0` and last part is `num_parts-1`
Notice, if `num_parts` is not provided, number of parts will be equal to the total number of chunks
(i.e. sum over all chunks from the specified Dataset including all parent Datasets).
This argument is passed to parent datasets, as well as the implicit `num_parts`,
allowing users to get a partial copy of the entire dataset, for multi node/step processing.
:param num_parts: Optional, If specified normalize the number of chunks stored to the
requested number of parts. Notice that the actual chunks used per part are rounded down.
Example: Assuming total 8 chunks for this dataset (including parent datasets),
and `num_parts=5`, the chunk index used per parts would be:
part=0 -> chunks[0,5], part=1 -> chunks[1,6], part=2 -> chunks[2,7], part=3 -> chunks[3, ]
:param raise_on_error: If True raise exception if dataset merging failed on any file
:return: A base folder for the entire dataset
"""
assert self._id
@ -468,11 +551,12 @@ class Dataset(object):
self._task = Task.get_task(task_id=self._id)
# now let's merge the parents
target_folder = self._merge_datasets(use_soft_links=use_soft_links, raise_on_error=raise_on_error)
target_folder = self._merge_datasets(
use_soft_links=use_soft_links, raise_on_error=raise_on_error, part=part, num_parts=num_parts)
return target_folder
def get_mutable_local_copy(self, target_folder, overwrite=False, raise_on_error=True):
# type: (Union[Path, _Path, str], bool, bool) -> Optional[str]
def get_mutable_local_copy(self, target_folder, overwrite=False, part=None, num_parts=None, raise_on_error=True):
# type: (Union[Path, _Path, str], bool, Optional[int], Optional[int], bool) -> Optional[str]
"""
return a base folder with a writable (mutable) local copy of the entire dataset
download and copy / soft-link, files from all the parent dataset versions
@ -480,7 +564,19 @@ class Dataset(object):
:param target_folder: Target folder for the writable copy
:param overwrite: If True, recursively delete the target folder before creating a copy.
If False (default) and target folder contains files, raise exception or return None
:param part: Optional, if provided only download the selected part (index) of the Dataset.
First part number is `0` and last part is `num_parts-1`
Notice, if `num_parts` is not provided, number of parts will be equal to the total number of chunks
(i.e. sum over all chunks from the specified Dataset including all parent Datasets).
This argument is passed to parent datasets, as well as the implicit `num_parts`,
allowing users to get a partial copy of the entire dataset, for multi node/step processing.
:param num_parts: Optional, If specified normalize the number of chunks stored to the
requested number of parts. Notice that the actual chunks used per part are rounded down.
Example: Assuming total 8 chunks for this dataset (including parent datasets),
and `num_parts=5`, the chunk index used per parts would be:
part=0 -> chunks[0,5], part=1 -> chunks[1,6], part=2 -> chunks[2,7], part=3 -> chunks[3, ]
:param raise_on_error: If True raise exception if dataset merging failed on any file
:return: A the target folder containing the entire dataset
"""
assert self._id
@ -497,8 +593,8 @@ class Dataset(object):
return None
shutil.rmtree(target_folder.as_posix())
ro_folder = self.get_local_copy(raise_on_error=raise_on_error)
shutil.copytree(ro_folder, target_folder.as_posix())
ro_folder = self.get_local_copy(part=part, num_parts=num_parts, raise_on_error=raise_on_error)
shutil.copytree(ro_folder, target_folder.as_posix(), symlinks=False)
return target_folder.as_posix()
def list_files(self, dataset_path=None, recursive=True, dataset_id=None):
@ -680,7 +776,7 @@ class Dataset(object):
If True, the dataset is created on the current Task.
:return: Newly created Dataset object
"""
parent_datasets = [cls.get(dataset_id=p) if isinstance(p, str) else p for p in (parent_datasets or [])]
parent_datasets = [cls.get(dataset_id=p) if not isinstance(p, Dataset) else p for p in (parent_datasets or [])]
if any(not p.is_final() for p in parent_datasets):
raise ValueError("Cannot inherit from a parent that was not finalized/closed")
@ -831,15 +927,19 @@ class Dataset(object):
if task.status == 'created':
raise ValueError('Dataset id={} is in draft mode, delete and recreate it'.format(task.id))
force_download = False if task.status in ('stopped', 'published', 'closed', 'completed') else True
if cls.__state_entry_name in task.artifacts:
local_state_file = StorageManager.get_local_copy(
remote_url=task.artifacts[cls.__state_entry_name].url, cache_context=cls.__cache_context,
extract_archive=False, name=task.id, force_download=force_download)
if not local_state_file:
raise ValueError('Could not load Dataset id={} state'.format(task.id))
else:
# we could not find the serialized state, start empty
local_state_file = {}
instance = cls._deserialize(local_state_file, task)
# remove the artifact, just in case
if force_download:
if force_download and local_state_file:
os.unlink(local_state_file)
return instance
@ -853,6 +953,23 @@ class Dataset(object):
"""
return self._task.get_logger()
def get_num_chunks(self, include_parents=True):
# type: (bool) -> int
"""
Return the number of chunks stored on this dataset
(it does not imply on the number of chunks parent versions store)
:param include_parents: If True (default),
return the total number of chunks from this version and all parent versions.
If False, only return the number of chunks we stored on this specific version.
:return: Number of chunks stored on the dataset.
"""
if not include_parents:
return len(self._get_data_artifact_names())
return sum(self._get_dependency_chunk_lookup().values())
@classmethod
def squash(cls, dataset_name, dataset_ids=None, dataset_project_name_pairs=None, output_url=None):
# type: (str, Optional[Sequence[Union[str, Dataset]]],Optional[Sequence[(str, str)]], Optional[str]) -> Dataset
@ -987,7 +1104,7 @@ class Dataset(object):
self._task.get_logger().report_text('Generating SHA2 hash for {} files'.format(len(file_entries)))
pool = ThreadPool(cpu_count() * 2)
try:
import tqdm
import tqdm # noqa
for _ in tqdm.tqdm(pool.imap_unordered(self._calc_file_hash, file_entries), total=len(file_entries)):
pass
except ImportError:
@ -1010,6 +1127,13 @@ class Dataset(object):
self._dataset_file_entries[f.relative_path] = f
count += 1
elif f.parent_dataset_id == self._id and ds_cur_f.parent_dataset_id == self._id:
# check if we have the file in an already uploaded chunk
if ds_cur_f.local_path is None:
# skipping, already uploaded.
if verbose:
self._task.get_logger().report_text('Skipping {}'.format(f.relative_path))
else:
# if we never uploaded it, mark for upload
if verbose:
self._task.get_logger().report_text('Re-Added {}'.format(f.relative_path))
self._dataset_file_entries[f.relative_path] = f
@ -1028,17 +1152,22 @@ class Dataset(object):
# collect all dataset versions
used_dataset_versions = set(f.parent_dataset_id for f in self._dataset_file_entries.values())
used_dataset_versions.add(self._id)
current_parents = self._dependency_graph.get(self._id)
current_parents = self._dependency_graph.get(self._id) or []
# remove parent versions we no longer need from the main version list
# per version, remove unnecessary parent versions, if we do not need them
self._dependency_graph = {k: [p for p in parents if p in used_dataset_versions]
self._dependency_graph = {
k: [p for p in parents or [] if p in used_dataset_versions]
for k, parents in self._dependency_graph.items() if k in used_dataset_versions}
# make sure we do not remove our parents, for geology sake
self._dependency_graph[self._id] = current_parents
def _serialize(self):
def _serialize(self, update_dependency_chunk_lookup=False):
# type: (bool) -> ()
"""
store current state of the Dataset for later use
:param update_dependency_chunk_lookup: If True, update the parent versions number of chunks
:return: object to be used for later deserialization
"""
self._update_dependency_graph()
@ -1049,6 +1178,9 @@ class Dataset(object):
id=self._id,
dirty=self._dirty,
)
if update_dependency_chunk_lookup:
state['dependency_chunk_lookup'] = self._build_dependency_chunk_lookup()
modified_files = [f['size'] for f in state['dataset_file_entries'] if f.get('parent_dataset_id') == self._id]
preview = \
'Dataset state\n' \
@ -1060,130 +1192,210 @@ class Dataset(object):
self._task.upload_artifact(
name=self.__state_entry_name, artifact_object=state, preview=preview, wait_on_upload=True)
def _download_dataset_archive(self):
def _download_dataset_archives(self):
"""
Download the dataset archive, return a link to locally stored zip file
:return: Path to locally stored zip file
:return: List of paths to locally stored zip files
"""
pass # TODO: implement
def _extract_dataset_archive(self):
def _extract_dataset_archive(
self,
force=False,
selected_chunks=None,
lock_target_folder=False,
cleanup_target_folder=True,
target_folder=None,
):
# type: (bool, Optional[List[int]], bool, bool, Optional[Path]) -> str
"""
Download the dataset archive, and extract the zip content to a cached folder.
Notice no merging is done.
:param force: If True extract dataset content even if target folder exists and is not empty
:param selected_chunks: Optional, if provided only download the selected chunks (index) of the Dataset.
Example: Assuming 8 chunks on this version
selected_chunks=[0,1,2]
:param lock_target_folder: If True, local the target folder so the next cleanup will not delete
Notice you should unlock it manually, or wait fro the process to fnish for auto unlocking.
:param cleanup_target_folder: If True remove target folder recursively
:param target_folder: If provided use the specified target folder, default, auto generate from Dataset ID.
:return: Path to a local storage extracted archive
"""
assert selected_chunks is None or isinstance(selected_chunks, (list, tuple))
if not self._task:
self._task = Task.get_task(task_id=self._id)
data_artifact_entries = self._get_data_artifact_names()
if selected_chunks is not None and data_artifact_entries:
data_artifact_entries = [
d for d in data_artifact_entries
if self._get_chunk_idx_from_artifact_name(d) in selected_chunks]
# get cache manager
local_folder = Path(target_folder) if target_folder else \
self._create_ds_target_folder(lock_target_folder=lock_target_folder)
# check if we have a dataset with empty change set
if not self._task.artifacts.get(self.__data_entry_name):
cache = CacheManager.get_cache_manager(cache_context=self.__cache_context)
local_folder = Path(cache.get_cache_folder()) / self._get_cache_folder_name()
local_folder.mkdir(parents=True, exist_ok=True)
if not data_artifact_entries:
return local_folder.as_posix()
# check if target folder is not empty
if not force and next(local_folder.glob('*'), None):
return local_folder.as_posix()
# if we got here, we need to clear the target folder
local_folder = local_folder.as_posix()
if cleanup_target_folder:
shutil.rmtree(local_folder, ignore_errors=True)
# verify target folder exists
Path(local_folder).mkdir(parents=True, exist_ok=True)
def _download_part(data_artifact_name):
# download the dataset zip
local_zip = StorageManager.get_local_copy(
remote_url=self._task.artifacts[self.__data_entry_name].url, cache_context=self.__cache_context,
remote_url=self._task.artifacts[data_artifact_name].url, cache_context=self.__cache_context,
extract_archive=False, name=self._id)
if not local_zip:
raise ValueError("Could not download dataset id={}".format(self._id))
local_folder = (Path(local_zip).parent / self._get_cache_folder_name()).as_posix()
# if we got here, we need to clear the target folder
shutil.rmtree(local_folder, ignore_errors=True)
raise ValueError("Could not download dataset id={} entry={}".format(self._id, data_artifact_name))
# noinspection PyProtectedMember
local_folder = StorageManager._extract_to_cache(
StorageManager._extract_to_cache(
cached_file=local_zip, name=self._id,
cache_context=self.__cache_context, target_folder=local_folder)
cache_context=self.__cache_context, target_folder=local_folder, force=True)
# download al parts in parallel
# if len(data_artifact_entries) > 1:
# pool = ThreadPool()
# pool.map(_download_part, data_artifact_entries)
# pool.close()
# else:
# _download_part(data_artifact_entries[0])
for d in data_artifact_entries:
_download_part(d)
return local_folder
def _merge_datasets(self, use_soft_links=None, raise_on_error=True):
# type: (bool, bool) -> str
def _create_ds_target_folder(self, part=None, num_parts=None, lock_target_folder=True):
# type: (Optional[int], Optional[int], bool) -> Path
cache = CacheManager.get_cache_manager(cache_context=self.__cache_context)
local_folder = Path(cache.get_cache_folder()) / self._get_cache_folder_name(part=part, num_parts=num_parts)
if lock_target_folder:
cache.lock_cache_folder(local_folder)
local_folder.mkdir(parents=True, exist_ok=True)
return local_folder
def _get_data_artifact_names(self):
# type: () -> List[str]
data_artifact_entries = [
a for a in self._task.artifacts
if a and (a == self.__default_data_entry_name or str(a).startswith(self.__data_entry_name_prefix))]
return data_artifact_entries
def _get_next_data_artifact_name(self, last_artifact_name=None):
# type: (Optional[str]) -> str
if not last_artifact_name:
data_artifact_entries = self._get_data_artifact_names()
if len(data_artifact_entries) < 1:
return self.__default_data_entry_name
else:
data_artifact_entries = [last_artifact_name]
prefix = self.__data_entry_name_prefix
prefix_len = len(prefix)
numbers = sorted([int(a[prefix_len:]) for a in data_artifact_entries if a.startswith(prefix)])
return '{}{:03d}'.format(prefix, numbers[-1]+1 if numbers else 1)
def _merge_datasets(self, use_soft_links=None, raise_on_error=True, part=None, num_parts=None):
# type: (bool, bool, Optional[int], Optional[int]) -> str
"""
download and copy / soft-link, files from all the parent dataset versions
:param use_soft_links: If True use soft links, default False on windows True on Posix systems
:param raise_on_error: If True raise exception if dataset merging failed on any file
:param part: Optional, if provided only download the selected part (index) of the Dataset.
Notice, if `num_parts` is not provided, number of parts will be equal to the number of chunks.
This argument is passed to parent versions, as well as the implicit `num_parts`,
allowing users to get a partial copy of the entire dataset, for multi node/step processing.
:param num_parts: Optional, If specified normalize the number of chunks stored to the
requested number of parts. Notice that the actual chunks used per part are rounded down.
Example: Assuming 8 chunks on this version, and `num_parts=5`, the chunk index used per parts would be:
part=0 -> chunks[0,5], part=1 -> chunks[1,6], part=2 -> chunks[2,7], part=3 -> chunks[3, ]
:return: the target folder
"""
assert part is None or (isinstance(part, int) and part >= 0)
assert num_parts is None or (isinstance(num_parts, int) and num_parts >= 1)
if use_soft_links is None:
use_soft_links = False if is_windows() else True
# check if we already have everything
target_base_folder, target_base_size = CacheManager.get_cache_manager(
cache_context=self.__cache_context).get_cache_file(local_filename=self._get_cache_folder_name())
if target_base_folder and target_base_size is not None:
target_base_folder = Path(target_base_folder)
# check dataset file size, if we have a full match no need for parent dataset download / merge
verified = True
# noinspection PyBroadException
try:
for f in self._dataset_file_entries.values():
if (target_base_folder / f.relative_path).stat().st_size != f.size:
verified = False
break
except Exception:
verified = False
if part is not None and not num_parts:
num_parts = self.get_num_chunks()
if verified:
# just create the dataset target folder
target_base_folder = self._create_ds_target_folder(
part=part, num_parts=num_parts, lock_target_folder=True)
# selected specific chunks if `part` was passed
chunk_selection = None if part is None else self._build_chunk_selection(part=part, num_parts=num_parts)
# check if target folder is not empty, see if it contains everything we need
if target_base_folder and next(target_base_folder.iterdir(), None):
if self._verify_dataset_folder(target_base_folder, part, chunk_selection):
target_base_folder.touch()
return target_base_folder.as_posix()
else:
LoggerRoot.get_base_logger().info('Dataset needs refreshing, fetching all parent datasets')
# we should delete the entire cache folder
shutil.rmtree(target_base_folder.as_posix())
# make sure we recreate the dataset target folder
target_base_folder.mkdir(parents=True, exist_ok=True)
# get the dataset dependencies (if `part` was passed, only selected the ones in the selected part)
dependencies_by_order = self._get_dependencies_by_order(include_unused=False, include_current=True) \
if chunk_selection is None else list(chunk_selection.keys())
# first get our dataset
target_base_folder = Path(self._extract_dataset_archive())
if self._id in dependencies_by_order:
self._extract_dataset_archive(
force=True,
selected_chunks=chunk_selection.get(self._id) if chunk_selection else None,
cleanup_target_folder=True,
target_folder=target_base_folder,
)
dependencies_by_order.remove(self._id)
# update target folder timestamp
target_base_folder.touch()
# create thread pool
pool = ThreadPool(cpu_count() * 2)
for dataset_version_id in self._get_dependencies_by_order():
# make sure we skip over empty dependencies
if dataset_version_id not in self._dependency_graph:
continue
ds = Dataset.get(dataset_id=dataset_version_id)
ds_base_folder = Path(ds._extract_dataset_archive())
ds_base_folder.touch()
def copy_file(file_entry):
if file_entry.parent_dataset_id != dataset_version_id:
return
source = (ds_base_folder / file_entry.relative_path).as_posix()
target = (target_base_folder / file_entry.relative_path).as_posix()
try:
# make sure we have can overwrite the target file
# noinspection PyBroadException
try:
os.unlink(target)
except Exception:
Path(target).parent.mkdir(parents=True, exist_ok=True)
# copy / link
if use_soft_links:
if not os.path.isfile(source):
raise ValueError("Extracted file missing {}".format(source))
os.symlink(source, target)
else:
shutil.copy2(source, target, follow_symlinks=True)
except Exception as ex:
LoggerRoot.get_base_logger().warning('{}\nFailed {} file {} to {}'.format(
ex, 'linking' if use_soft_links else 'copying', source, target))
return ex
return None
errors = pool.map(copy_file, self._dataset_file_entries.values())
if raise_on_error and any(errors):
raise ValueError("Dataset merging failed: {}".format([e for e in errors if e is not None]))
pool.close()
# if we have no dependencies, we can just return now
if not dependencies_by_order:
return target_base_folder.absolute().as_posix()
def _get_dependencies_by_order(self, include_unused=False):
# type: (bool) -> List[str]
# extract parent datasets
self._extract_parent_datasets(
target_base_folder=target_base_folder, dependencies_by_order=dependencies_by_order,
chunk_selection=chunk_selection, use_soft_links=use_soft_links,
raise_on_error=False, force=False)
# verify entire dataset (if failed, force downloading parent datasets)
if not self._verify_dataset_folder(target_base_folder, part, chunk_selection):
LoggerRoot.get_base_logger().info('Dataset parents need refreshing, re-fetching all parent datasets')
# we should delete the entire cache folder
self._extract_parent_datasets(
target_base_folder=target_base_folder, dependencies_by_order=dependencies_by_order,
chunk_selection=chunk_selection, use_soft_links=use_soft_links,
raise_on_error=raise_on_error, force=True)
return target_base_folder.absolute().as_posix()
def _get_dependencies_by_order(self, include_unused=False, include_current=True):
# type: (bool, bool) -> List[str]
"""
Return the dataset dependencies by order of application (from the last to the current)
:param bool include_unused: If True include unused datasets in the dependencies
:param include_unused: If True include unused datasets in the dependencies
:param include_current: If True include the current dataset ID as the last ID in the list
:return: list of str representing the datasets id
"""
roots = [self._id]
@ -1196,10 +1408,10 @@ class Dataset(object):
# add the parents of the current node, only if the parents are in the general graph node list
if include_unused and r not in self._dependency_graph:
roots.extend(list(reversed(
[p for p in self.get(dataset_id=r)._get_parents() if p not in roots])))
[p for p in (self.get(dataset_id=r)._get_parents() or []) if p not in roots])))
else:
roots.extend(list(reversed(
[p for p in self._dependency_graph.get(r, [])
[p for p in (self._dependency_graph.get(r) or [])
if p not in roots and (include_unused or (p in self._dependency_graph))])))
# make sure we cover leftovers
@ -1214,14 +1426,15 @@ class Dataset(object):
# add the parents of the current node, only if the parents are in the general graph node list
if include_unused and r not in self._dependency_graph:
roots.extend(list(reversed(
[p for p in self.get(dataset_id=r)._get_parents() if p not in roots])))
[p for p in (self.get(dataset_id=r)._get_parents() or []) if p not in roots])))
else:
roots.extend(list(reversed(
[p for p in self._dependency_graph.get(r, [])
[p for p in (self._dependency_graph.get(r) or [])
if p not in roots and (include_unused or (p in self._dependency_graph))])))
# skip our id
return list(reversed(dependencies[1:]))
dependencies = list(reversed(dependencies[1:]))
return (dependencies + [self._id]) if include_current else dependencies
def _get_parents(self):
# type: () -> Sequence[str]
@ -1248,10 +1461,24 @@ class Dataset(object):
instance = cls(_private=cls.__private_magic, task=task)
# assert instance._id == stored_state['id'] # They should match
instance._dependency_graph = stored_state['dependency_graph']
instance._dependency_graph = stored_state.get('dependency_graph', {})
instance._dirty = stored_state.get('dirty', False)
instance._dataset_file_entries = {
s['relative_path']: FileEntry(**s) for s in stored_state['dataset_file_entries']}
s['relative_path']: FileEntry(**s) for s in stored_state.get('dataset_file_entries', [])}
if stored_state.get('dependency_chunk_lookup') is not None:
instance._dependency_chunk_lookup = stored_state.get('dependency_chunk_lookup')
# update the last used artifact (remove the one we never serialized, they rae considered broken)
if task.status in ('in_progress', 'created', 'stopped'):
artifact_names = set([
a.artifact_name for a in instance._dataset_file_entries.values()
if a.artifact_name and a.parent_dataset_id == instance._id])
missing_artifact_name = set(instance._get_data_artifact_names()) - artifact_names
if missing_artifact_name:
instance._task._delete_artifacts(list(missing_artifact_name))
# if we removed any data artifact, update the next data artifact name
instance._data_artifact_name = instance._get_next_data_artifact_name()
return instance
@staticmethod
@ -1272,8 +1499,27 @@ class Dataset(object):
"""
return 'dsh{}'.format(md5text(dataset_id))
def _get_cache_folder_name(self):
def _build_dependency_chunk_lookup(self):
# type: () -> Dict[str, int]
"""
Build the dependency dataset id to number-of-chunks, lookup table
:return: lookup dictionary from dataset-id to number of chunks
"""
# with ThreadPool() as pool:
# chunks_lookup = pool.map(
# lambda d: (d, Dataset.get(dataset_id=d).get_num_chunks()),
# self._dependency_graph.keys())
# return dict(chunks_lookup)
chunks_lookup = map(
lambda d: (d, Dataset.get(dataset_id=d).get_num_chunks()),
self._dependency_graph.keys())
return dict(chunks_lookup)
def _get_cache_folder_name(self, part=None, num_parts=None):
# type: (Optional[int], Optional[int]) -> str
if part is None:
return '{}{}'.format(self.__cache_folder_prefix, self._id)
return '{}{}_{}_{}'.format(self.__cache_folder_prefix, self._id, part, num_parts)
def _add_script_call(self, func_name, **kwargs):
# type: (str, **Any) -> ()
@ -1306,7 +1552,7 @@ class Dataset(object):
hovertemplate='<extra></extra>',
)
# get DAG nodes
nodes = self._get_dependencies_by_order(include_unused=True) + [self.id]
nodes = self._get_dependencies_by_order(include_unused=True, include_current=True)
# dataset name lookup
# noinspection PyProtectedMember
node_names = {t.id: t.name for t in Task._query_tasks(task_ids=nodes, only_fields=['id', 'name'])}
@ -1393,14 +1639,24 @@ class Dataset(object):
# report the detailed content of the dataset as configuration,
# this allows for easy version comparison in the UI
dataset_details = None
dataset_details_header = None
dataset_details_header_template = 'File Name ({} files) - File Size (total {}) - Hash (SHA2)\n'
if len(self._dataset_file_entries) < self.__preview_max_file_entries:
file_entries = sorted(self._dataset_file_entries.values(), key=lambda x: x.relative_path)
dataset_details = \
'File Name - File Size - Hash (SHA2)\n' +\
dataset_details_header = dataset_details_header_template.format(
len(file_entries), format_size(sum(f.size for f in file_entries))
)
dataset_details = dataset_details_header + \
'\n'.join('{} - {} - {}'.format(f.relative_path, f.size, f.hash) for f in file_entries)
# too large to store
if not dataset_details or len(dataset_details) > self.__preview_max_size:
dataset_details = 'Dataset content is too large to preview'
if not dataset_details_header:
dataset_details_header = dataset_details_header_template.format(
len(self._dataset_file_entries),
format_size(sum(f.size for f in self._dataset_file_entries.values()))
)
dataset_details = dataset_details_header + 'Dataset content is too large to preview'
# noinspection PyProtectedMember
self._task._set_configuration(
@ -1428,3 +1684,154 @@ class Dataset(object):
:return: Return True means dataset has pending uploads, call 'upload' to start an upload process.
"""
return self._dirty
def _extract_parent_datasets(
self,
target_base_folder,
dependencies_by_order,
chunk_selection,
use_soft_links,
raise_on_error,
force
):
# type: (Path, List[str], Optional[dict], bool, bool, bool) -> ()
# create thread pool, for creating soft-links / copying
# todo: parallelize by parent datasets
pool = ThreadPool(cpu_count() * 2)
for dataset_version_id in dependencies_by_order:
# make sure we skip over empty dependencies
if dataset_version_id not in self._dependency_graph:
continue
selected_chunks = chunk_selection.get(dataset_version_id) if chunk_selection else None
ds = Dataset.get(dataset_id=dataset_version_id)
ds_base_folder = Path(ds._extract_dataset_archive(
selected_chunks=selected_chunks,
force=force,
lock_target_folder=True,
cleanup_target_folder=False,
))
ds_base_folder.touch()
def copy_file(file_entry):
if file_entry.parent_dataset_id != dataset_version_id or \
(selected_chunks is not None and
self._get_chunk_idx_from_artifact_name(file_entry.artifact_name) not in selected_chunks):
return
source = (ds_base_folder / file_entry.relative_path).as_posix()
target = (target_base_folder / file_entry.relative_path).as_posix()
try:
# make sure we have can overwrite the target file
# noinspection PyBroadException
try:
os.unlink(target)
except Exception:
Path(target).parent.mkdir(parents=True, exist_ok=True)
# copy / link
if use_soft_links:
if not os.path.isfile(source):
raise ValueError("Extracted file missing {}".format(source))
os.symlink(source, target)
else:
shutil.copy2(source, target, follow_symlinks=True)
except Exception as ex:
LoggerRoot.get_base_logger().warning('{}\nFailed {} file {} to {}'.format(
ex, 'linking' if use_soft_links else 'copying', source, target))
return ex
return None
errors = pool.map(copy_file, self._dataset_file_entries.values())
CacheManager.get_cache_manager(cache_context=self.__cache_context).unlock_cache_folder(
ds_base_folder.as_posix())
if raise_on_error and any(errors):
raise ValueError("Dataset merging failed: {}".format([e for e in errors if e is not None]))
pool.close()
def _verify_dataset_folder(self, target_base_folder, part, chunk_selection):
# type: (Path, Optional[int], Optional[dict]) -> bool
target_base_folder = Path(target_base_folder)
# check dataset file size, if we have a full match no need for parent dataset download / merge
verified = True
# noinspection PyBroadException
try:
for f in self._dataset_file_entries.values():
# check if we need it for the current part
if part is not None:
f_parts = chunk_selection.get(f.parent_dataset_id, [])
# this is not in our current part, no need to check it.
if self._get_chunk_idx_from_artifact_name(f.artifact_name) not in f_parts:
continue
# check if the local size and the stored size match (faster than comparing hash)
if (target_base_folder / f.relative_path).stat().st_size != f.size:
verified = False
break
except Exception:
verified = False
return verified
def _get_dependency_chunk_lookup(self):
# type: () -> Dict[str, int]
"""
Return The parent dataset ID to number of chunks lookup table
:return: Dict key is dataset ID, value is total number of chunks for the specific dataset version.
"""
if self._dependency_chunk_lookup is None:
self._dependency_chunk_lookup = self._build_dependency_chunk_lookup()
return self._dependency_chunk_lookup
def _build_chunk_selection(self, part, num_parts):
# type: (int, int) -> Dict[str, int]
"""
Build the selected chunks from each parent version, based on the current selection.
Notice that for a specific part, one can only get the chunks from parent versions (not including this one)
:param part: Current part index (between 0 and num_parts-1)
:param num_parts: Total number of parts to divide the dataset into
:return: Dict of Dataset ID and their respected chunks used for this part number
"""
# get the chunk dependencies
dependency_chunk_lookup = self._get_dependency_chunk_lookup()
# first collect the total number of chunks
total_chunks = sum(dependency_chunk_lookup.values())
avg_chunk_per_part = total_chunks // num_parts
leftover_chunks = total_chunks % num_parts
dependencies = self._get_dependencies_by_order(include_unused=False, include_current=True)
# create the part look up
ds_id_chunk_list = [(d, i) for d in dependencies for i in range(dependency_chunk_lookup.get(d, 1))]
# select the chunks for this part
if part < leftover_chunks:
indexes = ds_id_chunk_list[part*(avg_chunk_per_part+1):(part+1)*(avg_chunk_per_part+1)]
else:
ds_id_chunk_list = ds_id_chunk_list[leftover_chunks*(avg_chunk_per_part+1):]
indexes = ds_id_chunk_list[(part-leftover_chunks)*avg_chunk_per_part:
(part-leftover_chunks+1)*avg_chunk_per_part]
# convert to lookup
chunk_selection = {}
for d, i in indexes:
chunk_selection[d] = chunk_selection.get(d, []) + [i]
return chunk_selection
@classmethod
def _get_chunk_idx_from_artifact_name(cls, artifact_name):
# type: (str) -> int
if not artifact_name:
return -1
artifact_name = str(artifact_name)
if artifact_name == cls.__default_data_entry_name:
return 0
if artifact_name.startswith(cls.__data_entry_name_prefix):
return int(artifact_name[len(cls.__data_entry_name_prefix):])
return -1

View File

@ -1,13 +1,20 @@
import atexit
import hashlib
import os
import shutil
from collections import OrderedDict
from threading import RLock
from typing import Union, Optional, Tuple, Dict
from pathlib2 import Path
from .helper import StorageHelper
from .util import quote_url
from ..config import get_cache_dir, deferred_config
from ..debugging.log import LoggerRoot
from ..utilities.locks.utils import Lock as FileLock
from ..utilities.locks.exceptions import LockException
class CacheManager(object):
@ -19,17 +26,26 @@ class CacheManager(object):
__local_to_remote_url_lookup_max_size = 1024
_context_to_folder_lookup = dict()
_default_context_folder_template = "{0}_artifacts_archive_{1}"
_lockfile_prefix = '.lock.'
_lockfile_suffix = '.clearml'
class CacheContext(object):
_folder_locks = dict() # type: Dict[str, FileLock]
_lockfile_at_exit_cb = None
def __init__(self, cache_context, default_cache_file_limit=10):
# type: (str, int) -> None
self._context = str(cache_context)
self._file_limit = int(default_cache_file_limit)
self._rlock = RLock()
def set_cache_limit(self, cache_file_limit):
# type: (int) -> int
self._file_limit = max(self._file_limit, int(cache_file_limit))
return self._file_limit
def get_local_copy(self, remote_url, force_download):
# type: (str, bool) -> Optional[str]
helper = StorageHelper.get(remote_url)
if not helper:
raise ValueError("Storage access failed: {}".format(remote_url))
@ -59,6 +75,7 @@ class CacheManager(object):
@staticmethod
def upload_file(local_file, remote_url, wait_for_upload=True, retries=1):
# type: (str, str, bool, int) -> Optional[str]
helper = StorageHelper.get(remote_url)
result = helper.upload(
local_file, remote_url, async_enable=not wait_for_upload, retries=retries,
@ -68,11 +85,13 @@ class CacheManager(object):
@classmethod
def get_hashed_url_file(cls, url):
# type: (str) -> str
str_hash = hashlib.md5(url.encode()).hexdigest()
filename = url.split("/")[-1]
return "{}.{}".format(str_hash, quote_url(filename))
def get_cache_folder(self):
# type: () -> str
"""
:return: full path to current contexts cache folder
"""
@ -82,6 +101,7 @@ class CacheManager(object):
return folder.as_posix()
def get_cache_file(self, remote_url=None, local_filename=None):
# type: (Optional[str], Optional[str]) -> Tuple[str, Optional[int]]
"""
:param remote_url: check if we have the remote url in our cache
:param local_filename: if local_file is given, search for the local file/directory in the cache folder
@ -123,10 +143,52 @@ class CacheManager(object):
except Exception:
pass
# first exclude lock files
lock_files = dict()
files = []
for f in sorted(folder.iterdir(), reverse=True, key=sort_max_access_time):
if f.name.startswith(CacheManager._lockfile_prefix) and f.name.endswith(CacheManager._lockfile_suffix):
# parse the lock filename
name = f.name[len(CacheManager._lockfile_prefix):-len(CacheManager._lockfile_suffix)]
num, _, name = name.partition('.')
lock_files[name] = lock_files.get(name, []) + [f.as_posix()]
else:
files.append(f)
# remove new lock files from the list (we will delete them when time comes)
for f in files[:self._file_limit]:
lock_files.pop(f.name, None)
# delete old files
files = sorted(folder.iterdir(), reverse=True, key=sort_max_access_time)
files = files[self._file_limit:]
for f in files:
for i, f in enumerate(files):
if i < self._file_limit:
continue
# check if the file is in the lock folder list:
folder_lock = self._folder_locks.get(f.absolute().as_posix())
if folder_lock:
# pop from lock files
lock_files.pop(f.name, None)
continue
# check if someone else holds the lock file
locks = lock_files.get(f.name, [])
for l in locks:
try:
a_lock = FileLock(filename=l)
a_lock.acquire(timeout=0)
a_lock.release()
a_lock.delete_lock_file()
del a_lock
except LockException:
# someone have the lock skip the file
continue
# if we got here we need to pop from the lock_files, later we will delete the leftover entries
lock_files.pop(f.name, None)
# if we are here we can delete the file
if not f.is_dir():
# noinspection PyBroadException
try:
@ -135,23 +197,93 @@ class CacheManager(object):
pass
else:
try:
shutil.rmtree(f)
shutil.rmtree(f.as_posix())
except Exception as e:
# failed deleting folder
LoggerRoot.get_base_logger().debug(
"Exception {}\nFailed deleting folder {}".format(e, f)
)
# cleanup old lock files
for lock_files in lock_files.values():
for f in lock_files:
# noinspection PyBroadException
try:
os.unlink(f)
except BaseException:
pass
# if file doesn't exist, return file size None
# noinspection PyBroadException
try:
size = new_file.stat().st_size if new_file_exists else None
except Exception:
size = None
return new_file.as_posix(), size
def lock_cache_folder(self, local_path):
# type: (Union[str, Path]) -> ()
"""
Lock a specific cache folder, making sure it will not be deleted in the next
cache cleanup round
:param local_path: Path (str/Path) to a sub-folder inside the instance cache folder
"""
local_path = Path(local_path).absolute()
self._rlock.acquire()
if self._lockfile_at_exit_cb is None:
self._lockfile_at_exit_cb = True
atexit.register(self._lock_file_cleanup_callback)
lock = self._folder_locks.get(local_path.as_posix())
i = 0
# try to create a lock if we do not already have one (if we do, we assume it is locked)
while not lock:
lock_path = local_path.parent / '{}{:03d}.{}{}'.format(
CacheManager._lockfile_prefix, i, local_path.name, CacheManager._lockfile_suffix)
lock = FileLock(filename=lock_path)
# try to lock folder (if we failed to create lock, try nex number)
try:
lock.acquire(timeout=0)
break
except LockException:
# failed locking, maybe someone else already locked it.
del lock
lock = None
i += 1
# store lock
self._folder_locks[local_path.as_posix()] = lock
self._rlock.release()
def unlock_cache_folder(self, local_path):
# type: (Union[str, Path]) -> ()
"""
Lock a specific cache folder, making sure it will not be deleted in the next
cache cleanup round
:param local_path: Path (str/Path) to a sub-folder inside the instance cache folder
"""
local_path = Path(local_path).absolute()
self._rlock.acquire()
# pop lock
lock = self._folder_locks.pop(local_path.as_posix(), None)
if lock:
lock.release()
lock.delete_lock_file()
del lock
self._rlock.release()
@classmethod
def _lock_file_cleanup_callback(cls):
for lock in cls._folder_locks.values():
lock.release()
lock.delete_lock_file()
@classmethod
def get_cache_manager(cls, cache_context=None, cache_file_limit=None):
# type: (Optional[str], Optional[int]) -> CacheManager.CacheContext
cache_context = cache_context or cls._default_context
if cache_context not in cls.__cache_managers:
cls.__cache_managers[cache_context] = cls.CacheContext(
@ -165,6 +297,7 @@ class CacheManager(object):
@staticmethod
def get_remote_url(local_copy_path):
# type: (str) -> str
if not CacheManager._local_to_remote_url_lookup:
return local_copy_path
@ -178,6 +311,7 @@ class CacheManager(object):
@staticmethod
def _add_remote_url(remote_url, local_copy_path):
# type: (str, str) -> ()
# so that we can disable the cache lookup altogether
if CacheManager._local_to_remote_url_lookup is None:
return
@ -206,11 +340,13 @@ class CacheManager(object):
@classmethod
def set_context_folder_lookup(cls, context, name_template):
# type: (str, str) -> str
cls._context_to_folder_lookup[str(context)] = str(name_template)
return str(name_template)
@classmethod
def get_context_folder_lookup(cls, context):
# type: (Optional[str]) -> str
if not context:
return cls._default_context_folder_template
return cls._context_to_folder_lookup.get(str(context), cls._default_context_folder_template)

View File

@ -97,8 +97,16 @@ class StorageManager(object):
).set_cache_limit(cache_file_limit)
@classmethod
def _extract_to_cache(cls, cached_file, name, cache_context=None, target_folder=None, cache_path_encoding=None):
# type: (str, str, Optional[str], Optional[str], Optional[str]) -> str
def _extract_to_cache(
cls,
cached_file, # type: str
name, # type: str
cache_context=None, # type: Optional[str]
target_folder=None, # type: Optional[str]
cache_path_encoding=None, # type: Optional[str]
force=False, # type: bool
):
# type: (...) -> str
"""
Extract cached file to cache folder
:param str cached_file: local copy of archive file
@ -108,6 +116,7 @@ class StorageManager(object):
:param str cache_path_encoding: specify representation of the local path of the cached files,
this will always point to local cache folder, even if we have direct access file.
Used for extracting the cached archived based on cache_path_encoding
:param bool force: Force archive extraction even if target folder exists
:return: cached folder containing the extracted archive content
"""
if not cached_file:
@ -133,7 +142,7 @@ class StorageManager(object):
target_folder = cache_folder / CacheManager.get_context_folder_lookup(
cache_context).format(archive_suffix, name)
if target_folder.is_dir():
if target_folder.is_dir() and not force:
# noinspection PyBroadException
try:
target_folder.touch(exist_ok=True)
@ -143,9 +152,14 @@ class StorageManager(object):
base_logger = LoggerRoot.get_base_logger()
try:
# if target folder exists, meaning this is forced ao we extract directly into target folder
if target_folder.is_dir():
temp_target_folder = target_folder
else:
temp_target_folder = cache_folder / "{0}_{1}_{2}".format(
target_folder.name, time() * 1000, str(random()).replace('.', ''))
temp_target_folder.mkdir(parents=True, exist_ok=True)
if suffix == ".zip":
ZipFile(cached_file.as_posix()).extractall(path=temp_target_folder.as_posix())
elif suffix == ".tar.gz":
@ -155,6 +169,7 @@ class StorageManager(object):
with tarfile.open(cached_file.as_posix(), mode='r:gz') as file:
file.extractall(temp_target_folder.as_posix())
if temp_target_folder != target_folder:
# we assume we will have such folder if we already extract the file
# noinspection PyBroadException
try:

View File

@ -179,6 +179,22 @@ class Lock(object):
pass
self.fh = None
def delete_lock_file(self):
# type: () -> bool
"""
Remove the local file used for locking (fail if file is locked)
:return: True is successful
"""
if self.fh:
return False
# noinspection PyBroadException
try:
os.unlink(path=self.filename)
except BaseException:
return False
return True
def _get_fh(self):
'''Get a new filehandle'''
return open(self.filename, self.mode, **self.file_open_kwargs)