Add support for deferred configuration

This commit is contained in:
allegroai
2021-09-09 22:00:40 +03:00
parent c85c05ef6a
commit 8ebf9be604
16 changed files with 337 additions and 199 deletions

View File

@@ -176,9 +176,11 @@ class PrintPatchLogger(object):
patched = False
lock = threading.Lock()
recursion_protect_lock = threading.RLock()
cr_flush_period = config.get("development.worker.console_cr_flush_period", 0)
cr_flush_period = None
def __init__(self, stream, logger=None, level=logging.INFO):
if self.__class__.cr_flush_period is None:
self.__class__.cr_flush_period = config.get("development.worker.console_cr_flush_period", 0)
PrintPatchLogger.patched = True
self._terminal = stream
self._log = logger

View File

@@ -12,7 +12,7 @@ from PIL import Image
from six.moves.urllib.parse import urlparse, urlunparse
from ...backend_api.services import events
from ...config import config
from ...config import config, deferred_config
from ...storage.util import quote_url
from ...utilities.attrs import attrs
from ...utilities.process.mp import SingletonLock
@@ -196,14 +196,17 @@ class ImageEventNoUpload(MetricsEventAdapter):
class UploadEvent(MetricsEventAdapter):
""" Image event adapter """
_format = '.' + str(config.get('metrics.images.format', 'JPEG')).upper().lstrip('.')
_quality = int(config.get('metrics.images.quality', 87))
_subsampling = int(config.get('metrics.images.subsampling', 0))
_format = deferred_config(
'metrics.images.format', 'JPEG',
transform=lambda x: '.' + str(x).upper().lstrip('.')
)
_quality = deferred_config('metrics.images.quality', 87, transform=int)
_subsampling = deferred_config('metrics.images.subsampling', 0, transform=int)
_file_history_size = deferred_config('metrics.file_history_size', 5, transform=int)
_upload_retries = 3
_metric_counters = {}
_metric_counters_lock = SingletonLock()
_file_history_size = int(config.get('metrics.file_history_size', 5))
@staticmethod
def _replace_slash(part):

View File

@@ -9,7 +9,7 @@ from pathlib2 import Path
from ...backend_api.services import events as api_events
from ..base import InterfaceBase
from ...config import config
from ...config import config, deferred_config
from ...debugging import get_logger
from ...storage.helper import StorageHelper
@@ -17,18 +17,19 @@ from .events import MetricsEventAdapter
from ...utilities.process.mp import SingletonLock
log = get_logger('metrics')
class Metrics(InterfaceBase):
""" Metrics manager and batch writer """
_storage_lock = SingletonLock()
_file_upload_starvation_warning_sec = config.get('network.metrics.file_upload_starvation_warning_sec', None)
_file_upload_starvation_warning_sec = deferred_config('network.metrics.file_upload_starvation_warning_sec', None)
_file_upload_retries = 3
_upload_pool = None
_file_upload_pool = None
__offline_filename = 'metrics.jsonl'
@classmethod
def _get_logger(cls):
    """Return the logger used for metrics reporting.

    Resolved on every call instead of being cached at import time.
    """
    return get_logger('metrics')
@property
def storage_key_prefix(self):
    """Read-only access to the configured storage key prefix."""
    return self._storage_key_prefix
@@ -42,7 +43,7 @@ class Metrics(InterfaceBase):
storage_uri = storage_uri or self._storage_uri
return StorageHelper.get(storage_uri)
except Exception as e:
log.error('Failed getting storage helper for %s: %s' % (storage_uri, str(e)))
self._get_logger().error('Failed getting storage helper for %s: %s' % (storage_uri, str(e)))
finally:
self._storage_lock.release()
@@ -153,7 +154,7 @@ class Metrics(InterfaceBase):
if e:
entries.append(e)
except Exception as ex:
log.warning(str(ex))
self._get_logger().warning(str(ex))
events.remove(ev)
# upload the needed files
@@ -171,7 +172,7 @@ class Metrics(InterfaceBase):
url = storage.upload_from_stream(e.stream, e.url, retries=retries)
e.event.update(url=url)
except Exception as exp:
log.warning("Failed uploading to {} ({})".format(
self._get_logger().warning("Failed uploading to {} ({})".format(
upload_uri if upload_uri else "(Could not calculate upload uri)",
exp,
))
@@ -196,15 +197,16 @@ class Metrics(InterfaceBase):
t_f, t_u, t_ref = \
(self._file_related_event_time, self._file_upload_time, self._file_upload_starvation_warning_sec)
if t_f and t_u and t_ref and (t_f - t_u) > t_ref:
log.warning('Possible metrics file upload starvation: '
'files were not uploaded for {} seconds'.format(t_ref))
self._get_logger().warning(
'Possible metrics file upload starvation: '
'files were not uploaded for {} seconds'.format(t_ref))
# send the events in a batched request
good_events = [ev for ev in events if ev.upload_exception is None]
error_events = [ev for ev in events if ev.upload_exception is not None]
if error_events:
log.error("Not uploading {}/{} events because the data upload failed".format(
self._get_logger().error("Not uploading {}/{} events because the data upload failed".format(
len(error_events),
len(events),
))

View File

@@ -167,6 +167,9 @@ class Reporter(InterfaceBase, AbstractContextManager, SetupUploadMixin, AsyncMan
"""
log = metrics.log.getChild('reporter')
log.setLevel(log.level)
if self.__class__.max_float_num_digits is -1:
self.__class__.max_float_num_digits = config.get('metrics.plot_max_num_digits', None)
super(Reporter, self).__init__(session=metrics.session, log=log)
self._metrics = metrics
self._bucket_config = None
@@ -185,7 +188,7 @@ class Reporter(InterfaceBase, AbstractContextManager, SetupUploadMixin, AsyncMan
self._report_service.set_storage_uri(self._storage_uri)
storage_uri = property(None, _set_storage_uri)
max_float_num_digits = config.get('metrics.plot_max_num_digits', None)
max_float_num_digits = -1
@property
def async_enable(self):

View File

@@ -1,4 +1,4 @@
from ....config import config
from ....config import config, deferred_config
class TaskStopReason(object):
@@ -8,7 +8,7 @@ class TaskStopReason(object):
class TaskStopSignal(object):
enabled = bool(config.get('development.support_stopping', False))
enabled = deferred_config('development.support_stopping', False, transform=bool)
_number_of_consecutive_reset_tests = 4

View File

@@ -3,7 +3,7 @@ from threading import Thread, Event
from time import time
from ....config import config
from ....config import config, deferred_config
from ....backend_interface.task.development.stop_signal import TaskStopSignal
from ....backend_api.services import tasks
@@ -11,9 +11,13 @@ from ....backend_api.services import tasks
class DevWorker(object):
prefix = attr.ib(type=str, default="MANUAL:")
report_period = float(max(config.get('development.worker.report_period_sec', 30.), 1.))
report_stdout = bool(config.get('development.worker.log_stdout', True))
ping_period = float(max(config.get('development.worker.ping_period_sec', 30.), 1.))
report_stdout = deferred_config('development.worker.log_stdout', True)
report_period = deferred_config(
'development.worker.report_period_sec', 30.,
transform=lambda x: float(max(x, 1.0)))
ping_period = deferred_config(
'development.worker.ping_period_sec', 30.,
transform=lambda x: float(max(x, 1.0)))
def __init__(self):
self._dev_stop_signal = None

View File

@@ -18,8 +18,6 @@ from ....config.defs import (
from ....debugging import get_logger
from .util import get_command_output
_logger = get_logger("Repository Detection")
class DetectionError(Exception):
    """Raised when repository/script detection cannot complete."""
@@ -53,6 +51,10 @@ class Detector(object):
_fallback = '_fallback'
_remote = '_remote'
@classmethod
def _get_logger(cls):
    """Return the repository-detection logger.

    Fetched per call rather than held as a module-level instance.
    """
    return get_logger("Repository Detection")
@attr.s
class Commands(object):
"""" Repository information as queried by a detector """
@@ -93,9 +95,9 @@ class Detector(object):
return get_command_output(fallback_command, path, strip=strip)
except (CalledProcessError, UnicodeDecodeError):
pass
_logger.warning("Can't get {} information for {} repo in {}".format(name, self.type_name, path))
self._get_logger().warning("Can't get {} information for {} repo in {}".format(name, self.type_name, path))
# full details only in debug
_logger.debug(
self._get_logger().debug(
"Can't get {} information for {} repo in {}: {}".format(
name, self.type_name, path, str(ex)
)
@@ -180,7 +182,7 @@ class Detector(object):
== 0
)
except CalledProcessError:
_logger.warning("Can't get {} status".format(self.type_name))
self._get_logger().warning("Can't get {} status".format(self.type_name))
except (OSError, EnvironmentError, IOError):
# File not found or can't be executed
pass

View File

@@ -13,26 +13,30 @@ from threading import Thread, Event
from .util import get_command_output, remove_user_pass_from_url
from ....backend_api import Session
from ....config import config
from ....config import config, deferred_config
from ....debugging import get_logger
from .detectors import GitEnvDetector, GitDetector, HgEnvDetector, HgDetector, Result as DetectionResult
_logger = get_logger("Repository Detection")
class ScriptInfoError(Exception):
    """Raised when script information (e.g. the script file) cannot be resolved."""
class ScriptRequirements(object):
_detailed_import_report = deferred_config('development.detailed_import_report', False)
_max_requirements_size = 512 * 1024
_packages_remove_version = ('setuptools', )
_ignore_packages = set()
@classmethod
def _get_logger(cls):
    """Return the repository-detection logger, resolved on demand."""
    return get_logger("Repository Detection")
def __init__(self, root_folder):
self._root_folder = root_folder
def get_requirements(self, entry_point_filename=None, add_missing_installed_packages=False):
def get_requirements(self, entry_point_filename=None, add_missing_installed_packages=False,
detailed_req_report=None):
# noinspection PyBroadException
try:
from ....utilities.pigar.reqs import get_installed_pkgs_detail
@@ -48,9 +52,9 @@ class ScriptRequirements(object):
for k in guess:
if k not in reqs:
reqs[k] = guess[k]
return self.create_requirements_txt(reqs, local_pks)
return self.create_requirements_txt(reqs, local_pks, detailed=detailed_req_report)
except Exception as ex:
_logger.warning("Failed auto-generating package requirements: {}".format(ex))
self._get_logger().warning("Failed auto-generating package requirements: {}".format(ex))
return '', ''
@staticmethod
@@ -115,8 +119,11 @@ class ScriptRequirements(object):
return modules
@staticmethod
def create_requirements_txt(reqs, local_pks=None):
def create_requirements_txt(reqs, local_pks=None, detailed=None):
# write requirements.txt
if detailed is None:
detailed = ScriptRequirements._detailed_import_report
# noinspection PyBroadException
try:
conda_requirements = ''
@@ -193,30 +200,31 @@ class ScriptRequirements(object):
for k in sorted(forced_packages.keys()):
requirements_txt += ScriptRequirements._make_req_line(k, forced_packages.get(k))
requirements_txt_packages_only = \
requirements_txt + '\n# Skipping detailed import analysis, it is too large\n'
if detailed:
requirements_txt_packages_only = \
requirements_txt + '\n# Skipping detailed import analysis, it is too large\n'
# requirements details (in comments)
requirements_txt += '\n' + \
'# Detailed import analysis\n' \
'# **************************\n'
# requirements details (in comments)
requirements_txt += '\n' + \
'# Detailed import analysis\n' \
'# **************************\n'
if local_pks:
for k, v in local_pks.sorted_items():
if local_pks:
for k, v in local_pks.sorted_items():
requirements_txt += '\n'
requirements_txt += '# IMPORT LOCAL PACKAGE {0}\n'.format(k)
requirements_txt += ''.join(['# {0}\n'.format(c) for c in v.comments.sorted_items()])
for k, v in reqs.sorted_items():
if not v:
continue
requirements_txt += '\n'
requirements_txt += '# IMPORT LOCAL PACKAGE {0}\n'.format(k)
if k == '-e':
requirements_txt += '# IMPORT PACKAGE {0} {1}\n'.format(k, v.version)
else:
requirements_txt += '# IMPORT PACKAGE {0}\n'.format(k)
requirements_txt += ''.join(['# {0}\n'.format(c) for c in v.comments.sorted_items()])
for k, v in reqs.sorted_items():
if not v:
continue
requirements_txt += '\n'
if k == '-e':
requirements_txt += '# IMPORT PACKAGE {0} {1}\n'.format(k, v.version)
else:
requirements_txt += '# IMPORT PACKAGE {0}\n'.format(k)
requirements_txt += ''.join(['# {0}\n'.format(c) for c in v.comments.sorted_items()])
# make sure we do not exceed the size a size limit
return (requirements_txt if len(requirements_txt) < ScriptRequirements._max_requirements_size
else requirements_txt_packages_only,
@@ -252,7 +260,11 @@ class _JupyterObserver(object):
_sample_frequency = 30.
_first_sample_frequency = 3.
_jupyter_history_logger = None
_store_notebook_artifact = config.get('development.store_jupyter_notebook_artifact', True)
_store_notebook_artifact = deferred_config('development.store_jupyter_notebook_artifact', True)
@classmethod
def _get_logger(cls):
    """Return the repository-detection logger, resolved on demand."""
    return get_logger("Repository Detection")
@classmethod
def observer(cls, jupyter_notebook_filename, log_history):
@@ -296,7 +308,7 @@ class _JupyterObserver(object):
from nbconvert.exporters.script import ScriptExporter
_script_exporter = ScriptExporter()
except Exception as ex:
_logger.warning('Could not read Jupyter Notebook: {}'.format(ex))
cls._get_logger().warning('Could not read Jupyter Notebook: {}'.format(ex))
return
# load pigar
# noinspection PyBroadException
@@ -486,6 +498,10 @@ class ScriptInfo(object):
plugins = [GitEnvDetector(), HgEnvDetector(), HgDetector(), GitDetector()]
""" Script info detection plugins, in order of priority """
@classmethod
def _get_logger(cls):
    """Return the repository-detection logger, resolved on demand."""
    return get_logger("Repository Detection")
@classmethod
def _jupyter_install_post_store_hook(cls, jupyter_notebook_filename, log_history=False):
# noinspection PyBroadException
@@ -542,7 +558,7 @@ class ScriptInfo(object):
from ....config import config
password = config.get('development.jupyter_server_password', '')
if not password:
_logger.warning(
cls._get_logger().warning(
'Password protected Jupyter Notebook server was found! '
'Add `sdk.development.jupyter_server_password=<jupyter_password>` to ~/clearml.conf')
return os.path.join(os.getcwd(), 'error_notebook_not_found.py')
@@ -575,7 +591,7 @@ class ScriptInfo(object):
try:
r.raise_for_status()
except Exception as ex:
_logger.warning('Failed accessing the jupyter server{}: {}'.format(
cls._get_logger().warning('Failed accessing the jupyter server{}: {}'.format(
' [password={}]'.format(password) if server_info.get('password') else '', ex))
return os.path.join(os.getcwd(), 'error_notebook_not_found.py')
@@ -630,7 +646,7 @@ class ScriptInfo(object):
if entry_point_alternative.exists():
entry_point = entry_point_alternative
except Exception as ex:
_logger.warning('Failed accessing jupyter notebook {}: {}'.format(notebook_path, ex))
cls._get_logger().warning('Failed accessing jupyter notebook {}: {}'.format(notebook_path, ex))
# get local ipynb for observer
local_ipynb_file = entry_point.as_posix()
@@ -714,16 +730,18 @@ class ScriptInfo(object):
@classmethod
def _get_script_info(
cls, filepaths, check_uncommitted=True, create_requirements=True, log=None,
uncommitted_from_remote=False, detect_jupyter_notebook=True, add_missing_installed_packages=False):
uncommitted_from_remote=False, detect_jupyter_notebook=True,
add_missing_installed_packages=False, detailed_req_report=None):
jupyter_filepath = cls._get_jupyter_notebook_filename() if detect_jupyter_notebook else None
if jupyter_filepath:
scripts_path = [Path(os.path.normpath(jupyter_filepath)).absolute()]
else:
cwd = cls._cwd()
scripts_path = [Path(cls._absolute_path(os.path.normpath(f), cwd)) for f in filepaths if f]
if all(not f.is_file() for f in scripts_path):
scripts_path = [f for f in scripts_path if f.exists()]
if not scripts_path:
raise ScriptInfoError(
"Script file {} could not be found".format(scripts_path)
"Script file {} could not be found".format(filepaths)
)
scripts_dir = [f.parent for f in scripts_path]
@@ -737,10 +755,11 @@ class ScriptInfo(object):
)
)
plugin = next((p for p in cls.plugins if any(p.exists(d) for d in scripts_dir)), None)
repo_info = DetectionResult()
script_dir = scripts_dir[0]
script_path = scripts_path[0]
plugin = next((p for p in cls.plugins if p.exists(script_dir)), None)
repo_info = DetectionResult()
messages = []
auxiliary_git_diff = None
@@ -749,13 +768,8 @@ class ScriptInfo(object):
log.info("No repository found, storing script code instead")
else:
try:
for i, d in enumerate(scripts_dir):
repo_info = plugin.get_info(
str(d), include_diff=check_uncommitted, diff_from_remote=uncommitted_from_remote)
if not repo_info.is_empty():
script_dir = d
script_path = scripts_path[i]
break
repo_info = plugin.get_info(
str(script_dir), include_diff=check_uncommitted, diff_from_remote=uncommitted_from_remote)
except SystemExit:
raise
except Exception as ex:
@@ -799,7 +813,9 @@ class ScriptInfo(object):
requirements, conda_requirements = script_requirements.get_requirements(
entry_point_filename=script_path.as_posix()
if not repo_info.url and script_path.is_file() else None,
add_missing_installed_packages=add_missing_installed_packages)
add_missing_installed_packages=add_missing_installed_packages,
detailed_req_report=detailed_req_report,
)
else:
script_requirements = None
@@ -831,7 +847,8 @@ class ScriptInfo(object):
@classmethod
def get(cls, filepaths=None, check_uncommitted=True, create_requirements=True, log=None,
uncommitted_from_remote=False, detect_jupyter_notebook=True, add_missing_installed_packages=False):
uncommitted_from_remote=False, detect_jupyter_notebook=True, add_missing_installed_packages=False,
detailed_req_report=None):
try:
if not filepaths:
filepaths = [sys.argv[0], ]
@@ -842,6 +859,7 @@ class ScriptInfo(object):
uncommitted_from_remote=uncommitted_from_remote,
detect_jupyter_notebook=detect_jupyter_notebook,
add_missing_installed_packages=add_missing_installed_packages,
detailed_req_report=detailed_req_report,
)
except SystemExit:
pass

View File

@@ -47,7 +47,7 @@ from ..util import (
exact_match_regex, mutually_exclusive, )
from ...config import (
get_config_for_bucket, get_remote_task_id, TASK_ID_ENV_VAR,
running_remotely, get_cache_dir, DOCKER_IMAGE_ENV_VAR, get_offline_dir, get_log_to_backend, )
running_remotely, get_cache_dir, DOCKER_IMAGE_ENV_VAR, get_offline_dir, get_log_to_backend, deferred_config, )
from ...debugging import get_logger
from ...storage.helper import StorageHelper, StorageError
from .access import AccessMixin
@@ -70,12 +70,11 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
_force_requirements = {}
_ignore_requirements = set()
_store_diff = config.get('development.store_uncommitted_code_diff', False)
_store_remote_diff = config.get('development.store_code_diff_from_remote', False)
_report_subprocess_enabled = config.get('development.report_use_subprocess', sys.platform == 'linux')
_force_use_pip_freeze = \
config.get('development.detect_with_pip_freeze', False) or \
config.get('development.detect_with_conda_freeze', False)
_store_diff = deferred_config('development.store_uncommitted_code_diff', False)
_store_remote_diff = deferred_config('development.store_code_diff_from_remote', False)
_report_subprocess_enabled = deferred_config('development.report_use_subprocess', sys.platform == 'linux')
_force_use_pip_freeze = deferred_config(multi=[('development.detect_with_pip_freeze', False),
('development.detect_with_conda_freeze', False)])
_offline_filename = 'task.json'
class TaskTypes(Enum):
@@ -1815,6 +1814,12 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
return True
def _get_runtime_properties(self):
    # type: () -> Mapping[str, str]
    """Return the task's runtime properties as a plain dict.

    Requires backend API version 2.13 or newer; on older servers an
    empty dict is returned instead.
    """
    if not Session.check_min_api_version('2.13'):
        return dict()
    runtime = self.data.runtime
    return dict(**runtime) if runtime else dict()
def _clear_task(self, system_tags=None, comment=None):
# type: (Optional[Sequence[str]], Optional[str]) -> ()
self._data.script = tasks.Script(

View File

@@ -5,14 +5,70 @@ import sys
from os.path import expandvars, expanduser
from ..backend_api import load_config
from ..backend_config import Config
from ..backend_config.bucket_config import S3BucketConfigurations
from .defs import * # noqa: F403
from .remote import running_remotely_task_id as _running_remotely_task_id
config_obj = load_config(Path(__file__).parent) # noqa: F405
config_obj.initialize_logging()
config = config_obj.get("sdk")
# config_obj = load_config(Path(__file__).parent) # noqa: F405
# config_obj.initialize_logging()
# config = config_obj.get("sdk")
from ..utilities.proxy_object import LazyEvalWrapper
class ConfigWrapper(object):
    """Lazily-loaded stand-in for the module-level configuration object.

    Loading the configuration (and initializing its logging) is deferred
    until the first access instead of happening at import time.
    """

    # Cached Config instance; populated on first use.
    _config = None

    @classmethod
    def _init(cls):
        # Load the configuration exactly once, on first access.
        if cls._config is not None:
            return
        cls._config = load_config(Path(__file__).parent)  # noqa: F405
        cls._config.initialize_logging()

    @classmethod
    def get(cls, *args, **kwargs):
        """Proxy ``Config.get`` on the lazily loaded configuration."""
        cls._init()
        return cls._config.get(*args, **kwargs)

    @classmethod
    def set_overrides(cls, *args, **kwargs):
        """Proxy ``Config.set_overrides`` on the lazily loaded configuration."""
        cls._init()
        return cls._config.set_overrides(*args, **kwargs)
class ConfigSDKWrapper(object):
    """Lazy proxy for the "sdk" section of the merged configuration."""

    # Cached "sdk" sub-configuration; resolved on first use.
    _config_sdk = None

    @classmethod
    def _init(cls):
        # Resolve the "sdk" sub-config only once, on first access.
        if cls._config_sdk is not None:
            return
        cls._config_sdk = ConfigWrapper.get("sdk")

    @classmethod
    def get(cls, *args, **kwargs):
        """Proxy ``get`` on the lazily resolved "sdk" section."""
        cls._init()
        return cls._config_sdk.get(*args, **kwargs)

    @classmethod
    def set_overrides(cls, *args, **kwargs):
        """Proxy ``set_overrides`` on the lazily resolved "sdk" section."""
        cls._init()
        return cls._config_sdk.set_overrides(*args, **kwargs)
def deferred_config(key=None, default=Config._MISSING, transform=None, multi=None):
    """Create a lazily-evaluated configuration value.

    :param key: configuration key to read (ignored when ``multi`` is given)
    :param default: fallback value when the key is missing
    :param transform: optional callable applied to the resolved value; when
        ``key`` is None it is called with no arguments
    :param multi: optional list of ``(key, default)`` tuples — the first
        entry whose value is truthy wins
    :return: a LazyEvalWrapper that resolves the value on first use
    """
    def _resolve():
        # NOTE: the multi path evaluates each candidate twice (filter + take),
        # and raises StopIteration if no candidate is truthy — TODO confirm
        # callers always provide at least one truthy entry.
        if multi:
            return next(ConfigSDKWrapper.get(*a) for a in multi if ConfigSDKWrapper.get(*a))
        return ConfigSDKWrapper.get(key, default)

    def _callback():
        if transform is None:
            return _resolve()
        if key is None:
            # With a transform but no key, the transform is invoked bare.
            return transform()
        return transform(_resolve())

    return LazyEvalWrapper(callback=_callback)
config_obj = ConfigWrapper
config = ConfigSDKWrapper
""" Configuration object reflecting the merged SDK section of all available configuration files """

View File

@@ -19,7 +19,7 @@ from .backend_interface.logger import StdStreamPatch
from .backend_interface.task import Task as _Task
from .backend_interface.task.log import TaskHandler
from .backend_interface.util import mutually_exclusive
from .config import running_remotely, get_cache_dir, config, DEBUG_SIMULATE_REMOTE_TASK
from .config import running_remotely, get_cache_dir, config, DEBUG_SIMULATE_REMOTE_TASK, deferred_config
from .errors import UsageError
from .storage.helper import StorageHelper
from .utilities.plotly_reporter import SeriesInfo
@@ -57,7 +57,7 @@ class Logger(object):
"""
SeriesInfo = SeriesInfo
_tensorboard_logging_auto_group_scalars = False
_tensorboard_single_series_per_graph = config.get('metrics.tensorboard_single_series_per_graph', False)
_tensorboard_single_series_per_graph = deferred_config('metrics.tensorboard_single_series_per_graph', False)
def __init__(self, private_task, connect_stdout=True, connect_stderr=True, connect_logging=False):
"""
@@ -192,19 +192,18 @@ class Logger(object):
You can view the vectors plots in the **ClearML Web-App (UI)**, **RESULTS** tab, **PLOTS** sub-tab.
:param str title: The title (metric) of the plot.
:param str series: The series name (variant) of the reported histogram.
:param list(float) values: The series values. A list of floats, or an N-dimensional Numpy array containing
:param title: The title (metric) of the plot.
:param series: The series name (variant) of the reported histogram.
:param values: The series values. A list of floats, or an N-dimensional Numpy array containing
data for each histogram bar.
:type values: list(float), numpy.ndarray
:param int iteration: The reported iteration / step. Each ``iteration`` creates another plot.
:param list(str) labels: Labels for each bar group, creating a plot legend labeling each series. (Optional)
:param list(str) xlabels: Labels per entry in each bucket in the histogram (vector), creating a set of labels
:param iteration: The reported iteration / step. Each ``iteration`` creates another plot.
:param labels: Labels for each bar group, creating a plot legend labeling each series. (Optional)
:param xlabels: Labels per entry in each bucket in the histogram (vector), creating a set of labels
for each histogram bar on the x-axis. (Optional)
:param str xaxis: The x-axis title. (Optional)
:param str yaxis: The y-axis title. (Optional)
:param str mode: Multiple histograms mode, stack / group / relative. Default is 'group'.
:param dict extra_layout: optional dictionary for layout configuration, passed directly to plotly
:param xaxis: The x-axis title. (Optional)
:param yaxis: The y-axis title. (Optional)
:param mode: Multiple histograms mode, stack / group / relative. Default is 'group'.
:param extra_layout: optional dictionary for layout configuration, passed directly to plotly
example: extra_layout={'xaxis': {'type': 'date', 'range': ['2020-01-01', '2020-01-31']}}
"""
self._touch_title_series(title, series)
@@ -239,19 +238,18 @@ class Logger(object):
You can view the reported histograms in the **ClearML Web-App (UI)**, **RESULTS** tab, **PLOTS** sub-tab.
:param str title: The title (metric) of the plot.
:param str series: The series name (variant) of the reported histogram.
:param list(float) values: The series values. A list of floats, or an N-dimensional Numpy array containing
:param title: The title (metric) of the plot.
:param series: The series name (variant) of the reported histogram.
:param values: The series values. A list of floats, or an N-dimensional Numpy array containing
data for each histogram bar.
:type values: list(float), numpy.ndarray
:param int iteration: The reported iteration / step. Each ``iteration`` creates another plot.
:param list(str) labels: Labels for each bar group, creating a plot legend labeling each series. (Optional)
:param list(str) xlabels: Labels per entry in each bucket in the histogram (vector), creating a set of labels
:param iteration: The reported iteration / step. Each ``iteration`` creates another plot.
:param labels: Labels for each bar group, creating a plot legend labeling each series. (Optional)
:param xlabels: Labels per entry in each bucket in the histogram (vector), creating a set of labels
for each histogram bar on the x-axis. (Optional)
:param str xaxis: The x-axis title. (Optional)
:param str yaxis: The y-axis title. (Optional)
:param str mode: Multiple histograms mode, stack / group / relative. Default is 'group'.
:param dict extra_layout: optional dictionary for layout configuration, passed directly to plotly
:param xaxis: The x-axis title. (Optional)
:param yaxis: The y-axis title. (Optional)
:param mode: Multiple histograms mode, stack / group / relative. Default is 'group'.
:param extra_layout: optional dictionary for layout configuration, passed directly to plotly
example: extra_layout={'xaxis': {'type': 'date', 'range': ['2020-01-01', '2020-01-31']}}
"""
@@ -307,18 +305,14 @@ class Logger(object):
You can view the reported tables in the **ClearML Web-App (UI)**, **RESULTS** tab, **PLOTS** sub-tab.
:param str title: The title (metric) of the table.
:param str series: The series name (variant) of the reported table.
:param int iteration: The reported iteration / step.
:param title: The title (metric) of the table.
:param series: The series name (variant) of the reported table.
:param iteration: The reported iteration / step.
:param table_plot: The output table plot object
:type table_plot: pandas.DataFrame or Table as list of rows (list)
:param csv: path to local csv file
:type csv: str
:param url: A URL to the location of csv file.
:type url: str
:param extra_layout: optional dictionary for layout configuration, passed directly to plotly
example: extra_layout={'xaxis': {'type': 'date', 'range': ['2020-01-01', '2020-01-31']}}
:type extra_layout: dict
"""
mutually_exclusive(
UsageError, _check_none=True,
@@ -769,7 +763,7 @@ class Logger(object):
"""
For explicit reporting, report an image and upload its contents.
This method uploads the image to a preconfigured bucket (see :meth:`Logger.setup_upload`) with a key (filename)
This method uploads the image to a preconfigured bucket (see :meth:`Logger.set_default_upload_destination`)
describing the task ID, title, series and iteration.
For example:
@@ -790,22 +784,20 @@ class Logger(object):
- ``image``
- ``matrix``
:param str title: The title (metric) of the image.
:param str series: The series name (variant) of the reported image.
:param int iteration: The reported iteration / step.
:param str local_path: A path to an image file.
:param str url: A URL for the location of a pre-uploaded image.
:param title: The title (metric) of the image.
:param series: The series name (variant) of the reported image.
:param iteration: The reported iteration / step.
:param local_path: A path to an image file.
:param url: A URL for the location of a pre-uploaded image.
:param image: Image data (RGB).
:type image: numpy.ndarray, PIL.Image.Image
:param numpy.ndarray matrix: Image data (RGB).
:param matrix: Deprecated, image data (RGB).
.. note::
The ``matrix`` paramater is deprecated. Use the ``image`` parameters.
:type matrix: 3D numpy.ndarray
:param int max_image_history: The maximum number of images to store per metric/variant combination.
The ``matrix`` parameter is deprecated. Use the ``image`` parameters.
:param max_image_history: The maximum number of images to store per metric/variant combination.
For an unlimited number, use a negative value. The default value is set in global configuration
(default=``5``).
:param bool delete_after_upload: After the upload, delete the local copy of the image
:param delete_after_upload: After the upload, delete the local copy of the image
The values are:
@@ -1147,7 +1139,6 @@ class Logger(object):
- ``True`` - Scalars without specific titles are grouped together in the "Scalars" plot, preserving
backward compatibility with ClearML automagical behavior.
- ``False`` - TensorBoard scalars without titles get a title/series with the same tag. (default)
:type group_scalars: bool
"""
cls._tensorboard_logging_auto_group_scalars = group_scalars
@@ -1165,7 +1156,6 @@ class Logger(object):
- ``True`` - Generate a separate plot for each TensorBoard scalar series.
- ``False`` - Group the TensorBoard scalar series together in the same plot. (default)
:type single_series: bool
"""
cls._tensorboard_single_series_per_graph = single_series
@@ -1205,11 +1195,10 @@ class Logger(object):
"""
print text to log (same as print to console, and also prints to console)
:param str msg: text to print to the console (always send to the backend and displayed in console)
:param msg: text to print to the console (always send to the backend and displayed in console)
:param level: logging level, default: logging.INFO
:type level: Logging Level
:param bool omit_console: Omit the console output, and only send the ``msg`` value to the log
:param bool force_send: Report with an explicit log level. Only supported if ``omit_console`` is True
:param omit_console: Omit the console output, and only send the ``msg`` value to the log
:param force_send: Report with an explicit log level. Only supported if ``omit_console`` is True
- ``True`` - Omit the console output.
- ``False`` - Print the console output. (default)
@@ -1271,24 +1260,17 @@ class Logger(object):
"""
Report an image, upload its contents, and present in plots section using plotly
Image is uploaded to a preconfigured bucket (see :meth:`Logger.setup_upload`) with a key (filename)
Image is uploaded to a preconfigured bucket (see :meth:`Logger.set_default_upload_destination`)
describing the task ID, title, series and iteration.
:param title: Title (AKA metric)
:type title: str
:param series: Series (AKA variant)
:type series: str
:param iteration: Iteration number
:type iteration: int
:param path: A path to an image file. Required unless matrix is provided.
:type path: str
:param matrix: A 3D numpy.ndarray object containing image data (RGB). Required unless filename is provided.
:type matrix: np.array
:param max_image_history: maximum number of image to store per metric/variant combination \
use negative value for unlimited. default is set in global configuration (default=5)
:type max_image_history: int
:param delete_after_upload: if True, once the file has been uploaded the local copy will be deleted
:type delete_after_upload: boolean
"""
# if task was not started, we have to start it
@@ -1326,22 +1308,16 @@ class Logger(object):
"""
Upload a file and report it as link in the debug images section.
File is uploaded to a preconfigured storage (see :meth:`Logger.setup_upload`) with a key (filename)
File is uploaded to a preconfigured storage (see :meth:`Logger.set_default_upload_destination`)
describing the task ID, title, series and iteration.
:param title: Title (AKA metric)
:type title: str
:param series: Series (AKA variant)
:type series: str
:param iteration: Iteration number
:type iteration: int
:param path: A path to file to be uploaded
:type path: str
:param max_file_history: maximum number of files to store per metric/variant combination \
use negative value for unlimited. default is set in global configuration (default=5)
:type max_file_history: int
:param delete_after_upload: if True, once the file was uploaded the local copy will be deleted
:type delete_after_upload: boolean
"""
# if task was not started, we have to start it

View File

@@ -6,13 +6,13 @@ from pathlib2 import Path
from .helper import StorageHelper
from .util import quote_url
from ..config import get_cache_dir, config
from ..config import get_cache_dir, deferred_config
from ..debugging.log import LoggerRoot
class CacheManager(object):
__cache_managers = {}
_default_cache_file_limit = config.get("storage.cache.default_cache_manager_size", 100)
_default_cache_file_limit = deferred_config("storage.cache.default_cache_manager_size", 100)
_storage_manager_folder = "storage_manager"
_default_context = "global"
_local_to_remote_url_lookup = OrderedDict()
@@ -155,7 +155,8 @@ class CacheManager(object):
cache_context = cache_context or cls._default_context
if cache_context not in cls.__cache_managers:
cls.__cache_managers[cache_context] = cls.CacheContext(
cache_context, cache_file_limit or cls._default_cache_file_limit
cache_context,
cache_file_limit or cls._default_cache_file_limit
)
if cache_file_limit:
cls.__cache_managers[cache_context].set_cache_limit(cache_file_limit)

View File

@@ -34,19 +34,10 @@ from .callbacks import UploadProgressReport, DownloadProgressReport
from .util import quote_url
from ..backend_api.utils import get_http_session_with_retry
from ..backend_config.bucket_config import S3BucketConfigurations, GSBucketConfigurations, AzureContainerConfigurations
from ..config import config
from ..config import config, deferred_config
from ..debugging import get_logger
from ..errors import UsageError
log = get_logger('storage')
level = config.get('storage.log.level', None)
if level:
try:
log.setLevel(level)
except (TypeError, ValueError):
log.error('invalid storage log level in configuration: %s' % level)
class StorageError(Exception):
pass
@@ -59,6 +50,10 @@ class DownloadError(Exception):
@six.add_metaclass(ABCMeta)
class _Driver(object):
@classmethod
def get_logger(cls):
    # Resolve the shared 'storage' logger lazily at call time rather than
    # holding a module-level reference, so logging configuration (e.g. the
    # level read from config in StorageHelper.__init__) is picked up late.
    return get_logger('storage')
@abstractmethod
def get_container(self, container_name, config=None, **kwargs):
pass
@@ -107,6 +102,10 @@ class StorageHelper(object):
"""
_temp_download_suffix = '.partially'
@classmethod
def _get_logger(cls):
    # Lazy accessor for the shared 'storage' logger; replaces the previous
    # module-level `log` so importing this module has no logging side effects.
    return get_logger('storage')
@attrs
class _PathSubstitutionRule(object):
registered_prefix = attrib(type=str)
@@ -128,7 +127,7 @@ class StorageHelper(object):
)
if any(prefix is None for prefix in (rule.registered_prefix, rule.local_prefix)):
log.warning(
StorageHelper._get_logger().warning(
"Illegal substitution rule configuration '{}[{}]': {}".format(
cls.path_substitution_config,
index,
@@ -138,7 +137,7 @@ class StorageHelper(object):
continue
if all((rule.replace_windows_sep, rule.replace_linux_sep)):
log.warning(
StorageHelper._get_logger().warning(
"Only one of replace_windows_sep and replace_linux_sep flags may be set."
"'{}[{}]': {}".format(
cls.path_substitution_config,
@@ -190,11 +189,10 @@ class StorageHelper(object):
_upload_pool = None
# collect all bucket credentials that aren't empty (ignore entries with an empty key or secret)
_s3_configurations = S3BucketConfigurations.from_config(config.get('aws.s3', {}))
_gs_configurations = GSBucketConfigurations.from_config(config.get('google.storage', {}))
_azure_configurations = AzureContainerConfigurations.from_config(config.get('azure.storage', {}))
_path_substitutions = _PathSubstitutionRule.load_list_from_config()
_s3_configurations = deferred_config('aws.s3', {}, transform=S3BucketConfigurations.from_config)
_gs_configurations = deferred_config('google.storage', {}, transform=GSBucketConfigurations.from_config)
_azure_configurations = deferred_config('azure.storage', {}, transform=AzureContainerConfigurations.from_config)
_path_substitutions = deferred_config(transform=_PathSubstitutionRule.load_list_from_config)
@property
def log(self):
@@ -236,10 +234,10 @@ class StorageHelper(object):
try:
instance = cls(base_url=base_url, url=url, logger=logger, canonize_url=False, **kwargs)
except (StorageError, UsageError) as ex:
log.error(str(ex))
cls._get_logger().error(str(ex))
return None
except Exception as ex:
log.error("Failed creating storage object {} Reason: {}".format(
cls._get_logger().error("Failed creating storage object {} Reason: {}".format(
base_url or url, ex))
return None
@@ -264,7 +262,15 @@ class StorageHelper(object):
def __init__(self, base_url, url, key=None, secret=None, region=None, verbose=False, logger=None, retries=5,
**kwargs):
self._log = logger or log
level = config.get('storage.log.level', None)
if level:
try:
self._get_logger().setLevel(level)
except (TypeError, ValueError):
self._get_logger().error('invalid storage log level in configuration: %s' % level)
self._log = logger or self._get_logger()
self._verbose = verbose
self._retries = retries
self._extra = {}
@@ -856,7 +862,7 @@ class StorageHelper(object):
else:
folder_uri = '/'.join((_base_url, folder_uri))
log.debug('Upload destination {} amended to {} for registration purposes'.format(
cls._get_logger().debug('Upload destination {} amended to {} for registration purposes'.format(
prev_folder_uri, folder_uri))
else:
raise ValueError('folder_uri: {} does not start with base url: {}'.format(folder_uri, _base_url))
@@ -1055,7 +1061,8 @@ class _HttpDriver(_Driver):
container = self._containers[obj.container_name]
res = container.session.delete(obj.url, headers=container.get_headers(obj.url))
if res.status_code != requests.codes.ok:
log.warning('Failed deleting object %s (%d): %s' % (obj.object_name, res.status_code, res.text))
self._get_logger().warning('Failed deleting object %s (%d): %s' % (
obj.object_name, res.status_code, res.text))
return False
return True
@@ -1086,7 +1093,7 @@ class _HttpDriver(_Driver):
obj = self._get_download_object(obj)
p = Path(local_path)
if not overwrite_existing and p.is_file():
log.warning('failed saving after download: overwrite=False and file exists (%s)' % str(p))
self.get_logger().warning('failed saving after download: overwrite=False and file exists (%s)' % str(p))
return
length = 0
with p.open(mode='wb') as f:
@@ -1156,7 +1163,7 @@ class _Stream(object):
self.closed = True
raise StopIteration()
except Exception as ex:
log.error('Failed downloading: %s' % ex)
_Driver.get_logger().error('Failed downloading: %s' % ex)
else:
# in/out stream
try:
@@ -1210,10 +1217,9 @@ class _Stream(object):
class _Boto3Driver(_Driver):
""" Boto3 storage adapter (simple, enough for now) """
_max_multipart_concurrency = config.get('aws.boto3.max_multipart_concurrency', 16)
_min_pool_connections = 512
_pool_connections = config.get('aws.boto3.pool_connections', 512)
_max_multipart_concurrency = deferred_config('aws.boto3.max_multipart_concurrency', 16)
_pool_connections = deferred_config('aws.boto3.pool_connections', 512)
_stream_download_pool_connections = 128
_stream_download_pool = None
@@ -1297,7 +1303,7 @@ class _Boto3Driver(_Driver):
Callback=callback,
)
except Exception as ex:
log.error('Failed uploading: %s' % ex)
self.get_logger().error('Failed uploading: %s' % ex)
return False
return True
@@ -1310,7 +1316,7 @@ class _Boto3Driver(_Driver):
num_download_attempts=container.config.retries),
Callback=callback)
except Exception as ex:
log.error('Failed uploading: %s' % ex)
self.get_logger().error('Failed uploading: %s' % ex)
return False
return True
@@ -1344,7 +1350,7 @@ class _Boto3Driver(_Driver):
try:
a_obj.download_fileobj(a_stream, Callback=cb, Config=cfg)
except Exception as ex:
log.error('Failed downloading: %s' % ex)
(log or self.get_logger()).error('Failed downloading: %s' % ex)
a_stream.close()
import boto3.s3.transfer
@@ -1366,7 +1372,7 @@ class _Boto3Driver(_Driver):
import boto3.s3.transfer
p = Path(local_path)
if not overwrite_existing and p.is_file():
log.warning('failed saving after download: overwrite=False and file exists (%s)' % str(p))
self.get_logger().warning('failed saving after download: overwrite=False and file exists (%s)' % str(p))
return
container = self._containers[obj.container_name]
obj.download_file(str(p),
@@ -1526,7 +1532,7 @@ class _GoogleCloudStorageDriver(_Driver):
blob = container.bucket.blob(object_name)
blob.upload_from_file(iterator)
except Exception as ex:
log.error('Failed uploading: %s' % ex)
self.get_logger().error('Failed uploading: %s' % ex)
return False
return True
@@ -1535,7 +1541,7 @@ class _GoogleCloudStorageDriver(_Driver):
blob = container.bucket.blob(object_name)
blob.upload_from_filename(file_path)
except Exception as ex:
log.error('Failed uploading: %s' % ex)
self.get_logger().error('Failed uploading: %s' % ex)
return False
return True
@@ -1553,7 +1559,7 @@ class _GoogleCloudStorageDriver(_Driver):
except ImportError:
pass
name = getattr(object, "name", "")
log.warning("Failed deleting object {}: {}".format(name, ex))
self.get_logger().warning("Failed deleting object {}: {}".format(name, ex))
return False
return not object.exists()
@@ -1572,7 +1578,7 @@ class _GoogleCloudStorageDriver(_Driver):
try:
a_obj.download_to_file(a_stream)
except Exception as ex:
log.error('Failed downloading: %s' % ex)
self.get_logger().error('Failed downloading: %s' % ex)
a_stream.close()
# return iterable object
@@ -1585,7 +1591,7 @@ class _GoogleCloudStorageDriver(_Driver):
def download_object(self, obj, local_path, overwrite_existing=True, delete_on_failure=True, callback=None, **_):
p = Path(local_path)
if not overwrite_existing and p.is_file():
log.warning('failed saving after download: overwrite=False and file exists (%s)' % str(p))
self.get_logger().warning('failed saving after download: overwrite=False and file exists (%s)' % str(p))
return
obj.download_to_filename(str(p))
@@ -1664,9 +1670,9 @@ class _AzureBlobServiceStorageDriver(_Driver):
)
return True
except AzureHttpError as ex:
log.error('Failed uploading (Azure error): %s' % ex)
self.get_logger().error('Failed uploading (Azure error): %s' % ex)
except Exception as ex:
log.error('Failed uploading: %s' % ex)
self.get_logger().error('Failed uploading: %s' % ex)
return False
def upload_object(self, file_path, container, object_name, callback=None, extra=None, **kwargs):
@@ -1690,9 +1696,9 @@ class _AzureBlobServiceStorageDriver(_Driver):
)
return True
except AzureHttpError as ex:
log.error('Failed uploading (Azure error): %s' % ex)
self.get_logger().error('Failed uploading (Azure error): %s' % ex)
except Exception as ex:
log.error('Failed uploading: %s' % ex)
self.get_logger().error('Failed uploading: %s' % ex)
finally:
if stream:
stream.close()
@@ -1727,7 +1733,7 @@ class _AzureBlobServiceStorageDriver(_Driver):
container.name,
obj.blob_name
)
cb = DownloadProgressReport(total_size_mb, verbose, remote_path, log)
cb = DownloadProgressReport(total_size_mb, verbose, remote_path, self.get_logger())
blob = container.blob_service.get_blob_to_bytes(
container.name,
obj.blob_name,
@@ -1738,7 +1744,7 @@ class _AzureBlobServiceStorageDriver(_Driver):
def download_object(self, obj, local_path, overwrite_existing=True, delete_on_failure=True, callback=None, **_):
p = Path(local_path)
if not overwrite_existing and p.is_file():
log.warning('failed saving after download: overwrite=False and file exists (%s)' % str(p))
self.get_logger().warning('failed saving after download: overwrite=False and file exists (%s)' % str(p))
return
download_done = threading.Event()

View File

@@ -50,7 +50,7 @@ from .binding.hydra_bind import PatchHydra
from .binding.click_bind import PatchClick
from .config import (
config, DEV_TASK_NO_REUSE, get_is_master_node, DEBUG_SIMULATE_REMOTE_TASK, PROC_MASTER_ID_ENV_VAR,
DEV_DEFAULT_OUTPUT_URI, )
DEV_DEFAULT_OUTPUT_URI, deferred_config, )
from .config import running_remotely, get_remote_task_id
from .config.cache import SessionCache
from .debugging.log import LoggerRoot
@@ -134,9 +134,9 @@ class Task(_Task):
__main_task = None # type: Optional[Task]
__exit_hook = None
__forked_proc_main_pid = None
__task_id_reuse_time_window_in_hours = float(config.get('development.task_reuse_time_window_in_hours', 24.0))
__detect_repo_async = config.get('development.vcs_repo_detect_async', False)
__default_output_uri = DEV_DEFAULT_OUTPUT_URI.get() or config.get('development.default_output_uri', None)
__task_id_reuse_time_window_in_hours = deferred_config('development.task_reuse_time_window_in_hours', 24.0, float)
__detect_repo_async = deferred_config('development.vcs_repo_detect_async', False)
__default_output_uri = DEV_DEFAULT_OUTPUT_URI.get() or deferred_config('development.default_output_uri', None)
class _ConnectedParametersType(object):
argparse = "argument_parser"
@@ -1987,6 +1987,8 @@ class Task(_Task):
# Remove the development system tag
system_tags = [t for t in task.get_system_tags() if t != self._development_tag]
self.set_system_tags(system_tags)
# if we leave the Task out there, it makes sense to make it editable.
self.reset(force=True)
# leave this process.
if exit_process:

View File

@@ -316,6 +316,8 @@ def is_std_or_local_lib(name):
# if we got here, the loader failed on us, meaning this is definitely a module and not std
return False
if not module_info:
if name == '__builtin__':
return True
return False
mpath = module_info.origin
# this is std

View File

@@ -195,8 +195,10 @@ class WrapperBase(type):
return mtd(*args, **kwargs)
return method
typed_class = attrs.get('_base_class_')
for name in mcs._special_names:
attrs[name] = make_method(name)
if not typed_class or hasattr(typed_class, name):
attrs[name] = make_method(name)
overrides = attrs.get('__overrides__', [])
# overrides.extend(k for k, v in attrs.items() if isinstance(v, lazy))
@@ -223,14 +225,68 @@ class LazyEvalWrapper(six.with_metaclass(WrapperBase)):
def _remoteref(self):
func = object.__getattribute__(self, "_remote_reference")
if func in LazyEvalWrapper._remote_reference_calls:
if func and func in LazyEvalWrapper._remote_reference_calls:
LazyEvalWrapper._remote_reference_calls.remove(func)
return func() if callable(func) else func
def __getattribute__(self, attr):
    """Proxy attribute reads to the lazily materialized wrapped object."""
    if attr in ('__isabstractmethod__', ):
        # abc machinery probes this attribute; answer without materializing
        return None
    if attr in ('_remoteref', '_remote_reference'):
        # internal bookkeeping lives on the wrapper itself, not the target
        return object.__getattribute__(self, attr)
    return getattr(LazyEvalWrapper._load_object(self), attr)
def __setattr__(self, attr, value):
    # Forward attribute writes to the wrapped object (created on demand).
    setattr(LazyEvalWrapper._load_object(self), attr, value)
def __delattr__(self, attr):
    # Forward attribute deletion to the wrapped object (created on demand).
    delattr(LazyEvalWrapper._load_object(self), attr)
def __nonzero__(self):
    # Python 2 truthiness: delegate to the wrapped object.
    return bool(LazyEvalWrapper._load_object(self))
def __bool__(self):
    # Python 3 truthiness: delegate to the wrapped object.
    return bool(LazyEvalWrapper._load_object(self))
@staticmethod
def _load_object(self):
    # Materialize the wrapped object on first access: invoke the stored
    # callback once and cache its result in '_wrapped'.
    # Uses object.__getattribute__/__setattr__ deliberately to bypass this
    # class's own proxying dunders and avoid infinite recursion.
    obj = object.__getattribute__(self, "_wrapped")
    if obj is None:
        cb = object.__getattribute__(self, "_callback")
        obj = cb()
        object.__setattr__(self, '_wrapped', obj)
    return obj
@classmethod
def trigger_all_remote_references(cls):
    # Fire every registered remote-reference callback, then clear the
    # registry so each callback runs at most once.
    for func in cls._remote_reference_calls:
        if callable(func):
            func()
    cls._remote_reference_calls = []
def lazy_eval_wrapper_spec_class(class_type):
    # Factory: build a lazy-evaluation proxy class specialized for
    # `class_type`. Instances defer construction to `callback` and report
    # `class_type` as their __class__ (presumably so isinstance()-style
    # checks on the proxy succeed — TODO confirm against callers).
    class TypedLazyEvalWrapper(six.with_metaclass(WrapperBase)):
        # `_base_class_` is consumed by the WrapperBase metaclass when
        # deciding which special methods to synthesize.
        _base_class_ = class_type
        __slots__ = ['_wrapped', '_callback', '__weakref__']

        def __init__(self, callback):
            # Store state via object.__setattr__ to bypass any proxying
            # __setattr__ the metaclass machinery may install.
            object.__setattr__(self, '_wrapped', None)
            object.__setattr__(self, '_callback', callback)

        def __nonzero__(self):
            # Python 2 truthiness delegates to the wrapped object.
            return bool(LazyEvalWrapper._load_object(self))

        def __bool__(self):
            # Python 3 truthiness delegates to the wrapped object.
            return bool(LazyEvalWrapper._load_object(self))

        def __getattribute__(self, attr):
            if attr == '__isabstractmethod__':
                # abc machinery probe; answer without materializing
                return None
            if attr == '__class__':
                # masquerade as the target type
                return class_type
            return getattr(LazyEvalWrapper._load_object(self), attr)

    return TypedLazyEvalWrapper