Fix logger in case a packet was dropped before it was overwritten

This commit is contained in:
allegroai 2020-04-09 12:56:02 +03:00
parent aa737e6b5b
commit 1b901b7d13

View File

@ -1,8 +1,10 @@
import os
import sys import sys
import time import time
from logging import LogRecord, getLogger, basicConfig from logging import LogRecord, getLogger, basicConfig
from logging.handlers import BufferingHandler from logging.handlers import BufferingHandler
from multiprocessing.pool import ThreadPool from threading import Thread, Event
from six.moves.queue import Queue
from ...backend_api.services import events from ...backend_api.services import events
from ...config import config from ...config import config
@ -12,7 +14,7 @@ buffer_capacity = config.get('log.task_log_buffer_capacity', 100)
class TaskHandler(BufferingHandler): class TaskHandler(BufferingHandler):
__flush_max_history_seconds = 30. __flush_max_history_seconds = 30.
__wait_for_flush_timeout = int(10) __wait_for_flush_timeout = 10.
__once = False __once = False
@property @property
@ -30,7 +32,9 @@ class TaskHandler(BufferingHandler):
self.last_timestamp = 0 self.last_timestamp = 0
self.counter = 1 self.counter = 1
self._last_event = None self._last_event = None
self._thread_pool = None self._exit_event = None
self._queue = None
self._thread = None
self._pending = 0 self._pending = 0
def shouldFlush(self, record): def shouldFlush(self, record):
@ -64,8 +68,11 @@ class TaskHandler(BufferingHandler):
return True return True
# if the first entry in the log was too long ago. # if the first entry in the log was too long ago.
if len(self.buffer) and (time.time() - self.buffer[0].created) > self.__flush_max_history_seconds: try:
return True if len(self.buffer) and (time.time() - self.buffer[0].created) > self.__flush_max_history_seconds:
return True
except:
pass
return False return False
@ -91,14 +98,21 @@ class TaskHandler(BufferingHandler):
self._last_event.msg += '\n' + msg self._last_event.msg += '\n' + msg
return None return None
self._last_event = events.TaskLogEvent( # if we have a previous event and it timed out, return it.
new_event = events.TaskLogEvent(
task=self.task_id, task=self.task_id,
timestamp=timestamp, timestamp=timestamp,
level=record.levelname.lower(), level=record.levelname.lower(),
worker=self.session.worker, worker=self.session.worker,
msg=msg msg=msg
) )
return self._last_event if self._last_event:
event = self._last_event
self._last_event = new_event
return event
self._last_event = new_event
return None
def flush(self): def flush(self):
if self._task_id is None: if self._task_id is None:
@ -114,7 +128,7 @@ class TaskHandler(BufferingHandler):
buffer = self.buffer buffer = self.buffer
self.buffer = [] self.buffer = []
try: try:
record_events = [self._record_to_event(record) for record in buffer] record_events = [self._record_to_event(record) for record in buffer] + [self._last_event]
self._last_event = None self._last_event = None
batch_requests = events.AddBatchRequest(requests=[events.AddRequest(e) for e in record_events if e]) batch_requests = events.AddBatchRequest(requests=[events.AddRequest(e) for e in record_events if e])
except Exception: except Exception:
@ -122,66 +136,50 @@ class TaskHandler(BufferingHandler):
batch_requests = None batch_requests = None
if batch_requests: if batch_requests:
if not self._thread_pool:
self._thread_pool = ThreadPool(processes=1)
self._pending += 1 self._pending += 1
self._thread_pool.apply_async(self._send_events, args=(batch_requests, )) self._add_to_queue(batch_requests)
self.release() self.release()
def wait_for_flush(self, shutdown=False): def _add_to_queue(self, request):
msg = 'Task.log.wait_for_flush: %d' if not self._queue:
from ...utilities.os.lowlevel import threadpool_waited_join self._queue = Queue()
ll = self.__log_stderr self._exit_event = Event()
ll(msg % 0) self._exit_event.clear()
self.acquire() # multiple workers could be supported as well
ll(msg % 1) self._thread = Thread(target=self._daemon)
if self._thread_pool: # self._thread.daemon = True
ll(msg % 2) self._thread.start()
t = self._thread_pool self._queue.put(request)
ll(msg % 3)
self._thread_pool = None
ll(msg % 4)
try:
ll(msg % 5)
t.terminate()
ll(msg % 6)
for i in range(int(self.__wait_for_flush_timeout)):
if self._pending <= 0:
break
self.__log_stderr('INFO: trains.Task - flushing console logs (timeout {}s)'.format(
float(self.__wait_for_flush_timeout-i)))
if threadpool_waited_join(t, timeout=1.0):
break
ll(msg % 7)
except Exception:
ll(msg % 8)
pass
if shutdown:
ll(msg % 9)
self._task_id = None
ll(msg % 10)
self.release()
ll(msg % 11)
def close(self, wait=False): def close(self, wait=False):
import os
self.__log_stderr('Closing {} wait={}'.format(os.getpid(), wait)) self.__log_stderr('Closing {} wait={}'.format(os.getpid(), wait))
# super already calls self.flush() # flush pending logs
super(TaskHandler, self).close() if not self._task_id:
return
self.flush()
# shut down the TaskHandler, from this point onwards. No events will be logged # shut down the TaskHandler, from this point onwards. No events will be logged
if not wait: self.acquire()
self.acquire() _thread = self._thread
self._thread_pool = None self._thread = None
self._task_id = None if self._queue:
self.release() self._exit_event.set()
else: self._queue.put(None)
self.wait_for_flush(shutdown=True) self._task_id = None
self.release()
if wait and _thread:
try:
_thread.join(timeout=self.__wait_for_flush_timeout)
self.__log_stderr('Closing {} wait done'.format(os.getpid()))
except:
pass
# call super and remove the handler
super(TaskHandler, self).close()
def _send_events(self, a_request): def _send_events(self, a_request):
try: try:
if self._thread_pool is None: if self._thread is None:
self.__log_stderr('INFO: trains.Task - ' self.__log_stderr('INFO: trains.log - '
'Task.close() flushing remaining logs ({})'.format(self._pending)) 'Task.close() flushing remaining logs ({})'.format(self._pending))
self._pending -= 1 self._pending -= 1
res = self.session.send(a_request) res = self.session.send(a_request)
@ -192,9 +190,26 @@ class TaskHandler(BufferingHandler):
self.__log_stderr("WARNING: trains.log._send_events: Retrying, " self.__log_stderr("WARNING: trains.log._send_events: Retrying, "
"failed logging task to backend ({:d} lines): {}".format(len(a_request.requests), ex)) "failed logging task to backend ({:d} lines): {}".format(len(a_request.requests), ex))
# we should push ourselves back into the thread pool # we should push ourselves back into the thread pool
if self._thread_pool: if self._queue:
self._pending += 1 self._pending += 1
self._thread_pool.apply_async(self._send_events, args=(a_request, )) self._queue.put(a_request)
def _daemon(self):
# multiple daemons are supported
leave = self._exit_event.wait(0)
request = True
while not leave or request:
# pull from queue
request = None
if self._queue:
try:
request = self._queue.get(block=not leave)
except:
pass
if request:
self._send_events(request)
leave = self._exit_event.wait(0)
self.__log_stderr('INFO: trains.log - leaving {}'.format(os.getpid()))
@staticmethod @staticmethod
def __log_stderr(t): def __log_stderr(t):