import os
import sys
import time
from logging import LogRecord, getLogger, basicConfig
from logging.handlers import BufferingHandler
from threading import Thread, Event
from typing import Optional

from six.moves.queue import Queue

from ...backend_api.services import events
from ...config import config

buffer_capacity = config.get('log.task_log_buffer_capacity', 100)


class TaskHandler(BufferingHandler):
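    """
    Buffering log handler that converts LogRecords into backend TaskLogEvent
    batches and ships them to the server from a background thread.

    A minimal usage sketch (assuming an initialized backend session and a
    valid task id; the names below mirror the constructor arguments only):

        handler = TaskHandler(session, task_id)
        getLogger().addHandler(handler)
        # ... log normally; records are batched and sent asynchronously ...
        handler.close(wait=True)  # flush remaining events before exiting
    """
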
    __flush_max_history_seconds = 30.
    __wait_for_flush_timeout = 10.
    __once = False

    @property
    def task_id(self):
        return self._task_id

    @task_id.setter
    def task_id(self, value):
        self._task_id = value

    def __init__(self, session, task_id, capacity=buffer_capacity):
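        # session/task identify where events are sent; the remaining fields
        # track event batching state and the lazily-started sender thread.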
        super(TaskHandler, self).__init__(capacity)
        self.task_id = task_id
        self.session = session
        self.last_timestamp = 0
        self.counter = 1
        self._last_event = None
        self._exit_event = None
        self._queue = None
        self._thread = None
        self._pending = 0

    def shouldFlush(self, record):
        """
        Should the handler flush its buffer?

        Returns true if the buffer is up to capacity, or if the oldest buffered
        record is older than __flush_max_history_seconds. This method can be
        overridden to implement custom flushing strategies.
        """
        if self._task_id is None:
            return False

        # Notice! protect against infinite loops, i.e. flushing while still sending previous records
        # if self.lock._is_owned():
        #     return False

        # If we need to add handlers to the base_logger, it will not automatically
        # create a stream handler when first used, so we must configure it manually.
        if not TaskHandler.__once:
            base_logger = getLogger()
            if len(base_logger.handlers) == 1 and isinstance(base_logger.handlers[0], TaskHandler):
                if record.name != 'console' and not record.name.startswith('trains.'):
                    base_logger.removeHandler(self)
                    basicConfig()
                    base_logger.addHandler(self)
                    TaskHandler.__once = True
            else:
                TaskHandler.__once = True

        # if we passed the max buffer capacity
        if len(self.buffer) >= self.capacity:
            return True

        # if the first entry in the log was written too long ago
        try:
            if len(self.buffer) and (time.time() - self.buffer[0].created) > self.__flush_max_history_seconds:
                return True
        except Exception:
            pass

        return False

    def _record_to_event(self, record):
        # type: (LogRecord) -> Optional[events.TaskLogEvent]
        if self._task_id is None:
            return None
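        # Timestamps are in milliseconds; if two records land on the same
        # millisecond, bump subsequent ones by a running counter so event
        # timestamps stay unique and ordered.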
        timestamp = int(record.created * 1000)
        if timestamp == self.last_timestamp:
            timestamp += self.counter
            self.counter += 1
        else:
            self.last_timestamp = timestamp
            self.counter = 1

        # ignore backspaces (they are often used, e.g. by progress bars)
        msg = record.getMessage().replace('\x08', '')

        # unite all records emitted within a single second into one event
        if self._last_event and timestamp - self._last_event.timestamp < 1000 and \
                record.levelname.lower() == str(self._last_event.level):
            self._last_event.msg += '\n' + msg
            return None

        # create the new event; if we already hold a previous pending event,
        # return the previous one so it gets batched, and keep the new one pending.
        new_event = events.TaskLogEvent(
            task=self.task_id,
            timestamp=timestamp,
            level=record.levelname.lower(),
            worker=self.session.worker,
            msg=msg
        )
        if self._last_event:
            event = self._last_event
            self._last_event = new_event
            return event

        self._last_event = new_event
        return None

    def flush(self):
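        # Convert the buffered records into a single AddBatchRequest and hand
        # it to the background sender thread. Runs under the handler lock to
        # avoid racing with concurrent emit() calls.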
        if self._task_id is None:
            return

        if not self.buffer:
            return

        self.acquire()
        # re-check under the lock, another thread may have flushed already
        if not self.buffer:
            self.release()
            return
        buffer = self.buffer
        self.buffer = []
        try:
            # don't forget the pending event still held in self._last_event
            record_events = [self._record_to_event(record) for record in buffer] + [self._last_event]
            self._last_event = None
            batch_requests = events.AddBatchRequest(requests=[events.AddRequest(e) for e in record_events if e])
        except Exception:
            self.__log_stderr("WARNING: trains.log - Failed logging task to backend ({:d} lines)".format(len(buffer)))
            batch_requests = None

        if batch_requests:
            self._pending += 1
            self._add_to_queue(batch_requests)

        self.release()

    def _add_to_queue(self, request):
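        # lazily create the queue and start the background sender thread on
        # the first batch; subsequent batches are simply enqueued.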
        if not self._queue:
            self._queue = Queue()
            self._exit_event = Event()
            self._exit_event.clear()
            # multiple workers could be supported as well
            self._thread = Thread(target=self._daemon)
            self._thread.daemon = True
            self._thread.start()
        self._queue.put(request)

    def close(self, wait=False):
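        # flush whatever is buffered, signal the sender thread to exit, and
        # optionally wait (bounded by __wait_for_flush_timeout) for it to
        # drain the queue before the handler is removed.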
        self.__log_stderr('Closing {} wait={}'.format(os.getpid(), wait))
        # flush pending logs
        if not self._task_id:
            return
        self.flush()
        # shut down the TaskHandler; from this point onwards no events will be logged
        self.acquire()
        _thread = self._thread
        self._thread = None
        if self._queue:
            self._exit_event.set()
            self._queue.put(None)
        self._task_id = None
        self.release()
        if wait and _thread:
            try:
                _thread.join(timeout=self.__wait_for_flush_timeout)
                self.__log_stderr('Closing {} wait done'.format(os.getpid()))
            except Exception:
                pass
        # call super and remove the handler
        super(TaskHandler, self).close()

    def _send_events(self, a_request):
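        # send one batch to the backend; on failure, log a warning and re-queue
        # the batch so the daemon retries it (as long as the queue still exists).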
        try:
            if self._thread is None:
                self.__log_stderr('INFO: trains.log - '
                                  'Task.close() flushing remaining logs ({})'.format(self._pending))
            self._pending -= 1
            res = self.session.send(a_request)
            if not res.ok():
                self.__log_stderr("WARNING: trains.log._send_events: failed logging task to backend "
                                  "({:d} lines, {})".format(len(a_request.requests), str(res.meta)))
        except Exception as ex:
            self.__log_stderr("WARNING: trains.log._send_events: Retrying, "
                              "failed logging task to backend ({:d} lines): {}".format(len(a_request.requests), ex))
            # push ourselves back into the queue so the daemon retries this batch
            if self._queue:
                self._pending += 1
                self._queue.put(a_request)

    def _daemon(self):
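        # sender loop: block on the queue until the exit event is set, then
        # keep draining without blocking until the queue is empty.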
        # multiple daemons are supported
        leave = self._exit_event.wait(0)
        request = True
        while not leave or request:
            # pull from queue
            request = None
            if self._queue:
                try:
                    request = self._queue.get(block=not leave)
                except Exception:
                    pass
            if request:
                self._send_events(request)
            leave = self._exit_event.wait(0)
        self.__log_stderr('INFO: trains.log - leaving {}'.format(os.getpid()))

    @staticmethod
    def __log_stderr(t):
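        # use the original (unpatched) stderr write if the stream was wrapped,
        # presumably by the library's own stdout/stderr capture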
        write = sys.stderr._original_write if hasattr(sys.stderr, '_original_write') else sys.stderr.write
        write(t + '\n')