Optimize locking for TaskHandler, avoid lock when shutting down

allegroai 2020-04-09 13:08:46 +03:00
parent d9aa83380f
commit 7dae058359


@@ -1,7 +1,7 @@
 import os
 import sys
 import time
-from logging import LogRecord, getLogger, basicConfig
+from logging import LogRecord, getLogger, basicConfig, getLevelName, INFO, WARNING, Formatter, makeLogRecord
 from logging.handlers import BufferingHandler
 from threading import Thread, Event
 from six.moves.queue import Queue
@@ -46,9 +46,6 @@ class TaskHandler(BufferingHandler):
         """
         if self._task_id is None:
             return False
-        # Notice! protect against infinite loops, i.e. flush while sending previous records
-        # if self.lock._is_owned():
-        #     return False
 
         # if we need to add handlers to the base_logger,
         # it will not automatically create stream one when first used, so we must manually configure it.
@@ -122,11 +119,14 @@ class TaskHandler(BufferingHandler):
             return
 
         self.acquire()
-        if not self.buffer:
-            self.release()
+        if self.buffer:
+            buffer = self.buffer
+            self.buffer = []
+        self.release()
+
+        if not buffer:
             return
-        buffer = self.buffer
-        self.buffer = []
+
         try:
             record_events = [self._record_to_event(record) for record in buffer] + [self._last_event]
             self._last_event = None
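
Note: the reworked flush() swaps the buffer out while the lock is held and releases the lock before any record is converted or sent, so a slow backend call no longer blocks threads that are emitting new records. A minimal sketch of this swap-under-lock pattern, assuming a plain logging.Handler subclass (the SwapBufferHandler name and the print() send are illustrative, not the trains code):

from logging import Handler, makeLogRecord

class SwapBufferHandler(Handler):
    def __init__(self):
        super(SwapBufferHandler, self).__init__()
        self.buffer = []

    def emit(self, record):
        # fast path: only append under the lock
        self.acquire()
        try:
            self.buffer.append(record)
        finally:
            self.release()

    def flush(self):
        # detach the whole buffer under the lock, then work on it lock-free
        buffer = None  # bind the name even when the buffer is empty
        self.acquire()
        if self.buffer:
            buffer = self.buffer
            self.buffer = []
        self.release()
        if not buffer:
            return
        for record in buffer:
            print(record.getMessage())  # stand-in for the real network send

handler = SwapBufferHandler()
handler.emit(makeLogRecord({'msg': 'hello'}))
handler.flush()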
@@ -139,38 +139,46 @@ class TaskHandler(BufferingHandler):
             self._pending += 1
             self._add_to_queue(batch_requests)
-        self.release()
+
+    def _create_thread_queue(self):
+        if self._queue:
+            return
+
+        self._queue = Queue()
+        self._exit_event = Event()
+        self._exit_event.clear()
+        # multiple workers could be supported as well
+        self._thread = Thread(target=self._daemon)
+        self._thread.daemon = True
+        self._thread.start()
 
     def _add_to_queue(self, request):
-        if not self._queue:
-            self._queue = Queue()
-            self._exit_event = Event()
-            self._exit_event.clear()
-            # multiple workers could be supported as well
-            self._thread = Thread(target=self._daemon)
-            self._thread.daemon = True
-            self._thread.start()
+        self._create_thread_queue()
         self._queue.put(request)
 
     def close(self, wait=False):
-        self.__log_stderr('Closing {} wait={}'.format(os.getpid(), wait))
+        # self.__log_stderr('Closing {} wait={}'.format(os.getpid(), wait))
         # flush pending logs
         if not self._task_id:
             return
 
+        # avoid deadlocks just skip the lock, we are shutting down anyway
+        self.lock = None
+
         self.flush()
         # shut down the TaskHandler, from this point onwards. No events will be logged
-        self.acquire()
         _thread = self._thread
         self._thread = None
         if self._queue:
             self._exit_event.set()
             self._queue.put(None)
         self._task_id = None
-        self.release()
         if wait and _thread:
             try:
-                _thread.join(timeout=self.__wait_for_flush_timeout)
-                self.__log_stderr('Closing {} wait done'.format(os.getpid()))
+                timeout = 1. if self._queue.empty() else self.__wait_for_flush_timeout
+                _thread.join(timeout=timeout)
+                if not self._queue.empty():
+                    self.__log_stderr('Flush timeout {}s exceeded, dropping last lines'.format(timeout))
+                # self.__log_stderr('Closing {} wait done'.format(os.getpid()))
             except:
                 pass
         # call super and remove the handler
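
Note: close() can drop its acquire()/release() pair because setting self.lock = None defuses the handler lock entirely: in CPython's logging module, Handler.acquire() and Handler.release() are guarded by `if self.lock:`, so both become no-ops once the lock is gone. That is what makes the shutdown path deadlock-proof even if flush() is re-entered. A two-line demonstration (standalone, not from the commit):

from logging import Handler

handler = Handler()
handler.lock = None  # Handler.acquire()/release() test `if self.lock:` first...
handler.acquire()    # ...so both calls are now no-ops and cannot deadlock
handler.release()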
@@ -178,17 +186,16 @@ class TaskHandler(BufferingHandler):
 
     def _send_events(self, a_request):
         try:
-            if self._thread is None:
-                self.__log_stderr('INFO: trains.log - '
-                                  'Task.close() flushing remaining logs ({})'.format(self._pending))
+            # if self._thread is None:
+            #     self.__log_stderr('Task.close() flushing remaining logs ({})'.format(self._pending))
             self._pending -= 1
             res = self.session.send(a_request)
             if not res.ok():
-                self.__log_stderr("WARNING: trains.log._send_events: failed logging task to backend "
-                                  "({:d} lines, {})".format(len(a_request.requests), str(res.meta)))
+                self.__log_stderr("failed logging task to backend ({:d} lines, {})".format(
+                    len(a_request.requests), str(res.meta)), level=WARNING)
         except Exception as ex:
-            self.__log_stderr("WARNING: trains.log._send_events: Retrying, "
-                              "failed logging task to backend ({:d} lines): {}".format(len(a_request.requests), ex))
+            # self.__log_stderr("Retrying, failed logging task to backend ({:d} lines): {}".format(
+            #     len(a_request.requests), ex))
             # we should push ourselves back into the thread pool
             if self._queue:
                 self._pending += 1
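
Note: the surviving context shows the retry path: when a send fails, _send_events pushes the failed request back into the queue so the daemon thread picks it up again. A compact sketch of that requeue-on-error loop, with hypothetical names (daemon_loop, send) and the same six.moves Queue the file imports:

from six.moves.queue import Queue

def daemon_loop(queue, send):
    while True:
        request = queue.get()
        if request is None:  # sentinel pushed by close()
            return
        try:
            send(request)
        except Exception:
            # push ourselves back into the queue; the next iteration retries
            queue.put(request)

The real handler also adjusts a _pending counter around the send and polls an exit Event, which this sketch omits.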
@@ -209,9 +216,12 @@ class TaskHandler(BufferingHandler):
             if request:
                 self._send_events(request)
             leave = self._exit_event.wait(0)
-        self.__log_stderr('INFO: trains.log - leaving {}'.format(os.getpid()))
+        # self.__log_stderr('leaving {}'.format(os.getpid()))
 
     @staticmethod
-    def __log_stderr(t):
+    def __log_stderr(msg, level=INFO):
+        # output directly to stderr, make sure we do not catch it.
         write = sys.stderr._original_write if hasattr(sys.stderr, '_original_write') else sys.stderr.write
-        write(t + '\n')
+        write('{asctime} - {name} - {levelname} - {message}\n'.format(
+            asctime=Formatter().formatTime(makeLogRecord({})),
+            name='trains.log', levelname=getLevelName(level), message=msg))
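
Note: the new __log_stderr composes its own timestamp instead of going through a logger (which could recurse back into this handler): makeLogRecord({}) builds an empty LogRecord stamped with the current time, and a default Formatter renders it via formatTime(), so the stderr line matches the usual asctime - name - levelname - message layout. A quick standalone check of the format (assumed usage; the sample output timestamp is illustrative):

from logging import Formatter, getLevelName, makeLogRecord, WARNING

line = '{asctime} - {name} - {levelname} - {message}'.format(
    asctime=Formatter().formatTime(makeLogRecord({})),
    name='trains.log', levelname=getLevelName(WARNING),
    message='failed logging task to backend')
print(line)
# e.g. 2020-04-09 13:08:46,123 - trains.log - WARNING - failed logging task to backend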