Add support for dataset version, fix trigger for datasets

allegroai 2022-06-28 21:07:46 +03:00
parent a013dd5e55
commit 68cf2745ff
5 changed files with 1086 additions and 237 deletions

View File

@@ -274,23 +274,23 @@ class TriggerScheduler(BaseScheduler):
         self._model_triggers.append(trigger)

     def add_dataset_trigger(
             self,
             schedule_task_id=None,  # type: Union[str, Task]
             schedule_queue=None,  # type: str
             schedule_function=None,  # type: Callable[[str], None]
             trigger_project=None,  # type: str
             trigger_name=None,  # type: Optional[str]
             trigger_on_publish=None,  # type: bool
             trigger_on_tags=None,  # type: Optional[List[str]]
             trigger_on_archive=None,  # type: bool
             trigger_required_tags=None,  # type: Optional[List[str]]
             name=None,  # type: Optional[str]
             target_project=None,  # type: Optional[str]
             add_tag=True,  # type: Union[bool, str]
             single_instance=False,  # type: bool
             reuse_task=False,  # type: bool
             task_parameters=None,  # type: Optional[dict]
             task_overrides=None,  # type: Optional[dict]
     ):
         # type: (...) -> None
         """
@@ -331,26 +331,51 @@ class TriggerScheduler(BaseScheduler):
             for example {'script.version_num': None, 'script.branch': 'main'} Notice: not available when reuse_task=True
         :return: True if job is successfully added to the scheduling list
         """
-        trigger = DatasetTrigger(
-            base_task_id=schedule_task_id,
-            base_function=schedule_function,
-            queue=schedule_queue,
-            name=name,
-            target_project=target_project,
-            single_instance=single_instance,
-            task_parameters=task_parameters,
-            task_overrides=task_overrides,
-            add_tag=(add_tag if isinstance(add_tag, str) else (name or schedule_task_id)) if add_tag else None,
-            clone_task=not bool(reuse_task),
-            match_name=trigger_name,
-            project=Task.get_project_id(trigger_project) if trigger_project else None,
-            tags=trigger_on_tags,
-            required_tags=trigger_required_tags,
-            on_publish=trigger_on_publish,
-            on_archive=trigger_on_archive,
-        )
-        trigger.verify()
-        self._dataset_triggers.append(trigger)
+        if trigger_project:
+            trigger_project_list = Task.get_projects(
+                name="^{}/\\.datasets/.*".format(trigger_project), search_hidden=True, _allow_extra_fields_=True
+            )
+            for project in trigger_project_list:
+                trigger = DatasetTrigger(
+                    base_task_id=schedule_task_id,
+                    base_function=schedule_function,
+                    queue=schedule_queue,
+                    name=name,
+                    target_project=target_project,
+                    single_instance=single_instance,
+                    task_parameters=task_parameters,
+                    task_overrides=task_overrides,
+                    add_tag=(add_tag if isinstance(add_tag, str) else (name or schedule_task_id)) if add_tag else None,
+                    clone_task=not bool(reuse_task),
+                    match_name=trigger_name,
+                    project=project.id,
+                    tags=trigger_on_tags,
+                    required_tags=trigger_required_tags,
+                    on_publish=trigger_on_publish,
+                    on_archive=trigger_on_archive,
+                )
+                trigger.verify()
+                self._dataset_triggers.append(trigger)
+        else:
+            trigger = DatasetTrigger(
+                base_task_id=schedule_task_id,
+                base_function=schedule_function,
+                queue=schedule_queue,
+                name=name,
+                target_project=target_project,
+                single_instance=single_instance,
+                task_parameters=task_parameters,
+                task_overrides=task_overrides,
+                add_tag=(add_tag if isinstance(add_tag, str) else (name or schedule_task_id)) if add_tag else None,
+                clone_task=not bool(reuse_task),
+                match_name=trigger_name,
+                tags=trigger_on_tags,
+                required_tags=trigger_required_tags,
+                on_publish=trigger_on_publish,
+                on_archive=trigger_on_archive,
+            )
+            trigger.verify()
+            self._dataset_triggers.append(trigger)

     def add_task_trigger(
             self,
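Background for the trigger fix: ClearML keeps each dataset's backing task in a hidden "<project>/.datasets/<dataset name>" sub-project, so the previous code, which matched only on the top-level project id, never saw dataset tasks. The new code expands trigger_project into every matching hidden sub-project (Task.get_projects with a regex and search_hidden=True) and registers one DatasetTrigger per sub-project. A minimal usage sketch of the fixed behavior follows; the project name, queue, and task id are illustrative placeholders, not taken from this commit:

# Sketch: fire a trigger whenever a dataset is published under "data/raw".
# With this commit, add_dataset_trigger() expands "data/raw" to every hidden
# "data/raw/.datasets/<dataset name>" sub-project, so dataset tasks are matched.
from clearml.automation import TriggerScheduler

scheduler = TriggerScheduler(pooling_frequency_minutes=3.0)
scheduler.add_dataset_trigger(
    name="retrain-on-new-data",             # illustrative trigger name
    schedule_task_id="<task-id-to-clone>",   # placeholder: task cloned and enqueued when the trigger fires
    schedule_queue="default",
    trigger_project="data/raw",
    trigger_on_publish=True,
)
scheduler.start()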

View File

@@ -75,6 +75,10 @@ def cli():
                              'Example: a17b4fID1 f0ee5ID2 a17b4f09eID3')
     create.add_argument('--project', type=str, required=False, default=None, help='Dataset project name')
     create.add_argument('--name', type=str, required=True, default=None, help='Dataset name')
+    create.add_argument("--version", type=str, required=False, default=None, help="Dataset version")
+    create.add_argument(
+        "--output-uri", type=str, required=False, default=None, help="Output URI for files in this dataset"
+    )
     create.add_argument('--tags', type=str, nargs='*', help='Dataset user Tags')
     create.set_defaults(func=ds_create)
@@ -115,6 +119,7 @@ def cli():
                       help='[Optional] Dataset project name')
     sync.add_argument('--name', type=str, required=False, default=None,
                       help='[Optional] Dataset project name')
+    sync.add_argument("--version", type=str, required=False, default=None, help="[Optional] Dataset version")
     sync.add_argument('--tags', type=str, nargs='*',
                       help='[Optional] Dataset user Tags')
     sync.add_argument('--storage', type=str, default=None,
@@ -219,6 +224,7 @@ def cli():
                     help='Specify dataset id (or use project/name instead). Default: previously accessed dataset.')
     ls.add_argument('--project', type=str, help='Specify dataset project name')
     ls.add_argument('--name', type=str, help='Specify dataset name')
+    ls.add_argument("--version", type=str, help="Specify dataset version", default=None)
     ls.add_argument('--filter', type=str, nargs='*',
                     help='Filter files based on folder / wildcard, multiple filters are supported. '
                          'Example: folder/date_*.json folder/sub-folder')
@@ -323,7 +329,12 @@ def ds_get(args):
 def ds_list(args):
     print('List dataset content: {}'.format(args.id or (args.project, args.name)))
     print_args(args)
-    ds = Dataset.get(dataset_id=args.id or None, dataset_project=args.project or None, dataset_name=args.name or None)
+    ds = Dataset.get(
+        dataset_id=args.id or None,
+        dataset_project=args.project or None,
+        dataset_name=args.name or None,
+        dataset_version=args.version,
+    )
     print('Listing dataset content')
     formatting = '{:64} | {:10,} | {:64}'
     print(formatting.replace(',', '').format('file name', 'size', 'hash'))
@@ -506,13 +517,19 @@ def ds_add(args):
 def ds_create(args):
-    print('Creating a new dataset:')
+    print("Creating a new dataset:")
     print_args(args)
-    ds = Dataset.create(dataset_project=args.project, dataset_name=args.name, parent_datasets=args.parents)
+    ds = Dataset.create(
+        dataset_project=args.project,
+        dataset_name=args.name,
+        parent_datasets=args.parents,
+        dataset_version=args.version,
+        output_uri=args.output_uri,
+    )
     if args.tags:
         ds.tags = ds.tags + args.tags
-    print('New dataset created id={}'.format(ds.id))
-    clear_state({'id': ds.id})
+    print("New dataset created id={}".format(ds.id))
+    clear_state({"id": ds.id})
     return ds.id
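The new CLI flags map directly onto the SDK keyword arguments forwarded above (dataset_version and output_uri for Dataset.create, dataset_version for Dataset.get). A rough Python equivalent of the new clearml-data invocations; the project, name, version, and URI values below are placeholders for illustration:

# Roughly equivalent to:
#   clearml-data create --project MyProject --name MyDataset --version 1.0.0 --output-uri s3://bucket/datasets
#   clearml-data list --project MyProject --name MyDataset --version 1.0.0
from clearml import Dataset

ds = Dataset.create(
    dataset_project="MyProject",
    dataset_name="MyDataset",
    dataset_version="1.0.0",            # new: explicit dataset version
    output_uri="s3://bucket/datasets",  # new: destination for the dataset files
)

# later, fetch that specific version instead of the most recent one
same_ds = Dataset.get(
    dataset_project="MyProject",
    dataset_name="MyDataset",
    dataset_version="1.0.0",
)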

File diff suppressed because it is too large

View File

@@ -144,14 +144,18 @@ def is_windows():
     return sys.platform == 'win32'

-def format_size(size_in_bytes, binary=False):
-    # type: (Union[int, float], bool) -> str
+def format_size(size_in_bytes, binary=False, use_nonbinary_notation=False, use_b_instead_of_bytes=False):
+    # type: (Union[int, float], bool, bool, bool) -> str
     """
     Return the size in human readable format (string)
     Matching humanfriendly.format_size outputs
     :param size_in_bytes: number of bytes
     :param binary: If `True` 1 Kb equals 1024 bytes, if False (default) 1 KB = 1000 bytes
+    :param use_nonbinary_notation: Only applies if binary is `True`. If this is `True`,
+        the binary scale (KiB, MiB etc.) will be replaced with the regular scale (KB, MB etc.)
+    :param use_b_instead_of_bytes: If `True`, return the formatted size with `B` as the
+        scale instead of `byte(s)` (when applicable)
     :return: string representation of the number of bytes (b,Kb,Mb,Gb, Tb,)
     >>> format_size(0)
     '0 bytes'
@@ -168,16 +172,25 @@ def format_size(size_in_bytes, binary=False):
     """
     size = float(size_in_bytes)
     # single byte is the exception here
-    if size == 1:
-        return '{} byte'.format(int(size))
+    if size == 1 and not use_b_instead_of_bytes:
+        return "{} byte".format(int(size))
     k = 1024 if binary else 1000
-    scale = ('bytes', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB') if binary else ('bytes', 'KB', 'MB', 'GB', 'TB', 'PB')
+    scale = (
+        ["bytes", "KiB", "MiB", "GiB", "TiB", "PiB"]
+        if (binary and not use_nonbinary_notation)
+        else ["bytes", "KB", "MB", "GB", "TB", "PB"]
+    )
+    if use_b_instead_of_bytes:
+        scale[0] = "B"
     for i, m in enumerate(scale):
-        if size < k**(i+1) or i == len(scale)-1:
-            return ('{:.2f}'.format(size/(k**i)).rstrip('0').rstrip('.')
-                    if i > 0 else '{}'.format(int(size))) + ' ' + m
+        if size < k ** (i + 1) or i == len(scale) - 1:
+            return (
+                ("{:.2f}".format(size / (k ** i)).rstrip("0").rstrip(".") if i > 0 else "{}".format(int(size)))
+                + " "
+                + m
+            )
     # we should never get here
-    return '{} {}'.format(int(size), scale[0])
+    return "{} {}".format(int(size), scale[0])

 def parse_size(size, binary=False):
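For reference, the two new flags change the formatting roughly as follows. Expected values are derived from the logic above and shown as a sketch, not as captured doctest output:

format_size(0)                                               # '0 bytes'
format_size(1536, binary=True)                               # '1.5 KiB'
format_size(1536, binary=True, use_nonbinary_notation=True)  # '1.5 KB'  (1024-based math, decimal-style unit names)
format_size(1, use_b_instead_of_bytes=True)                  # '1 B'  instead of '1 byte'
format_size(0, use_b_instead_of_bytes=True)                  # '0 B'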

View File

@@ -1,8 +1,9 @@
 from __future__ import absolute_import, division, print_function

-import collections
-import re
+from copy import deepcopy
+from attr import attrs, attrib
+import re
 import six

 if six.PY3:
@@ -17,9 +18,14 @@ class InvalidVersion(ValueError):
     """

-_Version = collections.namedtuple(
-    "_Version", ["epoch", "release", "dev", "pre", "post", "local"]
-)
+@attrs
+class _Version:
+    epoch = attrib()
+    release = attrib()
+    dev = attrib()
+    pre = attrib()
+    post = attrib()
+    local = attrib()


 class _BaseVersion(object):
@@ -149,6 +155,29 @@ class Version(_BaseVersion):
         return "".join(parts)

+    def get_next_version(self):
+        def increment(part):
+            if isinstance(part, int):
+                return part + 1
+            type_ = type(part)
+            part = list(part)
+            if isinstance(part[-1], int):
+                part[-1] += 1
+            return type_(part)
+
+        next_version = deepcopy(self)
+        if next_version._version.dev:
+            next_version._version.dev = increment(next_version._version.dev)
+        elif next_version._version.post:
+            next_version._version.post = increment(next_version._version.post)
+        elif next_version._version.pre:
+            next_version._version.pre = increment(next_version._version.pre)
+        elif next_version._version.release:
+            next_version._version.release = increment(next_version._version.release)
+        elif next_version._version.epoch:
+            next_version._version.epoch = increment(next_version._version.epoch)
+        return next_version
+
     @property
     def epoch(self):
         return self._version.epoch
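The namedtuple is replaced with a mutable attrs class so that get_next_version() can copy and bump a parsed version in place; it increments the least-significant populated segment, checking dev, post, pre, release and epoch in that order. A short sketch of the expected results, assuming the vendored Version class is imported from this module and rendered with its usual string form:

# Expected results derived from the increment logic above (illustrative, not doctest output)
str(Version("1.2.3").get_next_version())        # '1.2.4'       -> last release component bumped
str(Version("1.2.3.dev1").get_next_version())   # '1.2.3.dev2'  -> dev segment bumped before release
str(Version("1.2.3.post1").get_next_version())  # '1.2.3.post2'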