Add new task state support

This commit is contained in:
allegroai 2019-07-06 22:58:01 +03:00
parent b5219d98de
commit 22a65abdc1
2 changed files with 69 additions and 17 deletions

View File

@ -1,5 +1,5 @@
from ....config import config
from ....backend_interface import Task, TaskStatusEnum
from ....backend_api.services import tasks
class TaskStopReason(object):
@ -13,14 +13,16 @@ class TaskStopSignal(object):
_number_of_consecutive_reset_tests = 4
_unexpected_statuses = (
TaskStatusEnum.closed,
TaskStatusEnum.stopped,
TaskStatusEnum.failed,
TaskStatusEnum.published,
)
# _unexpected_statuses = (
# tasks.TaskStatusEnum.closed,
# tasks.TaskStatusEnum.stopped,
# tasks.TaskStatusEnum.failed,
# tasks.TaskStatusEnum.published,
# tasks.TaskStatusEnum.completed,
# )
def __init__(self, task):
from ....backend_interface import Task
assert isinstance(task, Task)
self.task = task
self._task_reset_state_counter = 0
@ -29,13 +31,19 @@ class TaskStopSignal(object):
status = self.task.status
message = self.task.data.status_message
if status == TaskStatusEnum.in_progress and "stopping" in message:
if status == tasks.TaskStatusEnum.in_progress and "stopping" in message:
return TaskStopReason.stopped
if status in self._unexpected_statuses and "worker" not in message:
_expected_statuses = (
tasks.TaskStatusEnum.created,
tasks.TaskStatusEnum.queued,
tasks.TaskStatusEnum.in_progress,
)
if status not in _expected_statuses and "worker" not in message:
return TaskStopReason.status_changed
if status == TaskStatusEnum.created:
if status == tasks.TaskStatusEnum.created:
self._task_reset_state_counter += 1
if self._task_reset_state_counter >= self._number_of_consecutive_reset_tests:

View File

@ -1,26 +1,70 @@
from socket import gethostname
import attr
from threading import Thread, Event
from ....config import config, running_remotely, dev_worker_name
from time import time
from ....config import config
from ....backend_interface.task.development.stop_signal import TaskStopSignal
from ....backend_api.services import tasks
@attr.s
class DevWorker(object):
prefix = attr.ib(type=str, default="MANUAL:")
report_period = float(config.get('development.worker.report_period_sec', 30.))
report_stdout = bool(config.get('development.worker.log_stdout', True))
ping_period = 30.
@classmethod
def is_enabled(cls, model_updated=False):
def __init__(self):
self._dev_stop_signal = None
self._thread = None
self._exit_event = Event()
self._task = None
self._support_ping = False
def ping(self, timestamp=None):
try:
if self._task:
self._task.send(tasks.PingRequest(self._task.id))
except Exception:
return False
def status_report(self, timestamp=None):
return True
def register(self):
def register(self, task):
if self._thread:
return True
if TaskStopSignal.enabled:
self._dev_stop_signal = TaskStopSignal(task=task)
self._support_ping = hasattr(tasks, 'PingRequest')
# if there is nothing to monitor, leave
if not self._support_ping and not self._dev_stop_signal:
return
self._task = task
self._exit_event.clear()
self._thread = Thread(target=self._daemon)
self._thread.daemon = True
self._thread.start()
return True
def _daemon(self):
last_ping = time()
while self._task is not None:
if self._exit_event.wait(min(self.ping_period, self.report_period)):
return
# send ping request
if self._support_ping and (time() - last_ping) >= self.ping_period:
self.ping()
last_ping = time()
if self._dev_stop_signal:
stop_reason = self._dev_stop_signal.test()
if stop_reason and self._task:
self._task._dev_mode_stop_task(stop_reason)
def unregister(self):
self._exit_event.set()
self._dev_stop_signal = None
self._thread = None
self._task = None
return True