Fix pipeline retry mechanism interface

This commit is contained in:
allegroai 2022-09-26 23:27:13 +03:00
parent 77f25b82bd
commit dc3258ab0b

View File

@ -59,7 +59,7 @@ class PipelineController(object):
_report_plot_execution_details = dict(title='Pipeline Details', series='Execution Details')
_evaluated_return_values = {} # TID: pipeline_name
_add_to_evaluated_return_values = {} # TID: bool
_retries_left = {} # Node.name: int
_retries = {} # Node.name: int
_retries_callbacks = {} # Node.name: Callable[[PipelineController, PipelineController.Node, int], bool] # noqa
valid_job_status = ["failed", "cached", "completed", "aborted", "queued", "running", "skipped", "pending"]
@ -72,7 +72,7 @@ class PipelineController(object):
queue = attrib(type=str, default=None) # execution queue name to use
parents = attrib(type=list, default=None) # list of parent DAG steps
timeout = attrib(type=float, default=None) # execution timeout limit
parameters = attrib(type=dict, default=None) # Task hyper parameters to change
parameters = attrib(type=dict, default=None) # Task hyper-parameters to change
configurations = attrib(type=dict, default=None) # Task configuration objects to change
task_overrides = attrib(type=dict, default=None) # Task overrides to change
executed = attrib(type=str, default=None) # The actual executed Task ID (None if not executed yet)
@ -134,8 +134,7 @@ class PipelineController(object):
auto_version_bump=True, # type: bool
abort_on_failure=False, # type: bool
add_run_number=True, # type: bool
retry_on_failure=None, # type: Optional[int]
retry_on_failure_callback=None # type: Optional[Callable[[PipelineController, PipelineController.Node, int], bool]] # noqa
retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
):
# type: (...) -> None
"""
@ -161,23 +160,22 @@ class PipelineController(object):
and mark the pipeline as failed.
:param add_run_number: If True (default), add the run number of the pipeline to the pipeline name.
Example, the second time we launch the pipeline "best pipeline", we rename it to "best pipeline #2"
:param retry_on_failure: In case of node failure, retry the node the number of times
indicated by this parameter.
:param retry_on_failure_callback: A function called on node failure. Takes as parameters:
the PipelineController instance, the PipelineController.Node that failed and an int
representing the number of retries left for the node that failed
(default 0, can be set with `retry_on_failure` parameter).
The function must return a `bool`: True if the node should be retried and False otherwise.
If True, the node will be requeued and the number of retries left will be decremented by 1.
By default, if this callback is not specified, the function will be retried the number of
times indicated by `retry_on_failure`.
:param retry_on_failure: Integer (number of retries) or Callback function that returns True to allow a retry
- Integer: In case of node failure, retry the node the number of times indicated by this parameter.
- Callable: A function called on node failure. Takes as parameters:
the PipelineController instance, the PipelineController.Node that failed and an int
representing the number of previous retries for the node that failed
The function must return a `bool`: True if the node should be retried and False otherwise.
If True, the node will be re-queued and its count of previous retries will be incremented by 1.
If an integer is passed instead of a callback, the node will be retried up to the number of
times indicated by `retry_on_failure`.
.. code-block:: py
.. code-block:: py
def example_retry_on_failure_callback(pipeline, node, retries):
print(node.name, ' failed')
# do something with the pipeline controller
return retries > 0
def example_retry_on_failure_callback(pipeline, node, retries):
print(node.name, ' failed')
# do something with the pipeline controller
return retries < 5
"""
self._nodes = {}
self._running_nodes = []
@ -243,8 +241,9 @@ class PipelineController(object):
self._monitored_nodes = {} # type: Dict[str, dict]
self._abort_running_steps_on_failure = abort_on_failure
self._retry_on_failure = retry_on_failure
self._retry_on_failure_callback = retry_on_failure_callback or PipelineController._default_retry_on_failure_callback
self._def_max_retry_on_failure = retry_on_failure if isinstance(retry_on_failure, int) else 0
self._retry_on_failure_callback = retry_on_failure if callable(retry_on_failure) \
else self._default_retry_on_failure_callback
# add direct link to the pipeline page
if self._pipeline_as_sub_project and self._task:
@ -299,8 +298,7 @@ class PipelineController(object):
post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa
cache_executed_step=False, # type: bool
base_task_factory=None, # type: Optional[Callable[[PipelineController.Node], Task]]
retry_on_failure=None, # type: int
retry_on_failure_callback=None # type: Optional[Callable[[PipelineController, PipelineController.Node, int], bool]] # noqa
retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
):
# type: (...) -> bool
"""
@ -419,23 +417,23 @@ class PipelineController(object):
If `clone_base_task` is False there is no cloning, hence the base_task is used.
:param base_task_factory: Optional, instead of providing a pre-existing Task,
provide a Callable function to create the Task (returns Task object)
:param retry_on_failure: In case of node failure, retry the node the number of times
indicated by this parameter.
:param retry_on_failure_callback: A function called on node failure. Takes as parameters:
the PipelineController instance, the PipelineController.Node that failed and an int
representing the number of retries left for the node that failed
(default 0, can be set with `retry_on_failure` parameter).
The function must return a `bool`: True if the node should be retried and False otherwise.
If True, the node will be requeued and the number of retries left will be decremented by 1.
By default, if this callback is not specified, the function will be retried the number of
times indicated by `retry_on_failure`.
:param retry_on_failure: Integer (number of retries) or Callback function that returns True to allow a retry
- Integer: In case of node failure, retry the node the number of times indicated by this parameter.
- Callable: A function called on node failure. Takes as parameters:
the PipelineController instance, the PipelineController.Node that failed and an int
representing the number of previous retries for the node that failed
The function must return a `bool`: True if the node should be retried and False otherwise.
If True, the node will be re-queued and its count of previous retries will be incremented by 1.
If an integer is passed instead of a callback, the node will be retried up to the number of
times indicated by `retry_on_failure`.
.. code-block:: py
.. code-block:: py
def example_retry_on_failure_callback(pipeline, node, retries):
print(node.name, ' failed')
# do something with the pipeline controller
return retries < 5
def example_retry_on_failure_callback(pipeline, node, retries):
print(node.name, ' failed')
# do something with the pipeline controller
return retries > 0
:return: True if successful
"""
@ -499,12 +497,10 @@ class PipelineController(object):
monitor_artifacts=monitor_artifacts or [],
monitor_models=monitor_models or [],
)
self._retries_left[name] = retry_on_failure or self._retry_on_failure or 0
self._retries_callbacks[name] = (
retry_on_failure_callback
or self._retry_on_failure_callback
or PipelineController._default_retry_on_failure_callback
)
self._retries[name] = 0
self._retries_callbacks[name] = retry_on_failure if callable(retry_on_failure) else \
(functools.partial(self._default_retry_on_failure_callback, max_retries=retry_on_failure)
if isinstance(retry_on_failure, int) else self._retry_on_failure_callback)
if self._task and not self._task.running_locally():
self.update_execution_plot()
@ -540,8 +536,7 @@ class PipelineController(object):
pre_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa
post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa
cache_executed_step=False, # type: bool
retry_on_failure=None, # type: Optional[int]
retry_on_failure_callback=None # type: Optional[Callable[[PipelineController, PipelineController.Node, int], bool]] # noqa
retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
):
# type: (...) -> bool
"""
@ -675,23 +670,23 @@ class PipelineController(object):
Default: False, a new cloned copy of base_task is always used.
Notice: If the git repo reference does not have a specific commit ID, the Task will never be used.
:param retry_on_failure: In case of node failure, retry the node the number of times
indicated by this parameter.
:param retry_on_failure_callback: A function called on node failure. Takes as parameters:
the PipelineController instance, the PipelineController.Node that failed and an int
representing the number of retries left for the node that failed
(default 0, can be set with `retry_on_failure` parameter).
The function must return a `bool`: True if the node should be retried and False otherwise.
If True, the node will be requeued and the number of retries left will be decremented by 1.
By default, if this callback is not specified, the function will be retried the number of
times indicated by `retry_on_failure`.
:param retry_on_failure: Integer (number of retries) or Callback function that returns True to allow a retry
- Integer: In case of node failure, retry the node the number of times indicated by this parameter.
- Callable: A function called on node failure. Takes as parameters:
the PipelineController instance, the PipelineController.Node that failed and an int
representing the number of previous retries for the node that failed
The function must return a `bool`: True if the node should be retried and False otherwise.
If True, the node will be re-queued and its count of previous retries will be incremented by 1.
If an integer is passed instead of a callback, the node will be retried up to the number of
times indicated by `retry_on_failure`.
.. code-block:: py
.. code-block:: py
def example_retry_on_failure_callback(pipeline, node, retries):
print(node.name, ' failed')
# do something with the pipeline controller
return retries < 5
def example_retry_on_failure_callback(pipeline, node, retries):
print(node.name, ' failed')
# do something with the pipeline controller
return retries > 0
:return: True if successful
"""
# always store callback functions (even when running remotely)
@ -784,12 +779,10 @@ class PipelineController(object):
monitor_models=monitor_models,
job_code_section=job_code_section,
)
self._retries_left[name] = retry_on_failure or self._retry_on_failure or 0
self._retries_callbacks[name] = (
retry_on_failure_callback
or self._retry_on_failure_callback
or PipelineController._default_retry_on_failure_callback
)
self._retries[name] = 0
self._retries_callbacks[name] = retry_on_failure if callable(retry_on_failure) else \
(functools.partial(self._default_retry_on_failure_callback, max_retries=retry_on_failure)
if isinstance(retry_on_failure, int) else self._retry_on_failure_callback)
return True
@ -2048,14 +2041,16 @@ class PipelineController(object):
continue
if node.job.is_stopped(aborted_nonresponsive_as_running=True):
node_failed = node.job.is_failed()
if node_failed and self._retry_on_failure_callback(self, node, self._retries_left.get(node.name, 0)):
if node_failed and \
self._retry_on_failure_callback(self, node, self._retries.get(node.name, 0)):
self._task.get_logger().report_text("Node '{}' failed. Retrying...".format(node.name))
node.job = None
node.executed = None
self._running_nodes.remove(j)
if node.name in self._retries_left:
self._retries_left[node.name] -= 1
self._retries[node.name] = self._retries.get(node.name, 0) + 1
continue
completed_jobs.append(j)
node.executed = node.job.task_id() if not node_failed else False
if j in launched_nodes:
@ -2582,9 +2577,8 @@ class PipelineController(object):
return '<a href="{}"> {} </a>'.format(task_link_template.format(project=project_id, task=task_id), task_id)
@staticmethod
def _default_retry_on_failure_callback(_pipeline_controller, _node, retries):
    """
    Default retry policy: permit a retry only while the retry counter is positive.

    :param _pipeline_controller: The pipeline controller instance (not used by this policy)
    :param _node: The node that failed (not used by this policy)
    :param retries: Retry counter of the failed node
    :return: True if `retries` is greater than zero, False otherwise
    """
    allow_retry = 0 < retries
    return allow_retry
def _default_retry_on_failure_callback(self, _pipeline_controller, _node, retries, max_retries=None):
    """
    Default retry policy: permit another retry while the node has been retried
    fewer times than the allowed maximum.

    :param _pipeline_controller: The pipeline controller instance (not used by this policy)
    :param _node: The node that failed (not used by this policy)
    :param retries: Number of previous retries of the failed node
    :param max_retries: Maximum number of retries allowed. If None, the controller-wide
        default stored in `self._def_max_retry_on_failure` is used
    :return: True if `retries` is below the retry limit, False otherwise
    """
    if max_retries is None:
        retry_limit = self._def_max_retry_on_failure
    else:
        retry_limit = max_retries
    return retries < retry_limit
class PipelineDecorator(PipelineController):
@ -2610,8 +2604,7 @@ class PipelineDecorator(PipelineController):
target_project=None, # type: Optional[str]
abort_on_failure=False, # type: bool
add_run_number=True, # type: bool
retry_on_failure=None, # type: Optional[int]
retry_on_failure_callback=None # type: Optional[Callable[[PipelineController, PipelineController.Node, int], bool]] # noqa
retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
):
# type: (...) -> ()
"""
@ -2633,22 +2626,23 @@ class PipelineDecorator(PipelineController):
and mark the pipeline as failed.
:param add_run_number: If True (default), add the run number of the pipeline to the pipeline name.
Example, the second time we launch the pipeline "best pipeline", we rename it to "best pipeline #2"
:param retry_on_failure: In case of node failure, retry the node the number of times
indicated by this parameter.
:param retry_on_failure_callback: A function called on node failure. Takes as parameters:
the PipelineController instance, the PipelineController.Node that failed and an int
representing the number of retries left for the node that failed (default 0). The
function must return a `bool`: True if the node should be retried and False otherwise.
If True, the node will be requeued and the number of retries left will be decremented by 1.
By default, if this callback is not specified, the function will be retried the number of
times indicated by `retry_on_failure`.
:param retry_on_failure: Integer (number of retries) or Callback function that returns True to allow a retry
- Integer: In case of node failure, retry the node the number of times indicated by this parameter.
- Callable: A function called on node failure. Takes as parameters:
the PipelineController instance, the PipelineController.Node that failed and an int
representing the number of previous retries for the node that failed
The function must return a `bool`: True if the node should be retried and False otherwise.
If True, the node will be re-queued and its count of previous retries will be incremented by 1.
If an integer is passed instead of a callback, the node will be retried up to the number of
times indicated by `retry_on_failure`.
.. code-block:: py
.. code-block:: py
def example_retry_on_failure_callback(pipeline, node, retries):
print(node.name, ' failed')
# do something with the pipeline controller
return retries < 5
def example_retry_on_failure_callback(pipeline, node, retries):
print(node.name, ' failed')
# do something with the pipeline controller
return retries > 0
"""
super(PipelineDecorator, self).__init__(
name=name,
@ -2660,7 +2654,6 @@ class PipelineDecorator(PipelineController):
abort_on_failure=abort_on_failure,
add_run_number=add_run_number,
retry_on_failure=retry_on_failure,
retry_on_failure_callback=retry_on_failure_callback
)
# if we are in eager execution, make sure parent class knows it
@ -2969,8 +2962,7 @@ class PipelineDecorator(PipelineController):
monitor_metrics=None, # type: Optional[List[Union[Tuple[str, str], Tuple[(str, str), (str, str)]]]]
monitor_artifacts=None, # type: Optional[List[Union[str, Tuple[str, str]]]]
monitor_models=None, # type: Optional[List[Union[str, Tuple[str, str]]]]
retry_on_failure=None, # type: Optional[int]
retry_on_failure_callback=None # type: Optional[Callable[[PipelineController, PipelineController.Node, int], bool]] # noqa
retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
):
# type: (...) -> Callable
"""
@ -3044,22 +3036,22 @@ class PipelineDecorator(PipelineController):
where the first string is the model name as it appears on the component Task,
and the second is the target model name to put on the Pipeline Task
Example: [('model_weights', 'final_model_weights'), ]
:param retry_on_failure: In case of node failure, retry the node the number of times
indicated by this parameter.
:param retry_on_failure_callback: A function called on node failure. Takes as parameters:
the PipelineController instance, the PipelineController.Node that failed and an int
representing the number of retries left for the node that failed (default 0). The
function must return a `bool`: True if the node should be retried and False otherwise.
If True, the node will be requeued and the number of retries left will be decremented by 1.
By default, if this callback is not specified, the function will be retried the number of
times indicated by `retry_on_failure`.
:param retry_on_failure: Integer (number of retries) or Callback function that returns True to allow a retry
- Integer: In case of node failure, retry the node the number of times indicated by this parameter.
- Callable: A function called on node failure. Takes as parameters:
the PipelineController instance, the PipelineController.Node that failed and an int
representing the number of previous retries for the node that failed
The function must return a `bool`: True if the node should be retried and False otherwise.
If True, the node will be re-queued and its count of previous retries will be incremented by 1.
If an integer is passed instead of a callback, the node will be retried up to the number of
times indicated by `retry_on_failure`.
.. code-block:: py
.. code-block:: py
def example_retry_on_failure_callback(pipeline, node, retries):
print(node.name, ' failed')
# do something with the pipeline controller
return retries > 0
def example_retry_on_failure_callback(pipeline, node, retries):
print(node.name, ' failed')
# do something with the pipeline controller
return retries < 5
:return: function wrapper
"""
@ -3209,13 +3201,10 @@ class PipelineDecorator(PipelineController):
# get node and mark it as launched
cls._singleton._launched_step_names.add(_node_name)
_node = cls._singleton._nodes[_node_name]
cls._retries_left[_node_name] = \
retry_on_failure or (cls._singleton._retry_on_failure if cls._singleton else 0) or 0
cls._retries_callbacks[_node_name] = retry_on_failure_callback or (
cls._singleton._retry_on_failure_callback
if cls._singleton
else cls._default_retry_on_failure_callback
)
cls._retries[_node_name] = 0
cls._retries_callbacks[_node_name] = retry_on_failure if callable(retry_on_failure) else \
(functools.partial(cls._singleton._default_retry_on_failure_callback, max_retries=retry_on_failure)
if isinstance(retry_on_failure, int) else cls._singleton._retry_on_failure_callback)
# The actual launch is a bit slow, we run it in the background
launch_thread = Thread(
@ -3298,8 +3287,7 @@ class PipelineDecorator(PipelineController):
add_run_number=True, # type: bool
args_map=None, # type: dict[str, List[str]]
start_controller_locally=False, # type: bool
retry_on_failure=None, # type: Optional[int]
retry_on_failure_callback=None # type: Optional[Callable[[PipelineController, PipelineController.Node, int], bool]] # noqa
retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
):
# type: (...) -> Callable
"""
@ -3346,28 +3334,26 @@ class PipelineDecorator(PipelineController):
- paramB: sectionB/paramB
- paramC: sectionB/paramC
- paramD: Args/paramD
:param relaunch_on_instance_failure: (Deprecated) If True, check if the machine a pipeline step ran on
was terminated. In case it was, the step will be relaunched. As of now, only AWS instances are supported.
Default: False
:param start_controller_locally: If True, start the controller on the local machine. The steps will run
remotely if `PipelineDecorator.run_locally` or `PipelineDecorator.debug_pipeline` are not called.
Default: False
:param retry_on_failure: In case of node failure, retry the node the number of times
indicated by this parameter.
:param retry_on_failure_callback: A function called on node failure. Takes as parameters:
the PipelineController instance, the PipelineController.Node that failed and an int
representing the number of retries left for the node that failed (default 0). The
function must return a `bool`: True if the node should be retried and False otherwise.
If True, the node will be requeued and the number of retries left will be decremented by 1.
By default, if this callback is not specified, the function will be retried the number of
times indicated by `retry_on_failure`.
:param retry_on_failure: Integer (number of retries) or Callback function that returns True to allow a retry
- Integer: In case of node failure, retry the node the number of times indicated by this parameter.
- Callable: A function called on node failure. Takes as parameters:
the PipelineController instance, the PipelineController.Node that failed and an int
representing the number of previous retries for the node that failed
The function must return a `bool`: True if the node should be retried and False otherwise.
If True, the node will be re-queued and its count of previous retries will be incremented by 1.
If an integer is passed instead of a callback, the node will be retried up to the number of
times indicated by `retry_on_failure`.
.. code-block:: py
.. code-block:: py
def example_retry_on_failure_callback(pipeline, node, retries):
print(node.name, ' failed')
# do something with the pipeline controller
return retries < 5
def example_retry_on_failure_callback(pipeline, node, retries):
print(node.name, ' failed')
# do something with the pipeline controller
return retries > 0
"""
def decorator_wrap(func):
@ -3405,7 +3391,6 @@ class PipelineDecorator(PipelineController):
abort_on_failure=abort_on_failure,
add_run_number=add_run_number,
retry_on_failure=retry_on_failure,
retry_on_failure_callback=retry_on_failure_callback
)
ret_val = func(**pipeline_kwargs)
LazyEvalWrapper.trigger_all_remote_references()
@ -3448,7 +3433,6 @@ class PipelineDecorator(PipelineController):
abort_on_failure=abort_on_failure,
add_run_number=add_run_number,
retry_on_failure=retry_on_failure,
retry_on_failure_callback=retry_on_failure_callback
)
a_pipeline._args_map = args_map or {}
@ -3602,16 +3586,15 @@ class PipelineDecorator(PipelineController):
else:
sleep(2)
continue
if node.job.is_failed() and cls._retries_callbacks.get(node_name, cls._default_retry_on_failure_callback)(
cls._singleton, node, cls._retries_left.get(node_name, 0)
):
if node.job.is_failed() and node_name in cls._retries_callbacks and \
cls._retries_callbacks[node_name](cls._singleton, node, cls._retries.get(node_name, 0)):
if cls._singleton and cls._singleton._task:
cls._singleton._task.get_logger().report_text("Node '{}' failed. Retrying...".format(node_name))
node.job = None
node.executed = None
cls._retries[node_name] = cls._retries.get(node_name, 0) + 1
cls._component_launch(node_name, node, kwargs_artifacts, kwargs, tid)
if node_name in cls._retries_left:
cls._retries_left[node_name] -= 1
else:
break