Allow Dataset to be created on a running Task

This commit is contained in:
allegroai 2021-02-21 14:51:24 +02:00
parent a5a23510e8
commit 7dbf5471cc

View File

@ -65,7 +65,9 @@ class Dataset(object):
self._dataset_file_entries = {} # type: Dict[str, FileEntry]
# this will create a graph of all the dependencies we have, each entry lists it's own direct parents
self._dependency_graph = {} # type: Dict[str, List[str]]
self._created_task = False
if not task:
self._created_task = True
task = Task.create(
project_name=dataset_project, task_name=dataset_name, task_type=Task.TaskTypes.data_processing)
task.set_system_tags((task.get_system_tags() or []) + [self.__tag])
@ -84,7 +86,7 @@ class Dataset(object):
task._edit(script=task.data.script)
# if the task is running make sure we ping to the server so it will not be aborted by a watchdog
if task.status in ('created', 'in_progress'):
if self._created_task and task.status in ('created', 'in_progress'):
self._task_pinger = DevWorker()
self._task_pinger.register(task, stop_signal_support=False)
else:
@ -630,8 +632,8 @@ class Dataset(object):
return self._task.output_uri or self._task.get_logger().get_default_upload_destination()
@classmethod
def create(cls, dataset_name, dataset_project=None, parent_datasets=None):
# type: (str, Optional[str], Optional[Sequence[Union[str, Dataset]]]) -> Dataset
def create(cls, dataset_name, dataset_project=None, parent_datasets=None, use_current_task=False):
# type: (str, Optional[str], Optional[Sequence[Union[str, Dataset]]], bool) -> Dataset
"""
Create a new dataset. Multiple dataset parents are supported.
Merging of parent datasets is done based on the order,
@ -641,6 +643,8 @@ class Dataset(object):
:param dataset_project: Project containing the dataset.
If not specified, infer project name form parent datasets
:param parent_datasets: Expand a parent dataset by adding/removing files
:param use_current_task: False (default), a new Dataset task is created.
If True, the dataset is created on the current Task.
:return: Newly created Dataset object
"""
parent_datasets = [cls.get(dataset_id=p) if isinstance(p, str) else p for p in (parent_datasets or [])]
@ -660,7 +664,10 @@ class Dataset(object):
for p in parent_datasets:
dataset_file_entries.update(deepcopy(p._dataset_file_entries))
dependency_graph.update(deepcopy(p._dependency_graph))
instance = cls(_private=cls.__private_magic, dataset_project=dataset_project, dataset_name=dataset_name)
instance = cls(_private=cls.__private_magic,
dataset_project=dataset_project,
dataset_name=dataset_name,
task=Task.current_task() if use_current_task else None)
instance._task.get_logger().report_text('Dataset created', print_console=False)
instance._dataset_file_entries = dataset_file_entries
instance._dependency_graph = dependency_graph
@ -1203,6 +1210,11 @@ class Dataset(object):
def _add_script_call(self, func_name, **kwargs):
# type: (str, **Any) -> ()
# if we never created the Task, we should not add the script calls
if not self._created_task:
return
args = ', '.join('\n {}={}'.format(k, '\''+str(v)+'\'' if isinstance(v, (str, Path, _Path)) else v)
for k, v in kwargs.items())
if args: