clearml/trains/backend_interface/task/development/stop_signal.py

62 lines
2.2 KiB
Python
Raw Normal View History

2019-06-10 17:00:28 +00:00
from ....config import config
class TaskStopReason(object):
    """String constants naming why a running task should stop.

    These are the non-None values returned by ``TaskStopSignal.test()``.
    """

    # status is still "in progress" but the status message contains "stopping"
    stopped = "stopped"
    # status has been "created" for several consecutive checks
    reset = "reset"
    # status is no longer one of created / queued / in progress
    status_changed = "status_changed"
class TaskStopSignal(object):
    """Poll a task's status and report when the running process should stop.

    Each call to :meth:`test` reads the task status and status message and
    maps them to one of the ``TaskStopReason`` values, or ``None`` when there
    is nothing to report.
    """

    # Read once, at class-definition time, from the 'development.support_stopping'
    # configuration entry (False when the entry is missing).
    enabled = bool(config.get('development.support_stopping', False))

    # How many consecutive test() calls must observe the "created" status
    # before the task is reported as reset.
    _number_of_consecutive_reset_tests = 4

    def __init__(self, task):
        """Bind the signal to a task and cache the status strings it compares against.

        :param task: The task to monitor; must be a ``Task`` instance.
        """
        # imported inside the method rather than at module level
        # (presumably to avoid a circular import -- confirm before moving it)
        from ....backend_interface import Task

        assert isinstance(task, Task)
        self.task = task
        # number of consecutive test() calls that saw the "created" status
        self._task_reset_state_counter = 0
        self._status_in_progress = str(Task.TaskStatusEnum.in_progress)
        self._status_created = str(Task.TaskStatusEnum.created)
        # statuses that do not, on their own, count as a status change
        self._status_expected_statuses = (
            self._status_created,
            str(Task.TaskStatusEnum.queued),
            self._status_in_progress,
        )

    def test(self):
        """Check the task status once and report why the task should stop, if at all.

        The full task object is reloaded just before any stop reason is returned.

        :return: ``TaskStopReason.stopped`` when the task is in progress and its
            status message contains "stopping"; ``TaskStopReason.status_changed``
            when the status is not created/queued/in-progress and the message does
            not contain "worker"; ``TaskStopReason.reset`` once the status has been
            "created" for ``_number_of_consecutive_reset_tests`` consecutive calls;
            ``None`` otherwise, including when any exception is raised while checking.
        """
        # Deliberately best-effort: any failure while reading the status is
        # treated as "no stop signal" rather than propagated to the caller.
        # noinspection PyBroadException
        try:
            # we use internal status read, so we do not need to constantly pull the entire task object,
            # it might be large, and we want to also avoid the edit lock on it.
            status, message = self.task._get_status()
            status = str(status)
            message = str(message)

            if status == self._status_in_progress and "stopping" in message:
                # make sure we sync the entire task object
                self.task.reload()
                return TaskStopReason.stopped

            # NOTE(review): messages containing "worker" are exempt from this check;
            # the reason is not visible in this file.
            if status not in self._status_expected_statuses and "worker" not in message:
                # make sure we sync the entire task object
                self.task.reload()
                return TaskStopReason.status_changed

            if status != self._status_created:
                # any other status breaks the streak of "created" observations
                self._task_reset_state_counter = 0
                return None

            self._task_reset_state_counter += 1
            if self._task_reset_state_counter >= self._number_of_consecutive_reset_tests:
                # make sure we sync the entire task object
                self.task.reload()
                return TaskStopReason.reset

            # not enough consecutive observations yet: warn and keep counting
            self.task.log.warning(
                "Task {} was reset! if state is consistent we shall terminate.".format(self.task.id),
            )
            return None
        except Exception:
            return None