Support pipeline retrying failing tasks/steps

Author: allegroai
Date:   2022-09-06 20:52:50 +03:00
Commit: 447714eaa4 (parent 5228b799c1)


@@ -11,7 +11,7 @@ from logging import getLogger
 from multiprocessing import Process, Queue
 from multiprocessing.pool import ThreadPool
 from threading import Thread, Event, RLock, current_thread
-from time import time
+from time import time, sleep
 from typing import Sequence, Optional, Mapping, Callable, Any, List, Dict, Union, Tuple

 from attr import attrib, attrs
@@ -59,6 +59,7 @@ class PipelineController(object):
     _report_plot_execution_details = dict(title='Pipeline Details', series='Execution Details')
     _evaluated_return_values = {}  # TID: pipeline_name
     _add_to_evaluated_return_values = {}  # TID: bool
+    _retries_left = {}  # Node.name: int
     valid_job_status = ["failed", "cached", "completed", "aborted", "queued", "running", "skipped", "pending"]
@@ -132,6 +133,7 @@ class PipelineController(object):
             auto_version_bump=True,  # type: bool
             abort_on_failure=False,  # type: bool
             add_run_number=True,  # type: bool
+            retry_on_failure=None  # type: Optional[int]
     ):
         # type: (...) -> None
         """
@@ -157,6 +159,8 @@ class PipelineController(object):
             and mark the pipeline as failed.
         :param add_run_number: If True (default), add the run number of the pipeline to the pipeline name.
             Example, the second time we launch the pipeline "best pipeline", we rename it to "best pipeline #2"
+        :param retry_on_failure: In case of node failure, retry the node the number of times
+            indicated by this parameter.
         """
         self._nodes = {}
         self._running_nodes = []
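
With the constructor change above, a pipeline-wide retry budget can be set when the controller is created. A minimal usage sketch (the name, project, and version values are hypothetical, not part of this commit):

    from clearml import PipelineController

    # Every failing step in this pipeline may be relaunched up to 2 times
    # before the controller gives up on it (hypothetical names).
    pipe = PipelineController(
        name="retry demo",
        project="examples",
        version="1.0.0",
        retry_on_failure=2,
    )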
@@ -222,6 +226,7 @@ class PipelineController(object):
         self._monitored_nodes = {}  # type: Dict[str, dict]
         self._abort_running_steps_on_failure = abort_on_failure
+        self._retry_on_failure = retry_on_failure

         # add direct link to the pipeline page
         if self._pipeline_as_sub_project and self._task:
@@ -276,6 +281,7 @@ class PipelineController(object):
             post_execute_callback=None,  # type: Optional[Callable[[PipelineController, PipelineController.Node], None]]  # noqa
             cache_executed_step=False,  # type: bool
             base_task_factory=None,  # type: Optional[Callable[[PipelineController.Node], Task]]
+            retry_on_failure=None  # type: int
     ):
         # type: (...) -> bool
         """
@@ -394,7 +400,8 @@ class PipelineController(object):
             If `clone_base_task` is False there is no cloning, hence the base_task is used.
         :param base_task_factory: Optional, instead of providing a pre-existing Task,
             provide a Callable function to create the Task (returns Task object)
+        :param retry_on_failure: In case of node failure, retry the node the number of times
+            indicated by this parameter.

         :return: True if successful
         """
@@ -458,6 +465,7 @@ class PipelineController(object):
             monitor_artifacts=monitor_artifacts or [],
             monitor_models=monitor_models or [],
         )
+        self._retries_left[name] = retry_on_failure or self._retry_on_failure or 0

         if self._task and not self._task.running_locally():
             self.update_execution_plot()
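
The `or`-chain above resolves a step's retry budget from the step-level argument, then the pipeline-wide default, then zero. A tiny self-contained illustration of that resolution (plain Python, not the ClearML API):

    def resolve_retries(step_retry, pipeline_retry):
        # Mirrors `retry_on_failure or self._retry_on_failure or 0`:
        # any falsy step value (None or 0) falls back to the pipeline default.
        return step_retry or pipeline_retry or 0

    assert resolve_retries(None, 3) == 3  # inherit the pipeline default
    assert resolve_retries(2, 3) == 2     # explicit step override wins
    assert resolve_retries(0, 3) == 3     # note: 0 cannot disable retries per step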
@@ -493,6 +501,7 @@ class PipelineController(object):
             pre_execute_callback=None,  # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]]  # noqa
             post_execute_callback=None,  # type: Optional[Callable[[PipelineController, PipelineController.Node], None]]  # noqa
             cache_executed_step=False,  # type: bool
+            retry_on_failure=None  # type: Optional[int]
     ):
         # type: (...) -> bool
         """
@@ -626,6 +635,8 @@ class PipelineController(object):
             Default: False, a new cloned copy of base_task is always used.
             Notice: If the git repo reference does not have a specific commit ID, the Task will never be used.
+        :param retry_on_failure: In case of node failure, retry the node the number of times
+            indicated by this parameter.

         :return: True if successful
         """
         # always store callback functions (even when running remotely)
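
The same override exists for function steps. A hedged sketch with a hypothetical step function:

    def process(data_url):
        # hypothetical step body
        return data_url

    # A function step retried once before being considered failed
    # (names are hypothetical).
    pipe.add_function_step(
        name="process_data",
        function=process,
        function_return=["processed_url"],
        retry_on_failure=1,
    )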
@@ -718,6 +729,7 @@ class PipelineController(object):
             monitor_models=monitor_models,
             job_code_section=job_code_section,
         )
+        self._retries_left[name] = retry_on_failure or self._retry_on_failure or 0

         return True
@@ -1975,8 +1987,15 @@ class PipelineController(object):
                 if not node.job:
                     continue
                 if node.job.is_stopped(aborted_nonresponsive_as_running=True):
-                    completed_jobs.append(j)
                     node_failed = node.job.is_failed()
+                    if node_failed and self._retries_left.get(node.name) and self._retries_left[node.name] > 0:
+                        self._task.get_logger().report_text("Node '{}' failed. Retrying...".format(node.name))
+                        node.job = None
+                        node.executed = None
+                        self._running_nodes.remove(j)
+                        self._retries_left[node.name] -= 1
+                        continue
+                    completed_jobs.append(j)
                     node.executed = node.job.task_id() if not node_failed else False
                     if j in launched_nodes:
                         launched_nodes.remove(j)
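
The daemon loop above now resets a failed node (clearing its job and executed state, decrementing its counter, and `continue`-ing) instead of marking it completed, so the scheduler relaunches it. A condensed, self-contained sketch of that per-job policy (plain Python, not the ClearML API):

    def on_job_stopped(node, retries_left):
        # Sketch of the retry decision made for each stopped job.
        if node["failed"] and retries_left.get(node["name"], 0) > 0:
            node["job"] = None           # clear the finished job ...
            node["executed"] = None      # ... so the node gets relaunched
            retries_left[node["name"]] -= 1
            return "retry"
        return "done"

    retries = {"train": 1}
    node = {"name": "train", "failed": True, "job": object(), "executed": None}
    assert on_job_stopped(node, retries) == "retry" and retries["train"] == 0
    assert on_job_stopped(node, retries) == "done"  # retry budget exhausted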
@@ -2526,6 +2545,7 @@ class PipelineDecorator(PipelineController):
             target_project=None,  # type: Optional[str]
             abort_on_failure=False,  # type: bool
             add_run_number=True,  # type: bool
+            retry_on_failure=None  # type: Optional[int]
     ):
         # type: (...) -> ()
         """
@@ -2547,6 +2567,8 @@ class PipelineDecorator(PipelineController):
             and mark the pipeline as failed.
         :param add_run_number: If True (default), add the run number of the pipeline to the pipeline name.
             Example, the second time we launch the pipeline "best pipeline", we rename it to "best pipeline #2"
+        :param retry_on_failure: In case of node failure, retry the node the number of times
+            indicated by this parameter.
         """
         super(PipelineDecorator, self).__init__(
             name=name,
@@ -2557,6 +2579,7 @@ class PipelineDecorator(PipelineController):
             target_project=target_project,
             abort_on_failure=abort_on_failure,
             add_run_number=add_run_number,
+            retry_on_failure=retry_on_failure
         )

         # if we are in eager execution, make sure parent class knows it
@@ -2610,12 +2633,11 @@ class PipelineDecorator(PipelineController):
                 if not node.job:
                     continue
                 if node.job.is_stopped(aborted_nonresponsive_as_running=True):
-                    completed_jobs.append(j)
                     node_failed = node.job.is_failed()
+                    completed_jobs.append(j)
                     node.executed = node.job.task_id() if not node_failed else False
                     if j in launched_nodes:
                         launched_nodes.remove(j)
-
                     # check if we need to stop all running steps
                     if node_failed and self._abort_running_steps_on_failure and not node.continue_on_fail:
                         nodes_failed_stop_pipeline.append(node.name)
@@ -2865,7 +2887,8 @@ class PipelineDecorator(PipelineController):
             helper_functions=None,  # type: Optional[Sequence[Callable]]
             monitor_metrics=None,  # type: Optional[List[Union[Tuple[str, str], Tuple[(str, str), (str, str)]]]]
             monitor_artifacts=None,  # type: Optional[List[Union[str, Tuple[str, str]]]]
-            monitor_models=None  # type: Optional[List[Union[str, Tuple[str, str]]]]
+            monitor_models=None,  # type: Optional[List[Union[str, Tuple[str, str]]]]
+            retry_on_failure=None  # type: Optional[int]
     ):
         # type: (...) -> Callable
         """
@@ -2939,6 +2962,8 @@ class PipelineDecorator(PipelineController):
             where the first string is the model name as it appears on the component Task,
             and the second is the target model name to put on the Pipeline Task
             Example: [('model_weights', 'final_model_weights'), ]
+        :param retry_on_failure: In case of node failure, retry the node the number of times
+            indicated by this parameter.

         :return: function wrapper
         """
@@ -3088,10 +3113,12 @@ class PipelineDecorator(PipelineController):
             # get node and park is as launched
             cls._singleton._launched_step_names.add(_node_name)
             _node = cls._singleton._nodes[_node_name]
+            cls._retries_left[_node_name] = \
+                retry_on_failure or (cls._singleton._retry_on_failure if cls._singleton else 0) or 0

             # The actual launch is a bit slow, we run it in the background
             launch_thread = Thread(
-                target=cls._component_launch,
+                target=cls._component_launch_with_failover,
                 args=(_node_name, _node, kwargs_artifacts, kwargs, current_thread().ident))

             def results_reference(return_name):
@@ -3101,15 +3128,11 @@ class PipelineDecorator(PipelineController):
                         launch_thread.join()
                     except:  # noqa
                         pass
-                    # wait until job is completed
                     if not _node.job:
                         if not _node.executed:
                             raise ValueError("Job was not created and is also not cached/executed")
                         return "{}.{}".format(_node.executed, return_name)

-                    # wait in seconds
-                    _node.job.wait(pool_period=5. if cls._debug_execute_step_process else 20.,
-                                   aborted_nonresponsive_as_running=True)
                     if _node.job.is_failed() and not _node.continue_on_fail:
                         raise ValueError(
                             'Pipeline step "{}", Task ID={} failed'.format(_node.name, _node.job.task_id()))
@@ -3124,11 +3147,10 @@ class PipelineDecorator(PipelineController):
                     launch_thread.join()
                 except:  # noqa
                     pass
-                # wait until job is completed
-                _node.job.wait(pool_period=5. if cls._debug_execute_step_process else 20.)
-                if _node.job.is_failed():
+                if (_node.job.is_failed() and not _node.continue_on_fail) or _node.job.is_aborted():
                     raise ValueError(
-                        'Pipeline step "{}", Task ID={} failed'.format(_node.name, _node.job.task_id()))
+                        'Pipeline step "{}", Task ID={} failed'.format(_node.name, _node.job.task_id())
+                    )

                 _node.executed = _node.job.task_id()
@@ -3174,7 +3196,8 @@ class PipelineDecorator(PipelineController):
             multi_instance_support=False,  # type: bool
             add_run_number=True,  # type: bool
             args_map=None,  # type: dict[str, List[str]]
-            start_controller_locally=False  # type: bool
+            start_controller_locally=False,  # type: bool
+            retry_on_failure=None  # type: Optional[int]
     ):
         # type: (...) -> Callable
         """
@@ -3221,9 +3244,14 @@ class PipelineDecorator(PipelineController):
                 - paramB: sectionB/paramB
                 - paramC: sectionB/paramC
                 - paramD: Args/paramD
+        :param relaunch_on_instance_failure: (Deprecated) If True, check if the machine a pipeline step ran on
+            was terminated. In case it was, the step will be relaunched. As of now, only AWS instances are supported.
+            Default: False
         :param start_controller_locally: If True, start the controller on the local machine. The steps will run
             remotely if `PipelineDecorator.run_locally` or `PipelineDecorator.debug_pipeline` are not called.
            Default: False
+        :param retry_on_failure: In case of node failure, retry the node the number of times
+            indicated by this parameter.
         """

         def decorator_wrap(func):
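
At the pipeline-decorator level, the parameter sets the default that every component inherits. A hedged sketch reusing the hypothetical `flaky_step` from the earlier example:

    # Components without their own retry_on_failure inherit this default of 2.
    @PipelineDecorator.pipeline(
        name="retry demo", project="examples", version="0.1", retry_on_failure=2
    )
    def run_pipeline():
        flaky_step(1)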
@@ -3260,6 +3288,7 @@ class PipelineDecorator(PipelineController):
                 target_project=target_project,
                 abort_on_failure=abort_on_failure,
                 add_run_number=add_run_number,
+                retry_on_failure=retry_on_failure
             )
             ret_val = func(**pipeline_kwargs)
             LazyEvalWrapper.trigger_all_remote_references()
@@ -3301,6 +3330,7 @@ class PipelineDecorator(PipelineController):
                 target_project=target_project,
                 abort_on_failure=abort_on_failure,
                 add_run_number=add_run_number,
+                retry_on_failure=retry_on_failure
             )

             a_pipeline._args_map = args_map or {}
@@ -3325,7 +3355,6 @@ class PipelineDecorator(PipelineController):
                 a_pipeline._task._set_runtime_properties(
                     dict(multi_pipeline_counter=str(cls._multi_pipeline_call_counter)))

-            # serialize / deserialize state only if we are running locally
             a_pipeline._start(wait=False)

             # sync arguments back (post deserialization and casting back)
@@ -3445,6 +3474,26 @@ class PipelineDecorator(PipelineController):
         """
         return cls._wait_for_multi_pipelines()

+    @classmethod
+    def _component_launch_with_failover(cls, node_name, node, kwargs_artifacts, kwargs, tid):
+        cls._component_launch(node_name, node, kwargs_artifacts, kwargs, tid)
+        while True:
+            if node.job:
+                node.job.wait(pool_period=5. if cls._debug_execute_step_process else 20.,
+                              aborted_nonresponsive_as_running=True)
+            else:
+                sleep(2)
+                continue
+            if node.job.is_failed() and cls._retries_left.get(node_name) and cls._retries_left[node_name] > 0:
+                if cls._singleton and cls._singleton._task:
+                    cls._singleton._task.get_logger().report_text("Node '{}' failed. Retrying...".format(node_name))
+                node.job = None
+                node.executed = None
+                cls._component_launch(node_name, node, kwargs_artifacts, kwargs, tid)
+                cls._retries_left[node_name] -= 1
+            else:
+                break
+
     @classmethod
     def _component_launch(cls, node_name, node, kwargs_artifacts, kwargs, tid):
         _node_name = node_name
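
For reference, the failover wrapper added above boils down to a launch/wait/relaunch loop. A condensed, self-contained restatement (plain Python, not the ClearML API):

    def launch_with_failover(launch_once, job_failed, retries):
        # Launch, wait for the job to stop, and relaunch while a failed job
        # still has retries budgeted (mirrors node.job = None + relaunch above).
        launch_once()
        while job_failed() and retries > 0:
            retries -= 1
            launch_once()
        return not job_failed()

    # Example: the job fails on the first two attempts, then succeeds.
    outcomes = iter([True, True, False])  # True means "this run failed"
    state = {"failed": True}

    def _launch():
        state["failed"] = next(outcomes)

    assert launch_with_failover(_launch, lambda: state["failed"], retries=3)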