From 22a65abdc1c074f2948b3d574b5a53c316d1564f Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Sat, 6 Jul 2019 22:58:01 +0300 Subject: [PATCH] Add new task state support --- .../task/development/stop_signal.py | 28 +++++---- .../task/development/worker.py | 58 ++++++++++++++++--- 2 files changed, 69 insertions(+), 17 deletions(-) diff --git a/trains/backend_interface/task/development/stop_signal.py b/trains/backend_interface/task/development/stop_signal.py index 1d15f22f..40e7c6cb 100644 --- a/trains/backend_interface/task/development/stop_signal.py +++ b/trains/backend_interface/task/development/stop_signal.py @@ -1,5 +1,5 @@ from ....config import config -from ....backend_interface import Task, TaskStatusEnum +from ....backend_api.services import tasks class TaskStopReason(object): @@ -13,14 +13,16 @@ class TaskStopSignal(object): _number_of_consecutive_reset_tests = 4 - _unexpected_statuses = ( - TaskStatusEnum.closed, - TaskStatusEnum.stopped, - TaskStatusEnum.failed, - TaskStatusEnum.published, - ) + # _unexpected_statuses = ( + # tasks.TaskStatusEnum.closed, + # tasks.TaskStatusEnum.stopped, + # tasks.TaskStatusEnum.failed, + # tasks.TaskStatusEnum.published, + # tasks.TaskStatusEnum.completed, + # ) def __init__(self, task): + from ....backend_interface import Task assert isinstance(task, Task) self.task = task self._task_reset_state_counter = 0 @@ -29,13 +31,19 @@ class TaskStopSignal(object): status = self.task.status message = self.task.data.status_message - if status == TaskStatusEnum.in_progress and "stopping" in message: + if status == tasks.TaskStatusEnum.in_progress and "stopping" in message: return TaskStopReason.stopped - if status in self._unexpected_statuses and "worker" not in message: + _expected_statuses = ( + tasks.TaskStatusEnum.created, + tasks.TaskStatusEnum.queued, + tasks.TaskStatusEnum.in_progress, + ) + + if status not in _expected_statuses and "worker" not in message: return TaskStopReason.status_changed - if status == TaskStatusEnum.created: + if status == tasks.TaskStatusEnum.created: self._task_reset_state_counter += 1 if self._task_reset_state_counter >= self._number_of_consecutive_reset_tests: diff --git a/trains/backend_interface/task/development/worker.py b/trains/backend_interface/task/development/worker.py index 306cd247..a6caf77f 100644 --- a/trains/backend_interface/task/development/worker.py +++ b/trains/backend_interface/task/development/worker.py @@ -1,26 +1,70 @@ from socket import gethostname import attr +from threading import Thread, Event -from ....config import config, running_remotely, dev_worker_name +from time import time + +from ....config import config +from ....backend_interface.task.development.stop_signal import TaskStopSignal +from ....backend_api.services import tasks -@attr.s class DevWorker(object): prefix = attr.ib(type=str, default="MANUAL:") report_period = float(config.get('development.worker.report_period_sec', 30.)) report_stdout = bool(config.get('development.worker.log_stdout', True)) + ping_period = 30. - @classmethod - def is_enabled(cls, model_updated=False): - return False + def __init__(self): + self._dev_stop_signal = None + self._thread = None + self._exit_event = Event() + self._task = None + self._support_ping = False - def status_report(self, timestamp=None): + def ping(self, timestamp=None): + try: + if self._task: + self._task.send(tasks.PingRequest(self._task.id)) + except Exception: + return False return True - def register(self): + def register(self, task): + if self._thread: + return True + if TaskStopSignal.enabled: + self._dev_stop_signal = TaskStopSignal(task=task) + self._support_ping = hasattr(tasks, 'PingRequest') + # if there is nothing to monitor, leave + if not self._support_ping and not self._dev_stop_signal: + return + self._task = task + self._exit_event.clear() + self._thread = Thread(target=self._daemon) + self._thread.daemon = True + self._thread.start() return True + def _daemon(self): + last_ping = time() + while self._task is not None: + if self._exit_event.wait(min(self.ping_period, self.report_period)): + return + # send ping request + if self._support_ping and (time() - last_ping) >= self.ping_period: + self.ping() + last_ping = time() + if self._dev_stop_signal: + stop_reason = self._dev_stop_signal.test() + if stop_reason and self._task: + self._task._dev_mode_stop_task(stop_reason) + def unregister(self): + self._exit_event.set() + self._dev_stop_signal = None + self._thread = None + self._task = None return True