clearml/trains/backend_interface/task/log.py

203 lines
7.1 KiB
Python
Raw Normal View History

2020-04-09 09:24:37 +00:00
import sys
2019-06-10 17:00:28 +00:00
import time
from logging import LogRecord, getLogger, basicConfig
from logging.handlers import BufferingHandler
from multiprocessing.pool import ThreadPool
2019-06-10 17:00:28 +00:00
from ...backend_api.services import events
from ...config import config
# Maximum number of log records TaskHandler buffers before forcing a flush
# (overridable via the 'log.task_log_buffer_capacity' configuration key).
buffer_capacity = config.get('log.task_log_buffer_capacity', 100)
class TaskHandler(BufferingHandler):
    """
    Logging handler that buffers log records and ships them to the trains
    backend as batched ``TaskLogEvent`` requests.

    Records are collected in memory (up to ``capacity`` records, or at most
    ``__flush_max_history_seconds`` seconds for the oldest record) and sent
    asynchronously on a single-worker thread pool, so logging callers are
    never blocked on network I/O.
    """

    # flush a partially-filled buffer once its oldest record is this old (seconds)
    __flush_max_history_seconds = 30.
    # upper bound (seconds) that wait_for_flush() waits for queued batches to drain
    __wait_for_flush_timeout = 10
    # process-wide guard: the root-logger reconfiguration in shouldFlush() runs only once
    __once = False

    @property
    def task_id(self):
        # _task_id doubles as the "handler enabled" flag: None means shut down.
        return self._task_id

    @task_id.setter
    def task_id(self, value):
        self._task_id = value

    def __init__(self, session, task_id, capacity=buffer_capacity):
        """
        :param session: backend session used to send event batches
            (must expose ``send()`` and a ``worker`` attribute)
        :param task_id: ID of the task the log events are reported for
        :param capacity: number of records to buffer before an automatic flush
        """
        super(TaskHandler, self).__init__(capacity)
        self.task_id = task_id
        self.session = session
        # timestamp (ms) of the last event, used to keep same-millisecond ordering
        self.last_timestamp = 0
        # tie-breaker added to timestamps that collide within the same millisecond
        self.counter = 1
        self._last_event = None
        # created lazily on first flush; a single worker keeps batches ordered
        self._thread_pool = None
        # number of batches queued on the pool but not yet sent
        self._pending = 0

    def shouldFlush(self, record):
        """
        Should the handler flush its buffer?

        Returns true if the buffer is up to capacity, or if the oldest
        buffered record is older than ``__flush_max_history_seconds``.
        This method can be overridden to implement custom flushing strategies.
        """
        # a shut-down handler (task_id cleared) never flushes
        if self._task_id is None:
            return False

        # Notice! protect against infinite loops, i.e. flush while sending previous records
        # if self.lock._is_owned():
        #     return False

        # if we need to add handlers to the base_logger,
        # it will not automatically create stream one when first used, so we must manually configure it.
        if not TaskHandler.__once:
            base_logger = getLogger()
            if len(base_logger.handlers) == 1 and isinstance(base_logger.handlers[0], TaskHandler):
                if record.name != 'console' and not record.name.startswith('trains.'):
                    # detach ourselves, let basicConfig() install the default stream
                    # handler, then re-attach so console output is not lost
                    base_logger.removeHandler(self)
                    basicConfig()
                    base_logger.addHandler(self)
                    TaskHandler.__once = True
            else:
                TaskHandler.__once = True

        # if we passed the max buffer
        if len(self.buffer) >= self.capacity:
            return True

        # if the first entry in the log was too long ago.
        if len(self.buffer) and (time.time() - self.buffer[0].created) > self.__flush_max_history_seconds:
            return True

        return False

    def _record_to_event(self, record):
        # type: (LogRecord) -> events.TaskLogEvent
        """
        Convert a ``LogRecord`` into a ``TaskLogEvent``.

        Returns None when the handler is shut down, or when the record was
        merged into the previous event (same level, within one second).
        """
        if self._task_id is None:
            return None
        timestamp = int(record.created * 1000)
        if timestamp == self.last_timestamp:
            # disambiguate events that land in the same millisecond
            timestamp += self.counter
            self.counter += 1
        else:
            self.last_timestamp = timestamp
            self.counter = 1

        # ignore backspaces (they are often used)
        msg = record.getMessage().replace('\x08', '')

        # unite all records in a single second
        if self._last_event and timestamp - self._last_event.timestamp < 1000 and \
                record.levelname.lower() == str(self._last_event.level):
            self._last_event.msg += '\n' + msg
            return None

        self._last_event = events.TaskLogEvent(
            task=self.task_id,
            timestamp=timestamp,
            level=record.levelname.lower(),
            worker=self.session.worker,
            msg=msg
        )
        return self._last_event

    def flush(self):
        """
        Convert the buffered records into a single batch request and queue it
        for asynchronous sending; returns immediately.
        """
        if self._task_id is None:
            return

        if not self.buffer:
            return

        self.acquire()
        # re-check under the lock: another thread may have flushed already
        if not self.buffer:
            self.release()
            return
        buffer = self.buffer
        self.buffer = []
        try:
            record_events = [self._record_to_event(record) for record in buffer]
            self._last_event = None
            batch_requests = events.AddBatchRequest(requests=[events.AddRequest(e) for e in record_events if e])
        except Exception:
            self.__log_stderr("WARNING: trains.log - Failed logging task to backend ({:d} lines)".format(len(buffer)))
            batch_requests = None

        if batch_requests:
            if not self._thread_pool:
                self._thread_pool = ThreadPool(processes=1)
            self._pending += 1
            self._thread_pool.apply_async(self._send_events, args=(batch_requests, ))
        self.release()

    def wait_for_flush(self, shutdown=False):
        """
        Block until batches already queued on the send pool have drained,
        bounded by ``__wait_for_flush_timeout`` seconds.

        :param shutdown: if True, disable the handler afterwards
            (no more events will be logged)
        """
        from ...utilities.os.lowlevel import threadpool_waited_join
        self.acquire()
        if self._thread_pool:
            t = self._thread_pool
            # detach the pool first so no new batches can be queued while draining
            self._thread_pool = None
            try:
                t.terminate()
                for i in range(self.__wait_for_flush_timeout):
                    if self._pending <= 0:
                        break
                    self.__log_stderr('INFO: trains.Task - flushing console logs (timeout {}s)'.format(
                        float(self.__wait_for_flush_timeout-i)))
                    if threadpool_waited_join(t, timeout=1.0):
                        break
            except Exception:
                # best-effort drain; the pool may already be closed/terminated
                pass
        if shutdown:
            self._task_id = None
        self.release()

    def close(self, wait=False):
        """
        Close the handler.

        :param wait: if True, drain queued batches first (via
            ``wait_for_flush``); otherwise drop the send pool immediately,
            so batches not yet sent may be lost.
        """
        # super already calls self.flush()
        super(TaskHandler, self).close()
        # shut down the TaskHandler, from this point onwards. No events will be logged
        if not wait:
            self.acquire()
            self._thread_pool = None
            self._task_id = None
            self.release()
        else:
            self.wait_for_flush(shutdown=True)

    def _send_events(self, a_request):
        """
        Send one batch request to the backend (runs on the pool worker).
        On failure the batch is re-queued, but only while the pool is alive.
        """
        try:
            if self._thread_pool is None:
                self.__log_stderr('INFO: trains.Task - '
                                  'Task.close() flushing remaining logs ({})'.format(self._pending))
            self._pending -= 1
            res = self.session.send(a_request)
            if not res.ok():
                self.__log_stderr("WARNING: trains.log._send_events: failed logging task to backend "
                                  "({:d} lines, {})".format(len(a_request.requests), str(res.meta)))
        except Exception as ex:
            self.__log_stderr("WARNING: trains.log._send_events: Retrying, "
                              "failed logging task to backend ({:d} lines): {}".format(len(a_request.requests), ex))
            # we should push ourselves back into the thread pool
            if self._thread_pool:
                self._pending += 1
                self._thread_pool.apply_async(self._send_events, args=(a_request, ))

    @staticmethod
    def __log_stderr(t):
        # prefer the pre-patch write function when stderr was wrapped (it exposes
        # _original_write), so internal messages are not captured as task log events
        write = sys.stderr._original_write if hasattr(sys.stderr, '_original_write') else sys.stderr.write
        write(t + '\n')