Add clearml-data support for links (#585)

allegroai 2022-04-27 17:00:09 +03:00
parent 42fa0dde65
commit 24da3e3e08
5 changed files with 650 additions and 168 deletions

View File

@ -78,7 +78,7 @@ def cli():
create.add_argument('--tags', type=str, nargs='*', help='Dataset user Tags')
create.set_defaults(func=ds_create)
add = subparsers.add_parser('add', help='Add files to the dataset')
add = subparsers.add_parser('add', help='Add files or links to the dataset')
add.add_argument('--id', type=str, required=False,
help='Previously created dataset id. Default: previously created/accessed dataset')
add.add_argument('--dataset-folder', type=str, default=None,
@ -86,6 +86,15 @@ def cli():
add.add_argument('--files', type=str, nargs='*',
help='Files / folders to add (support for wildcard selection). '
'Example: ~/data/*.jpg ~/data/jsons')
add.add_argument(
"--links",
type=str,
nargs="*",
help=(
"Links to files / folders to add. Supports s3, gs, azure links. "
"Example: s3://bucket/data azure://bucket/folder"
),
)
add.add_argument('--non-recursive', action='store_true', default=False,
help='Disable recursive scan of files')
add.add_argument('--verbose', action='store_true', default=False, help='Verbose reporting')
@ -120,17 +129,17 @@ def cli():
sync.add_argument('--verbose', action='store_true', default=False, help='Verbose reporting')
sync.set_defaults(func=ds_sync)
remove = subparsers.add_parser('remove', help='Remove files from the dataset')
remove = subparsers.add_parser('remove', help='Remove files/links from the dataset')
remove.add_argument('--id', type=str, required=False,
help='Previously created dataset id. Default: previously created/accessed dataset')
remove.add_argument('--files', type=str, required=True, nargs='*',
remove.add_argument('--files', type=str, required=False, nargs='*',
help='Files / folders to remove (support for wildcard selection). '
'Notice: File path is the dataset path not the local path. '
'Example: data/*.jpg data/jsons/')
remove.add_argument('--non-recursive', action='store_true', default=False,
help='Disable recursive scan of files')
remove.add_argument('--verbose', action='store_true', default=False, help='Verbose reporting')
remove.set_defaults(func=ds_remove_files)
remove.set_defaults(func=ds_remove)
upload = subparsers.add_parser('upload', help='Upload the local dataset changes to the server')
upload.add_argument('--id', type=str, required=False,
@ -271,7 +280,7 @@ def ds_verify(args):
def ds_get(args):
print('Download dataset id {}'.format(args.id))
print("Download dataset id {}".format(args.id))
check_null_id(args)
print_args(args)
ds = Dataset.get(dataset_id=args.id)
@ -307,7 +316,7 @@ def ds_get(args):
if args.link:
os.symlink(ds_folder, args.link)
ds_folder = args.link
print('Dataset local copy available: {}'.format(ds_folder))
print("Dataset local copy available for files at: {}".format(ds_folder))
return 0
@ -321,16 +330,18 @@ def ds_list(args):
print('-' * len(formatting.replace(',', '').format('-', '-', '-')))
filters = args.filter if args.filter else [None]
file_entries = ds.file_entries_dict
link_entries = ds.link_entries_dict
num_files = 0
total_size = 0
for mask in filters:
files = ds.list_files(dataset_path=mask, dataset_id=ds.id if args.modified else None)
num_files += len(files)
for f in files:
e = file_entries[f]
print(formatting.format(e.relative_path, e.size, e.hash))
e = link_entries.get(f)
if file_entries.get(f):
e = file_entries[f]
print(formatting.format(e.relative_path, e.size, str(e.hash)))
total_size += e.size
print('Total {} files, {} bytes'.format(num_files, total_size))
return 0
@ -424,17 +435,16 @@ def ds_upload(args):
return 0
def ds_remove_files(args):
print('Removing files/folder from dataset id {}'.format(args.id))
def ds_remove(args):
print("Removing files/folder from dataset id {}".format(args.id))
check_null_id(args)
print_args(args)
ds = Dataset.get(dataset_id=args.id)
num_files = 0
for file in args.files:
num_files += ds.remove_files(
dataset_path=file,
recursive=not args.non_recursive, verbose=args.verbose)
print('{} files removed'.format(num_files))
for file in (args.files or []):
num_files += ds.remove_files(dataset_path=file, recursive=not args.non_recursive, verbose=args.verbose)
message = "{} file{} removed".format(num_files, "s" if num_files != 1 else "")
print(message)
return 0
@ -476,16 +486,21 @@ def ds_sync(args):
def ds_add(args):
print('Adding files/folder to dataset id {}'.format(args.id))
print("Adding files/folder/links to dataset id {}".format(args.id))
check_null_id(args)
print_args(args)
ds = Dataset.get(dataset_id=args.id)
num_files = 0
for file in args.files:
for file in args.files or []:
num_files += ds.add_files(
path=file, recursive=not args.non_recursive,
verbose=args.verbose, dataset_path=args.dataset_folder or None)
print('{} file{} added'.format(num_files, 's' if num_files > 1 else ''))
path=file, recursive=not args.non_recursive, verbose=args.verbose, dataset_path=args.dataset_folder or None
)
for link in args.links or []:
num_files += ds.add_external_files(
link, dataset_path=args.dataset_folder or None, recursive=not args.non_recursive, verbose=args.verbose
)
message = "{} file{} added".format(num_files, "s" if num_files != 1 else "")
print(message)
return 0
@ -505,9 +520,9 @@ def main():
exit(cli())
except KeyboardInterrupt:
print('\nUser aborted')
except Exception as ex:
print('\nError: {}'.format(ex))
exit(1)
#except Exception as ex:
# print('\nError: {}'.format(ex))
# exit(1)
if __name__ == '__main__':

View File

@ -2,7 +2,6 @@ import json
import os
import shutil
from copy import deepcopy, copy
from fnmatch import fnmatch
from multiprocessing import cpu_count
from multiprocessing.pool import ThreadPool
from tempfile import mkstemp, mkdtemp
@ -21,6 +20,7 @@ from ..debugging.log import LoggerRoot
from ..storage.helper import StorageHelper
from ..storage.cache import CacheManager
from ..storage.util import sha256sum, is_windows, md5text, format_size
from ..utilities.matching import matches_any_wildcard
try:
from pathlib import Path as _Path # noqa
@ -48,6 +48,24 @@ class FileEntry(object):
return state
@attrs
class LinkEntry(object):
link = attrib(default=None, type=str)
relative_path = attrib(default=None, type=str)
parent_dataset_id = attrib(default=None, type=str)
size = attrib(default=None, type=int)
hash = attrib(default=None, type=str)
def as_dict(self):
# type: () -> Dict
return dict(
link=self.link,
relative_path=self.relative_path,
parent_dataset_id=self.parent_dataset_id,
size=self.size,
)
class Dataset(object):
__private_magic = 42 * 1337
__state_entry_name = 'state'
@ -55,6 +73,7 @@ class Dataset(object):
__data_entry_name_prefix = 'data_'
__cache_context = 'datasets'
__tag = 'dataset'
__external_files_tag = 'external files'
__cache_folder_prefix = 'ds_'
__dataset_folder_template = CacheManager.set_context_folder_lookup(__cache_context, "{0}_archive_{1}")
__preview_max_file_entries = 15000
@ -69,6 +88,7 @@ class Dataset(object):
assert _private == self.__private_magic
# key for the dataset file entries are the relative path within the data
self._dataset_file_entries = {} # type: Dict[str, FileEntry]
self._dataset_link_entries = {} # type: Dict[str, LinkEntry]
# this will create a graph of all the dependencies we have, each entry lists it's own direct parents
self._dependency_graph = {} # type: Dict[str, List[str]]
if task:
@ -98,9 +118,8 @@ class Dataset(object):
# e.g. add_files is called multiple times
task_state = task.artifacts.get('state')
if task_state:
# Metadata is visible in UI, so there will be no underscores there, hence the replace
self.changed_files = {key: task_state.metadata.get(key.replace('_', ' '), 0)
for key in {'files_added', 'files_removed', 'files_modified'}}
self.changed_files = {key: int(task_state.metadata.get(key, 0))
for key in {'files added', 'files removed', 'files modified'}}
else:
self.changed_files = {'files added': 0, 'files removed': 0, 'files modified': 0}
else:
@ -160,6 +179,11 @@ class Dataset(object):
# type: () -> List[FileEntry]
return list(self._dataset_file_entries.values())
@property
def link_entries(self):
# type: () -> List[LinkEntry]
return list(self._dataset_link_entries.values())
@property
def file_entries_dict(self):
# type: () -> Mapping[str, FileEntry]
@ -169,6 +193,15 @@ class Dataset(object):
"""
return self._dataset_file_entries
@property
def link_entries_dict(self):
# type: () -> Mapping[str, LinkEntry]
"""
Notice this call returns an internal representation, do not modify!
:return: dict with relative file path as key, and LinkEntry as value
"""
return self._dataset_link_entries
@property
def project(self):
# type: () -> str
@ -205,7 +238,7 @@ class Dataset(object):
:param path: Add a folder/file to the dataset
:param wildcard: add only specific set of files.
Wildcard matching, can be a single string or a list of wildcards)
Wildcard matching, can be a single string or a list of wildcards.
:param local_base_folder: files will be located based on their relative path from local_base_folder
:param dataset_path: where in the dataset the folder/files should be located
:param recursive: If True match all wildcard files recursively
@ -232,13 +265,126 @@ class Dataset(object):
return num_added
def add_external_files(
self,
source_url, # type: str
wildcard=None, # type: Optional[Union[str, Sequence[str]]]
dataset_path=None, # type: Optional[str]
recursive=True, # type: bool
verbose=False, # type: bool
):
# type: (...) -> int
"""
Adds an external file or a folder to the current dataset.
External file links can be from cloud storage (s3://, gs://, azure://) or local / network storage (file://).
Calculates the file size for each file and compares it against the parent dataset.
A few examples:
# Adds file.jpg to the dataset. When retrieving a copy of the entire dataset (see dataset.get_local_copy())
# this file will be located in "./my_dataset/new_folder/file.jpg"
add_external_files(source_url="s3://my_bucket/stuff/file.jpg", dataset_path="/my_dataset/new_folder/")
# Adds all jpg files located in the s3 bucket called "my_bucket" to the dataset.
add_external_files(source_url="s3://my_bucket/", wildcard="*.jpg", dataset_path="/my_dataset/new_folder/")
# Adds the entire content of "remote_folder" to the dataset.
add_external_files(source_url="s3://bucket/remote_folder/", dataset_path="/my_dataset/new_folder/")
# Adds the local file "/folder/local_file.jpg" to the dataset.
add_external_files(source_url="file:///folder/local_file.jpg", dataset_path="/my_dataset/new_folder/")
:param source_url: Source url link to add to the dataset,
e.g. s3://bucket/folder/path, s3://bucket/folder/file.csv
:param wildcard: add only specific set of files.
Wildcard matching, can be a single string or a list of wildcards.
:param dataset_path: The location in the dataset where the files will be placed.
E.g. for source_url='s3://bucket/remote_folder/image.jpg' and dataset_path='s3_files',
'image.jpg' will be downloaded to 's3_files/image.jpg' (relative path within the dataset)
:param recursive: If True match all wildcard files recursively
:param verbose: If True print to console files added/modified
:return: number of file links added
"""
self._dirty = True
if dataset_path:
dataset_path = dataset_path.lstrip("/")
if StorageManager.exists_file(source_url):
links = [source_url]
else:
if source_url[-1] != "/":
source_url = source_url + "/"
links = StorageManager.list(source_url, return_full_path=True)
num_added = 0
num_modified = 0
for link in links:
relative_path = link[len(source_url):]
if not relative_path:
relative_path = source_url.split("/")[-1]
if not matches_any_wildcard(relative_path, wildcard, recursive=recursive):
continue
try:
relative_path = Path(os.path.join(dataset_path or ".", relative_path)).as_posix()
size = StorageManager.get_file_size_bytes(link, silence_errors=True)
already_added_file = self._dataset_file_entries.get(relative_path)
if relative_path not in self._dataset_link_entries:
if verbose:
self._task.get_logger().report_text(
"External file {} added".format(link),
print_console=False,
)
self._dataset_link_entries[relative_path] = LinkEntry(
link=link, relative_path=relative_path, parent_dataset_id=self._id, size=size
)
num_added += 1
elif already_added_file and already_added_file.size != size:
if verbose:
self._task.get_logger().report_text(
"External file {} modified".format(link),
print_console=False,
)
del self._dataset_file_entries[relative_path]
self._dataset_link_entries[relative_path] = LinkEntry(
link=link, relative_path=relative_path, parent_dataset_id=self._id, size=size
)
num_modified += 1
elif relative_path in self._dataset_link_entries and self._dataset_link_entries[relative_path].size != size:
if verbose:
self._task.get_logger().report_text(
"External file {} modified".format(link),
print_console=False,
)
self._dataset_link_entries[relative_path] = LinkEntry(
link=link, relative_path=relative_path, parent_dataset_id=self._id, size=size
)
num_modified += 1
else:
if verbose:
self._task.get_logger().report_text(
"External file {} skipped as it was not modified".format(link),
print_console=False,
)
except Exception as e:
if verbose:
self._task.get_logger().report_text(
"Error '{}' encountered trying to add external file {}".format(e, link),
print_console=False,
)
self._task.add_tags([self.__external_files_tag])
self._add_script_call(
"add_external_files",
source_url=source_url,
wildcard=wildcard,
dataset_path=dataset_path,
recursive=recursive,
verbose=verbose,
)
self.update_changed_files(num_files_added=num_added, num_files_modified=num_modified)
self._serialize()
return num_added
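
For illustration only, a minimal usage sketch of the method above; the project, dataset and bucket names are placeholders and a configured ClearML setup is assumed:

from clearml import Dataset
ds = Dataset.create(dataset_project="examples", dataset_name="external-links-demo")
ds.add_external_files(source_url="s3://my_bucket/images/", wildcard="*.jpg", dataset_path="images")
ds.upload()     # external links carry no file content; only the dataset state is uploaded
ds.finalize()
local_copy = Dataset.get(dataset_id=ds.id).get_local_copy()  # downloads archived files and external links
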
def remove_files(self, dataset_path=None, recursive=True, verbose=False):
# type: (Optional[str], bool, bool) -> int
"""
Remove files from the current dataset
:param dataset_path: Remove files from the dataset.
The path is always relative to the dataset (e.g 'folder/file.bin')
The path is always relative to the dataset (e.g. 'folder/file.bin').
External files can also be removed by their links (e.g. 's3://bucket/file')
:param recursive: If True match all wildcard files recursively
:param verbose: If True print to console files removed
:return: Number of files removed
@ -251,42 +397,44 @@ class Dataset(object):
if dataset_path and dataset_path.startswith('/'):
dataset_path = dataset_path[1:]
num_files = len(self._dataset_file_entries)
org_files = list(self._dataset_file_entries.keys()) if verbose else None
org_files = list(self._dataset_file_entries.keys()) + list(self._dataset_link_entries.keys())
if not recursive:
self._dataset_file_entries = {
k: v for k, v in self._dataset_file_entries.items()
if not fnmatch(k + '/', dataset_path + '/')}
else:
wildcard = dataset_path.split('/')[-1]
path = dataset_path[:-len(dataset_path)] + '*'
self._dataset_file_entries = {
k: v
for k, v in self._dataset_file_entries.items()
if not matches_any_wildcard(k, dataset_path, recursive=recursive)
}
self._dataset_link_entries = {
k: v
for k, v in self._dataset_link_entries.items()
if not matches_any_wildcard(k, dataset_path, recursive=recursive)
and not matches_any_wildcard(v.link, dataset_path, recursive=recursive)
}
self._dataset_file_entries = {
k: v for k, v in self._dataset_file_entries.items()
if not (fnmatch(k, path) and fnmatch(k if '/' in k else '/{}'.format(k), '*/' + wildcard))}
if verbose and org_files:
for f in org_files:
if f not in self._dataset_file_entries:
removed = 0
for f in org_files:
if f not in self._dataset_file_entries and f not in self._dataset_link_entries:
if verbose:
self._task.get_logger().report_text('Remove {}'.format(f))
removed += 1
# update the task script
self._add_script_call(
'remove_files', dataset_path=dataset_path, recursive=recursive)
num_removed = num_files - len(self._dataset_file_entries)
self._serialize()
# Update state
self.update_changed_files(num_files_removed=num_removed)
return num_removed
self.update_changed_files(num_files_removed=removed)
return removed
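
As a rough sketch of the behaviour above, entries can now be removed either by relative-path wildcard or by link; the dataset id below is a placeholder:

from clearml import Dataset
ds = Dataset.get(dataset_id="<dataset_id>")  # placeholder id
ds.remove_files(dataset_path="data/*.jpg")            # remove by relative-path wildcard
ds.remove_files(dataset_path="s3://bucket/file.csv")  # external entries can be removed by their link
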
def sync_folder(self, local_path, dataset_path=None, verbose=False):
# type: (Union[Path, _Path, str], Union[Path, _Path, str], bool) -> (int, int)
"""
Synchronize the dataset with a local folder. The dataset is synchronized from the
relative_base_folder (default: dataset root) and deeper with the specified local path.
relative_base_folder (default: dataset root) and deeper with the specified local path.
Note that if a remote file is identified as being modified when syncing, it will
be added as a FileEntry, ready to be uploaded to the ClearML server. This version of the
file is considered "newer" and it will be downloaded instead of the one stored at its
remote address when calling Dataset.get_local_copy().
:param local_path: Local folder to sync (assumes all files and recursive)
:param dataset_path: Target dataset path to sync with (default the root of the dataset)
@ -610,7 +758,7 @@ class Dataset(object):
part=0 -> chunks[0,5], part=1 -> chunks[1,6], part=2 -> chunks[2,7], part=3 -> chunks[3, ]
:param raise_on_error: If True raise exception if dataset merging failed on any file
:return: A the target folder containing the entire dataset
:return: The target folder containing the entire dataset
"""
assert self._id
target_folder = Path(target_folder).absolute()
@ -634,7 +782,7 @@ class Dataset(object):
# type: (Optional[str], bool, Optional[str]) -> List[str]
"""
returns a list of files in the current dataset
If dataset_id is provided, return a list of files that remained unchanged since the specified dataset_version
If dataset_id is provided, return a list of files that remained unchanged since the specified dataset_id
:param dataset_path: Only match files matching the dataset_path (including wildcards).
Example: 'folder/sub/*.json'
@ -645,26 +793,44 @@ class Dataset(object):
:return: List of files with relative path
(files might not be available locally until get_local_copy() is called)
"""
files = self._dataset_file_entries.keys() if not dataset_id else \
[k for k, v in self._dataset_file_entries.items() if v.parent_dataset_id == dataset_id]
files = (
list(self._dataset_file_entries.keys())
if not dataset_id
else [
k
for k, v in self._dataset_file_entries.items()
if v.parent_dataset_id == dataset_id
]
)
files.extend(
list(self._dataset_link_entries.keys())
if not dataset_id
else [
k
for k, v in self._dataset_link_entries.items()
if v.parent_dataset_id == dataset_id
]
)
files = list(set(files))
if not dataset_path:
return sorted(files)
if dataset_path.startswith('/'):
if dataset_path.startswith("/"):
dataset_path = dataset_path[1:]
if not recursive:
return sorted([k for k in files if fnmatch(k + '/', dataset_path + '/')])
wildcard = dataset_path.split('/')[-1]
path = dataset_path[:-len(wildcard)] + '*'
return sorted([k for k in files if fnmatch(k, path) and fnmatch(k, '*/' + wildcard)])
return sorted(
[
f
for f in files
if matches_any_wildcard(f, dataset_path, recursive=recursive)
]
)
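
A small sketch of the wildcard-based listing after this change; the dataset id below is a placeholder:

from clearml import Dataset
ds = Dataset.get(dataset_id="<dataset_id>")  # placeholder id
json_files = ds.list_files(dataset_path="folder/sub/*.json")  # relative paths; link entries are included
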
def list_removed_files(self, dataset_id=None):
# type: (str) -> List[str]
"""
return a list of files removed when comparing to a specific dataset_version
return a list of files removed when comparing to a specific dataset_id
:param dataset_id: dataset id (str) to compare against, if None is given compare against the parents datasets
:return: List of files with relative path
@ -675,14 +841,17 @@ class Dataset(object):
for ds_id in datasets:
dataset = self.get(dataset_id=ds_id)
unified_list |= set(dataset._dataset_file_entries.keys())
unified_list |= set(dataset._dataset_link_entries.keys())
removed_list = [f for f in unified_list if f not in self._dataset_file_entries]
removed_list = [
f for f in unified_list if f not in self._dataset_file_entries and f not in self._dataset_link_entries
]
return sorted(removed_list)
def list_modified_files(self, dataset_id=None):
# type: (str) -> List[str]
"""
return a list of files modified when comparing to a specific dataset_version
return a list of files modified when comparing to a specific dataset_id
:param dataset_id: dataset id (str) to compare against, if None is given compare against the parents datasets
:return: List of files with relative path
@ -693,15 +862,29 @@ class Dataset(object):
for ds_id in datasets:
dataset = self.get(dataset_id=ds_id)
unified_list.update(dict((k, v.hash) for k, v in dataset._dataset_file_entries.items()))
modified_list = [k for k, v in self._dataset_file_entries.items()
if k in unified_list and v.hash != unified_list[k]]
return sorted(modified_list)
unified_list_sizes = dict()
for ds_id in datasets:
dataset = self.get(dataset_id=ds_id)
for k, v in dataset._dataset_link_entries.items():
unified_list_sizes[k] = v.size
if k in dataset._dataset_file_entries:
unified_list_sizes[k] = dataset._dataset_file_entries[k].size
for k, v in self._dataset_link_entries.items():
if k not in unified_list_sizes:
continue
size = v.size
if k in self._dataset_file_entries:
size = self._dataset_file_entries[k].size
if size != unified_list_sizes[k]:
modified_list.append(k)
return sorted(list(set(modified_list)))
def list_added_files(self, dataset_id=None):
# type: (str) -> List[str]
"""
return a list of files added when comparing to a specific dataset_version
return a list of files added when comparing to a specific dataset_id
:param dataset_id: dataset id (str) to compare against, if None is given compare against the parents datasets
:return: List of files with relative path
@ -712,9 +895,13 @@ class Dataset(object):
for ds_id in datasets:
dataset = self.get(dataset_id=ds_id)
unified_list |= set(dataset._dataset_file_entries.keys())
added_list = [f for f in self._dataset_file_entries.keys() if f not in unified_list]
return sorted(added_list)
unified_list |= set(dataset._dataset_link_entries.keys())
added_list = [
f
for f in list(self._dataset_file_entries.keys()) + list(self._dataset_link_entries.keys())
if f not in unified_list
]
return sorted(list(set(added_list)))
def get_dependency_graph(self):
"""
@ -831,9 +1018,11 @@ class Dataset(object):
# merge datasets according to order
dataset_file_entries = {}
dataset_link_entries = {}
dependency_graph = {}
for p in parent_datasets:
dataset_file_entries.update(deepcopy(p._dataset_file_entries))
dataset_link_entries.update(deepcopy(p._dataset_link_entries))
dependency_graph.update(deepcopy(p._dependency_graph))
instance = cls(_private=cls.__private_magic,
dataset_project=dataset_project,
@ -843,6 +1032,7 @@ class Dataset(object):
instance._using_current_task = use_current_task
instance._task.get_logger().report_text('Dataset created', print_console=False)
instance._dataset_file_entries = dataset_file_entries
instance._dataset_link_entries = dataset_link_entries
instance._dependency_graph = dependency_graph
instance._dependency_graph[instance._id] = [p._id for p in parent_datasets]
instance._serialize()
@ -1073,7 +1263,7 @@ class Dataset(object):
temp_folder = Path(mkdtemp(prefix='squash-datasets.'))
pool = ThreadPool()
for ds in datasets:
base_folder = Path(ds._extract_dataset_archive())
base_folder = Path(ds._get_dataset_files())
files = [f.relative_path for f in ds.file_entries if f.parent_dataset_id == ds.id]
pool.map(
lambda x:
@ -1087,6 +1277,8 @@ class Dataset(object):
dataset_project=datasets[0].project, dataset_name=dataset_name, parent_datasets=list(parents))
squashed_ds._task.get_logger().report_text('Squashing dataset', print_console=False)
squashed_ds.add_files(temp_folder)
for ds in datasets:
squashed_ds._dataset_link_entries.update(ds._dataset_link_entries)
squashed_ds.upload(output_url=output_url)
squashed_ds.finalize()
return squashed_ds
@ -1148,11 +1340,13 @@ class Dataset(object):
:param recursive: If True match all wildcard files recursively
:param verbose: If True print to console added files
"""
if dataset_path and dataset_path.startswith('/'):
dataset_path = dataset_path[1:]
if dataset_path:
dataset_path = dataset_path.lstrip("/")
path = Path(path)
local_base_folder = Path(local_base_folder or path)
wildcard = wildcard or '*'
wildcard = wildcard or ["*"]
if isinstance(wildcard, str):
wildcard = [wildcard]
# single file, no need for threading
if path.is_file():
if not local_base_folder.is_dir():
@ -1168,13 +1362,19 @@ class Dataset(object):
raise ValueError("Could not find file/folder \'{}\'", path.as_posix())
# prepare a list of files
files = list(path.rglob(wildcard)) if recursive else list(path.glob(wildcard))
file_entries = []
for w in wildcard:
files = list(path.rglob(w)) if recursive else list(path.glob(w))
file_entries.extend([f for f in files if f.is_file()])
file_entries = list(set(file_entries))
file_entries = [
FileEntry(
parent_dataset_id=self._id,
local_path=f.absolute().as_posix(),
relative_path=(Path(dataset_path or '.') / f.relative_to(local_base_folder)).as_posix())
for f in files if f.is_file()]
relative_path=(Path(dataset_path or ".") / f.relative_to(local_base_folder)).as_posix(),
)
for f in file_entries
]
self._task.get_logger().report_text('Generating SHA2 hash for {} files'.format(len(file_entries)))
pool = ThreadPool(cpu_count() * 2)
try:
@ -1196,10 +1396,16 @@ class Dataset(object):
for f in file_entries:
ds_cur_f = self._dataset_file_entries.get(f.relative_path)
if not ds_cur_f:
if (
f.relative_path in self._dataset_link_entries
and f.size == self._dataset_link_entries[f.relative_path].size
):
continue
if verbose:
self._task.get_logger().report_text('Add {}'.format(f.relative_path))
self._dataset_file_entries[f.relative_path] = f
count += 1
if f.relative_path not in self._dataset_link_entries:
count += 1
elif ds_cur_f.hash != f.hash:
if verbose:
self._task.get_logger().report_text('Modified {}'.format(f.relative_path))
@ -1255,6 +1461,7 @@ class Dataset(object):
state = dict(
dataset_file_entries=[f.as_dict() for f in self._dataset_file_entries.values()],
dataset_link_entries=[link.as_dict() for link in self._dataset_link_entries.values()],
dependency_graph=self._dependency_graph,
id=self._id,
dirty=self._dirty,
@ -1286,11 +1493,11 @@ class Dataset(object):
:param num_files_removed: Amount of files removed when compared to the parent dataset
"""
if num_files_added:
self.changed_files['files added'] += num_files_added
self.changed_files["files added"] += num_files_added
if num_files_removed:
self.changed_files['files removed'] += num_files_removed
self.changed_files["files removed"] += num_files_removed
if num_files_modified:
self.changed_files['files modified'] += num_files_modified
self.changed_files["files modified"] += num_files_modified
def _download_dataset_archives(self):
"""
@ -1299,6 +1506,104 @@ class Dataset(object):
"""
pass # TODO: implement
def _get_dataset_files(
self,
force=False,
selected_chunks=None,
lock_target_folder=False,
cleanup_target_folder=True,
target_folder=None,
):
# type: (bool, Optional[List[int]], bool, bool, Optional[Path]) -> str
"""
First, extracts the archive present on the ClearML server containing this dataset's files.
Then, downloads the remote files. Note that if a remote file was added to the ClearML server, then
it won't be downloaded from the remote storage unless it is added again using
Dataset.add_external_files().
:param force: If True extract dataset content even if target folder exists and is not empty
:param selected_chunks: Optional, if provided only download the selected chunks (index) of the Dataset.
Example: Assuming 8 chunks on this version
selected_chunks=[0,1,2]
:param lock_target_folder: If True, lock the target folder so the next cleanup will not delete it.
Notice you should unlock it manually, or wait for the process to finish for auto unlocking.
:param cleanup_target_folder: If True remove target folder recursively
:param target_folder: If provided, use the specified target folder; by default, one is auto-generated from the Dataset ID.
:return: Path to the local storage where the data was downloaded
"""
local_folder = self._extract_dataset_archive(
force=force,
selected_chunks=selected_chunks,
lock_target_folder=lock_target_folder,
cleanup_target_folder=cleanup_target_folder,
target_folder=target_folder,
)
self._download_external_files(
target_folder=target_folder, lock_target_folder=lock_target_folder
)
return local_folder
def _download_external_files(
self, target_folder=None, lock_target_folder=False
):
# (Union(Path, str), bool) -> None
"""
Downloads external files in the dataset. These files will be downloaded
at relative_path (the path relative to the target_folder). Note that
the download will not overwrite any existing files. Hence, if the file
was already downloaded from the ClearML server, it will not be overwritten.
:param target_folder: If provided, use the specified target folder; by default, one is auto-generated from the Dataset ID.
:param lock_target_folder: If True, lock the target folder so the next cleanup will not delete it.
Notice you should unlock it manually, or wait for the process to finish for auto unlocking.
"""
target_folder = (
Path(target_folder)
if target_folder
else self._create_ds_target_folder(
lock_target_folder=lock_target_folder
)
).as_posix()
dependencies = self._get_dependencies_by_order(
include_unused=False, include_current=True
)
links = {}
for dependency in dependencies:
ds = Dataset.get(dependency)
links.update(ds._dataset_link_entries)
links.update(self._dataset_link_entries)
for relative_path, link in links.items():
target_path = os.path.join(target_folder, relative_path)
if os.path.exists(target_path):
LoggerRoot.get_base_logger().info(
"{} already exists. Skipping downloading {}".format(
target_path, link
)
)
continue
ok = False
error = None
try:
helper = StorageHelper.get(link.link)
ok = helper.download_to_file(
link.link,
target_path,
overwrite_existing=False,
verbose=False,
direct_access=False,
silence_errors=True
)
except Exception as e:
error = e
if not ok:
log_string = "Failed downloading {}".format(link.link)
if error:
log_string += " Error is '{}'".format(error)
LoggerRoot.get_base_logger().info(log_string)
else:
link.size = Path(target_path).stat().st_size
def _extract_dataset_archive(
self,
force=False,
@ -1317,7 +1622,7 @@ class Dataset(object):
Example: Assuming 8 chunks on this version
selected_chunks=[0,1,2]
:param lock_target_folder: If True, local the target folder so the next cleanup will not delete
Notice you should unlock it manually, or wait fro the process to fnish for auto unlocking.
Notice you should unlock it manually, or wait for the process to finish for auto unlocking.
:param cleanup_target_folder: If True remove target folder recursively
:param target_folder: If provided use the specified target folder, default, auto generate from Dataset ID.
@ -1366,13 +1671,6 @@ class Dataset(object):
cached_file=local_zip, name=self._id,
cache_context=self.__cache_context, target_folder=local_folder, force=True)
# download al parts in parallel
# if len(data_artifact_entries) > 1:
# pool = ThreadPool()
# pool.map(_download_part, data_artifact_entries)
# pool.close()
# else:
# _download_part(data_artifact_entries[0])
for d in data_artifact_entries:
_download_part(d)
@ -1458,7 +1756,7 @@ class Dataset(object):
# first get our dataset
if self._id in dependencies_by_order:
self._extract_dataset_archive(
self._get_dataset_files(
force=True,
selected_chunks=chunk_selection.get(self._id) if chunk_selection else None,
cleanup_target_folder=True,
@ -1564,7 +1862,11 @@ class Dataset(object):
instance._dependency_graph = stored_state.get('dependency_graph', {})
instance._dirty = stored_state.get('dirty', False)
instance._dataset_file_entries = {
s['relative_path']: FileEntry(**s) for s in stored_state.get('dataset_file_entries', [])}
s["relative_path"]: FileEntry(**s) for s in stored_state.get("dataset_file_entries", [])
}
instance._dataset_link_entries = {
s["relative_path"]: LinkEntry(**s) for s in stored_state.get("dataset_link_entries", [])
}
if stored_state.get('dependency_chunk_lookup') is not None:
instance._dependency_chunk_lookup = stored_state.get('dependency_chunk_lookup')
@ -1818,7 +2120,7 @@ class Dataset(object):
selected_chunks = chunk_selection.get(dataset_version_id) if chunk_selection else None
ds = Dataset.get(dataset_id=dataset_version_id)
ds_base_folder = Path(ds._extract_dataset_archive(
ds_base_folder = Path(ds._get_dataset_files(
selected_chunks=selected_chunks,
force=force,
lock_target_folder=True,
@ -1884,6 +2186,11 @@ class Dataset(object):
verified = False
break
for f in self._dataset_link_entries.values():
if (target_base_folder / f.relative_path).stat().st_size != f.size:
verified = False
break
except Exception:
verified = False

View File

@ -30,7 +30,6 @@ from requests.exceptions import ConnectionError
from six import binary_type, StringIO
from six.moves.queue import Queue, Empty
from six.moves.urllib.parse import urlparse
from six.moves.urllib.request import url2pathname
from .callbacks import UploadProgressReport, DownloadProgressReport
from .util import quote_url
@ -246,7 +245,7 @@ class StorageHelper(object):
pass
force_create = kwargs.pop('__force_create', False)
if (instance_key in cls._helpers) and (not force_create):
if (instance_key in cls._helpers) and (not force_create) and base_url != "file://":
return cls._helpers[instance_key]
# Don't canonize URL since we already did it
@ -372,24 +371,22 @@ class StorageHelper(object):
# if this is not a known scheme assume local file
# If the scheme is file, use only the path segment, If not, use the entire URL
if self._scheme == 'file':
if self._scheme == "file":
url = parsed.path
url = url.replace("\\", "/")
# url2pathname is specifically intended to operate on (urlparse result).path
# and returns a cross-platform compatible result
driver_uri = url2pathname(url)
path_driver_uri = Path(driver_uri)
# if path_driver_uri.is_file():
# driver_uri = str(path_driver_uri.parent)
# elif not path_driver_uri.exists():
# # assume a folder and create
# # Path(driver_uri).mkdir(parents=True, exist_ok=True)
# pass
self._driver = _FileStorageDriver(str(path_driver_uri.root))
self._container = None
url = parsed.path
if parsed.netloc:
url = os.path.join(parsed.netloc, url.lstrip(os.path.sep))
self._driver = _FileStorageDriver(Path(url))
# noinspection PyBroadException
try:
self._container = self._driver.get_container("")
except Exception:
self._container = None
@classmethod
def terminate_uploads(cls, force=True, timeout=2.0):
@ -506,6 +503,39 @@ class StorageHelper(object):
"""
cls._path_substitutions = list()
def get_object_size_bytes(self, remote_url, silence_errors=False):
# type: (str, bool) -> Optional[int]
"""
Get size of the remote file in bytes.
:param str remote_url: The url where the file is stored.
E.g. 's3://bucket/some_file.txt', 'file://local/file'
:param bool silence_errors: Silence errors that might occur
when fetching the size of the file. Default: False
:return: The size of the file in bytes.
None if the file could not be found or an error occurred.
"""
size = None
obj = self.get_object(remote_url, silence_errors=silence_errors)
if not obj:
return None
try:
if isinstance(self._driver, _HttpDriver) and obj:
obj = self._driver._get_download_object(obj) # noqa
size = obj.headers.get("Content-Length", 0)
elif hasattr(obj, "size"):
size = obj.size
# Google storage has the option to reload the object to get the size
if size is None and hasattr(obj, "reload"):
obj.reload()
size = obj.size
elif hasattr(obj, "content_length"):
size = obj.content_length
except (ValueError, AttributeError, KeyError):
pass
return size
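
A possible call pattern for the new helper method; the URL is a placeholder:

from clearml.storage.helper import StorageHelper
helper = StorageHelper.get("s3://my_bucket/some_file.txt")
size = helper.get_object_size_bytes("s3://my_bucket/some_file.txt", silence_errors=True)  # None on error
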
def verify_upload(self, folder_uri='', raise_on_error=True, log_on_error=True):
"""
Verify that this helper can upload files to a folder.
@ -636,11 +666,14 @@ class StorageHelper(object):
except TypeError:
res = self._driver.list_container_objects(self._container)
return [
result = [
obj.name
for obj in res if
obj.name.startswith(prefix) and obj.name != prefix
for obj in res
if (obj.name.startswith(prefix) or self._base_url == "file://") and obj.name != prefix
]
if self._base_url == "file://":
result = [Path(f).as_posix() for f in result]
return result
else:
return [obj.name for obj in self._driver.list_container_objects(self._container)]
@ -651,7 +684,9 @@ class StorageHelper(object):
overwrite_existing=False,
delete_on_failure=True,
verbose=None,
skip_zero_size_check=False
skip_zero_size_check=False,
silence_errors=False,
direct_access=True
):
def next_chunk(astream):
if isinstance(astream, binary_type):
@ -671,7 +706,7 @@ class StorageHelper(object):
# Check if driver type supports direct access:
direct_access_path = self.get_driver_direct_access(remote_path)
if direct_access_path:
if direct_access_path and direct_access:
return direct_access_path
temp_local_path = None
@ -686,11 +721,15 @@ class StorageHelper(object):
),
)
return local_path
if remote_path.startswith("file://"):
Path(local_path).parent.mkdir(parents=True, exist_ok=True)
shutil.copyfile(direct_access_path, local_path)
return local_path
# we download into temp_local_path so that if we accidentally stop in the middle,
# we won't think we have the entire file
temp_local_path = '{}_{}{}'.format(local_path, time(), self._temp_download_suffix)
obj = self._get_object(remote_path)
obj = self.get_object(remote_path, silence_errors=silence_errors)
if not obj:
return None
@ -705,23 +744,9 @@ class StorageHelper(object):
# noinspection PyBroadException
Path(temp_local_path).parent.mkdir(parents=True, exist_ok=True)
# try to get file size
try:
if isinstance(self._driver, _HttpDriver) and obj:
obj = self._driver._get_download_object(obj) # noqa
total_size_mb = float(obj.headers.get('Content-Length', 0)) / (1024 * 1024)
elif hasattr(obj, 'size'):
size = obj.size
# Google storage has the option to reload the object to get the size
if size is None and hasattr(obj, 'reload'):
obj.reload()
size = obj.size
total_size_mb = 0 if size is None else float(size) / (1024 * 1024)
elif hasattr(obj, 'content_length'):
total_size_mb = float(obj.content_length) / (1024 * 1024)
except (ValueError, AttributeError, KeyError):
pass
total_size_bytes = self.get_object_size_bytes(remote_path, silence_errors=silence_errors)
if total_size_bytes is not None:
total_size_mb = float(total_size_bytes) / (1024 * 1024)
# if driver supports download with callback, use it (it might be faster)
if hasattr(self._driver, 'download_object'):
@ -787,7 +812,7 @@ class StorageHelper(object):
def download_as_stream(self, remote_path, chunk_size=None):
remote_path = self._canonize_url(remote_path)
try:
obj = self._get_object(remote_path)
obj = self.get_object(remote_path)
return self._driver.download_object_as_stream(
obj, chunk_size=chunk_size, verbose=self._verbose, log=self.log
)
@ -815,7 +840,7 @@ class StorageHelper(object):
self._log.error("Could not download file : %s, err:%s " % (remote_path, str(e)))
def delete(self, path):
return self._driver.delete_object(self._get_object(path))
return self._driver.delete_object(self.get_object(path))
def check_write_permissions(self, dest_path=None):
# create a temporary file, then delete it
@ -1024,7 +1049,18 @@ class StorageHelper(object):
return dest_path
def _get_object(self, path):
def get_object(self, path, silence_errors=False):
# type: (str, bool) -> object
"""
Gets the remote object stored at path. The data held by the object
differs depending on where it is stored.
:param str path: the path where the remote object is stored
:param bool silence_errors: Silence errors that might occur
when fetching the remote object
:return: The remote object
"""
object_name = self._normalize_object_name(path)
try:
return self._driver.get_object(
@ -1032,7 +1068,8 @@ class StorageHelper(object):
except ConnectionError:
raise DownloadError
except Exception as e:
self.log.warning('Storage helper problem for {}: {}'.format(str(object_name), str(e)))
if not silence_errors:
self.log.warning("Storage helper problem for {}: {}".format(str(object_name), str(e)))
return None
@staticmethod
@ -2253,6 +2290,16 @@ class _FileStorageDriver(_Driver):
:return: An Object instance.
"""
if os.path.isfile(os.path.join(container_name, object_name)):
return self.Object(
name=object_name,
container=container_name,
size=Path(object_name).stat().st_size,
driver=self,
extra=None,
hash=None,
meta_data=None,
)
container = self._make_container(container_name)
return self._make_object(container, object_name)
@ -2352,8 +2399,8 @@ class _FileStorageDriver(_Driver):
:param extra: (optional) Extra attributes (driver specific).
:type extra: ``dict``
"""
path = self.get_container_cdn_url(container, check=True)
print(file_path, container, object_name)
path = self.get_container_cdn_url(container, check=False)
obj_path = os.path.join(path, object_name)
base_path = os.path.dirname(obj_path)

View File

@ -7,6 +7,7 @@ from random import random
from time import time
from typing import List, Optional
from zipfile import ZipFile
from six.moves.urllib.parse import urlparse
from pathlib2 import Path
@ -25,9 +26,9 @@ class StorageManager(object):
@classmethod
def get_local_copy(
cls, remote_url, cache_context=None, extract_archive=True, name=None, force_download=False,
cls, remote_url, cache_context=None, extract_archive=True, name=None, force_download=False
):
# type: (str, Optional[str], bool, Optional[str], bool) -> str
# type: (str, Optional[str], bool, Optional[str], bool) -> Optional[str]
"""
Get a local copy of the remote file. If the remote URL is a direct file access,
the returned link is the same, otherwise a link to a local copy of the url file is returned.
@ -49,7 +50,6 @@ class StorageManager(object):
cache_path_encoding = Path(cache.get_cache_folder()) / cache.get_hashed_url_file(remote_url)
return cls._extract_to_cache(
cached_file, name, cache_context, cache_path_encoding=cache_path_encoding.as_posix())
return cached_file
@classmethod
@ -249,17 +249,99 @@ class StorageManager(object):
res.wait()
@classmethod
def download_folder(
cls, remote_url, local_folder=None, match_wildcard=None, overwrite=False, skip_zero_size_check=False
def download_file(
cls, remote_url, local_folder=None, overwrite=False, skip_zero_size_check=False, silence_errors=False
):
# type: (str, Optional[str], Optional[str], bool, bool) -> Optional[str]
# type: (str, Optional[str], bool, bool, bool) -> Optional[str]
"""
Download remote file to the local machine, maintaining the sub folder structure from the
remote storage.
.. note::
If we have a remote file `s3://bucket/sub/file.ext` then
`StorageManager.download_file('s3://bucket/sub/file.ext', '~/folder/')`
will create `~/folder/sub/file.ext`
:param str remote_url: Source remote storage location, path of `remote_url` will
be created under the target local_folder. Supports S3/GS/Azure and shared filesystem.
Example: 's3://bucket/sub/file.ext'
:param bool overwrite: If False, and target files exist do not download.
If True always download the remote files. Default False.
:param bool skip_zero_size_check: If True no error will be raised for files with zero bytes size.
:param bool silence_errors: If True, silence errors that might pop up when trying to download
files stored remotely. Default: False
:return: Path to downloaded file or None on error
"""
if not local_folder:
local_folder = CacheManager.get_cache_manager().get_cache_folder()
local_path = os.path.join(
str(Path(local_folder).absolute()), str(Path(urlparse(remote_url).path)).lstrip(os.path.sep)
)
helper = StorageHelper.get(remote_url)
return helper.download_to_file(
remote_url,
local_path,
overwrite_existing=overwrite,
skip_zero_size_check=skip_zero_size_check,
silence_errors=silence_errors,
)
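
A possible call pattern for the new download_file helper, mirroring the note above; the bucket and local folder are placeholders:

from clearml import StorageManager
# creates /tmp/downloads/sub/file.ext, preserving the remote sub-folder structure
local_file = StorageManager.download_file("s3://bucket/sub/file.ext", local_folder="/tmp/downloads")
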
@classmethod
def exists_file(cls, remote_url):
# type: (str) -> bool
"""
Check if remote file exists. Note that this function will return
False for directories.
:param str remote_url: The url where the file is stored.
E.g. 's3://bucket/some_file.txt', 'file://local/file'
:return: True if the remote_url stores a file and False otherwise
"""
if remote_url.startswith('file://'):
return os.path.isfile(remote_url[len('file://'):])
helper = StorageHelper.get(remote_url)
obj = helper.get_object(remote_url)
if not obj:
return False
return len(StorageManager.list(remote_url)) == 0
@classmethod
def get_file_size_bytes(cls, remote_url, silence_errors=False):
# type: (str, bool) -> Optional[int]
"""
Get size of the remote file in bytes.
:param str remote_url: The url where the file is stored.
E.g. 's3://bucket/some_file.txt', 'file://local/file'
:param bool silence_errors: Silence errors that might occur
when fetching the size of the file. Default: False
:return: The size of the file in bytes.
None if the file could not be found or an error occurred.
"""
helper = StorageHelper.get(remote_url)
return helper.get_object_size_bytes(remote_url, silence_errors=silence_errors)
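
A small sketch combining the two new StorageManager helpers above; the URL is a placeholder:

from clearml import StorageManager
url = "s3://bucket/some_file.txt"  # placeholder
print(StorageManager.exists_file(url))                                # True only for files, not folders
print(StorageManager.get_file_size_bytes(url, silence_errors=True))   # size in bytes, or None
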
@classmethod
def download_folder(
cls,
remote_url,
local_folder=None,
match_wildcard=None,
overwrite=False,
skip_zero_size_check=False,
silence_errors=False,
):
# type: (str, Optional[str], Optional[str], bool, bool, bool) -> Optional[str]
"""
Download remote folder recursively to the local machine, maintaining the sub folder structure
from the remote storage.
.. note::
If we have a local file `s3://bucket/sub/file.ext` then
If we have a remote file `s3://bucket/sub/file.ext` then
`StorageManager.download_folder('s3://bucket/', '~/folder/')`
will create `~/folder/sub/file.ext`
@ -273,6 +355,8 @@ class StorageManager(object):
:param bool overwrite: If False, and target files exist do not download.
If True always download the remote files. Default False.
:param bool skip_zero_size_check: If True no error will be raised for files with zero bytes size.
:param bool silence_errors: If True, silence errors that might pop up when trying to download
files stored remotely. Default: False
:return: Target local folder
"""
@ -293,26 +377,28 @@ class StorageManager(object):
with ThreadPool() as pool:
for path in helper.list(prefix=remote_url):
remote_path = str(Path(helper.base_url) / Path(path)) \
if helper.get_driver_direct_access(helper.base_url) else \
"{}/{}".format(helper.base_url.rstrip('/'), path.lstrip('/'))
remote_path = (
str(Path(helper.base_url) / Path(path))
if helper.get_driver_direct_access(helper.base_url)
else "{}/{}".format(helper.base_url.rstrip("/"), path.lstrip("/"))
)
if match_wildcard and not fnmatch.fnmatch(remote_path, match_wildcard):
continue
local_url = os.path.join(
str(Path(local_folder)),
str(Path(remote_path[len(remote_url):])).lstrip(os.path.sep)
)
if not os.path.exists(local_url) or os.path.getsize(local_url) == 0:
results.append(
pool.apply_async(
helper.download_to_file,
args=(remote_path, local_url),
kwds={"overwrite_existing": overwrite, "skip_zero_size_check": skip_zero_size_check},
)
results.append(
pool.apply_async(
cls.download_file,
args=(remote_path, local_folder),
kwds={
"overwrite": overwrite,
"skip_zero_size_check": skip_zero_size_check,
"silence_errors": silence_errors,
},
)
)
for res in results:
res.wait()
if not results and not silence_errors:
LoggerRoot.get_base_logger().warning("Did not download any files matching {}".format(remote_url))
return local_folder
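
A possible wildcard folder download using the signature above; the bucket and local folder are placeholders:

from clearml import StorageManager
StorageManager.download_folder("s3://bucket/data/", local_folder="/tmp/data", match_wildcard="*.json", silence_errors=True)
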
@classmethod
@ -338,4 +424,7 @@ class StorageManager(object):
except Exception as ex:
LoggerRoot.get_base_logger().warning("Can not list files for '{}' - {}".format(remote_url, ex))
names_list = None
if helper.base_url == 'file://':
return ["{}/{}".format(remote_url.rstrip('/'), name) for name in names_list] if return_full_path else names_list
return ["{}/{}".format(helper.base_url, name) for name in names_list] if return_full_path else names_list

View File

@ -1,20 +1,44 @@
import fnmatch
from pathlib2 import Path
from fnmatch import fnmatch
from typing import Union
def matches_any_wildcard(pattern, wildcards):
# type: (str, Union[str, list]) -> bool
def matches_any_wildcard(path, wildcards, recursive=True):
# type: (str, Union[str, list], bool) -> bool
"""
Checks if given pattern matches any supplied wildcard
:param pattern: pattern to check
:param path: path to check
:param wildcards: wildcards to check against
:param recursive: whether or not the check is recursive. Default: True
E.g. for path='directory/file.ext' and wildcards='*.ext',
recursive=False will return False, but recursive=True will
return True
:return: True if pattern matches any wildcard and False otherwise
:return: True if the path matches any wildcard and False otherwise
"""
if isinstance(wildcards, str):
path = Path(path).as_posix()
if wildcards is None:
wildcards = ["*"]
if not isinstance(wildcards, list):
wildcards = [wildcards]
wildcards = [str(w) for w in wildcards]
if not recursive:
path = path.split("/")
for wildcard in wildcards:
if fnmatch.fnmatch(pattern, wildcard):
return True
return False
if not recursive:
wildcard = wildcard.split("/")
matched = True
if len(path) != len(wildcard):
continue
for path_segment, wildcard_segment in zip(path, wildcard):
if not fnmatch(path_segment, wildcard_segment):
matched = False
break
if matched:
return True
else:
wildcard_file = wildcard.split("/")[-1]
wildcard_dir = wildcard[: -len(wildcard_file)] + "*"
if fnmatch(path, wildcard_dir) and fnmatch("/" + path, "*/" + wildcard_file):
return True
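
A short sketch of the recursive flag, assuming the helper is importable as clearml.utilities.matching (as the dataset.py import in this commit suggests):

from clearml.utilities.matching import matches_any_wildcard
matches_any_wildcard("directory/file.ext", "*.ext")                    # matches (recursive by default)
matches_any_wildcard("directory/file.ext", "*.ext", recursive=False)   # no match: segment counts differ
matches_any_wildcard("file.ext", ["*.bin", "*.ext"], recursive=False)  # matches: any wildcard may match
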