diff --git a/trains/backend_interface/task/log.py b/trains/backend_interface/task/log.py index ff4dd6d2..98cb4d01 100644 --- a/trains/backend_interface/task/log.py +++ b/trains/backend_interface/task/log.py @@ -1,8 +1,10 @@ +import os import sys import time from logging import LogRecord, getLogger, basicConfig from logging.handlers import BufferingHandler -from multiprocessing.pool import ThreadPool +from threading import Thread, Event +from six.moves.queue import Queue from ...backend_api.services import events from ...config import config @@ -12,7 +14,7 @@ buffer_capacity = config.get('log.task_log_buffer_capacity', 100) class TaskHandler(BufferingHandler): __flush_max_history_seconds = 30. - __wait_for_flush_timeout = int(10) + __wait_for_flush_timeout = 10. __once = False @property @@ -30,7 +32,9 @@ class TaskHandler(BufferingHandler): self.last_timestamp = 0 self.counter = 1 self._last_event = None - self._thread_pool = None + self._exit_event = None + self._queue = None + self._thread = None self._pending = 0 def shouldFlush(self, record): @@ -64,8 +68,11 @@ class TaskHandler(BufferingHandler): return True # if the first entry in the log was too long ago. - if len(self.buffer) and (time.time() - self.buffer[0].created) > self.__flush_max_history_seconds: - return True + try: + if len(self.buffer) and (time.time() - self.buffer[0].created) > self.__flush_max_history_seconds: + return True + except: + pass return False @@ -91,14 +98,21 @@ class TaskHandler(BufferingHandler): self._last_event.msg += '\n' + msg return None - self._last_event = events.TaskLogEvent( + # if we have a previous event and it timed out, return it. + new_event = events.TaskLogEvent( task=self.task_id, timestamp=timestamp, level=record.levelname.lower(), worker=self.session.worker, msg=msg ) - return self._last_event + if self._last_event: + event = self._last_event + self._last_event = new_event + return event + + self._last_event = new_event + return None def flush(self): if self._task_id is None: @@ -114,7 +128,7 @@ class TaskHandler(BufferingHandler): buffer = self.buffer self.buffer = [] try: - record_events = [self._record_to_event(record) for record in buffer] + record_events = [self._record_to_event(record) for record in buffer] + [self._last_event] self._last_event = None batch_requests = events.AddBatchRequest(requests=[events.AddRequest(e) for e in record_events if e]) except Exception: @@ -122,66 +136,50 @@ class TaskHandler(BufferingHandler): batch_requests = None if batch_requests: - if not self._thread_pool: - self._thread_pool = ThreadPool(processes=1) self._pending += 1 - self._thread_pool.apply_async(self._send_events, args=(batch_requests, )) + self._add_to_queue(batch_requests) self.release() - def wait_for_flush(self, shutdown=False): - msg = 'Task.log.wait_for_flush: %d' - from ...utilities.os.lowlevel import threadpool_waited_join - ll = self.__log_stderr - ll(msg % 0) - self.acquire() - ll(msg % 1) - if self._thread_pool: - ll(msg % 2) - t = self._thread_pool - ll(msg % 3) - self._thread_pool = None - ll(msg % 4) - try: - ll(msg % 5) - t.terminate() - ll(msg % 6) - for i in range(int(self.__wait_for_flush_timeout)): - if self._pending <= 0: - break - self.__log_stderr('INFO: trains.Task - flushing console logs (timeout {}s)'.format( - float(self.__wait_for_flush_timeout-i))) - if threadpool_waited_join(t, timeout=1.0): - break - ll(msg % 7) - except Exception: - ll(msg % 8) - pass - if shutdown: - ll(msg % 9) - self._task_id = None - ll(msg % 10) - self.release() - ll(msg % 11) + def _add_to_queue(self, request): + if not self._queue: + self._queue = Queue() + self._exit_event = Event() + self._exit_event.clear() + # multiple workers could be supported as well + self._thread = Thread(target=self._daemon) + # self._thread.daemon = True + self._thread.start() + self._queue.put(request) def close(self, wait=False): - import os self.__log_stderr('Closing {} wait={}'.format(os.getpid(), wait)) - # super already calls self.flush() - super(TaskHandler, self).close() + # flush pending logs + if not self._task_id: + return + self.flush() # shut down the TaskHandler, from this point onwards. No events will be logged - if not wait: - self.acquire() - self._thread_pool = None - self._task_id = None - self.release() - else: - self.wait_for_flush(shutdown=True) + self.acquire() + _thread = self._thread + self._thread = None + if self._queue: + self._exit_event.set() + self._queue.put(None) + self._task_id = None + self.release() + if wait and _thread: + try: + _thread.join(timeout=self.__wait_for_flush_timeout) + self.__log_stderr('Closing {} wait done'.format(os.getpid())) + except: + pass + # call super and remove the handler + super(TaskHandler, self).close() def _send_events(self, a_request): try: - if self._thread_pool is None: - self.__log_stderr('INFO: trains.Task - ' + if self._thread is None: + self.__log_stderr('INFO: trains.log - ' 'Task.close() flushing remaining logs ({})'.format(self._pending)) self._pending -= 1 res = self.session.send(a_request) @@ -192,9 +190,26 @@ class TaskHandler(BufferingHandler): self.__log_stderr("WARNING: trains.log._send_events: Retrying, " "failed logging task to backend ({:d} lines): {}".format(len(a_request.requests), ex)) # we should push ourselves back into the thread pool - if self._thread_pool: + if self._queue: self._pending += 1 - self._thread_pool.apply_async(self._send_events, args=(a_request, )) + self._queue.put(a_request) + + def _daemon(self): + # multiple daemons are supported + leave = self._exit_event.wait(0) + request = True + while not leave or request: + # pull from queue + request = None + if self._queue: + try: + request = self._queue.get(block=not leave) + except: + pass + if request: + self._send_events(request) + leave = self._exit_event.wait(0) + self.__log_stderr('INFO: trains.log - leaving {}'.format(os.getpid())) @staticmethod def __log_stderr(t):