Add PipelineController auto_version_bump, fix PipelineController skipping queued Tasks.

This commit is contained in:
allegroai 2021-09-13 01:27:40 +03:00
parent edb076c462
commit 3dad7ccf65

View File

@ -64,6 +64,7 @@ class PipelineController(object):
pool_frequency=0.2, # type: float
add_pipeline_tags=False, # type: bool
target_project=None, # type: Optional[str]
auto_version_bump=True, # type: bool
):
# type: (...) -> None
"""
@ -77,6 +78,9 @@ class PipelineController(object):
:param bool add_pipeline_tags: (default: False) if True, add `pipe: <pipeline_task_id>` tag to all
steps (Tasks) created by this pipeline.
:param str target_project: If provided, all pipeline steps are cloned into the target project
:param bool auto_version_bump: If True (default), if the same pipeline version already exists
(with any difference from the current one), the current pipeline version will be bumped to a new version
version bump examples: 1.0.0 -> 1.0.1 , 1.2 -> 1.3, 10 -> 11 etc.
"""
self._nodes = {}
self._running_nodes = []
@ -103,13 +107,14 @@ class PipelineController(object):
self._step_ref_pattern = re.compile(self._step_pattern)
self._reporting_lock = RLock()
self._pipeline_task_status_failed = None
self._auto_version_bump = bool(auto_version_bump)
if not self._task:
self._task = Task.init(
project_name=project or 'Pipelines',
task_name=name or 'Pipeline {}'.format(datetime.now()),
task_type=Task.TaskTypes.controller,
auto_resource_monitoring=False,
reuse_last_task_id=False##
reuse_last_task_id=False
)
self._task.set_system_tags((self._task.get_system_tags() or []) + [self._tag])
self._task.set_user_properties(version=self._version)
@ -904,6 +909,11 @@ class PipelineController(object):
return params, pipeline_dag
def _verify_pipeline_version(self):
# if no version bump needed, just set the property
if not self._auto_version_bump:
self._task.set_user_properties(version=self._version)
return
# check if pipeline version exists, if it does increase version
pipeline_hash = self._get_task_hash()
# noinspection PyProtectedMember
@ -1161,11 +1171,11 @@ class PipelineController(object):
node.executed = node.job.task_id()
if task_factory_func_task:
task_factory_func_task.delete(raise_on_error=False)
self._running_nodes.append(node.name)
else:
self._running_nodes.append(node.name)
return node.job.launch(queue_name=node.queue or self._default_execution_queue)
self._running_nodes.append(node.name)
return True
def _update_execution_plot(self):