Allow tuning the behavior of pipeline_controller.is_successful

This commit is contained in:
Alex Burlacu 2023-03-23 18:10:23 +02:00
parent 067c817f30
commit c5b05fcd49

View File

@ -22,9 +22,10 @@ from .. import Logger
from ..automation import ClearmlJob
from ..backend_api import Session
from ..backend_interface.task.populate import CreateFromFunction
from ..backend_interface.util import get_or_create_project, exact_match_regex
from ..backend_interface.util import get_or_create_project
from ..config import get_remote_task_id
from ..debugging.log import LoggerRoot
from ..errors import UsageError
from ..model import BaseModel, OutputModel
from ..storage.util import hash_dict
from ..task import Task
@ -1152,14 +1153,41 @@ class PipelineController(object):
"""
return self._thread is not None and self._thread.is_alive()
def is_successful(self):
# type: () -> bool
def is_successful(self, fail_on_step_fail=True, fail_condition="all"):
# type: (bool, str) -> bool
"""
return True if the pipeline controller is fully executed and none of the steps / Tasks failed
Evaluate whether or not the pipeline is successful
:param fail_on_step_fail: If True (default), evaluate the pipeline steps' status to assess if the pipeline
is successful. If False, only evaluate the controller
:param fail_condition: Must be one of the following: 'all' (default), 'failed' or 'aborted'. If 'failed', this
function will return False if the pipeline failed and True if the pipeline was aborted. If 'aborted',
this function will return False if the pipeline was aborted and True if the pipeline failed. If 'all',
this function will return False in both cases.
:return: A boolean indicating whether all steps did not fail
:return: A boolean indicating whether the pipeline was successful or not. Note that if the pipeline is in a
running/pending state, this function will return False
"""
return self._thread and not self.is_running() and not self._pipeline_task_status_failed
if fail_condition == "all":
success_status = [Task.TaskStatusEnum.completed]
elif fail_condition == "failed":
success_status = [Task.TaskStatusEnum.completed, Task.TaskStatusEnum.stopped]
elif fail_condition == "aborted":
success_status = [Task.TaskStatusEnum.completed, Task.TaskStatusEnum.failed]
else:
raise UsageError("fail_condition needs to be one of the following: 'all', 'failed', 'aborted'")
if self._task.status not in success_status:
return False
if not fail_on_step_fail:
return True
self._update_nodes_status()
for node in self._nodes.values():
if node.status not in success_status:
return False
return True
def elapsed(self):
# type: () -> float