renamed trains -> clearml

This commit is contained in:
allegroai
2020-12-22 21:17:56 +02:00
parent f27aed767c
commit a460df1e68
229 changed files with 0 additions and 0 deletions

View File

@@ -0,0 +1,7 @@
""" Debugging module """
from .timer import Timer
from .log import get_logger, get_null_logger, TqdmLog, add_options as add_log_options, \
apply_logging_args as parse_log_args, add_rotating_file_handler, add_time_rotating_file_handler
__all__ = ["Timer", "get_logger", "get_null_logger", "TqdmLog", "add_log_options", "parse_log_args",
"add_rotating_file_handler", "add_time_rotating_file_handler"]

197
clearml/debugging/log.py Normal file
View File

@@ -0,0 +1,197 @@
""" Logging convenience functions and wrappers """
import inspect
import logging
import logging.handlers
import os
import sys
from platform import system
from pathlib2 import Path
from six import BytesIO
default_level = logging.INFO
class _LevelRangeFilter(logging.Filter):
def __init__(self, min_level, max_level, name=''):
super(_LevelRangeFilter, self).__init__(name)
self.min_level = min_level
self.max_level = max_level
def filter(self, record):
return self.min_level <= record.levelno <= self.max_level
class LoggerRoot(object):
__base_logger = None
@classmethod
def _make_stream_handler(cls, level=None, stream=sys.stdout, colored=False):
ch = logging.StreamHandler(stream=stream)
ch.setLevel(level)
formatter = None
# if colored, try to import colorama & coloredlogs (by default, not in the requirements)
if colored:
try:
import colorama
from coloredlogs import ColoredFormatter
colorama.init()
formatter = ColoredFormatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
except ImportError:
colored = False
# if we don't need or failed getting colored formatter
if not colored or not formatter:
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
return ch
@classmethod
def get_base_logger(cls, level=None, stream=sys.stdout, colored=False):
if LoggerRoot.__base_logger:
return LoggerRoot.__base_logger
# avoid nested imports
from ..config import get_log_redirect_level
LoggerRoot.__base_logger = logging.getLogger('trains')
level = level if level is not None else default_level
LoggerRoot.__base_logger.setLevel(level)
redirect_level = get_log_redirect_level()
# Do not redirect to stderr if the target stream is already stderr
if redirect_level is not None and stream not in (None, sys.stderr):
# Adjust redirect level in case requested level is higher (e.g. logger is requested for CRITICAL
# and redirect is set for ERROR, in which case we redirect from CRITICAL)
redirect_level = max(level, redirect_level)
LoggerRoot.__base_logger.addHandler(
cls._make_stream_handler(redirect_level, sys.stderr, colored)
)
if level < redirect_level:
# Not all levels were redirected, remaining should be sent to requested stream
handler = cls._make_stream_handler(level, stream, colored)
handler.addFilter(_LevelRangeFilter(min_level=level, max_level=redirect_level - 1))
LoggerRoot.__base_logger.addHandler(handler)
else:
LoggerRoot.__base_logger.addHandler(
cls._make_stream_handler(level, stream, colored)
)
LoggerRoot.__base_logger.propagate = False
return LoggerRoot.__base_logger
@classmethod
def flush(cls):
if LoggerRoot.__base_logger:
for h in LoggerRoot.__base_logger.handlers:
h.flush()
def add_options(parser):
""" Add logging options to an argparse.ArgumentParser object """
level = logging.getLevelName(default_level)
parser.add_argument(
'--log-level', '-l', default=level, help='Log level (default is %s)' % level)
def apply_logging_args(args):
""" Apply logging args from an argparse.ArgumentParser parsed args """
global default_level
default_level = logging.getLevelName(args.log_level.upper())
def get_logger(path=None, level=None, stream=None, colored=False):
""" Get a python logging object named using the provided filename and preconfigured with a color-formatted
stream handler
"""
path = path or os.path.abspath((inspect.stack()[1])[1])
root_log = LoggerRoot.get_base_logger(level=default_level, stream=sys.stdout, colored=colored)
log = root_log.getChild(Path(path).stem)
if level is not None:
log.setLevel(level)
if stream:
ch = logging.StreamHandler(stream=stream)
if level is not None:
ch.setLevel(level)
log.propagate = True
return log
def _add_file_handler(logger, log_dir, fh, formatter=None):
""" Adds a file handler to a logger """
Path(log_dir).mkdir(parents=True, exist_ok=True)
if not formatter:
log_format = '%(asctime)s %(name)s x_x[%(levelname)s] %(message)s'
formatter = logging.Formatter(log_format)
fh.setFormatter(formatter)
logger.addHandler(fh)
def add_rotating_file_handler(logger, log_dir, log_file_prefix, max_bytes=10 * 1024 * 1024, backup_count=20,
formatter=None):
""" Create and add a rotating file handler to a logger """
fh = logging.handlers.RotatingFileHandler(
str(Path(log_dir) / ('%s.log' % log_file_prefix)), maxBytes=max_bytes, backupCount=backup_count)
_add_file_handler(logger, log_dir, fh, formatter)
def add_time_rotating_file_handler(logger, log_dir, log_file_prefix, when='midnight', formatter=None):
"""
Create and add a time rotating file handler to a logger.
Possible values for when are 'midnight', weekdays ('w0'-'W6', when 0 is Monday), and 's', 'm', 'h' amd 'd' for
seconds, minutes, hours and days respectively (case-insensitive)
"""
fh = logging.handlers.TimedRotatingFileHandler(
str(Path(log_dir) / ('%s.log' % log_file_prefix)), when=when)
_add_file_handler(logger, log_dir, fh, formatter)
def get_null_logger(name=None):
""" Get a logger with a null handler """
log = logging.getLogger(name if name else 'null')
if not log.handlers:
# avoid nested imports
from ..config import config
log.addHandler(logging.NullHandler())
log.propagate = config.get("log.null_log_propagate", False)
return log
class TqdmLog(object):
""" Tqdm (progressbar) wrapped logging class """
class _TqdmIO(BytesIO):
""" IO wrapper class for Tqdm """
def __init__(self, level=20, logger=None, *args, **kwargs):
self._log = logger or get_null_logger()
self._level = level
BytesIO.__init__(self, *args, **kwargs)
def write(self, buf):
self._buf = buf.strip('\r\n\t ')
def flush(self):
self._log.log(self._level, self._buf)
def __init__(self, total, desc='', log_level=20, ascii=False, logger=None, smoothing=0, mininterval=5, initial=0):
from tqdm import tqdm
self._io = self._TqdmIO(level=log_level, logger=logger)
self._tqdm = tqdm(total=total, desc=desc, file=self._io, ascii=ascii if not system() == 'Windows' else True,
smoothing=smoothing,
mininterval=mininterval, initial=initial)
def update(self, n=None):
if n is not None:
self._tqdm.update(n=n)
else:
self._tqdm.update()
def close(self):
self._tqdm.close()

112
clearml/debugging/timer.py Normal file
View File

@@ -0,0 +1,112 @@
""" Timing support """
import sys
import time
import six
class Timer(object):
"""A class implementing a simple timer, with a reset option """
def __init__(self):
self._start_time = 0.
self._diff = 0.
self._total_time = 0.
self._average_time = 0.
self._calls = 0
self.tic()
def reset(self):
self._start_time = 0.
self._diff = 0.
self.reset_average()
def reset_average(self):
""" Reset average counters (does not change current timer) """
self._total_time = 0
self._average_time = 0
self._calls = 0
def tic(self):
try:
# using time.time instead of time.clock because time time.clock
# does not normalize for multi threading
self._start_time = time.time()
except Exception:
pass
def toc(self, average=True):
self._diff = time.time() - self._start_time
self._total_time += self._diff
self._calls += 1
self._average_time = self._total_time / self._calls
if average:
return self._average_time
else:
return self._diff
@property
def average_time(self):
return self._average_time
@property
def total_time(self):
return self._total_time
def toc_with_reset(self, average=True, reset_if_calls=1000):
""" Enable toc with reset (slightly inaccurate if reset event occurs) """
if self._calls > reset_if_calls:
last_diff = time.time() - self._start_time
self._start_time = time.time()
self._total_time = last_diff
self._average_time = 0
self._calls = 0
return self.toc(average=average)
class TimersMixin(object):
def __init__(self):
self._timers = {}
def add_timers(self, *names):
for name in names:
self.add_timer(name)
def add_timer(self, name, timer=None):
if name in self._timers:
raise ValueError('timer %s already exists' % name)
timer = timer or Timer()
self._timers[name] = timer
return timer
def get_timer(self, name, default=None):
return self._timers.get(name, default)
def get_timers(self):
return self._timers
def _call_timer(self, name, callable, silent_fail=False):
try:
return callable(self._timers[name])
except KeyError:
if not silent_fail:
six.reraise(*sys.exc_info())
def reset_timers(self, *names):
for name in names:
self._call_timer(name, lambda t: t.reset())
def reset_average_timers(self, *names):
for name in names:
self._call_timer(name, lambda t: t.reset_average())
def tic_timers(self, *names):
for name in names:
self._call_timer(name, lambda t: t.tic())
def toc_timers(self, *names):
return [self._call_timer(name, lambda t: t.toc()) for name in names]
def toc_with_reset_timer(self, name, average=True, reset_if_calls=1000):
return self._call_timer(name, lambda t: t.toc_with_reset(average, reset_if_calls))

349
clearml/debugging/trace.py Normal file
View File

@@ -0,0 +1,349 @@
import os
import sys
import threading
import inspect
import time
import zipfile
__stream_write = None
__stream_flush = None
__patched_trace = False
__trace_level = 1
__trace_start = 0
__thread_id = None
__thread_so = None
def _thread_linux_id():
# System dependent, see e.g. /usr/include/x86_64-linux-gnu/asm/unistd_64.h (system call 186)
return __thread_so.syscall(186)
def _thread_py_id():
# return threading.get_ident()
return zipfile.crc32(int(threading.get_ident()).to_bytes(8, 'little'))
def _log_stderr(name, fnc, args, kwargs, is_return):
global __stream_write, __stream_flush, __trace_level, __trace_start, __thread_id
# noinspection PyBroadException
try:
if is_return and __trace_level not in (-1, -2):
return
if __trace_level not in (1, 2, -1, -2):
return
fnc_address = str(fnc).split(' at ')
fnc_address = '{}'.format(fnc_address[-1].replace('>', '')) if len(fnc_address) > 1 else ''
if __trace_level == 1 or __trace_level == -1:
t = '{:14} {}'.format(fnc_address, name)
elif __trace_level == 2 or __trace_level == -2:
a_args = str(args)[1:-1] if args else ''
a_kwargs = ' {}'.format(kwargs) if kwargs else ''
t = '{:14} {} ({}{})'.format(fnc_address, name, a_args, a_kwargs)
# get a nicer thread id
h = int(__thread_id())
ts = time.time() - __trace_start
__stream_write('{}{:<9.3f}:{:5}:{:8x}: [{}] {}\n'.format(
'-' if is_return else '', ts, os.getpid(),
h, threading.current_thread().name, t))
if __stream_flush:
__stream_flush()
except Exception:
pass
def _traced_call_method(name, fnc):
def _traced_call_int(self, *args, **kwargs):
_log_stderr(name, fnc, args, kwargs, False)
r = None
try:
ret = fnc(self, *args, **kwargs)
except Exception as ex:
r = ex
_log_stderr(name, fnc, args, kwargs, True)
if r:
raise r
return ret
return _traced_call_int
def _traced_call_cls(name, fnc):
class WrapperClass(object):
@classmethod
def _traced_call_int(cls, *args, **kwargs):
_log_stderr(name, fnc, args, kwargs, False)
r = None
try:
ret = fnc(*args, **kwargs)
except Exception as ex:
r = ex
_log_stderr(name, fnc, args, kwargs, True)
if r:
raise r
return ret
return WrapperClass.__dict__['_traced_call_int']
def _traced_call_static(name, fnc):
class WrapperStatic(object):
@staticmethod
def _traced_call_int(*args, **kwargs):
_log_stderr(name, fnc, args, kwargs, False)
r = None
try:
ret = fnc(*args, **kwargs)
except Exception as ex:
r = ex
_log_stderr(name, fnc, args, kwargs, True)
if r:
raise r
return ret
return WrapperStatic.__dict__['_traced_call_int']
def _traced_call_func(name, fnc):
def _traced_call_int(*args, **kwargs):
_log_stderr(name, fnc, args, kwargs, False)
r = None
try:
ret = fnc(*args, **kwargs)
except Exception as ex:
r = ex
_log_stderr(name, fnc, args, kwargs, True)
if r:
raise r
return ret
return _traced_call_int
def _patch_module(module, prefix='', basepath=None, basemodule=None, exclude_prefixes=[], only_prefix=[]):
if isinstance(module, str):
if basemodule is None:
basemodule = module + '.'
import importlib
importlib.import_module(module)
module = sys.modules.get(module)
if not module:
return
if not basepath:
basepath = os.path.sep.join(module.__file__.split(os.path.sep)[:-1]) + os.path.sep
# only sub modules
if not hasattr(module, '__file__') or (inspect.ismodule(module) and not module.__file__.startswith(basepath)):
if hasattr(module, '__module__') and module.__module__.startswith(basemodule):
# this is one of ours
pass
else:
# print('Skipping: {}'.format(module))
return
# Do not patch ourselves
if hasattr(module, '__file__') and module.__file__ == __file__:
return
prefix += module.__name__.split('.')[-1] + '.'
# Do not patch low level network layer
if prefix.startswith('trains.backend_api.session.') and prefix != 'trains.backend_api.session.':
if not prefix.endswith('.Session.') and '.token_manager.' not in prefix:
# print('SKIPPING: {}'.format(prefix))
return
if prefix.startswith('trains.backend_api.services.'):
return
for skip in exclude_prefixes:
if prefix.startswith(skip):
return
for fn in (m for m in dir(module) if not m.startswith('__')):
if fn in ('schema_property') or fn.startswith('_PostImportHookPatching__'):
continue
# noinspection PyBroadException
try:
fnc = getattr(module, fn)
except Exception:
continue
if inspect.ismodule(fnc):
_patch_module(fnc, prefix=prefix, basepath=basepath, basemodule=basemodule,
exclude_prefixes=exclude_prefixes, only_prefix=only_prefix)
elif inspect.isclass(fnc):
_patch_module(fnc, prefix=prefix, basepath=basepath, basemodule=basemodule,
exclude_prefixes=exclude_prefixes, only_prefix=only_prefix)
elif inspect.isroutine(fnc):
if only_prefix and all(p not in (prefix+str(fn)) for p in only_prefix):
continue
for skip in exclude_prefixes:
if (prefix+str(fn)).startswith(skip):
continue
# _log_stderr('Patching: {}'.format(prefix+fn))
if inspect.isclass(module):
# check if this is even in our module
if hasattr(fnc, '__module__') and fnc.__module__ != module.__module__:
pass # print('not ours {} {}'.format(module, fnc))
elif hasattr(fnc, '__qualname__') and fnc.__qualname__.startswith(module.__name__ + '.'):
if isinstance(module.__dict__[fn], classmethod):
setattr(module, fn, _traced_call_cls(prefix + fn, fnc))
elif isinstance(module.__dict__[fn], staticmethod):
setattr(module, fn, _traced_call_static(prefix + fn, fnc))
else:
setattr(module, fn, _traced_call_method(prefix + fn, fnc))
else:
# probably not ours hopefully static function
if hasattr(fnc, '__qualname__') and not fnc.__qualname__.startswith(module.__name__ + '.'):
pass # print('not ours {} {}'.format(module, fnc))
else:
# we should not get here
setattr(module, fn, _traced_call_static(prefix + fn, fnc))
elif inspect.ismodule(module):
setattr(module, fn, _traced_call_func(prefix + fn, fnc))
else:
# we should not get here
setattr(module, fn, _traced_call_func(prefix + fn, fnc))
def trace_trains(stream=None, level=1, exclude_prefixes=[], only_prefix=[]):
"""
DEBUG ONLY - Add full Trains package code trace
Output trace to filename or stream, default is sys.stderr
Trace level
-2: Trace function and arguments and returned call
-1: Trace function call (no arguments) and returned call
0: Trace disabled
1: Trace function call (no arguments). This is the default
2: Trace function and arguments
:param stream: stream or filename for trace log (default stderr)
:param int level: Trace level
"""
global __patched_trace, __stream_write, __stream_flush, __trace_level, __trace_start, __thread_id, __thread_so
__trace_level = level
if __patched_trace:
return
__patched_trace = True
if not __thread_id:
if sys.platform == 'linux':
import ctypes
__thread_so = ctypes.cdll.LoadLibrary('libc.so.6')
__thread_id = _thread_linux_id
else:
__thread_id = _thread_py_id
stderr_write = sys.stderr._original_write if hasattr(sys.stderr, '_original_write') else sys.stderr.write
if stream:
if isinstance(stream, str):
stream = open(stream, 'w')
__stream_write = stream.write
__stream_flush = stream.flush
else:
__stream_write = stderr_write
__stream_flush = None
from ..version import __version__
msg = 'Trains v{} - Starting Trace\n\n'.format(__version__)
# print to actual stderr
stderr_write(msg)
# store to stream
__stream_write(msg)
__stream_write('{:9}:{:5}:{:8}: {:14}\n'.format('seconds', 'pid', 'tid', 'self'))
__stream_write('{:9}:{:5}:{:8}:{:15}\n'.format('-' * 9, '-' * 5, '-' * 8, '-' * 15))
__trace_start = time.time()
_patch_module('trains', exclude_prefixes=exclude_prefixes or [], only_prefix=only_prefix or [])
def trace_level(level=1):
"""
Set trace level
-2: Trace function and arguments and returned call
-1: Trace function call (no arguments) and returned call
0: Trace disabled
1: Trace function call (no arguments). This is the default
2: Trace function and arguments
:param int level: Trace level
:return: True if trace level changed
"""
global __patched_trace, __trace_level
if not __patched_trace:
return False
__trace_level = level
return True
def print_traced_files(glob_mask, lines_per_tid=5, stream=sys.stdout, specify_pids=None):
"""
Collect trace lines from files (glob mask), sort by pid/tid and print ordered by time
:param glob_mask: file list to process ('*.txt')
:param lines_per_tid: number of lines per pid/tid to print
:param stream: output file stream, can accept file stream or filename(str). default is sys.stdout
:param specify_pids: optional list of pids to include
"""
from glob import glob
def hash_line(a_line):
return hash(':'.join(a_line.split(':')[1:]))
pids = {}
orphan_calls = set()
print_orphans = False
for fname in glob(glob_mask, recursive=False):
with open(fname, 'rt') as fd:
lines = fd.readlines()
for line in lines:
# noinspection PyBroadException
try:
_, pid, tid = line.split(':')[:3]
pid = int(pid)
except Exception:
continue
if specify_pids and pid not in specify_pids:
continue
if line.startswith('-'):
print_orphans = True
line = line[1:]
h = hash_line(line)
if h in orphan_calls:
orphan_calls.remove(h)
continue
else:
h = hash_line(line)
orphan_calls.add(h)
tids = pids.get(pid) if pid in pids else {}
tids[tid] = (tids.get(tid, []) + [line])[-lines_per_tid:]
pids[pid] = tids
# sort by time stamp
by_time = {}
for p, tids in pids.items():
for t, lines in tids.items():
ts = float(lines[-1].split(':')[0].strip()) + 0.000001 * len(by_time)
if print_orphans:
for i, line in enumerate(lines):
if i > 0 and hash_line(line) in orphan_calls:
lines[i] = ' ### Orphan ### {}'.format(line)
by_time[ts] = ''.join(lines) + '\n'
out_stream = open(stream, 'w') if isinstance(stream, str) else stream
for k in sorted(by_time.keys()):
out_stream.write(by_time[k] + '\n')
if isinstance(stream, str):
out_stream.close()
def end_of_program():
# stub
pass
if __name__ == '__main__':
# from trains import Task
# task = Task.init(project_name="examples", task_name="trace test")
# trace_trains('_trace.txt', level=2)
print_traced_files('_trace_*.txt', lines_per_tid=10)