Improve stability and resilience on intermittent network connection

This commit is contained in:
allegroai
2019-08-19 21:17:53 +03:00
parent 0a8cf706bd
commit 3bc1ec2362
10 changed files with 92 additions and 51 deletions

View File

@@ -18,14 +18,14 @@ class AccessMixin(object):
obj = self.data
props = prop_path.split('.')
for i in range(len(props)):
obj = getattr(obj, props[i], None)
if obj is None:
if not hasattr(obj, props[i]):
msg = 'Task has no %s section defined' % '.'.join(props[:i + 1])
if log_on_error:
self.log.info(msg)
if raise_on_error:
raise ValueError(msg)
return default
obj = getattr(obj, props[i], None)
return obj
def _set_task_property(self, prop_path, value, raise_on_error=True, log_on_error=True):

View File

@@ -30,22 +30,22 @@ class TaskStopSignal(object):
def test(self):
# noinspection PyBroadException
try:
status = self.task.status
status = str(self.task.status)
message = self.task.data.status_message
if status == tasks.TaskStatusEnum.in_progress and "stopping" in message:
if status == str(tasks.TaskStatusEnum.in_progress) and "stopping" in message:
return TaskStopReason.stopped
_expected_statuses = (
tasks.TaskStatusEnum.created,
tasks.TaskStatusEnum.queued,
tasks.TaskStatusEnum.in_progress,
str(tasks.TaskStatusEnum.created),
str(tasks.TaskStatusEnum.queued),
str(tasks.TaskStatusEnum.in_progress),
)
if status not in _expected_statuses and "worker" not in message:
return TaskStopReason.status_changed
if status == tasks.TaskStatusEnum.created:
if status == str(tasks.TaskStatusEnum.created):
self._task_reset_state_counter += 1
if self._task_reset_state_counter >= self._number_of_consecutive_reset_tests:

View File

@@ -1,5 +1,3 @@
from socket import gethostname
import attr
from threading import Thread, Event
@@ -13,9 +11,9 @@ from ....backend_api.services import tasks
class DevWorker(object):
prefix = attr.ib(type=str, default="MANUAL:")
report_period = float(config.get('development.worker.report_period_sec', 30.))
report_period = float(max(config.get('development.worker.report_period_sec', 30.), 1.))
report_stdout = bool(config.get('development.worker.log_stdout', True))
ping_period = 30.
ping_period = float(max(config.get('development.worker.ping_period_sec', 30.), 1.))
def __init__(self):
self._dev_stop_signal = None
@@ -51,20 +49,23 @@ class DevWorker(object):
def _daemon(self):
last_ping = time()
while self._task is not None:
if self._exit_event.wait(min(self.ping_period, self.report_period)):
return
# send ping request
if self._support_ping and (time() - last_ping) >= self.ping_period:
self.ping()
last_ping = time()
if self._dev_stop_signal:
stop_reason = self._dev_stop_signal.test()
if stop_reason and self._task:
self._task._dev_mode_stop_task(stop_reason)
try:
if self._exit_event.wait(min(self.ping_period, self.report_period)):
return
# send ping request
if self._support_ping and (time() - last_ping) >= self.ping_period:
self.ping()
last_ping = time()
if self._dev_stop_signal:
stop_reason = self._dev_stop_signal.test()
if stop_reason and self._task:
self._task._dev_mode_stop_task(stop_reason)
except Exception:
pass
def unregister(self):
self._exit_event.set()
self._dev_stop_signal = None
self._thread = None
self._task = None
self._thread = None
self._exit_event.set()
return True

View File

@@ -1,6 +1,7 @@
import time
from logging import LogRecord, getLogger, basicConfig
from logging.handlers import BufferingHandler
from multiprocessing.pool import ThreadPool
from ...backend_api.services import events
from ...config import config
@@ -27,6 +28,7 @@ class TaskHandler(BufferingHandler):
self.last_timestamp = 0
self.counter = 1
self._last_event = None
self._thread_pool = ThreadPool(processes=1)
def shouldFlush(self, record):
"""
@@ -92,6 +94,7 @@ class TaskHandler(BufferingHandler):
def flush(self):
if not self.buffer:
return
self.acquire()
buffer = self.buffer
try:
@@ -100,11 +103,20 @@ class TaskHandler(BufferingHandler):
self.buffer = []
record_events = [self._record_to_event(record) for record in buffer]
self._last_event = None
requests = [events.AddRequest(e) for e in record_events if e]
res = self.session.send(events.AddBatchRequest(requests=requests))
if not res.ok():
print("Failed logging task to backend ({:d} lines, {})".format(len(buffer), str(res.meta)))
batch_requests = events.AddBatchRequest(requests=[events.AddRequest(e) for e in record_events if e])
except Exception:
batch_requests = None
print("Failed logging task to backend ({:d} lines)".format(len(buffer)))
finally:
self.release()
if batch_requests:
self._thread_pool.apply_async(self._send_events, args=(batch_requests, ))
def _send_events(self, a_request):
try:
res = self.session.send(a_request)
if not res.ok():
print("Failed logging task to backend ({:d} lines, {})".format(len(a_request.requests), str(res.meta)))
except Exception:
print("Failed logging task to backend ({:d} lines)".format(len(a_request.requests)))

View File

@@ -4,8 +4,6 @@ import itertools
import logging
from enum import Enum
from threading import RLock, Thread
from copy import copy
from six.moves.urllib.parse import urlparse, urlunparse
import six
@@ -300,7 +298,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
@property
def _status(self):
""" Return the task's cached status (don't reload if we don't have to) """
return self.data.status
return str(self.data.status)
@property
def input_model(self):
@@ -349,11 +347,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
Returns a simple metrics reporter instance
"""
if self._reporter is None:
try:
storage_uri = self.get_output_destination(log_on_error=False)
except ValueError:
storage_uri = None
self._reporter = Reporter(self._get_metrics_manager(storage_uri=storage_uri))
self._setup_reporter()
return self._reporter
def _get_metrics_manager(self, storage_uri):
@@ -366,6 +360,14 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
)
return self._metrics_manager
def _setup_reporter(self):
try:
storage_uri = self.get_output_destination(log_on_error=False)
except ValueError:
storage_uri = None
self._reporter = Reporter(self._get_metrics_manager(storage_uri=storage_uri))
return self._reporter
def _get_output_destination_suffix(self, extra_path=None):
return '/'.join(x for x in ('task_%s' % self.data.id, extra_path) if x)
@@ -403,7 +405,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
def publish(self, ignore_errors=True):
""" Signal that this task will be published """
if self.status != tasks.TaskStatusEnum.stopped:
if str(self.status) != str(tasks.TaskStatusEnum.stopped):
raise ValueError("Can't publish, Task is not stopped")
resp = self.send(tasks.PublishRequest(self.id), ignore_errors=ignore_errors)
assert isinstance(resp.response, tasks.PublishResponse)
@@ -471,7 +473,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
return uri
def _conditionally_start_task(self):
if self.status == tasks.TaskStatusEnum.created:
if str(self.status) == str(tasks.TaskStatusEnum.created):
self.started()
@property
@@ -700,8 +702,8 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
def _edit(self, **kwargs):
with self._edit_lock:
# Since we ae using forced update, make sure he task status is valid
if not self._data or (self.data.status not in (tasks.TaskStatusEnum.created,
tasks.TaskStatusEnum.in_progress)):
if not self._data or (str(self.data.status) not in (str(tasks.TaskStatusEnum.created),
str(tasks.TaskStatusEnum.in_progress))):
raise ValueError('Task object can only be updated if created or in_progress')
res = self.send(tasks.EditRequest(task=self.id, force=True, **kwargs), raise_on_errors=False)