Fix pipeline monitor must be called after pipeline is completed (just in case we missed something)

This commit is contained in:
allegroai 2023-08-04 19:07:05 +03:00
parent 99c7eecbee
commit b846772138

View File

@ -785,7 +785,8 @@ class PipelineController(object):
pass pass
:param tags: A list of tags for the specific pipeline step. :param tags: A list of tags for the specific pipeline step.
When executing a Pipeline remotely (i.e. launching the pipeline from the UI/enqueuing it), this method has no effect. When executing a Pipeline remotely
(i.e. launching the pipeline from the UI/enqueuing it), this method has no effect.
:return: True if successful :return: True if successful
""" """
@ -1431,7 +1432,8 @@ class PipelineController(object):
# type: (Union[Sequence[str], str]) -> None # type: (Union[Sequence[str], str]) -> None
""" """
Add tags to this pipeline. Old tags are not deleted. Add tags to this pipeline. Old tags are not deleted.
When executing a Pipeline remotely (i.e. launching the pipeline from the UI/enqueuing it), this method has no effect. When executing a Pipeline remotely
(i.e. launching the pipeline from the UI/enqueuing it), this method has no effect.
:param tags: A list of tags for this pipeline. :param tags: A list of tags for this pipeline.
""" """
@ -2075,7 +2077,8 @@ class PipelineController(object):
pass pass
:param tags: A list of tags for the specific pipeline step. :param tags: A list of tags for the specific pipeline step.
When executing a Pipeline remotely (i.e. launching the pipeline from the UI/enqueuing it), this method has no effect. When executing a Pipeline remotely
(i.e. launching the pipeline from the UI/enqueuing it), this method has no effect.
:return: True if successful :return: True if successful
""" """
@ -3190,7 +3193,8 @@ class PipelineController(object):
name=artifact_name, name=artifact_name,
artifact_object=artifact_object, artifact_object=artifact_object,
wait_on_upload=True, wait_on_upload=True,
extension_name=".pkl" if isinstance(artifact_object, dict) and not self._artifact_serialization_function else None, extension_name=".pkl" if isinstance(artifact_object, dict) and
not self._artifact_serialization_function else None,
serialization_function=self._artifact_serialization_function serialization_function=self._artifact_serialization_function
) )
@ -3468,6 +3472,7 @@ class PipelineDecorator(PipelineController):
# visualize pipeline state (plot) # visualize pipeline state (plot)
self.update_execution_plot() self.update_execution_plot()
self._scan_monitored_nodes()
if self._stop_event: if self._stop_event:
# noinspection PyBroadException # noinspection PyBroadException
@ -3803,7 +3808,8 @@ class PipelineDecorator(PipelineController):
pass pass
:param tags: A list of tags for the specific pipeline step. :param tags: A list of tags for the specific pipeline step.
When executing a Pipeline remotely (i.e. launching the pipeline from the UI/enqueuing it), this method has no effect. When executing a Pipeline remotely
(i.e. launching the pipeline from the UI/enqueuing it), this method has no effect.
:return: function wrapper :return: function wrapper
""" """
@ -3955,8 +3961,9 @@ class PipelineDecorator(PipelineController):
# Note that for the first iteration (when `_node.name == _node_name`) # Note that for the first iteration (when `_node.name == _node_name`)
# we always increment the name, as the name is always in `_launched_step_names` # we always increment the name, as the name is always in `_launched_step_names`
while _node.name in cls._singleton._launched_step_names or ( while _node.name in cls._singleton._launched_step_names or (
_node.name in cls._singleton._nodes _node.name in cls._singleton._nodes and
and cls._singleton._nodes[_node.name].job_code_section != cls._singleton._nodes[_node_name].job_code_section cls._singleton._nodes[_node.name].job_code_section !=
cls._singleton._nodes[_node_name].job_code_section
): ):
_node.name = "{}_{}".format(_node_name, counter) _node.name = "{}_{}".format(_node_name, counter)
counter += 1 counter += 1