From 7dae0583597a0b005e655ef1323e4c0b1d1c8887 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Thu, 9 Apr 2020 13:08:46 +0300 Subject: [PATCH] Optimize locking for TaskHandler, avoid lock when shutting down --- trains/backend_interface/task/log.py | 74 ++++++++++++++++------------ 1 file changed, 42 insertions(+), 32 deletions(-) diff --git a/trains/backend_interface/task/log.py b/trains/backend_interface/task/log.py index fed00983..c4bccd9e 100644 --- a/trains/backend_interface/task/log.py +++ b/trains/backend_interface/task/log.py @@ -1,7 +1,7 @@ import os import sys import time -from logging import LogRecord, getLogger, basicConfig +from logging import LogRecord, getLogger, basicConfig, getLevelName, INFO, WARNING, Formatter, makeLogRecord from logging.handlers import BufferingHandler from threading import Thread, Event from six.moves.queue import Queue @@ -46,9 +46,6 @@ class TaskHandler(BufferingHandler): """ if self._task_id is None: return False - # Notice! protect against infinite loops, i.e. flush while sending previous records - # if self.lock._is_owned(): - # return False # if we need to add handlers to the base_logger, # it will not automatically create stream one when first used, so we must manually configure it. @@ -122,11 +119,14 @@ class TaskHandler(BufferingHandler): return self.acquire() - if not self.buffer: - self.release() + if self.buffer: + buffer = self.buffer + self.buffer = [] + self.release() + + if not buffer: return - buffer = self.buffer - self.buffer = [] + try: record_events = [self._record_to_event(record) for record in buffer] + [self._last_event] self._last_event = None @@ -139,38 +139,46 @@ class TaskHandler(BufferingHandler): self._pending += 1 self._add_to_queue(batch_requests) - self.release() + def _create_thread_queue(self): + if self._queue: + return + + self._queue = Queue() + self._exit_event = Event() + self._exit_event.clear() + # multiple workers could be supported as well + self._thread = Thread(target=self._daemon) + self._thread.daemon = True + self._thread.start() def _add_to_queue(self, request): - if not self._queue: - self._queue = Queue() - self._exit_event = Event() - self._exit_event.clear() - # multiple workers could be supported as well - self._thread = Thread(target=self._daemon) - self._thread.daemon = True - self._thread.start() + self._create_thread_queue() self._queue.put(request) def close(self, wait=False): - self.__log_stderr('Closing {} wait={}'.format(os.getpid(), wait)) + # self.__log_stderr('Closing {} wait={}'.format(os.getpid(), wait)) # flush pending logs if not self._task_id: return + # avoid deadlocks just skip the lock, we are shutting down anyway + self.lock = None + self.flush() # shut down the TaskHandler, from this point onwards. No events will be logged - self.acquire() _thread = self._thread self._thread = None if self._queue: self._exit_event.set() self._queue.put(None) self._task_id = None - self.release() + if wait and _thread: try: - _thread.join(timeout=self.__wait_for_flush_timeout) - self.__log_stderr('Closing {} wait done'.format(os.getpid())) + timeout = 1. if self._queue.empty() else self.__wait_for_flush_timeout + _thread.join(timeout=timeout) + if not self._queue.empty(): + self.__log_stderr('Flush timeout {}s exceeded, dropping last lines'.format(timeout)) + # self.__log_stderr('Closing {} wait done'.format(os.getpid())) except: pass # call super and remove the handler @@ -178,17 +186,16 @@ class TaskHandler(BufferingHandler): def _send_events(self, a_request): try: - if self._thread is None: - self.__log_stderr('INFO: trains.log - ' - 'Task.close() flushing remaining logs ({})'.format(self._pending)) + # if self._thread is None: + # self.__log_stderr('Task.close() flushing remaining logs ({})'.format(self._pending)) self._pending -= 1 res = self.session.send(a_request) if not res.ok(): - self.__log_stderr("WARNING: trains.log._send_events: failed logging task to backend " - "({:d} lines, {})".format(len(a_request.requests), str(res.meta))) + self.__log_stderr("failed logging task to backend ({:d} lines, {})".format( + len(a_request.requests), str(res.meta)), level=WARNING) except Exception as ex: - self.__log_stderr("WARNING: trains.log._send_events: Retrying, " - "failed logging task to backend ({:d} lines): {}".format(len(a_request.requests), ex)) + # self.__log_stderr("Retrying, failed logging task to backend ({:d} lines): {}".format( + # len(a_request.requests), ex)) # we should push ourselves back into the thread pool if self._queue: self._pending += 1 @@ -209,9 +216,12 @@ class TaskHandler(BufferingHandler): if request: self._send_events(request) leave = self._exit_event.wait(0) - self.__log_stderr('INFO: trains.log - leaving {}'.format(os.getpid())) + # self.__log_stderr('leaving {}'.format(os.getpid())) @staticmethod - def __log_stderr(t): + def __log_stderr(msg, level=INFO): + # output directly to stderr, make sure we do not catch it. write = sys.stderr._original_write if hasattr(sys.stderr, '_original_write') else sys.stderr.write - write(t + '\n') + write('{asctime} - {name} - {levelname} - {message}\n'.format( + asctime=Formatter().formatTime(makeLogRecord({})), + name='trains.log', levelname=getLevelName(level), message=msg))