diff --git a/clearml/automation/controller.py b/clearml/automation/controller.py
index c5ba4691..80bd0d3d 100644
--- a/clearml/automation/controller.py
+++ b/clearml/automation/controller.py
@@ -1997,7 +1997,7 @@ class PipelineController(object):
                 node = self._nodes[j]
                 if not node.job:
                     continue
-                if node.job.is_stopped():
+                if node.job.is_stopped(aborted_nonresponsive_as_running=True):
                     node_failed = node.job.is_failed()
                     node.executed = node.job.task_id() if not node_failed else False
                     if j in launched_nodes:
@@ -2256,7 +2256,7 @@ class PipelineController(object):
                 self._task._edit(models=models)
 
         # update the state (so that we do not scan the node twice)
-        if node.job.is_stopped():
+        if node.job.is_stopped(aborted_nonresponsive_as_running=True):
             self._monitored_nodes[node.name]['completed'] = True
 
     def _get_target_project(self, return_project_id=False):
@@ -2546,7 +2546,10 @@ class PipelineController(object):
         """
         if not boto3 or not self._relaunch_on_instance_failure:
             return False
-        worker = node.job.worker().split(":")[-1]
+        worker = (node.job.worker() or "").split(":")[-1]
+        if not worker:
+            return False
+
         if (worker, node.name) in self._relaunch_check_cache:
             return self._relaunch_check_cache[(worker, node.name)]
         # get credentials from all autoscalers (shouldn't be too many)
@@ -2697,7 +2700,7 @@ class PipelineDecorator(PipelineController):
                 node = self._nodes[j]
                 if not node.job:
                     continue
-                if node.job.is_stopped():
+                if node.job.is_stopped(aborted_nonresponsive_as_running=True):
                     node_failed = node.job.is_failed()
                     if (node_failed or node.job.is_aborted()) and self._should_relaunch_node(node):
                         continue
@@ -2935,7 +2938,7 @@ class PipelineDecorator(PipelineController):
     def component(
             cls,
             _func=None, *,
-            return_values=('return_object', ),  # type: Union[str, List[str]]
+            return_values=('return_object', ),  # type: Union[str, Sequence[str]]
             name=None,  # type: Optional[str]
             cache=False,  # type: bool
             packages=None,  # type: Optional[Union[str, Sequence[str]]]
@@ -3206,7 +3209,8 @@ class PipelineDecorator(PipelineController):
                     raise ValueError("Job was not created and is also not cached/executed")
                 return "{}.{}".format(_node.executed, return_name)
 
-            _node.job.wait(pool_period=1 if cls._debug_execute_step_process else 5)
+            _node.job.wait(pool_period=1 if cls._debug_execute_step_process else 5,
+                           aborted_nonresponsive_as_running=True)
             if _node.job.is_failed() and not _node.continue_on_fail:
                 raise ValueError(
                     'Pipeline step "{}", Task ID={} failed'.format(_node.name, _node.job.task_id()))
@@ -3224,7 +3228,8 @@ class PipelineDecorator(PipelineController):
             while True:
                 # wait until job is completed
                 if _node.job:
-                    _node.job.wait(pool_period=1 if cls._debug_execute_step_process else 5)
+                    _node.job.wait(pool_period=1 if cls._debug_execute_step_process else 5,
+                                   aborted_nonresponsive_as_running=True)
                 else:
                     sleep(2)
                     continue
@@ -3486,9 +3491,10 @@ class PipelineDecorator(PipelineController):
             while waited:
                 waited = False
                 for node in list(a_pipeline._nodes.values()):
-                    if node.executed or not node.job or node.job.is_stopped():
+                    if node.executed or not node.job or node.job.is_stopped(aborted_nonresponsive_as_running=True):
                         continue
-                    node.job.wait(pool_period=1 if cls._debug_execute_step_process else 5)
+                    node.job.wait(pool_period=1 if cls._debug_execute_step_process else 5,
+                                  aborted_nonresponsive_as_running=True)
                     waited = True
             # store the pipeline result of we have any:
             if return_value and pipeline_result is not None:
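A note on the _should_relaunch_node() change above: BaseJob.worker() returns None while a job is still pending (the type-comment fix in job.py below makes that explicit), so the old node.job.worker().split(":")[-1] could raise AttributeError. A minimal standalone sketch of the new guard's behavior; extract_worker_id is a hypothetical helper, not part of the patch:

# standalone sketch (not part of the patch): behavior of the new worker guard
def extract_worker_id(worker):
    # worker may be None while the job is pending, or look like "company:worker-id"
    return (worker or "").split(":")[-1]

assert extract_worker_id(None) == ""                       # old code raised AttributeError here
assert extract_worker_id("acme:i-0abc123") == "i-0abc123"  # strips the company prefix
assert extract_worker_id("i-0abc123") == "i-0abc123"       # id without a prefix is kept as-is

An empty worker id now short-circuits _should_relaunch_node() to False instead of seeding _relaunch_check_cache with a meaningless key.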
diff --git a/clearml/automation/job.py b/clearml/automation/job.py
index fc4c8d04..19de52cc 100644
--- a/clearml/automation/job.py
+++ b/clearml/automation/job.py
@@ -163,22 +163,25 @@ class BaseJob(object):
             self._last_status_ts = time()
         return self._last_status
 
-    def wait(self, timeout=None, pool_period=30.):
-        # type: (Optional[float], float) -> bool
+    def wait(self, timeout=None, pool_period=30., aborted_nonresponsive_as_running=False):
+        # type: (Optional[float], float, bool) -> bool
         """
         Wait until the task is fully executed (i.e., aborted/completed/failed)
 
         :param timeout: maximum time (minutes) to wait for Task to finish
         :param pool_period: check task status every pool_period seconds
+        :param aborted_nonresponsive_as_running: (default: False) If True, ignore the stopped state if the backend
+            non-responsive watchdog sets this Task to stopped. This scenario could happen if
+            an instance running the job is killed without warning (e.g. spot instances)
         :return: True, if Task finished.
         """
         tic = time()
         while timeout is None or time() - tic < timeout * 60.:
-            if self.is_stopped():
+            if self.is_stopped(aborted_nonresponsive_as_running=aborted_nonresponsive_as_running):
                 return True
             sleep(pool_period)
 
-        return self.is_stopped()
+        return self.is_stopped(aborted_nonresponsive_as_running=aborted_nonresponsive_as_running)
 
     def get_console_output(self, number_of_reports=1):
         # type: (int) -> Sequence[str]
@@ -192,7 +195,7 @@ class BaseJob(object):
         return self.task.get_reported_console_output(number_of_reports=number_of_reports)
 
     def worker(self):
-        # type: () -> str
+        # type: () -> Optional[str]
         """
         Return the current worker id executing this Job.
         If job is pending, returns None
@@ -216,16 +219,35 @@ class BaseJob(object):
         """
         return self.status() == Task.TaskStatusEnum.in_progress
 
-    def is_stopped(self):
-        # type: () -> bool
+    def is_stopped(self, aborted_nonresponsive_as_running=False):
+        # type: (bool) -> bool
         """
         Return True, if job finished executing (for any reason)
 
+        :param aborted_nonresponsive_as_running: (default: False) If True, ignore the stopped state if the backend
+            non-responsive watchdog sets this Task to stopped. This scenario could happen if
+            an instance running the job is killed without warning (e.g. spot instances)
+
         :return: True the task is currently one of these states, stopped / completed / failed / published.
         """
-        return self.status() in (
-            Task.TaskStatusEnum.stopped, Task.TaskStatusEnum.completed,
-            Task.TaskStatusEnum.failed, Task.TaskStatusEnum.published)
+        task_status = self.status()
+        # check if we are Not in any of the non-running states
+        if task_status not in (Task.TaskStatusEnum.stopped, Task.TaskStatusEnum.completed,
+                               Task.TaskStatusEnum.failed, Task.TaskStatusEnum.published):
+            return False
+
+        # notice the status update also refreshes the "status_message" field on the Task
+
+        # if we are stopped but the message says "non-responsive" it means for some reason the
+        # Task's instance was killed, we should ignore it if requested because we assume someone will bring it back
+        if aborted_nonresponsive_as_running and task_status == Task.TaskStatusEnum.stopped and \
+                str(self.task.data.status_message).lower() == "forced stop (non-responsive)":
+            # if we are here it means the state is "stopped" but we should ignore it
+            # because the non-responsive watchdog set it. We assume someone (autoscaler) will relaunch it.
+            return False
+        else:
+            # if we do not need to ignore the nonactive state, it means this Task stopped
+            return True
 
     def is_failed(self):
         # type: () -> bool
diff --git a/clearml/backend_interface/task/task.py b/clearml/backend_interface/task/task.py
index 50b661b4..cbd9c449 100644
--- a/clearml/backend_interface/task/task.py
+++ b/clearml/backend_interface/task/task.py
@@ -1637,16 +1637,18 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
     def get_status(self):
         # type: () -> str
         """
-        Return The task status without refreshing the entire Task object object (only the status property)
+        Return the task status without refreshing the entire Task object (only the status property)
 
         TaskStatusEnum: ["created", "in_progress", "stopped", "closed", "failed", "completed",
             "queued", "published", "publishing", "unknown"]
 
         :return: str: Task status as string (TaskStatusEnum)
         """
-        status = self._get_status()[0]
+        status, status_message = self._get_status()
         if self._data:
             self._data.status = status
+            self._data.status_message = str(status_message)
+
         return str(status)
 
     def get_output_log_web_page(self):
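End-to-end, the job-side flag depends on the task.py change: get_status() now refreshes status_message alongside status, which is what is_stopped() compares against the watchdog's "forced stop (non-responsive)" marker. A usage sketch under assumptions (the task id and queue name are placeholders, not from the patch):

# usage sketch: keep waiting on a job whose spot instance was killed,
# assuming an autoscaler will eventually relaunch it
from clearml.automation.job import ClearmlJob

job = ClearmlJob(base_task_id="<existing-task-id>")  # placeholder task id
job.launch(queue_name="aws-spot")                    # placeholder queue name

# poll every 30 seconds for up to 60 minutes; a watchdog-forced "stopped"
# state (status_message == "forced stop (non-responsive)") is treated as
# still running instead of ending the wait early
finished = job.wait(timeout=60., pool_period=30., aborted_nonresponsive_as_running=True)
if finished and job.is_failed():
    print("step failed:", job.task_id())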