Mirror of https://github.com/clearml/clearml (synced 2025-04-20 22:36:58 +00:00)
Improve dataset version table

commit fc76b9b423 (parent a8c2097fec)
@@ -92,6 +92,17 @@ class Dataset(object):
                 task.set_system_tags(task_system_tags + [self.__tag])
             if dataset_tags:
                 task.set_tags((task.get_tags() or []) + list(dataset_tags))
+
+            # Keep track of modified files (added, removed, modified)
+            # We also load the metadata from the existing task into this one, so we can add when
+            # e.g. add_files is called multiple times
+            task_state = task.artifacts.get('state')
+            if task_state:
+                # Metadata is visible in UI, so there will be no underscores there, hence the replace
+                self.changed_files = {key: task_state.metadata.get(key.replace('_', ' '), 0)
+                                      for key in {'files_added', 'files_removed', 'files_modified'}}
+            else:
+                self.changed_files = {'files added': 0, 'files removed': 0, 'files modified': 0}
         else:
             self._created_task = True
             task = Task.create(
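Note: the dict comprehension above keys the loaded counters by the underscore names ('files_added', 'files_removed', 'files_modified'), while the fallback branch and the update_changed_files helper added later in this commit index with spaces ('files added', ...), so the two branches produce differently keyed dicts; worth double-checking against the helper. A minimal sketch of the intended round trip through the 'state' artifact metadata, assuming space-separated keys throughout (the task id is hypothetical):

    # Rehydrate the per-version counters from a dataset task's 'state' artifact.
    from clearml import Task

    task = Task.get_task(task_id='aabbccdd11223344')  # hypothetical dataset task id
    state = task.artifacts.get('state')
    if state is not None and state.metadata:
        # Metadata keys use spaces so they render nicely in the UI
        changed_files = {key: int(state.metadata.get(key, 0))
                         for key in ('files added', 'files removed', 'files modified')}
    else:
        changed_files = {'files added': 0, 'files removed': 0, 'files modified': 0}
    print(changed_files)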
@@ -121,6 +132,8 @@ class Dataset(object):
             # set the newly created Dataset parent ot the current Task, so we know who created it.
             if Task.current_task() and Task.current_task().id != task.id:
                 task.set_parent(Task.current_task())
+            # Set the modified files to empty on dataset creation
+            self.changed_files = {'files added': 0, 'files removed': 0, 'files modified': 0}

         # store current dataset Task
         self._task = task
@@ -206,7 +219,7 @@ class Dataset(object):
                      dataset_path=dataset_path, recursive=recursive, verbose=verbose)),
             print_console=False)

-        num_added = self._add_files(
+        num_added, num_modified = self._add_files(
             path=path, wildcard=wildcard, local_base_folder=local_base_folder,
             dataset_path=dataset_path, recursive=recursive, verbose=verbose)

@@ -262,9 +275,12 @@ class Dataset(object):
         self._add_script_call(
             'remove_files', dataset_path=dataset_path, recursive=recursive)

+        num_removed = num_files - len(self._dataset_file_entries)
         self._serialize()
+        # Update state
+        self.update_changed_files(num_files_removed=num_removed)

-        return num_files - len(self._dataset_file_entries)
+        return num_removed

     def sync_folder(self, local_path, dataset_path=None, verbose=False):
         # type: (Union[Path, _Path, str], Union[Path, _Path, str], bool) -> (int, int)
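remove_files now computes the removed count once, feeds it into the internal state via update_changed_files, and returns it; caller-side behavior is unchanged. A usage sketch (the dataset id is hypothetical, and the version is assumed to still be writable, i.e. not finalized):

    from clearml import Dataset

    ds = Dataset.get(dataset_id='aabbccdd11223344')  # hypothetical id
    num_removed = ds.remove_files(dataset_path='images/*.jpg', recursive=True)
    print('{} files removed'.format(num_removed))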
@@ -299,22 +315,26 @@ class Dataset(object):
         num_files = len(self._dataset_file_entries)
         self._dataset_file_entries = {
             k: f for k, f in self._dataset_file_entries.items() if filter_f(f)}
-        removed_files = num_files - len(self._dataset_file_entries)
+        num_removed = num_files - len(self._dataset_file_entries)
+        # Update the internal state
+        self.update_changed_files(num_files_removed=num_removed)

-        # add remaining files
-        added_files = self._add_files(path=local_path, dataset_path=dataset_path, recursive=True, verbose=verbose)
+        # add remaining files, state is updated in _add_files
+        num_added, num_modified = self._add_files(path=local_path, dataset_path=dataset_path,
+                                                  recursive=True, verbose=verbose)

+        # How many of the files were modified? AKA have the same name but a different hash
+
         if verbose:
             self._task.get_logger().report_text(
                 'Syncing folder {} : {} files removed, {} added / modified'.format(
-                    local_path.as_posix(), removed_files, added_files))
+                    local_path.as_posix(), num_removed, num_added + num_modified))

         # update the task script
         self._add_script_call(
             'sync_folder', local_path=local_path, dataset_path=dataset_path)

-        self._serialize()
-        return removed_files, added_files
+        return num_removed, num_added, num_modified

     def upload(self, show_progress=True, verbose=False, output_url=None, compression=None, chunk_size=None):
         # type: (bool, bool, Optional[str], Optional[str], int) -> ()
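Note: sync_folder now returns a 3-tuple (removed, added, modified), but the "# type: (...) -> (int, int)" comment in its signature (context lines above) still advertises a pair, so existing two-value unpacking will break. A usage sketch against the new return shape (project and folder names illustrative):

    from clearml import Dataset

    ds = Dataset.create(dataset_project='examples', dataset_name='demo')
    num_removed, num_added, num_modified = ds.sync_folder(local_path='data/')
    print('removed={} added={} modified={}'.format(num_removed, num_added, num_modified))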
@@ -1111,7 +1131,7 @@ class Dataset(object):
             recursive=True,  # type: bool
             verbose=False  # type: bool
     ):
-        # type: (...) -> int
+        # type: (...) -> tuple[int, int]
         """
         Add a folder into the current dataset. calculate file hash,
         and compare against parent, mark files to be uploaded
@@ -1162,6 +1182,11 @@ class Dataset(object):
         pool.close()
         self._task.get_logger().report_text('Hash generation completed')

+        # Get modified files, files with the same filename but a different hash
+        filename_hash_dict = {fe.relative_path: fe.hash for fe in file_entries}
+        modified_count = len([k for k, v in self._dataset_file_entries.items()
+                              if k in filename_hash_dict and v.hash != filename_hash_dict[k]])
+
         # merge back into the dataset
         count = 0
         for f in file_entries:
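A standalone sketch of the comparison above, with plain dicts standing in for the FileEntry objects (the real entries are keyed by relative path and carry a content hash):

    current = {'a.txt': 'h1', 'b.txt': 'h2'}                  # entries already in the dataset
    incoming = {'a.txt': 'h1', 'b.txt': 'h9', 'c.txt': 'h3'}  # freshly hashed local files
    modified_count = len([k for k, v in current.items()
                          if k in incoming and v != incoming[k]])
    assert modified_count == 1  # only b.txt keeps its name but changes hash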
@@ -1192,7 +1217,9 @@ class Dataset(object):
                 if verbose:
                     self._task.get_logger().report_text('Unchanged {}'.format(f.relative_path))

-        return count
+        # We don't count the modified files as added files
+        self.update_changed_files(num_files_added=count - modified_count, num_files_modified=modified_count)
+        return count - modified_count, modified_count

     def _update_dependency_graph(self):
         """
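Toy numbers for the split above: if 10 entries were merged back (count == 10) and 3 of them replaced an existing path with a different hash (modified_count == 3), the method now reports (7, 3) and bumps 'files added' by 7 and 'files modified' by 3:

    count, modified_count = 10, 3
    num_added, num_modified = count - modified_count, modified_count
    assert (num_added, num_modified) == (7, 3)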
@@ -1238,9 +1265,28 @@ class Dataset(object):
             'Current dependency graph: {2}\n'.format(
                 len(modified_files), format_size(sum(modified_files)),
                 json.dumps(self._dependency_graph, indent=2, sort_keys=True))
-        # store as artifact of the Task.
+        # store as artifact of the Task and add the amount of files added or removed as metadata, so we can use those
+        # later to create the table
         self._task.upload_artifact(
-            name=self.__state_entry_name, artifact_object=state, preview=preview, wait_on_upload=True)
+            name=self.__state_entry_name, artifact_object=state, preview=preview, wait_on_upload=True,
+            metadata=self.changed_files
+        )
+
+    def update_changed_files(self, num_files_added=None, num_files_modified=None, num_files_removed=None):
+        """
+        Update the internal state keeping track of added, modified and removed files.
+
+        :param num_files_added: Amount of files added when compared to the parent dataset
+        :param num_files_modified: Amount of files with the same name but a different hash when
+            compared to the parent dataset
+        :param num_files_removed: Amount of files removed when compared to the parent dataset
+        """
+        if num_files_added:
+            self.changed_files['files added'] += num_files_added
+        if num_files_removed:
+            self.changed_files['files removed'] += num_files_removed
+        if num_files_modified:
+            self.changed_files['files modified'] += num_files_modified

     def _download_dataset_archives(self):
         """
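A sketch of the metadata round trip this enables, using a throwaway task (project and task names illustrative). The int() casts in the genealogy code below suggest metadata values come back as strings:

    from clearml import Task

    task = Task.init(project_name='examples', task_name='state-metadata-demo')
    changed_files = {'files added': 2, 'files removed': 0, 'files modified': 1}
    task.upload_artifact(name='state', artifact_object={'demo': True},
                         metadata=changed_files, wait_on_upload=True)
    readback = Task.get_task(task_id=task.id).artifacts.get('state').metadata
    print(int(readback.get('files added', 0)))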
@@ -1616,11 +1662,24 @@ class Dataset(object):
                 if f.parent_dataset_id == node:
                     count += 1
                     size += f.size
-            removed = len(self.list_removed_files(node))
-            modified = len(self.list_modified_files(node))
+            # State is of type clearml.binding.artifacts.Artifact
+            node_task = Task.get_task(task_id=node)
+            node_state_metadata = node_task.artifacts.get('state').metadata
+            # Backwards compatibility, if the task was made before the new table change, just use the old system
+            if not node_state_metadata:
+                node_dataset = Dataset.get(dataset_id=node)
+                removed = len(node_dataset.list_removed_files())
+                added = len(node_dataset.list_added_files())
+                modified = len(node_dataset.list_modified_files())
+            else:
+                # TODO: if new system is prevalent, get rid of old system
+                removed = int(node_state_metadata.get('files removed', 0))
+                added = int(node_state_metadata.get('files added', 0))
+                modified = int(node_state_metadata.get('files modified', 0))
+
             table_values += [[node, node_names.get(node, ''),
-                              removed, modified, max(0, count-modified), format_size(size)]]
-            node_details[node] = [removed, modified, max(0, count-modified), format_size(size)]
+                              removed, modified, added, format_size(size)]]
+            node_details[node] = [removed, modified, added, format_size(size)]

         # create DAG
         visited = []
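The per-node lookup above assumes every node task carries a 'state' artifact; artifacts.get('state') returns None for tasks that do not, and None.metadata would then raise. A sketch of the same fallback with that guard made explicit (the helper name is hypothetical):

    from clearml import Dataset, Task

    def version_counters(node):
        # type: (str) -> tuple
        state = Task.get_task(task_id=node).artifacts.get('state')
        metadata = state.metadata if state else None
        if not metadata:
            # Pre-change versions: recompute the counts from the stored file entries
            node_dataset = Dataset.get(dataset_id=node)
            return (len(node_dataset.list_removed_files()),
                    len(node_dataset.list_added_files()),
                    len(node_dataset.list_modified_files()))
        # Post-change versions: counters were cached on the 'state' artifact
        return (int(metadata.get('files removed', 0)),
                int(metadata.get('files added', 0)),
                int(metadata.get('files modified', 0)))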