Fix Pipeline component being treated as aborted when running on an instance that was killed externally (e.g. spot instances dying)

allegroai 2022-08-20 22:56:53 +03:00
parent 1cc87c9a21
commit 5a83aa433d
3 changed files with 51 additions and 21 deletions


@@ -1997,7 +1997,7 @@ class PipelineController(object):
node = self._nodes[j]
if not node.job:
continue
if node.job.is_stopped():
if node.job.is_stopped(aborted_nonresponsive_as_running=True):
node_failed = node.job.is_failed()
node.executed = node.job.task_id() if not node_failed else False
if j in launched_nodes:
@@ -2256,7 +2256,7 @@ class PipelineController(object):
self._task._edit(models=models)
# update the state (so that we do not scan the node twice)
if node.job.is_stopped():
if node.job.is_stopped(aborted_nonresponsive_as_running=True):
self._monitored_nodes[node.name]['completed'] = True
def _get_target_project(self, return_project_id=False):
@@ -2546,7 +2546,10 @@ class PipelineController(object):
"""
if not boto3 or not self._relaunch_on_instance_failure:
return False
worker = node.job.worker().split(":")[-1]
worker = (node.job.worker() or "").split(":")[-1]
if not worker:
return False
if (worker, node.name) in self._relaunch_check_cache:
return self._relaunch_check_cache[(worker, node.name)]
# get credentials from all autoscalers (shouldn't be too many)
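The extra guard above is needed because BaseJob.worker() returns None while the job is still pending (see the updated type annotation further down), so calling split() on it directly would raise. A minimal sketch of the pattern, assuming a BaseJob instance named job; variable names other than worker() are illustrative:

# Sketch, not part of this commit: parse the worker id defensively.
worker_field = job.worker() or ""        # None while the job is pending -> ""
worker_id = worker_field.split(":")[-1]  # keep only the suffix after the last ":"
if not worker_id:
    # no worker has picked up the job yet, nothing to match against an autoscaler instance
    skip_relaunch_check = True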
@@ -2697,7 +2700,7 @@ class PipelineDecorator(PipelineController):
node = self._nodes[j]
if not node.job:
continue
if node.job.is_stopped():
if node.job.is_stopped(aborted_nonresponsive_as_running=True):
node_failed = node.job.is_failed()
if (node_failed or node.job.is_aborted()) and self._should_relaunch_node(node):
continue
@@ -2935,7 +2938,7 @@ class PipelineDecorator(PipelineController):
def component(
cls,
_func=None, *,
return_values=('return_object', ), # type: Union[str, List[str]]
return_values=('return_object', ), # type: Union[str, Sequence[str]]
name=None, # type: Optional[str]
cache=False, # type: bool
packages=None, # type: Optional[Union[str, Sequence[str]]]
@@ -3206,7 +3209,8 @@ class PipelineDecorator(PipelineController):
raise ValueError("Job was not created and is also not cached/executed")
return "{}.{}".format(_node.executed, return_name)
_node.job.wait(pool_period=1 if cls._debug_execute_step_process else 5)
_node.job.wait(pool_period=1 if cls._debug_execute_step_process else 5,
aborted_nonresponsive_as_running=True)
if _node.job.is_failed() and not _node.continue_on_fail:
raise ValueError(
'Pipeline step "{}", Task ID={} failed'.format(_node.name, _node.job.task_id()))
@@ -3224,7 +3228,8 @@ class PipelineDecorator(PipelineController):
while True:
# wait until job is completed
if _node.job:
_node.job.wait(pool_period=1 if cls._debug_execute_step_process else 5)
_node.job.wait(pool_period=1 if cls._debug_execute_step_process else 5,
aborted_nonresponsive_as_running=True)
else:
sleep(2)
continue
@@ -3486,9 +3491,10 @@ class PipelineDecorator(PipelineController):
while waited:
waited = False
for node in list(a_pipeline._nodes.values()):
if node.executed or not node.job or node.job.is_stopped():
if node.executed or not node.job or node.job.is_stopped(aborted_nonresponsive_as_running=True):
continue
node.job.wait(pool_period=1 if cls._debug_execute_step_process else 5)
node.job.wait(pool_period=1 if cls._debug_execute_step_process else 5,
aborted_nonresponsive_as_running=True)
waited = True
# store the pipeline result if we have any:
if return_value and pipeline_result is not None:
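Across all of the controller and decorator loops above the change is the same: a node whose Task was stopped only by the non-responsive watchdog is still treated as running, so the pipeline keeps monitoring it instead of finalizing the step. A condensed sketch of that scan, assuming a pipeline object exposing the _nodes mapping shown above; handle_finished_node is illustrative only:

# Sketch, not part of this commit: skip nodes that only look stopped because the watchdog fired.
for node in pipeline._nodes.values():
    if not node.job:
        continue
    if not node.job.is_stopped(aborted_nonresponsive_as_running=True):
        continue  # still running, or aborted only by the non-responsive watchdog
    # only genuinely finished nodes reach this point
    handle_finished_node(node)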


@@ -163,22 +163,25 @@ class BaseJob(object):
self._last_status_ts = time()
return self._last_status
def wait(self, timeout=None, pool_period=30.):
# type: (Optional[float], float) -> bool
def wait(self, timeout=None, pool_period=30., aborted_nonresponsive_as_running=False):
# type: (Optional[float], float, bool) -> bool
"""
Wait until the task is fully executed (i.e., aborted/completed/failed)
:param timeout: maximum time (minutes) to wait for Task to finish
:param pool_period: check task status every pool_period seconds
:param aborted_nonresponsive_as_running: (default: False) If True, ignore the stopped state if the backend
non-responsive watchdog sets this Task to stopped. This scenario could happen if
an instance running the job is killed without warning (e.g. spot instances)
:return: True, if Task finished.
"""
tic = time()
while timeout is None or time() - tic < timeout * 60.:
if self.is_stopped():
if self.is_stopped(aborted_nonresponsive_as_running=aborted_nonresponsive_as_running):
return True
sleep(pool_period)
return self.is_stopped()
return self.is_stopped(aborted_nonresponsive_as_running=aborted_nonresponsive_as_running)
def get_console_output(self, number_of_reports=1):
# type: (int) -> Sequence[str]
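With the new flag, callers of BaseJob.wait() can keep blocking while a spot instance is being replaced. A minimal usage sketch, assuming job is a BaseJob created for a pipeline step; the timeout and pool period values are arbitrary:

# Sketch, not part of this commit: wait up to 30 minutes, ignoring watchdog-forced stops.
finished = job.wait(timeout=30, pool_period=5., aborted_nonresponsive_as_running=True)
if finished and job.is_failed():
    print("step failed, task id:", job.task_id())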
@@ -192,7 +195,7 @@ class BaseJob(object):
return self.task.get_reported_console_output(number_of_reports=number_of_reports)
def worker(self):
# type: () -> str
# type: () -> Optional[str]
"""
Return the current worker id executing this Job. If job is pending, returns None
@@ -216,16 +219,35 @@
"""
return self.status() == Task.TaskStatusEnum.in_progress
def is_stopped(self):
# type: () -> bool
def is_stopped(self, aborted_nonresponsive_as_running=False):
# type: (bool) -> bool
"""
Return True, if job finished executing (for any reason)
:param aborted_nonresponsive_as_running: (default: False) If True, ignore the stopped state if the backend
non-responsive watchdog sets this Task to stopped. This scenario could happen if
an instance running the job is killed without warning (e.g. spot instances)
:return: True if the task is currently in one of these states: stopped / completed / failed / published.
"""
return self.status() in (
Task.TaskStatusEnum.stopped, Task.TaskStatusEnum.completed,
Task.TaskStatusEnum.failed, Task.TaskStatusEnum.published)
task_status = self.status()
# check if we are Not in any of the non-running states
if task_status not in (Task.TaskStatusEnum.stopped, Task.TaskStatusEnum.completed,
Task.TaskStatusEnum.failed, Task.TaskStatusEnum.published):
return False
# notice the status update also refresh the "status_message" field on the Task
# if we are stopped but the message says "non-responsive" it means for some reason the
# Task's instance was killed, we should ignore it if requested because we assume someone will bring it back
if aborted_nonresponsive_as_running and task_status == Task.TaskStatusEnum.stopped and \
str(self.task.data.status_message).lower() == "forced stop (non-responsive)":
# if we are here it means the state is "stopped" but we should ignore it
# because the non-responsive watchdog set it. We assume someone (autoscaler) will relaunch it.
return False
else:
# if we do not need to ignore the nonactive state, it means this Task stopped
return True
def is_failed(self):
# type: () -> bool
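The check relies on the status_message written by the server-side watchdog when it force-stops a non-responsive Task. A sketch of that decision in isolation, assuming job is a BaseJob and that the watchdog message matches the string compared above:

# Sketch, not part of this commit: reproduce the watchdog test performed by is_stopped().
status = job.status()  # also refreshes job.task.data.status_message (see get_status below)
watchdog_abort = (
    status == Task.TaskStatusEnum.stopped
    and str(job.task.data.status_message).lower() == "forced stop (non-responsive)"
)
# is_stopped(aborted_nonresponsive_as_running=True) returns False exactly in this case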


@@ -1637,16 +1637,18 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
def get_status(self):
# type: () -> str
"""
Return The task status without refreshing the entire Task object object (only the status property)
Return The task status without refreshing the entire Task object (only the status property)
TaskStatusEnum: ["created", "in_progress", "stopped", "closed", "failed", "completed",
"queued", "published", "publishing", "unknown"]
:return: str: Task status as string (TaskStatusEnum)
"""
status = self._get_status()[0]
status, status_message = self._get_status()
if self._data:
self._data.status = status
self._data.status_message = str(status_message)
return str(status)
def get_output_log_web_page(self):
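This last change is what makes the watchdog detection possible: refreshing the status now also caches the backend's status_message on the local task data, which BaseJob.is_stopped() reads afterwards. A minimal sketch, assuming task is a Task object already initialized from the backend:

# Sketch, not part of this commit: status and status message after a forced stop.
current = task.get_status()          # e.g. "stopped"
reason = task.data.status_message    # e.g. "forced stop (non-responsive)"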