diff --git a/trains/backend_interface/task/log.py b/trains/backend_interface/task/log.py index c4bccd9e..4f184716 100644 --- a/trains/backend_interface/task/log.py +++ b/trains/backend_interface/task/log.py @@ -7,6 +7,7 @@ from threading import Thread, Event from six.moves.queue import Queue from ...backend_api.services import events +from ...backend_api.session.session import MaxRequestSizeError from ...config import config buffer_capacity = config.get('log.task_log_buffer_capacity', 100) @@ -15,6 +16,7 @@ buffer_capacity = config.get('log.task_log_buffer_capacity', 100) class TaskHandler(BufferingHandler): __flush_max_history_seconds = 30. __wait_for_flush_timeout = 10. + __max_event_size = 1024*1024 __once = False @property @@ -86,30 +88,34 @@ class TaskHandler(BufferingHandler): self.counter = 1 # ignore backspaces (they are often used) - msg = record.getMessage().replace('\x08', '') + full_msg = record.getMessage().replace('\x08', '') - # unite all records in a single second - if self._last_event and timestamp - self._last_event.timestamp < 1000 and \ - record.levelname.lower() == str(self._last_event.level): - # ignore backspaces (they are often used) - self._last_event.msg += '\n' + msg - return None + return_events = [] + while full_msg: + msg = full_msg[:self.__max_event_size] + full_msg = full_msg[self.__max_event_size:] + # unite all records in a single second + if self._last_event and timestamp - self._last_event.timestamp < 1000 and \ + len(self._last_event.msg) + len(msg) < self.__max_event_size and \ + record.levelname.lower() == str(self._last_event.level): + # ignore backspaces (they are often used) + self._last_event.msg += '\n' + msg + continue + + # if we have a previous event and it timed out, return it. + new_event = events.TaskLogEvent( + task=self.task_id, + timestamp=timestamp, + level=record.levelname.lower(), + worker=self.session.worker, + msg=msg + ) + if self._last_event: + return_events.append(self._last_event) - # if we have a previous event and it timed out, return it. - new_event = events.TaskLogEvent( - task=self.task_id, - timestamp=timestamp, - level=record.levelname.lower(), - worker=self.session.worker, - msg=msg - ) - if self._last_event: - event = self._last_event self._last_event = new_event - return event - self._last_event = new_event - return None + return return_events def flush(self): if self._task_id is None: @@ -128,7 +134,7 @@ class TaskHandler(BufferingHandler): return try: - record_events = [self._record_to_event(record) for record in buffer] + [self._last_event] + record_events = [r for record in buffer for r in self._record_to_event(record)] + [self._last_event] self._last_event = None batch_requests = events.AddBatchRequest(requests=[events.AddRequest(e) for e in record_events if e]) except Exception: @@ -177,7 +183,8 @@ class TaskHandler(BufferingHandler): timeout = 1. if self._queue.empty() else self.__wait_for_flush_timeout _thread.join(timeout=timeout) if not self._queue.empty(): - self.__log_stderr('Flush timeout {}s exceeded, dropping last lines'.format(timeout)) + self.__log_stderr('Flush timeout {}s exceeded, dropping last {} lines'.format( + timeout, self._queue.qsize())) # self.__log_stderr('Closing {} wait done'.format(os.getpid())) except: pass @@ -193,9 +200,12 @@ class TaskHandler(BufferingHandler): if not res.ok(): self.__log_stderr("failed logging task to backend ({:d} lines, {})".format( len(a_request.requests), str(res.meta)), level=WARNING) + except MaxRequestSizeError: + self.__log_stderr("failed logging task to backend ({:d} lines) log size exceeded limit".format( + len(a_request.requests)), level=WARNING) except Exception as ex: - # self.__log_stderr("Retrying, failed logging task to backend ({:d} lines): {}".format( - # len(a_request.requests), ex)) + self.__log_stderr("Retrying, failed logging task to backend ({:d} lines): {}".format( + len(a_request.requests), ex)) # we should push ourselves back into the thread pool if self._queue: self._pending += 1