Fix single log request exceeding max packet size; set limit to 1MB per request

allegroai 2020-04-16 16:41:18 +03:00
parent 7f4b100042
commit 4eaa77dbdb


@@ -7,6 +7,7 @@ from threading import Thread, Event
 from six.moves.queue import Queue

 from ...backend_api.services import events
+from ...backend_api.session.session import MaxRequestSizeError
 from ...config import config

 buffer_capacity = config.get('log.task_log_buffer_capacity', 100)
@@ -15,6 +16,7 @@ buffer_capacity = config.get('log.task_log_buffer_capacity', 100)
 class TaskHandler(BufferingHandler):
     __flush_max_history_seconds = 30.
     __wait_for_flush_timeout = 10.
+    __max_event_size = 1024*1024
     __once = False

     @property
@@ -86,30 +88,34 @@ class TaskHandler(BufferingHandler):
             self.counter = 1

         # ignore backspaces (they are often used)
-        msg = record.getMessage().replace('\x08', '')
-
-        # unite all records in a single second
-        if self._last_event and timestamp - self._last_event.timestamp < 1000 and \
-                record.levelname.lower() == str(self._last_event.level):
-            # ignore backspaces (they are often used)
-            self._last_event.msg += '\n' + msg
-            return None
-
-        # if we have a previous event and it timed out, return it.
-        new_event = events.TaskLogEvent(
-            task=self.task_id,
-            timestamp=timestamp,
-            level=record.levelname.lower(),
-            worker=self.session.worker,
-            msg=msg
-        )
-        if self._last_event:
-            event = self._last_event
-            self._last_event = new_event
-            return event
-
-        self._last_event = new_event
-        return None
+        full_msg = record.getMessage().replace('\x08', '')
+
+        return_events = []
+        while full_msg:
+            msg = full_msg[:self.__max_event_size]
+            full_msg = full_msg[self.__max_event_size:]
+            # unite all records in a single second
+            if self._last_event and timestamp - self._last_event.timestamp < 1000 and \
+                    len(self._last_event.msg) + len(msg) < self.__max_event_size and \
+                    record.levelname.lower() == str(self._last_event.level):
+                # ignore backspaces (they are often used)
+                self._last_event.msg += '\n' + msg
+                continue
+
+            # if we have a previous event and it timed out, return it.
+            new_event = events.TaskLogEvent(
+                task=self.task_id,
+                timestamp=timestamp,
+                level=record.levelname.lower(),
+                worker=self.session.worker,
+                msg=msg
+            )
+            if self._last_event:
+                return_events.append(self._last_event)
+            self._last_event = new_event
+
+        return return_events

     def flush(self):
         if self._task_id is None:
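The core of the change is the new while loop: instead of emitting one possibly oversized event per log record, _record_to_event slices the message into chunks of at most __max_event_size characters and emits one event per chunk. The merge condition also gains a len(self._last_event.msg) + len(msg) < self.__max_event_size guard, so uniting same-second records can no longer grow a single event past the limit. A minimal standalone sketch of the slicing, with illustrative names that are not part of the codebase:

    MAX_EVENT_SIZE = 1024 * 1024  # mirrors TaskHandler.__max_event_size (1MB)

    def split_message(full_msg, max_size=MAX_EVENT_SIZE):
        # Yield successive chunks no larger than max_size, preserving order,
        # so no single event can exceed the backend's packet limit.
        while full_msg:
            yield full_msg[:max_size]
            full_msg = full_msg[max_size:]

    chunks = list(split_message('x' * (2 * MAX_EVENT_SIZE + 5)))
    assert [len(c) for c in chunks] == [MAX_EVENT_SIZE, MAX_EVENT_SIZE, 5]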
@@ -128,7 +134,7 @@ class TaskHandler(BufferingHandler):
             return
         try:
-            record_events = [self._record_to_event(record) for record in buffer] + [self._last_event]
+            record_events = [r for record in buffer for r in self._record_to_event(record)] + [self._last_event]
             self._last_event = None
             batch_requests = events.AddBatchRequest(requests=[events.AddRequest(e) for e in record_events if e])
         except Exception:
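Because _record_to_event now returns a list of zero or more events rather than a single event or None, flush() flattens the per-record lists with a nested comprehension. A toy illustration of that flattening, using a stand-in converter:

    buffer = ['first', '', 'third']

    def fake_record_to_event(record):
        # Stand-in for _record_to_event: zero or more events per record.
        return [record.upper()] if record else []

    flat = [r for record in buffer for r in fake_record_to_event(record)]
    assert flat == ['FIRST', 'THIRD']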
@@ -177,7 +183,8 @@ class TaskHandler(BufferingHandler):
             timeout = 1. if self._queue.empty() else self.__wait_for_flush_timeout
             _thread.join(timeout=timeout)
             if not self._queue.empty():
-                self.__log_stderr('Flush timeout {}s exceeded, dropping last lines'.format(timeout))
+                self.__log_stderr('Flush timeout {}s exceeded, dropping last {} lines'.format(
+                    timeout, self._queue.qsize()))
             # self.__log_stderr('Closing {} wait done'.format(os.getpid()))
         except:
             pass
@@ -193,9 +200,12 @@ class TaskHandler(BufferingHandler):
             if not res.ok():
                 self.__log_stderr("failed logging task to backend ({:d} lines, {})".format(
                     len(a_request.requests), str(res.meta)), level=WARNING)
+        except MaxRequestSizeError:
+            self.__log_stderr("failed logging task to backend ({:d} lines) log size exceeded limit".format(
+                len(a_request.requests)), level=WARNING)
         except Exception as ex:
-            # self.__log_stderr("Retrying, failed logging task to backend ({:d} lines): {}".format(
-            #     len(a_request.requests), ex))
+            self.__log_stderr("Retrying, failed logging task to backend ({:d} lines): {}".format(
+                len(a_request.requests), ex))
             # we should push ourselves back into the thread pool
             if self._queue:
                 self._pending += 1
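Ordering matters in the new handler: Python matches except clauses top to bottom, so except MaxRequestSizeError must appear before the generic except Exception; otherwise oversized batches would be pushed back into the queue and retried even though they can never succeed. A self-contained sketch of that ordering (the exception class is redefined locally purely for illustration):

    class MaxRequestSizeError(Exception):
        # Illustrative stand-in for the trains MaxRequestSizeError.
        pass

    def send_batch(batch):
        raise MaxRequestSizeError('single request exceeds max packet size')

    try:
        send_batch(['...'])
    except MaxRequestSizeError:
        # Specific clause wins: drop the oversized batch, do not retry.
        print('dropped oversized batch')
    except Exception as ex:
        # Transient failure: would be re-queued for retry.
        print('retrying: {}'.format(ex))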