From 68cf2745ffb03c781731996c284e150ca9b5bfbd Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Tue, 28 Jun 2022 21:07:46 +0300 Subject: [PATCH] Add support for dataset version, fix trigger for datasets --- clearml/automation/trigger.py | 99 +-- clearml/cli/data/__main__.py | 27 +- clearml/datasets/dataset.py | 1127 +++++++++++++++++++++++++++------ clearml/storage/util.py | 31 +- clearml/utilities/version.py | 39 +- 5 files changed, 1086 insertions(+), 237 deletions(-) diff --git a/clearml/automation/trigger.py b/clearml/automation/trigger.py index 83f1e89b..a34eee84 100644 --- a/clearml/automation/trigger.py +++ b/clearml/automation/trigger.py @@ -274,23 +274,23 @@ class TriggerScheduler(BaseScheduler): self._model_triggers.append(trigger) def add_dataset_trigger( - self, - schedule_task_id=None, # type: Union[str, Task] - schedule_queue=None, # type: str - schedule_function=None, # type: Callable[[str], None] - trigger_project=None, # type: str - trigger_name=None, # type: Optional[str] - trigger_on_publish=None, # type: bool - trigger_on_tags=None, # type: Optional[List[str]] - trigger_on_archive=None, # type: bool - trigger_required_tags=None, # type: Optional[List[str]] - name=None, # type: Optional[str] - target_project=None, # type: Optional[str] - add_tag=True, # type: Union[bool, str] - single_instance=False, # type: bool - reuse_task=False, # type: bool - task_parameters=None, # type: Optional[dict] - task_overrides=None, # type: Optional[dict] + self, + schedule_task_id=None, # type: Union[str, Task] + schedule_queue=None, # type: str + schedule_function=None, # type: Callable[[str], None] + trigger_project=None, # type: str + trigger_name=None, # type: Optional[str] + trigger_on_publish=None, # type: bool + trigger_on_tags=None, # type: Optional[List[str]] + trigger_on_archive=None, # type: bool + trigger_required_tags=None, # type: Optional[List[str]] + name=None, # type: Optional[str] + target_project=None, # type: Optional[str] + add_tag=True, # type: Union[bool, str] + single_instance=False, # type: bool + reuse_task=False, # type: bool + task_parameters=None, # type: Optional[dict] + task_overrides=None, # type: Optional[dict] ): # type: (...) 
-> None """ @@ -331,26 +331,51 @@ class TriggerScheduler(BaseScheduler): for example {'script.version_num': None, 'script.branch': 'main'} Notice: not available when reuse_task=True :return: True if job is successfully added to the scheduling list """ - trigger = DatasetTrigger( - base_task_id=schedule_task_id, - base_function=schedule_function, - queue=schedule_queue, - name=name, - target_project=target_project, - single_instance=single_instance, - task_parameters=task_parameters, - task_overrides=task_overrides, - add_tag=(add_tag if isinstance(add_tag, str) else (name or schedule_task_id)) if add_tag else None, - clone_task=not bool(reuse_task), - match_name=trigger_name, - project=Task.get_project_id(trigger_project) if trigger_project else None, - tags=trigger_on_tags, - required_tags=trigger_required_tags, - on_publish=trigger_on_publish, - on_archive=trigger_on_archive, - ) - trigger.verify() - self._dataset_triggers.append(trigger) + if trigger_project: + trigger_project_list = Task.get_projects( + name="^{}/\\.datasets/.*".format(trigger_project), search_hidden=True, _allow_extra_fields_=True + ) + for project in trigger_project_list: + trigger = DatasetTrigger( + base_task_id=schedule_task_id, + base_function=schedule_function, + queue=schedule_queue, + name=name, + target_project=target_project, + single_instance=single_instance, + task_parameters=task_parameters, + task_overrides=task_overrides, + add_tag=(add_tag if isinstance(add_tag, str) else (name or schedule_task_id)) if add_tag else None, + clone_task=not bool(reuse_task), + match_name=trigger_name, + project=project.id, + tags=trigger_on_tags, + required_tags=trigger_required_tags, + on_publish=trigger_on_publish, + on_archive=trigger_on_archive, + ) + trigger.verify() + self._dataset_triggers.append(trigger) + else: + trigger = DatasetTrigger( + base_task_id=schedule_task_id, + base_function=schedule_function, + queue=schedule_queue, + name=name, + target_project=target_project, + single_instance=single_instance, + task_parameters=task_parameters, + task_overrides=task_overrides, + add_tag=(add_tag if isinstance(add_tag, str) else (name or schedule_task_id)) if add_tag else None, + clone_task=not bool(reuse_task), + match_name=trigger_name, + tags=trigger_on_tags, + required_tags=trigger_required_tags, + on_publish=trigger_on_publish, + on_archive=trigger_on_archive, + ) + trigger.verify() + self._dataset_triggers.append(trigger) def add_task_trigger( self, diff --git a/clearml/cli/data/__main__.py b/clearml/cli/data/__main__.py index 8351b73c..d90e5ac7 100644 --- a/clearml/cli/data/__main__.py +++ b/clearml/cli/data/__main__.py @@ -75,6 +75,10 @@ def cli(): 'Example: a17b4fID1 f0ee5ID2 a17b4f09eID3') create.add_argument('--project', type=str, required=False, default=None, help='Dataset project name') create.add_argument('--name', type=str, required=True, default=None, help='Dataset name') + create.add_argument("--version", type=str, required=False, default=None, help="Dataset version") + create.add_argument( + "--output-uri", type=str, required=False, default=None, help="Output URI for files in this dataset" + ) create.add_argument('--tags', type=str, nargs='*', help='Dataset user Tags') create.set_defaults(func=ds_create) @@ -115,6 +119,7 @@ def cli(): help='[Optional] Dataset project name') sync.add_argument('--name', type=str, required=False, default=None, help='[Optional] Dataset project name') + sync.add_argument("--version", type=str, required=False, default=None, help="[Optional] Dataset version") 
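    # Illustrative CLI usage for the new flags (a sketch, not part of the patch;
    # the project, name and URI values below are hypothetical):
    #   clearml-data create --project MyProject --name MyDataset --version 1.0.0 --output-uri s3://my-bucket/datasets
    #   clearml-data sync --project MyProject --name MyDataset --version 1.0.0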
sync.add_argument('--tags', type=str, nargs='*', help='[Optional] Dataset user Tags') sync.add_argument('--storage', type=str, default=None, @@ -219,6 +224,7 @@ def cli(): help='Specify dataset id (or use project/name instead). Default: previously accessed dataset.') ls.add_argument('--project', type=str, help='Specify dataset project name') ls.add_argument('--name', type=str, help='Specify dataset name') + ls.add_argument("--version", type=str, help="Specify dataset version", default=None) ls.add_argument('--filter', type=str, nargs='*', help='Filter files based on folder / wildcard, multiple filters are supported. ' 'Example: folder/date_*.json folder/sub-folder') @@ -323,7 +329,12 @@ def ds_get(args): def ds_list(args): print('List dataset content: {}'.format(args.id or (args.project, args.name))) print_args(args) - ds = Dataset.get(dataset_id=args.id or None, dataset_project=args.project or None, dataset_name=args.name or None) + ds = Dataset.get( + dataset_id=args.id or None, + dataset_project=args.project or None, + dataset_name=args.name or None, + dataset_version=args.version, + ) print('Listing dataset content') formatting = '{:64} | {:10,} | {:64}' print(formatting.replace(',', '').format('file name', 'size', 'hash')) @@ -506,13 +517,19 @@ def ds_add(args): def ds_create(args): - print('Creating a new dataset:') + print("Creating a new dataset:") print_args(args) - ds = Dataset.create(dataset_project=args.project, dataset_name=args.name, parent_datasets=args.parents) + ds = Dataset.create( + dataset_project=args.project, + dataset_name=args.name, + parent_datasets=args.parents, + dataset_version=args.version, + output_uri=args.output_uri, + ) if args.tags: ds.tags = ds.tags + args.tags - print('New dataset created id={}'.format(ds.id)) - clear_state({'id': ds.id}) + print("New dataset created id={}".format(ds.id)) + clear_state({"id": ds.id}) return ds.id diff --git a/clearml/datasets/dataset.py b/clearml/datasets/dataset.py index 469a5cf6..a101c4c2 100644 --- a/clearml/datasets/dataset.py +++ b/clearml/datasets/dataset.py @@ -1,12 +1,14 @@ +import calendar import json import os import shutil import psutil +import mimetypes from copy import deepcopy, copy from multiprocessing.pool import ThreadPool from concurrent.futures import ThreadPoolExecutor from tempfile import mkdtemp -from typing import Union, Optional, Sequence, List, Dict, Any, Mapping +from typing import Union, Optional, Sequence, List, Dict, Any, Mapping, Tuple from zipfile import ZIP_DEFLATED from attr import attrs, attrib @@ -14,21 +16,43 @@ from pathlib2 import Path from .. 
import Task, StorageManager, Logger from ..backend_api.session.client import APIClient +from ..backend_api import Session from ..backend_interface.task.development.worker import DevWorker -from ..backend_interface.util import mutually_exclusive, exact_match_regex, get_existing_project -from ..config import deferred_config +from ..backend_interface.util import mutually_exclusive, exact_match_regex, get_or_create_project +from ..config import deferred_config, running_remotely, get_remote_task_id from ..debugging.log import LoggerRoot from ..storage.helper import StorageHelper from ..storage.cache import CacheManager from ..storage.util import sha256sum, is_windows, md5text, format_size from ..utilities.matching import matches_any_wildcard from ..utilities.parallel import ParallelZipper +from ..utilities.version import Version try: from pathlib import Path as _Path # noqa except ImportError: _Path = None +try: + import pandas as pd +except ImportError: + pd = None + +try: + import pyarrow +except ImportError: + pyarrow = None + +try: + import fastparquet +except ImportError: + fastparquet = None + +try: + import numpy as np +except ImportError: + np = None + @attrs class FileEntry(object): @@ -75,15 +99,28 @@ class Dataset(object): __data_entry_name_prefix = 'data_' __cache_context = 'datasets' __tag = 'dataset' - __external_files_tag = 'external files' - __cache_folder_prefix = 'ds_' + __hidden_tag = "hidden" + __external_files_tag = "external files" + __cache_folder_prefix = "ds_" + __default_dataset_version = "1.0.0" __dataset_folder_template = CacheManager.set_context_folder_lookup(__cache_context, "{0}_archive_{1}") __preview_max_file_entries = 15000 __preview_max_size = 5 * 1024 * 1024 + __hyperparams_section = "Datasets" + __datasets_runtime_prop = "datasets" + __orig_datasets_runtime_prop_prefix = "orig_datasets" + __preview_tabular_table_count = deferred_config("dataset.preview.tabular.table_count", 10, transform=int) + __preview_tabular_row_count = deferred_config("dataset.preview.tabular.row_count", 10, transform=int) + __preview_media_image_count = deferred_config("dataset.preview.media.image_count", 10, transform=int) + __preview_media_video_count = deferred_config("dataset.preview.media.video_count", 10, transform=int) + __preview_media_audio_count = deferred_config("dataset.preview.media.audio_count", 10, transform=int) + __preview_media_html_count = deferred_config("dataset.preview.media.html_count", 10, transform=int) _dataset_chunk_size_mb = deferred_config("storage.dataset_chunk_size_mb", 512, transform=int) - def __init__(self, _private, task=None, dataset_project=None, dataset_name=None, dataset_tags=None): - # type: (int, Optional[Task], Optional[str], Optional[str], Optional[Sequence[str]]) -> () + def __init__( + self, _private, task=None, dataset_project=None, dataset_name=None, dataset_tags=None, dataset_version=None + ): + # type: (int, Optional[Task], Optional[str], Optional[str], Optional[Sequence[str]], Optional[str]) -> () """ Do not use directly! Use Dataset.create(...) or Dataset.get(...) instead. 
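        A minimal illustrative flow (the project/name/version values here are hypothetical
        examples, not defaults):

            ds = Dataset.create(dataset_project="examples", dataset_name="sample", dataset_version="1.0.0")
            ds = Dataset.get(dataset_project="examples", dataset_name="sample", dataset_version="1.0.0")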
""" @@ -93,6 +130,9 @@ class Dataset(object): self._dataset_link_entries = {} # type: Dict[str, LinkEntry] # this will create a graph of all the dependencies we have, each entry lists it's own direct parents self._dependency_graph = {} # type: Dict[str, List[str]] + self._dataset_version = None + if dataset_version: + self._dataset_version = str(dataset_version).strip() if task: self._task_pinger = None self._created_task = False @@ -126,8 +166,17 @@ class Dataset(object): self.changed_files = {'files added': 0, 'files removed': 0, 'files modified': 0} else: self._created_task = True + dataset_project, parent_project = self._build_hidden_project_name(dataset_project, dataset_name) task = Task.create( project_name=dataset_project, task_name=dataset_name, task_type=Task.TaskTypes.data_processing) + if bool(Session.check_min_api_server_version("2.17")): + get_or_create_project(task.session, project_name=parent_project, system_tags=[self.__hidden_tag]) + get_or_create_project( + task.session, + project_name=dataset_project, + project_id=task.project, + system_tags=[self.__hidden_tag, self.__tag], + ) # set default output_uri task.output_uri = True task.set_system_tags((task.get_system_tags() or []) + [self.__tag]) @@ -158,6 +207,31 @@ class Dataset(object): # store current dataset Task self._task = task + if not self._dataset_version: + # noinspection PyProtectedMember + self._dataset_version = self._task._get_runtime_properties().get("version") + if not self._dataset_version: + _, latest_version = self._get_dataset_id_by_version(self.project, self.name) + if latest_version is not None: + # noinspection PyBroadException + try: + self._dataset_version = str(Version(latest_version).get_next_version()) + except Exception: + LoggerRoot.get_base_logger().warning( + "Could not auto-increment version {} of dataset with ID {}".format( + latest_version, self._task.id + ) + ) + runtime_props = { + "orig_dataset_name": self._task._get_runtime_properties().get("orig_dataset_name", self._task.name), + "orig_dataset_id": self._task._get_runtime_properties().get("orig_dataset_id", self._task.id), + } + if not self._dataset_version: + self._dataset_version = self.__default_dataset_version + runtime_props["version"] = self._dataset_version + self._task.set_user_properties(version=self._dataset_version) + # noinspection PyProtectedMember + self._task._set_runtime_properties(runtime_props) # store current dataset id self._id = task.id # store the folder where the dataset was downloaded to @@ -170,6 +244,8 @@ class Dataset(object): # store a cached lookup of the number of chunks each parent dataset has. 
# this will help with verifying we have an up-to-date partial local copy
        self._dependency_chunk_lookup = None  # type: Optional[Dict[str, int]]
+        self._ds_total_size = None
+        self._ds_total_size_compressed = None

     @property
     def id(self):
@@ -207,7 +283,7 @@ class Dataset(object):
     @property
     def project(self):
         # type: () -> str
-        return self._task.get_project_name()
+        return self._remove_hidden_part_from_dataset_project(self._task.get_project_name())

     @property
     def name(self):
@@ -316,12 +392,19 @@ class Dataset(object):
         self._dirty = True
         if dataset_path:
             dataset_path = dataset_path.lstrip("/")
-        if StorageManager.exists_file(source_url):
-            links = [source_url]
-        else:
-            if source_url[-1] != "/":
-                source_url = source_url + "/"
-            links = StorageManager.list(source_url, return_full_path=True)
+        # noinspection PyBroadException
+        try:
+            if StorageManager.exists_file(source_url):
+                links = [source_url]
+            else:
+                if source_url[-1] != "/":
+                    source_url = source_url + "/"
+                links = StorageManager.list(source_url, return_full_path=True)
+        except Exception:
+            self._task.get_logger().warning(
+                "Could not list remote file(s) when adding {}".format(source_url)
+            )
+            return 0
         num_added = 0
         num_modified = 0
         for link in links:
@@ -355,7 +438,10 @@ class Dataset(object):
                     link=link, relative_path=relative_path, parent_dataset_id=self._id, size=size
                 )
                 num_modified += 1
-            elif relative_path in self._dataset_link_entries and self._dataset_link_entries[relative_path].size != size:
+            elif (
+                relative_path in self._dataset_link_entries
+                and self._dataset_link_entries[relative_path].size != size
+            ):
                 if verbose:
                     self._task.get_logger().report_text(
                         "External file {} modified".format(link),
@@ -560,7 +646,7 @@ class Dataset(object):
             self._data_artifact_name = self._get_next_data_artifact_name(self._data_artifact_name)
             self._task.get_logger().report_text(
                 "Uploading dataset changes ({} files compressed to {}) to {}".format(
-                    zip_.count, format_size(zip_.size), self.get_default_storage()
+                    zip_.count, format_size(zip_.size, binary=True, use_b_instead_of_bytes=True), self.get_default_storage()
                 )
             )
             total_size += zip_.size
@@ -583,11 +669,12 @@ class Dataset(object):
         self._task.get_logger().report_text(
             "File compression and upload completed: total size {}, {} chunks stored (average size {})".format(
-                format_size(total_size),
+                format_size(total_size, binary=True, use_b_instead_of_bytes=True),
                 chunks_count,
-                format_size(0 if chunks_count == 0 else total_size / chunks_count),
+                format_size(0 if chunks_count == 0 else total_size / chunks_count, binary=True, use_b_instead_of_bytes=True),
             )
         )
+        self._ds_total_size_compressed = total_size + self._get_total_size_compressed_parents()
        if chunks_count == 0:
            LoggerRoot.get_base_logger().info("No pending files, skipping upload.")
@@ -640,9 +727,9 @@ class Dataset(object):
        self._add_script_call('finalize')
        if verbose:
            print('Updating statistics and genealogy')
+        self._report_dataset_preview()
+        self._report_dataset_struct()
        self._report_dataset_genealogy()
-        hashed_nodes = [self._get_dataset_id_hash(k) for k in self._dependency_graph.keys()]
-        self._task.comment = 'Dependencies: {}\n'.format(hashed_nodes)
        if self._using_current_task:
            self._task.flush(wait_for_uploads=True)
        else:
@@ -655,6 +742,15 @@ class Dataset(object):

        return True

+    def set_description(self, description):
+        # type: (str) -> ()
+        """
+        Set the description of the dataset
+
+        :param description: Description to be set
+        """
+        self._task.comment = description
+
     def publish(self, raise_on_error=True):
         # type: (bool) ->
bool
        """
@@ -971,7 +1067,9 @@ class Dataset(object):
         dataset_project=None,  # type: Optional[str]
         dataset_tags=None,  # type: Optional[Sequence[str]]
         parent_datasets=None,  # type: Optional[Sequence[Union[str, Dataset]]]
-        use_current_task=False  # type: bool
+        use_current_task=False,  # type: bool
+        dataset_version=None,  # type: Optional[str]
+        output_uri=None  # type: Optional[str]
     ):
         # type: (...) -> "Dataset"
         """
@@ -986,6 +1084,17 @@ class Dataset(object):
         :param parent_datasets: Expand a parent dataset by adding/removing files
         :param use_current_task: False (default), a new Dataset task is created.
             If True, the dataset is created on the current Task.
+        :param dataset_version: Version of the new dataset. If not set, try to find the latest version
+            of the dataset with given `dataset_name` and `dataset_project` and auto-increment it.
+        :param output_uri: Location to upload the dataset files to, including preview samples.
+            The following are examples of ``output_uri`` values for the supported locations:
+
+            - A shared folder: ``/mnt/share/folder``
+            - S3: ``s3://bucket/folder``
+            - Google Cloud Storage: ``gs://bucket-name/folder``
+            - Azure Storage: ``azure://company.blob.core.windows.net/folder/``
+            - Default file server: None
+
+        :return: Newly created Dataset object
         """
         parent_datasets = [cls.get(dataset_id=p) if not isinstance(p, Dataset) else p for p in (parent_datasets or [])]
         if not all(p.is_final() for p in parent_datasets):
             raise ValueError("Cannot inherit from a parent that was not finalized/closed")

         if dataset_name and not dataset_project and Task.current_task():
-            LoggerRoot.get_base_logger().info('Dataset project not provided, using Current Task\'s project')
+            LoggerRoot.get_base_logger().info("Dataset project not provided, using Current Task's project")
             dataset_project = Task.current_task().get_project_name()

         # if dataset name + project are None, default to use current_task
         if dataset_project is None and dataset_name is None and not use_current_task:
-            LoggerRoot.get_base_logger().info('New dataset project/name not provided, storing on Current Task')
+            LoggerRoot.get_base_logger().info("New dataset project/name not provided, storing on Current Task")
             use_current_task = True

         # get project name
         if not parent_datasets:
             raise ValueError("Missing dataset project name.
Could not infer project name from parent dataset.") # get project name from parent dataset + # noinspection PyProtectedMember dataset_project = parent_datasets[-1]._task.get_project_name() # merge datasets according to order @@ -1013,174 +1123,481 @@ class Dataset(object): dataset_link_entries = {} dependency_graph = {} for p in parent_datasets: + # noinspection PyProtectedMember dataset_file_entries.update(deepcopy(p._dataset_file_entries)) + # noinspection PyProtectedMember dataset_link_entries.update(deepcopy(p._dataset_link_entries)) + # noinspection PyProtectedMember dependency_graph.update(deepcopy(p._dependency_graph)) instance = cls(_private=cls.__private_magic, dataset_project=dataset_project, dataset_name=dataset_name, dataset_tags=dataset_tags, - task=Task.current_task() if use_current_task else None) + task=Task.current_task() if use_current_task else None, + dataset_version=dataset_version) + if output_uri and not Task._offline_mode: + instance._task.output_uri = output_uri instance._using_current_task = use_current_task - instance._task.get_logger().report_text('Dataset created', print_console=False) instance._dataset_file_entries = dataset_file_entries instance._dataset_link_entries = dataset_link_entries instance._dependency_graph = dependency_graph + # noinspection PyProtectedMember instance._dependency_graph[instance._id] = [p._id for p in parent_datasets] + # noinspection PyProtectedMember instance._serialize() + # noinspection PyProtectedMember + instance._report_dataset_struct() + # noinspection PyProtectedMember + instance._task.get_logger().report_text( + "ClearML results page: {}".format(instance._task.get_output_log_web_page()) + ) + instance._task.get_logger().report_text( + "ClearML dataset page: {}".format( + "{}/datasets/simple/{}/experiments/{}".format( + instance._task._get_app_server(), + instance._task.project if instance._task.project is not None else "*", + instance._task.id, + ) + ) + ) + # noinspection PyProtectedMember instance._task.flush(wait_for_uploads=True) + # noinspection PyProtectedMember cls._set_project_system_tags(instance._task) return instance + def _get_total_size_compressed_parents(self): + # type: () -> (int) + """ + :return: the compressed size of the files contained in the parent datasets + """ + parents = self._get_parents() + if not parents: + return 0 + runtime_tasks = Task._query_tasks( + task_ids=parents, + only_fields=["runtime.ds_total_size_compressed"], + search_hidden=True, + _allow_extra_fields_=True, + ) + compressed_size = 0 + for runtime_task in runtime_tasks: + compressed_size += runtime_task.runtime.get("ds_total_size_compressed", 0) + return compressed_size + @classmethod - def delete(cls, dataset_id=None, dataset_project=None, dataset_name=None, force=False): - # type: (Optional[str], Optional[str], Optional[str], bool) -> () + def _raise_on_dataset_used(cls, dataset_id): + # type: (str) -> () """ - Delete a dataset, raise exception if dataset is used by other dataset versions. 
- Use force=True to forcefully delete the dataset
+        Raise an exception if the given dataset is being used

-        :param dataset_id: Dataset id to delete
-        :param dataset_project: Project containing the dataset
-        :param dataset_name: Naming the new dataset
-        :param force: If True delete even if other datasets depend on the specified dataset version
+        :param dataset_id: ID of the dataset potentially being used
         """
-        mutually_exclusive(dataset_id=dataset_id, dataset_project=dataset_project)
-        mutually_exclusive(dataset_id=dataset_id, dataset_name=dataset_name)
-        if not dataset_id:
-            tasks = Task.get_tasks(
-                project_name=dataset_project,
+        # noinspection PyProtectedMember
+        dependencies = Task._query_tasks(
+            system_tags=[cls.__tag],
+            type=[str(Task.TaskTypes.data_processing)],
+            only_fields=["created", "id", "name"],
+            search_text="{}".format(cls._get_dataset_id_hash(dataset_id)),
+            search_hidden=True,
+            _allow_extra_fields_=True,
+        )
+        if dependencies:
+            dependencies = [d for d in dependencies if d.id != dataset_id]
+        if dependencies:
+            raise ValueError("Dataset id={} is used by datasets: {}".format(dataset_id, [d.id for d in dependencies]))
+
+    @classmethod
+    def _get_dataset_ids_respecting_params(
+        cls,
+        dataset_id=None,
+        dataset_project=None,
+        dataset_name=None,
+        force=False,
+        dataset_version=None,
+        entire_dataset=None,
+        action=None
+    ):
+        if dataset_id:
+            return [dataset_id]
+        if entire_dataset:
+            if not force:
+                if action:
+                    raise ValueError("Can only {} an entire dataset if force is True".format(action))
+                raise ValueError("Could not fetch ids for requested datasets")
+            hidden_dataset_project, _ = cls._build_hidden_project_name(dataset_project, dataset_name)
+            # noinspection PyProtectedMember
+            datasets = Task._query_tasks(
+                project_name=[hidden_dataset_project],
                 task_name=exact_match_regex(dataset_name) if dataset_name else None,
-                task_filter=dict(
-                    system_tags=[cls.__tag],
-                    type=[str(Task.TaskTypes.data_processing)],
-                    page_size=2, page=0,)
+                system_tags=[cls.__tag],
+                only_fields=["id"],
+                search_hidden=True,
+                _allow_extra_fields_=True,
+            )
+            return [d.id for d in datasets]
+        dataset_id = cls._find_dataset_id(
+            dataset_project=dataset_project, dataset_name=dataset_name, dataset_version=dataset_version
+        )
+        if not dataset_id:
+            raise ValueError(
+                "Could not find dataset with project={} name={} version={}".format(
+                    dataset_project, dataset_name, dataset_version
+                )
             )
-            if not tasks:
-                raise ValueError("Dataset project={} name={} could not be found".format(dataset_project, dataset_name))
-            if len(tasks) > 1:
-                raise ValueError("Too many datasets matching project={} name={}".format(dataset_project, dataset_name))
-            dataset_id = tasks[0].id
-        # check if someone is using the datasets
         if not force:
-            # todo: use Task runtime_properties
-            # noinspection PyProtectedMember
-            dependencies = Task._query_tasks(
-                system_tags=[cls.__tag],
-                type=[str(Task.TaskTypes.data_processing)],
-                only_fields=['created', 'id', 'name'],
-                search_text='{}'.format(cls._get_dataset_id_hash(dataset_id))
-            )
-            # filter us out
-            if dependencies:
-                dependencies = [d for d in dependencies if d.id != dataset_id]
-            if dependencies:
-                raise ValueError("Dataset id={} is used by datasets: {}".format(
-                    dataset_id, [d.id for d in dependencies]))
+            cls._raise_on_dataset_used(dataset_id)
+        return [dataset_id]
+
+    @classmethod
+    def delete(
+        cls,
+        dataset_id=None,  # Optional[str]
+        dataset_project=None,  # Optional[str]
+        dataset_name=None,  # Optional[str]
+        force=False,  # bool
+
dataset_version=None,  # Optional[str]
+        entire_dataset=False,  # bool
+    ):
+        # type: (...) -> ()
+        """
+        Delete the dataset(s). If multiple datasets match the parameters,
+        raise an Exception or delete the entire dataset if `entire_dataset` is True and `force` is True
+
+        :param dataset_id: The ID of the dataset(s) to be deleted
+        :param dataset_project: The project the dataset(s) to be deleted belongs to
+        :param dataset_name: The name of the dataset(s)
+        :param force: If True, delete the dataset(s) even when they are being used. Also required to be set to
+            True when `entire_dataset` is set.
+        :param dataset_version: The version of the dataset(s) to be deleted
+        :param entire_dataset: If True, delete all datasets that match the given `dataset_project`,
+            `dataset_name`, `dataset_version`. Note that `force` has to be True if this parameter is True
+        """
+        if not any([dataset_id, dataset_project, dataset_name]):
+            raise ValueError("Dataset deletion criteria not met. Didn't provide id/name/project correctly.")
+
+        mutually_exclusive(dataset_id=dataset_id, dataset_project=dataset_project)
+        mutually_exclusive(dataset_id=dataset_id, dataset_name=dataset_name)

-        client = APIClient()
-        # notice the force here is a must, since the state is never draft
         # noinspection PyBroadException
         try:
-            t = client.tasks.get_by_id(dataset_id)
-        except Exception:
-            t = None
-        if not t:
-            raise ValueError("Dataset id={} could not be found".format(dataset_id))
-        if str(t.type) != str(Task.TaskTypes.data_processing) or cls.__tag not in t.system_tags:
-            raise ValueError("Dataset id={} is not of type Dataset".format(dataset_id))
+            dataset_ids = cls._get_dataset_ids_respecting_params(
+                dataset_id=dataset_id,
+                dataset_project=dataset_project,
+                dataset_name=dataset_name,
+                force=force,
+                dataset_version=dataset_version,
+                entire_dataset=entire_dataset,
+                action="delete",
+            )
+        except Exception as e:
+            LoggerRoot.get_base_logger().warning("Error: {}".format(str(e)))
+            return
+        client = APIClient()
+        for dataset_id in dataset_ids:
+            task = Task.get_task(task_id=dataset_id)
+            if str(task.task_type) != str(Task.TaskTypes.data_processing) or cls.__tag not in (
+                task.get_system_tags() or []
+            ):
+                LoggerRoot.get_base_logger().warning("Task id={} is not of type Dataset".format(dataset_id))
+                continue
+            for artifact in task.artifacts.values():
+                h = StorageHelper.get(artifact.url)
+                # noinspection PyBroadException
+                try:
+                    h.delete(artifact.url)
+                except Exception as ex:
+                    LoggerRoot.get_base_logger().warning(
+                        "Failed deleting remote file '{}': {}".format(artifact.url, ex)
+                    )
+            # this force is different than the force passed in Dataset.delete
+            # it indicates that we want to delete a non-draft task
+            client.tasks.delete(task=dataset_id, force=True)

-        task = Task.get_task(task_id=dataset_id)
-        # first delete all the artifacts from the dataset
-        for artifact in task.artifacts.values():
-            h = StorageHelper.get(artifact.url)
+    @classmethod
+    def rename(
+        cls,
+        new_name,  # str
+        dataset_id=None,  # Optional[str]
+        dataset_project=None,  # Optional[str]
+        dataset_name=None,  # Optional[str]
+        dataset_version=None,  # Optional[str]
+        entire_dataset=False,  # bool
+        force=False,  # bool
+    ):
+        # type: (...) -> ()
+        """
+        Rename the dataset(s).
If multiple datasets match the parameters,
+        raise an Exception or rename the entire dataset if `entire_dataset` is True and `force` is True
+
+        :param new_name: The new name of the dataset(s) to be renamed
+        :param dataset_id: The ID of the dataset(s) to be renamed
+        :param dataset_project: The project the dataset(s) to be renamed belongs to
+        :param dataset_name: The name of the dataset(s) (before renaming)
+        :param dataset_version: The version of the dataset(s) to be renamed
+        :param entire_dataset: If True, rename all datasets that match the given `dataset_project`,
+            `dataset_name`, `dataset_version`. Note that `force` has to be True if this parameter is True
+        :param force: If True, rename the dataset(s) even when they are being used. Also required to be set to
+            True when `entire_dataset` is set.
+        """
+        if not any([dataset_id, dataset_project, dataset_name]):
+            raise ValueError("Dataset rename criteria not met. Didn't provide id/name/project correctly.")
+
+        mutually_exclusive(dataset_id=dataset_id, dataset_project=dataset_project)
+        mutually_exclusive(dataset_id=dataset_id, dataset_name=dataset_name)
+
+        # noinspection PyBroadException
+        try:
+            dataset_ids = cls._get_dataset_ids_respecting_params(
+                dataset_id=dataset_id,
+                dataset_project=dataset_project,
+                dataset_name=dataset_name,
+                force=force,
+                dataset_version=dataset_version,
+                entire_dataset=entire_dataset,
+                action="rename",
+            )
+        except Exception as e:
+            LoggerRoot.get_base_logger().warning("Error: {}".format(str(e)))
+            return
+        for dataset_id in dataset_ids:
+            task = Task.get_task(task_id=dataset_id)
+            if not task.rename(new_name):
+                LoggerRoot.get_base_logger().warning("Could not rename dataset with ID {}".format(dataset_id))
+                continue
+            cls._move_to_project_aux(task, task.get_project_name(), new_name)
+
+    @classmethod
+    def _move_to_project_aux(cls, task, new_project, dataset_name):
+        """
+        Move a task to another project. Helper function, useful when the task and name of
+        the corresponding dataset are known.
+
+        :param task: A dataset's task
+        :param new_project: New project to move the dataset to
+        :param dataset_name: Name of the dataset
+
+        :return: True if the dataset was moved and False otherwise
+        """
+        hidden_dataset_project_, parent_project = cls._build_hidden_project_name(new_project, dataset_name)
+        get_or_create_project(task.session, project_name=parent_project, system_tags=[cls.__hidden_tag])
+        return task.move_to_project(new_project_name=hidden_dataset_project_, system_tags=[cls.__hidden_tag, cls.__tag])
+
+    @classmethod
+    def move_to_project(
+        cls,
+        new_project,  # str
+        dataset_id=None,  # Optional[str]
+        dataset_project=None,  # Optional[str]
+        dataset_name=None,  # Optional[str]
+        dataset_version=None,  # Optional[str]
+        entire_dataset=False,  # bool
+        force=False,  # bool
+    ):
+        # type: (...) -> ()
+        """
+        Move the dataset(s) to another project. If multiple datasets match the parameters,
+        raise an Exception or move the entire dataset if `entire_dataset` is True and `force` is True
+
+        :param new_project: New project to move the dataset(s) to
+        :param dataset_id: ID of the dataset(s) to move to new project
+        :param dataset_project: Project of the dataset(s) to move to new project
+        :param dataset_name: Name of the dataset(s) to move to new project
+        :param dataset_version: Version of the dataset(s) to move to new project
+        :param entire_dataset: If True, move all datasets that match the given `dataset_project`,
+            `dataset_name`, `dataset_version`.
Note that `force` has to be True if this parameter is True
+        :param force: If True, move the dataset(s) even when they are being used. Also required to be set to
+            True when `entire_dataset` is set.
+        """
+        if not any([dataset_id, dataset_project, dataset_name]):
+            raise ValueError("Dataset move criteria not met. Didn't provide id/name/project correctly.")
+
+        mutually_exclusive(dataset_id=dataset_id, dataset_project=dataset_project)
+        mutually_exclusive(dataset_id=dataset_id, dataset_name=dataset_name)
+
+        # noinspection PyBroadException
+        try:
+            dataset_ids = cls._get_dataset_ids_respecting_params(
+                dataset_id=dataset_id,
+                dataset_project=dataset_project,
+                dataset_name=dataset_name,
+                force=force,
+                dataset_version=dataset_version,
+                entire_dataset=entire_dataset,
+                action="move to project",
+            )
+        except Exception as e:
+            LoggerRoot.get_base_logger().warning("Error: {}".format(str(e)))
+            return
+        for dataset_id in dataset_ids:
             # noinspection PyBroadException
             try:
-                h.delete(artifact.url)
-            except Exception as ex:
-                LoggerRoot.get_base_logger().warning('Failed deleting remote file \'{}\': {}'.format(
-                    artifact.url, ex))
-
-        # now delete the actual task
-        client.tasks.delete(task=dataset_id, force=True)
+                dataset = cls.get(dataset_id=dataset_id, _dont_populate_runtime_props=True)
+            except Exception:
+                dataset = None
+            if not dataset:
+                LoggerRoot.get_base_logger().warning("Could not find dataset to move to another project")
+                continue
+            if not bool(Session.check_min_api_server_version("2.17")):
+                LoggerRoot.get_base_logger().warning(
+                    "Could not move dataset to another project because API version < 2.17"
+                )
+                continue
+            cls._move_to_project_aux(dataset._task, new_project, dataset.name)

     @classmethod
     def get(
-        cls,
-        dataset_id=None,  # type: Optional[str]
-        dataset_project=None,  # type: Optional[str]
-        dataset_name=None,  # type: Optional[str]
-        dataset_tags=None,  # type: Optional[Sequence[str]]
-        only_completed=False,  # type: bool
-        only_published=False,  # type: bool
-        auto_create=False,  # type: bool
-        writable_copy=False  # type: bool
+        cls,
+        dataset_id=None,  # type: Optional[str]
+        dataset_project=None,  # type: Optional[str]
+        dataset_name=None,  # type: Optional[str]
+        dataset_tags=None,  # type: Optional[Sequence[str]]
+        only_completed=False,  # type: bool
+        only_published=False,  # type: bool
+        auto_create=False,  # type: bool
+        writable_copy=False,  # type: bool
+        dataset_version=None,  # type: Optional[str]
+        alias=None,  # type: Optional[str]
+        overridable=False,  # type: bool
+        **kwargs
     ):
         # type: (...) -> "Dataset"
         """
-        Get a specific Dataset. If only dataset_project is given, return the last Dataset in the Dataset project
+        Get a specific Dataset. If multiple datasets are found, the most recent one is returned

-        :param dataset_id: Requested Dataset ID
-        :param dataset_project: Requested Dataset project name
-        :param dataset_name: Requested Dataset name
-        :param dataset_tags: Requested Dataset tags (list of tag strings)
+        :param dataset_id: Requested dataset ID
+        :param dataset_project: Requested dataset project name
+        :param dataset_name: Requested dataset name
+        :param dataset_tags: Requested dataset tags (list of tag strings)
         :param only_completed: Return only if the requested dataset is completed or published
         :param only_published: Return only if the requested dataset is published
         :param auto_create: Create new dataset if it does not exist yet
         :param writable_copy: Get a newly created mutable dataset with the current one as its parent,
            so new files can be added to the instance.
+ :param dataset_version: Requested version of the Dataset + :param alias: Alias of the dataset. If set, the 'alias : dataset ID' key-value pair + will be set under the hyperparameters section 'Datasets' + :param overridable: If True, allow overriding the dataset ID with a given alias in the + hyperparameters section. Useful when one wants to change the dataset used when running + a task remotely. If the alias parameter is not set, this parameter has no effect + :return: Dataset object """ - mutually_exclusive(dataset_id=dataset_id, dataset_project=dataset_project, _require_at_least_one=False) - mutually_exclusive(dataset_id=dataset_id, dataset_name=dataset_name, _require_at_least_one=False) if not any([dataset_id, dataset_project, dataset_name, dataset_tags]): raise ValueError("Dataset selection criteria not met. Didn't provide id/name/project/tags correctly.") - if auto_create and not get_existing_project( - session=Task._get_default_session(), project_name=dataset_project - ): - tasks = [] - else: - tasks = Task.get_tasks( - task_ids=[dataset_id] if dataset_id else None, - project_name=dataset_project, - task_name=exact_match_regex(dataset_name) if dataset_name else None, - tags=dataset_tags, - task_filter=dict( - system_tags=[cls.__tag, '-archived'], order_by=['-created'], + mutually_exclusive(dataset_id=dataset_id, dataset_project=dataset_project, _require_at_least_one=False) + mutually_exclusive(dataset_id=dataset_id, dataset_name=dataset_name, _require_at_least_one=False) + + current_task = Task.current_task() + + def get_instance(dataset_id_): + task = Task.get_task(task_id=dataset_id_) + if task.status == "created": + raise ValueError("Dataset id={} is in draft mode, delete and recreate it".format(task.id)) + force_download = False if task.status in ("stopped", "published", "closed", "completed") else True + if cls.__state_entry_name in task.artifacts: + local_state_file = StorageManager.get_local_copy( + remote_url=task.artifacts[cls.__state_entry_name].url, + cache_context=cls.__cache_context, + extract_archive=False, + name=task.id, + force_download=force_download, + ) + if not local_state_file: + raise ValueError("Could not load Dataset id={} state".format(task.id)) + else: + # we could not find the serialized state, start empty + local_state_file = {} + instance_ = cls._deserialize(local_state_file, task) + # remove the artifact, just in case + if force_download and local_state_file: + os.unlink(local_state_file) + return instance_ + + def finish_dataset_get(dataset, orig_dataset_id): + # noinspection PyProtectedMember + dataset_id_ = dataset._id + if not current_task or kwargs.get("_dont_populate_runtime_props"): + return dataset + if alias: + # noinspection PyProtectedMember + current_task._set_parameters( + {"{}/{}".format(cls.__hyperparams_section, alias): dataset_id_}, __update=True + ) + # noinspection PyProtectedMember + runtime_props = current_task._get_runtime_properties() + used_datasets = list(runtime_props.get(cls.__datasets_runtime_prop, [])) + runtime_props_to_set = {} + if dataset_id_ not in used_datasets: + used_datasets.append(dataset_id_) + runtime_props_to_set.update({cls.__datasets_runtime_prop: used_datasets}) + orig_dataset = get_instance(orig_dataset_id) + # noinspection PyProtectedMember + if orig_dataset._dataset_version: + runtime_props_to_set.update( + { + "{}.{}/{}".format( + cls.__orig_datasets_runtime_prop_prefix, orig_dataset.name, orig_dataset._dataset_version + ): orig_dataset_id + } + ) + else: + runtime_props_to_set.update( + 
{"{}.{}".format(cls.__orig_datasets_runtime_prop_prefix, orig_dataset.name): orig_dataset_id} + ) + if runtime_props_to_set: + # noinspection PyProtectedMember + current_task._set_runtime_properties(runtime_props_to_set) + return dataset + + if not dataset_id: + dataset_id = cls._find_dataset_id( + dataset_project=dataset_project, + dataset_name=dataset_name, + dataset_version=dataset_version, + raise_on_error=False, + dataset_tags=dataset_tags, + dataset_filter=dict( + system_tags=[cls.__tag, "-archived"], + order_by=["-created"], type=[str(Task.TaskTypes.data_processing)], - page_size=1, page=0, - status=['published'] if only_published else - ['published', 'completed', 'closed'] if only_completed else None) + page_size=1, + page=0, + status=["published"] + if only_published + else ["published", "completed", "closed"] + if only_completed + else None, + ), ) + if not dataset_id and not auto_create: + raise ValueError( + "Could not find Dataset {} {}".format( + "id" if dataset_id else "project/name/version", + dataset_id if dataset_id else (dataset_project, dataset_name, dataset_version), + ) + ) + orig_dataset_id_ = dataset_id - if not tasks: - if auto_create: - instance = Dataset.create(dataset_name=dataset_name, dataset_project=dataset_project, - dataset_tags=dataset_tags) - return instance - raise ValueError('Could not find Dataset {} {}'.format( - 'id' if dataset_id else 'project/name', - dataset_id if dataset_id else (dataset_project, dataset_name))) - task = tasks[0] - if task.status == 'created': - raise ValueError('Dataset id={} is in draft mode, delete and recreate it'.format(task.id)) - force_download = False if task.status in ('stopped', 'published', 'closed', 'completed') else True - if cls.__state_entry_name in task.artifacts: - local_state_file = StorageManager.get_local_copy( - remote_url=task.artifacts[cls.__state_entry_name].url, cache_context=cls.__cache_context, - extract_archive=False, name=task.id, force_download=force_download) - if not local_state_file: - raise ValueError('Could not load Dataset id={} state'.format(task.id)) - else: - # we could not find the serialized state, start empty - local_state_file = {} - - instance = cls._deserialize(local_state_file, task) - # remove the artifact, just in case - if force_download and local_state_file: - os.unlink(local_state_file) + if alias and overridable and running_remotely(): + remote_task = Task.get_task(task_id=get_remote_task_id()) + dataset_id = remote_task.get_parameter("{}/{}".format(cls.__hyperparams_section, alias)) + if not dataset_id: + if not auto_create: + raise ValueError( + "Could not find Dataset {} {}".format( + "id" if dataset_id else "project/name/version", + dataset_id if dataset_id else (dataset_project, dataset_name, dataset_version), + ) + ) + instance = Dataset.create( + dataset_name=dataset_name, dataset_project=dataset_project, dataset_tags=dataset_tags + ) + return finish_dataset_get(instance, instance._id) + instance = get_instance(dataset_id) # Now we have the requested dataset, but if we want a mutable copy instead, we create a new dataset with the # current one as its parent. So one can add files to it and finalize as a new version. 
if writable_copy: @@ -1190,9 +1607,9 @@ class Dataset(object): dataset_tags=instance.tags, parent_datasets=[instance.id], ) - return writeable_instance + return finish_dataset_get(writeable_instance, writeable_instance._id) - return instance + return finish_dataset_get(instance, orig_dataset_id_) def get_logger(self): # type: () -> Logger @@ -1297,7 +1714,9 @@ class Dataset(object): type=[str(Task.TaskTypes.data_processing)], tags=tags or None, status=['stopped', 'published', 'completed', 'closed'] if only_completed else None, - only_fields=['created', 'id', 'name', 'project', 'tags'] + only_fields=['created', 'id', 'name', 'project', 'tags'], + search_hidden=True, + _allow_extra_fields_=True ) project_ids = {d.project for d in datasets} # noinspection PyProtectedMember @@ -1451,7 +1870,38 @@ class Dataset(object): """ self._update_dependency_graph() + total_size = 0 + added_files_count = 0 + added_files_size = 0 + modified_files_count = 0 + modified_files_size = 0 + removed_files_count = 0 + removed_files_size = 0 + parent_datasets_ids = self._dependency_graph[self._id] + parent_file_entries = {} + for parent_dataset_id in parent_datasets_ids: + if parent_dataset_id == self._id: + continue + parent_dataset = self.get(parent_dataset_id) + parent_file_entries.update(parent_dataset._dataset_file_entries) + # we have to do this after we update the parent_file_entries because we might + # have duplicate file entries + for parent_file_entry_key, parent_file_entry_value in parent_file_entries.items(): + if parent_file_entry_key not in self._dataset_file_entries: + removed_files_count += 1 + removed_files_size -= parent_file_entry_value.size + for file in self._dataset_file_entries.values(): + total_size += file.size + if file.parent_dataset_id == self._id: + if file.relative_path in parent_file_entries: + modified_files_count += 1 + modified_files_size += file.size - parent_file_entries[file.relative_path].size + else: + added_files_count += 1 + added_files_size += file.size state = dict( + file_count=len(self._dataset_file_entries), + total_size=total_size, dataset_file_entries=[f.as_dict() for f in self._dataset_file_entries.values()], dataset_link_entries=[link.as_dict() for link in self._dataset_link_entries.values()], dependency_graph=self._dependency_graph, @@ -1459,20 +1909,44 @@ class Dataset(object): dirty=self._dirty, ) if update_dependency_chunk_lookup: - state['dependency_chunk_lookup'] = self._build_dependency_chunk_lookup() + state["dependency_chunk_lookup"] = self._build_dependency_chunk_lookup() - modified_files = [f['size'] for f in state['dataset_file_entries'] if f.get('parent_dataset_id') == self._id] - preview = \ - 'Dataset state\n' \ - 'Files added/modified: {0} - total size {1}\n' \ - 'Current dependency graph: {2}\n'.format( - len(modified_files), format_size(sum(modified_files)), - json.dumps(self._dependency_graph, indent=2, sort_keys=True)) + preview = ( + "Dataset state\n" + "Files added/modified: {0} - total size {1}\n" + "Current dependency graph: {2}\n".format( + modified_files_count + added_files_count, + format_size( + added_files_size + modified_files_size, + binary=True, + use_nonbinary_notation=True, + use_b_instead_of_bytes=True, + ), + json.dumps(self._dependency_graph, indent=2, sort_keys=True), + ) + ) # store as artifact of the Task and add the amount of files added or removed as metadata, so we can use those # later to create the table self._task.upload_artifact( - name=self.__state_entry_name, artifact_object=state, preview=preview, 
wait_on_upload=True, - metadata=self.changed_files + name=self.__state_entry_name, + artifact_object=state, + preview=preview, + wait_on_upload=True, + metadata=self.changed_files, + ) + self._ds_total_size = total_size + # noinspection PyProtectedMember + self._task._set_runtime_properties( + { + "ds_file_count": len(self._dataset_file_entries), + "ds_link_count": len(self._dataset_link_entries), + "ds_total_size": self._ds_total_size, + "ds_total_size_compressed": self._ds_total_size_compressed, + "ds_change_add": added_files_count, + "ds_change_remove": removed_files_count, + "ds_change_modify": modified_files_count, + "ds_change_size": added_files_size + modified_files_size + removed_files_size, + } ) def update_changed_files(self, num_files_added=None, num_files_modified=None, num_files_removed=None): @@ -1972,14 +2446,19 @@ class Dataset(object): nodes = self._get_dependencies_by_order(include_unused=True, include_current=True) # dataset name lookup # noinspection PyProtectedMember - node_names = {t.id: t.name for t in Task._query_tasks(task_ids=nodes, only_fields=['id', 'name'])} + node_names = { + t.id: t.name + for t in Task._query_tasks( + task_ids=nodes, only_fields=["id", "name"], search_hidden=True, _allow_extra_fields_=True + ) + } node_details = {} # Generate table and details table_values = [["Dataset id", "name", "removed", "modified", "added", "size"]] for node in nodes: count = 0 size = 0 - for f in self._dataset_file_entries.values(): + for f in list(self._dataset_file_entries.values()) + list(self._dataset_link_entries.values()): if f.parent_dataset_id == node: count += 1 size += f.size @@ -1998,9 +2477,22 @@ class Dataset(object): added = int(node_state_metadata.get('files added', 0)) modified = int(node_state_metadata.get('files modified', 0)) - table_values += [[node, node_names.get(node, ''), - removed, modified, added, format_size(size)]] - node_details[node] = [removed, modified, added, format_size(size)] + table_values += [ + [ + node, + node_names.get(node, ""), + removed, + modified, + added, + format_size(size, binary=True, use_nonbinary_notation=True, use_b_instead_of_bytes=True), + ] + ] + node_details[node] = [ + removed, + modified, + added, + format_size(size, binary=True, use_nonbinary_notation=True, use_b_instead_of_bytes=True), + ] # create DAG visited = [] @@ -2058,43 +2550,183 @@ class Dataset(object): # report genealogy if fig: - self._task.get_logger().report_plotly( - title='Dataset Genealogy', series='', iteration=0, figure=fig) + self._task.get_logger().report_plotly(title="__Dataset Genealogy", series="", iteration=0, figure=fig) # report detailed table self._task.get_logger().report_table( - title='Dataset Summary', series='Details', iteration=0, table_plot=table_values, - extra_layout={"title": "Files by parent dataset id"}) + title="__Dataset Summary", + series="Details", + iteration=0, + table_plot=table_values, + extra_layout={"title": "Files by parent dataset id"}, + ) # report the detailed content of the dataset as configuration, # this allows for easy version comparison in the UI - dataset_details = None - dataset_details_header = None - dataset_details_header_template = 'File Name ({} files) - File Size (total {}) - Hash (SHA2)\n' - if len(self._dataset_file_entries) < self.__preview_max_file_entries: - file_entries = sorted(self._dataset_file_entries.values(), key=lambda x: x.relative_path) - dataset_details_header = dataset_details_header_template.format( - len(file_entries), format_size(sum(f.size for f in file_entries)) - ) - 
dataset_details = dataset_details_header + \ - '\n'.join('{} - {} - {}'.format(f.relative_path, f.size, f.hash) for f in file_entries) - - # too large to store - if not dataset_details or len(dataset_details) > self.__preview_max_size: - if not dataset_details_header: - dataset_details_header = dataset_details_header_template.format( + dataset_details = "" + preview_index = 0 + file_entries = sorted(list(self._dataset_file_entries.values())) + sorted( + list(self._dataset_link_entries.values()), key=lambda x: x.link + ) + while preview_index < self.__preview_max_file_entries and preview_index < len(file_entries): + file = file_entries[preview_index] + if dataset_details: + dataset_details += "\n" + file_name = file.relative_path + if hasattr(file, "link"): + file_name = file.link + dataset_details += "{}, {}, {}".format( + file_name, + file.size if file.size is not None and not hasattr(file, "link") else "", + file.hash if file.hash else "", + ) + preview_index += 1 + if not self._ds_total_size: + self._report_dataset_struct() + if not self._dataset_link_entries: + dataset_details = ( + "File Name ({} files), File Size (total {}), Hash (SHA2)\n".format( len(self._dataset_file_entries), - format_size(sum(f.size for f in self._dataset_file_entries.values())) + format_size( + self._ds_total_size, binary=True, use_nonbinary_notation=True, use_b_instead_of_bytes=True + ), ) - dataset_details = dataset_details_header + 'Dataset content is too large to preview' + + dataset_details + ) + else: + dataset_details = ( + "File Name ({} files + {} links), File Size (total {}), Hash (SHA2)\n".format( + len(self._dataset_file_entries), + len(self._dataset_link_entries), + format_size( + self._ds_total_size, binary=True, use_nonbinary_notation=True, use_b_instead_of_bytes=True + ), + ) + + dataset_details + ) # noinspection PyProtectedMember self._task._set_configuration( - name='Dataset Content', description='Dataset content preview', - config_type='read-only', - config_text=dataset_details + name="Dataset Content", + description="Dataset content preview", + config_type="CSV", + config_text=dataset_details, ) + def _report_dataset_struct(self): + self._update_dependency_graph() + current_index = 0 + dataset_struct = {} + indices = {} + dependency_graph_ex = deepcopy(self._dependency_graph) + for parents in self._dependency_graph.values(): + for parent in parents: + if parent not in self._dependency_graph: + dependency_graph_ex[parent] = [] + for id_, parents in dependency_graph_ex.items(): + task = Task.get_task(task_id=id_) + dataset_struct_entry = {"job_id": id_, "status": task.status} + # noinspection PyProtectedMember + last_update = task._get_last_update() + if last_update: + last_update = calendar.timegm(last_update.timetuple()) + dataset_struct_entry["last_update"] = last_update + dataset_struct_entry["parents"] = parents + dataset_struct_entry["job_size"] = task._get_runtime_properties().get("ds_total_size") + dataset_struct_entry["name"] = task.name + dataset_struct_entry["version"] = task._get_runtime_properties().get("version") + dataset_struct[str(current_index)] = dataset_struct_entry + indices[id_] = str(current_index) + current_index += 1 + for id_, parents in dependency_graph_ex.items(): + dataset_struct[indices[id_]]["parents"] = [indices[p] for p in parents] + # noinspection PyProtectedMember + self._task._set_configuration( + name="Dataset Struct", + description="Structure of the dataset", + config_type="json", + config_text=json.dumps(dataset_struct, indent=2), + ) + + def 
_report_dataset_preview(self):
+        self.__preview_tabular_row_count = int(self.__preview_tabular_row_count)
+
+        def convert_to_tabular_artifact(file_path_, file_extension_, compression_=None):
+            # noinspection PyBroadException
+            try:
+                if file_extension_ == ".csv" and pd:
+                    return pd.read_csv(
+                        file_path_,
+                        nrows=self.__preview_tabular_row_count,
+                        compression=compression_.lstrip(".") if compression_ else None,
+                    )
+                elif file_extension_ == ".parquet" or file_extension_ == ".parq":
+                    if pyarrow:
+                        pf = pyarrow.parquet.ParquetFile(file_path_)
+                        preview_rows = next(pf.iter_batches(batch_size=self.__preview_tabular_row_count))
+                        return pyarrow.Table.from_batches([preview_rows]).to_pandas()
+                    elif fastparquet:
+                        return fastparquet.ParquetFile(file_path_).head(self.__preview_tabular_row_count).to_pandas()
+                elif (file_extension_ == ".npz" or file_extension_ == ".npy") and np and pd:
+                    return pd.DataFrame(np.loadtxt(file_path_, max_rows=self.__preview_tabular_row_count))
+            except Exception:
+                pass
+            return None
+
+        compression_extensions = {".gz", ".bz2", ".zip", ".xz", ".zst"}
+        tabular_extensions = {".csv", ".parquet", ".parq", ".npz", ".npy"}
+        preview_tables_count, preview_image_count, preview_video_count, preview_audio_count, preview_html_count = (
+            0,
+            0,
+            0,
+            0,
+            0,
+        )
+        for file in self._dataset_file_entries.values():
+            if file.local_path:
+                file_path = file.local_path
+            else:
+                file_path = file.relative_path
+            if not os.path.isfile(file_path):
+                continue
+            file_name = os.path.basename(file_path)
+            _, file_extension = os.path.splitext(file_path)
+            compression = None
+            if file_extension in compression_extensions:
+                compression = file_extension
+                _, file_extension = os.path.splitext(file_path[: -len(file_extension)])
+            if file_extension in tabular_extensions and preview_tables_count >= self.__preview_tabular_table_count:
+                continue
+            artifact = convert_to_tabular_artifact(file_path, file_extension, compression)
+            if artifact is not None:
+                # noinspection PyBroadException
+                try:
+                    self._task.get_logger().report_media(
+                        "Tables", file_name, stream=artifact.to_csv(index=False), file_extension=".txt"
+                    )
+                    preview_tables_count += 1
+                except Exception:
+                    pass
+                continue
+            if compression:
+                continue
+            guessed_type = mimetypes.guess_type(file_path)
+            if not guessed_type or not guessed_type[0]:
+                continue
+            guessed_type = guessed_type[0]
+            if guessed_type.startswith("image") and preview_image_count < self.__preview_media_image_count:
+                self._task.get_logger().report_media("Images", file_name, local_path=file_path)
+                preview_image_count += 1
+            elif guessed_type.startswith("video") and preview_video_count < self.__preview_media_video_count:
+                self._task.get_logger().report_media("Videos", file_name, local_path=file_path)
+                preview_video_count += 1
+            elif guessed_type.startswith("audio") and preview_audio_count < self.__preview_media_audio_count:
+                self._task.get_logger().report_media("Audio", file_name, local_path=file_path)
+                preview_audio_count += 1
+            elif guessed_type == "text/html" and preview_html_count < self.__preview_media_html_count:
+                self._task.get_logger().report_media("HTML", file_name, local_path=file_path)
+                preview_html_count += 1
+
     @classmethod
     def _set_project_system_tags(cls, task):
         from ..backend_api.services import projects
@@ -2261,6 +2893,139 @@ class Dataset(object):

         return chunk_selection

+    @classmethod
+    def _get_dataset_id_by_version(cls, dataset_project, dataset_name, dataset_version="latest"):
+        # type: (str, str, Optional[str]) -> Tuple[str, str]
+        """
+        Gets the
dataset ID that matches a project, a name, and a version.
+
+        :param dataset_project: Corresponding dataset project
+        :param dataset_name: Corresponding dataset name
+        :param dataset_version: The version of the corresponding dataset. If set to 'latest',
+            then get the dataset with the latest version
+
+        :return: A tuple containing 2 strings: the dataset ID and the version of that dataset,
+            or (None, None) if no matching dataset is found
+        """
+        # making sure we have the right project name here
+        hidden_dataset_project, _ = cls._build_hidden_project_name(dataset_project, dataset_name)
+        # noinspection PyProtectedMember
+        datasets = Task._query_tasks(
+            project_name=[hidden_dataset_project],
+            task_name=exact_match_regex(dataset_name) if dataset_name else None,
+            system_tags=[cls.__tag],
+            only_fields=["id", "runtime.version"],
+            search_hidden=True,
+            _allow_extra_fields_=True
+        )
+        result_dataset = None
+        for dataset in datasets:
+            current_version = dataset.runtime.get("version")
+            if not current_version:
+                continue
+            if dataset_version == "latest" and (
+                not result_dataset or Version(result_dataset.runtime["version"]) < Version(current_version)
+            ):
+                result_dataset = dataset
+            elif dataset_version == current_version:
+                result_dataset = dataset
+                break
+        if not result_dataset:
+            return None, None
+        return result_dataset.id, result_dataset.runtime.get("version")
+
+    @classmethod
+    def _find_dataset_id(
+        cls,
+        dataset_project=None,  # Optional[str]
+        dataset_name=None,  # Optional[str]
+        dataset_version=None,  # Optional[str]
+        dataset_tags=None,  # Optional[Sequence[str]]
+        dataset_filter=None,  # Optional[dict]
+        raise_on_error=True,  # bool
+    ):
+        # type: (...) -> Optional[str]
+        """
+        Find a dataset ID based on the given parameters
+
+        :param dataset_project: Project of the dataset searched
+        :param dataset_name: Name of the dataset searched
+        :param dataset_version: Version of the dataset searched
+        :param dataset_tags: List of tags of the dataset searched
+        :param dataset_filter: Filter the found datasets based on the criteria present in this dict.
+            Has the same behaviour as the `task_filter` parameter of Task.get_tasks. If None,
+            the filter will have parameters set specific to datasets.
+        :param raise_on_error: If True, raise an Exception if no dataset is found
+            or if more than one dataset is found
+
+        :return: The ID of the dataset found, or None if no dataset was found
+            and raise_on_error is False
+        """
+        if not dataset_version:
+            if dataset_filter is None:
+                dataset_filter = dict(
+                    system_tags=[cls.__tag],
+                    type=[str(Task.TaskTypes.data_processing)],
+                    page_size=2,
+                    page=0,
+                )
+            dataset_filter["search_hidden"] = True
+            dataset_filter["_allow_extra_fields_"] = True
+            hidden_dataset_project, _ = cls._build_hidden_project_name(dataset_project, dataset_name)
+            tasks = Task.get_tasks(
+                project_name=hidden_dataset_project,
+                task_name=exact_match_regex(dataset_name) if dataset_name else None,
+                tags=dataset_tags,
+                task_filter=dataset_filter,
+            )
+            if not tasks and raise_on_error:
+                raise ValueError("Dataset project={} name={} could not be found".format(dataset_project, dataset_name))
+            if len(tasks) > 1 and raise_on_error:
+                raise ValueError("Too many datasets matching project={} name={}".format(dataset_project, dataset_name))
+            dataset_id = tasks[0].id
+        else:
+            dataset_id, _ = cls._get_dataset_id_by_version(
+                dataset_project, dataset_name, dataset_version=dataset_version
+            )
+            if not dataset_id and raise_on_error:
+                raise ValueError(
+                    "Dataset project={} name={} version={} could not be found".format(
+                        dataset_project, dataset_name, dataset_version
+                    )
+                )
+        return dataset_id
+
+    @classmethod
+    def _build_hidden_project_name(cls, dataset_project, dataset_name):
+        # type: (str, str) -> Tuple[str, Optional[str]]
+        """
+        Build the corresponding hidden project name of a dataset, given its `dataset_project`
+        and `dataset_name`
+
+        :param dataset_project: Dataset's project
+        :param dataset_name: Dataset name passed by the user
+
+        :return: A tuple of 2 strings: the corresponding hidden dataset project and
+            the parent project (None when the server does not support hidden projects)
+        """
+        dataset_project = cls._remove_hidden_part_from_dataset_project(dataset_project)
+        if Session.check_min_api_server_version("2.17"):
+            parent_project = "{}.datasets".format(dataset_project + "/" if dataset_project else "")
+            project_name = "{}/{}".format(parent_project, dataset_name)
+        else:
+            parent_project = None
+            project_name = dataset_project or "Datasets"
+        return project_name, parent_project
+
+    @classmethod
+    def _remove_hidden_part_from_dataset_project(cls, dataset_project):
+        # type: (str) -> str
+        """
+        The project name contains the '/.datasets/' part, as well as the dataset name.
+        Remove those parts and return the project used when creating the dataset.
+
+        :param dataset_project: Current project name
+
+        :return: The project name without the '/.datasets/' part
+        """
+        return dataset_project.partition("/.datasets/")[0]
+
     @classmethod
     def _get_chunk_idx_from_artifact_name(cls, artifact_name):
         # type: (str) -> int
diff --git a/clearml/storage/util.py b/clearml/storage/util.py
index b8300113..9b2be9cd 100644
--- a/clearml/storage/util.py
+++ b/clearml/storage/util.py
@@ -144,14 +144,18 @@ def is_windows():
     return sys.platform == 'win32'
 
 
-def format_size(size_in_bytes, binary=False):
-    # type: (Union[int, float], bool) -> str
+def format_size(size_in_bytes, binary=False, use_nonbinary_notation=False, use_b_instead_of_bytes=False):
+    # type: (Union[int, float], bool, bool, bool) -> str
     """
     Return the size in human readable format (string)
     Matching humanfriendly.format_size outputs
 
     :param size_in_bytes: number of bytes
     :param binary: If `True` 1 Kb equals 1024 bytes, if False (default) 1 KB = 1000 bytes
+    :param use_nonbinary_notation: Only applies if binary is `True`. If this is `True`,
+        the binary scale (KiB, MiB etc.) will be replaced with the regular scale (KB, MB etc.)
+ :param use_b_instead_of_bytes: If `True`, return the formatted size with `B` as the + scale instead of `byte(s)` (when applicable) :return: string representation of the number of bytes (b,Kb,Mb,Gb, Tb,) >>> format_size(0) '0 bytes' @@ -168,16 +172,25 @@ def format_size(size_in_bytes, binary=False): """ size = float(size_in_bytes) # single byte is the exception here - if size == 1: - return '{} byte'.format(int(size)) + if size == 1 and not use_b_instead_of_bytes: + return "{} byte".format(int(size)) k = 1024 if binary else 1000 - scale = ('bytes', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB') if binary else ('bytes', 'KB', 'MB', 'GB', 'TB', 'PB') + scale = ( + ["bytes", "KiB", "MiB", "GiB", "TiB", "PiB"] + if (binary and not use_nonbinary_notation) + else ["bytes", "KB", "MB", "GB", "TB", "PB"] + ) + if use_b_instead_of_bytes: + scale[0] = "B" for i, m in enumerate(scale): - if size < k**(i+1) or i == len(scale)-1: - return ('{:.2f}'.format(size/(k**i)).rstrip('0').rstrip('.') - if i > 0 else '{}'.format(int(size))) + ' ' + m + if size < k ** (i + 1) or i == len(scale) - 1: + return ( + ("{:.2f}".format(size / (k ** i)).rstrip("0").rstrip(".") if i > 0 else "{}".format(int(size))) + + " " + + m + ) # we should never get here - return '{} {}'.format(int(size), scale[0]) + return "{} {}".format(int(size), scale[0]) def parse_size(size, binary=False): diff --git a/clearml/utilities/version.py b/clearml/utilities/version.py index 8492bac2..0b21e687 100644 --- a/clearml/utilities/version.py +++ b/clearml/utilities/version.py @@ -1,8 +1,9 @@ from __future__ import absolute_import, division, print_function -import collections -import re +from copy import deepcopy +from attr import attrs, attrib +import re import six if six.PY3: @@ -17,9 +18,14 @@ class InvalidVersion(ValueError): """ -_Version = collections.namedtuple( - "_Version", ["epoch", "release", "dev", "pre", "post", "local"] -) +@attrs +class _Version: + epoch = attrib() + release = attrib() + dev = attrib() + pre = attrib() + post = attrib() + local = attrib() class _BaseVersion(object): @@ -149,6 +155,29 @@ class Version(_BaseVersion): return "".join(parts) + def get_next_version(self): + def increment(part): + if isinstance(part, int): + return part + 1 + type_ = type(part) + part = list(part) + if isinstance(part[-1], int): + part[-1] += 1 + return type_(part) + + next_version = deepcopy(self) + if next_version._version.dev: + next_version._version.dev = increment(next_version._version.dev) + elif next_version._version.post: + next_version._version.post = increment(next_version._version.post) + elif next_version._version.pre: + next_version._version.pre = increment(next_version._version.pre) + elif next_version._version.release: + next_version._version.release = increment(next_version._version.release) + elif next_version._version.epoch: + next_version._version.epoch = increment(next_version._version.epoch) + return next_version + @property def epoch(self): return self._version.epoch
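
Note: the utility changes in clearml/storage/util.py and clearml/utilities/version.py above can be
sanity-checked in isolation. The snippet below is a minimal sketch, assuming this patch has been
applied to a local clearml checkout; the expected outputs in the comments follow directly from the
logic shown in the hunks above.

    from clearml.storage.util import format_size
    from clearml.utilities.version import Version

    # 1024-based scaling rendered with the regular (non-binary) labels and "B":
    print(format_size(1536000, binary=True, use_nonbinary_notation=True, use_b_instead_of_bytes=True))
    # -> "1.46 MB"  (1536000 / 1024**2, labeled MB instead of MiB)
    print(format_size(1, use_b_instead_of_bytes=True))
    # -> "1 B"  (the "1 byte" special case is skipped when use_b_instead_of_bytes=True)

    # get_next_version() increments the most specific populated segment,
    # checking dev, then post, then pre, before falling back to the release tuple:
    print(Version("1.2.3").get_next_version())       # -> "1.2.4"
    print(Version("1.0.0.dev1").get_next_version())  # -> "1.0.0.dev2"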