From 447714eaa4ac09b4d44a41bfa31da3b1a23c52fe Mon Sep 17 00:00:00 2001
From: allegroai <>
Date: Tue, 6 Sep 2022 20:52:50 +0300
Subject: [PATCH] Support pipeline retrying failing tasks/steps

---
 clearml/automation/controller.py | 83 +++++++++++++++++++++++++-------
 1 file changed, 66 insertions(+), 17 deletions(-)

diff --git a/clearml/automation/controller.py b/clearml/automation/controller.py
index abfe50dd..928412e9 100644
--- a/clearml/automation/controller.py
+++ b/clearml/automation/controller.py
@@ -11,7 +11,7 @@ from logging import getLogger
 from multiprocessing import Process, Queue
 from multiprocessing.pool import ThreadPool
 from threading import Thread, Event, RLock, current_thread
-from time import time
+from time import time, sleep
 from typing import Sequence, Optional, Mapping, Callable, Any, List, Dict, Union, Tuple
 
 from attr import attrib, attrs
@@ -59,6 +59,7 @@ class PipelineController(object):
     _report_plot_execution_details = dict(title='Pipeline Details', series='Execution Details')
     _evaluated_return_values = {}  # TID: pipeline_name
     _add_to_evaluated_return_values = {}  # TID: bool
+    _retries_left = {}  # Node.name: int
 
     valid_job_status = ["failed", "cached", "completed", "aborted", "queued", "running", "skipped", "pending"]
 
@@ -132,6 +133,7 @@ class PipelineController(object):
         auto_version_bump=True,  # type: bool
         abort_on_failure=False,  # type: bool
         add_run_number=True,  # type: bool
+        retry_on_failure=None  # type: Optional[int]
     ):
         # type: (...) -> None
         """
@@ -157,6 +159,8 @@ class PipelineController(object):
             and mark the pipeline as failed.
         :param add_run_number: If True (default), add the run number of the pipeline to the pipeline name.
             Example, the second time we launch the pipeline "best pipeline", we rename it to "best pipeline #2"
+        :param retry_on_failure: In case of node failure, retry the node the number of times
+            indicated by this parameter.
         """
         self._nodes = {}
         self._running_nodes = []
@@ -222,6 +226,7 @@ class PipelineController(object):
 
         self._monitored_nodes = {}  # type: Dict[str, dict]
         self._abort_running_steps_on_failure = abort_on_failure
+        self._retry_on_failure = retry_on_failure
 
         # add direct link to the pipeline page
         if self._pipeline_as_sub_project and self._task:
@@ -276,6 +281,7 @@ class PipelineController(object):
         post_execute_callback=None,  # type: Optional[Callable[[PipelineController, PipelineController.Node], None]]  # noqa
         cache_executed_step=False,  # type: bool
         base_task_factory=None,  # type: Optional[Callable[[PipelineController.Node], Task]]
+        retry_on_failure=None  # type: int
     ):
         # type: (...) -> bool
         """
@@ -394,7 +400,8 @@ class PipelineController(object):
             If `clone_base_task` is False there is no cloning, hence the base_task is used.
         :param base_task_factory: Optional, instead of providing a pre-existing Task, provide a Callable function
             to create the Task (returns Task object)
-
+        :param retry_on_failure: In case of node failure, retry the node the number of times
+            indicated by this parameter.
         :return: True if successful
         """
 
@@ -458,6 +465,7 @@ class PipelineController(object):
             monitor_artifacts=monitor_artifacts or [],
             monitor_models=monitor_models or [],
         )
+        self._retries_left[name] = retry_on_failure or self._retry_on_failure or 0
 
         if self._task and not self._task.running_locally():
             self.update_execution_plot()
@@ -493,6 +501,7 @@ class PipelineController(object):
         pre_execute_callback=None,  # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]]  # noqa
         post_execute_callback=None,  # type: Optional[Callable[[PipelineController, PipelineController.Node], None]]  # noqa
         cache_executed_step=False,  # type: bool
+        retry_on_failure=None  # type: Optional[int]
     ):
         # type: (...) -> bool
         """
@@ -626,6 +635,8 @@ class PipelineController(object):
             Default: False, a new cloned copy of base_task is always used.
             Notice: If the git repo reference does not have a specific commit ID,
             the Task will never be used.
+        :param retry_on_failure: In case of node failure, retry the node the number of times
+            indicated by this parameter.
         :return: True if successful
         """
         # always store callback functions (even when running remotely)
@@ -718,6 +729,7 @@ class PipelineController(object):
             monitor_models=monitor_models,
             job_code_section=job_code_section,
         )
+        self._retries_left[name] = retry_on_failure or self._retry_on_failure or 0
 
         return True
 
@@ -1975,8 +1987,15 @@ class PipelineController(object):
                 if not node.job:
                     continue
                 if node.job.is_stopped(aborted_nonresponsive_as_running=True):
-                    completed_jobs.append(j)
                     node_failed = node.job.is_failed()
+                    if node_failed and self._retries_left.get(node.name) and self._retries_left[node.name] > 0:
+                        self._task.get_logger().report_text("Node '{}' failed. Retrying...".format(node.name))
+                        node.job = None
+                        node.executed = None
+                        self._running_nodes.remove(j)
+                        self._retries_left[node.name] -= 1
+                        continue
+                    completed_jobs.append(j)
                     node.executed = node.job.task_id() if not node_failed else False
                     if j in launched_nodes:
                         launched_nodes.remove(j)
@@ -2526,6 +2545,7 @@ class PipelineDecorator(PipelineController):
         target_project=None,  # type: Optional[str]
         abort_on_failure=False,  # type: bool
         add_run_number=True,  # type: bool
+        retry_on_failure=None  # type: Optional[int]
     ):
         # type: (...) -> ()
         """
@@ -2547,6 +2567,8 @@ class PipelineDecorator(PipelineController):
             and mark the pipeline as failed.
         :param add_run_number: If True (default), add the run number of the pipeline to the pipeline name.
             Example, the second time we launch the pipeline "best pipeline", we rename it to "best pipeline #2"
+        :param retry_on_failure: In case of node failure, retry the node the number of times
+            indicated by this parameter.
         """
         super(PipelineDecorator, self).__init__(
             name=name,
@@ -2557,6 +2579,7 @@ class PipelineDecorator(PipelineController):
             target_project=target_project,
             abort_on_failure=abort_on_failure,
             add_run_number=add_run_number,
+            retry_on_failure=retry_on_failure
         )
 
         # if we are in eager execution, make sure parent class knows it
@@ -2610,12 +2633,11 @@ class PipelineDecorator(PipelineController):
                 if not node.job:
                     continue
                 if node.job.is_stopped(aborted_nonresponsive_as_running=True):
-                    completed_jobs.append(j)
                     node_failed = node.job.is_failed()
+                    completed_jobs.append(j)
                     node.executed = node.job.task_id() if not node_failed else False
                     if j in launched_nodes:
                         launched_nodes.remove(j)
-
                     # check if we need to stop all running steps
                     if node_failed and self._abort_running_steps_on_failure and not node.continue_on_fail:
                         nodes_failed_stop_pipeline.append(node.name)
@@ -2865,7 +2887,8 @@ class PipelineDecorator(PipelineController):
         helper_functions=None,  # type: Optional[Sequence[Callable]]
         monitor_metrics=None,  # type: Optional[List[Union[Tuple[str, str], Tuple[(str, str), (str, str)]]]]
         monitor_artifacts=None,  # type: Optional[List[Union[str, Tuple[str, str]]]]
-        monitor_models=None  # type: Optional[List[Union[str, Tuple[str, str]]]]
+        monitor_models=None,  # type: Optional[List[Union[str, Tuple[str, str]]]]
+        retry_on_failure=None  # type: Optional[int]
     ):
         # type: (...) -> Callable
         """
@@ -2939,6 +2962,8 @@ class PipelineDecorator(PipelineController):
             where the first string is the model name as it appears on the component Task,
             and the second is the target model name to put on the Pipeline Task
             Example: [('model_weights', 'final_model_weights'), ]
+        :param retry_on_failure: In case of node failure, retry the node the number of times
+            indicated by this parameter.
 
         :return: function wrapper
         """
@@ -3088,10 +3113,12 @@ class PipelineDecorator(PipelineController):
                 # get node and park is as launched
                 cls._singleton._launched_step_names.add(_node_name)
                 _node = cls._singleton._nodes[_node_name]
+                cls._retries_left[_node_name] = \
+                    retry_on_failure or (cls._singleton._retry_on_failure if cls._singleton else 0) or 0
 
                 # The actual launch is a bit slow, we run it in the background
                 launch_thread = Thread(
-                    target=cls._component_launch,
+                    target=cls._component_launch_with_failover,
                     args=(_node_name, _node, kwargs_artifacts, kwargs, current_thread().ident))
 
                 def results_reference(return_name):
@@ -3101,15 +3128,11 @@ class PipelineDecorator(PipelineController):
                             launch_thread.join()
                         except:  # noqa
                             pass
-                    # wait until job is completed
                     if not _node.job:
                         if not _node.executed:
                             raise ValueError("Job was not created and is also not cached/executed")
                         return "{}.{}".format(_node.executed, return_name)
 
-                    # wait in seconds
-                    _node.job.wait(pool_period=5. if cls._debug_execute_step_process else 20.,
-                                   aborted_nonresponsive_as_running=True)
                     if _node.job.is_failed() and not _node.continue_on_fail:
                         raise ValueError(
                             'Pipeline step "{}", Task ID={} failed'.format(_node.name, _node.job.task_id()))
@@ -3124,11 +3147,10 @@ class PipelineDecorator(PipelineController):
                             launch_thread.join()
                         except:  # noqa
                             pass
-                    # wait until job is completed
-                    _node.job.wait(pool_period=5. if cls._debug_execute_step_process else 20.)
-                    if _node.job.is_failed():
+                    if (_node.job.is_failed() and not _node.continue_on_fail) or _node.job.is_aborted():
                         raise ValueError(
-                            'Pipeline step "{}", Task ID={} failed'.format(_node.name, _node.job.task_id()))
+                            'Pipeline step "{}", Task ID={} failed'.format(_node.name, _node.job.task_id())
+                        )
 
                     _node.executed = _node.job.task_id()
 
@@ -3174,7 +3196,8 @@ class PipelineDecorator(PipelineController):
         multi_instance_support=False,  # type: bool
         add_run_number=True,  # type: bool
         args_map=None,  # type: dict[str, List[str]]
-        start_controller_locally=False  # type: bool
+        start_controller_locally=False,  # type: bool
+        retry_on_failure=None  # type: Optional[int]
     ):
         # type: (...) -> Callable
         """
@@ -3221,9 +3244,14 @@ class PipelineDecorator(PipelineController):
             - paramB: sectionB/paramB
            - paramC: sectionB/paramC
            - paramD: Args/paramD
+        :param relaunch_on_instance_failure: (Deprecated) If True, check if the machine a pipeline step ran on
+            was terminated. In case it was, the step will be relaunched. As of now, only AWS instances are supported.
+            Default: False
        :param start_controller_locally: If True, start the controller on the local machine. The steps will run
            remotely if `PipelineDecorator.run_locally` or `PipelineDecorator.debug_pipeline` are not called.
            Default: False
+        :param retry_on_failure: In case of node failure, retry the node the number of times
+            indicated by this parameter.
        """
 
        def decorator_wrap(func):
@@ -3260,6 +3288,7 @@ class PipelineDecorator(PipelineController):
                 target_project=target_project,
                 abort_on_failure=abort_on_failure,
                 add_run_number=add_run_number,
+                retry_on_failure=retry_on_failure
             )
             ret_val = func(**pipeline_kwargs)
             LazyEvalWrapper.trigger_all_remote_references()
@@ -3301,6 +3330,7 @@ class PipelineDecorator(PipelineController):
                 target_project=target_project,
                 abort_on_failure=abort_on_failure,
                 add_run_number=add_run_number,
+                retry_on_failure=retry_on_failure
             )
 
             a_pipeline._args_map = args_map or {}
@@ -3325,7 +3355,6 @@ class PipelineDecorator(PipelineController):
                 a_pipeline._task._set_runtime_properties(
                     dict(multi_pipeline_counter=str(cls._multi_pipeline_call_counter)))
 
-            # serialize / deserialize state only if we are running locally
             a_pipeline._start(wait=False)
 
             # sync arguments back (post deserialization and casting back)
@@ -3445,6 +3474,26 @@ class PipelineDecorator(PipelineController):
         """
         return cls._wait_for_multi_pipelines()
 
+    @classmethod
+    def _component_launch_with_failover(cls, node_name, node, kwargs_artifacts, kwargs, tid):
+        cls._component_launch(node_name, node, kwargs_artifacts, kwargs, tid)
+        while True:
+            if node.job:
+                node.job.wait(pool_period=5. if cls._debug_execute_step_process else 20.,
+                              aborted_nonresponsive_as_running=True)
+            else:
+                sleep(2)
+                continue
+            if node.job.is_failed() and cls._retries_left.get(node_name) and cls._retries_left[node_name] > 0:
+                if cls._singleton and cls._singleton._task:
+                    cls._singleton._task.get_logger().report_text("Node '{}' failed. Retrying...".format(node_name))
+                node.job = None
+                node.executed = None
+                cls._component_launch(node_name, node, kwargs_artifacts, kwargs, tid)
+                cls._retries_left[node_name] -= 1
+            else:
+                break
+
     @classmethod
     def _component_launch(cls, node_name, node, kwargs_artifacts, kwargs, tid):
         _node_name = node_name
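
Usage note (not part of the patch): a minimal sketch of how the new retry_on_failure parameter could be used through the PipelineController API. Per the patch, a per-step value passed to add_step()/add_function_step() takes precedence over the controller-level default (self._retries_left[name] = retry_on_failure or self._retry_on_failure or 0). The project, queue, step, and data names below are illustrative placeholders.

from clearml import PipelineController


def process_data(dataset_url):
    # placeholder step body; a real step would do actual work here
    return dataset_url


pipe = PipelineController(
    name="retry example",   # hypothetical pipeline name
    project="examples",     # hypothetical project name
    version="1.0",
    retry_on_failure=3,     # controller-level default: retry every failing step up to 3 times
)

pipe.add_function_step(
    name="process_data",
    function=process_data,
    function_kwargs=dict(dataset_url="s3://bucket/data.csv"),  # placeholder input
    retry_on_failure=5,     # per-step value, overrides the controller-level default
)

pipe.start(queue="default")  # queue name is a placeholder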
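The decorator interface exposes the same knob; a sketch under the same assumptions (component body, names, and URLs are placeholders). PipelineDecorator.run_locally() is used only to keep the example self-contained; remotely executed components go through the _component_launch_with_failover() path added by this patch.

from clearml.automation.controller import PipelineDecorator


@PipelineDecorator.component(return_values=["processed_url"], retry_on_failure=2)
def process_data(dataset_url):
    # placeholder component body
    return dataset_url


@PipelineDecorator.pipeline(name="retry example", project="examples", version="1.0", retry_on_failure=1)
def run_pipeline(url):
    # the component-level retry_on_failure (2) overrides the pipeline-level default (1) for this step
    return process_data(url)


if __name__ == "__main__":
    PipelineDecorator.run_locally()  # run pipeline logic locally for this sketch
    run_pipeline("s3://bucket/data.csv")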