
Add Task.init() deferred_init: capture all console outputs

allegroai 2022-05-05 12:09:46 +03:00
parent a4e1bffe85
commit 4ce0e4faf3
4 changed files with 162 additions and 19 deletions
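
As context for the diff below: with deferred_init=True, Task.init() returns a future-like Task object immediately and completes the real initialization in a background thread; this commit makes sure console output produced in the meantime is captured and reported once the Task is ready. A minimal usage sketch (project and task names are placeholders):

from clearml import Task

# deferred_init=True: Task.init() returns a future-like Task object right away
# while the real initialization runs in a background thread
task = Task.init(project_name="examples", task_name="deferred init demo", deferred_init=True)

# with this commit, prints issued before the background init completes are
# kept in memory and flushed into the task's console log once it is ready
print("this line is captured even though the Task may still be initializing")
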
Changed file 1 of 4 (clearml)

@@ -14,12 +14,14 @@ class StdStreamPatch(object):
_stderr_original_write = None
@staticmethod
- def patch_std_streams(a_logger, connect_stdout=True, connect_stderr=True):
+ def patch_std_streams(a_logger, connect_stdout=True, connect_stderr=True, load_config_defaults=True):
if (connect_stdout or connect_stderr) and not PrintPatchLogger.patched and \
(not running_remotely() or DEBUG_SIMULATE_REMOTE_TASK.get()):
- StdStreamPatch._stdout_proxy = PrintPatchLogger(sys.stdout, a_logger, level=logging.INFO) \
+ StdStreamPatch._stdout_proxy = PrintPatchLogger(
+ sys.stdout, a_logger, level=logging.INFO, load_config_defaults=load_config_defaults) \
if connect_stdout else None
- StdStreamPatch._stderr_proxy = PrintPatchLogger(sys.stderr, a_logger, level=logging.ERROR) \
+ StdStreamPatch._stderr_proxy = PrintPatchLogger(
+ sys.stderr, a_logger, level=logging.ERROR, load_config_defaults=load_config_defaults) \
if connect_stderr else None
if StdStreamPatch._stdout_proxy:
@@ -178,8 +180,8 @@ class PrintPatchLogger(object):
recursion_protect_lock = threading.RLock()
cr_flush_period = None
- def __init__(self, stream, logger=None, level=logging.INFO):
- if PrintPatchLogger.cr_flush_period is None:
+ def __init__(self, stream, logger=None, level=logging.INFO, load_config_defaults=True):
+ if load_config_defaults and PrintPatchLogger.cr_flush_period is None:
PrintPatchLogger.cr_flush_period = config.get("development.worker.console_cr_flush_period", 0)
PrintPatchLogger.patched = True
self._terminal = stream
@@ -249,6 +251,20 @@ class PrintPatchLogger(object):
self._terminal.write(message)
def connect(self, logger):
# refresh if needed
if PrintPatchLogger.cr_flush_period is None:
PrintPatchLogger.cr_flush_period = config.get("development.worker.console_cr_flush_period", 0)
# if we had a previous log object, call flush before switching
if self._log and hasattr(self._log, '_flush_into_logger'):
# since we are not sure how flush should be called, we protect it
# noinspection PyBroadException
try:
# noinspection PyProtectedMember
self._log._flush_into_logger(a_future_func=logger)
except Exception:
pass
self._cur_line = ''
self._log = logger
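
The changes above thread a load_config_defaults flag through the stdout/stderr proxies so they can be installed before the configuration (and any vault overrides) is available. As an aside, here is a simplified sketch of the stream-proxy idea these classes implement, not the real PrintPatchLogger:

import sys

class StdoutProxy(object):
    # simplified stand-in for PrintPatchLogger: forward writes to the real
    # stream and keep a copy so the output can be replayed into a logger later
    def __init__(self, stream):
        self._stream = stream
        self.captured = []

    def write(self, message):
        self._stream.write(message)      # keep normal console behaviour
        self.captured.append(message)    # buffer the output for later reporting

    def flush(self):
        self._stream.flush()

proxy = StdoutProxy(sys.stdout)
sys.stdout = proxy
print("hello")                           # reaches the terminal and the buffer
sys.stdout = proxy._stream               # restore the original stream
assert "hello" in "".join(proxy.captured)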

Changed file 2 of 4

@@ -79,6 +79,7 @@ class Logger(object):
if private_task.is_main_task() or (connect_stdout or connect_stderr or connect_logging) else None
self._connect_std_streams = connect_stdout or connect_stderr
self._connect_logging = connect_logging
+ self._default_max_sample_history = None
# Make sure urllib is never in debug/info,
disable_urllib3_info = config.get('log.disable_urllib3_info', True)
@@ -95,8 +96,6 @@ class Logger(object):
if base_logger and base_logger.handlers:
StdStreamPatch.patch_logging_formatter(self, base_logger.handlers[0])
- self._default_max_sample_history = None
@classmethod
def current_logger(cls):
# type: () -> Logger

Changed file 3 of 4

@@ -77,7 +77,7 @@ from .utilities.seed import make_deterministic
from .utilities.lowlevel.threads import get_current_thread_id
from .utilities.process.mp import BackgroundMonitor, leave_process
from .utilities.matching import matches_any_wildcard
- from .utilities.future_caller import FutureCaller
+ from .utilities.future_caller import FutureTaskCaller
# noinspection PyProtectedMember
from .backend_interface.task.args import _Arguments
@@ -373,7 +373,7 @@ class Task(_Task):
finer control or wildcard strings.
In case of wildcard strings, the local path of a model file has to match at least one wildcard to be
saved/loaded by ClearML. Example:
- {'pytorch' : '*.pt', 'tensorflow': '*'}
+ {'pytorch' : '*.pt', 'tensorflow': ['*.h5', '*']}
Keys missing from the dictionary default to ``True``, and an empty dictionary defaults to ``False``.
Supported keys for finer control:
{'tensorboard': {'report_hparams': bool}} # whether to report TensorBoard hyperparameters
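
A short usage sketch of the wildcard form documented above (project and task names are placeholders): only PyTorch files matching *.pt are auto-logged, while TensorFlow models matching either pattern are logged.

from clearml import Task

# auto_connect_frameworks accepts per-framework wildcards (see the docstring above)
task = Task.init(
    project_name="examples",
    task_name="framework wildcards",
    auto_connect_frameworks={"pytorch": "*.pt", "tensorflow": ["*.h5", "*"]},
)
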
@@ -548,7 +548,7 @@ class Task(_Task):
def completed_cb(x):
Task.__main_task = x
- task = FutureCaller(
+ task = FutureTaskCaller(
func=cls.init,
func_cb=completed_cb,
override_cls=cls,
@@ -676,6 +676,22 @@ class Task(_Task):
# if we are deferred, stop here (the rest we do in the actual init)
if is_deferred:
from .backend_interface.logger import StdStreamPatch
# patch console outputs; we will keep them in memory until we complete the Task init
# notice we do not load config defaults, as they are not thread-safe
# we might also need to override them with the vault
StdStreamPatch.patch_std_streams(
task.get_logger(),
connect_stdout=(
auto_connect_streams is True) or (
isinstance(auto_connect_streams, dict) and auto_connect_streams.get('stdout', False)
),
connect_stderr=(
auto_connect_streams is True) or (
isinstance(auto_connect_streams, dict) and auto_connect_streams.get('stderr', False)
),
load_config_defaults=False,
)
return task # noqa
if auto_resource_monitoring and not is_sub_process_task_id:
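
The block added above patches stdout/stderr as soon as a deferred Task.init() is requested, honouring the same auto_connect_streams argument but skipping the config defaults. A hedged usage sketch (names are placeholders):

from clearml import Task

# auto_connect_streams can be True (capture everything) or a dict selecting streams;
# with deferred_init=True the streams are patched immediately, without reading
# configuration defaults, so output is buffered until the background init finishes
task = Task.init(
    project_name="examples",
    task_name="deferred stream capture",
    deferred_init=True,
    auto_connect_streams={"stdout": True, "stderr": True},
)
print("buffered while the task is still being created in the background")
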
@@ -4004,6 +4020,8 @@ class Task(_Task):
WeightsFileHandler.model_wildcards[k] = [str(i) for i in v]
def callback(_, model_info):
if not model_info:
return None
parents = Framework.get_framework_parents(model_info.framework)
wildcards = []
for parent in parents:

Changed file 4 of 4

@@ -1,20 +1,93 @@
from copy import deepcopy
from time import sleep
from six.moves.queue import Queue, Empty
from threading import Thread
from typing import Any, Callable, Optional, Type
- class FutureCaller(object):
+ class _DeferredClass(object):
__slots__ = ('__queue', '__future_caller', '__future_func')
def __init__(self, a_future_caller, future_func):
self.__queue = Queue()
self.__future_caller = a_future_caller
self.__future_func = future_func
def __nested_caller(self, item, args, kwargs):
# wait until object is constructed
getattr(self.__future_caller, "id") # noqa
future_func = getattr(self.__future_caller, self.__future_func)
the_object = future_func()
the_object_func = getattr(the_object, item)
return the_object_func(*args, **kwargs)
def _flush_into_logger(self, a_future_object=None, a_future_func=None):
self.__close_queue(a_future_object=a_future_object, a_future_func=a_future_func)
def __close_queue(self, a_future_object=None, a_future_func=None):
# call this function once we know the object initialization is completed
if self.__queue is None:
return
_queue = self.__queue
self.__queue = None
while True:
# noinspection PyBroadException
try:
item, args, kwargs = _queue.get(block=False)
if a_future_object:
future_func = getattr(a_future_object, self.__future_func)
the_object = future_func()
the_object_func = getattr(the_object, item)
the_object_func(*args, **kwargs)
elif a_future_func:
the_object_func = getattr(a_future_func, item)
the_object_func(*args, **kwargs)
else:
self.__nested_caller(item, args, kwargs)
except Empty:
break
except Exception:
# import traceback
# stdout_print(''.join(traceback.format_exc()))
pass
def __getattr__(self, item):
def _caller(*args, **kwargs):
# if we already completed the background initialization, call functions immediately
# noinspection PyProtectedMember
if not self.__queue or self.__future_caller._FutureTaskCaller__executor is None:
return self.__nested_caller(item, args, kwargs)
# noinspection PyBroadException
try:
# if pool is still active call async
self.__queue.put((item, deepcopy(args) if args else args, deepcopy(kwargs) if kwargs else kwargs))
except Exception:
# assume we wait only if self.__queue was nulled between the if and now, so just call directly
return self.__nested_caller(item, args, kwargs)
# let's hope it is the right one
return True
return _caller
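
_DeferredClass queues attribute calls made against the logger before the background Task construction finishes and replays them afterwards. A minimal, self-contained sketch of that queue-and-replay pattern (illustrative only, not the actual implementation):

from queue import Queue, Empty

class DeferredProxy(object):
    # queue method calls made before the real object exists, replay them later
    def __init__(self):
        self._calls = Queue()

    def __getattr__(self, name):
        def _caller(*args, **kwargs):
            self._calls.put((name, args, kwargs))
            return True  # the real return value cannot be known yet
        return _caller

    def flush_into(self, target):
        while True:
            try:
                name, args, kwargs = self._calls.get(block=False)
            except Empty:
                break
            getattr(target, name)(*args, **kwargs)

class RealLogger(object):
    def report_text(self, msg):
        print("logged:", msg)

proxy = DeferredProxy()
proxy.report_text("buffered before the logger exists")  # queued, returns True
proxy.flush_into(RealLogger())  # replays the call: prints "logged: buffered ..."
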
class FutureTaskCaller(object):
"""
- FutureCaller is used to create a class via a functions async, in another thread
+ FutureTaskCaller is used to create a class instance via a function call run asynchronously, in another thread
For example:
.. code-block:: py
- future = FutureCaller().call(func=max, func_cb=None, override_cls=None, 1, 2)
+ future = FutureTaskCaller().call(func=max, func_cb=None, override_cls=None, 1, 2)
print('Running other code')
print(future.result()) # will print '2'
"""
- __slots__ = ('__object', '__object_cls', '__executor')
+ __slots__ = ('__object', '__object_cls', '__executor', '__deferred_bkg_class')
@property
def __class__(self):
@@ -25,17 +98,30 @@ class FutureCaller(object):
"""
__init__(*args, **kwargs) in another thread
- :return: This FutureCaller instance
+ :return: This FutureTaskCaller instance
"""
self.__object = None
self.__object_cls = override_cls
self.__deferred_bkg_class = _DeferredClass(self, "get_logger")
self.__executor = Thread(target=self.__submit__, args=(func, func_cb, args, kwargs))
self.__executor.daemon = True
self.__executor.start()
def __submit__(self, fn, fn_cb, args, kwargs):
- self.__object = fn(*args, **kwargs)
# background initialization call
_object = fn(*args, **kwargs)
# push all background calls (now that the initialization is complete)
if self.__deferred_bkg_class:
_deferred_bkg_class = self.__deferred_bkg_class
self.__deferred_bkg_class = None
# noinspection PyProtectedMember
_deferred_bkg_class._flush_into_logger(a_future_object=_object)
# store the initialized object
self.__object = _object
# callback function
if fn_cb is not None:
fn_cb(self.__object)
@@ -45,8 +131,9 @@ class FutureCaller(object):
def __setattr__(self, item, value):
# make sure we can set the slots
if item in ["_FutureCaller__executor", "_FutureCaller__object", "_FutureCaller__object_cls"]:
return super(FutureCaller, self).__setattr__(item, value)
if item in ["_FutureTaskCaller__executor", "_FutureTaskCaller__object",
"_FutureTaskCaller__object_cls", "_FutureTaskCaller__deferred_bkg_class"]:
return super(FutureTaskCaller, self).__setattr__(item, value)
setattr(self.__result__(), item, value)
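
The hard-coded strings above change because Python name-mangles double-underscore attributes with the owning class name, so renaming FutureCaller to FutureTaskCaller changes every mangled slot name. A tiny illustration:

class FutureTaskCaller(object):
    def __init__(self):
        # "__object" is compiled to "_FutureTaskCaller__object", which is why the
        # string list in __setattr__ above had to be updated after the rename
        self.__object = None

f = FutureTaskCaller()
print(f._FutureTaskCaller__object)  # prints: None
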
@@ -61,6 +148,29 @@ class FutureCaller(object):
:return: The result of the called function
"""
if self.__executor:
- self.__executor.join(timeout=timeout)
# since the test is not atomic, we assume that if we failed joining
# it is because someone else joined before us
# noinspection PyBroadException
try:
self.__executor.join(timeout=timeout)
except RuntimeError:
# this is probably calling ourselves from the same thread
raise
except Exception:
# wait until that someone else updated the __object
while self.__object is None:
sleep(1)
self.__executor = None
return self.__object
# This is the part where we are no longer generic, but __slots__
# inheritance is too cumbersome to actually inherit and make sure it works optimally
def get_logger(self):
if self.__object is not None:
return self.__object.get_logger()
if self.__deferred_bkg_class is None:
# we are shutting down, wait until object is available
return self.__result__().get_logger()
return self.__deferred_bkg_class
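
Taken together, FutureTaskCaller constructs the Task in a background thread and blocks only when the result is first needed. A minimal sketch of that pattern (the FutureObject class below is illustrative, not part of the ClearML API):

from threading import Thread

class FutureObject(object):
    # build an object in a background thread; join only when the result is needed
    def __init__(self, factory, *args, **kwargs):
        self._obj = None
        self._thread = Thread(target=self._build, args=(factory, args, kwargs))
        self._thread.daemon = True
        self._thread.start()

    def _build(self, factory, args, kwargs):
        self._obj = factory(*args, **kwargs)

    def result(self, timeout=None):
        if self._thread is not None:
            self._thread.join(timeout=timeout)
            self._thread = None
        return self._obj

future = FutureObject(max, 1, 2)
print("running other code while max() is computed in the background")
print(future.result())  # prints 2, mirroring the docstring example above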