Add some convenience functionality to clearml-data (#526)

* Add some convenience functionality to clearml-data to allow for fast creation of new dataset versions.

* Added get_existing_project function to data utils and cleaned up typos and docstrings there

* Fixed black formatting
This commit is contained in:
Victor Sonck 2021-12-29 11:16:45 +01:00 committed by GitHub
parent 30c3968cd7
commit c226a74806
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 68 additions and 22 deletions

View File

@ -52,15 +52,29 @@ def make_message(s, **kwargs):
return s % args
def get_or_create_project(session, project_name, description=None):
def get_existing_project(session, project_name):
    """Look up a project by its exact name.

    :param session: Backend session used to issue the request
    :param project_name: Exact name of the project to look for
    :return: The project ID when a match exists, "" when the lookup succeeded
        but no project matched, or None when the backend request itself failed.
    """
    request = projects.GetAllRequest(name=exact_match_regex(project_name), only_fields=['id'])
    result = session.send(request)
    if not result:
        # Backend did not answer the request at all
        return None
    matches = result.response.projects if result.response else None
    return matches[0].id if matches else ""
def get_or_create_project(session, project_name, description=None):
    """Return the ID of an existing project, creating it first when missing.

    :param session: Backend session used to issue the requests
    :param project_name: Exact name of the project to fetch or create
    :param description: Optional description for a newly created project
    :return: The project ID, or None when the backend lookup failed.
    """
    existing_id = get_existing_project(session, project_name)
    if existing_id:
        return existing_id
    if existing_id is None:
        # Lookup request failed entirely; propagate the failure to the caller
        return None
    # Lookup succeeded but found nothing ("") -- create the project now
    reply = session.send(projects.CreateRequest(name=project_name, description=description or ''))
    return reply.response.id
def get_queue_id(session, queue):
# type: ('Session', str) -> Optional[str] # noqa: F821

View File

@ -15,7 +15,7 @@ from pathlib2 import Path
from .. import Task, StorageManager, Logger
from ..backend_api.session.client import APIClient
from ..backend_interface.task.development.worker import DevWorker
from ..backend_interface.util import mutually_exclusive, exact_match_regex
from ..backend_interface.util import mutually_exclusive, exact_match_regex, get_existing_project
from ..config import deferred_config
from ..debugging.log import LoggerRoot
from ..storage.helper import StorageHelper
@ -463,19 +463,25 @@ class Dataset(object):
self._dirty = False
self._serialize()
def finalize(self, verbose=False, raise_on_error=True):
# type: (bool, bool) -> bool
def finalize(self, verbose=False, raise_on_error=True, auto_upload=False):
# type: (bool, bool, bool) -> bool
"""
Finalize the dataset publish dataset Task. upload must first called to verify there are not pending uploads.
If files do need to be uploaded, it throws an exception (or return False)
:param verbose: If True print verbose progress report
:param raise_on_error: If True raise exception if dataset finalizing failed
:param auto_upload: Automatically upload dataset if not called yet, will upload to default location.
"""
# check we do not have files waiting for upload.
if self._dirty:
if raise_on_error:
if auto_upload:
self._task.get_logger().report_text("Pending uploads, starting dataset upload to {}"
.format(self.get_default_storage()))
self.upload()
elif raise_on_error:
raise ValueError("Cannot finalize dataset, pending uploads. Call Dataset.upload(...)")
else:
return False
status = self._task.get_status()
@ -898,7 +904,9 @@ class Dataset(object):
dataset_name=None, # type: Optional[str]
dataset_tags=None, # type: Optional[Sequence[str]]
only_completed=False, # type: bool
only_published=False # type: bool
only_published=False, # type: bool
auto_create=False, # type: bool
writable_copy=False # type: bool
):
# type: (...) -> "Dataset"
"""
@ -910,13 +918,21 @@ class Dataset(object):
:param dataset_tags: Requested Dataset tags (list of tag strings)
:param only_completed: Return only if the requested dataset is completed or published
:param only_published: Return only if the requested dataset is published
:param auto_create: Create new dataset if it does not exist yet
:param writable_copy: Get a newly created mutable dataset with the current one as its parent,
so new files can added to the instance.
:return: Dataset object
"""
mutually_exclusive(dataset_id=dataset_id, dataset_project=dataset_project, _require_at_least_one=False)
mutually_exclusive(dataset_id=dataset_id, dataset_name=dataset_name, _require_at_least_one=False)
if not any([dataset_id, dataset_project, dataset_name, dataset_tags]):
raise ValueError('Dataset selection provided not provided (id/name/project/tags')
raise ValueError("Dataset selection criteria not met. Didn't provide id/name/project/tags correctly.")
if auto_create and not get_existing_project(
session=Task._get_default_session(), project_name=dataset_project
):
tasks = []
else:
tasks = Task.get_tasks(
task_ids=[dataset_id] if dataset_id else None,
project_name=dataset_project,
@ -929,7 +945,12 @@ class Dataset(object):
status=['published'] if only_published else
['published', 'completed', 'closed'] if only_completed else None)
)
if not tasks:
if auto_create:
instance = Dataset.create(dataset_name=dataset_name, dataset_project=dataset_project,
dataset_tags=dataset_tags)
return instance
raise ValueError('Could not find Dataset {} {}'.format(
'id' if dataset_id else 'project/name',
dataset_id if dataset_id else (dataset_project, dataset_name)))
@ -952,6 +973,17 @@ class Dataset(object):
if force_download and local_state_file:
os.unlink(local_state_file)
# Now we have the requested dataset, but if we want a mutable copy instead, we create a new dataset with the
# current one as its parent. So one can add files to it and finalize as a new version.
if writable_copy:
writeable_instance = Dataset.create(
dataset_name=instance.name,
dataset_project=instance.project,
dataset_tags=instance.tags,
parent_datasets=[instance.id],
)
return writeable_instance
return instance
def get_logger(self):