clearml/trains/backend_interface/task/log.py

import json
import sys
import time
from logging import LogRecord, getLogger, basicConfig, getLevelName, INFO, WARNING, Formatter, makeLogRecord, warning
from logging.handlers import BufferingHandler
from threading import Thread, Event

from pathlib2 import Path
from six.moves.queue import Queue

from ...backend_api.services import events
from ...backend_api.session.session import MaxRequestSizeError
from ...config import config

buffer_capacity = config.get('log.task_log_buffer_capacity', 100)
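

# Overview: a log record travels
#   logger -> TaskHandler buffer -> shouldFlush() -> flush() -> AddBatchRequest
#   -> internal Queue -> _daemon() worker thread -> session.send()
# In offline mode _send_events() appends each batch to 'log.jsonl' instead, and
# report_offline_session() replays that file against the backend later.
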
class TaskHandler(BufferingHandler):
    __flush_max_history_seconds = 30.
    __wait_for_flush_timeout = 10.
    __max_event_size = 1024 * 1024
    __once = False
    __offline_filename = 'log.jsonl'

    @property
    def task_id(self):
        return self._task_id

    @task_id.setter
    def task_id(self, value):
        self._task_id = value

    def __init__(self, task, capacity=buffer_capacity):
        super(TaskHandler, self).__init__(capacity)
        self.task_id = task.id
        self.session = task.session
        self.last_timestamp = 0
        self.counter = 1
        self._last_event = None
        self._exit_event = None
        self._queue = None
        self._thread = None
        self._pending = 0
        self._offline_log_filename = None
        if task.is_offline():
            offline_folder = Path(task.get_offline_mode_folder())
            offline_folder.mkdir(parents=True, exist_ok=True)
            self._offline_log_filename = offline_folder / self.__offline_filename

    def shouldFlush(self, record):
        """
        Should the handler flush its buffer?

        Returns true if the buffer is up to capacity, or if the oldest buffered
        record is older than __flush_max_history_seconds. This method can be
        overridden to implement custom flushing strategies.
        """
        if self._task_id is None:
            return False

        # if we need to add handlers to the base_logger,
        # it will not automatically create a stream handler when first used,
        # so we must manually configure it.
        if not TaskHandler.__once:
            base_logger = getLogger()
            if len(base_logger.handlers) == 1 and isinstance(base_logger.handlers[0], TaskHandler):
                if record.name != 'console' and not record.name.startswith('trains.'):
                    base_logger.removeHandler(self)
                    basicConfig()
                    base_logger.addHandler(self)
                    TaskHandler.__once = True
            else:
                TaskHandler.__once = True

        # if we passed the max buffer
        if len(self.buffer) >= self.capacity:
            return True

        # if the first entry in the log was too long ago.
        # noinspection PyBroadException
        try:
            if len(self.buffer) and (time.time() - self.buffer[0].created) > self.__flush_max_history_seconds:
                return True
        except Exception:
            pass

        return False
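
    # Example (illustrative sketch, not used by this module): shouldFlush() can
    # be overridden to implement a custom flushing strategy, e.g. forcing an
    # immediate flush on every ERROR record:
    #
    #   from logging import ERROR
    #
    #   class EagerTaskHandler(TaskHandler):
    #       def shouldFlush(self, record):
    #           return record.levelno >= ERROR or \
    #               super(EagerTaskHandler, self).shouldFlush(record)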

    def _record_to_event(self, record):
        # type: (LogRecord) -> list
        if self._task_id is None:
            return None

        timestamp = int(record.created * 1000)
        if timestamp == self.last_timestamp:
            timestamp += self.counter
            self.counter += 1
        else:
            self.last_timestamp = timestamp
            self.counter = 1

        # ignore backspaces (they are often used)
        full_msg = record.getMessage().replace('\x08', '')

        return_events = []
        while full_msg:
            msg = full_msg[:self.__max_event_size]
            full_msg = full_msg[self.__max_event_size:]
            # merge consecutive records of the same level arriving within a single
            # second, as long as the merged message stays under the max event size
            if self._last_event and timestamp - self._last_event.timestamp < 1000 and \
                    len(self._last_event.msg) + len(msg) < self.__max_event_size and \
                    record.levelname.lower() == str(self._last_event.level):
                self._last_event.msg += '\n' + msg
                continue

            # otherwise the pending event is complete; emit it and start a new one
            new_event = events.TaskLogEvent(
                task=self.task_id,
                timestamp=timestamp,
                level=record.levelname.lower(),
                worker=self.session.worker,
                msg=msg
            )
            if self._last_event:
                return_events.append(self._last_event)
            self._last_event = new_event

        return return_events
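
    # Worked example: two records created in the same millisecond both map to
    # timestamp T, so the second is sent as T + 1 (via self.counter) to keep
    # backend ordering stable. Records within the same second, level and size
    # budget are merged into a single event with '\n'-joined messages, cutting
    # down the number of API calls.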

    def flush(self):
        if self._task_id is None:
            return

        if not self.buffer:
            return

        buffer = None
        self.acquire()
        if self.buffer:
            buffer = self.buffer
            self.buffer = []
        self.release()

        if not buffer:
            return

        # noinspection PyBroadException
        try:
            record_events = [r for record in buffer for r in self._record_to_event(record)] + [self._last_event]
            self._last_event = None
            batch_requests = events.AddBatchRequest(requests=[events.AddRequest(e) for e in record_events if e])
        except Exception:
            self.__log_stderr("WARNING: trains.log - Failed logging task to backend ({:d} lines)".format(len(buffer)))
            batch_requests = None

        if batch_requests:
            self._pending += 1
            self._add_to_queue(batch_requests)
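
    # flush() itself does no network I/O: it converts the buffered records into
    # a batch request and hands it to the internal queue, so the actual send
    # happens on the _daemon() worker thread.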

    def _create_thread_queue(self):
        if self._queue:
            return

        self._queue = Queue()
        self._exit_event = Event()
        self._exit_event.clear()
        # multiple workers could be supported as well
        self._thread = Thread(target=self._daemon)
        self._thread.daemon = True
        self._thread.start()
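
    # The worker is a daemon thread, so it will not keep the interpreter alive
    # on exit; close(wait=True) exists to drain the queue before shutdown for
    # exactly that reason.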

    def _add_to_queue(self, request):
        self._create_thread_queue()
        self._queue.put(request)

    def close(self, wait=False):
        # flush pending logs
        if not self._task_id:
            return
        # avoid deadlocks: just skip the lock, we are shutting down anyway
        self.lock = None
        self.flush()
        # shut down the TaskHandler; from this point onwards no events will be logged
        _thread = self._thread
        self._thread = None
        if self._queue:
            self._exit_event.set()
            self._queue.put(None)
        self._task_id = None
        if wait and _thread:
            # noinspection PyBroadException
            try:
                timeout = 1. if self._queue.empty() else self.__wait_for_flush_timeout
                _thread.join(timeout=timeout)
                if not self._queue.empty():
                    self.__log_stderr('Flush timeout {}s exceeded, dropping last {} lines'.format(
                        timeout, self._queue.qsize()))
            except Exception:
                pass
        # call super and remove the handler
        super(TaskHandler, self).close()

    def _send_events(self, a_request):
        try:
            self._pending -= 1

            if self._offline_log_filename:
                # offline mode: append the batch to the JSON-lines file instead of sending it
                with open(self._offline_log_filename.as_posix(), 'at') as f:
                    f.write(json.dumps([b.to_dict() for b in a_request.requests]) + '\n')
                return

            res = self.session.send(a_request)
            if res and not res.ok():
                self.__log_stderr("failed logging task to backend ({:d} lines, {})".format(
                    len(a_request.requests), str(res.meta)), level=WARNING)
        except MaxRequestSizeError:
            self.__log_stderr("failed logging task to backend ({:d} lines) log size exceeded limit".format(
                len(a_request.requests)), level=WARNING)
        except Exception as ex:
            self.__log_stderr("Retrying, failed logging task to backend ({:d} lines): {}".format(
                len(a_request.requests), ex))
            # push the request back into the queue so the daemon retries it
            if self._queue:
                self._pending += 1
                self._queue.put(a_request)
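
    # Offline file format (the concrete values below are illustrative; the field
    # names come from _record_to_event()): each line of 'log.jsonl' is a JSON
    # array holding one batch, e.g.
    #
    #   [{"task": "<task-id>", "timestamp": 1591000000000, "level": "info",
    #     "worker": "<worker-id>", "msg": "hello"}]
    #
    # report_offline_session() strips the "task" field and re-sends each entry
    # under the replaying task's own id.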

    def _daemon(self):
        # multiple daemons are supported
        leave = self._exit_event.wait(0)
        request = True
        while not leave or request:
            # pull from queue
            request = None
            if self._queue:
                # noinspection PyBroadException
                try:
                    request = self._queue.get(block=not leave)
                except Exception:
                    pass
            if request:
                self._send_events(request)
            leave = self._exit_event.wait(0)

    @staticmethod
    def __log_stderr(msg, level=INFO):
        # write directly to the real stderr, making sure we do not capture our own output
        write = sys.stderr._original_write if hasattr(sys.stderr, '_original_write') else sys.stderr.write
        write('{asctime} - {name} - {levelname} - {message}\n'.format(
            asctime=Formatter().formatTime(makeLogRecord({})),
            name='trains.log', levelname=getLevelName(level), message=msg))

    @classmethod
    def report_offline_session(cls, task, folder):
        filename = Path(folder) / cls.__offline_filename
        if not filename.is_file():
            return False
        with open(filename.as_posix(), 'rt') as f:
            i = 0
            while True:
                try:
                    line = f.readline()
                    if not line:
                        break
                    list_requests = json.loads(line)
                    for r in list_requests:
                        r.pop('task', None)
                    i += 1
                except StopIteration:
                    break
                except Exception as ex:
                    warning('Failed reporting log, line {} [{}]'.format(i, ex))
                    # skip the unparsable line instead of re-sending a stale batch
                    continue
                batch_requests = events.AddBatchRequest(
                    requests=[events.TaskLogEvent(task=task.id, **r) for r in list_requests])
                res = task.session.send(batch_requests)
                if res and not res.ok():
                    warning("failed logging task to backend ({:d} lines, {})".format(
                        len(batch_requests.requests), str(res.meta)))
        return True
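
# Usage sketch (illustrative, assuming an initialized Task object; this block
# is not part of the module). Attach the handler to a logger so records are
# buffered and shipped to the backend in batches:
#
#   from logging import getLogger
#
#   handler = TaskHandler(task)
#   logger = getLogger('console')
#   logger.addHandler(handler)
#   logger.info('hello')        # buffered; sent once shouldFlush() triggers
#   handler.close(wait=True)    # flush and drain the queue before exiting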