Add support for better pipeline continue behavior, including control of children

allegroai 2024-05-17 10:19:11 +03:00
parent 47588da713
commit 3012837bf3
2 changed files with 185 additions and 49 deletions

View File

@ -75,35 +75,70 @@ class PipelineController(object):
@attrs
class Node(object):
name = attrib(type=str) # pipeline step name
base_task_id = attrib(type=str, default=None) # base Task ID to be cloned and launched
task_factory_func = attrib(type=Callable, default=None) # alternative to base_task_id, function creating a Task
queue = attrib(type=str, default=None) # execution queue name to use
parents = attrib(type=list, default=None) # list of parent DAG steps
timeout = attrib(type=float, default=None) # execution timeout limit
parameters = attrib(type=dict, default=None) # Task hyper-parameters to change
configurations = attrib(type=dict, default=None) # Task configuration objects to change
task_overrides = attrib(type=dict, default=None) # Task overrides to change
executed = attrib(type=str, default=None) # The actual executed Task ID (None if not executed yet)
status = attrib(type=str, default="pending") # The Node Task status (cached, aborted, etc.)
clone_task = attrib(type=bool, default=True) # If True, clone the base_task_id, then execute the cloned Task
job = attrib(type=ClearmlJob, default=None) # ClearMLJob object
job_type = attrib(type=str, default=None) # task type (string)
job_started = attrib(type=float, default=None) # job startup timestamp (epoch ts in seconds)
job_ended = attrib(type=float, default=None) # job completion timestamp (epoch ts in seconds)
job_code_section = attrib(type=str, default=None) # pipeline code configuration section name
skip_job = attrib(type=bool, default=False) # if True, this step should be skipped
continue_on_fail = attrib(type=bool, default=False) # if True, the pipeline continues even if the step failed
cache_executed_step = attrib(type=bool, default=False) # if True this pipeline step should be cached
return_artifacts = attrib(type=list, default=None) # List of artifact names returned by the step
monitor_metrics = attrib(type=list, default=None) # List of metric title/series to monitor
monitor_artifacts = attrib(type=list, default=None) # List of artifact names to monitor
monitor_models = attrib(type=list, default=None) # List of models to monitor
explicit_docker_image = attrib(type=str, default=None) # The Docker image the node uses, specified at creation
recursively_parse_parameters = attrib(type=bool, default=False) # if True, recursively parse parameters in
# lists, dicts, or tuples
output_uri = attrib(type=Union[bool, str], default=None) # The default location for output models and other artifacts
draft = attrib(type=bool, default=False) # Specify whether to create the Task as a draft
# pipeline step name
name = attrib(type=str)
# base Task ID to be cloned and launched
base_task_id = attrib(type=str, default=None)
# alternative to base_task_id, function creating a Task
task_factory_func = attrib(type=Callable, default=None)
# execution queue name to use
queue = attrib(type=str, default=None)
# list of parent DAG steps
parents = attrib(type=list, default=None)
# execution timeout limit
timeout = attrib(type=float, default=None)
# Task hyper-parameters to change
parameters = attrib(type=dict, default=None)
# Task configuration objects to change
configurations = attrib(type=dict, default=None)
# Task overrides to change
task_overrides = attrib(type=dict, default=None)
# The actual executed Task ID (None if not executed yet)
executed = attrib(type=str, default=None)
# The Node Task status (cached, aborted, etc.)
status = attrib(type=str, default="pending")
# If True, clone the base_task_id, then execute the cloned Task
clone_task = attrib(type=bool, default=True)
# ClearMLJob object
job = attrib(type=ClearmlJob, default=None)
# task type (string)
job_type = attrib(type=str, default=None)
# job startup timestamp (epoch ts in seconds)
job_started = attrib(type=float, default=None)
# job completion timestamp (epoch ts in seconds)
job_ended = attrib(type=float, default=None)
# pipeline code configuration section name
job_code_section = attrib(type=str, default=None)
# if True, this step should be skipped
skip_job = attrib(type=bool, default=False)
# if True this pipeline step should be cached
cache_executed_step = attrib(type=bool, default=False)
# List of artifact names returned by the step
return_artifacts = attrib(type=list, default=None)
# List of metric title/series to monitor
monitor_metrics = attrib(type=list, default=None)
# List of artifact names to monitor
monitor_artifacts = attrib(type=list, default=None)
# List of models to monitor
monitor_models = attrib(type=list, default=None)
# The Docker image the node uses, specified at creation
explicit_docker_image = attrib(type=str, default=None)
# if True, recursively parse parameters in lists, dicts, or tuples
recursively_parse_parameters = attrib(type=bool, default=False)
# The default location for output models and other artifacts
output_uri = attrib(type=Union[bool, str], default=None)
# Specify whether to create the Task as a draft
draft = attrib(type=bool, default=False)
# continue_behaviour dict, for private use; used to initialize the fields related to continuation behaviour
continue_behaviour = attrib(type=dict, default=None)
# if True, the pipeline continues even if the step failed
continue_on_fail = attrib(type=bool, default=False)
# if True, the pipeline continues even if the step was aborted
continue_on_abort = attrib(type=bool, default=False)
# if True, the children of aborted steps are skipped
skip_children_on_abort = attrib(type=bool, default=True)
# if True, the children of failed steps are skipped
skip_children_on_fail = attrib(type=bool, default=True)
def __attrs_post_init__(self):
if self.parents is None:
@ -122,6 +157,12 @@ class PipelineController(object):
self.monitor_artifacts = []
if self.monitor_models is None:
self.monitor_models = []
if self.continue_behaviour is not None:
self.continue_on_fail = self.continue_behaviour.get("continue_on_fail", True)
self.continue_on_abort = self.continue_behaviour.get("continue_on_abort", True)
self.skip_children_on_fail = self.continue_behaviour.get("skip_children_on_fail", True)
self.skip_children_on_abort = self.continue_behaviour.get("skip_children_on_abort", True)
self.continue_behaviour = None
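
A minimal sketch of the defaulting above (constructing a Node directly for illustration; in practice add_step/add_function_step builds it, and the step name here is hypothetical). Any key omitted from the continue_behaviour dict falls back to True, and the dict itself is consumed:

node = PipelineController.Node(name="train", continue_behaviour={"continue_on_fail": False})
# __attrs_post_init__ has already run at this point:
assert node.continue_on_fail is False        # explicitly set
assert node.skip_children_on_abort is True   # omitted key, defaults to True
assert node.continue_behaviour is None       # the dict is cleared after initialization
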
def copy(self):
# type: () -> PipelineController.Node
@ -424,7 +465,8 @@ class PipelineController(object):
retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa
recursively_parse_parameters=False, # type: bool
output_uri=None # type: Optional[Union[str, bool]]
output_uri=None, # type: Optional[Union[str, bool]]
continue_behaviour=None # type: Optional[dict]
):
# type: (...) -> bool
"""
@ -494,9 +536,10 @@ class PipelineController(object):
use the base_task_project and base_task_name combination to retrieve the base_task_id to use for the step.
:param clone_base_task: If True (default), the pipeline will clone the base task, and modify/enqueue
the cloned Task. If False, the base-task is used directly, notice it has to be in draft-mode (created).
:param continue_on_fail: (default False). If True, failed step will not cause the pipeline to stop
:param continue_on_fail: (Deprecated, use `continue_behaviour` instead).
If True, a failed step will not cause the pipeline to stop
(or be marked as failed). Note that steps that are connected (directly or indirectly)
to the failed step will be skipped.
to the failed step will be skipped. Defaults to False.
:param pre_execute_callback: Callback function, called when the step (Task) is created
and before it is sent for execution. Allows a user to modify the Task before launch.
Use `node.job` to access the ClearmlJob object, or `node.job.task` to directly access the Task object.
@ -569,9 +612,24 @@ class PipelineController(object):
:param output_uri: The storage / output url for this step. This is the default location for output
models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).
:param continue_behaviour: Controls whether the pipeline will continue running after a
step fails or is aborted. Different behaviours can be set using a dictionary of boolean options. Supported options are:
- continue_on_fail - If True, the pipeline will continue even if the step failed.
If False, the pipeline will stop
- continue_on_abort - If True, the pipeline will continue even if the step was aborted.
If False, the pipeline will stop
- skip_children_on_fail - If True, the children of this step will be skipped if it failed.
If False, the children will run even if this step failed.
Any parameters passed from the failed step to its children will default to None
- skip_children_on_abort - If True, the children of this step will be skipped if it was aborted.
If False, the children will run even if this step was aborted.
Any parameters passed from the aborted step to its children will default to None
If the keys are not present in the dictionary, their values will default to True
:return: True if successful
"""
if continue_on_fail:
warnings.warn("`continue_on_fail` is deprecated. Use `continue_behaviour` instead", DeprecationWarning)
# always store callback functions (even when running remotely)
if pre_execute_callback:
self._pre_step_callbacks[name] = pre_execute_callback
@ -627,7 +685,8 @@ class PipelineController(object):
monitor_metrics=monitor_metrics or [],
monitor_artifacts=monitor_artifacts or [],
monitor_models=monitor_models or [],
output_uri=self._output_uri if output_uri is None else output_uri
output_uri=self._output_uri if output_uri is None else output_uri,
continue_behaviour=continue_behaviour
)
self._retries[name] = 0
self._retries_callbacks[name] = retry_on_failure if callable(retry_on_failure) else \
@ -675,7 +734,8 @@ class PipelineController(object):
tags=None, # type: Optional[Union[str, Sequence[str]]]
output_uri=None, # type: Optional[Union[str, bool]]
draft=False, # type: Optional[bool]
working_dir=None # type: Optional[str]
working_dir=None, # type: Optional[str]
continue_behaviour=None # type: Optional[dict]
):
# type: (...) -> bool
"""
@ -769,9 +829,10 @@ class PipelineController(object):
Example: ['model_weights_*', ]
:param time_limit: Default None, no time limit.
Step execution time limit, if exceeded the Task is aborted and the pipeline is stopped and marked failed.
:param continue_on_fail: (default False). If True, failed step will not cause the pipeline to stop
:param continue_on_fail: (Deprecated, use `continue_behaviour` instead).
If True, a failed step will not cause the pipeline to stop
(or be marked as failed). Note that steps that are connected (directly or indirectly)
to the failed step will be skipped.
to the failed step will be skipped. Defaults to False.
:param pre_execute_callback: Callback function, called when the step (Task) is created
and before it is sent for execution. Allows a user to modify the Task before launch.
Use `node.job` to access the ClearmlJob object, or `node.job.task` to directly access the Task object.
@ -846,9 +907,25 @@ class PipelineController(object):
models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).
:param draft: (default False). If True, the Task will be created as a draft task.
:param working_dir: Working directory to launch the script from.
:param continue_behaviour: Controls whether the pipeline will continue running after a
step fails or is aborted. Different behaviours can be set using a dictionary of boolean options. Supported options are:
- continue_on_fail - If True, the pipeline will continue even if the step failed.
If False, the pipeline will stop
- continue_on_abort - If True, the pipeline will continue even if the step was aborted.
If False, the pipeline will stop
- skip_children_on_fail - If True, the children of this step will be skipped if it failed.
If False, the children will run even if this step failed.
Any parameters passed from the failed step to its children will default to None
- skip_children_on_abort - If True, the children of this step will be skipped if it was aborted.
If False, the children will run even if this step was aborted.
Any parameters passed from the aborted step to its children will default to None
If the keys are not present in the dictionary, their values will default to True
:return: True if successful
"""
if continue_on_fail:
warnings.warn("`continue_on_fail` is deprecated. Use `continue_behaviour` instead", DeprecationWarning)
function_kwargs = function_kwargs or {}
default_kwargs = inspect.getfullargspec(function)
if default_kwargs and default_kwargs.args and default_kwargs.defaults:
@ -888,7 +965,8 @@ class PipelineController(object):
tags=tags,
output_uri=output_uri,
draft=draft,
working_dir=working_dir
working_dir=working_dir,
continue_behaviour=continue_behaviour
)
def start(
@ -2009,7 +2087,8 @@ class PipelineController(object):
tags=None, # type: Optional[Union[str, Sequence[str]]]
output_uri=None, # type: Optional[Union[str, bool]]
draft=False, # type: Optional[bool]
working_dir=None # type: Optional[str]
working_dir=None, # type: Optional[str]
continue_behaviour=None # type: Optional[dict]
):
# type: (...) -> bool
"""
@ -2103,9 +2182,10 @@ class PipelineController(object):
Example: ['model_weights_*', ]
:param time_limit: Default None, no time limit.
Step execution time limit, if exceeded the Task is aborted and the pipeline is stopped and marked failed.
:param continue_on_fail: (default False). If True, failed step will not cause the pipeline to stop
:param continue_on_fail: (Deprecated, use `continue_behaviour` instead).
If True, a failed step will not cause the pipeline to stop
(or be marked as failed). Note that steps that are connected (directly or indirectly)
to the failed step will be skipped.
to the failed step will be skipped. Defaults to False.
:param pre_execute_callback: Callback function, called when the step (Task) is created,
and before it is sent for execution. Allows a user to modify the Task before launch.
Use `node.job` to access the ClearmlJob object, or `node.job.task` to directly access the Task object.
@ -2180,6 +2260,19 @@ class PipelineController(object):
models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).
:param draft: (default False). If True, the Task will be created as a draft task.
:param working_dir: Working directory to launch the step from.
:param continue_behaviour: Controls whether the pipeline will continue running after a
step fails or is aborted. Different behaviours can be set using a dictionary of boolean options. Supported options are:
- continue_on_fail - If True, the pipeline will continue even if the step failed.
If False, the pipeline will stop
- continue_on_abort - If True, the pipeline will continue even if the step was aborted.
If False, the pipeline will stop
- skip_children_on_fail - If True, the children of this step will be skipped if it failed.
If False, the children will run even if this step failed.
Any parameters passed from the failed step to its children will default to None
- skip_children_on_abort - If True, the children of this step will be skipped if it was aborted.
If False, the children will run even if this step was aborted.
Any parameters passed from the aborted step to its children will default to None
If the keys are not present in the dictionary, their values will default to True
:return: True if successful
"""
@ -2320,6 +2413,7 @@ class PipelineController(object):
explicit_docker_image=docker,
output_uri=output_uri,
draft=draft,
continue_behaviour=continue_behaviour
)
self._retries[name] = 0
self._retries_callbacks[name] = retry_on_failure if callable(retry_on_failure) else \
@ -2801,7 +2895,13 @@ class PipelineController(object):
self._final_failure[node.name] = True
completed_jobs.append(j)
node.executed = node.job.task_id() if not (node_failed or node.job.is_aborted()) else False
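# executed=False (instead of the Task ID) marks this step so its children are skipped downstream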
if node.job.is_aborted():
node.executed = node.job.task_id() if not node.skip_children_on_abort else False
elif node_failed:
node.executed = node.job.task_id() if not node.skip_children_on_fail else False
else:
node.executed = node.job.task_id()
if j in launched_nodes:
launched_nodes.remove(j)
# check if we need to stop all running steps
@ -3566,7 +3666,14 @@ class PipelineDecorator(PipelineController):
else:
self._final_failure[node.name] = True
completed_jobs.append(j)
node.executed = node.job.task_id() if not (node_failed or node.job.is_aborted()) else False
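# executed=False (instead of the Task ID) marks this step so its children are skipped downstream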
if node.job.is_aborted():
node.executed = node.job.task_id() if not node.skip_children_on_abort else False
elif node_failed:
node.executed = node.job.task_id() if not node.skip_children_on_fail else False
else:
node.executed = node.job.task_id()
if j in launched_nodes:
launched_nodes.remove(j)
# check if we need to stop all running steps
@ -3820,6 +3927,8 @@ class PipelineDecorator(PipelineController):
def _wait_for_node(cls, node):
pool_period = 5.0 if cls._debug_execute_step_process else 20.0
while True:
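# a step without a job was skipped (or never launched), so there is nothing to wait on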
if not node.job:
break
node.job.wait(pool_period=pool_period, aborted_nonresponsive_as_running=True)
job_status = str(node.job.status(force=True))
if (
@ -3865,7 +3974,8 @@ class PipelineDecorator(PipelineController):
tags=None, # type: Optional[Union[str, Sequence[str]]]
output_uri=None, # type: Optional[Union[str, bool]]
draft=False, # type: Optional[bool]
working_dir=None # type: Optional[str]
working_dir=None, # type: Optional[str]
continue_behaviour=None # type: Optional[dict]
):
# type: (...) -> Callable
"""
@ -3887,9 +3997,10 @@ class PipelineDecorator(PipelineController):
have been executed successfully.
:param execution_queue: Optional, the queue to use for executing this specific step.
If not provided, the task will be sent to the pipeline's default execution queue
:param continue_on_fail: (default False). If True, a failed step will not cause the pipeline to stop
:param continue_on_fail: (Deprecated, use `continue_behaviour` instead).
If True, a failed step will not cause the pipeline to stop
(or be marked as failed). Note that steps that are connected (directly or indirectly)
to the failed step will be skipped.
to the failed step will be skipped. Defaults to False.
:param docker: Specify the docker image to be used when executing the pipeline step remotely
:param docker_args: Add docker execution arguments for the remote execution
(use single string for all docker arguments).
@ -4007,10 +4118,26 @@ class PipelineDecorator(PipelineController):
models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).
:param draft: (default False). If True, the Task will be created as a draft task.
:param working_dir: Working directory to launch the step from.
:param continue_behaviour: Controls whether the pipeline will continue running after a
step fails or is aborted. Different behaviours can be set using a dictionary of boolean options. Supported options are:
- continue_on_fail - If True, the pipeline will continue even if the step failed.
If False, the pipeline will stop
- continue_on_abort - If True, the pipeline will continue even if the step was aborted.
If False, the pipeline will stop
- skip_children_on_fail - If True, the children of this step will be skipped if it failed.
If False, the children will run even if this step failed.
Any parameters passed from the failed step to its children will default to None
- skip_children_on_abort - If True, the children of this step will be skipped if it was aborted.
If False, the children will run even if this step was aborted.
Any parameters passed from the aborted step to its children will default to None
If the keys are not present in the dictionary, their values will default to True
:return: function wrapper
"""
def decorator_wrap(func):
if continue_on_fail:
warnings.warn("`continue_on_fail` is deprecated. Use `continue_behaviour` instead", DeprecationWarning)
# noinspection PyProtectedMember
unwrapped_func = CreateFromFunction._deep_extract_wrapped(func)
_name = name or str(unwrapped_func.__name__)
@ -4054,7 +4181,8 @@ class PipelineDecorator(PipelineController):
tags=tags,
output_uri=output_uri,
draft=draft,
working_dir=working_dir
working_dir=working_dir,
continue_behaviour=continue_behaviour
)
if cls._singleton:
@ -4222,8 +4350,13 @@ class PipelineDecorator(PipelineController):
except: # noqa
pass
# skipped job
if not _node.job:
return None
cls._wait_for_node(_node)
if (_node.job.is_failed() and not _node.continue_on_fail) or _node.job.is_aborted():
if (_node.job.is_failed() and not _node.continue_on_fail) or \
(_node.job.is_aborted() and not _node.continue_on_abort):
raise ValueError(
'Pipeline step "{}", Task ID={} failed'.format(_node.name, _node.job.task_id())
)
@ -4680,7 +4813,10 @@ class PipelineDecorator(PipelineController):
if not cls._singleton._abort_running_steps_on_failure:
for parent in _node.parents:
if cls._singleton._nodes[parent].status in ["failed", "aborted", "skipped"]:
parent = cls._singleton._nodes[parent]
if parent.status == "failed" and parent.skip_children_on_fail or \
parent.status == "aborted" and parent.skip_children_on_abort or \
parent.status == "skipped":
_node.skip_job = True
return
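
A minimal usage sketch of continue_behaviour as documented above (the project name, queue names, and base Task IDs are placeholders, not taken from this commit):

from clearml import PipelineController

pipe = PipelineController(name="continue-behaviour demo", project="examples", version="1.0")
pipe.add_step(
    name="stage_data",
    base_task_id="<data-task-id>",  # placeholder base Task to clone
    execution_queue="default",
    # keep the pipeline running if this step fails; children are still skipped (the default)
    continue_behaviour={"continue_on_fail": True},
)
pipe.add_step(
    name="stage_train",
    parents=["stage_data"],
    base_task_id="<train-task-id>",  # placeholder
    # keep running after an abort, and let the children run too;
    # any parameters they take from this step will default to None
    continue_behaviour={"continue_on_abort": True, "skip_children_on_abort": False},
)
pipe.start(queue="services")
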

View File

@ -524,7 +524,7 @@ if __name__ == '__main__':
if artifact_name in parent_task.artifacts:
kwargs[k] = parent_task.artifacts[artifact_name].get(deserialization_function={artifact_deserialization_function_name})
else:
kwargs[k] = parent_task.get_parameters(cast=True)[return_section + '/' + artifact_name]
kwargs[k] = parent_task.get_parameters(cast=True).get(return_section + '/' + artifact_name)
results = {function_name}(**kwargs)
result_names = {function_return}
if result_names:
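
The switch from indexing to .get() pairs with skip_children_on_fail/skip_children_on_abort set to False: a failed or aborted parent never writes its return-section parameters, so the child now receives None instead of raising a KeyError. A minimal sketch (the key name is illustrative):

params = {}  # stands in for parent_task.get_parameters(cast=True) of a failed parent
value = params.get("return/result")  # new behaviour: None, the child runs with a None input
# the old params["return/result"] would have raised KeyError and killed the child step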